9e0644095f
移除 Playwright 浏览器自动化,改用 passport/SSO HTTP 接口获取二维码与轮询登录;后端模块化拆分,前端替换为 Vue3 SPA。 Co-authored-by: Cursor <cursoragent@cursor.com>
140 lines
4.6 KiB
Python
140 lines
4.6 KiB
Python
import base64
|
|
import logging
|
|
import os
|
|
import time
|
|
|
|
from flask import Blueprint, jsonify, request, send_file
|
|
|
|
from backend.config import LOGIN_TIMEOUT, STATIC_DIR
|
|
from backend.proxy import test_proxy_api
|
|
from backend.session import login_session
|
|
from backend.sso.qr_login import check_login_state, cleanup_session, debug_snapshot, start_qr_login
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Blueprint that collects every route declared in this module.
api_bp = Blueprint("api", __name__)

# Copy the configured timeout onto the shared session object at import time.
login_session.login_timeout = LOGIN_TIMEOUT
|
|
|
|
|
|
def _save_qr_png(qrcode_b64: str):
    """Decode a base64 QR image and store it as ``qrcode.png`` under STATIC_DIR.

    Args:
        qrcode_b64: Base64 text of the image, optionally preceded by a
            data-URI style header (everything up to the first comma is
            discarded). A falsy value makes the call a no-op.

    Raises:
        binascii.Error: If the payload cannot be decoded as base64.
        OSError: If the directory or the file cannot be written.
    """
    if not qrcode_b64:
        return

    # Keep only what follows the first comma; a string without a comma is
    # used unchanged.
    payload = qrcode_b64.split(",", 1)[-1]
    img_bytes = base64.b64decode(payload)

    final_path = os.path.join(STATIC_DIR, "qrcode.png")
    # The target directory may not exist yet; create it instead of failing.
    os.makedirs(os.path.dirname(final_path) or ".", exist_ok=True)

    # Write to a sibling temp file and swap it into place, so that anything
    # reading qrcode.png concurrently never observes a half-written image.
    tmp_path = final_path + ".tmp"
    try:
        with open(tmp_path, "wb") as f:
            f.write(img_bytes)
        os.replace(tmp_path, final_path)
    except OSError:
        # Best-effort removal of the leftover temp file; the original error
        # is the one the caller should see.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
@api_bp.route("/api/start_qr", methods=["POST"])
def start_qr():
    """Start a QR-code login attempt and return its result as JSON.

    Reads an optional ``proxy_api`` string from the JSON request body and
    passes it to ``start_qr_login``. When that call also yields a base64 QR
    image, the image is saved via ``_save_qr_png``; a failure to save is
    logged and does not affect the response.

    Returns:
        A JSON response: either the result produced by ``start_qr_login`` or
        an error object when another start request currently holds the lock.
    """
    # Non-blocking acquire: if the lock is already held, reject right away
    # instead of waiting for it.
    if not login_session.lock.acquire(blocking=False):
        return jsonify({"status": "error", "message": "已有正在进行的扫码任务,请稍后重试"})
    try:
        # silent=True makes a missing body, a non-JSON Content-Type or
        # malformed JSON yield None instead of an HTTP error raised by Flask,
        # so the empty-dict fallback below is actually reachable.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        # Treat null / non-string values the same as "no proxy supplied".
        raw_proxy = payload.get("proxy_api")
        proxy_api = raw_proxy.strip() if isinstance(raw_proxy, str) else ""

        result, qr_b64 = start_qr_login(proxy_api)
        if qr_b64:
            try:
                _save_qr_png(qr_b64)
            except Exception as e:
                # Saving the PNG is best-effort; the JSON result is still returned.
                logger.warning("保存二维码图片失败: %s", e)
        return jsonify(result)
    finally:
        login_session.lock.release()
|
|
|
|
|
|
@api_bp.route("/api/check_login", methods=["GET"])
def check_login():
    """Report the state of the current QR login as JSON.

    The optional ``session_id`` query parameter is compared with the active
    session id; on a mismatch the caller is told the session has expired.
    Otherwise the stored status is returned directly for ``idle``,
    ``loading``, ``error`` and for ``success`` with cookies present; every
    other case is delegated to ``check_login_state``.
    """
    wanted_sid = request.args.get("session_id", "")
    active_sid = login_session.session_id
    if wanted_sid and active_sid and wanted_sid != active_sid:
        return jsonify({"status": "idle", "message": "会话已过期,请重新获取二维码"})

    state = login_session.status

    # These states are reported as stored, without calling check_login_state.
    if state in ("idle", "loading", "error"):
        return jsonify({"status": state, "message": login_session.message})

    # A finished login with cookies is answered from the stored session data.
    if state == "success" and login_session.cookies:
        finished = {
            "status": "success",
            "session_id": login_session.session_id,
            "nickname": login_session.nickname,
            "cookies": login_session.cookies,
            "message": login_session.message,
        }
        return jsonify(finished)

    try:
        live_state = check_login_state()
        return jsonify(live_state)
    except Exception as exc:
        logger.error("检查登录异常: %s", exc, exc_info=True)
        return jsonify({"status": "error", "message": str(exc)})
|
|
|
|
|
|
@api_bp.route("/api/test_proxy", methods=["POST"])
def test_proxy():
    """Test a proxy API given in the JSON body and return the outcome as JSON.

    Expects a ``proxy_api`` string in the request body. A missing, empty or
    non-string value produces an error object; otherwise the value is handed
    to ``test_proxy_api`` and its result is returned. Any exception from that
    call is logged and reported as an error object.
    """
    # silent=True makes a missing body, a non-JSON Content-Type or malformed
    # JSON yield None instead of an HTTP error raised by Flask.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    # Treat null / non-string values the same as "nothing supplied".
    raw_proxy = payload.get("proxy_api")
    proxy_api = raw_proxy.strip() if isinstance(raw_proxy, str) else ""
    if not proxy_api:
        return jsonify({"status": "error", "message": "请提供代理 API"})

    try:
        return jsonify(test_proxy_api(proxy_api))
    except Exception as e:
        # Keep a server-side trace; the client only receives the message text.
        logger.error("测试代理异常: %s", e, exc_info=True)
        return jsonify({"status": "error", "message": str(e)})
|
|
|
|
|
|
@api_bp.route("/api/status", methods=["GET"])
def api_status():
    """Return a JSON summary of the shared login session.

    While the status is ``qr_ready``, ``scanning`` or ``success`` and a start
    time is set, the number of whole seconds since that start time is
    appended to the message.
    """
    timed_states = ("qr_ready", "scanning", "success")
    waited_suffix = ""
    if login_session.status in timed_states and login_session.start_time:
        waited = int(time.time() - login_session.start_time)
        waited_suffix = f" (已等待 {waited}s)"

    # api_mode is reported only when an SSO client object is present.
    sso = login_session.sso_client
    if sso:
        api_mode = getattr(sso, "api_mode", None)
    else:
        api_mode = None

    summary = {
        "status": login_session.status,
        "message": login_session.message + waited_suffix,
        "session_id": login_session.session_id or "",
        "nickname": login_session.nickname or "",
        "proxy_used": login_session.proxy_used,
        "api_mode": api_mode,
        "login_timeout": LOGIN_TIMEOUT,
    }
    return jsonify(summary)
|
|
|
|
|
|
@api_bp.route("/api/reset", methods=["POST"])
def api_reset():
    """Clean up and reset the shared login session, then confirm via JSON."""
    reset_notice = "会话已重置"

    # Order matters: cleanup runs first, then the session object is reset,
    # and only afterwards is the confirmation message stored on it.
    cleanup_session()
    login_session.reset()
    login_session.message = reset_notice

    return jsonify({"status": "success", "message": reset_notice})
|
|
|
|
|
|
@api_bp.route("/api/health", methods=["GET"])
def api_health():
    """Return a JSON health report including the current session status."""
    # api_mode is reported only when an SSO client object is present.
    sso = login_session.sso_client
    if sso:
        api_mode = getattr(sso, "api_mode", None)
    else:
        api_mode = None

    report = {
        "status": "ok",
        "session_status": login_session.status,
        "session_id": login_session.session_id or "",
        "api_mode": api_mode,
        "engine": "http_sso",
        "timestamp": time.time(),
    }
    return jsonify(report)
|
|
|
|
|
|
@api_bp.route("/api/debug", methods=["GET"])
def api_debug():
    """Return the output of ``debug_snapshot`` as JSON.

    Any exception raised while building or serialising the snapshot is
    reported as an error object instead of propagating.
    """
    try:
        snapshot = debug_snapshot()
        return jsonify(snapshot)
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure)
|
|
|
|
|
|
@api_bp.route("/qrcode.png")
def get_qrcode():
    """Serve ``qrcode.png`` from STATIC_DIR, or a 404 text reply if it is absent."""
    png_file = os.path.join(STATIC_DIR, "qrcode.png")

    # Guard clause: nothing on disk at that path yet.
    if not os.path.exists(png_file):
        return "二维码未生成,请先点击获取二维码", 404

    return send_file(png_file, mimetype="image/png")
|