commit 3a259f314c19d19ce5a90928a2c8d20b49a2e81e Author: Alexander Date: Wed Jan 16 16:14:45 2019 -0600 first commit diff --git a/config.go b/config.go new file mode 100644 index 0000000..7633eb9 --- /dev/null +++ b/config.go @@ -0,0 +1,10 @@ +package holepunch + +type Config struct { + RelayIP string + RelayPort string + ListenAddr string + LocalPort string + LocalRelayIP string + UID string +} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..397cbc8 --- /dev/null +++ b/constants.go @@ -0,0 +1,13 @@ +package holepunch + +const ( + CREATE_SESSION = 0x3C //60 + CREATE_SESSION_RESPONSE = 0x3D //61 + CONN_REQUEST = 0x3E //62 + CONN_REQUEST_RESPONSE = 0x3F //63 + INIT_HOLEPUNCH = 0x40 //64 + INIT_CONN = 0x41 + ETX = 3 + NAK = 21 + ACK = 6 +) diff --git a/holepunch.go b/holepunch.go new file mode 100644 index 0000000..e401582 --- /dev/null +++ b/holepunch.go @@ -0,0 +1,88 @@ +package holepunch + +import ( + "bufio" + "fmt" + "net" + "strconv" + "sync" + + "github.com/libp2p/go-reuseport" +) + +type Holepunch struct { + raddr *net.TCPAddr //remote relay server address + laddr *net.TCPAddr //listen address for peers + wg *sync.WaitGroup + uID string + peer string + connRequestChan chan string + stopReqChan chan int + initHolepunchChan chan peerInfo + stopListenChan chan int + connReadyChan chan net.Conn + closeChan chan int + reader *bufio.Reader + writer *bufio.Writer +} + +func NewHolepunch(config Config) (h *Holepunch, err error) { + relayAddr := config.RelayIP + ":" + config.RelayPort + + raddr, err := net.ResolveTCPAddr("tcp", relayAddr) + laddr, err := net.ResolveTCPAddr("tcp", config.ListenAddr+":"+config.LocalPort) + + h = &Holepunch{ + raddr: raddr, + laddr: laddr, + wg: new(sync.WaitGroup), + uID: config.UID, + connRequestChan: make(chan string, 1), + stopReqChan: make(chan int), + initHolepunchChan: make(chan peerInfo), + stopListenChan: make(chan int), + connReadyChan: make(chan net.Conn), + closeChan: make(chan int), + } + + return +} + +func (h *Holepunch) Connect(peer string) (p2pConn net.Conn, err error) { + //immediately start listening + listenLoop(h.wg, h.laddr, h.stopListenChan, h.connReadyChan, h.closeChan) + + if peer != "" { + h.connRequestChan <- peer + } + + //connect to relayserver + laddr := h.laddr.IP.String() + ":" + strconv.Itoa(h.laddr.Port) + raddr := h.raddr.IP.String() + ":" + strconv.Itoa(h.raddr.Port) + remoteConn, err := reuseport.Dial("tcp", laddr, raddr) + + if err != nil { + fmt.Println(err) + return + } + h.reader = bufio.NewReader(remoteConn) + h.writer = bufio.NewWriter(remoteConn) + + h.writer.Write(createSessionPacket(h.uID, h.laddr.IP.String(), strconv.Itoa(h.laddr.Port))) + h.writer.Flush() + + //start go routines + readLoop(h.wg, h.reader, h.stopReqChan, h.initHolepunchChan, h.closeChan) + connRequestLoop(h.wg, h.writer, h.connRequestChan, h.stopReqChan, h.closeChan) + initHolepunch(h.wg, laddr, h.initHolepunchChan, h.stopListenChan, h.connReadyChan, h.closeChan) + + p2pConn = <-h.connReadyChan + + teardown(h.wg, h.closeChan) + return +} + +func teardown(wg *sync.WaitGroup, closeChan chan int) { + close(closeChan) + wg.Wait() +} diff --git a/routines.go b/routines.go new file mode 100644 index 0000000..3e570e4 --- /dev/null +++ b/routines.go @@ -0,0 +1,190 @@ +package holepunch + +import ( + "bufio" + "fmt" + "net" + "strconv" + "sync" + "time" + + "github.com/libp2p/go-reuseport" +) + +func wgDone(wg *sync.WaitGroup, name string) { + fmt.Println("Exit " + name) + wg.Done() +} + +func listenLoop(wg *sync.WaitGroup, addr *net.TCPAddr, doneChan chan int, connReadyChan chan net.Conn, closeChan chan int) { + + l, err := reuseport.Listen("tcp", addr.IP.String()+":"+strconv.Itoa(addr.Port)) + if err != nil { + fmt.Println("Could not listen\n", err) + return + } + + fmt.Println("Listening for Peers at: ", addr) + wg.Add(1) + acceptedPeerConn := listen(l) + + go func() { + defer wgDone(wg, "Listen Loop") + + select { + case <-closeChan: + return + case conn := <-acceptedPeerConn: + doneChan <- 1 + connReadyChan <- conn + } + + }() + +} + +func listen(listener net.Listener) chan net.Conn { + result := make(chan net.Conn) + + go func(connReady chan net.Conn) { + + conn, err := listener.Accept() + + if err != nil { + fmt.Println("Error accepting....\nExiting listen loop") + return + } + + connReady <- conn + + }(result) + + return result +} + +func readLoop(wg *sync.WaitGroup, reader *bufio.Reader, stopRequestChan chan int, initHolepunchChan chan peerInfo, closeChan chan int) { + wg.Add(1) + + go func() { + defer wgDone(wg, "Read Loop") + packetReady := packetAvailable(reader) + + for { + select { + case <-closeChan: + return + case packet := <-packetReady: + opType := packet[0] + + switch opType { + case CONN_REQUEST_RESPONSE: + //check if the relay server knows about the peer + if _, ok := parseConnRequestResponse(packet); ok { + stopRequestChan <- 1 + } + case INIT_HOLEPUNCH: + initHolepunchChan <- parseInitHolePunchMessage(packet) + } + } + + } + }() +} + +func packetAvailable(reader *bufio.Reader) chan []byte { + result := make(chan []byte) + + go func(pkt chan []byte) { + for { + msg, err := reader.ReadString(ETX) + + if err != nil { + fmt.Println("Error reading") + return + } + + packet := []byte(msg) + pkt <- packet + } + }(result) + + return result +} + +func connRequestLoop(wg *sync.WaitGroup, writer *bufio.Writer, connRequestChan chan string, stopRequestChan chan int, closeChan chan int) { + wg.Add(1) + + ticker := time.NewTicker(time.Duration(3) * time.Second) + var req string + + go func() { + defer wgDone(wg, "Conn Request Loop") + + for { + select { + case <-closeChan: + return + case <-stopRequestChan: + fmt.Println("Stopping: connectRequestLoop") + ticker.Stop() + case request := <-connRequestChan: + req = request + writer.Write(createConnRequestPacket(req)) + writer.Flush() + + case <-ticker.C: + if req != "" { + writer.Write(createConnRequestPacket(req)) + writer.Flush() + } + } + } + }() +} + +func initHolepunch(wg *sync.WaitGroup, laddr string, initHolepunchChan chan peerInfo, stopListenChan chan int, connReadyChan chan net.Conn, closeChan chan int) { + wg.Add(1) + + ticker := time.NewTicker(time.Duration(2) * time.Second) + var peer *peerInfo + + go func() { + defer wgDone(wg, "Init Holepunch Loop") + + for { + select { + case <-closeChan: + return + case <-stopListenChan: + ticker.Stop() + case peerInf := <-initHolepunchChan: + peer = &peerInf + conn, err := reuseport.Dial("tcp", laddr, peer.String()) + + if err != nil { + fmt.Println(err) + } else { + fmt.Println("we got a connection ", conn) + ticker.Stop() + connReadyChan <- conn + } + + case <-ticker.C: + + if peer != nil { + conn, err := reuseport.Dial("tcp", laddr, peer.String()) + + if err != nil { + fmt.Println(err) + } else { + fmt.Println("we got a connection ", conn) + ticker.Stop() + connReadyChan <- conn + } + } + + } + } + + }() +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..9c5bb85 --- /dev/null +++ b/utils.go @@ -0,0 +1,213 @@ +package holepunch + +import ( + "bytes" + "encoding/binary" + "fmt" + "net" + "strconv" + "strings" +) + +type NoIPv4FoundError struct{} + +type peerInfo struct { + ip net.IP + port uint16 + isIPv6 bool +} + +func (p *peerInfo) portToString() string { + return strconv.FormatInt(int64(p.port), 10) +} + +func (p peerInfo) String() string { + if p.isIPv6 { + return "[" + p.ip.String() + "]" + ":" + p.portToString() + } + + return p.ip.String() + ":" + p.portToString() +} + +func (e NoIPv4FoundError) Error() string { + return "No IPv4 Interface found" +} + +func addressStringToIP(address string) net.IP { + split := strings.Split(address, "/") + ipSlice := strings.Split(split[0], ".") + + parts := make([]byte, 4) + + for i := range ipSlice { + part, _ := strconv.ParseInt(ipSlice[i], 10, 16) + parts[i] = byte(part) + } + + ip := net.IPv4(parts[0], parts[1], parts[2], parts[3]) + + return ip + +} + +func getMyIpv4Addr() (net.IP, error) { + ifaces, _ := net.Interfaces() + + for _, iface := range ifaces { + + addr, _ := iface.Addrs() + + for _, a := range addr { + if strings.Contains(a.String(), ":") { //must be an ipv6 + continue + } + + ip := addressStringToIP(a.String()) + + if ip.IsLoopback() { + continue + } + + return ip, nil + + } + } + e := NoIPv4FoundError{} + return nil, e +} + +func ipToBytes(ip string) []byte { + split := strings.Split(ip, ".") + + parts := make([]byte, 4) + for i := range split { + part, _ := strconv.ParseInt(split[i], 10, 16) + + parts[i] = byte(part) + } + + return parts + +} + +func fromIntToBytes(num uint16) ([]byte, error) { + buffer := new(bytes.Buffer) + + err := binary.Write(buffer, binary.BigEndian, num) + + if err != nil { + return nil, err + } + + return buffer.Bytes(), nil +} + +// 1 byte length prefixed & zero terminated 4 bytes 16 bytes +// [ rpc ][ uid ] [ ip ] [ port ] +// +func createSessionPacket(uid, address, port string) []byte { + packet := make([]byte, 0) + p, _ := strconv.ParseInt(port, 10, 16) + + packet = append(packet, byte(CREATE_SESSION)) + //uid + packet = append(packet, byte(len(uid))) + packet = append(packet, []byte(uid)...) + packet = append(packet, byte(0)) + //ip + packet = append(packet, ipToBytes(address)...) + packet = append(packet, byte(0)) + //port + portBytes, _ := fromIntToBytes(uint16(p)) + packet = append(packet, portBytes...) + packet = append(packet, ETX) + + return packet +} + +func createConnRequestPacket(peer string) []byte { + packet := make([]byte, 0) + + packet = append(packet, byte(CONN_REQUEST)) + packet = append(packet, byte(len(peer))) + packet = append(packet, []byte(peer)...) + packet = append(packet, ETX) + + return packet +} + +func parseConnRequestResponse(data []byte) (string, bool) { + var ok bool + + if data[1] == NAK { + ok = false + } else { + ok = true + } + + peerLength := data[2] + peer := string(data[2 : 2+peerLength]) + + return peer, ok + +} + +func parseInitHolePunchMessage(data []byte) peerInfo { + var isIPv6 bool + var ip string + var startOfPort int + + if data[1] == 1 { + isIPv6 = true + ip = parseIPv6(data[2:10]) + startOfPort = 10 + + } else { + isIPv6 = false + ip = parseIPv4(data[2:6]) + startOfPort = 6 + + } + + port := binary.BigEndian.Uint16(data[startOfPort : startOfPort+2]) + + peer := peerInfo{ + port: port, + ip: net.ParseIP(ip), + isIPv6: isIPv6, + } + + return peer +} + +func parseIPv4(ip []byte) string { + + buf := new(bytes.Buffer) + + for i := range ip { + bt := int(ip[i]) + buf.WriteString(strconv.Itoa(bt)) + + if i < 3 { + buf.WriteString(".") + } + } + + return buf.String() +} + +func parseIPv6(ip []byte) string { + buf := new(bytes.Buffer) + + for i := 0; i < len(ip); i += 2 { + + part := binary.BigEndian.Uint16(ip[i : i+2]) + buf.WriteString(fmt.Sprintf("%X", part)) + + if i != 14 { + buf.WriteString(":") + } + } + + return buf.String() +}