Files

116 lines
3.7 KiB
Python
Raw Permalink Normal View History

"""
文件锁服务 - 管理 Agent 对文件的访问锁
"""
import asyncio
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from .storage import get_storage
@dataclass
class LockInfo:
"""文件锁信息"""
file_path: str
agent_id: str
acquired_at: str
agent_name: str = ""
@property
def elapsed_seconds(self) -> int:
acquired_time = datetime.fromisoformat(self.acquired_at)
return int((datetime.now() - acquired_time).total_seconds())
@property
def elapsed_display(self) -> str:
seconds = self.elapsed_seconds
if seconds < 60:
return f"{seconds}s ago"
minutes = seconds // 60
secs = seconds % 60
return f"{minutes}m {secs:02d}s ago"
class FileLockService:
"""文件锁服务"""
LOCKS_FILE = "cache/file_locks.json"
LOCK_TIMEOUT = 300
def __init__(self):
self._storage = get_storage()
self._lock = asyncio.Lock()
async def _load_locks(self) -> Dict[str, Dict]:
return await self._storage.read_json(self.LOCKS_FILE)
async def _save_locks(self, locks: Dict[str, Dict]) -> None:
await self._storage.write_json(self.LOCKS_FILE, locks)
def _is_expired(self, lock_data: Dict) -> bool:
acquired_at = datetime.fromisoformat(lock_data["acquired_at"])
return (datetime.now() - acquired_at).total_seconds() >= self.LOCK_TIMEOUT
async def _cleanup_expired(self, locks: Dict[str, Dict]) -> Dict[str, Dict]:
return {k: v for k, v in locks.items() if not self._is_expired(v)}
async def acquire_lock(self, file_path: str, agent_id: str, agent_name: str = "") -> bool:
async with self._lock:
locks = await self._cleanup_expired(await self._load_locks())
if file_path in locks and locks[file_path]["agent_id"] != agent_id:
return False
locks[file_path] = asdict(LockInfo(
file_path=file_path,
agent_id=agent_id,
acquired_at=datetime.now().isoformat(),
agent_name=agent_name
))
await self._save_locks(locks)
return True
async def release_lock(self, file_path: str, agent_id: str) -> bool:
async with self._lock:
locks = await self._load_locks()
if file_path not in locks or locks[file_path]["agent_id"] != agent_id:
return False
del locks[file_path]
await self._save_locks(locks)
return True
async def get_locks(self) -> List[LockInfo]:
locks = await self._cleanup_expired(await self._load_locks())
return [LockInfo(**data) for data in locks.values()]
async def check_locked(self, file_path: str) -> Optional[str]:
locks = await self._cleanup_expired(await self._load_locks())
return locks.get(file_path, {}).get("agent_id")
async def get_agent_locks(self, agent_id: str) -> List[LockInfo]:
return [lock for lock in await self.get_locks() if lock.agent_id == agent_id]
async def release_all_agent_locks(self, agent_id: str) -> int:
async with self._lock:
locks = await self._load_locks()
to_remove = [k for k, v in locks.items() if v["agent_id"] == agent_id]
for k in to_remove:
del locks[k]
await self._save_locks(locks)
return len(to_remove)
# 简化单例实现
_file_lock_service: Optional[FileLockService] = None
def get_file_lock_service() -> FileLockService:
global _file_lock_service
if _file_lock_service is None:
_file_lock_service = FileLockService()
return _file_lock_service