feat: implement plugin API request handling with HTTP Basic Auth support
Some checks failed
Build Multi-Platform Binaries / build-frontend (push) Successful in 31s
Build Multi-Platform Binaries / build-binaries (amd64, darwin, server, false) (push) Successful in 4m16s
Build Multi-Platform Binaries / build-binaries (amd64, linux, client, true) (push) Successful in 2m21s
Build Multi-Platform Binaries / build-binaries (amd64, linux, server, true) (push) Successful in 5m31s
Build Multi-Platform Binaries / build-binaries (amd64, windows, server, true) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, client, true) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, server, true) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (arm64, darwin, server, false) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (arm64, linux, client, true) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (arm64, linux, server, true) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (arm64, windows, server, false) (push) Has been cancelled
Build Multi-Platform Binaries / build-binaries (amd64, windows, client, true) (push) Has been cancelled

This commit is contained in:
2026-01-04 20:32:21 +08:00
parent 458bb35005
commit 78982a26b0
9 changed files with 620 additions and 48 deletions

View File

@@ -117,18 +117,45 @@ func (c *Client) SetPluginRegistry(registry *plugin.Registry) {
c.pluginRegistry = registry
}
// logf 安全地记录日志(同时输出到标准日志和日志收集器)
func (c *Client) logf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
log.Print(msg)
if c.logger != nil {
c.logger.Printf(msg)
}
}
// logErrorf 安全地记录错误日志
func (c *Client) logErrorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
log.Print(msg)
if c.logger != nil {
c.logger.Errorf(msg)
}
}
// logWarnf 安全地记录警告日志
func (c *Client) logWarnf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
log.Print(msg)
if c.logger != nil {
c.logger.Warnf(msg)
}
}
// Run 启动客户端(带断线重连)
func (c *Client) Run() error {
for {
if err := c.connect(); err != nil {
log.Printf("[Client] Connect error: %v", err)
log.Printf("[Client] Reconnecting in %v...", reconnectDelay)
c.logErrorf("[Client] Connect error: %v", err)
c.logf("[Client] Reconnecting in %v...", reconnectDelay)
time.Sleep(reconnectDelay)
continue
}
c.handleSession()
log.Printf("[Client] Disconnected, reconnecting...")
c.logWarnf("[Client] Disconnected, reconnecting...")
time.Sleep(disconnectDelay)
}
}
@@ -175,10 +202,10 @@ func (c *Client) connect() error {
if authResp.ClientID != "" && authResp.ClientID != c.ID {
c.ID = authResp.ClientID
saveClientID(c.ID)
log.Printf("[Client] New ID assigned and saved: %s", c.ID)
c.logf("[Client] New ID assigned and saved: %s", c.ID)
}
log.Printf("[Client] Authenticated as %s", c.ID)
c.logf("[Client] Authenticated as %s", c.ID)
session, err := yamux.Client(conn, nil)
if err != nil {
@@ -251,6 +278,8 @@ func (c *Client) handleStream(stream net.Conn) {
c.handleLogStop(stream, msg)
case protocol.MsgTypePluginStatusQuery:
c.handlePluginStatusQuery(stream, msg)
case protocol.MsgTypePluginAPIRequest:
c.handlePluginAPIRequest(stream, msg)
}
}
@@ -258,7 +287,7 @@ func (c *Client) handleStream(stream net.Conn) {
func (c *Client) handleProxyConfig(msg *protocol.Message) {
var cfg protocol.ProxyConfig
if err := msg.ParsePayload(&cfg); err != nil {
log.Printf("[Client] Parse proxy config error: %v", err)
c.logErrorf("[Client] Parse proxy config error: %v", err)
return
}
@@ -266,9 +295,9 @@ func (c *Client) handleProxyConfig(msg *protocol.Message) {
c.rules = cfg.Rules
c.mu.Unlock()
log.Printf("[Client] Received %d proxy rules", len(cfg.Rules))
c.logf("[Client] Received %d proxy rules", len(cfg.Rules))
for _, r := range cfg.Rules {
log.Printf("[Client] %s: %s:%d", r.Name, r.LocalIP, r.LocalPort)
c.logf("[Client] %s: %s:%d", r.Name, r.LocalIP, r.LocalPort)
}
}
@@ -276,7 +305,7 @@ func (c *Client) handleProxyConfig(msg *protocol.Message) {
func (c *Client) handleNewProxy(stream net.Conn, msg *protocol.Message) {
var req protocol.NewProxyRequest
if err := msg.ParsePayload(&req); err != nil {
log.Printf("[Client] Parse new proxy request error: %v", err)
c.logErrorf("[Client] Parse new proxy request error: %v", err)
return
}
@@ -291,14 +320,14 @@ func (c *Client) handleNewProxy(stream net.Conn, msg *protocol.Message) {
c.mu.RUnlock()
if rule == nil {
log.Printf("[Client] Unknown port %d", req.RemotePort)
c.logWarnf("[Client] Unknown port %d", req.RemotePort)
return
}
localAddr := fmt.Sprintf("%s:%d", rule.LocalIP, rule.LocalPort)
localConn, err := net.DialTimeout("tcp", localAddr, localDialTimeout)
if err != nil {
log.Printf("[Client] Connect %s error: %v", localAddr, err)
c.logErrorf("[Client] Connect %s error: %v", localAddr, err)
return
}
@@ -408,24 +437,24 @@ func (c *Client) findRuleByPort(port int) *protocol.ProxyRule {
func (c *Client) handlePluginConfig(msg *protocol.Message) {
var cfg protocol.PluginConfigSync
if err := msg.ParsePayload(&cfg); err != nil {
log.Printf("[Client] Parse plugin config error: %v", err)
c.logErrorf("[Client] Parse plugin config error: %v", err)
return
}
log.Printf("[Client] Received config for plugin: %s", cfg.PluginName)
c.logf("[Client] Received config for plugin: %s", cfg.PluginName)
// 应用配置到插件
if c.pluginRegistry != nil {
handler, err := c.pluginRegistry.GetClient(cfg.PluginName)
if err != nil {
log.Printf("[Client] Plugin %s not found: %v", cfg.PluginName, err)
c.logWarnf("[Client] Plugin %s not found: %v", cfg.PluginName, err)
return
}
if err := handler.Init(cfg.Config); err != nil {
log.Printf("[Client] Plugin %s init error: %v", cfg.PluginName, err)
c.logErrorf("[Client] Plugin %s init error: %v", cfg.PluginName, err)
return
}
log.Printf("[Client] Plugin %s config applied", cfg.PluginName)
c.logf("[Client] Plugin %s config applied", cfg.PluginName)
}
}
@@ -439,7 +468,7 @@ func (c *Client) handleClientPluginStart(stream net.Conn, msg *protocol.Message)
return
}
log.Printf("[Client] Starting plugin %s for rule %s", req.PluginName, req.RuleName)
c.logf("[Client] Starting plugin %s for rule %s", req.PluginName, req.RuleName)
// 获取插件
if c.pluginRegistry == nil {
@@ -471,7 +500,7 @@ func (c *Client) handleClientPluginStart(stream net.Conn, msg *protocol.Message)
c.runningPlugins[key] = handler
c.pluginMu.Unlock()
log.Printf("[Client] Plugin %s started at %s", req.PluginName, localAddr)
c.logf("[Client] Plugin %s started at %s", req.PluginName, localAddr)
c.sendPluginStatus(stream, req.PluginName, req.RuleName, true, localAddr, "")
}
@@ -502,7 +531,7 @@ func (c *Client) handleClientPluginConn(stream net.Conn, msg *protocol.Message)
c.pluginMu.RUnlock()
if !ok {
log.Printf("[Client] Plugin %s not running", key)
c.logWarnf("[Client] Plugin %s not running", key)
stream.Close()
return
}
@@ -521,15 +550,15 @@ func (c *Client) handleJSPluginInstall(stream net.Conn, msg *protocol.Message) {
return
}
log.Printf("[Client] Installing JS plugin: %s", req.PluginName)
c.logf("[Client] Installing JS plugin: %s", req.PluginName)
// 如果插件已经在运行,先停止它
key := req.PluginName + ":" + req.RuleName
c.pluginMu.Lock()
if existingHandler, ok := c.runningPlugins[key]; ok {
log.Printf("[Client] Stopping existing plugin %s before reinstall", key)
c.logf("[Client] Stopping existing plugin %s before reinstall", key)
if err := existingHandler.Stop(); err != nil {
log.Printf("[Client] Stop existing plugin error: %v", err)
c.logErrorf("[Client] Stop existing plugin error: %v", err)
}
delete(c.runningPlugins, key)
}
@@ -537,11 +566,11 @@ func (c *Client) handleJSPluginInstall(stream net.Conn, msg *protocol.Message) {
// 验证官方签名
if err := c.verifyJSPluginSignature(req.PluginName, req.Source, req.Signature); err != nil {
log.Printf("[Client] JS plugin %s signature verification failed: %v", req.PluginName, err)
c.logErrorf("[Client] JS plugin %s signature verification failed: %v", req.PluginName, err)
c.sendJSPluginResult(stream, req.PluginName, false, "signature verification failed: "+err.Error())
return
}
log.Printf("[Client] JS plugin %s signature verified", req.PluginName)
c.logf("[Client] JS plugin %s signature verified", req.PluginName)
// 创建 JS 插件
jsPlugin, err := script.NewJSPlugin(req.PluginName, req.Source)
@@ -555,7 +584,7 @@ func (c *Client) handleJSPluginInstall(stream net.Conn, msg *protocol.Message) {
c.pluginRegistry.RegisterClient(jsPlugin)
}
log.Printf("[Client] JS plugin %s installed", req.PluginName)
c.logf("[Client] JS plugin %s installed", req.PluginName)
c.sendJSPluginResult(stream, req.PluginName, true, "")
// 保存版本信息(防止降级攻击)
@@ -586,13 +615,13 @@ func (c *Client) sendJSPluginResult(stream net.Conn, name string, success bool,
// startJSPlugin 启动 JS 插件
func (c *Client) startJSPlugin(handler plugin.ClientPlugin, req protocol.JSPluginInstallRequest) {
if err := handler.Init(req.Config); err != nil {
log.Printf("[Client] JS plugin %s init error: %v", req.PluginName, err)
c.logErrorf("[Client] JS plugin %s init error: %v", req.PluginName, err)
return
}
localAddr, err := handler.Start()
if err != nil {
log.Printf("[Client] JS plugin %s start error: %v", req.PluginName, err)
c.logErrorf("[Client] JS plugin %s start error: %v", req.PluginName, err)
return
}
@@ -601,7 +630,7 @@ func (c *Client) startJSPlugin(handler plugin.ClientPlugin, req protocol.JSPlugi
c.runningPlugins[key] = handler
c.pluginMu.Unlock()
log.Printf("[Client] JS plugin %s started at %s", req.PluginName, localAddr)
c.logf("[Client] JS plugin %s started at %s", req.PluginName, localAddr)
}
// verifyJSPluginSignature 验证 JS 插件签名
@@ -664,13 +693,13 @@ func (c *Client) handleClientPluginStop(stream net.Conn, msg *protocol.Message)
handler, ok := c.runningPlugins[key]
if ok {
if err := handler.Stop(); err != nil {
log.Printf("[Client] Plugin %s stop error: %v", key, err)
c.logErrorf("[Client] Plugin %s stop error: %v", key, err)
}
delete(c.runningPlugins, key)
}
c.pluginMu.Unlock()
log.Printf("[Client] Plugin %s stopped", key)
c.logf("[Client] Plugin %s stopped", key)
c.sendPluginStatus(stream, req.PluginName, req.RuleName, false, "", "")
}
@@ -681,7 +710,7 @@ func (c *Client) handleClientRestart(stream net.Conn, msg *protocol.Message) {
var req protocol.ClientRestartRequest
msg.ParsePayload(&req)
log.Printf("[Client] Restart requested: %s", req.Reason)
c.logf("[Client] Restart requested: %s", req.Reason)
// 发送响应
resp := protocol.ClientRestartResponse{
@@ -694,7 +723,7 @@ func (c *Client) handleClientRestart(stream net.Conn, msg *protocol.Message) {
// 停止所有运行中的插件
c.pluginMu.Lock()
for key, handler := range c.runningPlugins {
log.Printf("[Client] Stopping plugin %s for restart", key)
c.logf("[Client] Stopping plugin %s for restart", key)
handler.Stop()
}
c.runningPlugins = make(map[string]plugin.ClientPlugin)
@@ -717,7 +746,7 @@ func (c *Client) handlePluginConfigUpdate(stream net.Conn, msg *protocol.Message
}
key := req.PluginName + ":" + req.RuleName
log.Printf("[Client] Config update for plugin %s", key)
c.logf("[Client] Config update for plugin %s", key)
c.pluginMu.RLock()
handler, ok := c.runningPlugins[key]
@@ -732,7 +761,7 @@ func (c *Client) handlePluginConfigUpdate(stream net.Conn, msg *protocol.Message
// 停止并重启插件
c.pluginMu.Lock()
if err := handler.Stop(); err != nil {
log.Printf("[Client] Plugin %s stop error: %v", key, err)
c.logErrorf("[Client] Plugin %s stop error: %v", key, err)
}
delete(c.runningPlugins, key)
c.pluginMu.Unlock()
@@ -753,7 +782,7 @@ func (c *Client) handlePluginConfigUpdate(stream net.Conn, msg *protocol.Message
c.runningPlugins[key] = handler
c.pluginMu.Unlock()
log.Printf("[Client] Plugin %s restarted at %s with new config", key, localAddr)
c.logf("[Client] Plugin %s restarted at %s with new config", key, localAddr)
}
c.sendPluginConfigUpdateResult(stream, req.PluginName, req.RuleName, true, "")
@@ -777,17 +806,17 @@ func (c *Client) handleUpdateDownload(stream net.Conn, msg *protocol.Message) {
var req protocol.UpdateDownloadRequest
if err := msg.ParsePayload(&req); err != nil {
log.Printf("[Client] Parse update request error: %v", err)
c.logErrorf("[Client] Parse update request error: %v", err)
c.sendUpdateResult(stream, false, "invalid request")
return
}
log.Printf("[Client] Update download requested: %s", req.DownloadURL)
c.logf("[Client] Update download requested: %s", req.DownloadURL)
// 异步执行更新
go func() {
if err := c.performSelfUpdate(req.DownloadURL); err != nil {
log.Printf("[Client] Update failed: %v", err)
c.logErrorf("[Client] Update failed: %v", err)
}
}()
@@ -806,7 +835,7 @@ func (c *Client) sendUpdateResult(stream net.Conn, success bool, message string)
// performSelfUpdate 执行自更新
func (c *Client) performSelfUpdate(downloadURL string) error {
log.Printf("[Client] Starting self-update from: %s", downloadURL)
c.logf("[Client] Starting self-update from: %s", downloadURL)
// 使用共享的下载和解压逻辑
binaryPath, cleanup, err := update.DownloadAndExtract(downloadURL, "client")
@@ -853,7 +882,7 @@ func (c *Client) performSelfUpdate(downloadURL string) error {
// 删除备份
os.Remove(backupPath)
log.Printf("[Client] Update completed, restarting...")
c.logf("[Client] Update completed, restarting...")
// 重启进程
restartClientProcess(currentPath, c.ServerAddr, c.Token, c.ID)
@@ -864,7 +893,7 @@ func (c *Client) performSelfUpdate(downloadURL string) error {
func (c *Client) stopAllPlugins() {
c.pluginMu.Lock()
for key, handler := range c.runningPlugins {
log.Printf("[Client] Stopping plugin %s for update", key)
c.logf("[Client] Stopping plugin %s for update", key)
handler.Stop()
}
c.runningPlugins = make(map[string]plugin.ClientPlugin)
@@ -1015,3 +1044,61 @@ func (c *Client) handleLogStop(stream net.Conn, msg *protocol.Message) {
c.logger.Unsubscribe(req.SessionID)
}
// handlePluginAPIRequest 处理插件 API 请求
func (c *Client) handlePluginAPIRequest(stream net.Conn, msg *protocol.Message) {
defer stream.Close()
var req protocol.PluginAPIRequest
if err := msg.ParsePayload(&req); err != nil {
c.sendPluginAPIResponse(stream, 400, nil, "", "invalid request: "+err.Error())
return
}
c.logf("[Client] Plugin API request: %s %s for plugin %s", req.Method, req.Path, req.PluginName)
// 查找运行中的插件
c.pluginMu.RLock()
var handler plugin.ClientPlugin
for key, p := range c.runningPlugins {
// key 格式为 "pluginName:ruleName"
if strings.HasPrefix(key, req.PluginName+":") {
handler = p
break
}
}
c.pluginMu.RUnlock()
if handler == nil {
c.sendPluginAPIResponse(stream, 404, nil, "", "plugin not running: "+req.PluginName)
return
}
// 类型断言为 JSPlugin
jsPlugin, ok := handler.(*script.JSPlugin)
if !ok {
c.sendPluginAPIResponse(stream, 500, nil, "", "plugin does not support API routing")
return
}
// 调用插件的 API 处理函数
status, headers, body, err := jsPlugin.HandleAPIRequest(req.Method, req.Path, req.Query, req.Headers, req.Body)
if err != nil {
c.sendPluginAPIResponse(stream, 500, nil, "", err.Error())
return
}
c.sendPluginAPIResponse(stream, status, headers, body, "")
}
// sendPluginAPIResponse 发送插件 API 响应
func (c *Client) sendPluginAPIResponse(stream net.Conn, status int, headers map[string]string, body, errMsg string) {
resp := protocol.PluginAPIResponse{
Status: status,
Headers: headers,
Body: body,
Error: errMsg,
}
msg, _ := protocol.NewMessage(protocol.MsgTypePluginAPIResponse, resp)
protocol.WriteMessage(stream, msg)
}