5c028d7952
包含 FastAPI 后端、React 前端、队列/OCR/标签/待办等完整功能。 Co-authored-by: Cursor <cursoragent@cursor.com>
123 lines
4.1 KiB
Python
123 lines
4.1 KiB
Python
"""使用 watchdog 监听用户关注的目录。

在中文路径及 OneDrive 同步盘下，NTFS 文件系统事件偶尔不稳定，因此默认使用 PollingObserver（轮询）而非原生事件监听。
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import select
|
|
from watchdog.events import FileSystemEvent, FileSystemEventHandler
|
|
from watchdog.observers.polling import PollingObserver
|
|
|
|
from app.core.db import session_scope
|
|
from app.core.logger import get_logger
|
|
from app.core.path_utils import is_accessible_dir, path_from_storage
|
|
from app.models.watch_folder import WatchFolder
|
|
from app.services.ingest import ingest_path
|
|
from app.services.thumbnail import is_supported
|
|
|
|
|
|
# Module logger obtained from the project's get_logger, keyed by this module's name.
logger = get_logger(__name__)
|
|
|
|
|
|
class _ScreenshotEventHandler(FileSystemEventHandler):
    """Ingest newly created / moved-in files and notify the app about them.

    watchdog invokes these callbacks on its observer thread, not on the
    asyncio event loop, so the async ``notify`` callback is handed to ``loop``
    via :func:`asyncio.run_coroutine_threadsafe`.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, notify) -> None:  # noqa: ANN001
        """Store the target event loop and the notify callback.

        Args:
            loop: Event loop on which ``notify()`` coroutines are scheduled.
            notify: Zero-argument callable returning a coroutine; called once
                for every file for which ``ingest_path`` returned non-None.
        """
        self._loop = loop
        self._notify = notify

    def on_created(self, event: FileSystemEvent) -> None:
        """Handle a newly created file; directory events are ignored."""
        if event.is_directory:
            return
        self._handle(Path(event.src_path))

    def on_moved(self, event: FileSystemEvent) -> None:
        """Handle a file moved/renamed into place, using its destination path."""
        if event.is_directory:
            return
        self._handle(Path(getattr(event, "dest_path", event.src_path)))

    def _handle(self, path: Path) -> None:
        """Ingest *path* if it is a supported file and notify on success.

        Every exception is logged and swallowed here on purpose: an exception
        escaping a watchdog handler terminates the observer's dispatch thread,
        which would silently stop monitoring *all* watched folders.
        """
        if not is_supported(path):
            return
        try:
            if self._ingest(path):
                self._schedule_notify()
        except Exception:  # noqa: BLE001 - thread boundary, see docstring
            logger.exception("处理监听文件失败: %s", path)

    def _ingest(self, path: Path) -> bool:
        """Wait for *path* to settle, then ingest it.

        Returns:
            True if ``ingest_path`` returned a non-None result, False if it
            returned None or the file disappeared before it settled.
        """
        try:
            # Wait for the write to finish: screenshot tools often create an
            # empty file first and write the content afterwards.
            self._wait_file_ready(path)
        except FileNotFoundError:
            return False
        with session_scope() as session:
            shot = ingest_path(session, path)
        # Decided only after session_scope() has exited, so that whatever it
        # does on exit (presumably commit) happens before notify() can run on
        # the loop and look for the new record.
        return shot is not None

    def _schedule_notify(self) -> None:
        """Submit ``notify()`` to the event loop from this worker thread."""
        future = asyncio.run_coroutine_threadsafe(self._notify(), self._loop)
        # Nobody awaits this future; without a callback a failure in notify()
        # would be silently discarded.
        future.add_done_callback(self._log_notify_error)

    @staticmethod
    def _log_notify_error(future) -> None:  # noqa: ANN001
        """Log the exception raised by a finished ``notify()`` coroutine, if any."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("通知回调执行失败", exc_info=exc)

    @staticmethod
    def _wait_file_ready(path: Path, retries: int = 10, interval: float = 0.3) -> None:
        """Poll until the file size is non-zero and unchanged between two polls.

        If the size has not settled after *retries* polls, this returns
        anyway and the caller ingests the file as-is (best effort).

        Raises:
            FileNotFoundError: if *path* does not exist (or vanishes) while polling.
        """
        import time

        last = -1
        for _ in range(retries):
            size = path.stat().st_size  # raises FileNotFoundError if gone
            if size > 0 and size == last:
                return
            last = size
            time.sleep(interval)
|
|
|
|
|
|
class WatcherService:
    """Own the PollingObserver that watches every enabled ``WatchFolder``.

    ``start`` and ``stop`` serialize on an internal lock. ``reload`` re-runs
    ``start`` with the loop and callback passed to the most recent ``start``.
    """

    def __init__(self) -> None:
        self._observer: PollingObserver | None = None
        self._lock = threading.Lock()
        # Remembered from the last start() so reload() can restart with them.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._notify_cb = None

    def start(self, loop: asyncio.AbstractEventLoop, notify) -> None:  # noqa: ANN001
        """(Re)start watching all enabled folders listed in the database.

        Any previously running observer is stopped first. Folders that do not
        exist or are not accessible are logged and skipped. The new observer
        is only stored on ``self._observer`` after it has started, so a
        failure part-way through (e.g. a database error) never leaves a
        never-started observer behind for ``stop()`` to trip over.

        Args:
            loop: Event loop on which ``notify()`` coroutines are scheduled.
            notify: Zero-argument callable returning a coroutine, invoked
                after each file that was ingested.
        """
        with self._lock:
            self._loop = loop
            self._notify_cb = notify
            self._stop_locked()

            with session_scope() as session:
                folders = session.scalars(
                    select(WatchFolder).where(WatchFolder.enabled.is_(True))
                ).all()
                # Read the attributes while the session is still open.
                targets = [(f.path, f.recursive) for f in folders]

            observer = PollingObserver(timeout=2.0)
            handler = _ScreenshotEventHandler(loop, notify)
            for stored_path, recursive in targets:
                p = path_from_storage(stored_path)
                if not is_accessible_dir(p):
                    logger.warning("监听目录不存在或不可访问,跳过: %s", stored_path)
                    continue
                logger.info("开始监听 %s (recursive=%s)", stored_path, recursive)
                observer.schedule(handler, str(p), recursive=recursive)
            observer.start()
            self._observer = observer

    def reload(self) -> None:
        """Restart watching after the folder list changed; no-op before the first start()."""
        loop, notify = self._loop, self._notify_cb
        if loop is None or notify is None:
            return
        self.start(loop, notify)

    def stop(self) -> None:
        """Stop watching.

        The loop/callback from the last ``start`` are kept, so a later
        ``reload()`` starts watching again.
        NOTE(review): confirm that is intended after application shutdown.
        """
        with self._lock:
            self._stop_locked()

    def _stop_locked(self) -> None:
        """Stop and forget the current observer; caller must hold ``self._lock``."""
        observer, self._observer = self._observer, None
        if observer is None:
            return
        observer.stop()
        # Thread.join() raises RuntimeError for a thread that was never
        # started; a thread that is no longer alive needs no join anyway.
        if observer.is_alive():
            observer.join(timeout=3)
|
|
|
|
|
|
# Module-level WatcherService instance; presumably the app imports this one
# object and calls start()/reload()/stop() on it — confirm against callers.
watcher_service = WatcherService()
|