import json
import os
import pickle
import networkx as nx
from sqlalchemy import text
import pandas as pd
from orm import connection

# """
# Compute the (deduplicated) list of Codes for the 100 most vulnerable products.
# """
# bom_file = r"../input_data/input_product_data/BomNodes.csv"
# mapping_df = pd.read_csv(bom_file)
#
# with open("../SQL_analysis_risk.sql", "r", encoding="utf-8") as f:
#     str_sql = text(f.read())
#
# result = pd.read_sql(sql=str_sql, con=connection)
#
# count_firm_prod = result.value_counts(subset=['id_firm', 'id_product'])
# count_firm_prod.name = 'count'
# count_firm_prod = count_firm_prod.to_frame().reset_index()
#
# count_prod = (
#     count_firm_prod.groupby("id_product")["count"].sum().reset_index()
# )
#
# vulnerable100_index = count_prod.nsmallest(100, "count")["id_product"].tolist()
#
# # Make sure every key of index_to_code is an int
# index_to_code = {int(k): v for k, v in zip(mapping_df["Index"], mapping_df["Code"])}
#
# # Convert vulnerable100_index to int as well
# vulnerable100_index_int = [int(i) for i in vulnerable100_index]
#
# # Look up the codes
# vulnerable100_code = [index_to_code[i] for i in vulnerable100_index_int if i in index_to_code]
#
# print(vulnerable100_code)

# ===============================
# 1️⃣ Read the SQL and run the query
# ===============================
ga_id = "c40fd813"

with open("SQL_analysis_risk_ga.sql", "r", encoding="utf-8") as f:
    str_sql = text(f.read())

# text() returns a TextClause, which cannot be sliced directly; preview via str()
print(str(str_sql)[:300])

print(f"[INFO] Querying vulnerable-product data for ga_id={ga_id}...")

# Execute the SQL query with the bound parameter ga_id
result = pd.read_sql(
    sql=str_sql,
    con=connection,
    params={"ga_id": ga_id}  # bound parameter
)

# ===============================
# 2️⃣ Count occurrences of each firm-product combination
# ===============================
count_firm_prod = result.value_counts(subset=['id_firm', 'id_product'])
count_firm_prod.name = 'count'
count_firm_prod = count_firm_prod.to_frame().reset_index()
count_firm_prod.to_csv('count_firm_prod.csv', index=False, encoding='utf-8-sig')

# ===============================
# 3️⃣ Total occurrences per firm
# ===============================
count_firm = count_firm_prod.groupby('id_firm')['count'].sum().reset_index()
count_firm.sort_values('count', ascending=False, inplace=True)
count_firm.to_csv('count_firm.csv', index=False, encoding='utf-8-sig')

# ===============================
# 4️⃣ Total occurrences per product
# ===============================
count_prod = count_firm_prod.groupby('id_product')['count'].sum().reset_index()
count_prod.sort_values('count', ascending=False, inplace=True)
count_prod.to_csv('count_prod.csv', index=False, encoding='utf-8-sig')

# ===============================
# 5️⃣ Pick the 100 most vulnerable products (highest occurrence counts)
# ===============================
vulnerable100_product = count_prod.nlargest(100, "count")["id_product"].tolist()
print(f"[INFO] Query for ga_id={ga_id} finished; found {len(vulnerable100_product)} vulnerable products")

# ===============================
# 6️⃣ Filter result, keeping only the top-100 vulnerable products
# ===============================
result_vulnerable100 = result[result['id_product'].isin(vulnerable100_product)].copy()
print(f"[INFO] Records remaining after filtering: {len(result_vulnerable100)}")

# ===============================
# 7️⃣ Build the DCP (Disruption Causing Probability) pairs
# ===============================
result_dcp_list = []

# Within each simulation run (s_id), pair every record at an earlier time step
# (the upstream side) with every record at a later time step (the downstream side).
for sid, group in result_vulnerable100.groupby('s_id'):
    ts_start = max(group['ts'])
    while ts_start >= 1:
        ts_end = ts_start - 1
        while ts_end >= 0:
            up = group.loc[group['ts'] == ts_end, ['id_firm', 'id_product']]
            down = group.loc[group['ts'] == ts_start, ['id_firm', 'id_product']]
            for _, up_row in up.iterrows():
                for _, down_row in down.iterrows():
                    result_dcp_list.append([sid] + up_row.tolist() + down_row.tolist())
            ts_end -= 1
        ts_start -= 1

# Convert to a DataFrame
result_dcp = pd.DataFrame(result_dcp_list, columns=[
    's_id', 'up_id_firm', 'up_id_product', 'down_id_firm', 'down_id_product'
])
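# --- Optional illustration (not part of the original pipeline) ---
# An assumed-equivalent, merge-based way to build the same DCP pairs without the
# nested Python loops above. It assumes 'ts' holds non-negative integer time steps,
# so "every earlier ts paired with every later ts" matches the loop logic; row order
# may differ, which does not matter for the value_counts step below.
base = result_vulnerable100[['s_id', 'ts', 'id_firm', 'id_product']]
pairs = base.merge(base, on='s_id', suffixes=('_up', '_down'))
pairs = pairs[pairs['ts_up'] < pairs['ts_down']]
result_dcp_alt = pairs.rename(columns={
    'id_firm_up': 'up_id_firm', 'id_product_up': 'up_id_product',
    'id_firm_down': 'down_id_firm', 'id_product_down': 'down_id_product',
})[['s_id', 'up_id_firm', 'up_id_product', 'down_id_firm', 'down_id_product']]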
# ===============================
# 8️⃣ Count DCP occurrences
# ===============================
count_dcp = result_dcp.value_counts(
    subset=['up_id_firm', 'up_id_product', 'down_id_firm', 'down_id_product']
).reset_index(name='count')

# Save to file
count_dcp.to_csv('count_dcp.csv', index=False, encoding='utf-8-sig')

# Print the result
print(count_dcp)

print(type(vulnerable100_product[0]))
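# --- Optional illustration (not part of the original pipeline) ---
# A sketch of mapping the product ids in count_dcp back to BOM codes, assuming
# BomNodes.csv has 'Index' and 'Code' columns and that id_product values are the
# same integer indices, as in the commented-out block at the top of this file.
# Guarded with os.path.exists() so the script still runs when the file is absent.
bom_file = "../input_data/input_product_data/BomNodes.csv"
if os.path.exists(bom_file):
    mapping_df = pd.read_csv(bom_file)
    index_to_code = {int(k): str(v) for k, v in zip(mapping_df["Index"], mapping_df["Code"])}
    count_dcp_coded = count_dcp.copy()
    count_dcp_coded['up_code'] = count_dcp_coded['up_id_product'].map(index_to_code)
    count_dcp_coded['down_code'] = count_dcp_coded['down_id_product'].map(index_to_code)
    print(count_dcp_coded.head())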
# industry_list = [
#     # ① Semiconductor equipment
#     {"product": "离子注入机", "category": "离子注入设备", "chain_id": 34538},
#     {"product": "刻蚀设备 / 湿法刻蚀设备", "category": "刻蚀机", "chain_id": 34529},
#     {"product": "沉积设备", "category": "薄膜生长设备(CVD/PVD)", "chain_id": 34539},
#     {"product": "CVD", "category": "薄膜生长设备", "chain_id": 34539},
#     {"product": "PVD", "category": "薄膜生长设备", "chain_id": 34539},
#     {"product": "CMP", "category": "化学机械抛光设备", "chain_id": 34530},
#     {"product": "光刻机", "category": "光刻机", "chain_id": 34533},
#     {"product": "涂胶显影机", "category": "涂胶显影设备", "chain_id": 34535},
#     {"product": "晶圆清洗设备", "category": "晶圆清洗机", "chain_id": 34531},
#     {"product": "测试设备", "category": "测试机", "chain_id": 34554},
#     {"product": "外延生长设备", "category": "薄膜生长设备", "chain_id": 34539},
#
#     # ② Semiconductor materials and chemicals
#     {"product": "三氯乙烯", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
#     {"product": "丙酮", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
#     {"product": "异丙醇", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
#     {"product": "其他醇类", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
#     {"product": "光刻胶", "category": "光刻胶及配套试剂", "chain_id": 32445},
#     {"product": "显影液", "category": "显影液", "chain_id": 46504},
#     {"product": "蚀刻液", "category": "蚀刻液", "chain_id": 56341},
#     {"product": "光阻去除剂", "category": "光阻去除剂", "chain_id": 32442},
#
#     # ③ Wafer manufacturing
#     {"product": "晶圆", "category": "单晶硅片 / 多晶硅片", "chain_id": 32338},
#     {"product": "硅衬底", "category": "硅衬底", "chain_id": 36914},
#     {"product": "外延片", "category": "硅外延片 / GaN外延片 / SiC外延片等", "chain_id": 32338},
#
#     # ④ Packaging and testing
#     {"product": "封装", "category": "IC封装", "chain_id": 10},
#     {"product": "测试", "category": "芯片测试 / 晶圆测试", "chain_id": 513742},
#     {"product": "测试", "category": "芯片测试 / 晶圆测试", "chain_id": 11},
#
#     # ⑤ Chips, design and EDA
#     {"product": "芯片(通用)", "category": "集成电路制造", "chain_id": 317589},
#     {"product": "DRAM", "category": "存储芯片 → 集成电路制造", "chain_id": 317589},
#     {"product": "GPU", "category": "图形芯片 → 集成电路制造", "chain_id": 317589},
#     {"product": "处理器(CPU/SoC)", "category": "芯片设计", "chain_id": 9},
#     {"product": "高频芯片", "category": "芯片设计", "chain_id": 9},
#     {"product": "光子芯片(含激光)", "category": "芯片设计 / 功率半导体器件", "chain_id": 9},
#     {"product": "光子芯片(含激光)", "category": "芯片设计 / 功率半导体器件", "chain_id": 2717},
#     {"product": "先进节点制造设备", "category": "集成电路制造", "chain_id": 317589},
#     {"product": "EDA及IP服务", "category": "设计辅助", "chain_id": 2515},
#     {"product": "MPW服务", "category": "多项目晶圆流片", "chain_id": 2514},
#     {"product": "芯片设计验证", "category": "设计验证", "chain_id": 513738},
#     {"product": "过程工艺检测", "category": "制程检测", "chain_id": 513740}
# ]
#
# # Extract all chain_ids and deduplicate
# chain_ids = set()
# for item in industry_list:
#     # If chain_id is a string containing several ids, split on commas or slashes
#     if isinstance(item["chain_id"], str):
#         for cid in item["chain_id"].replace("/", ",").split(","):
#             chain_ids.add(cid.strip())
#     else:
#         chain_ids.add(str(item["chain_id"]))
# print(list(chain_ids))

# fill g_bom
# Node attribute values correspond to the original product names of the graph nodes
# bom_nodes = pd.read_csv('../input_data/input_product_data/BomNodes.csv')
# bom_nodes['Code'] = bom_nodes['Code'].astype(str)
# bom_nodes.set_index('Index', inplace=True)
#
# bom_cate_net = pd.read_csv('../input_data/input_product_data/合成结点.csv')
# g_bom = nx.from_pandas_edgelist(bom_cate_net, source='UPID', target='ID', create_using=nx.MultiDiGraph())
#
# # Fill in each node's details by matching on the same code, dictified via bom_nodes.loc[code].to_dict();
# # the result looks like { code(0): {level: 0, name: 工业互联网} }
# bom_labels_dict = {}
# for index in g_bom.nodes:
#     try:
#         bom_labels_dict[index] = bom_nodes.loc[index].to_dict()
#         # print(bom_labels_dict[index])
#     except KeyError:
#         print(f"Node {index} is not present in bom_nodes")
#
# # Assign the attributes to every node, giving something like {1: {'label': 'A', 'value': 10}, ...}
# nx.set_node_attributes(g_bom, bom_labels_dict)
#
# # Convert to JSON
# g_product_js = json.dumps(nx.adjacency_data(g_bom))
#
# # Assuming g_bom is your NetworkX graph
# g_product_data = nx.adjacency_data(g_bom)
#
# # Save as a pkl file
# with open("g_bom.pkl", "wb") as f:
#     pickle.dump(g_product_data, f)
#
# print("✅ Graph data saved to g_bom.pkl")
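# --- Optional illustration (not part of the original pipeline) ---
# A sketch of how the pickled adjacency data written by the commented-out block above
# could be reloaded and turned back into a graph. Kept commented out, like the block
# above, because g_bom.pkl only exists after that block has been run.
# with open("g_bom.pkl", "rb") as f:
#     g_product_data = pickle.load(f)
# g_bom_restored = nx.adjacency_graph(g_product_data, directed=True, multigraph=True)
# print(g_bom_restored.number_of_nodes(), g_bom_restored.number_of_edges())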