diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index 921f434..d8c0b6b 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -594,13 +594,13 @@ def get_new_user_cohort_optimized( for bucket in buckets: uid = bucket["key"]["uid"] - auth_id = bucket["key"]["auth_id"] + auth_id = bucket["key"]["auth_id"] # 빈값일 수 있음 first_create_utc = bucket["first_create"]["value_as_string"] # 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리) if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]: new_user_map[uid] = { - "auth_id": auth_id, + "auth_id": None, # heartbeat에서 수집 예정 "create_time": first_create_utc } @@ -616,15 +616,74 @@ def get_new_user_cohort_optimized( logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨") - # Step 2: login_comp 인덱스에서 해당 유저들의 추가 정보 수집 + # Step 2: heartbeat 인덱스에서 auth.id 수집 if not new_user_map: logger.warning("신규 유저가 없습니다.") return cohort - # 유저 청크 단위로 처리 + logger.info("heartbeat 인덱스에서 auth.id 수집 중...") uid_list = list(new_user_map.keys()) chunk_size = 100 + for i in range(0, len(uid_list), chunk_size): + chunk_uids = uid_list[i:i+chunk_size] + + # heartbeat에서 auth.id 수집 + heartbeat_query = { + "size": 0, + "query": { + "bool": { + "filter": [ + {"terms": {"uid.keyword": chunk_uids}} + ] + } + }, + "aggs": { + "users": { + "terms": { + "field": "uid.keyword", + "size": chunk_size + }, + "aggs": { + "auth_info": { + "top_hits": { + "size": 1, + "sort": [{"@timestamp": {"order": "asc"}}], + "_source": ["auth.id"] + } + } + } + } + } + } + + try: + response = exponential_backoff_retry( + client.search, + index="ds-logs-live-heartbeat", + body=heartbeat_query, + request_timeout=DEFAULT_TIMEOUT, + track_total_hits=False + ) + + for bucket in response["aggregations"]["users"]["buckets"]: + uid = bucket["key"] + if bucket["auth_info"]["hits"]["hits"]: + auth_id = bucket["auth_info"]["hits"]["hits"][0]["_source"].get("auth", {}).get("id") + if auth_id and uid in new_user_map: + new_user_map[uid]["auth_id"] = auth_id + + except Exception as e: + logger.error(f"heartbeat에서 auth.id 수집 중 오류: {e}") + + # auth.id 수집 상태 확인 + auth_id_count = sum(1 for uid in new_user_map if new_user_map[uid]["auth_id"] is not None) + logger.info(f"auth.id 수집 완료: {auth_id_count}/{len(new_user_map)}명") + + # Step 3: login_comp 인덱스에서 추가 정보 수집 + logger.info("login_comp 인덱스에서 추가 정보 수집 중...") + # 유저 청크 단위로 처리 + for i in range(0, len(uid_list), chunk_size): chunk_uids = uid_list[i:i+chunk_size] @@ -680,14 +739,13 @@ def get_new_user_cohort_optimized( user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {} latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {} - # create_uid 정보와 병합 + # create_uid 정보와 병합 (first_login 제거, create_time으로 통합) + # auth_id는 heartbeat에서 수집되었거나 N/A cohort[uid] = { - 'auth_id': new_user_map[uid]["auth_id"], + 'auth_id': new_user_map[uid]["auth_id"] or 'N/A', 'create_time_utc': new_user_map[uid]["create_time"], 'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]), - 'first_login_utc': first_login_utc, - 'first_login_kst': format_kst_time(first_login_utc), - 'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')), + 'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')), 'language': latest_info_hit.get('body', {}).get('language', 'N/A'), 'device': user_hit.get('body', {}).get('device_mod', 'N/A'), 'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') @@ -717,13 +775,13 @@ def build_fixed_msearch_queries( for uid in uids: user_data = cohort[uid] - first_login_dt = user_data['first_login_dt'] + create_time_dt = user_data['create_time_dt'] - # 시간 범위 정의 - d0_start = user_data['first_login_utc'] - d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') + # 시간 범위 정의 (create_time 기준) + d0_start = user_data['create_time_utc'] + d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') d1_start = d0_end - d1_end = (first_login_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ') + d1_end = (create_time_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ') for metric_name, config in metrics_config.items(): # 시간 범위 선택 @@ -818,8 +876,8 @@ def calculate_comprehensive_session_metrics( if not user_info: return - first_login_dt = user_info['first_login_dt'] - d0_end_dt = first_login_dt + timedelta(hours=24) + create_time_dt = user_info['create_time_dt'] + d0_end_dt = create_time_dt + timedelta(hours=24) query = { "query": { @@ -827,7 +885,7 @@ def calculate_comprehensive_session_metrics( "filter": [ {"term": {"uid.keyword": uid}}, {"range": {"@timestamp": { - "gte": user_info['first_login_utc'], + "gte": user_info['create_time_utc'], "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ') }}} ] @@ -846,7 +904,7 @@ def calculate_comprehensive_session_metrics( source = doc['_source'] event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00')) - if first_login_dt <= event_dt < d0_end_dt: + if create_time_dt <= event_dt < d0_end_dt: yield { "time": event_dt, "type": source.get('type', '').lower() @@ -937,15 +995,26 @@ def process_fixed_batch( client: OpenSearch, batch_uids: List[str], cohort: Dict[str, Dict], - metrics_config: Dict[str, Dict] + metrics_config: Dict[str, Dict], + include_session_metrics: bool = False ) -> List[Dict]: """수정된 배치 처리 함수""" logger.info(f"배치 처리 시작: {len(batch_uids)}명") try: - # 1. 세션 지표 계산 - session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort) + # 1. 세션 지표 계산 (--full 옵션일 때만) + if include_session_metrics: + session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort) + else: + # 기본값으로 빈 딕셔너리 생성 + session_metrics = {uid: { + 'active_seconds': 0, + 'total_playtime_minutes': 0, + 'session_count': 0, + 'avg_session_length': 0, + 'logout_abnormal': 0 + } for uid in batch_uids} # 2. 수정된 msearch 실행 msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config) @@ -969,14 +1038,13 @@ def process_fixed_batch( user_session_metrics = session_metrics.get(uid, {}) user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user] - # 기본 정보 (create_time 추가) + # 기본 정보 (first_login_time 제거, create_time으로 통합) result = { 'uid': uid, - 'auth_id': user_data['auth_id'], + 'auth_id': user_data.get('auth_id', 'N/A'), # auth_id 기본값 처리 'nickname': user_data['nickname'], - 'create_time': user_data.get('create_time_kst', user_data.get('first_login_kst')), # create_time이 있으면 사용, 없으면 first_login 사용 - 'first_login_time': user_data['first_login_kst'], - 'retention_status': 'Retained_d0', + 'create_time': user_data.get('create_time_kst', 'N/A'), + 'retention_status': 'Retained_d0', # 기본값, 나중에 업데이트 'language': user_data['language'], 'device': user_data['device'], 'active_seconds': user_session_metrics.get('active_seconds', 0), @@ -1109,10 +1177,10 @@ def process_fixed_batch( # 게임당 평균 데미지 계산을 위해 직접 쿼리 (제거된 필드들 대신) dungeon_count = result['dungeon_entry_count'] try: - # 시간 범위 가져오기 - first_login_dt = user_data['first_login_dt'] - d0_start = user_data['first_login_utc'] - d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') + # 시간 범위 가져오기 (create_time 기준) + create_time_dt = user_data['create_time_dt'] + d0_start = user_data['create_time_utc'] + d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') # survival_end 인덱스에서 직접 데미지 합계 조회 damage_query = { @@ -1174,7 +1242,8 @@ def process_cohort_fixed_parallel( cohort: Dict[str, Dict], batch_size: int, max_workers: int, - metrics_config: Dict[str, Dict] + metrics_config: Dict[str, Dict], + include_session_metrics: bool = False ) -> List[Dict]: """수정된 병렬 처리""" @@ -1183,6 +1252,7 @@ def process_cohort_fixed_parallel( logger.info(f"총 사용자: {len(cohort)}명") logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}") logger.info(f"분석 지표: {len(metrics_config)}개") + logger.info(f"세션 지표 포함: {'예' if include_session_metrics else '아니오'}") uid_list = list(cohort.keys()) chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)] @@ -1192,7 +1262,7 @@ def process_cohort_fixed_parallel( with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_chunk = { - executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config): chunk + executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config, include_session_metrics): chunk for chunk in chunks } @@ -1216,7 +1286,7 @@ def process_cohort_fixed_parallel( logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...") for chunk in failed_chunks: try: - batch_results = process_fixed_batch(client, chunk, cohort, metrics_config) + batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics) all_results.extend(batch_results) except Exception as e: logger.error(f"재처리 실패: {e}") @@ -1236,9 +1306,9 @@ def write_fixed_results(results: List[Dict], output_path: str) -> None: logger.error("저장할 결과 데이터가 없습니다.") return - # 수정된 헤더 (create_time 추가) + # 수정된 헤더 (first_login_time 제거, create_time만 사용) headers = [ - 'uid', 'auth_id', 'nickname', 'create_time', 'first_login_time', 'retention_status', 'language', 'device', + 'uid', 'auth_id', 'nickname', 'create_time', 'retention_status', 'language', 'device', 'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal', 'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result', 'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time', @@ -1286,6 +1356,7 @@ def parse_arguments() -> argparse.Namespace: parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기') parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수') parser.add_argument('--sample-size', type=int, help='샘플 분석 크기') + parser.add_argument('--full', action='store_true', help='세션 관련 지표 포함한 전체 분석') return parser.parse_args() @@ -1303,8 +1374,8 @@ def main(): logger = setup_logging(str(log_file_path)) logger.info("=" * 80) - logger.info("던전 스토커즈 신규 유저 분석 v4.0 (OpenSearch 매핑 수정)") - logger.info("nested 쿼리 오류 수정 + retention_d1 필드 제거") + logger.info("던전 스토커즈 신규 유저 분석 v5.0") + logger.info("create_uid 기반 신규 유저 판별 + 세션 지표 조건부 수집") logger.info("=" * 80) args = parse_arguments() @@ -1318,6 +1389,7 @@ def main(): logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}") logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}") + logger.info(f"세션 지표 포함: {'예 (--full 옵션 사용)' if args.full else '아니오 (빠른 분석 모드)'}") except Exception as e: logger.error(f"시간 형식 오류: {e}") @@ -1344,7 +1416,7 @@ def main(): logger.info(f"샘플링 모드: {args.sample_size}명만 분석") results = process_cohort_fixed_parallel( - client, cohort, args.batch_size, args.max_workers, metrics_config + client, cohort, args.batch_size, args.max_workers, metrics_config, args.full ) write_fixed_results(results, str(csv_file_path))