diff --git a/DB.py b/DB.py
index 0449979..7154462 100644
--- a/DB.py
+++ b/DB.py
@@ -11,6 +11,8 @@ from sqlalchemy import (
     BigInteger, Integer, String, Text
 )
 from sqlalchemy.dialects.mysql import insert as mysql_insert
+from logger import logger
+
 
 MYSQL_URL = (
     "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
@@ -80,11 +82,11 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
                 return fn(self, *args, **kwargs)
             except pymysql.InterfaceError as e:
                 wait = base_delay * (2 ** (attempt - 1))
-                print(f"[MySQL][{fn.__name__}] 第{attempt}次 InterfaceError:{e},等待 {wait:.1f}s 后重连…")
+                logger.info(f"[MySQL][{fn.__name__}] 第{attempt}次 InterfaceError:{e},等待 {wait:.1f}s 后重连…")
                 time.sleep(wait)
                 self._reconnect_mysql()
                 if attempt == max_retries:
-                    print("[MySQL] 重试多次仍失败,抛出异常")
+                    logger.info("[MySQL] 重试多次仍失败,抛出异常")
                     raise
 
         return wrapper
@@ -104,12 +106,12 @@ def redis_retry(max_retries: int = 3):
             try:
                 return fn(self, *args, **kwargs)
             except (ConnectionError, TimeoutError) as e:
-                print(f"[Redis][{fn.__name__}] 第 {attempt} 次失败:{e}")
+                logger.info(f"[Redis][{fn.__name__}] 第 {attempt} 次失败:{e}")
                 self.reconnect_redis()
                 if attempt == max_retries:
-                    print("[Redis] 连接彻底失败")
+                    logger.info("[Redis] 连接彻底失败")
                     raise
-                print(f"[Redis] 重连后第 {attempt + 1} 次重试…")
+                logger.info(f"[Redis] 重连后第 {attempt + 1} 次重试…")
 
         return wrapper
@@ -151,7 +153,7 @@ class DBVidcon:
         try:
             self._connect_redis()
         except Exception as e:
-            print("[Redis reconnect error]", e)
+            logger.info(f"[Redis reconnect error] {e}")
             time.sleep(2)
 
     @redis_retry(max_retries=3)
@@ -172,7 +174,7 @@ class DBVidcon:
                     break
                 raws.append(item)
         except redis.exceptions.ConnectionError as e:
-            print("[Redis pop error]", e)
+            logger.info(f"[Redis pop error] {e}")
             self.reconnect_redis()
             return []
         if not raws:
@@ -193,7 +195,7 @@ class DBVidcon:
         if isinstance(raws, str):
             raws = [raws]
         self.redis.lpush(self.l0_list_key, *raws)
-        print(f"[写入l0] 已推入 {len(raws)} 条")
+        logger.info(f"[写入l0] 已推入 {len(raws)} 条")
 
     @redis_retry(max_retries=3)
     def push_l1(self, payloads):
@@ -201,7 +203,7 @@ class DBVidcon:
         if isinstance(payloads, str):
             payloads = [payloads]
         self.redis.rpush(self.l1_list_key, *payloads)
-        print(f"[写入l1] 已推入 {len(payloads)} 条")
+        logger.info(f"[写入l1] 已推入 {len(payloads)} 条")
 
     @redis_retry(max_retries=3)
     def push_l2(self, raws):
@@ -209,7 +211,7 @@ class DBVidcon:
         if isinstance(raws, str):
             raws = [raws]
         self.redis.lpush(self.l2_list_key, *raws)
-        print(f"[写入l2] 已推入 {len(raws)} 条")
+        logger.info(f"[写入l2] 已推入 {len(raws)} 条")
 
     @mysql_retry()
     def get_proxy_agent_dict(self) -> dict:
@@ -224,7 +226,7 @@ class DBVidcon:
         sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
         self.cursor.execute(sql, (rn,))
         result = self.cursor.fetchone()
-        print(result)
+        logger.info(result)
         return result['parameter'] if result else None
 
     @redis_retry(max_retries=3)
@@ -232,7 +234,7 @@ class DBVidcon:
         try:
             items = self.fetch_from_redis(count, list_key=self.l0_list_key)
         except Exception as e:
-            print("[Redis l0 pop error]", e)
+            logger.info(f"[Redis l0 pop error] {e}")
             self.reconnect_redis()
             items = []
 
@@ -241,7 +243,7 @@ class DBVidcon:
         try:
             items = self.fetch_from_redis(count, list_key=self.l1_list_key)
         except Exception as e:
-            print("[Redis l1 pop error]", e)
+            logger.info(f"[Redis l1 pop error] {e}")
             self.reconnect_redis()
             items = []
 
@@ -250,7 +252,7 @@ class DBVidcon:
         try:
             items = self.fetch_from_redis(count, list_key=self.l2_list_key)
         except Exception as e:
-            print("[Redis l2 pop error]", e)
+            logger.info(f"[Redis l2 pop error] {e}")
             self.reconnect_redis()
             items = []
         return items, 99
@@ -260,25 +262,25 @@ class DBVidcon:
         if isinstance(payloads, str):
             payloads = [payloads]
         self.redis.rpush(self.l1_list_key, *payloads)
-        print(f"[回滚l1] 已退回 {len(payloads)} 条")
+        logger.info(f"[回滚l1] 已退回 {len(payloads)} 条")
 
     @redis_retry(max_retries=3)
     def rollback_l0(self, raws):
         if isinstance(raws, str):
             raws = [raws]
         self.redis.lpush(self.l0_list_key, *raws)
-        print(f"[回滚l0] 已退回 {len(raws)} 条")
+        logger.info(f"[回滚l0] 已退回 {len(raws)} 条")
 
     @redis_retry(max_retries=3)
     def rollback_l2(self, raws):
         if isinstance(raws, str):
             raws = [raws]
         self.redis.lpush(self.l2_list_key, *raws)
-        print(f"[回滚l2] 已退回 {len(raws)} 条")
+        logger.info(f"[回滚l2] 已退回 {len(raws)} 条")
 
     @mysql_retry()
     def upsert_video(self, data: dict):
-        print(fr"DB处理->{data.get('v_xid')},\tlevel->{data.get('level')}")
+        logger.info(f"DB处理->{data.get('v_xid')},\tlevel->{data.get('level')}")
         data.setdefault("a_id", 0)
         data.setdefault("history_status", "")
         data.setdefault("is_piracy", 3)
@@ -331,18 +333,18 @@ class DBVidcon:
                 # 回滚这次未提交的改动
                 self.conn.rollback()
-                print("[数据库写入异常]", str(e))
-                print("[出错数据]:", data)
+                logger.info(f"[数据库写入异常] {e}")
+                logger.info(f"[出错数据]: {data}")
                 if attempt < max_retries:
                     attempt += 1
-                    print(f"第 {attempt + 1} 次重试…")
+                    logger.info(f"第 {attempt + 1} 次重试…")
                     continue
                 else:
                     # 重试过后依然失败,推入 Redis 备用
-                    print("重试失败,将数据写入 Redis 以便后续处理")
+                    logger.info("重试失败,将数据写入 Redis 以便后续处理")
                     self.push_record(data)
-                    print("[交由Redis处理]")
+                    logger.info("[交由Redis处理]")
                     break
 
     @mysql_retry()
@@ -395,7 +397,7 @@ class DBVidcon:
             self.conn.commit()
             return self.cursor.lastrowid
         except Exception as e:
-            print(f"[log_batch_start] 插入失败:{e}")
+            logger.info(f"[log_batch_start] 插入失败:{e}")
             return None
 
     @mysql_retry()
@@ -418,7 +420,7 @@ class DBVidcon:
             else:
                 self.redis.connection_pool.disconnect()
         except Exception as e:
-            print("[Redis close error]", e)
+            logger.info(f"[Redis close error] {e}")
 
     @redis_retry(max_retries=3)
     def get_proxy(self, region_code: str) -> str:
@@ -481,7 +483,7 @@ class DBSA:
     _lock = threading.Lock()
 
     push_record_many = staticmethod(
-        lambda rows: print("[退回Redis] cnt=", len(rows))
+        lambda rows: logger.info(f"[退回Redis] cnt={len(rows)}")
     )
 
     @classmethod
@@ -516,7 +518,7 @@ class DBSA:
         cls._buf_vid.append(vid_row)
         cls._buf_payload.append(data)  # 保存原始
         buf_len = len(cls._buf_vid)
-        print(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")
+        logger.info(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")
 
         need_flush = False
         flush_reason = ""
@@ -528,7 +530,7 @@ class DBSA:
             flush_reason = "TIME"
 
         if need_flush:
-            print(f"DBSA 落 ({flush_reason}) ...")
+            logger.info(f"DBSA 落 ({flush_reason}) ...")
             cls.flush()
 
     @classmethod
@@ -577,20 +579,20 @@ class DBSA:
         try:
             cls._bulk_insert(op_rows)
             cls._bulk_upsert(vid_rows)
-            print(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
+            logger.info(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
         except Exception as e:
-            print(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
+            logger.info(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
             # 批量退回原始 payload,字段最全
             try:
                 cls.push_record_many(payloads)
             except Exception as re:
-                print("[Redis 回退失败]", re)
+                logger.info(f"[Redis 回退失败] {re}")
 
     @classmethod
     def update_video_stats(cls, locator:dict, stats:dict) -> int:
         """
         立即更新 sh_dm_video_v2 表中的统计字段。
         :param locator: 用于定位行的字典,必须包含: v_xid, rn
         :param stats: 需要更新的统计字段,如 {"fans": 633, "videos": 10090, "view": 1678408}
         :return: 受影响的行数
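
A note on the DB.py conversion above: print() simply joins extra positional arguments, but the logging methods treat them as %-interpolation values, so a one-to-one rewrite of print("[Redis pop error]", e) as logger.info("[Redis pop error]", e) raises "not all arguments converted during string formatting" inside the logging machinery and the record, including the exception text, is lost. That is why the multi-argument calls are formatted up front. A minimal sketch of the two safe forms:

    # print("[Redis pop error]", e)            # old two-argument print style
    logger.info("[Redis pop error] %s", e)     # lazy %-interpolation, deferred until emit
    logger.info(f"[Redis pop error] {e}")      # eager f-string, as used in this patch
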
diff --git a/logger.py b/logger.py
new file mode 100644
index 0000000..7258e70
--- /dev/null
+++ b/logger.py
@@ -0,0 +1,58 @@
+import logging
+import os
+from datetime import datetime
+from logging.handlers import TimedRotatingFileHandler
+
+
+class CustomHourlyHandler(TimedRotatingFileHandler):
+    def __init__(self, base_filename, level=logging.INFO, is_error=False):
+        # 初始化路径
+        self.is_error = is_error
+        self.base_filename = base_filename
+        self.log_dir = self._get_log_dir()
+        filename = self._build_log_path()
+
+        # 初始化 handler
+        super().__init__(
+            filename,
+            when='H',
+            interval=1,
+            backupCount=336,  # 14天 * 24小时 = 336 小时日志
+            encoding='utf-8',
+            utc=False
+        )
+        self.setLevel(level)
+        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+        self.setFormatter(formatter)
+
+    def _get_log_dir(self):
+        today = datetime.now().strftime("%Y-%m-%d")
+        log_dir = os.path.join("logs", today)
+        os.makedirs(log_dir, exist_ok=True)
+        return log_dir
+
+    def _build_log_path(self):
+        # 每次重建路径时刷新日期目录,跨天后不再写入旧日期的目录
+        self.log_dir = self._get_log_dir()
+        hour = datetime.now().strftime("%H")
+        suffix = "err.log" if self.is_error else "app.log"
+        return os.path.join(self.log_dir, f"{hour}_{suffix}")
+
+    def shouldRollover(self, record):
+        # 每小时轮换并重设路径
+        result = super().shouldRollover(record)
+        if result:
+            self.baseFilename = os.path.abspath(self._build_log_path())
+        return result
+
+
+logger = logging.getLogger("DailyMotion")
+logger.setLevel(logging.DEBUG)
+
+if not logger.handlers:
+    logger.addHandler(CustomHourlyHandler("app", level=logging.DEBUG, is_error=False))
+    logger.addHandler(CustomHourlyHandler("err", level=logging.ERROR, is_error=True))
+    ch = logging.StreamHandler()
+    ch.setLevel(logging.DEBUG)
+    ch.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
+    logger.addHandler(ch)
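
logger.py exposes one shared logger, so importing it is all a module needs. Records go to the console and to hourly files under logs/<YYYY-MM-DD>/, with ERROR and above duplicated into the hourly err file. A minimal usage sketch; the paths in the comments follow _build_log_path and assume, for illustration, a process running at 13:00 on 2024-01-01:

    from logger import logger

    logger.info("worker started")        # console + logs/2024-01-01/13_app.log
    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception("task failed")  # ERROR level: also lands in logs/2024-01-01/13_err.log

Note that every call converted in this patch logs at INFO, so the err files will only be populated by future logger.error / logger.exception calls.
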
diff --git a/main.py b/main.py
index 4beb4f6..7fe5636 100644
--- a/main.py
+++ b/main.py
@@ -14,6 +14,7 @@ from dateutil import parser as date_parser
 import copy
 from threading import Lock
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from logger import logger
 
 db = DBVidcon()
@@ -39,7 +40,7 @@ def clean_dash_to_zero(val):
     try:
         return int(val)
     except (ValueError, TypeError) as e:
-        print(f"[字段异常] val = {val} → {str(e)}")
+        logger.info(f"[字段异常] val = {val} → {str(e)}")
         return 0
 
@@ -48,7 +49,7 @@ def format_create_time(timestr):
         dt = date_parser.isoparse(timestr)
         return dt.strftime("%Y-%m-%d %H:%M:%S")
     except Exception as e:
-        print(f"[时间格式错误] {timestr} → {str(e)}")
+        logger.info(f"[时间格式错误] {timestr} → {str(e)}")
         return "1970-01-01 00:00:00"
 
@@ -114,8 +115,8 @@ def get_proxies(g):
     try:
         proxy_data = response.json()['data'][0]
     except Exception:
-        print(g)
-        print("数据返回解析错误!" + str(response.text))
+        logger.info(g)
+        logger.info("数据返回解析错误!" + str(response.text))
         time.sleep(5)
         return get_proxies(g)
     proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
@@ -145,7 +146,7 @@ def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
             )
             if resp.status_code == 401 and not token_refreshed:
                 if verbose:
-                    print("[post_with_retry] 收到 401,刷新 token 后重试")
+                    logger.info("[post_with_retry] 收到 401,刷新 token 后重试")
                 gettoken()
                 token_refreshed = True
                 continue
@@ -155,22 +156,22 @@ def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
 
         except RequestException as e:
             if verbose:
-                print(f"[{attempt}/{retries}] 请求失败: {e}")
+                logger.info(f"[{attempt}/{retries}] 请求失败: {e}")
             # 如果还没刷新过 token,就刷新一次
             if not token_refreshed:
                 if verbose:
-                    print("[post_with_retry] 刷新 token 后再试")
+                    logger.info("[post_with_retry] 刷新 token 后再试")
                 gettoken(proxy_name)
                 token_refreshed = True
                 continue
             if attempt == retries:
                 if verbose:
-                    print(f"[post_with_retry] 最终失败:{url}")
+                    logger.info(f"[post_with_retry] 最终失败:{url}")
                 return None
             sleep_time = backoff_factor * (2 ** (attempt - 1))
             if verbose:
-                print(f"[post_with_retry] 等待 {sleep_time}s 后重试…")
+                logger.info(f"[post_with_retry] 等待 {sleep_time}s 后重试…")
             time.sleep(sleep_time)
@@ -217,14 +218,14 @@ def gettoken(proxy, r=2):
                 _headers_cache = copy_headers
             return copy_headers
     except Exception as e:
-        print("[gettoken] 失败:", e)
+        logger.info(f"[gettoken] 失败:{e}")
         if r > 0:
             time.sleep(5)
             return gettoken(proxy, r - 1)
         else:
             with _cache_lock:
                 if _headers_cache:
-                    print("[gettoken] 用缓存 headers 兜底")
+                    logger.info("[gettoken] 用缓存 headers 兜底")
                     return copy.deepcopy(_headers_cache)
             # 仍然没有 → 返回模板(没有 Auth)
             return copy.deepcopy(headers1)
@@ -232,7 +233,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
     if r == 2:
-        print(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
+        logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
     video_list = []
     max_page = 2
     limit = 30
@@ -600,16 +601,16 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
         if errors or stories is None:
             # 有错误 或 stories 为 null
             if r == 0:
-                print("连续 3 次错误或空结果:", json.dumps(jsondata, ensure_ascii=False))
+                logger.info(f"连续 3 次错误或空结果:{json.dumps(jsondata, ensure_ascii=False)}")
                 return None
             time.sleep((3 - r) * 5)
             return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
         resinfo = stories["edges"]
-        print("resinfo :", len(resinfo))
+        logger.info(f"resinfo : {len(resinfo)}")
     except Exception:
         if r < 0:
-            print("[搜索接口] 未知:未处理", response.text)
-            print("返回字段解析错误!")
+            logger.info(f"[搜索接口] 未知:未处理 {response.text}")
+            logger.info("返回字段解析错误!")
             return None
         else:
             time.sleep((3 - r) * 5)
@@ -663,7 +664,7 @@ def search_worker(payload, kitem, flag):
             v_list = []
         return True, flag, payload, kitem, v_list  # 成功
     except Exception as e:
-        print(f"[线程异常] {kitem['keyword']} → {e}")
+        logger.info(f"[线程异常] {kitem['keyword']} → {e}")
         traceback.print_exc()
         return False, flag, payload, kitem, []  # 失败
@@ -762,7 +763,7 @@ def parse_args() -> argparse.Namespace:
 if __name__ == '__main__':
     parse_args()
     start_time = datetime.datetime.now()
-    print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
     integrate_data_parallel()
     end_time = datetime.datetime.now()
     duration = end_time - start_time
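
Both retry helpers touched by this patch share the same exponential backoff formula, wait = factor * (2 ** (attempt - 1)): mysql_retry applies it with base_delay, and post_with_retry with backoff_factor. As a quick check of the schedule, using mysql_retry's defaults from the DB.py hunk (max_retries=3, base_delay=2.0):

    # Waits produced by base_delay * (2 ** (attempt - 1)) for attempts 1..3
    base_delay = 2.0
    waits = [base_delay * (2 ** (attempt - 1)) for attempt in range(1, 4)]
    print(waits)  # [2.0, 4.0, 8.0]
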