上传文件至 '0829'

This commit is contained in:
SongQi 2022-08-29 10:34:57 +08:00
parent 6baaa8ecba
commit 5524b52632
4 changed files with 338 additions and 0 deletions

91
0829/env.py Normal file
View File

@ -0,0 +1,91 @@
import agentpy as ap
from random import uniform
from worker import WorkerAgent
from firm import FirmAgent
class Env(ap.Model):
float_market_size: float
percent_rh: float
percent_search: float
n_worker: int
n_firm: int
e_revenue: float
a_lst_worker: ap.AgentList
a_lst_firm: ap.AgentList
def setup(self):
# 工作人员、企业数量、搜寻企业数量赋值
self.n_worker = self.p.n_worker
self.n_firm = self.p.n_firm
self.percent_search = self.p.percent_search
# 工人、企业列表
self.a_lst_worker = ap.AgentList(self)
self.a_lst_firm = ap.AgentList(self)
self.e_revenue = 119.3
# 在工人列表中添加工人
for i in range(self.n_worker):
# 初始化 workeragent并把alpha属性传过去
w = WorkerAgent(self, self.p.alpha)
self.a_lst_worker.append(w)
# 在企业列表中添加企业放入一个is_RH_ratio, 即有多大比例的企业是属于RH类型的
for i in range(self.n_firm):
# 对于企业属性true or false 的判断, 影响到firm 板块下, self.s_IsRH = is_RH 语句的判断
f = FirmAgent(self, self.p.is_RH_ratio >= uniform(0, 1))
self.a_lst_firm.append(f)
def update_e_revenue(self):
self.e_revenue += 0.01 * self.e_revenue
def step(self):
self.update_e_revenue()
# 先清空每次的选择列表
self.a_lst_firm.empty_apply()
# 一开始worker要去选择很多firm
self.a_lst_worker.select_firm()
# 第二步, firm 去选 worker
self.a_lst_firm.select_worker()
if self.t == 100:
self.stop()
pass
def provide_lst_random_firms(self, the_worker: WorkerAgent):
'''选择企业数量 = 企业总数*百分比
选择企业的列表 = 随机选择的企业的个数
如果员工处于被雇佣的状态
如果员工工作的企业在随机选定的企业列表中
打开列表中的企业
移除该企业
返回值移除后再重新选择随机选择企业
否则
返回值选择企业列表
'''
n_select_firms = int(self.percent_search * self.n_firm)
a_lst_select_firms = self.a_lst_firm.random(n_select_firms)
if the_worker.s_is_hired:
if the_worker.working_firm in a_lst_select_firms:
# 转换为 list
lst_f = list(self.a_lst_firm)
lst_f.remove(the_worker.working_firm)
return ap.AgentList(self, lst_f).random(n_select_firms)
# 假如以上都不满足, 直接返回
return ap.AgentList(self, a_lst_select_firms)
if __name__ == '__main__':
dict_para = {'n_worker': 100,
'n_firm': 20,
'percent_search': 0.2,
'alpha': 0.5,
'is_RH_ratio': 0.5}
my_model = Env(dict_para)
my_model.run()

138
0829/firm.py Normal file
View File

@ -0,0 +1,138 @@
import math
from typing import Union, Any
import agentpy as ap
from random import uniform, randint
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from worker import WorkerAgent
class FirmAgent(ap.Agent):
c_incentive: float
s_IsRH: bool
s_avg_senior_yield: float
s_avg_junior_yield: float
s_a_yield: float
# s_salary: float
initial_f_salary: float
s_revenue: float
s_profit: float
l_senior_workers: list
l_junior_workers: list
l_applied_workers: list
def setup(self, is_RH):
self.c_incentive = uniform(0, 1)
# self.s_profit = randint(10, 20)
self.l_applied_workers = []
self.s_IsRH = is_RH
self.initial_f_salary = randint(8000, 10000)
def apply(self, the_worker):
self.l_applied_workers.append(the_worker)
def empty_apply(self):
self.l_applied_workers = []
def select_worker(self):
'''
企业找到最想招聘的员工
:return:
'''
n_workers = len(self.l_applied_workers)
if n_workers > 0:
selected_worker = None
if n_workers > 1:
# 对员工产出进行排队和比较
# 先判断是什么选择方式
if self.s_IsRH:
max_s_yield, best_worker = 0.0, None
for the_worker in self.l_applied_workers:
if the_worker.s_yield > max_s_yield:
max_s_yield = the_worker.s_yield
best_worker = the_worker
selected_worker = best_worker
else:
selected_worker = self.l_applied_workers[0] # TODO
# print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}')
# return best_worker
# 当企业是想要利用产出最高的员工时,从申请的员工中选出产出最高的员工
# if self.s_IsRH :
# # 计算该名员工的工资水平: 原有工资水平* 1+incentive
# # bw_salary = self.best_worker.salary()
#
# # 将该名员工的产出与公司原有员工的产出进行对比如果高于senior列表中的最后一名的产出就进入senior_list, 否则进入junior_list
# self.l_senior_workers.append(best_worker)
# else:
# self.l_junior_workers.append(best_worker)
# # best_worker.apply(self)
# else:
# # 计算由于改名员工下一期的薪资总数(根据薪资函数更新)以及企业下一期的利润的差值,并选出最大值
else:
selected_worker = self.l_applied_workers[0]
selected_worker.update_working_firm_is_hired(self)
self.update_two_worker_list(selected_worker)
def update_two_worker_list(self, new_worker):
lst_all_worker = self.l_senior_workers + self.l_junior_workers + [new_worker]
lst_sorted = lst_all_worker.sort(key=lambda x: x['s_yield'], reverse=True) # from highest yield to lowest
n_all_worker = len(lst_all_worker)
# 向上取整数值
n_senior_worker = math.ceil(n_all_worker * 0.2)
n_junior_worker = n_all_worker - n_senior_worker
self.l_senior_workers = lst_sorted[:n_senior_worker]
if n_junior_worker == 0:
self.l_junior_workers = []
else:
self.l_junior_workers = lst_sorted[n_senior_worker:]
def update_yields(self): # 需要改
n_sw, n_jw = len(self.l_senior_workers), len(self.l_junior_workers)
# acc_all_yield = 0
if n_sw == 0:
self.s_avg_senior_yield = 0
else:
s_acc_senior_yield = 0
for sw in self.l_senior_workers:
# 加入工人主体的产出属性
s_acc_senior_yield += sw.s_yield
# acc_all_yield += sw.s_yield
self.s_avg_senior_yield = s_acc_senior_yield / n_sw
if n_jw == 0:
self.s_avg_junior_yield = 0
else:
s_acc_junior_yield = 0
for jw in self.l_junior_workers:
# 加入工人主体的产出属性
s_acc_junior_yield += jw.s_yield
self.s_avg_junior_yield = s_acc_junior_yield / n_jw
self.s_a_yield = 0.8 * self.s_avg_senior_yield + 0.2 * self.s_avg_junior_yield
def logit_share(self):
logit_share = math.exp(self.s_a_yield) / np.sum(math.exp(self.s_a_yield))
return logit_share
def sum_salary(self, l_senior_workers, l_junior_workers):
'''
计算某公司整体的薪金水平
'''
l_all_workers = l_senior_workers + l_junior_workers
sum_salary = self.salary
for the_worker in l_all_workers:
sum_salary += the_worker.salary
return
def s_profit(self):
# ?
self.s_profit = self.logit_share() * self.p.e_revenue - self.sum_salary()
return
def step(self):
pass

16
0829/main.py Normal file
View File

@ -0,0 +1,16 @@
# This is a sample Python script.
# Press Shift+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
def print_hi(name):
# Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi('PyCharm')
# See PyCharm help at https://www.jetbrains.com/help/pycharm/

93
0829/worker.py Normal file
View File

@ -0,0 +1,93 @@
import math
import agentpy as ap
from random import uniform, randint
# 编程可以自动补充一些东西,减少报错
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from firm import FirmAgent
class WorkerAgent(ap.Agent):
select_firm: object
c_effort: float
s_is_hired: bool
# c_work_months: int
s_work_duration: int
working_firm: 'FirmAgent'
s_salary: float
s_yield: float
# s_w_applied: bool
c_alpha: float
def setup(self, alpha):
# super().__init__(unique_id, model)
# self.num_workers = 10000
self.c_effort = uniform(0, 1)
# self.c_work_months = randint(0, 60)
self.s_is_hired = False
self.s_work_duration = randint(0, 60)
self.c_alpha = alpha
self.update_yield()
self.s_salary = 0
# return
# def A_Utility(self, c_w_weight=0.5):
# a = np.exp(FirmAgent.c_f_incentive, c_w_weight)
# b = np.exp(FirmAgent.s_f_profit/max(FirmAgent.s_f_profit), 1-c_w_weight)
# self.select_firm = a * b
def select_firm(self):
'''
挑选出来的企业列表
数量列表的长度
'''
lst_firms = self.model.provide_lst_random_firms(self)
n_firms = len(lst_firms)
# find the max incentive and profit among all firms
max_incentive, max_profit = 0, 0
for f in lst_firms:
if f.c_incentive > max_incentive:
max_incentive = f.c_incentive
if f.s_profit > max_profit:
max_profit = f.s_profit
# computer the utility for each firm
max_utility, best_firm = 0, None
for f in lst_firms:
u = math.pow(f.c_incentive / max_incentive, self.c_alpha) * math.pow(f.s_profit/max_profit, 1-self.c_alpha)
if u > max_utility:
max_utility = u
best_firm = f
# print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}')
# 选出能够给自己带来最好的效用的企业,并输出/返回
best_firm.apply(self)
return best_firm
def update_wd_by_is_hired(self):
if self.s_is_hired == 1:
self.s_work_duration += 1
self.update_yield()
self.update_salary()
def update_salary(self, the_firm: 'FirmAgent'):
if self.s_salary == 0:
self.s_salary = the_firm.initial_f_salary
pass
else:
self.s_salary = self.s_salary * (1 + the_firm.c_incentive)
pass
def update_yield(self):
self.s_yield = 2 / (1 + math.exp(-0.01 * self.s_work_duration * self.c_effort)) - 1
def update_working_firm_is_hired(self, f: 'FirmAgent'):
self.s_is_hired = True
self.working_firm = f
def step(self):
self.update_wd_by_is_hired()
'''
是否更换公司成功的状态转换
'''