import os import datetime from mesa import Model from typing import TYPE_CHECKING from my_model import MyModel if TYPE_CHECKING: from controller_db import ControllerDB class Computation: def __init__(self, c_db: 'ControllerDB'): # 控制不同进程 计算不同的样本 但使用同一个 数据库 c_db self.c_db = c_db self.pid = os.getpid() def run(self, str_code='0', s_id=None): sample_random = self.c_db.fetch_a_sample(s_id) if sample_random is None: return True # lock this row by update is_done_flag to 0 将运行后的样本设置为 flag 0 self.c_db.lock_the_sample(sample_random) print( f"Pid {self.pid} ({str_code}) is running " f"sample {sample_random.id} at {datetime.datetime.now()}") # 将sample 对应的 experiment 的一系列值 和 参数值 传入 模型 中 包括列名 和 值 dct_exp = {column: getattr(sample_random.experiment, column) for column in sample_random.experiment.__table__.c.keys()} # 删除不需要的 主键 del dct_exp['id'] dct_sample_para = {'sample': sample_random, 'seed': sample_random.seed, **dct_exp} model = MyModel(dct_sample_para) for i in range(100): model.step() return False