import os
import datetime
from model import Model

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from controller_db import ControllerDB


class Computation:
    """Worker that pulls one sample from the controller DB and runs a Model on it."""

    def __init__(self, c_db: 'ControllerDB'):
        """Keep a handle on the controller DB and record this process's pid."""
        self.c_db = c_db
        self.pid = os.getpid()

    def run(self, str_code='0', s_id=None):
        """Fetch a sample, lock it, and run the model once with its parameters.

        Args:
            str_code: Label echoed in the progress message.
            s_id: Forwarded to ``c_db.fetch_a_sample`` to choose the sample.

        Returns:
            True when ``fetch_a_sample`` returned ``None`` (nothing was run),
            False after one sample has been run.
        """
        sample_random = self.c_db.fetch_a_sample(s_id)
        if sample_random is None:
            return True

        # Lock the row before doing any work; per the original note this is
        # done by updating is_done_flag to 0 (implemented in ControllerDB).
        self.c_db.lock_the_sample(sample_random)
        print(
            f"Pid {self.pid} ({str_code}) is running "
            f"sample {sample_random.id} at {datetime.datetime.now()}")

        # Snapshot every mapped column of the experiment row as a plain dict.
        experiment = sample_random.experiment
        exp_fields = {}
        for col_name in experiment.__table__.c.keys():
            exp_fields[col_name] = getattr(experiment, col_name)
        # The experiment's own 'id' is dropped before building model params.
        exp_fields.pop('id')

        # Experiment columns are merged last, so they take precedence over
        # the 'sample' / 'seed' entries on a key clash.
        model_params = {'sample': sample_random, 'seed': sample_random.seed}
        model_params.update(exp_fields)

        Model(model_params).run(display=False)
        return False