feat: using Playwright

This commit is contained in:
晓丰 2025-06-01 15:01:57 +08:00
parent 30a28d66bb
commit 2c0a55232d

View File

@ -0,0 +1,133 @@
import json
import random
import time
from box import Box
from playwright.sync_api import sync_playwright
from DB import DBVidcon, DBSA
from urllib.parse import quote, quote_plus
db = DBVidcon()
kitm, flag = db.item_keyword(1)
kw = json.loads(kitm[0][0]).get('keyword')
print(kw)
parsekw = quote(kw)
url = f"https://www.dailymotion.com/search/{parsekw}/top-results"
print(url)
target_count = 200
def wait_for_search_response(page, timeout: int = 30000):
monitor_url = "https://graphql.api.dailymotion.com"
with page.expect_response(
lambda resp: (
monitor_url in resp.url
and resp.status == 200
and resp.request.post_data
and '"operationName":"SEARCH_QUERY"' in resp.request.post_data
),
timeout=timeout
) as resp_info:
pass
return resp_info.value
def human_scroll(page):
vs = page.viewport_size
# 大范围下滚:随机 5~8 次,每次滚动 视口高度的 30%~50%
# 先做几次大幅度下滚,确保触发下一页加载
for _ in range(random.randint(4, 6)):
# 随机抖动鼠标
x = random.randint(0, vs["width"])
y = random.randint(vs["height"] // 3, vs["height"])
page.mouse.move(x, y, steps=random.randint(5, 15))
page.wait_for_timeout(random.randint(50, 100))
# 向下滚动 视口高度的 40%~60%
amount = int(vs["height"] * random.uniform(0.4, 0.6))
page.mouse.wheel(0, amount)
page.wait_for_timeout(random.randint(200, 400))
# 若要“回滚”但不干扰下一次下滚触发,在下滚后隔几次才做小幅上滚
if random.random() < 0.3: # 30% 概率才做上滚
up_steps = random.randint(1, 2)
for _ in range(up_steps):
x = random.randint(0, vs["width"])
y = random.randint(vs["height"] // 3, vs["height"])
page.mouse.move(x, y, steps=random.randint(5, 15))
page.wait_for_timeout(random.randint(50, 100))
# 上滚 视口高度的 5%~10%
up_amount = int(vs["height"] * random.uniform(0.05, 0.1))
page.mouse.wheel(0, -up_amount)
page.wait_for_timeout(random.randint(150, 300))
with sync_playwright() as pw:
browser = pw.chromium.launch(
headless=False,
proxy={
"server": "http://127.0.0.1:7890"
}
)
page = browser.new_page()
def handle_route(route, request):
if request.resource_type in ["image", "font"]:
return route.abort()
return route.continue_()
page.route("**/*", handle_route)
page.goto(url)
resp = wait_for_search_response(page, timeout=20000)
box = Box(resp.json())
edges = box.data.search.stories.edges or []
all_edges = list(edges)
if len(edges) == 0:
page.reload()
page.wait_for_load_state("networkidle")
resp = wait_for_search_response(page)
box = Box(resp.json())
edges = box.data.search.stories.edges or []
all_edges = list(edges)
# 如果拿到了少于20条但大于0条则直接退出
if 0 < len(edges) < 20:
pass # all_edges 里已经是这些条目
# 如果正好是20条就进入上下滚循环直到累计 >= target_count 或拿到少于20但大于0条时退出
elif len(edges) == 20:
while True:
human_scroll(page)
time.sleep(0.5)
resp = wait_for_search_response(page)
box = Box(resp.json())
edges = box.data.search.stories.edges or []
all_edges.extend(edges)
# 累计超过 target_count就终止
if len(all_edges) >= target_count:
break
# 如果当前批次小于20条但大于0条就终止
if 0 < len(edges) < 20:
break
# 如果这一轮连一条也没拿到,就刷新页面再重试
if len(edges) == 0:
page.reload()
page.wait_for_load_state("networkidle")
resp = wait_for_search_response(page)
box = Box(resp.json())
edges = box.data.search.stories.edges or []
all_edges.extend(edges)
if edges and len(edges) < 20:
break
# 最终截断到 target_count
all_edges = all_edges[:target_count]
browser.close()