# DailyMotion/main.py
import json
import random
import traceback
from urllib.parse import quote
import argparse
import time
import uuid
import concurrent.futures
import requests
import datetime
from requests import RequestException
from DB import DBVidcon, DBSA
from dateutil import parser as date_parser
import copy
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed
from logger import logger
import os
import urllib3

# verify=False is used for the token request below; silence the resulting
# InsecureRequestWarning so the logs stay readable.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
UserAgent = [
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
    'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
    'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.2669.400 QQBrowser/9.6.10990.400',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/5.0.3.4000 Chrome/47.0.2526.73 Safari/537.36',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)',
]
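
# gettoken() below picks one of these strings at random for each refreshed
# GraphQL header set, so successive requests do not all share the same UA.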


def get_part_ids(part_num: int, take: int, offset: int = 0):
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard ids out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[hint] the next machine's offset should be: {next_offset}")
    else:
        print("[hint] current shards already cover the end, no more machines needed")
    return part_ids
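
# Illustrative example (not executed): with part_num=8, take=2, offset=4,
# get_part_ids returns [4, 5] and prints that the next machine should start
# at offset 6; offset 6 + take 2 reaches 8, so that machine would be the last.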


def clean_dash_to_zero(val):
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[field error] val = {val} {e}")
        return 0


def format_create_time(timestr):
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[time format error] {timestr} {e}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"


headers1 = {
    'Accept': '*/*',
    # 'Accept-Encoding': 'gzip, deflate, br',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    # 'Content-Length': '8512',
    'Content-Type': 'application/json',
    'Host': 'graphql.api.dailymotion.com',
    'Origin': 'https://www.dailymotion.com',
    'Referer': 'https://www.dailymotion.com/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
    'X-DM-AppInfo-Id': 'com.dailymotion.neon',
    'X-DM-AppInfo-Type': 'website',
    'X-DM-AppInfo-Version': 'v2025-05-26T13:45:05.666Z',
    'X-DM-Neon-SSR': '0',
    'X-DM-Preferred-Country': 'tw',
    'accept-language': 'zh-CN',
'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiY0c1Z1RocGRBbFIwVEVZeVhEVWNBMnNDTDFrUFFncDNRUTBNS3ciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ4NTI0MDU5LCJleHAiOjE3NDg1NjAwMDcsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.h27sfMMETgt0xKhQvFAGIpwInouNj2sFLOeb1Y74Orc',
'sec-ch-ua': '"Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'x-dm-visit-id': '1748480937099',
'x-dm-visitor-id': '1032a5f1-d07f-4bef-b96d-7783939abfc9',
}
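
# headers1 is only a template: gettoken() deep-copies it and overwrites the
# authorization, x-dm-visit-id, x-dm-visitor-id and User-Agent fields with
# fresh values before any GraphQL request is made.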

_headers_cache = None  # the most recent successfully built headers
_cache_lock = Lock()
Gproxies = None


def get_proxies(g):
    url = "https://www.kookeey.com/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": "1",
        "p": "http",
        "gate": "sea",
        "g": g,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ","
    }
    try:
        response = requests.get(url, params=params)
    except RequestException:
        # back off briefly before retrying the proxy API
        time.sleep(5)
        return get_proxies(g)
    try:
        proxy_data = response.json()['data'][0]
    except Exception:
        logger.exception(g)
        logger.exception("failed to parse the proxy API response! " + str(response.text))
        time.sleep(5)
        return get_proxies(g)
    proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
    proxies = {
        "http": proxies_url,
        "https": proxies_url,
    }
    return proxies
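
# The returned mapping has the shape requests expects for its `proxies`
# argument, e.g. {"http": "http://user:pass@ip:port", "https": ...}, so it can
# be passed straight to requests.get/post.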


def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
                    retries=5, timeout=10, backoff_factor=2, verbose=True):
    token_refreshed = False
    for attempt in range(1, retries + 1):
        try:
            proxy_str = db.get_proxy(proxy_name)
            proxies = {"http": proxy_str, "https": proxy_str}
            resp = requests.post(
                url,
                json=json_payload,
                data=data,
                headers=headers,
                proxies=proxies,
                timeout=timeout,
            )
            if resp.status_code == 401 and not token_refreshed:
                if verbose:
                    logger.info("[post_with_retry] got 401, refreshing token and retrying")
                gettoken(proxy_name)
                token_refreshed = True
                continue
            resp.raise_for_status()
            return resp
        except RequestException as e:
            if verbose:
                logger.info(f"[{attempt}/{retries}] request failed: {e}")
            # refresh the token once if it has not been refreshed yet
            if not token_refreshed:
                if verbose:
                    logger.info("[post_with_retry] refreshing token before retrying")
                gettoken(proxy_name)
                token_refreshed = True
                continue
            if attempt == retries:
                if verbose:
                    logger.info(f"[post_with_retry] giving up on {url}")
                return None
            sleep_time = backoff_factor * (2 ** (attempt - 1))
            if verbose:
                logger.info(f"[post_with_retry] waiting {sleep_time}s before retrying…")
            time.sleep(sleep_time)
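
# Back-off schedule with the defaults (backoff_factor=2): 2s, 4s, 8s, 16s
# between attempts; the 401/token-refresh path retries immediately and only
# fires once per call.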


def gettoken(proxy, r=2):
    global _headers_cache
    headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Origin': 'https://www.dailymotion.com',
        'Pragma': 'no-cache',
        'Referer': 'https://www.dailymotion.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }
    u = uuid.uuid4()
    uuid_with_dash = str(u)
    # uuid_no_dash = u.hex
    traffic_segment = str(random.randint(100_000, 999_999))
    data = {
        'client_id': 'f1a362d288c1b98099c7',
        'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
        'grant_type': 'client_credentials',
        'traffic_segment': traffic_segment,
        'visitor_id': uuid_with_dash,
    }
    try:
        proxy_str = db.get_proxy(proxy)
        url = 'https://graphql.api.dailymotion.com/oauth/token'
        response = requests.post(url, headers=headers, data=data,
                                 proxies={"http": proxy_str, "https": proxy_str},
                                 verify=False)
        token = response.json()['access_token']
        copy_headers = copy.deepcopy(headers1)
        copy_headers['authorization'] = "Bearer " + token
        copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
        copy_headers['x-dm-visitor-id'] = uuid_with_dash
        copy_headers['User-Agent'] = random.choice(UserAgent)
        with _cache_lock:
            _headers_cache = copy_headers
        return copy_headers
    except Exception as e:
        logger.exception(f"[gettoken] failed: {e}")
        if r > 0:
            time.sleep(5)
            return gettoken(proxy, r - 1)
        else:
            with _cache_lock:
                if _headers_cache:
                    logger.info("[gettoken] falling back to cached headers")
                    return copy.deepcopy(_headers_cache)
                # still nothing -> return the template (without a fresh Authorization)
                return copy.deepcopy(headers1)
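
# Every successful call refreshes the module-level _headers_cache under
# _cache_lock; if all retries fail, the most recent cached headers (or the
# bare headers1 template) are returned so callers always get a usable dict.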


def get_searchInfo(keyword, level, headers, proxy_name, r=2):
    if r == 2:
        logger.info(f"NET fetch -> {keyword},\trn -> {proxy_name},\tlevel -> {level}")
    video_list = []
    max_page = 4
    limit = 10
    if level == 0 or level == 1:
        max_page = 10
        limit = 20
    for j in range(1, max_page):
        # do not reflow this query string = = !
        data = (
'{"operationName":"SEARCH_QUERY","variables":{"query":"%s","shouldIncludeTopResults":true,"shouldIncludeChannels":false,"shouldIncludePlaylists":false,"shouldIncludeHashtags":false,"shouldIncludeVideos":false,"shouldIncludeLives":false,"page":%d,"limit":%d,"recaptchaToken":null},"query":"fragment VIDEO_BASE_FRAGMENT on Video {\\n id\\n xid\\n title\\n createdAt\\n duration\\n aspectRatio\\n thumbnail(height: PORTRAIT_240) {\\n id\\n url\\n __typename\\n }\\n creator {\\n id\\n xid\\n name\\n displayName\\n accountType\\n avatar(height: SQUARE_60) {\\n id\\n url\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment CHANNEL_BASE_FRAG on Channel {\\n id\\n xid\\n name\\n displayName\\n accountType\\n isFollowed\\n avatar(height: SQUARE_120) {\\n id\\n url\\n __typename\\n }\\n followerEngagement {\\n id\\n followDate\\n __typename\\n }\\n metrics {\\n id\\n engagement {\\n id\\n followers {\\n edges {\\n node {\\n id\\n total\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment PLAYLIST_BASE_FRAG on Collection {\\n id\\n xid\\n name\\n description\\n thumbnail(height: PORTRAIT_240) {\\n id\\n url\\n __typename\\n }\\n creator {\\n id\\n xid\\n name\\n displayName\\n accountType\\n avatar(height: SQUARE_60) {\\n id\\n url\\n __typename\\n }\\n __typename\\n }\\n metrics {\\n id\\n engagement {\\n id\\n videos(filter: {visibility: {eq: PUBLIC}}) {\\n edges {\\n node {\\n id\\n total\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment HASHTAG_BASE_FRAG on Hashtag {\\n id\\n xid\\n name\\n metrics {\\n id\\n engagement {\\n id\\n videos {\\n edges {\\n node {\\n id\\n total\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment LIVE_BASE_FRAGMENT on Live {\\n id\\n xid\\n title\\n audienceCount\\n aspectRatio\\n isOnAir\\n thumbnail(height: PORTRAIT_240) {\\n id\\n url\\n __typename\\n }\\n creator {\\n id\\n xid\\n name\\n displayName\\n accountType\\n avatar(height: SQUARE_60) {\\n id\\n url\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nquery SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {\\n search(token: $recaptchaToken) {\\n id\\n stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n ...VIDEO_BASE_FRAGMENT\\n ...CHANNEL_BASE_FRAG\\n ...PLAYLIST_BASE_FRAG\\n ...HASHTAG_BASE_FRAG\\n ...LIVE_BASE_FRAGMENT\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n videos(\\n query: $query\\n first: $limit\\n page: $page\\n sort: $sortByVideos\\n durationMin: $durationMinVideos\\n durationMax: $durationMaxVideos\\n createdAfter: $createdAfterVideos\\n ) @include(if: $shouldIncludeVideos) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...VIDEO_BASE_FRAGMENT\\n __typename\\n }\\n 
__typename\\n }\\n __typename\\n }\\n lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...LIVE_BASE_FRAGMENT\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...CHANNEL_BASE_FRAG\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...PLAYLIST_BASE_FRAG\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...HASHTAG_BASE_FRAG\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n}\\n"}' % (
keyword,
j, limit)).encode()
        response = post_with_retry(
            "https://graphql.api.dailymotion.com/",
            data=data,
            headers=headers,
            proxy_name=proxy_name
        )
        if response is None:
            return None
        jsondata = response.json()
        try:
            errors = jsondata.get("errors")  # GraphQL errors array
            stories = jsondata.get("data", {}).get("search", {}).get("stories")
            if errors or stories is None:  # an error was returned, or stories is null
                if r == 0:
                    logger.info("errors or empty results three times in a row: " + json.dumps(jsondata, ensure_ascii=False))
                    return None
                time.sleep((3 - r) * 5)
                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
            resinfo = stories["edges"]
            logger.info(f"resinfo: {len(resinfo)}")
        except Exception:
            if r < 0:
                logger.exception("[search api] unexpected, unhandled response: " + response.text)
                logger.exception("failed to parse the returned fields!")
                return None
            else:
                time.sleep((3 - r) * 5)
                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * limit
            node = iteminfo['node']
            if node['__typename'] != "Video":
                continue
            creator = node['creator']
            duration = node.get('duration')
            if not duration or duration <= 300:
                continue
            v_data = {
                "index": calculated_index,
                "v_id": node.get("id"),
                "v_xid": node.get('xid'),
                "link": "https://www.dailymotion.com/video/" + node.get('xid'),
                "title": node.get("title"),
                "createtime": node.get("createdAt"),
                "duration": node.get("duration"),
                "pic": node.get("thumbnail", {}).get("url"),
                "view": 0,
                "fans": 0,
                "videos": 0,
                "u_id": creator.get('id'),
                "u_xid": creator.get('xid'),
                "u_name": creator.get('name'),
                "u_pic": node.get('thumbnail', {}).get('url')
            }
            video_list.append(v_data)
        time.sleep(1)
    return video_list
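
# Pagination note: pages run from 1 to max_page - 1, so level 0/1 keywords
# fetch up to 9 pages of 20 results and all other levels up to 3 pages of 10,
# keeping only videos longer than 300 seconds.
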
proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
    try:
        gproxies = proxiesdict[kitem['rn']]
        header = gettoken(gproxies)
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
        if not v_list:
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
                if v_list:
                    break
                time.sleep(2)
            if not v_list:
                v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[worker thread error] {kitem['keyword']} {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure
executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)


def integrate_data_parallel():
    global proxiesdict
    while True:
        proxiesdict = db.get_proxy_agent_dict()
        tasks, flag = db.item_keyword()
        if not tasks:
            time.sleep(10)
            continue
        # rollback buckets are only filled by the (currently disabled) parallel
        # path below; kept so the batch-level rollback calls still work.
        rollback = {0: [], 1: [], 2: []}
        for payload, kitem in tasks:
            proxname = proxiesdict.get(kitem['rn'])
            print(proxname)
            h = gettoken(proxname)
            print(h)
            v_list = get_searchInfo(kitem['keyword'], kitem['level'], h, proxname)
            # futures = [
            #     executor.submit(search_worker, payload, kitem, flag)
            #     for payload, kitem in tasks
            # ]
            #
            # # collect rollbacks
            # rollback = {0: [], 1: [], 2: []}
            #
            # for fut in concurrent.futures.as_completed(futures):
            #     ok, f_flag, payload, kitem, v_list = fut.result()
            #
            #     if not ok:
            #         rollback[f_flag].append(payload)
            #         continue
            if not v_list:
                continue
            for item in v_list:
                DBSA.upsert_video({
                    "keyword": kitem["keyword"],
                    "v_name": kitem["v_name"],
                    "v_id": item["v_id"],
                    "v_xid": item["v_xid"],
                    "link": item["link"],
                    "title": item["title"],
                    "duration": format_duration(item["duration"]),
                    "fans": clean_dash_to_zero(item["fans"]),
                    "videos": clean_dash_to_zero(item["videos"]),
                    "watch_number": clean_dash_to_zero(item["view"]),
                    "create_time": format_create_time(item["createtime"]),
                    "cover_pic": item["pic"],
                    "index": item["index"],
                    "u_id": item["u_id"],
                    "u_xid": item["u_xid"],
                    "u_name": item["u_name"],
                    "u_pic": item["u_pic"],
                    "rn": kitem["rn"],
                    "batch": kitem["batch"],
                    "machine_id": MACHINE_ID,
                    "level": kitem["level"],
                })
            DBSA.flush()
        if rollback[0]:
            db.rollback_l0(rollback[0])
        if rollback[1]:
            db.rollback_l1(rollback[1])
        if rollback[2]:
            db.rollback_l2(rollback[2])
        time.sleep(10)
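
# Batch flow: pull keyword tasks from the DB, build fresh headers per region
# proxy, run the search serially (the threaded variant is commented out above),
# upsert every video row via DBSA, flush once per task, then sleep 10s before
# polling for the next batch.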


def parse_args() -> argparse.Namespace:
    global MACHINE_ID, MAX_WORKERS
    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )
    args = parser.parse_args()
    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("a machine id must be provided via -m/--machine-id")
    return args
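
# Typical invocation (machine id is required, worker count optional):
#   python main.py -m 1 -w 10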


if __name__ == '__main__':
    parse_args()
    start_time = datetime.datetime.now()
    logger.info(f"start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.datetime.now()
    duration = end_time - start_time