From 952c90e537e6899c3b9e88d25185d65264866e6a Mon Sep 17 00:00:00 2001
From: Flik
Date: Fri, 17 Oct 2025 17:24:16 +0800
Subject: [PATCH] init

---
 .idea/.gitignore                               |   8 +
 .../inspectionProfiles/profiles_settings.xml   |   6 +
 .idea/misc.xml                                 |   7 +
 .idea/modules.xml                              |   8 +
 .idea/vcs.xml                                  |   6 +
 .idea/xspider.iml                              |   8 +
 README.md                                      |  79 +++++
 main.py                                        |  11 +
 xspider/__init__.py                            |   7 +
 xspider/actions/__init__.py                    |   3 +
 xspider/actions/base.py                        |  38 +++
 xspider/actions/builtin.py                     | 308 ++++++++++++++++++
 xspider/actions/registry.py                    |  54 +++
 xspider/app.py                                 |  73 +++++
 xspider/browser.py                             | 230 +++++++++++++
 xspider/extraction.py                          |  74 +++++
 xspider/models.py                              |  92 ++++++
 xspider/redis_queue.py                         |  40 +++
 xspider/runner.py                              | 170 ++++++++++
 xspider/settings.py                            |  32 ++
 xspider/storage.py                             |  75 +++++
 xspider/utils/selectors.py                     |  29 ++
 xspider/variables.py                           |  79 +++++
 xspider/xml_parser.py                          | 284 ++++++++++++++++
 24 files changed, 1721 insertions(+)
 create mode 100644 .idea/.gitignore
 create mode 100644 .idea/inspectionProfiles/profiles_settings.xml
 create mode 100644 .idea/misc.xml
 create mode 100644 .idea/modules.xml
 create mode 100644 .idea/vcs.xml
 create mode 100644 .idea/xspider.iml
 create mode 100644 README.md
 create mode 100644 main.py
 create mode 100644 xspider/__init__.py
 create mode 100644 xspider/actions/__init__.py
 create mode 100644 xspider/actions/base.py
 create mode 100644 xspider/actions/builtin.py
 create mode 100644 xspider/actions/registry.py
 create mode 100644 xspider/app.py
 create mode 100644 xspider/browser.py
 create mode 100644 xspider/extraction.py
 create mode 100644 xspider/models.py
 create mode 100644 xspider/redis_queue.py
 create mode 100644 xspider/runner.py
 create mode 100644 xspider/settings.py
 create mode 100644 xspider/storage.py
 create mode 100644 xspider/utils/selectors.py
 create mode 100644 xspider/variables.py
 create mode 100644 xspider/xml_parser.py

diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..575fc36
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..7b68d1c
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/xspider.iml b/.idea/xspider.iml
new file mode 100644
index 0000000..d8b3f6c
--- /dev/null
+++ b/.idea/xspider.iml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..27da86e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,79 @@
+# xspider Template Crawler
+
+An XML-template-driven crawler engine. It pulls template URLs from a Redis list, drives a DrissionPage browser through login and business flows under flow control, and stores the extracted data in MongoDB.
+
+## Dependencies
+
+- Python 3.10+
+- [DrissionPage](https://github.com/g1879/DrissionPage)
+- `redis`, `requests`, `pymongo`, `lxml`, `cssselect`
+
+Install with pip:
+
+```bash
+pip install drissionpage redis requests pymongo lxml cssselect
+```
+
+If you need captcha recognition, a variable service, or similar capabilities, implement them to suit your business.
+
+## Environment Variables
+
+| Variable | Default | Description |
+| --- | --- | --- |
+| `XSPIDER_REDIS_URL` | `redis://localhost:6379/0` | Redis connection string |
+| `XSPIDER_REDIS_LIST_KEY` | `xspider:config` | `list` key holding pending templates |
+| `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | `BRPOP` blocking timeout in seconds |
+| `XSPIDER_MONGO_URI` | `mongodb://localhost:27017` | MongoDB connection string |
+| `XSPIDER_MONGO_DB` | `xspider` | MongoDB database name |
+| `XSPIDER_VARIABLE_SERVICE` | `None` | Variable service endpoint; GET reads, POST writes |
+
+Variable service contract (illustrated below):
+
+- `GET {base}?name=<variable>&...` returns JSON containing a `value` field.
+- `POST {base}` accepts JSON `{name, value, ...}`.
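The service implementation itself is left to the integrator. As a minimal sketch of the contract that `VariableService` in `xspider/variables.py` expects, here is the exchange from the client side; the base URL and variable names are placeholder assumptions:

```python
import requests

BASE = "https://vars.example.internal"  # hypothetical XSPIDER_VARIABLE_SERVICE value

# Read: the service must answer GET {base}?name=... with JSON like {"value": "..."}
resp = requests.get(BASE, params={"name": "example:username", "site_id": "example"}, timeout=10)
resp.raise_for_status()
print(resp.json()["value"])

# Write: the service must accept POST {base} with JSON {"name": ..., "value": ...}
resp = requests.post(BASE, json={"name": "captcha_code", "value": "8f3k2"}, timeout=10)
resp.raise_for_status()
```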
+
+## Running
+
+```bash
+python main.py
+```
+
+The program blocks waiting for XML template URLs to be pushed onto the Redis list, then downloads each template and executes its flows.
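For illustration, a template URL can be enqueued like this; the key and URL are placeholders, and note that `RedisConfigQueue` (see `xspider/redis_queue.py` below) pops with `BRPOP` and downloads the URL over HTTP:

```python
import redis

client = redis.Redis.from_url("redis://localhost:6379/0")
# The app's RedisConfigQueue blocks on this list (XSPIDER_REDIS_LIST_KEY).
client.rpush("xspider:config", "https://files.example.com/templates/site.xml")
```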
+
+## XML Templates
+
+The template structure follows this example:
+
+```xml
+<site id="example" base="https://example.com">
+    <config enable_proxy="false" rotate_ua="false" retry="3">
+        <header name="User-Agent" value="Mozilla/5.0"/>
+    </config>
+    <login id="login" entry="/login" selector="//span[@class='welcome']" timeout_ms="15000">
+        <action type="type" selector="//input[@name='username']" text="${example:username}"/>
+        <action type="type" selector="//input[@name='password']" text="${example:password}"/>
+        <action type="captcha" selector="//img[@id='captcha']" variable="captcha_code"/>
+        <action type="type" selector="//input[@name='captcha']" text="${captcha_code}"/>
+        <action type="click" selector="//button[@type='submit']" after_wait="1000"/>
+    </login>
+    <flows>
+        <flow id="news" entry="/news" unique_keys="custom" columns="title,publish_date">
+            <action type="wait_dom_show" selector="//ul[@class='news-list']"/>
+            <extract record_xpath="//ul[@class='news-list']/li">
+                <field name="title" selector="a" mode="css"/>
+                <field name="publish_date" selector="span.date" mode="css" value_type="date"/>
+            </extract>
+            <paginate selector="//a[@class='next']" max_pages="10"/>
+        </flow>
+    </flows>
+</site>
+```
+
+The supported `action` types live in `xspider/actions/builtin.py`; to extend them, subclass `BaseAction` and register it with `ActionRegistry`.
+
+## Important Notes
+
+- `CaptchaAction` automatically takes a screenshot (of the element or the whole page) and calls `https://captcha.lfei007s.workers.dev` with request body `{image, type}` (`image` is a `data:image/png;base64,...` string). The `captcha_config` param (a JSON string) can override `url` and `timeout`; other keys are currently ignored.
+- Download-file monitoring and complex pagination scenarios need to be extended for the target site.
+- For maintainability, every action emits simple log output during execution and supports variable resolution. To build on the framework, extend the Action, Extractor, and Runner layers directly.
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..4d1f95b
--- /dev/null
+++ b/main.py
@@ -0,0 +1,11 @@
+from xspider.app import TemplateCrawlerApp, configure_logging
+
+
+def main() -> None:
+    configure_logging()
+    app = TemplateCrawlerApp()
+    app.run_forever()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/xspider/__init__.py b/xspider/__init__.py
new file mode 100644
index 0000000..91ff581
--- /dev/null
+++ b/xspider/__init__.py
@@ -0,0 +1,7 @@
+"""Template-driven crawling framework."""
+
+__all__ = [
+    "TemplateCrawlerApp",
+]
+
+from .app import TemplateCrawlerApp
diff --git a/xspider/actions/__init__.py b/xspider/actions/__init__.py
new file mode 100644
index 0000000..71f189e
--- /dev/null
+++ b/xspider/actions/__init__.py
@@ -0,0 +1,3 @@
+from .registry import ActionRegistry
+
+__all__ = ["ActionRegistry"]
diff --git a/xspider/actions/base.py b/xspider/actions/base.py
new file mode 100644
index 0000000..11d74d9
--- /dev/null
+++ b/xspider/actions/base.py
@@ -0,0 +1,38 @@
+from __future__ import annotations
+
+import time
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Optional
+
+from ..browser import BrowserSession
+from ..models import ActionConfig
+from ..variables import VariableResolver
+
+
+class ActionContext:
+    def __init__(
+        self,
+        session: BrowserSession,
+        resolver: VariableResolver,
+        site_context: Dict[str, str],
+    ) -> None:
+        self.session = session
+        self.resolver = resolver
+        self.site_context = site_context
+
+
+class BaseAction(ABC):
+    type_name: str
+
+    def __init__(self, config: ActionConfig) -> None:
+        self.config = config
+
+    def execute(self, ctx: ActionContext) -> Optional[Any]:
+        result = self._execute(ctx)
+        if self.config.after_wait:
+            time.sleep(self.config.after_wait / 1000.0)
+        return result
+
+    @abstractmethod
+    def _execute(self, ctx: ActionContext) -> Optional[Any]:
+        raise NotImplementedError
diff --git a/xspider/actions/builtin.py b/xspider/actions/builtin.py
new file mode 100644
index 0000000..cd29c94
--- /dev/null
+++ b/xspider/actions/builtin.py
@@ -0,0 +1,308 @@
+from __future__ import annotations
+
+import json
+import logging
+import time
+from typing import Any, Dict, Optional
+
+import requests
+
+from ..models import ActionConfig
+from .base import ActionContext, BaseAction
+
+logger = logging.getLogger(__name__)
+
+
+def _timeout_seconds(action: ActionConfig) -> float:
+    return max(action.timeout_ms / 1000.0, 0.1)
+
+
+class GotoAction(BaseAction):
+    type_name = "goto"
+
+    def _execute(self, ctx: ActionContext) -> None:
+        url = ctx.resolver.resolve(
+            self.config.params.get("url") or self.config.selector,
+            ctx.site_context,
+        )
+        if not url:
+            url = ctx.site_context.get("entry_url") or ctx.site_context.get("base_url")
+        if not url:
+            raise ValueError("goto action requires a URL or entry/base context")
+        ctx.session.goto(url)
+
+
+class ClickAction(BaseAction):
+    type_name = "click"
+
+    def _execute(self, ctx: ActionContext) -> None:
+        if not self.config.selector:
+            raise ValueError("click action requires selector")
+        button_raw = 
self.config.params.get("button") + button = ( + ctx.resolver.resolve(button_raw, ctx.site_context) + if button_raw + else None + ) + ctx.session.click( + selector=ctx.resolver.resolve(self.config.selector, ctx.site_context), + mode=self.config.mode, + timeout=_timeout_seconds(self.config), + button=button, + ) + download_name = self.config.params.get("download_filename") + if download_name: + ctx.session.download( + ctx.resolver.resolve(download_name, ctx.site_context) or download_name + ) + + +class TypeAction(BaseAction): + type_name = "type" + + def _execute(self, ctx: ActionContext) -> None: + if not self.config.selector: + raise ValueError("type action requires selector") + text = ctx.resolver.resolve( + self.config.params.get("text"), + ctx.site_context, + ) + if text is None: + raise ValueError("type action missing 'text'") + ctx.session.type( + selector=ctx.resolver.resolve(self.config.selector, ctx.site_context), + mode=self.config.mode, + text=text, + timeout=_timeout_seconds(self.config), + ) + + +class WaitDomShowAction(BaseAction): + type_name = "wait_dom_show" + + def _execute(self, ctx: ActionContext) -> Optional[object]: + if not self.config.selector: + raise ValueError("wait_dom_show action requires selector") + return ctx.session.wait_dom_show( + selector=ctx.resolver.resolve(self.config.selector, ctx.site_context), + mode=self.config.mode, + timeout=_timeout_seconds(self.config), + ) + + +class WaitDomGoneAction(BaseAction): + type_name = "wait_dom_gone" + + def _execute(self, ctx: ActionContext) -> None: + if not self.config.selector: + raise ValueError("wait_dom_gone action requires selector") + ctx.session.wait_dom_gone( + selector=ctx.resolver.resolve(self.config.selector, ctx.site_context), + mode=self.config.mode, + timeout=_timeout_seconds(self.config), + ) + + +class WaitDomHideAction(BaseAction): + type_name = "wait_dom_hide" + + def _execute(self, ctx: ActionContext) -> None: + if not self.config.selector: + raise ValueError("wait_dom_hide action requires selector") + ctx.session.wait_dom_hide( + selector=ctx.resolver.resolve(self.config.selector, ctx.site_context), + mode=self.config.mode, + timeout=_timeout_seconds(self.config), + ) + + +class WaitTimeAction(BaseAction): + type_name = "wait_time" + + def _execute(self, ctx: ActionContext) -> None: + timeout_raw = self.config.params.get("timeout_ms", str(self.config.timeout_ms)) + timeout_resolved = ctx.resolver.resolve(timeout_raw, ctx.site_context) + try: + timeout_ms = ( + int(timeout_resolved) if timeout_resolved else self.config.timeout_ms + ) + except ValueError as exc: + raise ValueError(f"Invalid timeout_ms value: {timeout_resolved}") from exc + time.sleep(max(timeout_ms, 0) / 1000.0) + + +class RunJsAction(BaseAction): + type_name = "run_js" + + def _execute(self, ctx: ActionContext) -> object: + script = self.config.params.get("script") or self.config.params.get("text") + script = ctx.resolver.resolve(script, ctx.site_context) + if not script: + raise ValueError("run_js action requires script") + return ctx.session.run_js(script) + + +class SetHeaderAction(BaseAction): + type_name = "set_header" + + def _execute(self, ctx: ActionContext) -> None: + header_name = self.config.params.get("header_name") + header_value = self.config.params.get("header_value") + if not header_name or header_value is None: + raise ValueError("set_header requires header_name and header_value") + ctx.session.set_header( + ctx.resolver.resolve(header_name, ctx.site_context) or header_name, + ctx.resolver.resolve(header_value, 
ctx.site_context) or header_value,
+        )
+
+
+class SetAttrAction(BaseAction):
+    type_name = "set_attr"
+
+    def _execute(self, ctx: ActionContext) -> None:
+        selector = self.config.selector
+        attr_name = self.config.params.get("attr_name")
+        attr_value = self.config.params.get("attr_value")
+        if not selector or not attr_name:
+            raise ValueError("set_attr requires selector and attr_name")
+        resolved_selector = ctx.resolver.resolve(selector, ctx.site_context) or selector
+        ctx.session.set_attr(
+            selector=resolved_selector,
+            mode=self.config.mode,
+            attr=ctx.resolver.resolve(attr_name, ctx.site_context) or attr_name,
+            value=ctx.resolver.resolve(attr_value, ctx.site_context)
+            if attr_value
+            else "",
+            timeout=_timeout_seconds(self.config),
+        )
+
+
+class SetVarAction(BaseAction):
+    type_name = "set_var"
+
+    def _execute(self, ctx: ActionContext) -> None:
+        var_name = self.config.params.get("var_name")
+        var_value = self.config.params.get("var_value")
+        if not var_name or var_value is None:
+            raise ValueError("set_var requires var_name and var_value")
+        resolved_name = ctx.resolver.resolve(var_name, ctx.site_context) or var_name
+        resolved_value = (
+            ctx.resolver.resolve(var_value, ctx.site_context) or var_value
+        )
+        payload = {
+            "scope": ctx.resolver.resolve(
+                self.config.params.get("var_scope"), ctx.site_context
+            )
+            if self.config.params.get("var_scope")
+            else None,
+            "ttl": ctx.resolver.resolve(
+                self.config.params.get("var_ttl"), ctx.site_context
+            )
+            if self.config.params.get("var_ttl")
+            else None,
+            "single_use": ctx.resolver.resolve(
+                self.config.params.get("var_single_use"), ctx.site_context
+            )
+            if self.config.params.get("var_single_use")
+            else None,
+        }
+        payload = {k: v for k, v in payload.items() if v is not None}
+        ctx.resolver.set(
+            resolved_name,
+            resolved_value,
+            {**ctx.site_context, **payload},
+        )
+
+
+class CaptchaAction(BaseAction):
+    type_name = "captcha"
+
+    DEFAULT_ENDPOINT = "https://captcha.lfei007s.workers.dev"
+    _session = requests.Session()
+
+    def _execute(self, ctx: ActionContext) -> None:
+        config = self._load_config(ctx)
+        api_url = (
+            ctx.resolver.resolve(
+                self.config.params.get("captcha_url"),
+                ctx.site_context,
+            )
+            or config.get("url")
+            or self.DEFAULT_ENDPOINT
+        )
+
+        captcha_type = ctx.resolver.resolve(
+            self.config.params.get("captcha_type"),
+            ctx.site_context,
+        )
+
+        image_source = self._resolve_image(ctx)
+
+        payload: Dict[str, Any] = {
+            "image": image_source,
+            "type": captcha_type,
+        }
+        timeout = config.get("timeout", 30)
+
+        logger.debug("Submitting captcha to %s", api_url)
+        response = self._session.post(
+            api_url,
+            json=payload,
+            timeout=timeout,
+        )
+        response.raise_for_status()
+        result_payload = response.json()
+        logger.info("Captcha result: %s", result_payload)
+        solution = self._extract_solution(result_payload)
+        logger.info("Captcha recognized successfully.")
+
+        variable_name = (
+            ctx.resolver.resolve(
+                self.config.params.get("variable"),
+                ctx.site_context,
+            )
+            or f"{ctx.site_context.get('site_id', 'site')}:captcha_result"
+        )
+
+        ctx.resolver.set(variable_name, solution, ctx.site_context)
+        ctx.site_context[variable_name] = solution
+
+    def _load_config(self, ctx: ActionContext) -> Dict[str, Any]:
+        raw_config = self.config.params.get("captcha_config")
+        if not raw_config:
+            return {}
+        resolved = ctx.resolver.resolve(raw_config, ctx.site_context)
+        if not resolved:
+            return {}
+        try:
+            parsed = json.loads(resolved)
+            if isinstance(parsed, dict):
+                return parsed
+        except json.JSONDecodeError as exc:
raise ValueError("Invalid JSON in captcha_config") from exc + return {} + + def _resolve_image(self, ctx: ActionContext) -> str: + direct_image = ctx.resolver.resolve( + self.config.params.get("image"), + ctx.site_context, + ) + if direct_image: + return direct_image + + selector = self.config.selector + mode = self.config.mode + timeout = max(self.config.timeout_ms / 1000.0, 1.0) + screenshot_b64 = ctx.session.screenshot(selector, mode, timeout) + if screenshot_b64.startswith("data:"): + return screenshot_b64 + return f"data:image/png;base64,{screenshot_b64}" + + def _extract_solution(self, payload: Dict[str, Any]) -> str: + for key in ("result", "text", "value", "code", "data"): + value = payload.get(key) + if value: + return str(value) + raise ValueError( + f"Captcha service response missing solution field: {payload}" + ) diff --git a/xspider/actions/registry.py b/xspider/actions/registry.py new file mode 100644 index 0000000..9b876a4 --- /dev/null +++ b/xspider/actions/registry.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import Dict, Type + +from .base import BaseAction +from .builtin import ( + CaptchaAction, + ClickAction, + GotoAction, + RunJsAction, + SetAttrAction, + SetHeaderAction, + SetVarAction, + TypeAction, + WaitDomGoneAction, + WaitDomHideAction, + WaitDomShowAction, + WaitTimeAction, +) + + +class ActionRegistry: + _registry: Dict[str, Type[BaseAction]] = {} + + @classmethod + def register(cls, action_cls: Type[BaseAction]) -> None: + cls._registry[action_cls.type_name] = action_cls + + @classmethod + def get(cls, type_name: str) -> Type[BaseAction]: + if type_name not in cls._registry: + raise KeyError(f"Unknown action type '{type_name}'") + return cls._registry[type_name] + + @classmethod + def register_builtin(cls) -> None: + for action_cls in ( + GotoAction, + ClickAction, + TypeAction, + WaitDomShowAction, + WaitDomGoneAction, + WaitDomHideAction, + WaitTimeAction, + RunJsAction, + SetHeaderAction, + SetAttrAction, + SetVarAction, + CaptchaAction, + ): + cls.register(action_cls) + + +ActionRegistry.register_builtin() diff --git a/xspider/app.py b/xspider/app.py new file mode 100644 index 0000000..490c800 --- /dev/null +++ b/xspider/app.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import logging +import sys +import time +from typing import Optional + +import requests + +from .redis_queue import RedisConfigQueue +from .runner import FlowRunner +from .settings import Settings +from .storage import MongoRepository +from .variables import VariableService +from .xml_parser import XMLSiteParser + +logger = logging.getLogger(__name__) + + +class TemplateCrawlerApp: + def __init__(self, settings: Optional[Settings] = None) -> None: + self.settings = settings or Settings.from_env() + self.queue = RedisConfigQueue( + self.settings.redis_url, + self.settings.redis_list_key, + timeout=self.settings.redis_block_timeout, + ) + self.mongo = MongoRepository( + self.settings.mongo_uri, + self.settings.mongo_database, + ) + self.variable_service = VariableService(self.settings.variable_service_url) + self.parser = XMLSiteParser() + self.runner = FlowRunner( + storage=self.mongo, + variable_service=self.variable_service, + ) + self.http = requests.Session() + + def run_forever(self) -> None: + logger.info("Template crawler started. 
diff --git a/xspider/app.py b/xspider/app.py
new file mode 100644
index 0000000..490c800
--- /dev/null
+++ b/xspider/app.py
@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+import logging
+import sys
+import time
+from typing import Optional
+
+import requests
+
+from .redis_queue import RedisConfigQueue
+from .runner import FlowRunner
+from .settings import Settings
+from .storage import MongoRepository
+from .variables import VariableService
+from .xml_parser import XMLSiteParser
+
+logger = logging.getLogger(__name__)
+
+
+class TemplateCrawlerApp:
+    def __init__(self, settings: Optional[Settings] = None) -> None:
+        self.settings = settings or Settings.from_env()
+        self.queue = RedisConfigQueue(
+            self.settings.redis_url,
+            self.settings.redis_list_key,
+            timeout=self.settings.redis_block_timeout,
+        )
+        self.mongo = MongoRepository(
+            self.settings.mongo_uri,
+            self.settings.mongo_database,
+        )
+        self.variable_service = VariableService(self.settings.variable_service_url)
+        self.parser = XMLSiteParser()
+        self.runner = FlowRunner(
+            storage=self.mongo,
+            variable_service=self.variable_service,
+        )
+        self.http = requests.Session()
+
+    def run_forever(self) -> None:
+        logger.info("Template crawler started. Waiting for XML configurations...")
+        while True:
+            try:
+                self._iterate()
+            except KeyboardInterrupt:
+                logger.info("Received interrupt; shutting down.")
+                break
+            except Exception:  # noqa: BLE001
+                logger.exception("Unexpected error during iteration.")
+                time.sleep(3)
+
+    def _iterate(self) -> None:
+        xml_location = self.queue.fetch()
+        if not xml_location:
+            return
+        logger.info("Fetched XML location: %s", xml_location)
+        xml_payload = self._load_xml(xml_location)
+        site = self.parser.parse(xml_payload)
+        self.runner.run_site(site)
+
+    def _load_xml(self, location: str) -> str:
+        logger.debug("Downloading XML from %s", location)
+        response = self.http.get(location, timeout=30)
+        response.raise_for_status()
+        return response.text
+
+
+def configure_logging() -> None:
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+        stream=sys.stdout,
+    )
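Because `TemplateCrawlerApp` accepts an injected `Settings`, it can also be run without environment variables. A sketch with illustrative values:

```python
from xspider.app import TemplateCrawlerApp, configure_logging
from xspider.settings import Settings

configure_logging()
app = TemplateCrawlerApp(
    settings=Settings(
        redis_url="redis://localhost:6379/0",
        redis_list_key="xspider:config",
        mongo_uri="mongodb://localhost:27017",
        mongo_database="xspider",
        variable_service_url=None,  # ${...} placeholders will then fail to resolve
        redis_block_timeout=30,
    )
)
app.run_forever()
```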
diff --git a/xspider/browser.py b/xspider/browser.py
new file mode 100644
index 0000000..10729de
--- /dev/null
+++ b/xspider/browser.py
@@ -0,0 +1,230 @@
+from __future__ import annotations
+
+import base64
+import logging
+from contextlib import contextmanager
+from dataclasses import dataclass, field
+from typing import Any, Optional
+
+from .models import SelectorMode, SiteConfig
+
+logger = logging.getLogger(__name__)
+
+
+class BrowserError(Exception):
+    """Wrap driver-specific exceptions."""
+
+
+@dataclass
+class BrowserSession:
+    site: SiteConfig
+    page: Any
+
+    def goto(self, url: str, wait: Optional[int] = None) -> None:
+        logger.debug("Navigating to %s", url)
+        try:
+            self.page.get(url)
+            if wait:
+                self.page.wait.load_start(wait / 1000.0)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Failed to navigate to {url}") from exc
+
+    def find(self, selector: str, mode: SelectorMode) -> Any:
+        try:
+            # The mode is encoded in the locator prefix; DrissionPage's ele()
+            # takes no separate mode argument.
+            return self.page.ele(self._prefixed_selector(selector, mode))
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Failed to locate element {selector}") from exc
+
+    def click(self, selector: str, mode: SelectorMode, timeout: float, button: Optional[str] = None) -> None:
+        ele = self._wait_ele(selector, mode, timeout)
+        try:
+            if button == "right":
+                ele.click.right()
+            elif button == "middle":
+                ele.click.middle()
+            else:
+                ele.click()
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Failed to click {selector}") from exc
+
+    def type(self, selector: str, mode: SelectorMode, text: str, timeout: float) -> None:
+        ele = self._wait_ele(selector, mode, timeout)
+        try:
+            ele.clear()
+            ele.input(text)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Failed to type into {selector}") from exc
+
+    def run_js(self, script: str) -> Any:
+        try:
+            return self.page.run_js(script)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError("Failed to execute script") from exc
+
+    def wait_dom_show(self, selector: str, mode: SelectorMode, timeout: float) -> Any:
+        return self._wait_ele(selector, mode, timeout)
+
+    def wait_dom_gone(self, selector: str, mode: SelectorMode, timeout: float) -> None:
+        try:
+            prefixed_selector = self._prefixed_selector(selector, mode)
+            gone = self.page.wait.ele_deleted(prefixed_selector, timeout=timeout)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Element {selector} did not disappear") from exc
+        if not gone:
+            raise BrowserError(f"Element {selector} did not disappear")
+
+    def wait_dom_hide(self, selector: str, mode: SelectorMode, timeout: float) -> None:
+        ele = self._wait_ele(selector, mode, timeout)
+        try:
+            hidden = self.page.wait.ele_hidden(ele, timeout=timeout)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Element {selector} did not hide") from exc
+        if not hidden:
+            raise BrowserError(f"Element {selector} did not hide")
+
+    def set_header(self, name: str, value: str) -> None:
+        try:
+            # Newer DrissionPage builds expose header control via page.set.headers();
+            # fall back to set_extra_headers for builds that provide it.
+            setter = getattr(self.page, "set", None)
+            if setter is not None and hasattr(setter, "headers"):
+                setter.headers({name: value})
+            else:
+                self.page.set_extra_headers({name: value})
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Failed to set header {name}") from exc
+
+    def set_attr(self, selector: str, mode: SelectorMode, attr: str, value: str, timeout: float) -> None:
+        ele = self._wait_ele(selector, mode, timeout)
+        try:
+            # Element-level run_js binds `this` to the element; extra args map
+            # to arguments[0], arguments[1], ...
+            ele.run_js("this.setAttribute(arguments[0], arguments[1]);", attr, value)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Failed to set attribute {attr} on {selector}") from exc
+
+    def download(self, filename: str) -> None:
+        # Placeholder for download handling.
+        logger.info("Download requested for %s", filename)
+
+    def html(self) -> str:
+        try:
+            if callable(getattr(self.page, "html", None)):
+                return self.page.html()
+            return self.page.html
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError("Failed to retrieve page HTML") from exc
+
+    def _prefixed_selector(self, selector: str, mode: SelectorMode) -> str:
+        # DrissionPage locators use colon prefixes, e.g. "xpath://div" or "css:.cls".
+        stripped = selector.strip()
+        if not stripped:
+            return stripped
+        expected_prefix = f"{mode.value}:"
+        lowered = stripped.lower()
+        if lowered.startswith(expected_prefix):
+            return stripped
+        alt_prefix = f"{mode.value}="
+        if lowered.startswith(alt_prefix):
+            return f"{expected_prefix}{stripped[len(alt_prefix):]}"
+        return f"{expected_prefix}{stripped}"
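Under the colon-prefix convention above, normalization behaves as in this quick self-check (it calls the private helper purely for illustration; `site` and `page` are unused by it):

```python
from xspider.browser import BrowserSession
from xspider.models import SelectorMode

session = BrowserSession(site=None, page=None)  # locator helper touches neither field
assert session._prefixed_selector("//div[@id='app']", SelectorMode.xpath) == "xpath://div[@id='app']"
assert session._prefixed_selector("xpath://div", SelectorMode.xpath) == "xpath://div"  # already prefixed
assert session._prefixed_selector(".item a", SelectorMode.css) == "css:.item a"
```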
+    def _wait_ele(self, selector: str, mode: SelectorMode, timeout: float) -> Any:
+        prefixed_selector = self._prefixed_selector(selector, mode)
+        try:
+            # page.ele() waits up to `timeout`; a miss yields a falsy NoneElement
+            # (or raises, depending on DrissionPage settings).
+            ele = self.page.ele(prefixed_selector, timeout=timeout)
+        except Exception as exc:  # noqa: BLE001
+            raise BrowserError(f"Timeout locating element {selector}") from exc
+        if not ele:
+            raise BrowserError(f"Timeout locating element {selector}")
+        return ele
+
+    def screenshot(
+        self,
+        selector: Optional[str] = None,
+        mode: SelectorMode = SelectorMode.css,
+        timeout: float = 5.0,
+    ) -> str:
+        if selector:
+            element = self._wait_ele(selector, mode, timeout)
+            return self._screenshot_element(element)
+        return self._screenshot_page()
+
+    def _screenshot_element(self, element: Any) -> str:
+        # Try a few common DrissionPage/Selenium patterns.
+        candidates = [
+            ("screenshot", {"base64": True}),
+            ("screenshot", {"as_base64": True}),
+            ("screenshot_as_base64", {}),
+            ("get_screenshot", {"as_base64": True}),
+            ("screenshot", {}),
+        ]
+        for method_name, kwargs in candidates:
+            method = getattr(element, method_name, None)
+            if not callable(method):
+                continue
+            try:
+                result = method(**kwargs)
+                return self._ensure_base64(result)
+            except TypeError:
+                # Retry without kwargs if not supported.
+                try:
+                    result = method()
+                    return self._ensure_base64(result)
+                except Exception:  # noqa: BLE001
+                    continue
+            except Exception:  # noqa: BLE001
+                continue
+        raise BrowserError("Failed to capture captcha element screenshot.")
+
+    def _screenshot_page(self) -> str:
+        candidates = [
+            ("get_screenshot", {"as_base64": True}),
+            ("screenshot", {"as_base64": True}),
+            ("screenshot", {}),
+        ]
+        for method_name, kwargs in candidates:
+            method = getattr(self.page, method_name, None)
+            if not callable(method):
+                continue
+            try:
+                result = method(**kwargs)
+                return self._ensure_base64(result)
+            except TypeError:
+                try:
+                    result = method()
+                    return self._ensure_base64(result)
+                except Exception:  # noqa: BLE001
+                    continue
+            except Exception:  # noqa: BLE001
+                continue
+        raise BrowserError("Failed to capture page screenshot.")
+
+    def _ensure_base64(self, content: Any) -> str:
+        if isinstance(content, str):
+            return content
+        if isinstance(content, bytes):
+            return base64.b64encode(content).decode("utf-8")
+        raise BrowserError("Unsupported screenshot content type.")
+
+
+@dataclass
+class BrowserFactory:
+    site: SiteConfig
+    options: Any = None
+    _page_kwargs: dict = field(default_factory=dict)
+
+    def create(self) -> BrowserSession:
+        try:
+            from DrissionPage import ChromiumOptions, ChromiumPage
+        except ImportError as exc:  # noqa: BLE001
+            raise RuntimeError(
+                "DrissionPage is required for BrowserFactory. Install with `pip install drissionpage`."
+            ) from exc
+
+        chromium_options = self.options or ChromiumOptions()
+        page = ChromiumPage(addr_or_opts=chromium_options, **self._page_kwargs)
+        session = BrowserSession(site=self.site, page=page)
+
+        # Route through set_header() so driver version differences are handled once.
+        for header in self.site.settings.headers:
+            session.set_header(header.name, header.value)
+
+        return session
+
+    @contextmanager
+    def session(self) -> BrowserSession:
+        browser = self.create()
+        try:
+            yield browser
+        finally:
+            try:
+                browser.page.close()
+            except Exception:  # noqa: BLE001
+                logger.debug("Failed to close browser cleanly", exc_info=True)
diff --git a/xspider/extraction.py b/xspider/extraction.py
new file mode 100644
index 0000000..ead19da
--- /dev/null
+++ b/xspider/extraction.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+import datetime as dt
+import logging
+from typing import Any, Dict, List
+
+from lxml import html
+
+from .models import ExtractConfig, FieldConfig, SelectorMode
+
+logger = logging.getLogger(__name__)
+
+
+class Extractor:
+    def __init__(self) -> None:
+        pass
+
+    def extract(self, page_html: str, config: ExtractConfig) -> List[Dict[str, Any]]:
+        doc = html.fromstring(page_html)
+        if config.record_mode is SelectorMode.css:
+            nodes = doc.cssselect(config.record_selector)
+        else:
+            nodes = doc.xpath(config.record_selector)
+
+        records: List[Dict[str, Any]] = []
+        for node in nodes:
+            record: Dict[str, Any] = {}
+            for field in config.fields:
+                record[field.name] = self._extract_field(node, field)
+            records.append(record)
+        return records
+
+    def _extract_field(self, node: html.HtmlElement, field: FieldConfig) -> Any:
+        raw_value = ""
+        try:
+            if field.mode is SelectorMode.css:
+                matches = node.cssselect(field.selector)
+            else:
+                matches = node.xpath(field.selector)
+            if matches:
+                if hasattr(matches[0], "text_content"):
+                    raw_value = matches[0].text_content().strip()
+                else:
+                    raw_value = str(matches[0]).strip()
+        except Exception:  # noqa: BLE001
+            logger.debug("Failed to extract field %s", field.name, exc_info=True)
+            raw_value = ""
+        return self._transform_value(raw_value, field.value_type)
+
+    def _transform_value(self, 
value: str, value_type: str | None) -> Any:
+        if not value_type or value == "":
+            return value
+        if value_type == "string_lower":
+            return value.lower()
+        if value_type == "string_upper":
+            return value.upper()
+        if value_type == "int":
+            try:
+                return int(value)
+            except ValueError:
+                return value
+        if value_type == "float":
+            try:
+                return float(value)
+            except ValueError:
+                return value
+        if value_type == "date":
+            for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y-%m-%d %H:%M:%S"):
+                try:
+                    return dt.datetime.strptime(value, fmt)
+                except ValueError:
+                    continue
+            return value
+        return value
diff --git a/xspider/models.py b/xspider/models.py
new file mode 100644
index 0000000..9657bc9
--- /dev/null
+++ b/xspider/models.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any, Dict, List, Optional
+
+
+class SelectorMode(str, Enum):
+    css = "css"
+    xpath = "xpath"
+
+
+@dataclass
+class HeaderConfig:
+    name: str
+    value: str
+
+
+@dataclass
+class SiteSettings:
+    enable_proxy: bool = False
+    rotate_ua: bool = False
+    retry: int = 3
+    headers: List[HeaderConfig] = field(default_factory=list)
+
+
+@dataclass
+class ActionConfig:
+    type: str
+    selector: Optional[str] = None
+    mode: SelectorMode = SelectorMode.xpath
+    timeout_ms: int = 10_000
+    after_wait: int = 0
+    params: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class FieldConfig:
+    name: str
+    selector: str
+    mode: SelectorMode = SelectorMode.css
+    value_type: Optional[str] = None
+
+
+@dataclass
+class ExtractConfig:
+    record_selector: str
+    record_mode: SelectorMode = SelectorMode.css
+    fields: List[FieldConfig] = field(default_factory=list)
+    download: Optional[Dict[str, Any]] = None
+
+
+@dataclass
+class ExcelExtractConfig:
+    file_pattern: str
+    directory: Optional[str] = None
+
+
+@dataclass
+class PaginateConfig:
+    mode: SelectorMode = SelectorMode.xpath
+    selector: Optional[str] = None
+    max_pages: Optional[int] = None
+
+
+class UniqueKeyMode(str, Enum):
+    all = "all"
+    custom = "custom"
+    none = "null"
+
+
+@dataclass
+class FlowConfig:
+    flow_id: str
+    entry: Optional[str]
+    data_type: Optional[str]
+    unique_keys: UniqueKeyMode = UniqueKeyMode.all
+    unique_columns: List[str] = field(default_factory=list)
+    actions: List[ActionConfig] = field(default_factory=list)
+    extract: Optional[ExtractConfig] = None
+    excel_extract: Optional[ExcelExtractConfig] = None
+    paginate: Optional[PaginateConfig] = None
+    metadata: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class SiteConfig:
+    site_id: str
+    base: Optional[str]
+    settings: SiteSettings = field(default_factory=SiteSettings)
+    login: Optional[FlowConfig] = None
+    flows: List[FlowConfig] = field(default_factory=list)
diff --git a/xspider/redis_queue.py b/xspider/redis_queue.py
new file mode 100644
index 0000000..e078c2a
--- /dev/null
+++ b/xspider/redis_queue.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+import logging
+from typing import Optional
+
+import redis
+
+logger = logging.getLogger(__name__)
+
+
+class RedisConfigQueue:
+    def __init__(self, redis_url: str, list_key: str, timeout: int = 30) -> None:
+        self._client = redis.Redis.from_url(redis_url)
+        self._list_key = list_key
+        self._timeout = timeout
+
+    def fetch(self) -> Optional[str]:
+        item = self._client.brpop(self._list_key, timeout=self._timeout)
+        if not item:
+            return None
+        _, value = item
+        raw = value.decode("utf-8").strip()
+        if not raw:
+            return None
+        if raw[0] == raw[-1] and 
raw[0] in {'"', "'"}:
+            raw = raw[1:-1].strip()
+        raw = raw.strip()
+        return raw or None
+
+    def push(self, value: str) -> None:
+        self._client.rpush(self._list_key, value)
+
+
+if __name__ == '__main__':
+    # Local smoke test; point real deployments at their own Redis via env vars.
+    queue = RedisConfigQueue(
+        "redis://localhost:6379/0",
+        "xspider:config",
+        timeout=30,
+    )
+    print(queue.fetch())
diff --git a/xspider/runner.py b/xspider/runner.py
new file mode 100644
index 0000000..055840c
--- /dev/null
+++ b/xspider/runner.py
@@ -0,0 +1,170 @@
+from __future__ import annotations
+
+import logging
+import time
+from typing import Any, Dict, Optional
+from urllib.parse import urljoin
+
+from .actions.registry import ActionRegistry
+from .browser import BrowserFactory, BrowserSession, BrowserError
+from .extraction import Extractor
+from .models import FlowConfig, SiteConfig, SelectorMode, ExcelExtractConfig
+from .storage import MongoRepository
+from .variables import VariableResolver, VariableService
+from .utils.selectors import is_xpath_selector
+from .actions.base import ActionContext
+
+logger = logging.getLogger(__name__)
+
+
+class FlowRunner:
+    def __init__(
+        self,
+        storage: MongoRepository,
+        variable_service: VariableService,
+        extractor: Optional[Extractor] = None,
+    ) -> None:
+        self.storage = storage
+        self.variable_service = variable_service
+        self.extractor = extractor or Extractor()
+
+    def run_site(self, site: SiteConfig) -> None:
+        factory = BrowserFactory(site)
+        resolver = VariableResolver(self.variable_service)
+
+        with factory.session() as session:
+            if site.login:
+                logger.info("Executing login flow for site %s", site.site_id)
+                self._run_flow(site, site.login, session, resolver, is_login=True)
+            for flow in site.flows:
+                logger.info("Executing flow %s for site %s", flow.flow_id, site.site_id)
+                self._run_flow(site, flow, session, resolver, is_login=False)
+
+    def _run_flow(
+        self,
+        site: SiteConfig,
+        flow: FlowConfig,
+        session: BrowserSession,
+        resolver: VariableResolver,
+        is_login: bool,
+    ) -> None:
+        entry_url = self._resolve_entry(site, flow)
+        site_context = self._build_context(site, flow, entry_url)
+        action_context = ActionContext(session, resolver, site_context)
+
+        if entry_url:
+            resolved_entry = resolver.resolve(entry_url, action_context.site_context) or entry_url
+            action_context.site_context["entry_url"] = resolved_entry
+            logger.debug("Flow %s navigating to %s", flow.flow_id, resolved_entry)
+            session.goto(resolved_entry)
+
+        for action_config in flow.actions:
+            action_cls = ActionRegistry.get(action_config.type)
+            action = action_cls(action_config)
+            logger.debug("Executing action %s", action_config.type)
+            action.execute(action_context)
+
+        if is_login:
+            selector = flow.metadata.get("selector")
+            if selector:
+                selector_mode = self._resolve_selector_mode(selector, flow.metadata)
+                timeout = int(flow.metadata.get("timeout_ms", 10_000)) / 1000.0
+                try:
+                    session.wait_dom_show(
+                        resolver.resolve(selector, action_context.site_context) or selector,
+                        selector_mode,
+                        timeout,
+                    )
+                except BrowserError as exc:
+                    raise RuntimeError("Login verification selector not found.") from exc
+            return
+
+        if not flow.extract:
+            if flow.excel_extract:
+                self._handle_excel_extract(site, flow, session)
+            else:
+                logger.info("Flow %s has no extract step; skipping storage.", flow.flow_id)
+            return
+
+        if flow.paginate and flow.paginate.selector:
+            self._run_paginated(site, flow, session)
+        else:
+            records = self.extractor.extract(session.html(), flow.extract)
+            self.storage.save_records(site, flow, records)
+
+    def _run_paginated(
+        self,
+        site: 
SiteConfig,
+        flow: FlowConfig,
+        session: BrowserSession,
+    ) -> None:
+        page = 0
+        while True:
+            page += 1
+            records = self.extractor.extract(session.html(), flow.extract)
+            self.storage.save_records(site, flow, records)
+
+            if flow.paginate.max_pages and page >= flow.paginate.max_pages:
+                break
+            selector = flow.paginate.selector
+            if not selector:
+                break
+            try:
+                session.click(selector, flow.paginate.mode, timeout=5)
+            except BrowserError:
+                logger.info("Pagination stopped at page %s", page)
+                break
+            time.sleep(1)
+
+    def _resolve_entry(self, site: SiteConfig, flow: FlowConfig) -> Optional[str]:
+        if flow.entry:
+            if site.base:
+                return urljoin(site.base, flow.entry)
+            return flow.entry
+        return site.base
+
+    def _build_context(
+        self,
+        site: SiteConfig,
+        flow: FlowConfig,
+        entry_url: Optional[str],
+    ) -> Dict[str, str]:
+        context = {
+            "site_id": site.site_id,
+            "flow_id": flow.flow_id,
+        }
+        if site.base:
+            context["base_url"] = site.base
+        if entry_url:
+            context["entry_url"] = entry_url
+        return context
+
+    def _resolve_selector_mode(self, selector: str, metadata: Dict[str, Any]) -> SelectorMode:
+        mode_value = metadata.get("mode")
+        if mode_value:
+            try:
+                mode = SelectorMode(mode_value)
+            except ValueError as exc:  # noqa: BLE001
+                raise ValueError(f"Unsupported selector mode {mode_value}") from exc
+            if mode is SelectorMode.css and is_xpath_selector(selector):
+                raise ValueError(
+                    f"Selector '{selector}' looks like XPath but mode='css' specified."
+                )
+            return mode
+        return SelectorMode.xpath
+
+    def _handle_excel_extract(
+        self,
+        site: SiteConfig,
+        flow: FlowConfig,
+        session: BrowserSession,
+    ) -> None:
+        config = flow.excel_extract
+        if not config:
+            return
+        logger.warning(
+            "Excel extraction for %s/%s not yet implemented. 
Expected file pattern %s.",
+            site.site_id,
+            flow.flow_id,
+            config.file_pattern,
+        )
diff --git a/xspider/settings.py b/xspider/settings.py
new file mode 100644
index 0000000..a255790
--- /dev/null
+++ b/xspider/settings.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class Settings:
+    redis_url: str
+    redis_list_key: str
+    mongo_uri: str
+    mongo_database: str
+    variable_service_url: Optional[str]
+    redis_block_timeout: int
+
+    @classmethod
+    def from_env(cls) -> "Settings":
+        # Defaults match the README; configure real deployments via env vars
+        # rather than hard-coding credentials here.
+        redis_url = os.getenv("XSPIDER_REDIS_URL", "redis://localhost:6379/0")
+        redis_list_key = os.getenv("XSPIDER_REDIS_LIST_KEY", "xspider:config")
+        mongo_uri = os.getenv("XSPIDER_MONGO_URI", "mongodb://localhost:27017")
+        mongo_database = os.getenv("XSPIDER_MONGO_DB", "xspider")
+        variable_service_url = os.getenv("XSPIDER_VARIABLE_SERVICE")
+        redis_block_timeout = int(os.getenv("XSPIDER_REDIS_BLOCK_TIMEOUT", "30"))
+        return cls(
+            redis_url=redis_url,
+            redis_list_key=redis_list_key,
+            mongo_uri=mongo_uri,
+            mongo_database=mongo_database,
+            variable_service_url=variable_service_url,
+            redis_block_timeout=redis_block_timeout,
+        )
diff --git a/xspider/storage.py b/xspider/storage.py
new file mode 100644
index 0000000..a2fc5c2
--- /dev/null
+++ b/xspider/storage.py
@@ -0,0 +1,75 @@
+from __future__ import annotations
+
+import hashlib
+import json
+import logging
+from typing import Any, Dict, Iterable, List, Optional
+
+from pymongo import MongoClient, UpdateOne
+
+from .models import FlowConfig, SiteConfig, UniqueKeyMode
+
+logger = logging.getLogger(__name__)
+
+
+class MongoRepository:
+    def __init__(self, mongo_uri: str, database: str) -> None:
+        self._client = MongoClient(mongo_uri)
+        self._db = self._client[database]
+
+    def save_records(
+        self,
+        site: SiteConfig,
+        flow: FlowConfig,
+        records: Iterable[Dict[str, Any]],
+    ) -> None:
+        records_list = list(records)
+        if not records_list:
+            logger.info("Flow %s yielded no records", flow.flow_id)
+            return
+
+        collection_name = f"{site.site_id}_{flow.flow_id}"
+        collection = self._db[collection_name]
+
+        if flow.unique_keys is UniqueKeyMode.none:
+            collection.insert_many(records_list)
+            return
+
+        operations: List[UpdateOne] = []
+        for record in records_list:
+            unique_key = self._unique_key(flow, record)
+            operations.append(
+                UpdateOne(
+                    {"_unique": unique_key},
+                    {
+                        "$set": {**record, "_unique": unique_key},
+                        "$setOnInsert": {"_site": site.site_id, "_flow": flow.flow_id},
+                    },
+                    upsert=True,
+                )
+            )
+
+        if operations:
+            result = collection.bulk_write(operations, ordered=False)
+            logger.info(
+                "Saved records for %s/%s: matched=%s upserted=%s",
+                site.site_id,
+                flow.flow_id,
+                result.matched_count,
+                len(result.upserted_ids or []),
+            )
+
+    def _unique_key(self, flow: FlowConfig, record: Dict[str, Any]) -> str:
+        if flow.unique_keys is UniqueKeyMode.custom and flow.unique_columns:
+            payload = {key: record.get(key) for key in flow.unique_columns}
+        else:
+            payload = record
+        serialized = json.dumps(payload, sort_keys=True, default=self._json_default)
+        return hashlib.md5(serialized.encode("utf-8")).hexdigest()
+
+    def _json_default(self, obj: Any) -> Any:
+        if isinstance(obj, (set, tuple)):
+            return list(obj)
+        if hasattr(obj, "isoformat"):
+            return obj.isoformat()
+        return str(obj)
diff --git a/xspider/utils/selectors.py b/xspider/utils/selectors.py
new file mode 100644
index 0000000..564010b
--- /dev/null
+++ 
b/xspider/utils/selectors.py
@@ -0,0 +1,29 @@
+from __future__ import annotations
+
+from typing import Optional
+
+
+def is_xpath_selector(selector: Optional[str]) -> bool:
+    """Heuristically determine if the selector string looks like XPath."""
+    if not selector:
+        return False
+
+    stripped = selector.strip()
+    if not stripped:
+        return False
+
+    lowered = stripped.lower()
+    if lowered.startswith(("xpath=", "xpath:")):
+        return True
+
+    if stripped.startswith(("//", ".//", "/")):
+        return True
+
+    if stripped.startswith("(") and "//" in stripped:
+        return True
+
+    if stripped.startswith("@") or stripped.startswith("text()"):
+        return True
+
+    xpath_tokens = ("::", "[@", "]", " and ", " or ", "/@")
+    return any(token in stripped for token in xpath_tokens)
diff --git a/xspider/variables.py b/xspider/variables.py
new file mode 100644
index 0000000..8b3dd1b
--- /dev/null
+++ b/xspider/variables.py
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import logging
+import re
+from dataclasses import dataclass, field
+from typing import Dict, Optional
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+# Matches ${variable_name} placeholders; the named group captures the name.
+VAR_PATTERN = re.compile(r"\$\{(?P<name>[a-zA-Z0-9_:\-\.]+)\}")
+
+
+@dataclass
+class VariableService:
+    base_url: Optional[str] = None
+    session: requests.Session = field(default_factory=requests.Session)
+
+    def fetch(self, name: str, context: Optional[Dict[str, str]] = None) -> str:
+        if not self.base_url:
+            raise RuntimeError(
+                f"Variable {name} requested but VARIABLE_SERVICE_URL not configured."
+            )
+        params = {"name": name}
+        if context:
+            params.update(context)
+        response = self.session.get(self.base_url, params=params, timeout=10)
+        response.raise_for_status()
+        payload = response.json()
+        if "value" not in payload:
+            raise KeyError(f"Variable service response missing 'value': {payload}")
+        return str(payload["value"])
+
+    def set(self, name: str, value: str, context: Optional[Dict[str, str]] = None) -> None:
+        if not self.base_url:
+            raise RuntimeError("VARIABLE_SERVICE_URL not configured for set_var action.")
+        payload: Dict[str, str] = {"name": name, "value": value}
+        if context:
+            payload.update(context)
+        response = self.session.post(self.base_url, json=payload, timeout=10)
+        response.raise_for_status()
+
+
+class VariableResolver:
+    def __init__(
+        self,
+        service: VariableService,
+    ) -> None:
+        self._service = service
+        self._cache: Dict[str, str] = {}
+
+    def resolve(self, value: Optional[str], context: Optional[Dict[str, str]] = None) -> Optional[str]:
+        if not value:
+            return value
+        matches = list(VAR_PATTERN.finditer(value))
+        if not matches:
+            return value
+
+        result = value
+        for match in matches:
+            name = match.group("name")
+            replacement = self._cache.get(name)
+            if replacement is None:
+                try:
+                    replacement = self._service.fetch(name, context)
+                    self._cache[name] = replacement
+                except Exception as exc:  # noqa: BLE001
+                    logger.exception("Failed to resolve variable %s", name)
+                    raise
+            result = result.replace(match.group(0), replacement)
+        return result
+
+    def resolve_dict(self, payload: Dict[str, str], context: Optional[Dict[str, str]] = None) -> Dict[str, str]:
+        return {key: self.resolve(value, context) for key, value in payload.items()}
+
+    def set(self, name: str, value: str, context: Optional[Dict[str, str]] = None) -> None:
+        self._cache[name] = value
+        self._service.set(name, value, context)
diff --git a/xspider/xml_parser.py b/xspider/xml_parser.py
new file mode 100644
index 0000000..fa9f2e8
--- /dev/null
+++ b/xspider/xml_parser.py
@@ -0,0 +1,284 @@
+from 
__future__ import annotations
+
+import logging
+from typing import Any, Dict, List, Optional
+from xml.etree import ElementTree as ET
+
+from .models import (
+    ActionConfig,
+    ExtractConfig,
+    ExcelExtractConfig,
+    FieldConfig,
+    FlowConfig,
+    HeaderConfig,
+    PaginateConfig,
+    SelectorMode,
+    SiteConfig,
+    SiteSettings,
+    UniqueKeyMode,
+)
+from .utils.selectors import is_xpath_selector
+
+logger = logging.getLogger(__name__)
+
+
+def _as_bool(value: Optional[str], default: bool = False) -> bool:
+    if value is None:
+        return default
+    return value.lower() in {"true", "1", "yes", "on"}
+
+
+def _as_int(value: Optional[str], default: int) -> int:
+    if value is None:
+        return default
+    try:
+        return int(value)
+    except ValueError as exc:
+        raise ValueError(f"Invalid integer value: {value}") from exc
+
+
+def _selector_mode(value: Optional[str], default: SelectorMode = SelectorMode.css) -> SelectorMode:
+    if value is None:
+        return default
+    try:
+        return SelectorMode(value)
+    except ValueError as exc:
+        raise ValueError(f"Unsupported selector mode: {value}") from exc
+
+
+class XMLSiteParser:
+    def parse(self, xml_payload: str) -> SiteConfig:
+        root = ET.fromstring(xml_payload)
+        self._strip_namespace(root)
+        if root.tag != "site":
+            raise ValueError("Root element must be <site>")
+
+        site_id = root.attrib.get("id")
+        base = root.attrib.get("base")
+
+        if not site_id:
+            raise ValueError("<site> missing required attribute 'id'")
+
+        settings = self._parse_settings(root.find("config"))
+        login = self._parse_flow(root.find("login"), allow_missing_extract=True)
+
+        flows_node = root.find("flows")
+        flows: List[FlowConfig] = []
+        if flows_node is not None:
+            for flow_node in flows_node.findall("flow"):
+                flows.append(self._parse_flow(flow_node))
+
+        if not flows:
+            logger.warning("Site %s has no flows defined", site_id)
+
+        return SiteConfig(
+            site_id=site_id,
+            base=base,
+            settings=settings,
+            login=login,
+            flows=flows,
+        )
+
+    def _strip_namespace(self, element: ET.Element) -> None:
+        """Remove XML namespaces in-place for easier processing."""
+        for el in element.iter():
+            if "}" in el.tag:
+                el.tag = el.tag.split("}", 1)[1]
+
+    def _parse_settings(self, node: Optional[ET.Element]) -> SiteSettings:
+        if node is None:
+            return SiteSettings()
+
+        attrs = node.attrib
+        enable_proxy = _as_bool(attrs.get("enable_proxy"), False)
+        rotate_ua = _as_bool(attrs.get("rotate_ua"), False)
+        retry = _as_int(attrs.get("retry"), 3)
+
+        headers: List[HeaderConfig] = []
+        for header in node.findall("header"):
+            name = header.attrib.get("name")
+            value = header.attrib.get("value", "")
+            if not name:
+                raise ValueError("<header> missing required attribute 'name'")
+            headers.append(HeaderConfig(name=name, value=value))
+
+        return SiteSettings(
+            enable_proxy=enable_proxy,
+            rotate_ua=rotate_ua,
+            retry=retry,
+            headers=headers,
+        )
+
+    def _parse_flow(
+        self,
+        node: Optional[ET.Element],
+        allow_missing_extract: bool = False,
+    ) -> Optional[FlowConfig]:
+        if node is None:
+            return None
+
+        attrs = node.attrib
+        flow_id = attrs.get("id") or node.tag
+        entry = attrs.get("entry") or attrs.get("url")
+        data_type = attrs.get("data_type")
+
+        unique_keys_attr = attrs.get("unique_keys", UniqueKeyMode.all.value)
+        try:
+            unique_keys = UniqueKeyMode(unique_keys_attr)
+        except ValueError as exc:
+            raise ValueError(f"Invalid unique_keys value: {unique_keys_attr}") from exc
+
+        columns_attr = attrs.get("columns", "")
+        unique_columns = [col.strip() for col in columns_attr.split(",") if col.strip()]
+
+        actions = [self._parse_action(action) for action in node.findall("action")]
+
+        extract_node = node.find("extract")
+        extract = self._parse_extract(extract_node) if extract_node is not None else None
+
+        excel_node = node.find("excel_extract")
+        excel_extract = (
+            self._parse_excel_extract(excel_node) if excel_node is not None else None
+        )
+
+        if (
+            not allow_missing_extract
+            and extract is None
+            and excel_extract is None
+        ):
+            raise ValueError(f"<flow id='{flow_id}'> requires an extract section.")
+
+        paginate_node = node.find("paginate")
+        paginate = (
+            self._parse_paginate(paginate_node) if paginate_node is not None else None
+        )
+
+        metadata = {
+            key: value
+            for key, value in attrs.items()
+            if key
+            not in {
+                "id",
+                "entry",
+                "url",
+                "data_type",
+                "unique_keys",
+                "columns",
+            }
+        }
+
+        return FlowConfig(
+            flow_id=flow_id,
+            entry=entry,
+            data_type=data_type,
+            unique_keys=unique_keys,
+            unique_columns=unique_columns,
+            actions=actions,
+            extract=extract,
+            excel_extract=excel_extract,
+            paginate=paginate,
+            metadata=metadata,
+        )
+
+    def _parse_action(self, node: ET.Element) -> ActionConfig:
+        attrs = node.attrib
+        action_type = attrs.get("type")
+        if not action_type:
+            raise ValueError("<action> missing required attribute 'type'")
+
+        mode_attr = attrs.get("mode")
+        mode = _selector_mode(mode_attr, default=SelectorMode.xpath)
+        selector = attrs.get("selector")
+        if selector and mode is SelectorMode.css and is_xpath_selector(selector):
+            raise ValueError(
+                f"Selector '{selector}' looks like XPath but mode='css' specified."
+            )
+        timeout_ms = _as_int(attrs.get("timeout_ms"), 10_000)
+        after_wait = _as_int(attrs.get("after_wait"), 0)
+
+        params = {
+            key: value
+            for key, value in attrs.items()
+            if key
+            not in {"type", "selector", "mode", "timeout_ms", "after_wait"}
+        }
+
+        # Support inline script text for run_js, etc.
+        if node.text and node.text.strip():
+            params.setdefault("text", node.text.strip())
+
+        return ActionConfig(
+            type=action_type,
+            selector=selector,
+            mode=mode,
+            timeout_ms=timeout_ms,
+            after_wait=after_wait,
+            params=params,
+        )
+
+    def _parse_extract(self, node: ET.Element) -> ExtractConfig:
+        attrs = node.attrib
+        record_selector = attrs.get("record_css") or attrs.get("record_xpath")
+        if not record_selector:
+            raise ValueError("<extract> requires record_css or record_xpath")
+        record_mode = (
+            SelectorMode.css if "record_css" in attrs else SelectorMode.xpath
+        )
+
+        fields = [self._parse_field(field) for field in node.findall("field")]
+
+        download = None
+        download_node = node.find("download")
+        if download_node is not None:
+            download = download_node.attrib.copy()
+
+        return ExtractConfig(
+            record_selector=record_selector,
+            record_mode=record_mode,
+            fields=fields,
+            download=download,
+        )
+
+    def _parse_field(self, node: ET.Element) -> FieldConfig:
+        attrs = node.attrib
+        name = attrs.get("name")
+        selector = attrs.get("selector")
+        if not name or not selector:
+            raise ValueError("<field> requires 'name' and 'selector'")
+
+        mode_attr = attrs.get("mode")
+        mode = _selector_mode(mode_attr)
+        value_type = attrs.get("value_type")
+        if mode is SelectorMode.css and is_xpath_selector(selector):
+            raise ValueError(
+                f"Field selector '{selector}' looks like XPath but mode='css' specified."
+            )
+
+        return FieldConfig(
+            name=name,
+            selector=selector,
+            mode=mode,
+            value_type=value_type,
+        )
+
+    def _parse_paginate(self, node: ET.Element) -> PaginateConfig:
+        attrs = node.attrib
+        selector = attrs.get("selector") or attrs.get("css")
+        mode_attr = attrs.get("mode")
+        mode = _selector_mode(mode_attr, default=SelectorMode.xpath)
+        if selector and mode is SelectorMode.css and is_xpath_selector(selector):
+            raise ValueError(
+                f"Paginate selector '{selector}' looks like XPath but mode='css' specified."
+            )
+        max_pages = None
+        if "max_pages" in attrs:
+            max_pages = _as_int(attrs.get("max_pages"), 0)
+        return PaginateConfig(mode=mode, selector=selector, max_pages=max_pages)
+
+    def _parse_excel_extract(self, node: ET.Element) -> ExcelExtractConfig:
+        attrs = node.attrib
+        file_pattern = attrs.get("file_pattern") or attrs.get("pattern")
+        if not file_pattern:
+            raise ValueError("<excel_extract> requires file_pattern attribute")
+        directory = attrs.get("directory")
+        return ExcelExtractConfig(file_pattern=file_pattern, directory=directory)
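To close the loop, here is a small self-check of `XMLSiteParser` against a template shaped like the README example; the XML literal is illustrative, and it assumes the project's dependencies are installed so the package imports:

```python
from xspider.models import SelectorMode, UniqueKeyMode
from xspider.xml_parser import XMLSiteParser

TEMPLATE = """
<site id="example" base="https://example.com">
    <login id="login" entry="/login" selector="//span[@class='welcome']">
        <action type="type" selector="//input[@name='username']" text="${example:username}"/>
        <action type="click" selector="//button[@type='submit']"/>
    </login>
    <flows>
        <flow id="news" entry="/news" unique_keys="custom" columns="title">
            <extract record_xpath="//ul[@class='news-list']/li">
                <field name="title" selector="a" mode="css"/>
            </extract>
            <paginate selector="//a[@class='next']" max_pages="10"/>
        </flow>
    </flows>
</site>
"""

site = XMLSiteParser().parse(TEMPLATE)
assert site.site_id == "example"
assert site.login is not None and len(site.login.actions) == 2
flow = site.flows[0]
assert flow.unique_keys is UniqueKeyMode.custom and flow.unique_columns == ["title"]
assert flow.extract.record_mode is SelectorMode.xpath
assert flow.paginate.max_pages == 10
print("template parsed OK")
```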