diff --git a/.idea/misc.xml b/.idea/misc.xml
index fef645d..8b44876 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -4,4 +4,7 @@
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index e162216..d926e55 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,11 @@ pip install drissionpage redis requests pymongo lxml cssselect
| `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | Redis `BLPOP` 阻塞秒数 |
| `XSPIDER_MONGO_URI` | `mongodb://localhost:27017` | MongoDB 连接串 |
| `XSPIDER_MONGO_DB` | `xspider` | MongoDB 数据库名称 |
+| `OPENAI_API_KEY` | `""` | 调用 `gpt-4o` 时使用的 API Key(留空则禁用 AI 动作) |
+| `OPENAI_API_BASE` | `https://api.openai.com/v1` | OpenAI API 基础地址,可按需切换代理 |
+| `OPENAI_MODEL` | `gpt-4o` | 默认模型名称 |
+| `OPENAI_TIMEOUT` | `60` | OpenAI 请求超时,秒 |
+| `OPENAI_TEMPERATURE` | `0.0` | OpenAI 采样温度 |
**变量作用域说明**
- 对变量名使用 `site:变量名` 将强制读取/写入当前站点作用域;`global:变量名` 将强制使用全局作用域。
@@ -181,6 +186,7 @@ python main.py
| `set_attr` | `selector`、`params.attr_name` | `params.attr_value`(默认空字符串) | 修改 DOM 属性值。 |
| `set_var` | `params.var_name`、`params.var_value` | `params.var_scope`、`params.var_ttl`、`params.var_single_use` | 将变量写入变量服务,可带过期策略。 |
| `captcha` | 无(若缺省会截取 `selector` 或整页截图) | `params.image`、`params.captcha_type`、`params.captcha_url`、`params.captcha_config`、`params.variable` | 调用远程接口识别验证码并写入变量。 |
+| `ai` | `params.prompt` | `params.max_step`、`params.model`、`params.include_html`、`params.include_variables`、`params.temperature` | 调用 `gpt-4o` 生成 DrissionPage 操作步骤并执行,支持限制最大步骤数。 |
> **提示**
> - 当 `selector` 与 `mode` 不匹配时(例如 CSS 模式中使用 XPath),解析阶段会抛出异常。
@@ -236,6 +242,25 @@ python main.py
- 服务返回 JSON 后,会尝试读取 `result`、`text`、`value`、`code`、`data` 等字段作为识别结果。
- 结果最终写入 Redis(默认键为站点作用域下的 `captcha_result`,或使用 `params.variable` 覆盖),并同步到 `site_context` 供后续动作引用。
+### AIAction 使用示例
+
+`AIAction` 通过调用 OpenAI `gpt-4o` 规划并执行一组 DrissionPage 原生操作,适合处理动态页面或复杂交互。请确保已经在环境变量或 `Settings` 中配置 `OPENAI_API_KEY`。
+
+```xml
+
+```
+
+- `prompt`:必填,自然语言描述任务目标,支持变量占位符。
+- `max_step`:限制模型输出的步骤数量(默认为 5,最大 20)。
+- `include_html` / `include_variables`:控制是否向模型提供页面 HTML 片段和当前变量上下文。
+- 执行过程中会在 `task_logs` 的 `steps` 字段中记录每个 AI 子步骤的执行结果、异常信息与返回值。
+
+若未配置 API Key 或 OpenAI 调用失败,动作会抛出异常并在任务日志中记录具体原因。
+
## 抽取与存储
- `` 支持 `record_css` / `record_xpath` 指定列表元素,`` 定义字段名称、选择器、取值模式及可选的 `value_type`。
diff --git a/plan_ai_action.md b/plan_ai_action.md
new file mode 100644
index 0000000..ebd99ef
--- /dev/null
+++ b/plan_ai_action.md
@@ -0,0 +1,51 @@
+> 目标:新增 `ai` 类型 Action,使其能够根据自定义 prompt 调用 OpenAI 的 `gpt-4o`,按步骤直接驱动 DrissionPage 原生 `Page`/元素方法,并支持 `max_step` 等参数。
+
+## 一、需求分析与约束
+1. 明确期望的交互:`ai` 动作接收 prompt,结合当前页面状态(HTML、URL、变量等)生成操作指令。
+2. 确定 DrissionPage 原生方法集合(如 `page.ele(...)`, `click`, `input`, `wait.ele`, `run_js`, `scroll` 等),并定义统一的指令格式(动作名称、参数列表)。
+3. 确认 OpenAI `gpt-4o` 接口调用方式(REST API),需要在设置中提供 `OPENAI_API_KEY` 等凭证,并确认网络访问策略(若受限需用户预先允许)。
+
+## 二、ActionConfig 设计
+1. 在 Schema / README 中补充 `ai` 行,定义可用参数:
+ - `params.prompt`:必填,自然语言描述的目标。
+ - `params.max_step`:可选,限制生成的操作步骤数量,默认值如 5。
+ - `params.model`:默认 `gpt-4o`,允许覆盖。
+ - `params.temperature`、`params.top_p` 等生成参数(可选)。
+ - `params.context_mode`:决定传递给模型的页面 HTML、变量、历史步骤范围。
+2. 决定动作返回值(例如最后的操作总结或 AI 返回的原始响应),并写入日志步骤。
+
+## 三、AI 执行器设计
+1. 新增组件(如 `xspider/ai/executor.py`)封装:
+ - 组装系统提示+用户 prompt(包含页面上下文、变量、执行限制、可用 DrissionPage API 列表)。
+ - 调用 OpenAI `gpt-4o` REST API,使用 JSON 模式(tool/response schema)获取结构化指令。
+ - 校验 AI 返回值并转换为内部步骤列表(确保动作类型/参数合法)。
+2. 定义步骤数据结构(Action 指令列表),包括动作类型、定位参数(选择器/方式或直接传递 JS)、输入值等。
+3. 设计错误处理机制:解析失败、指令无效时的重试 / 回退策略。
+
+## 四、DrissionPage 操作适配
+1. 获取 `ActionContext.session.page`,直接调用 DrissionPage 原生方法(避免经过 `BrowserSession` 包装),必要时提供辅助函数简化重复逻辑。
+2. 根据步骤类型调用对应的原生方法(`page.ele(...)`, `ele.click(...)`, `ele.input(...)`, `page.wait.ele(...)`, `page.run_js(...)` 等),执行前后记录步骤状态。
+3. 若出现异常(元素不存在、方法报错),决定是立即终止、尝试下一步,或把错误反馈给 AI 进行下一轮迭代。
+
+## 五、Action 实现
+1. 在 `xspider/actions` 中新增 `ai.py` 或直接扩展 `builtin.py`:
+ - 创建 `AIAction(BaseAction)`,在 `_execute` 中初始化 AI 执行器、构建上下文、驱动步骤执行。
+ - 支持 `max_step` 限制,记录已执行步骤数量。
+ - 将执行结果附加到 `ActionContext.site_context`(必要时),供后续动作使用。
+2. 在 `ActionRegistry` 中注册 `ai`。
+3. 处理变量解析:允许 prompt、模型名等字段使用 `${...}` 占位符。
+
+## 六、日志与调试
+1. 扩展任务日志步骤记录:在 FlowRunner 中对 `ai` 动作追加详细信息(AI prompt、返回指令、执行情况)。
+ - 注意隐私 / 成本:可只记录摘要或取前 N 个字符。
+2. 提供调试标志(如 `params.debug=true`),决定是否保存原始 AI 响应。
+
+## 七、配置与依赖
+1. 在 `Settings` 中新增 AI 服务相关配置(如 API URL、密钥、默认模型)。
+2. 更新 README / Schema,说明 `ai` 动作参数、OpenAI API Key 配置、`max_step` 行为及注意事项。
+3. 如需第三方 SDK,在依赖列表中追加(或留空供用户自备)。
+
+## 八、测试与验证
+1. 编写单元测试:模拟 AI 服务返回固定指令,验证解析与执行流程。
+2. 提供集成测试脚本(可打桩 AI 响应),检查与 DrissionPage 的联动。
+3. 手动验证异常路径(AI 返回空、指令超限、元素不存在等),确保有明确错误日志。
diff --git a/schema/xspider.xsd b/schema/xspider.xsd
index 434fca4..874f829 100644
--- a/schema/xspider.xsd
+++ b/schema/xspider.xsd
@@ -73,7 +73,7 @@
动作执行配置。内置类型包含:goto、click、type、wait_dom_show、
wait_dom_gone、wait_dom_hide、wait_time、run_js、set_header、set_attr、
- set_var、captcha。可按需扩展自定义类型。
+ set_var、captcha、ai。可按需扩展自定义类型。
@@ -195,4 +195,3 @@
-
diff --git a/xspider/actions/base.py b/xspider/actions/base.py
index 4dc4e5e..5d73bf0 100644
--- a/xspider/actions/base.py
+++ b/xspider/actions/base.py
@@ -15,10 +15,12 @@ class ActionContext:
session: BrowserSession,
resolver: VariableResolver,
site_context: Dict[str, str],
+ services: Optional[Dict[str, Any]] = None,
) -> None:
self.session = session
self.resolver = resolver
self.site_context = site_context
+ self.services = services or {}
class BaseAction(ABC):
diff --git a/xspider/actions/builtin.py b/xspider/actions/builtin.py
index a534340..cd5d255 100644
--- a/xspider/actions/builtin.py
+++ b/xspider/actions/builtin.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import json
import logging
import time
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
import requests
@@ -279,6 +279,258 @@ class CaptchaAction(BaseAction):
ctx.resolver.set(variable_name, solution, ctx.site_context)
ctx.site_context[variable_name] = solution
+
+class AIAction(BaseAction):
+ type_name = "ai"
+ enabled = True
+
+ CONTEXT_HTML_LIMIT = 20_000
+
+ def _execute(self, ctx: ActionContext) -> Dict[str, Any]:
+ executor = ctx.services.get("ai_executor")
+ if not executor:
+ raise RuntimeError("AI executor is not configured.")
+
+ prompt_raw = (
+ self.config.params.get("prompt")
+ or self.config.params.get("text")
+ or self.config.selector
+ )
+ prompt_resolved = ctx.resolver.resolve(prompt_raw, ctx.site_context)
+ if not prompt_resolved:
+ raise ValueError("ai action requires 'prompt' parameter or selector text.")
+
+ max_step_value = self.config.params.get("max_step") or self.config.params.get("max_steps")
+ if max_step_value:
+ resolved_max = ctx.resolver.resolve(max_step_value, ctx.site_context) or max_step_value
+ else:
+ resolved_max = None
+ try:
+ max_steps = int(resolved_max) if resolved_max is not None else 5
+ except ValueError as exc:
+ raise ValueError(f"Invalid max_step value: {resolved_max}") from exc
+ max_steps = max(1, min(max_steps, 20))
+
+ include_html_value = self.config.params.get("include_html", "true")
+ include_html_value = ctx.resolver.resolve(include_html_value, ctx.site_context) or include_html_value
+ include_html = str(include_html_value).lower() not in {"false", "0", "no", "off"}
+
+ include_variables_value = self.config.params.get("include_variables", "true")
+ include_variables_value = (
+ ctx.resolver.resolve(include_variables_value, ctx.site_context) or include_variables_value
+ )
+ include_variables = str(include_variables_value).lower() not in {"false", "0", "no", "off"}
+
+ model_override = None
+ if self.config.params.get("model"):
+ model_override = ctx.resolver.resolve(self.config.params["model"], ctx.site_context) or self.config.params["model"]
+ temperature_override = None
+ if self.config.params.get("temperature") is not None:
+ raw_temp = ctx.resolver.resolve(self.config.params.get("temperature"), ctx.site_context)
+ try:
+ temperature_override = float(raw_temp)
+ except (TypeError, ValueError) as exc:
+ raise ValueError(f"Invalid temperature value '{raw_temp}'.") from exc
+
+ page = ctx.session.page
+ context_payload: Dict[str, Any] = {
+ "url": getattr(page, "url", ctx.site_context.get("entry_url")),
+ "title": getattr(page, "title", None),
+ }
+ if include_html:
+ html_content = self._safe_html(page)
+ if html_content:
+ context_payload["html"] = html_content[: self.CONTEXT_HTML_LIMIT]
+ if include_variables:
+ context_payload["variables"] = ctx.site_context
+
+ plan = executor.plan(
+ prompt_resolved,
+ context=context_payload,
+ max_steps=max_steps,
+ model=model_override,
+ temperature=temperature_override,
+ )
+
+ execution_log = self._execute_steps(ctx, plan.steps, max_steps)
+ return {
+ "prompt": prompt_resolved[:2000],
+ "requested_steps": len(plan.steps),
+ "executed_steps": execution_log,
+ "complete": plan.complete,
+ "summary": plan.summary,
+ }
+
+ def _execute_steps(
+ self,
+ ctx: ActionContext,
+ steps: List[Dict[str, Any]],
+ max_steps: int,
+ ) -> List[Dict[str, Any]]:
+ page = ctx.session.page
+ executed: List[Dict[str, Any]] = []
+ for index, step in enumerate(steps, start=1):
+ if index > max_steps:
+ break
+ action = (step.get("action") or "").lower().strip()
+ record: Dict[str, Any] = {
+ "index": index,
+ "action": action,
+ "description": step.get("description"),
+ }
+ try:
+ if action == "click":
+ self._perform_click(page, step)
+ elif action == "type":
+ self._perform_type(page, step)
+ elif action == "wait_for":
+ self._perform_wait(page, step)
+ elif action == "run_js":
+ record["result"] = self._perform_run_js(page, step)
+ elif action == "scroll":
+ self._perform_scroll(page, step)
+ elif action == "goto":
+ self._perform_goto(page, step)
+ else:
+ raise ValueError(f"Unsupported AI action '{action}'.")
+ record["status"] = "success"
+ except Exception as exc: # noqa: BLE001
+ record["status"] = "error"
+ record["message"] = str(exc)
+ executed.append(record)
+ raise
+ executed.append(record)
+ return executed
+
+ def _perform_click(self, page: Any, step: Dict[str, Any]) -> None:
+ element = self._resolve_element(page, step)
+ button = step.get("button")
+ if button:
+ element.click(button=button)
+ else:
+ element.click()
+
+ def _perform_type(self, page: Any, step: Dict[str, Any]) -> None:
+ element = self._resolve_element(page, step)
+ text = step.get("text")
+ if text is None:
+ raise ValueError("AI type action requires 'text'.")
+ clear = step.get("clear", True)
+ if clear:
+ clear_method = getattr(element, "clear", None)
+ if callable(clear_method):
+ clear_method()
+ input_method = getattr(element, "input", None)
+ if callable(input_method):
+ input_method(text)
+ return
+ send_keys = getattr(element, "send_keys", None)
+ if callable(send_keys):
+ send_keys(text)
+ return
+ raise ValueError("Element does not support input typing.")
+
+ def _perform_wait(self, page: Any, step: Dict[str, Any]) -> None:
+ selector = step.get("selector")
+ mode = (step.get("mode") or "css").lower()
+ timeout_ms = self._timeout_ms(step.get("timeout"))
+ timeout_sec = timeout_ms / 1000.0
+ state = (step.get("state") or "visible").lower()
+ wait_obj = getattr(page, "wait", None)
+ pref_selector = self._format_selector(selector, mode)
+ if not wait_obj:
+ raise ValueError("DrissionPage wait helper is not available.")
+ if state == "visible":
+ wait_obj.ele(pref_selector, timeout=timeout_sec, mode=mode)
+ elif state in {"gone", "disappear"}:
+ wait_obj.ele_gone(pref_selector, timeout=timeout_sec, mode=mode)
+ else:
+ raise ValueError(f"Unsupported wait state '{state}'.")
+
+ def _perform_run_js(self, page: Any, step: Dict[str, Any]) -> Any:
+ script = step.get("script") or step.get("text")
+ if not script:
+ raise ValueError("AI run_js action requires 'script'.")
+ args = step.get("args") or []
+ if not isinstance(args, list):
+ raise ValueError("AI run_js args must be a list.")
+ return page.run_js(script, *args)
+
+ def _perform_scroll(self, page: Any, step: Dict[str, Any]) -> None:
+ direction = (step.get("direction") or "down").lower()
+ distance = step.get("value") or step.get("distance") or 600
+ try:
+ distance_int = int(distance)
+ except ValueError as exc:
+ raise ValueError(f"Invalid scroll distance '{distance}'.") from exc
+ if direction in {"down", "bottom"}:
+ page.run_js(f"window.scrollBy(0, {max(distance_int, 1)});")
+ elif direction in {"up", "top"}:
+ page.run_js(f"window.scrollBy(0, {-abs(distance_int)});")
+ else:
+ raise ValueError(f"Unsupported scroll direction '{direction}'.")
+
+ def _perform_goto(self, page: Any, step: Dict[str, Any]) -> None:
+ url = step.get("url") or step.get("value")
+ if not url:
+ raise ValueError("AI goto action requires 'url'.")
+ page.get(url)
+
+ def _resolve_element(self, page: Any, step: Dict[str, Any]) -> Any:
+ selector = step.get("selector")
+ if not selector:
+ raise ValueError("AI action requires 'selector'.")
+ mode = (step.get("mode") or "css").lower()
+ timeout_ms = self._timeout_ms(step.get("timeout"))
+ timeout_sec = timeout_ms / 1000.0
+ pref_selector = self._format_selector(selector, mode)
+ wait_obj = getattr(page, "wait", None)
+ if wait_obj:
+ try:
+ return wait_obj.ele(pref_selector, timeout=timeout_sec, mode=mode)
+ except Exception: # noqa: BLE001
+ pass
+ ele_getter = getattr(page, "ele", None)
+ if not callable(ele_getter):
+ raise ValueError("DrissionPage page.ele method is unavailable.")
+ element = ele_getter(pref_selector, timeout=timeout_sec, mode=mode)
+ if not element:
+ raise ValueError(f"Failed to locate element '{selector}' (mode={mode}).")
+ return element
+
+ def _format_selector(self, selector: Optional[str], mode: str) -> str:
+ if not selector:
+ raise ValueError("Selector must not be empty.")
+ selector = selector.strip()
+ lowered = selector.lower()
+ prefix = f"{mode}:"
+ if lowered.startswith("css:") or lowered.startswith("xpath:"):
+ return selector
+ return f"{prefix}{selector}"
+
+ def _safe_html(self, page: Any) -> Optional[str]:
+ html_attr = getattr(page, "html", None)
+ if callable(html_attr):
+ try:
+ return html_attr()
+ except Exception: # noqa: BLE001
+ return None
+ if isinstance(html_attr, str):
+ return html_attr
+ return None
+
+ def _timeout_ms(self, value: Optional[Any]) -> int:
+ if value is None:
+ return self.config.timeout_ms
+ resolved = value
+ if isinstance(value, str):
+ resolved = value.strip()
+ try:
+ timeout_int = int(float(resolved))
+ except (TypeError, ValueError) as exc:
+ raise ValueError(f"Invalid timeout value '{value}'.") from exc
+ return max(timeout_int, 0)
+
def _load_config(self, ctx: ActionContext) -> Dict[str, Any]:
raw_config = self.config.params.get("captcha_config")
if not raw_config:
diff --git a/xspider/ai/__init__.py b/xspider/ai/__init__.py
new file mode 100644
index 0000000..d37dba1
--- /dev/null
+++ b/xspider/ai/__init__.py
@@ -0,0 +1,3 @@
+from .executor import AIExecutor, AIExecutionError, AIPlan
+
+__all__ = ["AIExecutor", "AIExecutionError", "AIPlan"]
diff --git a/xspider/ai/executor.py b/xspider/ai/executor.py
new file mode 100644
index 0000000..b3f4914
--- /dev/null
+++ b/xspider/ai/executor.py
@@ -0,0 +1,182 @@
+from __future__ import annotations
+
+import json
+import logging
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+
+class AIExecutionError(RuntimeError):
+ """Wrap errors raised when requesting or parsing AI responses."""
+
+
+@dataclass
+class AIPlan:
+ steps: List[Dict[str, Any]]
+ summary: str
+ complete: bool
+ raw: Dict[str, Any]
+
+
+class AIExecutor:
+ def __init__(
+ self,
+ api_key: str,
+ model: str,
+ *,
+ base_url: str = "https://api.openai.com/v1",
+ timeout: int = 60,
+ temperature: float = 0.0,
+ ) -> None:
+ self.api_key = api_key or ""
+ self.model = model
+ self.base_url = base_url.rstrip("/")
+ self.timeout = timeout
+ self.temperature = temperature
+
+ def plan(
+ self,
+ prompt: str,
+ *,
+ context: Dict[str, Any],
+ max_steps: int,
+ model: Optional[str] = None,
+ temperature: Optional[float] = None,
+ ) -> AIPlan:
+ if not self.api_key:
+ raise AIExecutionError("OpenAI API key is not configured.")
+ payload = self._build_payload(
+ prompt,
+ context,
+ max_steps,
+ model_override=model,
+ temperature_override=temperature,
+ )
+ response = self._post("/chat/completions", payload)
+ plan = self._parse_plan(response, max_steps)
+ return plan
+
+ def _build_payload(
+ self,
+ prompt: str,
+ context: Dict[str, Any],
+ max_steps: int,
+ *,
+ model_override: Optional[str],
+ temperature_override: Optional[float],
+ ) -> Dict[str, Any]:
+ system_prompt = (
+ "You are an assistant that plans browser automation steps using DrissionPage's "
+ "Python API. Respond ONLY with valid JSON that matches the schema below:\n"
+ "{\n"
+ ' "steps": [\n'
+ ' {\n'
+ ' "action": "click" | "type" | "wait_for" | "run_js" | "scroll" | "goto",\n'
+ ' "selector": "",\n'
+ ' "mode": "css" | "xpath",\n'
+ ' "text": "",\n'
+ ' "script": "",\n'
+ ' "timeout": ,\n'
+ ' "button": "left" | "right" | "middle",\n'
+ ' "description": "",\n'
+ ' "direction": "",\n'
+ ' "value": ""\n'
+ " }\n"
+ " ],\n"
+ ' "summary": "",\n'
+ ' "complete": true | false\n'
+ "}\n"
+ f"Limit the number of steps to at most {max_steps}. "
+ "If a selector is not needed for an action, omit it or set it to null. "
+ "Prefer XPath when selectors already look like XPath. "
+ "Do not describe the result in prose outside the JSON."
+ )
+
+ messages = [
+ {"role": "system", "content": system_prompt},
+ {
+ "role": "user",
+ "content": json.dumps(
+ {
+ "task": prompt,
+ "context": context,
+ "constraints": {
+ "max_steps": max_steps,
+ "allowed_actions": [
+ "click",
+ "type",
+ "wait_for",
+ "run_js",
+ "scroll",
+ "goto",
+ ],
+ },
+ },
+ ensure_ascii=False,
+ ),
+ },
+ ]
+
+ payload = {
+ "model": model_override or self.model,
+ "messages": messages,
+ "temperature": temperature_override if temperature_override is not None else self.temperature,
+ "response_format": {"type": "json_object"},
+ }
+ return payload
+
+ def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
+ url = f"{self.base_url}{path}"
+ headers = {
+ "Authorization": f"Bearer {self.api_key}",
+ "Content-Type": "application/json",
+ }
+ try:
+ response = requests.post(
+ url,
+ headers=headers,
+ json=payload,
+ timeout=self.timeout,
+ )
+ except requests.RequestException as exc:
+ raise AIExecutionError(f"Failed to call OpenAI: {exc}") from exc
+ if response.status_code >= 400:
+ raise AIExecutionError(
+ f"OpenAI API returned status {response.status_code}: {response.text}"
+ )
+ try:
+ data: Dict[str, Any] = response.json()
+ except ValueError as exc:
+ raise AIExecutionError("Invalid JSON returned by OpenAI.") from exc
+ return data
+
+ def _parse_plan(self, response: Dict[str, Any], max_steps: int) -> AIPlan:
+ choices = response.get("choices") or []
+ if not choices:
+ raise AIExecutionError("OpenAI response missing choices.")
+ content = choices[0].get("message", {}).get("content")
+ if not content:
+ raise AIExecutionError("OpenAI response missing message content.")
+ try:
+ payload = json.loads(content)
+ except ValueError as exc:
+ raise AIExecutionError("Failed to parse AI response JSON.") from exc
+
+ steps_raw = payload.get("steps")
+ if not isinstance(steps_raw, list):
+ raise AIExecutionError("AI response missing 'steps' list.")
+
+ steps: List[Dict[str, Any]] = []
+ for item in steps_raw[:max_steps]:
+ if isinstance(item, dict):
+ steps.append(item)
+ else:
+ logger.warning("Ignoring malformed AI step: %s", item)
+
+ summary = payload.get("summary") or ""
+ complete = bool(payload.get("complete"))
+ return AIPlan(steps=steps, summary=summary, complete=complete, raw=payload)
diff --git a/xspider/app.py b/xspider/app.py
index 94bf0da..33de7ea 100644
--- a/xspider/app.py
+++ b/xspider/app.py
@@ -13,6 +13,7 @@ from .settings import Settings
from .storage import MongoLogRepository, MongoRepository
from .variables import VariableService
from .xml_parser import XMLSiteParser
+from .ai import AIExecutor
logger = logging.getLogger(__name__)
@@ -33,11 +34,19 @@ class TemplateCrawlerApp:
self.mongo.client,
self.settings.mongo_database,
)
+ self.ai_executor = AIExecutor(
+ api_key=self.settings.openai_api_key,
+ model=self.settings.openai_model,
+ base_url=self.settings.openai_api_base,
+ timeout=self.settings.openai_timeout,
+ temperature=self.settings.openai_temperature,
+ )
self.variable_service = VariableService(self.settings.redis_url)
self.parser = XMLSiteParser()
self.runner = FlowRunner(
storage=self.mongo,
log_storage=self.log_repository,
+ ai_executor=self.ai_executor,
variable_service=self.variable_service,
)
self.http = requests.Session()
diff --git a/xspider/runner.py b/xspider/runner.py
index 4c55dd2..d9f94f5 100644
--- a/xspider/runner.py
+++ b/xspider/runner.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import json
import logging
import time
from datetime import datetime
@@ -14,6 +15,7 @@ from .storage import MongoLogRepository, MongoRepository
from .variables import VariableResolver, VariableService
from .utils.selectors import is_xpath_selector
from .actions.base import ActionContext
+from .ai import AIExecutor, AIExecutionError
logger = logging.getLogger(__name__)
@@ -23,11 +25,13 @@ class FlowRunner:
self,
storage: MongoRepository,
log_storage: MongoLogRepository,
+ ai_executor: AIExecutor,
variable_service: VariableService,
extractor: Optional[Extractor] = None,
) -> None:
self.storage = storage
self.log_storage = log_storage
+ self.ai_executor = ai_executor
self.variable_service = variable_service
self.extractor = extractor or Extractor()
@@ -76,7 +80,8 @@ class FlowRunner:
) -> None:
entry_url = self._resolve_entry(site, flow)
site_context = self._build_context(site, flow, entry_url)
- action_context = ActionContext(session, resolver, site_context)
+ services = {"ai_executor": self.ai_executor}
+ action_context = ActionContext(session, resolver, site_context, services=services)
metadata = {
"entry": flow.entry or flow.metadata.get("url"),
@@ -112,14 +117,16 @@ class FlowRunner:
action_cls = ActionRegistry.get(action_config.type)
action = action_cls(action_config)
logger.debug("Executing action %s", action_config.type)
- self._record_step(
+ step_entry = self._record_step(
steps,
event="action",
action_type=action_config.type,
selector=action_config.selector,
mode=action_config.mode.value if action_config.mode else None,
)
- action.execute(action_context)
+ result_payload = action.execute(action_context)
+ if result_payload is not None:
+ step_entry["result"] = self._sanitize_result(result_payload)
if is_login:
selector = flow.metadata.get("selector")
@@ -222,6 +229,18 @@ class FlowRunner:
summary=summary,
metadata=metadata_updates,
)
+ except AIExecutionError as exc:
+ metadata_updates = {
+ "records_saved": records_saved,
+ "pages_processed": pages_processed,
+ }
+ self.log_storage.mark_error(
+ log_handle,
+ exc,
+ steps=steps,
+ metadata=metadata_updates,
+ )
+ raise
except Exception as exc:
metadata_updates = {
"records_saved": records_saved,
@@ -261,12 +280,12 @@ class FlowRunner:
records = list(self.extractor.extract(session.html(), flow.extract))
saved = self.storage.save_records(site, flow, records)
total_records += saved
- self._record_step(
- steps,
- event="extract",
- page=page,
- records=len(records),
- saved=saved,
+ self._record_step(
+ steps,
+ event="extract",
+ page=page,
+ records=len(records),
+ saved=saved,
)
if flow.paginate.max_pages and page >= flow.paginate.max_pages:
@@ -347,13 +366,44 @@ class FlowRunner:
steps: List[Dict[str, Any]],
event: str,
**payload: Any,
- ) -> None:
+ ) -> Dict[str, Any]:
entry: Dict[str, Any] = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
}
entry.update({k: v for k, v in payload.items() if v is not None})
steps.append(entry)
+ return entry
def _count_actions(self, steps: List[Dict[str, Any]]) -> int:
return sum(1 for step in steps if step.get("event") == "action")
+
+ def _sanitize_result(self, result: Any) -> Any:
+ if isinstance(result, dict):
+ sanitized: Dict[str, Any] = {}
+ for key, value in result.items():
+ sanitized[key] = self._shorten(value)
+ return sanitized
+ if isinstance(result, list):
+ return [self._shorten(item) for item in result]
+ return self._shorten(result)
+
+ def _shorten(self, value: Any) -> Any:
+ if isinstance(value, str):
+ if len(value) <= 500:
+ return value
+ return value[:500] + "..."
+ if isinstance(value, (int, float, bool)) or value is None:
+ return value
+ if isinstance(value, (dict, list)):
+ try:
+ serialized = json.dumps(value, ensure_ascii=False)
+ except Exception: # noqa: BLE001
+ serialized = str(value)
+ if len(serialized) <= 500:
+ return value
+ return serialized[:500] + "..."
+ text = str(value)
+ if len(text) <= 500:
+ return text
+ return text[:500] + "..."
diff --git a/xspider/settings.py b/xspider/settings.py
index 3a8db65..2dc39d6 100644
--- a/xspider/settings.py
+++ b/xspider/settings.py
@@ -11,6 +11,11 @@ class Settings:
mongo_uri: str
mongo_database: str
redis_block_timeout: int
+ openai_api_base: str
+ openai_api_key: str
+ openai_model: str
+ openai_timeout: int
+ openai_temperature: float
@classmethod
def from_env(cls) -> "Settings":
@@ -19,10 +24,20 @@ class Settings:
mongo_uri = os.getenv("XSPIDER_MONGO_URI", "mongodb://hpower:hpower666.@192.168.8.154:27017")
mongo_database = os.getenv("XSPIDER_MONGO_DB", "xspider")
redis_block_timeout = int(os.getenv("XSPIDER_REDIS_BLOCK_TIMEOUT", "30"))
+ openai_api_base = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1")
+ openai_api_key = os.getenv("OPENAI_API_KEY", "")
+ openai_model = os.getenv("OPENAI_MODEL", "gpt-4o")
+ openai_timeout = int(os.getenv("OPENAI_TIMEOUT", "60"))
+ openai_temperature = float(os.getenv("OPENAI_TEMPERATURE", "0.0"))
return cls(
redis_url=redis_url,
redis_list_key=redis_list_key,
mongo_uri=mongo_uri,
mongo_database=mongo_database,
redis_block_timeout=redis_block_timeout,
+ openai_api_base=openai_api_base,
+ openai_api_key=openai_api_key,
+ openai_model=openai_model,
+ openai_timeout=openai_timeout,
+ openai_temperature=openai_temperature,
)