refactor: add NewDialer and NewListener

This commit is contained in:
邹颖玖
2021-11-20 21:57:10 +08:00
parent 467369f4de
commit 38f5cc19c3
5 changed files with 77 additions and 44 deletions

View File

@@ -9,16 +9,14 @@ import (
type Forward struct {
listener transport.Listener
dialer transport.Dialer
routeTable *RouteTable
mempool sync.Pool
}
func NewForward(listener transport.Listener, dialer transport.Dialer) *Forward {
func NewForward(listener transport.Listener, routeTable *RouteTable) *Forward {
return &Forward{
listener: listener,
dialer: dialer,
routeTable: NewRouteTable(),
routeTable: routeTable,
mempool: sync.Pool{
New: func() interface{} {
return make([]byte, 1024*4)
@@ -50,7 +48,7 @@ func (f *Forward) forward(conn transport.Conn) {
}
defer entry.conn.Close()
logs.Debug("open a new connection to nexthop")
logs.Debug("open a new connection to next hop")
for {
stream, err := conn.AcceptStream()
@@ -62,7 +60,7 @@ func (f *Forward) forward(conn transport.Conn) {
logs.Debug("accept stream: %v", conn.RemoteAddr())
dst, err := entry.conn.OpenStream()
if err != nil {
logs.Error("open nexthop stream fail: %v", err)
logs.Error("open next hop stream fail: %v", err)
return
}

View File

@@ -4,9 +4,7 @@ import (
"flag"
"fmt"
"github.com/ICKelin/gtun/internal/logs"
"github.com/ICKelin/gtun/transport"
"github.com/ICKelin/gtun/transport/kcp"
"github.com/ICKelin/gtun/transport/mux"
"github.com/ICKelin/gtun/transport/transport_api"
)
func Main() {
@@ -22,39 +20,20 @@ func Main() {
logs.Init("forward.log", "debug", 10)
// initial local listener
var listener transport.Listener
lisCfg := cfg.ListenerConfig
switch lisCfg.Scheme {
case "kcp":
listener = kcp.NewListener(lisCfg.ListenAddr, []byte(lisCfg.RawConfig))
err := listener.Listen()
if err != nil {
logs.Error("new kcp server fail; %v", err)
return
}
defer listener.Close()
default:
listener = mux.NewListener(lisCfg.ListenAddr)
err := listener.Listen()
if err != nil {
logs.Error("new mux server fail: %v", err)
return
}
defer listener.Close()
listener, err := transport_api.NewListen(lisCfg.Scheme, lisCfg.ListenAddr, lisCfg.RawConfig)
if err != nil {
logs.Error("new listener fail: %v", err)
return
}
defer listener.Close()
// initial nexthop dialer
var dialer transport.Dialer
// initial next hop dialer
dialerCfg := cfg.NexthopConfig
switch dialerCfg.Scheme {
case "kcp":
dialer = kcp.NewDialer(dialerCfg.NexthopAddr, []byte(dialerCfg.RawConfig))
default:
dialer = mux.NewDialer(dialerCfg.NexthopAddr)
}
routeTable := NewRouteTable()
routeTable.Add(dialerCfg.Scheme, dialerCfg.NexthopAddr, dialerCfg.RawConfig)
f := NewForward(listener, dialer)
f := NewForward(listener, routeTable)
if err := f.Serve(); err != nil {
logs.Error("forward exist: %v", err)

View File

@@ -3,17 +3,19 @@ package forward
import (
"fmt"
"github.com/ICKelin/gtun/transport"
"github.com/ICKelin/gtun/transport/transport_api"
"math"
"sync"
)
var (
errNoRoute = fmt.Errorf("no route to host")
maxRtt = math.MinInt32
)
type RouteEntry struct {
scheme, addr string
rtt int32
lastActive int64
conn transport.Conn
}
@@ -30,8 +32,8 @@ func NewRouteTable() *RouteTable {
}
}
func (r *RouteTable) Add(scheme, addr string) error {
dialer, err := transport.Dial(scheme, addr)
func (r *RouteTable) Add(scheme, addr, cfg string) error {
dialer, err := transport_api.NewDialer(scheme, addr, cfg)
if err != nil {
return err
}

View File

@@ -0,0 +1,54 @@
package transport_api
import (
"errors"
"github.com/ICKelin/gtun/transport"
"github.com/ICKelin/gtun/transport/kcp"
"github.com/ICKelin/gtun/transport/mux"
)
const (
protoKCP = "kcp"
protoTCPMux = "mux"
)
var (
errUnsupported = errors.New("transport_api: unsupported protocol")
)
func NewListen(scheme, addr, cfg string) (transport.Listener, error) {
var listener transport.Listener
switch scheme {
case protoKCP:
listener = kcp.NewListener(addr, []byte(cfg))
err := listener.Listen()
if err != nil {
return nil, err
}
case protoTCPMux:
listener = mux.NewListener(addr)
err := listener.Listen()
if err != nil {
return nil, err
}
default:
return nil, errUnsupported
}
return listener, nil
}
func NewDialer(scheme, addr, cfg string) (transport.Dialer, error) {
var dialer transport.Dialer
switch scheme {
case protoKCP:
dialer = kcp.NewDialer(addr, []byte(cfg))
case protoTCPMux:
dialer = mux.NewDialer(addr)
default:
return nil, errUnsupported
}
return dialer, nil
}

View File

@@ -5,12 +5,12 @@ import (
"time"
)
// Dialer defines transport dialer for client side
// Dialer defines transport_api dialer for client side
type Dialer interface {
Dial() (Conn, error)
}
// Listener defines transport listener for server side
// Listener defines transport_api listener for server side
type Listener interface {
Listen() error
// Accept returns a connection
@@ -24,7 +24,7 @@ type Listener interface {
Addr() net.Addr
}
// Conn defines a transport connection
// Conn defines a transport_api connection
type Conn interface {
OpenStream() (Stream, error)
AcceptStream() (Stream, error)
@@ -33,7 +33,7 @@ type Conn interface {
RemoteAddr() net.Addr
}
// Stream defines a transport stream base on
// Stream defines a transport_api stream base on
// Conn.OpenStream or Conn.AcceptStream
type Stream interface {
Write(buf []byte) (int, error)