From 21e53a5164aa84ed7ece5116ecfe76e2a12603dc Mon Sep 17 00:00:00 2001 From: Gnill82 Date: Sat, 30 Aug 2025 17:08:15 +0900 Subject: [PATCH] Refactor logging system: streamline console output, enhance message filtering, and replace progress logger with direct print function for better clarity. --- ds_new_user_analy.py | 100 ++++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index 46f56b7..80d938f 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -17,6 +17,7 @@ OpenSearch 매핑 오류 수정: nested → object 쿼리 변경 """ import os +import sys import csv import json import time @@ -93,37 +94,34 @@ def setup_logging(log_file_path: str) -> logging.Logger: class ConsoleFormatter(logging.Formatter): """Console-optimized formatter - 간결한 콘솔 출력""" def format(self, record): - # 콘솔에는 시간 없이 중요 메시지만 표시 - if record.levelno >= logging.WARNING: # WARNING 이상은 상세 표시 + # WARNING 이상은 상세 표시 + if record.levelno >= logging.WARNING: return f"[{record.levelname}] {record.getMessage()}" - elif record.name == 'progress': # 진행률 추적용 특별 로거 - return record.getMessage() # 시간 없이 출력 - else: - # 일반 INFO 로그는 상당히 간결하게 - msg = record.getMessage() - # 주요 정보만 콘솔에 표시 - if any(keyword in msg for keyword in [ - "던전 스토커즈 신규 유저 분석", - "분석 기간", - "세션 지표 포함", - "총 신규 유저", - "분석 완료", - "분석 요약", - "리텐션 퍼널", - "D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7", - "총 소요 시간", - "Retained_d", - "평균 활동 시간", - "="*20 # 구분선 - ]): - return msg - else: - return None - - def formatMessage(self, record): - formatted = self.format(record) - return formatted if formatted is not None else "" + + # 일반 INFO 로그는 중요한 것만 표시 + msg = record.getMessage() + important_keywords = [ + "던전 스토커즈 신규 유저 분석", + "분석 기간", + "세션 지표 포함", + "총 신규 유저", + "분석 완료", + "분석 요약", + "리텐션 퍼널", + "D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7", + "총 소요 시간", + "Retained_d", + "평균 활동 시간", + "="*20 # 구분선 + ] + + if any(keyword in msg for keyword in important_keywords): + return msg + + # 중요하지 않은 로그는 빈 문자열 반환 (출력 안함) + return "" + # 기본 로거 설정 logger = logging.getLogger('NewUserAnalyzer') logger.setLevel(logging.INFO) @@ -141,8 +139,8 @@ def setup_logging(log_file_path: str) -> logging.Logger: console_handler = logging.StreamHandler() console_formatter = ConsoleFormatter() console_handler.setFormatter(console_formatter) - # 콘솔용 필터 추가 - console_handler.addFilter(lambda record: console_formatter.format(record) is not None) + # 빈 문자열은 출력하지 않도록 필터링 + console_handler.addFilter(lambda record: console_formatter.format(record) != "") logger.addHandler(console_handler) # 외부 라이브러리 로그 억제 @@ -152,6 +150,15 @@ def setup_logging(log_file_path: str) -> logging.Logger: return logger +def print_progress(message: str, end_with_newline: bool = False): + """진행률을 제자리에서 갱신하여 출력""" + if end_with_newline: + print(f"\r{message}") + sys.stdout.flush() + else: + print(f"\r{message}", end="", flush=True) + + # ============================================================================== # 3. OpenSearch 연결 및 유틸리티 # ============================================================================== @@ -880,7 +887,6 @@ def get_new_user_cohort_optimized( new_user_map = {} # uid -> {'create_time': ...} page_count = 0 total_pages_estimate = 0 - progress_logger = logging.getLogger('progress') while True: page_count += 1 @@ -891,9 +897,9 @@ def get_new_user_cohort_optimized( try: # 진행률 표시 (제자리 갱신) if page_count == 1: - progress_logger.info("\r코호트 선정: 페이지 1 처리 중...") + print_progress("코호트 선정: 페이지 1 처리 중...") else: - progress_logger.info(f"\r코호트 선정: 페이지 {page_count} 처리 중... (수집: {len(new_user_map)}명)") + print_progress(f"코호트 선정: 페이지 {page_count} 처리 중... (수집: {len(new_user_map)}명)") logger.info(f"create_uid 페이지 {page_count} 처리 중...") response = exponential_backoff_retry( @@ -920,7 +926,7 @@ def get_new_user_cohort_optimized( logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨") # 콘솔에는 간결하게 - progress_logger.info(f"\r코호트 선정: 페이지 {page_count} 완료 (수집: {len(new_user_map)}명)") + print_progress(f"코호트 선정: 페이지 {page_count} 완료 (수집: {len(new_user_map)}명)") after_key = response["aggregations"]["new_users"].get("after_key") if not after_key: @@ -931,7 +937,7 @@ def get_new_user_cohort_optimized( break # 최종 진행률 표시 - progress_logger.info(f"\r코호트 선정 완료: 총 {len(new_user_map)}명 확인 \n") + print_progress(f"코호트 선정 완료: 총 {len(new_user_map)}명 확인", end_with_newline=True) logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨") # Step 2: 모든 create_uid 유저를 cohort에 추가 @@ -957,7 +963,7 @@ def get_new_user_cohort_optimized( } total_users += 1 - progress_logger.info(f"\r코호트 초기화 완료: {total_users}명 ") + print_progress(f"코호트 초기화 완료: {total_users}명") logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료") # Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위) @@ -1038,7 +1044,7 @@ def get_new_user_cohort_optimized( except Exception as e: logger.error(f"login_comp 정보 수집 중 오류: {e}") - progress_logger.info(f"\rlogin_comp 정보 수집: {len(login_comp_collected)}/{total_users}명 완료 ") + print_progress(f"login_comp 정보 수집: {len(login_comp_collected)}/{total_users}명 완료") logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료") # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위) @@ -1117,7 +1123,7 @@ def get_new_user_cohort_optimized( except Exception as e: logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}") - progress_logger.info(f"\rlobby 추가 정보 수집: {len(lobby_collected)}명 완료 ") + print_progress(f"lobby 추가 정보 수집: {len(lobby_collected)}명 완료") logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료") # 최종 통계 @@ -1395,7 +1401,6 @@ def process_fixed_batch( """수정된 배치 처리 함수""" # 진행률 추적용 로거 - progress_logger = logging.getLogger('progress') try: # 1. 세션 지표 계산 (--full 옵션일 때만) @@ -1416,7 +1421,7 @@ def process_fixed_batch( body_ndjson = "\n".join(msearch_queries) + "\n" # 콘솔에는 간단히, 파일에는 상세히 - progress_logger.info(f"\r배치 처리 중: {len(batch_uids)}명, {len(msearch_queries)//2}개 쿼리") + print_progress(f"배치 처리 중: {len(batch_uids)}명, {len(msearch_queries)//2}개 쿼리") logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리") msearch_responses = exponential_backoff_retry( @@ -1612,7 +1617,7 @@ def process_fixed_batch( except Exception as e: logger.warning(f"UID '{uid}' 처리 중 오류: {e}") - progress_logger.info(f"\r배치 완료: {len(batch_results)}명 ") + print_progress(f"배치 완료: {len(batch_results)}명") logger.info(f"배치 처리 완료: {len(batch_results)}명 성공") return batch_results @@ -1643,7 +1648,6 @@ def process_cohort_fixed_parallel( logger.info(f"세션 지표 포함: {'예' if include_session_metrics else '아니오'}") # 진행률 추적용 로거 - progress_logger = logging.getLogger('progress') uid_list = list(cohort.keys()) chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)] @@ -1670,30 +1674,30 @@ def process_cohort_fixed_parallel( completed_chunks += 1 # 콘솔 진행률 업데이트 - progress_logger.info(f"\r병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨") + print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨") except Exception as e: logger.warning(f"배치 처리 실패: {e}") failed_chunks.append(chunk) completed_chunks += 1 - progress_logger.info(f"\r병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨") + print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨") finally: pbar.update(1) # 실패한 청크 재처리 if failed_chunks: logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...") - progress_logger.info(f"\r재처리 중: {len(failed_chunks)}개 배치...") + print_progress(f"재처리 중: {len(failed_chunks)}개 배치...") for i, chunk in enumerate(failed_chunks): try: batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics) all_results.extend(batch_results) - progress_logger.info(f"\r재처리 중: {i+1}/{len(failed_chunks)} 완료") + print_progress(f"재처리 중: {i+1}/{len(failed_chunks)} 완료") except Exception as e: logger.error(f"재처리 실패: {e}") # 최종 결과 - progress_logger.info(f"\r병렬 처리 완료: 총 {len(all_results)}명 성공 \\n") + print_progress(f"병렬 처리 완료: 총 {len(all_results)}명 성공", end_with_newline=True) logger.info(f"2단계 완료: {len(all_results)}명 처리 성공") logger.info("=" * 80) return all_results