Files
AIChatRoom/backend/utils/proxy_handler.py
Claude Code edbddf855d feat: AI聊天室多Agent协作讨论平台
- 实现Agent管理,支持AI辅助生成系统提示词
- 支持多个AI提供商(OpenRouter、智谱、MiniMax等)
- 实现聊天室和讨论引擎
- WebSocket实时消息推送
- 前端使用React + Ant Design
- 后端使用FastAPI + MongoDB

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 19:20:02 +08:00

136 lines
3.4 KiB
Python

"""
代理处理模块
处理HTTP代理配置
"""
from typing import Optional, Dict, Any
import httpx
from loguru import logger
from config import settings
def get_proxy_dict(
use_proxy: bool,
proxy_config: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, str]]:
"""
获取代理配置字典
Args:
use_proxy: 是否使用代理
proxy_config: 代理配置
Returns:
代理配置字典或None
"""
if not use_proxy:
return None
proxies = {}
if proxy_config:
http_proxy = proxy_config.get("http_proxy")
https_proxy = proxy_config.get("https_proxy")
else:
# 使用全局默认代理
http_proxy = settings.DEFAULT_HTTP_PROXY
https_proxy = settings.DEFAULT_HTTPS_PROXY
if http_proxy:
proxies["http://"] = http_proxy
if https_proxy:
proxies["https://"] = https_proxy
return proxies if proxies else None
def get_http_client(
use_proxy: bool = False,
proxy_config: Optional[Dict[str, Any]] = None,
timeout: int = 60,
**kwargs
) -> httpx.AsyncClient:
"""
获取配置好的HTTP异步客户端
Args:
use_proxy: 是否使用代理
proxy_config: 代理配置
timeout: 超时时间(秒)
**kwargs: 其他httpx参数
Returns:
配置好的httpx.AsyncClient实例
"""
proxies = get_proxy_dict(use_proxy, proxy_config)
client_kwargs = {
"timeout": httpx.Timeout(timeout),
"follow_redirects": True,
**kwargs
}
if proxies:
client_kwargs["proxies"] = proxies
logger.debug(f"HTTP客户端使用代理: {proxies}")
return httpx.AsyncClient(**client_kwargs)
async def test_proxy_connection(
proxy_config: Dict[str, Any],
test_url: str = "https://www.google.com"
) -> Dict[str, Any]:
"""
测试代理连接是否可用
Args:
proxy_config: 代理配置
test_url: 测试URL
Returns:
测试结果字典,包含 success, message, latency_ms
"""
try:
async with get_http_client(
use_proxy=True,
proxy_config=proxy_config,
timeout=10
) as client:
import time
start = time.time()
response = await client.get(test_url)
latency = (time.time() - start) * 1000
if response.status_code == 200:
return {
"success": True,
"message": "代理连接正常",
"latency_ms": round(latency, 2)
}
else:
return {
"success": False,
"message": f"代理返回状态码: {response.status_code}",
"latency_ms": round(latency, 2)
}
except httpx.ProxyError as e:
return {
"success": False,
"message": f"代理连接失败: {str(e)}",
"latency_ms": None
}
except httpx.TimeoutException:
return {
"success": False,
"message": "代理连接超时",
"latency_ms": None
}
except Exception as e:
return {
"success": False,
"message": f"连接错误: {str(e)}",
"latency_ms": None
}