# DailyMotion/DB.py
#
# 256 lines
# 8.5 KiB
# Python

import json
import redis
import pymysql
import time
class DBVidcon:
    """MySQL + Redis access layer used by the video keyword crawler.

    The instance owns one MySQL connection/cursor (rows are returned as
    dicts because ``_MYSQL_CONF`` selects ``DictCursor``) and one Redis
    client. Redis holds three work lists: the normal keyword queue
    (``list_key``), the urgent keyword queue (``urgent_list_key``) and the
    queue of records whose MySQL write failed (``record_list_key``).

    MySQL writes made by :meth:`upsert_video` are not committed until
    :meth:`flush` is called.
    """

    # NOTE(review): credentials are hard-coded in source control. They
    # should be rotated and loaded from the environment or a secret store.
    _MYSQL_CONF = {
        "host": "192.144.230.75",
        "port": 3306,
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",
        "port": 6379,
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    # Upper bound on pop attempts in fetch_records() before the
    # ConnectionError is re-raised to the caller.
    _FETCH_MAX_ATTEMPTS = 3

    # Looks up the is_repeat flag of an already stored video.
    _SQL_SELECT_REPEAT = """
        SELECT is_repeat
        FROM sh_dm_video_v2
        WHERE rn = %(rn)s
        AND v_xid = %(v_xid)s
        LIMIT 1
    """
    # Appends one row to the operation table.
    _SQL_INSERT_OP = """
        INSERT INTO sh_dm_video_op_v2 (
        v_id, v_xid, a_id, level, name_title,
        keyword, rn, history_status, is_repeat,
        sort, createtime, updatetime, batch, machine
        ) VALUES (
        %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
        %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
        %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
        )
    """
    # Removes the previous row of the video from the main table.
    _SQL_DELETE_VIDEO = """
        DELETE FROM sh_dm_video_v2
        WHERE rn = %(rn)s
        AND v_xid = %(v_xid)s
    """
    # Inserts the fresh row of the video into the main table.
    _SQL_INSERT_VIDEO = """
        INSERT INTO sh_dm_video_v2 (
        v_id, v_xid, rn, v_name, title, link,
        is_piracy, edition, duration,
        watch_number, follow_number, video_number,
        public_time, cover_pic, sort,
        u_xid, u_id, u_pic, u_name,
        status, createtime, updatetime,
        is_repeat
        ) VALUES (
        %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
        %(is_piracy)s, '', %(duration)s,
        %(watch_number)s, %(fans)s, %(videos)s,
        %(create_time)s, %(cover_pic)s, %(sort)s,
        %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
        1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(),
        %(is_repeat)s
        )
    """

    def __init__(self):
        """Open the MySQL connection and create the Redis client."""
        self.list_key = "video_kw_queue"
        self.record_list_key = "error_save_queue"
        self.urgent_list_key = "video_urgent_queue"
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self._connect_redis()

    # ------------------------------------------------------------------
    # Redis connection handling
    # ------------------------------------------------------------------
    def _connect_redis(self):
        """Create (or re-create) the Redis client from ``_REDIS_CONF``."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Rebuild the Redis client after a ``ConnectionError``.

        A failure while rebuilding is printed and followed by a 2 second
        pause; it is not re-raised.
        """
        try:
            self._connect_redis()
        except Exception as e:
            print("[Redis reconnect error]", e)
            time.sleep(2)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _pop_batch(self, key, count):
        """Pop up to ``count`` raw items from the Redis list ``key``.

        Tries ``lpop(key, count)`` first; if the client raises
        ``TypeError`` for that call, items are popped one at a time.
        ``ConnectionError`` is left for the caller to handle.
        """
        try:
            return self.redis.lpop(key, count)
        except TypeError:
            # NOTE(review): this fallback drains the list with rpop, i.e.
            # from the opposite end to the lpop used above, so queue order
            # depends on which path runs. Kept as in the original; confirm
            # which end is intended.
            raws = []
            for _ in range(count):
                item = self.redis.rpop(key)
                if item is None:
                    break
                raws.append(item)
            return raws

    @staticmethod
    def _decode_batch(raws):
        """Turn popped payloads into a list of ``(raw, parsed_json)`` tuples.

        Accepts ``None``, a single string or a list of strings. Payloads
        that are not valid JSON are reported and skipped.
        """
        if not raws:
            return []
        if isinstance(raws, str):
            raws = [raws]
        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                # Report instead of dropping silently: the item has already
                # been removed from Redis at this point.
                print("[Redis bad JSON skipped]", raw)
        return out

    # ------------------------------------------------------------------
    # Failed-record queue
    # ------------------------------------------------------------------
    def push_record(self, data: dict):
        """Serialize ``data`` as JSON and push it onto the failed-record queue.

        On ``ConnectionError`` the client is rebuilt and the push is retried
        once; a second failure propagates.
        """
        raw = json.dumps(data, ensure_ascii=False)
        try:
            self.redis.lpush(self.record_list_key, raw)
        except redis.exceptions.ConnectionError:
            self.reconnect_redis()
            self.redis.lpush(self.record_list_key, raw)

    def fetch_records(self, count: int = 100):
        """Pop up to ``count`` items from the failed-record queue.

        Returns:
            A list of ``(raw, data)`` tuples, possibly empty.

        Raises:
            redis.exceptions.ConnectionError: if Redis is still unreachable
                after ``_FETCH_MAX_ATTEMPTS`` attempts.
        """
        # Bounded loop instead of unbounded recursion, which would end in a
        # RecursionError while Redis stays down.
        for attempt in range(self._FETCH_MAX_ATTEMPTS):
            try:
                raws = self._pop_batch(self.record_list_key, count)
            except redis.exceptions.ConnectionError:
                if attempt == self._FETCH_MAX_ATTEMPTS - 1:
                    raise
                self.reconnect_redis()
            else:
                return self._decode_batch(raws)
        return []

    def rollback_records(self, raws):
        """Push raw payload(s) back onto the failed-record queue.

        ``raws`` may be a single string or a list of strings; an empty value
        is a no-op. On ``ConnectionError`` the push is retried once after
        rebuilding the client.
        """
        if not raws:
            # LPUSH needs at least one value.
            return
        if isinstance(raws, str):
            raws = [raws]
        try:
            self.redis.lpush(self.record_list_key, *raws)
        except redis.exceptions.ConnectionError:
            self.reconnect_redis()
            self.redis.lpush(self.record_list_key, *raws)

    # ------------------------------------------------------------------
    # Keyword queues
    # ------------------------------------------------------------------
    def fetch_from_redis(self, count: int = 100, list_key: str = None):
        """Pop up to ``count`` items from ``list_key``.

        Args:
            count: maximum number of items to pop.
            list_key: Redis list to read; defaults to the normal keyword
                queue (``self.list_key``) when omitted.

        Returns:
            A list of ``(raw, data)`` tuples. An empty list is returned when
            the queue is empty or when Redis raised ``ConnectionError``.
        """
        key = self.list_key if list_key is None else list_key
        try:
            raws = self._pop_batch(key, count)
        except redis.exceptions.ConnectionError as e:
            print("[Redis pop error]", e)
            self.reconnect_redis()
            return []
        return self._decode_batch(raws)

    def rollback_urgent(self, raws):
        """Push raw payload(s) back onto the urgent keyword queue.

        ``raws`` may be a single string or a list of strings; an empty value
        is a no-op. On ``ConnectionError`` the push is retried once after
        rebuilding the client.
        """
        if not raws:
            # LPUSH needs at least one value.
            return
        if isinstance(raws, str):
            raws = [raws]
        try:
            self.redis.lpush(self.urgent_list_key, *raws)
        except redis.exceptions.ConnectionError as e:
            print("[Redis urgent rollback error]", e)
            self.reconnect_redis()
            self.redis.lpush(self.urgent_list_key, *raws)

    def item_keyword(self, count: int = 100):
        """Fetch the next batch of keywords, urgent queue first.

        Returns:
            ``(items, flag)`` where ``flag`` is ``1`` when the items came
            from the urgent queue, ``2`` when they came from the normal
            queue (``items`` may be empty), and ``0`` when reading the
            normal queue raised an exception.
        """
        try:
            urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key)
        except Exception as e:
            print("[Redis urgent pop error]", e)
            self.reconnect_redis()
            urgent_items = []
        if urgent_items:
            return urgent_items, 1

        try:
            items = self.fetch_from_redis(count, list_key=self.list_key)
        except Exception as e:
            print("[Redis normal pop error]", e)
            self.reconnect_redis()
            return [], 0
        return items, 2

    def rollback(self, payloads):
        """Push raw payload(s) back onto the tail of the normal keyword queue.

        ``payloads`` may be a single string or a list of strings; an empty
        value is a no-op. On ``ConnectionError`` the push is retried once
        after rebuilding the client.
        """
        if not payloads:
            return
        if isinstance(payloads, str):
            payloads = [payloads]
        try:
            self.redis.rpush(self.list_key, *payloads)
        except redis.exceptions.ConnectionError:
            self.reconnect_redis()
            self.redis.rpush(self.list_key, *payloads)
        print(f"[回滚] 已退回 {len(payloads)}")

    # ------------------------------------------------------------------
    # MySQL
    # ------------------------------------------------------------------
    def upsert_video(self, data: dict):
        """Write one video to the operation table and replace its main row.

        ``data`` is mutated: defaults are filled in for ``a_id``,
        ``history_status`` and ``is_repeat``, ``sort`` is set from
        ``index``, and ``is_repeat`` is overwritten with the stored value
        when the video already exists.

        Nothing is committed here; call :meth:`flush` afterwards. If the
        statements fail, the transaction is rolled back and the write is
        retried once; if it fails again ``data`` is pushed to the Redis
        failed-record queue instead.
        """
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # one retry in addition to the first attempt
        for attempt in range(max_retries + 1):
            try:
                # 1) Reuse the stored is_repeat flag if the video exists.
                self.cursor.execute(self._SQL_SELECT_REPEAT, data)
                row = self.cursor.fetchone()
                if row:
                    # The connection uses DictCursor, so rows are dicts and
                    # positional access (row[0]) would raise KeyError.
                    # Tuple rows are still accepted for other cursor types.
                    if isinstance(row, dict):
                        data["is_repeat"] = row["is_repeat"]
                    else:
                        data["is_repeat"] = row[0]
                # 2) Insert into the operation table.
                self.cursor.execute(self._SQL_INSERT_OP, data)
                # 3) Delete the old row from the main table.
                self.cursor.execute(self._SQL_DELETE_VIDEO, data)
                # 4) Insert the new row into the main table.
                self.cursor.execute(self._SQL_INSERT_VIDEO, data)
                return  # success
            except Exception as e:
                # Undo the uncommitted changes of this attempt.
                # NOTE(review): rollback() is connection-wide, so rows from
                # earlier upsert_video() calls that were not flushed yet
                # appear to be discarded as well - confirm with callers.
                try:
                    self.conn.rollback()
                except Exception as rollback_err:
                    # Do not let a failed rollback (e.g. a dead connection)
                    # prevent the Redis fallback below.
                    print("[数据库回滚异常]", str(rollback_err))
                print("[数据库写入异常]", str(e))
                print("[出错数据]:", data)
                if attempt < max_retries:
                    print(f"{attempt + 2} 次重试…")

        # Still failing after the retry: hand the record over to Redis.
        print("重试失败,将数据写入 Redis 以便后续处理")
        self.push_record(data)
        print("[交由Redis处理]")

    def flush(self):
        """Commit the pending MySQL transaction after a batch of writes."""
        self.conn.commit()

    def close(self):
        """Close the MySQL cursor and connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.conn.close()

    # ------------------------------------------------------------------
    # Proxies
    # ------------------------------------------------------------------
    def get_proxy(self, region_code: str) -> str:
        """Pop one proxy from the Redis list ``proxy_queue:<region_code>``.

        Blocks while the list is empty, polling every 10 seconds.
        """
        key = f"proxy_queue:{region_code}"
        while True:
            proxy = self.redis.lpop(key)
            if proxy is not None:
                return proxy
            time.sleep(10)