feat(storage): introduce task logging

- Add a MongoLogRepository class for recording task execution logs
- Integrate logging into FlowRunner to trace flow execution steps
- Record start, success, error, and aborted states for login and business flows
- Support recording the XML source and snapshot (oversized content is truncated automatically)
- Extend the README to document the log fields and their usage
- Improve the pagination crawl logic to return page and record statistics
- Unify the storage interface, exposing the MongoDB client and database properties
- Add step counting and execution-time statistics
2025-10-20 22:38:13 +08:00
parent f8370eb85e
commit a1a13aae65
4 changed files with 421 additions and 43 deletions


@@ -243,6 +243,27 @@ python main.py
- `<excel_extract>` declares an Excel file to download and parse; the current implementation only emits a notice, and the concrete logic can be extended in `FlowRunner._handle_excel_extract`.
- `MongoRepository.save_records` applies the deduplication strategy according to `flow.unique_keys` and `unique_columns`.
## Task Logs
Every run of a login flow or business flow writes a document to the `task_logs` collection in MongoDB, recording the execution status and key steps to aid troubleshooting.
- Base fields (a sample document follows this list):
  - `_id`: UUID, the globally unique task identifier.
  - `site_id` / `flow_id`: the site and flow from the template.
  - `status`: one of `start`, `success`, `error`, `aborted`.
  - `started_at` / `ended_at`: UTC timestamps; `duration_ms` is the execution time in milliseconds.
  - `steps`: the list of executed steps, including summaries of action types, pagination clicks, extraction results, and so on.
  - `metadata`: extra information (e.g. number of records saved, pages processed, whether this is a login flow).
  - `xml_source`: where the raw XML came from; `xml_snapshot`: the XML used for this run (content beyond 50k characters is truncated and flagged).
  - `error` / `abort_reason`: details recorded when a flow fails or is aborted (including the stack trace).
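A hypothetical example of a completed log document (all values are illustrative, not taken from a real run):

```python
# Hypothetical task_logs document after a successful run; values are illustrative.
{
    "_id": "3f6c0d2e-8b1a-4f7e-9c2d-5a6b7c8d9e0f",
    "site_id": "example_site",
    "flow_id": "list_flow",
    "is_login": False,
    "status": "success",
    "started_at": "2025-10-20T14:00:00Z",  # stored as a UTC datetime in MongoDB
    "ended_at": "2025-10-20T14:00:42Z",
    "duration_ms": 42000,
    "steps": [
        {"timestamp": "2025-10-20T14:00:01Z", "event": "goto", "url": "https://example.com/list"},
        {"timestamp": "2025-10-20T14:00:40Z", "event": "extract", "page": 1, "records": 20, "saved": 20},
    ],
    "summary": {"actions": 3, "records_saved": 20, "pages_processed": 1},
    "metadata": {"is_login_flow": False, "records_saved": 20, "pages_processed": 1},
    "xml_source": "https://example.com/site.xml",
    "xml_snapshot": "<site>...</site>",
}
```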
- Write policy:
  - a document with `status=start` is inserted when the flow begins;
  - on normal completion it is updated to `success`, and `steps`, `summary`, and `metadata` are filled in;
  - if an exception occurs during the run, it is marked `error` and the stack trace is stored;
  - on `KeyboardInterrupt` or another unrecoverable interruption, it is marked `aborted`.
An index on `site_id + flow_id + started_at` supports fast lookups; you can also filter by `status` to find tasks that need investigation.
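A minimal sketch of creating those indexes with pymongo (the connection string and database name are assumptions):

```python
from pymongo import ASCENDING, DESCENDING, MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed connection string
task_logs = client["crawler"]["task_logs"]         # database name is an assumption

# Compound index matching the site_id + flow_id + started_at lookup pattern.
task_logs.create_index(
    [("site_id", ASCENDING), ("flow_id", ASCENDING), ("started_at", DESCENDING)]
)
# Secondary index for filtering tasks by status.
task_logs.create_index([("status", ASCENDING)])

# Example query: the ten most recent failed runs.
for doc in task_logs.find({"status": "error"}).sort("started_at", -1).limit(10):
    print(doc["_id"], doc["site_id"], doc["flow_id"], doc["error"]["type"])
```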
## Development and Debugging Tips
- The log level defaults to `INFO`; set the `PYTHONLOGGING` environment variable or modify `configure_logging` to change it.
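A sketch of how `configure_logging` might honor that variable (the real implementation may differ):

```python
import logging
import os

def configure_logging() -> None:
    """Configure root logging; PYTHONLOGGING overrides the default INFO level."""
    level_name = os.environ.get("PYTHONLOGGING", "INFO").upper()
    logging.basicConfig(
        level=getattr(logging, level_name, logging.INFO),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
```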


@@ -10,7 +10,7 @@ import requests
from .redis_queue import RedisConfigQueue
from .runner import FlowRunner
from .settings import Settings
from .storage import MongoLogRepository, MongoRepository
from .variables import VariableService
from .xml_parser import XMLSiteParser
@@ -29,10 +29,15 @@ class TemplateCrawlerApp:
self.settings.mongo_uri,
self.settings.mongo_database,
)
self.log_repository = MongoLogRepository(
self.mongo.client,
self.settings.mongo_database,
)
self.variable_service = VariableService(self.settings.redis_url)
self.parser = XMLSiteParser()
self.runner = FlowRunner(
storage=self.mongo,
log_storage=self.log_repository,
variable_service=self.variable_service,
)
self.http = requests.Session()
@@ -56,7 +61,11 @@ class TemplateCrawlerApp:
logger.info("Fetched XML location: %s", xml_location)
xml_payload = self._load_xml(xml_location)
site = self.parser.parse(xml_payload)
self.runner.run_site(
site,
xml_source=xml_location,
xml_content=xml_payload,
)
def _load_xml(self, location: str) -> str:
logger.debug("Downloading XML from %s", location)


@@ -2,14 +2,15 @@ from __future__ import annotations
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from .actions.registry import ActionRegistry
from .browser import BrowserFactory, BrowserSession, BrowserError
from .extraction import Extractor
from .models import FlowConfig, SiteConfig, SelectorMode, ExcelExtractConfig
from .storage import MongoLogRepository, MongoRepository
from .variables import VariableResolver, VariableService
from .utils.selectors import is_xpath_selector
from .actions.base import ActionContext
@@ -21,24 +22,47 @@ class FlowRunner:
def __init__(
self,
storage: MongoRepository,
log_storage: MongoLogRepository,
variable_service: VariableService,
extractor: Optional[Extractor] = None,
) -> None:
self.storage = storage
self.log_storage = log_storage
self.variable_service = variable_service
self.extractor = extractor or Extractor()
def run_site(
self,
site: SiteConfig,
xml_source: Optional[str] = None,
xml_content: Optional[str] = None,
) -> None:
factory = BrowserFactory(site)
resolver = VariableResolver(self.variable_service)
with factory.session() as session:
if site.login:
logger.info("Executing login flow for site %s", site.site_id)
self._run_flow(
site,
site.login,
session,
resolver,
is_login=True,
xml_source=xml_source,
xml_content=xml_content,
)
for flow in site.flows:
logger.info("Executing flow %s for site %s", flow.flow_id, site.site_id)
self._run_flow(
site,
flow,
session,
resolver,
is_login=False,
xml_source=xml_source,
xml_content=xml_content,
)
def _run_flow(
self,
@@ -47,62 +71,203 @@ class FlowRunner:
session: BrowserSession,
resolver: VariableResolver,
is_login: bool,
xml_source: Optional[str],
xml_content: Optional[str],
) -> None:
entry_url = self._resolve_entry(site, flow)
site_context = self._build_context(site, flow, entry_url)
action_context = ActionContext(session, resolver, site_context)
metadata = {
"entry": flow.entry or flow.metadata.get("url"),
"data_type": flow.data_type,
"unique_keys": flow.unique_keys.value,
"unique_columns": flow.unique_columns,
"is_login_flow": is_login,
}
log_handle = self.log_storage.start(
site,
flow,
is_login=is_login,
xml_source=xml_source,
xml_snapshot=xml_content,
metadata=metadata,
)
steps: List[Dict[str, Any]] = []
if entry_url:
resolved_entry = resolver.resolve(entry_url, action_context.site_context) or entry_url
action_context.site_context["entry_url"] = resolved_entry
logger.debug("Flow %s navigating to %s", flow.flow_id, resolved_entry)
self._record_step(
steps,
event="goto",
url=resolved_entry,
)
session.goto(resolved_entry)
records_saved = 0
pages_processed = 0
try:
for action_config in flow.actions:
action_cls = ActionRegistry.get(action_config.type)
action = action_cls(action_config)
logger.debug("Executing action %s", action_config.type)
self._record_step(
steps,
event="action",
action_type=action_config.type,
selector=action_config.selector,
mode=action_config.mode.value if action_config.mode else None,
)
action.execute(action_context)
if is_login:
selector = flow.metadata.get("selector")
if selector:
selector_mode = self._resolve_selector_mode(selector, flow.metadata)
timeout = int(flow.metadata.get("timeout_ms", 10_000)) / 1000.0
try:
session.wait_dom_show(
resolver.resolve(selector, action_context.site_context) or selector,
selector_mode,
timeout,
)
self._record_step(
steps,
event="login_check",
selector=selector,
mode=selector_mode.value,
status="found",
)
except BrowserError as exc:
self._record_step(
steps,
event="login_check",
selector=selector,
mode=selector_mode.value,
status="missing",
)
raise RuntimeError("Login verification selector not found.") from exc
actions_count = self._count_actions(steps)
self.log_storage.mark_success(
log_handle,
steps=steps,
summary={"actions": actions_count},
)
return
if not flow.extract:
if flow.excel_extract:
self._record_step(
steps,
event="excel_extract",
pattern=flow.excel_extract.file_pattern,
directory=flow.excel_extract.directory,
)
self._handle_excel_extract(site, flow, session)
actions_count = self._count_actions(steps)
self.log_storage.mark_success(
log_handle,
steps=steps,
summary={
"actions": actions_count,
"excel_extract": True,
},
)
else:
logger.info("Flow %s has no extract step; skipping storage.", flow.flow_id)
self._record_step(
steps,
event="no_extract",
)
actions_count = self._count_actions(steps)
self.log_storage.mark_success(
log_handle,
steps=steps,
summary={"actions": actions_count},
)
return
if flow.paginate and flow.paginate.selector:
pagination_result = self._run_paginated(
site,
flow,
session,
steps,
)
records_saved = pagination_result.get("records", 0)
pages_processed = pagination_result.get("pages", 0)
records = list(self.extractor.extract(session.html(), flow.extract))
self._record_step(
steps,
event="extract",
records=len(records),
)
records_saved = self.storage.save_records(site, flow, records)
actions_count = self._count_actions(steps)
summary = {
"actions": actions_count,
"records_saved": records_saved,
"pages_processed": pages_processed,
}
metadata_updates = {
"records_saved": records_saved,
"pages_processed": pages_processed,
}
self.log_storage.mark_success(
log_handle,
steps=steps,
summary=summary,
metadata=metadata_updates,
)
except Exception as exc:
metadata_updates = {
"records_saved": records_saved,
"pages_processed": pages_processed,
}
self.log_storage.mark_error(
log_handle,
exc,
steps=steps,
metadata=metadata_updates,
)
raise
except BaseException as exc: # noqa: BLE001
metadata_updates = {
"records_saved": records_saved,
"pages_processed": pages_processed,
}
self.log_storage.mark_aborted(
log_handle,
reason=exc.__class__.__name__,
steps=steps,
metadata=metadata_updates,
)
raise
def _run_paginated(
self,
site: SiteConfig,
flow: FlowConfig,
session: BrowserSession,
steps: List[Dict[str, Any]],
) -> Dict[str, int]:
page = 0
total_records = 0
while True:
page += 1
records = list(self.extractor.extract(session.html(), flow.extract))
saved = self.storage.save_records(site, flow, records)
total_records += saved
self._record_step(
steps,
event="extract",
page=page,
records=len(records),
saved=saved,
)
if flow.paginate.max_pages and page >= flow.paginate.max_pages:
break
@@ -111,10 +276,18 @@ class FlowRunner:
break
try:
session.click(selector, flow.paginate.mode, timeout=5)
self._record_step(
steps,
event="paginate_click",
selector=selector,
mode=flow.paginate.mode.value,
page=page,
)
except BrowserError:
logger.info("Pagination stopped at page %s", page)
break
time.sleep(1)
return {"pages": page, "records": total_records}
def _resolve_entry(self, site: SiteConfig, flow: FlowConfig) -> Optional[str]:
if flow.entry:
@@ -168,3 +341,19 @@ class FlowRunner:
flow.flow_id,
config.file_pattern,
)
def _record_step(
self,
steps: List[Dict[str, Any]],
event: str,
**payload: Any,
) -> None:
entry: Dict[str, Any] = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
}
entry.update({k: v for k, v in payload.items() if v is not None})
steps.append(entry)
def _count_actions(self, steps: List[Dict[str, Any]]) -> int:
return sum(1 for step in steps if step.get("event") == "action")


@@ -3,37 +3,57 @@ from __future__ import annotations
import hashlib
import json
import logging
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional
from pymongo import MongoClient, UpdateOne
from pymongo.collection import Collection
from pymongo.database import Database
from .models import FlowConfig, SiteConfig, UniqueKeyMode
logger = logging.getLogger(__name__)
@dataclass
class TaskLogHandle:
log_id: str
started_at: datetime
class MongoRepository:
def __init__(self, mongo_uri: str, database: str) -> None:
self._client = MongoClient(mongo_uri)
self._db = self._client[database]
@property
def client(self) -> MongoClient:
return self._client
@property
def database(self) -> Database:
return self._db
def save_records(
self,
site: SiteConfig,
flow: FlowConfig,
records: Iterable[Dict[str, Any]],
) -> int:
records_list = list(records)
if not records_list:
logger.info("Flow %s yielded no records", flow.flow_id)
return 0
collection_name = f"{site.site_id}_{flow.flow_id}"
collection = self._db[collection_name]
if flow.unique_keys is UniqueKeyMode.none:
collection.insert_many(records_list)
return len(records_list)
operations: List[UpdateOne] = []
for record in records_list:
@@ -49,15 +69,20 @@ class MongoRepository:
)
)
matched = 0
upserted = 0
if operations:
result = collection.bulk_write(operations, ordered=False)
matched = result.matched_count
upserted = len(result.upserted_ids or [])
logger.info(
"Saved records for %s/%s: matched=%s upserted=%s",
site.site_id,
flow.flow_id,
matched,
upserted,
)
return matched + upserted
def _unique_key(self, flow: FlowConfig, record: Dict[str, Any]) -> str:
if flow.unique_keys is UniqueKeyMode.custom and flow.unique_columns:
@@ -73,3 +98,137 @@ class MongoRepository:
if hasattr(obj, "isoformat"):
return obj.isoformat()
return str(obj)
class MongoLogRepository:
def __init__(
self,
client: MongoClient,
database: str,
collection: str = "task_logs",
) -> None:
self._db: Database = client[database]
self._collection: Collection = self._db[collection]
def start(
self,
site: SiteConfig,
flow: FlowConfig,
*,
is_login: bool,
xml_source: Optional[str],
xml_snapshot: Optional[str],
metadata: Optional[Dict[str, Any]] = None,
) -> TaskLogHandle:
log_id = str(uuid.uuid4())
started_at = datetime.utcnow()
doc: Dict[str, Any] = {
"_id": log_id,
"site_id": site.site_id,
"flow_id": flow.flow_id,
"is_login": is_login,
"status": "start",
"started_at": started_at,
"xml_source": xml_source,
"xml_snapshot": self._truncate_xml(xml_snapshot),
"metadata": metadata or {},
"steps": [],
}
self._collection.insert_one(doc)
return TaskLogHandle(log_id=log_id, started_at=started_at)
def append_steps(self, handle: TaskLogHandle, steps: List[Dict[str, Any]]) -> None:
if not steps:
return
self._collection.update_one(
{"_id": handle.log_id},
{"$push": {"steps": {"$each": steps}}},
)
def mark_success(
self,
handle: TaskLogHandle,
steps: Optional[List[Dict[str, Any]]] = None,
summary: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
self._finalize(
handle,
status="success",
steps=steps,
summary=summary,
metadata=metadata,
)
def mark_error(
self,
handle: TaskLogHandle,
exc: Exception,
steps: Optional[List[Dict[str, Any]]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
error_info = {
"type": exc.__class__.__name__,
"message": str(exc),
"stack": traceback.format_exc(),
}
self._finalize(
handle,
status="error",
steps=steps,
metadata=metadata,
extra_fields={"error": error_info},
)
def mark_aborted(
self,
handle: TaskLogHandle,
reason: str,
steps: Optional[List[Dict[str, Any]]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
self._finalize(
handle,
status="aborted",
steps=steps,
metadata=metadata,
extra_fields={"abort_reason": reason},
)
def _finalize(
self,
handle: TaskLogHandle,
*,
status: str,
steps: Optional[List[Dict[str, Any]]] = None,
summary: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
extra_fields: Optional[Dict[str, Any]] = None,
) -> None:
ended_at = datetime.utcnow()
duration_ms = int((ended_at - handle.started_at).total_seconds() * 1000)
update_fields: Dict[str, Any] = {
"status": status,
"ended_at": ended_at,
"duration_ms": duration_ms,
}
if steps is not None:
update_fields["steps"] = steps
if summary is not None:
update_fields["summary"] = summary
if metadata:
for key, value in metadata.items():
update_fields[f"metadata.{key}"] = value
if extra_fields:
update_fields.update(extra_fields)
self._collection.update_one(
{"_id": handle.log_id},
{"$set": update_fields},
)
def _truncate_xml(self, xml: Optional[str], limit: int = 50_000) -> Optional[str]:
if not xml:
return None
if len(xml) <= limit:
return xml
return xml[:limit] + f"\n<!-- truncated, original_length={len(xml)} -->"
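
For reference, a minimal standalone usage sketch of MongoLogRepository (the stand-in site/flow objects, connection string, and database name are assumptions; the application wires real SiteConfig/FlowConfig instances through FlowRunner):

from types import SimpleNamespace

from pymongo import MongoClient

# Stand-in objects for illustration; start() only reads site_id and flow_id.
site = SimpleNamespace(site_id="example_site")
flow = SimpleNamespace(flow_id="list_flow")

client = MongoClient("mongodb://localhost:27017")  # assumed connection string
logs = MongoLogRepository(client, "crawler")       # assumed database name

handle = logs.start(
    site,
    flow,
    is_login=False,
    xml_source="https://example.com/site.xml",
    xml_snapshot="<site>...</site>",
    metadata={"is_login_flow": False},
)
steps = [{"event": "goto", "url": "https://example.com/list"}]
try:
    # ... the flow's actions would run here ...
    logs.mark_success(handle, steps=steps, summary={"actions": 0})
except Exception as exc:
    logs.mark_error(handle, exc, steps=steps)
    raise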