update
All checks were successful
Build Multi-Platform Binaries / build-frontend (push) Successful in 29s
Build Multi-Platform Binaries / build-binaries (amd64, darwin, server, false) (push) Successful in 49s
Build Multi-Platform Binaries / build-binaries (amd64, linux, client, true) (push) Successful in 34s
Build Multi-Platform Binaries / build-binaries (amd64, linux, server, true) (push) Successful in 59s
Build Multi-Platform Binaries / build-binaries (amd64, windows, client, true) (push) Successful in 34s
Build Multi-Platform Binaries / build-binaries (amd64, windows, server, true) (push) Successful in 55s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, client, true) (push) Successful in 37s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, server, true) (push) Successful in 1m7s
Build Multi-Platform Binaries / build-binaries (arm64, darwin, server, false) (push) Successful in 50s
Build Multi-Platform Binaries / build-binaries (arm64, linux, client, true) (push) Successful in 33s
Build Multi-Platform Binaries / build-binaries (arm64, linux, server, true) (push) Successful in 59s
Build Multi-Platform Binaries / build-binaries (arm64, windows, server, false) (push) Successful in 52s

This commit is contained in:
Flik
2025-12-29 14:24:46 +08:00
parent c728cc3bb7
commit e10736e05e
23 changed files with 591 additions and 910 deletions

View File

@@ -1,40 +1,25 @@
package plugin
import (
"context"
"fmt"
"log"
"sync"
"github.com/gotunnel/internal/server/db"
"github.com/gotunnel/pkg/plugin"
"github.com/gotunnel/pkg/plugin/builtin"
"github.com/gotunnel/pkg/plugin/wasm"
)
// Manager 服务端 plugin 管理器
type Manager struct {
registry *plugin.Registry
store db.PluginStore
runtime *wasm.Runtime
mu sync.RWMutex
}
// NewManager 创建 plugin 管理器
func NewManager(pluginStore db.PluginStore) (*Manager, error) {
ctx := context.Background()
runtime, err := wasm.NewRuntime(ctx)
if err != nil {
return nil, fmt.Errorf("create wasm runtime: %w", err)
}
func NewManager() (*Manager, error) {
registry := plugin.NewRegistry()
m := &Manager{
registry: registry,
store: pluginStore,
runtime: runtime,
}
// 注册内置 plugins
@@ -46,67 +31,20 @@ func NewManager(pluginStore db.PluginStore) (*Manager, error) {
}
// registerBuiltins 注册内置 plugins
// 注意: tcp, udp, http, https 是内置类型,直接在 tunnel 中处理
// 这里只注册需要通过 plugin 系统提供的协议
func (m *Manager) registerBuiltins() error {
// 使用统一的插件注册入口
// 注册服务端插件
if err := m.registry.RegisterAll(builtin.GetAll()); err != nil {
return err
}
log.Printf("[Plugin] Registered %d builtin plugins", len(builtin.GetAll()))
return nil
}
// LoadStoredPlugins 从数据库加载所有 plugins
func (m *Manager) LoadStoredPlugins(ctx context.Context) error {
if m.store == nil {
return nil
}
plugins, err := m.store.GetAllPlugins()
if err != nil {
return err
}
for _, p := range plugins {
data, err := m.store.GetPluginWASM(p.Name)
if err != nil {
log.Printf("[Plugin] Failed to load %s: %v", p.Name, err)
continue
}
if err := m.loadWASMPlugin(ctx, p.Name, data); err != nil {
log.Printf("[Plugin] Failed to init %s: %v", p.Name, err)
}
}
return nil
}
// loadWASMPlugin 加载 WASM plugin
func (m *Manager) loadWASMPlugin(ctx context.Context, name string, data []byte) error {
_, err := m.runtime.LoadModule(ctx, name, data)
if err != nil {
return err
}
log.Printf("[Plugin] WASM plugin loaded: %s", name)
return nil
}
// InstallPlugin 安装新的 WASM plugin
func (m *Manager) InstallPlugin(ctx context.Context, p *db.PluginData) error {
m.mu.Lock()
defer m.mu.Unlock()
// 存储到数据库
if m.store != nil {
if err := m.store.SavePlugin(p); err != nil {
// 注册客户端插件
for _, h := range builtin.GetAllClientPlugins() {
if err := m.registry.RegisterClientPlugin(h); err != nil {
return err
}
}
// 加载到运行时
return m.loadWASMPlugin(ctx, p.Name, p.WASMData)
log.Printf("[Plugin] Registered %d server plugins, %d client plugins",
len(builtin.GetAll()), len(builtin.GetAllClientPlugins()))
return nil
}
// GetHandler 返回指定代理类型的 handler
@@ -119,7 +57,7 @@ func (m *Manager) ListPlugins() []plugin.PluginInfo {
return m.registry.List()
}
// Close 关闭管理器
func (m *Manager) Close(ctx context.Context) error {
return m.runtime.Close(ctx)
// GetRegistry 返回插件注册表
func (m *Manager) GetRegistry() *plugin.Registry {
return m.registry
}

View File

@@ -22,19 +22,21 @@ func validateClientID(id string) bool {
// ClientStatus 客户端状态
type ClientStatus struct {
ID string `json:"id"`
Nickname string `json:"nickname,omitempty"`
Online bool `json:"online"`
LastPing string `json:"last_ping,omitempty"`
RuleCount int `json:"rule_count"`
ID string `json:"id"`
Nickname string `json:"nickname,omitempty"`
Online bool `json:"online"`
LastPing string `json:"last_ping,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
RuleCount int `json:"rule_count"`
}
// ServerInterface 服务端接口
type ServerInterface interface {
GetClientStatus(clientID string) (online bool, lastPing string)
GetClientStatus(clientID string) (online bool, lastPing string, remoteAddr string)
GetAllClientStatus() map[string]struct {
Online bool
LastPing string
Online bool
LastPing string
RemoteAddr string
}
ReloadConfig() error
GetBindAddr() string
@@ -158,6 +160,7 @@ func (h *APIHandler) getClients(rw http.ResponseWriter) {
if s, ok := statusMap[c.ID]; ok {
cs.Online = s.Online
cs.LastPing = s.LastPing
cs.RemoteAddr = s.RemoteAddr
}
result = append(result, cs)
}
@@ -256,10 +259,11 @@ func (h *APIHandler) getClient(rw http.ResponseWriter, clientID string) {
http.Error(rw, "client not found", http.StatusNotFound)
return
}
online, lastPing := h.server.GetClientStatus(clientID)
online, lastPing, remoteAddr := h.server.GetClientStatus(clientID)
h.jsonResponse(rw, map[string]interface{}{
"id": client.ID, "nickname": client.Nickname, "rules": client.Rules,
"plugins": client.Plugins, "online": online, "last_ping": lastPing,
"remote_addr": remoteAddr,
})
}
@@ -429,7 +433,7 @@ func (h *APIHandler) pushConfigToClient(rw http.ResponseWriter, r *http.Request,
return
}
online, _ := h.server.GetClientStatus(clientID)
online, _, _ := h.server.GetClientStatus(clientID)
if !online {
http.Error(rw, "client not online", http.StatusBadRequest)
return
@@ -523,7 +527,7 @@ func (h *APIHandler) installPluginsToClient(rw http.ResponseWriter, r *http.Requ
return
}
online, _ := h.server.GetClientStatus(clientID)
online, _, _ := h.server.GetClientStatus(clientID)
if !online {
http.Error(rw, "client not online", http.StatusBadRequest)
return
@@ -734,7 +738,7 @@ func (h *APIHandler) updateClientPluginConfig(rw http.ResponseWriter, r *http.Re
}
// 如果客户端在线,同步配置
online, _ := h.server.GetClientStatus(clientID)
online, _, _ := h.server.GetClientStatus(clientID)
if online {
if err := h.server.SyncPluginConfigToClient(clientID, pluginName, req.Config); err != nil {
// 配置已保存,但同步失败,返回警告

View File

@@ -52,6 +52,7 @@ type Server struct {
// ClientSession 客户端会话
type ClientSession struct {
ID string
RemoteAddr string // 客户端 IP 地址
Session *yamux.Session
Rules []protocol.ProxyRule
Listeners map[int]net.Listener
@@ -185,13 +186,20 @@ func (s *Server) setupClientSession(conn net.Conn, clientID string, rules []prot
return
}
// 提取客户端 IP去掉端口
remoteAddr := conn.RemoteAddr().String()
if host, _, err := net.SplitHostPort(remoteAddr); err == nil {
remoteAddr = host
}
cs := &ClientSession{
ID: clientID,
Session: session,
Rules: rules,
Listeners: make(map[int]net.Listener),
UDPConns: make(map[int]*net.UDPConn),
LastPing: time.Now(),
ID: clientID,
RemoteAddr: remoteAddr,
Session: session,
Rules: rules,
Listeners: make(map[int]net.Listener),
UDPConns: make(map[int]*net.UDPConn),
LastPing: time.Now(),
}
s.registerClient(cs)
@@ -284,6 +292,10 @@ func (s *Server) stopProxyListeners(cs *ClientSession) {
// startProxyListeners 启动代理监听
func (s *Server) startProxyListeners(cs *ClientSession) {
for _, rule := range cs.Rules {
if !rule.IsEnabled() {
continue
}
ruleType := rule.Type
if ruleType == "" {
ruleType = "tcp"
@@ -295,6 +307,12 @@ func (s *Server) startProxyListeners(cs *ClientSession) {
continue
}
// 检查是否为客户端插件
if s.isClientPlugin(ruleType) {
s.startClientPluginListener(cs, rule)
continue
}
// TCP 类型
if err := s.portManager.Reserve(rule.RemotePort, cs.ID); err != nil {
log.Printf("[Server] Port %d error: %v", rule.RemotePort, err)
@@ -445,22 +463,23 @@ func (s *Server) sendHeartbeat(cs *ClientSession) bool {
}
// GetClientStatus 获取客户端状态
func (s *Server) GetClientStatus(clientID string) (online bool, lastPing string) {
func (s *Server) GetClientStatus(clientID string) (online bool, lastPing string, remoteAddr string) {
s.mu.RLock()
defer s.mu.RUnlock()
if cs, ok := s.clients[clientID]; ok {
cs.mu.Lock()
defer cs.mu.Unlock()
return true, cs.LastPing.Format(time.RFC3339)
return true, cs.LastPing.Format(time.RFC3339), cs.RemoteAddr
}
return false, ""
return false, "", ""
}
// GetAllClientStatus 获取所有客户端状态
func (s *Server) GetAllClientStatus() map[string]struct {
Online bool
LastPing string
Online bool
LastPing string
RemoteAddr string
} {
// 先复制客户端引用,避免嵌套锁
s.mu.RLock()
@@ -471,18 +490,21 @@ func (s *Server) GetAllClientStatus() map[string]struct {
s.mu.RUnlock()
result := make(map[string]struct {
Online bool
LastPing string
Online bool
LastPing string
RemoteAddr string
})
for _, cs := range clients {
cs.mu.Lock()
result[cs.ID] = struct {
Online bool
LastPing string
Online bool
LastPing string
RemoteAddr string
}{
Online: true,
LastPing: cs.LastPing.Format(time.RFC3339),
Online: true,
LastPing: cs.LastPing.Format(time.RFC3339),
RemoteAddr: cs.RemoteAddr,
}
cs.mu.Unlock()
}
@@ -812,3 +834,119 @@ func (s *Server) sendPluginConfig(session *yamux.Session, pluginName string, con
}
return protocol.WriteMessage(stream, msg)
}
// isClientPlugin 检查是否为客户端插件
func (s *Server) isClientPlugin(pluginType string) bool {
if s.pluginRegistry == nil {
return false
}
handler, err := s.pluginRegistry.GetClientPlugin(pluginType)
if err != nil {
return false
}
return handler != nil
}
// startClientPluginListener 启动客户端插件监听
func (s *Server) startClientPluginListener(cs *ClientSession, rule protocol.ProxyRule) {
if err := s.portManager.Reserve(rule.RemotePort, cs.ID); err != nil {
log.Printf("[Server] Port %d error: %v", rule.RemotePort, err)
return
}
// 发送启动命令到客户端
if err := s.sendClientPluginStart(cs.Session, rule); err != nil {
log.Printf("[Server] Failed to start client plugin %s: %v", rule.Type, err)
s.portManager.Release(rule.RemotePort)
return
}
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", rule.RemotePort))
if err != nil {
log.Printf("[Server] Listen %d error: %v", rule.RemotePort, err)
s.portManager.Release(rule.RemotePort)
return
}
cs.mu.Lock()
cs.Listeners[rule.RemotePort] = ln
cs.mu.Unlock()
log.Printf("[Server] Client plugin %s on :%d", rule.Type, rule.RemotePort)
go s.acceptClientPluginConns(cs, ln, rule)
}
// sendClientPluginStart 发送客户端插件启动命令
func (s *Server) sendClientPluginStart(session *yamux.Session, rule protocol.ProxyRule) error {
stream, err := session.Open()
if err != nil {
return err
}
defer stream.Close()
req := protocol.ClientPluginStartRequest{
PluginName: rule.Type,
RuleName: rule.Name,
RemotePort: rule.RemotePort,
Config: rule.PluginConfig,
}
msg, err := protocol.NewMessage(protocol.MsgTypeClientPluginStart, req)
if err != nil {
return err
}
if err := protocol.WriteMessage(stream, msg); err != nil {
return err
}
// 等待响应
resp, err := protocol.ReadMessage(stream)
if err != nil {
return err
}
if resp.Type != protocol.MsgTypeClientPluginStatus {
return fmt.Errorf("unexpected response type: %d", resp.Type)
}
var status protocol.ClientPluginStatusResponse
if err := resp.ParsePayload(&status); err != nil {
return err
}
if !status.Running {
return fmt.Errorf("plugin failed: %s", status.Error)
}
return nil
}
// acceptClientPluginConns 接受客户端插件连接
func (s *Server) acceptClientPluginConns(cs *ClientSession, ln net.Listener, rule protocol.ProxyRule) {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go s.handleClientPluginConn(cs, conn, rule)
}
}
// handleClientPluginConn 处理客户端插件连接
func (s *Server) handleClientPluginConn(cs *ClientSession, conn net.Conn, rule protocol.ProxyRule) {
defer conn.Close()
stream, err := cs.Session.Open()
if err != nil {
log.Printf("[Server] Open stream error: %v", err)
return
}
defer stream.Close()
req := protocol.ClientPluginConnRequest{
PluginName: rule.Type,
RuleName: rule.Name,
}
msg, _ := protocol.NewMessage(protocol.MsgTypeClientPluginConn, req)
if err := protocol.WriteMessage(stream, msg); err != nil {
return
}
relay.Relay(conn, stream)
}