feat(ai): 添加 AI 动作支持,集成 OpenAI GPT-4o 驱动自动化流程

- 新增 AIExecutor 类用于调用 OpenAI API 并解析结构化指令
- 在 Settings 中增加 OpenAI 相关配置项(API Key、模型、超时等)
- 扩展 ActionContext 以支持注入 AI 服务实例
- 实现 AIAction 类处理自然语言提示并执行 AI 规划的浏览器操作
- 支持通过 max_step 参数限制 AI 操作步数(默认5,最大20)
- 支持 include_html 和 include_variables 控制上下文内容传递
- 支持运行时覆盖模型和温度参数
- 增加详细的错误处理与日志记录机制- 更新 README 文档说明 AIAction 使用方法与配置选项
- 更新 XSD schema 支持 ai 类型动作定义- 在 FlowRunner 中完善步骤记录逻辑,支持 AI 执行结果输出
- 添加 plan_ai_action.md 设计文档描述实现细节与规划
This commit is contained in:
2025-10-21 21:38:46 +08:00
parent a1a13aae65
commit 85b7ea4f6c
11 changed files with 604 additions and 13 deletions

3
.idea/misc.xml generated
View File

@@ -4,4 +4,7 @@
<option name="sdkName" value="/opt/homebrew/Caskroom/miniconda/base" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>
</project>

View File

@@ -49,6 +49,11 @@ pip install drissionpage redis requests pymongo lxml cssselect
| `XSPIDER_REDIS_BLOCK_TIMEOUT` | `30` | Redis `BLPOP` 阻塞秒数 |
| `XSPIDER_MONGO_URI` | `mongodb://localhost:27017` | MongoDB 连接串 |
| `XSPIDER_MONGO_DB` | `xspider` | MongoDB 数据库名称 |
| `OPENAI_API_KEY` | `""` | 调用 `gpt-4o` 时使用的 API Key留空则禁用 AI 动作) |
| `OPENAI_API_BASE` | `https://api.openai.com/v1` | OpenAI API 基础地址,可按需切换代理 |
| `OPENAI_MODEL` | `gpt-4o` | 默认模型名称 |
| `OPENAI_TIMEOUT` | `60` | OpenAI 请求超时,秒 |
| `OPENAI_TEMPERATURE` | `0.0` | OpenAI 采样温度 |
**变量作用域说明**
- 对变量名使用 `site:变量名` 将强制读取/写入当前站点作用域;`global:变量名` 将强制使用全局作用域。
@@ -181,6 +186,7 @@ python main.py
| `set_attr` | `selector``params.attr_name` | `params.attr_value`(默认空字符串) | 修改 DOM 属性值。 |
| `set_var` | `params.var_name``params.var_value` | `params.var_scope``params.var_ttl``params.var_single_use` | 将变量写入变量服务,可带过期策略。 |
| `captcha` | 无(若缺省会截取 `selector` 或整页截图) | `params.image``params.captcha_type``params.captcha_url``params.captcha_config``params.variable` | 调用远程接口识别验证码并写入变量。 |
| `ai` | `params.prompt` | `params.max_step``params.model``params.include_html``params.include_variables``params.temperature` | 调用 `gpt-4o` 生成 DrissionPage 操作步骤并执行,支持限制最大步骤数。 |
> **提示**
> - 当 `selector` 与 `mode` 不匹配时(例如 CSS 模式中使用 XPath解析阶段会抛出异常。
@@ -236,6 +242,25 @@ python main.py
- 服务返回 JSON 后,会尝试读取 `result``text``value``code``data` 等字段作为识别结果。
- 结果最终写入 Redis默认键为站点作用域下的 `captcha_result`,或使用 `params.variable` 覆盖),并同步到 `site_context` 供后续动作引用。
### AIAction 使用示例
`AIAction` 通过调用 OpenAI `gpt-4o` 规划并执行一组 DrissionPage 原生操作,适合处理动态页面或复杂交互。请确保已经在环境变量或 `Settings` 中配置 `OPENAI_API_KEY`
```xml
<action type="ai"
prompt="下载当前页面的首个报表附件,并在完成后返回列表页"
max_step="6"
include_html="true"
include_variables="false"/>
```
- `prompt`:必填,自然语言描述任务目标,支持变量占位符。
- `max_step`:限制模型输出的步骤数量(默认为 5最大 20
- `include_html` / `include_variables`:控制是否向模型提供页面 HTML 片段和当前变量上下文。
- 执行过程中会在 `task_logs``steps` 字段中记录每个 AI 子步骤的执行结果、异常信息与返回值。
若未配置 API Key 或 OpenAI 调用失败,动作会抛出异常并在任务日志中记录具体原因。
## 抽取与存储
- `<extract>` 支持 `record_css` / `record_xpath` 指定列表元素,`<field>` 定义字段名称、选择器、取值模式及可选的 `value_type`

51
plan_ai_action.md Normal file
View File

@@ -0,0 +1,51 @@
> 目标:新增 `ai` 类型 Action使其能够根据自定义 prompt 调用 OpenAI 的 `gpt-4o`,按步骤直接驱动 DrissionPage 原生 `Page`/元素方法,并支持 `max_step` 等参数。
## 一、需求分析与约束
1. 明确期望的交互:`ai` 动作接收 prompt结合当前页面状态HTML、URL、变量等生成操作指令。
2. 确定 DrissionPage 原生方法集合(如 `page.ele(...)`, `click`, `input`, `wait.ele`, `run_js`, `scroll` 等),并定义统一的指令格式(动作名称、参数列表)。
3. 确认 OpenAI `gpt-4o` 接口调用方式REST API需要在设置中提供 `OPENAI_API_KEY` 等凭证,并确认网络访问策略(若受限需用户预先允许)。
## 二、ActionConfig 设计
1. 在 Schema / README 中补充 `ai` 行,定义可用参数:
- `params.prompt`:必填,自然语言描述的目标。
- `params.max_step`:可选,限制生成的操作步骤数量,默认值如 5。
- `params.model`:默认 `gpt-4o`,允许覆盖。
- `params.temperature``params.top_p` 等生成参数(可选)。
- `params.context_mode`:决定传递给模型的页面 HTML、变量、历史步骤范围。
2. 决定动作返回值(例如最后的操作总结或 AI 返回的原始响应),并写入日志步骤。
## 三、AI 执行器设计
1. 新增组件(如 `xspider/ai/executor.py`)封装:
- 组装系统提示+用户 prompt包含页面上下文、变量、执行限制、可用 DrissionPage API 列表)。
- 调用 OpenAI `gpt-4o` REST API使用 JSON 模式tool/response schema获取结构化指令。
- 校验 AI 返回值并转换为内部步骤列表(确保动作类型/参数合法)。
2. 定义步骤数据结构Action 指令列表),包括动作类型、定位参数(选择器/方式或直接传递 JS、输入值等。
3. 设计错误处理机制:解析失败、指令无效时的重试 / 回退策略。
## 四、DrissionPage 操作适配
1. 获取 `ActionContext.session.page`,直接调用 DrissionPage 原生方法(避免经过 `BrowserSession` 包装),必要时提供辅助函数简化重复逻辑。
2. 根据步骤类型调用对应的原生方法(`page.ele(...)`, `ele.click(...)`, `ele.input(...)`, `page.wait.ele(...)`, `page.run_js(...)` 等),执行前后记录步骤状态。
3. 若出现异常(元素不存在、方法报错),决定是立即终止、尝试下一步,或把错误反馈给 AI 进行下一轮迭代。
## 五、Action 实现
1.`xspider/actions` 中新增 `ai.py` 或直接扩展 `builtin.py`
- 创建 `AIAction(BaseAction)`,在 `_execute` 中初始化 AI 执行器、构建上下文、驱动步骤执行。
- 支持 `max_step` 限制,记录已执行步骤数量。
- 将执行结果附加到 `ActionContext.site_context`(必要时),供后续动作使用。
2.`ActionRegistry` 中注册 `ai`
3. 处理变量解析:允许 prompt、模型名等字段使用 `${...}` 占位符。
## 六、日志与调试
1. 扩展任务日志步骤记录:在 FlowRunner 中对 `ai` 动作追加详细信息AI prompt、返回指令、执行情况
- 注意隐私 / 成本:可只记录摘要或取前 N 个字符。
2. 提供调试标志(如 `params.debug=true`),决定是否保存原始 AI 响应。
## 七、配置与依赖
1.`Settings` 中新增 AI 服务相关配置(如 API URL、密钥、默认模型
2. 更新 README / Schema说明 `ai` 动作参数、OpenAI API Key 配置、`max_step` 行为及注意事项。
3. 如需第三方 SDK在依赖列表中追加或留空供用户自备
## 八、测试与验证
1. 编写单元测试:模拟 AI 服务返回固定指令,验证解析与执行流程。
2. 提供集成测试脚本(可打桩 AI 响应),检查与 DrissionPage 的联动。
3. 手动验证异常路径AI 返回空、指令超限、元素不存在等),确保有明确错误日志。

View File

@@ -73,7 +73,7 @@
<xs:documentation>
动作执行配置。内置类型包含goto、click、type、wait_dom_show、
wait_dom_gone、wait_dom_hide、wait_time、run_js、set_header、set_attr、
set_var、captcha。可按需扩展自定义类型。
set_var、captcha、ai。可按需扩展自定义类型。
</xs:documentation>
</xs:annotation>
<xs:sequence/>
@@ -195,4 +195,3 @@
<xs:element name="site" type="SiteType"/>
</xs:schema>

View File

@@ -15,10 +15,12 @@ class ActionContext:
session: BrowserSession,
resolver: VariableResolver,
site_context: Dict[str, str],
services: Optional[Dict[str, Any]] = None,
) -> None:
self.session = session
self.resolver = resolver
self.site_context = site_context
self.services = services or {}
class BaseAction(ABC):

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import json
import logging
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
import requests
@@ -279,6 +279,258 @@ class CaptchaAction(BaseAction):
ctx.resolver.set(variable_name, solution, ctx.site_context)
ctx.site_context[variable_name] = solution
class AIAction(BaseAction):
type_name = "ai"
enabled = True
CONTEXT_HTML_LIMIT = 20_000
def _execute(self, ctx: ActionContext) -> Dict[str, Any]:
executor = ctx.services.get("ai_executor")
if not executor:
raise RuntimeError("AI executor is not configured.")
prompt_raw = (
self.config.params.get("prompt")
or self.config.params.get("text")
or self.config.selector
)
prompt_resolved = ctx.resolver.resolve(prompt_raw, ctx.site_context)
if not prompt_resolved:
raise ValueError("ai action requires 'prompt' parameter or selector text.")
max_step_value = self.config.params.get("max_step") or self.config.params.get("max_steps")
if max_step_value:
resolved_max = ctx.resolver.resolve(max_step_value, ctx.site_context) or max_step_value
else:
resolved_max = None
try:
max_steps = int(resolved_max) if resolved_max is not None else 5
except ValueError as exc:
raise ValueError(f"Invalid max_step value: {resolved_max}") from exc
max_steps = max(1, min(max_steps, 20))
include_html_value = self.config.params.get("include_html", "true")
include_html_value = ctx.resolver.resolve(include_html_value, ctx.site_context) or include_html_value
include_html = str(include_html_value).lower() not in {"false", "0", "no", "off"}
include_variables_value = self.config.params.get("include_variables", "true")
include_variables_value = (
ctx.resolver.resolve(include_variables_value, ctx.site_context) or include_variables_value
)
include_variables = str(include_variables_value).lower() not in {"false", "0", "no", "off"}
model_override = None
if self.config.params.get("model"):
model_override = ctx.resolver.resolve(self.config.params["model"], ctx.site_context) or self.config.params["model"]
temperature_override = None
if self.config.params.get("temperature") is not None:
raw_temp = ctx.resolver.resolve(self.config.params.get("temperature"), ctx.site_context)
try:
temperature_override = float(raw_temp)
except (TypeError, ValueError) as exc:
raise ValueError(f"Invalid temperature value '{raw_temp}'.") from exc
page = ctx.session.page
context_payload: Dict[str, Any] = {
"url": getattr(page, "url", ctx.site_context.get("entry_url")),
"title": getattr(page, "title", None),
}
if include_html:
html_content = self._safe_html(page)
if html_content:
context_payload["html"] = html_content[: self.CONTEXT_HTML_LIMIT]
if include_variables:
context_payload["variables"] = ctx.site_context
plan = executor.plan(
prompt_resolved,
context=context_payload,
max_steps=max_steps,
model=model_override,
temperature=temperature_override,
)
execution_log = self._execute_steps(ctx, plan.steps, max_steps)
return {
"prompt": prompt_resolved[:2000],
"requested_steps": len(plan.steps),
"executed_steps": execution_log,
"complete": plan.complete,
"summary": plan.summary,
}
def _execute_steps(
self,
ctx: ActionContext,
steps: List[Dict[str, Any]],
max_steps: int,
) -> List[Dict[str, Any]]:
page = ctx.session.page
executed: List[Dict[str, Any]] = []
for index, step in enumerate(steps, start=1):
if index > max_steps:
break
action = (step.get("action") or "").lower().strip()
record: Dict[str, Any] = {
"index": index,
"action": action,
"description": step.get("description"),
}
try:
if action == "click":
self._perform_click(page, step)
elif action == "type":
self._perform_type(page, step)
elif action == "wait_for":
self._perform_wait(page, step)
elif action == "run_js":
record["result"] = self._perform_run_js(page, step)
elif action == "scroll":
self._perform_scroll(page, step)
elif action == "goto":
self._perform_goto(page, step)
else:
raise ValueError(f"Unsupported AI action '{action}'.")
record["status"] = "success"
except Exception as exc: # noqa: BLE001
record["status"] = "error"
record["message"] = str(exc)
executed.append(record)
raise
executed.append(record)
return executed
def _perform_click(self, page: Any, step: Dict[str, Any]) -> None:
element = self._resolve_element(page, step)
button = step.get("button")
if button:
element.click(button=button)
else:
element.click()
def _perform_type(self, page: Any, step: Dict[str, Any]) -> None:
element = self._resolve_element(page, step)
text = step.get("text")
if text is None:
raise ValueError("AI type action requires 'text'.")
clear = step.get("clear", True)
if clear:
clear_method = getattr(element, "clear", None)
if callable(clear_method):
clear_method()
input_method = getattr(element, "input", None)
if callable(input_method):
input_method(text)
return
send_keys = getattr(element, "send_keys", None)
if callable(send_keys):
send_keys(text)
return
raise ValueError("Element does not support input typing.")
def _perform_wait(self, page: Any, step: Dict[str, Any]) -> None:
selector = step.get("selector")
mode = (step.get("mode") or "css").lower()
timeout_ms = self._timeout_ms(step.get("timeout"))
timeout_sec = timeout_ms / 1000.0
state = (step.get("state") or "visible").lower()
wait_obj = getattr(page, "wait", None)
pref_selector = self._format_selector(selector, mode)
if not wait_obj:
raise ValueError("DrissionPage wait helper is not available.")
if state == "visible":
wait_obj.ele(pref_selector, timeout=timeout_sec, mode=mode)
elif state in {"gone", "disappear"}:
wait_obj.ele_gone(pref_selector, timeout=timeout_sec, mode=mode)
else:
raise ValueError(f"Unsupported wait state '{state}'.")
def _perform_run_js(self, page: Any, step: Dict[str, Any]) -> Any:
script = step.get("script") or step.get("text")
if not script:
raise ValueError("AI run_js action requires 'script'.")
args = step.get("args") or []
if not isinstance(args, list):
raise ValueError("AI run_js args must be a list.")
return page.run_js(script, *args)
def _perform_scroll(self, page: Any, step: Dict[str, Any]) -> None:
direction = (step.get("direction") or "down").lower()
distance = step.get("value") or step.get("distance") or 600
try:
distance_int = int(distance)
except ValueError as exc:
raise ValueError(f"Invalid scroll distance '{distance}'.") from exc
if direction in {"down", "bottom"}:
page.run_js(f"window.scrollBy(0, {max(distance_int, 1)});")
elif direction in {"up", "top"}:
page.run_js(f"window.scrollBy(0, {-abs(distance_int)});")
else:
raise ValueError(f"Unsupported scroll direction '{direction}'.")
def _perform_goto(self, page: Any, step: Dict[str, Any]) -> None:
url = step.get("url") or step.get("value")
if not url:
raise ValueError("AI goto action requires 'url'.")
page.get(url)
def _resolve_element(self, page: Any, step: Dict[str, Any]) -> Any:
selector = step.get("selector")
if not selector:
raise ValueError("AI action requires 'selector'.")
mode = (step.get("mode") or "css").lower()
timeout_ms = self._timeout_ms(step.get("timeout"))
timeout_sec = timeout_ms / 1000.0
pref_selector = self._format_selector(selector, mode)
wait_obj = getattr(page, "wait", None)
if wait_obj:
try:
return wait_obj.ele(pref_selector, timeout=timeout_sec, mode=mode)
except Exception: # noqa: BLE001
pass
ele_getter = getattr(page, "ele", None)
if not callable(ele_getter):
raise ValueError("DrissionPage page.ele method is unavailable.")
element = ele_getter(pref_selector, timeout=timeout_sec, mode=mode)
if not element:
raise ValueError(f"Failed to locate element '{selector}' (mode={mode}).")
return element
def _format_selector(self, selector: Optional[str], mode: str) -> str:
if not selector:
raise ValueError("Selector must not be empty.")
selector = selector.strip()
lowered = selector.lower()
prefix = f"{mode}:"
if lowered.startswith("css:") or lowered.startswith("xpath:"):
return selector
return f"{prefix}{selector}"
def _safe_html(self, page: Any) -> Optional[str]:
html_attr = getattr(page, "html", None)
if callable(html_attr):
try:
return html_attr()
except Exception: # noqa: BLE001
return None
if isinstance(html_attr, str):
return html_attr
return None
def _timeout_ms(self, value: Optional[Any]) -> int:
if value is None:
return self.config.timeout_ms
resolved = value
if isinstance(value, str):
resolved = value.strip()
try:
timeout_int = int(float(resolved))
except (TypeError, ValueError) as exc:
raise ValueError(f"Invalid timeout value '{value}'.") from exc
return max(timeout_int, 0)
def _load_config(self, ctx: ActionContext) -> Dict[str, Any]:
raw_config = self.config.params.get("captcha_config")
if not raw_config:

3
xspider/ai/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .executor import AIExecutor, AIExecutionError, AIPlan
__all__ = ["AIExecutor", "AIExecutionError", "AIPlan"]

182
xspider/ai/executor.py Normal file
View File

@@ -0,0 +1,182 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import requests
logger = logging.getLogger(__name__)
class AIExecutionError(RuntimeError):
"""Wrap errors raised when requesting or parsing AI responses."""
@dataclass
class AIPlan:
steps: List[Dict[str, Any]]
summary: str
complete: bool
raw: Dict[str, Any]
class AIExecutor:
def __init__(
self,
api_key: str,
model: str,
*,
base_url: str = "https://api.openai.com/v1",
timeout: int = 60,
temperature: float = 0.0,
) -> None:
self.api_key = api_key or ""
self.model = model
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.temperature = temperature
def plan(
self,
prompt: str,
*,
context: Dict[str, Any],
max_steps: int,
model: Optional[str] = None,
temperature: Optional[float] = None,
) -> AIPlan:
if not self.api_key:
raise AIExecutionError("OpenAI API key is not configured.")
payload = self._build_payload(
prompt,
context,
max_steps,
model_override=model,
temperature_override=temperature,
)
response = self._post("/chat/completions", payload)
plan = self._parse_plan(response, max_steps)
return plan
def _build_payload(
self,
prompt: str,
context: Dict[str, Any],
max_steps: int,
*,
model_override: Optional[str],
temperature_override: Optional[float],
) -> Dict[str, Any]:
system_prompt = (
"You are an assistant that plans browser automation steps using DrissionPage's "
"Python API. Respond ONLY with valid JSON that matches the schema below:\n"
"{\n"
' "steps": [\n'
' {\n'
' "action": "click" | "type" | "wait_for" | "run_js" | "scroll" | "goto",\n'
' "selector": "<css or xpath selector>",\n'
' "mode": "css" | "xpath",\n'
' "text": "<text to input when action is type>",\n'
' "script": "<javascript when action is run_js>",\n'
' "timeout": <timeout milliseconds>,\n'
' "button": "left" | "right" | "middle",\n'
' "description": "<short explanation>",\n'
' "direction": "<scroll direction when action is scroll>",\n'
' "value": "<optional extra value>"\n'
" }\n"
" ],\n"
' "summary": "<short natural language summary>",\n'
' "complete": true | false\n'
"}\n"
f"Limit the number of steps to at most {max_steps}. "
"If a selector is not needed for an action, omit it or set it to null. "
"Prefer XPath when selectors already look like XPath. "
"Do not describe the result in prose outside the JSON."
)
messages = [
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": json.dumps(
{
"task": prompt,
"context": context,
"constraints": {
"max_steps": max_steps,
"allowed_actions": [
"click",
"type",
"wait_for",
"run_js",
"scroll",
"goto",
],
},
},
ensure_ascii=False,
),
},
]
payload = {
"model": model_override or self.model,
"messages": messages,
"temperature": temperature_override if temperature_override is not None else self.temperature,
"response_format": {"type": "json_object"},
}
return payload
def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
try:
response = requests.post(
url,
headers=headers,
json=payload,
timeout=self.timeout,
)
except requests.RequestException as exc:
raise AIExecutionError(f"Failed to call OpenAI: {exc}") from exc
if response.status_code >= 400:
raise AIExecutionError(
f"OpenAI API returned status {response.status_code}: {response.text}"
)
try:
data: Dict[str, Any] = response.json()
except ValueError as exc:
raise AIExecutionError("Invalid JSON returned by OpenAI.") from exc
return data
def _parse_plan(self, response: Dict[str, Any], max_steps: int) -> AIPlan:
choices = response.get("choices") or []
if not choices:
raise AIExecutionError("OpenAI response missing choices.")
content = choices[0].get("message", {}).get("content")
if not content:
raise AIExecutionError("OpenAI response missing message content.")
try:
payload = json.loads(content)
except ValueError as exc:
raise AIExecutionError("Failed to parse AI response JSON.") from exc
steps_raw = payload.get("steps")
if not isinstance(steps_raw, list):
raise AIExecutionError("AI response missing 'steps' list.")
steps: List[Dict[str, Any]] = []
for item in steps_raw[:max_steps]:
if isinstance(item, dict):
steps.append(item)
else:
logger.warning("Ignoring malformed AI step: %s", item)
summary = payload.get("summary") or ""
complete = bool(payload.get("complete"))
return AIPlan(steps=steps, summary=summary, complete=complete, raw=payload)

View File

@@ -13,6 +13,7 @@ from .settings import Settings
from .storage import MongoLogRepository, MongoRepository
from .variables import VariableService
from .xml_parser import XMLSiteParser
from .ai import AIExecutor
logger = logging.getLogger(__name__)
@@ -33,11 +34,19 @@ class TemplateCrawlerApp:
self.mongo.client,
self.settings.mongo_database,
)
self.ai_executor = AIExecutor(
api_key=self.settings.openai_api_key,
model=self.settings.openai_model,
base_url=self.settings.openai_api_base,
timeout=self.settings.openai_timeout,
temperature=self.settings.openai_temperature,
)
self.variable_service = VariableService(self.settings.redis_url)
self.parser = XMLSiteParser()
self.runner = FlowRunner(
storage=self.mongo,
log_storage=self.log_repository,
ai_executor=self.ai_executor,
variable_service=self.variable_service,
)
self.http = requests.Session()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import json
import logging
import time
from datetime import datetime
@@ -14,6 +15,7 @@ from .storage import MongoLogRepository, MongoRepository
from .variables import VariableResolver, VariableService
from .utils.selectors import is_xpath_selector
from .actions.base import ActionContext
from .ai import AIExecutor, AIExecutionError
logger = logging.getLogger(__name__)
@@ -23,11 +25,13 @@ class FlowRunner:
self,
storage: MongoRepository,
log_storage: MongoLogRepository,
ai_executor: AIExecutor,
variable_service: VariableService,
extractor: Optional[Extractor] = None,
) -> None:
self.storage = storage
self.log_storage = log_storage
self.ai_executor = ai_executor
self.variable_service = variable_service
self.extractor = extractor or Extractor()
@@ -76,7 +80,8 @@ class FlowRunner:
) -> None:
entry_url = self._resolve_entry(site, flow)
site_context = self._build_context(site, flow, entry_url)
action_context = ActionContext(session, resolver, site_context)
services = {"ai_executor": self.ai_executor}
action_context = ActionContext(session, resolver, site_context, services=services)
metadata = {
"entry": flow.entry or flow.metadata.get("url"),
@@ -112,14 +117,16 @@ class FlowRunner:
action_cls = ActionRegistry.get(action_config.type)
action = action_cls(action_config)
logger.debug("Executing action %s", action_config.type)
self._record_step(
step_entry = self._record_step(
steps,
event="action",
action_type=action_config.type,
selector=action_config.selector,
mode=action_config.mode.value if action_config.mode else None,
)
action.execute(action_context)
result_payload = action.execute(action_context)
if result_payload is not None:
step_entry["result"] = self._sanitize_result(result_payload)
if is_login:
selector = flow.metadata.get("selector")
@@ -222,6 +229,18 @@ class FlowRunner:
summary=summary,
metadata=metadata_updates,
)
except AIExecutionError as exc:
metadata_updates = {
"records_saved": records_saved,
"pages_processed": pages_processed,
}
self.log_storage.mark_error(
log_handle,
exc,
steps=steps,
metadata=metadata_updates,
)
raise
except Exception as exc:
metadata_updates = {
"records_saved": records_saved,
@@ -347,13 +366,44 @@ class FlowRunner:
steps: List[Dict[str, Any]],
event: str,
**payload: Any,
) -> None:
) -> Dict[str, Any]:
entry: Dict[str, Any] = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
}
entry.update({k: v for k, v in payload.items() if v is not None})
steps.append(entry)
return entry
def _count_actions(self, steps: List[Dict[str, Any]]) -> int:
return sum(1 for step in steps if step.get("event") == "action")
def _sanitize_result(self, result: Any) -> Any:
if isinstance(result, dict):
sanitized: Dict[str, Any] = {}
for key, value in result.items():
sanitized[key] = self._shorten(value)
return sanitized
if isinstance(result, list):
return [self._shorten(item) for item in result]
return self._shorten(result)
def _shorten(self, value: Any) -> Any:
if isinstance(value, str):
if len(value) <= 500:
return value
return value[:500] + "..."
if isinstance(value, (int, float, bool)) or value is None:
return value
if isinstance(value, (dict, list)):
try:
serialized = json.dumps(value, ensure_ascii=False)
except Exception: # noqa: BLE001
serialized = str(value)
if len(serialized) <= 500:
return value
return serialized[:500] + "..."
text = str(value)
if len(text) <= 500:
return text
return text[:500] + "..."

View File

@@ -11,6 +11,11 @@ class Settings:
mongo_uri: str
mongo_database: str
redis_block_timeout: int
openai_api_base: str
openai_api_key: str
openai_model: str
openai_timeout: int
openai_temperature: float
@classmethod
def from_env(cls) -> "Settings":
@@ -19,10 +24,20 @@ class Settings:
mongo_uri = os.getenv("XSPIDER_MONGO_URI", "mongodb://hpower:hpower666.@192.168.8.154:27017")
mongo_database = os.getenv("XSPIDER_MONGO_DB", "xspider")
redis_block_timeout = int(os.getenv("XSPIDER_REDIS_BLOCK_TIMEOUT", "30"))
openai_api_base = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1")
openai_api_key = os.getenv("OPENAI_API_KEY", "")
openai_model = os.getenv("OPENAI_MODEL", "gpt-4o")
openai_timeout = int(os.getenv("OPENAI_TIMEOUT", "60"))
openai_temperature = float(os.getenv("OPENAI_TEMPERATURE", "0.0"))
return cls(
redis_url=redis_url,
redis_list_key=redis_list_key,
mongo_uri=mongo_uri,
mongo_database=mongo_database,
redis_block_timeout=redis_block_timeout,
openai_api_base=openai_api_base,
openai_api_key=openai_api_key,
openai_model=openai_model,
openai_timeout=openai_timeout,
openai_temperature=openai_temperature,
)