"""MySQL + Redis helper for the video keyword crawling pipeline."""
import json
import time

import pymysql
import redis
class DBVidcon:
    """MySQL + Redis accessor for the video keyword pipeline.

    Wraps one pymysql connection (DictCursor) and one Redis client.
    Redis lists serve as work/rollback queues; MySQL stores video rows.
    """

    # NOTE(review): credentials are hard-coded in source — move them to
    # environment variables or a config file before wider distribution.
    _MYSQL_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1", #
        "port": 3306,  # 3307, #
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,  # rows come back as dicts
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1", #
        "port": 6379,  # 6380, #
        "password": "qwert@$123!&",
        "decode_responses": True,  # values in/out as str, not bytes
    }
def __init__(self):
|
|
self.list_key = "video_kw_queue"
|
|
self.error_list_key = "error_save_queue"
|
|
self.urgent_list_key = "video_urgent_queue"
|
|
self.conn = pymysql.connect(**self._MYSQL_CONF)
|
|
self.cursor = self.conn.cursor()
|
|
self.redis = redis.Redis(**self._REDIS_CONF)
|
|
|
|
    def _connect_redis(self):
        """Create (or re-create) the Redis client from _REDIS_CONF."""
        self.redis = redis.Redis(**self._REDIS_CONF)
def reconnect_redis(self):
|
|
"""当捕获到 ConnectionError 时,重连 Redis"""
|
|
try:
|
|
self._connect_redis()
|
|
except Exception as e:
|
|
print("[Redis reconnect error]", e)
|
|
time.sleep(2)
|
|
|
|
def push_record(self, data: dict):
|
|
raw = json.dumps(data, ensure_ascii=False)
|
|
try:
|
|
self.redis.lpush(self.record_list_key, raw)
|
|
except redis.exceptions.ConnectionError:
|
|
self.reconnect_redis()
|
|
self.redis.lpush(self.record_list_key, raw)
|
|
|
|
def fetch_records(self, count: int = 100):
|
|
try:
|
|
raws = self.redis.lpop(self.record_list_key, count)
|
|
except TypeError:
|
|
raws = []
|
|
for _ in range(count):
|
|
item = self.redis.rpop(self.record_list_key)
|
|
if item is None:
|
|
break
|
|
raws.append(item)
|
|
except redis.exceptions.ConnectionError:
|
|
self.reconnect_redis()
|
|
return self.fetch_records(count)
|
|
|
|
if not raws:
|
|
return []
|
|
if isinstance(raws, str):
|
|
raws = [raws]
|
|
|
|
out = []
|
|
for raw in raws:
|
|
try:
|
|
data = json.loads(raw)
|
|
out.append((raw, data))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return out
|
|
|
|
def fetch_from_redis(self, count: int = 100, list_key: str = None):
|
|
key = list_key
|
|
try:
|
|
raws = self.redis.lpop(key, count)
|
|
except TypeError:
|
|
raws = []
|
|
for _ in range(count):
|
|
item = self.redis.rpop(key)
|
|
if item is None:
|
|
break
|
|
raws.append(item)
|
|
except redis.exceptions.ConnectionError as e:
|
|
print("[Redis pop error]", e)
|
|
self.reconnect_redis()
|
|
return []
|
|
if not raws:
|
|
return []
|
|
if isinstance(raws, str):
|
|
raws = [raws]
|
|
out = []
|
|
for raw in raws:
|
|
try:
|
|
out.append((raw, json.loads(raw)))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return out
|
|
|
|
def get_proxy_agent_dict(self) -> dict:
|
|
sql = "SELECT rn, parameter FROM proxy_agent"
|
|
self.cursor.execute(sql)
|
|
rows = self.cursor.fetchall()
|
|
result = {row['rn']: row['parameter'] for row in rows}
|
|
return result
|
|
|
|
def get_proxy_parameter(self, rn: str) -> str:
|
|
sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
|
|
self.cursor.execute(sql, (rn,))
|
|
result = self.cursor.fetchone()
|
|
print(result)
|
|
return result['parameter'] if result else None
|
|
|
|
def rollback_records(self, raws):
|
|
if isinstance(raws, str):
|
|
raws = [raws]
|
|
self.redis.lpush(self.urgent_list_key, *raws)
|
|
|
|
def rollback_urgent(self, raws):
|
|
if isinstance(raws, str):
|
|
raws = [raws]
|
|
try:
|
|
self.redis.lpush(self.urgent_list_key, *raws)
|
|
except redis.exceptions.ConnectionError as e:
|
|
print("[Redis urgent rollback error]", e)
|
|
self.reconnect_redis()
|
|
self.redis.lpush(self.urgent_list_key, *raws)
|
|
|
|
def item_keyword(self, count: int = 100):
|
|
try:
|
|
urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key)
|
|
except Exception as e:
|
|
print("[Redis urgent pop error]", e)
|
|
self.reconnect_redis()
|
|
urgent_items = []
|
|
if urgent_items:
|
|
return urgent_items, 1
|
|
try:
|
|
items = self.fetch_from_redis(count, list_key=self.list_key)
|
|
except Exception as e:
|
|
print("[Redis normal pop error]", e)
|
|
self.reconnect_redis()
|
|
return [], 0
|
|
return items, 2
|
|
|
|
def rollback(self, payloads):
|
|
if not payloads:
|
|
return
|
|
if isinstance(payloads, str):
|
|
payloads = [payloads]
|
|
self.redis.rpush(self.list_key, *payloads)
|
|
print(f"[回滚] 已退回 {len(payloads)} 条")
|
|
|
|
    def upsert_video(self, data: dict):
        """Move/refresh one video row: log it to the op table, then replace
        the row in sh_dm_video_v2, preserving any existing is_repeat flag.

        *data* must carry the keys referenced by the SQL below (v_id,
        v_xid, rn, keyword, batch, machine_id, ...); a_id,
        history_status, is_piracy and is_repeat get defaults.  On any
        database error the transaction is rolled back and retried once;
        after the final failure the record is pushed to Redis for later
        processing.  Nothing is committed here — call ``flush()``.
        """
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # one retry beyond the first attempt
        attempt = 0

        while True:
            try:
                # 1) read the existing is_repeat flag, if the row exists
                select_repeat = """
                    SELECT is_repeat
                    FROM sh_dm_video_v2
                    WHERE rn = %(rn)s
                      AND v_xid = %(v_xid)s
                    LIMIT 1
                """
                self.cursor.execute(select_repeat, data)
                row = self.cursor.fetchone()
                if row:
                    data['is_repeat'] = row.get('is_repeat', 3)

                # 2) insert an audit record into the op table
                sql_op = """
                    INSERT INTO sh_dm_video_op_v2 (
                        v_id, v_xid, a_id, level, name_title,
                        keyword, rn, history_status, is_repeat,
                        sort, createtime, updatetime, batch, machine
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
                        %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
                        %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
                    )
                """
                self.cursor.execute(sql_op, data)

                # 3) delete the old row
                sql_del = """
                    DELETE FROM sh_dm_video_v2
                    WHERE rn = %(rn)s
                      AND v_xid = %(v_xid)s
                """
                self.cursor.execute(sql_del, data)

                # 4) insert the fresh row
                sql_ins = """
                    INSERT INTO sh_dm_video_v2 (
                        v_id, v_xid, rn, v_name, title, link,
                        is_piracy, edition, duration,
                        watch_number, follow_number, video_number,
                        public_time, cover_pic, sort,
                        u_xid, u_id, u_pic, u_name,
                        status, createtime, updatetime,
                        is_repeat
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
                        %(is_piracy)s, '', %(duration)s,
                        %(watch_number)s, %(fans)s, %(videos)s,
                        %(create_time)s, %(cover_pic)s, %(sort)s,
                        %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
                        1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(),
                        %(is_repeat)s
                    )
                """
                self.cursor.execute(sql_ins, data)
                break  # success — leave the retry loop

            except Exception as e:
                # roll back this attempt's uncommitted changes
                self.conn.rollback()
                print("[数据库写入异常]", str(e))
                print("[出错数据]:", data)

                if attempt < max_retries:
                    attempt += 1
                    print(f"第 {attempt + 1} 次重试…")
                    continue
                else:
                    # still failing after retry — park the record in Redis
                    print("重试失败,将数据写入 Redis 以便后续处理")
                    self.push_record(data)
                    print("[交由Redis处理]")
                    break
    def flush(self):
        """Commit the current MySQL transaction (call after a batch of writes)."""
        self.conn.commit()
def close(self):
|
|
self.cursor.close()
|
|
self.conn.close()
|
|
|
|
def get_proxy(self, region_code: str) -> str:
|
|
"""
|
|
从 Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回。
|
|
如果队列为空,阻塞
|
|
"""
|
|
proxy = ""
|
|
while True:
|
|
key = f"proxy_queue:{region_code}"
|
|
proxy = self.redis.lpop(key)
|
|
if proxy is None:
|
|
time.sleep(10)
|
|
else:
|
|
break
|
|
return proxy
|