feat(logging): implement client log streaming and management
All checks were successful
Build Multi-Platform Binaries / build-frontend (push) Successful in 30s
Build Multi-Platform Binaries / build-binaries (amd64, darwin, server, false) (push) Successful in 1m16s
Build Multi-Platform Binaries / build-binaries (amd64, linux, client, true) (push) Successful in 1m4s
Build Multi-Platform Binaries / build-binaries (amd64, linux, server, true) (push) Successful in 2m29s
Build Multi-Platform Binaries / build-binaries (amd64, windows, client, true) (push) Successful in 54s
Build Multi-Platform Binaries / build-binaries (amd64, windows, server, true) (push) Successful in 2m35s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, client, true) (push) Successful in 55s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, server, true) (push) Successful in 2m21s
Build Multi-Platform Binaries / build-binaries (arm64, darwin, server, false) (push) Successful in 1m35s
Build Multi-Platform Binaries / build-binaries (arm64, linux, client, true) (push) Successful in 1m1s
Build Multi-Platform Binaries / build-binaries (arm64, linux, server, true) (push) Successful in 1m55s
Build Multi-Platform Binaries / build-binaries (arm64, windows, server, false) (push) Successful in 1m39s

- Added log streaming functionality for clients, allowing real-time log access via SSE.
- Introduced LogHandler to manage log streaming requests and responses.
- Implemented LogSessionManager to handle active log sessions and listeners.
- Enhanced protocol with log-related message types and structures.
- Created Logger for client-side logging, supporting various log levels and file output.
- Developed LogViewer component for the web interface to display and filter logs.
- Updated API to support log stream creation and management.
- Added support for querying logs by level and searching through log messages.
This commit is contained in:
2026-01-03 16:19:52 +08:00
parent d2ca3fa2b9
commit 2f98e1ac7d
15 changed files with 1311 additions and 12 deletions

View File

@@ -47,6 +47,7 @@ type Client struct {
runningPlugins map[string]plugin.ClientPlugin
versionStore *PluginVersionStore
pluginMu sync.RWMutex
logger *Logger // 日志收集器
}
// NewClient 创建客户端
@@ -59,12 +60,19 @@ func NewClient(serverAddr, token, id string) *Client {
home, _ := os.UserHomeDir()
dataDir := filepath.Join(home, ".gotunnel")
// 初始化日志收集器
logger, err := NewLogger(dataDir)
if err != nil {
log.Printf("[Client] Failed to initialize logger: %v", err)
}
return &Client{
ServerAddr: serverAddr,
Token: token,
ID: id,
DataDir: dataDir,
runningPlugins: make(map[string]plugin.ClientPlugin),
logger: logger,
}
}
@@ -236,6 +244,10 @@ func (c *Client) handleStream(stream net.Conn) {
c.handlePluginConfigUpdate(stream, msg)
case protocol.MsgTypeUpdateDownload:
c.handleUpdateDownload(stream, msg)
case protocol.MsgTypeLogRequest:
go c.handleLogRequest(stream, msg)
case protocol.MsgTypeLogStop:
c.handleLogStop(stream, msg)
}
}
@@ -900,3 +912,79 @@ func restartClientProcess(path, serverAddr, token, id string) {
cmd.Start()
os.Exit(0)
}
// handleLogRequest 处理日志请求
func (c *Client) handleLogRequest(stream net.Conn, msg *protocol.Message) {
if c.logger == nil {
stream.Close()
return
}
var req protocol.LogRequest
if err := msg.ParsePayload(&req); err != nil {
stream.Close()
return
}
c.logger.Printf("Log request received: session=%s, follow=%v", req.SessionID, req.Follow)
// 发送历史日志
entries := c.logger.GetRecentLogs(req.Lines, req.Level)
if len(entries) > 0 {
data := protocol.LogData{
SessionID: req.SessionID,
Entries: entries,
EOF: !req.Follow,
}
respMsg, _ := protocol.NewMessage(protocol.MsgTypeLogData, data)
if err := protocol.WriteMessage(stream, respMsg); err != nil {
stream.Close()
return
}
}
// 如果不需要持续推送,关闭流
if !req.Follow {
stream.Close()
return
}
// 订阅新日志
ch := c.logger.Subscribe(req.SessionID)
defer c.logger.Unsubscribe(req.SessionID)
defer stream.Close()
// 持续推送新日志
for entry := range ch {
// 应用级别过滤
if req.Level != "" && entry.Level != req.Level {
continue
}
data := protocol.LogData{
SessionID: req.SessionID,
Entries: []protocol.LogEntry{entry},
EOF: false,
}
respMsg, _ := protocol.NewMessage(protocol.MsgTypeLogData, data)
if err := protocol.WriteMessage(stream, respMsg); err != nil {
return
}
}
}
// handleLogStop 处理停止日志流请求
func (c *Client) handleLogStop(stream net.Conn, msg *protocol.Message) {
defer stream.Close()
if c.logger == nil {
return
}
var req protocol.LogStopRequest
if err := msg.ParsePayload(&req); err != nil {
return
}
c.logger.Unsubscribe(req.SessionID)
}

View File

@@ -0,0 +1,240 @@
package tunnel
import (
"container/ring"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/gotunnel/pkg/protocol"
)
const (
maxBufferSize = 1000 // 环形缓冲区最大条目数
logFilePattern = "client.%s.log" // 日志文件名模式
)
// LogLevel 日志级别
type LogLevel int
const (
LevelDebug LogLevel = iota
LevelInfo
LevelWarn
LevelError
)
// Logger 客户端日志收集器
type Logger struct {
dataDir string
buffer *ring.Ring
bufferMu sync.RWMutex
file *os.File
fileMu sync.Mutex
fileDate string
subscribers map[string]chan protocol.LogEntry
subMu sync.RWMutex
}
// NewLogger 创建新的日志收集器
func NewLogger(dataDir string) (*Logger, error) {
l := &Logger{
dataDir: dataDir,
buffer: ring.New(maxBufferSize),
subscribers: make(map[string]chan protocol.LogEntry),
}
// 确保日志目录存在
logDir := filepath.Join(dataDir, "logs")
if err := os.MkdirAll(logDir, 0755); err != nil {
return nil, err
}
return l, nil
}
// Printf 记录日志 (兼容 log.Printf)
func (l *Logger) Printf(format string, args ...interface{}) {
l.log(LevelInfo, "client", format, args...)
}
// Infof 记录信息日志
func (l *Logger) Infof(format string, args ...interface{}) {
l.log(LevelInfo, "client", format, args...)
}
// Warnf 记录警告日志
func (l *Logger) Warnf(format string, args ...interface{}) {
l.log(LevelWarn, "client", format, args...)
}
// Errorf 记录错误日志
func (l *Logger) Errorf(format string, args ...interface{}) {
l.log(LevelError, "client", format, args...)
}
// Debugf 记录调试日志
func (l *Logger) Debugf(format string, args ...interface{}) {
l.log(LevelDebug, "client", format, args...)
}
// PluginLog 记录插件日志
func (l *Logger) PluginLog(pluginName, level, format string, args ...interface{}) {
var lvl LogLevel
switch level {
case "debug":
lvl = LevelDebug
case "warn":
lvl = LevelWarn
case "error":
lvl = LevelError
default:
lvl = LevelInfo
}
l.log(lvl, "plugin:"+pluginName, format, args...)
}
func (l *Logger) log(level LogLevel, source, format string, args ...interface{}) {
entry := protocol.LogEntry{
Timestamp: time.Now().UnixMilli(),
Level: levelToString(level),
Message: fmt.Sprintf(format, args...),
Source: source,
}
// 输出到标准输出
fmt.Printf("%s [%s] [%s] %s\n",
time.Now().Format("2006-01-02 15:04:05"),
entry.Level,
entry.Source,
entry.Message)
// 添加到环形缓冲区
l.bufferMu.Lock()
l.buffer.Value = entry
l.buffer = l.buffer.Next()
l.bufferMu.Unlock()
// 写入文件
l.writeToFile(entry)
// 通知订阅者
l.notifySubscribers(entry)
}
// Subscribe 订阅日志流
func (l *Logger) Subscribe(sessionID string) <-chan protocol.LogEntry {
ch := make(chan protocol.LogEntry, 100)
l.subMu.Lock()
l.subscribers[sessionID] = ch
l.subMu.Unlock()
return ch
}
// Unsubscribe 取消订阅
func (l *Logger) Unsubscribe(sessionID string) {
l.subMu.Lock()
if ch, ok := l.subscribers[sessionID]; ok {
close(ch)
delete(l.subscribers, sessionID)
}
l.subMu.Unlock()
}
// GetRecentLogs 获取最近的日志
func (l *Logger) GetRecentLogs(lines int, level string) []protocol.LogEntry {
l.bufferMu.RLock()
defer l.bufferMu.RUnlock()
var entries []protocol.LogEntry
l.buffer.Do(func(v interface{}) {
if v == nil {
return
}
entry := v.(protocol.LogEntry)
// 应用级别过滤
if level != "" && entry.Level != level {
return
}
entries = append(entries, entry)
})
// 如果指定了行数,返回最后 N 行
if lines > 0 && len(entries) > lines {
entries = entries[len(entries)-lines:]
}
return entries
}
// Close 关闭日志收集器
func (l *Logger) Close() {
l.fileMu.Lock()
if l.file != nil {
l.file.Close()
l.file = nil
}
l.fileMu.Unlock()
l.subMu.Lock()
for _, ch := range l.subscribers {
close(ch)
}
l.subscribers = make(map[string]chan protocol.LogEntry)
l.subMu.Unlock()
}
func (l *Logger) writeToFile(entry protocol.LogEntry) {
l.fileMu.Lock()
defer l.fileMu.Unlock()
today := time.Now().Format("2006-01-02")
if l.fileDate != today {
if l.file != nil {
l.file.Close()
}
logPath := filepath.Join(l.dataDir, "logs", fmt.Sprintf(logFilePattern, today))
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return
}
l.file = f
l.fileDate = today
}
if l.file != nil {
fmt.Fprintf(l.file, "%d|%s|%s|%s\n",
entry.Timestamp, entry.Level, entry.Source, entry.Message)
}
}
func (l *Logger) notifySubscribers(entry protocol.LogEntry) {
l.subMu.RLock()
defer l.subMu.RUnlock()
for _, ch := range l.subscribers {
select {
case ch <- entry:
default:
// 订阅者太慢,丢弃日志
}
}
}
func levelToString(level LogLevel) string {
switch level {
case LevelDebug:
return "debug"
case LevelInfo:
return "info"
case LevelWarn:
return "warn"
case LevelError:
return "error"
default:
return "info"
}
}

View File

@@ -3,6 +3,7 @@ package handler
import (
"github.com/gotunnel/internal/server/config"
"github.com/gotunnel/internal/server/db"
"github.com/gotunnel/pkg/protocol"
)
// AppInterface 应用接口
@@ -41,6 +42,9 @@ type ServerInterface interface {
RestartClientPlugin(clientID, pluginName, ruleName string) error
UpdateClientPluginConfig(clientID, pluginName, ruleName string, config map[string]string, restart bool) error
SendUpdateToClient(clientID, downloadURL string) error
// 日志流
StartClientLogStream(clientID, sessionID string, lines int, follow bool, level string) (<-chan protocol.LogEntry, error)
StopClientLogStream(sessionID string)
}
// ConfigField 配置字段

View File

@@ -0,0 +1,96 @@
package handler
import (
"io"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// LogHandler 日志处理器
type LogHandler struct {
app AppInterface
}
// NewLogHandler 创建日志处理器
func NewLogHandler(app AppInterface) *LogHandler {
return &LogHandler{app: app}
}
// StreamLogs 流式传输客户端日志
// @Summary 流式传输客户端日志
// @Description 通过 Server-Sent Events 实时接收客户端日志
// @Tags Logs
// @Produce text/event-stream
// @Security Bearer
// @Param id path string true "客户端 ID"
// @Param lines query int false "初始日志行数" default(100)
// @Param follow query bool false "是否持续推送新日志" default(true)
// @Param level query string false "日志级别过滤 (info, warn, error)"
// @Success 200 {object} protocol.LogEntry
// @Router /api/client/{id}/logs [get]
func (h *LogHandler) StreamLogs(c *gin.Context) {
clientID := c.Param("id")
// 检查客户端是否在线
online, _, _ := h.app.GetServer().GetClientStatus(clientID)
if !online {
c.JSON(400, gin.H{"code": 400, "message": "client not online"})
return
}
// 解析查询参数
lines := 100
if v := c.Query("lines"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
lines = n
}
}
follow := true
if v := c.Query("follow"); v == "false" {
follow = false
}
level := c.Query("level")
// 生成会话 ID
sessionID := uuid.New().String()
// 启动日志流
logCh, err := h.app.GetServer().StartClientLogStream(clientID, sessionID, lines, follow, level)
if err != nil {
c.JSON(500, gin.H{"code": 500, "message": err.Error()})
return
}
// 设置 SSE 响应头
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("X-Accel-Buffering", "no")
// 获取客户端断开信号
clientGone := c.Request.Context().Done()
// 流式传输日志
c.Stream(func(w io.Writer) bool {
select {
case <-clientGone:
h.app.GetServer().StopClientLogStream(sessionID)
return false
case entry, ok := <-logCh:
if !ok {
return false
}
c.SSEvent("log", entry)
return true
case <-time.After(30 * time.Second):
// 发送心跳
c.SSEvent("heartbeat", gin.H{"ts": time.Now().UnixMilli()})
return true
}
})
}

View File

@@ -12,6 +12,14 @@ import (
func JWTAuth(jwtAuth *auth.JWTAuth) gin.HandlerFunc {
return func(c *gin.Context) {
authHeader := c.GetHeader("Authorization")
// 支持从 query 参数获取 token (用于 SSE 等不支持自定义 header 的场景)
if authHeader == "" {
if token := c.Query("token"); token != "" {
authHeader = "Bearer " + token
}
}
if authHeader == "" {
c.JSON(http.StatusUnauthorized, gin.H{
"code": 401,

View File

@@ -105,6 +105,10 @@ func (r *GinRouter) SetupRoutes(app handler.AppInterface, jwtAuth *auth.JWTAuth,
api.GET("/update/check/client", updateHandler.CheckClient)
api.POST("/update/apply/server", updateHandler.ApplyServer)
api.POST("/update/apply/client", updateHandler.ApplyClient)
// 日志管理
logHandler := handler.NewLogHandler(app)
api.GET("/client/:id/logs", logHandler.StreamLogs)
}
}

View File

@@ -0,0 +1,155 @@
package tunnel
import (
"net"
"sync"
"github.com/gotunnel/pkg/protocol"
)
// LogSessionManager 管理所有活跃的日志会话
type LogSessionManager struct {
sessions map[string]*LogSession
mu sync.RWMutex
}
// LogSession 日志流会话
type LogSession struct {
ID string
ClientID string
Stream net.Conn
listeners []chan protocol.LogEntry
mu sync.Mutex
closed bool
}
// NewLogSessionManager 创建日志会话管理器
func NewLogSessionManager() *LogSessionManager {
return &LogSessionManager{
sessions: make(map[string]*LogSession),
}
}
// CreateSession 创建日志会话
func (m *LogSessionManager) CreateSession(clientID, sessionID string, stream net.Conn) *LogSession {
session := &LogSession{
ID: sessionID,
ClientID: clientID,
Stream: stream,
listeners: make([]chan protocol.LogEntry, 0),
}
m.mu.Lock()
m.sessions[sessionID] = session
m.mu.Unlock()
return session
}
// GetSession 获取会话
func (m *LogSessionManager) GetSession(sessionID string) *LogSession {
m.mu.RLock()
defer m.mu.RUnlock()
return m.sessions[sessionID]
}
// RemoveSession 移除会话
func (m *LogSessionManager) RemoveSession(sessionID string) {
m.mu.Lock()
if session, ok := m.sessions[sessionID]; ok {
session.Close()
delete(m.sessions, sessionID)
}
m.mu.Unlock()
}
// GetSessionsByClient 获取客户端的所有会话
func (m *LogSessionManager) GetSessionsByClient(clientID string) []*LogSession {
m.mu.RLock()
defer m.mu.RUnlock()
var sessions []*LogSession
for _, session := range m.sessions {
if session.ClientID == clientID {
sessions = append(sessions, session)
}
}
return sessions
}
// CleanupClientSessions 清理客户端的所有会话
func (m *LogSessionManager) CleanupClientSessions(clientID string) {
m.mu.Lock()
defer m.mu.Unlock()
for id, session := range m.sessions {
if session.ClientID == clientID {
session.Close()
delete(m.sessions, id)
}
}
}
// AddListener 添加监听器
func (s *LogSession) AddListener() <-chan protocol.LogEntry {
ch := make(chan protocol.LogEntry, 100)
s.mu.Lock()
s.listeners = append(s.listeners, ch)
s.mu.Unlock()
return ch
}
// RemoveListener 移除监听器
func (s *LogSession) RemoveListener(ch <-chan protocol.LogEntry) {
s.mu.Lock()
defer s.mu.Unlock()
for i, listener := range s.listeners {
if listener == ch {
close(listener)
s.listeners = append(s.listeners[:i], s.listeners[i+1:]...)
break
}
}
}
// Broadcast 广播日志条目到所有监听器
func (s *LogSession) Broadcast(entry protocol.LogEntry) {
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.listeners {
select {
case ch <- entry:
default:
// 监听器太慢,丢弃日志
}
}
}
// Close 关闭会话
func (s *LogSession) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.closed = true
for _, ch := range s.listeners {
close(ch)
}
s.listeners = nil
if s.Stream != nil {
s.Stream.Close()
}
}
// IsClosed 检查会话是否已关闭
func (s *LogSession) IsClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed
}

View File

@@ -65,6 +65,7 @@ type Server struct {
listener net.Listener // 主监听器
shutdown chan struct{} // 关闭信号
wg sync.WaitGroup // 等待所有连接关闭
logSessions *LogSessionManager // 日志会话管理器
}
// JSPluginEntry JS 插件条目
@@ -102,6 +103,7 @@ func NewServer(cs db.ClientStore, bindAddr string, bindPort int, token string, h
clients: make(map[string]*ClientSession),
connSem: make(chan struct{}, maxConnections),
shutdown: make(chan struct{}),
logSessions: NewLogSessionManager(),
}
}
@@ -1474,3 +1476,96 @@ func (s *Server) SendUpdateToClient(clientID, downloadURL string) error {
log.Printf("[Server] Update command sent to client %s: %s", clientID, downloadURL)
return nil
}
// StartClientLogStream 启动客户端日志流
func (s *Server) StartClientLogStream(clientID, sessionID string, lines int, follow bool, level string) (<-chan protocol.LogEntry, error) {
s.mu.RLock()
cs, ok := s.clients[clientID]
s.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("client %s not found or not online", clientID)
}
// 打开到客户端的流
stream, err := cs.Session.Open()
if err != nil {
return nil, err
}
// 发送日志请求
req := protocol.LogRequest{
SessionID: sessionID,
Lines: lines,
Follow: follow,
Level: level,
}
msg, _ := protocol.NewMessage(protocol.MsgTypeLogRequest, req)
if err := protocol.WriteMessage(stream, msg); err != nil {
stream.Close()
return nil, err
}
// 创建会话
session := s.logSessions.CreateSession(clientID, sessionID, stream)
listener := session.AddListener()
// 启动 goroutine 读取客户端日志
go s.readClientLogs(session, stream)
return listener, nil
}
// readClientLogs 读取客户端日志并广播到监听器
func (s *Server) readClientLogs(session *LogSession, stream net.Conn) {
defer s.logSessions.RemoveSession(session.ID)
for {
msg, err := protocol.ReadMessage(stream)
if err != nil {
return
}
if msg.Type != protocol.MsgTypeLogData {
continue
}
var data protocol.LogData
if err := msg.ParsePayload(&data); err != nil {
continue
}
for _, entry := range data.Entries {
session.Broadcast(entry)
}
if data.EOF {
return
}
}
}
// StopClientLogStream 停止客户端日志流
func (s *Server) StopClientLogStream(sessionID string) {
session := s.logSessions.GetSession(sessionID)
if session == nil {
return
}
// 发送停止请求到客户端
s.mu.RLock()
cs, ok := s.clients[session.ClientID]
s.mu.RUnlock()
if ok {
stream, err := cs.Session.Open()
if err == nil {
req := protocol.LogStopRequest{SessionID: sessionID}
msg, _ := protocol.NewMessage(protocol.MsgTypeLogStop, req)
protocol.WriteMessage(stream, msg)
stream.Close()
}
}
s.logSessions.RemoveSession(sessionID)
}