#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import base64 import json import os import time import logging import threading import random from logging.handlers import RotatingFileHandler from io import BytesIO from PIL import Image import requests from flask import Flask, render_template, request, jsonify, send_file from playwright.async_api import async_playwright # 配置日志 LOG_DIR = '/www/wwwlogs/douyin_cookie' if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR, exist_ok=True) handler = RotatingFileHandler( f'{LOG_DIR}/app.log', maxBytes=1024*1024*10, backupCount=5, encoding="utf-8" ) handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) app = Flask(__name__) app.secret_key = os.urandom(24) app.logger.addHandler(handler) app.logger.setLevel(logging.INFO) # 静态文件目录 STATIC_DIR = '/www/wwwroot/douyin_cookie_extractor/static' os.makedirs(STATIC_DIR, exist_ok=True) # 全局状态 class LoginSession: def __init__(self): self.playwright = None self.browser = None self.context = None self.page = None self.login_page = None self.status = "idle" self.cookies = None self.message = "" self.start_time = 0 self.proxy_used = None self.lock = threading.Lock() self.loop = None login_session = LoginSession() def run_async_sync(coro): if login_session.loop is None or login_session.loop.is_closed(): login_session.loop = asyncio.new_event_loop() asyncio.set_event_loop(login_session.loop) return login_session.loop.run_until_complete(coro) def get_proxy_from_api(api_url): if not api_url: return None try: resp = requests.get(api_url, timeout=10) resp.raise_for_status() content = resp.text.strip() try: data = resp.json() if "ip" in data and "port" in data: proxy_str = f"{data['ip']}:{data['port']}" else: proxy_str = content except json.JSONDecodeError: proxy_str = content if not proxy_str.startswith(("http://", "https://")): proxy_str = f"http://{proxy_str}" return {"server": proxy_str} except Exception as e: app.logger.error(f"获取代理失败: {e}") return None async def cleanup_session(): try: if login_session.login_page: await login_session.login_page.close() if login_session.page: await login_session.page.close() if login_session.context: await login_session.context.close() if login_session.browser: await login_session.browser.close() if login_session.playwright: await login_session.playwright.stop() except Exception as e: app.logger.warning(f"清理会话异常: {e}") login_session.playwright = None login_session.browser = None login_session.context = None login_session.page = None login_session.login_page = None login_session.status = "idle" login_session.message = "" async def inject_stealth(context): await context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); window.navigator.chrome = { runtime: {} }; Object.defineProperty(navigator, 'plugins', { get: () => [] }); Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','zh','en']}); """) @app.route('/') def index(): return render_template('index.html') @app.route('/qrcode.png') def get_qrcode(): qr_path = os.path.join(STATIC_DIR, 'qrcode.png') if os.path.exists(qr_path): return send_file(qr_path, mimetype='image/png') return "二维码未生成,请先点击获取二维码", 404 @app.route('/api/start_qr', methods=['POST']) def start_qr(): if not login_session.lock.acquire(blocking=False): return jsonify({"status": "error", "message": "已有正在进行的扫码任务,请稍后重试"}) try: data = request.get_json() or {} proxy_api = data.get('proxy_api', '').strip() result = run_async_sync(_start_qr(proxy_api)) return result finally: login_session.lock.release() async def _start_qr(proxy_api): await cleanup_session() login_session.status = "loading" login_session.message = "正在启动浏览器..." login_session.proxy_used = proxy_api if proxy_api else "默认IP" try: login_session.playwright = await async_playwright().start() launch_options = { "headless": True, "args": [ "--disable-blink-features=AutomationControlled", "--disable-infobars", "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--font-render-hinting=medium" ] } proxy = get_proxy_from_api(proxy_api) if proxy_api else None if proxy: launch_options["proxy"] = proxy app.logger.info(f"使用代理: {proxy['server']}") else: app.logger.info("使用默认 IP") login_session.browser = await login_session.playwright.chromium.launch(**launch_options) login_session.context = await login_session.browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", viewport={"width": 1280, "height": 720}, locale="zh-CN", timezone_id="Asia/Shanghai" ) await inject_stealth(login_session.context) login_session.page = await login_session.context.new_page() async def handle_new_page(new_p): app.logger.info("检测到新开登录页面") login_session.login_page = new_p await new_p.wait_for_load_state("domcontentloaded") login_session.context.on("page", handle_new_page) app.logger.info("访问抖音主页") await login_session.page.goto("https://www.douyin.com/", wait_until="domcontentloaded", timeout=30000) await login_session.page.wait_for_timeout(random.uniform(2000, 4000)) page_target = login_session.page login_btn = None try: login_btn = await page_target.wait_for_selector( 'button:has-text("登录"), a:has-text("登录"), div:has-text("登录")', timeout=8000 ) except: pass if login_btn: await login_btn.click() app.logger.info("点击登录按钮成功") else: await page_target.evaluate('''()=>{ let els = document.querySelectorAll("*"); for(let el of els){ if(el.textContent.trim()==="登录" && el.click) {el.click(); return true;} } return false; }''') app.logger.info("JS兜底点击登录") await login_session.page.wait_for_timeout(random.uniform(3000, 5000)) if login_session.login_page is not None: page_target = login_session.login_page app.logger.info("切换到新登录标签页") await page_target.wait_for_load_state("domcontentloaded") await page_target.wait_for_timeout(2000) app.logger.info("等待登录弹窗...") try: await page_target.wait_for_selector( 'div[role="dialog"], .auth-modal, .login-box, [class*="login"], [class*="modal"]', timeout=15000 ) except: pass await page_target.wait_for_timeout(2000) # 强制点击“二维码登录” app.logger.info("尝试切换到二维码登录...") for attempt in range(3): try: qr_tab = await page_target.wait_for_selector( 'text=二维码登录, [data-e2e="qrcode-tab"], div:has-text("二维码登录")', timeout=5000 ) if qr_tab: await qr_tab.click() app.logger.info(f"✅ 点击二维码登录选项卡 (第{attempt+1}次)") await page_target.wait_for_timeout(3000) break except Exception as e: app.logger.warning(f"点击二维码登录失败 {attempt+1}: {e}") try: await page_target.evaluate('''()=>{ let els = document.querySelectorAll("*"); for(let el of els){ if(el.textContent && el.textContent.includes("二维码登录") && el.click){ el.click(); return true; } } return false; }''') app.logger.info("JS点击二维码登录") await page_target.wait_for_timeout(3000) break except: pass await page_target.wait_for_timeout(2000) app.logger.info("开始查找二维码...") qr_img = None max_wait = 60 start_time = time.time() qr_selectors = [ 'div[role="dialog"] img[src*="qrcode"]', 'div[role="dialog"] img[src*="passport"]', '.auth-modal img[src*="qrcode"]', '.login-box img[src*="qrcode"]', 'img[src*="qrcode"]', 'img[src*="passport"]', 'img[alt*="二维码"]', 'img[class*="qrcode"]', '.qrcode-box img', '.login-qrcode-image img', 'div[data-e2e="qrcode"] img', 'div[class*="qr"] img', 'canvas[class*="qrcode"]', 'svg[class*="qrcode"]' ] while time.time() - start_time < max_wait: await page_target.wait_for_load_state("domcontentloaded") for sel in qr_selectors: try: elem = await page_target.query_selector(sel) if elem: if 'img' in sel: src = await elem.get_attribute('src') if src and len(src) > 10: qr_img = elem app.logger.info(f"✅ 找到二维码: {sel}") break else: qr_img = elem app.logger.info(f"✅ 找到二维码: {sel}") break except: continue if qr_img: break await page_target.wait_for_timeout(1000) if int(time.time() - start_time) % 5 == 0: app.logger.info(f"等待二维码... {int(time.time() - start_time)}s") # ---------- 核心改动:全屏截图 ---------- if qr_img: try: await qr_img.wait_for_element_state("visible", timeout=5000) except: pass # 优先截取登录弹窗,没有则全屏截图 try: dialog = await page_target.query_selector('div[role="dialog"], .auth-modal, .login-box, [class*="modal"]') if dialog: img_bytes = await dialog.screenshot() app.logger.info("✅ 截取登录弹窗区域") else: img_bytes = await page_target.screenshot(full_page=True) app.logger.info("✅ 全屏截图") except Exception as e: app.logger.warning(f"区域截图失败,使用全屏截图: {e}") img_bytes = await page_target.screenshot(full_page=True) # ------------------------------------- # 保存到static qr_file_path = os.path.join(STATIC_DIR, 'qrcode.png') with open(qr_file_path, 'wb') as f: f.write(img_bytes) app.logger.info(f"二维码截图已保存至 {qr_file_path}") img_base64 = base64.b64encode(img_bytes).decode('utf-8') login_session.status = "qr_ready" login_session.message = "请使用抖音 App 扫码" login_session.start_time = time.time() return jsonify({ "status": "qr_ready", "qr_image": f"data:image/png;base64,{img_base64}", "proxy_used": login_session.proxy_used, "qr_download": "/qrcode.png" }) else: app.logger.error(f"{max_wait}秒未检测到二维码,返回全屏截图") img_bytes = await page_target.screenshot(full_page=True) img_base64 = base64.b64encode(img_bytes).decode('utf-8') login_session.status = "qr_ready" login_session.message = "未检测到二维码,全屏截图请查看" login_session.start_time = time.time() return jsonify({ "status": "qr_ready", "qr_image": f"data:image/png;base64,{img_base64}", "proxy_used": login_session.proxy_used }) except Exception as e: app.logger.error(f"启动扫码失败: {e}", exc_info=True) login_session.status = "error" login_session.message = str(e) await cleanup_session() return jsonify({"status": "error", "message": str(e)}) @app.route('/api/check_login', methods=['GET']) def check_login(): if login_session.status in ["idle", "loading"]: return jsonify({"status": login_session.status, "message": login_session.message}) if login_session.status == "error": return jsonify({"status": "error", "message": login_session.message}) if time.time() - login_session.start_time > 300: run_async_sync(cleanup_session()) return jsonify({"status": "error", "message": "登录超时,请重新发起扫码"}) try: if not login_session.context: run_async_sync(cleanup_session()) return jsonify({"status": "error", "message": "会话已失效"}) cookies = run_async_sync(login_session.context.cookies()) app.logger.info(f"当前Cookie数量: {len(cookies)}") cookie_names = [c['name'] for c in cookies] need_keys = {"sessionid_ss", "sessionid", "sid_guard", "uid_tt"} has_valid = any(k in cookie_names for k in need_keys) if has_valid: login_session.cookies = cookies login_session.status = "success" login_session.message = "登录成功" app.logger.info(f"✅ 登录成功,获取有效Cookie {len(cookies)}个") run_async_sync(cleanup_session()) return jsonify({ "status": "success", "cookies": cookies, "message": "Cookie 提取成功" }) else: return jsonify({"status": "scanning", "message": "等待手机抖音确认登录..."}) except Exception as e: app.logger.error(f"检查登录异常: {e}", exc_info=True) return jsonify({"status": "error", "message": str(e)}) @app.route('/api/test_proxy', methods=['POST']) def test_proxy(): data = request.get_json() proxy_api = data.get('proxy_api', '').strip() if not proxy_api: return jsonify({"status": "error", "message": "请提供代理 API"}) try: proxy = get_proxy_from_api(proxy_api) if not proxy: return jsonify({"status": "error", "message": "获取代理失败"}) proxies = { 'http': proxy['server'], 'https': proxy['server'] } resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10) if resp.status_code == 200: data = resp.json() return jsonify({ "status": "success", "ip": data.get('origin', 'unknown'), "message": "代理可用" }) else: return jsonify({"status": "error", "message": f"代理响应异常: {resp.status_code}"}) except Exception as e: return jsonify({"status": "error", "message": str(e)}) if __name__ == '__main__': app.run(debug=False, host='0.0.0.0', port=5001)