feature: use tproxy instead of tunnel vpn

This commit is contained in:
ICKelin
2021-05-05 15:09:56 +08:00
parent 54e4b0f24a
commit f122d9d791
9 changed files with 393 additions and 338 deletions

View File

@@ -9,7 +9,6 @@ import (
"net"
"time"
"github.com/ICKelin/opennotr/pkg/logs"
"github.com/ICKelin/opennotr/pkg/proto"
"github.com/hashicorp/yamux"
)
@@ -163,14 +162,14 @@ func (c *Client) udpProxy(stream *yamux.Stream, p *proto.ProxyProtocol) {
for {
_, err := io.ReadFull(stream, hdr)
if err != nil {
logs.Error("read stream fail %v", err)
log.Println("read stream fail: ", err)
break
}
nlen := binary.BigEndian.Uint16(hdr)
buf := make([]byte, nlen)
_, err = io.ReadFull(stream, buf)
if err != nil {
logs.Error("read stream body fail: %v", err)
log.Println("read stream body fail: ", err)
break
}

View File

@@ -15,11 +15,11 @@ type Config struct {
}
type ServerConfig struct {
ListenAddr string `yaml:"listen"`
AuthKey string `yaml:"authKey"`
Domain string `yaml:"domain"`
TCPProxyListen string `yaml:"tcplisten"`
UDPProxyListen string `yaml:"udplisten"`
ListenAddr string `yaml:"listen"`
AuthKey string `yaml:"authKey"`
Domain string `yaml:"domain"`
TCPForwardListen string `yaml:"tcplisten"`
UDPForwardListen string `yaml:"udplisten"`
}
type DHCPConfig struct {

View File

@@ -1,9 +1,14 @@
package core
import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
"syscall"
"github.com/ICKelin/opennotr/pkg/logs"
"github.com/ICKelin/opennotr/pkg/proto"
)
func checksum_add(buf []byte, seed uint32) uint32 {
@@ -19,7 +24,7 @@ func checksum_add(buf []byte, seed uint32) uint32 {
return sum
}
func checksum_warp(seed uint32) uint16 {
func checksum_wrap(seed uint32) uint16 {
sum := seed
for sum > 0xffff {
sum = (sum >> 16) + (sum & 0xffff)
@@ -34,7 +39,7 @@ func checksum_warp(seed uint32) uint16 {
}
func CheckSum(buf []byte) uint16 {
return checksum_warp(checksum_add(buf, 0))
return checksum_wrap(checksum_add(buf, 0))
}
func sendUDPViaRaw(fd int, src, dst *net.UDPAddr, payload []byte) error {
@@ -56,7 +61,7 @@ func sendUDPViaRaw(fd int, src, dst *net.UDPAddr, payload []byte) error {
data[25] = byte(ulen)
copy(data[28:], payload)
uc := checksum_warp(checksum_add(data, uint32(ulen)))
uc := checksum_wrap(checksum_add(data, uint32(ulen)))
data[26] = byte(uc >> 8)
data[27] = byte(uc)
@@ -73,3 +78,28 @@ func sendUDPViaRaw(fd int, src, dst *net.UDPAddr, payload []byte) error {
copy(addr.Addr[:], data[16:20])
return syscall.Sendto(fd, data, 0, &addr)
}
func encode(raw []byte) []byte {
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(len(raw)))
buf = append(buf, raw...)
return buf
}
func encodeProxyProtocol(protocol, sip, sport, dip, dport string) []byte {
proxyProtocol := &proto.ProxyProtocol{
Protocol: protocol,
SrcIP: sip,
SrcPort: sport,
DstIP: dip,
DstPort: dport,
}
body, err := json.Marshal(proxyProtocol)
if err != nil {
logs.Error("json marshal fail: %v", err)
}
bytes := encode(body)
return bytes
}

View File

@@ -1,20 +1,13 @@
package core
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/ICKelin/opennotr/opennotrd/plugin"
"github.com/ICKelin/opennotr/pkg/logs"
@@ -22,20 +15,6 @@ import (
"github.com/hashicorp/yamux"
)
type Session struct {
conn *yamux.Session
clientAddr string
rxbytes uint64
txbytes uint64
}
func newSession(conn *yamux.Session, clientAddr string) *Session {
return &Session{
conn: conn,
clientAddr: clientAddr,
}
}
type Server struct {
cfg ServerConfig
addr string
@@ -52,10 +31,8 @@ type Server struct {
// resolver writes domains to etcd and it will be used by coredns
resolver *Resolver
// sess store client connect wraper
// key: client virtual ip(vip)
// value: *Session
sess sync.Map
// sess manager is the model of client session
sessMgr *SessionManager
}
func NewServer(cfg ServerConfig,
@@ -70,6 +47,7 @@ func NewServer(cfg ServerConfig,
dhcp: dhcp,
pluginMgr: plugin.DefaultPluginManager(),
resolver: resolver,
sessMgr: GetSessionManager(),
}
}
@@ -79,9 +57,6 @@ func (s *Server) ListenAndServe() error {
return err
}
go s.tproxyTCP(s.cfg.TCPProxyListen)
go s.tproxyUDP(s.cfg.UDPProxyListen)
for {
conn, err := listener.Accept()
if err != nil {
@@ -182,8 +157,8 @@ func (s *Server) onConn(conn net.Conn) {
}
sess := newSession(mux, conn.RemoteAddr().String())
s.sess.Store(vip, sess)
defer s.sess.Delete(vip)
s.sessMgr.AddSession(vip, sess)
defer s.sessMgr.DeleteSession(vip)
rttInterval := time.NewTicker(time.Second * 10)
for {
@@ -202,298 +177,6 @@ func (s *Server) onConn(conn net.Conn) {
}
}
func (s *Server) tproxyTCP(listenAddr string) error {
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
return err
}
defer listener.Close()
// set socket with ip transparent option
file, err := listener.(*net.TCPListener).File()
if err != nil {
return err
}
defer file.Close()
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_IP, syscall.IP_TRANSPARENT, 1)
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {
logs.Error("accept fail: %v", err)
break
}
go s.tcpProxy(conn)
}
return nil
}
func (s *Server) tcpProxy(conn net.Conn) {
dip, dport, _ := net.SplitHostPort(conn.LocalAddr().String())
sip, sport, _ := net.SplitHostPort(conn.RemoteAddr().String())
val, ok := s.sess.Load(dip)
if !ok {
logs.Error("no route to host: %s", dip)
conn.Close()
return
}
stream, err := val.(*Session).conn.OpenStream()
if err != nil {
logs.Error("open stream fail: %v", err)
conn.Close()
return
}
buf := make([]byte, 2)
// write proxy protocol packet
proxyProtocol := &proto.ProxyProtocol{
Protocol: "tcp",
SrcIP: sip,
SrcPort: sport,
// DstIP: dip,
DstIP: "127.0.0.1", // may change to client setting
DstPort: dport,
}
body, err := json.Marshal(proxyProtocol)
if err != nil {
logs.Error("json marshal fail: %v", err)
conn.Close()
stream.Close()
return
}
binary.BigEndian.PutUint16(buf, uint16(len(body)))
buf = append(buf, body...)
stream.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = stream.Write(buf)
stream.SetWriteDeadline(time.Time{})
if err != nil {
logs.Error("stream write fail: %v", err)
conn.Close()
stream.Close()
return
}
go func() {
defer stream.Close()
defer conn.Close()
io.Copy(stream, conn)
}()
go func() {
defer stream.Close()
defer conn.Close()
io.Copy(conn, stream)
}()
}
func (s *Server) tproxyUDP(listenAddr string) error {
laddr, err := net.ResolveUDPAddr("udp", listenAddr)
if err != nil {
logs.Error("resolve udp fail: %v", err)
return err
}
lconn, err := net.ListenUDP("udp", laddr)
if err != nil {
return err
}
// set socket with ip transparent option
file, err := lconn.File()
if err != nil {
return err
}
defer file.Close()
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_IP, syscall.IP_TRANSPARENT, 1)
if err != nil {
return err
}
// set socket with recv origin dst option
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_IP, syscall.IP_RECVORIGDSTADDR, 1)
if err != nil {
return err
}
rawfd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil || rawfd < 0 {
logs.Error("call socket fail: %v", err)
return err
}
defer syscall.Close(rawfd)
err = syscall.SetsockoptInt(rawfd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return err
}
streams := sync.Map{}
defer func() {
streams.Range(func(k, v interface{}) bool {
v.(*yamux.Stream).Close()
return true
})
}()
buf := make([]byte, 64*1024)
oob := make([]byte, 1024)
for {
nr, oobn, _, raddr, err := lconn.ReadMsgUDP(buf, oob)
if err != nil {
logs.Error("read from udp fail: %v", err)
break
}
origindst, err := getOriginDst(oob[:oobn])
if err != nil {
logs.Error("%v", err)
continue
}
dip, dport, _ := net.SplitHostPort(origindst.String())
sip, sport, _ := net.SplitHostPort(raddr.String())
key := fmt.Sprintf("%s:%s:%s:%s", sip, sport, dip, dport)
val, ok := streams.Load(key)
if !ok {
val, ok := s.sess.Load(dip)
if !ok {
logs.Error("no route to host: %s", dip)
continue
}
stream, err := val.(*Session).conn.OpenStream()
if err != nil {
logs.Error("open stream fail: %v", err)
continue
}
streams.Store(key, stream)
// write proxy protocol
proxyProtocol := &proto.ProxyProtocol{
Protocol: "udp",
SrcIP: sip,
SrcPort: sport,
// DstIP: dip,
DstIP: "127.0.0.1", // may change to client setting
DstPort: dport,
}
body, err := json.Marshal(proxyProtocol)
if err != nil {
logs.Error("json marshal fail: %v", err)
continue
}
bytes := encode(body)
stream.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = stream.Write(bytes)
stream.SetWriteDeadline(time.Time{})
if err != nil {
logs.Error("stream write fail: %v", err)
continue
}
go s.udpProxy(stream, rawfd, origindst, raddr)
}
val, ok = streams.Load(key)
if !ok {
logs.Error("get stream for %s fail", key)
continue
}
stream := val.(*yamux.Stream)
bytes := encode(buf[:nr])
stream.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = stream.Write(bytes)
stream.SetWriteDeadline(time.Time{})
if err != nil {
logs.Error("stream write fail: %v", err)
}
}
return nil
}
func (s *Server) udpProxy(stream *yamux.Stream, tofd int, fromaddr, toaddr *net.UDPAddr) {
hdr := make([]byte, 2)
for {
_, err := io.ReadFull(stream, hdr)
if err != nil {
logs.Error("read stream fail %v", err)
break
}
nlen := binary.BigEndian.Uint16(hdr)
buf := make([]byte, nlen)
_, err = io.ReadFull(stream, buf)
if err != nil {
logs.Error("read stream body fail: %v", err)
break
}
err = sendUDPViaRaw(tofd, fromaddr, toaddr, buf)
if err != nil {
logs.Error("send via raw socket fail: %v", err)
}
}
}
func encode(raw []byte) []byte {
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(len(raw)))
buf = append(buf, raw...)
return buf
}
func getOriginDst(hdr []byte) (*net.UDPAddr, error) {
msgs, err := syscall.ParseSocketControlMessage(hdr)
if err != nil {
return nil, err
}
var origindst *net.UDPAddr
for _, msg := range msgs {
if msg.Header.Level == syscall.SOL_IP &&
msg.Header.Type == syscall.IP_RECVORIGDSTADDR {
originDstRaw := &syscall.RawSockaddrInet4{}
err := binary.Read(bytes.NewReader(msg.Data), binary.LittleEndian, originDstRaw)
if err != nil {
logs.Error("read origin dst fail: %v", err)
continue
}
// only support for ipv4
if originDstRaw.Family == syscall.AF_INET {
pp := (*syscall.RawSockaddrInet4)(unsafe.Pointer(originDstRaw))
p := (*[2]byte)(unsafe.Pointer(&pp.Port))
origindst = &net.UDPAddr{
IP: net.IPv4(pp.Addr[0], pp.Addr[1], pp.Addr[2], pp.Addr[3]),
Port: int(p[0])<<8 + int(p[1]),
}
}
}
}
if origindst == nil {
return nil, fmt.Errorf("get origin dst fail")
}
return origindst, nil
}
// randomDomain generate random domain for client
func randomDomain(num int64) string {
const ALPHABET = "123456789abcdefghijklmnopqrstuvwxyz"

47
opennotrd/core/session.go Normal file
View File

@@ -0,0 +1,47 @@
package core
import (
"sync"
"github.com/hashicorp/yamux"
)
var sessionMgr = &SessionManager{}
type SessionManager struct {
sessions sync.Map
}
func GetSessionManager() *SessionManager {
return sessionMgr
}
type Session struct {
conn *yamux.Session
clientAddr string
rxbytes uint64
txbytes uint64
}
func newSession(conn *yamux.Session, clientAddr string) *Session {
return &Session{
conn: conn,
clientAddr: clientAddr,
}
}
func (mgr *SessionManager) AddSession(vip string, sess *Session) {
mgr.sessions.Store(vip, sess)
}
func (mgr *SessionManager) GetSession(vip string) *Session {
val, ok := mgr.sessions.Load(vip)
if !ok {
return nil
}
return val.(*Session)
}
func (mgr *SessionManager) DeleteSession(vip string) {
mgr.sessions.Delete(vip)
}

View File

@@ -0,0 +1,94 @@
package core
import (
"io"
"net"
"syscall"
"time"
"github.com/ICKelin/opennotr/pkg/logs"
)
type TCPForward struct {
sessMgr *SessionManager
}
func NewTCPForward() *TCPForward {
return &TCPForward{
sessMgr: GetSessionManager(),
}
}
func (f *TCPForward) ListenAndServe(listenAddr string) error {
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
return err
}
defer listener.Close()
// set socket with ip transparent option
file, err := listener.(*net.TCPListener).File()
if err != nil {
return err
}
defer file.Close()
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_IP, syscall.IP_TRANSPARENT, 1)
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {
logs.Error("accept fail: %v", err)
break
}
go f.forwardTCP(conn)
}
return nil
}
func (f *TCPForward) forwardTCP(conn net.Conn) {
dip, dport, _ := net.SplitHostPort(conn.LocalAddr().String())
sip, sport, _ := net.SplitHostPort(conn.RemoteAddr().String())
sess := f.sessMgr.GetSession(dip)
if sess == nil {
logs.Error("no route to host: %s", dip)
conn.Close()
return
}
stream, err := sess.conn.OpenStream()
if err != nil {
logs.Error("open stream fail: %v", err)
conn.Close()
return
}
bytes := encodeProxyProtocol("tcp", sip, sport, "127.0.0.1", dport)
stream.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = stream.Write(bytes)
stream.SetWriteDeadline(time.Time{})
if err != nil {
logs.Error("stream write fail: %v", err)
conn.Close()
stream.Close()
return
}
go func() {
defer stream.Close()
defer conn.Close()
io.Copy(stream, conn)
}()
go func() {
defer stream.Close()
defer conn.Close()
io.Copy(conn, stream)
}()
}

View File

@@ -0,0 +1,201 @@
package core
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"syscall"
"time"
"unsafe"
"github.com/ICKelin/opennotr/pkg/logs"
"github.com/hashicorp/yamux"
)
type UDPForward struct {
sessMgr *SessionManager
}
func NewUDPForward() *UDPForward {
return &UDPForward{
sessMgr: GetSessionManager(),
}
}
func (f *UDPForward) ListenAndServe(listenAddr string) error {
laddr, err := net.ResolveUDPAddr("udp", listenAddr)
if err != nil {
logs.Error("resolve udp fail: %v", err)
return err
}
lconn, err := net.ListenUDP("udp", laddr)
if err != nil {
return err
}
// set socket with ip transparent option
file, err := lconn.File()
if err != nil {
return err
}
defer file.Close()
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_IP, syscall.IP_TRANSPARENT, 1)
if err != nil {
return err
}
// set socket with recv origin dst option
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_IP, syscall.IP_RECVORIGDSTADDR, 1)
if err != nil {
return err
}
// create raw socket fd
// we use rawsocket to send udp packet back to client.
rawfd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil || rawfd < 0 {
logs.Error("call socket fail: %v", err)
return err
}
defer syscall.Close(rawfd)
err = syscall.SetsockoptInt(rawfd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return err
}
streams := sync.Map{}
defer func() {
streams.Range(func(k, v interface{}) bool {
v.(*yamux.Stream).Close()
return true
})
}()
buf := make([]byte, 64*1024)
oob := make([]byte, 1024)
for {
nr, oobn, _, raddr, err := lconn.ReadMsgUDP(buf, oob)
if err != nil {
logs.Error("read from udp fail: %v", err)
break
}
origindst, err := getOriginDst(oob[:oobn])
if err != nil {
logs.Error("%v", err)
continue
}
dip, dport, _ := net.SplitHostPort(origindst.String())
sip, sport, _ := net.SplitHostPort(raddr.String())
key := fmt.Sprintf("%s:%s:%s:%s", sip, sport, dip, dport)
val, ok := streams.Load(key)
if !ok {
sess := f.sessMgr.GetSession(dip)
if sess == nil {
logs.Error("no route to host: %s", dip)
continue
}
stream, err := sess.conn.OpenStream()
if err != nil {
logs.Error("open stream fail: %v", err)
continue
}
streams.Store(key, stream)
bytes := encodeProxyProtocol("udp", sip, sport, "127.0.0.1", dport)
stream.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = stream.Write(bytes)
stream.SetWriteDeadline(time.Time{})
if err != nil {
logs.Error("stream write fail: %v", err)
continue
}
go f.forwardUDP(stream, rawfd, origindst, raddr)
}
val, ok = streams.Load(key)
if !ok {
logs.Error("get stream for %s fail", key)
continue
}
stream := val.(*yamux.Stream)
bytes := encode(buf[:nr])
stream.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = stream.Write(bytes)
stream.SetWriteDeadline(time.Time{})
if err != nil {
logs.Error("stream write fail: %v", err)
}
}
return nil
}
func (f *UDPForward) forwardUDP(stream *yamux.Stream, tofd int, fromaddr, toaddr *net.UDPAddr) {
hdr := make([]byte, 2)
for {
_, err := io.ReadFull(stream, hdr)
if err != nil {
logs.Error("read stream fail %v", err)
break
}
nlen := binary.BigEndian.Uint16(hdr)
buf := make([]byte, nlen)
_, err = io.ReadFull(stream, buf)
if err != nil {
logs.Error("read stream body fail: %v", err)
break
}
err = sendUDPViaRaw(tofd, fromaddr, toaddr, buf)
if err != nil {
logs.Error("send via raw socket fail: %v", err)
}
}
}
func getOriginDst(hdr []byte) (*net.UDPAddr, error) {
msgs, err := syscall.ParseSocketControlMessage(hdr)
if err != nil {
return nil, err
}
var origindst *net.UDPAddr
for _, msg := range msgs {
if msg.Header.Level == syscall.SOL_IP &&
msg.Header.Type == syscall.IP_RECVORIGDSTADDR {
originDstRaw := &syscall.RawSockaddrInet4{}
err := binary.Read(bytes.NewReader(msg.Data), binary.LittleEndian, originDstRaw)
if err != nil {
logs.Error("read origin dst fail: %v", err)
continue
}
// only support for ipv4
if originDstRaw.Family == syscall.AF_INET {
pp := (*syscall.RawSockaddrInet4)(unsafe.Pointer(originDstRaw))
p := (*[2]byte)(unsafe.Pointer(&pp.Port))
origindst = &net.UDPAddr{
IP: net.IPv4(pp.Addr[0], pp.Addr[1], pp.Addr[2], pp.Addr[3]),
Port: int(p[0])<<8 + int(p[1]),
}
}
}
}
if origindst == nil {
return nil, fmt.Errorf("get origin dst fail")
}
return origindst, nil
}

View File

@@ -96,7 +96,6 @@ func (p *UDPProxy) doProxy(lis *net.UDPConn, item *plugin.PluginMeta) {
break
}
logs.Debug("create new udp connection to %s", item.To)
backendConn, err := net.DialUDP("udp", nil, backendAddr)
if err != nil {
logs.Error("dial udp fail: %v", err)
@@ -111,7 +110,6 @@ func (p *UDPProxy) doProxy(lis *net.UDPConn, item *plugin.PluginMeta) {
val, _ = sess.Load(key)
// read from $from address and write to $to address
val.(*net.UDPConn).Write(buf[:nr])
logs.Debug("write to backend %d bytes", nr)
}
}
@@ -125,7 +123,6 @@ func (p *UDPProxy) udpCopy(dst, src *net.UDPConn, toaddr *net.UDPAddr) {
break
}
logs.Debug("write back to client %d bytes", nr)
_, err = dst.WriteToUDP(buf[:nr], toaddr)
if err != nil {
logs.Error("write to udp fail: %v", err)

View File

@@ -50,8 +50,12 @@ func Run() {
}
}
// run tunnel tcp server, it will cause tcp over tcp problems
// it may changed to udp later.
// up local tcp,udp service
// we use tproxy to route traffic to the tcp port and udp port here.
go core.NewTCPForward().ListenAndServe(cfg.ServerConfig.TCPForwardListen)
go core.NewUDPForward().ListenAndServe(cfg.ServerConfig.UDPForwardListen)
// server provides tcp server for opennotr client
s := core.NewServer(cfg.ServerConfig, dhcp, resolver)
fmt.Println(s.ListenAndServe())
}