feat: implement initial client-server tunnel with WebSocket, custom protocol, and comprehensive proxy handling.
All checks were successful
Build Multi-Platform Binaries / build-frontend (push) Successful in 56s
Build Multi-Platform Binaries / build-binaries (amd64, linux, client, true) (push) Successful in 1m40s
Build Multi-Platform Binaries / build-binaries (amd64, darwin, server, false) (push) Successful in 1m48s
Build Multi-Platform Binaries / build-binaries (amd64, windows, client, true) (push) Successful in 1m24s
Build Multi-Platform Binaries / build-binaries (amd64, linux, server, true) (push) Successful in 1m47s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, client, true) (push) Successful in 1m16s
Build Multi-Platform Binaries / build-binaries (amd64, windows, server, true) (push) Successful in 1m42s
Build Multi-Platform Binaries / build-binaries (arm64, darwin, server, false) (push) Successful in 1m38s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, server, true) (push) Successful in 1m53s
Build Multi-Platform Binaries / build-binaries (arm64, linux, client, true) (push) Successful in 1m25s
Build Multi-Platform Binaries / build-binaries (arm64, linux, server, true) (push) Successful in 1m55s
Build Multi-Platform Binaries / build-binaries (arm64, windows, server, false) (push) Successful in 1m18s
All checks were successful
Build Multi-Platform Binaries / build-frontend (push) Successful in 56s
Build Multi-Platform Binaries / build-binaries (amd64, linux, client, true) (push) Successful in 1m40s
Build Multi-Platform Binaries / build-binaries (amd64, darwin, server, false) (push) Successful in 1m48s
Build Multi-Platform Binaries / build-binaries (amd64, windows, client, true) (push) Successful in 1m24s
Build Multi-Platform Binaries / build-binaries (amd64, linux, server, true) (push) Successful in 1m47s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, client, true) (push) Successful in 1m16s
Build Multi-Platform Binaries / build-binaries (amd64, windows, server, true) (push) Successful in 1m42s
Build Multi-Platform Binaries / build-binaries (arm64, darwin, server, false) (push) Successful in 1m38s
Build Multi-Platform Binaries / build-binaries (arm, 7, linux, server, true) (push) Successful in 1m53s
Build Multi-Platform Binaries / build-binaries (arm64, linux, client, true) (push) Successful in 1m25s
Build Multi-Platform Binaries / build-binaries (arm64, linux, server, true) (push) Successful in 1m55s
Build Multi-Platform Binaries / build-binaries (arm64, windows, server, false) (push) Successful in 1m18s
This commit is contained in:
@@ -61,12 +61,12 @@ type Server struct {
|
||||
mu sync.RWMutex
|
||||
tlsConfig *tls.Config
|
||||
pluginRegistry *plugin.Registry
|
||||
jsPlugins []JSPluginEntry // 配置的 JS 插件
|
||||
connSem chan struct{} // 连接数信号量
|
||||
activeConns int64 // 当前活跃连接数
|
||||
listener net.Listener // 主监听器
|
||||
shutdown chan struct{} // 关闭信号
|
||||
wg sync.WaitGroup // 等待所有连接关闭
|
||||
jsPlugins []JSPluginEntry // 配置的 JS 插件
|
||||
connSem chan struct{} // 连接数信号量
|
||||
activeConns int64 // 当前活跃连接数
|
||||
listener net.Listener // 主监听器
|
||||
shutdown chan struct{} // 关闭信号
|
||||
wg sync.WaitGroup // 等待所有连接关闭
|
||||
logSessions *LogSessionManager // 日志会话管理器
|
||||
}
|
||||
|
||||
@@ -82,14 +82,14 @@ type JSPluginEntry struct {
|
||||
|
||||
// ClientSession 客户端会话
|
||||
type ClientSession struct {
|
||||
ID string
|
||||
RemoteAddr string // 客户端 IP 地址
|
||||
Session *yamux.Session
|
||||
Rules []protocol.ProxyRule
|
||||
Listeners map[int]net.Listener
|
||||
UDPConns map[int]*net.UDPConn // UDP 连接
|
||||
LastPing time.Time
|
||||
mu sync.Mutex
|
||||
ID string
|
||||
RemoteAddr string // 客户端 IP 地址
|
||||
Session *yamux.Session
|
||||
Rules []protocol.ProxyRule
|
||||
Listeners map[int]net.Listener
|
||||
UDPConns map[int]*net.UDPConn // UDP 连接
|
||||
LastPing time.Time
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewServer 创建服务端
|
||||
@@ -452,6 +452,9 @@ func (s *Server) startProxyListeners(cs *ClientSession) {
|
||||
case "http", "https":
|
||||
log.Printf("[Server] HTTP proxy %s on :%d", rule.Name, rule.RemotePort)
|
||||
go s.acceptProxyServerConns(cs, ln, rule)
|
||||
case "websocket":
|
||||
log.Printf("[Server] Websocket proxy %s on :%d", rule.Name, rule.RemotePort)
|
||||
go s.acceptWebsocketConns(cs, ln, rule)
|
||||
default:
|
||||
log.Printf("[Server] TCP proxy %s: :%d -> %s:%d",
|
||||
rule.Name, rule.RemotePort, rule.LocalIP, rule.LocalPort)
|
||||
|
||||
146
internal/server/tunnel/websocket.go
Normal file
146
internal/server/tunnel/websocket.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/gotunnel/pkg/protocol"
|
||||
"github.com/gotunnel/pkg/relay"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true // 允许所有跨域请求
|
||||
},
|
||||
}
|
||||
|
||||
// WSConnAdapter 适配器:将 websocket.Conn 适配为 io.ReadWriter
|
||||
type WSConnAdapter struct {
|
||||
conn *websocket.Conn
|
||||
// 读缓冲
|
||||
reader io.Reader
|
||||
}
|
||||
|
||||
func NewWSConnAdapter(conn *websocket.Conn) *WSConnAdapter {
|
||||
return &WSConnAdapter{
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) Read(p []byte) (n int, err error) {
|
||||
if a.reader == nil {
|
||||
messageType, reader, err := a.conn.NextReader()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
|
||||
// 忽略非数据消息
|
||||
return 0, nil
|
||||
}
|
||||
a.reader = reader
|
||||
}
|
||||
n, err = a.reader.Read(p)
|
||||
if err == io.EOF {
|
||||
a.reader = nil
|
||||
err = nil // 当前消息读完,不代表连接断开
|
||||
// 如果读到了0字节,尝试读下一个消息,避免因为返回 (0, nil) 导致调用方以为无数据空转
|
||||
if n == 0 {
|
||||
return a.Read(p)
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) Write(p []byte) (n int, err error) {
|
||||
err = a.conn.WriteMessage(websocket.BinaryMessage, p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) Close() error {
|
||||
return a.conn.Close()
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) LocalAddr() net.Addr {
|
||||
return a.conn.LocalAddr()
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) RemoteAddr() net.Addr {
|
||||
return a.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) SetDeadline(t time.Time) error {
|
||||
if err := a.conn.SetReadDeadline(t); err != nil {
|
||||
return err
|
||||
}
|
||||
return a.conn.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) SetReadDeadline(t time.Time) error {
|
||||
return a.conn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (a *WSConnAdapter) SetWriteDeadline(t time.Time) error {
|
||||
return a.conn.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
// acceptWebsocketConns 接受 Websocket 连接
|
||||
func (s *Server) acceptWebsocketConns(cs *ClientSession, ln net.Listener, rule protocol.ProxyRule) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
wsConn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[Server] Websocket upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
conn := NewWSConnAdapter(wsConn)
|
||||
// 这里的 conn 并没有实现 net.Conn 接口的全部方法 (LocalAddr, RemoteAddr 等),
|
||||
// Relay 函数如果需要 net.Conn,可能需要更完整的适配器。
|
||||
// 查看 relay.Relay 签名:func Relay(c1, c2 io.ReadWriteCloser)
|
||||
// 假设 relay.Relay 接受 io.ReadWriteCloser。
|
||||
|
||||
go s.handleWebsocketProxyConn(cs, conn, rule)
|
||||
})
|
||||
|
||||
server := &http.Server{
|
||||
Handler: mux,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
// 这里不需要协程,因为 startProxyListeners 中已经是 go s.acceptWebsocketConns(...) 调用了?
|
||||
// 不,startProxyListeners 中 iterate rules。如果是 acceptWebsocketConns,应该是在那里 go。
|
||||
// 检查 caller 逻辑。
|
||||
|
||||
if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
||||
log.Printf("[Server] Websocket server error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleWebsocketProxyConn 处理 Websocket 代理连接
|
||||
func (s *Server) handleWebsocketProxyConn(cs *ClientSession, conn net.Conn, rule protocol.ProxyRule) {
|
||||
defer conn.Close()
|
||||
|
||||
stream, err := cs.Session.Open()
|
||||
if err != nil {
|
||||
log.Printf("[Server] Open stream error: %v", err)
|
||||
return
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
// 发送新代理连接请求,告知客户端连接到哪里
|
||||
req := protocol.NewProxyRequest{RemotePort: rule.RemotePort}
|
||||
msg, _ := protocol.NewMessage(protocol.MsgTypeNewProxy, req)
|
||||
if err := protocol.WriteMessage(stream, msg); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
relay.Relay(conn, stream)
|
||||
}
|
||||
120
internal/server/tunnel/websocket_test.go
Normal file
120
internal/server/tunnel/websocket_test.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func TestWSConnAdapter(t *testing.T) {
|
||||
// 1. 设置测试服务器
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
upgrader := websocket.Upgrader{}
|
||||
c, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
t.Errorf("upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
adapter := NewWSConnAdapter(c)
|
||||
defer adapter.Close()
|
||||
|
||||
// Echo server
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := adapter.Read(buf)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
// websocket close might cause normal error locally
|
||||
}
|
||||
break
|
||||
}
|
||||
_, err = adapter.Write(buf[:n])
|
||||
if err != nil {
|
||||
t.Errorf("write error: %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
// 2. 客户端连接
|
||||
u := "ws" + strings.TrimPrefix(server.URL, "http")
|
||||
ws, _, err := websocket.DefaultDialer.Dial(u, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer ws.Close()
|
||||
|
||||
// 3. 发送数据
|
||||
message := []byte("hello websocket")
|
||||
err = ws.WriteMessage(websocket.BinaryMessage, message)
|
||||
if err != nil {
|
||||
t.Fatalf("write message error: %v", err)
|
||||
}
|
||||
|
||||
// 4. 接收响应
|
||||
_, p, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("read message error: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(message, p) {
|
||||
t.Errorf("expected %s, got %s", message, p)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWSConnAdapter_ReadMultiFrame(t *testing.T) {
|
||||
// 测试多次 Read 调用读取一个 frame,或者一个 Read 读取多个 frame (net.Conn 语义)
|
||||
// WSConnAdapter 实现是 Read 对应 NextReader,如果 buffer 小,可能一部分一部分读。
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
upgrader := websocket.Upgrader{}
|
||||
c, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
adapter := NewWSConnAdapter(c)
|
||||
|
||||
// 只要收到数据就这就验证通过
|
||||
buf := make([]byte, 10)
|
||||
n, err := adapter.Read(buf)
|
||||
if err != nil {
|
||||
t.Errorf("read error: %v", err)
|
||||
}
|
||||
if n != 5 { // "hello"
|
||||
t.Errorf("expected 5 bytes, got %d", n)
|
||||
}
|
||||
|
||||
// 读剩下的 "world"
|
||||
n, err = adapter.Read(buf)
|
||||
if err != nil {
|
||||
t.Errorf("read 2 error: %v", err)
|
||||
}
|
||||
if n != 5 {
|
||||
t.Errorf("expected 5 bytes, got %d", n)
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
u := "ws" + strings.TrimPrefix(server.URL, "http")
|
||||
ws, _, err := websocket.DefaultDialer.Dial(u, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer ws.Close()
|
||||
|
||||
// 发送两个 BinaryMessage
|
||||
ws.WriteMessage(websocket.BinaryMessage, []byte("hello"))
|
||||
ws.WriteMessage(websocket.BinaryMessage, []byte("world"))
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
Reference in New Issue
Block a user