Files
douyin_cookie_yunsya/app.py
T
travel 853dacf528 @
性能优化 & QR 修复:消除卡顿,二维码提取重构

【速度优化】
- 页面加载:wait_until 从 domcontentloaded 改为 commit(更快)
- 等待时间大幅缩减:主页加载 2-4s→1-2s,点击后 3-5s→1-2s
  QR切换等待 3s→1.5s,弹窗检测等待 2s→1s,轮询间隔 800ms→500ms
- 代理测试超时从 10s 降为 5s,get_proxy_from_api 支持可配置超时

【二维码修复 — 3 级策略】
- 策略 1:提取真实 QR <img> 的 src(data URI 直接解码 / CDN URL 下载)
- 策略 2:截取 QR 元素本身(仅二维码区域,非整个弹窗)
- 策略 3:截图弹窗/全屏兜底
→ 解决二维码显示异常(之前是整个登录弹窗截图,包含大量无关 UI)

【前端瘦身 — 消除外部 CDN 阻塞】
- 移除 highlight.js(~100KB)& font-awesome(~90KB)
- 全部图标改用 Unicode/Emoji,轻量 CSS spinner 替代 fa-spinner
- 轮询频率优化:status 2s→3s,check_login 2s→2.5s
- 首页仅 37KB,零外部依赖,即时渲染

Co-Authored-By: Claude <noreply@anthropic.com>
@
2026-06-25 07:51:00 +08:00

550 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import base64
import json
import os
import time
import logging
import threading
import random
from logging.handlers import RotatingFileHandler
from io import BytesIO
from PIL import Image
import requests
from flask import Flask, render_template, request, jsonify, send_file
from playwright.async_api import async_playwright
# Logging setup: the LOG_DIR environment variable takes precedence; without
# it, logs are written to a "logs" directory next to this file.
LOG_DIR = os.environ.get('LOG_DIR')
if LOG_DIR is None:
    LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
os.makedirs(LOG_DIR, exist_ok=True)

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Rotating log file: 10 MiB per file, five backups kept.
handler = RotatingFileHandler(
    f'{LOG_DIR}/app.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",
)
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
app = Flask(__name__)

# Persistent secret key. Lookup order: the SECRET_KEY environment variable,
# then the ".secret_key" file next to this module. When neither yields a key,
# a random one is generated and written to that file.
_secret_key_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.secret_key')
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key:
    if os.path.exists(_secret_key_path):
        with open(_secret_key_path, 'rb') as _f:
            _secret_key = _f.read()
    if not _secret_key:
        _secret_key = os.urandom(24)
        with open(_secret_key_path, 'wb') as _f:
            _f.write(_secret_key)
app.secret_key = _secret_key

# Route the Flask app logger into the rotating file handler at INFO level.
app.logger.setLevel(logging.INFO)
app.logger.addHandler(handler)

# Static file directory: the STATIC_DIR environment variable takes
# precedence; without it, a "static" directory next to this file is used.
STATIC_DIR = os.environ.get('STATIC_DIR')
if STATIC_DIR is None:
    STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
os.makedirs(STATIC_DIR, exist_ok=True)
# Global state
class LoginSession:
    """Mutable holder for the state of the single QR login attempt."""

    def __init__(self):
        # Browser-side handles; nothing is allocated until a login starts.
        self.playwright = self.browser = self.context = None
        self.page = self.login_page = None
        # Progress fields reported back to the front end.
        self.status, self.message = "idle", ""
        self.cookies = None
        self.start_time = 0
        self.proxy_used = None
        # Guards against two QR login attempts running at once.
        self.lock = threading.Lock()


login_session = LoginSession()
# One shared event loop, run forever on a daemon thread. Objects created by
# an asyncio-based library (here: Playwright) are bound to the loop they were
# created on, so every request thread must submit its coroutines to the SAME
# loop; a loop per thread would leave those objects unusable from the next
# request, which Flask's threaded server handles on a different thread.
_async_loop = None
_async_loop_lock = threading.Lock()


def _get_async_loop():
    """Return the shared background event loop, starting it on first use."""
    global _async_loop
    with _async_loop_lock:
        if _async_loop is None or _async_loop.is_closed():
            loop = asyncio.new_event_loop()
            threading.Thread(
                target=loop.run_forever,
                name="async-runner",
                daemon=True,
            ).start()
            _async_loop = loop
        return _async_loop


def run_async_sync(coro):
    """Run *coro* to completion on the shared background loop and return its result.

    Safe to call from any thread; the calling thread blocks until the
    coroutine finishes. Exceptions raised by the coroutine propagate to the
    caller. The caller's ``contextvars`` context is captured when the
    coroutine is scheduled, so context variables set by the caller are
    visible inside the coroutine.

    Raises:
        RuntimeError: if called from the background loop's own thread, where
            blocking on the result would deadlock.
    """
    loop = _get_async_loop()
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None
    if running is loop:
        # Close the coroutine so it does not trigger a "never awaited" warning.
        coro.close()
        raise RuntimeError("run_async_sync() must not be called from the event loop thread")
    return asyncio.run_coroutine_threadsafe(coro, loop).result()
def _proxy_server_from_body(body):
    """Turn a non-empty proxy API response body into a proxy server URL."""
    try:
        data = json.loads(body)
    except ValueError:
        data = None
    if isinstance(data, dict) and "ip" in data and "port" in data:
        proxy_str = f"{data['ip']}:{data['port']}"
    elif data is None:
        # Plain-text response: some APIs list several proxies, one per line;
        # use the first one instead of a string with embedded newlines.
        proxy_str = body.splitlines()[0].strip()
    else:
        # Valid JSON without ip/port fields: fall back to the raw text.
        proxy_str = body
    # Only add a scheme when none is present, so e.g. socks5:// URLs survive.
    if "://" not in proxy_str:
        proxy_str = f"http://{proxy_str}"
    return proxy_str


def get_proxy_from_api(api_url, max_retries=3, timeout=10):
    """Fetch one proxy from a proxy-provider HTTP API.

    Args:
        api_url: URL of the proxy API; a falsy value disables the lookup.
        max_retries: total number of attempts before giving up.
        timeout: per-request timeout in seconds.

    Returns:
        ``{"server": "<scheme>://<host>:<port>"}`` on success, or ``None``
        when ``api_url`` is empty or every attempt failed. The response may
        be a JSON object with ``ip`` and ``port`` fields or a plain-text
        proxy address.
    """
    if not api_url:
        return None
    last_error = None
    for attempt in range(max_retries):
        try:
            resp = requests.get(api_url, timeout=timeout)
            resp.raise_for_status()
            content = resp.text.strip()
            if not content:
                raise ValueError("代理 API 返回空内容")
            return {"server": _proxy_server_from_body(content)}
        except Exception as e:
            # Best effort: any failure (network, HTTP status, parsing) counts
            # as one failed attempt.
            last_error = e
            if attempt < max_retries - 1:
                wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                app.logger.warning(f"获取代理失败 (第{attempt+1}次): {e}, {wait}s 后重试...")
                time.sleep(wait)
    app.logger.error(f"获取代理失败,已重试{max_retries}次: {last_error}")
    return None
async def cleanup_session():
    """Release the session's browser resources one at a time.

    Each resource is closed inside its own try/except so that a failure on
    one of them cannot prevent the remaining ones from being released.
    Afterwards every handle is cleared and the session goes back to "idle"
    with an empty message.
    """
    teardown_order = ("login_page", "page", "context", "browser", "playwright")

    # Snapshot the handles first, then close whichever ones are set.
    snapshot = [(attr, getattr(login_session, attr)) for attr in teardown_order]
    for name, resource in snapshot:
        if not resource:
            continue
        try:
            # The playwright driver is stopped; everything else is closed.
            closer = resource.stop if name == "playwright" else resource.close
            await closer()
        except Exception as e:
            app.logger.warning(f"清理 {name} 异常: {e}")

    for attr in ("playwright", "browser", "context", "page", "login_page"):
        setattr(login_session, attr, None)
    login_session.status = "idle"
    login_session.message = ""
async def inject_stealth(context):
    """Register an init script on *context* that overrides several navigator properties.

    The script redefines ``navigator.webdriver``, ``navigator.plugins`` and
    ``navigator.languages`` and assigns ``window.navigator.chrome``.
    """
    stealth_script = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = { runtime: {} };
Object.defineProperty(navigator, 'plugins', { get: () => [] });
Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','zh','en']});
"""
    await context.add_init_script(stealth_script)
@app.route('/')
def index():
    """Serve the front-end page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/qrcode.png')
def get_qrcode():
    """Return the saved QR code image, or a 404 message when none exists yet."""
    qr_path = os.path.join(STATIC_DIR, 'qrcode.png')
    if not os.path.exists(qr_path):
        return "二维码未生成,请先点击获取二维码", 404
    return send_file(qr_path, mimetype='image/png')
@app.route('/api/start_qr', methods=['POST'])
def start_qr():
    """Start a QR login attempt and return the QR code payload.

    Accepts an optional JSON body ``{"proxy_api": "<url>"}``. Only one
    attempt may run at a time: when the session lock is already held, an
    error payload is returned immediately instead of waiting.
    """
    if not login_session.lock.acquire(blocking=False):
        return jsonify({"status": "error", "message": "已有正在进行的扫码任务,请稍后重试"})
    try:
        # The proxy is optional, so a missing, malformed or non-object body
        # simply means "no proxy" rather than a server error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # ``or ''`` also covers an explicit JSON null for proxy_api.
        proxy_api = str(data.get('proxy_api') or '').strip()
        return run_async_sync(_start_qr(proxy_api))
    finally:
        login_session.lock.release()
# CSS selectors probed, in this order, when looking for the QR code element.
_QR_SELECTORS = (
    'div[role="dialog"] img[src*="qrcode"]',
    'div[role="dialog"] img[src*="passport"]',
    '.auth-modal img[src*="qrcode"]',
    '.login-box img[src*="qrcode"]',
    'img[src*="qrcode"]',
    'img[src*="passport"]',
    'img[alt*="二维码"]',
    'img[class*="qrcode"]',
    '.qrcode-box img',
    '.login-qrcode-image img',
    'div[data-e2e="qrcode"] img',
    'div[class*="qr"] img',
    'canvas[class*="qrcode"]',
    'svg[class*="qrcode"]',
)

# Fallback click on the first element whose whole text is the login label.
_JS_CLICK_LOGIN = '''()=>{
let els = document.querySelectorAll("*");
for(let el of els){
if(el.textContent.trim()==="登录" && el.click) {el.click(); return true;}
}
return false;
}'''

# Fallback click on the QR login tab. Ancestors (starting with <html>) also
# contain the label text, so the element with the SHORTEST matching text is
# chosen; "<=" prefers the innermost element among equally long matches.
_JS_CLICK_QR_TAB = '''()=>{
    let best = null;
    for (const el of document.querySelectorAll("*")) {
        const text = el.textContent || "";
        if (!el.click || !text.includes("二维码登录")) continue;
        if (best === null || text.length <= best.textContent.length) best = el;
    }
    if (best === null) return false;
    best.click();
    return true;
}'''


async def _click_login_entry(page):
    """Open the login UI from the home page: selector click first, DOM scan as fallback."""
    login_btn = None
    try:
        login_btn = await page.wait_for_selector(
            'button:has-text("登录"), a:has-text("登录"), div:has-text("登录")',
            timeout=8000
        )
    except Exception as e:
        # Best effort: not finding the button only means the JS fallback runs.
        app.logger.info(f"未通过选择器找到登录按钮 ({type(e).__name__}),改用 JS 兜底")
    if login_btn:
        await login_btn.click()
        app.logger.info("点击登录按钮成功")
    else:
        await page.evaluate(_JS_CLICK_LOGIN)
        app.logger.info("JS兜底点击登录")


async def _switch_to_qr_tab(page_target):
    """Try to activate the QR-code login tab of the login dialog (best effort)."""
    for attempt in range(3):
        try:
            # ":text()" is used because a selector starting with "text=" is
            # handed to the text engine as a whole and cannot be combined
            # with other selectors in a comma-separated list.
            qr_tab = await page_target.wait_for_selector(
                ':text("二维码登录"), [data-e2e="qrcode-tab"]',
                timeout=5000
            )
            if qr_tab:
                await qr_tab.click()
                app.logger.info(f"✅ 点击二维码登录选项卡 (第{attempt+1}次)")
                await page_target.wait_for_timeout(1500)
                return
        except Exception as e:
            app.logger.warning(f"点击二维码登录失败 {attempt+1}: {e}")
            try:
                clicked = await page_target.evaluate(_JS_CLICK_QR_TAB)
            except Exception as js_err:
                app.logger.warning(f"JS点击二维码登录异常: {js_err}")
            else:
                if clicked:
                    app.logger.info("JS点击二维码登录")
                    await page_target.wait_for_timeout(1500)
                else:
                    # Nothing to click; no point in waiting for a tab switch.
                    app.logger.info("页面中未找到二维码登录入口,跳过切换")
                return
        await page_target.wait_for_timeout(1000)


async def _find_qr_element(page_target, max_wait):
    """Poll the page for a QR code element for up to *max_wait* seconds.

    Returns the first matching element handle, or ``None`` on timeout.
    """
    started = time.monotonic()
    last_logged = None
    iteration = 0
    while time.monotonic() - started < max_wait:
        iteration += 1
        # The first 3 rounds wait for the DOM to settle; later rounds poll
        # quickly every 500 ms.
        if iteration <= 3:
            await page_target.wait_for_load_state("domcontentloaded")
        else:
            await page_target.wait_for_timeout(500)
        for sel in _QR_SELECTORS:
            try:
                elem = await page_target.query_selector(sel)
                if not elem:
                    continue
                if 'img' in sel:
                    # An <img> only counts once it has a real src.
                    src = await elem.get_attribute('src')
                    if not src or len(src) <= 10:
                        continue
            except Exception:
                # The DOM may change under us; just try the next selector.
                continue
            app.logger.info(f"✅ 找到二维码: {sel}")
            return elem
        waited = int(time.monotonic() - started)
        # Log progress at most once per 5-second mark.
        if waited % 5 == 0 and waited != last_logged:
            last_logged = waited
            app.logger.info(f"等待二维码... {waited}s")
    return None


async def _extract_qr_image(page_target, qr_img):
    """Return the QR code image bytes, preferring the real image over screenshots."""
    try:
        await qr_img.wait_for_element_state("visible", timeout=3000)
    except Exception as e:
        # Not fatal: the src may still be readable on a not-yet-visible element.
        app.logger.debug(f"等待二维码元素可见失败: {e}")

    img_bytes = None
    # Strategy 1: read the element's src (data URI or downloadable image URL).
    try:
        src = await qr_img.get_attribute('src')
        if src:
            if src.startswith('data:image/'):
                app.logger.info("✅ 提取到 data URI 二维码")
                _, b64 = src.split(',', 1)
                img_bytes = base64.b64decode(b64)
            elif src.startswith('http'):
                app.logger.info(f"✅ 下载 QR 原图: {src[:80]}...")
                try:
                    resp = requests.get(src, timeout=10,
                                        headers={'Referer': 'https://www.douyin.com/'})
                    if resp.status_code == 200 and len(resp.content) > 100:
                        img_bytes = resp.content
                        app.logger.info(f"✅ QR 原图下载成功 ({len(img_bytes)} bytes)")
                except Exception as e:
                    app.logger.warning(f"下载 QR 原图失败: {e}")
    except Exception as e:
        app.logger.warning(f"获取 QR src 失败: {e}")

    # Strategy 2: screenshot of the QR element itself (not the whole dialog).
    if not img_bytes:
        try:
            img_bytes = await qr_img.screenshot()
            app.logger.info(f"✅ 截取 QR 元素区域 ({len(img_bytes)} bytes)")
        except Exception as e:
            app.logger.warning(f"截取 QR 元素失败: {e}")

    # Strategy 3: screenshot of the login dialog, or the full page, as a last resort.
    if not img_bytes:
        try:
            dialog = await page_target.query_selector(
                'div[role="dialog"], .auth-modal, .login-box, [class*="modal"]')
            if dialog:
                img_bytes = await dialog.screenshot()
                app.logger.info("⚠ 使用弹窗截图兜底")
            else:
                img_bytes = await page_target.screenshot(full_page=True)
                app.logger.info("⚠ 使用全屏截图兜底")
        except Exception as e:
            app.logger.warning(f"弹窗截图失败: {e}")
            img_bytes = await page_target.screenshot(full_page=True)
    return img_bytes


async def _start_qr(proxy_api):
    """Launch a browser, open the Douyin login dialog and capture its QR code.

    Args:
        proxy_api: URL of a proxy API, or an empty string to use the default IP.

    Returns:
        A Flask JSON response. On success the status is "qr_ready" and
        ``qr_image`` holds a base64 data URI (plus ``qr_download`` when a real
        QR element was found; otherwise the image is a full-page screenshot).
        On failure the status is "error" with the exception text, and the
        session is left in the "error" state.
    """
    await cleanup_session()
    login_session.status = "loading"
    login_session.message = "正在启动浏览器..."
    login_session.proxy_used = proxy_api if proxy_api else "默认IP"
    try:
        login_session.playwright = await async_playwright().start()
        launch_options = {
            "headless": True,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--font-render-hinting=medium"
            ]
        }
        proxy = get_proxy_from_api(proxy_api) if proxy_api else None
        if proxy:
            launch_options["proxy"] = proxy
            app.logger.info(f"使用代理: {proxy['server']}")
        else:
            app.logger.info("使用默认 IP")
        login_session.browser = await login_session.playwright.chromium.launch(**launch_options)
        login_session.context = await login_session.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
            viewport={"width": 1280, "height": 720},
            locale="zh-CN",
            timezone_id="Asia/Shanghai"
        )
        await inject_stealth(login_session.context)
        login_session.page = await login_session.context.new_page()

        async def handle_new_page(new_p):
            # The login UI may open in a new tab; remember it for later.
            app.logger.info("检测到新开登录页面")
            login_session.login_page = new_p
            await new_p.wait_for_load_state("domcontentloaded")

        login_session.context.on("page", handle_new_page)

        app.logger.info("访问抖音主页")
        await login_session.page.goto("https://www.douyin.com/", wait_until="commit", timeout=20000)
        await login_session.page.wait_for_timeout(random.uniform(1000, 2000))

        page_target = login_session.page
        await _click_login_entry(page_target)
        await login_session.page.wait_for_timeout(random.uniform(1000, 2000))

        if login_session.login_page is not None:
            page_target = login_session.login_page
            app.logger.info("切换到新登录标签页")
            await page_target.wait_for_load_state("domcontentloaded")
            await page_target.wait_for_timeout(1000)

        app.logger.info("等待登录弹窗...")
        try:
            await page_target.wait_for_selector(
                'div[role="dialog"], .auth-modal, .login-box, [class*="login"], [class*="modal"]',
                timeout=10000
            )
        except Exception as e:
            # Best effort: the QR search below has its own, longer wait.
            app.logger.info(f"未检测到登录弹窗 ({type(e).__name__}),继续查找二维码")
        await page_target.wait_for_timeout(1000)

        # Force the dialog onto the QR-code login tab.
        app.logger.info("尝试切换到二维码登录...")
        await _switch_to_qr_tab(page_target)

        app.logger.info("开始查找二维码...")
        max_wait = 60
        qr_img = await _find_qr_element(page_target, max_wait)

        if qr_img:
            img_bytes = await _extract_qr_image(page_target, qr_img)
            # Save a copy under the static directory for /qrcode.png.
            qr_file_path = os.path.join(STATIC_DIR, 'qrcode.png')
            with open(qr_file_path, 'wb') as f:
                f.write(img_bytes)
            app.logger.info(f"二维码已保存至 {qr_file_path} ({len(img_bytes)} bytes)")
            img_base64 = base64.b64encode(img_bytes).decode('utf-8')
            login_session.status = "qr_ready"
            login_session.message = "请使用抖音 App 扫码"
            login_session.start_time = time.time()
            return jsonify({
                "status": "qr_ready",
                "qr_image": f"data:image/png;base64,{img_base64}",
                "proxy_used": login_session.proxy_used,
                "qr_download": "/qrcode.png"
            })

        app.logger.error(f"{max_wait}秒未检测到二维码,返回全屏截图")
        img_bytes = await page_target.screenshot(full_page=True)
        img_base64 = base64.b64encode(img_bytes).decode('utf-8')
        login_session.status = "qr_ready"
        login_session.message = "未检测到二维码,全屏截图请查看"
        login_session.start_time = time.time()
        return jsonify({
            "status": "qr_ready",
            "qr_image": f"data:image/png;base64,{img_base64}",
            "proxy_used": login_session.proxy_used
        })
    except Exception as e:
        app.logger.error(f"启动扫码失败: {e}", exc_info=True)
        # cleanup_session() resets status/message, so record the error only
        # after the cleanup; otherwise the error state would be lost.
        await cleanup_session()
        login_session.status = "error"
        login_session.message = str(e)
        return jsonify({"status": "error", "message": str(e)})
@app.route('/api/check_login', methods=['GET'])
def check_login():
    """Poll whether the QR login has completed.

    Returns a JSON payload whose ``status`` is one of:
    "idle" / "loading" / "error" (current session state), "scanning"
    (waiting for confirmation on the phone) or "success" (with the
    extracted ``cookies``). A session older than 300 seconds is cleaned up
    and reported as an error.
    """
    status = login_session.status
    if status in ("idle", "loading", "error"):
        return jsonify({"status": status, "message": login_session.message})
    if status == "success" and login_session.cookies:
        # The browser is already closed; answer repeated polls from the
        # cookies captured when the login succeeded.
        return jsonify({
            "status": "success",
            "cookies": login_session.cookies,
            "message": "Cookie 提取成功"
        })
    if time.time() - login_session.start_time > 300:
        run_async_sync(cleanup_session())
        return jsonify({"status": "error", "message": "登录超时,请重新发起扫码"})
    try:
        if not login_session.context:
            run_async_sync(cleanup_session())
            return jsonify({"status": "error", "message": "会话已失效"})
        cookies = run_async_sync(login_session.context.cookies())
        app.logger.info(f"当前Cookie数量: {len(cookies)}")
        cookie_names = {c['name'] for c in cookies}
        need_keys = {"sessionid_ss", "sessionid", "sid_guard", "uid_tt", "uid_tt_ss"}
        # Require at least 2 of the key cookies to reduce false positives.
        if len(need_keys & cookie_names) >= 2:
            app.logger.info(f"✅ 登录成功,获取有效Cookie {len(cookies)}个")
            run_async_sync(cleanup_session())
            # cleanup_session() resets status/message to idle, so the success
            # state must be recorded after it, not before.
            login_session.cookies = cookies
            login_session.status = "success"
            login_session.message = "登录成功"
            return jsonify({
                "status": "success",
                "cookies": cookies,
                "message": "Cookie 提取成功"
            })
        return jsonify({"status": "scanning", "message": "等待手机抖音确认登录..."})
    except Exception as e:
        app.logger.error(f"检查登录异常: {e}", exc_info=True)
        return jsonify({"status": "error", "message": str(e)})
@app.route('/api/test_proxy', methods=['POST'])
def test_proxy():
    """Check that a proxy obtained from the given proxy API is usable.

    Expects a JSON body ``{"proxy_api": "<url>"}``. Fetches a proxy from
    that API, requests http://httpbin.org/ip through it and returns the
    reported origin IP on success, or an error payload otherwise.
    """
    # A missing, malformed or non-object body is treated like an empty one,
    # so the caller gets the "missing proxy API" message instead of a 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    proxy_api = str(payload.get('proxy_api') or '').strip()
    if not proxy_api:
        return jsonify({"status": "error", "message": "请提供代理 API"})
    try:
        proxy = get_proxy_from_api(proxy_api, timeout=5)
        if not proxy:
            return jsonify({"status": "error", "message": "获取代理失败"})
        proxies = {
            'http': proxy['server'],
            'https': proxy['server']
        }
        resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10)
        if resp.status_code != 200:
            return jsonify({"status": "error", "message": f"代理响应异常: {resp.status_code}"})
        ip_info = resp.json()
        return jsonify({
            "status": "success",
            "ip": ip_info.get('origin', 'unknown'),
            "message": "代理可用"
        })
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)})
@app.route('/api/status', methods=['GET'])
def api_status():
    """Report the current session state; meant to be polled by the front end."""
    timed_states = ("qr_ready", "scanning", "success")
    suffix = ""
    # Only sessions that have a start time and are in a timed state get the
    # "waited N seconds" suffix appended to their message.
    if login_session.start_time and login_session.status in timed_states:
        waited = int(time.time() - login_session.start_time)
        suffix = f" (已等待 {waited}s)"
    payload = {
        "status": login_session.status,
        "message": login_session.message + suffix,
        "proxy_used": login_session.proxy_used
    }
    return jsonify(payload)
@app.route('/api/reset', methods=['POST'])
def api_reset():
    """Force the current session back to its initial state."""
    try:
        run_async_sync(cleanup_session())
    except Exception as e:
        app.logger.warning(f"重置会话异常: {e}")

    # Reset every reporting field, whether or not the cleanup succeeded.
    reset_values = (
        ("status", "idle"),
        ("message", "会话已重置"),
        ("cookies", None),
        ("start_time", 0),
        ("proxy_used", None),
    )
    for field, value in reset_values:
        setattr(login_session, field, value)
    return jsonify({"status": "success", "message": "会话已重置"})
@app.route('/api/health', methods=['GET'])
def api_health():
    """Health-check endpoint intended for production monitoring."""
    report = {
        "status": "ok",
        "session_status": login_session.status,
        "timestamp": time.time()
    }
    return jsonify(report)
# Script entry point: serve the app on all network interfaces (0.0.0.0),
# port 5001, with Flask debug mode disabled.
if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=5001)