Compare commits

..

2 Commits

Author SHA1 Message Date
root 31c7411b5d Merge branch 'main' of https://git.grxiao.cn/travel/douyin_cookie_yunsya 2026-06-24 22:36:12 +08:00
root f9a06069b1 @
首次提交:抖音 Cookie 项目初始化

- Flask 应用主体
- 项目依赖配置
- 前端模板
- .gitignore 配置
@
2026-06-24 22:20:04 +08:00
4 changed files with 1561 additions and 0 deletions
+35
View File
@@ -0,0 +1,35 @@
# --- Python ---
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.eggs/
*.egg
# --- Virtual environment ---
venv/
.venv/
env/
# --- Environment & Config ---
.env
.user.ini
# --- Logs ---
logs/
*.log
# --- IDE & Editor ---
.vscode/
.idea/
*.swp
*.swo
*~
# --- OS ---
.DS_Store
Thumbs.db
# --- Static generated files ---
static/qrcode.png
+438
View File
@@ -0,0 +1,438 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import base64
import json
import os
import time
import logging
import threading
import random
from logging.handlers import RotatingFileHandler
from io import BytesIO
from PIL import Image
import requests
from flask import Flask, render_template, request, jsonify, send_file
from playwright.async_api import async_playwright
# 配置日志
LOG_DIR = '/www/wwwlogs/douyin_cookie'
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True)
handler = RotatingFileHandler(
f'{LOG_DIR}/app.log',
maxBytes=1024*1024*10,
backupCount=5,
encoding="utf-8"
)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
app = Flask(__name__)
app.secret_key = os.urandom(24)
app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO)
# 静态文件目录
STATIC_DIR = '/www/wwwroot/douyin_cookie_extractor/static'
os.makedirs(STATIC_DIR, exist_ok=True)
# 全局状态
class LoginSession:
def __init__(self):
self.playwright = None
self.browser = None
self.context = None
self.page = None
self.login_page = None
self.status = "idle"
self.cookies = None
self.message = ""
self.start_time = 0
self.proxy_used = None
self.lock = threading.Lock()
self.loop = None
login_session = LoginSession()
def run_async_sync(coro):
if login_session.loop is None or login_session.loop.is_closed():
login_session.loop = asyncio.new_event_loop()
asyncio.set_event_loop(login_session.loop)
return login_session.loop.run_until_complete(coro)
def get_proxy_from_api(api_url):
if not api_url:
return None
try:
resp = requests.get(api_url, timeout=10)
resp.raise_for_status()
content = resp.text.strip()
try:
data = resp.json()
if "ip" in data and "port" in data:
proxy_str = f"{data['ip']}:{data['port']}"
else:
proxy_str = content
except json.JSONDecodeError:
proxy_str = content
if not proxy_str.startswith(("http://", "https://")):
proxy_str = f"http://{proxy_str}"
return {"server": proxy_str}
except Exception as e:
app.logger.error(f"获取代理失败: {e}")
return None
async def cleanup_session():
try:
if login_session.login_page:
await login_session.login_page.close()
if login_session.page:
await login_session.page.close()
if login_session.context:
await login_session.context.close()
if login_session.browser:
await login_session.browser.close()
if login_session.playwright:
await login_session.playwright.stop()
except Exception as e:
app.logger.warning(f"清理会话异常: {e}")
login_session.playwright = None
login_session.browser = None
login_session.context = None
login_session.page = None
login_session.login_page = None
login_session.status = "idle"
login_session.message = ""
async def inject_stealth(context):
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = { runtime: {} };
Object.defineProperty(navigator, 'plugins', { get: () => [] });
Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','zh','en']});
""")
@app.route('/')
def index():
return render_template('index.html')
@app.route('/qrcode.png')
def get_qrcode():
qr_path = os.path.join(STATIC_DIR, 'qrcode.png')
if os.path.exists(qr_path):
return send_file(qr_path, mimetype='image/png')
return "二维码未生成,请先点击获取二维码", 404
@app.route('/api/start_qr', methods=['POST'])
def start_qr():
if not login_session.lock.acquire(blocking=False):
return jsonify({"status": "error", "message": "已有正在进行的扫码任务,请稍后重试"})
try:
data = request.get_json() or {}
proxy_api = data.get('proxy_api', '').strip()
result = run_async_sync(_start_qr(proxy_api))
return result
finally:
login_session.lock.release()
async def _start_qr(proxy_api):
await cleanup_session()
login_session.status = "loading"
login_session.message = "正在启动浏览器..."
login_session.proxy_used = proxy_api if proxy_api else "默认IP"
try:
login_session.playwright = await async_playwright().start()
launch_options = {
"headless": True,
"args": [
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--font-render-hinting=medium"
]
}
proxy = get_proxy_from_api(proxy_api) if proxy_api else None
if proxy:
launch_options["proxy"] = proxy
app.logger.info(f"使用代理: {proxy['server']}")
else:
app.logger.info("使用默认 IP")
login_session.browser = await login_session.playwright.chromium.launch(**launch_options)
login_session.context = await login_session.browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
viewport={"width": 1280, "height": 720},
locale="zh-CN",
timezone_id="Asia/Shanghai"
)
await inject_stealth(login_session.context)
login_session.page = await login_session.context.new_page()
async def handle_new_page(new_p):
app.logger.info("检测到新开登录页面")
login_session.login_page = new_p
await new_p.wait_for_load_state("domcontentloaded")
login_session.context.on("page", handle_new_page)
app.logger.info("访问抖音主页")
await login_session.page.goto("https://www.douyin.com/", wait_until="domcontentloaded", timeout=30000)
await login_session.page.wait_for_timeout(random.uniform(2000, 4000))
page_target = login_session.page
login_btn = None
try:
login_btn = await page_target.wait_for_selector(
'button:has-text("登录"), a:has-text("登录"), div:has-text("登录")',
timeout=8000
)
except:
pass
if login_btn:
await login_btn.click()
app.logger.info("点击登录按钮成功")
else:
await page_target.evaluate('''()=>{
let els = document.querySelectorAll("*");
for(let el of els){
if(el.textContent.trim()==="登录" && el.click) {el.click(); return true;}
}
return false;
}''')
app.logger.info("JS兜底点击登录")
await login_session.page.wait_for_timeout(random.uniform(3000, 5000))
if login_session.login_page is not None:
page_target = login_session.login_page
app.logger.info("切换到新登录标签页")
await page_target.wait_for_load_state("domcontentloaded")
await page_target.wait_for_timeout(2000)
app.logger.info("等待登录弹窗...")
try:
await page_target.wait_for_selector(
'div[role="dialog"], .auth-modal, .login-box, [class*="login"], [class*="modal"]',
timeout=15000
)
except:
pass
await page_target.wait_for_timeout(2000)
# 强制点击“二维码登录”
app.logger.info("尝试切换到二维码登录...")
for attempt in range(3):
try:
qr_tab = await page_target.wait_for_selector(
'text=二维码登录, [data-e2e="qrcode-tab"], div:has-text("二维码登录")',
timeout=5000
)
if qr_tab:
await qr_tab.click()
app.logger.info(f"✅ 点击二维码登录选项卡 (第{attempt+1}次)")
await page_target.wait_for_timeout(3000)
break
except Exception as e:
app.logger.warning(f"点击二维码登录失败 {attempt+1}: {e}")
try:
await page_target.evaluate('''()=>{
let els = document.querySelectorAll("*");
for(let el of els){
if(el.textContent && el.textContent.includes("二维码登录") && el.click){
el.click();
return true;
}
}
return false;
}''')
app.logger.info("JS点击二维码登录")
await page_target.wait_for_timeout(3000)
break
except:
pass
await page_target.wait_for_timeout(2000)
app.logger.info("开始查找二维码...")
qr_img = None
max_wait = 60
start_time = time.time()
qr_selectors = [
'div[role="dialog"] img[src*="qrcode"]',
'div[role="dialog"] img[src*="passport"]',
'.auth-modal img[src*="qrcode"]',
'.login-box img[src*="qrcode"]',
'img[src*="qrcode"]',
'img[src*="passport"]',
'img[alt*="二维码"]',
'img[class*="qrcode"]',
'.qrcode-box img',
'.login-qrcode-image img',
'div[data-e2e="qrcode"] img',
'div[class*="qr"] img',
'canvas[class*="qrcode"]',
'svg[class*="qrcode"]'
]
while time.time() - start_time < max_wait:
await page_target.wait_for_load_state("domcontentloaded")
for sel in qr_selectors:
try:
elem = await page_target.query_selector(sel)
if elem:
if 'img' in sel:
src = await elem.get_attribute('src')
if src and len(src) > 10:
qr_img = elem
app.logger.info(f"✅ 找到二维码: {sel}")
break
else:
qr_img = elem
app.logger.info(f"✅ 找到二维码: {sel}")
break
except:
continue
if qr_img:
break
await page_target.wait_for_timeout(1000)
if int(time.time() - start_time) % 5 == 0:
app.logger.info(f"等待二维码... {int(time.time() - start_time)}s")
# ---------- 核心改动:全屏截图 ----------
if qr_img:
try:
await qr_img.wait_for_element_state("visible", timeout=5000)
except:
pass
# 优先截取登录弹窗,没有则全屏截图
try:
dialog = await page_target.query_selector('div[role="dialog"], .auth-modal, .login-box, [class*="modal"]')
if dialog:
img_bytes = await dialog.screenshot()
app.logger.info("✅ 截取登录弹窗区域")
else:
img_bytes = await page_target.screenshot(full_page=True)
app.logger.info("✅ 全屏截图")
except Exception as e:
app.logger.warning(f"区域截图失败,使用全屏截图: {e}")
img_bytes = await page_target.screenshot(full_page=True)
# -------------------------------------
# 保存到static
qr_file_path = os.path.join(STATIC_DIR, 'qrcode.png')
with open(qr_file_path, 'wb') as f:
f.write(img_bytes)
app.logger.info(f"二维码截图已保存至 {qr_file_path}")
img_base64 = base64.b64encode(img_bytes).decode('utf-8')
login_session.status = "qr_ready"
login_session.message = "请使用抖音 App 扫码"
login_session.start_time = time.time()
return jsonify({
"status": "qr_ready",
"qr_image": f"data:image/png;base64,{img_base64}",
"proxy_used": login_session.proxy_used,
"qr_download": "/qrcode.png"
})
else:
app.logger.error(f"{max_wait}秒未检测到二维码,返回全屏截图")
img_bytes = await page_target.screenshot(full_page=True)
img_base64 = base64.b64encode(img_bytes).decode('utf-8')
login_session.status = "qr_ready"
login_session.message = "未检测到二维码,全屏截图请查看"
login_session.start_time = time.time()
return jsonify({
"status": "qr_ready",
"qr_image": f"data:image/png;base64,{img_base64}",
"proxy_used": login_session.proxy_used
})
except Exception as e:
app.logger.error(f"启动扫码失败: {e}", exc_info=True)
login_session.status = "error"
login_session.message = str(e)
await cleanup_session()
return jsonify({"status": "error", "message": str(e)})
@app.route('/api/check_login', methods=['GET'])
def check_login():
if login_session.status in ["idle", "loading"]:
return jsonify({"status": login_session.status, "message": login_session.message})
if login_session.status == "error":
return jsonify({"status": "error", "message": login_session.message})
if time.time() - login_session.start_time > 300:
run_async_sync(cleanup_session())
return jsonify({"status": "error", "message": "登录超时,请重新发起扫码"})
try:
if not login_session.context:
run_async_sync(cleanup_session())
return jsonify({"status": "error", "message": "会话已失效"})
cookies = run_async_sync(login_session.context.cookies())
app.logger.info(f"当前Cookie数量: {len(cookies)}")
cookie_names = [c['name'] for c in cookies]
need_keys = {"sessionid_ss", "sessionid", "sid_guard", "uid_tt"}
has_valid = any(k in cookie_names for k in need_keys)
if has_valid:
login_session.cookies = cookies
login_session.status = "success"
login_session.message = "登录成功"
app.logger.info(f"✅ 登录成功,获取有效Cookie {len(cookies)}")
run_async_sync(cleanup_session())
return jsonify({
"status": "success",
"cookies": cookies,
"message": "Cookie 提取成功"
})
else:
return jsonify({"status": "scanning", "message": "等待手机抖音确认登录..."})
except Exception as e:
app.logger.error(f"检查登录异常: {e}", exc_info=True)
return jsonify({"status": "error", "message": str(e)})
@app.route('/api/test_proxy', methods=['POST'])
def test_proxy():
data = request.get_json()
proxy_api = data.get('proxy_api', '').strip()
if not proxy_api:
return jsonify({"status": "error", "message": "请提供代理 API"})
try:
proxy = get_proxy_from_api(proxy_api)
if not proxy:
return jsonify({"status": "error", "message": "获取代理失败"})
proxies = {
'http': proxy['server'],
'https': proxy['server']
}
resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10)
if resp.status_code == 200:
data = resp.json()
return jsonify({
"status": "success",
"ip": data.get('origin', 'unknown'),
"message": "代理可用"
})
else:
return jsonify({"status": "error", "message": f"代理响应异常: {resp.status_code}"})
except Exception as e:
return jsonify({"status": "error", "message": str(e)})
if __name__ == '__main__':
app.run(debug=False, host='0.0.0.0', port=5001)
+6
View File
@@ -0,0 +1,6 @@
Flask==2.3.3
playwright==1.40.0
playwright-stealth==1.0.6
requests==2.31.0
gunicorn==21.2.0
eventlet==0.33.3
+1082
View File
File diff suppressed because it is too large Load Diff