mesa/my_model.py

889 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import pickle
from random import shuffle
import platform
import networkx as nx
import pandas as pd
from mesa import Model
from mesa.space import MultiGrid, NetworkGrid
from mesa.datacollection import DataCollector
import numpy as np
from mesa_viz_tornado.modules import NetworkModule
from firm import FirmAgent
from orm import db_session, Result
from product import ProductAgent
from mesa.visualization import ModularServer
class MyModel(Model):
def __init__(self, params):
"""
初始化模型,并设置模型的主要参数。
参数说明:
- params (dict): 包含模型所需的所有参数的字典。
主要参数:
- prf_size (bool): 是否在选择供应商时考虑企业规模。
- prf_conn (float): 企业建立新连接的概率。
- cap_limit_prob_type (str): 产能限制的概率分布类型。
- cap_limit_level (float): 产能限制的水平。
- diff_new_conn (bool): 是否允许差异化的新连接。
- g_bom (str): BOM物料清单图的 JSON 表示形式。
- sample (object): 包含实验数据的样本对象。
- n_iter (int): 仿真的迭代次数。
- dct_lst_init_disrupt_firm_prod (dict): 初始企业-产品干扰的字典。
- n_max_trial (int): 寻找新供应商的最大尝试次数。
- remove_t (int): 在网络中移除节点的时间步。
- netw_prf_n (int): 每个企业的首选供应商数量。
- seed (int): 随机种子的值,用于确保实验的可重复性。
"""
# 仿真参数
self.agent_map = None
self.firm_prod_labels_dict = None
self.firm_relationship_cache = None
self.firm_product_cache = None
self.t = 0
self.is_prf_size = params['prf_size'] # 是否在选择供应商时考虑企业规模。
self.prf_conn = params['prf_conn'] # 企业建立新连接的概率。
self.cap_limit_prob_type = params['cap_limit_prob_type'] # 产能限制的概率分布类型。
self.cap_limit_level = params['cap_limit_level'] # 产能限制的水平。
self.diff_new_conn = params['diff_new_conn'] # 是否允许差异化的新连接。
# 初始化停止时间步,可能是用户通过参数传入
self.int_stop_ts = params.get('n_iter', 3) # 默认停止时间为 100
# 网络初始化
self.firm_network = nx.MultiDiGraph() # 企业之间的有向多重图。
self.firm_prod_network = nx.MultiDiGraph() # 企业与产品关系的有向多重图。
self.product_network = nx.MultiDiGraph() # 产品之间的有向多重图。
# BOM物料清单
self.g_bom = nx.adjacency_graph(json.loads(params['g_bom'])) # 表示 BOM 结构的图。
# 随机数生成器
self.nprandom = np.random.default_rng(params['seed']) # 基于固定种子的随机数生成器。
# 样本和实验参数
self.sample = params['sample'] # 仿真的样本对象。
self.int_n_iter = int(params['n_iter']) # 仿真的迭代次数。
self.dct_lst_init_disrupt_firm_prod = params['dct_lst_init_disrupt_firm_prod'] # 初始企业-产品干扰关系。
# 外部变量
self.int_n_max_trial = int(params['n_max_trial']) # 寻找新供应商的最大尝试次数。
self.remove_t = int(params['remove_t']) # 在网络中移除节点的时间步。
self.int_netw_prf_n = int(params['netw_prf_n']) # 每个企业的首选供应商数量。
# 数据收集器
self.data_collector = DataCollector(
agent_reporters={"Product": "name"} # 收集代理的名称。
)
self.product_agents = [] # 初始化产品代理列表
self.company_agents = [] # 初始化公司代理列表
# 初始化模型的网络和代理
# 检查缓存是否存在
cache_file = "firm_network.pkl"
if os.path.exists(cache_file):
# 从缓存加载 firm_network
with open(cache_file, 'rb') as f:
self.firm_network = pickle.load(f)
print("Loaded firm network from cache.")
else:
# 执行完整的初始化流程
self.initialize_product_network(params)
self.initialize_firm_network()
self.build_firm_prod_labels_dict()
self.initialize_firm_product_network()
self.add_edges_to_firm_network()
self.connect_unconnected_nodes()
self.initialize_product_network(params) # 初始化产品网络。
self.resource_integration()
self.j_comp_consumed_produced()
self.initialize_agents() # 初始化代理。
self.initialize_disruptions() # 初始化干扰。
def initialize_product_network(self, params):
"""
初始化产品网络。
参数:
- params (dict): 包含模型初始化参数的字典。
功能:
1. 从参数中加载 BOM 图 (Bill of Materials) 并构建产品网络。
2. 加载产品节点数据并提取产品种类和索引。
"""
try:
# 从参数中解析 BOM 图,并构建 NetworkX 的图结构
self.product_network = nx.adjacency_graph(json.loads(params['g_bom']))
except Exception as e:
print(f"Failed to initialize product network: {e}")
self.product_network = nx.MultiDiGraph() # 初始化为空图,以防后续出错
# 加载产品数据
try:
data = pd.read_csv('input_data/input_product_data/BomNodes.csv') # 读取产品节点数据
data['Code'] = data['Code'].astype('string') # 确保 Code 字段为字符串类型
# 保存产品数据和产品类别索引
self.type2 = data # 全量产品数据
self.id_code = data.groupby('Code')['Index'].apply(list) # 根据产品代码分组索引
except FileNotFoundError:
print("Error: File 'BomNodes.csv' not found.")
self.type2 = pd.DataFrame() # 设为空 DataFrame 以防后续出错
self.id_code = {}
except Exception as e:
print(f"Error loading product data: {e}")
self.type2 = pd.DataFrame()
self.id_code = {}
# 此处可以进一步处理设备折旧比值(如果有具体逻辑,可以在此补充)
def initialize_firm_network(self):
"""
初始化企业网络,处理一个 Code 映射到多个 Index 的情况,并缓存所有相关属性。
"""
# 加载企业数据
firm_data = pd.read_csv("input_data/input_firm_data/firm_amended.csv", dtype={'Code': str})
firm_data['Code'] = firm_data['Code'].str.replace('.0', '', regex=False)
# 加载企业与产品关系数据
firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv", dtype={'Firm_Code': str})
bom_nodes = pd.read_csv("input_data/input_product_data/BomNodes.csv")
# 构建 Code -> [Index] 的多值映射
code_to_indices = bom_nodes.groupby('Code')['Index'].apply(list).to_dict()
# 将 Product_Code 转换为 Product_Indices
firm_industry_relation['Product_Indices'] = firm_industry_relation['Product_Code'].map(code_to_indices)
# 检查并处理未映射的 Product_Code
unmapped_products = firm_industry_relation[firm_industry_relation['Product_Indices'].isna()]
if not unmapped_products.empty:
print("Warning: The following Product_Code values could not be mapped to Index:")
print(unmapped_products[['Firm_Code', 'Product_Code']])
firm_industry_relation['Product_Indices'] = firm_industry_relation['Product_Indices'].apply(
lambda x: x if isinstance(x, list) else []
)
# 按 Firm_Code 分组生成企业的 Product_Code 和 Product_Indices 映射
firm_product = (
firm_industry_relation.groupby('Firm_Code')['Product_Code'].apply(list)
)
firm_product_indices = (
firm_industry_relation.groupby('Firm_Code')['Product_Indices']
.apply(lambda indices: [idx for sublist in indices for idx in sublist])
)
# 设置企业属性并添加到网络中
firm_attributes = firm_data.copy()
firm_attributes['Product_Indices'] = firm_attributes['Code'].map(firm_product)
firm_attributes['Product_Code'] = firm_attributes['Code'].map(firm_product_indices)
firm_attributes.set_index('Code', inplace=True)
self.firm_network.add_nodes_from(firm_data['Code'])
# 为企业节点分配属性
firm_labels_dict = {code: firm_attributes.loc[code].to_dict() for code in self.firm_network.nodes}
nx.set_node_attributes(self.firm_network, firm_labels_dict)
# 构建企业-产品映射缓存
self.firm_product_cache = firm_product_indices.to_dict()
# 构建企业关系缓存
self.firm_relationship_cache = {
firm: self.compute_firm_relationship(firm, self.firm_product_cache)
for firm in self.firm_product_cache
}
def compute_firm_relationship(self, firm, firm_product_cache):
"""计算单个企业的供应链关系"""
lst_pred_product_code = []
for product_code in firm_product_cache[firm]:
lst_pred_product_code += list(self.g_bom.predecessors(product_code))
return list(set(lst_pred_product_code)) # 返回唯一值列表
def build_firm_prod_labels_dict(self):
"""
构建企业与产品的映射关系字典。
"""
firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv")
firm_industry_relation['Firm_Code'] = firm_industry_relation['Firm_Code'].astype(str)
self.firm_prod_labels_dict = (
firm_industry_relation.groupby('Firm_Code')['Product_Code']
.apply(list)
.to_dict()
)
def initialize_firm_product_network(self):
"""
初始化企业与产品的网络关系,并引入缓存机制。
功能:
1. 加载企业-行业关系数据。
2. 为每个企业和产品建立网络节点。
3. 将产品代码与索引进行映射,并为网络节点分配属性。
4. 缓存网络和相关数据以加速后续运行。
"""
# 加载企业-行业关系数据
firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv")
firm_industry_relation['Firm_Code'] = firm_industry_relation['Firm_Code'].astype(str)
firm_industry_relation['Product_Code'] = firm_industry_relation['Product_Code'].apply(lambda x: [x])
# 映射产品代码到索引
firm_industry_relation['Product_Code'] = firm_industry_relation['Product_Code'].apply(
lambda codes: [idx for code in codes for idx in self.id_code.get(str(code), [])]
)
# 创建企业-产品网络图,同时附带属性
nodes_with_attributes = [
(index, firm_industry_relation.loc[index].to_dict())
for index in firm_industry_relation.index
]
self.firm_prod_network.add_nodes_from(nodes_with_attributes)
def compute_firm_supply_chain(self, firm_industry_relation, g_bom):
"""
根据 firm_industry_relation 和 g_bom 生成供应链缓存。
:param firm_industry_relation: 企业-产品关系 DataFrame
:param g_bom: BOM 网络图
:return: 缓存的供应链关系字典
"""
supply_chain_cache = {}
for firm_code, product_codes in firm_industry_relation.groupby('Firm_Code')['Product_Code']:
predecessors = set()
for product_code in product_codes:
predecessors.update(g_bom.predecessors(product_code))
supply_chain_cache[firm_code] = list(predecessors)
return supply_chain_cache
def add_edges_to_firm_network(self):
for firm in self.firm_relationship_cache:
lst_pred_product_code = self.firm_relationship_cache[firm]
for pred_product_code in lst_pred_product_code:
lst_pred_firm = [
f for f, products in self.firm_product_cache.items()
if pred_product_code in products
]
lst_choose_firm = self.select_firms(lst_pred_firm)
# 添加边
edges = [(pred_firm, firm, {'Product': pred_product_code}) for pred_firm in lst_choose_firm]
self.firm_network.add_edges_from(edges)
def select_firms(self, lst_pred_firm):
"""
根据企业列表选择供应商。
"""
if not lst_pred_firm:
return [] # 如果列表为空,返回空列表
n_pred_firm = self.int_netw_prf_n # 最大选择的供应商数量
# 筛选有效节点并同步生成有效的企业规模
valid_firms = []
lst_pred_firm_size = []
for pred_firm in lst_pred_firm:
if pred_firm in self.firm_network.nodes and 'Revenue_Log' in self.firm_network.nodes[pred_firm]:
valid_firms.append(pred_firm)
lst_pred_firm_size.append(self.firm_network.nodes[pred_firm]['Revenue_Log'])
# 如果未启用企业规模加权,随机选择
if not self.is_prf_size:
return self.nprandom.choice(valid_firms, size=min(n_pred_firm, len(valid_firms)), replace=False)
# 如果考虑企业规模,计算概率分布
if lst_pred_firm_size:
total_size = sum(lst_pred_firm_size)
lst_prob = [size / total_size for size in lst_pred_firm_size]
else:
lst_prob = []
# 确保长度一致
if len(valid_firms) != len(lst_prob):
print(f"Error: valid_firms and lst_prob have different sizes. "
f"valid_firms: {len(valid_firms)}, lst_prob: {len(lst_prob)}")
return [] # 返回空列表以避免错误
# 调用 numpy.random.choice
return self.nprandom.choice(valid_firms, size=min(n_pred_firm, len(valid_firms)), replace=False, p=lst_prob)
def add_edges_to_firm_product_network(self, node, pred_product_code, lst_choose_firm):
""" Helper function to add edges to the firm-product network """
set_node_prod_code = set(self.firm_network.nodes[node]['Product_Code'])
set_pred_succ_code = set(self.g_bom.successors(pred_product_code))
lst_use_pred_prod_code = list(set_node_prod_code & set_pred_succ_code)
if len(lst_use_pred_prod_code) == 0:
print("错误")
pred_node_list = []
for pred_firm in lst_choose_firm:
for n, v in self.firm_prod_network.nodes(data=True):
for v1 in v['Product_Code']:
if v1 == pred_product_code and v['Firm_Code'] == pred_firm:
pred_node_list.append(n)
if len(pred_node_list) != 0:
pred_node = pred_node_list[0]
else:
pred_node = -1
current_node_list = []
for use_pred_prod_code in lst_use_pred_prod_code:
for n, v in self.firm_prod_network.nodes(data=True):
for v1 in v['Product_Code']:
if v1 == use_pred_prod_code and v['Firm_Code'] == node:
current_node_list.append(n)
if len(current_node_list) != 0:
current_node = current_node_list[0]
else:
current_node = -1
if current_node != -1 and pred_node != -1:
self.firm_prod_network.add_edge(pred_node, current_node)
def connect_unconnected_nodes(self):
"""
连接企业网络中未连接的节点。
功能:
- 遍历 G_Firm 图中未连接的节点。
- 为未连接节点添加边,连接到可能的下游企业。
- 同时更新 G_FirmProd 网络,反映企业与产品的关系。
"""
for node in nx.nodes(self.firm_network):
# 如果节点没有任何连接,则处理该节点
if self.firm_network.degree(node) == 0:
# 获取当前节点的产品列表
product_codes = self.firm_network.nodes[node].get('Product_Code', [])
for product_code in product_codes:
# 查找与当前产品相关的 FirmProd 节点
current_node_list = [
n for n, v in self.firm_prod_network.nodes(data=True)
if v['Firm_Code'] == node and product_code in v['Product_Code']
]
current_node = current_node_list[0] if current_node_list else -1
# 查找当前产品的所有下游产品代码
succ_product_codes = list(self.g_bom.successors(product_code))
for succ_product_code in succ_product_codes:
# 查找生产下游产品的企业
succ_firms = [
firm_code for firm_code, products in self.firm_prod_labels_dict.items()
if succ_product_code in products
]
# 确定供应商数量限制
n_succ_firm = min(len(succ_firms), self.int_netw_prf_n)
if n_succ_firm == 0:
continue
# 选择供应商
if self.is_prf_size:
# 基于企业规模选择供应商
succ_firm_sizes = [
self.firm_network.nodes[succ_firm].get('Revenue_Log', 0)
for succ_firm in succ_firms
]
if sum(succ_firm_sizes) > 0:
probs = [size / sum(succ_firm_sizes) for size in succ_firm_sizes]
selected_firms = self.nprandom.choice(succ_firms, size=n_succ_firm, replace=False,
p=probs)
else:
selected_firms = []
else:
# 随机选择供应商
selected_firms = self.nprandom.choice(succ_firms, size=n_succ_firm, replace=False)
# 添加边到 G_Firm 图
edges = [(node, firm, {'Product': product_code}) for firm in selected_firms]
self.firm_network.add_edges_from(edges)
# 更新 G_FirmProd 网络
for succ_firm in selected_firms:
succ_node_list = [
n for n, v in self.firm_prod_network.nodes(data=True)
if v['Firm_Code'] == succ_firm and succ_product_code in v['Product_Code']
]
succ_node = succ_node_list[0] if succ_node_list else -1
if current_node != -1 and succ_node != -1:
self.firm_prod_network.add_edge(current_node, succ_node)
# 保存构建完成的 firm_network 到缓存
cache_file = "firm_network.pkl"
os.makedirs("cache", exist_ok=True)
with open(cache_file, 'wb') as f:
pickle.dump(self.firm_network, f)
# print("Firm network has been saved to cache.")
def initialize_agents(self):
"""
初始化代理并添加到模型中。
功能:
1. 根据产品网络初始化产品代理。
2. 根据企业网络初始化企业代理。
"""
# 初始化产品代理
for ag_node, attr in self.product_network.nodes(data=True):
# 创建产品代理
product_agent = ProductAgent(
unique_id=ag_node,
model=self,
name=attr.get('Name', 'Unknown'), # 防止 Name 属性缺失
type2=0,
production_ratio=0
)
self.add_agent(product_agent)
# 初始化企业代理
for ag_node, attr in self.firm_network.nodes(data=True):
# 获取与企业相关的产品代理
a_lst_product = [
agent for agent in self.product_agents if agent.unique_id in attr.get('Product_Code', [])
]
# 获取企业的需求数量和生产输出
demand_quantity = self.data_materials.loc[self.data_materials['Firm_Code'] == int(ag_node)]
production_output = self.data_produced.loc[self.data_produced['Firm_Code'] == int(ag_node)]
# 获取企业的资源信息,同时处理 R、P、C 的情况
try:
R = self.firm_resource_R.loc[int(ag_node)]
P = self.firm_resource_P.get(int(ag_node))
C = self.firm_resource_C.loc[int(ag_node)]
except KeyError:
R, P, C = [], {}, [] # 如果任何资源不存在,返回空列表
# 在模型初始化时,构建 unique_id -> agent 的快速映射字典
self.agent_map = {agent.unique_id: agent for agent in self.company_agents}
# 创建企业代理
firm_agent = FirmAgent(
unique_id=ag_node,
model=self,
type_region=attr.get('Type_Region', 'Unknown'),
revenue_log=attr.get('Revenue_Log', 0),
a_lst_product=a_lst_product,
demand_quantity=demand_quantity,
production_output=production_output,
R=R,
P=P,
C=C
)
self.add_agent(firm_agent)
def initialize_disruptions(self):
"""
初始化公司与其受干扰产品的映射,并更新干扰状态。
功能:
- 构建公司与受干扰产品的映射字典。
- 更新公司与产品的生产状态为干扰状态。
"""
# 构建公司与受干扰产品的映射字典
disruption_mapping = {}
for firm_code, lst_product_indices in self.dct_lst_init_disrupt_firm_prod.items():
# 查找企业对象
firm = next((f for f in self.company_agents if f.unique_id == firm_code), None)
if not firm:
print(f"Warning: Firm {firm_code} not found. Skipping.")
continue
# 查找有效的产品代理
valid_products = [
product for product in self.product_agents if product.unique_id in lst_product_indices
]
if not valid_products:
print(f"Warning: No valid products found for Firm {firm_code}. Skipping.")
continue
# 更新映射
disruption_mapping[firm] = valid_products
# 更新干扰字典
self.dct_lst_init_disrupt_firm_prod = disruption_mapping
# 设置初始干扰状态
for firm, disrupted_products in disruption_mapping.items():
for product in disrupted_products:
# 检查产品是否在企业的生产状态中
if product not in firm.dct_prod_up_prod_stat:
print(
f"Warning: Product {product.unique_id} not found in firm "
f"{firm.unique_id}'s production status. Skipping."
)
continue
# 更新产品状态为干扰状态,并记录干扰时间
firm.dct_prod_up_prod_stat[product]['p_stat'].append(('D', self.t))
def add_agent(self, agent):
if isinstance(agent, FirmAgent):
self.company_agents.append(agent)
elif isinstance(agent, ProductAgent):
self.product_agents.append(agent)
def resource_integration(self):
"""
整合企业资源,包括材料、设备和产品数据。
功能:
- 加载并处理企业的材料、设备和产品数据。
- 合并设备数据与设备残值数据。
- 按企业分组生成资源列表。
"""
# 加载企业的材料、设备和产品数据
data_R = pd.read_csv("input_data/input_firm_data/firms_materials.csv")
data_C = pd.read_csv("input_data/input_firm_data/firms_devices.csv")
data_P = pd.read_csv("input_data/input_firm_data/firms_products.csv")
# 加载设备残值数据,并合并到设备数据中
device_salvage_values = pd.read_csv('input_data/device_salvage_values.csv')
self.device_salvage_values = device_salvage_values
# 合并设备数据和设备残值
data_merged_C = pd.merge(data_C, device_salvage_values, on='设备id', how='left')
# 按企业分组并生成资源列表
firm_resource_R = (
data_R.groupby('Firm_Code')[['材料id', '材料数量']]
.apply(lambda x: x.values.tolist())
)
firm_resource_C = (
data_merged_C.groupby('Firm_Code')[['设备id', '设备数量', '设备残值']]
.apply(lambda x: x.values.tolist())
)
firm_resource_P = (
data_P.groupby('Firm_Code')[['产品id', '产品数量']]
.apply(lambda x: x.values.tolist())
)
# 将结果存储到模型中
self.firm_resource_R = firm_resource_R
self.firm_resource_C = firm_resource_C
self.firm_resource_P = firm_resource_P
def j_comp_consumed_produced(self):
"""
处理企业的材料消耗与产品生产数据,并计算生产比例。
功能:
- 加载材料消耗数据、产品生产数据和生产比例数据。
- 按企业分组整理未消耗材料和未生产产品的数据。
- 整理生产比例数据,便于后续使用。
"""
try:
# 加载数据
data_materials = pd.read_csv('input_data/input_firm_data/firms_materials.csv')
data_produced = pd.read_csv('input_data/input_firm_data/firms_products.csv')
data_production_ratio = pd.read_csv('input_data/产品消耗制造比例.csv')
# 处理未消耗材料数据
data_not_consumed = (
data_materials.groupby('Firm_Code')[['材料id', '材料数量']]
.apply(lambda x: dict(zip(x['材料id'], x['材料数量'])))
.reset_index(name='Materials_not_Consumed')
)
# 处理未生产产品数据
data_not_produced = (
data_produced.groupby('Firm_Code')[['产品id', '产品数量']]
.apply(lambda x: dict(zip(x['产品id'], x['产品数量'])))
.reset_index(name='Products_not_Produced')
)
# 整理生产比例数据
data_production_ratio = (
data_production_ratio.groupby('IndustryID')[['MaterialID', 'Quantity']]
.apply(lambda x: dict(zip(x['MaterialID'], x['Quantity'])))
.reset_index(name='Production_Ratio')
)
# 将处理后的数据存储到模型中
self.data_materials = data_not_consumed
self.data_produced = data_not_produced
self.data_production_ratio = data_production_ratio
except FileNotFoundError as e:
print(f"Error: Missing input file - {e.filename}")
self.data_materials, self.data_produced, self.data_production_ratio = None, None, None
except Exception as e:
print(f"Error during consumption and production computation: {e}")
self.data_materials, self.data_produced, self.data_production_ratio = None, None, None
def step(self):
"""
模拟一个时间步,包括以下过程:
1. 移除客户边和中断产品。
2. 进行尝试过程,寻找替代供应链。
3. 判断资源和设备是否需要采购,并处理采购。
4. 资源消耗和产品生产。
"""
while self.t < self.int_stop_ts: # 使用循环控制时间步
# 1. 移除客户边并中断客户上游产品
self._remove_disrupted_edges()
self._disrupt_upstream_products()
# 2. 尝试寻找替代供应链
self._trial_process()
# 3. 判断是否需要采购资源和设备
self._handle_material_purchase()
self._handle_machinery_purchase()
# 4. 资源消耗和产品生产
self._consume_resources_and_produce()
# 5. 刷新企业干扰字典
self._process_firms_step()
# 增加时间步
self.t += 1
# 子方法定义
def _remove_disrupted_edges(self):
"""移除被中断的客户边。"""
for firm in self.company_agents:
for prod, prod_stat in firm.dct_prod_up_prod_stat.items():
status, ts = prod_stat['p_stat'][-1]
if status == 'D' and ts == self.t - 1:
firm.remove_edge_to_cus(prod)
def _disrupt_upstream_products(self):
"""中断客户的上游产品。"""
for firm in self.company_agents:
for prod, prod_stat in firm.dct_prod_up_prod_stat.items():
for up_prod, up_stat in prod_stat['s_stat'].items():
if up_stat['set_disrupt_firm']:
firm.disrupt_cus_prod(prod, up_prod)
def _trial_process(self):
"""尝试寻找替代供应链。"""
for n_trial in range(self.int_n_max_trial):
shuffle(self.company_agents) # 打乱顺序
is_stop_trial = True
for firm in self.company_agents:
lst_seek_prod = [
supply for prod, prod_stat in firm.dct_prod_up_prod_stat.items()
if prod_stat['p_stat'][-1][0] == 'D'
for supply, supply_stat in prod_stat['s_stat'].items()
if not supply_stat['stat']
]
if lst_seek_prod:
is_stop_trial = False
for supply in set(lst_seek_prod):
firm.seek_alt_supply(supply)
if is_stop_trial:
break
# 处理请求
shuffle(self.company_agents)
for firm in self.company_agents:
if firm.dct_request_prod_from_firm:
firm.handle_request()
# 重置请求状态
for firm in self.company_agents:
firm.clean_before_trial()
def _handle_material_purchase(self):
"""
判断并处理资源的采购。
"""
# 存储需要采购资源的企业及其需求
purchase_material_firms = {}
# 遍历所有企业,检查资源需求
for firm in self.company_agents:
if not firm.R: # 跳过没有资源的企业
continue
# 遍历资源列表,检查哪些资源需要补货
for resource_id, resource_quantity in firm.R:
if resource_quantity <= firm.s_r: # 如果资源低于阈值,记录需求
required_quantity = firm.S_r - resource_quantity
if firm not in purchase_material_firms:
purchase_material_firms[firm] = []
purchase_material_firms[firm].append((resource_id, required_quantity))
# 寻找供应商并处理补货
for firm, material_requests in purchase_material_firms.items():
for resource_id, required_quantity in material_requests:
# 寻找供应商
supplier = firm.seek_material_supply(resource_id)
if supplier != -1: # 如果找到供应商
# 供应商处理资源请求
supplier.handle_material_request([resource_id, required_quantity])
# 更新当前企业的资源数量
for resource in firm.R:
if resource[0] == resource_id:
resource[1] = firm.S_r
def _handle_machinery_purchase(self):
"""
判断并处理设备的采购。
"""
# 存储需要采购设备的企业及其需求
purchase_machinery_firms = {}
# 遍历所有企业,检查设备需求
for firm in self.company_agents:
if not firm.C: # 跳过没有设备的企业
continue
# 检查设备残值,记录需要补充的设备
for equipment in firm.C:
equipment_id, equipment_quantity, equipment_salvage = equipment
equipment_salvage -= firm.x # 减少设备残值
if equipment_salvage <= 0: # 如果残值小于等于 0
equipment_quantity -= 1
required_quantity = 1 # 需要补充的设备数量
if firm not in purchase_machinery_firms:
purchase_machinery_firms[firm] = []
purchase_machinery_firms[firm].append((equipment_id, required_quantity))
# 寻找供应商并处理设备补充
for firm, machinery_requests in purchase_machinery_firms.items():
for equipment_id, required_quantity in machinery_requests:
# 寻找供应商
supplier = firm.seek_machinery_supply(equipment_id)
if supplier != -1: # 如果找到供应商
# 供应商处理设备请求
supplier.handle_machinery_request([equipment_id, required_quantity])
# 恢复企业的设备数量和残值
for equipment, initial_equipment in zip(firm.C, firm.C0):
if equipment[0] == equipment_id:
equipment[1] = initial_equipment[1] # 恢复数量
equipment[2] = initial_equipment[2] # 恢复残值
def _consume_resources_and_produce(self):
"""
消耗资源并生产产品。
"""
k = 0.6 # 资源消耗比例
production_increase_ratio = 1.6 # 产品生产比例
# 遍历每个企业
for firm in self.company_agents:
# 计算资源消耗
consumed_resources = self._calculate_consumed_resources(firm, k)
# 消耗资源
self._consume_resources(firm, consumed_resources)
# 生产产品
self._produce_products(firm, production_increase_ratio)
# 刷新资源和设备状态
firm.refresh_R()
firm.refresh_C()
firm.refresh_P()
def _calculate_consumed_resources(self, firm, k):
"""
计算企业的资源消耗量。
"""
consumed_resources = {}
for industry in firm.indus_i:
consumed_quantity = sum(
product[1] * k
for product in firm.P
if product[0] == industry.unique_id
)
consumed_resources[industry.unique_id] = consumed_quantity
return consumed_resources
def _consume_resources(self, firm, consumed_resources):
"""
消耗企业的资源。
"""
for resource in firm.R:
resource_id, resource_quantity = resource[0], resource[1]
if resource_id in consumed_resources:
resource[1] = max(0, resource_quantity - consumed_resources[resource_id])
def _produce_products(self, firm, production_increase_ratio):
"""
生产企业的产品。
"""
for product in firm.P:
product[1] *= production_increase_ratio
def _process_firms_step(self):
"""
处理企业的状态更新,包括:
1. 刷新企业字典(清理前置步骤)。
2. 减少中断企业的规模。
3. 判断企业是否需要从中断状态转为移除状态。
4. 判断是否停止模拟。
"""
# 减少中断企业的规模
# 刷新企业字典
for firm in self.company_agents:
firm.clean_before_time_step()
for prod in firm.dct_prod_up_prod_stat.keys():
status, ts = firm.dct_prod_up_prod_stat[prod]['p_stat'][-1]
if status == 'D':
size = firm.size_stat[-1][0] - \
firm.size_stat[0][0] / len(firm.dct_prod_up_prod_stat.keys()) / self.remove_t
firm.size_stat.append((size, self.t))
lst_is_disrupt = [stat == 'D' for stat, _ in
firm.dct_prod_up_prod_stat[prod]['p_stat'][-self.remove_t:]]
if all(lst_is_disrupt):
# 转换中断企业为已移除企业
firm.dct_prod_up_prod_stat[prod]['p_stat'].append(('R', self.t))
# 判断是否需要停止模拟
if self.t > 0:
for firm in self.company_agents:
for prod in firm.dct_prod_up_prod_stat.keys():
status, _ = firm.dct_prod_up_prod_stat[prod]['p_stat'][-1]
is_init = firm in self.dct_lst_init_disrupt_firm_prod.keys() and prod in \
self.dct_lst_init_disrupt_firm_prod[firm]
if status == 'D' and not is_init:
break
else:
continue
break
else:
self.int_stop_ts = self.t
def end(self):
"""
结束模型运行并保存结果。
- 如果当前样本的结果未保存,则保存所有生产状态为非正常状态的结果。
- 更新样本状态为完成,并记录相关信息。
"""
# 检查当前样本结果是否已存在
if not db_session.query(Result).filter_by(s_id=self.sample.id).first():
# 生成需要保存的结果列表
lst_result_info = [
Result(
s_id=self.sample.id,
id_firm=firm.unique_id,
id_product=prod.unique_id,
ts=ts,
status=status
)
for firm in self.company_agents
for prod, dct_status_supply in firm.dct_prod_up_prod_stat.items()
if not all(stat == 'N' for stat, _ in dct_status_supply['p_stat'])
for status, ts in dct_status_supply['p_stat']
]
# 批量保存结果到数据库
if lst_result_info:
db_session.bulk_save_objects(lst_result_info)
db_session.commit()
# 更新样本状态为已完成
self.sample.is_done_flag = 1
self.sample.computer_name = platform.node()
self.sample.stop_t = self.int_stop_ts
db_session.commit()