feat(variables):重构变量服务为 Redis 实现并增强作用域支持

- 将变量服务从 HTTP 接口迁移至 Redis 存储,提升性能与可靠性
- 支持显式作用域前缀:site:xxx 和 global:xxx
- 实现变量 TTL 与一次性读取功能(var_ttl、var_single_use)
- 新增 VariableScope 枚举与 VariableTarget 缓存键设计
- 改进 VariableResolver 缓存机制以兼容作用域隔离
- 更新 README 文档说明新变量语法与使用示例
- 移除 settings 中已弃用的 variable_service_url 配置项
- 调整 ActionRegistry 自动注册逻辑以适配模块化扫描
- 统一浏览器选择器模式分隔符由 '=' 改为 ':'
- 优化浏览器元素等待与属性设置的容错处理逻辑
This commit is contained in:
2025-10-20 21:47:58 +08:00
parent 952c90e537
commit f8370eb85e
11 changed files with 734 additions and 104 deletions

2
.idea/misc.xml generated
View File

@@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="/opt/homebrew/Caskroom/miniconda/base" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12" project-jdk-type="Python SDK" />
</project>

2
.idea/xspider.iml generated
View File

@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.13" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.12" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

222
README.md
View File

@@ -1,48 +1,98 @@
# xspider 模板爬虫
基于 XML 配置驱动的爬虫执行引擎,可从 Redis 列表中获取模板地址,按流程控制使用 DrissionPage 浏览器执行登录业务流程,并将抽取的数据存储到 MongoDB。
基于 XML 模板驱动的浏览器自动化采集引擎,负责监听 Redis 队列获取任务模板,利用 DrissionPage 浏览器执行登录业务流程,并将抽取结果写入 MongoDB。本项目同时集成变量服务、验证码识别及分页、下载等常见采集场景的基础能力。
## 依赖
## 功能亮点
- 使用标准化 XML 模板描述站点、流程与字段,无需改动代码即可上线新任务。
- Redis 队列拉取模板地址,支持多实例横向扩展。
- 内置 Redis 变量服务,自动处理站点作用域与全局作用域的变量读写。
- 提供可扩展的 Action 注册机制,可按需新增自定义动作。
- 采集结果默认落地 MongoDB可按数据类型、唯一键去重。
## 目录结构
```
├── main.py # 应用入口,初始化日志并启动循环
├── xspider/
│ ├── app.py # TemplateCrawlerApp协调整体流程
│ ├── runner.py # FlowRunner调度动作与抽取逻辑
│ ├── browser.py # 浏览器会话封装,对接 DrissionPage
│ ├── actions/ # 内置动作定义、注册
│ ├── extraction.py # HTML 解析与字段抽取
│ ├── storage.py # MongoDB 存储实现
│ ├── redis_queue.py # Redis BLPOP 队列封装
│ ├── variables.py # 变量服务访问与缓存
│ ├── xml_parser.py # XML 模板解析
│ └── utils/ # 辅助方法(选择器判断等)
```
## 环境要求
- Python 3.10+
- [DrissionPage](https://github.com/g1879/DrissionPage)
- `redis`, `requests`, `pymongo`, `lxml`, `cssselect`
- 浏览器驱动建议使用 [DrissionPage](https://github.com/g1879/DrissionPage) 配套环境
使用 pip 安装:
安装依赖示例
```bash
pip install drissionpage redis requests pymongo lxml cssselect
```
若需要验证码识别、变量服务等能力,请根据业务另行实现
如需验证码识别、自定义变量服务或下载逻辑,请在业务环境中提供对应服务
## 环境变量
| 变量 | 默认值 | 说明 |
| --- | --- | --- |
| `XSPIDER_REDIS_URL` | `redis://localhost:6379/0` | Redis 连接串 |
| `XSPIDER_REDIS_LIST_KEY` | `xspider:config` | 待处理模板所在 `list` key |
| `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | `BLPOP` 阻塞秒数 |
| `XSPIDER_REDIS_LIST_KEY` | `xspider:config` | 待消费的模板地址所在 `list` |
| `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | Redis `BLPOP` 阻塞秒数 |
| `XSPIDER_MONGO_URI` | `mongodb://localhost:27017` | MongoDB 连接串 |
| `XSPIDER_MONGO_DB` | `xspider` | MongoDB 数据库名称 |
| `XSPIDER_VARIABLE_SERVICE` | `None` | 变量服务接口地址GET 查询POST 写入 |
变量服务要求:
**变量作用域说明**
- 对变量名使用 `site:变量名` 将强制读取/写入当前站点作用域;`global:变量名` 将强制使用全局作用域。
- 无前缀的变量名会先从站点作用域(依赖 `site_id`)查找,未命中再自动回退到全局作用域。
- 写入变量时默认落到当前站点作用域(若缺少 `site_id` 则写入全局),也可通过 `var_scope="global"` 强制写入全局。
- `var_ttl``var_single_use` 会转换为 Redis 过期时间与一次性读取语义,所有数据均保存在 `XSPIDER_REDIS_URL` 指定的实例中。
- `GET {base}?name=变量名&...` 返回 JSON包含 `value` 字段。
- `POST {base}` 提交 JSON `{name, value, ...}`
## 运行
## 启动方式
```bash
python main.py
```
程序将持续阻塞等待 Redis 列表推送 XML 模板地址,下载模板并执行流程
程序将持续拉取 Redis 队列中的 XML 模板地址,下载模板后解析 site → flow → action并完成采集与存储。如需停止可使用 `Ctrl+C`
## XML 模板
## XML 模板结构
模板结构参考示例:
### Schema 提示
若希望编辑器自动提示必填项与可选属性,可在 XML 顶部引入项目内置的 Schema
```xml
<site xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="../schema/xspider.xsd"
id="example"
base="https://example.com">
...
</site>
```
- Schema 位于 `schema/xspider.xsd`,涵盖站点、配置、动作、抽取等全部节点。
- 修改为相对路径或网络路径后,可在常见 IDEVS Code、IntelliJ、XMLSpy 等)中获得自动补全、枚举提示与校验。
- Schema 对可扩展字段使用 `anyAttribute`,因此自定义参数不会报错。
### 顶层元素
| 元素 | 说明 |
| --- | --- |
| `<site id base?>` | 必填站点定义,可配置默认 `base` URL |
| `<config>` | 站点级配置:代理、重试、请求头等 |
| `<login>` | 登录流程,允许缺少 `extract`;执行完可配置校验选择器 |
| `<flows>` / `<flow>` | 业务流程,至少包含动作与抽取配置 |
### 模板示例
```xml
<site id="example" base="https://example.com">
@@ -70,10 +120,138 @@ python main.py
</site>
```
支持的 `action` 类型见 `xspider/actions/builtin.py`,如需扩展可继承 `BaseAction` 并注册到 `ActionRegistry`
## Action 基础属性
## 重要说明
所有 `<action>` 元素都会被解析为 `ActionConfig`,其通用属性如下:
- `CaptchaAction` 会自动截图(元素或整页)并调用 `https://captcha.lfei007s.workers.dev`,请求体为 `{image, type}`image 采用 `data:image/png;base64,...` 形式)。可通过 `captcha_config`JSON 字符串)自定义 `url``headers``timeout` 或额外字段。
- 下载文件监听、复杂的分页场景需要根据目标站点扩展。
- 为保证可维护性,所有动作执行过程中均进行了简单日志输出并允许扩展变量解析。需要对框架进行二次开发时,可直接扩展 Action、Extractor 以及 Runner。
| 属性 | 类型 / 默认值 | 说明 |
| --- | --- | --- |
| `type` | `str`(必填) | 决定实际执行的动作类 |
| `selector` | `str` / `None` | DOM 选择器;部分动作必须提供 |
| `mode` | `xpath`(默认)或 `css` | 选择器解析模式 |
| `timeout_ms` | `int` / `10_000` | 等待元素的超时,单位毫秒(仅在需要等待元素/动作时使用,部分动作会忽略该值) |
| `after_wait` | `int` / `0` | 执行完动作后的额外等待时间 |
| `params` | `dict` / `{}` | 其它自定义键值对,会透传给动作 |
`params` 支持 `${变量名}` 形式的变量占位符,运行时通过 `VariableResolver` 在站点上下文中解析。下文动作表格中出现的 `params.xxx` 表示 `<action>` 元素上的额外属性(除通用字段外),在解析时会放入 `params` 字典中。
**通用字段包括**`type``selector``mode``timeout_ms``after_wait`,这些字段会直接映射到 `ActionConfig` 对象,对应的属性不会出现在 `params` 中。
### `params` 字段举例
解析器会把 `<action>` 标签上的自定义属性收入 `params`,例如:
```xml
<!-- url 属性不是通用字段,因此会进入 params -->
<action type="goto" url="https://example.com/dashboard"/>
<!-- download_filename 也会进入 params执行时通过 self.config.params['download_filename'] 读取 -->
<action type="click"
selector="button.export"
download_filename="orders.xlsx"/>
<!-- 多个参数同样会被保留在 params 中 -->
<action type="set_var"
var_name="latest_token"
var_value="${token}"
var_scope="global"
var_ttl="86400"/>
```
上面的三个动作在 Python 中最终对应的 `ActionConfig.params` 分别为:
- `{"url": "https://example.com/dashboard"}`
- `{"download_filename": "orders.xlsx"}`
- `{"var_name": "latest_token", "var_value": "${token}", "var_scope": "global", "var_ttl": "86400"}`
动作类内部通过 `self.config.params[...]` 访问这些值,因此 README 中的 `params.url``params.download_filename``params.var_ttl` 等写法,正是指这些自定义属性在 `params` 字典里的访问方式。
## 内置 Action 类型
| 类型 | 必填字段 | 可选字段 | 说明 |
| --- | --- | --- | --- |
| `goto` | `params.url``selector` | 无 | 跳转到指定 URL为空时回退到 `entry_url`/`base_url`。 |
| `click` | `selector` | `params.button`(支持 `left/right/middle`)、`params.download_filename` | 点击元素,可触发下载。 |
| `type` | `selector``params.text` | 无 | 输入文本;执行前会清空原内容。 |
| `wait_dom_show` | `selector` | 无 | 等待元素出现并返回元素引用。 |
| `wait_dom_gone` | `selector` | 无 | 等待元素从 DOM 中移除。 |
| `wait_dom_hide` | `selector` | 无 | 等待元素隐藏style 含 `display: none`)。 |
| `wait_time` | 无 | `params.timeout_ms`(毫秒,若缺省使用 `timeout_ms` | 单纯 sleep 指定时间。 |
| `run_js` | `params.script``params.text` | 无 | 在当前页面执行 JavaScript并返回结果。 |
| `set_header` | `params.header_name``params.header_value` | 无 | 为当前会话追加请求头。 |
| `set_attr` | `selector``params.attr_name` | `params.attr_value`(默认空字符串) | 修改 DOM 属性值。 |
| `set_var` | `params.var_name``params.var_value` | `params.var_scope``params.var_ttl``params.var_single_use` | 将变量写入变量服务,可带过期策略。 |
| `captcha` | 无(若缺省会截取 `selector` 或整页截图) | `params.image``params.captcha_type``params.captcha_url``params.captcha_config``params.variable` | 调用远程接口识别验证码并写入变量。 |
> **提示**
> - 当 `selector` 与 `mode` 不匹配时(例如 CSS 模式中使用 XPath解析阶段会抛出异常。
> - 所有 `params` 字段均支持变量解析与自定义透传。
> - 若需要扩展新动作,继承 `BaseAction` 并设置 `type_name` 后自动被 `ActionRegistry` 注册。
### SetVarAction 使用示例
`SetVarAction` 用于把临时生成的数据写入 Redis 变量库,并同步到当前站点上下文,供后续步骤通过 `${变量名}``ctx.site_context[...]` 复用。其主要参数含义如下:
| 参数 | 说明 |
| --- | --- |
| `var_name` | 变量名称,支持 `${...}` 占位符,最终会作为变量服务的 `name` 字段。 |
| `var_value` | 变量值,可引用已存在的变量或静态字符串。 |
| `var_scope` | 变量作用域,可选,支持 `global`/`site`。未指定时默认写入当前站点作用域。 |
| `var_ttl` | 过期时间,单位秒,可选。变量服务可据此设置生命周期。 |
| `var_single_use` | 是否一次性变量,可选,常用于验证码、令牌等场景。 |
示例(写入站点作用域中的 `latest_token`,并设置 1 小时过期):
```xml
<action type="set_var"
var_name="latest_token"
var_value="${jwt_token}"
var_ttl="3600"
var_single_use="false"/>
```
动作执行步骤:
1. 解析所有参数并进行变量替换,例如 `${site_id}``${jwt_token}`
2. 根据 `var_scope` 与上下文自动选择站点或全局作用域,并附带 TTL/一次性读取标记写入 Redis。
3. `VariableResolver` 会将值缓存在内存中,后续解析 `${latest_token}``${site:latest_token}` 均可命中(前者会先查站点再回退全局)。
如需写入全局变量,可设置 `var_scope="global"` 或直接在变量名上使用 `global:token_name` 前缀。
### CaptchaAction 使用示例
`CaptchaAction` 会将验证码截图或指定的 base64 图片提交到远程接口,并把识别结果写入 Redis 变量库以及 `site_context`。使用方式如下:
```xml
<action type="captcha"
selector="//img[@id='captcha']"
mode="xpath"
captcha_type="text"
variable="login:captcha_code"
captcha_config='{"url": "https://captcha.example.com/api", "timeout": 60}'/>
```
执行流程:
- 若提供 `params.image`,会直接使用该 base64 字符串;否则优先根据 `selector` 截取元素截图,再退回到整页截图。
- 请求地址取 `params.captcha_url``captcha_config.url`,最后回退到默认 `https://captcha.lfei007s.workers.dev`
- 请求体默认为 `{"image": "...", "type": params.captcha_type}`,并支持通过 `captcha_config` 添加额外键值(如 `{"headers": {...}}`)。
- 服务返回 JSON 后,会尝试读取 `result``text``value``code``data` 等字段作为识别结果。
- 结果最终写入 Redis默认键为站点作用域下的 `captcha_result`,或使用 `params.variable` 覆盖),并同步到 `site_context` 供后续动作引用。
## 抽取与存储
- `<extract>` 支持 `record_css` / `record_xpath` 指定列表元素,`<field>` 定义字段名称、选择器、取值模式及可选的 `value_type`
- `<paginate>` 可配置下一页元素,并限制 `max_pages`
- `<excel_extract>` 用于声明需下载并解析 Excel 文件,当前实现会给出提示,具体逻辑可在 `FlowRunner._handle_excel_extract` 自行扩展。
- `MongoRepository.save_records` 会按照 `flow.unique_keys``unique_columns` 控制去重策略。
## 开发与调试建议
- 日志级别默认 `INFO`,可自行设置环境变量 `PYTHONLOGGING` 或修改 `configure_logging`
- 建议在本地通过伪造 XML 模板直接调用 `FlowRunner.run_site`,避免频繁依赖 Redis。
- 若需要 Network/Proxy、自定义 Headers可在 `<config>` 中补充;所有配置都会在浏览器会话创建前应用。
- `CaptchaAction` 默认使用 `https://captcha.lfei007s.workers.dev`,如需替换可在 `params.captcha_config` 传入 JSON例如 `{"url": "...", "timeout": 60}`)。
## 扩展方向
- **自定义 Action**:在 `xspider/actions` 新增子类并继承 `BaseAction`,设置 `type_name``_execute` 逻辑即可使用。
- **变量服务实现**:可基于 `VariableService` 接口扩展自定义存储,例如不同的 Redis 集群或 REST 服务。
- **采集结果落地**:若不使用 MongoDB可实现新的存储类并在初始化 `FlowRunner` 时注入。

198
schema/xspider.xsd Normal file
View File

@@ -0,0 +1,198 @@
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
elementFormDefault="qualified"
attributeFormDefault="unqualified">
<xs:annotation>
<xs:documentation>
xspider XML 模板 Schema用于在编辑器中提示必填项、枚举值与作用域规则。
适用于描述站点配置、流程、动作与抽取字段。
</xs:documentation>
</xs:annotation>
<!-- 基础类型 -->
<xs:simpleType name="BooleanFlag">
<xs:annotation>
<xs:documentation>
支持 true/false, 1/0, yes/no, on/off。
</xs:documentation>
</xs:annotation>
<xs:restriction base="xs:string">
<xs:enumeration value="true"/>
<xs:enumeration value="false"/>
<xs:enumeration value="1"/>
<xs:enumeration value="0"/>
<xs:enumeration value="yes"/>
<xs:enumeration value="no"/>
<xs:enumeration value="on"/>
<xs:enumeration value="off"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="SelectorMode">
<xs:restriction base="xs:string">
<xs:enumeration value="css"/>
<xs:enumeration value="xpath"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="UniqueKeysMode">
<xs:restriction base="xs:string">
<xs:enumeration value="all"/>
<xs:enumeration value="custom"/>
<xs:enumeration value="null"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="NonNegativeInt">
<xs:restriction base="xs:nonNegativeInteger"/>
</xs:simpleType>
<!-- Header -->
<xs:complexType name="HeaderType">
<xs:annotation>
<xs:documentation>HTTP 请求头设置</xs:documentation>
</xs:annotation>
<xs:attribute name="name" type="xs:string" use="required"/>
<xs:attribute name="value" type="xs:string" use="optional"/>
</xs:complexType>
<!-- Config -->
<xs:complexType name="ConfigType">
<xs:sequence>
<xs:element name="header" type="HeaderType" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="enable_proxy" type="BooleanFlag" default="false"/>
<xs:attribute name="rotate_ua" type="BooleanFlag" default="false"/>
<xs:attribute name="retry" type="xs:nonNegativeInteger" default="3"/>
</xs:complexType>
<!-- Action -->
<xs:complexType name="ActionType" mixed="true">
<xs:annotation>
<xs:documentation>
动作执行配置。内置类型包含goto、click、type、wait_dom_show、
wait_dom_gone、wait_dom_hide、wait_time、run_js、set_header、set_attr、
set_var、captcha。可按需扩展自定义类型。
</xs:documentation>
</xs:annotation>
<xs:sequence/>
<xs:attribute name="type" type="xs:string" use="required"/>
<xs:attribute name="selector" type="xs:string"/>
<xs:attribute name="mode" type="SelectorMode" default="xpath"/>
<xs:attribute name="timeout_ms" type="NonNegativeInt"/>
<xs:attribute name="after_wait" type="NonNegativeInt" default="0"/>
<xs:anyAttribute processContents="lax"/>
</xs:complexType>
<!-- Field -->
<xs:complexType name="FieldType">
<xs:annotation>
<xs:documentation>字段抽取规则selector 可为 CSS 或 XPath。</xs:documentation>
</xs:annotation>
<xs:attribute name="name" type="xs:string" use="required"/>
<xs:attribute name="selector" type="xs:string" use="required"/>
<xs:attribute name="mode" type="SelectorMode" default="css"/>
<xs:attribute name="value_type" type="xs:string"/>
</xs:complexType>
<!-- Download -->
<xs:complexType name="DownloadType">
<xs:annotation>
<xs:documentation>下载附件配置,将所有属性传入下载器。</xs:documentation>
</xs:annotation>
<xs:sequence/>
<xs:anyAttribute processContents="lax"/>
</xs:complexType>
<!-- Extract -->
<xs:complexType name="ExtractType">
<xs:annotation>
<xs:documentation>
表格/列表抽取配置。record_css 或 record_xpath 至少填写一个。
</xs:documentation>
</xs:annotation>
<xs:sequence>
<xs:element name="field" type="FieldType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="download" type="DownloadType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="record_css" type="xs:string"/>
<xs:attribute name="record_xpath" type="xs:string"/>
</xs:complexType>
<!-- Excel Extract -->
<xs:complexType name="ExcelExtractType">
<xs:annotation>
<xs:documentation>Excel 文件抽取配置file_pattern 与 pattern 二选一。</xs:documentation>
</xs:annotation>
<xs:sequence/>
<xs:attribute name="file_pattern" type="xs:string"/>
<xs:attribute name="pattern" type="xs:string"/>
<xs:attribute name="directory" type="xs:string"/>
</xs:complexType>
<!-- Pagination -->
<xs:complexType name="PaginateType">
<xs:annotation>
<xs:documentation>分页配置,可指定 XPath 或 CSS 选择器。</xs:documentation>
</xs:annotation>
<xs:sequence/>
<xs:attribute name="selector" type="xs:string"/>
<xs:attribute name="css" type="xs:string"/>
<xs:attribute name="mode" type="SelectorMode" default="xpath"/>
<xs:attribute name="max_pages" type="xs:nonNegativeInteger"/>
</xs:complexType>
<!-- Flow -->
<xs:complexType name="FlowType">
<xs:annotation>
<xs:documentation>
业务流程。建议至少配置 extract 或 excel_extract。
</xs:documentation>
</xs:annotation>
<xs:sequence>
<xs:element name="action" type="ActionType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="extract" type="ExtractType" minOccurs="0"/>
<xs:element name="excel_extract" type="ExcelExtractType" minOccurs="0"/>
<xs:element name="paginate" type="PaginateType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="id" type="xs:string"/>
<xs:attribute name="entry" type="xs:string"/>
<xs:attribute name="url" type="xs:string"/>
<xs:attribute name="data_type" type="xs:string"/>
<xs:attribute name="unique_keys" type="UniqueKeysMode" default="all"/>
<xs:attribute name="columns" type="xs:string"/>
<xs:anyAttribute processContents="lax"/>
</xs:complexType>
<xs:complexType name="FlowsType">
<xs:sequence>
<xs:element name="flow" type="FlowType" minOccurs="1" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="LoginType">
<xs:annotation>
<xs:documentation>登录流程,结构与 FlowType 相同但允许缺少抽取步骤。</xs:documentation>
</xs:annotation>
<xs:complexContent>
<xs:extension base="FlowType"/>
</xs:complexContent>
</xs:complexType>
<!-- Root -->
<xs:complexType name="SiteType">
<xs:sequence>
<xs:element name="config" type="ConfigType" minOccurs="0"/>
<xs:element name="login" type="LoginType" minOccurs="0"/>
<xs:element name="flows" type="FlowsType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required"/>
<xs:attribute name="base" type="xs:string"/>
<xs:anyAttribute processContents="lax"/>
</xs:complexType>
<xs:element name="site" type="SiteType"/>
</xs:schema>

View File

@@ -23,6 +23,7 @@ class ActionContext:
class BaseAction(ABC):
type_name: str
enabled: bool = True
def __init__(self, config: ActionConfig) -> None:
self.config = config

View File

@@ -19,6 +19,7 @@ def _timeout_seconds(action: ActionConfig) -> float:
class GotoAction(BaseAction):
type_name = "goto"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
url = ctx.resolver.resolve(
@@ -34,6 +35,7 @@ class GotoAction(BaseAction):
class ClickAction(BaseAction):
type_name = "click"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
if not self.config.selector:
@@ -59,6 +61,7 @@ class ClickAction(BaseAction):
class TypeAction(BaseAction):
type_name = "type"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
if not self.config.selector:
@@ -79,6 +82,7 @@ class TypeAction(BaseAction):
class WaitDomShowAction(BaseAction):
type_name = "wait_dom_show"
enabled = True
def _execute(self, ctx: ActionContext) -> Optional[object]:
if not self.config.selector:
@@ -92,6 +96,7 @@ class WaitDomShowAction(BaseAction):
class WaitDomGoneAction(BaseAction):
type_name = "wait_dom_gone"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
if not self.config.selector:
@@ -105,6 +110,7 @@ class WaitDomGoneAction(BaseAction):
class WaitDomHideAction(BaseAction):
type_name = "wait_dom_hide"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
if not self.config.selector:
@@ -118,6 +124,7 @@ class WaitDomHideAction(BaseAction):
class WaitTimeAction(BaseAction):
type_name = "wait_time"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
timeout_raw = self.config.params.get("timeout_ms", str(self.config.timeout_ms))
@@ -133,6 +140,7 @@ class WaitTimeAction(BaseAction):
class RunJsAction(BaseAction):
type_name = "run_js"
enabled = True
def _execute(self, ctx: ActionContext) -> object:
script = self.config.params.get("script") or self.config.params.get("text")
@@ -144,6 +152,7 @@ class RunJsAction(BaseAction):
class SetHeaderAction(BaseAction):
type_name = "set_header"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
header_name = self.config.params.get("header_name")
@@ -158,6 +167,7 @@ class SetHeaderAction(BaseAction):
class SetAttrAction(BaseAction):
type_name = "set_attr"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
selector = self.config.selector
@@ -179,6 +189,7 @@ class SetAttrAction(BaseAction):
class SetVarAction(BaseAction):
type_name = "set_var"
enabled = True
def _execute(self, ctx: ActionContext) -> None:
var_name = self.config.params.get("var_name")
@@ -216,6 +227,7 @@ class SetVarAction(BaseAction):
class CaptchaAction(BaseAction):
type_name = "captcha"
enabled = True
DEFAULT_ENDPOINT = "https://captcha.lfei007s.workers.dev"
_session = requests.Session()

View File

@@ -1,22 +1,10 @@
from __future__ import annotations
import inspect
from typing import Dict, Type
from . import builtin as builtin_actions
from .base import BaseAction
from .builtin import (
CaptchaAction,
ClickAction,
GotoAction,
RunJsAction,
SetAttrAction,
SetHeaderAction,
SetVarAction,
TypeAction,
WaitDomGoneAction,
WaitDomHideAction,
WaitDomShowAction,
WaitTimeAction,
)
class ActionRegistry:
@@ -34,20 +22,17 @@ class ActionRegistry:
@classmethod
def register_builtin(cls) -> None:
for action_cls in (
GotoAction,
ClickAction,
TypeAction,
WaitDomShowAction,
WaitDomGoneAction,
WaitDomHideAction,
WaitTimeAction,
RunJsAction,
SetHeaderAction,
SetAttrAction,
SetVarAction,
CaptchaAction,
for _, action_cls in inspect.getmembers(
builtin_actions, predicate=inspect.isclass
):
if action_cls is BaseAction:
continue
if not issubclass(action_cls, BaseAction):
continue
if action_cls.__module__ != builtin_actions.__name__:
continue
if not getattr(action_cls, "enabled", True):
continue
cls.register(action_cls)

View File

@@ -29,7 +29,7 @@ class TemplateCrawlerApp:
self.settings.mongo_uri,
self.settings.mongo_database,
)
self.variable_service = VariableService(self.settings.variable_service_url)
self.variable_service = VariableService(self.settings.redis_url)
self.parser = XMLSiteParser()
self.runner = FlowRunner(
storage=self.mongo,

View File

@@ -87,14 +87,30 @@ class BrowserSession:
def set_attr(self, selector: str, mode: SelectorMode, attr: str, value: str, timeout: float) -> None:
ele = self._wait_ele(selector, mode, timeout)
script = "arguments[0].setAttribute(arguments[1], arguments[2]);"
try:
self.page.run_js(
"arguments[0].setAttribute(arguments[1], arguments[2]);",
args=(ele, attr, value),
)
self.page.run_js(script, ele, attr, value)
return
except TypeError:
pass
except Exception as exc: # noqa: BLE001
raise BrowserError(f"Failed to set attribute {attr} on {selector}") from exc
attr_method = getattr(ele, "attr", None)
if callable(attr_method):
attr_method(attr, value)
return
set_attr_method = getattr(ele, "set_attr", None)
if callable(set_attr_method):
set_attr_method(attr, value)
return
set_attribute_method = getattr(ele, "set_attribute", None)
if callable(set_attribute_method):
set_attribute_method(attr, value)
return
raise BrowserError(f"Failed to set attribute {attr} on {selector}")
def download(self, filename: str) -> None:
# Placeholder for download handling.
logger.info("Download requested for %s", filename)
@@ -111,7 +127,7 @@ class BrowserSession:
stripped = selector.strip()
if not stripped:
return stripped
expected_prefix = f"{mode.value}="
expected_prefix = f"{mode.value}:"
lowered = stripped.lower()
if lowered.startswith(expected_prefix):
return stripped
@@ -121,11 +137,44 @@ class BrowserSession:
return f"{expected_prefix}{stripped}"
def _wait_ele(self, selector: str, mode: SelectorMode, timeout: float) -> Any:
try:
prefixed_selector = self._prefixed_selector(selector, mode)
return self.page.wait.ele(prefixed_selector, timeout=timeout, mode=mode.value)
last_exc: Optional[Exception] = None
waiter = getattr(self.page, "wait", None)
if waiter:
wait_ele = getattr(waiter, "ele", None)
if callable(wait_ele):
try:
return wait_ele(prefixed_selector, timeout=timeout, mode=mode.value)
except Exception as exc: # noqa: BLE001
raise BrowserError(f"Timeout locating element {selector}") from exc
last_exc = exc
wait_element = getattr(waiter, "element", None)
if callable(wait_element):
try:
return wait_element(prefixed_selector, timeout=timeout, mode=mode.value)
except Exception as exc: # noqa: BLE001
last_exc = exc
direct_wait = getattr(self.page, "wait_ele", None)
if callable(direct_wait):
try:
return direct_wait(prefixed_selector, timeout=timeout, mode=mode.value)
except Exception as exc: # noqa: BLE001
last_exc = exc
try:
element = self.page.ele(prefixed_selector, timeout=timeout, mode=mode.value)
except TypeError:
element = self.page.ele(prefixed_selector, timeout=timeout)
except Exception as exc: # noqa: BLE001
element = None
last_exc = exc
if element:
return element
if last_exc:
raise BrowserError(f"Timeout locating element {selector}") from last_exc
raise BrowserError(f"Timeout locating element {selector}")
def screenshot(
self,

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
@@ -11,7 +10,6 @@ class Settings:
redis_list_key: str
mongo_uri: str
mongo_database: str
variable_service_url: Optional[str]
redis_block_timeout: int
@classmethod
@@ -20,13 +18,11 @@ class Settings:
redis_list_key = os.getenv("XSPIDER_REDIS_LIST_KEY", "xspider:urls")
mongo_uri = os.getenv("XSPIDER_MONGO_URI", "mongodb://hpower:hpower666.@192.168.8.154:27017")
mongo_database = os.getenv("XSPIDER_MONGO_DB", "xspider")
variable_service_url = os.getenv("XSPIDER_VARIABLE_SERVICE")
redis_block_timeout = int(os.getenv("XSPIDER_REDIS_BLOCK_TIMEOUT", "30"))
return cls(
redis_url=redis_url,
redis_list_key=redis_list_key,
mongo_uri=mongo_uri,
mongo_database=mongo_database,
variable_service_url=variable_service_url,
redis_block_timeout=redis_block_timeout,
)

View File

@@ -1,45 +1,220 @@
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, Optional
from enum import Enum
from typing import Dict, List, Optional, Tuple
import requests
import redis
logger = logging.getLogger(__name__)
VAR_PATTERN = re.compile(r"\$\{(?P<name>[a-zA-Z0-9_:\-\.]+)\}")
DEFAULT_NAMESPACE = "xspider:variables"
class VariableScope(str, Enum):
site = "site"
global_ = "global"
@classmethod
def from_value(cls, value: Optional[str]) -> Optional["VariableScope"]:
if value is None:
return None
lowered = value.lower()
if lowered == "site":
return cls.site
if lowered == "global":
return cls.global_
return None
@dataclass(frozen=True)
class VariableTarget:
scope: VariableScope
name: str
site_id: Optional[str] = None
def cache_key(self) -> Tuple[str, str, Optional[str]]:
return (self.scope.value, self.name, self.site_id)
@dataclass
class VariableRecord:
target: VariableTarget
value: str
single_use: bool = False
@dataclass
class VariableService:
base_url: Optional[str] = None
session: requests.Session = field(default_factory=requests.Session)
redis_url: str
namespace: str = DEFAULT_NAMESPACE
client: redis.Redis = field(init=False)
def fetch(self, name: str, context: Optional[Dict[str, str]] = None) -> str:
if not self.base_url:
raise RuntimeError(
f"Variable {name} requested but VARIABLE_SERVICE_URL not configured."
def __post_init__(self) -> None:
self.client = redis.Redis.from_url(self.redis_url)
# ---------------------------- Helper Methods ---------------------------- #
def build_targets(
self,
raw_name: str,
context: Optional[Dict[str, str]],
include_fallback: bool = True,
) -> List[VariableTarget]:
site_id = self._site_id_from_context(context)
explicit_scope, stripped_name = self._parse_explicit_scope(raw_name)
if explicit_scope is VariableScope.site:
if not site_id:
raise ValueError(
f"Variable '{raw_name}' requested site scope but site_id missing."
)
params = {"name": name}
if context:
params.update(context)
response = self.session.get(self.base_url, params=params, timeout=10)
response.raise_for_status()
payload = response.json()
if "value" not in payload:
raise KeyError(f"Variable service response missing 'value': {payload}")
return str(payload["value"])
return [VariableTarget(scope=VariableScope.site, name=stripped_name, site_id=site_id)]
if explicit_scope is VariableScope.global_:
return [VariableTarget(scope=VariableScope.global_, name=stripped_name)]
def set(self, name: str, value: str, context: Optional[Dict[str, str]] = None) -> None:
if not self.base_url:
raise RuntimeError("VARIABLE_SERVICE_URL not configured for set_var action.")
payload: Dict[str, str] = {"name": name, "value": value}
if context:
payload.update(context)
response = self.session.post(self.base_url, json=payload, timeout=10)
response.raise_for_status()
targets: List[VariableTarget] = []
resolved_name = stripped_name if explicit_scope else raw_name
if site_id:
targets.append(
VariableTarget(scope=VariableScope.site, name=resolved_name, site_id=site_id)
)
if include_fallback:
targets.append(
VariableTarget(scope=VariableScope.global_, name=resolved_name)
)
return targets
def determine_write_target(
self,
raw_name: str,
context: Optional[Dict[str, str]],
) -> VariableTarget:
site_id = self._site_id_from_context(context)
explicit_scope, stripped_name = self._parse_explicit_scope(raw_name)
if explicit_scope is VariableScope.site:
if not site_id:
raise ValueError(
f"Variable '{raw_name}' requested site scope but site_id missing."
)
return VariableTarget(scope=VariableScope.site, name=stripped_name, site_id=site_id)
if explicit_scope is VariableScope.global_:
return VariableTarget(scope=VariableScope.global_, name=stripped_name)
scope_hint = VariableScope.from_value(self._context_value(context, "var_scope"))
if scope_hint is VariableScope.global_:
return VariableTarget(scope=VariableScope.global_, name=raw_name)
if site_id:
return VariableTarget(scope=VariableScope.site, name=raw_name, site_id=site_id)
return VariableTarget(scope=VariableScope.global_, name=raw_name)
def fetch(
self,
raw_name: str,
context: Optional[Dict[str, str]] = None,
) -> VariableRecord:
targets = self.build_targets(raw_name, context)
for target in targets:
record = self._load_record(target)
if record:
if record.single_use:
self._delete(target)
return record
raise KeyError(f"Variable '{raw_name}' not found (context={context}).")
def set(
self,
raw_name: str,
value: str,
context: Optional[Dict[str, str]] = None,
) -> VariableRecord:
target = self.determine_write_target(raw_name, context)
ttl = self._parse_ttl(self._context_value(context, "var_ttl"))
single_use = self._parse_bool(self._context_value(context, "var_single_use"))
payload = json.dumps({"value": value, "single_use": single_use})
key = self._redis_key(target)
if ttl is not None:
self.client.setex(key, ttl, payload)
else:
self.client.set(key, payload)
return VariableRecord(target=target, value=value, single_use=single_use)
# ---------------------------- Internal helpers -------------------------- #
def _parse_explicit_scope(
self,
raw_name: str,
) -> Tuple[Optional[VariableScope], str]:
if raw_name.startswith("site:"):
return VariableScope.site, raw_name[len("site:") :]
if raw_name.startswith("global:"):
return VariableScope.global_, raw_name[len("global:") :]
return None, raw_name
def _site_id_from_context(self, context: Optional[Dict[str, str]]) -> Optional[str]:
if not context:
return None
for key in ("site_id", "site"):
if key in context and context[key]:
return context[key]
return None
def _context_value(self, context: Optional[Dict[str, str]], key: str) -> Optional[str]:
if not context:
return None
value = context.get(key)
if value is None:
return None
if isinstance(value, str):
return value.strip() or None
return str(value)
def _redis_key(self, target: VariableTarget) -> str:
if target.scope is VariableScope.site:
assert target.site_id, "site scope requires site_id"
return f"{self.namespace}:site:{target.site_id}:{target.name}"
return f"{self.namespace}:global:{target.name}"
def _load_record(self, target: VariableTarget) -> Optional[VariableRecord]:
key = self._redis_key(target)
raw = self.client.get(key)
if raw is None:
return None
try:
payload = json.loads(raw)
except json.JSONDecodeError as exc: # noqa: BLE001
logger.error("Invalid JSON stored for key %s: %s", key, raw)
raise ValueError(f"Invalid variable payload for {target}") from exc
value = payload.get("value")
if value is None:
logger.warning("Variable %s missing 'value' field.", key)
return None
single_use = bool(payload.get("single_use"))
return VariableRecord(target=target, value=str(value), single_use=single_use)
def _delete(self, target: VariableTarget) -> None:
key = self._redis_key(target)
self.client.delete(key)
def _parse_ttl(self, raw: Optional[str]) -> Optional[int]:
if raw is None:
return None
try:
ttl = int(float(raw))
except ValueError as exc:
raise ValueError(f"Invalid TTL value: {raw}") from exc
if ttl < 0:
raise ValueError(f"TTL cannot be negative: {raw}")
return ttl
def _parse_bool(self, raw: Optional[str]) -> bool:
if raw is None:
return False
lowered = raw.lower()
return lowered in {"1", "true", "yes", "on"}
class VariableResolver:
@@ -48,9 +223,13 @@ class VariableResolver:
service: VariableService,
) -> None:
self._service = service
self._cache: Dict[str, str] = {}
self._cache: Dict[Tuple[str, str, Optional[str]], str] = {}
def resolve(self, value: Optional[str], context: Optional[Dict[str, str]] = None) -> Optional[str]:
def resolve(
self,
value: Optional[str],
context: Optional[Dict[str, str]] = None,
) -> Optional[str]:
if not value:
return value
matches = list(VAR_PATTERN.finditer(value))
@@ -59,21 +238,53 @@ class VariableResolver:
result = value
for match in matches:
name = match.group("name")
replacement = self._cache.get(name)
if replacement is None:
try:
replacement = self._service.fetch(name, context)
self._cache[name] = replacement
except Exception as exc: # noqa: BLE001
logger.exception("Failed to resolve variable %s", name)
raise
raw_name = match.group("name")
replacement = self._resolve_single(raw_name, context)
result = result.replace(match.group(0), replacement)
return result
def resolve_dict(self, payload: Dict[str, str], context: Optional[Dict[str, str]] = None) -> Dict[str, str]:
def resolve_dict(
self,
payload: Dict[str, str],
context: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
return {key: self.resolve(value, context) for key, value in payload.items()}
def set(self, name: str, value: str, context: Optional[Dict[str, str]] = None) -> None:
self._cache[name] = value
self._service.set(name, value, context)
def set(
self,
name: str,
value: str,
context: Optional[Dict[str, str]] = None,
) -> None:
record = self._service.set(name, value, context)
cache_key = record.target.cache_key()
if not record.single_use:
self._cache[cache_key] = value
else:
self._cache.pop(cache_key, None)
# ---------------------------- Internal helpers -------------------------- #
def _resolve_single(
self,
raw_name: str,
context: Optional[Dict[str, str]],
) -> str:
targets = self._service.build_targets(raw_name, context)
for target in targets:
cache_key = target.cache_key()
if cache_key in self._cache:
return self._cache[cache_key]
try:
record = self._service.fetch(raw_name, context)
except Exception as exc: # noqa: BLE001
logger.exception("Failed to resolve variable %s", raw_name)
raise
cache_key = record.target.cache_key()
if not record.single_use:
self._cache[cache_key] = record.value
else:
self._cache.pop(cache_key, None)
return record.value