gw/simulation_model.py

1252 lines
54 KiB
Python

import json
import pandas as pd
import os
from datetime import datetime
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
from mesa.datacollection import DataCollector
from mesa.model import Model
import pulp
import warnings
import sys
import traceback
from production_line import ProductionLineAgent
from demand_agent import DemandAgent
# Configure matplotlib fonts so that Chinese labels render correctly in plots.
matplotlib.rcParams["font.family"] = ["Microsoft YaHei", "SimHei", "sans-serif"]
matplotlib.rcParams["axes.unicode_minus"] = False
# Read the simulation year from year.json; it names the data/ and output/ sub-folders.
# The context manager closes the file handle instead of leaving it to the garbage collector.
with open('year.json', 'r', encoding='utf-8') as _year_file:
    year = json.load(_year_file)['year']
filename = f"{year}"
# Silence FutureWarning noise raised by dependencies.
warnings.filterwarnings("ignore", category=FutureWarning)
class _FilterMessageTracer:
"""Silence and trace unexpected 'filter element' messages."""
def __init__(self, stream, log_path):
self._stream = stream
self._log_path = log_path
def write(self, msg):
lower_msg = msg.lower()
if "filter element" in lower_msg:
# Record stack once per message and swallow it from console.
with open(self._log_path, "a", encoding="utf-8") as f:
f.write(msg)
traceback.print_stack(file=f)
return len(msg)
return self._stream.write(msg)
def flush(self):
return self._stream.flush()
# Route stdout and stderr through the tracer so that "filter element" prints
# end up in output/<year>/debug_filter.log while all other output still
# reaches the console.
_filter_log = os.path.join("output", filename, "debug_filter.log")
os.makedirs(os.path.dirname(_filter_log), exist_ok=True)
sys.stdout, sys.stderr = (
    _FilterMessageTracer(sys.stdout, _filter_log),
    _FilterMessageTracer(sys.stderr, _filter_log),
)
class SimulationModel(Model):
"""
Minimal model that instantiates one ProductionLineAgent per production line
entry and tracks the number of lines per region over time.
"""
def __init__(
    self,
    factory_factors: dict | None = None,
    output_enabled: bool = False,
    is_within_region_allocation_only: bool | None = None,
    product_set: tuple | list | set | str | None = None,
    is_calibration_mode: bool = False,
    **kwargs,
):
    """
    Load configuration and input data, create the agents and set up data collection.

    Args:
        factory_factors: Optional factor overrides; keys may carry a "factor_"
            prefix. Applied on top of any "factor_*" keyword arguments, so the
            explicit dict wins on conflicts.
        output_enabled: Stored on the model; ``step`` uses it to decide whether
            report files are written.
        is_within_region_allocation_only: Overrides the config entry of the
            same name when not None.
        product_set: Products to restrict the loaded data to; falls back to the
            "product_set" config entry when None.
        is_calibration_mode: When true, ``step`` skips the allocation phase.
        **kwargs: Only keys starting with "factor_" are used (as factor
            overrides); all other keys are ignored.

    Raises:
        KeyError: If the model parameters lack "is_error_max".
    """
    super().__init__()
    cfg = self._load_model_params()
    self.month_holiday_days = self._load_month_holiday_days(cfg)
    self.ramp_ranges = self._load_product_month_efficiency()
    self.factory_mapping = self._load_factory_mapping()
    self.factory_factors = {}
    self.use_error_max = self._to_bool(cfg["is_error_max"])
    # Factor overrides: "factor_*" kwargs first, then the explicit dict on top.
    merged_factors = {k: v for k, v in kwargs.items() if k.startswith("factor_")}
    if factory_factors:
        merged_factors.update(factory_factors)
    if merged_factors:
        self._merge_factory_factors(merged_factors)
    self.within_region_only = self._to_bool(
        is_within_region_allocation_only
        if is_within_region_allocation_only is not None
        else cfg.get("is_within_region_allocation_only", False)
    )
    self.current_month = 1
    self.cumulative_production = 0
    self.monthly_totals = {}
    self.production_log = []
    self.error = 0.0
    # Allow explicit product_set override; otherwise fall back to config.
    self.product_set = self._parse_product_set(
        product_set if product_set is not None else cfg.get("product_set")
    )
    self.output_enabled = output_enabled
    self.region_totals = {}
    self.factory_error_df = None
    self.demand_agents = []
    self.region_demand_totals = {}
    self.region_alloc_inventory = {}
    self.region_demand_fulfilled = {}
    self.backlog_city_prod = {}
    self.region_fulfill_pct = {}
    self.region_unmet_backlog = {}
    self.region_transport_month = {}
    self.inv_history = []
    self.fulfill_history_month = []
    self.fulfill_history_cum = []
    self.unmet_history = []
    self.assignment_log = []
    self.overall_fulfill_pct = 0.0
    self.monthly_transport_cost = 0.0
    self.fulfill_overall_history = []
    self.transport_cost_history = []
    self.transport_units_history = []
    self.product_names = set()
    self.product_list = []
    self.monthly_allocation_summary = []
    self.blade_production_log = []
    self.blade_stock_log = []
    self.calibration_mode = bool(is_calibration_mode)
    self.line_factor = {}
    self._load_month_hours()
    self._load_agents_from_csv()
    self._load_demand_agents_from_csv()
    self._load_transport_data()
    # Per-region tracking dicts must exist before the first collect() below.
    self.region_fulfill_pct_month = {r: 0 for r in self.demand_regions}
    self.region_fulfill_pct_cum = {r: 0 for r in self.demand_regions}
    self.region_transport_month = {r: 0 for r in self.demand_regions}

    def region_reporter(attr, region):
        # Bind attr/region per reporter; the dict itself is looked up on the
        # model at collection time, so reassigned dicts are picked up.
        return lambda m: getattr(m, attr).get(region, 0)

    # Each reporter is registered exactly once; insertion order fixes the
    # column order of the collected DataFrame.
    reporters = {
        region: region_reporter("region_totals", region)
        for region in self.region_names
    }
    for prefix, attr, regions in (
        ("demand_", "region_demand_totals", self.demand_regions),
        ("inventory_", "region_alloc_inventory", self.region_names),
        ("demand_fulfilled_", "region_demand_fulfilled", self.demand_regions),
    ):
        reporters.update(
            {f"{prefix}{region}": region_reporter(attr, region) for region in regions}
        )
    reporters.update(
        {
            "cumulative_production": lambda m: m.cumulative_production,
            "monthly_total": lambda m: m.monthly_totals.get(m.current_month, 0),
            "error": lambda m: m.error,
            "fulfill_pct_overall": lambda m: m.overall_fulfill_pct,
            "transport_cost": lambda m: m.monthly_transport_cost,
        }
    )
    for prefix, attr in (
        ("fulfill_pct_", "region_fulfill_pct"),
        ("unmet_", "region_unmet_backlog"),
        ("transport_units_", "region_transport_month"),
        ("fulfill_month_", "region_fulfill_pct_month"),
        ("fulfill_cum_", "region_fulfill_pct_cum"),
    ):
        reporters.update(
            {
                f"{prefix}{region}": region_reporter(attr, region)
                for region in self.demand_regions
            }
        )
    self.datacollector = DataCollector(model_reporters=reporters)
    self.running = True
    self.datacollector.collect(self)
def _compute_factory_error_stats(
self,
factory_pivot: pd.DataFrame,
benchmark_sorted: pd.DataFrame,
total_col: str = "总计",
use_max: bool = True,
) -> dict:
"""
Compute cumulative deviation ratios per factory (no aggregation).
Ratios are unitless (e.g., 0.02 == 2%).
"""
factory_max_ratio = {}
factory_cum_ratio = {}
month_cols = [f"{m}" for m in range(1, 13)]
bench_name_col = benchmark_sorted.columns[0]
for _, prow in factory_pivot.iterrows():
fname = prow["工厂名称"]
brow = benchmark_sorted[benchmark_sorted[bench_name_col] == fname]
if brow.empty:
continue
prod_months = pd.Series([float(prow[col]) for col in month_cols])
bench_months = pd.Series([float(brow.iloc[0][col]) for col in month_cols])
prod_cum = prod_months.cumsum()
bench_cum = bench_months.cumsum()
actual_total = float(brow.iloc[0][total_col]) if total_col in brow.columns else float(bench_cum.iloc[-1])
if actual_total <= 0:
continue
pct_ratio = (prod_cum - bench_cum) / actual_total # signed ratio
factory_cum_ratio[fname] = pct_ratio
factory_max_ratio[fname] = float(pct_ratio.abs().max())
return {
"factory_max_ratio": factory_max_ratio,
"factory_cum_ratio": factory_cum_ratio,
}
def _get_output_timestamp(self) -> str:
if not hasattr(self, "_output_timestamp"):
self._output_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return self._output_timestamp
def _load_month_hours(self):
    """Load data/<year>/month_hours.json into ``self.month_days``.

    The JSON keys are converted to int; the values are stored as read.
    """
    source_path = f"data/{filename}/month_hours.json"
    with open(source_path, "r", encoding="utf-8") as handle:
        raw = json.load(handle)
        month_days = {}
        for key, value in raw.items():
            month_days[int(key)] = value
        self.month_days = month_days
def _load_model_params(self):
    """Return the parsed contents of data/<year>/model_params.json."""
    params_path = f"data/{filename}/model_params.json"
    with open(params_path, "r", encoding="utf-8") as handle:
        params = json.load(handle)
    return params
def _load_product_month_efficiency(self) -> dict:
    """Read data/<year>/month_efficiency.xlsx into a per-product ramp table.

    Column names are matched case-insensitively after trimming. Rows whose
    ``product_id`` cell is blank are skipped.

    Returns:
        ``{product_id: {1: v1, 2: v2, 3: v3, 4: v4}}`` built from the
        ``month1`` .. ``month4`` columns, values converted to float.

    Raises:
        FileNotFoundError: If the workbook does not exist.
        ValueError: If a required column is missing, a month value is blank,
            or the sheet contains no product rows.
    """
    path = f"data/{filename}/month_efficiency.xlsx"
    if not os.path.exists(path):
        raise FileNotFoundError(f"Missing month efficiency file: {path}")
    df = pd.read_excel(path)
    df.columns = [str(c).strip().lower() for c in df.columns]
    required = {"product_id", "month1", "month2", "month3", "month4"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"month_efficiency.xlsx is missing columns: {', '.join(sorted(missing))}")
    ramp = {}
    for _, row in df.iterrows():
        raw_id = row["product_id"]
        # A blank cell is read as NaN; str(NaN) would be the non-empty string
        # "nan", so it has to be filtered before the emptiness check below.
        if pd.isna(raw_id):
            continue
        product = str(raw_id).strip()
        if not product:
            continue
        values = {}
        for idx in range(1, 5):
            val = row[f"month{idx}"]
            if pd.isna(val):
                raise ValueError(f"产品 {product} 的 month{idx} 为空,请在 month_efficiency.xlsx 中补充。")
            values[idx] = float(val)
        ramp[product] = values
    if not ramp:
        raise ValueError("month_efficiency.xlsx contains no product rows.")
    return ramp
def _load_month_holiday_days(self, cfg: dict) -> dict:
# Default 2 days off per month unless overridden
holidays = {m: 2 for m in range(1, 13)}
for m in range(1, 13):
key = f"holiday_days_{m}"
if key in cfg:
try:
holidays[m] = int(float(cfg[key]))
except Exception:
continue
return holidays
def _sanitize_product(self, name: str) -> str:
return str(name).replace("-", "").replace(".", "")
def _load_factory_mapping(self) -> dict:
    """Return the parsed contents of data/<year>/factory_mapping.json."""
    mapping_path = f"data/{filename}/factory_mapping.json"
    with open(mapping_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
def _parse_product_set(self, val) -> tuple:
if val is None:
return tuple()
if isinstance(val, str):
stripped = val.strip()
if stripped.startswith("(") and stripped.endswith(")"):
stripped = stripped[1:-1]
parts = [p.strip() for p in stripped.split(",") if p.strip()]
return tuple(parts)
if isinstance(val, (list, tuple, set)):
return tuple(str(v).strip() for v in val if str(v).strip())
return tuple()
def _to_bool(self, val):
if isinstance(val, str):
return val.strip().lower() in {"true", "1", "yes", "y", "", ""}
return bool(val)
def _load_factory_factors(self) -> dict:
# Deprecated: factors now provided per production line; keep for compatibility with overrides.
return {}
def _merge_factory_factors(self, overrides: dict):
for key, val in overrides.items():
suffix = key
if key.startswith("factor_"):
suffix = key[len("factor_") :]
self.factory_factors[self._sanitize_product(suffix)] = float(val)
def get_factory_code(self, factory_name: str) -> str:
    """Return the mapped code for ``factory_name``.

    Falls back to the sanitised factory name when no mapping entry exists.
    """
    fallback = self._sanitize_product(factory_name)
    return self.factory_mapping.get(factory_name, fallback)
def get_factory_factor(self, factory_name: str, line_id: str | None = None, line_factor: float | None = None) -> float:
code = self.get_factory_code(factory_name)
if line_id and line_id in self.line_factor:
base = self.line_factor.get(line_id)
else:
base = None
override = self.factory_factors.get(code)
if override is not None:
return float(override)
if base is not None:
return float(base)
if line_factor is not None:
return float(line_factor)
raise KeyError(f"未找到工厂 {factory_name} (line_id={line_id}) 的磨合系数。")
def _load_agents_from_csv(self):
    """Create one ProductionLineAgent per line listed in ProductionLine.csv.

    The file is decoded with the first working encoding out of utf-8,
    utf-8-sig and gbk. Rows are limited to ``self.product_set`` when it is
    non-empty. Sets ``region_names``, ``line_factory``, ``line_region`` and
    fills ``line_factor`` and ``product_names``.

    Raises:
        UnicodeDecodeError: If no encoding can decode the file.
        KeyError: If a product has no entry in ``self.ramp_ranges`` or the
            factor column is missing.
        ValueError: If a line's factor cannot be converted to float.
    """
    frame = None
    decode_failure = None
    for candidate in ("utf-8", "utf-8-sig", "gbk"):
        try:
            frame = pd.read_csv(f"data/{filename}/ProductionLine.csv", encoding=candidate)
        except UnicodeDecodeError as exc:
            decode_failure = exc
        else:
            break
    if frame is None:
        raise decode_failure
    frame["生产型号"] = frame["生产型号"].astype(str).str.strip()
    if self.product_set:
        frame = frame[frame["生产型号"].isin(self.product_set)]
    self.region_names = [] if frame.empty else sorted(frame["区域名"].unique())
    self.line_factory = {}
    self.line_region = {}
    # NOTE(review): the empty-string member may be a literal whose characters
    # were lost - confirm against the original file.
    new_factory_markers = {"", "Yes", "True", "true", "1"}
    for line_id, line_rows in frame.groupby("产线ID"):
        head = line_rows.iloc[0]
        schedule = []
        for _, entry in line_rows.iterrows():
            product = str(entry["生产型号"]).strip()
            if product not in self.ramp_ranges:
                raise KeyError(f"缺少产品 {product} 的生产效率,请在 month_efficiency.xlsx 中补充。")
            schedule.append(
                dict(
                    product=product,
                    start_month=int(entry["开始月份"]),
                    end_month=int(entry["结束月份"]),
                )
            )
            self.product_names.add(product)
        is_new_factory = str(head["是否新工厂"]).strip() in new_factory_markers
        self.line_factory[line_id] = head["工厂名"]
        self.line_region[line_id] = head["区域名"]
        if "磨合系数" not in head:
            raise KeyError("ProductionLine.csv 缺少磨合系数字段。")
        try:
            factor = float(head.get("磨合系数"))
        except Exception:
            raise ValueError(f"无法解析产线 {line_id} 的磨合系数。")
        self.line_factor[line_id] = factor
        # The agent is built for its side effects; the return value is unused.
        ProductionLineAgent(
            model=self,
            line_id=line_id,
            region=head["区域名"],
            factory=head["工厂名"],
            is_new_factory=is_new_factory,
            schedule=schedule,
            ramp_ranges=self.ramp_ranges,
            line_factor=factor,
        )
def _load_demand_agents_from_csv(self):
# Read demand.csv, aggregate it and create one DemandAgent per (city, region) pair.
# Sets self.demand_regions, self.demand_by_city_product and self.city_to_region,
# and adds every product seen to self.product_names.
# Raises UnicodeDecodeError when none of the candidate encodings can decode the file.
# NOTE(review): several literals below are empty strings (endswith(""), rstrip(""),
# the "" column name). As written, endswith("") matches every column. They look like
# literals whose characters were lost - confirm against the original file.
encodings = ("utf-8", "utf-8-sig", "gbk")
last_error = None
# Try each encoding in turn; the for/else re-raises the last decode error if all fail.
for enc in encodings:
try:
df = pd.read_csv(f"data/{filename}/demand.csv", encoding=enc)
break
except UnicodeDecodeError as exc:
last_error = exc
continue
else:
raise last_error
# The first column is taken to hold the product name.
product_col = df.columns[0]
df[product_col] = df[product_col].astype(str).str.strip()
# Optional restriction to the configured product set.
if self.product_set:
df = df[df[product_col].isin(self.product_set)]
month_cols = [col for col in df.columns if col.endswith("")]
# Non-numeric cells in the selected columns become NaN.
df[month_cols] = df[month_cols].apply(pd.to_numeric, errors="coerce")
# (Disabled debug output: total loaded demand and demand totals per region.)
# month_cols is recomputed here with the same expression as above.
month_cols = [col for col in df.columns if col.endswith("")]
# Sum the month columns per (product, "" column, demand region).
grouped = (
df.groupby([product_col, "", "需求区域"])[month_cols]
.sum()
.reset_index()
)
self.demand_regions = sorted(grouped["需求区域"].unique())
self.demand_by_city_product = {}
self.city_to_region = {}
# Aggregate per city-region into product -> monthly dict
city_region_groups = grouped.groupby(["", "需求区域"])
for (city, region), sub in city_region_groups:
prod_monthly = {}
for _, row in sub.iterrows():
prod_name = str(row[product_col]).strip()
# Month column labels are converted to int keys.
monthly = {int(col.rstrip("")): row[col] for col in month_cols}
prod_monthly[prod_name] = monthly
# Keyed by the city value as read; city_to_region below uses the stripped string.
self.demand_by_city_product[(city, prod_name)] = monthly
self.city_to_region[str(city).strip()] = region
self.product_names.add(prod_name)
# The agent is constructed for its side effects; the return value is not kept here.
DemandAgent(
model=self,
city=city,
region=region,
product_monthly_demand=prod_monthly,
)
def _load_transport_data(self):
    """Load the distance matrix and the transport price table.

    Sets:
        distance_lookup: ``{(factory, demand_city): distance_km as float}``
            from distance_matrix.csv (read as utf-8).
        transport_prices: one dict per price row of transportation_price.csv
            (read as gbk) with keys ``category``, ``low``, ``high``,
            ``unit_price`` and ``truck_price``. A distance range without "-"
            is stored as 0 .. infinity.
        special_products, big_M: fixed values used by ``_cost_per_unit``.
        product_list: sorted copy of ``self.product_names``.

    When ``self.product_set`` is non-empty, only price rows for those products
    or for a generic "others" category are kept.
    """
    # Distance matrix
    dist_df = pd.read_csv(f"data/{filename}/distance_matrix.csv", encoding="utf-8")
    self.distance_lookup = {
        (row["factory"], row["demand_city"]): float(row["distance_km"])
        for _, row in dist_df.iterrows()
    }
    # Transportation prices
    price_df = pd.read_csv(f"data/{filename}/transportation_price.csv", encoding="gbk")
    price_df.columns = [c.strip() for c in price_df.columns]
    price_df["产品型号"] = price_df["产品型号"].str.strip()
    if self.product_set:
        generic = {"others", "other", "其它", "其他"}
        keep = (
            price_df["产品型号"].isin(self.product_set)
            | price_df["产品型号"].str.lower().isin(generic)
        )
        # Take an explicit copy: the column assignments below must not target
        # a slice of the unfiltered frame (SettingWithCopy hazard).
        price_df = price_df[keep].copy()
    price_df["里程区间[公里]"] = price_df["里程区间[公里]"].str.strip()
    for col in price_df.columns:
        if "单价" in col or "整车价" in col:
            # Strip thousands separators and spaces before numeric parsing;
            # unparsable cells become NaN.
            price_df[col] = (
                price_df[col]
                .astype(str)
                .str.replace(",", "", regex=False)
                .str.replace(" ", "", regex=False)
            )
            price_df[col] = pd.to_numeric(price_df[col], errors="coerce")
    self.transport_prices = []
    for _, row in price_df.iterrows():
        rng = str(row["里程区间[公里]"])
        if "-" in rng:
            low, high = rng.split("-")
            low = float(low)
            high = float(high)
        else:
            low = 0.0
            high = float("inf")
        self.transport_prices.append(
            {
                "category": row["产品型号"],
                "low": low,
                "high": high,
                "unit_price": row.get("平均单价[元/(公里*片)]", None),
                "truck_price": row.get("平均整车价[元/片]", None),
            }
        )
    self.special_products = {"GWBD-A2", "GWBD-A3", "GWBD-B"}
    self.big_M = 1e6
    # Finalize product list once all sources are loaded
    self.product_list = sorted(self.product_names)
def _cost_per_unit(self, factory, city, product):
dist = self.distance_lookup.get((factory, city))
if dist is None or pd.isna(dist):
return self.big_M
prod = str(product).strip()
best_cost = None
for row in self.transport_prices:
row_cat = str(row["category"]).strip()
# Match logic: exact product, specials grouped, or generic others buckets
is_special_prod = prod in self.special_products
is_special_row = row_cat in self.special_products
is_generic_row = row_cat.lower() in {"others", "other", "其它", "其他"}
match = False
if row_cat == prod:
match = True
elif is_special_prod and is_special_row:
match = True
elif is_generic_row:
match = True
if not match:
continue
if row["low"] <= dist <= row["high"]:
unit_price = row["unit_price"]
truck_price = row["truck_price"]
cost = None
if pd.notna(unit_price):
cost = unit_price * dist * 3
elif pd.notna(truck_price):
cost = truck_price * 3
if cost is not None:
best_cost = cost if best_cost is None else min(best_cost, cost)
return best_cost if best_cost is not None else self.big_M
def get_available_hours(self, month: int) -> float:
    """Return the hours available in ``month``: (days - days off) * 24.

    Months missing from ``self.month_days`` count as 30 days, months missing
    from ``self.month_holiday_days`` as 0 days off; the result never goes
    below zero.
    """
    calendar_days = self.month_days.get(month, 30)
    days_off = self.month_holiday_days.get(month, 0)
    working_days = max(calendar_days - days_off, 0)
    return working_days * 24
def record_demand(self, region: str, demand_units: float):
    """Add ``demand_units`` to the running demand total of ``region``."""
    totals = self.region_demand_totals
    totals[region] = totals.get(region, 0) + demand_units
def count_region(self, region):
    """Return how many agents in ``self.agents`` have ``region`` as their region."""
    matching = [agent for agent in self.agents if agent.region == region]
    return len(matching)
def record_production(self, line_id, factory, region, month, product, units):
    """Log produced units and update the monthly, cumulative and regional totals.

    Calls with ``units <= 0`` are ignored.
    """
    if units <= 0:
        return
    entry = dict(
        line_id=line_id,
        factory=factory,
        region=region,
        month=month,
        product=product,
        units=units,
    )
    self.production_log.append(entry)
    month_totals = self.monthly_totals
    month_totals[month] = month_totals.get(month, 0) + units
    self.cumulative_production += units
    area_totals = self.region_totals
    area_totals[region] = area_totals.get(region, 0) + units
def record_blade_production(self, line_id, factory, region, month, product, blades):
    """Append one blade-production record (``blades`` stored as float) to the log."""
    entry = dict(
        line_id=line_id,
        factory=factory,
        region=region,
        month=month,
        product=product,
        blades=float(blades),
    )
    self.blade_production_log.append(entry)
def record_blade_stock(self, line_id, factory, region, month, product, blades_stock):
    """Append one blade-stock record (``blades_stock`` stored as float) to the log."""
    entry = dict(
        line_id=line_id,
        factory=factory,
        region=region,
        month=month,
        product=product,
        blades_stock=float(blades_stock),
    )
    self.blade_stock_log.append(entry)
def step(self):
# Advance the simulation by one month: step every agent, then either finish the
# month in calibration mode or run allocation and bookkeeping, collect data and
# increment current_month.
# NOTE(review): indentation is not visible in this view, so the nesting of the
# output_enabled checks under the month >= 12 checks is presumed - confirm.
# Iterate over a snapshot so the agent collection may change during stepping.
for agent in list(self.agents):
agent.step()
# Calibration mode: no allocation phase; finalize errors at month 12 and return early.
if self.calibration_mode:
if self.current_month >= 12:
self.running = False
self._finalize_factory_errors(write_files=self.output_enabled)
if self.output_enabled:
self._write_report()
self.datacollector.collect(self)
self.current_month += 1
return
# Allocation after production for current month
self._run_allocation()
self._update_region_inventory()
self._record_histories()
# Month 12 ends the run: compute factory errors and, when enabled, write reports.
if self.current_month >= 12:
self.running = False
self._finalize_factory_errors(write_files=self.output_enabled)
if self.output_enabled:
self._write_report()
self._write_allocation_workbook()
self.datacollector.collect(self)
# Charts are drawn after the final collect() so they include the last month.
if not self.running and self.output_enabled:
self._write_visualization_outputs()
self.current_month += 1
def _write_report(self):
    """Write the production report CSV and then the blade reports.

    The production log is summed per (product, line, factory, month) and
    pivoted to one row per (product, line, factory) with twelve month columns
    and a total. Nothing is written when the production log is empty.
    """
    if not self.production_log:
        return
    row_keys = ["product", "line_id", "factory"]
    log_df = pd.DataFrame(self.production_log)
    summed = log_df.groupby(row_keys + ["month"])["units"].sum().reset_index()
    table = summed.pivot_table(
        index=row_keys,
        columns="month",
        values="units",
        fill_value=0,
    )
    # Months without any production still get a (zero) column.
    for month in range(1, 13):
        if month not in table.columns:
            table[month] = 0
    month_columns = [label for label in table.columns if isinstance(label, int)]
    month_columns.sort()
    table = table[month_columns]
    table["total"] = table.sum(axis=1)
    table = table.reset_index()
    table.columns = ["生产型号", "产线名称", "工厂名", *[f"{m}" for m in range(1, 13)], "总计"]
    stamp = self._get_output_timestamp()
    target_dir = f"output/{filename}"
    os.makedirs(target_dir, exist_ok=True)
    report_path = os.path.join(target_dir, f"production_report_{stamp}.csv")
    table.to_csv(report_path, index=False, encoding="utf-8-sig")
    self._write_blade_reports(stamp, target_dir)
def _write_blade_reports(self, timestamp: str, output_dir: str):
def build_and_write(log, value_col, filename_prefix, value_name):
if not log:
return
df = pd.DataFrame(log)
pivot = (
df.groupby(["product", "line_id", "factory", "month"])[value_col]
.sum()
.reset_index()
.pivot_table(
index=["product", "line_id", "factory"],
columns="month",
values=value_col,
fill_value=0.0,
)
)
for m in range(1, 13):
if m not in pivot.columns:
pivot[m] = 0.0
pivot = pivot[sorted([c for c in pivot.columns if isinstance(c, int)])]
pivot["total"] = pivot.sum(axis=1)
pivot.reset_index(inplace=True)
pivot.columns = (
["生产型号", "产线名称", "工厂名"]
+ [f"{m}" for m in range(1, 13)]
+ ["总计"]
)
out_path = os.path.join(output_dir, f"{filename_prefix}_{timestamp}.csv")
pivot.to_csv(out_path, index=False, encoding="utf-8-sig")
build_and_write(self.blade_production_log, "blades", "production_blade_current_report", "blades")
build_and_write(self.blade_stock_log, "blades_stock", "production_blade_stock_report", "blades_stock")
def _write_allocation_workbook(self):
    """Write the monthly allocation summary to an Excel workbook, one sheet per month.

    A month without summary rows gets an all-zero row per product in
    ``self.product_list``; if that list is empty as well the month is skipped.
    Nothing is written when the summary itself is empty.
    """
    if not self.monthly_allocation_summary:
        return
    stamp = self._get_output_timestamp()
    target_dir = f"output/{filename}"
    os.makedirs(target_dir, exist_ok=True)
    workbook_path = os.path.join(target_dir, f"allocation_summary_{stamp}.xlsx")
    source_columns = ["product", "inventory_before", "demand_before", "allocated", "inventory_after", "demand_after"]
    sheet_headers = ["叶片型号", "调拨前总库存", "调拨前总需求", "当月调拨总量", "调拨后总库存", "调拨后总需求"]
    quantity_fields = ("inventory_before", "demand_before", "allocated", "inventory_after", "demand_after")
    with pd.ExcelWriter(workbook_path, engine="openpyxl") as writer:
        for month in range(1, 13):
            month_rows = [entry for entry in self.monthly_allocation_summary if entry["month"] == month]
            if not month_rows:
                if not self.product_list:
                    continue
                # Placeholder rows so the month still gets a sheet.
                month_rows = [
                    {"month": month, "product": product, **dict.fromkeys(quantity_fields, 0)}
                    for product in self.product_list
                ]
            sheet = pd.DataFrame(month_rows).sort_values(by="product")[source_columns]
            sheet.columns = sheet_headers
            sheet.to_excel(writer, sheet_name=f"{month}", index=False)
def _write_visualization_outputs(self):
    """Save one PNG line chart per collected metric family under output/<year>/.

    Regional charts draw one line per region column present in the collected
    model variables; the two overall charts draw a single line each. A chart
    is skipped when none of its columns were collected, and nothing is drawn
    when the collected frame is empty.
    """
    collected = self.datacollector.get_model_vars_dataframe()
    if collected.empty:
        return
    stamp = self._get_output_timestamp()
    target_dir = f"output/{filename}"
    os.makedirs(target_dir, exist_ok=True)

    def draw(columns, legend, ylabel, title, stem, colors=None):
        # Plot the given columns against the step index and save the figure.
        present = [name for name in columns if name in collected.columns]
        if not present:
            return
        plt.figure(figsize=(10, 6))
        has_palette = isinstance(colors, dict)
        for name in present:
            plt.plot(
                collected.index,
                collected[name],
                label=legend.get(name, name),
                color=colors.get(name) if has_palette else None,
            )
        plt.xlabel("步数")
        if ylabel:
            plt.ylabel(ylabel)
        if title:
            plt.title(title)
        plt.legend(loc="best")
        plt.tight_layout()
        plt.savefig(os.path.join(target_dir, f"{stem}_{stamp}.png"))
        plt.close()

    # (column prefix, regions, legend suffix, y label, title, file stem)
    regional_charts = (
        ("", self.region_names, "-产出[套]", "产出[套]", "区域产出", "production_by_region"),
        ("demand_", self.demand_regions, "-需求[套]", "需求量[套]", "区域需求", "demand_by_region"),
        ("inventory_", self.region_names, "-库存[套]", "库存量[套]", "区域库存", "inventory_by_region"),
        ("fulfill_pct_", self.demand_regions, "-满足率[%]", "满足率[%]", "区域满足率(累计)", "fulfill_pct_by_region"),
        ("fulfill_month_", self.demand_regions, "-当月满足率[%]", "当月满足率[%]", "区域满足率(当月)", "fulfill_pct_month_by_region"),
        ("fulfill_cum_", self.demand_regions, "-累计满足率[%]", "累计满足率[%]", "区域满足率(累计)", "fulfill_pct_cum_by_region"),
        ("unmet_", self.demand_regions, "-未满足需求[套]", "未满足需求[套]", "区域未满足需求", "unmet_by_region"),
        ("transport_units_", self.demand_regions, "-运输量[套]", "运输量[套]", "区域运输量", "transport_units_by_region"),
    )
    for prefix, regions, suffix, ylabel, title, stem in regional_charts:
        columns = []
        for region in regions:
            # Production columns are keyed by the bare region value.
            name = f"{prefix}{region}" if prefix else region
            if name in collected.columns:
                columns.append(name)
        legend = {}
        palette = {}
        for position, name in enumerate(columns):
            shown = name.replace(prefix, '') if prefix else name
            legend[name] = f"{shown}{suffix}"
            palette[name] = f"C{position}"
        draw(columns, legend, ylabel, title, stem, colors=palette)

    # (column, legend and y label, title, file stem)
    overall_charts = (
        ("fulfill_pct_overall", "总体满足率[%]", "总体满足率", "fulfill_pct_overall"),
        ("transport_cost", "运输成本[万元]", "运输成本", "transport_cost"),
    )
    for column, caption, title, stem in overall_charts:
        if column in collected.columns:
            draw([column], {column: caption}, caption, title, stem)
def _finalize_factory_errors(self, write_files: bool):
# Compare per-factory monthly production with benchmark.csv, store the aggregate
# in self.error and the per-factory table in self.factory_error_df, and - when
# write_files is true - write the CSV reports, charts and history files.
# self.error is set to infinity when there is no production log or no factory
# could be compared.
# Raises KeyError when benchmark.csv provides fewer than 12 month columns.
# NOTE(review): indentation is not visible in this view, so the nesting of the
# plotting section under "if cum_ratio_by_factory" is unconfirmed.
# Always reset cached factory error table so callers don't see stale data.
self.factory_error_df = None
if not self.production_log:
self.error = float("inf")
return
df = pd.DataFrame(self.production_log)
# Factory x month table of produced units.
factory_pivot = (
df.groupby(["factory", "month"])["units"]
.sum()
.reset_index()
.pivot_table(index="factory", columns="month", values="units", fill_value=0)
)
# Months without production still get a zero column.
for m in range(1, 13):
if m not in factory_pivot.columns:
factory_pivot[m] = 0
factory_pivot = factory_pivot[sorted([c for c in factory_pivot.columns if isinstance(c, int)])]
factory_pivot["total"] = factory_pivot.sum(axis=1)
factory_pivot.reset_index(inplace=True)
factory_pivot = factory_pivot.sort_values(by="factory")
factory_pivot.columns = ["工厂名称"] + [f"{m}" for m in range(1, 13)] + ["总计"]
# Benchmark comparison
# utf-8-sig is tried first, gbk is the fallback on a decode error.
try:
benchmark = pd.read_csv(f"data/{filename}/benchmark.csv", encoding="utf-8-sig")
except UnicodeDecodeError:
benchmark = pd.read_csv(f"data/{filename}/benchmark.csv", encoding="gbk")
benchmark = benchmark.rename(columns=lambda c: str(c).strip())
# Month columns: the label minus its last character must be numeric.
# NOTE(review): endswith("") is always true as written; the suffix literal may
# have lost its characters - confirm against the original file.
month_cols = [col for col in benchmark.columns if str(col).strip().endswith("") and str(col).strip()[:-1].isdigit()]
month_cols = sorted(month_cols, key=lambda x: int(str(x).strip()[:-1]))
if len(month_cols) < 12:
raise KeyError(f"benchmark.csv 缺少月份列,找到: {month_cols}")
benchmark_sorted = benchmark.sort_values(by=benchmark.columns[0])
total_col = "总计"
# Derive the total from the month columns when the benchmark has no total column.
if total_col not in benchmark_sorted.columns:
benchmark_sorted[total_col] = benchmark_sorted[month_cols].sum(axis=1)
stats = self._compute_factory_error_stats(
factory_pivot,
benchmark_sorted,
total_col=total_col,
use_max=self.use_error_max,
)
cum_ratio_by_factory = stats["factory_cum_ratio"]
# Per-factory metric: max or mean of the absolute cumulative ratios.
metric_fn = (lambda pct: pct.abs().max()) if self.use_error_max else (lambda pct: pct.abs().mean())
factory_metric = {fname: float(metric_fn(pct)) for fname, pct in cum_ratio_by_factory.items()}
# Collect the per-factory metrics for aggregation below.
factory_errors = list(factory_metric.values())
if not factory_errors:
self.error = float("inf")
else:
# Aggregate across factories: max when use_error_max=True, otherwise mean.
self.error = max(factory_errors) if self.use_error_max else sum(factory_errors) / len(factory_errors)
# Expose per-factory error (unitless ratio) even when files are not written (used by GA).
factory_df = pd.DataFrame(
{
"name": list(factory_metric.keys()),
"error_ratio": list(factory_metric.values()),
}
)
if not factory_df.empty:
factory_df = factory_df.sort_values(by="error_ratio")
factory_df["error_pct"] = factory_df["error_ratio"] * 100
else:
factory_df["error_pct"] = []
self.factory_error_df = factory_df.reset_index(drop=True)
# Everything below only writes files.
if not write_files:
return
timestamp = self._get_output_timestamp()
output_dir = f"output/{filename}"
os.makedirs(output_dir, exist_ok=True)
factory_report_path = os.path.join(output_dir, f"factory_report_{timestamp}.csv")
factory_pivot.to_csv(factory_report_path, index=False, encoding="utf-8-sig")
# Error summary based on cumulative deviation ratio (unitless, consistent with GA)
metric_col = "最大累计偏差" if self.use_error_max else "平均累计偏差"
error_df = (
pd.DataFrame([{ "工厂名称": name, metric_col: val} for name, val in factory_metric.items()])
if factory_metric
else pd.DataFrame(columns=["工厂名称", metric_col])
)
error_path = os.path.join(output_dir, f"factory_error_{timestamp}.csv")
error_df.to_csv(error_path, index=False, encoding="utf-8-sig")
# Error bar charts by month using cumulative percentage deviations
if cum_ratio_by_factory:
month_labels = [f"{m}" for m in range(1, 13)]
# Rows are factories, columns are month positions.
month_ratio_df = pd.DataFrame(
{fname: pct.reset_index(drop=True) for fname, pct in cum_ratio_by_factory.items()}
).transpose()
month_abs_means = month_ratio_df.abs().mean(axis=0)
month_abs_means.index = month_labels[: len(month_abs_means)]
month_abs_means_pct = month_abs_means * 100
plt.figure(figsize=(10, 5))
month_abs_means_pct.plot(kind="bar")
plt.ylabel("月度累计偏差平均值[%]")
plt.title("按月份的累计偏差(取绝对值后平均)")
plt.tight_layout()
month_plot_path = os.path.join(output_dir, f"error_by_month_{timestamp}.png")
plt.savefig(month_plot_path)
plt.close()
# Bar chart of the per-factory error percentages, in ascending order.
plt.figure(figsize=(12, 6))
ax = factory_df["error_pct"].reset_index(drop=True).plot(kind="bar")
metric_label = "最大" if self.use_error_max else "平均"
plt.ylabel(f"{metric_label}误差[%]")
plt.title(f"工厂{metric_label}累计偏差分布")
plt.tight_layout()
# Only bars of at least 50% are annotated with the factory name.
for idx, row in factory_df.reset_index(drop=True).iterrows():
val = row["error_pct"]
if abs(val) >= 50:
ax.text(
idx,
val,
row["name"],
rotation=90,
va="bottom" if val >= 0 else "top",
ha="center",
fontsize=8,
)
factory_plot_path = os.path.join(output_dir, f"error_by_factory_{timestamp}.png")
plt.savefig(factory_plot_path)
plt.close()
# Save histories for inventory, fulfillment pct, unmet backlog
# Each history is written only when it is non-empty.
inv_path = os.path.join(output_dir, f"inventory_history_{timestamp}.csv")
fulfill_path = os.path.join(output_dir, f"fulfill_history_{timestamp}.csv")
unmet_path = os.path.join(output_dir, f"unmet_history_{timestamp}.csv")
if self.inv_history:
pd.DataFrame(self.inv_history).to_csv(inv_path, index=False, encoding="utf-8-sig")
if self.fulfill_history_cum:
pd.DataFrame(self.fulfill_history_cum).to_csv(fulfill_path, index=False, encoding="utf-8-sig")
if self.unmet_history:
pd.DataFrame(self.unmet_history).to_csv(unmet_path, index=False, encoding="utf-8-sig")
if self.assignment_log:
assign_path = os.path.join(output_dir, f"assignments_{timestamp}.csv")
pd.DataFrame(self.assignment_log).to_csv(assign_path, index=False, encoding="utf-8-sig")
if self.fulfill_overall_history:
overall_path = os.path.join(output_dir, f"fulfill_overall_history_{timestamp}.csv")
pd.DataFrame(self.fulfill_overall_history).to_csv(overall_path, index=False, encoding="utf-8-sig")
if self.transport_cost_history:
cost_path = os.path.join(output_dir, f"transport_cost_history_{timestamp}.csv")
pd.DataFrame(self.transport_cost_history).to_csv(cost_path, index=False, encoding="utf-8-sig")
if self.fulfill_history_month:
pd.DataFrame(self.fulfill_history_month).to_csv(
os.path.join(output_dir, f"fulfill_month_history_{timestamp}.csv"), index=False, encoding="utf-8-sig"
)
if self.transport_units_history:
pd.DataFrame(self.transport_units_history).to_csv(
os.path.join(output_dir, f"transport_units_history_{timestamp}.csv"), index=False, encoding="utf-8-sig"
)
def _update_monthly_fulfill(self):
    """Normalise the per-region fulfillment map and clear the unmet-backlog map.

    After this call ``self.region_fulfill_pct`` holds exactly one entry per
    region in ``self.demand_regions``; regions without a previously stored
    percentage get 0 and entries for any other region are dropped.
    """
    previous_pct = self.region_fulfill_pct
    # The percentages themselves are produced during allocation; this only
    # re-keys them onto the configured demand regions.
    self.region_fulfill_pct = {
        region: previous_pct.get(region, 0) for region in self.demand_regions
    }
    # NOTE(review): the backlog map is unconditionally replaced by an empty
    # dict here (the earlier form was a dict comprehension over an empty list,
    # which always yields {}). Confirm that discarding the per-region unmet
    # backlog at this point is intended.
    self.region_unmet_backlog = {}
def _record_allocation_report(self, month, inventory_before, demand_before, shipped):
    """Append one allocation summary row per product for ``month``.

    Args:
        month: Value stored in the ``"month"`` field of every row.
        inventory_before: Mapping product -> units in stock before allocation.
        demand_before: Mapping product -> units demanded before allocation.
        shipped: Mapping product -> units allocated this month.

    Rows are appended to ``self.monthly_allocation_summary``. When
    ``self.product_list`` is non-empty it alone decides which products are
    reported (and in which order); otherwise every product that appears in any
    of the three mappings is reported in sorted order.
    """

    def as_number(mapping, key):
        # Missing entries, None and other falsy values all count as zero.
        return float(mapping.get(key, 0) or 0)

    if self.product_list:
        product_names = self.product_list
    else:
        product_names = sorted(set(inventory_before) | set(demand_before) | set(shipped))

    for name in product_names:
        stock = as_number(inventory_before, name)
        wanted = as_number(demand_before, name)
        moved = as_number(shipped, name)
        row = {
            "month": month,
            "product": name,
            "inventory_before": stock,
            "demand_before": wanted,
            "allocated": moved,
            # Both "after" figures are floored at zero.
            "inventory_after": max(stock - moved, 0),
            "demand_after": max(wanted - moved, 0),
        }
        self.monthly_allocation_summary.append(row)
def _run_allocation(self):
    """Allocate this month's finished units to city demand at minimum cost.

    Steps:
      1. Collect supply from every ``ProductionLineAgent.unit_stock`` and
         demand (current month plus backlog) from
         ``self.demand_by_city_product`` / ``self.backlog_city_prod``.
      2. If both exist, solve an integer program with PuLP/CBC: one shipment
         variable per (line, city, product) pair allowed by
         ``self.within_region_only``, one shortage variable per
         (city, product); the objective is transport cost plus a shortage
         penalty of ``self.big_M * 10`` per unit.
      3. Apply the shipments: deduct line stock, update backlogs, per-region
         and overall fulfillment percentages, shipped units per region, the
         assignment log (only when ``self.output_enabled``) and the realized
         transport cost (divided by 10000, shortage penalty excluded).
      4. Record the per-product summary via ``_record_allocation_report``.

    A month with no supply, no demand, or a solver status other than
    "Optimal" is handled as a month with zero shipments, so unmet demand is
    still carried into the backlog and the per-region statistics are
    refreshed instead of keeping last month's values.

    Routes whose line region or demand region contains "国际" are costed at
    zero; the same rule is used for the objective, the assignment log and
    the realized cost.
    """
    # reset transport count for current month
    self.region_transport_month = {r: 0 for r in self.demand_regions}

    def region_of_city(city):
        return self.city_to_region.get(str(city).strip())

    def region_of_line(line):
        # Try the id as given, then stripped, so lookups agree regardless of
        # which form the mapping was keyed with. Missing lines yield None.
        region = self.line_region.get(line)
        if region is None and isinstance(line, str):
            region = self.line_region.get(line.strip())
        return region

    def is_international(region):
        return region is not None and "国际" in str(region)

    def unit_cost(line, city, prod):
        if is_international(region_of_line(line)) or is_international(region_of_city(city)):
            return 0.0
        factory = self.line_factory.get(line)
        return float(self._cost_per_unit(factory, city, prod) or 0.0)

    # Supply from line unit_stock (assembled units)
    supplies = {}
    inventory_before = {}
    # (line_id, product) -> (agent, raw unit_stock key); lets the deduction
    # hit the right stock entry even when the raw key is not a stripped str.
    stock_refs = {}
    for agent in self.agents:
        if not isinstance(agent, ProductionLineAgent):
            continue
        for raw_prod, units in agent.unit_stock.items():
            qty = int(units)
            if qty > 0:
                prod = str(raw_prod).strip()
                supplies[(agent.line_id, prod)] = qty
                stock_refs[(agent.line_id, prod)] = (agent, raw_prod)
                inventory_before[prod] = inventory_before.get(prod, 0) + qty

    # Demand per city-product for current month including backlog
    demands = {}
    region_month_demand = {}
    region_current_demand = {}
    demand_before = {}
    for (city, raw_prod), monthly in self.demand_by_city_product.items():
        prod = str(raw_prod).strip()
        key = (city, prod)
        region = region_of_city(city)
        current = monthly.get(self.current_month, 0)
        region_current_demand[region] = region_current_demand.get(region, 0) + current
        # Backlog is written under the normalized key below; fall back to the
        # raw key for entries seeded elsewhere.
        backlog = self.backlog_city_prod.get(key, self.backlog_city_prod.get((city, raw_prod), 0))
        qty = backlog + current
        if qty > 0:
            demands[key] = qty
            if region:
                region_month_demand[region] = region_month_demand.get(region, 0) + qty
            demand_before[prod] = demand_before.get(prod, 0) + qty

    # (line, city, product) -> shipped integer quantity (> 0 only)
    shipments = {}
    if supplies and demands:
        prob = pulp.LpProblem("allocation", pulp.LpMinimize)
        cities_by_product = {}
        for (city, prod) in demands:
            cities_by_product.setdefault(prod, []).append(city)

        x = {}
        vars_by_supply = {key: [] for key in supplies}
        vars_by_demand = {key: [] for key in demands}
        for (line, prod) in supplies:
            line_region = region_of_line(line)
            for city in cities_by_product.get(prod, ()):
                if self.within_region_only:
                    demand_region = region_of_city(city)
                    if demand_region is not None and line_region is not None and demand_region != line_region:
                        continue
                var = pulp.LpVariable(f"x_{line}_{city}_{prod}", lowBound=0, cat="Integer")
                x[(line, city, prod)] = var
                vars_by_supply[(line, prod)].append(var)
                vars_by_demand[(city, prod)].append(var)
        shortage = {
            (city, prod): pulp.LpVariable(f"s_{city}_{prod}", lowBound=0, cat="Continuous")
            for (city, prod) in demands
        }

        # Objective: transport cost + penalty on shortage
        shortage_penalty = self.big_M * 10  # Penalize shortage higher than any transport cost
        obj_terms = [unit_cost(line, city, prod) * var for (line, city, prod), var in x.items()]
        obj_terms += [shortage_penalty * s for s in shortage.values()]
        prob += pulp.lpSum(obj_terms)
        # Supply constraints
        for key, supply in supplies.items():
            prob += pulp.lpSum(vars_by_supply[key]) <= supply
        # Demand soft constraints
        for key, dem in demands.items():
            prob += pulp.lpSum(vars_by_demand[key]) + shortage[key] >= dem

        status = prob.solve(pulp.PULP_CBC_CMD(msg=False))
        if pulp.LpStatus[status] == "Optimal":
            for key, var in x.items():
                # Solver values are floats within tolerance of an integer;
                # round instead of truncating so 2.9999999 counts as 3.
                qty = int(round(pulp.value(var) or 0))
                if qty > 0:
                    shipments[key] = qty

    # Update inventories and backlogs
    shipped = {}
    shipped_city_prod = {}
    shipped_product = {}
    total_transport_cost = 0.0
    for (line, city, prod), qty in shipments.items():
        shipped[(line, prod)] = shipped.get((line, prod), 0) + qty
        shipped_city_prod[(city, prod)] = shipped_city_prod.get((city, prod), 0) + qty
        shipped_product[prod] = shipped_product.get(prod, 0) + qty
        # update demand fulfillment by region of demand
        region = region_of_city(city)
        if region:
            self.region_demand_fulfilled[region] = self.region_demand_fulfilled.get(region, 0) + qty
        # Realized transport cost comes straight from the allocation result
        # (shortage penalties are not part of it).
        route_cost = unit_cost(line, city, prod) * qty
        total_transport_cost += route_cost
        if self.output_enabled:
            # Look the factory up for *this* line; distance is keyed by it.
            factory = self.line_factory.get(line)
            distance_km = float(self.distance_lookup.get((factory, city), 0) or 0)
            self.assignment_log.append(
                {
                    "month": self.current_month,
                    "line_id": line,
                    "product": prod,
                    "demand_city": city,
                    "quantity": qty,
                    "transport_distance_km": distance_km,
                    "transport_cost_wan": route_cost / 10000.0,
                }
            )

    for stock_key, qty in shipped.items():
        agent, raw_prod = stock_refs[stock_key]
        agent.unit_stock[raw_prod] = max(0, agent.unit_stock[raw_prod] - qty)

    # Update backlogs and region fulfillment
    region_shipped = {}
    total_shipped = 0
    for (city, prod), qty in shipped_city_prod.items():
        region = region_of_city(city)
        if region:
            region_shipped[region] = region_shipped.get(region, 0) + qty
        total_shipped += qty

    region_unmet = {}
    for (city, prod), demand in demands.items():
        remaining = max(0, demand - shipped_city_prod.get((city, prod), 0))
        self.backlog_city_prod[(city, prod)] = remaining
        region = region_of_city(city)
        if region:
            region_unmet[region] = region_unmet.get(region, 0) + remaining

    region_transport_month = {r: 0 for r in self.demand_regions}
    region_transport_month.update(region_shipped)

    region_pct_month = {}
    region_pct_cum = {}
    for region in self.demand_regions:
        total = region_month_demand.get(region, 0)  # current month + backlog
        total_current = region_current_demand.get(region, 0)  # current month only
        fulfilled = region_shipped.get(region, 0)
        region_pct_cum[region] = (fulfilled / total) * 100 if total > 0 else 0.0
        region_pct_month[region] = (fulfilled / total_current) * 100 if total_current > 0 else 0.0

    self.region_fulfill_pct = region_pct_cum
    self.region_fulfill_pct_month = region_pct_month
    self.region_fulfill_pct_cum = region_pct_cum
    self.region_unmet_backlog = region_unmet
    self.region_transport_month = region_transport_month

    # Overall fulfillment and cost
    total_demand_all = sum(region_month_demand.values())
    self.overall_fulfill_pct = (total_shipped / total_demand_all) * 100 if total_demand_all > 0 else 0.0
    self.monthly_transport_cost = total_transport_cost / 10000.0
    self._record_allocation_report(self.current_month, inventory_before, demand_before, shipped_product)
def _update_region_inventory(self):
    """Recompute ``self.region_alloc_inventory`` from the production lines.

    For every ``ProductionLineAgent`` in ``self.agents`` the values of its
    ``unit_stock`` are summed and added to the running total of the agent's
    ``region``. Other agent types are ignored. The result replaces the
    previous mapping wholesale.
    """
    totals = {}
    line_agents = (a for a in self.agents if isinstance(a, ProductionLineAgent))
    for line_agent in line_agents:
        on_hand = sum(line_agent.unit_stock.values())
        totals[line_agent.region] = totals.get(line_agent.region, 0) + on_hand
    self.region_alloc_inventory = totals
def _record_histories(self):
    """Append the current month's snapshot to every history list.

    Each appended row is a new dict that starts with ``"month"`` (taken from
    ``self.current_month``) followed by the relevant per-region mapping or a
    single scalar field. The monthly and cumulative fulfillment maps are
    optional attributes; when absent the row contains only the month.
    """
    month = self.current_month

    def stamped(values):
        # Month first, then the payload (a payload "month" key would win).
        row = {"month": month}
        row.update(values)
        return row

    self.inv_history.append(stamped(self.region_alloc_inventory))
    self.fulfill_history_month.append(stamped(getattr(self, "region_fulfill_pct_month", {})))
    self.fulfill_history_cum.append(stamped(getattr(self, "region_fulfill_pct_cum", {})))
    self.unmet_history.append(stamped(self.region_unmet_backlog))
    self.fulfill_overall_history.append(stamped({"fulfill_pct_overall": self.overall_fulfill_pct}))
    self.transport_cost_history.append(stamped({"transport_cost": self.monthly_transport_cost}))
    self.transport_units_history.append(stamped(self.region_transport_month))
# Script entry point: build a model with output_enabled=True and keep
# stepping it for as long as its ``running`` flag stays truthy.
if __name__ == "__main__":
    model = SimulationModel(output_enabled=True)
    # ``running`` is re-read before every step, so the loop ends as soon as
    # the flag is cleared (presumably from within ``step`` — not visible here).
    while model.running:
        model.step()