Files
multiAgentTry/backend/app/routers/locks.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

89 lines
2.4 KiB
Python

"""
文件锁 API 路由
"""
from fastapi import APIRouter
from pydantic import BaseModel
from typing import List, Optional
import time
# Router that the file-lock endpoints in this module are registered on.
router = APIRouter()

# In-memory lock table: one dict per held lock. It starts with two sample
# entries whose timestamps are computed once, when the module is imported
# (3600 and 1800 seconds before that moment).
locks_db = [
    {
        "file_path": "src/main.py",
        "agent_id": "claude-001",
        "agent_name": "Claude Code",
        "locked_at": time.time() - 3600,
    },
    {
        "file_path": "src/utils.py",
        "agent_id": "kimi-001",
        "agent_name": "Kimi CLI",
        "locked_at": time.time() - 1800,
    },
]
class FileLock(BaseModel):
    """Request payload describing a lock on a single file.

    ``locked_at`` is a required field of this schema. Note that
    ``acquire_lock`` stores its own ``time.time()`` reading rather than the
    value supplied here.
    """

    # Path of the file the lock applies to.
    file_path: str
    # Identifier of the agent requesting or holding the lock.
    agent_id: str
    # Display name of the agent; defaults to an empty string.
    agent_name: str = ""
    # Lock timestamp; presumably seconds on the ``time.time()`` clock.
    locked_at: float
def format_elapsed(locked_at: float) -> str:
    """Format how long a lock has been held as a short display string.

    Args:
        locked_at: Moment the lock was taken, in seconds on the same clock
            as ``time.time()``.

    Returns:
        Whole seconds with a ``秒`` suffix below one minute, whole minutes
        with a ``分钟`` suffix below one hour, otherwise hours with one
        decimal place and a ``小时`` suffix.
    """
    # A timestamp that lies in the future would otherwise be rendered as a
    # negative duration; clamp it to zero instead.
    elapsed = max(0.0, time.time() - locked_at)
    if elapsed < 60:
        # Carry a unit here as well, matching the minute and hour branches.
        return f"{int(elapsed)}秒"
    if elapsed < 3600:
        return f"{int(elapsed / 60)}分钟"
    return f"{elapsed / 3600:.1f}小时"
@router.get("/")
async def list_locks():
    """Return every lock in ``locks_db`` with a display-ready age attached.

    Each item in the response is a shallow copy of the stored record plus an
    ``elapsed_display`` key produced by ``format_elapsed``; the stored
    records themselves are left unmodified.
    """
    annotated = [
        {**entry, "elapsed_display": format_elapsed(entry["locked_at"])}
        for entry in locks_db
    ]
    return {"locks": annotated}
@router.post("/acquire")
async def acquire_lock(lock: FileLock):
    """Register a lock for ``lock.file_path`` unless one already exists.

    The stored timestamp is read from ``time.time()`` at call time, and when
    ``lock.agent_name`` is empty the agent id is stored as the name.

    Returns:
        A dict with ``success`` and ``message`` keys. ``success`` is
        ``False`` when any entry in ``locks_db`` already has the same file
        path, regardless of which agent holds it.
    """
    already_held = any(entry["file_path"] == lock.file_path for entry in locks_db)
    if already_held:
        return {"success": False, "message": "File already locked"}

    new_entry = {
        "file_path": lock.file_path,
        "agent_id": lock.agent_id,
        "agent_name": lock.agent_name or lock.agent_id,
        "locked_at": time.time(),
    }
    locks_db.append(new_entry)
    return {"success": True, "message": "Lock acquired"}
@router.post("/release")
async def release_lock(data: dict):
    """Release the lock on a file held by a specific agent.

    Args:
        data: Request body; the ``file_path`` and ``agent_id`` keys are
            read, each defaulting to an empty string when absent.

    Returns:
        A dict with ``success`` set to ``True`` and a fixed message. The
        response is the same whether or not a matching lock existed.
    """
    file_path = data.get("file_path", "")
    agent_id = data.get("agent_id", "")
    # Update the list in place rather than rebinding the module global, so
    # any other reference to this same list object sees the removal too.
    locks_db[:] = [
        entry
        for entry in locks_db
        if not (entry["file_path"] == file_path and entry["agent_id"] == agent_id)
    ]
    return {"success": True, "message": "Lock released"}
@router.get("/check")
async def check_lock(file_path: str):
    """Report whether ``file_path`` currently has an entry in ``locks_db``.

    Returns:
        A dict echoing ``file_path`` with a ``locked`` flag; when locked it
        also carries ``locked_by``, the agent id of the first matching entry.
    """
    holder = next(
        (entry for entry in locks_db if entry["file_path"] == file_path),
        None,
    )
    if holder is None:
        return {"file_path": file_path, "locked": False}
    return {"file_path": file_path, "locked": True, "locked_by": holder["agent_id"]}