Compare commits

1 Commits

Author SHA1 Message Date
85b7ea4f6c feat(ai): 添加 AI 动作支持,集成 OpenAI GPT-4o 驱动自动化流程
- 新增 AIExecutor 类用于调用 OpenAI API 并解析结构化指令
- 在 Settings 中增加 OpenAI 相关配置项(API Key、模型、超时等)
- 扩展 ActionContext 以支持注入 AI 服务实例
- 实现 AIAction 类处理自然语言提示并执行 AI 规划的浏览器操作
- 支持通过 max_step 参数限制 AI 操作步数(默认5,最大20)
- 支持 include_html 和 include_variables 控制上下文内容传递
- 支持运行时覆盖模型和温度参数
- 增加详细的错误处理与日志记录机制- 更新 README 文档说明 AIAction 使用方法与配置选项
- 更新 XSD schema 支持 ai 类型动作定义- 在 FlowRunner 中完善步骤记录逻辑,支持 AI 执行结果输出
- 添加 plan_ai_action.md 设计文档描述实现细节与规划
2025-10-21 21:38:46 +08:00
11 changed files with 604 additions and 13 deletions

3
.idea/misc.xml generated
View File

@@ -4,4 +4,7 @@
<option name="sdkName" value="/opt/homebrew/Caskroom/miniconda/base" /> <option name="sdkName" value="/opt/homebrew/Caskroom/miniconda/base" />
</component> </component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>
</project> </project>

View File

@@ -49,6 +49,11 @@ pip install drissionpage redis requests pymongo lxml cssselect
| `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | Redis `BLPOP` 阻塞秒数 | | `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | Redis `BLPOP` 阻塞秒数 |
| `XSPIDER_MONGO_URI` | `mongodb://localhost:27017` | MongoDB 连接串 | | `XSPIDER_MONGO_URI` | `mongodb://localhost:27017` | MongoDB 连接串 |
| `XSPIDER_MONGO_DB` | `xspider` | MongoDB 数据库名称 | | `XSPIDER_MONGO_DB` | `xspider` | MongoDB 数据库名称 |
| `OPENAI_API_KEY` | `""` | 调用 `gpt-4o` 时使用的 API Key留空则禁用 AI 动作) |
| `OPENAI_API_BASE` | `https://api.openai.com/v1` | OpenAI API 基础地址,可按需切换代理 |
| `OPENAI_MODEL` | `gpt-4o` | 默认模型名称 |
| `OPENAI_TIMEOUT` | `60` | OpenAI 请求超时,秒 |
| `OPENAI_TEMPERATURE` | `0.0` | OpenAI 采样温度 |
**变量作用域说明** **变量作用域说明**
- 对变量名使用 `site:变量名` 将强制读取/写入当前站点作用域;`global:变量名` 将强制使用全局作用域。 - 对变量名使用 `site:变量名` 将强制读取/写入当前站点作用域;`global:变量名` 将强制使用全局作用域。
@@ -181,6 +186,7 @@ python main.py
| `set_attr` | `selector``params.attr_name` | `params.attr_value`(默认空字符串) | 修改 DOM 属性值。 | | `set_attr` | `selector``params.attr_name` | `params.attr_value`(默认空字符串) | 修改 DOM 属性值。 |
| `set_var` | `params.var_name``params.var_value` | `params.var_scope``params.var_ttl``params.var_single_use` | 将变量写入变量服务,可带过期策略。 | | `set_var` | `params.var_name``params.var_value` | `params.var_scope``params.var_ttl``params.var_single_use` | 将变量写入变量服务,可带过期策略。 |
| `captcha` | 无(若缺省会截取 `selector` 或整页截图) | `params.image``params.captcha_type``params.captcha_url``params.captcha_config``params.variable` | 调用远程接口识别验证码并写入变量。 | | `captcha` | 无(若缺省会截取 `selector` 或整页截图) | `params.image``params.captcha_type``params.captcha_url``params.captcha_config``params.variable` | 调用远程接口识别验证码并写入变量。 |
| `ai` | `params.prompt` | `params.max_step``params.model``params.include_html``params.include_variables``params.temperature` | 调用 `gpt-4o` 生成 DrissionPage 操作步骤并执行,支持限制最大步骤数。 |
> **提示** > **提示**
> - 当 `selector` 与 `mode` 不匹配时(例如 CSS 模式中使用 XPath解析阶段会抛出异常。 > - 当 `selector` 与 `mode` 不匹配时(例如 CSS 模式中使用 XPath解析阶段会抛出异常。
@@ -236,6 +242,25 @@ python main.py
- 服务返回 JSON 后,会尝试读取 `result``text``value``code``data` 等字段作为识别结果。 - 服务返回 JSON 后,会尝试读取 `result``text``value``code``data` 等字段作为识别结果。
- 结果最终写入 Redis默认键为站点作用域下的 `captcha_result`,或使用 `params.variable` 覆盖),并同步到 `site_context` 供后续动作引用。 - 结果最终写入 Redis默认键为站点作用域下的 `captcha_result`,或使用 `params.variable` 覆盖),并同步到 `site_context` 供后续动作引用。
### AIAction 使用示例
`AIAction` 通过调用 OpenAI `gpt-4o` 规划并执行一组 DrissionPage 原生操作,适合处理动态页面或复杂交互。请确保已经在环境变量或 `Settings` 中配置 `OPENAI_API_KEY`
```xml
<action type="ai"
prompt="下载当前页面的首个报表附件,并在完成后返回列表页"
max_step="6"
include_html="true"
include_variables="false"/>
```
- `prompt`:必填,自然语言描述任务目标,支持变量占位符。
- `max_step`:限制模型输出的步骤数量(默认为 5最大 20
- `include_html` / `include_variables`:控制是否向模型提供页面 HTML 片段和当前变量上下文。
- 执行过程中会在 `task_logs``steps` 字段中记录每个 AI 子步骤的执行结果、异常信息与返回值。
若未配置 API Key 或 OpenAI 调用失败,动作会抛出异常并在任务日志中记录具体原因。
## 抽取与存储 ## 抽取与存储
- `<extract>` 支持 `record_css` / `record_xpath` 指定列表元素,`<field>` 定义字段名称、选择器、取值模式及可选的 `value_type` - `<extract>` 支持 `record_css` / `record_xpath` 指定列表元素,`<field>` 定义字段名称、选择器、取值模式及可选的 `value_type`

51
plan_ai_action.md Normal file
View File

@@ -0,0 +1,51 @@
> 目标:新增 `ai` 类型 Action使其能够根据自定义 prompt 调用 OpenAI 的 `gpt-4o`,按步骤直接驱动 DrissionPage 原生 `Page`/元素方法,并支持 `max_step` 等参数。
## 一、需求分析与约束
1. 明确期望的交互:`ai` 动作接收 prompt结合当前页面状态HTML、URL、变量等生成操作指令。
2. 确定 DrissionPage 原生方法集合(如 `page.ele(...)`, `click`, `input`, `wait.ele`, `run_js`, `scroll` 等),并定义统一的指令格式(动作名称、参数列表)。
3. 确认 OpenAI `gpt-4o` 接口调用方式REST API需要在设置中提供 `OPENAI_API_KEY` 等凭证,并确认网络访问策略(若受限需用户预先允许)。
## 二、ActionConfig 设计
1. 在 Schema / README 中补充 `ai` 行,定义可用参数:
- `params.prompt`:必填,自然语言描述的目标。
- `params.max_step`:可选,限制生成的操作步骤数量,默认值如 5。
- `params.model`:默认 `gpt-4o`,允许覆盖。
- `params.temperature``params.top_p` 等生成参数(可选)。
- `params.context_mode`:决定传递给模型的页面 HTML、变量、历史步骤范围。
2. 决定动作返回值(例如最后的操作总结或 AI 返回的原始响应),并写入日志步骤。
## 三、AI 执行器设计
1. 新增组件(如 `xspider/ai/executor.py`)封装:
- 组装系统提示+用户 prompt包含页面上下文、变量、执行限制、可用 DrissionPage API 列表)。
- 调用 OpenAI `gpt-4o` REST API使用 JSON 模式tool/response schema获取结构化指令。
- 校验 AI 返回值并转换为内部步骤列表(确保动作类型/参数合法)。
2. 定义步骤数据结构Action 指令列表),包括动作类型、定位参数(选择器/方式或直接传递 JS、输入值等。
3. 设计错误处理机制:解析失败、指令无效时的重试 / 回退策略。
## 四、DrissionPage 操作适配
1. 获取 `ActionContext.session.page`,直接调用 DrissionPage 原生方法(避免经过 `BrowserSession` 包装),必要时提供辅助函数简化重复逻辑。
2. 根据步骤类型调用对应的原生方法(`page.ele(...)`, `ele.click(...)`, `ele.input(...)`, `page.wait.ele(...)`, `page.run_js(...)` 等),执行前后记录步骤状态。
3. 若出现异常(元素不存在、方法报错),决定是立即终止、尝试下一步,或把错误反馈给 AI 进行下一轮迭代。
## 五、Action 实现
1.`xspider/actions` 中新增 `ai.py` 或直接扩展 `builtin.py`
- 创建 `AIAction(BaseAction)`,在 `_execute` 中初始化 AI 执行器、构建上下文、驱动步骤执行。
- 支持 `max_step` 限制,记录已执行步骤数量。
- 将执行结果附加到 `ActionContext.site_context`(必要时),供后续动作使用。
2.`ActionRegistry` 中注册 `ai`
3. 处理变量解析:允许 prompt、模型名等字段使用 `${...}` 占位符。
## 六、日志与调试
1. 扩展任务日志步骤记录:在 FlowRunner 中对 `ai` 动作追加详细信息AI prompt、返回指令、执行情况
- 注意隐私 / 成本:可只记录摘要或取前 N 个字符。
2. 提供调试标志(如 `params.debug=true`),决定是否保存原始 AI 响应。
## 七、配置与依赖
1.`Settings` 中新增 AI 服务相关配置(如 API URL、密钥、默认模型
2. 更新 README / Schema说明 `ai` 动作参数、OpenAI API Key 配置、`max_step` 行为及注意事项。
3. 如需第三方 SDK在依赖列表中追加或留空供用户自备
## 八、测试与验证
1. 编写单元测试:模拟 AI 服务返回固定指令,验证解析与执行流程。
2. 提供集成测试脚本(可打桩 AI 响应),检查与 DrissionPage 的联动。
3. 手动验证异常路径AI 返回空、指令超限、元素不存在等),确保有明确错误日志。

View File

@@ -73,7 +73,7 @@
<xs:documentation> <xs:documentation>
动作执行配置。内置类型包含goto、click、type、wait_dom_show、 动作执行配置。内置类型包含goto、click、type、wait_dom_show、
wait_dom_gone、wait_dom_hide、wait_time、run_js、set_header、set_attr、 wait_dom_gone、wait_dom_hide、wait_time、run_js、set_header、set_attr、
set_var、captcha。可按需扩展自定义类型。 set_var、captcha、ai。可按需扩展自定义类型。
</xs:documentation> </xs:documentation>
</xs:annotation> </xs:annotation>
<xs:sequence/> <xs:sequence/>
@@ -195,4 +195,3 @@
<xs:element name="site" type="SiteType"/> <xs:element name="site" type="SiteType"/>
</xs:schema> </xs:schema>

View File

@@ -15,10 +15,12 @@ class ActionContext:
session: BrowserSession, session: BrowserSession,
resolver: VariableResolver, resolver: VariableResolver,
site_context: Dict[str, str], site_context: Dict[str, str],
services: Optional[Dict[str, Any]] = None,
) -> None: ) -> None:
self.session = session self.session = session
self.resolver = resolver self.resolver = resolver
self.site_context = site_context self.site_context = site_context
self.services = services or {}
class BaseAction(ABC): class BaseAction(ABC):

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import json import json
import logging import logging
import time import time
from typing import Any, Dict, Optional from typing import Any, Dict, List, Optional
import requests import requests
@@ -279,6 +279,258 @@ class CaptchaAction(BaseAction):
ctx.resolver.set(variable_name, solution, ctx.site_context) ctx.resolver.set(variable_name, solution, ctx.site_context)
ctx.site_context[variable_name] = solution ctx.site_context[variable_name] = solution
class AIAction(BaseAction):
type_name = "ai"
enabled = True
CONTEXT_HTML_LIMIT = 20_000
def _execute(self, ctx: ActionContext) -> Dict[str, Any]:
executor = ctx.services.get("ai_executor")
if not executor:
raise RuntimeError("AI executor is not configured.")
prompt_raw = (
self.config.params.get("prompt")
or self.config.params.get("text")
or self.config.selector
)
prompt_resolved = ctx.resolver.resolve(prompt_raw, ctx.site_context)
if not prompt_resolved:
raise ValueError("ai action requires 'prompt' parameter or selector text.")
max_step_value = self.config.params.get("max_step") or self.config.params.get("max_steps")
if max_step_value:
resolved_max = ctx.resolver.resolve(max_step_value, ctx.site_context) or max_step_value
else:
resolved_max = None
try:
max_steps = int(resolved_max) if resolved_max is not None else 5
except ValueError as exc:
raise ValueError(f"Invalid max_step value: {resolved_max}") from exc
max_steps = max(1, min(max_steps, 20))
include_html_value = self.config.params.get("include_html", "true")
include_html_value = ctx.resolver.resolve(include_html_value, ctx.site_context) or include_html_value
include_html = str(include_html_value).lower() not in {"false", "0", "no", "off"}
include_variables_value = self.config.params.get("include_variables", "true")
include_variables_value = (
ctx.resolver.resolve(include_variables_value, ctx.site_context) or include_variables_value
)
include_variables = str(include_variables_value).lower() not in {"false", "0", "no", "off"}
model_override = None
if self.config.params.get("model"):
model_override = ctx.resolver.resolve(self.config.params["model"], ctx.site_context) or self.config.params["model"]
temperature_override = None
if self.config.params.get("temperature") is not None:
raw_temp = ctx.resolver.resolve(self.config.params.get("temperature"), ctx.site_context)
try:
temperature_override = float(raw_temp)
except (TypeError, ValueError) as exc:
raise ValueError(f"Invalid temperature value '{raw_temp}'.") from exc
page = ctx.session.page
context_payload: Dict[str, Any] = {
"url": getattr(page, "url", ctx.site_context.get("entry_url")),
"title": getattr(page, "title", None),
}
if include_html:
html_content = self._safe_html(page)
if html_content:
context_payload["html"] = html_content[: self.CONTEXT_HTML_LIMIT]
if include_variables:
context_payload["variables"] = ctx.site_context
plan = executor.plan(
prompt_resolved,
context=context_payload,
max_steps=max_steps,
model=model_override,
temperature=temperature_override,
)
execution_log = self._execute_steps(ctx, plan.steps, max_steps)
return {
"prompt": prompt_resolved[:2000],
"requested_steps": len(plan.steps),
"executed_steps": execution_log,
"complete": plan.complete,
"summary": plan.summary,
}
def _execute_steps(
self,
ctx: ActionContext,
steps: List[Dict[str, Any]],
max_steps: int,
) -> List[Dict[str, Any]]:
page = ctx.session.page
executed: List[Dict[str, Any]] = []
for index, step in enumerate(steps, start=1):
if index > max_steps:
break
action = (step.get("action") or "").lower().strip()
record: Dict[str, Any] = {
"index": index,
"action": action,
"description": step.get("description"),
}
try:
if action == "click":
self._perform_click(page, step)
elif action == "type":
self._perform_type(page, step)
elif action == "wait_for":
self._perform_wait(page, step)
elif action == "run_js":
record["result"] = self._perform_run_js(page, step)
elif action == "scroll":
self._perform_scroll(page, step)
elif action == "goto":
self._perform_goto(page, step)
else:
raise ValueError(f"Unsupported AI action '{action}'.")
record["status"] = "success"
except Exception as exc: # noqa: BLE001
record["status"] = "error"
record["message"] = str(exc)
executed.append(record)
raise
executed.append(record)
return executed
def _perform_click(self, page: Any, step: Dict[str, Any]) -> None:
element = self._resolve_element(page, step)
button = step.get("button")
if button:
element.click(button=button)
else:
element.click()
def _perform_type(self, page: Any, step: Dict[str, Any]) -> None:
element = self._resolve_element(page, step)
text = step.get("text")
if text is None:
raise ValueError("AI type action requires 'text'.")
clear = step.get("clear", True)
if clear:
clear_method = getattr(element, "clear", None)
if callable(clear_method):
clear_method()
input_method = getattr(element, "input", None)
if callable(input_method):
input_method(text)
return
send_keys = getattr(element, "send_keys", None)
if callable(send_keys):
send_keys(text)
return
raise ValueError("Element does not support input typing.")
def _perform_wait(self, page: Any, step: Dict[str, Any]) -> None:
selector = step.get("selector")
mode = (step.get("mode") or "css").lower()
timeout_ms = self._timeout_ms(step.get("timeout"))
timeout_sec = timeout_ms / 1000.0
state = (step.get("state") or "visible").lower()
wait_obj = getattr(page, "wait", None)
pref_selector = self._format_selector(selector, mode)
if not wait_obj:
raise ValueError("DrissionPage wait helper is not available.")
if state == "visible":
wait_obj.ele(pref_selector, timeout=timeout_sec, mode=mode)
elif state in {"gone", "disappear"}:
wait_obj.ele_gone(pref_selector, timeout=timeout_sec, mode=mode)
else:
raise ValueError(f"Unsupported wait state '{state}'.")
def _perform_run_js(self, page: Any, step: Dict[str, Any]) -> Any:
script = step.get("script") or step.get("text")
if not script:
raise ValueError("AI run_js action requires 'script'.")
args = step.get("args") or []
if not isinstance(args, list):
raise ValueError("AI run_js args must be a list.")
return page.run_js(script, *args)
def _perform_scroll(self, page: Any, step: Dict[str, Any]) -> None:
direction = (step.get("direction") or "down").lower()
distance = step.get("value") or step.get("distance") or 600
try:
distance_int = int(distance)
except ValueError as exc:
raise ValueError(f"Invalid scroll distance '{distance}'.") from exc
if direction in {"down", "bottom"}:
page.run_js(f"window.scrollBy(0, {max(distance_int, 1)});")
elif direction in {"up", "top"}:
page.run_js(f"window.scrollBy(0, {-abs(distance_int)});")
else:
raise ValueError(f"Unsupported scroll direction '{direction}'.")
def _perform_goto(self, page: Any, step: Dict[str, Any]) -> None:
url = step.get("url") or step.get("value")
if not url:
raise ValueError("AI goto action requires 'url'.")
page.get(url)
def _resolve_element(self, page: Any, step: Dict[str, Any]) -> Any:
selector = step.get("selector")
if not selector:
raise ValueError("AI action requires 'selector'.")
mode = (step.get("mode") or "css").lower()
timeout_ms = self._timeout_ms(step.get("timeout"))
timeout_sec = timeout_ms / 1000.0
pref_selector = self._format_selector(selector, mode)
wait_obj = getattr(page, "wait", None)
if wait_obj:
try:
return wait_obj.ele(pref_selector, timeout=timeout_sec, mode=mode)
except Exception: # noqa: BLE001
pass
ele_getter = getattr(page, "ele", None)
if not callable(ele_getter):
raise ValueError("DrissionPage page.ele method is unavailable.")
element = ele_getter(pref_selector, timeout=timeout_sec, mode=mode)
if not element:
raise ValueError(f"Failed to locate element '{selector}' (mode={mode}).")
return element
def _format_selector(self, selector: Optional[str], mode: str) -> str:
if not selector:
raise ValueError("Selector must not be empty.")
selector = selector.strip()
lowered = selector.lower()
prefix = f"{mode}:"
if lowered.startswith("css:") or lowered.startswith("xpath:"):
return selector
return f"{prefix}{selector}"
def _safe_html(self, page: Any) -> Optional[str]:
html_attr = getattr(page, "html", None)
if callable(html_attr):
try:
return html_attr()
except Exception: # noqa: BLE001
return None
if isinstance(html_attr, str):
return html_attr
return None
def _timeout_ms(self, value: Optional[Any]) -> int:
if value is None:
return self.config.timeout_ms
resolved = value
if isinstance(value, str):
resolved = value.strip()
try:
timeout_int = int(float(resolved))
except (TypeError, ValueError) as exc:
raise ValueError(f"Invalid timeout value '{value}'.") from exc
return max(timeout_int, 0)
def _load_config(self, ctx: ActionContext) -> Dict[str, Any]: def _load_config(self, ctx: ActionContext) -> Dict[str, Any]:
raw_config = self.config.params.get("captcha_config") raw_config = self.config.params.get("captcha_config")
if not raw_config: if not raw_config:

3
xspider/ai/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .executor import AIExecutor, AIExecutionError, AIPlan
__all__ = ["AIExecutor", "AIExecutionError", "AIPlan"]

182
xspider/ai/executor.py Normal file
View File

@@ -0,0 +1,182 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import requests
logger = logging.getLogger(__name__)
class AIExecutionError(RuntimeError):
"""Wrap errors raised when requesting or parsing AI responses."""
@dataclass
class AIPlan:
steps: List[Dict[str, Any]]
summary: str
complete: bool
raw: Dict[str, Any]
class AIExecutor:
def __init__(
self,
api_key: str,
model: str,
*,
base_url: str = "https://api.openai.com/v1",
timeout: int = 60,
temperature: float = 0.0,
) -> None:
self.api_key = api_key or ""
self.model = model
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.temperature = temperature
def plan(
self,
prompt: str,
*,
context: Dict[str, Any],
max_steps: int,
model: Optional[str] = None,
temperature: Optional[float] = None,
) -> AIPlan:
if not self.api_key:
raise AIExecutionError("OpenAI API key is not configured.")
payload = self._build_payload(
prompt,
context,
max_steps,
model_override=model,
temperature_override=temperature,
)
response = self._post("/chat/completions", payload)
plan = self._parse_plan(response, max_steps)
return plan
def _build_payload(
self,
prompt: str,
context: Dict[str, Any],
max_steps: int,
*,
model_override: Optional[str],
temperature_override: Optional[float],
) -> Dict[str, Any]:
system_prompt = (
"You are an assistant that plans browser automation steps using DrissionPage's "
"Python API. Respond ONLY with valid JSON that matches the schema below:\n"
"{\n"
' "steps": [\n'
' {\n'
' "action": "click" | "type" | "wait_for" | "run_js" | "scroll" | "goto",\n'
' "selector": "<css or xpath selector>",\n'
' "mode": "css" | "xpath",\n'
' "text": "<text to input when action is type>",\n'
' "script": "<javascript when action is run_js>",\n'
' "timeout": <timeout milliseconds>,\n'
' "button": "left" | "right" | "middle",\n'
' "description": "<short explanation>",\n'
' "direction": "<scroll direction when action is scroll>",\n'
' "value": "<optional extra value>"\n'
" }\n"
" ],\n"
' "summary": "<short natural language summary>",\n'
' "complete": true | false\n'
"}\n"
f"Limit the number of steps to at most {max_steps}. "
"If a selector is not needed for an action, omit it or set it to null. "
"Prefer XPath when selectors already look like XPath. "
"Do not describe the result in prose outside the JSON."
)
messages = [
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": json.dumps(
{
"task": prompt,
"context": context,
"constraints": {
"max_steps": max_steps,
"allowed_actions": [
"click",
"type",
"wait_for",
"run_js",
"scroll",
"goto",
],
},
},
ensure_ascii=False,
),
},
]
payload = {
"model": model_override or self.model,
"messages": messages,
"temperature": temperature_override if temperature_override is not None else self.temperature,
"response_format": {"type": "json_object"},
}
return payload
def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
try:
response = requests.post(
url,
headers=headers,
json=payload,
timeout=self.timeout,
)
except requests.RequestException as exc:
raise AIExecutionError(f"Failed to call OpenAI: {exc}") from exc
if response.status_code >= 400:
raise AIExecutionError(
f"OpenAI API returned status {response.status_code}: {response.text}"
)
try:
data: Dict[str, Any] = response.json()
except ValueError as exc:
raise AIExecutionError("Invalid JSON returned by OpenAI.") from exc
return data
def _parse_plan(self, response: Dict[str, Any], max_steps: int) -> AIPlan:
choices = response.get("choices") or []
if not choices:
raise AIExecutionError("OpenAI response missing choices.")
content = choices[0].get("message", {}).get("content")
if not content:
raise AIExecutionError("OpenAI response missing message content.")
try:
payload = json.loads(content)
except ValueError as exc:
raise AIExecutionError("Failed to parse AI response JSON.") from exc
steps_raw = payload.get("steps")
if not isinstance(steps_raw, list):
raise AIExecutionError("AI response missing 'steps' list.")
steps: List[Dict[str, Any]] = []
for item in steps_raw[:max_steps]:
if isinstance(item, dict):
steps.append(item)
else:
logger.warning("Ignoring malformed AI step: %s", item)
summary = payload.get("summary") or ""
complete = bool(payload.get("complete"))
return AIPlan(steps=steps, summary=summary, complete=complete, raw=payload)

View File

@@ -13,6 +13,7 @@ from .settings import Settings
from .storage import MongoLogRepository, MongoRepository from .storage import MongoLogRepository, MongoRepository
from .variables import VariableService from .variables import VariableService
from .xml_parser import XMLSiteParser from .xml_parser import XMLSiteParser
from .ai import AIExecutor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -33,11 +34,19 @@ class TemplateCrawlerApp:
self.mongo.client, self.mongo.client,
self.settings.mongo_database, self.settings.mongo_database,
) )
self.ai_executor = AIExecutor(
api_key=self.settings.openai_api_key,
model=self.settings.openai_model,
base_url=self.settings.openai_api_base,
timeout=self.settings.openai_timeout,
temperature=self.settings.openai_temperature,
)
self.variable_service = VariableService(self.settings.redis_url) self.variable_service = VariableService(self.settings.redis_url)
self.parser = XMLSiteParser() self.parser = XMLSiteParser()
self.runner = FlowRunner( self.runner = FlowRunner(
storage=self.mongo, storage=self.mongo,
log_storage=self.log_repository, log_storage=self.log_repository,
ai_executor=self.ai_executor,
variable_service=self.variable_service, variable_service=self.variable_service,
) )
self.http = requests.Session() self.http = requests.Session()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
import time import time
from datetime import datetime from datetime import datetime
@@ -14,6 +15,7 @@ from .storage import MongoLogRepository, MongoRepository
from .variables import VariableResolver, VariableService from .variables import VariableResolver, VariableService
from .utils.selectors import is_xpath_selector from .utils.selectors import is_xpath_selector
from .actions.base import ActionContext from .actions.base import ActionContext
from .ai import AIExecutor, AIExecutionError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -23,11 +25,13 @@ class FlowRunner:
self, self,
storage: MongoRepository, storage: MongoRepository,
log_storage: MongoLogRepository, log_storage: MongoLogRepository,
ai_executor: AIExecutor,
variable_service: VariableService, variable_service: VariableService,
extractor: Optional[Extractor] = None, extractor: Optional[Extractor] = None,
) -> None: ) -> None:
self.storage = storage self.storage = storage
self.log_storage = log_storage self.log_storage = log_storage
self.ai_executor = ai_executor
self.variable_service = variable_service self.variable_service = variable_service
self.extractor = extractor or Extractor() self.extractor = extractor or Extractor()
@@ -76,7 +80,8 @@ class FlowRunner:
) -> None: ) -> None:
entry_url = self._resolve_entry(site, flow) entry_url = self._resolve_entry(site, flow)
site_context = self._build_context(site, flow, entry_url) site_context = self._build_context(site, flow, entry_url)
action_context = ActionContext(session, resolver, site_context) services = {"ai_executor": self.ai_executor}
action_context = ActionContext(session, resolver, site_context, services=services)
metadata = { metadata = {
"entry": flow.entry or flow.metadata.get("url"), "entry": flow.entry or flow.metadata.get("url"),
@@ -112,14 +117,16 @@ class FlowRunner:
action_cls = ActionRegistry.get(action_config.type) action_cls = ActionRegistry.get(action_config.type)
action = action_cls(action_config) action = action_cls(action_config)
logger.debug("Executing action %s", action_config.type) logger.debug("Executing action %s", action_config.type)
self._record_step( step_entry = self._record_step(
steps, steps,
event="action", event="action",
action_type=action_config.type, action_type=action_config.type,
selector=action_config.selector, selector=action_config.selector,
mode=action_config.mode.value if action_config.mode else None, mode=action_config.mode.value if action_config.mode else None,
) )
action.execute(action_context) result_payload = action.execute(action_context)
if result_payload is not None:
step_entry["result"] = self._sanitize_result(result_payload)
if is_login: if is_login:
selector = flow.metadata.get("selector") selector = flow.metadata.get("selector")
@@ -222,6 +229,18 @@ class FlowRunner:
summary=summary, summary=summary,
metadata=metadata_updates, metadata=metadata_updates,
) )
except AIExecutionError as exc:
metadata_updates = {
"records_saved": records_saved,
"pages_processed": pages_processed,
}
self.log_storage.mark_error(
log_handle,
exc,
steps=steps,
metadata=metadata_updates,
)
raise
except Exception as exc: except Exception as exc:
metadata_updates = { metadata_updates = {
"records_saved": records_saved, "records_saved": records_saved,
@@ -347,13 +366,44 @@ class FlowRunner:
steps: List[Dict[str, Any]], steps: List[Dict[str, Any]],
event: str, event: str,
**payload: Any, **payload: Any,
) -> None: ) -> Dict[str, Any]:
entry: Dict[str, Any] = { entry: Dict[str, Any] = {
"timestamp": datetime.utcnow().isoformat(), "timestamp": datetime.utcnow().isoformat(),
"event": event, "event": event,
} }
entry.update({k: v for k, v in payload.items() if v is not None}) entry.update({k: v for k, v in payload.items() if v is not None})
steps.append(entry) steps.append(entry)
return entry
def _count_actions(self, steps: List[Dict[str, Any]]) -> int: def _count_actions(self, steps: List[Dict[str, Any]]) -> int:
return sum(1 for step in steps if step.get("event") == "action") return sum(1 for step in steps if step.get("event") == "action")
def _sanitize_result(self, result: Any) -> Any:
if isinstance(result, dict):
sanitized: Dict[str, Any] = {}
for key, value in result.items():
sanitized[key] = self._shorten(value)
return sanitized
if isinstance(result, list):
return [self._shorten(item) for item in result]
return self._shorten(result)
def _shorten(self, value: Any) -> Any:
if isinstance(value, str):
if len(value) <= 500:
return value
return value[:500] + "..."
if isinstance(value, (int, float, bool)) or value is None:
return value
if isinstance(value, (dict, list)):
try:
serialized = json.dumps(value, ensure_ascii=False)
except Exception: # noqa: BLE001
serialized = str(value)
if len(serialized) <= 500:
return value
return serialized[:500] + "..."
text = str(value)
if len(text) <= 500:
return text
return text[:500] + "..."

View File

@@ -11,6 +11,11 @@ class Settings:
mongo_uri: str mongo_uri: str
mongo_database: str mongo_database: str
redis_block_timeout: int redis_block_timeout: int
openai_api_base: str
openai_api_key: str
openai_model: str
openai_timeout: int
openai_temperature: float
@classmethod @classmethod
def from_env(cls) -> "Settings": def from_env(cls) -> "Settings":
@@ -19,10 +24,20 @@ class Settings:
mongo_uri = os.getenv("XSPIDER_MONGO_URI", "mongodb://hpower:hpower666.@192.168.8.154:27017") mongo_uri = os.getenv("XSPIDER_MONGO_URI", "mongodb://hpower:hpower666.@192.168.8.154:27017")
mongo_database = os.getenv("XSPIDER_MONGO_DB", "xspider") mongo_database = os.getenv("XSPIDER_MONGO_DB", "xspider")
redis_block_timeout = int(os.getenv("XSPIDER_REDIS_BLOCK_TIMEOUT", "30")) redis_block_timeout = int(os.getenv("XSPIDER_REDIS_BLOCK_TIMEOUT", "30"))
openai_api_base = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1")
openai_api_key = os.getenv("OPENAI_API_KEY", "")
openai_model = os.getenv("OPENAI_MODEL", "gpt-4o")
openai_timeout = int(os.getenv("OPENAI_TIMEOUT", "60"))
openai_temperature = float(os.getenv("OPENAI_TEMPERATURE", "0.0"))
return cls( return cls(
redis_url=redis_url, redis_url=redis_url,
redis_list_key=redis_list_key, redis_list_key=redis_list_key,
mongo_uri=mongo_uri, mongo_uri=mongo_uri,
mongo_database=mongo_database, mongo_database=mongo_database,
redis_block_timeout=redis_block_timeout, redis_block_timeout=redis_block_timeout,
openai_api_base=openai_api_base,
openai_api_key=openai_api_key,
openai_model=openai_model,
openai_timeout=openai_timeout,
openai_temperature=openai_temperature,
) )