salary02/anova.py

83 lines
3.4 KiB
Python

import numpy as np
import pandas as pd
from scipy.stats import f
def do_print(lst_value, str_col):
str_data = '\t'.join([str(round(e, 4 if 'P value' in str_col else 3)) for e in lst_value])
print(f'{str_col}\t{str_data}')
def anova(lst_col_seg, n_level, oa_file, result_file):
df_oa = pd.read_fwf("oa25.txt", header=None, widths=[1]*6)
df_res = pd.read_excel(result_file, sheet_name=0, engine='openpyxl', index_col=0)
assert df_res.shape[1] == sum(lst_col_seg), 'the column number is wrong'
assert df_oa.shape[1] == lst_col_seg[0] + lst_col_seg[1], 'the column number is wrong'
lst_head = [f"{idx+1}_{ind_name}" for idx, ind_name in enumerate(df_res.columns)]
n_col_input = lst_col_seg[0] + lst_col_seg[1]
n_exp_row = df_res.shape[0]
n_degree_error = n_exp_row - 1 - (n_level - 1) * lst_col_seg[0]
df_output = df_res.iloc[:, n_col_input:]
print("Source\tSource\t" + '\t'.join(lst_head[:lst_col_seg[0]]) + "\te")
print("DOF\tDOF\t" + '\t'.join([str(n_level-1)] * lst_col_seg[0]) + f"\t{n_degree_error}")
lst_report = []
for idx_col in range(lst_col_seg[2]):
str_ind_name = lst_head[idx_col+n_col_input]
df_y_col = df_output.iloc[:, idx_col] # the y column
# sum_y_2 = np.sum(np.power(df_y_col, 2)) # sum(y^2)
df_y_col_repeated = np.tile(df_y_col, (n_col_input, 1)).T # repeat the y column
big_t = df_y_col.sum() # the big T
# generate T1, ..., T(n_levels)
lst_2d_big_t = []
for level in range(n_level):
arr_big_t = np.sum(df_y_col_repeated * np.where(df_oa == level, 1, 0), axis=0)
lst_2d_big_t.append(arr_big_t.tolist())
arr_big_t_2 = np.power(np.array(lst_2d_big_t), 2)
arr_s = np.sum(arr_big_t_2, axis=0) / (n_exp_row / n_level) - big_t * big_t / n_exp_row
assert arr_s.size == n_col_input, 'wrong arr_s size'
# sum_s = np.sum(arr_s)
# so far, the first table is computed. Now, compute the second table
df_s = pd.DataFrame(arr_s.reshape((1, n_col_input)), columns=lst_head[:n_col_input])
do_print(arr_s.tolist(), f'{str_ind_name}\tS')
df_s_non_error = df_s.iloc[:, :lst_col_seg[0]] / (n_level - 1)
ms_of_error = df_s.iloc[:, lst_col_seg[0]:].sum().sum() / n_degree_error
do_print(df_s_non_error.values.tolist()[0] + [ms_of_error], f'{str_ind_name}\tMS')
arr_f = df_s_non_error / ms_of_error
do_print(arr_f.values.tolist()[0], f'{str_ind_name}\tF ratio')
# alpha = 0.05
alpha = 0.1
arr_p_value = f.sf(arr_f, n_level - 1, n_degree_error)
do_print(arr_p_value.tolist()[0], f'{str_ind_name}\tP value')
lst_sig = [c for c, p in zip(lst_head[:lst_col_seg[0]], arr_p_value[0].tolist()) if p < alpha]
# print(df_output.columns[idx_col], arr_p_value, lst_sig)
if len(lst_sig) > 0:
lst_report.append(f"For indicator {str_ind_name}, the sig factors are {lst_sig}")
for s in lst_report:
print(s)
if __name__ == '__main__':
# first test
the_lst_col_seg = [5, 1, 6] # 11 factors, 2 for error, and 6 indicators
the_n_level = 5
anova(the_lst_col_seg, the_n_level, "oa25.txt", "result/experiment_result_25.xlsx")
# second test
# the_lst_col_seg = [3, 1, 1] # 11 factors, 2 for error, and 6 indicators
# the_n_level = 3
# anova(the_lst_col_seg, the_n_level, "oaL9-3-4.csv", "result.xlsx")