# Source: mesa-GA/GA_Agent_0925/多功能.py
# NOTE(review): removed code-hosting web-UI residue that was pasted in with
# the file (line/size counts, "Raw Permalink Blame History", invisible-Unicode
# warnings) — that text was not valid Python and broke the module.

import json
import os
import pickle
import networkx as nx
from sqlalchemy import text
import pandas as pd
from orm import connection
# """
# 计算最脆弱前100产品的 Code 列表(去重)。
# """
# bom_file = r"../input_data/input_product_data/BomNodes.csv"
# mapping_df = pd.read_csv(bom_file)
#
# with open("../SQL_analysis_risk.sql", "r", encoding="utf-8") as f:
# str_sql = text(f.read())
#
# result = pd.read_sql(sql=str_sql, con=connection)
#
# count_firm_prod = result.value_counts(subset=['id_firm', 'id_product'])
# count_firm_prod.name = 'count'
# count_firm_prod = count_firm_prod.to_frame().reset_index()
#
# count_prod = (
# count_firm_prod.groupby("id_product")["count"].sum().reset_index()
# )
#
# vulnerable100_index = count_prod.nsmallest(100, "count")["id_product"].tolist()
# # 确保 index_to_code 的 key 都是 int
# index_to_code = {int(k): v for k, v in zip(mapping_df["Index"], mapping_df["Code"])}
#
# # vulnerable100_index 也转成 int
# vulnerable100_index_int = [int(i) for i in vulnerable100_index]
#
# # 获取 code
# vulnerable100_code = [index_to_code[i] for i in vulnerable100_index_int if i in index_to_code]
#
# print(vulnerable100_code)
# 读取 SQL
# GA run identifier used to parameterize the risk-analysis SQL query.
ga_id = "c40fd813"

# Read the SQL once as a plain string so it can be previewed; wrap it with
# sqlalchemy.text() only when handing it to pandas.  (The original sliced the
# TextClause itself — `text(...)[:300]` — which does not yield the SQL source.)
with open("SQL_analysis_risk_ga.sql", "r", encoding="utf-8") as f:
    sql_source = f.read()
str_sql = text(sql_source)

print(sql_source[:300])  # preview the first 300 characters of the query
print(f"[信息] 正在查询 ga_id={ga_id} 的脆弱产品数据...")

# Execute the query, binding :ga_id as a parameter (keeps the SQL
# injection-safe and lets the driver handle quoting).
result = pd.read_sql(
    sql=str_sql,
    con=connection,
    params={"ga_id": ga_id},
)
# ===============================
# 2) Count occurrences of every (firm, product) combination.
# ===============================
count_firm_prod = (
    result.value_counts(subset=['id_firm', 'id_product'])
    .rename('count')
    .to_frame()
    .reset_index()
)
count_firm_prod.to_csv('count_firm_prod.csv', index=False, encoding='utf-8-sig')
# ===============================
# 3) Total occurrences per firm, most frequent first.
# ===============================
count_firm = (
    count_firm_prod.groupby('id_firm')['count']
    .sum()
    .reset_index()
    .sort_values('count', ascending=False)
)
count_firm.to_csv('count_firm.csv', index=False, encoding='utf-8-sig')
# ===============================
# 4) Total occurrences per product, most frequent first.
# ===============================
count_prod = (
    count_firm_prod.groupby('id_product')['count']
    .sum()
    .reset_index()
    .sort_values('count', ascending=False)
)
count_prod.to_csv('count_prod.csv', index=False, encoding='utf-8-sig')
# ===============================
# 5) Pick the 100 products with the highest occurrence counts
#    (treated here as the most vulnerable ones).
# ===============================
top100 = count_prod.nlargest(100, "count")
vulnerable100_product = top100["id_product"].tolist()
print(f"[信息] ga_id={ga_id} 查询完成,共找到 {len(vulnerable100_product)} 个脆弱产品")
# ===============================
# 6) Keep only the records involving one of those products.
# ===============================
keep_mask = result['id_product'].isin(vulnerable100_product)
result_vulnerable100 = result[keep_mask].copy()
print(f"[信息] 筛选后剩余记录数: {len(result_vulnerable100)}")
# ===============================
# 7) Build the DCP (Disruption Causing Probability) pair table.
# ===============================
def _build_dcp_pairs(df):
    """Enumerate DCP (upstream, downstream) firm/product pairs per scenario.

    Within each scenario (``s_id``), every record at an earlier time step is
    paired as "upstream" with every record at a later time step, exactly as
    the original nested while-loops did — but the per-time-step sub-frames
    are pre-split once per scenario instead of re-filtering ``group`` inside
    a doubly nested loop, and ``itertuples`` replaces the slow ``iterrows``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain columns ``s_id``, ``ts``, ``id_firm``, ``id_product``.
        ``ts`` is assumed to hold numeric time steps spaced by 1
        (the loop decrements by 1, as in the original) — TODO confirm.

    Returns
    -------
    list[list]
        Rows of ``[s_id, up_id_firm, up_id_product,
        down_id_firm, down_id_product]``.
    """
    pairs = []
    for sid, group in df.groupby('s_id'):
        # Split the scenario once per time step (the hot-loop hoist).
        rows_by_ts = {
            ts: [list(row) for row in
                 sub[['id_firm', 'id_product']].itertuples(index=False)]
            for ts, sub in group.groupby('ts')
        }
        ts_start = max(group['ts'])
        while ts_start >= 1:
            # `down` depends only on ts_start — hoisted out of the inner loop
            # (the original recomputed it for every ts_end).
            down_rows = rows_by_ts.get(ts_start, [])
            ts_end = ts_start - 1
            while ts_end >= 0:
                if down_rows:
                    for up_row in rows_by_ts.get(ts_end, []):
                        for down_row in down_rows:
                            pairs.append([sid] + up_row + down_row)
                ts_end -= 1
            ts_start -= 1
    return pairs


result_dcp = pd.DataFrame(
    _build_dcp_pairs(result_vulnerable100),
    columns=[
        's_id', 'up_id_firm', 'up_id_product', 'down_id_firm', 'down_id_product'
    ],
)
# ===============================
# 8) Count how often each DCP pair occurs.
# ===============================
count_dcp = result_dcp.value_counts(
    subset=['up_id_firm', 'up_id_product', 'down_id_firm', 'down_id_product']
).reset_index(name='count')

# Persist and show the result.
count_dcp.to_csv('count_dcp.csv', index=False, encoding='utf-8-sig')
print(count_dcp)

# Debug aid: show the element type of the product ids.  Guarded against an
# empty list — the original `vulnerable100_product[0]` raised IndexError
# when the query returned no rows.
if vulnerable100_product:
    print(type(vulnerable100_product[0]))
# industry_list = [
# # ① 半导体设备类
# {"product": "离子注入机", "category": "离子注入设备", "chain_id": 34538},
# {"product": "刻蚀设备 / 湿法刻蚀设备", "category": "刻蚀机", "chain_id": 34529},
# {"product": "沉积设备", "category": "薄膜生长设备CVD/PVD", "chain_id": 34539},
# {"product": "CVD", "category": "薄膜生长设备", "chain_id": 34539},
# {"product": "PVD", "category": "薄膜生长设备", "chain_id": 34539},
# {"product": "CMP", "category": "化学机械抛光设备", "chain_id": 34530},
# {"product": "光刻机", "category": "光刻机", "chain_id": 34533},
# {"product": "涂胶显影机", "category": "涂胶显影设备", "chain_id": 34535},
# {"product": "晶圆清洗设备", "category": "晶圆清洗机", "chain_id": 34531},
# {"product": "测试设备", "category": "测试机", "chain_id": 34554},
# {"product": "外延生长设备", "category": "薄膜生长设备", "chain_id": 34539},
#
# # ② 半导体材料与化学品类
# {"product": "三氯乙烯", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
# {"product": "丙酮", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
# {"product": "异丙醇", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
# {"product": "其他醇类", "category": "清洗溶剂 → 通用湿电子化学品", "chain_id": 32438},
# {"product": "光刻胶", "category": "光刻胶及配套试剂", "chain_id": 32445},
# {"product": "显影液", "category": "显影液", "chain_id": 46504},
# {"product": "蚀刻液", "category": "蚀刻液", "chain_id": 56341},
# {"product": "光阻去除剂", "category": "光阻去除剂", "chain_id": 32442},
#
# # ③ 晶圆制造类
# {"product": "晶圆", "category": "单晶硅片 / 多晶硅片", "chain_id": 32338},
# {"product": "硅衬底", "category": "硅衬底", "chain_id": 36914},
# {"product": "外延片", "category": "硅外延片 / GaN外延片 / SiC外延片等", "chain_id": 32338},
#
# # ④ 封装与测试类
# {"product": "封装", "category": "IC封装", "chain_id": 10},
# {"product": "测试", "category": "芯片测试 / 晶圆测试", "chain_id": 513742},
# {"product": "测试", "category": "芯片测试 / 晶圆测试", "chain_id": 11},
#
# # ⑤ 芯片与设计EDA类
# {"product": "芯片(通用)", "category": "集成电路制造", "chain_id": 317589},
# {"product": "DRAM", "category": "存储芯片 → 集成电路制造", "chain_id": 317589},
# {"product": "GPU", "category": "图形芯片 → 集成电路制造", "chain_id": 317589},
# {"product": "处理器CPU/SoC", "category": "芯片设计", "chain_id": 9},
# {"product": "高频芯片", "category": "芯片设计", "chain_id": 9},
# {"product": "光子芯片(含激光)", "category": "芯片设计 / 功率半导体器件", "chain_id": 9},
# {"product": "光子芯片(含激光)", "category": "芯片设计 / 功率半导体器件", "chain_id": 2717},
# {"product": "先进节点制造设备", "category": "集成电路制造", "chain_id": 317589},
# {"product": "EDA及IP服务", "category": "设计辅助", "chain_id": 2515},
# {"product": "MPW服务", "category": "多项目晶圆流片", "chain_id": 2514},
# {"product": "芯片设计验证", "category": "设计验证", "chain_id": 513738},
# {"product": "过程工艺检测", "category": "制程检测", "chain_id": 513740}
# ]
# # 提取所有 chain_id并去重
# chain_ids = set()
# for item in industry_list:
# # 如果 chain_id 是字符串包含多个编号,用逗号或斜杠拆分
# if isinstance(item["chain_id"], str):
# for cid in item["chain_id"].replace("/", ",").split(","):
# chain_ids.add(cid.strip())
# else:
# chain_ids.add(str(item["chain_id"]))
# print(list(chain_ids))
# fill g_bom
# 结点属性值 相当于 图上点的 原始 产品名称
# bom_nodes = pd.read_csv('../input_data/input_product_data/BomNodes.csv')
# bom_nodes['Code'] = bom_nodes['Code'].astype(str)
# bom_nodes.set_index('Index', inplace=True)
#
# bom_cate_net = pd.read_csv('../input_data/input_product_data/合成结点.csv')
# g_bom = nx.from_pandas_edgelist(bom_cate_net, source='UPID', target='ID', create_using=nx.MultiDiGraph())
# # 填充每一个结点 的具体内容 通过 相同的 code 并且通过BomNodes.loc[code].to_dict()字典化 格式类似 格式 { code0 : {level: 0 ,name: 工业互联网 }}
# bom_labels_dict = {}
# for index in g_bom.nodes:
# try:
# bom_labels_dict[index] = bom_nodes.loc[index].to_dict()
# # print(bom_labels_dict[index])
# except KeyError:
# print(f"节点 {index} 不存在于 bom_nodes 中")
# # 分配属性 给每一个结点 获得类似 格式:{1: {'label': 'A', 'value': 10},
# nx.set_node_attributes(g_bom, bom_labels_dict)
# # 改为json 格式
# g_product_js = json.dumps(nx.adjacency_data(g_bom))
# # 假设 g_bom 是你的 NetworkX 图
# g_product_data = nx.adjacency_data(g_bom)
#
# # 保存为 pkl 文件
# with open("g_bom.pkl", "wb") as f:
# pickle.dump(g_product_data, f)
#
# print("✅ 图数据已保存为 g_bom.pkl")