From 2f98e1ac7d3d3b5681bd23a3ab5d3125c7494a23 Mon Sep 17 00:00:00 2001 From: Flik Date: Sat, 3 Jan 2026 16:19:52 +0800 Subject: [PATCH] feat(logging): implement client log streaming and management - Added log streaming functionality for clients, allowing real-time log access via SSE. - Introduced LogHandler to manage log streaming requests and responses. - Implemented LogSessionManager to handle active log sessions and listeners. - Enhanced protocol with log-related message types and structures. - Created Logger for client-side logging, supporting various log levels and file output. - Developed LogViewer component for the web interface to display and filter logs. - Updated API to support log stream creation and management. - Added support for querying logs by level and searching through log messages. --- PLUGINS.md | 82 +++++++ internal/client/tunnel/client.go | 88 +++++++ internal/client/tunnel/logger.go | 240 +++++++++++++++++++ internal/server/router/handler/interfaces.go | 4 + internal/server/router/handler/log.go | 96 ++++++++ internal/server/router/middleware/jwt.go | 8 + internal/server/router/router.go | 4 + internal/server/tunnel/log_session.go | 155 ++++++++++++ internal/server/tunnel/server.go | 95 ++++++++ pkg/plugin/script/js.go | 215 ++++++++++++++++- pkg/protocol/message.go | 33 +++ web/src/api/index.ts | 41 +++- web/src/components/LogViewer.vue | 232 ++++++++++++++++++ web/src/types/index.ts | 15 ++ web/src/views/ClientView.vue | 15 +- 15 files changed, 1311 insertions(+), 12 deletions(-) create mode 100644 internal/client/tunnel/logger.go create mode 100644 internal/server/router/handler/log.go create mode 100644 internal/server/tunnel/log_session.go create mode 100644 web/src/components/LogViewer.vue diff --git a/PLUGINS.md b/PLUGINS.md index b8327b6..7370f18 100644 --- a/PLUGINS.md +++ b/PLUGINS.md @@ -283,6 +283,88 @@ function handleConn(conn) { --- +### 增强 API (Enhanced APIs) + +GoTunnel v2.0+ 提供了更多强大的 API 能力。 + +#### `logger` (日志) + +推荐使用结构化日志替代简单的 `log()`。 + +- `logger.info(msg)` +- `logger.warn(msg)` +- `logger.error(msg)` + +```javascript +logger.info("Server started"); +logger.error("Connection failed"); +``` + +#### `config` (配置) + +增强的配置获取方式。 + +- `config.get(key)`: 获取配置值 +- `config.getAll()`: 获取所有配置 + +```javascript +var all = config.getAll(); +var port = config.get("port"); +``` + +#### `storage` (持久化存储) + +简单的 Key-Value 存储,数据保存在客户端本地。 + +- `storage.get(key, default)` +- `storage.set(key, value)` +- `storage.delete(key)` +- `storage.keys()` + +```javascript +storage.set("last_run", Date.now()); +var last = storage.get("last_run", 0); +``` + +#### `event` (事件总线) + +插件内部或插件间的事件通信。 + +- `event.on(name, callback)` +- `event.emit(name, data)` +- `event.off(name)` + +```javascript +event.on("user_login", function(user) { + logger.info("User logged in: " + user); +}); +event.emit("user_login", "admin"); +``` + +#### `request` (HTTP 请求) + +发起外部 HTTP 请求。 + +- `request.get(url)` +- `request.post(url, contentType, body)` + +```javascript +var res = request.get("https://api.ipify.org"); +logger.info("My IP: " + res.body); +``` + +#### `notify` (通知) + +发送系统通知。 + +- `notify.send(title, message)` + +```javascript +notify.send("Download Complete", "File saved to disk"); +``` + +--- + ## 示例插件 ### Echo 服务 diff --git a/internal/client/tunnel/client.go b/internal/client/tunnel/client.go index 23cd804..ebe98ae 100644 --- a/internal/client/tunnel/client.go +++ b/internal/client/tunnel/client.go @@ -47,6 +47,7 @@ type Client struct { runningPlugins map[string]plugin.ClientPlugin versionStore *PluginVersionStore pluginMu sync.RWMutex + logger *Logger // 日志收集器 } // NewClient 创建客户端 @@ -59,12 +60,19 @@ func NewClient(serverAddr, token, id string) *Client { home, _ := os.UserHomeDir() dataDir := filepath.Join(home, ".gotunnel") + // 初始化日志收集器 + logger, err := NewLogger(dataDir) + if err != nil { + log.Printf("[Client] Failed to initialize logger: %v", err) + } + return &Client{ ServerAddr: serverAddr, Token: token, ID: id, DataDir: dataDir, runningPlugins: make(map[string]plugin.ClientPlugin), + logger: logger, } } @@ -236,6 +244,10 @@ func (c *Client) handleStream(stream net.Conn) { c.handlePluginConfigUpdate(stream, msg) case protocol.MsgTypeUpdateDownload: c.handleUpdateDownload(stream, msg) + case protocol.MsgTypeLogRequest: + go c.handleLogRequest(stream, msg) + case protocol.MsgTypeLogStop: + c.handleLogStop(stream, msg) } } @@ -900,3 +912,79 @@ func restartClientProcess(path, serverAddr, token, id string) { cmd.Start() os.Exit(0) } + +// handleLogRequest 处理日志请求 +func (c *Client) handleLogRequest(stream net.Conn, msg *protocol.Message) { + if c.logger == nil { + stream.Close() + return + } + + var req protocol.LogRequest + if err := msg.ParsePayload(&req); err != nil { + stream.Close() + return + } + + c.logger.Printf("Log request received: session=%s, follow=%v", req.SessionID, req.Follow) + + // 发送历史日志 + entries := c.logger.GetRecentLogs(req.Lines, req.Level) + if len(entries) > 0 { + data := protocol.LogData{ + SessionID: req.SessionID, + Entries: entries, + EOF: !req.Follow, + } + respMsg, _ := protocol.NewMessage(protocol.MsgTypeLogData, data) + if err := protocol.WriteMessage(stream, respMsg); err != nil { + stream.Close() + return + } + } + + // 如果不需要持续推送,关闭流 + if !req.Follow { + stream.Close() + return + } + + // 订阅新日志 + ch := c.logger.Subscribe(req.SessionID) + defer c.logger.Unsubscribe(req.SessionID) + defer stream.Close() + + // 持续推送新日志 + for entry := range ch { + // 应用级别过滤 + if req.Level != "" && entry.Level != req.Level { + continue + } + + data := protocol.LogData{ + SessionID: req.SessionID, + Entries: []protocol.LogEntry{entry}, + EOF: false, + } + respMsg, _ := protocol.NewMessage(protocol.MsgTypeLogData, data) + if err := protocol.WriteMessage(stream, respMsg); err != nil { + return + } + } +} + +// handleLogStop 处理停止日志流请求 +func (c *Client) handleLogStop(stream net.Conn, msg *protocol.Message) { + defer stream.Close() + + if c.logger == nil { + return + } + + var req protocol.LogStopRequest + if err := msg.ParsePayload(&req); err != nil { + return + } + + c.logger.Unsubscribe(req.SessionID) +} diff --git a/internal/client/tunnel/logger.go b/internal/client/tunnel/logger.go new file mode 100644 index 0000000..1784122 --- /dev/null +++ b/internal/client/tunnel/logger.go @@ -0,0 +1,240 @@ +package tunnel + +import ( + "container/ring" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/gotunnel/pkg/protocol" +) + +const ( + maxBufferSize = 1000 // 环形缓冲区最大条目数 + logFilePattern = "client.%s.log" // 日志文件名模式 +) + +// LogLevel 日志级别 +type LogLevel int + +const ( + LevelDebug LogLevel = iota + LevelInfo + LevelWarn + LevelError +) + +// Logger 客户端日志收集器 +type Logger struct { + dataDir string + buffer *ring.Ring + bufferMu sync.RWMutex + file *os.File + fileMu sync.Mutex + fileDate string + subscribers map[string]chan protocol.LogEntry + subMu sync.RWMutex +} + +// NewLogger 创建新的日志收集器 +func NewLogger(dataDir string) (*Logger, error) { + l := &Logger{ + dataDir: dataDir, + buffer: ring.New(maxBufferSize), + subscribers: make(map[string]chan protocol.LogEntry), + } + + // 确保日志目录存在 + logDir := filepath.Join(dataDir, "logs") + if err := os.MkdirAll(logDir, 0755); err != nil { + return nil, err + } + + return l, nil +} + +// Printf 记录日志 (兼容 log.Printf) +func (l *Logger) Printf(format string, args ...interface{}) { + l.log(LevelInfo, "client", format, args...) +} + +// Infof 记录信息日志 +func (l *Logger) Infof(format string, args ...interface{}) { + l.log(LevelInfo, "client", format, args...) +} + +// Warnf 记录警告日志 +func (l *Logger) Warnf(format string, args ...interface{}) { + l.log(LevelWarn, "client", format, args...) +} + +// Errorf 记录错误日志 +func (l *Logger) Errorf(format string, args ...interface{}) { + l.log(LevelError, "client", format, args...) +} + +// Debugf 记录调试日志 +func (l *Logger) Debugf(format string, args ...interface{}) { + l.log(LevelDebug, "client", format, args...) +} + +// PluginLog 记录插件日志 +func (l *Logger) PluginLog(pluginName, level, format string, args ...interface{}) { + var lvl LogLevel + switch level { + case "debug": + lvl = LevelDebug + case "warn": + lvl = LevelWarn + case "error": + lvl = LevelError + default: + lvl = LevelInfo + } + l.log(lvl, "plugin:"+pluginName, format, args...) +} + +func (l *Logger) log(level LogLevel, source, format string, args ...interface{}) { + entry := protocol.LogEntry{ + Timestamp: time.Now().UnixMilli(), + Level: levelToString(level), + Message: fmt.Sprintf(format, args...), + Source: source, + } + + // 输出到标准输出 + fmt.Printf("%s [%s] [%s] %s\n", + time.Now().Format("2006-01-02 15:04:05"), + entry.Level, + entry.Source, + entry.Message) + + // 添加到环形缓冲区 + l.bufferMu.Lock() + l.buffer.Value = entry + l.buffer = l.buffer.Next() + l.bufferMu.Unlock() + + // 写入文件 + l.writeToFile(entry) + + // 通知订阅者 + l.notifySubscribers(entry) +} + +// Subscribe 订阅日志流 +func (l *Logger) Subscribe(sessionID string) <-chan protocol.LogEntry { + ch := make(chan protocol.LogEntry, 100) + l.subMu.Lock() + l.subscribers[sessionID] = ch + l.subMu.Unlock() + return ch +} + +// Unsubscribe 取消订阅 +func (l *Logger) Unsubscribe(sessionID string) { + l.subMu.Lock() + if ch, ok := l.subscribers[sessionID]; ok { + close(ch) + delete(l.subscribers, sessionID) + } + l.subMu.Unlock() +} + +// GetRecentLogs 获取最近的日志 +func (l *Logger) GetRecentLogs(lines int, level string) []protocol.LogEntry { + l.bufferMu.RLock() + defer l.bufferMu.RUnlock() + + var entries []protocol.LogEntry + l.buffer.Do(func(v interface{}) { + if v == nil { + return + } + entry := v.(protocol.LogEntry) + // 应用级别过滤 + if level != "" && entry.Level != level { + return + } + entries = append(entries, entry) + }) + + // 如果指定了行数,返回最后 N 行 + if lines > 0 && len(entries) > lines { + entries = entries[len(entries)-lines:] + } + + return entries +} + +// Close 关闭日志收集器 +func (l *Logger) Close() { + l.fileMu.Lock() + if l.file != nil { + l.file.Close() + l.file = nil + } + l.fileMu.Unlock() + + l.subMu.Lock() + for _, ch := range l.subscribers { + close(ch) + } + l.subscribers = make(map[string]chan protocol.LogEntry) + l.subMu.Unlock() +} + +func (l *Logger) writeToFile(entry protocol.LogEntry) { + l.fileMu.Lock() + defer l.fileMu.Unlock() + + today := time.Now().Format("2006-01-02") + if l.fileDate != today { + if l.file != nil { + l.file.Close() + } + + logPath := filepath.Join(l.dataDir, "logs", fmt.Sprintf(logFilePattern, today)) + f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return + } + l.file = f + l.fileDate = today + } + + if l.file != nil { + fmt.Fprintf(l.file, "%d|%s|%s|%s\n", + entry.Timestamp, entry.Level, entry.Source, entry.Message) + } +} + +func (l *Logger) notifySubscribers(entry protocol.LogEntry) { + l.subMu.RLock() + defer l.subMu.RUnlock() + + for _, ch := range l.subscribers { + select { + case ch <- entry: + default: + // 订阅者太慢,丢弃日志 + } + } +} + +func levelToString(level LogLevel) string { + switch level { + case LevelDebug: + return "debug" + case LevelInfo: + return "info" + case LevelWarn: + return "warn" + case LevelError: + return "error" + default: + return "info" + } +} diff --git a/internal/server/router/handler/interfaces.go b/internal/server/router/handler/interfaces.go index d298e66..71c1a51 100644 --- a/internal/server/router/handler/interfaces.go +++ b/internal/server/router/handler/interfaces.go @@ -3,6 +3,7 @@ package handler import ( "github.com/gotunnel/internal/server/config" "github.com/gotunnel/internal/server/db" + "github.com/gotunnel/pkg/protocol" ) // AppInterface 应用接口 @@ -41,6 +42,9 @@ type ServerInterface interface { RestartClientPlugin(clientID, pluginName, ruleName string) error UpdateClientPluginConfig(clientID, pluginName, ruleName string, config map[string]string, restart bool) error SendUpdateToClient(clientID, downloadURL string) error + // 日志流 + StartClientLogStream(clientID, sessionID string, lines int, follow bool, level string) (<-chan protocol.LogEntry, error) + StopClientLogStream(sessionID string) } // ConfigField 配置字段 diff --git a/internal/server/router/handler/log.go b/internal/server/router/handler/log.go new file mode 100644 index 0000000..d2259ae --- /dev/null +++ b/internal/server/router/handler/log.go @@ -0,0 +1,96 @@ +package handler + +import ( + "io" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// LogHandler 日志处理器 +type LogHandler struct { + app AppInterface +} + +// NewLogHandler 创建日志处理器 +func NewLogHandler(app AppInterface) *LogHandler { + return &LogHandler{app: app} +} + +// StreamLogs 流式传输客户端日志 +// @Summary 流式传输客户端日志 +// @Description 通过 Server-Sent Events 实时接收客户端日志 +// @Tags Logs +// @Produce text/event-stream +// @Security Bearer +// @Param id path string true "客户端 ID" +// @Param lines query int false "初始日志行数" default(100) +// @Param follow query bool false "是否持续推送新日志" default(true) +// @Param level query string false "日志级别过滤 (info, warn, error)" +// @Success 200 {object} protocol.LogEntry +// @Router /api/client/{id}/logs [get] +func (h *LogHandler) StreamLogs(c *gin.Context) { + clientID := c.Param("id") + + // 检查客户端是否在线 + online, _, _ := h.app.GetServer().GetClientStatus(clientID) + if !online { + c.JSON(400, gin.H{"code": 400, "message": "client not online"}) + return + } + + // 解析查询参数 + lines := 100 + if v := c.Query("lines"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + lines = n + } + } + + follow := true + if v := c.Query("follow"); v == "false" { + follow = false + } + + level := c.Query("level") + + // 生成会话 ID + sessionID := uuid.New().String() + + // 启动日志流 + logCh, err := h.app.GetServer().StartClientLogStream(clientID, sessionID, lines, follow, level) + if err != nil { + c.JSON(500, gin.H{"code": 500, "message": err.Error()}) + return + } + + // 设置 SSE 响应头 + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("X-Accel-Buffering", "no") + + // 获取客户端断开信号 + clientGone := c.Request.Context().Done() + + // 流式传输日志 + c.Stream(func(w io.Writer) bool { + select { + case <-clientGone: + h.app.GetServer().StopClientLogStream(sessionID) + return false + case entry, ok := <-logCh: + if !ok { + return false + } + c.SSEvent("log", entry) + return true + case <-time.After(30 * time.Second): + // 发送心跳 + c.SSEvent("heartbeat", gin.H{"ts": time.Now().UnixMilli()}) + return true + } + }) +} diff --git a/internal/server/router/middleware/jwt.go b/internal/server/router/middleware/jwt.go index 50ee0f3..bf953ed 100644 --- a/internal/server/router/middleware/jwt.go +++ b/internal/server/router/middleware/jwt.go @@ -12,6 +12,14 @@ import ( func JWTAuth(jwtAuth *auth.JWTAuth) gin.HandlerFunc { return func(c *gin.Context) { authHeader := c.GetHeader("Authorization") + + // 支持从 query 参数获取 token (用于 SSE 等不支持自定义 header 的场景) + if authHeader == "" { + if token := c.Query("token"); token != "" { + authHeader = "Bearer " + token + } + } + if authHeader == "" { c.JSON(http.StatusUnauthorized, gin.H{ "code": 401, diff --git a/internal/server/router/router.go b/internal/server/router/router.go index e937761..82ade77 100644 --- a/internal/server/router/router.go +++ b/internal/server/router/router.go @@ -105,6 +105,10 @@ func (r *GinRouter) SetupRoutes(app handler.AppInterface, jwtAuth *auth.JWTAuth, api.GET("/update/check/client", updateHandler.CheckClient) api.POST("/update/apply/server", updateHandler.ApplyServer) api.POST("/update/apply/client", updateHandler.ApplyClient) + + // 日志管理 + logHandler := handler.NewLogHandler(app) + api.GET("/client/:id/logs", logHandler.StreamLogs) } } diff --git a/internal/server/tunnel/log_session.go b/internal/server/tunnel/log_session.go new file mode 100644 index 0000000..ef621c5 --- /dev/null +++ b/internal/server/tunnel/log_session.go @@ -0,0 +1,155 @@ +package tunnel + +import ( + "net" + "sync" + + "github.com/gotunnel/pkg/protocol" +) + +// LogSessionManager 管理所有活跃的日志会话 +type LogSessionManager struct { + sessions map[string]*LogSession + mu sync.RWMutex +} + +// LogSession 日志流会话 +type LogSession struct { + ID string + ClientID string + Stream net.Conn + listeners []chan protocol.LogEntry + mu sync.Mutex + closed bool +} + +// NewLogSessionManager 创建日志会话管理器 +func NewLogSessionManager() *LogSessionManager { + return &LogSessionManager{ + sessions: make(map[string]*LogSession), + } +} + +// CreateSession 创建日志会话 +func (m *LogSessionManager) CreateSession(clientID, sessionID string, stream net.Conn) *LogSession { + session := &LogSession{ + ID: sessionID, + ClientID: clientID, + Stream: stream, + listeners: make([]chan protocol.LogEntry, 0), + } + + m.mu.Lock() + m.sessions[sessionID] = session + m.mu.Unlock() + + return session +} + +// GetSession 获取会话 +func (m *LogSessionManager) GetSession(sessionID string) *LogSession { + m.mu.RLock() + defer m.mu.RUnlock() + return m.sessions[sessionID] +} + +// RemoveSession 移除会话 +func (m *LogSessionManager) RemoveSession(sessionID string) { + m.mu.Lock() + if session, ok := m.sessions[sessionID]; ok { + session.Close() + delete(m.sessions, sessionID) + } + m.mu.Unlock() +} + +// GetSessionsByClient 获取客户端的所有会话 +func (m *LogSessionManager) GetSessionsByClient(clientID string) []*LogSession { + m.mu.RLock() + defer m.mu.RUnlock() + + var sessions []*LogSession + for _, session := range m.sessions { + if session.ClientID == clientID { + sessions = append(sessions, session) + } + } + return sessions +} + +// CleanupClientSessions 清理客户端的所有会话 +func (m *LogSessionManager) CleanupClientSessions(clientID string) { + m.mu.Lock() + defer m.mu.Unlock() + + for id, session := range m.sessions { + if session.ClientID == clientID { + session.Close() + delete(m.sessions, id) + } + } +} + +// AddListener 添加监听器 +func (s *LogSession) AddListener() <-chan protocol.LogEntry { + ch := make(chan protocol.LogEntry, 100) + s.mu.Lock() + s.listeners = append(s.listeners, ch) + s.mu.Unlock() + return ch +} + +// RemoveListener 移除监听器 +func (s *LogSession) RemoveListener(ch <-chan protocol.LogEntry) { + s.mu.Lock() + defer s.mu.Unlock() + + for i, listener := range s.listeners { + if listener == ch { + close(listener) + s.listeners = append(s.listeners[:i], s.listeners[i+1:]...) + break + } + } +} + +// Broadcast 广播日志条目到所有监听器 +func (s *LogSession) Broadcast(entry protocol.LogEntry) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, ch := range s.listeners { + select { + case ch <- entry: + default: + // 监听器太慢,丢弃日志 + } + } +} + +// Close 关闭会话 +func (s *LogSession) Close() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.closed { + return + } + s.closed = true + + for _, ch := range s.listeners { + close(ch) + } + s.listeners = nil + + if s.Stream != nil { + s.Stream.Close() + } +} + +// IsClosed 检查会话是否已关闭 +func (s *LogSession) IsClosed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed +} diff --git a/internal/server/tunnel/server.go b/internal/server/tunnel/server.go index cd607d7..28619d5 100644 --- a/internal/server/tunnel/server.go +++ b/internal/server/tunnel/server.go @@ -65,6 +65,7 @@ type Server struct { listener net.Listener // 主监听器 shutdown chan struct{} // 关闭信号 wg sync.WaitGroup // 等待所有连接关闭 + logSessions *LogSessionManager // 日志会话管理器 } // JSPluginEntry JS 插件条目 @@ -102,6 +103,7 @@ func NewServer(cs db.ClientStore, bindAddr string, bindPort int, token string, h clients: make(map[string]*ClientSession), connSem: make(chan struct{}, maxConnections), shutdown: make(chan struct{}), + logSessions: NewLogSessionManager(), } } @@ -1474,3 +1476,96 @@ func (s *Server) SendUpdateToClient(clientID, downloadURL string) error { log.Printf("[Server] Update command sent to client %s: %s", clientID, downloadURL) return nil } + +// StartClientLogStream 启动客户端日志流 +func (s *Server) StartClientLogStream(clientID, sessionID string, lines int, follow bool, level string) (<-chan protocol.LogEntry, error) { + s.mu.RLock() + cs, ok := s.clients[clientID] + s.mu.RUnlock() + + if !ok { + return nil, fmt.Errorf("client %s not found or not online", clientID) + } + + // 打开到客户端的流 + stream, err := cs.Session.Open() + if err != nil { + return nil, err + } + + // 发送日志请求 + req := protocol.LogRequest{ + SessionID: sessionID, + Lines: lines, + Follow: follow, + Level: level, + } + msg, _ := protocol.NewMessage(protocol.MsgTypeLogRequest, req) + if err := protocol.WriteMessage(stream, msg); err != nil { + stream.Close() + return nil, err + } + + // 创建会话 + session := s.logSessions.CreateSession(clientID, sessionID, stream) + listener := session.AddListener() + + // 启动 goroutine 读取客户端日志 + go s.readClientLogs(session, stream) + + return listener, nil +} + +// readClientLogs 读取客户端日志并广播到监听器 +func (s *Server) readClientLogs(session *LogSession, stream net.Conn) { + defer s.logSessions.RemoveSession(session.ID) + + for { + msg, err := protocol.ReadMessage(stream) + if err != nil { + return + } + + if msg.Type != protocol.MsgTypeLogData { + continue + } + + var data protocol.LogData + if err := msg.ParsePayload(&data); err != nil { + continue + } + + for _, entry := range data.Entries { + session.Broadcast(entry) + } + + if data.EOF { + return + } + } +} + +// StopClientLogStream 停止客户端日志流 +func (s *Server) StopClientLogStream(sessionID string) { + session := s.logSessions.GetSession(sessionID) + if session == nil { + return + } + + // 发送停止请求到客户端 + s.mu.RLock() + cs, ok := s.clients[session.ClientID] + s.mu.RUnlock() + + if ok { + stream, err := cs.Session.Open() + if err == nil { + req := protocol.LogStopRequest{SessionID: sessionID} + msg, _ := protocol.NewMessage(protocol.MsgTypeLogStop, req) + protocol.WriteMessage(stream, msg) + stream.Close() + } + } + + s.logSessions.RemoveSession(sessionID) +} diff --git a/pkg/plugin/script/js.go b/pkg/plugin/script/js.go index 4b1aaa6..7a2f20a 100644 --- a/pkg/plugin/script/js.go +++ b/pkg/plugin/script/js.go @@ -5,8 +5,10 @@ import ( "fmt" "io" "net" + "net/http" "os" "path/filepath" + "strings" "sync" "github.com/dop251/goja" @@ -19,21 +21,28 @@ type JSPlugin struct { source string vm *goja.Runtime metadata plugin.Metadata - config map[string]string - sandbox *Sandbox - running bool - mu sync.Mutex + config map[string]string + sandbox *Sandbox + running bool + mu sync.Mutex + eventListeners map[string][]func(goja.Value) + storagePath string } // NewJSPlugin 从 JS 源码创建插件 func NewJSPlugin(name, source string) (*JSPlugin, error) { p := &JSPlugin{ - name: name, - source: source, - vm: goja.New(), - sandbox: DefaultSandbox(), + name: name, + source: source, + vm: goja.New(), + sandbox: DefaultSandbox(), + eventListeners: make(map[string][]func(goja.Value)), + storagePath: filepath.Join("plugin_data", name+".json"), } + // 确保存储目录存在 + os.MkdirAll("plugin_data", 0755) + if err := p.init(); err != nil { return nil, err } @@ -55,7 +64,21 @@ func (p *JSPlugin) init() error { // 注入基础 API p.vm.Set("log", p.jsLog) + + // Config API (兼容旧的 config() 调用,同时支持 config.get/getAll) p.vm.Set("config", p.jsGetConfig) + if configObj := p.vm.Get("config"); configObj != nil { + obj := configObj.ToObject(p.vm) + obj.Set("get", p.jsGetConfig) + obj.Set("getAll", p.jsGetAllConfig) + } + + // 注入增强 API + p.vm.Set("logger", p.createLoggerAPI()) + p.vm.Set("storage", p.createStorageAPI()) + p.vm.Set("event", p.createEventAPI()) + p.vm.Set("request", p.createRequestAPI()) + p.vm.Set("notify", p.createNotifyAPI()) // 注入文件 API p.vm.Set("fs", p.createFsAPI()) @@ -117,7 +140,7 @@ func (p *JSPlugin) Metadata() plugin.Metadata { // Init 初始化插件配置 func (p *JSPlugin) Init(config map[string]string) error { p.config = config - p.vm.Set("config", config) + // p.vm.Set("config", config) // Do not overwrite the config API return nil } @@ -472,3 +495,177 @@ func getContentType(path string) string { } return "application/octet-stream" } + +// ============================================================================= +// Logger API +// ============================================================================= + +func (p *JSPlugin) createLoggerAPI() map[string]interface{} { + return map[string]interface{}{ + "info": func(msg string) { fmt.Printf("[JS:%s][INFO] %s\n", p.name, msg) }, + "warn": func(msg string) { fmt.Printf("[JS:%s][WARN] %s\n", p.name, msg) }, + "error": func(msg string) { fmt.Printf("[JS:%s][ERROR] %s\n", p.name, msg) }, + } +} + +// ============================================================================= +// Config API Enhancements +// ============================================================================= + +func (p *JSPlugin) jsGetAllConfig() map[string]string { + if p.config == nil { + return map[string]string{} + } + return p.config +} + +// ============================================================================= +// Storage API +// ============================================================================= + +func (p *JSPlugin) createStorageAPI() map[string]interface{} { + return map[string]interface{}{ + "get": p.storageGet, + "set": p.storageSet, + "delete": p.storageDelete, + "keys": p.storageKeys, + } +} + +func (p *JSPlugin) loadStorage() map[string]interface{} { + data := make(map[string]interface{}) + if _, err := os.Stat(p.storagePath); err == nil { + content, _ := os.ReadFile(p.storagePath) + json.Unmarshal(content, &data) + } + return data +} + +func (p *JSPlugin) saveStorage(data map[string]interface{}) { + content, _ := json.MarshalIndent(data, "", " ") + os.WriteFile(p.storagePath, content, 0644) +} + +func (p *JSPlugin) storageGet(key string, def interface{}) interface{} { + p.mu.Lock() + defer p.mu.Unlock() + data := p.loadStorage() + if v, ok := data[key]; ok { + return v + } + return def +} + +func (p *JSPlugin) storageSet(key string, value interface{}) { + p.mu.Lock() + defer p.mu.Unlock() + data := p.loadStorage() + data[key] = value + p.saveStorage(data) +} + +func (p *JSPlugin) storageDelete(key string) { + p.mu.Lock() + defer p.mu.Unlock() + data := p.loadStorage() + delete(data, key) + p.saveStorage(data) +} + +func (p *JSPlugin) storageKeys() []string { + p.mu.Lock() + defer p.mu.Unlock() + data := p.loadStorage() + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + return keys +} + +// ============================================================================= +// Event API +// ============================================================================= + +func (p *JSPlugin) createEventAPI() map[string]interface{} { + return map[string]interface{}{ + "on": p.eventOn, + "emit": p.eventEmit, + "off": p.eventOff, + } +} + +func (p *JSPlugin) eventOn(event string, callback func(goja.Value)) { + p.mu.Lock() + defer p.mu.Unlock() + p.eventListeners[event] = append(p.eventListeners[event], callback) +} + +func (p *JSPlugin) eventEmit(event string, data interface{}) { + p.mu.Lock() + listeners := p.eventListeners[event] + p.mu.Unlock() // 释放锁以允许回调中操作 + + val := p.vm.ToValue(data) + for _, cb := range listeners { + cb(val) + } +} + +func (p *JSPlugin) eventOff(event string) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.eventListeners, event) +} + +// ============================================================================= +// Request API (HTTP Client) +// ============================================================================= + +func (p *JSPlugin) createRequestAPI() map[string]interface{} { + return map[string]interface{}{ + "get": p.requestGet, + "post": p.requestPost, + } +} + +func (p *JSPlugin) requestGet(url string) map[string]interface{} { + resp, err := http.Get(url) + if err != nil { + return map[string]interface{}{"error": err.Error(), "status": 0} + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + return map[string]interface{}{ + "status": resp.StatusCode, + "body": string(body), + "error": "", + } +} + +func (p *JSPlugin) requestPost(url string, contentType, data string) map[string]interface{} { + resp, err := http.Post(url, contentType, strings.NewReader(data)) + if err != nil { + return map[string]interface{}{"error": err.Error(), "status": 0} + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + return map[string]interface{}{ + "status": resp.StatusCode, + "body": string(body), + "error": "", + } +} + +// ============================================================================= +// Notify API +// ============================================================================= + +func (p *JSPlugin) createNotifyAPI() map[string]interface{} { + return map[string]interface{}{ + "send": func(title, msg string) { + // 目前仅打印到日志,后续对接系统通知 + fmt.Printf("[NOTIFY][%s] %s: %s\n", p.name, title, msg) + }, + } +} diff --git a/pkg/protocol/message.go b/pkg/protocol/message.go index 26d177c..6fb0136 100644 --- a/pkg/protocol/message.go +++ b/pkg/protocol/message.go @@ -60,6 +60,11 @@ const ( MsgTypeUpdateApply uint8 = 73 // 应用更新请求 MsgTypeUpdateProgress uint8 = 74 // 更新进度 MsgTypeUpdateResult uint8 = 75 // 更新结果 + + // 日志相关消息 + MsgTypeLogRequest uint8 = 80 // 请求客户端日志 + MsgTypeLogData uint8 = 81 // 日志数据 + MsgTypeLogStop uint8 = 82 // 停止日志流 ) // Message 基础消息结构 @@ -305,6 +310,34 @@ type UpdateResultResponse struct { Message string `json:"message"` } +// LogRequest 日志请求 +type LogRequest struct { + SessionID string `json:"session_id"` // 会话 ID + Lines int `json:"lines"` // 请求的日志行数 + Follow bool `json:"follow"` // 是否持续推送新日志 + Level string `json:"level"` // 日志级别过滤 +} + +// LogEntry 日志条目 +type LogEntry struct { + Timestamp int64 `json:"ts"` // Unix 时间戳 (毫秒) + Level string `json:"level"` // 日志级别: debug, info, warn, error + Message string `json:"msg"` // 日志消息 + Source string `json:"src"` // 来源: client, plugin: +} + +// LogData 日志数据 +type LogData struct { + SessionID string `json:"session_id"` // 会话 ID + Entries []LogEntry `json:"entries"` // 日志条目 + EOF bool `json:"eof"` // 是否结束 +} + +// LogStopRequest 停止日志流请求 +type LogStopRequest struct { + SessionID string `json:"session_id"` // 会话 ID +} + // WriteMessage 写入消息到 writer func WriteMessage(w io.Writer, msg *Message) error { header := make([]byte, HeaderSize) diff --git a/web/src/api/index.ts b/web/src/api/index.ts index 010b3b2..71d29b8 100644 --- a/web/src/api/index.ts +++ b/web/src/api/index.ts @@ -1,5 +1,5 @@ -import { get, post, put, del } from '../config/axios' -import type { ClientConfig, ClientStatus, ClientDetail, ServerStatus, PluginInfo, StorePluginInfo, PluginConfigResponse, JSPlugin, RuleSchemasMap } from '../types' +import { get, post, put, del, getToken } from '../config/axios' +import type { ClientConfig, ClientStatus, ClientDetail, ServerStatus, PluginInfo, StorePluginInfo, PluginConfigResponse, JSPlugin, RuleSchemasMap, LogEntry, LogStreamOptions } from '../types' // 重新导出 token 管理方法 export { getToken, setToken, removeToken } from '../config/axios' @@ -104,3 +104,40 @@ export const applyServerUpdate = (downloadUrl: string, restart: boolean = true) post('/update/apply/server', { download_url: downloadUrl, restart }) export const applyClientUpdate = (clientId: string, downloadUrl: string) => post('/update/apply/client', { client_id: clientId, download_url: downloadUrl }) + +// 日志流 +export const createLogStream = ( + clientId: string, + options: LogStreamOptions = {}, + onLog: (entry: LogEntry) => void, + onError?: (error: Event) => void +): EventSource => { + const token = getToken() + const params = new URLSearchParams() + if (token) params.append('token', token) + if (options.lines !== undefined) params.append('lines', String(options.lines)) + if (options.follow !== undefined) params.append('follow', String(options.follow)) + if (options.level) params.append('level', options.level) + + const url = `/api/client/${clientId}/logs?${params.toString()}` + const eventSource = new EventSource(url) + + eventSource.addEventListener('log', (event) => { + try { + const entry = JSON.parse((event as MessageEvent).data) as LogEntry + onLog(entry) + } catch (e) { + console.error('Failed to parse log entry', e) + } + }) + + eventSource.addEventListener('heartbeat', () => { + // Keep-alive, no action needed + }) + + if (onError) { + eventSource.onerror = onError + } + + return eventSource +} diff --git a/web/src/components/LogViewer.vue b/web/src/components/LogViewer.vue new file mode 100644 index 0000000..63d3155 --- /dev/null +++ b/web/src/components/LogViewer.vue @@ -0,0 +1,232 @@ + + + + + diff --git a/web/src/types/index.ts b/web/src/types/index.ts index 86e99c2..1438a05 100644 --- a/web/src/types/index.ts +++ b/web/src/types/index.ts @@ -130,3 +130,18 @@ export interface JSPlugin { // 规则配置模式集合 export type RuleSchemasMap = Record + +// 日志条目 +export interface LogEntry { + ts: number // Unix 时间戳 (毫秒) + level: string // 日志级别: debug, info, warn, error + msg: string // 日志消息 + src: string // 来源: client, plugin: +} + +// 日志流选项 +export interface LogStreamOptions { + lines?: number // 初始日志行数 + follow?: boolean // 是否持续推送 + level?: string // 日志级别过滤 +} diff --git a/web/src/views/ClientView.vue b/web/src/views/ClientView.vue index c30148d..c7f70ab 100644 --- a/web/src/views/ClientView.vue +++ b/web/src/views/ClientView.vue @@ -9,7 +9,7 @@ import { import { ArrowBackOutline, CreateOutline, TrashOutline, PushOutline, PowerOutline, AddOutline, SaveOutline, CloseOutline, - SettingsOutline, StorefrontOutline, RefreshOutline, StopOutline, PlayOutline + SettingsOutline, StorefrontOutline, RefreshOutline, StopOutline, PlayOutline, DocumentTextOutline } from '@vicons/ionicons5' import { getClient, updateClient, deleteClient, pushConfigToClient, disconnectClient, restartClient, @@ -17,6 +17,7 @@ import { getStorePlugins, installStorePlugin, getRuleSchemas, startClientPlugin, restartClientPlugin, stopClientPlugin, deleteClientPlugin } from '../api' import type { ProxyRule, ClientPlugin, ConfigField, StorePluginInfo, RuleSchemasMap } from '../types' +import LogViewer from '../components/LogViewer.vue' const route = useRoute() const router = useRouter() @@ -88,6 +89,9 @@ const storeLoading = ref(false) const selectedStorePlugin = ref(null) const storeInstalling = ref(false) +// 日志查看相关 +const showLogViewer = ref(false) + // 商店插件相关函数 const openStoreModal = async () => { showStoreModal.value = true @@ -418,6 +422,10 @@ const handleDeletePlugin = (plugin: ClientPlugin) => { 推送配置 + + + 查看日志 + 从商店安装 @@ -731,5 +739,10 @@ const handleDeletePlugin = (plugin: ClientPlugin) => { + + + + +