mesa/my_model.py

587 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from random import shuffle
import platform
import networkx as nx
import pandas as pd
from mesa import Model
from mesa.space import MultiGrid, NetworkGrid
from mesa.datacollection import DataCollector
import numpy as np
from mesa_viz_tornado.modules import NetworkModule
from firm import FirmAgent
from orm import db_session, Result
from product import ProductAgent
from mesa.visualization import ModularServer
class MyModel(Model):
def __init__(self, params):
    """Build the model from a parameter dictionary.

    Keys read from ``params``: ``prf_size``, ``prf_conn``,
    ``cap_limit_prob_type``, ``cap_limit_level``, ``diff_new_conn``,
    ``g_bom`` (JSON adjacency data of the BOM graph), ``seed``, ``sample``,
    ``n_iter``, ``dct_lst_init_disrupt_firm_prod``, ``n_max_trial``,
    ``remove_t`` and ``netw_prf_n``.

    After storing the parameters, the constructor loads the input data,
    builds the firm and firm-product networks, creates the agents and
    applies the initial disruptions.
    """
    # Mesa expects the base Model to be initialised by every subclass.
    super().__init__()

    # Attributes copied from the parameters.
    self.prf_conn = params['prf_conn']
    self.cap_limit_prob_type = params['cap_limit_prob_type']
    self.cap_limit_level = params['cap_limit_level']
    self.diff_new_conn = params['diff_new_conn']

    self.firm_network = nx.MultiDiGraph()  # directed multigraph
    self.firm_prod_network = nx.MultiDiGraph()
    self.product_network = nx.MultiDiGraph()  # directed multigraph

    # NetworkGrid built on top of a NetworkX graph object.
    self.t = 0
    self.network_graph = nx.MultiDiGraph()
    self.grid = NetworkGrid(self.network_graph)
    self.data_collector = DataCollector(
        agent_reporters={"Product": "name"}
    )

    # Initialize the BOM graph.
    self.G_bom = nx.adjacency_graph(json.loads(params['g_bom']))
    # Firm-product network graph.
    self.G_FirmProd = nx.MultiDiGraph()
    # Firm network graph.
    self.G_Firm = nx.MultiDiGraph()

    self.company_agents = []
    self.product_agents = []
    self.nprandom = np.random.default_rng(params['seed'])

    # Initialize parameters from `params`.
    self.sample = params['sample']
    self.int_stop_ts = 0
    self.int_n_iter = int(params['n_iter'])
    self.dct_lst_init_disrupt_firm_prod = params['dct_lst_init_disrupt_firm_prod']

    # External variables.
    self.int_n_max_trial = int(params['n_max_trial'])
    self.is_prf_size = bool(params['prf_size'])
    self.remove_t = int(params['remove_t'])
    self.int_netw_prf_n = int(params['netw_prf_n'])

    # Set-up pipeline. Input tables first, then the networks.
    self.initialize_product_network(params)
    self.resource_integration()
    self.j_comp_consumed_produced()
    self.initialize_firm_network()
    self.initialize_firm_product_network()
    self.add_edges_to_firm_network()
    self.connect_unconnected_nodes()
    # Agents are created only after connect_unconnected_nodes() has pointed
    # self.firm_network at the populated firm graph; creating them earlier
    # iterates an empty graph and yields no firm agents at all.
    self.initialize_agents()
    self.initialize_disruptions()
def initialize_product_network(self, params):
    """Load the product (BOM) network and the BOM node table.

    ``params['g_bom']`` is parsed as JSON adjacency data and turned into
    ``self.product_network``; a failure is printed and otherwise ignored.
    The BOM node CSV is then read into ``self.type2`` and the ``Index``
    values are grouped per ``Code`` into ``self.id_code``.
    """
    try:
        bom_adjacency = json.loads(params['g_bom'])
        self.product_network = nx.adjacency_graph(bom_adjacency)
    except Exception as e:
        print(f"Failed to initialize product network: {e}")

    # BOM node table: codes are kept as strings and mapped to their indices.
    bom_nodes = pd.read_csv('input_data/input_product_data/BomNodes.csv')
    bom_nodes['Code'] = bom_nodes['Code'].astype('string')
    self.type2 = bom_nodes
    indices_per_code = bom_nodes.groupby('Code')['Index']
    self.id_code = indices_per_code.apply(list)
def initialize_firm_network(self):
    """Create the firm nodes of ``self.G_Firm`` and attach their attributes.

    Reads the firm table and the firm/product relation table, stores the
    raw product codes per firm in ``self.firm_prod_labels_dict``, converts
    each firm's product codes to BOM indices through ``self.id_code`` and
    sets ``Type_Region``, ``Revenue_Log`` and ``Product_Code`` as node
    attributes. The firm table itself is kept in ``self.Firm``.
    """
    # Read the firm data.
    firm = pd.read_csv("input_data/input_firm_data/firm_amended.csv")
    firm['Code'] = firm['Code'].astype(str)
    firm.fillna(0, inplace=True)
    # Copy the slice so that adding a column never writes into `firm`.
    firm_attr = firm.loc[:, ["Code", "Type_Region", "Revenue_Log"]].copy()

    firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv")
    firm_industry_relation['Firm_Code'] = firm_industry_relation['Firm_Code'].astype('string')
    products_by_firm = firm_industry_relation.groupby('Firm_Code')['Product_Code'].apply(list)
    firm_attr['Product_Code'] = firm_attr['Code'].map(products_by_firm)
    firm_attr.set_index('Code', inplace=True)

    self.firm_prod_labels_dict = {
        code: group['Product_Code'].tolist()
        for code, group in firm_industry_relation.groupby('Firm_Code')
    }

    # Numeric product code -> BOM indices, built once instead of scanning
    # self.id_code for every product code of every firm.
    indices_by_code = {}
    for code, bom_indices in self.id_code.items():
        indices_by_code.setdefault(int(code), []).extend(bom_indices)

    # Replace each firm's product codes by the matching BOM indices.
    for index, row in firm_attr.iterrows():
        product_codes = row['Product_Code']
        if not isinstance(product_codes, list):
            # Firm without any row in the relation table (map() gave NaN).
            product_codes = []
        id_index_list = []
        for product_code in product_codes:
            id_index_list.extend(indices_by_code.get(product_code, []))
        firm_attr.at[index, 'Product_Code'] = id_index_list

    self.G_Firm.add_nodes_from(firm["Code"])
    # Assign attributes to the firm nodes.
    firm_labels_dict = {code: firm_attr.loc[code].to_dict() for code in self.G_Firm.nodes}
    nx.set_node_attributes(self.G_Firm, firm_labels_dict)
    self.Firm = firm
def initialize_firm_product_network(self):
    """Create the nodes of ``self.G_FirmProd`` from the relation table.

    Every row of the firm/product relation table becomes one node, keyed
    by the row index. The row's single product code is converted to the
    list of matching BOM indices through ``self.id_code`` and the whole row
    is stored as the node's attributes.
    """
    firm_industry_relation = pd.read_csv("input_data/firm_industry_relation.csv")
    firm_industry_relation['Firm_Code'] = firm_industry_relation['Firm_Code'].astype('string')
    firm_industry_relation['Product_Code'] = firm_industry_relation['Product_Code'].apply(lambda x: [x])

    # Each row of the relation table is a node of the graph.
    self.G_FirmProd.add_nodes_from(firm_industry_relation.index)

    # Numeric product code -> BOM indices, built once instead of scanning
    # self.id_code for every row.
    indices_by_code = {}
    for code, bom_indices in self.id_code.items():
        indices_by_code.setdefault(int(code), []).extend(bom_indices)

    # Replace the product code of every row by the matching BOM indices.
    for index, row in firm_industry_relation.iterrows():
        id_index_list = []
        for product_code in row['Product_Code']:
            id_index_list.extend(indices_by_code.get(product_code, []))
        firm_industry_relation.at[index, 'Product_Code'] = id_index_list

    # Assign the row contents as node attributes.
    firm_prod_labels_dict = {
        code: firm_industry_relation.loc[code].to_dict()
        for code in firm_industry_relation.index
    }
    nx.set_node_attributes(self.G_FirmProd, firm_prod_labels_dict)
def add_edges_to_firm_network(self):
    """ Add edges between firms based on the product BOM relationships

    For every firm and every BOM predecessor of one of its products, up to
    ``self.int_netw_prf_n`` supplier firms are drawn without replacement
    (weighted by ``Revenue_Log`` when ``self.is_prf_size`` is set) and an
    edge ``supplier -> firm`` labelled with the predecessor product is
    added to ``self.G_Firm``. The same selection is forwarded to
    ``add_edges_to_firm_product_network``.
    """
    # Iterate over a snapshot: adding edges may add nodes to the graph.
    for node in list(self.G_Firm.nodes):
        pred_codes = set()
        for product_code in self.G_Firm.nodes[node]['Product_Code']:
            pred_codes.update(self.G_bom.predecessors(product_code))

        # Sorted so the sequence of random draws is reproducible.
        for pred_product_code in sorted(pred_codes):
            # Firms producing the component (pred_product_code).
            lst_pred_firm = [firm_code for firm_code, product in self.firm_prod_labels_dict.items()
                             if pred_product_code in product]
            # Select multiple suppliers (multi-sourcing).
            n_pred_firm = min(self.int_netw_prf_n, len(lst_pred_firm))

            if self.is_prf_size:
                # Clamp before normalising so the probabilities still sum to 1.
                lst_weight = [max(0, self.G_Firm.nodes[pred_firm]['Revenue_Log'])
                              for pred_firm in lst_pred_firm]
                sum_weight = sum(lst_weight)
                if sum_weight > 0:
                    # Sampling without replacement cannot return more firms
                    # than there are firms with a non-zero probability.
                    n_pred_firm = min(n_pred_firm, sum(1 for w in lst_weight if w > 0))
                    lst_prob = [w / sum_weight for w in lst_weight]
                    lst_choose_firm = self.nprandom.choice(
                        lst_pred_firm, n_pred_firm, replace=False, p=lst_prob)
                else:
                    # No candidate or no usable weight: nothing to choose.
                    lst_choose_firm = []
            else:
                # Plain uniform choice.
                lst_choose_firm = self.nprandom.choice(lst_pred_firm, n_pred_firm, replace=False)

            # Add edges from predecessor firms to current node (firm).
            lst_add_edge = [(pred_firm, node, {'Product': pred_product_code})
                            for pred_firm in lst_choose_firm]
            self.G_Firm.add_edges_from(lst_add_edge)
            # Add edges to firm-product network.
            self.add_edges_to_firm_product_network(node, pred_product_code, lst_choose_firm)
def add_edges_to_firm_product_network(self, node, pred_product_code, lst_choose_firm):
    """ Helper function to add edges to the firm-product network

    ``node`` is the buying firm, ``pred_product_code`` the supplied
    component and ``lst_choose_firm`` the selected supplier firms. For
    every supplier, an edge is added in ``self.G_FirmProd`` from the
    supplier's node for the component to the buyer's node of each of its
    products that is a BOM successor of the component. Pairs without a
    matching firm-product node are skipped.
    """
    set_node_prod_code = set(self.G_Firm.nodes[node]['Product_Code'])
    set_pred_succ_code = set(self.G_bom.successors(pred_product_code))
    lst_use_pred_prod_code = list(set_node_prod_code & set_pred_succ_code)
    if len(lst_use_pred_prod_code) == 0:
        print("错误")

    # (firm code, product code) -> first matching firm-product node.
    # Built once per call, so each supplier and each product is resolved on
    # its own instead of re-using the first match found for an earlier one.
    first_node = {}
    for n, v in self.G_FirmProd.nodes(data=True):
        for v1 in v['Product_Code']:
            first_node.setdefault((v['Firm_Code'], v1), n)

    for pred_firm in lst_choose_firm:
        pred_node = first_node.get((pred_firm, pred_product_code))
        if pred_node is None:
            continue
        for use_pred_prod_code in lst_use_pred_prod_code:
            current_node = first_node.get((node, use_pred_prod_code))
            if current_node is not None:
                self.G_FirmProd.add_edge(pred_node, current_node)
def connect_unconnected_nodes(self):
    """ Connect unconnected nodes in the firm network

    Every firm whose degree in ``self.G_Firm`` is 0 is linked, for each of
    its products and each BOM successor of that product, to up to
    ``self.int_netw_prf_n`` firms drawn without replacement (weighted by
    ``Revenue_Log`` when ``self.is_prf_size`` is set). Matching edges are
    added to ``self.G_FirmProd``. Finally the firm graph is serialised to
    ``self.sample.g_firm`` and both graphs are published as
    ``self.firm_network`` / ``self.firm_prod_network``.
    """
    # (firm code, product code) -> first matching firm-product node.
    # Only edges are added below, so this index stays valid for the whole
    # call and each product / customer is resolved on its own.
    first_node = {}
    for n, v in self.G_FirmProd.nodes(data=True):
        for v1 in v['Product_Code']:
            first_node.setdefault((v['Firm_Code'], v1), n)

    # Iterate over a snapshot: adding edges may add nodes to the graph.
    for node in list(self.G_Firm.nodes):
        if self.G_Firm.degree(node) != 0:
            continue
        for product_code in self.G_Firm.nodes[node]['Product_Code']:
            current_node = first_node.get((node, product_code))

            for succ_product_code in self.G_bom.successors(product_code):
                lst_succ_firm = [firm_code for firm_code, product in self.firm_prod_labels_dict.items()
                                 if succ_product_code in product]
                n_succ_firm = min(self.int_netw_prf_n, len(lst_succ_firm))

                if self.is_prf_size:
                    # Clamp before normalising so the probabilities still sum to 1.
                    lst_weight = [max(0, self.G_Firm.nodes[succ_firm]['Revenue_Log'])
                                  for succ_firm in lst_succ_firm]
                    sum_weight = sum(lst_weight)
                    if sum_weight > 0:
                        # Sampling without replacement cannot return more firms
                        # than there are firms with a non-zero probability.
                        n_succ_firm = min(n_succ_firm, sum(1 for w in lst_weight if w > 0))
                        lst_prob = [w / sum_weight for w in lst_weight]
                        lst_choose_firm = self.nprandom.choice(
                            lst_succ_firm, n_succ_firm, replace=False, p=lst_prob)
                    else:
                        # No candidate or no usable weight: nothing to choose.
                        lst_choose_firm = []
                else:
                    lst_choose_firm = self.nprandom.choice(lst_succ_firm, n_succ_firm, replace=False)

                lst_add_edge = [(node, succ_firm, {'Product': product_code})
                                for succ_firm in lst_choose_firm]
                self.G_Firm.add_edges_from(lst_add_edge)

                # Add edges to firm-product network.
                if current_node is None:
                    continue
                for succ_firm in lst_choose_firm:
                    succ_node = first_node.get((succ_firm, succ_product_code))
                    if succ_node is not None:
                        self.G_FirmProd.add_edge(current_node, succ_node)

    self.sample.g_firm = json.dumps(nx.adjacency_data(self.G_Firm))
    self.firm_network = self.G_Firm  # use the networkx graph object directly
    self.firm_prod_network = self.G_FirmProd  # use the networkx graph object directly
def initialize_agents(self):
    """ Initialize agents and add them to the model.

    One ``ProductAgent`` is created per node of ``self.product_network``
    and one ``FirmAgent`` per node of ``self.firm_network``; each is
    registered through ``add_agent``. Firm agents receive their products,
    their rows of the consumed / produced tables and their resource lists
    (``R``, ``P``, ``C``).
    """
    for ag_node, attr in self.product_network.nodes(data=True):
        product = ProductAgent(ag_node, self, name=attr['Name'], type2=0)
        self.add_agent(product)

    for ag_node, attr in self.firm_network.nodes(data=True):
        a_lst_product = [agent for agent in self.product_agents
                         if agent.unique_id in attr['Product_Code']]

        # Node ids are strings while the resource tables are looked up with
        # integer firm codes below, so the same integer key is used for the
        # row filters as well.
        firm_key = int(ag_node)
        demand_quantity = self.data_consumed[self.data_consumed['Firm_Code'] == firm_key]
        # Filter the produced table with its own Firm_Code column, not with
        # a mask built from the consumed table.
        production_output = self.data_produced[self.data_produced['Firm_Code'] == firm_key]

        # Resource stock information taken from the firm resource tables.
        R = self.firm_resource_R.loc[firm_key]
        P = self.firm_resource_P.loc[firm_key]
        C = self.firm_resource_C.loc[firm_key]

        firm_agent = FirmAgent(
            ag_node, self,
            type_region=attr['Type_Region'],
            revenue_log=attr['Revenue_Log'],
            a_lst_product=a_lst_product,
            demand_quantity=demand_quantity,
            production_output=production_output,
            R=R,
            P=P,
            C=C
        )
        self.add_agent(firm_agent)
def initialize_disruptions(self):
    """Resolve the initial disruption spec to agents and apply it.

    ``self.dct_lst_init_disrupt_firm_prod`` initially maps firm ids to
    lists of product ids. It is replaced by a mapping from firm agents to
    lists of product agents (unknown firm ids are dropped), and every such
    product gets a ``('D', self.t)`` entry appended to its ``p_stat``
    history on the firm.
    """
    resolved = {}
    for firm_code, product_ids in self.dct_lst_init_disrupt_firm_prod.items():
        # First company agent carrying this id, if any.
        firm = next(
            (agent for agent in self.company_agents if agent.unique_id == firm_code),
            None,
        )
        # Product agents named in the disruption list of this firm.
        disrupted_products = [
            candidate for candidate in self.product_agents
            if candidate.unique_id in product_ids
        ]
        if firm is None:
            continue
        resolved[firm] = disrupted_products

    # From here on the attribute holds agents instead of ids.
    self.dct_lst_init_disrupt_firm_prod = resolved

    for firm, a_lst_product in resolved.items():
        status_by_product = firm.dct_prod_up_prod_stat
        for product in a_lst_product:
            # The product must be tracked in the firm's status dictionary.
            assert product in status_by_product.keys(), \
                f"Product {product.code} not in firm {firm.code}"
            # Mark the product as disrupted at the current time step.
            status_by_product[product]['p_stat'].append(('D', self.t))
def add_agent(self, agent):
    """Register ``agent`` in the list that matches its type.

    Firm agents go to ``self.company_agents``, product agents to
    ``self.product_agents``; any other object is ignored.
    """
    if isinstance(agent, FirmAgent):
        registry = self.company_agents
    elif isinstance(agent, ProductAgent):
        registry = self.product_agents
    else:
        return
    registry.append(agent)
def resource_integration(self):
    """Load the per-firm resource tables.

    Reads the material, device and product CSV files plus the device
    salvage values, left-joins the salvage value onto the devices and
    stores, per ``Firm_Code``, the rows of each table as a list of lists in
    ``self.firm_resource_R`` (materials), ``self.firm_resource_C``
    (devices) and ``self.firm_resource_P`` (products). The salvage table is
    kept in ``self.device_salvage_values``.
    """
    def rows_per_firm(frame, columns):
        # One list of row-value lists per firm.
        return frame.groupby('Firm_Code')[columns].apply(lambda grp: grp.values.tolist())

    materials = pd.read_csv("input_data/input_firm_data/firms_materials.csv")
    devices = pd.read_csv("input_data/input_firm_data/firms_devices.csv")
    products = pd.read_csv("input_data/input_firm_data/firms_products.csv")

    self.device_salvage_values = pd.read_csv('input_data/device_salvage_values.csv')
    devices_with_salvage = pd.merge(
        devices, self.device_salvage_values, on='设备id', how='left'
    )

    self.firm_resource_R = rows_per_firm(materials, ['材料id', '材料数量'])
    self.firm_resource_C = rows_per_firm(devices_with_salvage, ['设备id', '设备数量', '设备残值'])
    self.firm_resource_P = rows_per_firm(products, ['产品id', '产品数量'])
def j_comp_consumed_produced(self, consumption_divisor=2, production_divisor=4):
    """Derive per-firm consumed and produced quantities from the materials.

    For every ``Firm_Code`` in the materials CSV a dictionary
    ``{material id: quantity / divisor}`` is built. The table using
    ``consumption_divisor`` is stored in ``self.data_consumed`` and the one
    using ``production_divisor`` in ``self.data_produced``. Both tables
    have the columns ``Firm_Code`` and ``Material_not_Consumed``.

    The defaults (2 and 4) are the fixed ratios that were previously
    hard-coded here.
    """
    # NOTE(review): the produced table is derived from the materials file
    # and keyed by material id; the products CSV was read here before but
    # its content was never used. Confirm whether real product data should
    # feed self.data_produced.
    materials = pd.read_csv('input_data/input_firm_data/firms_materials.csv')

    def quantities_per_firm(divisor):
        # The column name is kept as-is because other code may read it.
        return (
            materials.groupby('Firm_Code')[['材料id', '材料数量']]
            .apply(lambda grp: dict(zip(grp['材料id'], grp['材料数量'] / divisor)))
            .reset_index(name='Material_not_Consumed')
        )

    self.data_consumed = quantities_per_firm(consumption_divisor)
    self.data_produced = quantities_per_firm(production_divisor)
def step(self):
# Advance the model by one time step.
#
# Visible phases, in order:
#   1. firms drop customer edges for products whose latest status is 'D'
#      from the previous step, then propagate flagged disruptions;
#   2. up to int_n_max_trial rounds in which disrupted firms look for
#      alternative supply and requests are handled;
#   3. material / equipment purchasing, consumption, production and the
#      refresh_* calls;
#   then self.t is incremented.
#
# NOTE(review): indentation is missing in this copy of the file, so the
# nesting of the statements below (in particular which loop each statement
# in phase 3 belongs to) must be confirmed against the original source.
# 1. Remove edge to customer and disrupt customer up product
for firm in self.company_agents:
for prod in firm.dct_prod_up_prod_stat.keys():
# latest (status, timestamp) entry of this product
status, ts = firm.dct_prod_up_prod_stat[prod]['p_stat'][-1]
if status == 'D' and ts == self.t - 1:
firm.remove_edge_to_cus(prod)
for firm in self.company_agents:
for prod in firm.dct_prod_up_prod_stat.keys():
for up_prod in firm.dct_prod_up_prod_stat[prod]['s_stat'].keys():
if firm.dct_prod_up_prod_stat[prod]['s_stat'][up_prod]['set_disrupt_firm']:
firm.disrupt_cus_prod(prod, up_prod)
# 2. Trial Process
for n_trial in range(self.int_n_max_trial):
shuffle(self.company_agents) # manually shuffle the agent order
# stays True when no firm had anything to seek in this round
is_stop_trial = True
for firm in self.company_agents:
lst_seek_prod = []
for prod in firm.dct_prod_up_prod_stat.keys():
status = firm.dct_prod_up_prod_stat[prod]['p_stat'][-1][0]
if status == 'D':
# collect the supplies of a disrupted product whose 'stat' is falsy
for supply in firm.dct_prod_up_prod_stat[prod]['s_stat'].keys():
if not firm.dct_prod_up_prod_stat[prod]['s_stat'][supply]['stat']:
lst_seek_prod.append(supply)
# de-duplicate the supplies to seek
lst_seek_prod = list(set(lst_seek_prod))
if len(lst_seek_prod) > 0:
is_stop_trial = False
for supply in lst_seek_prod:
firm.seek_alt_supply(supply)
if is_stop_trial:
break
# Handle requests
shuffle(self.company_agents) # manually shuffle the agent order
for firm in self.company_agents:
if len(firm.dct_request_prod_from_firm) > 0:
firm.handle_request()
# Reset dct_request_prod_from_firm
for firm in self.company_agents:
firm.clean_before_trial()
# 3. Decide whether materials need to be purchased and whether machinery needs to be purchased
purchase_material_firms = {}
purchase_machinery_firms = {}
# NOTE(review): these two lists are created once, before the firm loop, and
# the same list objects are stored for every firm below — confirm whether
# per-firm lists were intended.
material_list = []
machinery_list = []
list_seek_material_firm = [] # every firm that received a request
list_seek_machinery_firm = [] # every firm that received a request
for firm in self.company_agents:
# materials (resources)
for sub_list in firm.R:
# entry is indexed as [id, quantity]; restock when quantity <= firm.s_r
if sub_list[1] <= firm.s_r:
required_material_quantity = firm.S_r - sub_list[1]
(material_list.append([sub_list[0], required_material_quantity]))
purchase_material_firms[firm] = material_list
# equipment
for sub_list in firm.C:
# required_machinery_quantity for equipment depends on the salvage value: every period the salvage value is reduced by a fixed amount x defined on the firm
sub_list[2] -= firm.x
if sub_list[2] <= 0: # salvage value is less than or equal to 0
sub_list[1] -= 1
required_machinery_quantity = 1 # restore the original amount, i.e. 1
(machinery_list
.append([sub_list[0], required_machinery_quantity]))
purchase_machinery_firms[firm] = machinery_list
# Source suppliers and send requests, decide whether to accept the supply, then update
for material_firm_key, sub_list_values in purchase_material_firms.items():
for mater_list in sub_list_values:
result = material_firm_key.seek_material_supply(mater_list[0])
# only append result to list_seek_material_firm when it is not -1
if result != -1:
list_seek_material_firm.append(result)
if len(list_seek_material_firm) != 0:
for seek_material_firm in list_seek_material_firm:
# NOTE(review): mater_list is the loop variable of the loop above — confirm the intended nesting
seek_material_firm.handle_material_request(mater_list) # update products
# NOTE(review): `firm` here is not bound by the enclosing purchase loop; it
# looks like a leftover from the earlier `for firm in self.company_agents`
# loop — confirm which firm should be restocked.
for R_list in firm.R:
R_list[1] = firm.S_r
for machinery_firm, sub_list in purchase_machinery_firms.items():
for machi_list in sub_list:
# call machinery_firm.seek_machinery_supply(...) once for this entry
result = machinery_firm.seek_machinery_supply(machi_list[0])
# only append result to list_seek_machinery_firm when it is not -1
if result != -1:
list_seek_machinery_firm.append(result)
if len(list_seek_machinery_firm) != 0:
for seek_machinery_firm in list_seek_machinery_firm:
seek_machinery_firm.handle_machinery_request(machi_list)
for C_list, C0_list in zip(firm.C, firm.C0):
C_list[1] = C0_list[1] # assign the original values back
C_list[2] = C0_list[2]
# Resource consumption
# TODO: this part needs to be revised
# NOTE(review): firm.R is iterated as a sequence of indexable entries above
# but through .items() here, and r_nums / p_nums below are only rebound
# locally, so nothing is written back to firm.R / firm.P — confirm.
for r_id, r_nums in firm.R.items():
for consumed_id, consumed_nums in firm.c_consumption:
if consumed_id == r_id:
r_nums = r_nums - consumed_nums
# Production
# NOTE(review): production also reads firm.c_consumption — presumably a
# separate production table was intended; confirm.
for p_id, p_nums in firm.P.items():
for product_id, product_nums in firm.c_consumption:
if product_id == p_id:
p_nums = p_nums + product_nums
firm.refresh_R()
# refresh the C state
firm.refresh_C()
# refresh the P state
firm.refresh_P()
# Increment the time step
self.t += 1
def end(self):
    """Store the results of the run and mark the sample as finished.

    When no ``Result`` row exists yet for this sample, one row is written
    per status entry of every firm/product pair whose status history
    contains anything other than ``'N'``. Afterwards the sample is flagged
    as done, stamped with the host name and the stop time, and committed.
    """
    sample = self.sample
    stored_rows = db_session.query(Result).filter_by(s_id=sample.id)

    if stored_rows.count() == 0:
        lst_result_info = []
        for firm in self.company_agents:
            for prod, dct_status_supply in firm.dct_prod_up_prod_stat.items():
                history = dct_status_supply['p_stat']
                # Products that stayed normal throughout are not recorded.
                if all(stat == 'N' for stat, _ in history):
                    continue
                lst_result_info.extend(
                    Result(
                        s_id=sample.id,
                        id_firm=firm.unique_id,
                        id_product=prod.unique_id,
                        ts=ts,
                        status=status,
                    )
                    for status, ts in history
                )
        db_session.bulk_save_objects(lst_result_info)
        db_session.commit()

    sample.is_done_flag = 1
    sample.computer_name = platform.node()
    sample.stop_t = self.int_stop_ts
    db_session.commit()