147 lines
4.3 KiB
Python
147 lines
4.3 KiB
Python
|
|
"""
|
|||
|
|
基础存储服务 - 提供 JSON 文件的异步读写操作
|
|||
|
|
所有服务共享的底层存储抽象
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import asyncio
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Any, Dict, Optional
|
|||
|
|
import aiofiles
|
|||
|
|
import aiofiles.os
|
|||
|
|
|
|||
|
|
|
|||
|
|
class StorageService:
|
|||
|
|
"""异步 JSON 文件存储服务"""
|
|||
|
|
|
|||
|
|
def __init__(self, base_path: str = ".doc"):
|
|||
|
|
"""
|
|||
|
|
初始化存储服务
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
base_path: 基础存储路径,默认为 .doc
|
|||
|
|
"""
|
|||
|
|
self.base_path = Path(base_path)
|
|||
|
|
self._lock = asyncio.Lock() # 简单的内存锁,防止并发写入
|
|||
|
|
|
|||
|
|
async def ensure_dir(self, path: str) -> None:
|
|||
|
|
"""
|
|||
|
|
确保目录存在,不存在则创建
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path: 目录路径(相对于 base_path 或绝对路径)
|
|||
|
|
"""
|
|||
|
|
dir_path = self.base_path / path if not Path(path).is_absolute() else Path(path)
|
|||
|
|
await aiofiles.os.makedirs(dir_path, exist_ok=True)
|
|||
|
|
|
|||
|
|
async def read_json(self, path: str) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
读取 JSON 文件
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path: 文件路径(相对于 base_path 或绝对路径)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
解析后的 JSON 字典,文件不存在或为空时返回空字典
|
|||
|
|
"""
|
|||
|
|
file_path = self.base_path / path if not Path(path).is_absolute() else Path(path)
|
|||
|
|
|
|||
|
|
if not await aiofiles.os.path.exists(file_path):
|
|||
|
|
return {}
|
|||
|
|
|
|||
|
|
async with aiofiles.open(file_path, mode="r", encoding="utf-8") as f:
|
|||
|
|
content = await f.read()
|
|||
|
|
if not content.strip():
|
|||
|
|
return {}
|
|||
|
|
return json.loads(content)
|
|||
|
|
|
|||
|
|
async def write_json(self, path: str, data: Dict[str, Any]) -> None:
|
|||
|
|
"""
|
|||
|
|
写入 JSON 文件
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path: 文件路径(相对于 base_path 或绝对路径)
|
|||
|
|
data: 要写入的 JSON 数据
|
|||
|
|
"""
|
|||
|
|
file_path = self.base_path / path if not Path(path).is_absolute() else Path(path)
|
|||
|
|
|
|||
|
|
# 确保父目录存在
|
|||
|
|
await self.ensure_dir(str(file_path.parent))
|
|||
|
|
|
|||
|
|
# 使用锁防止并发写入冲突
|
|||
|
|
async with self._lock:
|
|||
|
|
async with aiofiles.open(file_path, mode="w", encoding="utf-8") as f:
|
|||
|
|
await f.write(json.dumps(data, ensure_ascii=False, indent=2))
|
|||
|
|
|
|||
|
|
async def append_json_list(self, path: str, item: Any) -> None:
|
|||
|
|
"""
|
|||
|
|
向 JSON 数组文件追加一项
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path: 文件路径
|
|||
|
|
item: 要追加的项
|
|||
|
|
"""
|
|||
|
|
data = await self.read_json(path)
|
|||
|
|
if not isinstance(data, list):
|
|||
|
|
data = []
|
|||
|
|
data.append(item)
|
|||
|
|
await self.write_json(path, {"items": data})
|
|||
|
|
|
|||
|
|
async def delete(self, path: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
删除文件
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path: 文件路径
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否成功删除
|
|||
|
|
"""
|
|||
|
|
file_path = self.base_path / path if not Path(path).is_absolute() else Path(path)
|
|||
|
|
|
|||
|
|
if await aiofiles.os.path.exists(file_path):
|
|||
|
|
await aiofiles.os.remove(file_path)
|
|||
|
|
return True
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
async def exists(self, path: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
检查文件是否存在
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path: 文件路径
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
文件是否存在
|
|||
|
|
"""
|
|||
|
|
file_path = self.base_path / path if not Path(path).is_absolute() else Path(path)
|
|||
|
|
return await aiofiles.os.path.exists(file_path)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 全局单例实例
|
|||
|
|
_storage_instance: Optional[StorageService] = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _find_project_root() -> Path:
|
|||
|
|
"""查找项目根目录(包含 CLAUDE.md 的目录)"""
|
|||
|
|
current = Path.cwd()
|
|||
|
|
# 向上查找项目根目录
|
|||
|
|
for parent in [current] + list(current.parents):
|
|||
|
|
if (parent / "CLAUDE.md").exists():
|
|||
|
|
return parent
|
|||
|
|
# 如果找不到,使用当前目录的父目录(假设从 backend/ 运行)
|
|||
|
|
if current.name == "backend":
|
|||
|
|
return current.parent
|
|||
|
|
# 默认使用当前目录
|
|||
|
|
return current
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_storage() -> StorageService:
|
|||
|
|
"""获取存储服务单例,使用项目根目录下的 .doc 目录"""
|
|||
|
|
global _storage_instance
|
|||
|
|
if _storage_instance is None:
|
|||
|
|
project_root = _find_project_root()
|
|||
|
|
doc_path = project_root / ".doc"
|
|||
|
|
_storage_instance = StorageService(str(doc_path))
|
|||
|
|
return _storage_instance
|