commit f9a06069b11a80aaea936e8f6d63ed12d8b6081e Author: root Date: Wed Jun 24 22:20:04 2026 +0800 @ 首次提交:抖音 Cookie 项目初始化 - Flask 应用主体 - 项目依赖配置 - 前端模板 - .gitignore 配置 @ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..020cefb --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# --- Python --- +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +.eggs/ +*.egg + +# --- Virtual environment --- +venv/ +.venv/ +env/ + +# --- Environment & Config --- +.env +.user.ini + +# --- Logs --- +logs/ +*.log + +# --- IDE & Editor --- +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# --- OS --- +.DS_Store +Thumbs.db + +# --- Static generated files --- +static/qrcode.png diff --git a/app.py b/app.py new file mode 100644 index 0000000..d45beea --- /dev/null +++ b/app.py @@ -0,0 +1,438 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import asyncio +import base64 +import json +import os +import time +import logging +import threading +import random +from logging.handlers import RotatingFileHandler +from io import BytesIO +from PIL import Image + +import requests +from flask import Flask, render_template, request, jsonify, send_file +from playwright.async_api import async_playwright + +# 配置日志 +LOG_DIR = '/www/wwwlogs/douyin_cookie' +if not os.path.exists(LOG_DIR): + os.makedirs(LOG_DIR, exist_ok=True) + +handler = RotatingFileHandler( + f'{LOG_DIR}/app.log', + maxBytes=1024*1024*10, + backupCount=5, + encoding="utf-8" +) +handler.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +app = Flask(__name__) +app.secret_key = os.urandom(24) +app.logger.addHandler(handler) +app.logger.setLevel(logging.INFO) + +# 静态文件目录 +STATIC_DIR = '/www/wwwroot/douyin_cookie_extractor/static' +os.makedirs(STATIC_DIR, exist_ok=True) + +# 全局状态 +class LoginSession: + def __init__(self): + self.playwright = None + self.browser = None + self.context = None + self.page = None + self.login_page = None + self.status = "idle" + self.cookies = None + self.message = "" + self.start_time = 0 + self.proxy_used = None + self.lock = threading.Lock() + self.loop = None + +login_session = LoginSession() + +def run_async_sync(coro): + if login_session.loop is None or login_session.loop.is_closed(): + login_session.loop = asyncio.new_event_loop() + asyncio.set_event_loop(login_session.loop) + return login_session.loop.run_until_complete(coro) + +def get_proxy_from_api(api_url): + if not api_url: + return None + try: + resp = requests.get(api_url, timeout=10) + resp.raise_for_status() + content = resp.text.strip() + try: + data = resp.json() + if "ip" in data and "port" in data: + proxy_str = f"{data['ip']}:{data['port']}" + else: + proxy_str = content + except json.JSONDecodeError: + proxy_str = content + if not proxy_str.startswith(("http://", "https://")): + proxy_str = f"http://{proxy_str}" + return {"server": proxy_str} + except Exception as e: + app.logger.error(f"获取代理失败: {e}") + return None + +async def cleanup_session(): + try: + if login_session.login_page: + await login_session.login_page.close() + if login_session.page: + await login_session.page.close() + if login_session.context: + await login_session.context.close() + if login_session.browser: + await login_session.browser.close() + if login_session.playwright: + await login_session.playwright.stop() + except Exception as e: + app.logger.warning(f"清理会话异常: {e}") + login_session.playwright = None + login_session.browser = None + login_session.context = None + login_session.page = None + login_session.login_page = None + login_session.status = "idle" + login_session.message = "" + +async def inject_stealth(context): + await context.add_init_script(""" + Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); + window.navigator.chrome = { runtime: {} }; + Object.defineProperty(navigator, 'plugins', { get: () => [] }); + Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','zh','en']}); + """) + +@app.route('/') +def index(): + return render_template('index.html') + +@app.route('/qrcode.png') +def get_qrcode(): + qr_path = os.path.join(STATIC_DIR, 'qrcode.png') + if os.path.exists(qr_path): + return send_file(qr_path, mimetype='image/png') + return "二维码未生成,请先点击获取二维码", 404 + +@app.route('/api/start_qr', methods=['POST']) +def start_qr(): + if not login_session.lock.acquire(blocking=False): + return jsonify({"status": "error", "message": "已有正在进行的扫码任务,请稍后重试"}) + try: + data = request.get_json() or {} + proxy_api = data.get('proxy_api', '').strip() + result = run_async_sync(_start_qr(proxy_api)) + return result + finally: + login_session.lock.release() + +async def _start_qr(proxy_api): + await cleanup_session() + + login_session.status = "loading" + login_session.message = "正在启动浏览器..." + login_session.proxy_used = proxy_api if proxy_api else "默认IP" + + try: + login_session.playwright = await async_playwright().start() + + launch_options = { + "headless": True, + "args": [ + "--disable-blink-features=AutomationControlled", + "--disable-infobars", + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-gpu", + "--font-render-hinting=medium" + ] + } + + proxy = get_proxy_from_api(proxy_api) if proxy_api else None + if proxy: + launch_options["proxy"] = proxy + app.logger.info(f"使用代理: {proxy['server']}") + else: + app.logger.info("使用默认 IP") + + login_session.browser = await login_session.playwright.chromium.launch(**launch_options) + login_session.context = await login_session.browser.new_context( + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", + viewport={"width": 1280, "height": 720}, + locale="zh-CN", + timezone_id="Asia/Shanghai" + ) + await inject_stealth(login_session.context) + login_session.page = await login_session.context.new_page() + + async def handle_new_page(new_p): + app.logger.info("检测到新开登录页面") + login_session.login_page = new_p + await new_p.wait_for_load_state("domcontentloaded") + login_session.context.on("page", handle_new_page) + + app.logger.info("访问抖音主页") + await login_session.page.goto("https://www.douyin.com/", wait_until="domcontentloaded", timeout=30000) + await login_session.page.wait_for_timeout(random.uniform(2000, 4000)) + + page_target = login_session.page + login_btn = None + try: + login_btn = await page_target.wait_for_selector( + 'button:has-text("登录"), a:has-text("登录"), div:has-text("登录")', + timeout=8000 + ) + except: + pass + if login_btn: + await login_btn.click() + app.logger.info("点击登录按钮成功") + else: + await page_target.evaluate('''()=>{ + let els = document.querySelectorAll("*"); + for(let el of els){ + if(el.textContent.trim()==="登录" && el.click) {el.click(); return true;} + } + return false; + }''') + app.logger.info("JS兜底点击登录") + + await login_session.page.wait_for_timeout(random.uniform(3000, 5000)) + + if login_session.login_page is not None: + page_target = login_session.login_page + app.logger.info("切换到新登录标签页") + await page_target.wait_for_load_state("domcontentloaded") + await page_target.wait_for_timeout(2000) + + app.logger.info("等待登录弹窗...") + try: + await page_target.wait_for_selector( + 'div[role="dialog"], .auth-modal, .login-box, [class*="login"], [class*="modal"]', + timeout=15000 + ) + except: + pass + await page_target.wait_for_timeout(2000) + + # 强制点击“二维码登录” + app.logger.info("尝试切换到二维码登录...") + for attempt in range(3): + try: + qr_tab = await page_target.wait_for_selector( + 'text=二维码登录, [data-e2e="qrcode-tab"], div:has-text("二维码登录")', + timeout=5000 + ) + if qr_tab: + await qr_tab.click() + app.logger.info(f"✅ 点击二维码登录选项卡 (第{attempt+1}次)") + await page_target.wait_for_timeout(3000) + break + except Exception as e: + app.logger.warning(f"点击二维码登录失败 {attempt+1}: {e}") + try: + await page_target.evaluate('''()=>{ + let els = document.querySelectorAll("*"); + for(let el of els){ + if(el.textContent && el.textContent.includes("二维码登录") && el.click){ + el.click(); + return true; + } + } + return false; + }''') + app.logger.info("JS点击二维码登录") + await page_target.wait_for_timeout(3000) + break + except: + pass + await page_target.wait_for_timeout(2000) + + app.logger.info("开始查找二维码...") + qr_img = None + max_wait = 60 + start_time = time.time() + + qr_selectors = [ + 'div[role="dialog"] img[src*="qrcode"]', + 'div[role="dialog"] img[src*="passport"]', + '.auth-modal img[src*="qrcode"]', + '.login-box img[src*="qrcode"]', + 'img[src*="qrcode"]', + 'img[src*="passport"]', + 'img[alt*="二维码"]', + 'img[class*="qrcode"]', + '.qrcode-box img', + '.login-qrcode-image img', + 'div[data-e2e="qrcode"] img', + 'div[class*="qr"] img', + 'canvas[class*="qrcode"]', + 'svg[class*="qrcode"]' + ] + + while time.time() - start_time < max_wait: + await page_target.wait_for_load_state("domcontentloaded") + for sel in qr_selectors: + try: + elem = await page_target.query_selector(sel) + if elem: + if 'img' in sel: + src = await elem.get_attribute('src') + if src and len(src) > 10: + qr_img = elem + app.logger.info(f"✅ 找到二维码: {sel}") + break + else: + qr_img = elem + app.logger.info(f"✅ 找到二维码: {sel}") + break + except: + continue + if qr_img: + break + await page_target.wait_for_timeout(1000) + if int(time.time() - start_time) % 5 == 0: + app.logger.info(f"等待二维码... {int(time.time() - start_time)}s") + + # ---------- 核心改动:全屏截图 ---------- + if qr_img: + try: + await qr_img.wait_for_element_state("visible", timeout=5000) + except: + pass + + # 优先截取登录弹窗,没有则全屏截图 + try: + dialog = await page_target.query_selector('div[role="dialog"], .auth-modal, .login-box, [class*="modal"]') + if dialog: + img_bytes = await dialog.screenshot() + app.logger.info("✅ 截取登录弹窗区域") + else: + img_bytes = await page_target.screenshot(full_page=True) + app.logger.info("✅ 全屏截图") + except Exception as e: + app.logger.warning(f"区域截图失败,使用全屏截图: {e}") + img_bytes = await page_target.screenshot(full_page=True) + # ------------------------------------- + + # 保存到static + qr_file_path = os.path.join(STATIC_DIR, 'qrcode.png') + with open(qr_file_path, 'wb') as f: + f.write(img_bytes) + app.logger.info(f"二维码截图已保存至 {qr_file_path}") + + img_base64 = base64.b64encode(img_bytes).decode('utf-8') + login_session.status = "qr_ready" + login_session.message = "请使用抖音 App 扫码" + login_session.start_time = time.time() + return jsonify({ + "status": "qr_ready", + "qr_image": f"data:image/png;base64,{img_base64}", + "proxy_used": login_session.proxy_used, + "qr_download": "/qrcode.png" + }) + else: + app.logger.error(f"{max_wait}秒未检测到二维码,返回全屏截图") + img_bytes = await page_target.screenshot(full_page=True) + img_base64 = base64.b64encode(img_bytes).decode('utf-8') + login_session.status = "qr_ready" + login_session.message = "未检测到二维码,全屏截图请查看" + login_session.start_time = time.time() + return jsonify({ + "status": "qr_ready", + "qr_image": f"data:image/png;base64,{img_base64}", + "proxy_used": login_session.proxy_used + }) + + except Exception as e: + app.logger.error(f"启动扫码失败: {e}", exc_info=True) + login_session.status = "error" + login_session.message = str(e) + await cleanup_session() + return jsonify({"status": "error", "message": str(e)}) + +@app.route('/api/check_login', methods=['GET']) +def check_login(): + if login_session.status in ["idle", "loading"]: + return jsonify({"status": login_session.status, "message": login_session.message}) + + if login_session.status == "error": + return jsonify({"status": "error", "message": login_session.message}) + + if time.time() - login_session.start_time > 300: + run_async_sync(cleanup_session()) + return jsonify({"status": "error", "message": "登录超时,请重新发起扫码"}) + + try: + if not login_session.context: + run_async_sync(cleanup_session()) + return jsonify({"status": "error", "message": "会话已失效"}) + + cookies = run_async_sync(login_session.context.cookies()) + app.logger.info(f"当前Cookie数量: {len(cookies)}") + + cookie_names = [c['name'] for c in cookies] + need_keys = {"sessionid_ss", "sessionid", "sid_guard", "uid_tt"} + has_valid = any(k in cookie_names for k in need_keys) + + if has_valid: + login_session.cookies = cookies + login_session.status = "success" + login_session.message = "登录成功" + app.logger.info(f"✅ 登录成功,获取有效Cookie {len(cookies)}个") + run_async_sync(cleanup_session()) + return jsonify({ + "status": "success", + "cookies": cookies, + "message": "Cookie 提取成功" + }) + else: + return jsonify({"status": "scanning", "message": "等待手机抖音确认登录..."}) + + except Exception as e: + app.logger.error(f"检查登录异常: {e}", exc_info=True) + return jsonify({"status": "error", "message": str(e)}) + +@app.route('/api/test_proxy', methods=['POST']) +def test_proxy(): + data = request.get_json() + proxy_api = data.get('proxy_api', '').strip() + if not proxy_api: + return jsonify({"status": "error", "message": "请提供代理 API"}) + try: + proxy = get_proxy_from_api(proxy_api) + if not proxy: + return jsonify({"status": "error", "message": "获取代理失败"}) + proxies = { + 'http': proxy['server'], + 'https': proxy['server'] + } + resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10) + if resp.status_code == 200: + data = resp.json() + return jsonify({ + "status": "success", + "ip": data.get('origin', 'unknown'), + "message": "代理可用" + }) + else: + return jsonify({"status": "error", "message": f"代理响应异常: {resp.status_code}"}) + except Exception as e: + return jsonify({"status": "error", "message": str(e)}) + +if __name__ == '__main__': + app.run(debug=False, host='0.0.0.0', port=5001) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0bbf3d9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +Flask==2.3.3 +playwright==1.40.0 +playwright-stealth==1.0.6 +requests==2.31.0 +gunicorn==21.2.0 +eventlet==0.33.3 \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..894cb83 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,1082 @@ + + + + + + 抖音 Cookie 扫码提取器 + + + + + + +
+ +
+
+
+ 🔄 + 0 +
+
总提取次数
+
+
+
+ + 0 +
+
成功次数
+
+
+
+ ⏱️ + 0s +
+
平均耗时
+
+
+
+ 📊 + 0% +
+
成功率
+
+
+ + +
+
+
🍪
+
+

抖音 Cookie 提取器

+
扫码登录 · 一键提取 JSON 格式 Cookie
+
+
+ + +
+
+
+ + 就绪 +
+ - +
+
等待操作...
+
+
+
+
+ + +
+ +
+
+ + +
+ + + + + + +
+ + +
+
+ + +
+ + 默认:服务器 IP +
+
+ +
+ 留空则使用服务器默认 IP +
+
+
+
+
+ + +
+ + +
+ + + + + \ No newline at end of file