DailyMotion/report_video.py

416 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import functools
import os
import re
from datetime import datetime
from sys import platform
import requests
from logger import logger
from playwright.sync_api import (
sync_playwright,
TimeoutError as PlaywrightTimeoutError,
Page,
Browser,
)
def solve_turnstile_capsolver(page: "Page",
                              timeout: int = 120,
                              api_key: "str | None" = None) -> bool:
    """Solve the Cloudflare Turnstile widget on ``page`` through CapSolver.

    The function reads the widget's ``data-sitekey``, creates a
    ``TurnstileTaskProxyLess`` task, polls for the result and injects the
    returned token into the page.

    Args:
        page: Playwright page currently showing the Turnstile widget.
        timeout: Wall-clock budget, in seconds, for polling the task result.
        api_key: CapSolver client key. When omitted, the
            ``CAPSOLVER_API_KEY`` environment variable is used, then the
            legacy built-in key.

    Returns:
        True when a token was obtained and injected; False when no usable
        widget is present, the API reports an error, the network call fails,
        or the timeout expires.
    """
    # NOTE(security): the literal fallback key below is committed in source.
    # It is kept only for backward compatibility; supply the key through
    # ``api_key`` or CAPSOLVER_API_KEY and rotate the committed one.
    cap_key = (
        api_key
        or os.environ.get("CAPSOLVER_API_KEY")
        or "CAP-A76C932D4C6CCB3CA748F77FDC07D996"
    )

    widget = page.query_selector("div.cf-turnstile[data-sitekey]")
    if not widget:
        return False
    sitekey = widget.get_attribute("data-sitekey")
    if not sitekey:
        # Without a site key the task cannot succeed; skip the API call.
        return False

    def _call_api(endpoint, payload, request_timeout):
        """POST ``payload`` to a CapSolver endpoint and return the decoded JSON."""
        response = requests.post(
            f"https://api.capsolver.com/{endpoint}",
            json=payload, timeout=request_timeout
        )
        return response.json()

    create_payload = {
        "clientKey": cap_key,
        "task": {
            "type": "TurnstileTaskProxyLess",
            "websiteURL": page.url,
            "websiteKey": sitekey
        }
    }
    try:
        create_resp = _call_api("createTask", create_payload, 20)
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON response body.
        logger.warning(f"[CapSolver] createTask 失败: {exc}")
        return False
    if create_resp.get("errorId"):
        logger.warning(f"[CapSolver] createTask 失败: {create_resp}")
        return False
    task_id = create_resp.get("taskId")
    if not task_id:
        logger.warning(f"[CapSolver] createTask 失败: {create_resp}")
        return False

    poll_payload = {"clientKey": cap_key, "taskId": task_id}
    token = None
    step = 3
    # Use a monotonic deadline so time spent inside HTTP calls also counts.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        time.sleep(step)
        try:
            res = _call_api("getTaskResult", poll_payload, 15)
        except (requests.RequestException, ValueError) as exc:
            # A transient polling failure is retried until the deadline.
            logger.warning(f"[CapSolver] getTaskResult 异常: {exc}")
            continue
        status = res.get("status")
        if status == "ready":
            token = (res.get("solution") or {}).get("token")
            break
        if status != "processing":
            logger.warning(f"[CapSolver] getTaskResult 异常: {res}")
            return False

    if not token:
        logger.warning("[CapSolver] 超时未取到 token")
        return False

    # Write the token into the hidden response field and fire the page's
    # callback when one is exposed on ``window``.
    page.evaluate(
        """(tk) => {
            const ta = document.querySelector('textarea[name="cf-turnstile-response"]');
            if (ta) ta.value = tk;
            if (window.turnstileCallback)
                try { window.turnstileCallback(tk); } catch(e){}
        }""",
        token
    )
    # Give the page a moment to react to the injected token.
    page.wait_for_timeout(1500)
    return True
def require_login(func):
    """Decorate a client method so a login check runs before every call.

    The wrapped method's instance must provide ``ensure_login()``; it is
    invoked first, then the call is forwarded unchanged and its result
    returned.
    """
    def guarded(self, *args, **kwargs):
        # Make sure the session is authenticated before doing any work.
        self.ensure_login()
        result = func(self, *args, **kwargs)
        return result

    # Copy name, docstring and other metadata from the wrapped method.
    return functools.update_wrapper(guarded, func)
class DailymotionClient:
    """Playwright-driven client for the Dailymotion help-center request form.

    The client owns one Chromium browser, context and page. It logs in on
    demand, files copyright-infringement tickets and follows up on existing
    tickets. Call ``close()`` (or use the instance as a context manager) to
    release the browser.
    """

    url = "https://faq.dailymotion.com/hc/en-us/requests/new"
    # NOTE(security): plaintext credentials are committed here. ``__init__``
    # never reads them; they are kept only so external references to these
    # class attributes keep working. Move them to a secret store and rotate.
    EMAIL = "copyright@qiyi.com"
    PASSWORD = "ppsIQIYI2018@"

    # Navigation timeout shared by every ``goto`` on the request form (ms).
    _NAV_TIMEOUT_MS = 30000
    _TURNSTILE_SELECTOR = "div.cf-turnstile[data-sitekey]"
    _LOGIN_LINK_XPATH = "//a[@class='login button']"
    # Body of the ticket; ``{}`` receives the comma-joined, de-duplicated titles.
    _DESCRIPTION_TEMPLATE = (
        "We request that you take immediate actionto stop the infringing activity, "
        "take steps to ensure that iQIYI Content is notre-posted on, re-linked to, "
        "or otherwise available through your site. Pleaseinform us of the actions "
        "you have taken and their results.\n"
        "1) please help remove these videos\n"
        "2) The drama series titles are {}\n"
    )

    def __init__(self, email, password, headless: bool = None):
        """Launch the browser and open the request form.

        Args:
            email: Account e-mail used to sign in.
            password: Account password used to sign in.
            headless: Run Chromium headless. When None, headless mode is
                enabled on Linux and disabled elsewhere.

        Raises:
            Exception: Whatever Playwright raises while launching or loading
                the form; the partially created browser is closed first.
        """
        self.email = email
        self.password = password
        self.headless = headless
        # Minimum number of seconds between two real login checks.
        self.check_interval = 60 * 60
        if self.headless is None:
            self.headless = platform.startswith("linux")
        if self.headless:
            # Headless (server) profile: no proxy, absolute attachment paths.
            proxy = None
            self.file_path = "/opt/ql/DailyMotion/oss/LOA.pdf"
            self.file_path2 = "/opt/ql/DailyMotion/oss/BAZTSJT.pdf"
        else:
            # Headed (desktop) profile: local proxy, relative attachment paths.
            proxy = {'server': 'http://127.0.0.1:7890'}
            self.file_path = "./oss/LOA.pdf"
            self.file_path2 = "./oss/BAZTSJT.pdf"
        logger.info(f"Launching DailymotionClient with headless={self.headless}, proxy={proxy}")
        self._last_check_ts = 0
        self._last_check_result = False
        self._pw = sync_playwright().start()
        try:
            self.browser: Browser = self._pw.chromium.launch(
                headless=self.headless,
                proxy=proxy,
            )
            self.context = self.browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/122.0.0.0 Safari/537.36",
                locale="en-US",
                viewport={"width": 1280, "height": 800},
                timezone_id="Asia/Shanghai",
                permissions=[],
            )
            # Make ``navigator.webdriver`` read as undefined on every page.
            self.context.add_init_script(
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            )
            self.page: Page = self.context.new_page()
            os.makedirs('screenshots', exist_ok=True)
            self.page.goto(self.url)
        except Exception:
            # Do not leak the browser / driver when start-up fails half-way.
            self.close()
            raise

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the browser when the ``with`` block ends."""
        self.close()

    @staticmethod
    def _screenshot_path(*parts) -> str:
        """Build ``screenshots/<unix-ts>[_part...].png`` with filename-safe parts."""
        pieces = [str(int(time.time()))]
        for part in parts:
            # Path separators and reserved characters would break the path.
            pieces.append(re.sub(r'[\\/:*?"<>|\x00-\x1f]+', "_", str(part)))
        return "screenshots/" + "_".join(pieces) + ".png"

    @staticmethod
    def _unique_titles(lis) -> list:
        """Return the sorted, de-duplicated, non-empty ``name_title`` values."""
        cleaned = {item['name_title'].strip() for item in lis}
        cleaned.discard("")
        return sorted(cleaned)

    def _solve_turnstile_if_present(self) -> None:
        """Solve a Turnstile challenge on the current page, if one is shown.

        Raises:
            RuntimeError: If the challenge is present and could not be solved.
        """
        if self.page.query_selector(self._TURNSTILE_SELECTOR):
            if not solve_turnstile_capsolver(self.page):
                raise RuntimeError("CapSolver 处理 Turnstile 失败")

    def _do_login(self) -> None:
        """Sign in through the help-center login flow and refresh the login cache.

        Raises:
            RuntimeError: If a Turnstile challenge could not be solved.
        """
        self.page.goto(self.url, timeout=self._NAV_TIMEOUT_MS)
        self.page.wait_for_timeout(3000)
        self.page.screenshot(path=self._screenshot_path())
        self._solve_turnstile_if_present()

        logbtn = self.page.locator(self._LOGIN_LINK_XPATH)
        if logbtn.count() > 0:
            logbtn.nth(0).click()
            self.page.wait_for_selector("//input[@data-testid=\"emailInput\"]")

        # Dismiss the "I understand" pop-up when it is shown.
        i_now_btn = self.page.locator("button:has-text(\"I understand\")")
        if i_now_btn.count() > 0:
            # nth(0) avoids a strict-mode error when several buttons match.
            i_now_btn.nth(0).click()

        # Enter the account credentials.
        email_edit = self.page.locator("//input[@data-testid=\"emailInput\"]")
        password_edit = self.page.locator("//input[@data-testid=\"passwordInput\"]")
        if email_edit.count():
            email_edit.fill(self.email)
        if password_edit.count():
            password_edit.fill(self.password)

        # Submit the sign-in form once the button becomes enabled.
        login_btn = self.page.locator('button[form="signin-form"][type="submit"]')
        try:
            self.page.wait_for_selector(
                'button[form="signin-form"][type="submit"]:not([disabled])', timeout=20000
            )
        except PlaywrightTimeoutError:
            # Best effort: still try the click below.
            logger.warning("Sign-in button did not become enabled within 20s; clicking anyway")
        login_btn.click()

        # Wait for the redirect back to the request form.
        self.page.wait_for_url(self.url, timeout=self._NAV_TIMEOUT_MS)
        time.sleep(1)
        self._last_check_ts = time.time()
        self._last_check_result = True

    def _detect_login(self) -> bool:
        """Reload the form and report whether the login link is absent."""
        self.page.goto(self.url, timeout=self._NAV_TIMEOUT_MS)
        self.page.wait_for_timeout(3000)
        return self.page.locator(self._LOGIN_LINK_XPATH).count() == 0

    def is_logged_in(self) -> bool:
        """Return the login state, re-checking at most once per ``check_interval``.

        A failed check is recorded as "not logged in".
        """
        now = time.time()
        if now - self._last_check_ts < self.check_interval:
            return self._last_check_result
        try:
            ok = self._detect_login()
        except Exception as exc:
            logger.warning(f"Login detection failed, treating session as logged out: {exc}")
            ok = False
        self._last_check_ts = now
        self._last_check_result = ok
        return ok

    def ensure_login(self) -> None:
        """Log in when ``is_logged_in()`` reports that the session is not active."""
        if not self.is_logged_in():
            self._do_login()

    @require_login
    def process_ticket(self, lis: list):
        """File one copyright-infringement ticket covering every item in ``lis``.

        Args:
            lis: Non-empty list of dicts, each with the keys ``id``,
                ``name_title`` and ``link``.

        Returns:
            A tuple ``(ids, screenshot_path, report_id, status, timestamp)``.
            ``report_id``, ``status`` and ``timestamp`` are None when the
            corresponding element on the result page is empty.

        Raises:
            ValueError: If ``lis`` is empty.
            RuntimeError: If a Turnstile challenge could not be solved.
        """
        if not lis:
            raise ValueError("process_ticket requires at least one item")

        # The first item names the screenshot taken after submission.
        title = lis[0]['name_title']
        link = lis[0]['link']
        ids = [item['id'] for item in lis]
        links = "".join(item['link'] + ",\r\n" for item in lis)
        # De-duplicate on the titles themselves; splitting a joined string on
        # "," would add an empty entry and break titles that contain commas.
        titles = ",".join(self._unique_titles(lis))
        logger.info(f"Processing ticket for title: {titles}, link: {links}")

        self.page.goto(self.url, timeout=self._NAV_TIMEOUT_MS)
        description = self._DESCRIPTION_TEMPLATE.format(titles)
        self._solve_turnstile_if_present()

        # Open the report category (8th tile of the request form).
        self.page.locator('li.blocks-item:nth-child(8)').click()
        time.sleep(2)

        cc = self.page.locator("input#request_collaborators_")
        cc.scroll_into_view_if_needed()
        cc.click()
        cc.type("duke.chen@dailymotion.com")

        # Category buttons, in the order the form reveals them.
        self.page.get_by_role("button", name="Copyright infringement").click()
        time.sleep(1)
        self.page.get_by_role("button", name="Notification").nth(0).click()
        time.sleep(1)
        self.page.get_by_role("button", name="A legal entity").click()
        time.sleep(1)

        # Free-text fields.
        text_fields = (
            ("Corporate name", "Beijing iQIYI Science & Technology Co.,Ltd"),
            ("Legal status", "Legal Department"),
            ("Subject", "Copyright infringement Notification"),
            ("Please indicate the URL of the video(s) you would like to report*", links),
        )
        for label, value in text_fields:
            self.page.get_by_label(label).fill(value)
            time.sleep(1)
        self.page.get_by_label("Description").nth(1).fill(description)
        time.sleep(1)

        # Declarations that must be ticked before the form can be submitted.
        for label in (
            "I state in good faith",
            "I state in good faith that the use of the Protected",
        ):
            self.page.get_by_label(label, exact=False).check()
            time.sleep(1)
        for name in (
            "I certify that all information provided",
            "I acknowledge that my statements",
            "The data provided through this form",
            "By submitting the present form,",
        ):
            self.page.get_by_role("checkbox", name=name, exact=False).check()
            time.sleep(1)

        self.page.set_input_files('input#request-attachments', [
            self.file_path,
            self.file_path2
        ])
        # Leave time for the attachments to upload before submitting.
        self.page.wait_for_timeout(8000)
        self.page.get_by_role("button", name="Submit").click()
        time.sleep(2)

        file_path = self._screenshot_path(title, link.split("/")[-1])

        # Ticket id shown on the confirmation page.
        raw_text = self.page.locator(
            "//dt[normalize-space(.)='Id']/following-sibling::dd[1]"
        ).text_content()
        match = re.search(r'\d+', raw_text or '')
        report_id = match.group() if match else None

        status_raw = self.page.locator("span.status-label").text_content()
        subsequent_status = status_raw.strip().lower() if status_raw else None

        time_elem = self.page.locator("dt", has_text="Created").locator(
            "xpath=following-sibling::dd[1]/time"
        )
        datetime_str = time_elem.get_attribute("datetime")  # e.g. 2025-06-12T06:15:33+00:00
        if datetime_str:
            # ``fromisoformat`` on older Pythons rejects a trailing "Z".
            created = datetime.fromisoformat(datetime_str.replace("Z", "+00:00"))
            timestamp = int(created.timestamp())
        else:
            timestamp = None

        self.page.screenshot(path=file_path)
        if self.page.url != self.url:
            self.page.goto(self.url, timeout=self._NAV_TIMEOUT_MS)
        return ids, file_path, report_id, subsequent_status, timestamp

    @require_login
    def report_follow_up(self, report_id: str):
        """Check ticket ``report_id`` and reply when Dailymotion awaits an answer.

        Returns:
            A tuple ``(code, detail)``:
            ``(1, <screenshot path>)`` after a reply was posted,
            ``(1, "")`` when the ticket is open,
            ``(2, "")`` when the ticket is solved,
            ``(1, <message>)`` when the page failed to load or the reply
            could not be confirmed, and ``(0, "未知状态")`` for any other status.
        """
        max_retries = 3
        retry_delay = 2
        loaded = False
        subsequent_status = ""
        for attempt in range(max_retries):
            try:
                self.page.goto(
                    f"https://faq.dailymotion.com/hc/en-us/requests/{report_id}",
                    timeout=self._NAV_TIMEOUT_MS,
                )
                self.page.wait_for_selector("span.status-label", timeout=30000)
                try:
                    status_raw = self.page.locator("span.status-label").text_content()
                except Exception as e:
                    logger.warning(f"[警告] 获取状态标签失败: {e}")
                    status_raw = None
                # Always keep a string so the membership tests below cannot
                # fail on None.
                subsequent_status = status_raw.strip().lower() if status_raw else ""
                loaded = True
                break
            except Exception as e:
                logger.warning(f"[ERROR] 尝试 {attempt + 1}/{max_retries} 失败: {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
        if not loaded:
            return 1, "页面加载失败"

        if "awaiting your reply" in subsequent_status:
            txt = (
                "I am the authorized agent of Beijing iQIYI Technology Co., Ltd., responsible for dealing with "
                "unauthorized overseas distribution of pirated videos of our works. "
                "We have confirmed that the above links contain infringing content and we insist on requesting to takedown. Thank you!"
            )
            # Expand the collapsed comment box before typing.
            span_show = self.page.locator('span.comment-show-container-content')
            if span_show.count() > 0:
                span_show.nth(0).click()
                self.page.wait_for_timeout(1000)
            textarea = self.page.locator('#request_comment_body')
            textarea.type(txt, delay=30)
            self.page.wait_for_timeout(1000)
            self.page.get_by_role("button", name="Submit").click()
            success = self.wait_for_selector_safe("span.status-label", timeout=30000, retries=3)
            if not success:
                return 1, "提交后未检测到状态更新"
            span_show = self.page.locator('span.comment-show-container-content')
            if span_show.count() > 0:
                span_show.nth(0).click()
            pic_path = self._screenshot_path(report_id)
            self.page.screenshot(path=pic_path)
            return 1, pic_path
        if "open" in subsequent_status:
            return 1, ""
        if "solved" in subsequent_status:
            return 2, ""
        return 0, "未知状态"

    def wait_for_selector_safe(self, selector: str, timeout=30000, retries=3, retry_delay=2):
        """Wait for ``selector``, retrying on failure.

        Args:
            selector: Selector passed to ``page.wait_for_selector``.
            timeout: Per-attempt timeout in milliseconds.
            retries: Maximum number of attempts.
            retry_delay: Seconds to sleep between attempts.

        Returns:
            True as soon as one attempt succeeds, False when all attempts fail.
        """
        for i in range(retries):
            try:
                self.page.wait_for_selector(selector, timeout=timeout)
                return True
            except Exception as e:
                logger.warning(f"[重试] 第 {i + 1}/{retries} 次等待 {selector} 失败: {e}")
                if i < retries - 1:
                    time.sleep(retry_delay)
        return False

    @require_login
    def test(self):
        """Smoke test: open the form and save three screenshots one second apart."""
        logger.info(f"Testing DailymotionClient with email: {self.email}")
        self.page.goto(self.url, timeout=self._NAV_TIMEOUT_MS)
        self.page.screenshot(path=self._screenshot_path("test"))
        for suffix in ("test2", "test3"):
            self.page.wait_for_timeout(1000)
            file_path = self._screenshot_path(suffix)
            self.page.screenshot(path=file_path)
            logger.info(f"Test screenshot saved to {file_path}")

    def close(self):
        """Release the page, context, browser and Playwright driver.

        Every step is best effort: a failure is logged and the remaining
        resources are still released. Safe to call on a partially
        initialised instance.
        """
        for attr, method in (
            ("page", "close"),
            ("context", "close"),
            ("browser", "close"),
            ("_pw", "stop"),
        ):
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception as exc:
                logger.warning(f"Failed to release {attr}: {exc}")
# Example usage (pass real credentials at runtime; never commit them):
# if __name__ == "__main__":
#     dm = DailymotionClient("<email>", "<password>")
#     # dm.process_ticket([{"id": 1, "name_title": "恋爱学园", "link": "https://www.dailymotion.com/video/x9lfr24"}])
#     # dm.report_follow_up("2990081")