""" RapidOCR HTTP Server 为 Node.js 后端提供 OCR API 服务 """ from rapidocr_onnxruntime import RapidOCR from http.server import HTTPServer, BaseHTTPRequestHandler import json import base64 import numpy as np from io import BytesIO from PIL import Image import cv2 # 初始化 RapidOCR ocr = RapidOCR() class OCRHandler(BaseHTTPRequestHandler): def _set_cors(self): """设置 CORS""" self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') def _send_json(self, data, status=200): """发送 JSON 响应""" self.send_response(status) self.send_header('Content-Type', 'application/json') self._set_cors() self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) def do_OPTIONS(self): """处理 OPTIONS 请求""" self.send_response(200) self._set_cors() self.end_headers() def do_GET(self): """健康检查""" if self.path == '/health': self._send_json({"status": "ok", "service": "rapidocr"}) else: self._send_json({"error": "Not found"}, 404) def do_POST(self): """处理 OCR 请求""" if self.path == '/ocr': try: content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data = json.loads(post_data.decode('utf-8')) # 获取图片数据 images = data.get('images', []) if not images: self._send_json({"error": "No images provided"}, 400) return # 处理第一张图片 image_base64 = images[0] image_data = base64.b64decode(image_base64) # 转换为 OpenCV 格式 nparr = np.frombuffer(image_data, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: self._send_json({"error": "Failed to decode image"}, 400) return # 执行 OCR result, _ = ocr(img) # 格式化结果 ocr_results = [] for line in result: ocr_results.append({ "text": line[0], "score": float(line[1]), "box": line[2] }) # 计算平均置信度 avg_confidence = 0 if ocr_results: avg_confidence = sum(r["score"] for r in ocr_results) / len(ocr_results) self._send_json({ "code": 200, "msg": "success", "data": ocr_results }) except Exception as e: self._send_json({"error": str(e)}, 500) else: self._send_json({"error": "Not found"}, 404) def log_message(self, format, *args): """减少日志输出""" pass def run_server(port=8080): """启动服务器""" server_address = ('', port) httpd = HTTPServer(server_address, OCRHandler) print(f"RapidOCR Server running on port {port}") print(f"Health check: http://localhost:{port}/health") print(f"OCR endpoint: http://localhost:{port}/ocr") httpd.serve_forever() if __name__ == '__main__': run_server()