import json import random from pathlib import Path from typing import List, Tuple from simulation_model import SimulationModel # set year year = json.load(open('year.json', 'r', encoding='utf-8'))['year'] filename = f"{year}" # Decision variables: # - month1, month2, month3, month4 efficiencies (hour/blade) # - factory-specific new-factory factors (from data/factory_mapping.json) FACTORY_MAPPING = json.loads(Path(f"data/{filename}/factory_mapping.json").read_text(encoding="utf-8")) FACTORY_IDS = list(FACTORY_MAPPING.values()) # Bounds for genes: (min, max) MONTH_BOUNDS = (30.0, 250.0) FACTOR_BOUNDS = (0.8, 3.0) POP_SIZE = 20 GENERATIONS = 2 MUTATION_RATE = 0.2 MUTATION_STD = 5.0 # hours for months; factors mutate separately def clip(val: float, bounds: Tuple[float, float]) -> float: lo, hi = bounds return max(lo, min(hi, val)) def evaluate(genes: List[float]) -> float: month1, month2, month3, month4 = genes[:4] factor_genes = genes[4:] factory_factors = {fid: val for fid, val in zip(FACTORY_IDS, factor_genes)} try: model = SimulationModel( month1=month1, month2=month2, month3=month3, month4=month4, factory_factors=factory_factors, output_enabled=False, ) while model.running: model.step() return model.error except PermissionError as e: print(f"文件访问冲突: {e}. 正在重试...") return float('inf') # 返回高值以惩罚该个体 except Exception as e: print(f"发生错误: {e}") return float('inf') def mutate(genes: List[float]) -> List[float]: new = genes.copy() for i in range(len(new)): if random.random() < MUTATION_RATE: if i < 4: new[i] = clip(new[i] + random.gauss(0, MUTATION_STD), MONTH_BOUNDS) else: jitter = random.gauss(0, 0.05) new[i] = clip(new[i] + jitter, FACTOR_BOUNDS) return new def crossover(p1: List[float], p2: List[float]) -> Tuple[List[float], List[float]]: point = random.randint(1, len(p1) - 1) c1 = p1[:point] + p2[point:] c2 = p2[:point] + p1[point:] return c1, c2 def init_population() -> List[List[float]]: pop = [] # Warm start from best params if available best_path = Path("data") / filename / "ga_best_params.json" seed_indiv = None if best_path.exists(): try: best = json.loads(best_path.read_text(encoding="utf-8")) seed_indiv = [ float(best.get("month1", random.uniform(*MONTH_BOUNDS))), float(best.get("month2", random.uniform(*MONTH_BOUNDS))), float(best.get("month3", random.uniform(*MONTH_BOUNDS))), float(best.get("month4", random.uniform(*MONTH_BOUNDS))), ] for fid in FACTORY_IDS: seed_indiv.append(float(best.get(f"factor_{fid}", random.uniform(*FACTOR_BOUNDS)))) except Exception: seed_indiv = None for _ in range(POP_SIZE): if seed_indiv is not None and _ == 0: indiv = seed_indiv else: indiv = [ random.uniform(*MONTH_BOUNDS), random.uniform(*MONTH_BOUNDS), random.uniform(*MONTH_BOUNDS), random.uniform(*MONTH_BOUNDS), ] indiv += [random.uniform(*FACTOR_BOUNDS) for _ in FACTORY_IDS] pop.append(indiv) return pop def main(): best_genes = None best_score = float("inf") population = init_population() for gen in range(GENERATIONS): scored = [] for indiv in population: score = evaluate(indiv) scored.append((score, indiv)) if score < best_score: best_score = score best_genes = indiv scored.sort(key=lambda x: x[0]) # Elitism: keep top 2 next_pop = [scored[0][1], scored[1][1]] # Fill rest via crossover + mutation while len(next_pop) < POP_SIZE: parents = random.sample(scored[:10], 2) c1, c2 = crossover(parents[0][1], parents[1][1]) next_pop.append(mutate(c1)) if len(next_pop) < POP_SIZE: next_pop.append(mutate(c2)) population = next_pop print(f"Generation {gen+1}: best={best_score:.4f}") # Save best parameters result = { "month1": best_genes[0], "month2": best_genes[1], "month3": best_genes[2], "month4": best_genes[3], } for fid, val in zip(FACTORY_IDS, best_genes[4:]): result[f"factor_{fid}"] = val data_path = Path("data") / filename / "ga_best_params.json" data_path.parent.mkdir(exist_ok=True) # Only overwrite if better if data_path.exists(): try: prev = json.loads(data_path.read_text(encoding="utf-8")) prev_score = float(prev.get("best_score", float("inf"))) except Exception: prev_score = float("inf") else: prev_score = float("inf") if best_score < prev_score: data_path.write_text( json.dumps({"best_score": best_score, **result}, ensure_ascii=False, indent=2), encoding="utf-8", ) print(f"New best mean abs error: {best_score:.4f}, saved to {data_path}") else: print(f"Best mean abs error: {best_score:.4f} (not better than {prev_score:.4f}, not saved)") if __name__ == "__main__": main()