import os import argparse import time from playwright.sync_api import sync_playwright import logging logging.basicConfig(level=logging.INFO) # 日志会输出到 stderr,不会污染 stdout logging.info("服务器启动") def save_login_state(auth_file, keyword, display=0): with sync_playwright() as p: os.makedirs(os.path.dirname(auth_file), exist_ok=True) # headless=False 表示显示浏览器窗口,方便用户扫码登录 browser = p.chromium.launch(headless=False,env={"DISPLAY": display}) context = browser.new_context() page = context.new_page() login_success = False def on_response(response): nonlocal login_success if '/login_jump' in response.url: if response.status == 200: logging.info(f"[网络监听] 检测到登录API响应成功") logging.info(f" - 请求地址: {response.url}") logging.info(f" - 状态码: {response.status}") login_success = True def on_url_change(frame): nonlocal login_success current_url = page.url if 'home/index?sid' in current_url: logging.info(f"[URL监听] 检测到页面已跳转") logging.info(f" - 当前URL: {current_url}") login_success = True # 其他可用的事件:'request'(请求发送时)、'requestfailed'(请求失败时)、'requestfinished'(请求完成时) page.on('response', on_response) page.on('framenavigated', on_url_change) page.goto('https://mail.qq.com') try: # 毫秒单位,300000ms = 300秒 = 5分钟 page.wait_for_function(f'document.body.innerText.includes("{keyword}")', timeout=300000) logging.info(f"[文本监听] 检测到'{keyword}'文字,登录确认成功!") except Exception as e: logging.warning(f"[警告] 未检测到'{keyword}'文字,可能页面结构有变化或登录超时") logging.error(f" 错误信息: {e}") if login_success: logging.info("已确认登录成功!") else: logging.warning("\n⚠️ 未明确检测到登录成功标志,但仍将保存当前状态") logging.info(" 如果登录成功,状态应该是有效的") page.wait_for_timeout(5000) # 2000毫秒 = 2秒 context.storage_state(path=auth_file) logging.info(f"✅ 登录状态已保存到: {auth_file}") browser.close() logging.info("浏览器已关闭") # 1. 从 Playwright 保存的 auth.json 中提取 cookies def extract_cookies_from_auth(auth_file): """ 从 Playwright 的 auth.json 提取 cookies Args: auth_file: auth.json 文件路径 Returns: dict: requests 可用的 cookies 字典 """ with open(auth_file, 'r', encoding='utf-8') as f: auth_data = json.load(f) cookies = {} for cookie in auth_data.get('cookies', []): # Playwright 保存的每个 cookie 包含 name 和 value cookie_name = cookie['name'] cookie_value = cookie['value'] cookies[cookie_name] = cookie_value return cookies def crawl_with_saved_state(auth_file,display=0): with sync_playwright() as p: # 加载之前保存的登录状态 logging.info(f"加载认证文件: {auth_file}") browser = p.chromium.launch(headless=False,env={"DISPLAY": display}) # 可以无头模式了 context = browser.new_context(storage_state=auth_file) page = context.new_page() page.goto('https://mail.qq.com') logging.info(f"当前URL: {page.url}") page.wait_for_load_state('networkidle') page.wait_for_timeout(2000) # 等待2秒 # 可以使用 BeautifulSoup 解析 # from bs4 import BeautifulSoup # soup = BeautifulSoup(content, 'html.parser') items = page.locator('.mail-subject').all() for item in items: logging.info(f"邮件标题: {item.text_content()}") # page.get_by_title("mail-subject mail-unread").click() page.wait_for_timeout(20000) browser.close() def start(account, display=0): logging.info(f"用户名{account}") if not account: logging.error("请输入用户名") exit(1) auth_file_path = "./auth/mail" file_path = f"{auth_file_path}/{account}.json" logging.info(f"认证文件路径: {file_path}") if os.path.exists(file_path): # 检查认证文件是否超过48小时 file_age_hours = (time.time() - os.path.getmtime(file_path)) / 3600 if file_age_hours > 48: logging.warning("认证文件已过期,请重新登录") save_login_state(file_path, "收件箱", display) else: logging.info("检测到认证文件,尝试使用保存的登录状态进行爬取") crawl_with_saved_state(file_path,display) else: logging.info("未检测到认证文件,启动登录流程") save_login_state(file_path, "收件箱",display) if __name__ == '__main__': start("123", display=0)