""" 代理处理模块 处理HTTP代理配置 """ from typing import Optional, Dict, Any import httpx from loguru import logger from config import settings def get_proxy_dict( use_proxy: bool, proxy_config: Optional[Dict[str, Any]] = None ) -> Optional[Dict[str, str]]: """ 获取代理配置字典 Args: use_proxy: 是否使用代理 proxy_config: 代理配置 Returns: 代理配置字典或None """ if not use_proxy: return None proxies = {} if proxy_config: http_proxy = proxy_config.get("http_proxy") https_proxy = proxy_config.get("https_proxy") else: # 使用全局默认代理 http_proxy = settings.DEFAULT_HTTP_PROXY https_proxy = settings.DEFAULT_HTTPS_PROXY if http_proxy: proxies["http://"] = http_proxy if https_proxy: proxies["https://"] = https_proxy return proxies if proxies else None def get_http_client( use_proxy: bool = False, proxy_config: Optional[Dict[str, Any]] = None, timeout: int = 60, **kwargs ) -> httpx.AsyncClient: """ 获取配置好的HTTP异步客户端 Args: use_proxy: 是否使用代理 proxy_config: 代理配置 timeout: 超时时间(秒) **kwargs: 其他httpx参数 Returns: 配置好的httpx.AsyncClient实例 """ proxies = get_proxy_dict(use_proxy, proxy_config) client_kwargs = { "timeout": httpx.Timeout(timeout), "follow_redirects": True, **kwargs } if proxies: client_kwargs["proxies"] = proxies logger.debug(f"HTTP客户端使用代理: {proxies}") return httpx.AsyncClient(**client_kwargs) async def test_proxy_connection( proxy_config: Dict[str, Any], test_url: str = "https://www.google.com" ) -> Dict[str, Any]: """ 测试代理连接是否可用 Args: proxy_config: 代理配置 test_url: 测试URL Returns: 测试结果字典,包含 success, message, latency_ms """ try: async with get_http_client( use_proxy=True, proxy_config=proxy_config, timeout=10 ) as client: import time start = time.time() response = await client.get(test_url) latency = (time.time() - start) * 1000 if response.status_code == 200: return { "success": True, "message": "代理连接正常", "latency_ms": round(latency, 2) } else: return { "success": False, "message": f"代理返回状态码: {response.status_code}", "latency_ms": round(latency, 2) } except httpx.ProxyError as e: return { "success": False, "message": f"代理连接失败: {str(e)}", "latency_ms": None } except httpx.TimeoutException: return { "success": False, "message": "代理连接超时", "latency_ms": None } except Exception as e: return { "success": False, "message": f"连接错误: {str(e)}", "latency_ms": None }