From 2a7a481e5597bd6acff83e0a14f7880da649af61 Mon Sep 17 00:00:00 2001 From: Gnill82 Date: Wed, 27 Aug 2025 02:15:45 +0900 Subject: [PATCH] Implement code changes to enhance functionality and improve performance --- ds_new_user_analy.py | 1325 ++++++++++++++++++++++++++++-------------- 1 file changed, 876 insertions(+), 449 deletions(-) diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index 79baeef..25069ff 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -2,41 +2,45 @@ # -*- coding: utf-8 -*- """ -던전 스토커즈 신규 유저 리텐션 분석 스크립트 -hack-detector의 고급 최적화 기법을 적용한 대용량 데이터 처리 +던전 스토커즈 신규 유저 리텐션 분석 스크립트 (수정 버전) +OpenSearch 매핑 오류 수정: nested → object 쿼리 변경 -주요 최적화 기법: -1. Composite Aggregation - 메모리 효율적 코호트 선정 -2. Streaming Pattern - 활동 시간 계산 최적화 -3. NDJSON + 백오프 재시도 - msearch 배치 처리 안정화 -4. Future Pattern - 비동기 병렬 처리 -5. Memory Optimization - 스트리밍 CSV 작성 +수정사항: +- body 필드를 nested가 아닌 object로 처리 +- retention_d1 중복 필드 제거 +- 필드 경로 정확성 개선 +- 키워드 필드 사용 (.keyword 추가) 작성자: Claude Code -기획서 기반: DS-new_users-analy.md +기획서 기반: DS-new_users-analy.md +참고: OpenSearch 매핑 ds_opensearch_mappings.json """ import os import csv import json import time +import yaml +import logging import argparse import threading from datetime import datetime, timedelta, timezone -from collections import defaultdict -from typing import Dict, List, Optional, Tuple, Generator, Any +from collections import defaultdict, Counter +from typing import Dict, List, Optional, Tuple, Generator, Any, Set from concurrent.futures import ThreadPoolExecutor, as_completed, Future +from pathlib import Path import pandas as pd from tqdm import tqdm from opensearchpy import OpenSearch +from opensearchpy.helpers import scan # ============================================================================== # 1. 설정 및 상수 # ============================================================================== -# OpenSearch 연결 설정 (기획서에서 업데이트됨) +# OpenSearch 연결 설정 OPENSEARCH_CONFIG = { "host": "ds-opensearch.oneunivrs.com", "port": 9200, @@ -46,37 +50,79 @@ OPENSEARCH_CONFIG = { }, "use_ssl": True, "verify_certs": False, - "timeout": 60, + "timeout": 120, "max_retries": 3, "headers": {"Connection": "close"} } -# 한국 표준시 +# 한국 표준시 설정 KST = timezone(timedelta(hours=9)) # 성능 최적화 설정 -DEFAULT_BATCH_SIZE = 1000 +DEFAULT_BATCH_SIZE = 500 DEFAULT_MAX_WORKERS = 6 -DEFAULT_COMPOSITE_SIZE = 1000 # composite aggregation 페이지 크기 -DEFAULT_TIMEOUT = 120 +DEFAULT_COMPOSITE_SIZE = 500 +DEFAULT_TIMEOUT = 180 SCROLL_TIMEOUT = "5m" -SESSION_GAP_MINUTES = 5 # 세션 분리 기준 (5분) -MAX_SESSION_HOURS = 3 # 최대 세션 길이 (3시간) +SESSION_GAP_MINUTES = 5 +MAX_SESSION_HOURS = 3 -# 출력 파일 설정 -OUTPUT_DIR = r"E:\DS_Git\DS_data_center\DS Log 분석" +# 출력 디렉토리 설정 +BASE_DIR = Path(__file__).parent +OUTPUT_DIR = BASE_DIR / "analysis_results" +OUTPUT_DIR.mkdir(exist_ok=True) -# 전역 타이머 제어 +# 전역 변수 stop_timer_event = threading.Event() +logger = None # ============================================================================== -# 2. OpenSearch 연결 및 유틸리티 +# 2. 로깅 시스템 설정 +# ============================================================================== + +def setup_logging(log_file_path: str) -> logging.Logger: + """한국 시간 기준 로깅 설정""" + + class KSTFormatter(logging.Formatter): + def formatTime(self, record, datefmt=None): + kst = timezone(timedelta(hours=9)) + ct = datetime.fromtimestamp(record.created, kst) + return ct.strftime('%Y-%m-%dT%H:%M:%S+09:00') + + logger = logging.getLogger('NewUserAnalyzer') + logger.setLevel(logging.INFO) + + # 기존 핸들러 제거 + for handler in logger.handlers[:]: + logger.removeHandler(handler) + + # 파일 핸들러 + file_handler = logging.FileHandler(log_file_path, encoding='utf-8') + file_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s') + file_handler.setFormatter(file_formatter) + logger.addHandler(file_handler) + + # 콘솔 핸들러 + console_handler = logging.StreamHandler() + console_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s') + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) + + # 외부 라이브러리 로그 억제 + logging.getLogger('opensearchpy').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) + + return logger + + +# ============================================================================== +# 3. OpenSearch 연결 및 유틸리티 # ============================================================================== def create_opensearch_client() -> Optional[OpenSearch]: - """OpenSearch 클라이언트 생성 (hack-detector 방식)""" - print("[INFO] OpenSearch 클러스터에 연결 중...") + """OpenSearch 클라이언트 생성""" + logger.info("OpenSearch 클러스터 연결 시도 중...") try: client = OpenSearch( @@ -99,30 +145,392 @@ def create_opensearch_client() -> Optional[OpenSearch]: ) if not client.ping(): - raise ConnectionError("클러스터에 PING을 보낼 수 없습니다.") + raise ConnectionError("클러스터 PING 실패") - print("[SUCCESS] OpenSearch 연결 성공!") + logger.info("OpenSearch 연결 성공!") return client except Exception as e: - print(f"[ERROR] OpenSearch 연결 실패: {e}") + logger.error(f"OpenSearch 연결 실패: {e}") return None def exponential_backoff_retry(func, *args, **kwargs) -> Any: - """지수 백오프 재시도 패턴 (hack-detector 기법)""" + """지수 백오프 재시도 패턴""" for delay in [1, 2, 4, 8, 16]: try: return func(*args, **kwargs) except Exception as e: - if delay == 16: # 마지막 시도 + if delay == 16: raise e - print(f"[WARNING] 재시도 중... {delay}초 대기 (오류: {str(e)[:100]})") + logger.warning(f"재시도 중... {delay}초 대기 (오류: {str(e)[:100]})") time.sleep(delay) +def format_kst_time(timestamp_str: str) -> str: + """UTC 타임스탬프를 KST로 변환""" + try: + dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + kst_dt = dt.astimezone(KST) + return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00') + except: + return timestamp_str + + # ============================================================================== -# 3. 핵심 알고리즘 - Composite Aggregation을 활용한 코호트 선정 +# 4. 수정된 분석 지표 정의 (OpenSearch 매핑 기반) +# ============================================================================== + +def get_fixed_metrics_config() -> Dict[str, Dict]: + """OpenSearch 매핑에 기반한 수정된 분석 지표""" + return { + # ==================== 3.2 플레이 시간 및 세션 ==================== + "session_count": { + "index": "ds-logs-live-login_comp", + "time_range": "d0", + "agg_type": "count" + }, + "last_logout_time": { + "index": "ds-logs-live-logout", + "time_range": "d0", + "agg_type": "max_timestamp" + }, + + # ==================== 3.3 던전 플레이 성과 ==================== + "dungeon_entry_count": { + "index": "ds-logs-live-survival_sta", + "time_range": "d0", + "agg_type": "count" + }, + "dungeon_first_mode": { + "index": "ds-logs-live-survival_sta", + "time_range": "d0", + "agg_type": "first_value", + "field": "body.dungeon_mode.keyword" + }, + "dungeon_first_stalker": { + "index": "ds-logs-live-survival_sta", + "time_range": "d0", + "agg_type": "first_value", + "field": "body.stalker_name.keyword" + }, + "dungeon_first_result": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "first_value", + "field": "body.result" + }, + "dungeon_escape_count": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "count", + "filters": [{"term": {"body.result": 1}}] + }, + "avg_survival_time": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "avg", + "field": "body.play_stats.playtime" + }, + "max_survival_time": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "max", + "field": "body.play_stats.playtime" + }, + "total_armor_break": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "sum", + "field": "body.play_stats.armor_break_cnt" + }, + "raid_play_count": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "sum", + "field": "body.play_stats.raid_play" + }, + "escape_count": { + "index": "ds-logs-live-dead", + "time_range": "d0", + "agg_type": "count", + "filters": [{"term": {"body.inter_type": 0}}] + }, + + # ==================== 3.4 전투 성과 ==================== + "monster_kill_count": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "sum", + "field": "body.play_stats.monster_kill_cnt" + }, + "player_kill_count": { + "index": "ds-logs-live-player_kill", + "time_range": "d0", + "agg_type": "count", + "target_field": "body.instigator_uid.keyword" + }, + "player_killed_count": { + "index": "ds-logs-live-player_kill", + "time_range": "d0", + "agg_type": "count", + "target_field": "body.target_uid.keyword" + }, + "death_count": { + "index": "ds-logs-live-dead", + "time_range": "d0", + "agg_type": "count", + "filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}] + }, + "damage_dealt_monster": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "sum", + "field": "body.play_stats.damage_dealt_monster" + }, + "damage_dealt_player": { + "index": "ds-logs-live-survival_end", + "time_range": "d0", + "agg_type": "sum", + "field": "body.play_stats.damage_dealt_player" + }, + + # ==================== 3.5 진행도 및 성장 ==================== + "level_max": { + "index": "ds-logs-live-matching_start", + "time_range": "d0", + "agg_type": "max", + "field": "body.class_level" + }, + "level_max_stalker": { + "index": "ds-logs-live-level_up", + "time_range": "d0", + "agg_type": "max_stalker", + "field": "body.stalker.keyword" + }, + "tutorial_entry": { + "index": "ds-logs-live-tutorial_entry", + "time_range": "d0", + "agg_type": "exists", + "filters": [{"term": {"body.action.keyword": "Start"}}] + }, + "tutorial_completed": { + "index": "ds-logs-live-log_tutorial", + "time_range": "d0", + "agg_type": "exists", + "filters": [ + {"term": {"body.action_type.keyword": "Complet"}}, + {"term": {"body.stage_type.keyword": "result"}} + ] + }, + "guide_quest_stage": { + "index": "ds-logs-live-guide_quest_stage", + "time_range": "d0", + "agg_type": "max", + "field": "body.guide_step" + }, + "skill_points_earned": { + "index": "ds-logs-live-skill_point_get", + "time_range": "d0", + "agg_type": "count" + }, + + # ==================== 3.6 아이템 및 경제 ==================== + "items_obtained_count": { + "index": "ds-logs-live-item_get", + "time_range": "d0", + "agg_type": "count" + }, + "highest_item_grade": { + "index": "ds-logs-live-item_get", + "time_range": "d0", + "agg_type": "max", + "field": "body.item_grade" + }, + "blueprint_use_count": { + "index": "ds-logs-live-craft_from_blueprint", + "time_range": "d0", + "agg_type": "count" + }, + "shop_buy_count": { + "index": "ds-logs-live-shop_buy", + "time_range": "d0", + "agg_type": "count" + }, + "shop_sell_count": { + "index": "ds-logs-live-shop_sell", + "time_range": "d0", + "agg_type": "count" + }, + "gold_spent": { + "index": "ds-logs-live-shop_buy", + "time_range": "d0", + "agg_type": "conditional_sum", + "field": "body.amt", + "condition_field": "body.cost_id.keyword", + "condition_value": "i108000" + }, + "gold_earned": { + "index": "ds-logs-live-shop_sell", + "time_range": "d0", + "agg_type": "conditional_sum", + "field": "body.amt", + "condition_field": "body.cost_id.keyword", + "condition_value": "i108000" + }, + "storage_in_count": { + "index": "ds-logs-live-storage_use", + "time_range": "d0", + "agg_type": "count", + "filters": [{"term": {"body.oper_type": 1}}] + }, + "storage_out_count": { + "index": "ds-logs-live-storage_use", + "time_range": "d0", + "agg_type": "count", + "filters": [{"term": {"body.oper_type": -1}}] + }, + "enchant_count": { + "index": "ds-logs-live-enchant", + "time_range": "d0", + "agg_type": "count" + }, + "enchant_gold_spent": { + "index": "ds-logs-live-enchant", + "time_range": "d0", + "agg_type": "sum", + "field": "body.amt" + }, + + # ==================== 3.7 장비 관리 ==================== + "ingame_equip_count": { + "index": "ds-logs-live-item_ingame_equip", + "time_range": "d0", + "agg_type": "count", + "filters": [{"term": {"body.base_type": 2}}] + }, + + # ==================== 3.8 오브젝트 상호작용 ==================== + "object_interaction_count": { + "index": "ds-logs-live-obj_inter", + "time_range": "d0", + "agg_type": "count" + }, + + # ==================== 3.9 매칭 시스템 ==================== + "matching_start_count": { + "index": "ds-logs-live-matching_start", + "time_range": "d0", + "agg_type": "count" + }, + "matching_complete_count": { + "index": "ds-logs-live-matching_complete", + "time_range": "d0", + "agg_type": "count" + }, + "matching_failed_count": { + "index": "ds-logs-live-matching_failed", + "time_range": "d0", + "agg_type": "count" + }, + "avg_matching_time": { + "index": "ds-logs-live-matching_complete", + "time_range": "d0", + "agg_type": "avg", + "field": "body.matchingtime" + }, + + # ==================== 3.10 소셜 활동 ==================== + "friend_add_count": { + "index": "ds-logs-live-friend", + "time_range": "d0", + "agg_type": "count", + "filters": [ + {"term": {"body.oper_type": 0}}, + {"term": {"body.friend_type": 0}} + ] + }, + "friend_delete_count": { + "index": "ds-logs-live-friend", + "time_range": "d0", + "agg_type": "count", + "filters": [{"term": {"body.oper_type": 1}}] + }, + "party_invite_sent": { + "index": "ds-logs-live-player_invite", + "time_range": "d0", + "agg_type": "count" + }, + "party_invite_received": { + "index": "ds-logs-live-player_invite", + "time_range": "d0", + "agg_type": "count", + "target_field": "body.target_uid.keyword" + }, + "mail_read_count": { + "index": "ds-logs-live-mail_read", + "time_range": "d0", + "agg_type": "count" + }, + + # ==================== 3.11 거래소 및 경제 활동 ==================== + "exchange_register_count": { + "index": "ds-logs-live-exchange_reg", + "time_range": "d0", + "agg_type": "count" + }, + "exchange_use_count": { + "index": "ds-logs-live-exchange_use", + "time_range": "d0", + "agg_type": "count" + }, + "coupon_used": { + "index": "ds-logs-live-coupon", + "time_range": "d0", + "agg_type": "exists" + }, + + # ==================== 3.12 기타 활동 ==================== + "button_click_count": { + "index": "ds-logs-live-button_click", + "time_range": "d0", + "agg_type": "count" + }, + "hideout_upgrade_count": { + "index": "ds-logs-live-log_hideout_upgrade", + "time_range": "d0", + "agg_type": "count" + }, + "hideout_max_level": { + "index": "ds-logs-live-log_hideout_upgrade", + "time_range": "d0", + "agg_type": "max", + "field": "body.hideout_level" + }, + "season_pass_buy": { + "index": "ds-logs-live-season_pass", + "time_range": "d0", + "agg_type": "exists", + "filters": [{"term": {"body.cause": 1}}] + }, + "season_pass_max_step": { + "index": "ds-logs-live-season_pass", + "time_range": "d0", + "agg_type": "max", + "field": "body.season_pass_step" + }, + + # ==================== 리텐션 판정 (retention_d1 삭제됨) ==================== + "retention_check": { + "index": "ds-logs-live-login_comp", + "time_range": "d1", + "agg_type": "exists" + } + } + + +# ============================================================================== +# 5. Composite Aggregation 신규 유저 코호트 선정 (동일) # ============================================================================== def get_new_user_cohort_optimized( @@ -131,17 +539,17 @@ def get_new_user_cohort_optimized( end_time: str, page_size: int = DEFAULT_COMPOSITE_SIZE ) -> Dict[str, Dict]: - """ - Composite Aggregation을 활용한 메모리 효율적 신규 유저 코호트 선정 - hack-detector의 고급 기법 적용 - """ - print(f"\n[1단계] 신규 유저 코호트 선정 (Composite Aggregation)") - print(f" - 분석 기간: {start_time} ~ {end_time}") + """Composite Aggregation을 활용한 신규 유저 코호트 선정""" + + logger.info("=" * 80) + logger.info("1단계: 신규 유저 코호트 선정 (Composite Aggregation)") + logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}") + logger.info(f"페이지 크기: {page_size}") cohort = {} after_key = None + total_users = 0 - # Composite aggregation 쿼리 base_query = { "size": 0, "query": { @@ -166,7 +574,14 @@ def get_new_user_cohort_optimized( "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "asc"}}], - "_source": ["country", "body.device_mod"] + "_source": ["country", "body.device_mod", "body.nickname"] + } + }, + "latest_nickname": { + "top_hits": { + "size": 1, + "sort": [{"@timestamp": {"order": "desc"}}], + "_source": ["body.nickname"] } } } @@ -174,20 +589,21 @@ def get_new_user_cohort_optimized( } } - total_users = 0 - + page_count = 0 while True: + page_count += 1 query = base_query.copy() if after_key: query["aggs"]["new_users"]["composite"]["after"] = after_key try: + logger.info(f"페이지 {page_count} 처리 중...") response = exponential_backoff_retry( client.search, index="ds-logs-live-login_comp", body=query, request_timeout=DEFAULT_TIMEOUT, - track_total_hits=False # 성능 최적화 + track_total_hits=False ) buckets = response["aggregations"]["new_users"]["buckets"] @@ -199,194 +615,48 @@ def get_new_user_cohort_optimized( uid = bucket["key"]["uid"] first_login_utc = bucket["first_login"]["value_as_string"] - # 사용자 정보 추출 user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] + latest_nickname_hit = bucket["latest_nickname"]["hits"]["hits"][0]["_source"] cohort[uid] = { 'auth_id': auth_id, 'first_login_utc': first_login_utc, + 'first_login_kst': format_kst_time(first_login_utc), 'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')), 'country': user_hit.get('country', 'N/A'), - 'device': user_hit.get('body', {}).get('device_mod', 'N/A') + 'device': user_hit.get('body', {}).get('device_mod', 'N/A'), + 'nickname': latest_nickname_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') } total_users += 1 - # 다음 페이지 키 확인 + logger.info(f"페이지 {page_count}: {len(buckets)}명 처리됨 (누적: {total_users}명)") + after_key = response["aggregations"]["new_users"].get("after_key") if not after_key: break except Exception as e: - print(f"[ERROR] 코호트 선정 중 오류: {e}") + logger.error(f"코호트 선정 중 오류 (페이지 {page_count}): {e}") break - print(f" - [SUCCESS] 총 {total_users}명의 신규 유저 코호트 확정") + logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정") + logger.info("=" * 80) return cohort # ============================================================================== -# 4. Active Hours 계산 - 스트리밍 방식 (Generator Pattern) +# 6. 수정된 msearch 쿼리 빌더 (nested → object) # ============================================================================== -def calculate_active_hours_streaming( - client: OpenSearch, +def build_fixed_msearch_queries( uids: List[str], - cohort_data: Dict[str, Dict] -) -> Dict[str, int]: - """ - 스트리밍 방식으로 활동 시간 계산 - Generator 패턴으로 메모리 사용량 최소화 - """ + cohort: Dict[str, Dict], + metrics_config: Dict[str, Dict] +) -> List[str]: + """수정된 msearch 쿼리 생성 (nested 쿼리 제거)""" - def stream_user_events(uid: str) -> Generator[Dict, None, None]: - """개별 유저의 이벤트를 스트리밍으로 처리""" - user_info = cohort_data.get(uid) - if not user_info: - return - - first_login_dt = user_info['first_login_dt'] - d0_end_dt = first_login_dt + timedelta(hours=24) - - # 해당 유저의 D+0 이벤트만 스캔 - query = { - "query": { - "bool": { - "filter": [ - {"term": {"uid.keyword": uid}}, - {"range": {"@timestamp": { - "gte": user_info['first_login_utc'], - "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ') - }}} - ] - } - } - } - - try: - from opensearchpy.helpers import scan - for doc in scan( - client, - query=query, - index="ds-logs-live-*", - scroll=SCROLL_TIMEOUT, - _source=["@timestamp", "type"] - ): - source = doc['_source'] - event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00')) - - if first_login_dt <= event_dt < d0_end_dt: - yield { - "time": event_dt, - "type": source.get('type', '').lower() - } - except Exception: - # 오류 시 빈 generator 반환 - pass - - results = {} - - for uid in uids: - events = list(stream_user_events(uid)) - - if len(events) < 2: - results[uid] = 0 - continue - - # 세션 기반 활동 시간 계산 - events.sort(key=lambda x: x['time']) - total_active_seconds = 0 - - i = 0 - while i < len(events) - 1: - current_event = events[i] - next_event = events[i + 1] - - # 세션 간격 체크 (5분 이상 차이나면 다른 세션) - time_diff = next_event['time'] - current_event['time'] - - if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES): - # 최대 세션 길이 제한 - session_duration = min( - time_diff.total_seconds(), - MAX_SESSION_HOURS * 3600 - ) - total_active_seconds += session_duration - - i += 1 - - results[uid] = int(total_active_seconds) - - return results - - -# ============================================================================== -# 5. NDJSON + 백오프 재시도를 활용한 msearch 배치 처리 -# ============================================================================== - -def build_msearch_queries(uids: List[str], cohort: Dict[str, Dict]) -> List[str]: - """ - msearch용 NDJSON 쿼리 생성 - hack-detector의 NDJSON 직접 생성 기법 적용 - """ queries = [] - # 분석할 지표 정의 (기획서 기반) - metrics_config = { - "retention_d1": { - "index": "ds-logs-live-login_comp", - "time_range": "d1", # 24-48시간 - "filters": [] - }, - "tutorial_entry": { - "index": "ds-logs-live-tutorial_entry", - "time_range": "d0", - "filters": [{"nested": {"path": "body", "query": {"term": {"body.action.keyword": "Start"}}}}] - }, - "tutorial_completed": { - "index": "ds-logs-live-log_tutorial", - "time_range": "d0", - "filters": [ - {"nested": {"path": "body", "query": {"bool": {"must": [ - {"term": {"body.action_type.keyword": "Complet"}}, - {"term": {"body.stage_type.keyword": "result"}} - ]}}}} - ] - }, - "dungeon_entry_count": { - "index": "ds-logs-live-survival_sta", - "time_range": "d0", - "filters": [] - }, - "dungeon_escape_count": { - "index": "ds-logs-live-survival_end", - "time_range": "d0", - "filters": [{"nested": {"path": "body", "query": {"term": {"body.result": 1}}}}] - }, - "monster_kill_count": { - "index": "ds-logs-live-survival_end", - "time_range": "d0", - "agg_field": "body.play_stats.monster_kill_cnt" - }, - "player_kill_count": { - "index": "ds-logs-live-player_kill", - "time_range": "d0", - "target_field": "body.instigator_uid" - }, - "matching_complete_count": { - "index": "ds-logs-live-matching_complete", - "time_range": "d0", - "filters": [] - }, - "friend_add_count": { - "index": "ds-logs-live-friend", - "time_range": "d0", - "filters": [{"nested": {"path": "body", "query": {"bool": {"must": [ - {"term": {"body.oper_type": 0}}, - {"term": {"body.friend_type": 0}} - ]}}}}] - } - } - for uid in uids: user_data = cohort[uid] first_login_dt = user_data['first_login_dt'] @@ -405,218 +675,430 @@ def build_msearch_queries(uids: List[str], cohort: Dict[str, Dict]) -> List[str] time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}} # 사용자 식별 필터 - user_filter = {"bool": {"should": [ - {"term": {"uid.keyword": uid}}, - {"term": {"auth.id.keyword": user_data['auth_id']}} - ], "minimum_should_match": 1}} + if "target_field" in config: + if config["target_field"] == "body.instigator_uid.keyword": + user_filter = {"term": {config["target_field"]: uid}} + elif config["target_field"] == "body.target_uid.keyword": + user_filter = {"term": {config["target_field"]: uid}} + else: + user_filter = {"bool": {"should": [ + {"term": {"uid.keyword": uid}}, + {"term": {"auth.id.keyword": user_data['auth_id']}} + ], "minimum_should_match": 1}} + else: + user_filter = {"bool": {"should": [ + {"term": {"uid.keyword": uid}}, + {"term": {"auth.id.keyword": user_data['auth_id']}} + ], "minimum_should_match": 1}} - # 쿼리 구성 + # 쿼리 필터 구성 query_filters = [user_filter, time_filter] - # 추가 필터 적용 + # 추가 필터 적용 (nested 제거) if "filters" in config: query_filters.extend(config["filters"]) - # 특별한 필드 처리 (player_kill의 경우) - if "target_field" in config and config["target_field"] == "body.instigator_uid": - query_filters.append({"nested": {"path": "body", "query": {"term": {"body.instigator_uid.keyword": uid}}}}) + # 조건부 필터 + if config.get("agg_type") == "conditional_sum": + condition_filter = {"term": {config['condition_field']: config['condition_value']}} + query_filters.append(condition_filter) + # 쿼리 바디 구성 query_body = { - "size": 0 if "agg_field" not in config else 1000, + "size": 0 if config.get("agg_type") not in ["first_value", "max_stalker"] else 1000, "query": {"bool": {"filter": query_filters}}, "track_total_hits": False } - # Aggregation이 필요한 경우 - if "agg_field" in config: - query_body["aggs"] = { - "total": {"sum": {"field": config["agg_field"]}} - } + # Aggregation 설정 + agg_type = config.get("agg_type", "count") - # NDJSON 형태로 추가 + if agg_type in ["sum", "avg", "max", "conditional_sum"]: + agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type + query_body["aggs"] = { + "metric_value": {agg_func: {"field": config["field"]}} + } + elif agg_type == "max_timestamp": + query_body["aggs"] = { + "metric_value": {"max": {"field": "@timestamp"}} + } + elif agg_type in ["first_value", "max_stalker"]: + sort_field = config.get("field", "@timestamp") + sort_order = "asc" if agg_type == "first_value" else "desc" + + query_body["sort"] = [{sort_field: {"order": sort_order}}] + query_body["_source"] = [config.get("field", "@timestamp")] + + # NDJSON 추가 queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False)) queries.append(json.dumps(query_body, ensure_ascii=False)) return queries -def execute_msearch_with_backoff(client: OpenSearch, queries: List[str]) -> List[Dict]: - """ - NDJSON + 지수 백오프 재시도로 msearch 실행 - hack-detector의 안정화 기법 적용 - """ - # NDJSON 문자열 생성 - body_ndjson = "\n".join(queries) + "\n" +# ============================================================================== +# 7. 세션 지표 계산 (동일) +# ============================================================================== + +def calculate_comprehensive_session_metrics( + client: OpenSearch, + uids: List[str], + cohort_data: Dict[str, Dict] +) -> Dict[str, Dict]: + """포괄적인 세션 지표 계산""" - # 지수 백오프로 재시도 - response = exponential_backoff_retry( - client.msearch, - body=body_ndjson, - request_timeout=60 - ) + def stream_user_events(uid: str) -> Generator[Dict, None, None]: + user_info = cohort_data.get(uid) + if not user_info: + return + + first_login_dt = user_info['first_login_dt'] + d0_end_dt = first_login_dt + timedelta(hours=24) + + query = { + "query": { + "bool": { + "filter": [ + {"term": {"uid.keyword": uid}}, + {"range": {"@timestamp": { + "gte": user_info['first_login_utc'], + "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ') + }}} + ] + } + } + } + + try: + for doc in scan( + client, + query=query, + index="ds-logs-live-*", + scroll=SCROLL_TIMEOUT, + _source=["@timestamp", "type"] + ): + source = doc['_source'] + event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00')) + + if first_login_dt <= event_dt < d0_end_dt: + yield { + "time": event_dt, + "type": source.get('type', '').lower() + } + except Exception: + pass - return response.get('responses', []) + results = {} + + for uid in uids: + events = list(stream_user_events(uid)) + + session_metrics = { + 'active_seconds': 0, + 'total_playtime_minutes': 0, + 'session_count': 0, + 'avg_session_length': 0, + 'logout_abnormal': 0 + } + + if len(events) < 2: + results[uid] = session_metrics + continue + + events.sort(key=lambda x: x['time']) + + login_events = [e for e in events if e['type'] == 'login_comp'] + logout_events = [e for e in events if e['type'] == 'logout'] + heartbeat_events = [e for e in events if e['type'] == 'heartbeat'] + + session_metrics['session_count'] = len(login_events) + + # 활동 시간 계산 + if heartbeat_events: + total_active_seconds = 0 + for i in range(len(heartbeat_events) - 1): + time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time'] + if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES): + total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600) + + session_metrics['active_seconds'] = int(total_active_seconds) + + # 총 플레이 시간 + if login_events: + total_playtime_seconds = 0 + session_lengths = [] + + for login_event in login_events: + login_time = login_event['time'] + logout_time = None + for logout_event in logout_events: + if logout_event['time'] > login_time: + logout_time = logout_event['time'] + break + + if not logout_time and events: + logout_time = events[-1]['time'] + + if logout_time: + session_duration = (logout_time - login_time).total_seconds() + session_duration = min(session_duration, MAX_SESSION_HOURS * 3600) + total_playtime_seconds += session_duration + session_lengths.append(session_duration) + + session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60) + + if session_lengths: + session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60) + + # 비정상 종료 체크 + if login_events and logout_events: + last_login = max(login_events, key=lambda x: x['time'])['time'] + last_logout = max(logout_events, key=lambda x: x['time'])['time'] + session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0 + elif login_events and not logout_events: + session_metrics['logout_abnormal'] = 1 + + results[uid] = session_metrics + + return results # ============================================================================== -# 6. Future Pattern을 활용한 병렬 처리 최적화 +# 8. 수정된 배치 처리 # ============================================================================== -def process_user_batch_optimized( +def process_fixed_batch( client: OpenSearch, batch_uids: List[str], - cohort: Dict[str, Dict] + cohort: Dict[str, Dict], + metrics_config: Dict[str, Dict] ) -> List[Dict]: - """ - 최적화된 배치 처리 함수 - Future Pattern + 스트리밍 + NDJSON 기법 결합 - """ + """수정된 배치 처리 함수""" - # 1. 활동 시간 계산 (스트리밍 방식) - active_hours_map = calculate_active_hours_streaming(client, batch_uids, cohort) + logger.info(f"배치 처리 시작: {len(batch_uids)}명") - # 2. msearch 쿼리 생성 및 실행 (NDJSON + 백오프) - msearch_queries = build_msearch_queries(batch_uids, cohort) - msearch_responses = execute_msearch_with_backoff(client, msearch_queries) - - # 3. 결과 집계 - batch_results = [] - metrics_per_user = 9 # 정의된 지표 수 - - for idx, uid in enumerate(batch_uids): - try: - user_data = cohort[uid] - user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user] - - # 기본 정보 - result = { - 'uid': uid, - 'auth_id': user_data['auth_id'], - 'nickname': 'N/A', # TODO: nickname 조회 로직 추가 - 'first_login_time': user_data['first_login_utc'], - 'retention_status': 'Retained_d0', # 기본값 - 'country': user_data['country'], - 'device': user_data['device'], - 'active_seconds': active_hours_map.get(uid, 0) - } - - # msearch 결과 파싱 - metrics = [ - 'retention_d1', 'tutorial_entry', 'tutorial_completed', - 'dungeon_entry_count', 'dungeon_escape_count', 'monster_kill_count', - 'player_kill_count', 'matching_complete_count', 'friend_add_count' - ] - - for i, metric in enumerate(metrics): - response = user_responses[i] + try: + # 1. 세션 지표 계산 + session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort) + + # 2. 수정된 msearch 실행 + msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config) + + body_ndjson = "\n".join(msearch_queries) + "\n" + logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리") + + msearch_responses = exponential_backoff_retry( + client.msearch, + body=body_ndjson, + request_timeout=300 + ).get('responses', []) + + # 3. 결과 집계 + batch_results = [] + metrics_per_user = len(metrics_config) + + for idx, uid in enumerate(batch_uids): + try: + user_data = cohort[uid] + user_session_metrics = session_metrics.get(uid, {}) + user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user] - if 'error' in response: - result[metric] = 0 - continue + # 기본 정보 + result = { + 'uid': uid, + 'auth_id': user_data['auth_id'], + 'nickname': user_data['nickname'], + 'first_login_time': user_data['first_login_kst'], + 'retention_status': 'Retained_d0', + 'country': user_data['country'], + 'device': user_data['device'], + 'active_seconds': user_session_metrics.get('active_seconds', 0), + 'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0), + 'session_count': user_session_metrics.get('session_count', 0), + 'avg_session_length': user_session_metrics.get('avg_session_length', 0), + 'logout_abnormal': user_session_metrics.get('logout_abnormal', 0) + } - hits_total = response.get('hits', {}).get('total', {}).get('value', 0) + # msearch 결과 파싱 + metric_names = list(metrics_config.keys()) - if metric == 'retention_d1': - result['retention_status'] = 'Retained_d1' if hits_total > 0 else 'Retained_d0' - result[metric] = 1 if hits_total > 0 else 0 - elif metric == 'monster_kill_count': - agg_value = response.get('aggregations', {}).get('total', {}).get('value', 0) - result[metric] = int(agg_value) if agg_value else 0 + for i, metric_name in enumerate(metric_names): + if i >= len(user_responses): + result[metric_name] = 0 + continue + + response = user_responses[i] + config = metrics_config[metric_name] + + if 'error' in response: + result[metric_name] = 0 + continue + + agg_type = config.get("agg_type", "count") + + if agg_type == "count" or agg_type == "exists": + hits_total = response.get('hits', {}).get('total', {}).get('value', 0) + result[metric_name] = 1 if hits_total > 0 else 0 + + elif agg_type in ["sum", "avg", "max", "conditional_sum"]: + agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value') + result[metric_name] = int(agg_value) if agg_value else 0 + + elif agg_type == "max_timestamp": + timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string') + result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None + + elif agg_type == "first_value": + hits = response.get('hits', {}).get('hits', []) + if hits: + source_value = hits[0]['_source'] + field_name = config.get("field", "") + + if field_name.startswith("body."): + keys = field_name.split(".") + value = source_value + for key in keys: + value = value.get(key, 0) if isinstance(value, dict) else 0 + result[metric_name] = value + else: + result[metric_name] = source_value.get(field_name, 0) + else: + if metric_name == "dungeon_first_result": + result[metric_name] = 2 # 미플레이 + else: + result[metric_name] = 0 + + elif agg_type == "max_stalker": + hits = response.get('hits', {}).get('hits', []) + if hits: + source_value = hits[0]['_source'] + field_name = config.get("field", "") + keys = field_name.split(".") + value = source_value + for key in keys: + value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A' + result[metric_name] = value + else: + result[metric_name] = 'N/A' + + else: + result[metric_name] = 0 + + # 리텐션 상태 업데이트 + if metric_name == "retention_check" and result[metric_name] > 0: + result['retention_status'] = 'Retained_d1' + + # 계산된 지표 + if result.get('dungeon_entry_count', 0) > 0: + escape_count = result.get('dungeon_escape_count', 0) + result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2) else: - result[metric] = hits_total - - batch_results.append(result) - - except Exception as e: - print(f" - ⚠️ UID '{uid}' 처리 중 오류: {e}") - - return batch_results + result['dungeon_escape_rate'] = 0 + + batch_results.append(result) + + except Exception as e: + logger.warning(f"UID '{uid}' 처리 중 오류: {e}") + + logger.info(f"배치 처리 완료: {len(batch_results)}명 성공") + return batch_results + + except Exception as e: + logger.error(f"배치 처리 중 심각한 오류: {e}") + return [] -def process_cohort_parallel( +# ============================================================================== +# 9. 병렬 처리 및 결과 저장 (동일) +# ============================================================================== + +def process_cohort_fixed_parallel( client: OpenSearch, cohort: Dict[str, Dict], batch_size: int, - max_workers: int + max_workers: int, + metrics_config: Dict[str, Dict] ) -> List[Dict]: - """ - Future Pattern을 활용한 병렬 처리 - 동적 청크 크기 조정 + 실패 재처리 - """ + """수정된 병렬 처리""" - # 동적 배치 크기 조정 (사용자 수 기반) - user_count = len(cohort) - if user_count < 1000: - adjusted_batch_size = min(batch_size, 100) - else: - adjusted_batch_size = batch_size + logger.info("=" * 80) + logger.info("2단계: 수정된 병렬 배치 처리") + logger.info(f"총 사용자: {len(cohort)}명") + logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}") + logger.info(f"분석 지표: {len(metrics_config)}개") - print(f"\n[2단계] 병렬 배치 처리 시작 (배치크기: {adjusted_batch_size}, 워커: {max_workers})") - - # UID 리스트를 청크로 분할 uid_list = list(cohort.keys()) - chunks = [uid_list[i:i + adjusted_batch_size] for i in range(0, len(uid_list), adjusted_batch_size)] + chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)] all_results = [] failed_chunks = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Future 객체 생성 future_to_chunk = { - executor.submit(process_user_batch_optimized, client, chunk, cohort): chunk + executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config): chunk for chunk in chunks } - # 진행률 표시 - with tqdm(total=len(chunks), desc=" - 배치 처리 진행률") as pbar: + with tqdm(total=len(chunks), desc="배치 처리 진행률") as pbar: for future in as_completed(future_to_chunk): chunk = future_to_chunk[future] try: - batch_results = future.result(timeout=300) # 5분 타임아웃 + batch_results = future.result(timeout=600) if batch_results: all_results.extend(batch_results) else: failed_chunks.append(chunk) except Exception as e: - print(f" - ⚠️ 배치 처리 실패: {e}") + logger.warning(f"배치 처리 실패: {e}") failed_chunks.append(chunk) finally: pbar.update(1) - # 실패한 청크 재처리 (단일 스레드) + # 실패한 청크 재처리 if failed_chunks: - print(f"\n - 실패한 {len(failed_chunks)}개 배치 재처리 중...") + logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...") for chunk in failed_chunks: try: - batch_results = process_user_batch_optimized(client, chunk, cohort) + batch_results = process_fixed_batch(client, chunk, cohort, metrics_config) all_results.extend(batch_results) except Exception as e: - print(f" - ❌ 재처리 실패: {e}") + logger.error(f"재처리 실패: {e}") + logger.info(f"2단계 완료: {len(all_results)}명 처리 성공") + logger.info("=" * 80) return all_results -# ============================================================================== -# 7. 스트리밍 CSV 작성 (메모리 최적화) -# ============================================================================== - -def write_results_streaming(results: List[Dict], output_path: str) -> None: - """ - 스트리밍 방식으로 CSV 작성 - 메모리에 모든 데이터를 올리지 않고 직접 파일에 쓰기 - """ +def write_fixed_results(results: List[Dict], output_path: str) -> None: + """수정된 결과 저장 (retention_d1 제거)""" + + logger.info("=" * 80) + logger.info("3단계: 결과 저장") if not results: - print(" - [ERROR] 저장할 결과 데이터가 없습니다.") + logger.error("저장할 결과 데이터가 없습니다.") return - # CSV 헤더 정의 (기획서 기반) + # 수정된 헤더 (retention_d1 제거) headers = [ - 'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', - 'country', 'device', 'active_seconds', 'retention_d1', - 'tutorial_entry', 'tutorial_completed', 'dungeon_entry_count', - 'dungeon_escape_count', 'monster_kill_count', 'player_kill_count', - 'matching_complete_count', 'friend_add_count' + 'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'country', 'device', + 'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal', + 'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result', + 'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time', + 'total_armor_break', 'raid_play_count', 'escape_count', + 'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count', + 'damage_dealt_monster', 'damage_dealt_player', + 'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed', + 'guide_quest_stage', 'skill_points_earned', + 'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count', + 'shop_sell_count', 'gold_spent', 'gold_earned', 'storage_in_count', 'storage_out_count', + 'enchant_count', 'enchant_gold_spent', + 'ingame_equip_count', 'object_interaction_count', + 'matching_start_count', 'matching_complete_count', 'matching_failed_count', 'avg_matching_time', + 'friend_add_count', 'friend_delete_count', 'party_invite_sent', 'party_invite_received', 'mail_read_count', + 'exchange_register_count', 'exchange_use_count', 'coupon_used', + 'button_click_count', 'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy', 'season_pass_max_step', + 'last_logout_time' ] try: @@ -624,179 +1106,124 @@ def write_results_streaming(results: List[Dict], output_path: str) -> None: writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore') writer.writeheader() - # 스트리밍으로 한 줄씩 작성 for result in results: writer.writerow(result) - print(f" - [SUCCESS] 결과 파일 저장 완료: {output_path}") - print(f" - [INFO] 총 {len(results)}명의 데이터가 저장되었습니다.") + logger.info(f"결과 파일 저장 완료: {output_path}") + logger.info(f"총 {len(results)}명의 데이터 저장") + logger.info(f"분석 지표: {len(headers)}개") except Exception as e: - print(f" - [ERROR] CSV 파일 저장 실패: {e}") + logger.error(f"CSV 파일 저장 실패: {e}") # ============================================================================== -# 8. 실시간 타이머 (유틸리티) -# ============================================================================== - -def live_timer(start_time: float, pbar: tqdm) -> None: - """실시간 경과 시간 표시""" - while not stop_timer_event.is_set(): - elapsed = str(timedelta(seconds=int(time.time() - start_time))) - pbar.set_postfix_str(f"경과 시간: {elapsed}") - time.sleep(1) - - -# ============================================================================== -# 9. 메인 함수 및 명령줄 인터페이스 +# 10. 메인 함수 # ============================================================================== def parse_arguments() -> argparse.Namespace: """명령줄 인자 파싱""" - parser = argparse.ArgumentParser( - description="던전 스토커즈 신규 유저 리텐션 분석 스크립트", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -예시: - python ds_new_user_analy.py --start-time "2025-08-22T12:00:00+09:00" --end-time "2025-08-25T12:00:00+09:00" - python ds_new_user_analy.py --start-time "2025-08-22T12:00:00+09:00" --end-time "2025-08-22T13:00:00+09:00" --sample-size 100 - """ - ) - - parser.add_argument( - '--start-time', - required=True, - help='분석 시작 시간 (KST, ISO 형식): "2025-08-22T12:00:00+09:00"' - ) - - parser.add_argument( - '--end-time', - required=True, - help='분석 종료 시간 (KST, ISO 형식): "2025-08-25T12:00:00+09:00"' - ) - - parser.add_argument( - '--output-dir', - default=OUTPUT_DIR, - help=f'결과 파일 저장 경로 (기본값: {OUTPUT_DIR})' - ) - - parser.add_argument( - '--batch-size', - type=int, - default=DEFAULT_BATCH_SIZE, - help=f'배치 처리 크기 (기본값: {DEFAULT_BATCH_SIZE})' - ) - - parser.add_argument( - '--max-workers', - type=int, - default=DEFAULT_MAX_WORKERS, - help=f'병렬 처리 스레드 수 (기본값: {DEFAULT_MAX_WORKERS})' - ) - - parser.add_argument( - '--sample-size', - type=int, - help='샘플 분석 크기 (None이면 전체 분석)' - ) - + parser = argparse.ArgumentParser(description="던전 스토커즈 신규 유저 분석 (수정 버전)") + parser.add_argument('--start-time', required=True, help='분석 시작 시간 (KST)') + parser.add_argument('--end-time', required=True, help='분석 종료 시간 (KST)') + parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기') + parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수') + parser.add_argument('--sample-size', type=int, help='샘플 분석 크기') return parser.parse_args() def main(): """메인 실행 함수""" + global logger - # 시작 시간 기록 overall_start_time = time.time() + start_kst = datetime.now(KST) + timestamp = start_kst.strftime('%Y%m%d_%H%M%S') - print("=" * 80) - print("던전 스토커즈 신규 유저 리텐션 분석 v2.0 (Claude Code)") - print("hack-detector 고급 최적화 기법 적용") - print("=" * 80) + log_file_path = OUTPUT_DIR / f"ds-new-user-fixed-analysis_{timestamp}.log" + csv_file_path = OUTPUT_DIR / f"ds-new-user-fixed_{timestamp}.csv" + + logger = setup_logging(str(log_file_path)) + + logger.info("=" * 80) + logger.info("던전 스토커즈 신규 유저 분석 v4.0 (OpenSearch 매핑 수정)") + logger.info("nested 쿼리 오류 수정 + retention_d1 필드 제거") + logger.info("=" * 80) - # 명령줄 인자 파싱 args = parse_arguments() - # 시간 변환 (KST -> UTC) try: - start_kst = datetime.fromisoformat(args.start_time) - end_kst = datetime.fromisoformat(args.end_time) + start_kst_arg = datetime.fromisoformat(args.start_time) + end_kst_arg = datetime.fromisoformat(args.end_time) - start_utc = start_kst.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') - end_utc = end_kst.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') - print(f"[INFO] 분석 기간: {args.start_time} ~ {args.end_time}") - print(f"[INFO] UTC 변환: {start_utc} ~ {end_utc}") + logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}") + logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}") except Exception as e: - print(f"[ERROR] 시간 형식 오류: {e}") + logger.error(f"시간 형식 오류: {e}") return - # OpenSearch 클라이언트 생성 client = create_opensearch_client() if not client: return try: - # 1단계: 신규 유저 코호트 선정 (Composite Aggregation) + metrics_config = get_fixed_metrics_config() + logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}개") + cohort = get_new_user_cohort_optimized(client, start_utc, end_utc) if not cohort: - print("\n[ERROR] 분석할 신규 유저가 없습니다.") + logger.error("분석할 신규 유저가 없습니다.") return - # 샘플링 모드 if args.sample_size and args.sample_size < len(cohort): uid_list = list(cohort.keys()) sampled_uids = uid_list[:args.sample_size] cohort = {uid: cohort[uid] for uid in sampled_uids} - print(f"[WARNING] 샘플링 모드: {args.sample_size}명만 분석합니다.") + logger.info(f"샘플링 모드: {args.sample_size}명만 분석") - # 2단계: 병렬 배치 처리 (Future Pattern) - results = process_cohort_parallel( - client, - cohort, - args.batch_size, - args.max_workers + results = process_cohort_fixed_parallel( + client, cohort, args.batch_size, args.max_workers, metrics_config ) - # 3단계: 결과 저장 (스트리밍 CSV) - print(f"\n[3단계] 결과 저장") + write_fixed_results(results, str(csv_file_path)) - timestamp = datetime.now(KST).strftime('%Y%m%d_%H%M%S') - filename = f"ds_new_users_analy_{timestamp}.csv" - output_path = os.path.join(args.output_dir, filename) - - write_results_streaming(results, output_path) - - # 통계 요약 + # 요약 if results: + total_users = len(results) retained_d1 = sum(1 for r in results if r.get('retention_status') == 'Retained_d1') - retention_rate = (retained_d1 / len(results)) * 100 + retention_rate = (retained_d1 / total_users) * 100 - print(f"\n[SUMMARY] 분석 요약:") - print(f" - 총 신규 유저: {len(results)}명") - print(f" - D+1 리텐션: {retained_d1}명 ({retention_rate:.1f}%)") - print(f" - 평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / len(results) / 60:.1f}분") + logger.info("=" * 80) + logger.info("분석 요약") + logger.info("=" * 80) + logger.info(f"총 신규 유저: {total_users:,}명") + logger.info(f"D+1 리텐션: {retained_d1:,}명 ({retention_rate:.1f}%)") + logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}분") except KeyboardInterrupt: - print(f"\n[WARNING] 사용자에 의해 중단되었습니다.") + logger.warning("사용자에 의해 중단되었습니다.") except Exception as e: - print(f"\n[ERROR] 예상치 못한 오류: {e}") + logger.error(f"예상치 못한 오류: {e}") import traceback - traceback.print_exc() + logger.error(traceback.format_exc()) finally: - # 타이머 정지 stop_timer_event.set() - - # 총 소요 시간 end_time = time.time() total_time = str(timedelta(seconds=int(end_time - overall_start_time))) - print(f"\n[INFO] 총 소요 시간: {total_time}") - print("\n[SUCCESS] 분석 완료!") + end_kst = datetime.now(KST) + + logger.info("=" * 80) + logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}") + logger.info(f"총 소요 시간: {total_time}") + logger.info("수정된 분석 완료!") + logger.info("=" * 80) if __name__ == "__main__":