Files
douyin_cookie_yunsya/backend/sso/qr_login.py
T
travel 9e0644095f 重构为 HTTP SSO 扫码方案并引入 Vue3 前端
移除 Playwright 浏览器自动化,改用 passport/SSO HTTP 接口获取二维码与轮询登录;后端模块化拆分,前端替换为 Vue3 SPA。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-25 10:47:55 +08:00

414 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import time
import uuid
from typing import Any
import requests
from backend.config import (
DOUYIN_AID,
DOUYIN_REFERER,
DOUYIN_SERVICE,
QR_POLL_INTERVAL,
USER_AGENT,
)
from backend.cookies import cookies_to_json, extract_nickname, is_logged_in, match_login_cookies
from backend.proxy import get_proxy_from_api
from backend.session import login_session
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# QR-code endpoints on the dedicated SSO host; fetch_qrcode() uses these as the
# fallback when the passport endpoint does not yield a token.
SSO_GET_QR = "https://sso.douyin.com/get_qrcode/"
SSO_CHECK_QR = "https://sso.douyin.com/check_qrconnect/"
# QR-code endpoints on the main site's passport API; fetch_qrcode() tries these first.
PASSPORT_GET_QR = "https://www.douyin.com/passport/web/get_qrcode/"
PASSPORT_CHECK_QR = "https://www.douyin.com/passport/web/check_qrconnect/"
# Endpoint that init_session() POSTs its registration payload to.
TTWID_REGISTER = "https://ttwid.bytedance.com/ttwid/union/register/"
# Endpoint that fetch_nickname() queries for the logged-in user's profile.
USER_INFO_URL = "https://www.douyin.com/aweme/v1/web/user/profile/self/"
class DouyinSSOError(Exception):
    """Error raised when a Douyin SSO/passport call fails or returns unusable data.

    Attributes:
        error_code: Optional code taken from the API response; ``None`` when
            the failure did not come with one.
    """

    def __init__(self, message, error_code=None):
        # Store the code first so it is available even while the base
        # class initialiser runs.
        self.error_code = error_code
        super().__init__(message)
class DouyinSSOClient:
    """Pure-HTTP client for Douyin QR-code login (sso.douyin.com / passport API).

    Each instance owns one ``requests.Session`` whose cookie jar accumulates the
    cookies set by the endpoints it calls. The typical call order, as used by
    ``start_qr_login`` / ``check_login_state`` in this module, is
    ``init_session()`` -> ``fetch_qrcode()`` -> repeated ``check_qr_status()``
    -> ``complete_login()``.

    Attributes:
        session: The underlying ``requests.Session``.
        proxy_server: Proxy URL applied to both http and https, or ``None``.
        token: QR token returned by the last successful ``fetch_qrcode()``.
        qr_params: Query parameters reused when polling the QR status.
        check_url_base: Polling endpoint matching the API that issued the QR.
        api_mode: ``"passport"`` or ``"sso"``, whichever API issued the QR.
    """

    # Maps the lower-cased ``status`` value of a check_qrconnect response to
    # the (phase, user-facing message) pair reported by check_qr_status().
    _QR_PHASES = {
        "1": ("waiting", "请使用抖音 App 扫码"),
        "new": ("waiting", "请使用抖音 App 扫码"),
        "2": ("scanned", "已扫码,请在手机上确认登录"),
        "scanned": ("scanned", "已扫码,请在手机上确认登录"),
        "3": ("confirmed", "已确认,正在完成登录..."),
        "confirmed": ("confirmed", "已确认,正在完成登录..."),
        "5": ("expired", "二维码已过期,请重新获取"),
        "expired": ("expired", "二维码已过期,请重新获取"),
    }

    def __init__(self, proxy_server: str | None = None):
        """Create the HTTP session, optionally routing all traffic via a proxy.

        Args:
            proxy_server: Proxy URL used for both http and https requests;
                falsy values leave the session without a proxy.
        """
        self.session = requests.Session()
        self.proxy_server = proxy_server
        if proxy_server:
            self.session.proxies = {"http": proxy_server, "https": proxy_server}
        self.token = ""
        self.qr_params: dict[str, str] = {}
        # Defaults match the SSO API until fetch_qrcode() decides otherwise.
        self.check_url_base = SSO_CHECK_QR
        self.api_mode = "sso"

    @staticmethod
    def _data_section(data: dict) -> dict:
        """Return the ``data`` member of an API response, or ``{}`` if it is not a dict."""
        section = data.get("data")
        return section if isinstance(section, dict) else {}

    def _headers(self, referer: str | None = None) -> dict[str, str]:
        """Build request headers, adding the CSRF header when its cookie exists.

        Args:
            referer: Value for the ``Referer`` header; defaults to
                ``DOUYIN_REFERER``.
        """
        headers = {
            "User-Agent": USER_AGENT,
            "Accept": "application/json, text/plain, */*",
            "Referer": referer or DOUYIN_REFERER,
            "Origin": "https://www.douyin.com",
        }
        csrf = self._cookie_value("passport_csrf_token")
        if csrf:
            headers["x-tt-passport-csrf-token"] = csrf
        return headers

    def _cookie_value(self, name: str) -> str | None:
        """Return the value of the first session cookie called ``name``, else ``None``."""
        # Iterating the jar (instead of jar.get) avoids requests raising on
        # same-named cookies from different domains.
        return next(
            (cookie.value for cookie in self.session.cookies if cookie.name == name),
            None,
        )

    def init_session(self):
        """Warm up the session so later QR requests carry the site's cookies.

        POSTs a registration payload to ``TTWID_REGISTER``, follows the
        ``redirect_url`` from its response when present, then GETs the Douyin
        referer page and the passport login page.

        Raises:
            requests.HTTPError: If the registration request returns an error status.
        """
        payload = {
            "region": "cn",
            "aid": 1768,
            "needFid": False,
            "service": "www.douyin.com",
            "migrate_info": {"ticket": "", "source": "web"},
            "cbUrlProtocol": "https",
            "union": True,
        }
        resp = self.session.post(
            TTWID_REGISTER,
            json=payload,
            headers={"User-Agent": USER_AGENT, "Content-Type": "application/json"},
            timeout=20,
        )
        resp.raise_for_status()
        data = resp.json()
        # A non-object JSON body simply carries no redirect to follow.
        redirect_url = data.get("redirect_url") if isinstance(data, dict) else None
        if redirect_url:
            self.session.get(redirect_url, headers=self._headers(), timeout=20)
        self.session.get(DOUYIN_REFERER, headers=self._headers(), timeout=20)
        self.session.get(
            "https://www.douyin.com/passport/web/login/",
            headers=self._headers(),
            timeout=20,
        )
        logger.info("SSO 会话初始化完成,cookies=%s", [c.name for c in self.session.cookies])

    def _parse_api_json(self, resp: requests.Response) -> dict:
        """Decode an API response that is expected to be a JSON object.

        Raises:
            DouyinSSOError: If the content type is not JSON, the body cannot
                be decoded, or the decoded value is not a JSON object.
        """
        # Header values are case-insensitive, so compare in lower case.
        content_type = resp.headers.get("content-type", "").lower()
        if "json" not in content_type:
            snippet = resp.text[:120].replace("\n", " ")
            raise DouyinSSOError(
                f"接口返回非 JSON(可能被风控拦截),请配置代理后重试。响应: {snippet}"
            )
        try:
            data = resp.json()
        except ValueError as e:
            raise DouyinSSOError(f"解析接口响应失败: {e}") from e
        # Callers use dict methods on the result; reject lists, strings, null.
        if not isinstance(data, dict):
            raise DouyinSSOError(
                f"解析接口响应失败: 响应不是 JSON 对象 ({type(data).__name__})"
            )
        return data

    def _raise_api_error(self, data: dict, context: str):
        """Raise ``DouyinSSOError`` describing a non-success API response.

        The code and description are read from ``data["data"]`` first, then
        from the top level; ``context`` is the fallback description. Error
        code 4031 is replaced with an IP-blocked hint message.

        Raises:
            DouyinSSOError: Always.
        """
        payload = self._data_section(data)
        code = payload.get("error_code", data.get("error_code"))
        desc = payload.get("description") or data.get("message") or context
        if code == 4031:
            desc = "当前 IP 被抖音安全策略拦截,请配置住宅代理 API 后重试"
        raise DouyinSSOError(desc, error_code=code)

    def fetch_qrcode(self) -> dict[str, Any]:
        """Request a login QR code, preferring the passport API over the SSO API.

        The passport endpoint is tried first. If it does not yield a token
        (unparseable response, non-success message, or missing token), the SSO
        endpoint is used instead. ``api_mode`` and ``check_url_base`` are set
        to match whichever API produced the QR code.

        Returns:
            Dict with ``token``, ``qrcode_b64``, ``qrcode_index_url`` and
            ``app_name``.

        Raises:
            DouyinSSOError: If neither API returns a usable QR code.
        """
        ts = str(int(time.time() * 1000))
        passport_params = {
            "aid": DOUYIN_AID,
            "service": DOUYIN_SERVICE,
            "need_logo": "false",
            "is_vcd": "1",
            "t": ts,
        }
        resp = self.session.post(
            PASSPORT_GET_QR,
            params=passport_params,
            headers=self._headers(),
            timeout=20,
        )
        try:
            data = self._parse_api_json(resp)
        except DouyinSSOError as e:
            # An unparseable passport response should not prevent the SSO
            # fallback from being attempted.
            logger.warning("passport get_qrcode 响应无法解析,尝试 sso 接口: %s", e)
            data = None
        if data is not None:
            qr_data = self._data_section(data)
            if data.get("message") == "success" and qr_data.get("token"):
                self.api_mode = "passport"
                self.check_url_base = PASSPORT_CHECK_QR
                return self._apply_qr_data(qr_data, passport_params)
            if data.get("message") != "success":
                logger.warning(
                    "passport get_qrcode 失败 code=%s,尝试 sso 接口",
                    qr_data.get("error_code"),
                )
            else:
                logger.warning("passport get_qrcode 未返回 token,尝试 sso 接口")
        sso_params = {
            "aid": DOUYIN_AID,
            "service": DOUYIN_SERVICE,
            "is_vcd": "1",
            "t": ts,
        }
        resp = self.session.get(
            SSO_GET_QR,
            params=sso_params,
            headers={**self._headers(), "authority": "sso.douyin.com"},
            timeout=20,
        )
        data = self._parse_api_json(resp)
        if data.get("message") != "success":
            self._raise_api_error(data, "获取二维码失败")
        self.api_mode = "sso"
        self.check_url_base = SSO_CHECK_QR
        # A missing "data" section becomes {} so _apply_qr_data reports it as
        # a DouyinSSOError instead of a KeyError.
        return self._apply_qr_data(self._data_section(data), sso_params)

    def _apply_qr_data(self, qr_data: dict, base_params: dict) -> dict[str, Any]:
        """Store the QR token and polling parameters, and return the QR payload.

        Args:
            qr_data: The ``data`` section of a get_qrcode response.
            base_params: Query parameters of the request that produced it;
                a copy is kept for later polling.

        Raises:
            DouyinSSOError: If the token or the QR image is missing.
        """
        token = qr_data.get("token")
        qrcode_b64 = qr_data.get("qrcode")
        if not token or not qrcode_b64:
            raise DouyinSSOError("接口未返回 token 或二维码图片")
        self.token = token
        self.qr_params = dict(base_params)
        return {
            "token": token,
            "qrcode_b64": qrcode_b64,
            "qrcode_index_url": qr_data.get("qrcode_index_url", ""),
            "app_name": qr_data.get("app_name", "抖音"),
        }

    def check_qr_status(self) -> dict[str, Any]:
        """Poll the QR status once and translate it into a phase.

        Returns:
            Dict with ``raw_status``, ``redirect_url``, ``phase`` (one of
            ``waiting``, ``scanned``, ``confirmed``, ``expired``, ``unknown``)
            and a user-facing ``message``.

        Raises:
            DouyinSSOError: If no QR token is stored or the API reports failure.
        """
        if not self.token:
            raise DouyinSSOError("缺少 QR token,请重新获取二维码")
        params = {**self.qr_params, "token": self.token, "t": str(int(time.time() * 1000))}
        resp = self.session.get(
            self.check_url_base,
            params=params,
            headers=self._headers(),
            timeout=20,
        )
        data = self._parse_api_json(resp)
        if data.get("message") != "success":
            self._raise_api_error(data, "轮询扫码状态失败")
        qr_data = self._data_section(data)
        # The status may arrive as a number or a word; normalise to lower-case text.
        status = str(qr_data.get("status", "")).lower()
        phase, message = self._QR_PHASES.get(status, ("unknown", f"未知状态: {status}"))
        return {
            "raw_status": status,
            "redirect_url": qr_data.get("redirect_url", ""),
            "phase": phase,
            "message": message,
        }

    def complete_login(self, redirect_url: str):
        """Follow the post-confirmation redirect so login cookies reach the session.

        Args:
            redirect_url: URL returned by ``check_qr_status`` once confirmed.

        Raises:
            DouyinSSOError: If ``redirect_url`` is empty.
        """
        if not redirect_url:
            raise DouyinSSOError("缺少 redirect_url")
        self.session.get(redirect_url, headers=self._headers(), timeout=20, allow_redirects=True)
        self.session.get(DOUYIN_REFERER, headers=self._headers(), timeout=20)

    def get_cookies_json(self) -> list[dict]:
        """Return the session's cookies converted by ``cookies_to_json``."""
        return cookies_to_json(self.session.cookies)

    def fetch_nickname(self) -> str:
        """Return the logged-in user's nickname.

        The profile endpoint is queried first; when that fails or has no
        nickname, the result of ``extract_nickname`` on the cookies is used.
        """
        try:
            resp = self.session.get(
                USER_INFO_URL,
                params={"aid": DOUYIN_AID, "t": str(int(time.time() * 1000))},
                headers=self._headers(),
                timeout=15,
            )
            data = resp.json()
            user = (data.get("user") or data.get("data") or {})
            if user.get("nickname"):
                return user["nickname"]
        except Exception as e:
            # Deliberately best-effort: any failure here falls back to cookies.
            logger.debug("获取昵称失败: %s", e)
        cookies = self.get_cookies_json()
        return extract_nickname(cookies)
def _proxy_server_from_api(proxy_api: str) -> str | None:
if not proxy_api:
return None
proxy = get_proxy_from_api(proxy_api)
return proxy["server"] if proxy else None
def start_qr_login(proxy_api: str = ""):
    """Reset the shared login session and fetch a fresh login QR code.

    Args:
        proxy_api: Optional proxy API URL; when empty the default IP is used.

    Returns:
        A ``(payload, raw_qr)`` tuple. On success ``payload`` has status
        ``"qr_ready"`` and ``raw_qr`` is the QR image string as returned by
        the API. On failure ``payload`` has status ``"error"`` and ``raw_qr``
        is ``None``.
    """
    login_session.reset()
    login_session.session_id = uuid.uuid4().hex[:12]
    login_session.status = "loading"
    login_session.message = "正在通过 SSO 接口获取二维码..."
    login_session.proxy_used = proxy_api or "默认IP"
    login_session.start_time = time.time()

    def _record_failure(text: str) -> None:
        # Shared bookkeeping for both failure paths below.
        login_session.status = "error"
        login_session.message = text
        login_session.sso_client = None

    try:
        resolved_proxy = _proxy_server_from_api(proxy_api)
        sso = DouyinSSOClient(proxy_server=resolved_proxy)
        sso.init_session()
        qr_info = sso.fetch_qrcode()

        # Only publish the client once a QR code was actually obtained.
        login_session.sso_client = sso
        login_session.status = "qr_ready"
        login_session.message = "请使用抖音 App 扫码"

        raw_image = qr_info["qrcode_b64"]
        # Wrap bare base64 in a data URL; pass through values already wrapped.
        data_url = (
            raw_image
            if raw_image.startswith("data:")
            else f"data:image/png;base64,{raw_image}"
        )
        payload = {
            "status": "qr_ready",
            "session_id": login_session.session_id,
            "qr_image": data_url,
            "qr_download": "/qrcode.png",
            "proxy_used": login_session.proxy_used,
            "api_mode": sso.api_mode,
            "message": "二维码已通过 HTTP 接口获取,无需启动浏览器",
        }
        return payload, raw_image
    except DouyinSSOError as err:
        _record_failure(str(err))
        return {"status": "error", "message": str(err), "error_code": err.error_code}, None
    except Exception as err:
        logger.error("SSO 获取二维码失败: %s", err, exc_info=True)
        _record_failure(str(err))
        return {"status": "error", "message": str(err)}, None
def _finish_login(client: "DouyinSSOClient", cookies: list[dict]) -> dict[str, Any]:
    """Record a successful login on ``login_session`` and build the success payload."""
    nickname = client.fetch_nickname()
    login_session.cookies = cookies
    login_session.nickname = nickname
    login_session.status = "success"
    login_session.message = f"{nickname},登录成功!"
    # The client is no longer needed once the cookies have been captured.
    login_session.sso_client = None
    logger.info("HTTP SSO 登录成功 [%s] %s", login_session.session_id, nickname)
    return {
        "status": "success",
        "session_id": login_session.session_id,
        "nickname": nickname,
        "cookies": cookies,
        "message": login_session.message,
    }


def check_login_state() -> dict[str, Any]:
    """Poll the current QR login once and report its state.

    Returns:
        A dict whose ``status`` is one of:

        * ``"error"`` - no active client, login timed out, the poll failed,
          or the QR code expired;
        * ``"success"`` - login cookies are present; includes ``nickname``
          and ``cookies``;
        * ``"scanning"`` - the code was scanned or confirmed but login
          cookies are not present yet;
        * ``"qr_ready"`` - still waiting for a scan (also used for unknown
          phases).
    """
    if not login_session.sso_client:
        return {"status": "error", "message": "会话已失效,请重新获取二维码"}

    if time.time() - login_session.start_time > login_session.login_timeout:
        timeout_message = "登录超时,请重新发起扫码"
        cleanup_session()
        # Keep the stored session state in step with the response, as the
        # expired-QR path below does.
        login_session.status = "error"
        login_session.message = timeout_message
        return {"status": "error", "message": timeout_message}

    client: DouyinSSOClient = login_session.sso_client
    cookies = client.get_cookies_json()
    cookie_names = [c["name"] for c in cookies]
    login_matched = match_login_cookies(cookie_names)

    # Cookies may already be complete from an earlier poll's redirect.
    if is_logged_in(cookies, cookie_names):
        return _finish_login(client, cookies)

    try:
        poll = client.check_qr_status()
    except DouyinSSOError as e:
        return {"status": "error", "message": str(e), "error_code": e.error_code}

    phase = poll["phase"]

    if phase == "expired":
        login_session.status = "error"
        login_session.message = poll["message"]
        return {"status": "error", "message": poll["message"]}

    if phase == "confirmed":
        try:
            client.complete_login(poll["redirect_url"])
        except Exception as e:
            # Best-effort: a failed redirect is retried on the next poll.
            logger.warning("跟随 redirect_url 失败: %s", e)
        # Re-read everything derived from the cookie jar, since the redirect
        # may have added cookies.
        cookies = client.get_cookies_json()
        cookie_names = [c["name"] for c in cookies]
        if is_logged_in(cookies, cookie_names):
            return _finish_login(client, cookies)
        login_matched = match_login_cookies(cookie_names)
        login_session.status = "scanning"
        return {
            "status": "scanning",
            "message": "已确认,正在同步 Cookie...",
            "cookie_count": len(cookies),
            "login_matched": len(login_matched),
        }

    if phase == "scanned":
        login_session.status = "scanning"
        return {
            "status": "scanning",
            "message": poll["message"],
            "cookie_count": len(cookies),
            "login_matched": len(login_matched),
            "qr_visible": False,
        }

    # "waiting" and any unrecognised phase keep the QR code on screen.
    login_session.status = "qr_ready"
    return {
        "status": "qr_ready",
        "message": poll["message"],
        "cookie_count": len(cookies),
        "login_matched": len(login_matched),
        "qr_visible": True,
    }
def cleanup_session():
    """Detach the SSO client from ``login_session`` and close its HTTP session.

    Safe to call when no client is attached.
    """
    client = login_session.sso_client
    # Detach first so no other caller picks up a client that is being closed.
    login_session.sso_client = None
    # Close the underlying requests.Session explicitly to release its pooled
    # connections instead of waiting for garbage collection.
    http_session = getattr(client, "session", None)
    close = getattr(http_session, "close", None)
    if callable(close):
        close()
def debug_snapshot() -> dict[str, Any]:
    """Return a diagnostic view of the current login session.

    Always includes the session id, status, API mode, whether a client is
    attached, the poll interval and a timestamp. With a client attached it
    adds cookie details, the first eight characters of the QR token, and the
    result of one live QR status poll (or ``poll_error`` if that poll fails).
    Without a client it adds cookie details from the stored cookies, if any.
    """
    sso = login_session.sso_client
    snapshot = {
        "session_id": login_session.session_id,
        "status": login_session.status,
        "api_mode": getattr(sso, "api_mode", None) if sso else None,
        "has_session": sso is not None,
        "poll_interval": QR_POLL_INTERVAL,
        "timestamp": time.time(),
    }

    if not sso:
        # No live client: fall back to cookies captured by a finished login.
        if login_session.cookies:
            snapshot["cookie_count"] = len(login_session.cookies)
            snapshot["cookie_names"] = [c["name"] for c in login_session.cookies]
        return snapshot

    jar = sso.get_cookies_json()
    snapshot["cookie_count"] = len(jar)
    snapshot["cookie_names"] = [entry["name"] for entry in jar]
    # Expose only a prefix of the token.
    snapshot["token_prefix"] = (sso.token or "")[:8]
    try:
        polled = sso.check_qr_status()
        snapshot["qr_phase"] = polled.get("phase")
        snapshot["qr_message"] = polled.get("message")
    except Exception as err:
        snapshot["poll_error"] = str(err)
    return snapshot