"""Automate Dailymotion copyright-infringement reports with Playwright.

The module logs into the Dailymotion help-center request form, solves the
Cloudflare Turnstile challenge through the CapSolver HTTP API when one is
present, fills in the takedown form and saves before/after screenshots.
"""
import argparse
import functools
import os
import re
import time

import requests
from playwright.sync_api import (
    sync_playwright,
    TimeoutError as PlaywrightTimeoutError,
    Page,
    Browser,
)


def _capsolver_post(endpoint: str, payload: dict, timeout: int):
    """POST *payload* as JSON to a CapSolver endpoint.

    Returns the decoded JSON object, or ``None`` when the request fails, the
    body is not valid JSON, or the decoded value is not a JSON object.
    """
    try:
        data = requests.post(
            f"https://api.capsolver.com/{endpoint}",
            json=payload,
            timeout=timeout,
        ).json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding errors raised by ``Response.json()``.
        print(f"[CapSolver] {endpoint} request error:", exc)
        return None
    return data if isinstance(data, dict) else None


def solve_turnstile_capsolver(page: Page, timeout: int = 120) -> bool:
    """Solve the Cloudflare Turnstile widget on *page* through CapSolver.

    Args:
        page: Playwright page currently showing the Turnstile widget.
        timeout: Maximum wall-clock seconds to wait for CapSolver's answer.

    Returns:
        True when a token was obtained and injected into the page; False when
        no widget is present, the API reports an error, a request fails, or
        no token arrives before *timeout* elapses.
    """
    # NOTE(security): prefer supplying the key via CAPSOLVER_API_KEY; the
    # literal fallback is kept only so existing deployments keep working.
    cap_key = os.environ.get(
        "CAPSOLVER_API_KEY", "CAP-A76C932D4C6CCB3CA748F77FDC07D996"
    )

    widget = page.query_selector("div.cf-turnstile[data-sitekey]")
    if not widget:
        return False

    create_payload = {
        "clientKey": cap_key,
        "task": {
            "type": "TurnstileTaskProxyLess",
            "websiteURL": page.url,
            "websiteKey": widget.get_attribute("data-sitekey"),
        },
    }
    create_resp = _capsolver_post("createTask", create_payload, timeout=20)
    if create_resp is None:
        return False
    task_id = create_resp.get("taskId")
    if create_resp.get("errorId") or not task_id:
        print("[CapSolver] createTask 失败:", create_resp)
        return False

    poll_payload = {"clientKey": cap_key, "taskId": task_id}
    token = None
    step = 3
    # Use a monotonic deadline so time spent inside the HTTP calls counts
    # towards the timeout as well, not only the sleeps between polls.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        time.sleep(step)
        res = _capsolver_post("getTaskResult", poll_payload, timeout=15)
        if res is None:
            return False
        status = res.get("status")
        if status == "ready":
            token = (res.get("solution") or {}).get("token")
            break
        if status != "processing":
            print("[CapSolver] getTaskResult 异常:", res)
            return False

    if not token:
        print("[CapSolver] 超时未取到 token")
        return False

    # Write the token into the response field and fire the page callback,
    # if the page defines one.
    page.evaluate(
        """(tk) => {
            const ta = document.querySelector('textarea[name="cf-turnstile-response"]');
            if (ta) ta.value = tk;
            if (window.turnstileCallback)
                try { window.turnstileCallback(tk); } catch(e){}
        }""",
        token,
    )
    page.wait_for_timeout(1500)
    return True


def require_login(func):
    """Decorate a client method so ``self.ensure_login()`` runs before it."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self.ensure_login()
        return func(self, *args, **kwargs)

    return wrapper


class DailymotionClient:
    """Browser session that files copyright reports on the Dailymotion form.

    Creating an instance starts Playwright, launches Chromium and opens the
    request form. Call :meth:`close` (or use the instance as a context
    manager) to release the browser.
    """

    url = "https://faq.dailymotion.com/hc/en-us/requests/new"
    # NOTE(security): credentials should come from the environment; the
    # literal fallbacks are kept only for backward compatibility.
    EMAIL = os.environ.get("DAILYMOTION_EMAIL", "copyright@qiyi.com")
    PASSWORD = os.environ.get("DAILYMOTION_PASSWORD", "ppsIQIYI2018@")

    # Characters that are not allowed in file names on common platforms.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]+')

    def __init__(self, headless: bool = True):
        """Start the browser and open the request form.

        Args:
            headless: Launch Chromium without a visible window when True.
        """
        self.email = self.EMAIL
        self.password = self.PASSWORD
        self.headless = headless
        # Seconds for which a login-state check result is reused.
        self.check_interval = 60 * 60
        self._last_check_ts = 0
        self._last_check_result = False

        self._pw = sync_playwright().start()
        try:
            self.browser: Browser = self._pw.chromium.launch(
                headless=self.headless,
            )
            self.context = self.browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/122.0.0.0 Safari/537.36",
                locale="en-US",
                viewport={"width": 1280, "height": 800},
                timezone_id="Asia/Shanghai",
                permissions=[],
            )
            self.context.add_init_script(
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            )
            self.page: Page = self.context.new_page()
            os.makedirs('screenshots', exist_ok=True)
            self.page.goto(self.url)
        except Exception:
            # Do not leave a browser / driver process behind when start-up
            # fails part-way through.
            self.close()
            raise

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the browser when leaving the ``with`` block."""
        self.close()
        return False

    def _screenshot(self, *parts) -> str:
        """Save a screenshot named ``<unix-ts>_<parts...>.png`` and return its path.

        Characters that are invalid in file names are replaced with ``_`` so
        arbitrary titles and URL fragments cannot break the path.
        """
        safe_parts = [self._UNSAFE_FILENAME_CHARS.sub("_", str(p)) for p in parts]
        name = "_".join([str(int(time.time()))] + safe_parts)
        file_path = f"screenshots/{name}.png"
        self.page.screenshot(path=file_path)
        return file_path

    def _solve_turnstile_if_present(self) -> None:
        """Solve the Turnstile widget when the page shows one.

        Raises:
            RuntimeError: if CapSolver could not solve the challenge.
        """
        if self.page.query_selector("div.cf-turnstile[data-sitekey]"):
            if not solve_turnstile_capsolver(self.page):
                raise RuntimeError("CapSolver 处理 Turnstile 失败")

    def _do_login(self) -> None:
        """Log in through the form's sign-in flow and record the success."""
        self.page.goto(self.url, timeout=30000)
        self.page.wait_for_load_state("networkidle", timeout=30000)
        self._screenshot("login")
        self._solve_turnstile_if_present()

        logbtn = self.page.locator("//a[@class='login button']")
        if logbtn.count() > 0:
            logbtn.nth(0).click()
        self.page.wait_for_selector("//input[@data-testid=\"emailInput\"]")

        # Dismiss the "I understand" pop-up when it is shown.
        i_now_btn = self.page.locator("button:has-text(\"I understand\")")
        if i_now_btn.count() > 0:
            i_now_btn.click()

        # Enter the account credentials.
        email_edit = self.page.locator("//input[@data-testid=\"emailInput\"]")
        password_edit = self.page.locator("//input[@data-testid=\"passwordInput\"]")
        if email_edit.count():
            email_edit.fill(self.email)
        if password_edit.count():
            password_edit.fill(self.password)

        # Submit; wait for the button to become enabled, but click anyway if
        # that wait times out.
        login_btn = self.page.locator('button[form="signin-form"][type="submit"]')
        try:
            self.page.wait_for_selector(
                'button[form="signin-form"][type="submit"]:not([disabled])',
                timeout=20000,
            )
        except PlaywrightTimeoutError:
            pass
        login_btn.click()

        # Wait for the redirect back to the request form.
        self.page.wait_for_url(self.url, timeout=30000)
        time.sleep(1)
        self._last_check_ts = time.time()
        self._last_check_result = True

    def _detect_login(self) -> bool:
        """Reload the form and report whether the login link is absent."""
        self.page.goto(self.url, timeout=30000)
        self.page.wait_for_load_state("networkidle", timeout=30000)
        return self.page.locator("//a[@class='login button']").count() == 0

    def is_logged_in(self) -> bool:
        """Return the login state, re-checking at most once per ``check_interval``."""
        now = time.time()
        if now - self._last_check_ts < self.check_interval:
            return self._last_check_result
        try:
            ok = self._detect_login()
        except Exception as exc:
            # A failed check is treated as "not logged in" so that
            # ensure_login() attempts a fresh login.
            print("[Dailymotion] login check failed:", exc)
            ok = False
        self._last_check_ts = now
        self._last_check_result = ok
        return ok

    def ensure_login(self) -> None:
        """Log in when the session is not currently logged in."""
        if not self.is_logged_in():
            self._do_login()

    @require_login
    def process_ticket(self, title, link):
        """Fill in and submit one copyright-infringement report.

        Args:
            title: Title of the infringed work; used in the description text
                and in the screenshot file names.
            link: URL of the video being reported.

        Returns:
            Path of the screenshot taken after the form was submitted.

        Raises:
            RuntimeError: if a Turnstile challenge could not be solved.
        """
        description = (
            "We request that you take immediate action to stop the infringing "
            "activity, take steps to ensure that iQIYI Content is not re-posted "
            "on, re-linked to, or otherwise available through your site. "
            "Please inform us of the actions you have taken and their results.\n"
            "1) please help remove these videos\n"
            "2) The drama series titles are {}\n"
        ).format(title)

        self._solve_turnstile_if_present()

        # Last path segment of the link; tolerate a trailing slash.
        link_tail = link.rstrip("/").split("/")[-1]
        self._screenshot(title, link_tail)

        report_tile = self.page.locator('li.blocks-item:nth-child(8)')
        report_tile.click()
        time.sleep(2)

        cc = self.page.locator("input#request_collaborators_")
        cc.scroll_into_view_if_needed()
        cc.click()
        cc.type("duke.chen@dailymotion.com")

        page = self.page
        page.get_by_role("button", name="Copyright infringement").click()
        time.sleep(1)
        page.get_by_role("button", name="Notification").nth(0).click()
        time.sleep(1)
        page.get_by_role("button", name="A legal entity").click()
        time.sleep(1)
        page.get_by_label("Corporate name").fill("Beijing iQIYI Science & Technology Co.,Ltd")
        time.sleep(1)
        page.get_by_label("Legal status").fill("Legal Department")
        time.sleep(1)
        page.get_by_label("Subject").fill("Copyright infringement Notification")
        time.sleep(1)
        page.get_by_label(
            "Please indicate the URL of the video(s) you would like to report*"
        ).fill(link)
        time.sleep(1)
        page.get_by_label("Description").nth(1).fill(description)
        time.sleep(1)

        page.get_by_label("I state in good faith", exact=False).check()
        time.sleep(1)
        page.get_by_label(
            "I state in good faith that the use of the Protected", exact=False
        ).check()
        time.sleep(1)
        page.get_by_role(
            "checkbox", name="I certify that all information provided", exact=False
        ).check()
        time.sleep(1)
        page.get_by_role(
            "checkbox", name="I acknowledge that my statements", exact=False
        ).check()
        time.sleep(1)
        page.get_by_role(
            "checkbox", name="The data provided through this form", exact=False
        ).check()
        time.sleep(1)
        page.get_by_role(
            "checkbox", name="By submitting the present form,", exact=False
        ).check()
        time.sleep(1)

        page.get_by_role("button", name="Submit").click()
        time.sleep(2)
        file_path = self._screenshot(title, link_tail)

        # Return to the empty form so the next ticket starts from a known page.
        if self.page.url != self.url:
            self.page.goto(self.url, timeout=30000)
        return file_path

    def close(self):
        """Release page, context, browser and Playwright driver (best effort)."""
        for attr, method in (
            ("page", "close"),
            ("context", "close"),
            ("browser", "close"),
            ("_pw", "stop"),
        ):
            try:
                getattr(getattr(self, attr), method)()
            except Exception:
                # Shutdown is deliberately best-effort: the attribute may not
                # exist yet or the resource may already be closed.
                pass


def main(argv=None) -> int:
    """Command-line entry point: file one report for the given title and link."""
    parser = argparse.ArgumentParser(
        description="File a Dailymotion copyright-infringement report."
    )
    parser.add_argument("title", help="title of the infringed work")
    parser.add_argument("link", help="URL of the video to report")
    args = parser.parse_args(argv)

    dm = DailymotionClient()
    try:
        print(dm.process_ticket(args.title, args.link))
    finally:
        dm.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())