From c08c4cf4832d6f0dfe33d2529b2def37c1a81751 Mon Sep 17 00:00:00 2001 From: Gnill82 Date: Fri, 29 Aug 2025 14:26:04 +0900 Subject: [PATCH] Optimize new user cohort selection by prioritizing auth.id collection and enhancing logging details --- ds_new_user_analy.py | 129 +++++++++++++++---------------------------- 1 file changed, 45 insertions(+), 84 deletions(-) diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index cf84da3..196a9c4 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -528,19 +528,19 @@ def get_new_user_cohort_optimized( end_time: str, page_size: int = DEFAULT_COMPOSITE_SIZE ) -> Dict[str, Dict]: - """Composite Aggregation을 활용한 신규 유저 코호트 선정 + """최적화된 신규 유저 코호트 선정 (auth.id 수집 우선순위 적용) ds-logs-live-create_uid 인덱스를 사용하여 실제 계정 생성 시점 기준으로 신규 유저를 판별 + auth.id 수집 우선순위: 1) login_comp, 2) log_return_to_lobby """ logger.info("=" * 80) - logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준)") + logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준, 최적화 버전)") logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}") logger.info(f"페이지 크기: {page_size}") cohort = {} after_key = None - total_users = 0 # Step 1: create_uid 인덱스에서 분석 기간 중 생성된 신규 유저 추출 create_uid_query = { @@ -569,7 +569,7 @@ def get_new_user_cohort_optimized( } # 신규 생성된 유저 수집 - new_user_map = {} # uid -> {'auth_id': ..., 'create_time': ...} + new_user_map = {} # uid -> {'create_time': ...} page_count = 0 while True: @@ -594,13 +594,11 @@ def get_new_user_cohort_optimized( for bucket in buckets: uid = bucket["key"]["uid"] - auth_id = bucket["key"]["auth_id"] # 빈값일 수 있음 first_create_utc = bucket["first_create"]["value_as_string"] # 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리) if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]: new_user_map[uid] = { - "auth_id": None, # heartbeat에서 수집 예정 "create_time": first_create_utc } @@ -616,77 +614,19 @@ def get_new_user_cohort_optimized( logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨") - # Step 2: heartbeat 인덱스에서 auth.id 수집 + # Step 2: 모든 create_uid 유저를 cohort에 추가 if not new_user_map: logger.warning("신규 유저가 없습니다.") return cohort - logger.info("heartbeat 인덱스에서 auth.id 수집 중...") uid_list = list(new_user_map.keys()) chunk_size = 100 + total_users = 0 - for i in range(0, len(uid_list), chunk_size): - chunk_uids = uid_list[i:i+chunk_size] - - # heartbeat에서 auth.id 수집 - heartbeat_query = { - "size": 0, - "query": { - "bool": { - "filter": [ - {"terms": {"uid.keyword": chunk_uids}} - ] - } - }, - "aggs": { - "users": { - "terms": { - "field": "uid.keyword", - "size": chunk_size - }, - "aggs": { - "auth_info": { - "top_hits": { - "size": 1, - "sort": [{"@timestamp": {"order": "asc"}}], - "_source": ["auth.id"] - } - } - } - } - } - } - - try: - response = exponential_backoff_retry( - client.search, - index="ds-logs-live-heartbeat", - body=heartbeat_query, - request_timeout=DEFAULT_TIMEOUT, - track_total_hits=False - ) - - for bucket in response["aggregations"]["users"]["buckets"]: - uid = bucket["key"] - if bucket["auth_info"]["hits"]["hits"]: - auth_id = bucket["auth_info"]["hits"]["hits"][0]["_source"].get("auth", {}).get("id") - if auth_id and uid in new_user_map: - new_user_map[uid]["auth_id"] = auth_id - - except Exception as e: - logger.error(f"heartbeat에서 auth.id 수집 중 오류: {e}") - - # auth.id 수집 상태 확인 - auth_id_count = sum(1 for uid in new_user_map if new_user_map[uid]["auth_id"] is not None) - logger.info(f"auth.id 수집 완료: {auth_id_count}/{len(new_user_map)}명") - - # Step 3: 모든 create_uid 유저를 cohort에 추가하고, 추가 정보 수집 - logger.info("모든 신규 유저를 cohort에 추가하고 추가 정보 수집 중...") - - # 모든 create_uid 유저를 cohort에 먼저 추가 + # 모든 create_uid 유저를 cohort에 먼저 추가 (auth_id는 N/A로 초기화) for uid in uid_list: cohort[uid] = { - 'auth_id': new_user_map[uid]["auth_id"] or 'N/A', + 'auth_id': 'N/A', 'create_time_utc': new_user_map[uid]["create_time"], 'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]), 'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')), @@ -696,8 +636,10 @@ def get_new_user_cohort_optimized( } total_users += 1 - # login_comp 인덱스에서 추가 정보 수집 (1차 우선) - logger.info("login_comp 인덱스에서 추가 정보 수집 중...") + logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료") + + # Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위) + logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...") login_comp_collected = set() for i in range(0, len(uid_list), chunk_size): @@ -723,14 +665,14 @@ def get_new_user_cohort_optimized( "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "asc"}}], - "_source": ["body.device_mod", "body.nickname"] + "_source": ["body.device_mod", "body.nickname", "auth.id"] } }, "latest_info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "desc"}}], - "_source": ["body.nickname", "body.language"] + "_source": ["body.nickname", "body.language", "auth.id"] } } } @@ -753,23 +695,29 @@ def get_new_user_cohort_optimized( user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {} latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {} - # 기존 cohort 정보 업데이트 + # cohort 정보 업데이트 (auth.id 1순위 수집) if uid in cohort: cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A') cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A') cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') + + # auth.id 수집 (1순위) + auth_id = latest_info_hit.get('auth', {}).get('id') or user_hit.get('auth', {}).get('id') + if auth_id: + cohort[uid]['auth_id'] = auth_id + login_comp_collected.add(uid) except Exception as e: logger.error(f"login_comp 정보 수집 중 오류: {e}") - logger.info(f"login_comp에서 {len(login_comp_collected)}명의 추가 정보 수집 완료") + logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료") - # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (nickname만) - missing_uids = [uid for uid in uid_list if uid not in login_comp_collected and cohort[uid]['nickname'] == 'N/A'] + # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위) + missing_uids = [uid for uid in uid_list if uid not in login_comp_collected] if missing_uids: - logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중...") + logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...") lobby_collected = set() for i in range(0, len(missing_uids), chunk_size): @@ -791,11 +739,11 @@ def get_new_user_cohort_optimized( "size": chunk_size }, "aggs": { - "nickname_info": { + "info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "desc"}}], - "_source": ["body.nickname"] + "_source": ["body.nickname", "auth.id"] } } } @@ -815,17 +763,30 @@ def get_new_user_cohort_optimized( for bucket in response["aggregations"]["users"]["buckets"]: uid = bucket["key"] - nickname_hit = bucket["nickname_info"]["hits"]["hits"][0]["_source"] if bucket["nickname_info"]["hits"]["hits"] else {} + info_hit = bucket["info"]["hits"]["hits"][0]["_source"] if bucket["info"]["hits"]["hits"] else {} - # nickname만 업데이트 (다른 정보는 login_comp가 우선) - if uid in cohort and cohort[uid]['nickname'] == 'N/A': - cohort[uid]['nickname'] = nickname_hit.get('body', {}).get('nickname', 'N/A') + # 차선 정보 업데이트 (login_comp에서 수집되지 않은 유저만) + if uid in cohort: + # nickname 업데이트 (없는 경우에만) + if cohort[uid]['nickname'] == 'N/A': + cohort[uid]['nickname'] = info_hit.get('body', {}).get('nickname', 'N/A') + + # auth.id 수집 (2순위, 없는 경우에만) + if cohort[uid]['auth_id'] == 'N/A': + auth_id = info_hit.get('auth', {}).get('id') + if auth_id: + cohort[uid]['auth_id'] = auth_id + lobby_collected.add(uid) except Exception as e: logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}") - logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 닉네임 수집 완료") + logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료") + + # 최종 통계 + auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A') + logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}명") logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)") logger.info("=" * 80)