Files
douyin_cookie_yunsya/app.py
T
root f9fb8478d6 @
修复:路径可移植性、密钥持久化、补充缺失API

- 硬编码路径改为环境变量 + 项目相对路径
- secret_key 持久化到 .secret_key 文件,避免重启失效
- 新增 /api/status 路由,支持前端轮询状态
- 新增 /api/reset 路由,支持前端重置会话
- .gitignore 添加 .secret_key 排除
@
2026-06-25 06:57:25 +08:00

474 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import base64
import json
import os
import time
import logging
import threading
import random
from logging.handlers import RotatingFileHandler
from io import BytesIO
from PIL import Image
import requests
from flask import Flask, render_template, request, jsonify, send_file
from playwright.async_api import async_playwright
# Logging configuration: the LOG_DIR environment variable wins; otherwise use
# a "logs" directory located next to this file.
_default_log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
LOG_DIR = os.environ.get('LOG_DIR', _default_log_dir)
os.makedirs(LOG_DIR, exist_ok=True)

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Rotate the log at 10 MiB and keep five rotated files.
handler = RotatingFileHandler(
    f'{LOG_DIR}/app.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",
)
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
app = Flask(__name__)

# Persist the secret key so it survives restarts. Lookup order:
#   1. the SECRET_KEY environment variable,
#   2. the ".secret_key" file next to this module,
#   3. a freshly generated random key, which is then written to that file.
_secret_key_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.secret_key')
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key and os.path.exists(_secret_key_path):
    with open(_secret_key_path, 'rb') as _f:
        _secret_key = _f.read()
if not _secret_key:
    # Also reached when the key file exists but is empty.
    _secret_key = os.urandom(24)
    # Create the file with owner-only permissions (0600) instead of the
    # process default, so other local users cannot read the key.
    # The mode only applies when the file is created by this call.
    _secret_fd = os.open(
        _secret_key_path,
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0),
        0o600,
    )
    with os.fdopen(_secret_fd, 'wb') as _f:
        _f.write(_secret_key)
app.secret_key = _secret_key

app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO)

# Static file directory: the STATIC_DIR environment variable wins; otherwise
# use a "static" directory located next to this file.
STATIC_DIR = os.environ.get('STATIC_DIR', os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static'))
os.makedirs(STATIC_DIR, exist_ok=True)
# Global state
class LoginSession:
    """Mutable holder for the state of the single QR-login attempt."""

    def __init__(self):
        # Browser-automation handles; all empty until a login attempt starts.
        self.playwright = self.browser = self.context = None
        self.page = self.login_page = None

        # Progress reported to the front end.
        self.status, self.message = "idle", ""
        self.cookies = None
        self.start_time = 0
        self.proxy_used = None

        # Guards against overlapping login attempts.
        self.lock = threading.Lock()
        # Event loop shared by all async calls; created on first use.
        self.loop = None


# Module-wide singleton shared by every request handler.
login_session = LoginSession()
# Serialises access to the shared event loop. An asyncio loop refuses to be
# run while it is already running, so two request threads calling
# run_until_complete() at the same time would raise RuntimeError.
_loop_run_lock = threading.Lock()


def run_async_sync(coro):
    """Run ``coro`` to completion on the shared event loop and return its result.

    The loop is stored on ``login_session`` and created lazily (or re-created
    if it has been closed), so every coroutine runs on the same loop.
    Calls from different threads are executed one at a time; a caller blocks
    until the loop is free.

    Must not be called from code that is itself running inside this loop.
    """
    with _loop_run_lock:
        if login_session.loop is None or login_session.loop.is_closed():
            login_session.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(login_session.loop)
        return login_session.loop.run_until_complete(coro)
def get_proxy_from_api(api_url):
    """Fetch one proxy address from ``api_url``.

    The response may be a JSON object with ``ip`` and ``port`` keys, or plain
    text holding the proxy address. Anything else is treated as plain text.

    Returns:
        ``{"server": "<scheme>://<host>:<port>"}``, or ``None`` when
        ``api_url`` is empty, the request fails, or the response is empty.
    """
    if not api_url:
        return None
    try:
        resp = requests.get(api_url, timeout=10)
        resp.raise_for_status()
        content = resp.text.strip()

        # ValueError is the common base of the JSON decode errors raised for
        # a non-JSON body, so plain-text responses fall through cleanly.
        try:
            data = resp.json()
        except ValueError:
            data = None

        # Only a JSON object can carry ip/port; a bare number, string or list
        # is handled as plain text instead of raising on the `in` test.
        if isinstance(data, dict) and "ip" in data and "port" in data:
            proxy_str = f"{data['ip']}:{data['port']}"
        else:
            proxy_str = content

        if not proxy_str:
            app.logger.error("获取代理失败: 代理接口返回内容为空")
            return None

        # A multi-line body cannot be a single server address; use line one.
        proxy_str = proxy_str.splitlines()[0].strip()

        # Add a default scheme only when none is present, so addresses such
        # as "socks5://host:port" are passed through unchanged.
        if "://" not in proxy_str:
            proxy_str = f"http://{proxy_str}"
        return {"server": proxy_str}
    except Exception as e:
        app.logger.error(f"获取代理失败: {e}")
        return None
async def cleanup_session():
    """Close every browser resource held by ``login_session`` and reset it.

    Each resource is released in its own ``try`` block, so a failure while
    closing one (for example an already-closed page) no longer prevents the
    browser and the Playwright driver from being shut down.

    Afterwards all handles are ``None``, ``status`` is ``"idle"`` and
    ``message`` is empty. ``cookies``, ``start_time`` and ``proxy_used`` are
    left untouched.
    """
    # (attribute on login_session, name of its shutdown coroutine method),
    # listed innermost first.
    teardown_steps = (
        ("login_page", "close"),
        ("page", "close"),
        ("context", "close"),
        ("browser", "close"),
        ("playwright", "stop"),
    )
    for attr, method_name in teardown_steps:
        resource = getattr(login_session, attr)
        # Drop the reference first so the handle is cleared even on failure.
        setattr(login_session, attr, None)
        if not resource:
            continue
        try:
            await getattr(resource, method_name)()
        except Exception as e:
            app.logger.warning(f"清理会话异常: {e}")

    login_session.status = "idle"
    login_session.message = ""
# Init script that overrides navigator.webdriver, navigator.chrome,
# navigator.plugins and navigator.languages in the page.
_STEALTH_INIT_SCRIPT = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = { runtime: {} };
Object.defineProperty(navigator, 'plugins', { get: () => [] });
Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','zh','en']});
"""


async def inject_stealth(context):
    """Register the navigator-override init script on ``context``."""
    await context.add_init_script(_STEALTH_INIT_SCRIPT)
@app.route('/')
def index():
    """Render the ``index.html`` template as the landing page."""
    page_html = render_template('index.html')
    return page_html
@app.route('/qrcode.png')
def get_qrcode():
    """Serve the saved QR screenshot, or a 404 message if none exists yet."""
    qr_path = os.path.join(STATIC_DIR, 'qrcode.png')
    if not os.path.exists(qr_path):
        return "二维码未生成,请先点击获取二维码", 404
    return send_file(qr_path, mimetype='image/png')
@app.route('/api/start_qr', methods=['POST'])
def start_qr():
    """Start a QR-login attempt and return the QR image as JSON.

    Accepts an optional JSON object ``{"proxy_api": "<url>"}``. A missing,
    malformed or non-object body, or a null ``proxy_api``, is treated as
    "no proxy" instead of failing the request.

    Only one attempt may run at a time; if another is in progress an error
    payload is returned immediately.
    """
    # Non-blocking acquire: reject rather than queue a second attempt.
    if not login_session.lock.acquire(blocking=False):
        return jsonify({"status": "error", "message": "已有正在进行的扫码任务,请稍后重试"})
    try:
        # silent=True yields None for a missing or invalid JSON body.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        # `or ''` covers an explicit null; str() covers non-string values.
        proxy_api = str(payload.get('proxy_api') or '').strip()
        return run_async_sync(_start_qr(proxy_api))
    finally:
        login_session.lock.release()
# Selectors tried, in order, when looking for the QR code element.
_QR_SELECTORS = (
    'div[role="dialog"] img[src*="qrcode"]',
    'div[role="dialog"] img[src*="passport"]',
    '.auth-modal img[src*="qrcode"]',
    '.login-box img[src*="qrcode"]',
    'img[src*="qrcode"]',
    'img[src*="passport"]',
    'img[alt*="二维码"]',
    'img[class*="qrcode"]',
    '.qrcode-box img',
    '.login-qrcode-image img',
    'div[data-e2e="qrcode"] img',
    'div[class*="qr"] img',
    'canvas[class*="qrcode"]',
    'svg[class*="qrcode"]',
)

# Seconds to keep polling for the QR element before giving up.
_QR_MAX_WAIT = 60

# In-page fallback: click the first element whose trimmed text is "登录".
_JS_CLICK_LOGIN = '''()=>{
let els = document.querySelectorAll("*");
for(let el of els){
if(el.textContent.trim()==="登录" && el.click) {el.click(); return true;}
}
return false;
}'''

# In-page fallback: click the first element whose text contains "二维码登录".
# NOTE(review): querySelectorAll("*") is in document order, so the first match
# is likely an outer container rather than the tab itself - confirm this
# fallback actually switches tabs.
_JS_CLICK_QR_TAB = '''()=>{
let els = document.querySelectorAll("*");
for(let el of els){
if(el.textContent && el.textContent.includes("二维码登录") && el.click){
el.click();
return true;
}
}
return false;
}'''


async def _launch_browser(proxy_api):
    """Start Playwright and headless Chromium, storing handles on login_session."""
    login_session.playwright = await async_playwright().start()
    launch_options = {
        "headless": True,
        "args": [
            "--disable-blink-features=AutomationControlled",
            "--disable-infobars",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--font-render-hinting=medium"
        ]
    }
    proxy = get_proxy_from_api(proxy_api) if proxy_api else None
    if proxy:
        launch_options["proxy"] = proxy
        app.logger.info(f"使用代理: {proxy['server']}")
    else:
        app.logger.info("使用默认 IP")
    login_session.browser = await login_session.playwright.chromium.launch(**launch_options)
    login_session.context = await login_session.browser.new_context(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
        viewport={"width": 1280, "height": 720},
        locale="zh-CN",
        timezone_id="Asia/Shanghai"
    )
    await inject_stealth(login_session.context)
    login_session.page = await login_session.context.new_page()

    async def handle_new_page(new_p):
        # Remember any page opened later; the login UI may open in a new tab.
        app.logger.info("检测到新开登录页面")
        login_session.login_page = new_p
        await new_p.wait_for_load_state("domcontentloaded")

    # Registered after the main page exists, so only later pages trigger it.
    login_session.context.on("page", handle_new_page)


async def _open_login_dialog():
    """Open the Douyin home page, click "登录" and return the page holding the login UI."""
    app.logger.info("访问抖音主页")
    await login_session.page.goto("https://www.douyin.com/", wait_until="domcontentloaded", timeout=30000)
    await login_session.page.wait_for_timeout(random.uniform(2000, 4000))

    page_target = login_session.page
    login_btn = None
    try:
        login_btn = await page_target.wait_for_selector(
            'button:has-text("登录"), a:has-text("登录"), div:has-text("登录")',
            timeout=8000
        )
    except Exception as e:
        # Best effort: the JS fallback below handles a missing button.
        app.logger.debug(f"未等到登录按钮: {e}")

    if login_btn:
        await login_btn.click()
        app.logger.info("点击登录按钮成功")
    else:
        await page_target.evaluate(_JS_CLICK_LOGIN)
        app.logger.info("JS兜底点击登录")
    await login_session.page.wait_for_timeout(random.uniform(3000, 5000))

    # If the click opened a new tab, continue there instead.
    if login_session.login_page is not None:
        page_target = login_session.login_page
        app.logger.info("切换到新登录标签页")
        await page_target.wait_for_load_state("domcontentloaded")
        await page_target.wait_for_timeout(2000)

    app.logger.info("等待登录弹窗...")
    try:
        await page_target.wait_for_selector(
            'div[role="dialog"], .auth-modal, .login-box, [class*="login"], [class*="modal"]',
            timeout=15000
        )
    except Exception as e:
        # Best effort: carry on even if no dialog-like element appeared.
        app.logger.debug(f"未等到登录弹窗: {e}")
    await page_target.wait_for_timeout(2000)
    return page_target


async def _switch_to_qr_tab(page_target):
    """Try up to three times to activate the "二维码登录" tab, with a JS fallback."""
    app.logger.info("尝试切换到二维码登录...")
    for attempt in range(3):
        try:
            # NOTE(review): this selector starts with the "text=" engine
            # prefix; verify against the Playwright selector docs that the
            # comma-separated alternatives are honoured in that form.
            qr_tab = await page_target.wait_for_selector(
                'text=二维码登录, [data-e2e="qrcode-tab"], div:has-text("二维码登录")',
                timeout=5000
            )
            if qr_tab:
                await qr_tab.click()
                app.logger.info(f"✅ 点击二维码登录选项卡 (第{attempt+1}次)")
                await page_target.wait_for_timeout(3000)
                return
        except Exception as e:
            app.logger.warning(f"点击二维码登录失败 {attempt+1}: {e}")
            try:
                await page_target.evaluate(_JS_CLICK_QR_TAB)
                app.logger.info("JS点击二维码登录")
                await page_target.wait_for_timeout(3000)
                return
            except Exception as js_err:
                app.logger.debug(f"JS点击二维码登录失败: {js_err}")
        # Short pause before the next attempt.
        await page_target.wait_for_timeout(2000)


async def _find_qr_element(page_target, max_wait):
    """Poll the page for a QR element for up to ``max_wait`` seconds; return it or None."""
    app.logger.info("开始查找二维码...")
    start_time = time.time()
    while time.time() - start_time < max_wait:
        await page_target.wait_for_load_state("domcontentloaded")
        for sel in _QR_SELECTORS:
            try:
                elem = await page_target.query_selector(sel)
                if not elem:
                    continue
                if 'img' in sel:
                    # An <img> only counts once it has a non-trivial src.
                    src = await elem.get_attribute('src')
                    if not (src and len(src) > 10):
                        continue
                app.logger.info(f"✅ 找到二维码: {sel}")
                return elem
            except Exception as e:
                # A failing selector must not abort the scan.
                app.logger.debug(f"查询二维码选择器失败 {sel}: {e}")
                continue
        await page_target.wait_for_timeout(1000)
        if int(time.time() - start_time) % 5 == 0:
            app.logger.info(f"等待二维码... {int(time.time() - start_time)}s")
    return None


async def _capture_qr_image(page_target, qr_img):
    """Return PNG bytes of the login dialog, or of the full page if that fails."""
    try:
        await qr_img.wait_for_element_state("visible", timeout=5000)
    except Exception as e:
        # Best effort: take the screenshot even if visibility was not confirmed.
        app.logger.debug(f"等待二维码可见失败: {e}")
    # Prefer the login dialog; otherwise capture the whole page.
    try:
        dialog = await page_target.query_selector('div[role="dialog"], .auth-modal, .login-box, [class*="modal"]')
        if dialog:
            img_bytes = await dialog.screenshot()
            app.logger.info("✅ 截取登录弹窗区域")
        else:
            img_bytes = await page_target.screenshot(full_page=True)
            app.logger.info("✅ 全屏截图")
    except Exception as e:
        app.logger.warning(f"区域截图失败,使用全屏截图: {e}")
        img_bytes = await page_target.screenshot(full_page=True)
    return img_bytes


async def _start_qr(proxy_api):
    """Drive the browser to the Douyin QR login and return a JSON response.

    Any previous session is cleaned up first. On success ``login_session`` is
    left in state ``"qr_ready"`` with the browser still open, so the login can
    be polled later.

    Args:
        proxy_api: URL of a proxy-provider API, or an empty string for none.

    Returns:
        A Flask JSON response. ``status`` is ``"qr_ready"`` with a base64
        ``qr_image`` (plus ``qr_download`` when a QR element was found), or
        ``"error"`` with a ``message``.
    """
    await cleanup_session()
    login_session.status = "loading"
    login_session.message = "正在启动浏览器..."
    login_session.proxy_used = proxy_api if proxy_api else "默认IP"
    try:
        await _launch_browser(proxy_api)
        page_target = await _open_login_dialog()
        await _switch_to_qr_tab(page_target)
        qr_img = await _find_qr_element(page_target, _QR_MAX_WAIT)

        if qr_img:
            img_bytes = await _capture_qr_image(page_target, qr_img)
            # Save a copy so it can be downloaded via /qrcode.png.
            qr_file_path = os.path.join(STATIC_DIR, 'qrcode.png')
            with open(qr_file_path, 'wb') as f:
                f.write(img_bytes)
            app.logger.info(f"二维码截图已保存至 {qr_file_path}")
            img_base64 = base64.b64encode(img_bytes).decode('utf-8')
            login_session.status = "qr_ready"
            login_session.message = "请使用抖音 App 扫码"
            login_session.start_time = time.time()
            return jsonify({
                "status": "qr_ready",
                "qr_image": f"data:image/png;base64,{img_base64}",
                "proxy_used": login_session.proxy_used,
                "qr_download": "/qrcode.png"
            })

        # No QR element found: return a full-page screenshot for inspection.
        app.logger.error(f"{_QR_MAX_WAIT}秒未检测到二维码,返回全屏截图")
        img_bytes = await page_target.screenshot(full_page=True)
        img_base64 = base64.b64encode(img_bytes).decode('utf-8')
        login_session.status = "qr_ready"
        login_session.message = "未检测到二维码,全屏截图请查看"
        login_session.start_time = time.time()
        return jsonify({
            "status": "qr_ready",
            "qr_image": f"data:image/png;base64,{img_base64}",
            "proxy_used": login_session.proxy_used
        })
    except Exception as e:
        app.logger.error(f"启动扫码失败: {e}", exc_info=True)
        # cleanup_session() resets status and message, so record the error
        # afterwards; otherwise it is lost to later status polls.
        await cleanup_session()
        login_session.status = "error"
        login_session.message = str(e)
        return jsonify({"status": "error", "message": str(e)})
@app.route('/api/check_login', methods=['GET'])
def check_login():
    """Report whether the QR login has completed and return the cookies.

    Returns a JSON payload whose ``status`` is one of:
      * ``"idle"`` / ``"loading"`` / ``"error"``: the current session state;
      * ``"scanning"``: QR shown, no login cookie present yet;
      * ``"success"``: logged in, with the browser cookies under ``cookies``.

    A session older than 300 seconds is cleaned up and reported as an error.
    After a successful login the browser is closed, and the cookies stay
    available to later polls until the session is reset or restarted.
    """
    if login_session.status in ["idle", "loading"]:
        return jsonify({"status": login_session.status, "message": login_session.message})
    if login_session.status == "error":
        return jsonify({"status": "error", "message": login_session.message})
    if login_session.status == "success" and login_session.cookies:
        # Already logged in and cleaned up: serve the stored cookies again.
        return jsonify({
            "status": "success",
            "cookies": login_session.cookies,
            "message": "Cookie 提取成功"
        })
    if time.time() - login_session.start_time > 300:
        run_async_sync(cleanup_session())
        return jsonify({"status": "error", "message": "登录超时,请重新发起扫码"})
    try:
        if not login_session.context:
            run_async_sync(cleanup_session())
            return jsonify({"status": "error", "message": "会话已失效"})

        cookies = run_async_sync(login_session.context.cookies())
        app.logger.info(f"当前Cookie数量: {len(cookies)}")

        # Any one of these cookie names is taken as proof of a login.
        need_keys = {"sessionid_ss", "sessionid", "sid_guard", "uid_tt"}
        cookie_names = {c['name'] for c in cookies}
        if not (cookie_names & need_keys):
            return jsonify({"status": "scanning", "message": "等待手机抖音确认登录..."})

        app.logger.info(f"✅ 登录成功,获取有效Cookie {len(cookies)}个")
        run_async_sync(cleanup_session())
        # cleanup_session() resets status and message, so record the success
        # afterwards; otherwise it is immediately overwritten with "idle".
        login_session.cookies = cookies
        login_session.status = "success"
        login_session.message = "登录成功"
        return jsonify({
            "status": "success",
            "cookies": cookies,
            "message": "Cookie 提取成功"
        })
    except Exception as e:
        app.logger.error(f"检查登录异常: {e}", exc_info=True)
        return jsonify({"status": "error", "message": str(e)})
@app.route('/api/test_proxy', methods=['POST'])
def test_proxy():
    """Fetch a proxy from the given API and check it by requesting httpbin.

    Expects a JSON object ``{"proxy_api": "<url>"}``. A missing, malformed or
    non-object body, or a null/empty ``proxy_api``, produces the
    "请提供代理 API" error payload instead of an unhandled exception.

    Returns a JSON payload with ``status`` ``"success"`` (plus the ``ip``
    reported by httpbin) or ``"error"`` with a ``message``.

    NOTE(review): ``proxy_api`` is a caller-supplied URL that this server
    fetches; consider restricting allowed hosts if the endpoint is exposed.
    """
    # silent=True yields None for a missing or invalid JSON body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    proxy_api = str(payload.get('proxy_api') or '').strip()
    if not proxy_api:
        return jsonify({"status": "error", "message": "请提供代理 API"})
    try:
        proxy = get_proxy_from_api(proxy_api)
        if not proxy:
            return jsonify({"status": "error", "message": "获取代理失败"})
        proxies = {
            'http': proxy['server'],
            'https': proxy['server']
        }
        resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10)
        if resp.status_code != 200:
            return jsonify({"status": "error", "message": f"代理响应异常: {resp.status_code}"})
        ip_info = resp.json()
        return jsonify({
            "status": "success",
            "ip": ip_info.get('origin', 'unknown'),
            "message": "代理可用"
        })
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)})
@app.route('/api/status', methods=['GET'])
def api_status():
    """Return the current session state as JSON, for front-end polling.

    While the status is ``qr_ready``, ``scanning`` or ``success`` and a start
    time is recorded, the elapsed whole seconds are appended to the message.
    """
    timed_states = ("qr_ready", "scanning", "success")
    message = login_session.message
    if login_session.status in timed_states and login_session.start_time:
        waited = int(time.time() - login_session.start_time)
        message = message + f" (已等待 {waited}s)"
    payload = {
        "status": login_session.status,
        "message": message,
        "proxy_used": login_session.proxy_used,
    }
    return jsonify(payload)
@app.route('/api/reset', methods=['POST'])
def api_reset():
    """Force-reset the current session and return a success payload.

    A cleanup failure is logged as a warning and does not stop the reset.
    """
    try:
        run_async_sync(cleanup_session())
    except Exception as e:
        app.logger.warning(f"重置会话异常: {e}")

    # Restore the session fields to their reset values, in this order.
    reset_values = (
        ("status", "idle"),
        ("message", "会话已重置"),
        ("cookies", None),
        ("start_time", 0),
        ("proxy_used", None),
    )
    for field_name, value in reset_values:
        setattr(login_session, field_name, value)

    return jsonify({"status": "success", "message": "会话已重置"})
if __name__ == '__main__':
    # Script entry point: listen on all interfaces, port 5001, debug off.
    app.run(host='0.0.0.0', port=5001, debug=False)