Files
xspider/xspider/storage.py
2025-10-17 17:26:46 +08:00

76 lines
2.4 KiB
Python

from __future__ import annotations
import hashlib
import json
import logging
from typing import Any, Dict, Iterable, List, Optional
from pymongo import MongoClient, UpdateOne
from .models import FlowConfig, SiteConfig, UniqueKeyMode
logger = logging.getLogger(__name__)
class MongoRepository:
    """Persist scraped records into per-flow MongoDB collections.

    Records of a flow are written to the collection named
    ``"<site_id>_<flow_id>"``.  Unless the flow disables unique keys, each
    record is upserted under an MD5 fingerprint stored in its ``_unique``
    field, so saving the same record again updates the existing document
    instead of adding a copy.

    The repository owns its ``MongoClient``; call :meth:`close` (or use the
    instance as a context manager) to release the connections.
    """

    def __init__(self, mongo_uri: str, database: str) -> None:
        """Create a client for *mongo_uri* and select *database* on it."""
        self._client = MongoClient(mongo_uri)
        self._db = self._client[database]

    def close(self) -> None:
        """Close the underlying MongoDB client."""
        self._client.close()

    def __enter__(self) -> "MongoRepository":
        """Return the repository itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None:
        """Close the client on leaving the ``with`` block."""
        self.close()

    def save_records(
        self,
        site: "SiteConfig",
        flow: "FlowConfig",
        records: Iterable[Dict[str, Any]],
    ) -> None:
        """Write *records* to the collection belonging to *site* and *flow*.

        Args:
            site: Supplies ``site_id`` for the collection name and the
                ``_site`` bookkeeping field.
            flow: Supplies ``flow_id`` and the unique-key settings.
            records: Documents to store; the iterable is consumed once.

        When ``flow.unique_keys`` is ``UniqueKeyMode.none`` the records are
        inserted as-is.  Otherwise each record is upserted by its ``_unique``
        fingerprint, and ``_site`` / ``_flow`` are stamped on newly inserted
        documents only.
        """
        records_list = list(records)
        if not records_list:
            logger.info("Flow %s yielded no records", flow.flow_id)
            return

        collection = self._db[f"{site.site_id}_{flow.flow_id}"]

        if flow.unique_keys is UniqueKeyMode.none:
            collection.insert_many(records_list)
            return

        bookkeeping = {"_site": site.site_id, "_flow": flow.flow_id}

        # Keyed by fingerprint: records sharing a key collapse into a single
        # upsert where the last one wins.  With ordered=False the server gives
        # no guarantee about which of several same-filter updates lands last.
        operations: Dict[str, UpdateOne] = {}
        for record in records_list:
            unique_key = self._unique_key(flow, record)
            update: Dict[str, Any] = {"$set": {**record, "_unique": unique_key}}
            # A field may appear under only one update operator; if the record
            # already carries "_site"/"_flow", leave that field to "$set".
            insert_only = {
                field: value
                for field, value in bookkeeping.items()
                if field not in record
            }
            if insert_only:
                update["$setOnInsert"] = insert_only
            operations[unique_key] = UpdateOne(
                {"_unique": unique_key},
                update,
                upsert=True,
            )

        result = collection.bulk_write(list(operations.values()), ordered=False)
        logger.info(
            "Saved records for %s/%s: matched=%s upserted=%s",
            site.site_id,
            flow.flow_id,
            result.matched_count,
            len(result.upserted_ids or []),
        )

    def _unique_key(self, flow: "FlowConfig", record: Dict[str, Any]) -> str:
        """Return the MD5 hex digest identifying *record* within *flow*.

        In ``UniqueKeyMode.custom`` with a non-empty ``flow.unique_columns``
        only those columns are hashed (missing columns count as ``None``);
        in every other case the whole record is hashed.
        """
        if flow.unique_keys is UniqueKeyMode.custom and flow.unique_columns:
            payload: Dict[str, Any] = {
                key: record.get(key) for key in flow.unique_columns
            }
        else:
            payload = record
        # sort_keys makes the digest independent of dict insertion order.
        serialized = json.dumps(payload, sort_keys=True, default=self._json_default)
        return hashlib.md5(serialized.encode("utf-8")).hexdigest()

    def _json_default(self, obj: Any) -> Any:
        """Convert a value ``json.dumps`` cannot encode into one it can.

        Sets and frozensets become lists ordered by the canonical JSON text
        of their members, tuples become lists, objects with ``isoformat()``
        use it, and anything else falls back to ``str(obj)``.
        """
        if isinstance(obj, (set, frozenset)):
            # Set iteration order follows element hashes, which for strings
            # change between interpreter runs; sort so the digest is stable.
            return sorted(obj, key=self._canonical_json)
        if isinstance(obj, tuple):
            return list(obj)
        if hasattr(obj, "isoformat"):
            return obj.isoformat()
        return str(obj)

    def _canonical_json(self, value: Any) -> str:
        """Return the key-sorted JSON text of *value*, used as a sort key."""
        return json.dumps(value, sort_keys=True, default=self._json_default)