model firm product

This commit is contained in:
HaoYizhi 2023-02-27 22:02:46 +08:00
parent f8d91df954
commit c29a75177c
7 changed files with 243 additions and 35 deletions

16
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "C:\\Users\\25759\\OneDrive\\Project\\ScrAbm\\Dissertation\\IIabm\\model.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

Binary file not shown.

Binary file not shown.

106
firm.py
View File

@ -2,21 +2,107 @@ import agentpy as ap
class FirmAgent(ap.Agent):
def setup(self, code, name, type_region, revenue_log, list_product,
capacity):
def setup(self, code, name, type_region, revenue_log, a_list_product):
self.firm_network = self.model.firm_network
self.product_network = self.model.product_network
self.code = code
self.name = name
self.type_region = type_region
self.revenue_log = revenue_log
self.list_product = list_product
self.capacity = capacity
self.a_list_product = a_list_product
self.dct_prod_capacity = dict.fromkeys(self.a_list_product)
self.dct_product_is_disrupted = dict.fromkeys(list_product, False)
self.dct_product_is_removed = dict.fromkeys(list_product, False)
self.a_list_up_product_removed = ap.AgentList(self.model, [])
self.a_list_product_disrupted = ap.AgentList(self.model, [])
self.a_list_product_removed = ap.AgentList(self.model, [])
def remove_edge_to_customer_if_removed(self, remove_product):
t = self.firm_network.graph.out_edges(
self.firm_network.positions[self], keys=True, data=True)
print(t)
self.dct_num_trial_up_product_removed = {}
self.dct_request_prod_from_firm = {}
def remove_edge_to_cus_and_cus_up_prod(self, remove_product):
list_out_edges = list(
self.firm_network.graph.out_edges(
self.firm_network.positions[self], keys=True, data='Product'))
for n1, n2, key, product_code in list_out_edges:
if product_code == remove_product.code:
# remove edge
# print(n1, n2, key, product_code)
self.firm_network.graph.remove_edge(n1, n2, key)
# remove customer up product
customer = ap.AgentIter(self.model, n2).to_list()[0]
if remove_product not in customer.a_list_up_product_removed:
customer.a_list_up_product_removed.append(remove_product)
customer.dct_num_trial_up_product_removed[
remove_product] = 0
# # disrupt customer
# customer = ap.AgentIter(self.model, n2).to_list()[0]
# for product in customer.a_list_product:
# if product in remove_product.a_successors():
# if product not in customer.a_list_product_disrupted:
# customer.a_list_product_disrupted.append(
# product)
# print(customer.a_list_product_disrupted.code)
def seek_alt_supply(self):
print(self.name, 'seek_alt_supply')
for product in self.a_list_up_product_removed:
if self.dct_num_trial_up_product_removed[
product] <= self.model.int_n_max_trial:
candidate_alt_supply = self.model.a_list_total_firms.select([
product in firm.a_list_product
for firm in self.model.a_list_total_firms
])
# print(candidate_alt_supply)
# print(candidate_alt_supply.name)
# print(candidate_alt_supply.a_list_product.code)
# select based on size
list_prob = [
size / sum(candidate_alt_supply.revenue_log)
for size in candidate_alt_supply.revenue_log
]
select_alt_supply = self.model.nprandom.choice(
candidate_alt_supply, p=list_prob)
print('select_alt_supply', select_alt_supply.name)
assert product in select_alt_supply.a_list_product, \
f"{select_alt_supply} \
does not produce requested product {product}"
if product in select_alt_supply.dct_request_prod_from_firm.\
keys():
select_alt_supply.dct_request_prod_from_firm[
product].append(self)
else:
select_alt_supply.dct_request_prod_from_firm[product] = [
self
]
print({
key.code: [v.name for v in value]
for key, value in
select_alt_supply.dct_request_prod_from_firm.items()
})
self.dct_num_trial_up_product_removed[product] += 1
def handle_request(self):
print(self.name, 'handle_request')
for product, list_firm in self.dct_request_prod_from_firm.items():
# if self.dct_prod_capacity[product] > 0:
# if len(list_firm) == 1:
# self.accept_request(list_firm[0], product)
print(product.code, [firm.name for firm in list_firm])
def accept_request(self, down_firm, product):
self.firm_network.graph.add_edges_from([
(self.firm_network.positions[self],
self.firm_network.positions[down_firm], {
'Product': product.code
})
])
self.dct_prod_capacity[product] -= 1
self.dct_request_prod_from_firm[product].remove(down_firm)
down_firm.a_list_up_product_removed.remove(product)
def clean_before_trial(self):
self.dct_request_prod_from_firm = {}

102
model.py
View File

@ -3,16 +3,19 @@ import pandas as pd
import numpy as np
import networkx as nx
from firm import FirmAgent
from product import ProductAgent
sample = 0
seed = 0
n_iter = 3
dct_list_init_remove_firm_prod = {0: ['1.4.4'], 2: ['1.1.3']}
n_max_trial = 2
dct_sample_para = {
'sample': sample,
'seed': seed,
'n_iter': n_iter,
'dct_list_init_remove_firm_prod': dct_list_init_remove_firm_prod
'n_max_trial': n_max_trial,
'dct_list_init_remove_firm_prod': dct_list_init_remove_firm_prod,
}
@ -20,8 +23,9 @@ class Model(ap.Model):
def setup(self):
self.sample = self.p.sample
self.nprandom = np.random.default_rng(self.p.seed)
self.dct_list_remove_firm_prod = self.p.dct_list_init_remove_firm_prod
self.int_n_iter = int(self.p.n_iter)
self.int_n_max_trial = int(self.p.n_max_trial)
self.dct_list_remove_firm_prod = self.p.dct_list_init_remove_firm_prod
# init graph bom
BomNodes = pd.read_csv('BomNodes.csv', index_col=0)
@ -85,6 +89,7 @@ class Model(ap.Model):
# print('-' * 20)
self.firm_network = ap.Network(self, G_Firm)
self.product_network = ap.Network(self, G_bom)
# print([node.label for node in self.firm_network.nodes])
# print([list(self.firm_network.graph.predecessors(node))
# for node in self.firm_network.nodes])
@ -92,6 +97,15 @@ class Model(ap.Model):
# for node in self.firm_network.nodes])
# print([v for v in self.firm_network.graph.nodes(data=True)])
# init product
for ag_node, attr in self.product_network.graph.nodes(data=True):
product_agent = ProductAgent(self,
code=ag_node.label,
name=attr['Name'])
self.product_network.add_agents([product_agent], [ag_node])
self.a_list_total_products = ap.AgentList(self,
self.product_network.agents)
# init firm
for ag_node, attr in self.firm_network.graph.nodes(data=True):
firm_agent = FirmAgent(
@ -100,38 +114,56 @@ class Model(ap.Model):
name=attr['Name'],
type_region=attr['Type_Region'],
revenue_log=attr['Revenue_Log'],
list_product=attr['Product_Code'],
# init capacity as the degree of out edges
capacity=self.firm_network.graph.out_degree(ag_node))
a_list_product=self.a_list_total_products.select([
code in attr['Product_Code']
for code in self.a_list_total_products.code
]))
# init capacity as the degree of out edges of a specific product
list_out_edges = list(
self.firm_network.graph.out_edges(ag_node,
keys=True,
data='Product'))
for product in firm_agent.a_list_product:
capacity = len([
edge for edge in list_out_edges if edge[-1] == product.code
])
firm_agent.dct_prod_capacity[product] = capacity
# print(firm_agent.name, firm_agent.dct_prod_capacity)
self.firm_network.add_agents([firm_agent], [ag_node])
self.a_list_total_firms = ap.AgentList(self, self.firm_network.agents)
# print(list(zip(self.a_list_total_firms.code,
# self.a_list_total_firms.name,
# self.a_list_total_firms.capacity)))
# set the initial firm product that are removed
# init dct_list_remove_firm_prod (from string to agent)
t_dct = {}
for firm_code, list_product in self.dct_list_remove_firm_prod.items():
firm = self.a_list_total_firms.select(
self.a_list_total_firms.code == firm_code)[0]
for product in list_product:
assert product in firm.list_product, \
f"product {product} not in firm {firm_code}"
firm.dct_product_is_removed[product] = True
t_dct[firm] = self.a_list_total_products.select([
code in list_product
for code in self.a_list_total_products.code
])
self.dct_list_remove_firm_prod = t_dct
# set the initial firm product that are removed
for firm, a_list_product in self.dct_list_remove_firm_prod.items():
for product in a_list_product:
assert product in firm.a_list_product, \
f"product {product.code} not in firm {firm.code}"
firm.a_list_product_removed.append(product)
def update(self):
# Update the firm that is removed
# update the firm that is removed
self.dct_list_remove_firm_prod = {}
for firm in self.a_list_total_firms:
for product, flag in firm.dct_product_is_removed.items():
if flag is True:
if firm.code in self.dct_list_remove_firm_prod.keys():
self.dct_list_remove_firm_prod[firm.code].append(
product)
else:
self.dct_list_remove_firm_prod[firm.code] = [product]
if len(firm.a_list_product_removed) > 0:
self.dct_list_remove_firm_prod[
firm] = firm.a_list_product_removed
# print(self.dct_list_remove_firm_prod)
# Stop simulation if reached terminal number of iteration
# stop simulation if reached terminal number of iteration
if self.t == self.int_n_iter or len(
self.dct_list_remove_firm_prod) == 0:
self.stop()
@ -141,14 +173,34 @@ class Model(ap.Model):
dct_key_list = list(self.dct_list_remove_firm_prod.keys())
self.nprandom.shuffle(dct_key_list)
self.dct_list_remove_firm_prod = {
key: self.dct_list_remove_firm_prod[key]
key: self.dct_list_remove_firm_prod[key].shuffle()
for key in dct_key_list
}
for firm_code, list_product in self.dct_list_remove_firm_prod.items():
firm = self.a_list_total_firms.select(
self.a_list_total_firms.code == firm_code)[0]
for product in list_product:
firm.remove_edge_to_customer_if_removed(product)
# print(self.dct_list_remove_firm_prod)
# remove_edge_to_cus_and_cus_up_prod
for firm, a_list_product in self.dct_list_remove_firm_prod.items():
for product in a_list_product:
firm.remove_edge_to_cus_and_cus_up_prod(product)
for n_trial in range(self.int_n_max_trial):
print('='*20, n_trial, '='*20)
# seek_alt_supply
for firm in self.a_list_total_firms:
if len(firm.a_list_up_product_removed) > 0:
# print(firm.name)
# print(firm.a_list_up_product_removed.code)
firm.seek_alt_supply()
# handle_request
for firm in self.a_list_total_firms:
if len(firm.dct_request_prod_from_firm) > 0:
firm.handle_request()
# reset dct_request_prod_from_firm
self.a_list_total_firms.clean_before_trial()
# do not use:
# self.a_list_total_firms.dct_request_prod_from_firm = {} why?
def end(self):
pass

16
product.py Normal file
View File

@ -0,0 +1,16 @@
import agentpy as ap
class ProductAgent(ap.Agent):
def setup(self, code, name):
self.product_network = self.model.product_network
self.code = code
self.name = name
def a_successors(self):
nodes = self.product_network.graph.successors(
self.product_network.positions[self])
return ap.AgentList(
self.model,
[ap.AgentIter(self.model, node).to_list()[0] for node in nodes])

38
test.ipynb Normal file
View File

@ -0,0 +1,38 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "bcdafc093860683ffb58d6956591562b7f8ed5d58147d17d71a5d4d6605a08df"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}