"""File lock API routes.

Locks are kept in the module-level, in-memory ``locks_db`` list.
"""
from typing import List, Optional
import time

from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter()

# In-memory lock table, seeded with two sample entries whose timestamps are
# computed relative to the moment this module is imported.
locks_db = [
    {
        "file_path": "src/main.py",
        "agent_id": "claude-001",
        "agent_name": "Claude Code",
        "locked_at": time.time() - 3600,
    },
    {
        "file_path": "src/utils.py",
        "agent_id": "kimi-001",
        "agent_name": "Kimi CLI",
        "locked_at": time.time() - 1800,
    },
]


class FileLock(BaseModel):
    """Request payload describing a lock on a single file."""

    file_path: str
    agent_id: str
    # Optional display name; acquire_lock falls back to agent_id when empty.
    agent_name: str = ""
    # Required by the schema, but acquire_lock ignores the submitted value and
    # stamps the lock with the server's current time instead.
    locked_at: float


def format_elapsed(locked_at: float) -> str:
    """Return how long a lock has been held as a short display string.

    Args:
        locked_at: Unix timestamp (seconds) at which the lock was taken.

    Returns:
        Whole seconds below one minute, whole minutes below one hour,
        otherwise hours with one decimal place.
    """
    # time.time() is wall-clock time and may step backwards, so a timestamp
    # can lie slightly in the future; clamp so we never render a negative
    # duration.
    elapsed = max(0.0, time.time() - locked_at)
    if elapsed < 60:
        return f"{int(elapsed)}秒"
    if elapsed < 3600:
        return f"{int(elapsed / 60)}分钟"
    return f"{elapsed / 3600:.1f}小时"


@router.get("")
@router.get("/")
async def list_locks():
    """List every file lock.

    Returns:
        ``{"locks": [...]}`` where each entry is a copy of the stored lock
        with an added ``elapsed_display`` string.
    """
    return {
        "locks": [
            {**lock, "elapsed_display": format_elapsed(lock["locked_at"])}
            for lock in locks_db
        ]
    }


@router.post("/acquire")
async def acquire_lock(lock: FileLock):
    """Acquire a lock on a file.

    A path can carry only one lock, regardless of which agent holds it.

    Args:
        lock: Requested lock; ``locked_at`` is ignored in favour of the
            current server time.

    Returns:
        ``{"success": bool, "message": str}``; ``success`` is ``False`` when
        the path is already locked.
    """
    # Reject the request if any agent already holds a lock on this path.
    if any(held["file_path"] == lock.file_path for held in locks_db):
        return {"success": False, "message": "File already locked"}

    locks_db.append({
        "file_path": lock.file_path,
        "agent_id": lock.agent_id,
        "agent_name": lock.agent_name or lock.agent_id,
        "locked_at": time.time(),
    })
    return {"success": True, "message": "Lock acquired"}


@router.post("/release")
async def release_lock(data: dict):
    """Release the lock an agent holds on a file.

    Args:
        data: JSON object with ``file_path`` and ``agent_id`` keys; missing
            keys default to the empty string.

    Returns:
        ``{"success": bool, "message": str}``; ``success`` is ``False`` when
        no lock matches both the path and the agent, in which case the lock
        table is left untouched.
    """
    file_path = data.get("file_path", "")
    agent_id = data.get("agent_id", "")

    remaining = [
        held
        for held in locks_db
        if held["file_path"] != file_path or held["agent_id"] != agent_id
    ]
    if len(remaining) == len(locks_db):
        # Nothing matched: do not claim a release that never happened, e.g.
        # when the caller is not the agent holding the lock.
        return {"success": False, "message": "Lock not found"}

    # Mutate in place rather than rebinding the global, so any module that
    # imported ``locks_db`` by name keeps seeing the live list.
    locks_db[:] = remaining
    return {"success": True, "message": "Lock released"}


@router.get("/check")
async def check_lock(file_path: str):
    """Report whether a file is currently locked.

    Args:
        file_path: Path to look up.

    Returns:
        ``{"file_path", "locked": True, "locked_by": agent_id}`` for the first
        matching lock, otherwise ``{"file_path", "locked": False}``.
    """
    holder = next(
        (held for held in locks_db if held["file_path"] == file_path),
        None,
    )
    if holder is None:
        return {"file_path": file_path, "locked": False}
    return {"file_path": file_path, "locked": True, "locked_by": holder["agent_id"]}