mesa/my_model.py

866 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import pickle
from random import shuffle
import platform
import networkx as nx
import pandas as pd
from mesa import Model
from mesa.space import MultiGrid, NetworkGrid
from mesa.datacollection import DataCollector
import numpy as np
from mesa_viz_tornado.modules import NetworkModule
from firm import FirmAgent
from orm import db_session, Result
from product import ProductAgent
from mesa.visualization import ModularServer
class MyModel(Model):
def __init__(self, params):
"""
初始化模型,并设置模型的主要参数。
参数说明:
- params (dict): 包含模型所需的所有参数的字典。
主要参数:
- prf_size (bool): 是否在选择供应商时考虑企业规模。
- prf_conn (float): 企业建立新连接的概率。
- cap_limit_prob_type (str): 产能限制的概率分布类型。
- cap_limit_level (float): 产能限制的水平。
- diff_new_conn (bool): 是否允许差异化的新连接。
- g_bom (str): BOM物料清单图的 JSON 表示形式。
- sample (object): 包含实验数据的样本对象。
- n_iter (int): 仿真的迭代次数。
- dct_lst_init_disrupt_firm_prod (dict): 初始企业-产品干扰的字典。
- n_max_trial (int): 寻找新供应商的最大尝试次数。
- remove_t (int): 在网络中移除节点的时间步。
- netw_prf_n (int): 每个企业的首选供应商数量。
- seed (int): 随机种子的值,用于确保实验的可重复性。
"""
# 仿真参数
self.firm_prod_labels_dict = None
self.firm_relationship_cache = None
self.firm_product_cache = None
self.t = 0
self.is_prf_size = params['prf_size'] # 是否在选择供应商时考虑企业规模。
self.prf_conn = params['prf_conn'] # 企业建立新连接的概率。
self.cap_limit_prob_type = params['cap_limit_prob_type'] # 产能限制的概率分布类型。
self.cap_limit_level = params['cap_limit_level'] # 产能限制的水平。
self.diff_new_conn = params['diff_new_conn'] # 是否允许差异化的新连接。
# 初始化停止时间步,可能是用户通过参数传入
self.int_stop_ts = params.get('stop_t', 3) # 默认停止时间为 100
# 网络初始化
self.firm_network = nx.MultiDiGraph() # 企业之间的有向多重图。
self.firm_prod_network = nx.MultiDiGraph() # 企业与产品关系的有向多重图。
self.product_network = nx.MultiDiGraph() # 产品之间的有向多重图。
self.G_FirmProd = nx.MultiDiGraph() # 初始化 企业-产品 关系的图
self.G_Firm = nx.MultiDiGraph() # 使用 NetworkX 的有向多重图
# BOM物料清单
self.G_bom = nx.adjacency_graph(json.loads(params['g_bom'])) # 表示 BOM 结构的图。
# 随机数生成器
self.nprandom = np.random.default_rng(params['seed']) # 基于固定种子的随机数生成器。
# 样本和实验参数
self.sample = params['sample'] # 仿真的样本对象。
self.int_n_iter = int(params['n_iter']) # 仿真的迭代次数。
self.dct_lst_init_disrupt_firm_prod = params['dct_lst_init_disrupt_firm_prod'] # 初始企业-产品干扰关系。
# 外部变量
self.int_n_max_trial = int(params['n_max_trial']) # 寻找新供应商的最大尝试次数。
self.remove_t = int(params['remove_t']) # 在网络中移除节点的时间步。
self.int_netw_prf_n = int(params['netw_prf_n']) # 每个企业的首选供应商数量。
# 数据收集器
self.data_collector = DataCollector(
agent_reporters={"Product": "name"} # 收集代理的名称。
)
self.product_agents = [] # 初始化产品代理列表
self.company_agents = [] # 初始化公司代理列表
# 初始化模型的网络和代理
self.initialize_product_network(params) # 初始化产品网络。
self.initialize_firm_network() # 初始化企业网络。
self.build_firm_prod_labels_dict() # 构建企业与产品的映射关系字典
self.initialize_firm_product_network() # 初始化企业与产品的网络。
self.add_edges_to_firm_network() # 添加企业之间的边。
self.connect_unconnected_nodes() # 连接未连接的节点。
self.resource_integration()
self.j_comp_consumed_produced()
self.initialize_agents() # 初始化代理。
self.initialize_disruptions() # 初始化干扰。
def initialize_product_network(self, params):
"""
初始化产品网络。
参数:
- params (dict): 包含模型初始化参数的字典。
功能:
1. 从参数中加载 BOM 图 (Bill of Materials) 并构建产品网络。
2. 加载产品节点数据并提取产品种类和索引。
"""
try:
# 从参数中解析 BOM 图,并构建 NetworkX 的图结构
self.product_network = nx.adjacency_graph(json.loads(params['g_bom']))
except Exception as e:
print(f"Failed to initialize product network: {e}")
self.product_network = nx.MultiDiGraph() # 初始化为空图,以防后续出错
# 加载产品数据
try:
data = pd.read_csv('input_data/input_product_data/BomNodes.csv') # 读取产品节点数据
data['Code'] = data['Code'].astype('string') # 确保 Code 字段为字符串类型
# 保存产品数据和产品类别索引
self.type2 = data # 全量产品数据
self.id_code = data.groupby('Code')['Index'].apply(list) # 根据产品代码分组索引
except FileNotFoundError:
print("Error: File 'BomNodes.csv' not found.")
self.type2 = pd.DataFrame() # 设为空 DataFrame 以防后续出错
self.id_code = {}
except Exception as e:
print(f"Error loading product data: {e}")
self.type2 = pd.DataFrame()
self.id_code = {}
# 此处可以进一步处理设备折旧比值(如果有具体逻辑,可以在此补充)
def initialize_firm_network(self):
"""
初始化企业网络,处理一个 Code 映射到多个 Index 的情况,并缓存所有相关属性。
"""
cache_file = "firm_network_cache.pkl"
# 检查是否存在缓存
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
cached_data = pickle.load(f)
# 加载缓存的属性
self.G_Firm = cached_data['G_Firm']
self.firm_product_cache = cached_data['firm_product_cache']
self.firm_relationship_cache = cached_data['firm_relationship_cache']
print("Loaded firm network and related data from cache.")
return
# 如果没有缓存,则从头初始化网络
try:
# 加载企业数据
firm_data = pd.read_csv("input_data/input_firm_data/firm_amended.csv", dtype={'Code': str})
firm_data['Code'] = firm_data['Code'].str.replace('.0', '', regex=False)
# 创建企业网络图
self.G_Firm = nx.Graph()
self.G_Firm.add_nodes_from(firm_data['Code'])
# 设置节点属性
firm_attributes = firm_data.set_index('Code').to_dict('index')
nx.set_node_attributes(self.G_Firm, firm_attributes)
print(f"Initialized G_Firm with {len(self.G_Firm.nodes)} nodes.")
# 加载企业与产品关系数据
firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv", dtype={'Firm_Code': str})
bom_nodes = pd.read_csv("input_data/input_product_data/BomNodes.csv")
# 构建 Code -> [Index] 的多值映射
code_to_indices = bom_nodes.groupby('Code')['Index'].apply(list).to_dict()
# 将 Product_Code 转换为 Product_Indices
firm_industry_relation['Product_Indices'] = firm_industry_relation['Product_Code'].map(code_to_indices)
# 检查并处理未映射的 Product_Code
unmapped_products = firm_industry_relation[firm_industry_relation['Product_Indices'].isna()]
if not unmapped_products.empty:
print("Warning: The following Product_Code values could not be mapped to Index:")
print(unmapped_products[['Firm_Code', 'Product_Code']])
firm_industry_relation['Product_Indices'] = firm_industry_relation['Product_Indices'].apply(
lambda x: x if isinstance(x, list) else []
)
# 构建企业-产品映射缓存
self.firm_product_cache = (
firm_industry_relation.groupby('Firm_Code')['Product_Indices']
.apply(lambda indices: [idx for sublist in indices for idx in sublist]) # 展平嵌套列表
.to_dict()
)
print(f"Built firm_product_cache with {len(self.firm_product_cache)} entries.")
# 构建企业关系缓存
self.firm_relationship_cache = {
firm: self.compute_firm_relationship(firm, self.firm_product_cache)
for firm in self.firm_product_cache
}
print(f"Built firm_relationship_cache with {len(self.firm_relationship_cache)} entries.")
# 保存所有关键属性到缓存
cached_data = {
'G_Firm': self.G_Firm,
'firm_product_cache': self.firm_product_cache,
'firm_relationship_cache': self.firm_relationship_cache
}
with open(cache_file, 'wb') as f:
pickle.dump(cached_data, f)
print("Saved firm network and related data to cache.")
except Exception as e:
print(f"Error during network initialization: {e}")
def compute_firm_relationship(self, firm, firm_product_cache):
"""计算单个企业的供应链关系"""
lst_pred_product_code = []
for product_code in firm_product_cache[firm]:
lst_pred_product_code += list(self.G_bom.predecessors(product_code))
return list(set(lst_pred_product_code)) # 返回唯一值列表
def build_firm_prod_labels_dict(self):
"""
构建企业与产品的映射关系字典。
"""
firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv")
firm_industry_relation['Firm_Code'] = firm_industry_relation['Firm_Code'].astype(str)
self.firm_prod_labels_dict = (
firm_industry_relation.groupby('Firm_Code')['Product_Code']
.apply(list)
.to_dict()
)
def initialize_firm_product_network(self):
"""
初始化企业与产品的网络关系,并引入缓存机制。
功能:
1. 加载企业-行业关系数据。
2. 为每个企业和产品建立网络节点。
3. 将产品代码与索引进行映射,并为网络节点分配属性。
4. 缓存网络和相关数据以加速后续运行。
"""
cache_file = "firm_product_network_cache.pkl"
# 检查缓存文件是否存在
if os.path.exists(cache_file):
try:
with open(cache_file, 'rb') as f:
cached_data = pickle.load(f)
self.G_FirmProd = cached_data['G_FirmProd']
print("Loaded firm-product network from cache.")
return
except Exception as e:
print(f"Error loading cache: {e}. Reinitializing network.")
try:
# 加载企业-行业关系数据
firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv")
firm_industry_relation['Firm_Code'] = firm_industry_relation['Firm_Code'].astype(str)
firm_industry_relation['Product_Code'] = firm_industry_relation['Product_Code'].apply(lambda x: [x])
# 创建企业-产品网络图
self.G_FirmProd.add_nodes_from(firm_industry_relation.index)
# 遍历数据行,将产品代码映射到索引,并更新关系表
for index, row in firm_industry_relation.iterrows():
id_index_list = []
for product_code in row['Product_Code']:
if str(product_code) in self.id_code:
id_index_list.extend(self.id_code[str(product_code)])
firm_industry_relation.at[index, 'Product_Code'] = id_index_list
# 为每个节点分配属性
firm_prod_labels_dict = {code: firm_industry_relation.loc[code].to_dict() for code in
firm_industry_relation.index}
nx.set_node_attributes(self.G_FirmProd, firm_prod_labels_dict)
print(f"Initialized G_FirmProd with {len(self.G_FirmProd.nodes)} nodes.")
# 缓存网络和数据
cached_data = {'G_FirmProd': self.G_FirmProd}
with open(cache_file, 'wb') as f:
pickle.dump(cached_data, f)
print("Saved firm-product network to cache.")
except FileNotFoundError as e:
print(f"Error: {e}. File not found.")
except Exception as e:
print(f"Error initializing firm-product network: {e}")
def compute_firm_supply_chain(self, firm_industry_relation, g_bom):
"""
根据 firm_industry_relation 和 g_bom 生成供应链缓存。
:param firm_industry_relation: 企业-产品关系 DataFrame
:param g_bom: BOM 网络图
:return: 缓存的供应链关系字典
"""
supply_chain_cache = {}
for firm_code, product_codes in firm_industry_relation.groupby('Firm_Code')['Product_Code']:
predecessors = set()
for product_code in product_codes:
predecessors.update(g_bom.predecessors(product_code))
supply_chain_cache[firm_code] = list(predecessors)
return supply_chain_cache
def add_edges_to_firm_network(self):
"""利用缓存加速企业网络边的添加"""
cache_file = "G_Firm_add_edges.pkl"
# 检查是否存在缓存
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
cached_data = pickle.load(f)
# 加载缓存的属性
self.G_Firm = cached_data
print("Loaded G_Firm_add_edges cache.")
return
for firm in self.firm_relationship_cache:
lst_pred_product_code = self.firm_relationship_cache[firm]
for pred_product_code in lst_pred_product_code:
lst_pred_firm = [
f for f, products in self.firm_product_cache.items()
if pred_product_code in products
]
# 使用缓存,避免重复查询
lst_choose_firm = self.select_firms(lst_pred_firm)
# 添加边
edges = [(pred_firm, firm, {'Product': pred_product_code}) for pred_firm in lst_choose_firm]
self.G_Firm.add_edges_from(edges)
cached_data = self.G_Firm
with open(cache_file, 'wb') as f:
pickle.dump(cached_data, f)
print("Saved G_Firm_add_edges to cache.")
def select_firms(self, lst_pred_firm):
"""
根据企业列表选择供应商。
"""
if not lst_pred_firm:
return [] # 如果列表为空,返回空列表
n_pred_firm = self.int_netw_prf_n # 最大选择的供应商数量
# 筛选有效节点并同步生成有效的企业规模
valid_firms = []
lst_pred_firm_size = []
for pred_firm in lst_pred_firm:
if pred_firm in self.G_Firm.nodes and 'Revenue_Log' in self.G_Firm.nodes[pred_firm]:
valid_firms.append(pred_firm)
lst_pred_firm_size.append(self.G_Firm.nodes[pred_firm]['Revenue_Log'])
# 如果未启用企业规模加权,随机选择
if not self.is_prf_size:
return self.nprandom.choice(valid_firms, size=min(n_pred_firm, len(valid_firms)), replace=False)
# 如果考虑企业规模,计算概率分布
if lst_pred_firm_size:
total_size = sum(lst_pred_firm_size)
lst_prob = [size / total_size for size in lst_pred_firm_size]
else:
lst_prob = []
# 确保长度一致
if len(valid_firms) != len(lst_prob):
print(f"Error: valid_firms and lst_prob have different sizes. "
f"valid_firms: {len(valid_firms)}, lst_prob: {len(lst_prob)}")
return [] # 返回空列表以避免错误
# 调用 numpy.random.choice
return self.nprandom.choice(valid_firms, size=min(n_pred_firm, len(valid_firms)), replace=False, p=lst_prob)
def add_edges_to_firm_product_network(self, node, pred_product_code, lst_choose_firm):
""" Helper function to add edges to the firm-product network """
cache_file = "G_FirmProd_cache.pkl"
# 检查是否存在缓存
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
cached_data = pickle.load(f)
# 加载缓存的属性
self.G_FirmProd = cached_data
print("Loaded add_edges_to_firm from cache.")
return
set_node_prod_code = set(self.G_Firm.nodes[node]['Product_Code'])
set_pred_succ_code = set(self.G_bom.successors(pred_product_code))
lst_use_pred_prod_code = list(set_node_prod_code & set_pred_succ_code)
if len(lst_use_pred_prod_code) == 0:
print("错误")
pred_node_list = []
for pred_firm in lst_choose_firm:
for n, v in self.G_FirmProd.nodes(data=True):
for v1 in v['Product_Code']:
if v1 == pred_product_code and v['Firm_Code'] == pred_firm:
pred_node_list.append(n)
if len(pred_node_list) != 0:
pred_node = pred_node_list[0]
else:
pred_node = -1
current_node_list = []
for use_pred_prod_code in lst_use_pred_prod_code:
for n, v in self.G_FirmProd.nodes(data=True):
for v1 in v['Product_Code']:
if v1 == use_pred_prod_code and v['Firm_Code'] == node:
current_node_list.append(n)
if len(current_node_list) != 0:
current_node = current_node_list[0]
else:
current_node = -1
if current_node != -1 and pred_node != -1:
self.G_FirmProd.add_edge(pred_node, current_node)
# 保存所有关键属性到缓存
cached_data = self.G_FirmProd
with open(cache_file, 'wb') as f:
pickle.dump(cached_data, f)
print("Saved G_FirmProd to cache.")
def connect_unconnected_nodes(self):
"""
连接企业网络中未连接的节点。
功能:
- 遍历 G_Firm 图中未连接的节点。
- 为未连接节点添加边,连接到可能的下游企业。
- 同时更新 G_FirmProd 网络,反映企业与产品的关系。
"""
cache_file = "connect_unconnected_nodes_cache.pkl"
# 检查是否存在缓存
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
cached_data = pickle.load(f)
# 加载缓存的属性
self.G_Firm = cached_data['firm_network']
self.firm_product_cache = cached_data['firm_prod_network']
print("Loaded G_Firm and firm_product_cache from cache.")
return
for node in nx.nodes(self.G_Firm):
# 如果节点没有任何连接,则处理该节点
if self.G_Firm.degree(node) == 0:
# 获取当前节点的产品列表
product_codes = self.G_Firm.nodes[node].get('Product_Code', [])
for product_code in product_codes:
# 查找与当前产品相关的 FirmProd 节点
current_node_list = [
n for n, v in self.G_FirmProd.nodes(data=True)
if v['Firm_Code'] == node and product_code in v['Product_Code']
]
current_node = current_node_list[0] if current_node_list else -1
# 查找当前产品的所有下游产品代码
succ_product_codes = list(self.G_bom.successors(product_code))
for succ_product_code in succ_product_codes:
# 查找生产下游产品的企业
succ_firms = [
firm_code for firm_code, products in self.firm_prod_labels_dict.items()
if succ_product_code in products
]
# 确定供应商数量限制
n_succ_firm = min(len(succ_firms), self.int_netw_prf_n)
if n_succ_firm == 0:
continue
# 选择供应商
if self.is_prf_size:
# 基于企业规模选择供应商
succ_firm_sizes = [
self.G_Firm.nodes[succ_firm].get('Revenue_Log', 0)
for succ_firm in succ_firms
]
if sum(succ_firm_sizes) > 0:
probs = [size / sum(succ_firm_sizes) for size in succ_firm_sizes]
selected_firms = self.nprandom.choice(succ_firms, size=n_succ_firm, replace=False,
p=probs)
else:
selected_firms = []
else:
# 随机选择供应商
selected_firms = self.nprandom.choice(succ_firms, size=n_succ_firm, replace=False)
# 添加边到 G_Firm 图
edges = [(node, firm, {'Product': product_code}) for firm in selected_firms]
self.G_Firm.add_edges_from(edges)
# 更新 G_FirmProd 网络
for succ_firm in selected_firms:
succ_node_list = [
n for n, v in self.G_FirmProd.nodes(data=True)
if v['Firm_Code'] == succ_firm and succ_product_code in v['Product_Code']
]
succ_node = succ_node_list[0] if succ_node_list else -1
if current_node != -1 and succ_node != -1:
self.G_FirmProd.add_edge(current_node, succ_node)
# 保存网络数据到样本
self.firm_network = self.G_Firm # 使用 networkx 图对象表示的企业网络
self.firm_prod_network = self.G_FirmProd # 使用 networkx 图对象表示的企业与产品关系网络
cached_data = {
'firm_network': self.firm_network,
'firm_prod_network': self.firm_prod_network,
}
with open(cache_file, 'wb') as f:
pickle.dump(cached_data, f)
print("Saved firm network and related data to cache.")
def initialize_agents(self):
"""
初始化代理并添加到模型中。
功能:
1. 根据产品网络初始化产品代理。
2. 根据企业网络初始化企业代理。
"""
# 初始化产品代理
for ag_node, attr in self.product_network.nodes(data=True):
# 创建产品代理
product_agent = ProductAgent(
unique_id=ag_node,
model=self,
name=attr.get('Name', 'Unknown'), # 防止 Name 属性缺失
type2=0,
production_ratio=0
)
self.add_agent(product_agent)
# 初始化企业代理
for ag_node, attr in self.firm_network.nodes(data=True):
# 获取与企业相关的产品代理
a_lst_product = [
agent for agent in self.product_agents if agent.unique_id in attr.get('Product_Code', [])
]
# 获取企业的需求数量和生产输出
demand_quantity = self.data_materials.loc[self.data_materials['Firm_Code'] == ag_node]
production_output = self.data_produced.loc[self.data_produced['Firm_Code'] == ag_node]
# 获取企业的资源信息
try:
R = self.firm_resource_R.loc[int(ag_node)]
P = self.firm_resource_P.loc[int(ag_node)]
C = self.firm_resource_C.loc[int(ag_node)]
except KeyError:
# 如果资源数据缺失,提供默认值
R, P, C = [], [], []
# 创建企业代理
firm_agent = FirmAgent(
unique_id=ag_node,
model=self,
type_region=attr.get('Type_Region', 'Unknown'),
revenue_log=attr.get('Revenue_Log', 0),
a_lst_product=a_lst_product,
demand_quantity=demand_quantity,
production_output=production_output,
R=R,
P=P,
C=C
)
self.add_agent(firm_agent)
def initialize_disruptions(self):
"""
初始化公司与其受干扰产品的映射,并更新干扰状态。
功能:
- 构建公司与受干扰产品的映射字典。
- 更新公司与产品的生产状态为干扰状态。
"""
# 构建公司与受干扰产品的映射字典
disruption_mapping = {
firm: [
product for product in self.product_agents
if product.unique_id in lst_product
]
for firm_code, lst_product in self.dct_lst_init_disrupt_firm_prod.items()
if (firm := next((f for f in self.company_agents if f.unique_id == firm_code), None))
}
# 更新干扰字典
self.dct_lst_init_disrupt_firm_prod = disruption_mapping
# 设置初始干扰状态
for firm, disrupted_products in disruption_mapping.items():
for product in disrupted_products:
# 确保产品在公司的生产状态中
if product not in firm.dct_prod_up_prod_stat:
raise ValueError(
f"Product {product.unique_id} not found in firm {firm.unique_id}'s production status.")
# 更新产品状态为干扰状态,并记录干扰时间
firm.dct_prod_up_prod_stat[product]['p_stat'].append(('D', self.t))
def add_agent(self, agent):
if isinstance(agent, FirmAgent):
self.company_agents.append(agent)
elif isinstance(agent, ProductAgent):
self.product_agents.append(agent)
def resource_integration(self):
"""
整合企业资源,包括材料、设备和产品数据。
功能:
- 加载并处理企业的材料、设备和产品数据。
- 合并设备数据与设备残值数据。
- 按企业分组生成资源列表。
"""
try:
# 加载企业的材料、设备和产品数据
data_R = pd.read_csv("input_data/input_firm_data/firms_materials.csv")
data_C = pd.read_csv("input_data/input_firm_data/firms_devices.csv")
data_P = pd.read_csv("input_data/input_firm_data/firms_products.csv")
# 加载设备残值数据,并合并到设备数据中
device_salvage_values = pd.read_csv('input_data/device_salvage_values.csv')
self.device_salvage_values = device_salvage_values
# 合并设备数据和设备残值
data_merged_C = pd.merge(data_C, device_salvage_values, on='设备id', how='left')
# 按企业分组并生成资源列表
firm_resource_R = (
data_R.groupby('Firm_Code')[['材料id', '材料数量']]
.apply(lambda x: x.values.tolist())
.to_dict() # 转换为字典格式,便于快速查询
)
firm_resource_C = (
data_merged_C.groupby('Firm_Code')[['设备id', '设备数量', '设备残值']]
.apply(lambda x: x.values.tolist())
.to_dict()
)
firm_resource_P = (
data_P.groupby('Firm_Code')[['产品id', '产品数量']]
.apply(lambda x: x.values.tolist())
.to_dict()
)
# 将结果存储到模型中
self.firm_resource_R = firm_resource_R
self.firm_resource_C = firm_resource_C
self.firm_resource_P = firm_resource_P
except FileNotFoundError as e:
print(f"Error: Missing input file - {e.filename}")
self.firm_resource_R, self.firm_resource_C, self.firm_resource_P = {}, {}, {}
except Exception as e:
print(f"Error during resource integration: {e}")
self.firm_resource_R, self.firm_resource_C, self.firm_resource_P = {}, {}, {}
def j_comp_consumed_produced(self):
"""
处理企业的材料消耗与产品生产数据,并计算生产比例。
功能:
- 加载材料消耗数据、产品生产数据和生产比例数据。
- 按企业分组整理未消耗材料和未生产产品的数据。
- 整理生产比例数据,便于后续使用。
"""
try:
# 加载数据
data_materials = pd.read_csv('input_data/input_firm_data/firms_materials.csv')
data_produced = pd.read_csv('input_data/input_firm_data/firms_products.csv')
data_production_ratio = pd.read_csv('input_data/产品消耗制造比例.csv')
# 处理未消耗材料数据
data_not_consumed = (
data_materials.groupby('Firm_Code')[['材料id', '材料数量']]
.apply(lambda x: dict(zip(x['材料id'], x['材料数量'])))
.reset_index(name='Materials_not_Consumed')
)
# 处理未生产产品数据
data_not_produced = (
data_produced.groupby('Firm_Code')[['产品id', '产品数量']]
.apply(lambda x: dict(zip(x['产品id'], x['产品数量'])))
.reset_index(name='Products_not_Produced')
)
# 整理生产比例数据
data_production_ratio = (
data_production_ratio.groupby('IndustryID')[['MaterialID', 'Quantity']]
.apply(lambda x: dict(zip(x['MaterialID'], x['Quantity'])))
.reset_index(name='Production_Ratio')
)
# 将处理后的数据存储到模型中
self.data_materials = data_not_consumed
self.data_produced = data_not_produced
self.data_production_ratio = data_production_ratio
except FileNotFoundError as e:
print(f"Error: Missing input file - {e.filename}")
self.data_materials, self.data_produced, self.data_production_ratio = None, None, None
except Exception as e:
print(f"Error during consumption and production computation: {e}")
self.data_materials, self.data_produced, self.data_production_ratio = None, None, None
def step(self):
"""
模拟一个时间步,包括以下过程:
1. 移除客户边和中断产品。
2. 进行尝试过程,寻找替代供应链。
3. 判断资源和设备是否需要采购,并处理采购。
4. 资源消耗和产品生产。
"""
# 1. 移除客户边并中断客户上游产品
self._remove_disrupted_edges()
self._disrupt_upstream_products()
# 2. 尝试寻找替代供应链
self._trial_process()
# 3. 判断是否需要采购资源和设备
self._handle_resource_and_machinery_purchase()
# 4. 资源消耗和产品生产
self._consume_resources_and_produce()
# 增加时间步
self.t += 1
# 子方法定义
def _remove_disrupted_edges(self):
"""移除被中断的客户边。"""
for firm in self.company_agents:
for prod, prod_stat in firm.dct_prod_up_prod_stat.items():
status, ts = prod_stat['p_stat'][-1]
if status == 'D' and ts == self.t - 1:
firm.remove_edge_to_cus(prod)
def _disrupt_upstream_products(self):
"""中断客户的上游产品。"""
for firm in self.company_agents:
for prod, prod_stat in firm.dct_prod_up_prod_stat.items():
for up_prod, up_stat in prod_stat['s_stat'].items():
if up_stat['set_disrupt_firm']:
firm.disrupt_cus_prod(prod, up_prod)
def _trial_process(self):
"""尝试寻找替代供应链。"""
for n_trial in range(self.int_n_max_trial):
shuffle(self.company_agents) # 打乱顺序
is_stop_trial = True
for firm in self.company_agents:
lst_seek_prod = [
supply for prod, prod_stat in firm.dct_prod_up_prod_stat.items()
if prod_stat['p_stat'][-1][0] == 'D'
for supply, supply_stat in prod_stat['s_stat'].items()
if not supply_stat['stat']
]
if lst_seek_prod:
is_stop_trial = False
for supply in set(lst_seek_prod):
firm.seek_alt_supply(supply)
if is_stop_trial:
break
# 处理请求
shuffle(self.company_agents)
for firm in self.company_agents:
if firm.dct_request_prod_from_firm:
firm.handle_request()
# 重置请求状态
for firm in self.company_agents:
firm.clean_before_trial()
def _handle_resource_and_machinery_purchase(self):
"""处理资源和设备的采购。"""
for firm in self.company_agents:
# 判断资源需求
for material_id, material_quantity in firm.R:
if material_quantity <= firm.s_r:
required_quantity = firm.S_r - material_quantity
firm.request_material_purchase(material_id, required_quantity)
# 判断设备需求
for device_id, device_quantity, device_salvage in firm.C:
device_salvage -= firm.x
if device_salvage <= 0: # 如果设备残值小于等于 0
device_quantity -= 1
firm.request_device_purchase(device_id, 1)
def _consume_resources_and_produce(self):
"""消耗资源并生产产品。"""
k = 0.6 # 资源消耗比例
for firm in self.company_agents:
# 计算消耗量
consumed_resources = {}
for industry in firm.indus_i:
for product_id, product_quantity in firm.P.items():
if product_id == industry.unique_id:
consumed_resources[industry] = product_quantity * k
# 消耗资源
for resource_id, resource_quantity in firm.R.items():
for industry, consumed_quantity in consumed_resources.items():
if resource_id in industry.resource_ids:
firm.R[resource_id] -= consumed_quantity
# 生产产品
for product_id, product_quantity in firm.P.items():
firm.P[product_id] = product_quantity * 1.6
# 刷新资源和设备状态
firm.refresh_R()
firm.refresh_C()
firm.refresh_P()
def end(self):
"""
结束模型运行并保存结果。
功能:
- 检查结果是否已存在,避免重复写入。
- 保存企业和产品的生产状态到数据库。
- 更新样本的状态为完成,并记录停止时间和计算机名称。
"""
# 检查当前样本结果是否已存在
qry_result = db_session.query(Result).filter_by(s_id=self.sample.id)
if qry_result.count() == 0:
# 收集所有结果
lst_result_info = []
for firm in self.company_agents:
for prod, dct_status_supply in firm.dct_prod_up_prod_stat.items():
# 检查产品状态是否都为正常
if not all(status == 'N' for status, _ in dct_status_supply['p_stat']):
for status, ts in dct_status_supply['p_stat']:
# 创建结果对象
lst_result_info.append(Result(
s_id=self.sample.id,
id_firm=firm.unique_id,
id_product=prod.unique_id,
ts=ts,
status=status
))
# 批量保存结果
if lst_result_info:
db_session.bulk_save_objects(lst_result_info)
db_session.commit()
# 更新样本状态为已完成
self.sample.is_done_flag = 1
self.sample.computer_name = platform.node()
self.sample.stop_t = self.int_stop_ts
db_session.commit()