Files
congsh 5c028d7952 Initial commit: snapAna 截图智能整理工具
包含 FastAPI 后端、React 前端、队列/OCR/标签/待办等完整功能。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-27 15:45:50 +08:00

123 lines
4.1 KiB
Python

"""watchdog 监听被关注的目录。
中文路径与 OneDrive 同步盘下 NTFS 事件偶发不稳,因此默认使用 PollingObserver。
"""
from __future__ import annotations
import asyncio
import threading
from pathlib import Path
from sqlalchemy import select
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from app.core.db import session_scope
from app.core.logger import get_logger
from app.core.path_utils import is_accessible_dir, path_from_storage
from app.models.watch_folder import WatchFolder
from app.services.ingest import ingest_path
from app.services.thumbnail import is_supported
# Module-level logger used by the event handler and WatcherService in this file.
logger = get_logger(__name__)
class _ScreenshotEventHandler(FileSystemEventHandler):
    """Turn filesystem events into screenshot ingestion.

    New file -> ingest into the database -> schedule ``notify()`` on the event loop.
    Callbacks are invoked by the watchdog observer, not on ``loop``, which is why
    notifications are handed over with ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, notify) -> None:  # noqa: ANN001
        # loop: event loop on which notify() coroutines are scheduled.
        # notify: zero-argument callable returning a coroutine; called after each
        # successful ingest.
        self._loop = loop
        self._notify = notify

    def on_created(self, event: FileSystemEvent) -> None:
        """Handle a newly created file; directory events are ignored."""
        if event.is_directory:
            return
        self._handle(Path(event.src_path))

    def on_moved(self, event: FileSystemEvent) -> None:
        """Handle a file moved/renamed into place, using its destination path."""
        if event.is_directory:
            return
        self._handle(Path(getattr(event, "dest_path", event.src_path)))

    def _handle(self, path: Path) -> None:
        """Process ``path`` if it is a supported image, never letting errors escape.

        An exception escaping a watchdog handler propagates into the observer's
        dispatch loop and can stop event dispatching for every watched folder,
        so all failures are logged here instead.
        """
        if not is_supported(path):
            return
        try:
            self._process(path)
        except Exception:  # noqa: BLE001 - boundary of the observer callback
            logger.exception("处理截图文件失败: %s", path)

    def _process(self, path: Path) -> None:
        """Wait for the write to finish, ingest ``path`` and schedule a notification."""
        try:
            # Screenshot tools often create an empty file first and write to it afterwards.
            ready = self._wait_file_ready(path)
        except FileNotFoundError:
            # Deleted or moved away before it settled; nothing to ingest.
            return
        if not ready:
            # Keep the previous best-effort behaviour (still ingest), but leave a trace.
            logger.warning("文件大小未稳定,仍尝试入库: %s", path)
        with session_scope() as session:
            shot = ingest_path(session, path)
        # Notify only after the session block has exited, so the loop side does not
        # race the session's completion (presumably a commit - confirm in session_scope).
        if shot is not None:
            self._schedule_notify()

    def _schedule_notify(self) -> None:
        """Schedule ``notify()`` on the event loop from this thread; log its failures."""
        coro = self._notify()
        try:
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        except RuntimeError:
            # Raised when the loop is already closed (e.g. during shutdown). Close the
            # coroutine so it is not reported as "never awaited"; _handle logs the error.
            coro.close()
            raise
        future.add_done_callback(self._log_notify_result)

    @staticmethod
    def _log_notify_result(future) -> None:  # noqa: ANN001
        """Log an exception raised by the notify coroutine; nothing else inspects the future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("通知回调执行失败", exc_info=exc)

    @staticmethod
    def _wait_file_ready(path: Path, retries: int = 10, interval: float = 0.3) -> bool:
        """Poll ``path`` until its size is non-zero and unchanged between two polls.

        Args:
            path: file to watch.
            retries: maximum number of size polls.
            interval: seconds to sleep between polls.

        Returns:
            True once the size has settled; False if it has not settled after
            ``retries`` polls (the caller decides what to do then).

        Raises:
            FileNotFoundError: the file disappeared while waiting.
        """
        import time

        last = -1
        for _ in range(retries):
            # stat() raises FileNotFoundError itself if the file vanished.
            size = path.stat().st_size
            if size > 0 and size == last:
                return True
            last = size
            time.sleep(interval)
        return False
class WatcherService:
    """Manage the lifecycle of the polling observer that watches all enabled folders.

    ``_lock`` serializes swapping the observer; ``self._observer`` only ever holds
    an observer that was successfully started (or None).
    """

    def __init__(self) -> None:
        # Running observer, or None when stopped.
        self._observer: PollingObserver | None = None
        # Guards _observer, _loop and _notify_cb.
        self._lock = threading.Lock()
        # Remembered by start() so reload() can restart with the same targets.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._notify_cb = None

    def start(self, loop: asyncio.AbstractEventLoop, notify) -> None:  # noqa: ANN001
        """(Re)start watching every enabled folder stored in the database.

        Any running observer is stopped first. Folders that do not exist or are
        not accessible are skipped with a warning.

        Args:
            loop: event loop on which ``notify()`` coroutines are scheduled.
            notify: zero-argument callable returning a coroutine, invoked after
                each newly ingested screenshot.
        """
        with self._lock:
            self._loop = loop
            self._notify_cb = notify
            self._stop_locked()
            # Build and start the observer in a local first: if anything below raises
            # (e.g. the DB query), self._observer stays None instead of holding a
            # never-started observer whose join() would make the next stop() raise.
            observer = PollingObserver(timeout=2.0)
            handler = _ScreenshotEventHandler(loop, notify)
            for stored_path, recursive in self._enabled_folders():
                folder = path_from_storage(stored_path)
                if not is_accessible_dir(folder):
                    logger.warning("监听目录不存在或不可访问,跳过: %s", stored_path)
                    continue
                logger.info("开始监听 %s (recursive=%s)", stored_path, recursive)
                observer.schedule(handler, str(folder), recursive=recursive)
            observer.start()
            self._observer = observer

    @staticmethod
    def _enabled_folders() -> list[tuple]:
        """Return ``(stored_path, recursive)`` for every enabled WatchFolder row.

        Plain values are copied out inside the session so they stay usable after
        the session is closed.
        """
        with session_scope() as session:
            folders = session.scalars(
                select(WatchFolder).where(WatchFolder.enabled.is_(True))
            ).all()
            return [(f.path, f.recursive) for f in folders]

    def reload(self) -> None:
        """Restart watching after the folder list changed; no-op before the first start()."""
        with self._lock:
            loop, notify = self._loop, self._notify_cb
        if loop is None or notify is None:
            return
        self.start(loop, notify)

    def stop(self) -> None:
        """Stop the observer if one is running; safe to call repeatedly."""
        with self._lock:
            self._stop_locked()

    def _stop_locked(self) -> None:
        """Stop and detach the current observer. Caller must hold ``self._lock``."""
        # Detach first so the service is left clean even if stop()/join() raise.
        observer, self._observer = self._observer, None
        if observer is None:
            return
        observer.stop()
        # Bounded wait so a stuck polling thread cannot block the caller forever.
        observer.join(timeout=3)


# Module-level WatcherService instance shared by importers of this module.
watcher_service = WatcherService()