add GeneticAlgorithm

This commit is contained in:
Yofuria 2023-08-02 18:25:37 +08:00
parent 81a5a3696c
commit a0045c341b
4 changed files with 326 additions and 8 deletions

View File

@ -1 +1 @@
Order.py ga_new.py

View File

@ -9,6 +9,7 @@ import json
from Firm import Firm from Firm import Firm
# import passive agents # import passive agents
from Order import Order from Order import Order
from ga_new import GeneticAlgorithm
from fake_api import get_plan_by_pp_id, get_bom_by_prd_id from fake_api import get_plan_by_pp_id, get_bom_by_prd_id
@ -23,7 +24,7 @@ class FMSEnv(ap.Model):
# record data, define below # record data, define below
# op_os_n_total_order: int # op_os_n_total_order: int
# op_os_n_total_order_delayed: int # op_os_n_total_order_delayed: int
op_os_all_delay_time: list op_os_all_delay_time: float
# op_os_delay_ratio: float # op_os_delay_ratio: float
# op_is_flt_material_room_left: float # op_is_flt_material_room_left: float
# op_is_flt_product_room_left: float # op_is_flt_product_room_left: float
@ -55,6 +56,7 @@ class FMSEnv(ap.Model):
# self.ev_n_order_created = 0 # self.ev_n_order_created = 0
self.op_os_n_total_order = 0 self.op_os_n_total_order = 0
self.op_os_int_status = 0 self.op_os_int_status = 0
self.op_os_all_delay_time = 0
self.running = True self.running = True
self.t = 0 self.t = 0
@ -82,9 +84,10 @@ class FMSEnv(ap.Model):
if self.t >= self.int_stop_time: if self.t >= self.int_stop_time:
self.running = False self.running = False
self.stop() self.stop()
else: # else:
print(f"running the {self.t} step") #
print("当期延误时长为:{}".format(self.the_firm.the_os.ev_ave_delay_time)) # # print(f"running the {self.t} step")
# # print("当期延误时长为:{}".format(self.the_firm.the_os.ev_ave_delay_time))
# Record data after each simulation # Record data after each simulation
def update(self): # ? def update(self): # ?
@ -106,10 +109,20 @@ class FMSEnv(ap.Model):
self.record([att for att in self.__dict__.keys() if att.startswith('op_')]) # ? self.record([att for att in self.__dict__.keys() if att.startswith('op_')]) # ?
self.op_os_all_delay_time += self.the_firm.the_os.ev_ave_delay_time
# pass # pass
if __name__ == '__main__': def GA_run(inventory_bound=None):
material = tuple(pd.read_excel("initial_material.xlsx").iloc[:, 0])
s = tuple(tuple([i, j]) for i, j in
zip(material, inventory_bound[: len(pd.read_excel("initial_material.xlsx").to_numpy())]))
S = tuple(tuple([i, j]) for i, j in zip(material, inventory_bound[
len(pd.read_excel("initial_material.xlsx").to_numpy()): len(
pd.read_excel("initial_material.xlsx").to_numpy()) * 2]))
# print(s)
# print(S)
dct_para = { dct_para = {
'time': 300, # 进行总时间数 'time': 300, # 进行总时间数
# 'xv_int_max_order': random.randint(30, 50), # 'xv_int_max_order': random.randint(30, 50),
@ -134,8 +147,8 @@ if __name__ == '__main__':
# 初始原材料库存 115x2 # 初始原材料库存 115x2
'xv_ary_bom': tuple([tuple(x) for x in pd.read_excel("bom23.xlsx").values]), # bom表 'xv_ary_bom': tuple([tuple(x) for x in pd.read_excel("bom23.xlsx").values]), # bom表
'xv_ary_plan': tuple([tuple(x) for x in pd.read_excel("plan.xlsx").values]), # plan表 'xv_ary_plan': tuple([tuple(x) for x in pd.read_excel("plan.xlsx").values]), # plan表
'xv_ary_s': tuple([tuple(x) for x in pd.read_excel("rawmaterial - s.xlsx").values]), # s 'xv_ary_s': s, # s
'xv_ary_S': tuple([tuple(x) for x in pd.read_excel("rawmaterialS.xlsx").values]), # S 'xv_ary_S': S, # S
# 应读取遗传算法中随机生成的s暂写为'1' 创建两个excel分别存储产品和原材料的库存 每个excel中存系统代码和库存 # 应读取遗传算法中随机生成的s暂写为'1' 创建两个excel分别存储产品和原材料的库存 每个excel中存系统代码和库存
# 'xv_flt_initial_cash': 50000.0, # 'xv_flt_initial_cash': 50000.0,
# 'dct_status_info': json.dumps({ #需要引入生产状态表 # 'dct_status_info': json.dumps({ #需要引入生产状态表
@ -176,5 +189,83 @@ if __name__ == '__main__':
exp = ap.Experiment(FMSEnv, sample, iterations=1, record=True) exp = ap.Experiment(FMSEnv, sample, iterations=1, record=True)
results = exp.run() results = exp.run()
return results['variables']['FMSEnv']['op_os_all_delay_time'][dct_para['time']] / 2
if __name__ == '__main__':
# dct_para = {
# 'time': 60, # 进行总时间数
# # 'xv_int_max_order': random.randint(30, 50),
# # 'xv_dlv_product_para': tuple([(30, 100), (30, 50)]),
# # 'xv_dlv_product_para': tuple([30,40,30,20]), # 读取生产率 np.read.
# # 'xv_int_dlv_period_lam': 8.5,
# # 'xv_int_create_order_lam': 2,
# # 'xv_ary_price_product': tuple([0.3,0.2,0.5,1]),
# # 'xv_ary_cost_material_per': tuple([0.1,0.1,0.2,0.4]),
# # 'xv_ary_volume_material': tuple([1.0, 1.5]),
# # 'xv_ary_volume_product': tuple([3.0, 5.0]),
# # 'xv_array_lead_time': 2, # 读取原材料表格 np.read, 暂时不读 变量代表的含义
# # 'xv_int_lead_time_c': 3,
# # 'xv_int_lead_time_d': 1,
# 'xv_ary_product_id': tuple(pd.read_excel("initial_product.xlsx").iloc[:, 0]), # 产成品id顺序
# 'xv_ary_material_id': tuple(pd.read_excel("initial_material.xlsx").iloc[:, 0]), # 原材料id顺序
# 'xv_product_num': len(pd.read_excel("initial_product.xlsx").to_numpy()), # 产成品个数
# 'xv_material_num': len(pd.read_excel("initial_material.xlsx").to_numpy()), # 原材料个数
# 'xv_ary_initial_product_num': tuple([tuple(x) for x in pd.read_excel("initial_product.xlsx").values]),
# # 初始产成品库存 23x2
# 'xv_ary_initial_material_num': tuple([tuple(x) for x in pd.read_excel("initial_material.xlsx").values]),
# # 初始原材料库存 115x2
# 'xv_ary_bom': tuple([tuple(x) for x in pd.read_excel("bom23.xlsx").values]), # bom表
# 'xv_ary_plan': tuple([tuple(x) for x in pd.read_excel("plan.xlsx").values]), # plan表
# 'xv_ary_s': tuple([tuple(x) for x in pd.read_excel("rawmaterial - s.xlsx").values]), # s
# 'xv_ary_S': tuple([tuple(x) for x in pd.read_excel("rawmaterialS.xlsx").values]), # S
# # 应读取遗传算法中随机生成的s暂写为'1' 创建两个excel分别存储产品和原材料的库存 每个excel中存系统代码和库存
# # 'xv_flt_initial_cash': 50000.0,
# # 'dct_status_info': json.dumps({ #需要引入生产状态表
# # "0": {"xv_flt_produce_rate": tuple([0.0, 0.0]),
# # "xv_ary_mat_material": tuple([0.0, 0.0]),
# # "xv_flt_broken_rate": 0,
# # "xv_flt_run_cost": 0.0,
# # "name": "wait"
# # },
# # "1": {"xv_flt_produce_rate": tuple([90.0, 0.0]),
# # "xv_ary_mat_material": tuple([4.0, 1.0]),
# # "xv_flt_broken_rate": 0.03,
# # "xv_flt_run_cost": 40.0,
# # "name": "produceA"
# # },
# # "2": {"xv_flt_produce_rate": tuple([0.0, 60.0]),
# # "xv_ary_mat_material": tuple([1.5, 5.0]),
# # "xv_flt_broken_rate": 0.05,
# # "xv_flt_run_cost": 50.0,
# # "name": "produceB"
# # },
# # "3": {"xv_flt_produce_rate": tuple([55.0, 30.0]),
# # "xv_ary_mat_material": tuple([2.0, 1.5]),
# # "xv_flt_broken_rate": 0.07,
# # "xv_flt_run_cost": 60.0,
# # "name": "produceAB"
# # },
# # "-1": {"xv_flt_produce_rate": 0.0,
# # "xv_ary_mat_material": tuple([0.0, 0.0]),
# # "xv_flt_broken_rate": 0.1,
# # "xv_flt_run_cost": 100.0,
# # "name": "failed"
# # }
# # })
#
# }
# sample = ap.Sample(dct_para)
#
# exp = ap.Experiment(FMSEnv, sample, iterations=1, record=True)
# results = exp.run()
# print(results['variables']['FMSEnv']['op_os_all_delay_time'])
# print(results['variables']['FMSEnv']['op_os_all_delay_time'][dct_para['time']])
# results['variables']['FMSEnv'].to_excel(f"simulation-results-{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}.xlsx", # results['variables']['FMSEnv'].to_excel(f"simulation-results-{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}.xlsx",
# engine='openpyxl') # engine='openpyxl')
material_num = len(pd.read_excel("initial_material.xlsx").to_numpy()) # 原材料个数
GA = GeneticAlgorithm(function=GA_run, dim=material_num * 2, lb=[10 for i in range(material_num * 2)],
ub=[100 for i in range(material_num * 2)], int_var=[i for i in range(material_num * 2)])
GA.optimize()
# print(result1, result2)

Binary file not shown.

227
ga_new.py Normal file
View File

@ -0,0 +1,227 @@
import numpy
import numpy as np
class GeneticAlgorithm:
"""Genetic algorithm.
Implementation of the real-valued Genetic algorithm. The mutations are
normally distributed perturbations, the selection mechanism is a tournament
selection, and the crossover oepration is the standard linear combination
taken at a randomly generated cutting point.
The total number of evaluations are popsize x ngen
:param function: Function that can be used to evaluate the entire
population. It needs to take an input of size pop_size x dim and
return a numpy.array of size pop_size x 1
:type function: Object
:param dim: Number of dimensions
:type dim: int
:param lb: Lower variable bounds, of length dim
:type lb: numpy.array
:param ub: Lower variable bounds, of length dim
:type ub: numpy.array
:param int_var: List of indices with the integer valued variables
(e.g., [0, 1, 5])
:type int_var: list
:param pop_size: Population size
:type pop_size: int
:param num_gen: Number of generations
:type num_gen: int
:param start: Method for generating the initial population
:type start: string
:ivar nvariables: Number of variables (dimensions)
:ivar nindividuals: population size
:ivar lower_boundary: lower bounds for the optimization problem
:ivar upper_boundary: upper bounds for the optimization problem
:ivar integer_variables: List of variables that are integer valued
:ivar start: Method for generating the initial population
:ivar sigma: Perturbation radius. Each pertubation is N(0, sigma)
:ivar p_mutation: Mutation probability (1/dim)
:ivar tournament_size: Size of the tournament (5)
:ivar p_cross: Cross-over probability (0.9)
:ivar ngenerations: Number of generations
:ivar function: Object that can be used to evaluate the objective function
"""
def __init__(self, function, dim, lb, ub, int_var=None, pop_size=20, num_gen=300, start="Random"):
self.nvariables = dim # column
self.nindividuals = pop_size + (pop_size % 2) # Make sure this is even row
self.lower_boundary = np.array(lb)
self.upper_boundary = np.array(ub)
self.integer_variables = []
if int_var is not None:
self.integer_variables = np.array(int_var)
self.start = start
self.sigma = 0.2
self.p_mutation = 1.0 / dim
self.tournament_size = 5
self.p_cross = 0.9
self.ngenerations = num_gen
self.function = function
def optimize(self):
"""Method used to run the Genetic algorithm
:return: Returns the best individual and its function value
:rtype: numpy.array, float
"""
# Initialize population
if isinstance(self.start, np.ndarray):
if self.start.shape[0] != self.nindividuals or self.start.shape[1] != self.nvariables:
raise ValueError("Initial population has incorrect size")
if any(np.min(self.start, axis=0) < self.lower_boundary) or any(
np.max(self.start, axis=0) > self.upper_boundary
):
raise ValueError("Initial population is outside the domain", self.lower_boundary, self.upper_boundary,
self.start)
population = self.start
elif self.start == "SLHD":
from pySOT.experimental_design import SymmetricLatinHypercube
exp_des = SymmetricLatinHypercube(self.nvariables, self.nindividuals)
population = self.lower_boundary + exp_des.generate_points() * (self.upper_boundary - self.lower_boundary)
elif self.start == "LHD":
from pySOT.experimental_design import LatinHypercube
exp_des = LatinHypercube(self.nvariables, self.nindividuals)
population = self.lower_boundary + exp_des.generate_points() * (self.upper_boundary - self.lower_boundary)
elif self.start == "Random":
population = self.lower_boundary + np.random.rand(self.nindividuals, self.nvariables) * (
self.upper_boundary - self.lower_boundary
)
else:
raise ValueError("Unknown argument for initial population")
new_population = []
# Round positions
if len(self.integer_variables) > 0: # 对特定列进行操作
new_population = np.copy(population)
population[:, self.integer_variables] = np.round(population[:, self.integer_variables])
for i in self.integer_variables:
ind = np.where(population[:, i] < self.lower_boundary[i])
population[ind, i] += 1
ind = np.where(population[:, i] > self.upper_boundary[i])
population[ind, i] -= 1
# Evaluate all individuals
# function_values = self.function(population) we cannot compute in this way to ensure x is one-dim in policy
n_row, n_dim = population.shape
function_values = []
for r in range(n_row):
function_values.append(self.function(population[r, :]))
function_values = np.array(function_values)
if len(function_values.shape) == 2:
function_values = np.squeeze(np.asarray(function_values))
# Save the best individual
ind = np.argmin(function_values)
best_individual = np.copy(population[ind, :]) # 找到最优个体
best_value = function_values[ind]
if len(self.integer_variables) > 0:
population = new_population
# Main loop
for _ in range(self.ngenerations):
print('------------------------------')
print("当前为第{}".format(_))
print("最优个体为:{}".format(best_individual))
print("最优值为:{}".format(best_value))
print("------------------------------")
# Do tournament selection to select the parents
competitors = np.random.randint(0, self.nindividuals, (self.nindividuals, self.tournament_size))
ind = np.argmin(function_values[competitors], axis=1)
winner_indices = np.zeros(self.nindividuals, dtype=int)
for i in range(self.tournament_size): # This loop is short
winner_indices[np.where(ind == i)] = competitors[np.where(ind == i), i]
parent1 = population[winner_indices[0: self.nindividuals // 2], :]
parent2 = population[winner_indices[self.nindividuals // 2: self.nindividuals], :]
# Averaging Crossover
cross = np.where(np.random.rand(self.nindividuals // 2) < self.p_cross)[0]
nn = len(cross) # Number of crossovers
alpha = np.random.rand(nn, 1)
# Create the new chromosomes
parent1_new = np.multiply(alpha, parent1[cross, :]) + np.multiply(1 - alpha, parent2[cross, :])
parent2_new = np.multiply(alpha, parent2[cross, :]) + np.multiply(1 - alpha, parent1[cross, :])
parent1[cross, :] = parent1_new
parent2[cross, :] = parent2_new
population = np.concatenate((parent1, parent2))
# Apply mutation
scale_factors = self.sigma * (self.upper_boundary - self.lower_boundary) # Scale
perturbation = np.random.randn(self.nindividuals, self.nvariables) # Generate perturbations
perturbation = np.multiply(perturbation, scale_factors) # Scale accordingly
perturbation = np.multiply(
perturbation, (np.random.rand(self.nindividuals, self.nvariables) < self.p_mutation)
)
perturbation = round_vars(perturbation, self.integer_variables, self.lower_boundary, self.upper_boundary)
population = round_vars(population, self.integer_variables, self.lower_boundary, self.upper_boundary)
population += perturbation # Add perturbation
population = np.maximum(np.reshape(self.lower_boundary, (1, self.nvariables)), population)
population = np.minimum(np.reshape(self.upper_boundary, (1, self.nvariables)), population)
# Round chromosomes
new_population = []
if len(self.integer_variables) > 0:
new_population = np.copy(population)
population = round_vars(population, self.integer_variables, self.lower_boundary, self.upper_boundary)
# Keep the best individual
population[0, :] = best_individual
# Evaluate all individuals
# function_values = self.function(population) we cannot compute in this way to ensure x is one-dim in policy
n_row, n_dim = population.shape
function_values = []
for r in range(n_row):
function_values.append(self.function(population[r, :]))
function_values = np.array(function_values)
if len(function_values.shape) == 2:
function_values = np.squeeze(np.asarray(function_values))
# Save the best individual
ind = np.argmin(function_values)
best_individual = np.copy(population[ind, :])
best_value = function_values[ind]
# Use the positions that are not rounded
if len(self.integer_variables) > 0:
population = new_population
# return best_individual, best_value
def round_vars(x: np.ndarray, int_var, lb, ub):
"""Round integer variables to closest integer in the domain.
:param x: Set of points, of size npts x dim
:type x: numpy.array
:param int_var: Set of indices of integer variables
:type int_var: numpy.array
:param lb: Lower bounds, of size 1 x dim
:type lb: numpy.array
:param ub: Upper bounds, of size 1 x dim
:type ub: numpy.array
:return: The set of points with the integer variables
rounded to the closest integer in the domain
:rtype: numpy.array
"""
# Make sure we don't violate the bound constraints
for i in int_var:
ind = np.where(x[:, i] < lb[i])
x[ind, i] = lb[i]
ind = np.where(x[:, i] > ub[i])
x[ind, i] = ub[i]
if len(int_var) > 0:
# Round the original ranged integer variables
x[:, int_var] = np.round(x[:, int_var])
x = x.astype(numpy.int32, copy=True)
else:
x = x.astype(numpy.float64, copy=True)
return x