diff --git a/go.mod b/go.mod index 624a05f..45e2d7d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/gotunnel go 1.24.0 require ( - github.com/google/uuid v1.6.0 + github.com/golang-jwt/jwt/v5 v5.3.0 github.com/hashicorp/yamux v0.1.1 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.41.0 @@ -11,11 +11,10 @@ require ( require ( github.com/dustin/go-humanize v1.0.1 // indirect - github.com/golang-jwt/jwt/v5 v5.3.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/tetratelabs/wazero v1.11.0 // indirect golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/sys v0.38.0 // indirect modernc.org/libc v1.66.10 // indirect diff --git a/go.sum b/go.sum index e6b60c3..d29685e 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/tetratelabs/wazero v1.11.0 h1:+gKemEuKCTevU4d7ZTzlsvgd1uaToIDtlQlmNbwqYhA= -github.com/tetratelabs/wazero v1.11.0/go.mod h1:eV28rsN8Q+xwjogd7f4/Pp4xFxO7uOGbLcD/LzB1wiU= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= @@ -23,8 +21,6 @@ golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= diff --git a/internal/client/plugin/cache.go b/internal/client/plugin/cache.go deleted file mode 100644 index 4d4c742..0000000 --- a/internal/client/plugin/cache.go +++ /dev/null @@ -1,114 +0,0 @@ -package plugin - -import ( - "crypto/sha256" - "encoding/hex" - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "github.com/gotunnel/pkg/plugin" -) - -// CachedPlugin 缓存的 plugin 信息 -type CachedPlugin struct { - Metadata plugin.PluginMetadata - Path string - LoadedAt time.Time -} - -// Cache 管理本地 plugin 存储 -type Cache struct { - dir string - plugins map[string]*CachedPlugin - mu sync.RWMutex -} - -// NewCache 创建 plugin 缓存 -func NewCache(cacheDir string) (*Cache, error) { - if err := os.MkdirAll(cacheDir, 0755); err != nil { - return nil, err - } - - return &Cache{ - dir: cacheDir, - plugins: make(map[string]*CachedPlugin), - }, nil -} - -// Get 返回缓存的 plugin(如果有效) -func (c *Cache) Get(name, version, checksum string) (*CachedPlugin, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - cached, ok := c.plugins[name] - if !ok { - return nil, nil - } - - // 验证版本和 checksum - if cached.Metadata.Version != version { - return nil, nil - } - if checksum != "" && cached.Metadata.Checksum != checksum { - return nil, nil - } - - return cached, nil -} - -// Store 保存 plugin 到缓存 -func (c *Cache) Store(meta plugin.PluginMetadata, wasmData []byte) error { - c.mu.Lock() - defer c.mu.Unlock() - - // 验证 checksum - hash := sha256.Sum256(wasmData) - checksum := hex.EncodeToString(hash[:]) - if meta.Checksum != "" && meta.Checksum != checksum { - return fmt.Errorf("checksum mismatch") - } - meta.Checksum = checksum - - // 写入文件 - path := filepath.Join(c.dir, meta.Name+".wasm") - if err := os.WriteFile(path, wasmData, 0644); err != nil { - return err - } - - c.plugins[meta.Name] = &CachedPlugin{ - Metadata: meta, - Path: path, - LoadedAt: time.Now(), - } - return nil -} - -// Remove 删除缓存的 plugin -func (c *Cache) Remove(name string) error { - c.mu.Lock() - defer c.mu.Unlock() - - cached, ok := c.plugins[name] - if !ok { - return nil - } - - os.Remove(cached.Path) - delete(c.plugins, name) - return nil -} - -// List 返回所有缓存的 plugins -func (c *Cache) List() []plugin.PluginMetadata { - c.mu.RLock() - defer c.mu.RUnlock() - - var result []plugin.PluginMetadata - for _, cached := range c.plugins { - result = append(result, cached.Metadata) - } - return result -} diff --git a/internal/client/plugin/manager.go b/internal/client/plugin/manager.go index d332e2c..42cb2a7 100644 --- a/internal/client/plugin/manager.go +++ b/internal/client/plugin/manager.go @@ -1,43 +1,25 @@ package plugin import ( - "context" "log" "sync" "github.com/gotunnel/pkg/plugin" "github.com/gotunnel/pkg/plugin/builtin" - "github.com/gotunnel/pkg/plugin/wasm" ) // Manager 客户端 plugin 管理器 type Manager struct { registry *plugin.Registry - cache *Cache - runtime *wasm.Runtime mu sync.RWMutex } // NewManager 创建客户端 plugin 管理器 -func NewManager(cacheDir string) (*Manager, error) { - ctx := context.Background() - - cache, err := NewCache(cacheDir) - if err != nil { - return nil, err - } - - runtime, err := wasm.NewRuntime(ctx) - if err != nil { - return nil, err - } - +func NewManager() (*Manager, error) { registry := plugin.NewRegistry() m := &Manager{ registry: registry, - cache: cache, - runtime: runtime, } // 注册内置 plugins @@ -49,13 +31,19 @@ func NewManager(cacheDir string) (*Manager, error) { } // registerBuiltins 注册内置 plugins -// 注意: tcp, udp, http, https 是内置类型,直接在 tunnel 中处理 func (m *Manager) registerBuiltins() error { - // 使用统一的插件注册入口 + // 注册服务端插件 if err := m.registry.RegisterAll(builtin.GetAll()); err != nil { return err } - log.Printf("[Plugin] Registered %d builtin plugins", len(builtin.GetAll())) + // 注册客户端插件 + for _, h := range builtin.GetAllClientPlugins() { + if err := m.registry.RegisterClientPlugin(h); err != nil { + return err + } + } + log.Printf("[Plugin] Registered %d server plugins, %d client plugins", + len(builtin.GetAll()), len(builtin.GetAllClientPlugins())) return nil } @@ -63,8 +51,3 @@ func (m *Manager) registerBuiltins() error { func (m *Manager) GetHandler(proxyType string) (plugin.ProxyHandler, error) { return m.registry.Get(proxyType) } - -// Close 关闭管理器 -func (m *Manager) Close(ctx context.Context) error { - return m.runtime.Close(ctx) -} diff --git a/internal/client/tunnel/client.go b/internal/client/tunnel/client.go index 40034c1..5ce76d6 100644 --- a/internal/client/tunnel/client.go +++ b/internal/client/tunnel/client.go @@ -29,27 +29,29 @@ const ( // Client 隧道客户端 type Client struct { - ServerAddr string - Token string - ID string - TLSEnabled bool - TLSConfig *tls.Config - session *yamux.Session - rules []protocol.ProxyRule - mu sync.RWMutex - pluginRegistry *plugin.Registry + ServerAddr string + Token string + ID string + TLSEnabled bool + TLSConfig *tls.Config + session *yamux.Session + rules []protocol.ProxyRule + mu sync.RWMutex + pluginRegistry *plugin.Registry + runningPlugins map[string]plugin.ClientHandler // 运行中的客户端插件 + pluginMu sync.RWMutex } // NewClient 创建客户端 func NewClient(serverAddr, token, id string) *Client { - // 如果未指定 ID,尝试从本地文件加载 if id == "" { id = loadClientID() } return &Client{ - ServerAddr: serverAddr, - Token: token, - ID: id, + ServerAddr: serverAddr, + Token: token, + ID: id, + runningPlugins: make(map[string]plugin.ClientHandler), } } @@ -197,6 +199,10 @@ func (c *Client) handleStream(stream net.Conn) { case protocol.MsgTypePluginConfig: defer stream.Close() c.handlePluginConfig(msg) + case protocol.MsgTypeClientPluginStart: + c.handleClientPluginStart(stream, msg) + case protocol.MsgTypeClientPluginConn: + c.handleClientPluginConn(stream, msg) } } @@ -374,3 +380,85 @@ func (c *Client) handlePluginConfig(msg *protocol.Message) { log.Printf("[Client] Plugin %s config applied", cfg.PluginName) } } + +// handleClientPluginStart 处理客户端插件启动请求 +func (c *Client) handleClientPluginStart(stream net.Conn, msg *protocol.Message) { + defer stream.Close() + + var req protocol.ClientPluginStartRequest + if err := msg.ParsePayload(&req); err != nil { + c.sendPluginStatus(stream, req.PluginName, req.RuleName, false, "", err.Error()) + return + } + + log.Printf("[Client] Starting plugin %s for rule %s", req.PluginName, req.RuleName) + + // 获取插件 + if c.pluginRegistry == nil { + c.sendPluginStatus(stream, req.PluginName, req.RuleName, false, "", "plugin registry not set") + return + } + + handler, err := c.pluginRegistry.GetClientPlugin(req.PluginName) + if err != nil { + c.sendPluginStatus(stream, req.PluginName, req.RuleName, false, "", err.Error()) + return + } + + // 初始化并启动 + if err := handler.Init(req.Config); err != nil { + c.sendPluginStatus(stream, req.PluginName, req.RuleName, false, "", err.Error()) + return + } + + localAddr, err := handler.Start() + if err != nil { + c.sendPluginStatus(stream, req.PluginName, req.RuleName, false, "", err.Error()) + return + } + + // 保存运行中的插件 + key := req.PluginName + ":" + req.RuleName + c.pluginMu.Lock() + c.runningPlugins[key] = handler + c.pluginMu.Unlock() + + log.Printf("[Client] Plugin %s started at %s", req.PluginName, localAddr) + c.sendPluginStatus(stream, req.PluginName, req.RuleName, true, localAddr, "") +} + +// sendPluginStatus 发送插件状态响应 +func (c *Client) sendPluginStatus(stream net.Conn, pluginName, ruleName string, running bool, localAddr, errMsg string) { + resp := protocol.ClientPluginStatusResponse{ + PluginName: pluginName, + RuleName: ruleName, + Running: running, + LocalAddr: localAddr, + Error: errMsg, + } + msg, _ := protocol.NewMessage(protocol.MsgTypeClientPluginStatus, resp) + protocol.WriteMessage(stream, msg) +} + +// handleClientPluginConn 处理客户端插件连接 +func (c *Client) handleClientPluginConn(stream net.Conn, msg *protocol.Message) { + var req protocol.ClientPluginConnRequest + if err := msg.ParsePayload(&req); err != nil { + stream.Close() + return + } + + key := req.PluginName + ":" + req.RuleName + c.pluginMu.RLock() + handler, ok := c.runningPlugins[key] + c.pluginMu.RUnlock() + + if !ok { + log.Printf("[Client] Plugin %s not running", key) + stream.Close() + return + } + + // 让插件处理连接 + handler.HandleConn(stream) +} diff --git a/internal/server/plugin/manager.go b/internal/server/plugin/manager.go index 45be645..fc8f868 100644 --- a/internal/server/plugin/manager.go +++ b/internal/server/plugin/manager.go @@ -1,40 +1,25 @@ package plugin import ( - "context" - "fmt" "log" "sync" - "github.com/gotunnel/internal/server/db" "github.com/gotunnel/pkg/plugin" "github.com/gotunnel/pkg/plugin/builtin" - "github.com/gotunnel/pkg/plugin/wasm" ) // Manager 服务端 plugin 管理器 type Manager struct { registry *plugin.Registry - store db.PluginStore - runtime *wasm.Runtime mu sync.RWMutex } // NewManager 创建 plugin 管理器 -func NewManager(pluginStore db.PluginStore) (*Manager, error) { - ctx := context.Background() - - runtime, err := wasm.NewRuntime(ctx) - if err != nil { - return nil, fmt.Errorf("create wasm runtime: %w", err) - } - +func NewManager() (*Manager, error) { registry := plugin.NewRegistry() m := &Manager{ registry: registry, - store: pluginStore, - runtime: runtime, } // 注册内置 plugins @@ -46,67 +31,20 @@ func NewManager(pluginStore db.PluginStore) (*Manager, error) { } // registerBuiltins 注册内置 plugins -// 注意: tcp, udp, http, https 是内置类型,直接在 tunnel 中处理 -// 这里只注册需要通过 plugin 系统提供的协议 func (m *Manager) registerBuiltins() error { - // 使用统一的插件注册入口 + // 注册服务端插件 if err := m.registry.RegisterAll(builtin.GetAll()); err != nil { return err } - log.Printf("[Plugin] Registered %d builtin plugins", len(builtin.GetAll())) - return nil -} - -// LoadStoredPlugins 从数据库加载所有 plugins -func (m *Manager) LoadStoredPlugins(ctx context.Context) error { - if m.store == nil { - return nil - } - - plugins, err := m.store.GetAllPlugins() - if err != nil { - return err - } - - for _, p := range plugins { - data, err := m.store.GetPluginWASM(p.Name) - if err != nil { - log.Printf("[Plugin] Failed to load %s: %v", p.Name, err) - continue - } - - if err := m.loadWASMPlugin(ctx, p.Name, data); err != nil { - log.Printf("[Plugin] Failed to init %s: %v", p.Name, err) - } - } - - return nil -} - -// loadWASMPlugin 加载 WASM plugin -func (m *Manager) loadWASMPlugin(ctx context.Context, name string, data []byte) error { - _, err := m.runtime.LoadModule(ctx, name, data) - if err != nil { - return err - } - log.Printf("[Plugin] WASM plugin loaded: %s", name) - return nil -} - -// InstallPlugin 安装新的 WASM plugin -func (m *Manager) InstallPlugin(ctx context.Context, p *db.PluginData) error { - m.mu.Lock() - defer m.mu.Unlock() - - // 存储到数据库 - if m.store != nil { - if err := m.store.SavePlugin(p); err != nil { + // 注册客户端插件 + for _, h := range builtin.GetAllClientPlugins() { + if err := m.registry.RegisterClientPlugin(h); err != nil { return err } } - - // 加载到运行时 - return m.loadWASMPlugin(ctx, p.Name, p.WASMData) + log.Printf("[Plugin] Registered %d server plugins, %d client plugins", + len(builtin.GetAll()), len(builtin.GetAllClientPlugins())) + return nil } // GetHandler 返回指定代理类型的 handler @@ -119,7 +57,7 @@ func (m *Manager) ListPlugins() []plugin.PluginInfo { return m.registry.List() } -// Close 关闭管理器 -func (m *Manager) Close(ctx context.Context) error { - return m.runtime.Close(ctx) +// GetRegistry 返回插件注册表 +func (m *Manager) GetRegistry() *plugin.Registry { + return m.registry } diff --git a/internal/server/router/api.go b/internal/server/router/api.go index 87f467c..88c5079 100644 --- a/internal/server/router/api.go +++ b/internal/server/router/api.go @@ -22,19 +22,21 @@ func validateClientID(id string) bool { // ClientStatus 客户端状态 type ClientStatus struct { - ID string `json:"id"` - Nickname string `json:"nickname,omitempty"` - Online bool `json:"online"` - LastPing string `json:"last_ping,omitempty"` - RuleCount int `json:"rule_count"` + ID string `json:"id"` + Nickname string `json:"nickname,omitempty"` + Online bool `json:"online"` + LastPing string `json:"last_ping,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + RuleCount int `json:"rule_count"` } // ServerInterface 服务端接口 type ServerInterface interface { - GetClientStatus(clientID string) (online bool, lastPing string) + GetClientStatus(clientID string) (online bool, lastPing string, remoteAddr string) GetAllClientStatus() map[string]struct { - Online bool - LastPing string + Online bool + LastPing string + RemoteAddr string } ReloadConfig() error GetBindAddr() string @@ -158,6 +160,7 @@ func (h *APIHandler) getClients(rw http.ResponseWriter) { if s, ok := statusMap[c.ID]; ok { cs.Online = s.Online cs.LastPing = s.LastPing + cs.RemoteAddr = s.RemoteAddr } result = append(result, cs) } @@ -256,10 +259,11 @@ func (h *APIHandler) getClient(rw http.ResponseWriter, clientID string) { http.Error(rw, "client not found", http.StatusNotFound) return } - online, lastPing := h.server.GetClientStatus(clientID) + online, lastPing, remoteAddr := h.server.GetClientStatus(clientID) h.jsonResponse(rw, map[string]interface{}{ "id": client.ID, "nickname": client.Nickname, "rules": client.Rules, "plugins": client.Plugins, "online": online, "last_ping": lastPing, + "remote_addr": remoteAddr, }) } @@ -429,7 +433,7 @@ func (h *APIHandler) pushConfigToClient(rw http.ResponseWriter, r *http.Request, return } - online, _ := h.server.GetClientStatus(clientID) + online, _, _ := h.server.GetClientStatus(clientID) if !online { http.Error(rw, "client not online", http.StatusBadRequest) return @@ -523,7 +527,7 @@ func (h *APIHandler) installPluginsToClient(rw http.ResponseWriter, r *http.Requ return } - online, _ := h.server.GetClientStatus(clientID) + online, _, _ := h.server.GetClientStatus(clientID) if !online { http.Error(rw, "client not online", http.StatusBadRequest) return @@ -734,7 +738,7 @@ func (h *APIHandler) updateClientPluginConfig(rw http.ResponseWriter, r *http.Re } // 如果客户端在线,同步配置 - online, _ := h.server.GetClientStatus(clientID) + online, _, _ := h.server.GetClientStatus(clientID) if online { if err := h.server.SyncPluginConfigToClient(clientID, pluginName, req.Config); err != nil { // 配置已保存,但同步失败,返回警告 diff --git a/internal/server/tunnel/server.go b/internal/server/tunnel/server.go index 6d081f2..42a2cbc 100644 --- a/internal/server/tunnel/server.go +++ b/internal/server/tunnel/server.go @@ -52,6 +52,7 @@ type Server struct { // ClientSession 客户端会话 type ClientSession struct { ID string + RemoteAddr string // 客户端 IP 地址 Session *yamux.Session Rules []protocol.ProxyRule Listeners map[int]net.Listener @@ -185,13 +186,20 @@ func (s *Server) setupClientSession(conn net.Conn, clientID string, rules []prot return } + // 提取客户端 IP(去掉端口) + remoteAddr := conn.RemoteAddr().String() + if host, _, err := net.SplitHostPort(remoteAddr); err == nil { + remoteAddr = host + } + cs := &ClientSession{ - ID: clientID, - Session: session, - Rules: rules, - Listeners: make(map[int]net.Listener), - UDPConns: make(map[int]*net.UDPConn), - LastPing: time.Now(), + ID: clientID, + RemoteAddr: remoteAddr, + Session: session, + Rules: rules, + Listeners: make(map[int]net.Listener), + UDPConns: make(map[int]*net.UDPConn), + LastPing: time.Now(), } s.registerClient(cs) @@ -284,6 +292,10 @@ func (s *Server) stopProxyListeners(cs *ClientSession) { // startProxyListeners 启动代理监听 func (s *Server) startProxyListeners(cs *ClientSession) { for _, rule := range cs.Rules { + if !rule.IsEnabled() { + continue + } + ruleType := rule.Type if ruleType == "" { ruleType = "tcp" @@ -295,6 +307,12 @@ func (s *Server) startProxyListeners(cs *ClientSession) { continue } + // 检查是否为客户端插件 + if s.isClientPlugin(ruleType) { + s.startClientPluginListener(cs, rule) + continue + } + // TCP 类型 if err := s.portManager.Reserve(rule.RemotePort, cs.ID); err != nil { log.Printf("[Server] Port %d error: %v", rule.RemotePort, err) @@ -445,22 +463,23 @@ func (s *Server) sendHeartbeat(cs *ClientSession) bool { } // GetClientStatus 获取客户端状态 -func (s *Server) GetClientStatus(clientID string) (online bool, lastPing string) { +func (s *Server) GetClientStatus(clientID string) (online bool, lastPing string, remoteAddr string) { s.mu.RLock() defer s.mu.RUnlock() if cs, ok := s.clients[clientID]; ok { cs.mu.Lock() defer cs.mu.Unlock() - return true, cs.LastPing.Format(time.RFC3339) + return true, cs.LastPing.Format(time.RFC3339), cs.RemoteAddr } - return false, "" + return false, "", "" } // GetAllClientStatus 获取所有客户端状态 func (s *Server) GetAllClientStatus() map[string]struct { - Online bool - LastPing string + Online bool + LastPing string + RemoteAddr string } { // 先复制客户端引用,避免嵌套锁 s.mu.RLock() @@ -471,18 +490,21 @@ func (s *Server) GetAllClientStatus() map[string]struct { s.mu.RUnlock() result := make(map[string]struct { - Online bool - LastPing string + Online bool + LastPing string + RemoteAddr string }) for _, cs := range clients { cs.mu.Lock() result[cs.ID] = struct { - Online bool - LastPing string + Online bool + LastPing string + RemoteAddr string }{ - Online: true, - LastPing: cs.LastPing.Format(time.RFC3339), + Online: true, + LastPing: cs.LastPing.Format(time.RFC3339), + RemoteAddr: cs.RemoteAddr, } cs.mu.Unlock() } @@ -812,3 +834,119 @@ func (s *Server) sendPluginConfig(session *yamux.Session, pluginName string, con } return protocol.WriteMessage(stream, msg) } + +// isClientPlugin 检查是否为客户端插件 +func (s *Server) isClientPlugin(pluginType string) bool { + if s.pluginRegistry == nil { + return false + } + handler, err := s.pluginRegistry.GetClientPlugin(pluginType) + if err != nil { + return false + } + return handler != nil +} + +// startClientPluginListener 启动客户端插件监听 +func (s *Server) startClientPluginListener(cs *ClientSession, rule protocol.ProxyRule) { + if err := s.portManager.Reserve(rule.RemotePort, cs.ID); err != nil { + log.Printf("[Server] Port %d error: %v", rule.RemotePort, err) + return + } + + // 发送启动命令到客户端 + if err := s.sendClientPluginStart(cs.Session, rule); err != nil { + log.Printf("[Server] Failed to start client plugin %s: %v", rule.Type, err) + s.portManager.Release(rule.RemotePort) + return + } + + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", rule.RemotePort)) + if err != nil { + log.Printf("[Server] Listen %d error: %v", rule.RemotePort, err) + s.portManager.Release(rule.RemotePort) + return + } + + cs.mu.Lock() + cs.Listeners[rule.RemotePort] = ln + cs.mu.Unlock() + + log.Printf("[Server] Client plugin %s on :%d", rule.Type, rule.RemotePort) + go s.acceptClientPluginConns(cs, ln, rule) +} + +// sendClientPluginStart 发送客户端插件启动命令 +func (s *Server) sendClientPluginStart(session *yamux.Session, rule protocol.ProxyRule) error { + stream, err := session.Open() + if err != nil { + return err + } + defer stream.Close() + + req := protocol.ClientPluginStartRequest{ + PluginName: rule.Type, + RuleName: rule.Name, + RemotePort: rule.RemotePort, + Config: rule.PluginConfig, + } + msg, err := protocol.NewMessage(protocol.MsgTypeClientPluginStart, req) + if err != nil { + return err + } + if err := protocol.WriteMessage(stream, msg); err != nil { + return err + } + + // 等待响应 + resp, err := protocol.ReadMessage(stream) + if err != nil { + return err + } + if resp.Type != protocol.MsgTypeClientPluginStatus { + return fmt.Errorf("unexpected response type: %d", resp.Type) + } + + var status protocol.ClientPluginStatusResponse + if err := resp.ParsePayload(&status); err != nil { + return err + } + if !status.Running { + return fmt.Errorf("plugin failed: %s", status.Error) + } + return nil +} + +// acceptClientPluginConns 接受客户端插件连接 +func (s *Server) acceptClientPluginConns(cs *ClientSession, ln net.Listener, rule protocol.ProxyRule) { + for { + conn, err := ln.Accept() + if err != nil { + return + } + go s.handleClientPluginConn(cs, conn, rule) + } +} + +// handleClientPluginConn 处理客户端插件连接 +func (s *Server) handleClientPluginConn(cs *ClientSession, conn net.Conn, rule protocol.ProxyRule) { + defer conn.Close() + + stream, err := cs.Session.Open() + if err != nil { + log.Printf("[Server] Open stream error: %v", err) + return + } + defer stream.Close() + + req := protocol.ClientPluginConnRequest{ + PluginName: rule.Type, + RuleName: rule.Name, + } + msg, _ := protocol.NewMessage(protocol.MsgTypeClientPluginConn, req) + if err := protocol.WriteMessage(stream, msg); err != nil { + return + } + + relay.Relay(conn, stream) +} diff --git a/pkg/plugin/builtin/echo.go b/pkg/plugin/builtin/echo.go new file mode 100644 index 0000000..c6e7618 --- /dev/null +++ b/pkg/plugin/builtin/echo.go @@ -0,0 +1,95 @@ +package builtin + +import ( + "io" + "log" + "net" + "sync" + + "github.com/gotunnel/pkg/plugin" +) + +func init() { + RegisterClientPlugin(NewEchoPlugin()) +} + +// EchoPlugin 回显插件 - 客户端插件示例 +type EchoPlugin struct { + config map[string]string + listener net.Listener + running bool + mu sync.Mutex +} + +// NewEchoPlugin 创建 Echo 插件 +func NewEchoPlugin() *EchoPlugin { + return &EchoPlugin{} +} + +// Metadata 返回插件信息 +func (p *EchoPlugin) Metadata() plugin.PluginMetadata { + return plugin.PluginMetadata{ + Name: "echo", + Version: "1.0.0", + Type: plugin.PluginTypeApp, + Source: plugin.PluginSourceBuiltin, + RunAt: plugin.SideClient, + Description: "Echo server (client plugin example)", + Author: "GoTunnel", + RuleSchema: &plugin.RuleSchema{ + NeedsLocalAddr: false, + }, + } +} + +// Init 初始化插件 +func (p *EchoPlugin) Init(config map[string]string) error { + p.config = config + return nil +} + +// Start 启动服务 +func (p *EchoPlugin) Start() (string, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.running { + return "", nil + } + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", err + } + + p.listener = ln + p.running = true + + log.Printf("[Echo] Started on %s", ln.Addr().String()) + return ln.Addr().String(), nil +} + +// HandleConn 处理连接 +func (p *EchoPlugin) HandleConn(conn net.Conn) error { + defer conn.Close() + log.Printf("[Echo] New connection from tunnel") + _, err := io.Copy(conn, conn) + return err +} + +// Stop 停止服务 +func (p *EchoPlugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if !p.running { + return nil + } + + if p.listener != nil { + p.listener.Close() + } + p.running = false + log.Printf("[Echo] Stopped") + return nil +} diff --git a/pkg/plugin/builtin/http.go b/pkg/plugin/builtin/http.go deleted file mode 100644 index 41e47f6..0000000 --- a/pkg/plugin/builtin/http.go +++ /dev/null @@ -1,116 +0,0 @@ -package builtin - -import ( - "bufio" - "io" - "net" - "net/http" - "strings" - - "github.com/gotunnel/pkg/plugin" -) - -// HTTPPlugin 将现有 HTTP 代理实现封装为 plugin -type HTTPPlugin struct { - config map[string]string -} - -// NewHTTPPlugin 创建 HTTP plugin -func NewHTTPPlugin() *HTTPPlugin { - return &HTTPPlugin{} -} - -// Metadata 返回 plugin 信息 -func (p *HTTPPlugin) Metadata() plugin.PluginMetadata { - return plugin.PluginMetadata{ - Name: "http", - Version: "1.0.0", - Type: plugin.PluginTypeProxy, - Source: plugin.PluginSourceBuiltin, - Description: "HTTP/HTTPS proxy protocol handler", - Author: "GoTunnel", - Capabilities: []string{ - "dial", "read", "write", "close", - }, - } -} - -// Init 初始化 plugin -func (p *HTTPPlugin) Init(config map[string]string) error { - p.config = config - return nil -} - -// HandleConn 处理 HTTP 代理连接 -func (p *HTTPPlugin) HandleConn(conn net.Conn, dialer plugin.Dialer) error { - defer conn.Close() - - reader := bufio.NewReader(conn) - req, err := http.ReadRequest(reader) - if err != nil { - return err - } - - if req.Method == http.MethodConnect { - return p.handleConnect(conn, req, dialer) - } - return p.handleHTTP(conn, req, dialer) -} - -// Close 释放资源 -func (p *HTTPPlugin) Close() error { - return nil -} - -// handleConnect 处理 CONNECT 方法 (HTTPS) -func (p *HTTPPlugin) handleConnect(conn net.Conn, req *http.Request, dialer plugin.Dialer) error { - target := req.Host - if !strings.Contains(target, ":") { - target = target + ":443" - } - - remote, err := dialer.Dial("tcp", target) - if err != nil { - conn.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n")) - return err - } - defer remote.Close() - - conn.Write([]byte("HTTP/1.1 200 Connection Established\r\n\r\n")) - - go io.Copy(remote, conn) - io.Copy(conn, remote) - return nil -} - -// handleHTTP 处理普通 HTTP 请求 -func (p *HTTPPlugin) handleHTTP(conn net.Conn, req *http.Request, dialer plugin.Dialer) error { - target := req.Host - if !strings.Contains(target, ":") { - target = target + ":80" - } - - remote, err := dialer.Dial("tcp", target) - if err != nil { - conn.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n")) - return err - } - defer remote.Close() - - // 修改请求路径为相对路径 - req.URL.Scheme = "" - req.URL.Host = "" - req.RequestURI = req.URL.Path - if req.URL.RawQuery != "" { - req.RequestURI += "?" + req.URL.RawQuery - } - - // 发送请求到目标 - if err := req.Write(remote); err != nil { - return err - } - - // 转发响应 - _, err = io.Copy(conn, remote) - return err -} diff --git a/pkg/plugin/builtin/register.go b/pkg/plugin/builtin/register.go index b1f8af6..e1b7f4b 100644 --- a/pkg/plugin/builtin/register.go +++ b/pkg/plugin/builtin/register.go @@ -3,14 +3,27 @@ package builtin import "github.com/gotunnel/pkg/plugin" // 全局插件注册表 -var registry []plugin.ProxyHandler +var ( + serverPlugins []plugin.ProxyHandler + clientPlugins []plugin.ClientHandler +) -// Register 插件自注册函数,由各插件的 init() 调用 +// Register 注册服务端插件 func Register(handler plugin.ProxyHandler) { - registry = append(registry, handler) + serverPlugins = append(serverPlugins, handler) } -// GetAll 返回所有已注册的内置插件 -func GetAll() []plugin.ProxyHandler { - return registry +// RegisterClientPlugin 注册客户端插件 +func RegisterClientPlugin(handler plugin.ClientHandler) { + clientPlugins = append(clientPlugins, handler) +} + +// GetAll 返回所有服务端插件 +func GetAll() []plugin.ProxyHandler { + return serverPlugins +} + +// GetAllClientPlugins 返回所有客户端插件 +func GetAllClientPlugins() []plugin.ClientHandler { + return clientPlugins } diff --git a/pkg/plugin/builtin/socks5.go b/pkg/plugin/builtin/socks5.go index cde0e9a..7136a2a 100644 --- a/pkg/plugin/builtin/socks5.go +++ b/pkg/plugin/builtin/socks5.go @@ -45,34 +45,29 @@ func (p *SOCKS5Plugin) Metadata() plugin.PluginMetadata { Version: "1.0.0", Type: plugin.PluginTypeProxy, Source: plugin.PluginSourceBuiltin, - Description: "SOCKS5 proxy protocol handler (official plugin)", + RunAt: plugin.SideServer, + Description: "SOCKS5 proxy protocol handler", Author: "GoTunnel", - Capabilities: []string{ - "dial", "read", "write", "close", - }, RuleSchema: &plugin.RuleSchema{ - NeedsLocalAddr: false, // SOCKS5 不需要本地地址 + NeedsLocalAddr: false, }, ConfigSchema: []plugin.ConfigField{ { - Key: "auth", - Label: "认证方式", - Type: plugin.ConfigFieldSelect, - Default: "none", - Options: []string{"none", "password"}, - Description: "SOCKS5 认证方式", + Key: "auth", + Label: "认证方式", + Type: plugin.ConfigFieldSelect, + Default: "none", + Options: []string{"none", "password"}, }, { - Key: "username", - Label: "用户名", - Type: plugin.ConfigFieldString, - Description: "认证用户名(仅 password 认证时需要)", + Key: "username", + Label: "用户名", + Type: plugin.ConfigFieldString, }, { - Key: "password", - Label: "密码", - Type: plugin.ConfigFieldPassword, - Description: "认证密码(仅 password 认证时需要)", + Key: "password", + Label: "密码", + Type: plugin.ConfigFieldPassword, }, }, } diff --git a/pkg/plugin/builtin/vnc.go b/pkg/plugin/builtin/vnc.go index a68c6f4..7197ca2 100644 --- a/pkg/plugin/builtin/vnc.go +++ b/pkg/plugin/builtin/vnc.go @@ -29,20 +29,17 @@ func (p *VNCPlugin) Metadata() plugin.PluginMetadata { Version: "1.0.0", Type: plugin.PluginTypeApp, Source: plugin.PluginSourceBuiltin, - Description: "VNC remote desktop relay (connects to client's local VNC server)", + RunAt: plugin.SideServer, // 当前为服务端中继模式 + Description: "VNC remote desktop relay", Author: "GoTunnel", - Capabilities: []string{ - "dial", "read", "write", "close", - }, RuleSchema: &plugin.RuleSchema{ NeedsLocalAddr: false, ExtraFields: []plugin.ConfigField{ { - Key: "vnc_addr", - Label: "VNC 地址", - Type: plugin.ConfigFieldString, - Default: "127.0.0.1:5900", - Description: "客户端本地 VNC 服务地址", + Key: "vnc_addr", + Label: "VNC 地址", + Type: plugin.ConfigFieldString, + Default: "127.0.0.1:5900", }, }, }, diff --git a/pkg/plugin/registry.go b/pkg/plugin/registry.go index d7a8002..ad1d689 100644 --- a/pkg/plugin/registry.go +++ b/pkg/plugin/registry.go @@ -8,20 +8,22 @@ import ( // Registry 管理可用的 plugins type Registry struct { - builtin map[string]ProxyHandler // 内置 Go 实现 - enabled map[string]bool // 启用状态 - mu sync.RWMutex + serverPlugins map[string]ProxyHandler // 服务端插件 + clientPlugins map[string]ClientHandler // 客户端插件 + enabled map[string]bool // 启用状态 + mu sync.RWMutex } // NewRegistry 创建 plugin 注册表 func NewRegistry() *Registry { return &Registry{ - builtin: make(map[string]ProxyHandler), - enabled: make(map[string]bool), + serverPlugins: make(map[string]ProxyHandler), + clientPlugins: make(map[string]ClientHandler), + enabled: make(map[string]bool), } } -// RegisterBuiltin 注册内置 plugin +// RegisterBuiltin 注册服务端插件 func (r *Registry) RegisterBuiltin(handler ProxyHandler) error { r.mu.Lock() defer r.mu.Unlock() @@ -31,22 +33,40 @@ func (r *Registry) RegisterBuiltin(handler ProxyHandler) error { return fmt.Errorf("plugin name cannot be empty") } - if _, exists := r.builtin[meta.Name]; exists { + if _, exists := r.serverPlugins[meta.Name]; exists { return fmt.Errorf("plugin %s already registered", meta.Name) } - r.builtin[meta.Name] = handler - r.enabled[meta.Name] = true // 默认启用 + r.serverPlugins[meta.Name] = handler + r.enabled[meta.Name] = true return nil } -// Get 返回指定代理类型的 handler +// RegisterClientPlugin 注册客户端插件 +func (r *Registry) RegisterClientPlugin(handler ClientHandler) error { + r.mu.Lock() + defer r.mu.Unlock() + + meta := handler.Metadata() + if meta.Name == "" { + return fmt.Errorf("plugin name cannot be empty") + } + + if _, exists := r.clientPlugins[meta.Name]; exists { + return fmt.Errorf("client plugin %s already registered", meta.Name) + } + + r.clientPlugins[meta.Name] = handler + r.enabled[meta.Name] = true + return nil +} + +// Get 返回指定代理类型的服务端 handler func (r *Registry) Get(proxyType string) (ProxyHandler, error) { r.mu.RLock() defer r.mu.RUnlock() - // 先查找内置 plugin - if handler, ok := r.builtin[proxyType]; ok { + if handler, ok := r.serverPlugins[proxyType]; ok { if !r.enabled[proxyType] { return nil, fmt.Errorf("plugin %s is disabled", proxyType) } @@ -56,6 +76,21 @@ func (r *Registry) Get(proxyType string) (ProxyHandler, error) { return nil, fmt.Errorf("plugin %s not found", proxyType) } +// GetClientPlugin 返回指定类型的客户端 handler +func (r *Registry) GetClientPlugin(name string) (ClientHandler, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + if handler, ok := r.clientPlugins[name]; ok { + if !r.enabled[name] { + return nil, fmt.Errorf("client plugin %s is disabled", name) + } + return handler, nil + } + + return nil, fmt.Errorf("client plugin %s not found", name) +} + // List 返回所有可用的 plugins func (r *Registry) List() []PluginInfo { r.mu.RLock() @@ -63,8 +98,17 @@ func (r *Registry) List() []PluginInfo { var plugins []PluginInfo - // 内置 plugins - for name, handler := range r.builtin { + // 服务端插件 + for name, handler := range r.serverPlugins { + plugins = append(plugins, PluginInfo{ + Metadata: handler.Metadata(), + Loaded: true, + Enabled: r.enabled[name], + }) + } + + // 客户端插件 + for name, handler := range r.clientPlugins { plugins = append(plugins, PluginInfo{ Metadata: handler.Metadata(), Loaded: true, @@ -80,8 +124,9 @@ func (r *Registry) Has(name string) bool { r.mu.RLock() defer r.mu.RUnlock() - _, ok := r.builtin[name] - return ok + _, ok1 := r.serverPlugins[name] + _, ok2 := r.clientPlugins[name] + return ok1 || ok2 } // Close 关闭所有 plugins @@ -90,11 +135,16 @@ func (r *Registry) Close(ctx context.Context) error { defer r.mu.Unlock() var lastErr error - for name, handler := range r.builtin { + for name, handler := range r.serverPlugins { if err := handler.Close(); err != nil { lastErr = fmt.Errorf("failed to close plugin %s: %w", name, err) } } + for name, handler := range r.clientPlugins { + if err := handler.Stop(); err != nil { + lastErr = fmt.Errorf("failed to stop client plugin %s: %w", name, err) + } + } return lastErr } @@ -104,7 +154,7 @@ func (r *Registry) Enable(name string) error { r.mu.Lock() defer r.mu.Unlock() - if _, ok := r.builtin[name]; !ok { + if !r.has(name) { return fmt.Errorf("plugin %s not found", name) } r.enabled[name] = true @@ -116,13 +166,20 @@ func (r *Registry) Disable(name string) error { r.mu.Lock() defer r.mu.Unlock() - if _, ok := r.builtin[name]; !ok { + if !r.has(name) { return fmt.Errorf("plugin %s not found", name) } r.enabled[name] = false return nil } +// has 内部检查(无锁) +func (r *Registry) has(name string) bool { + _, ok1 := r.serverPlugins[name] + _, ok2 := r.clientPlugins[name] + return ok1 || ok2 +} + // IsEnabled 检查插件是否启用 func (r *Registry) IsEnabled(name string) bool { r.mu.RLock() diff --git a/pkg/plugin/types.go b/pkg/plugin/types.go index 2c93329..6925dc1 100644 --- a/pkg/plugin/types.go +++ b/pkg/plugin/types.go @@ -20,7 +20,6 @@ type PluginSource string const ( PluginSourceBuiltin PluginSource = "builtin" // 内置编译 - PluginSourceWASM PluginSource = "wasm" // WASM 模块 ) // ConfigFieldType 配置字段类型 @@ -53,18 +52,17 @@ type RuleSchema struct { // PluginMetadata 描述一个 plugin type PluginMetadata struct { - Name string `json:"name"` // 唯一标识符 (如 "socks5") - Version string `json:"version"` // 语义化版本 - Type PluginType `json:"type"` // Plugin 类别 - Source PluginSource `json:"source"` // builtin 或 wasm - Description string `json:"description"` // 人类可读描述 - Author string `json:"author"` // Plugin 作者 - Icon string `json:"icon,omitempty"` // 图标文件名 (如 "socks5.png") - Checksum string `json:"checksum,omitempty"` // WASM 二进制的 SHA256 - Size int64 `json:"size,omitempty"` // WASM 二进制大小 - Capabilities []string `json:"capabilities,omitempty"` // 所需 host functions - ConfigSchema []ConfigField `json:"config_schema,omitempty"`// 插件配置模式 - RuleSchema *RuleSchema `json:"rule_schema,omitempty"` // 规则表单模式 + Name string `json:"name"` // 唯一标识符 + Version string `json:"version"` // 语义化版本 + Type PluginType `json:"type"` // Plugin 类别 + Source PluginSource `json:"source"` // builtin + RunAt Side `json:"run_at"` // 运行位置: server 或 client + Description string `json:"description"` // 人类可读描述 + Author string `json:"author"` // Plugin 作者 + Icon string `json:"icon,omitempty"` // 图标文件名 + Capabilities []string `json:"capabilities,omitempty"` // 所需能力 + ConfigSchema []ConfigField `json:"config_schema,omitempty"` // 插件配置模式 + RuleSchema *RuleSchema `json:"rule_schema,omitempty"` // 规则表单模式 } // PluginInfo 组合元数据和运行时状态 @@ -82,6 +80,7 @@ type Dialer interface { } // ProxyHandler 是所有 proxy plugin 必须实现的接口 +// 运行在服务端,处理外部连接并通过隧道转发 type ProxyHandler interface { // Metadata 返回 plugin 信息 Metadata() PluginMetadata @@ -97,6 +96,26 @@ type ProxyHandler interface { Close() error } +// ClientHandler 客户端插件接口 +// 运行在客户端,提供本地服务(如 VNC 服务器、文件管理等) +type ClientHandler interface { + // Metadata 返回 plugin 信息 + Metadata() PluginMetadata + + // Init 使用配置初始化 plugin + Init(config map[string]string) error + + // Start 启动客户端服务 + // 返回服务监听的本地地址(如 "127.0.0.1:5900") + Start() (localAddr string, err error) + + // HandleConn 处理来自隧道的连接 + HandleConn(conn net.Conn) error + + // Stop 停止客户端服务 + Stop() error +} + // ExtendedProxyHandler 扩展的代理处理器接口 // 支持 PluginAPI 的插件应实现此接口 type ExtendedProxyHandler interface { @@ -211,27 +230,3 @@ var ( ErrInvalidConfig = &APIError{Code: 7, Message: "invalid configuration"} ) -// ConnHandle WASM 连接句柄 -type ConnHandle uint32 - -// HostContext 提供给 WASM plugin 的 host functions -type HostContext interface { - // 网络操作 - Dial(network, address string) (ConnHandle, error) - Read(handle ConnHandle, buf []byte) (int, error) - Write(handle ConnHandle, buf []byte) (int, error) - CloseConn(handle ConnHandle) error - - // 客户端连接操作 - ClientRead(buf []byte) (int, error) - ClientWrite(buf []byte) (int, error) - - // 日志 - Log(level LogLevel, message string) - - // 时间 - Now() int64 - - // 配置 - GetConfig(key string) string -} diff --git a/pkg/plugin/wasm/host.go b/pkg/plugin/wasm/host.go deleted file mode 100644 index 80097aa..0000000 --- a/pkg/plugin/wasm/host.go +++ /dev/null @@ -1,146 +0,0 @@ -package wasm - -import ( - "errors" - "log" - "net" - "sync" - "time" - - "github.com/gotunnel/pkg/plugin" -) - -// ErrInvalidHandle 无效的连接句柄 -var ErrInvalidHandle = errors.New("invalid connection handle") - -// HostContextImpl 实现 HostContext 接口 -type HostContextImpl struct { - dialer plugin.Dialer - clientConn net.Conn - config map[string]string - - // 连接管理 - conns map[plugin.ConnHandle]net.Conn - nextHandle plugin.ConnHandle - mu sync.Mutex -} - -// NewHostContext 创建 host context -func NewHostContext(dialer plugin.Dialer, clientConn net.Conn, config map[string]string) *HostContextImpl { - return &HostContextImpl{ - dialer: dialer, - clientConn: clientConn, - config: config, - conns: make(map[plugin.ConnHandle]net.Conn), - nextHandle: 1, - } -} - -// Dial 通过隧道建立连接 -func (h *HostContextImpl) Dial(network, address string) (plugin.ConnHandle, error) { - conn, err := h.dialer.Dial(network, address) - if err != nil { - return 0, err - } - - h.mu.Lock() - handle := h.nextHandle - h.nextHandle++ - h.conns[handle] = conn - h.mu.Unlock() - - return handle, nil -} - -// Read 从连接读取数据 -func (h *HostContextImpl) Read(handle plugin.ConnHandle, buf []byte) (int, error) { - h.mu.Lock() - conn, ok := h.conns[handle] - h.mu.Unlock() - - if !ok { - return 0, ErrInvalidHandle - } - - return conn.Read(buf) -} - -// Write 向连接写入数据 -func (h *HostContextImpl) Write(handle plugin.ConnHandle, buf []byte) (int, error) { - h.mu.Lock() - conn, ok := h.conns[handle] - h.mu.Unlock() - - if !ok { - return 0, ErrInvalidHandle - } - - return conn.Write(buf) -} - -// CloseConn 关闭连接 -func (h *HostContextImpl) CloseConn(handle plugin.ConnHandle) error { - h.mu.Lock() - conn, ok := h.conns[handle] - if ok { - delete(h.conns, handle) - } - h.mu.Unlock() - - if !ok { - return ErrInvalidHandle - } - - return conn.Close() -} - -// ClientRead 从客户端连接读取数据 -func (h *HostContextImpl) ClientRead(buf []byte) (int, error) { - return h.clientConn.Read(buf) -} - -// ClientWrite 向客户端连接写入数据 -func (h *HostContextImpl) ClientWrite(buf []byte) (int, error) { - return h.clientConn.Write(buf) -} - -// Log 记录日志 -func (h *HostContextImpl) Log(level plugin.LogLevel, message string) { - prefix := "[WASM]" - switch level { - case plugin.LogDebug: - prefix = "[WASM DEBUG]" - case plugin.LogInfo: - prefix = "[WASM INFO]" - case plugin.LogWarn: - prefix = "[WASM WARN]" - case plugin.LogError: - prefix = "[WASM ERROR]" - } - log.Printf("%s %s", prefix, message) -} - -// Now 返回当前 Unix 时间戳 -func (h *HostContextImpl) Now() int64 { - return time.Now().Unix() -} - -// GetConfig 获取配置值 -func (h *HostContextImpl) GetConfig(key string) string { - if h.config == nil { - return "" - } - return h.config[key] -} - -// Close 关闭所有连接 -func (h *HostContextImpl) Close() error { - h.mu.Lock() - defer h.mu.Unlock() - - for handle, conn := range h.conns { - conn.Close() - delete(h.conns, handle) - } - return nil -} diff --git a/pkg/plugin/wasm/memory.go b/pkg/plugin/wasm/memory.go deleted file mode 100644 index 072bd4c..0000000 --- a/pkg/plugin/wasm/memory.go +++ /dev/null @@ -1,29 +0,0 @@ -package wasm - -import ( - "github.com/tetratelabs/wazero/api" -) - -// ReadString 从 WASM 内存读取字符串 -func ReadString(mem api.Memory, ptr, len uint32) (string, bool) { - data, ok := mem.Read(ptr, len) - if !ok { - return "", false - } - return string(data), true -} - -// WriteString 向 WASM 内存写入字符串 -func WriteString(mem api.Memory, ptr uint32, s string) bool { - return mem.Write(ptr, []byte(s)) -} - -// ReadBytes 从 WASM 内存读取字节 -func ReadBytes(mem api.Memory, ptr, len uint32) ([]byte, bool) { - return mem.Read(ptr, len) -} - -// WriteBytes 向 WASM 内存写入字节 -func WriteBytes(mem api.Memory, ptr uint32, data []byte) bool { - return mem.Write(ptr, data) -} diff --git a/pkg/plugin/wasm/module.go b/pkg/plugin/wasm/module.go deleted file mode 100644 index 3673ea7..0000000 --- a/pkg/plugin/wasm/module.go +++ /dev/null @@ -1,148 +0,0 @@ -package wasm - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/gotunnel/pkg/plugin" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" -) - -// WASMPlugin 封装 WASM 模块作为 ProxyHandler -type WASMPlugin struct { - name string - metadata plugin.PluginMetadata - runtime *Runtime - compiled wazero.CompiledModule - config map[string]string -} - -// NewWASMPlugin 从 WASM 字节创建 plugin -func NewWASMPlugin(ctx context.Context, rt *Runtime, name string, wasmBytes []byte) (*WASMPlugin, error) { - compiled, err := rt.runtime.CompileModule(ctx, wasmBytes) - if err != nil { - return nil, fmt.Errorf("compile module: %w", err) - } - - p := &WASMPlugin{ - name: name, - runtime: rt, - compiled: compiled, - } - - // 尝试获取元数据 - if err := p.loadMetadata(ctx); err != nil { - // 使用默认元数据 - p.metadata = plugin.PluginMetadata{ - Name: name, - Type: plugin.PluginTypeProxy, - Source: plugin.PluginSourceWASM, - } - } - - return p, nil -} - -// loadMetadata 从 WASM 模块加载元数据 -func (p *WASMPlugin) loadMetadata(ctx context.Context) error { - // 创建临时实例获取元数据 - inst, err := p.runtime.runtime.InstantiateModule(ctx, p.compiled, wazero.NewModuleConfig()) - if err != nil { - return err - } - defer inst.Close(ctx) - - metadataFn := inst.ExportedFunction("metadata") - if metadataFn == nil { - return fmt.Errorf("metadata function not exported") - } - - allocFn := inst.ExportedFunction("alloc") - if allocFn == nil { - return fmt.Errorf("alloc function not exported") - } - - // 分配缓冲区 - results, err := allocFn.Call(ctx, 1024) - if err != nil { - return err - } - bufPtr := uint32(results[0]) - - // 调用 metadata 函数 - results, err = metadataFn.Call(ctx, uint64(bufPtr), 1024) - if err != nil { - return err - } - actualLen := uint32(results[0]) - - // 读取元数据 - mem := inst.Memory() - data, ok := mem.Read(bufPtr, actualLen) - if !ok { - return fmt.Errorf("failed to read metadata") - } - - return json.Unmarshal(data, &p.metadata) -} - -// Metadata 返回 plugin 信息 -func (p *WASMPlugin) Metadata() plugin.PluginMetadata { - return p.metadata -} - -// Init 初始化 plugin -func (p *WASMPlugin) Init(config map[string]string) error { - p.config = config - return nil -} - -// HandleConn 处理连接 -func (p *WASMPlugin) HandleConn(conn interface{}, dialer plugin.Dialer) error { - // WASM plugin 的连接处理需要更复杂的实现 - // 这里提供基础框架,实际实现需要注册 host functions - return fmt.Errorf("WASM plugin HandleConn not fully implemented") -} - -// Close 关闭 plugin -func (p *WASMPlugin) Close() error { - return p.compiled.Close(context.Background()) -} - -// RegisterHostFunctions 注册 host functions 到 wazero 运行时 -func RegisterHostFunctions(ctx context.Context, r wazero.Runtime) (wazero.CompiledModule, error) { - return r.NewHostModuleBuilder("env"). - NewFunctionBuilder(). - WithFunc(hostLog). - Export("log"). - NewFunctionBuilder(). - WithFunc(hostNow). - Export("now"). - Compile(ctx) -} - -// host function 实现 -func hostLog(ctx context.Context, m api.Module, level uint32, msgPtr, msgLen uint32) { - data, ok := m.Memory().Read(msgPtr, msgLen) - if !ok { - return - } - prefix := "[WASM]" - switch plugin.LogLevel(level) { - case plugin.LogDebug: - prefix = "[WASM DEBUG]" - case plugin.LogInfo: - prefix = "[WASM INFO]" - case plugin.LogWarn: - prefix = "[WASM WARN]" - case plugin.LogError: - prefix = "[WASM ERROR]" - } - fmt.Printf("%s %s\n", prefix, string(data)) -} - -func hostNow(ctx context.Context) int64 { - return ctx.Value("now").(func() int64)() -} diff --git a/pkg/plugin/wasm/runtime.go b/pkg/plugin/wasm/runtime.go deleted file mode 100644 index 4a3bf0f..0000000 --- a/pkg/plugin/wasm/runtime.go +++ /dev/null @@ -1,116 +0,0 @@ -package wasm - -import ( - "context" - "fmt" - "sync" - - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" -) - -// Runtime 管理 wazero WASM 运行时 -type Runtime struct { - runtime wazero.Runtime - modules map[string]*Module - mu sync.RWMutex -} - -// NewRuntime 创建新的 WASM 运行时 -func NewRuntime(ctx context.Context) (*Runtime, error) { - r := wazero.NewRuntime(ctx) - return &Runtime{ - runtime: r, - modules: make(map[string]*Module), - }, nil -} - -// GetWazeroRuntime 返回底层 wazero 运行时 -func (r *Runtime) GetWazeroRuntime() wazero.Runtime { - return r.runtime -} - -// LoadModule 从字节加载 WASM 模块 -func (r *Runtime) LoadModule(ctx context.Context, name string, wasmBytes []byte) (*Module, error) { - r.mu.Lock() - defer r.mu.Unlock() - - if _, exists := r.modules[name]; exists { - return nil, fmt.Errorf("module %s already loaded", name) - } - - compiled, err := r.runtime.CompileModule(ctx, wasmBytes) - if err != nil { - return nil, fmt.Errorf("failed to compile module: %w", err) - } - - module := &Module{ - name: name, - compiled: compiled, - } - - r.modules[name] = module - return module, nil -} - -// GetModule 获取已加载的模块 -func (r *Runtime) GetModule(name string) (*Module, bool) { - r.mu.RLock() - defer r.mu.RUnlock() - m, ok := r.modules[name] - return m, ok -} - -// UnloadModule 卸载 WASM 模块 -func (r *Runtime) UnloadModule(ctx context.Context, name string) error { - r.mu.Lock() - defer r.mu.Unlock() - - module, exists := r.modules[name] - if !exists { - return fmt.Errorf("module %s not found", name) - } - - if err := module.Close(ctx); err != nil { - return err - } - - delete(r.modules, name) - return nil -} - -// Close 关闭运行时 -func (r *Runtime) Close(ctx context.Context) error { - r.mu.Lock() - defer r.mu.Unlock() - - for name, module := range r.modules { - if err := module.Close(ctx); err != nil { - return fmt.Errorf("failed to close module %s: %w", name, err) - } - } - - return r.runtime.Close(ctx) -} - -// Module WASM 模块封装 -type Module struct { - name string - compiled wazero.CompiledModule - instance api.Module -} - -// Name 返回模块名称 -func (m *Module) Name() string { - return m.name -} - -// Close 关闭模块 -func (m *Module) Close(ctx context.Context) error { - if m.instance != nil { - if err := m.instance.Close(ctx); err != nil { - return err - } - } - return m.compiled.Close(ctx) -} diff --git a/pkg/protocol/message.go b/pkg/protocol/message.go index 42b82cf..575577d 100644 --- a/pkg/protocol/message.go +++ b/pkg/protocol/message.go @@ -38,6 +38,12 @@ const ( // 插件安装消息 MsgTypeInstallPlugins uint8 = 24 // 服务端推送安装插件列表 MsgTypePluginConfig uint8 = 25 // 插件配置同步 + + // 客户端插件消息 + MsgTypeClientPluginStart uint8 = 40 // 启动客户端插件 + MsgTypeClientPluginStop uint8 = 41 // 停止客户端插件 + MsgTypeClientPluginStatus uint8 = 42 // 客户端插件状态 + MsgTypeClientPluginConn uint8 = 43 // 客户端插件连接请求 ) // Message 基础消息结构 @@ -169,6 +175,35 @@ type UDPPacket struct { Data []byte `json:"data"` // UDP 数据 } +// ClientPluginStartRequest 启动客户端插件请求 +type ClientPluginStartRequest struct { + PluginName string `json:"plugin_name"` // 插件名称 + RuleName string `json:"rule_name"` // 规则名称 + RemotePort int `json:"remote_port"` // 服务端监听端口 + Config map[string]string `json:"config"` // 插件配置 +} + +// ClientPluginStopRequest 停止客户端插件请求 +type ClientPluginStopRequest struct { + PluginName string `json:"plugin_name"` // 插件名称 + RuleName string `json:"rule_name"` // 规则名称 +} + +// ClientPluginStatusResponse 客户端插件状态响应 +type ClientPluginStatusResponse struct { + PluginName string `json:"plugin_name"` // 插件名称 + RuleName string `json:"rule_name"` // 规则名称 + Running bool `json:"running"` // 是否运行中 + LocalAddr string `json:"local_addr"` // 本地监听地址 + Error string `json:"error"` // 错误信息 +} + +// ClientPluginConnRequest 客户端插件连接请求 +type ClientPluginConnRequest struct { + PluginName string `json:"plugin_name"` // 插件名称 + RuleName string `json:"rule_name"` // 规则名称 +} + // WriteMessage 写入消息到 writer func WriteMessage(w io.Writer, msg *Message) error { header := make([]byte, HeaderSize) diff --git a/web/src/types/index.ts b/web/src/types/index.ts index c70c317..ce1fa4f 100644 --- a/web/src/types/index.ts +++ b/web/src/types/index.ts @@ -55,6 +55,7 @@ export interface ClientStatus { nickname?: string online: boolean last_ping?: string + remote_addr?: string rule_count: number } @@ -66,6 +67,7 @@ export interface ClientDetail { plugins?: ClientPlugin[] online: boolean last_ping?: string + remote_addr?: string } // 服务器状态 diff --git a/web/src/views/ClientView.vue b/web/src/views/ClientView.vue index 85dc32c..2bff890 100644 --- a/web/src/views/ClientView.vue +++ b/web/src/views/ClientView.vue @@ -25,6 +25,7 @@ const clientId = route.params.id as string const online = ref(false) const lastPing = ref('') +const remoteAddr = ref('') const nickname = ref('') const rules = ref([]) const clientPlugins = ref([]) @@ -119,6 +120,7 @@ const loadClient = async () => { const { data } = await getClient(clientId) online.value = data.online lastPing.value = data.last_ping || '' + remoteAddr.value = data.remote_addr || '' nickname.value = data.nickname || '' rules.value = data.rules || [] clientPlugins.value = data.plugins || [] @@ -182,7 +184,16 @@ const saveEdit = async () => { await updateClient(clientId, { id: clientId, nickname: nickname.value, rules: editRules.value }) editing.value = false message.success('保存成功') - loadClient() + await loadClient() + // 如果客户端在线,自动推送配置 + if (online.value) { + try { + await pushConfigToClient(clientId) + message.success('配置已自动推送到客户端') + } catch (e: any) { + message.warning('配置已保存,但推送失败: ' + (e.response?.data || '未知错误')) + } + } } catch (e) { message.error('保存失败') } @@ -322,6 +333,9 @@ const savePluginConfig = async () => { {{ online ? '在线' : '离线' }} + + IP: {{ remoteAddr }} + 最后心跳: {{ lastPing }} diff --git a/web/src/views/HomeView.vue b/web/src/views/HomeView.vue index 31168a3..9a83691 100644 --- a/web/src/views/HomeView.vue +++ b/web/src/views/HomeView.vue @@ -66,7 +66,8 @@ const viewClient = (id: string) => {

{{ client.nickname || client.id }}

-

{{ client.id }}

+

{{ client.id }}

+

IP: {{ client.remote_addr }}

{{ client.online ? '在线' : '离线' }}