diff --git a/README.md b/README.md
index 5032efe..e162216 100644
--- a/README.md
+++ b/README.md
@@ -243,6 +243,27 @@ python main.py
 - `` declares that an Excel file should be downloaded and parsed; the current implementation only emits a notice, and the concrete logic can be extended in `FlowRunner._handle_excel_extract`.
 - `MongoRepository.save_records` applies the deduplication strategy according to `flow.unique_keys` and `unique_columns`.
 
+## Task Logs
+
+Every run of a login flow or business flow writes a document to the `task_logs` collection in MongoDB, recording the execution status and key steps to make troubleshooting easier.
+
+- Core fields:
+  - `_id`: a UUID serving as the globally unique task identifier.
+  - `site_id` / `flow_id`: the site and flow from the template.
+  - `status`: one of `start`, `success`, `error`, or `aborted`.
+  - `started_at` / `ended_at`: UTC timestamps; `duration_ms` is the execution time.
+  - `steps`: the list of executed steps, summarizing action types, pagination clicks, extraction results, and so on.
+  - `metadata`: extra information (e.g. number of records saved, pages processed, whether this is a login flow).
+  - `xml_source`: where the original XML came from; `xml_snapshot`: the XML used for this run (truncated and marked once it exceeds 50k characters).
+  - `error` / `abort_reason`: details recorded when a flow fails or is aborted (including the stack trace).
+- Write strategy:
+  - when a flow starts, a document with `status=start` is inserted;
+  - on normal completion it is updated to `success` and `steps`, `summary`, and `metadata` are filled in;
+  - if an exception occurs during the run, it is marked `error` and the stack trace is stored;
+  - on `KeyboardInterrupt` or any other unrecoverable interruption, it is marked `aborted`.
+
+An index on `site_id + flow_id + started_at` supports fast lookups, and filtering on `status` surfaces the tasks that need investigation.
+
 ## Development and Debugging Tips
 
 - The log level defaults to `INFO`; set the `PYTHONLOGGING` environment variable or adjust `configure_logging` to change it.
diff --git a/xspider/app.py b/xspider/app.py
index 9e7dfca..94bf0da 100644
--- a/xspider/app.py
+++ b/xspider/app.py
@@ -10,7 +10,7 @@ import requests
 
 from .redis_queue import RedisConfigQueue
 from .runner import FlowRunner
 from .settings import Settings
-from .storage import MongoRepository
+from .storage import MongoLogRepository, MongoRepository
 from .variables import VariableService
 from .xml_parser import XMLSiteParser
@@ -29,10 +29,15 @@ class TemplateCrawlerApp:
             self.settings.mongo_uri,
             self.settings.mongo_database,
         )
+        self.log_repository = MongoLogRepository(
+            self.mongo.client,
+            self.settings.mongo_database,
+        )
         self.variable_service = VariableService(self.settings.redis_url)
         self.parser = XMLSiteParser()
         self.runner = FlowRunner(
             storage=self.mongo,
+            log_storage=self.log_repository,
             variable_service=self.variable_service,
         )
         self.http = requests.Session()
@@ -56,7 +61,11 @@
         logger.info("Fetched XML location: %s", xml_location)
         xml_payload = self._load_xml(xml_location)
         site = self.parser.parse(xml_payload)
-        self.runner.run_site(site)
+        self.runner.run_site(
+            site,
+            xml_source=xml_location,
+            xml_content=xml_payload,
+        )
 
     def _load_xml(self, location: str) -> str:
         logger.debug("Downloading XML from %s", location)
diff --git a/xspider/runner.py b/xspider/runner.py
index 055840c..4c55dd2 100644
--- a/xspider/runner.py
+++ b/xspider/runner.py
@@ -2,14 +2,15 @@ from __future__ import annotations
 
 import logging
 import time
-from typing import Any, Dict, Optional
+from datetime import datetime
+from typing import Any, Dict, List, Optional
 from urllib.parse import urljoin
 
 from .actions.registry import ActionRegistry
 from .browser import BrowserFactory, BrowserSession, BrowserError
 from .extraction import Extractor
 from .models import FlowConfig, SiteConfig, SelectorMode, ExcelExtractConfig
-from .storage import MongoRepository
+from .storage import MongoLogRepository, MongoRepository
 from .variables import VariableResolver, VariableService
 from .utils.selectors import is_xpath_selector
 from .actions.base import ActionContext
@@ -21,24 +22,47 @@ class FlowRunner:
 
     def __init__(
        self,
         storage: MongoRepository,
+        log_storage: MongoLogRepository,
         variable_service: VariableService,
         extractor: Optional[Extractor] = None,
     ) -> None:
         self.storage = storage
+        self.log_storage = log_storage
         self.variable_service = variable_service
         self.extractor = extractor or Extractor()
 
-    def run_site(self, site: SiteConfig) -> None:
+    def run_site(
+        self,
+        site: SiteConfig,
+        xml_source: Optional[str] = None,
+        xml_content: Optional[str] = None,
+    ) -> None:
         factory = BrowserFactory(site)
         resolver = VariableResolver(self.variable_service)
         with factory.session() as session:
             if site.login:
                 logger.info("Executing login flow for site %s", site.site_id)
-                self._run_flow(site, site.login, session, resolver, is_login=True)
+                self._run_flow(
+                    site,
+                    site.login,
+                    session,
+                    resolver,
+                    is_login=True,
+                    xml_source=xml_source,
+                    xml_content=xml_content,
+                )
             for flow in site.flows:
                 logger.info("Executing flow %s for site %s", flow.flow_id, site.site_id)
-                self._run_flow(site, flow, session, resolver, is_login=False)
+                self._run_flow(
+                    site,
+                    flow,
+                    session,
+                    resolver,
+                    is_login=False,
+                    xml_source=xml_source,
+                    xml_content=xml_content,
+                )
 
     def _run_flow(
         self,
@@ -47,62 +71,203 @@
         session: BrowserSession,
         resolver: VariableResolver,
         is_login: bool,
+        xml_source: Optional[str],
+        xml_content: Optional[str],
     ) -> None:
         entry_url = self._resolve_entry(site, flow)
         site_context = self._build_context(site, flow, entry_url)
         action_context = ActionContext(session, resolver, site_context)
+        metadata = {
+            "entry": flow.entry or flow.metadata.get("url"),
+            "data_type": flow.data_type,
+            "unique_keys": flow.unique_keys.value,
+            "unique_columns": flow.unique_columns,
+            "is_login_flow": is_login,
+        }
+        log_handle = self.log_storage.start(
+            site,
+            flow,
+            is_login=is_login,
+            xml_source=xml_source,
+            xml_snapshot=xml_content,
+            metadata=metadata,
+        )
+        steps: List[Dict[str, Any]] = []
         if entry_url:
             resolved_entry = resolver.resolve(entry_url, action_context.site_context) or entry_url
             action_context.site_context["entry_url"] = resolved_entry
             logger.debug("Flow %s navigating to %s", flow.flow_id, resolved_entry)
+            self._record_step(
+                steps,
+                event="goto",
+                url=resolved_entry,
+            )
             session.goto(resolved_entry)
 
-        for action_config in flow.actions:
-            action_cls = ActionRegistry.get(action_config.type)
-            action = action_cls(action_config)
-            logger.debug("Executing action %s", action_config.type)
-            action.execute(action_context)
-
-        if is_login:
-            selector = flow.metadata.get("selector")
-            if selector:
-                selector_mode = self._resolve_selector_mode(selector, flow.metadata)
-                timeout = int(flow.metadata.get("timeout_ms", 10_000)) / 1000.0
-                try:
-                    session.wait_dom_show(
-                        resolver.resolve(selector, action_context.site_context) or selector,
-                        selector_mode,
-                        timeout,
-                    )
-                except BrowserError:
-                    raise RuntimeError("Login verification selector not found.")
-            return
-
-        if not flow.extract:
-            if flow.excel_extract:
-                self._handle_excel_extract(site, flow, session)
-            else:
-                logger.info("Flow %s has no extract step; skipping storage.", flow.flow_id)
-            return
-
-        if flow.paginate and flow.paginate.selector:
-            self._run_paginated(site, flow, session)
-        else:
-            records = self.extractor.extract(session.html(), flow.extract)
-            self.storage.save_records(site, flow, records)
+        records_saved = 0
+        pages_processed = 0
+        try:
+            for action_config in flow.actions:
+                action_cls = ActionRegistry.get(action_config.type)
+                action = action_cls(action_config)
+                logger.debug("Executing action %s", action_config.type)
+                self._record_step(
+                    steps,
+                    event="action",
+                    action_type=action_config.type,
+                    selector=action_config.selector,
+                    mode=action_config.mode.value if action_config.mode else None,
+                )
+                action.execute(action_context)
+
+            if is_login:
+                selector = flow.metadata.get("selector")
+                if selector:
+                    selector_mode = self._resolve_selector_mode(selector, flow.metadata)
+                    timeout = int(flow.metadata.get("timeout_ms", 10_000)) / 1000.0
+                    try:
+                        session.wait_dom_show(
+                            resolver.resolve(selector, action_context.site_context) or selector,
+                            selector_mode,
+                            timeout,
+                        )
+                        self._record_step(
+                            steps,
+                            event="login_check",
+                            selector=selector,
+                            mode=selector_mode.value,
+                            status="found",
+                        )
+                    except BrowserError as exc:
+                        self._record_step(
+                            steps,
+                            event="login_check",
+                            selector=selector,
+                            mode=selector_mode.value,
+                            status="missing",
+                        )
+                        raise RuntimeError("Login verification selector not found.") from exc
+                actions_count = self._count_actions(steps)
+                self.log_storage.mark_success(
+                    log_handle,
+                    steps=steps,
+                    summary={"actions": actions_count},
+                )
+                return
+
+            if not flow.extract:
+                if flow.excel_extract:
+                    self._record_step(
+                        steps,
+                        event="excel_extract",
+                        pattern=flow.excel_extract.file_pattern,
+                        directory=flow.excel_extract.directory,
+                    )
+                    self._handle_excel_extract(site, flow, session)
+                    actions_count = self._count_actions(steps)
+                    self.log_storage.mark_success(
+                        log_handle,
+                        steps=steps,
+                        summary={
+                            "actions": actions_count,
+                            "excel_extract": True,
+                        },
+                    )
+                else:
+                    logger.info("Flow %s has no extract step; skipping storage.", flow.flow_id)
+                    self._record_step(
+                        steps,
+                        event="no_extract",
+                    )
+                    actions_count = self._count_actions(steps)
+                    self.log_storage.mark_success(
+                        log_handle,
+                        steps=steps,
+                        summary={"actions": actions_count},
+                    )
+                return
+
+            if flow.paginate and flow.paginate.selector:
+                pagination_result = self._run_paginated(
+                    site,
+                    flow,
+                    session,
+                    steps,
+                )
+                records_saved = pagination_result.get("records", 0)
+                pages_processed = pagination_result.get("pages", 0)
+            else:
+                records = list(self.extractor.extract(session.html(), flow.extract))
+                self._record_step(
+                    steps,
+                    event="extract",
+                    records=len(records),
+                )
+                records_saved = self.storage.save_records(site, flow, records)
+
+            actions_count = self._count_actions(steps)
+            summary = {
+                "actions": actions_count,
+                "records_saved": records_saved,
+                "pages_processed": pages_processed,
+            }
+            metadata_updates = {
+                "records_saved": records_saved,
+                "pages_processed": pages_processed,
+            }
+            self.log_storage.mark_success(
+                log_handle,
+                steps=steps,
+                summary=summary,
+                metadata=metadata_updates,
+            )
+        except Exception as exc:
+            metadata_updates = {
+                "records_saved": records_saved,
+                "pages_processed": pages_processed,
+            }
+            self.log_storage.mark_error(
+                log_handle,
+                exc,
+                steps=steps,
+                metadata=metadata_updates,
+            )
+            raise
+        except BaseException as exc:  # noqa: BLE001
+            metadata_updates = {
+                "records_saved": records_saved,
+                "pages_processed": pages_processed,
+            }
+            self.log_storage.mark_aborted(
+                log_handle,
+                reason=exc.__class__.__name__,
+                steps=steps,
+                metadata=metadata_updates,
+            )
+            raise
 
     def _run_paginated(
         self,
         site: SiteConfig,
         flow: FlowConfig,
         session: BrowserSession,
-    ) -> None:
+        steps: List[Dict[str, Any]],
+    ) -> Dict[str, int]:
         page = 0
+        total_records = 0
         while True:
             page += 1
-            records = self.extractor.extract(session.html(), flow.extract)
-            self.storage.save_records(site, flow, records)
+            records = list(self.extractor.extract(session.html(), flow.extract))
+            saved = self.storage.save_records(site, flow, records)
+            total_records += saved
+            self._record_step(
+                steps,
+                event="extract",
+                page=page,
+                records=len(records),
+                saved=saved,
+            )
 
             if flow.paginate.max_pages and page >= flow.paginate.max_pages:
                 break
@@ -111,10 +276,18 @@
                 break
             try:
                 session.click(selector, flow.paginate.mode, timeout=5)
+                self._record_step(
+                    steps,
+                    event="paginate_click",
+                    selector=selector,
+                    mode=flow.paginate.mode.value,
+                    page=page,
+                )
             except BrowserError:
                 logger.info("Pagination stopped at page %s", page)
                 break
             time.sleep(1)
+        return {"pages": page, "records": total_records}
 
     def _resolve_entry(self, site: SiteConfig, flow: FlowConfig) -> Optional[str]:
         if flow.entry:
@@ -168,3 +341,19 @@
             flow.flow_id,
             config.file_pattern,
         )
+
+    def _record_step(
+        self,
+        steps: List[Dict[str, Any]],
+        event: str,
+        **payload: Any,
+    ) -> None:
+        entry: Dict[str, Any] = {
+            "timestamp": datetime.utcnow().isoformat(),
+            "event": event,
+        }
+        entry.update({k: v for k, v in payload.items() if v is not None})
+        steps.append(entry)
+
+    def _count_actions(self, steps: List[Dict[str, Any]]) -> int:
+        return sum(1 for step in steps if step.get("event") == "action")
diff --git a/xspider/storage.py b/xspider/storage.py
index a2fc5c2..5fe5278 100644
--- a/xspider/storage.py
+++ b/xspider/storage.py
@@ -3,37 +3,57 @@ from __future__ import annotations
 import hashlib
 import json
 import logging
+import traceback
+import uuid
+from dataclasses import dataclass
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional
 
 from pymongo import MongoClient, UpdateOne
+from pymongo.collection import Collection
+from pymongo.database import Database
 
 from .models import FlowConfig, SiteConfig, UniqueKeyMode
 
 logger = logging.getLogger(__name__)
 
 
+@dataclass
+class TaskLogHandle:
+    log_id: str
+    started_at: datetime
+
+
 class MongoRepository:
     def __init__(self, mongo_uri: str, database: str) -> None:
         self._client = MongoClient(mongo_uri)
         self._db = self._client[database]
 
+    @property
+    def client(self) -> MongoClient:
+        return self._client
+
+    @property
+    def database(self) -> Database:
+        return self._db
+
     def save_records(
         self,
         site: SiteConfig,
         flow: FlowConfig,
         records: Iterable[Dict[str, Any]],
-    ) -> None:
+    ) -> int:
        records_list = list(records)
         if not records_list:
             logger.info("Flow %s yielded no records", flow.flow_id)
-            return
+            return 0
 
         collection_name = f"{site.site_id}_{flow.flow_id}"
         collection = self._db[collection_name]
 
         if flow.unique_keys is UniqueKeyMode.none:
             collection.insert_many(records_list)
-            return
+            return len(records_list)
 
         operations: List[UpdateOne] = []
         for record in records_list:
@@ -49,15 +69,20 @@
                 )
             )
 
+        matched = 0
+        upserted = 0
         if operations:
             result = collection.bulk_write(operations, ordered=False)
+            matched = result.matched_count
+            upserted = len(result.upserted_ids or [])
             logger.info(
                 "Saved records for %s/%s: matched=%s upserted=%s",
                 site.site_id,
                 flow.flow_id,
-                result.matched_count,
-                len(result.upserted_ids or []),
+                matched,
+                upserted,
             )
+        return matched + upserted
 
     def _unique_key(self, flow: FlowConfig, record: Dict[str, Any]) -> str:
         if flow.unique_keys is UniqueKeyMode.custom and flow.unique_columns:
@@ -73,3 +98,137 @@
         if hasattr(obj, "isoformat"):
             return obj.isoformat()
         return str(obj)
+
+
+class MongoLogRepository:
+    def __init__(
+        self,
+        client: MongoClient,
+        database: str,
+        collection: str = "task_logs",
+    ) -> None:
+        self._db: Database = client[database]
+        self._collection: Collection = self._db[collection]
+
+    def start(
+        self,
+        site: SiteConfig,
+        flow: FlowConfig,
+        *,
+        is_login: bool,
+        xml_source: Optional[str],
+        xml_snapshot: Optional[str],
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> TaskLogHandle:
+        log_id = str(uuid.uuid4())
+        started_at = datetime.utcnow()
+        doc: Dict[str, Any] = {
+            "_id": log_id,
+            "site_id": site.site_id,
+            "flow_id": flow.flow_id,
+            "is_login": is_login,
+            "status": "start",
+            "started_at": started_at,
+            "xml_source": xml_source,
+            "xml_snapshot": self._truncate_xml(xml_snapshot),
+            "metadata": metadata or {},
+            "steps": [],
+        }
+        self._collection.insert_one(doc)
+        return TaskLogHandle(log_id=log_id, started_at=started_at)
+
+    def append_steps(self, handle: TaskLogHandle, steps: List[Dict[str, Any]]) -> None:
+        if not steps:
+            return
+        self._collection.update_one(
+            {"_id": handle.log_id},
+            {"$push": {"steps": {"$each": steps}}},
+        )
+
+    def mark_success(
+        self,
+        handle: TaskLogHandle,
+        steps: Optional[List[Dict[str, Any]]] = None,
+        summary: Optional[Dict[str, Any]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        self._finalize(
+            handle,
+            status="success",
+            steps=steps,
+            summary=summary,
+            metadata=metadata,
+        )
+
+    def mark_error(
+        self,
+        handle: TaskLogHandle,
+        exc: Exception,
+        steps: Optional[List[Dict[str, Any]]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        error_info = {
+            "type": exc.__class__.__name__,
+            "message": str(exc),
+            "stack": traceback.format_exc(),
+        }
+        self._finalize(
+            handle,
+            status="error",
+            steps=steps,
+            metadata=metadata,
+            extra_fields={"error": error_info},
+        )
+
+    def mark_aborted(
+        self,
+        handle: TaskLogHandle,
+        reason: str,
+        steps: Optional[List[Dict[str, Any]]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        self._finalize(
+            handle,
+            status="aborted",
+            steps=steps,
+            metadata=metadata,
+            extra_fields={"abort_reason": reason},
+        )
+
+    def _finalize(
+        self,
+        handle: TaskLogHandle,
+        *,
+        status: str,
+        steps: Optional[List[Dict[str, Any]]] = None,
+        summary: Optional[Dict[str, Any]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        extra_fields: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        ended_at = datetime.utcnow()
+        duration_ms = int((ended_at - handle.started_at).total_seconds() * 1000)
+        update_fields: Dict[str, Any] = {
+            "status": status,
+            "ended_at": ended_at,
+            "duration_ms": duration_ms,
+        }
+        if steps is not None:
+            update_fields["steps"] = steps
+        if summary is not None:
+            update_fields["summary"] = summary
+        if metadata:
+            for key, value in metadata.items():
+                update_fields[f"metadata.{key}"] = value
+        if extra_fields:
+            update_fields.update(extra_fields)
+        self._collection.update_one(
+            {"_id": handle.log_id},
+            {"$set": update_fields},
+        )
+
+    def _truncate_xml(self, xml: Optional[str], limit: int = 50_000) -> Optional[str]:
+        if not xml:
+            return None
+        if len(xml) <= limit:
+            return xml
+        # Append an explicit marker so a reader can tell the snapshot was cut off.
+        return xml[:limit] + f"\n<!-- truncated: original length {len(xml)} characters -->"
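
The README section above recommends indexing `task_logs` on `site_id + flow_id + started_at`, but the diff does not create any indexes. Below is a minimal pymongo sketch of how a deployment might set them up and triage failed runs; the connection URI and database name are placeholder assumptions, not values taken from this repository:

```python
# Sketch: create the indexes suggested in the README and list recent failures.
# Assumes a local MongoDB; substitute Settings.mongo_uri / mongo_database.
from pymongo import ASCENDING, DESCENDING, MongoClient

client = MongoClient("mongodb://localhost:27017")
task_logs = client["xspider"]["task_logs"]

# Compound index matching the site_id + flow_id + started_at lookup path,
# with started_at descending so "most recent runs first" scans are cheap.
task_logs.create_index(
    [("site_id", ASCENDING), ("flow_id", ASCENDING), ("started_at", DESCENDING)]
)
# Secondary index so filtering by status (error/aborted triage) stays fast.
task_logs.create_index([("status", ASCENDING)])

# Most recent failures first, projecting just enough fields to debug them.
for doc in (
    task_logs.find(
        {"status": {"$in": ["error", "aborted"]}},
        {"site_id": 1, "flow_id": 1, "error": 1, "abort_reason": 1, "duration_ms": 1},
    )
    .sort("started_at", DESCENDING)
    .limit(20)
):
    print(doc)
```

`create_index` is a no-op when an identical index already exists, so it is safe to run this at application startup rather than as a separate migration.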