305 lines
11 KiB
Python
305 lines
11 KiB
Python
import time
|
||
import functools
|
||
import os
|
||
import re
|
||
from datetime import datetime
|
||
|
||
import requests
|
||
from playwright.sync_api import (
|
||
sync_playwright,
|
||
TimeoutError as PlaywrightTimeoutError,
|
||
Page,
|
||
Browser,
|
||
)
|
||
|
||
|
||
def solve_turnstile_capsolver(page: Page,
|
||
timeout: int = 120) -> bool:
|
||
"""
|
||
使用 CapSolver 自动完成当前 Page 上的 Cloudflare Turnstile。
|
||
成功返回 True,失败/超时返回 False。
|
||
"""
|
||
cap_key = "CAP-A76C932D4C6CCB3CA748F77FDC07D996"
|
||
widget = page.query_selector("div.cf-turnstile[data-sitekey]")
|
||
if not widget:
|
||
return False
|
||
sitekey = widget.get_attribute("data-sitekey")
|
||
page_url = page.url
|
||
|
||
create_payload = {
|
||
"clientKey": cap_key,
|
||
"task": {
|
||
"type": "TurnstileTaskProxyLess",
|
||
"websiteURL": page_url,
|
||
"websiteKey": sitekey
|
||
}
|
||
}
|
||
create_resp = requests.post(
|
||
"https://api.capsolver.com/createTask",
|
||
json=create_payload, timeout=20
|
||
).json()
|
||
if create_resp.get("errorId"):
|
||
print("[CapSolver] createTask 失败:", create_resp)
|
||
return False
|
||
task_id = create_resp["taskId"]
|
||
|
||
poll_payload = {"clientKey": cap_key, "taskId": task_id}
|
||
token = None
|
||
elapsed, step = 0, 3
|
||
while elapsed < timeout:
|
||
time.sleep(step)
|
||
elapsed += step
|
||
res = requests.post(
|
||
"https://api.capsolver.com/getTaskResult",
|
||
json=poll_payload, timeout=15
|
||
).json()
|
||
if res.get("status") == "ready":
|
||
token = res["solution"]["token"]
|
||
break
|
||
if res.get("status") != "processing":
|
||
print("[CapSolver] getTaskResult 异常:", res)
|
||
return False
|
||
|
||
if not token:
|
||
print("[CapSolver] 超时未取到 token")
|
||
return False
|
||
|
||
page.evaluate(
|
||
"""(tk) => {
|
||
const ta = document.querySelector('textarea[name="cf-turnstile-response"]');
|
||
if (ta) ta.value = tk;
|
||
if (window.turnstileCallback)
|
||
try { window.turnstileCallback(tk); } catch(e){}
|
||
}""",
|
||
token
|
||
)
|
||
page.wait_for_timeout(1500)
|
||
return True
|
||
|
||
|
||
def require_login(func):
|
||
@functools.wraps(func)
|
||
def wrapper(self, *args, **kwargs):
|
||
self.ensure_login()
|
||
return func(self, *args, **kwargs)
|
||
|
||
return wrapper
|
||
|
||
|
||
class DailymotionClient:
|
||
url = "https://faq.dailymotion.com/hc/en-us/requests/new"
|
||
EMAIL = "copyright@qiyi.com"
|
||
PASSWORD = "ppsIQIYI2018@"
|
||
|
||
def __init__(self, headless: bool = False):
|
||
self.email = self.EMAIL
|
||
self.password = self.PASSWORD
|
||
self.headless = headless
|
||
self.check_interval = 60 * 60
|
||
|
||
self._pw = sync_playwright().start()
|
||
self.browser: Browser = self._pw.chromium.launch(
|
||
headless=self.headless,
|
||
proxy={'server': 'http://127.0.0.1:7890'}
|
||
)
|
||
self.context = self.browser.new_context(
|
||
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||
"Chrome/122.0.0.0 Safari/537.36",
|
||
locale="en-US",
|
||
viewport={"width": 1280, "height": 800},
|
||
timezone_id="Asia/Shanghai",
|
||
permissions=[],
|
||
)
|
||
self.context.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||
self.page: Page = self.context.new_page()
|
||
|
||
self._last_check_ts = 0
|
||
self._last_check_result = False
|
||
os.makedirs('screenshots', exist_ok=True)
|
||
self.page.goto(self.url)
|
||
|
||
def _do_login(self) -> None:
|
||
self.page.goto(self.url, timeout=30000)
|
||
self.page.wait_for_load_state("networkidle", timeout=30000)
|
||
|
||
file_path = f'screenshots/{str(int(time.time()))}.png'
|
||
self.page.screenshot(path=file_path)
|
||
|
||
if self.page.query_selector("div.cf-turnstile[data-sitekey]"):
|
||
ok = solve_turnstile_capsolver(self.page)
|
||
if not ok:
|
||
raise RuntimeError("CapSolver 处理 Turnstile 失败")
|
||
|
||
logbtn = self.page.locator("//a[@class='login button']")
|
||
if logbtn.count() > 0:
|
||
logbtn.nth(0).click()
|
||
|
||
self.page.wait_for_selector("//input[@data-testid=\"emailInput\"]")
|
||
|
||
# “我了解”弹窗
|
||
i_now_btn = self.page.locator("button:has-text(\"I understand\")")
|
||
if i_now_btn.count() > 0:
|
||
i_now_btn.click()
|
||
|
||
# 输入账号密码
|
||
email_edit = self.page.locator("//input[@data-testid=\"emailInput\"]")
|
||
password_edit = self.page.locator("//input[@data-testid=\"passwordInput\"]")
|
||
if email_edit.count():
|
||
email_edit.fill(self.email)
|
||
if password_edit.count():
|
||
password_edit.fill(self.password)
|
||
|
||
# 登录
|
||
login_btn = self.page.locator('button[form="signin-form"][type="submit"]')
|
||
try:
|
||
self.page.wait_for_selector(
|
||
'button[form="signin-form"][type="submit"]:not([disabled])', timeout=20000
|
||
)
|
||
except PlaywrightTimeoutError:
|
||
pass
|
||
login_btn.click()
|
||
|
||
# 等待跳回
|
||
self.page.wait_for_url(self.url, timeout=30000)
|
||
time.sleep(1)
|
||
self._last_check_ts = time.time()
|
||
self._last_check_result = True
|
||
|
||
def _detect_login(self) -> bool:
|
||
self.page.goto(self.url, timeout=30000)
|
||
self.page.wait_for_load_state("networkidle", timeout=30000)
|
||
return self.page.locator("//a[@class='login button']").count() == 0
|
||
|
||
def is_logged_in(self) -> bool:
|
||
now = time.time()
|
||
if now - self._last_check_ts < self.check_interval:
|
||
return self._last_check_result
|
||
|
||
try:
|
||
ok = self._detect_login()
|
||
except Exception:
|
||
ok = False
|
||
|
||
self._last_check_ts = now
|
||
self._last_check_result = ok
|
||
return ok
|
||
|
||
def ensure_login(self) -> None:
|
||
if not self.is_logged_in():
|
||
self._do_login()
|
||
|
||
@require_login
|
||
def process_ticket(self, title, link):
|
||
# titles = '\r\n'.join(title)
|
||
description = """We request that you take immediate actionto stop the infringing activity, take steps to ensure that iQIYI Content is notre-posted on, re-linked to, or otherwise available through your site. Pleaseinform us of the actions you have taken and their results.
|
||
1) please help remove these videos
|
||
2) The drama series titles are {}
|
||
""".format(title)
|
||
# likls = ["\"" + l + "\"" for l in link]
|
||
# links = ','.join(likls)
|
||
if self.page.query_selector("div.cf-turnstile[data-sitekey]"):
|
||
ok = solve_turnstile_capsolver(self.page)
|
||
if not ok:
|
||
raise RuntimeError("CapSolver 处理 Turnstile 失败")
|
||
file_path = f'screenshots/{str(int(time.time()))}_{title}_{link.split("/")[-1]}.png'
|
||
self.page.screenshot(path=file_path)
|
||
resports = self.page.locator('li.blocks-item:nth-child(8)')
|
||
resports.click()
|
||
|
||
time.sleep(2)
|
||
|
||
cc = self.page.locator("input#request_collaborators_")
|
||
cc.scroll_into_view_if_needed()
|
||
cc.click()
|
||
cc.type("duke.chen@dailymotion.com")
|
||
|
||
self.page.get_by_role("button", name="Copyright infringement").click()
|
||
time.sleep(1)
|
||
self.page.get_by_role("button", name="Notification").nth(0).click()
|
||
time.sleep(1)
|
||
self.page.get_by_role("button", name="A legal entity").click()
|
||
time.sleep(1)
|
||
self.page.get_by_label("Corporate name").fill("Beijing iQIYI Science & Technology Co.,Ltd")
|
||
time.sleep(1)
|
||
self.page.get_by_label("Legal status").fill("Legal Department")
|
||
time.sleep(1)
|
||
self.page.get_by_label("Subject").fill("Copyright infringement Notification")
|
||
time.sleep(1)
|
||
self.page.get_by_label("Please indicate the URL of the video you would like to report.").fill(link)
|
||
time.sleep(1)
|
||
self.page.get_by_label("Description").nth(1).fill(description)
|
||
time.sleep(1)
|
||
self.page.get_by_label("I state in good faith", exact=False).check()
|
||
time.sleep(1)
|
||
self.page.get_by_label("I state in good faith that the use of the Protected", exact=False).check()
|
||
time.sleep(1)
|
||
self.page.get_by_role("checkbox", name="I certify that all information provided", exact=False).check()
|
||
time.sleep(1)
|
||
self.page.get_by_role("checkbox", name="I acknowledge that my statements", exact=False).check()
|
||
time.sleep(1)
|
||
self.page.get_by_role("checkbox", name="The data provided through this form", exact=False).check()
|
||
time.sleep(1)
|
||
self.page.get_by_role("checkbox", name="By submitting the present form,", exact=False).check()
|
||
|
||
time.sleep(1)
|
||
self.page.get_by_role("button", name="Submit").click()
|
||
time.sleep(2)
|
||
file_path = f'screenshots/{str(int(time.time()))}_{title}_{link.split("/")[-1]}.png'
|
||
locator = self.page.locator("//dt[normalize-space(.)='Id']/following-sibling::dd[1]")
|
||
raw_text = locator.text_content()
|
||
match = re.search(r'\d+', raw_text or '')
|
||
report_id = match.group() if match else None
|
||
status_raw = self.page.locator("span.status-label").text_content()
|
||
subsequent_status = status_raw.strip().lower() if status_raw else None
|
||
time_elem = self.page.locator("dt", has_text="Created").locator("xpath=following-sibling::dd[1]/time")
|
||
|
||
datetime_str = time_elem.get_attribute("datetime") # e.g. 2025-06-12T06:15:33+00:00
|
||
if datetime_str:
|
||
dt = datetime.fromisoformat(datetime_str.replace("Z", "+00:00")) # 安全处理 ISO 时间
|
||
timestamp = int(dt.timestamp())
|
||
else:
|
||
timestamp = None
|
||
self.page.screenshot(path=file_path)
|
||
if self.page.url != self.url:
|
||
self.page.goto(self.url, timeout=30000)
|
||
|
||
return file_path, report_id, subsequent_status, timestamp
|
||
|
||
@require_login
|
||
def report_follow_up(self, report_id: str):
|
||
txt = "I am the authorized agent of Beijing iQIYI Technology Co., Ltd., responsible for dealing with unauthorized overseas distribution of pirated videos of our works.We have confirmed that the above links contain infringing content and we insist on requesting to takedown. Thank you!"
|
||
self.page.goto("https://faq.dailymotion.com/hc/en-us/requests/{}".format(report_id), timeout=30000)
|
||
self.page.wait_for_load_state("networkidle", timeout=3000)
|
||
|
||
status_raw = self.page.locator("span.status-label").text_content()
|
||
subsequent_status = status_raw.strip().lower() if status_raw else None
|
||
if "awaiting your reply" in subsequent_status:
|
||
pass
|
||
# TODO: 处理其他状态
|
||
elif "open" in subsequent_status:
|
||
pass
|
||
# 处理 Open 状态
|
||
elif "solved" in subsequent_status:
|
||
# 处理 Solved 状态
|
||
return
|
||
|
||
def close(self):
|
||
try:
|
||
self.page.close()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
self.browser.close()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
self._pw.stop()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
if __name__ == "__main__":
|
||
dm = DailymotionClient()
|
||
dm.process_ticket()
|