264 lines
12 KiB
Python
264 lines
12 KiB
Python
import json
|
||
import os
|
||
|
||
import matplotlib
|
||
import pandas as pd
|
||
import solara
|
||
from mesa.visualization import SolaraViz, make_plot_component
|
||
|
||
from simulation_model import SimulationModel
|
||
|
||
# Configure matplotlib to support Chinese labels: try CJK-capable fonts first
# and turn off the Unicode minus sign setting.
matplotlib.rcParams.update(
    {
        "font.family": ["Microsoft YaHei", "SimHei", "sans-serif"],
        "axes.unicode_minus": False,
    }
)
|
||
|
||
# Read the active data year from year.json. The file is opened in a ``with``
# block so the handle is closed as soon as the value has been parsed.
with open("year.json", "r", encoding="utf-8") as f:
    year = json.load(f)["year"]
# Name of the per-year sub-directory under data/.
filename = f"{year}"

# Load default parameters from data/<year>/model_params.json
with open(f"data/{filename}/model_params.json", "r", encoding="utf-8") as f:
    _cfg = json.load(f)
|
||
|
||
def _load_factory_name_map():
    """Return a ``code -> Chinese name`` mapping for the configured year.

    Reads ``data/{filename}/factory_mapping.json``, which stores
    ``Chinese name -> code``, and inverts it. Returns an empty dict when the
    file does not exist.
    """
    mapping_path = f"data/{filename}/factory_mapping.json"
    try:
        with open(mapping_path, "r", encoding="utf-8") as handle:
            name_to_code = json.load(handle)
    except FileNotFoundError:
        return {}

    # Invert the mapping; a later entry overwrites an earlier one with the same code.
    code_to_name = {}
    for chinese_name, code in name_to_code.items():
        code_to_name[code] = chinese_name
    return code_to_name
|
||
|
||
|
||
# Factory code -> Chinese name lookup, built once when the module is imported
# (empty when the mapping file is missing).
FACTORY_NAME_MAP = _load_factory_name_map()
|
||
|
||
def _load_product_catalog():
    """Load the product catalog for the configured year.

    Reads ``data/{filename}/product.csv`` as UTF-8 and retries with GBK when
    the bytes are not valid UTF-8. Header names are stripped, and the first
    three columns are renamed by position to ``平台``, ``标准机型`` and
    ``叶片型号``. Their values are converted to stripped strings.

    Missing cells are kept as missing values rather than being stringified
    into the literal text ``"nan"``, so ``dropna()`` on these columns removes
    them as intended.

    Returns:
        The normalized catalog, or an empty ``DataFrame`` when the CSV file
        does not exist.
    """
    csv_path = f"data/{filename}/product.csv"
    try:
        try:
            df = pd.read_csv(csv_path, encoding="utf-8")
        except UnicodeDecodeError:
            # Not valid UTF-8; retry with the GBK codec.
            df = pd.read_csv(csv_path, encoding="gbk")
    except FileNotFoundError:
        return pd.DataFrame()

    df.columns = [c.strip() for c in df.columns]
    # Columns are identified by position, not by their header text.
    rename_map = {
        df.columns[0]: "平台",
        df.columns[1]: "标准机型",
        df.columns[2]: "叶片型号",
    }
    df = df.rename(columns=rename_map)

    for column in ("平台", "标准机型", "叶片型号"):
        values = df[column]
        # astype(str) would turn NaN into "nan"; where() restores the missing
        # marker for every cell that was originally empty.
        df[column] = values.astype(str).str.strip().where(values.notna())
    return df
|
||
|
||
|
||
# Catalog loaded once at import time, plus the sorted distinct platform and
# standard-model names derived from it (empty lists when there is no catalog).
PRODUCT_DF = _load_product_catalog()
if PRODUCT_DF.empty:
    ALL_PLATFORMS = []
    ALL_STANDARDS = []
else:
    ALL_PLATFORMS = sorted(PRODUCT_DF["平台"].dropna().unique())
    ALL_STANDARDS = sorted(PRODUCT_DF["标准机型"].dropna().unique())
|
||
|
||
def _platform_options():
    """Return the platform choices, led by the catch-all entry ``"全部"``."""
    return ["全部", *ALL_PLATFORMS]
|
||
|
||
|
||
def _standard_options(platform):
    """Return the standard-model choices available for ``platform``.

    The list always starts with ``"全部"``. A falsy ``platform`` or
    ``"全部"`` means no platform filter, in which case every known standard
    model is offered. Empty names are left out.
    """
    options = ["全部"]
    if PRODUCT_DF.empty:
        return options

    no_filter = not platform or platform == "全部"
    if no_filter:
        candidates = ALL_STANDARDS
    else:
        on_platform = PRODUCT_DF["平台"] == platform
        candidates = sorted(PRODUCT_DF.loc[on_platform, "标准机型"].dropna().unique())

    options.extend(name for name in candidates if name)
    return options
|
||
|
||
|
||
def _product_set_for_selection(platform, standard):
    """Return the sorted distinct blade models matching the two selections.

    A falsy value or ``"全部"`` for either argument disables that filter.
    Returns an empty list when no catalog is loaded.
    """
    if PRODUCT_DF.empty:
        return []

    selected = PRODUCT_DF
    for column, wanted in (("平台", platform), ("标准机型", standard)):
        if wanted and wanted != "全部":
            selected = selected[selected[column] == wanted]

    blades = list(selected["叶片型号"].dropna().unique())
    blades.sort()
    return blades
|
||
|
||
def _legend_postprocess(label_map, ylabel=None):
    """Build an axes post-processing callback.

    The returned callable takes an axes object, redraws its legend with each
    label looked up in ``label_map`` (labels without an entry are kept as they
    are), and sets the y-axis label when ``ylabel`` is truthy. The legend is
    left untouched when the axes reports no labels.
    """

    def _post(ax):
        handles, raw_labels = ax.get_legend_handles_labels()
        if raw_labels:
            display_labels = []
            for raw in raw_labels:
                display_labels.append(label_map.get(raw, raw))
            ax.legend(handles, display_labels, loc="best")
        if not ylabel:
            return
        ax.set_ylabel(ylabel)

    return _post
|
||
|
||
|
||
def _split_product_set(text: str):
    """Split comma-separated ``text`` into trimmed, non-empty items."""
    trimmed = map(str.strip, text.split(","))
    return list(filter(None, trimmed))
|
||
|
||
|
||
def _make_region_plot(regions, key_prefix, legend_suffix, ylabel):
    """Build one plot component with a series per region.

    Series keys are ``f"{key_prefix}{region}"``, colours are ``C0``, ``C1``, …
    in iteration order, and legend entries read ``f"{region}-{legend_suffix}"``.
    """
    regions = list(regions)
    keys = [f"{key_prefix}{region}" for region in regions]
    colors = {key: f"C{idx}" for idx, key in enumerate(keys)}
    legend = {key: f"{region}-{legend_suffix}" for key, region in zip(keys, regions)}
    return make_plot_component(
        colors,
        post_process=_legend_postprocess(legend, ylabel=ylabel),
    )


@solara.component
def Page():
    """Render the product filter card and the simulation dashboard.

    Holds three reactive values (selected platform, selected standard model,
    and the comma-separated product-set text). Changing either selector
    recomputes the product-set text from the catalog. The ``SimulationModel``
    is rebuilt through ``solara.use_memo`` whenever that text changes.

    Returns:
        The root ``solara.Column`` element.
    """
    platform_selected = solara.use_reactive("全部")
    standard_selected = solara.use_reactive("全部")
    initial_product_set_text = ",".join(_cfg.get("product_set", []))
    product_set_value = solara.use_reactive(initial_product_set_text)

    def sync_product_set(platform_val, standard_val):
        """Set the product-set text to the blades matching the selection."""
        blades = _product_set_for_selection(platform_val, standard_val)
        if blades:
            product_set_value.set(",".join(blades))
        elif PRODUCT_DF.empty:
            # No catalog at all: fall back to the configured default set.
            product_set_value.set(initial_product_set_text)
        else:
            # Catalog present but nothing matches the selection.
            product_set_value.set("")

    def on_platform_change(val):
        """Store the new platform and reset the standard if it no longer applies."""
        platform_selected.set(val)
        std_opts = _standard_options(val)
        if standard_selected.value not in std_opts:
            standard_selected.set("全部")
            active_std = "全部"
        else:
            active_std = standard_selected.value
        sync_product_set(val, active_std)

    def on_standard_change(val):
        """Store the new standard model and refresh the product set."""
        standard_selected.set(val)
        sync_product_set(platform_selected.value, val)

    # Build model params fresh each render so product_set value stays in sync
    model_params = {
        "product_set": {"type": "InputText", "value": product_set_value, "label": "产品集合(逗号分隔)"},
        "is_within_region_allocation_only": {
            "type": "Select",
            "value": bool(_cfg.get("is_within_region_allocation_only", False)),
            "values": [False, True],
            "label": "仅区域内调拨?(布尔)",
        },
        "month1": {"type": "InputText", "value": _cfg.get("month1", 220), "label": "第1个月效率(小时/支)"},
        "month2": {"type": "InputText", "value": _cfg.get("month2", 100), "label": "第2个月效率(小时/支)"},
        "month3": {"type": "InputText", "value": _cfg.get("month3", 45), "label": "第3个月效率(小时/支)"},
        "month4": {"type": "InputText", "value": _cfg.get("month4", 36), "label": "第4个月及以后效率(小时/支)"},
    }
    # One holiday-days input per calendar month, defaulting to 2.
    for month in range(1, 13):
        holiday_key = f"holiday_days_{month}"
        model_params[holiday_key] = {
            "type": "InputText",
            "value": _cfg.get(holiday_key, 2),
            "label": f"{month}月假日天数(天)",
        }
    # One input per "factor_<code>" entry found in the config.
    factor_prefix = "factor_"
    for key, val in _cfg.items():
        if key.startswith(factor_prefix):
            # Strip only the leading prefix; str.replace would also remove any
            # later occurrence of "factor_" inside the code.
            suffix = key[len(factor_prefix):]
            label_name = FACTORY_NAME_MAP.get(suffix, suffix)
            model_params[key] = {"type": "InputText", "value": val, "label": f"{label_name} 磨合系数(倍率)"}

    active_model = solara.use_memo(
        lambda: SimulationModel(product_set=_split_product_set(product_set_value.value)),
        dependencies=[product_set_value.value],
    )

    region_names = active_model.region_names
    demand_regions = active_model.demand_regions

    # NOTE(review): these plot components are rebuilt on every render; confirm
    # that is intended.
    RegionPlot = _make_region_plot(region_names, "", "产出[套]", "产出[套]")
    DemandPlot = _make_region_plot(demand_regions, "demand_", "需求[套]", "需求量[套]")
    InventoryPlot = _make_region_plot(region_names, "inventory_", "库存[套]", "库存量[套]")
    FulfillPlot = _make_region_plot(demand_regions, "fulfill_pct_", "满足率[%]", "满足率[%]")
    UnmetPlot = _make_region_plot(demand_regions, "unmet_", "未满足需求[套]", "未满足需求[套]")
    OverallFulfillPlot = make_plot_component(
        {"fulfill_pct_overall": "tab:purple"},
        post_process=_legend_postprocess({"fulfill_pct_overall": "总体满足率[%]"}, ylabel="总体满足率[%]"),
    )
    TransportCostPlot = make_plot_component(
        {"transport_cost": "tab:brown"},
        post_process=_legend_postprocess({"transport_cost": "运输成本[万元]"}, ylabel="运输成本[万元]"),
    )
    FulfillMonthPlot = _make_region_plot(demand_regions, "fulfill_month_", "当月满足率[%]", "当月满足率[%]")
    FulfillCumPlot = _make_region_plot(demand_regions, "fulfill_cum_", "累计满足率[%]", "累计满足率[%]")
    TransportUnitsPlot = _make_region_plot(demand_regions, "transport_units_", "运输量[套]", "运输量[套]")

    def get_total_production(model):
        """Render the model's cumulative production as bold Markdown."""
        return solara.Markdown(f"**累计产量(机组):{model.cumulative_production:.2f}**")

    def get_mean_abs_error(model):
        """Render the model's mean absolute error as bold Markdown."""
        return solara.Markdown(f"**工厂年产量均值绝对误差:{model.mean_abs_error:.2f}**")

    with solara.Column() as main:
        with solara.Card("产品筛选"):
            solara.Select(
                label="平台",
                value=platform_selected,
                values=_platform_options(),
                on_value=on_platform_change,
            )
            solara.Select(
                label="标准机型",
                value=standard_selected,
                values=_standard_options(platform_selected.value),
                on_value=on_standard_change,
            )
            solara.Text(f"叶片型号数:{len(_split_product_set(product_set_value.value))}")
        SolaraViz(
            active_model,
            renderer=None,
            components=[
                RegionPlot,
                DemandPlot,
                InventoryPlot,
                FulfillPlot,
                UnmetPlot,
                OverallFulfillPlot,
                TransportCostPlot,
                FulfillMonthPlot,
                FulfillCumPlot,
                TransportUnitsPlot,
                get_total_production,
                get_mean_abs_error,
            ],
            model_params=model_params,
            name="产线生产与需求可视化",
            show_parameter_values=True,
        )
    return main
|
||
|
||
|
||
# Create the page element when the module is imported.
# NOTE(review): presumably this module-level `page` is the object the Solara
# server picks up and renders — confirm against how the app is launched.
page = Page()
|