first
This commit is contained in:
167
genetic_calibration.py
Normal file
167
genetic_calibration.py
Normal file
@@ -0,0 +1,167 @@
|
||||
import json
|
||||
import random
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
from simulation_model import SimulationModel
|
||||
|
||||
|
||||
# Decision variables:
|
||||
# - month1, month2, month3, month4 efficiencies (hour/blade)
|
||||
# - factory-specific new-factory factors (from data/factory_mapping.json)
|
||||
FACTORY_MAPPING = json.loads(Path("data/factory_mapping.json").read_text(encoding="utf-8"))
|
||||
FACTORY_IDS = list(FACTORY_MAPPING.values())
|
||||
|
||||
# Bounds for genes: (min, max)
|
||||
MONTH_BOUNDS = (30.0, 250.0)
|
||||
FACTOR_BOUNDS = (0.8, 3.0)
|
||||
|
||||
POP_SIZE = 20
|
||||
GENERATIONS = 200
|
||||
MUTATION_RATE = 0.2
|
||||
MUTATION_STD = 5.0 # hours for months; factors mutate separately
|
||||
|
||||
|
||||
def clip(val: float, bounds: Tuple[float, float]) -> float:
|
||||
lo, hi = bounds
|
||||
return max(lo, min(hi, val))
|
||||
|
||||
|
||||
def evaluate(genes: List[float]) -> float:
|
||||
month1, month2, month3, month4 = genes[:4]
|
||||
factor_genes = genes[4:]
|
||||
factory_factors = {fid: val for fid, val in zip(FACTORY_IDS, factor_genes)}
|
||||
try:
|
||||
model = SimulationModel(
|
||||
month1=month1,
|
||||
month2=month2,
|
||||
month3=month3,
|
||||
month4=month4,
|
||||
factory_factors=factory_factors,
|
||||
output_enabled=False,
|
||||
)
|
||||
while model.running:
|
||||
model.step()
|
||||
return model.mean_abs_error
|
||||
except PermissionError as e:
|
||||
print(f"文件访问冲突: {e}. 正在重试...")
|
||||
return float('inf') # 返回高值以惩罚该个体
|
||||
except Exception as e:
|
||||
print(f"发生错误: {e}")
|
||||
return float('inf')
|
||||
|
||||
|
||||
def mutate(genes: List[float]) -> List[float]:
|
||||
new = genes.copy()
|
||||
for i in range(len(new)):
|
||||
if random.random() < MUTATION_RATE:
|
||||
if i < 4:
|
||||
new[i] = clip(new[i] + random.gauss(0, MUTATION_STD), MONTH_BOUNDS)
|
||||
else:
|
||||
jitter = random.gauss(0, 0.05)
|
||||
new[i] = clip(new[i] + jitter, FACTOR_BOUNDS)
|
||||
return new
|
||||
|
||||
|
||||
def crossover(p1: List[float], p2: List[float]) -> Tuple[List[float], List[float]]:
|
||||
point = random.randint(1, len(p1) - 1)
|
||||
c1 = p1[:point] + p2[point:]
|
||||
c2 = p2[:point] + p1[point:]
|
||||
return c1, c2
|
||||
|
||||
|
||||
def init_population() -> List[List[float]]:
|
||||
pop = []
|
||||
# Warm start from best params if available
|
||||
best_path = Path("data") / "ga_best_params.json"
|
||||
seed_indiv = None
|
||||
if best_path.exists():
|
||||
try:
|
||||
best = json.loads(best_path.read_text(encoding="utf-8"))
|
||||
seed_indiv = [
|
||||
float(best.get("month1", random.uniform(*MONTH_BOUNDS))),
|
||||
float(best.get("month2", random.uniform(*MONTH_BOUNDS))),
|
||||
float(best.get("month3", random.uniform(*MONTH_BOUNDS))),
|
||||
float(best.get("month4", random.uniform(*MONTH_BOUNDS))),
|
||||
]
|
||||
for fid in FACTORY_IDS:
|
||||
seed_indiv.append(float(best.get(f"factor_{fid}", random.uniform(*FACTOR_BOUNDS))))
|
||||
except Exception:
|
||||
seed_indiv = None
|
||||
|
||||
for _ in range(POP_SIZE):
|
||||
if seed_indiv is not None and _ == 0:
|
||||
indiv = seed_indiv
|
||||
else:
|
||||
indiv = [
|
||||
random.uniform(*MONTH_BOUNDS),
|
||||
random.uniform(*MONTH_BOUNDS),
|
||||
random.uniform(*MONTH_BOUNDS),
|
||||
random.uniform(*MONTH_BOUNDS),
|
||||
]
|
||||
indiv += [random.uniform(*FACTOR_BOUNDS) for _ in FACTORY_IDS]
|
||||
pop.append(indiv)
|
||||
return pop
|
||||
|
||||
|
||||
def main():
|
||||
best_genes = None
|
||||
best_score = float("inf")
|
||||
population = init_population()
|
||||
|
||||
for gen in range(GENERATIONS):
|
||||
scored = []
|
||||
for indiv in population:
|
||||
score = evaluate(indiv)
|
||||
scored.append((score, indiv))
|
||||
if score < best_score:
|
||||
best_score = score
|
||||
best_genes = indiv
|
||||
scored.sort(key=lambda x: x[0])
|
||||
# Elitism: keep top 2
|
||||
next_pop = [scored[0][1], scored[1][1]]
|
||||
# Fill rest via crossover + mutation
|
||||
while len(next_pop) < POP_SIZE:
|
||||
parents = random.sample(scored[:10], 2)
|
||||
c1, c2 = crossover(parents[0][1], parents[1][1])
|
||||
next_pop.append(mutate(c1))
|
||||
if len(next_pop) < POP_SIZE:
|
||||
next_pop.append(mutate(c2))
|
||||
population = next_pop
|
||||
print(f"Generation {gen+1}: best={best_score:.4f}")
|
||||
|
||||
# Save best parameters
|
||||
result = {
|
||||
"month1": best_genes[0],
|
||||
"month2": best_genes[1],
|
||||
"month3": best_genes[2],
|
||||
"month4": best_genes[3],
|
||||
}
|
||||
for fid, val in zip(FACTORY_IDS, best_genes[4:]):
|
||||
result[f"factor_{fid}"] = val
|
||||
|
||||
out_path = Path("output") / "ga_best_params.json"
|
||||
data_path = Path("data") / "ga_best_params.json"
|
||||
data_path.parent.mkdir(exist_ok=True)
|
||||
# Only overwrite if better
|
||||
if data_path.exists():
|
||||
try:
|
||||
prev = json.loads(data_path.read_text(encoding="utf-8"))
|
||||
prev_score = float(prev.get("best_score", float("inf")))
|
||||
except Exception:
|
||||
prev_score = float("inf")
|
||||
else:
|
||||
prev_score = float("inf")
|
||||
|
||||
if best_score < prev_score:
|
||||
data_path.write_text(
|
||||
json.dumps({"best_score": best_score, **result}, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
print(f"New best mean abs error: {best_score:.4f}, saved to {data_path}")
|
||||
else:
|
||||
print(f"Best mean abs error: {best_score:.4f} (not better than {prev_score:.4f}, not saved)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user