컴퓨터/Python
python 엑셀투파이썬 판다스 pivot_table 03countifsumif_chatgpt_skiprow.py xls xlsx xlsm 파일 읽고 저장
풍경소리^^
2024. 11. 4. 17:00
xls xlsx xlsm 파일 읽고 저장 pivot_table
import os
import pandas as pd
import xlrd # .xls 파일을 읽기 위해 사용
# import xlwt # .xls 파일을 쓰기 위해 사용
from xlutils.copy import copy # xlrd로 읽은 파일을 xlwt로 수정 가능하게 하기 위해 사용
import win32api
import time
from openpyxl.utils import column_index_from_string, get_column_letter
import xlwings as xw
start = time.time() # 실행경과시간 측정
# xls*****************************************************************************************************
# xls 파일 처리 함수 (데이터프레임으로 변환)
def process_dataframe_xls(file, sheet_name, col_start, col_end, skip_rows, col_title_list=None):
# pandas를 사용하여 해당 시트의 데이터를 DataFrame으로 가져오기
df = pd.read_excel(file, sheet_name=sheet_name,
header=skip_rows,
usecols=f"{col_start}:{col_end}",
names=col_title_list,
engine='xlrd')
# 첫 번째 공백 행 이후의 모든 행 제거
first_blank_index = df[df.isnull().all(axis=1)].index.min()
if pd.notna(first_blank_index): # 공백 행이 있는 경우
df = df.iloc[:first_blank_index]
return df
# Target 시트 업데이트 (xlwt 사용)
def update_target_pivot_sheet_xls(file, pivot_data, target_sheet, col_start, col_end, skip_rows):
# xlrd로 파일 열기
wb = xlrd.open_workbook(file, formatting_info=True) # 포맷 정보를 가져오기
ws = wb.sheet_by_name(target_sheet)
# 수정할 Workbook 생성
new_wb = copy(wb) # 기존 워크북 복사
new_ws = new_wb.get_sheet(target_sheet) # 첫 번째 시트 가져오기
# 데이터 지우기
for column_letter in range(column_index_from_string(col_start), column_index_from_string(col_end) + 1):
for row in range(skip_rows + 1, ws.nrows): # START_ROW부터 끝까지 반복 0,1,2,3,4
new_ws.write(row, column_letter - 1, None) # 값을 빈 문자열로 설정하여 지우기
# 피벗 데이터프레임을 리스트로 변환
data_rows = pivot_data.reset_index().values.tolist()
# 열 제목을 추가
column_titles = pivot_data.reset_index().columns.tolist()
# 수정된 파일 저장
new_wb.save(file)
# 새로운 데이터 추가
# 열 제목 쓰기
for j, title in enumerate(column_titles):
new_ws.write(skip_rows, column_index_from_string(col_start) + j-1, title) # 열 제목 쓰기
# 데이터 쓰기
for i, row in enumerate(data_rows):
for j, value in enumerate(row):
new_ws.write(skip_rows + 1 + i, column_index_from_string(col_start) + j-1, value)
# new_ws.write(start_row + i, start_col + j, value)
# 수정된 파일 저장
new_wb.save(file)
# xlsx*************************************************************************************************
import openpyxl
# 특정 시트에서 제목 찾고 skip row 계산
# def FN_skip_row_count_xlsx(file_path, sheet_name, col_start, find_title):
# wb = openpyxl.load_workbook(file_path)
# ws = wb[sheet_name]
# for count_skiprow_name, row in enumerate(ws.iter_rows(min_col=column_index_from_string(col_start),
# max_col=column_index_from_string(col_start))):
# if row[0].value == find_title:
# break
# return count_skiprow_name
# 데이터 행 수 계산
# def calculate_data_row_count(ws, col_start, skip_rows):
# max_row = skip_rows
# for cell in ws[col_start][skip_rows + 1:]:
# if cell.value is not None:
# max_row += 1
# else:
# break
# return max_row - skip_rows
# df1과 df2 데이터를 처리하는 함수
def process_dataframe_xlsx(file_path, sheet_name, col_start, col_end, skip_rows, col_title_list=None):#######################################
# file_ext = os.path.splitext(file_path)[1]
# skip_rows = FN_skip_row_count_xlsx(file_path, sheet_name, col_start, find_title)
# wb = load_workbook_by_ext(file_path, file_ext)
wb = openpyxl.load_workbook(file_path)
ws = wb[sheet_name]
# data_row_count_xlsx = calculate_data_row_count(ws, col_start, skip_rows)
df = pd.read_excel(file_path, sheet_name=sheet_name,
header=skip_rows,
usecols=f"{col_start}:{col_end}",
names=col_title_list,
engine='openpyxl')
# 첫 번째 공백 행 이후의 모든 행 제거
first_blank_index = df[df.isnull().all(axis=1)].index.min()
if pd.notna(first_blank_index): # 공백 행이 있는 경우
df = df.iloc[:first_blank_index]
return df
# Target 시트 업데이트
def update_target_pivot_sheet_xlsx(file_path, pdf, target_sheet, col_start, col_end, skip_rows): ##################################여기수정요망
wb = openpyxl.load_workbook(file_path)
ws = wb[target_sheet]
# 병합된 셀 해제
# merged_cells = list(ws.merged_cells.ranges)
# for merged_cell in merged_cells:
# # print(merged_cell)
# ws.unmerge_cells(str(merged_cell))
# 기존 데이터를 제거
for col_letter in range(column_index_from_string(col_start), column_index_from_string(col_end) + 1):
column_cells = ws[get_column_letter(col_letter)][skip_rows + 1:]
for cell in column_cells:
if cell.value is not None:
cell.value = None
# # 새로운 데이터 추가
wb.save(file_path)
wb.close()
with pd.ExcelWriter(file_path, mode='a', engine='openpyxl', if_sheet_exists='overlay') as writer:
pdf.to_excel(writer, sheet_name=target_sheet, index=True, header=True, #여기수정요망 pd1 index=True header=True
startcol=column_index_from_string(col_start) - 1,
startrow=skip_rows)
# xlsm******************************************************************************************************
# import xlwings as xw # xlwings를 사용하여 .xlsm 파일 처리
# from openpyxl.utils.cell import column_index_from_string, get_column_letter
# # 특정 시트에서 제목 찾고 skip row 계산
# # def FN_skip_row_count_xlsm(ws, col_start, find_title):
# # max_row = ws.used_range.last_cell.row # 최대 행 수 가져오기
# # for count_skiprow_name, row in enumerate(ws.range(f"{col_start}1:{col_start}{max_row}").value):
# # if row == find_title:
# # break
# # return count_skiprow_name
# # 데이터 행 수 계산
# def calculate_data_row_count(ws, col_start, skip_rows):
# max_row = ws.used_range.last_cell.row # 최대 행 수 가져오기
# # 데이터가 있는 셀만 확인
# cells = ws.range(f"{col_start}{skip_rows + 2}:{col_start}{max_row}").value
# actual_data_count = sum(1 for cell in cells if cell is not None)
# return actual_data_count
# # df1과 df2 데이터를 처리하는 함수
def process_dataframe_xlsm(file, sheet_name, col_start, col_end, skip_rows, col_title_list=None):
wb = xw.Book(file)
ws = wb.sheets[sheet_name]
# # skip_rows 계산
# # skip_rows = FN_skip_row_count_xlsm(ws, col_start, find_title)
# pandas를 사용하여 해당 시트의 데이터를 DataFrame으로 가져오기
df = pd.read_excel(wb.fullname, sheet_name=sheet_name,
header=skip_rows,
usecols=f"{col_start}:{col_end}",
names=col_title_list,
engine='openpyxl')
# 첫 번째 공백 행 이후의 모든 행 제거
first_blank_index = df[df.isnull().all(axis=1)].index.min()
if pd.notna(first_blank_index): # 공백 행이 있는 경우
df = df.iloc[:first_blank_index]
return df
# Target 시트 업데이트
def update_target_pivot_sheet_xlsm(file, pdf, target_sheet, col_start, col_end, skip_rows):
wb = xw.Book(file)
ws = wb.sheets[target_sheet]
# Target 시트에서 제목 찾고 skip row 계산
# skip_rows = FN_skip_row_count_xlsm(ws, col_start, find_title)
# 병합된 셀 해제
# merged_cells = list(ws.merged_cells.ranges)
# for merged_cell in merged_cells:
# # print(merged_cell)
# ws.unmerge_cells(str(merged_cell))
# 기존 데이터를 제거
for col_letter in range(column_index_from_string(col_start), column_index_from_string(col_end) + 1):
ws.range(f"{get_column_letter(col_letter)}{skip_rows + 2}:{get_column_letter(col_letter)}{ws.used_range.last_cell.row}").clear_contents()
# 인덱스와 열 이름 기록
start_row = skip_rows
start_col = column_index_from_string(col_start)
# # 데이터 기록
ws.range((start_row + 1, start_col)).options(pd.DataFrame, header=True, index=True).value = pdf
# 변경 사항 저장
wb.save()
# 메인 실행 부분
def main():
file_ext = os.path.splitext(FILE)[1][1:]
if file_ext == "xls":
# df1 처리
# df1 = process_dataframe(FILE, "Sheet1", "A", "B", "이름")
df1 = process_dataframe_xls(FILE, SOURCE_SH_1, COL_START_DF1, COL_END_DF1, SKIP_ROWS_DF1)
# df2 처리
# df2 = process_dataframe(FILE, "Sheet1", "F", "G", "제품", ['제품', '가격'])
df2 = process_dataframe_xls(FILE, SOURCE_SH_2, COL_START_DF2, COL_END_DF2, SKIP_ROWS_DF2, COL_TITLE_LIST_DF2)
# df1과 df2 결합
df3 = df1.merge(df2, on= COL_STANDARD_NAME_DF3, how= JOIN_HOW_DF3)
# pivot_table
pdf1 = df3.pivot_table(PIVOT1_VALUE, index=PIVOT1_INDEX, columns=None, aggfunc=PIVOT1_AGGFUNC)
pdf2 = df3.pivot_table(PIVOT2_VALUE, index=PIVOT2_INDEX, aggfunc=PIVOT2_AGGFUNC)
# print(pdf2)
# Target 시트 업데이트
update_target_pivot_sheet_xls(FILE, pdf1, SOURCE_SH_TARGET1, COL_START_TARGET1, COL_END_TARGET1, SKIP_ROWS_TARGET1)
update_target_pivot_sheet_xls(FILE, pdf2, SOURCE_SH_TARGET2, COL_START_TARGET2, COL_END_TARGET2, SKIP_ROWS_TARGET2)
# 시간측정
# end = time.time()
# print("xls 경과시간:"f"{end - start:.5f} sec")
elif file_ext == "xlsx":
# df1 처리
# df1 = process_dataframe(FILE, "Sheet1", "A", "B", "이름")
df1 = process_dataframe_xlsx(FILE, SOURCE_SH_1, COL_START_DF1, COL_END_DF1, SKIP_ROWS_DF1)
# df2 처리
# df2 = process_dataframe(FILE, "Sheet1", "F", "G", "제품", ['제품', '가격'])
df2 = process_dataframe_xlsx(FILE, SOURCE_SH_2, COL_START_DF2, COL_END_DF2, SKIP_ROWS_DF2, COL_TITLE_LIST_DF2)
# df1과 df2 결합
df3 = df1.merge(df2, on= COL_STANDARD_NAME_DF3, how= JOIN_HOW_DF3)
# pivot_table
# df1.pivot_table("수량", index="업체", columns="메뉴", aggfunc="sum")
# pdf1 = df3.pivot_table("수량", index=["업체","메뉴"], aggfunc="count")
pdf1 = df3.pivot_table(PIVOT1_VALUE, index=PIVOT1_INDEX, columns=None, aggfunc=PIVOT1_AGGFUNC)
# pdf2 = df3.pivot_table("가격", index="업체", aggfunc="sum")
pdf2 = df3.pivot_table(PIVOT2_VALUE, index=PIVOT2_INDEX, aggfunc=PIVOT2_AGGFUNC)
# Target 시트 업데이트
update_target_pivot_sheet_xlsx(FILE, pdf1, SOURCE_SH_TARGET1, COL_START_TARGET1, COL_END_TARGET1, SKIP_ROWS_TARGET1) #########################
update_target_pivot_sheet_xlsx(FILE, pdf2, SOURCE_SH_TARGET2, COL_START_TARGET2, COL_END_TARGET2, SKIP_ROWS_TARGET2) #########################
# 시간측정
# end = time.time()
# print("xlsx 경과시간:"f"{end - start:.5f} sec")
elif file_ext == "xlsm":
app = xw.App(visible=False)
try:
# Process xlsx and xlsm files
df1 = process_dataframe_xlsm(FILE, SOURCE_SH_1, COL_START_DF1, COL_END_DF1, SKIP_ROWS_DF1)
df2 = process_dataframe_xlsm(FILE, SOURCE_SH_2, COL_START_DF2, COL_END_DF2, SKIP_ROWS_DF2, COL_TITLE_LIST_DF2)
df3 = df1.merge(df2, on=COL_STANDARD_NAME_DF3, how=JOIN_HOW_DF3)
pdf1 = df3.pivot_table(PIVOT1_VALUE, index=PIVOT1_INDEX, aggfunc=PIVOT1_AGGFUNC)
pdf2 = df3.pivot_table(PIVOT2_VALUE, index=PIVOT2_INDEX, aggfunc=PIVOT2_AGGFUNC)
update_target_pivot_sheet_xlsm(FILE, pdf1, SOURCE_SH_TARGET1, COL_START_TARGET1, COL_END_TARGET1, SKIP_ROWS_TARGET1)
update_target_pivot_sheet_xlsm(FILE, pdf2, SOURCE_SH_TARGET2, COL_START_TARGET2, COL_END_TARGET2, SKIP_ROWS_TARGET2)
finally:
# Excel 애플리케이션을 종료
# wb.close()
app.quit()
# 시간측정
end = time.time()
print(f"{file_ext} 경과시간: {end - start:.5f} sec")
# 완료 메시지
win32api.MessageBox(0, "작업을 완료했습니다.", "작업 완료", 0)
if __name__ == "__main__":
# FILE = "03countifsumif_chatgpt.xls" # .xls 파일 경로
# FILE = "03countifsumif_chatgpt.xlsx" # .xlsx 파일 경로
FILE = "03countifsumif_chatgpt.xlsm" # .xlsm 파일 경로
# df1 설정 -----------------------------------------------------
SOURCE_SH_1 = "Sheet1" # Sheet1
COL_START_DF1 = "B" # 데이터 시작열 # Sheet1
COL_END_DF1 = "E" # 데이터 종료열 # Sheet1
SKIP_ROWS_DF1 = 3 # 열제목으로 skiprow 계산 - "이름"# Sheet1
# SOURCE_SH_1 = "Sheet2" # Sheet2
# COL_START_DF1 = "A" # 데이터 시작열 # Sheet2
# COL_END_DF1 = "D" # 데이터 종료열 # Sheet2
# FIND_TITLE_DF1 = "이름" # 열제목으로 skiprow 계산 - "이름"# Sheet2
# df2 설정 -----------------------------------------------------
SOURCE_SH_2 = "Sheet1" # Sheet1
COL_START_DF2 = "H" # Sheet1
COL_END_DF2 = "J" # Sheet1
SKIP_ROWS_DF2 = 3 # Sheet1
COL_TITLE_LIST_DF2 = ['업체', '메뉴', '가격'] # Sheet1
# SOURCE_SH_2 = "Sheet3" # Sheet3
# COL_START_DF2 = "A" # Sheet3
# COL_END_DF2 = "C" # Sheet3
# FIND_TITLE_DF2 = "업체" # Sheet3
# COL_TITLE_LIST_DF2 = ['업체', '메뉴', '가격'] # Sheet3
# df3 설정 join ------------------------------------------------
COL_STANDARD_NAME_DF3 = ["업체", "메뉴"]
JOIN_HOW_DF3 = "left"
# pdf1 설정 ----------------------------------------------------
PIVOT1_VALUE ="수량"
PIVOT1_INDEX=["업체","메뉴"]
PIVOT1_AGGFUNC="count"
# pdf2 설정 ----------------------------------------------------
PIVOT2_VALUE ="가격"
PIVOT2_INDEX ="업체"
PIVOT2_AGGFUNC ="sum"
# target 설정 --------------------------------------------------
SOURCE_SH_TARGET1 = "Sheet1"
COL_START_TARGET1 = "R"
COL_END_TARGET1 = "T"
SKIP_ROWS_TARGET1 = 13
# target 설정 --------------------------------------------------
SOURCE_SH_TARGET2 = "Sheet1"
COL_START_TARGET2 = "R"
COL_END_TARGET2 = "S"
SKIP_ROWS_TARGET2 = 23
##############################################################################
main()
xls 경과시간: 0.12919 sec
xlsx 경과시간: 0.61403 sec
xlsm 경과시간: 1.03821 sec
03countifsumif_chatgpt.xls
0.06MB
03countifsumif_chatgpt.xlsm
0.03MB
03countifsumif_chatgpt.xlsx
0.02MB
03countifsumif_chatgpt_skiprow.py
0.02MB