上传文件至 ''

This commit is contained in:
SongQi 2022-08-29 10:39:29 +08:00
parent 5524b52632
commit 94474cbf91
4 changed files with 337 additions and 274 deletions

168
env.py
View File

@ -1,77 +1,91 @@
import agentpy as ap import agentpy as ap
from random import uniform from random import uniform
from worker import WorkerAgent from worker import WorkerAgent
from firm import FirmAgent from firm import FirmAgent
class Env(ap.Model): class Env(ap.Model):
float_market_size: float float_market_size: float
percent_rh: float percent_rh: float
percent_search: float percent_search: float
n_worker: int n_worker: int
n_firm: int n_firm: int
e_revenue: float
a_lst_worker: ap.AgentList
a_lst_firm: ap.AgentList a_lst_worker: ap.AgentList
a_lst_firm: ap.AgentList
def setup(self):
# 工作人员、企业数量、搜寻企业数量赋值 def setup(self):
self.n_worker = self.p.n_worker # 工作人员、企业数量、搜寻企业数量赋值
self.n_firm = self.p.n_firm self.n_worker = self.p.n_worker
self.percent_search = self.p.percent_search self.n_firm = self.p.n_firm
# 工人、企业列表 self.percent_search = self.p.percent_search
self.a_lst_worker = ap.AgentList(self) # 工人、企业列表
self.a_lst_firm = ap.AgentList(self) self.a_lst_worker = ap.AgentList(self)
self.a_lst_firm = ap.AgentList(self)
# 在工人列表中添加工人 self.e_revenue = 119.3
for i in range(self.n_worker):
# 初始化 workeragent并把alpha属性传过去 # 在工人列表中添加工人
w = WorkerAgent(self, self.p.alpha) for i in range(self.n_worker):
self.a_lst_worker.append(w) # 初始化 workeragent并把alpha属性传过去
w = WorkerAgent(self, self.p.alpha)
# 在企业列表中添加企业 self.a_lst_worker.append(w)
for i in range(self.n_firm):
f = FirmAgent(self) # 在企业列表中添加企业放入一个is_RH_ratio, 即有多大比例的企业是属于RH类型的
self.a_lst_firm.append(f) for i in range(self.n_firm):
# 对于企业属性true or false 的判断, 影响到firm 板块下, self.s_IsRH = is_RH 语句的判断
def step(self): f = FirmAgent(self, self.p.is_RH_ratio >= uniform(0, 1))
self.a_lst_worker.select_firm() self.a_lst_firm.append(f)
if self.t == 100: def update_e_revenue(self):
self.stop() self.e_revenue += 0.01 * self.e_revenue
pass
def step(self):
def provide_lst_random_firms(self, the_worker: WorkerAgent): self.update_e_revenue()
'''选择企业数量 = 企业总数*百分比 # 先清空每次的选择列表
选择企业的列表 = 随机选择的企业的个数 self.a_lst_firm.empty_apply()
如果员工处于被雇佣的状态 # 一开始worker要去选择很多firm
如果员工工作的企业在随机选定的企业列表中 self.a_lst_worker.select_firm()
打开列表中的企业 # 第二步, firm 去选 worker
移除该企业 self.a_lst_firm.select_worker()
返回值移除后再重新选择随机选择企业
否则 if self.t == 100:
返回值选择企业列表 self.stop()
''' pass
n_select_firms = int(self.percent_search * self.n_firm)
a_lst_select_firms = self.a_lst_firm.random(n_select_firms) def provide_lst_random_firms(self, the_worker: WorkerAgent):
if the_worker.s_is_hired: '''选择企业数量 = 企业总数*百分比
if the_worker.working_firm in a_lst_select_firms: 选择企业的列表 = 随机选择的企业的个数
lst_f = self.a_lst_firm.to_list() 如果员工处于被雇佣的状态
lst_f.remove(the_worker.working_firm) 如果员工工作的企业在随机选定的企业列表中
return ap.AgentList(self, lst_f).random(n_select_firms) 打开列表中的企业
else: 移除该企业
return ap.AgentList(self, a_lst_select_firms) 返回值移除后再重新选择随机选择企业
否则
返回值选择企业列表
if __name__ == '__main__': '''
dict_para = {'n_worker': 100, n_select_firms = int(self.percent_search * self.n_firm)
'n_firm': 10, a_lst_select_firms = self.a_lst_firm.random(n_select_firms)
'percent_search': 0.2, if the_worker.s_is_hired:
'alpha': 0.5} if the_worker.working_firm in a_lst_select_firms:
my_model = Env(dict_para) # 转换为 list
my_model.run() lst_f = list(self.a_lst_firm)
lst_f.remove(the_worker.working_firm)
return ap.AgentList(self, lst_f).random(n_select_firms)
# 假如以上都不满足, 直接返回
return ap.AgentList(self, a_lst_select_firms)
if __name__ == '__main__':
dict_para = {'n_worker': 100,
'n_firm': 20,
'percent_search': 0.2,
'alpha': 0.5,
'is_RH_ratio': 0.5}
my_model = Env(dict_para)
my_model.run()

243
firm.py
View File

@ -1,105 +1,138 @@
from typing import Union, Any import math
from typing import Union, Any
import agentpy as ap
from random import uniform, randint import agentpy as ap
from random import uniform, randint
class FirmAgent(ap.Agent):
c_incentive: float from typing import TYPE_CHECKING
s_IsRH: bool
s_a_senior_yield: float import numpy as np
s_a_junior_yield: float
s_a_yield: float if TYPE_CHECKING:
s_salary: float from worker import WorkerAgent
s_revenue: float
s_profit: float
l_senior_workers: list class FirmAgent(ap.Agent):
l_junior_workers: list c_incentive: float
l_applied_workers: list s_IsRH: bool
s_avg_senior_yield: float
def setup(self): s_avg_junior_yield: float
self.c_incentive = uniform(0, 1) s_a_yield: float
self.s_profit = randint(10, 20) # s_salary: float
self.l_applied_workers = [] initial_f_salary: float
s_revenue: float
def apply(self, the_worker): s_profit: float
self.l_applied_workers.append(the_worker) l_senior_workers: list
l_junior_workers: list
def select_worker(self): l_applied_workers: list
'''
企业找到最想招聘的员工 def setup(self, is_RH):
:return: self.c_incentive = uniform(0, 1)
''' # self.s_profit = randint(10, 20)
n_a_firms = len(self.l_applied_workers) self.l_applied_workers = []
# 对员工产出进行排队和比较 self.s_IsRH = is_RH
max_s_yield: float self.initial_f_salary = randint(8000, 10000)
max_s_yield, best_worker = 0, None
for i in self.l_applied_workers: def apply(self, the_worker):
y = max_s_yield self.l_applied_workers.append(the_worker)
if y > max_s_yield:
max_s_yield = y def empty_apply(self):
best_worker = i self.l_applied_workers = []
# print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}')
# return best_worker def select_worker(self):
# 当企业是想要利用产出最高的员工时,从申请的员工中选出产出最高的员工 '''
'''if self.s_IsRH : 企业找到最想招聘的员工
# 计算该名员工的工资水平: 原有工资水平* 1+incentive :return:
bw_salary = self.best_worker.salary() '''
n_workers = len(self.l_applied_workers)
# 将该名员工的产出与公司原有员工的产出进行对比如果高于senior列表中的最后一名的产出就进入senior_list, 否则进入junior_list if n_workers > 0:
self.l_senior_workers.append(best_worker) selected_worker = None
else: if n_workers > 1:
self.l_junior_workers.append(best_worker) # 对员工产出进行排队和比较
# best_worker.apply(self) # 先判断是什么选择方式
else: if self.s_IsRH:
# 计算由于改名员工下一期的薪资总数(根据薪资函数更新)以及企业下一期的利润的差值,并选出最大值 max_s_yield, best_worker = 0.0, None
''' for the_worker in self.l_applied_workers:
if the_worker.s_yield > max_s_yield:
def S_a_Yield(self): max_s_yield = the_worker.s_yield
# 更新总产出 best_worker = the_worker
# s_total_senior_yield = self.s_a_senior_yield * len(self.l_senior_workers) selected_worker = best_worker
# s_total_junior_yield = self.s_a_junior_yield * len(self.l_senior_workers) else:
l_s_workers = self.l_senior_workers selected_worker = self.l_applied_workers[0] # TODO
l_j_workers = self.l_junior_workers # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}')
# s_a_yield = s_total_yield / len(self.l_senior_workers + self.l_junior_workers) # return best_worker
s_total_senior_yield, s_total_junior_yield = 0, 0 # 当企业是想要利用产出最高的员工时,从申请的员工中选出产出最高的员工
'''for i in range(len(self.l_senior_workers)): # if self.s_IsRH :
# 加入工人主体的产出属性 # # 计算该名员工的工资水平: 原有工资水平* 1+incentive
s_total_senior_yield += '工人的yield' # # bw_salary = self.best_worker.salary()
for i in range (len(self.l_junior_workers)): #
s_total_junior_yield += '工人的yield' # # 将该名员工的产出与公司原有员工的产出进行对比如果高于senior列表中的最后一名的产出就进入senior_list, 否则进入junior_list
s_a_senior_yield = s_total_senior_yield / len(l_s_workers) # self.l_senior_workers.append(best_worker)
s_a_junior_yield = s_total_junior_yield / len(l_j_workers) # else:
s_a_yield = 0.8*s_a_senior_yield+0.2*s_a_junior_yield # self.l_junior_workers.append(best_worker)
return s_a_yield # # best_worker.apply(self)
''' # else:
return # # 计算由于改名员工下一期的薪资总数(根据薪资函数更新)以及企业下一期的利润的差值,并选出最大值
else:
def LogitShare(self): selected_worker = self.l_applied_workers[0]
''' selected_worker.update_working_firm_is_hired(self)
self.logitshare = exp(S_a_Yield)/ np.sum(exp(S_a_Yield)) self.update_two_worker_list(selected_worker)
:return:
''' def update_two_worker_list(self, new_worker):
return lst_all_worker = self.l_senior_workers + self.l_junior_workers + [new_worker]
lst_sorted = lst_all_worker.sort(key=lambda x: x['s_yield'], reverse=True) # from highest yield to lowest
def Sum_salary(self, l_senior_workers, l_junior_workers): n_all_worker = len(lst_all_worker)
''' # 向上取整数值
计算某公司整体的薪金水平 n_senior_worker = math.ceil(n_all_worker * 0.2)
''' n_junior_worker = n_all_worker - n_senior_worker
l_all_workers = l_senior_workers + l_junior_workers self.l_senior_workers = lst_sorted[:n_senior_worker]
for i in l_all_workers: if n_junior_worker == 0:
#self.sum_salary = np.sum() self.l_junior_workers = []
return else:
self.l_junior_workers = lst_sorted[n_senior_worker:]
def S_Profit(self):
# self.s_profit =self.LogitShare() * EnvironmentAgent.s_e_Irevenue - self.Sum_salary() def update_yields(self): # 需要改
return n_sw, n_jw = len(self.l_senior_workers), len(self.l_junior_workers)
# acc_all_yield = 0
def step(self):
''' if n_sw == 0:
更新三个集合 self.s_avg_senior_yield = 0
首先判断员工i在企业j第t时期的l_applied_workers列表里 else:
第二步判断企业j 的s_IsRH是0或者是1 s_acc_senior_yield = 0
如果是1则将应聘员工中s_w_yield最大的员工从l_applied_workers列表转移到l_senior_workers或l_junior_workers列表 for sw in self.l_senior_workers:
否则即如果是0则计算每名员工如果进入企业下一期企业利润的提升max的员工 # 加入工人主体的产出属性
从l_applied_workers列表转移到l_senior_workers或l_junior_workers列表 s_acc_senior_yield += sw.s_yield
''' # acc_all_yield += sw.s_yield
self.s_avg_senior_yield = s_acc_senior_yield / n_sw
if n_jw == 0:
self.s_avg_junior_yield = 0
else:
s_acc_junior_yield = 0
for jw in self.l_junior_workers:
# 加入工人主体的产出属性
s_acc_junior_yield += jw.s_yield
self.s_avg_junior_yield = s_acc_junior_yield / n_jw
self.s_a_yield = 0.8 * self.s_avg_senior_yield + 0.2 * self.s_avg_junior_yield
def logit_share(self):
logit_share = math.exp(self.s_a_yield) / np.sum(math.exp(self.s_a_yield))
return logit_share
def sum_salary(self, l_senior_workers, l_junior_workers):
'''
计算某公司整体的薪金水平
'''
l_all_workers = l_senior_workers + l_junior_workers
sum_salary = self.salary
for the_worker in l_all_workers:
sum_salary += the_worker.salary
return
def s_profit(self):
# ?
self.s_profit = self.logit_share() * self.p.e_revenue - self.sum_salary()
return
def step(self):
pass

32
main.py
View File

@ -1,16 +1,16 @@
# This is a sample Python script. # This is a sample Python script.
# Press Shift+F10 to execute it or replace it with your code. # Press Shift+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings. # Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
def print_hi(name): def print_hi(name):
# Use a breakpoint in the code line below to debug your script. # Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint. print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.
# Press the green button in the gutter to run the script. # Press the green button in the gutter to run the script.
if __name__ == '__main__': if __name__ == '__main__':
print_hi('PyCharm') print_hi('PyCharm')
# See PyCharm help at https://www.jetbrains.com/help/pycharm/ # See PyCharm help at https://www.jetbrains.com/help/pycharm/

168
worker.py
View File

@ -1,77 +1,93 @@
import math import math
import agentpy as ap import agentpy as ap
from random import uniform, randint from random import uniform, randint
from firm import FirmAgent
# 编程可以自动补充一些东西,减少报错
from typing import TYPE_CHECKING
class WorkerAgent(ap.Agent): if TYPE_CHECKING:
select_firm: object from firm import FirmAgent
c_effort: float
s_is_hired: bool
c_work_months: int class WorkerAgent(ap.Agent):
s_work_duration: int select_firm: object
working_firm: FirmAgent c_effort: float
s_salary: float s_is_hired: bool
s_yield: float # c_work_months: int
# s_w_applied: bool s_work_duration: int
c_alpha: float working_firm: 'FirmAgent'
s_salary: float
def setup(self, alpha): s_yield: float
# super().__init__(unique_id, model) # s_w_applied: bool
# self.num_workers = 10000 c_alpha: float
self.c_effort = uniform(0, 1)
self.c_work_months = randint(0, 60) def setup(self, alpha):
self.s_is_hired = False # super().__init__(unique_id, model)
self.c_alpha = alpha # self.num_workers = 10000
# return self.c_effort = uniform(0, 1)
# self.c_work_months = randint(0, 60)
# def A_Utility(self, c_w_weight=0.5): self.s_is_hired = False
# a = np.exp(FirmAgent.c_f_incentive, c_w_weight) self.s_work_duration = randint(0, 60)
# b = np.exp(FirmAgent.s_f_profit/max(FirmAgent.s_f_profit), 1-c_w_weight) self.c_alpha = alpha
# self.select_firm = a * b self.update_yield()
self.s_salary = 0
def select_firm(self):
''' # return
挑选出来的企业列表
数量列表的长度 # def A_Utility(self, c_w_weight=0.5):
''' # a = np.exp(FirmAgent.c_f_incentive, c_w_weight)
lst_firms = self.model.provide_lst_random_firms(self) # b = np.exp(FirmAgent.s_f_profit/max(FirmAgent.s_f_profit), 1-c_w_weight)
n_firms = len(lst_firms) # self.select_firm = a * b
# find the max incentive and profit among all firms
max_incentive, max_profit = 0, 0 def select_firm(self):
for f in lst_firms: '''
if f.c_incentive > max_incentive: 挑选出来的企业列表
max_incentive = f.c_incentive 数量列表的长度
if f.s_profit > max_profit: '''
max_profit = f.s_profit lst_firms = self.model.provide_lst_random_firms(self)
# computer the utility for each firm n_firms = len(lst_firms)
max_utility, best_firm = 0, None # find the max incentive and profit among all firms
for f in lst_firms: max_incentive, max_profit = 0, 0
u = math.pow(f.c_incentive / max_incentive, self.c_alpha) * math.pow(f.s_profit/max_profit, 1-self.c_alpha) for f in lst_firms:
if u > max_utility: if f.c_incentive > max_incentive:
max_utility = u max_incentive = f.c_incentive
best_firm = f if f.s_profit > max_profit:
# print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}') max_profit = f.s_profit
# 选出能够给自己带来最好的效用的企业,并输出/返回 # computer the utility for each firm
best_firm.apply(self) max_utility, best_firm = 0, None
return best_firm for f in lst_firms:
u = math.pow(f.c_incentive / max_incentive, self.c_alpha) * math.pow(f.s_profit/max_profit, 1-self.c_alpha)
def salary(self, the_firm: FirmAgent): if u > max_utility:
''' max_utility = u
如果员工首次受到雇佣薪资 = 某公司初始薪资 best_firm = f
如果员工更换了公司 薪资 = 原薪资 * 1+c_incentive # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}')
? 换公司 # 选出能够给自己带来最好的效用的企业,并输出/返回
:return: best_firm.apply(self)
''' return best_firm
pass
def update_wd_by_is_hired(self):
if self.s_is_hired == 1:
self.s_work_duration += 1
def step(self): self.update_yield()
if self.s_is_hired == 1: self.update_salary()
self.s_work_duration +=1
self.s_yield = 2 / (1 + math.exp(-0.01 * self.s_work_duration * self.c_effort)) - 1 def update_salary(self, the_firm: 'FirmAgent'):
''' if self.s_salary == 0:
是否更换公司成功的状态转换 self.s_salary = the_firm.initial_f_salary
pass
else:
self.s_salary = self.s_salary * (1 + the_firm.c_incentive)
pass
def update_yield(self):
self.s_yield = 2 / (1 + math.exp(-0.01 * self.s_work_duration * self.c_effort)) - 1
def update_working_firm_is_hired(self, f: 'FirmAgent'):
self.s_is_hired = True
self.working_firm = f
def step(self):
self.update_wd_by_is_hired()
'''
是否更换公司成功的状态转换
''' '''