diff --git a/README.md b/README.md index aaff3fe..69546c9 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,14 @@ A Full Mesh Layer2 VPN based on wireguard-go [![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.1-4baaaa.svg)](code_of_conduct.md) OSPF can find best route based on it's cost. -But sometimes the lentancy are different in the packet goes and back. -I'am thinking, is it possible to find the best route based on the **single-way latency**? +But sometimes the latency are different in the packet goes and back. +I'm thinking, is it possible to find the best route based on the **single-way latency**? For example, I have two routes A and B at node N1, both of them can reach my node N2. A goes fast, but B backs fast. My VPN can automatically send packet through route A at node N1, and the packet backs from route B. -Here is the solution. This VPN `Etherguard` can collect all the single-way lentancy from all nodes, and calculate the best route using [Floyd–Warshall algorithm](https://en.wikipedia.org/wiki/Floyd–Warshall_algorithm). +Here is the solution. This VPN `Etherguard` can collect all the single-way latency from all nodes, and calculate the best route using [Floyd–Warshall algorithm](https://en.wikipedia.org/wiki/Floyd–Warshall_algorithm). -Wirried about the clock not match so that the measure result are not correct? It doesn't matter, here is the proof (Mandarin): [https://www.kskb.eu.org/2021/08/rootless-routerpart-3-etherguard.html](https://www.kskb.eu.org/2021/08/rootless-routerpart-3-etherguard.html) +Worried about the clock not match so that the measure result are not correct? It doesn't matter, here is the proof (Mandarin): [https://www.kskb.eu.org/2021/08/rootless-routerpart-3-etherguard.html](https://www.kskb.eu.org/2021/08/rootless-routerpart-3-etherguard.html) ## Usage @@ -40,7 +40,7 @@ Usage of ./etherguard-go: ## Mode -1. Static Mode: Similar to origional wireguard. [Introduction](example_config/static_mode/README.md). +1. Static Mode: Similar to original wireguard. [Introduction](example_config/static_mode/README.md). 2. Super Mode: Inspired by[n2n](https://github.com/ntop/n2n). [Introduction](example_config/super_mode/README.md). 3. P2P Mode: Inspired by[tinc](https://github.com/gsliepen/tinc). [Introduction](example_config/p2p_mode/README.md). @@ -53,8 +53,14 @@ Usage of ./etherguard-go: 1. `dummy`: Dymmy interface, drop any packet received. You need this if you want to setup it as a relay node. 2. `stdio`: Wrtie to stdout,read from stdin. Paramaters: `macaddrprefix`,`l2headermode` - 3. `udpsock`: Write to an udp socket, and read from an net assress. - Paramaters: `macaddrprefix`,`recvaddr`,`sendaddr` + 3. `udpsock`: Read/Write the raw packet to an udp socket. + Paramaters: `recvaddr`,`sendaddr` + 3. `tcpsock`: Read/Write the raw packet to a tcp socket. + Paramaters: `recvaddr`,`sendaddr` + 3. `unixsock`: Read/Write the raw packet to an unix socket. + Paramaters: `recvaddr`,`sendaddr` + 3. `fd`: Read/Write the raw packet to specific file descriptor. + Paramaters: None. But require environment variable `EG_FD_RX` and `EG_FD_TX` 4. `vpp`: Integrate to VPP by libmemif. Paramaters: `name`,`vppifaceid`,`vppbridgeid`,`macaddrprefix`,`mtu` 5. `tap`: Read/Write to tap device from linux. @@ -65,9 +71,9 @@ Usage of ./etherguard-go: 5. `macaddrprefix`: Mac address Prefix. Real Mac address=[Prefix]:[NodeID]. If you fill full mac address here, NodeID will be ignored. - 6. `recvaddr`: Listen address for `udpsock` mode - 7. `sendaddr`: Packet send address for `udpsock` mode - 8. `l2headermode`: For debug usage, `stdio` and `udpsock` mode only + 6. `recvaddr`: Listen address for `XXXsock` mode(server mode) + 7. `sendaddr`: Packet send address for `XXXsock` mode(client mode) + 8. `l2headermode`: For debug usage, for `stdio` mode only 1. `nochg`: Do not change anything. 2. `kbdbg`: Keyboard debug mode. Let me construct Layer 2 header by ascii character only. @@ -137,7 +143,7 @@ make Build Etherguard with VPP integrated. You need libmemif.so installed to run this version. -Install VPP and libemif +Install VPP and libmemif ```bash echo "deb [trusted=yes] https://packagecloud.io/fdio/release/ubuntu focal main" > /etc/apt/sources.list.d/99fd.io.list diff --git a/README_zh.md b/README_zh.md index e3721c2..f5235bc 100644 --- a/README_zh.md +++ b/README_zh.md @@ -59,8 +59,14 @@ Usage of ./etherguard-go-vpp: 1. `dummy`: 收到的封包直接丟棄,也不發出任何封包。作為中繼節點使用 2. `stdio`: 收到的封包丟stdout,stdin進來的資料丟入vpn網路 需要參數: `macaddrprefix`,`l2headermode` - 3. `udpsock`: 收到的封包用udp丟到某個網路位置,監聽port進來的資料丟去vpn網路 - 需要參數: `macaddrprefix`,`recvaddr`,`sendaddr` + 3. `udpsock`: 把VPN網路收到的layer2封包讀寫去一個udp socket. + Paramaters: `recvaddr`,`sendaddr` + 3. `tcpsock`: 把VPN網路收到的layer2封包讀寫去一個tcp socket. + Paramaters: `recvaddr`,`sendaddr` + 3. `unixsock`: 把VPN網路收到的layer2封包讀寫去一個unix socket. + Paramaters: `recvaddr`,`sendaddr` + 3. `fd`: 把VPN網路收到的layer2封包讀寫去一個特定的file descriptor. + Paramaters: 無. 但是使用環境變數 `EG_FD_RX` 和 `EG_FD_TX` 來指定 4. `vpp`: 使用libmemif使vpp加入VPN網路 需要參數: `name`,`vppifaceid`,`vppbridgeid`,`macaddrprefix`,`mtu` 5. `tap`: Linux的tap設備。讓linux加入VPN網路 @@ -70,9 +76,9 @@ Usage of ./etherguard-go-vpp: 4. `vppbridgeid`: VPP 的網橋ID。不使用VPP網橋功能的話填0 5. `macaddrprefix`: MAC地址前綴。真正的 MAC 地址=[前綴]:[NodeID]。 如果這邊填了完整6格長度,就忽略`NodeID` - 6. `recvaddr`: 僅限`udpsock`生效。收到的東西丟去 VPN 網路 - 7. `sendaddr`: 僅限`udpsock`生效。VPN網路收到的東西丟去這個 udp 地址 - 8. `l2headermode`: 僅限 `stdio` 和 `udpsock` 生效。debug用途,有三種模式: + 6. `recvaddr`: 僅限`XXXsock`生效。listen地址,收到的東西丟去 VPN 網路 + 7. `sendaddr`: 僅限`XXXsock`生效。連線地址,VPN網路收到的東西丟去這個地址 + 8. `l2headermode`: 僅限 `stdio` 生效。debug用途,有三種模式: 1. `nochg`: 從 VPN 網路收到什麼,就往tap裝置發送什麼。不對封包作任何更動 2. `kbdbg`: 鍵盤bebug模式。搭配 `stdio` 模式,讓我 debug 用 因為前 12 byte 會用來做選路判斷,但是只是要debug,構造完整的封包就不是很方便 diff --git a/device/receive.go b/device/receive.go index 780cfa8..79c4225 100644 --- a/device/receive.go +++ b/device/receive.go @@ -463,6 +463,7 @@ func (peer *Peer) RoutineSequentialReceiver() { dst_nodeID = EgHeader.GetDst() elem.packet = elem.packet[:EgHeader.GetPacketLength()+path.EgHeaderLen] // EG header + true packet packet_type = elem.Type + peer.LastPingReceived = time.Now() if device.IsSuperNode { peer.LastPingReceived = time.Now() diff --git a/device/receivesendproc.go b/device/receivesendproc.go index eb83d47..e96bd0e 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -318,9 +318,9 @@ func (device *Device) server_process_Pong(peer *Peer, content path.PongMsg) erro func (device *Device) process_ping(peer *Peer, content path.PingMsg) error { peer.LastPingReceived = time.Now() - peer.Lock() + //peer.Lock() //remove peer.endpoint_trylist - peer.Unlock() + //peer.Unlock() PongMSG := path.PongMsg{ Src_nodeID: content.Src_nodeID, Dst_nodeID: device.ID, @@ -479,7 +479,7 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerM //Peer died, try to switch to this new endpoint for url, _ := range peerinfo.Connurl { thepeer.Lock() - thepeer.endpoint_trylist.Set(url, time.Time{}) //another gorouting will process it + thepeer.endpoint_trylist.LoadOrStore(url, time.Time{}) //another gorouting will process it thepeer.Unlock() send_signal = true } @@ -625,7 +625,7 @@ func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.Boardcas if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) { //Peer died, try to switch to this new endpoint thepeer.Lock() - thepeer.endpoint_trylist.Set(content.ConnURL, time.Time{}) //another gorouting will process it + thepeer.endpoint_trylist.LoadOrStore(content.ConnURL, time.Time{}) //another gorouting will process it thepeer.Unlock() device.event_tryendpoint <- struct{}{} } diff --git a/example_config/super_mode/n1_fd.yaml b/example_config/super_mode/n1_fd.yaml new file mode 100644 index 0000000..a68e741 --- /dev/null +++ b/example_config/super_mode/n1_fd.yaml @@ -0,0 +1,71 @@ +interface: + itype: fd + name: tap1 + vppifaceid: 1 + vppbridgeid: 4242 + macaddrprefix: AA:BB:CC:DD + mtu: 1416 + recvaddr: 127.0.0.1:4001 + sendaddr: 127.0.0.1:5001 + l2headermode: kbdbg +nodeid: 1 +nodename: Node01 +defaultttl: 200 +l2fibtimeout: 3600 +privkey: 6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M= +listenport: 3001 +loglevel: + loglevel: normal + logtransit: true + logcontrol: true + lognormal: true + logntp: true +dynamicroute: + sendpinginterval: 16 + peeralivetimeout: 30 + dupchecktimeout: 40 + conntimeout: 30 + connnexttry: 5 + savenewpeers: true + supernode: + usesupernode: true + pskey: 'iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI=' + connurlv4: 127.0.0.1:3000 + pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= + connurlv6: '' + pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= + apiurl: http://127.0.0.1:3000/api + supernodeinfotimeout: 50 + p2p: + usep2p: false + sendpeerinterval: 20 + graphrecalculatesetting: + staticmode: false + jittertolerance: 20 + jittertolerancemultiplier: 1.1 + nodereporttimeout: 40 + recalculatecooldown: 5 + ntpconfig: + usentp: true + maxserveruse: 8 + synctimeinterval: 3600 + ntptimeout: 3 + servers: + - time.google.com + - time1.google.com + - time2.google.com + - time3.google.com + - time4.google.com + - time1.facebook.com + - time2.facebook.com + - time3.facebook.com + - time4.facebook.com + - time5.facebook.com + - time.cloudflare.com + - time.apple.com + - time.asia.apple.com + - time.euro.apple.com + - time.windows.com +nexthoptable: {} +resetconninterval: 86400 +peers: [] diff --git a/example_config/super_mode/n1_test_fd_mode.py b/example_config/super_mode/n1_test_fd_mode.py new file mode 100644 index 0000000..05aef03 --- /dev/null +++ b/example_config/super_mode/n1_test_fd_mode.py @@ -0,0 +1,61 @@ +import os +import sys +import signal +import subprocess + +pr, ew = os.pipe() +er, pw = os.pipe() + +import threading +import time + +bufsize=1500 + +def signal_handler(sig, frame): + print('You pressed Ctrl+C!') + os.close(pr) + os.close(pw) + sys.exit(0) + +def read_loop(fd): + print("Main Start read fd:",fd) + with os.fdopen(fd, 'rb') as fdfile: + while True: + text = fdfile.read() + if len(text) == 0: + print("EOF!!!!!!!!!!!!!!!!!!!!!!!!") + break + print("Main: RECEIVED:",text) + +def write_loop(fd): + with os.fdopen(fd, 'wb') as fdfile: + while True: + print("Main Write fd:",fd) + text = b'\xff\xff\xff\xff\xff\xff\xaa\xaa\xaa\xaa\xaa\xaa' + b'm'*88 + fdfile.write(text) + fdfile.flush() + time.sleep(1) + +tr = threading.Thread(target = read_loop, args=(pr,)) +tr.start() + +tw = threading.Thread(target = write_loop, args=(pw,)) +tw.start() + +os.environ["EG_FD_RX"] = str(er) +os.environ["EG_FD_TX"] = str(ew) + +print(str(er), str(ew)) + +#p = subprocess.Popen('./etherguard-go -config example_config/super_mode/n1.yaml -mode edge'.split(" "),pass_fds=[er,ew]) +p = subprocess.Popen('python3 example_config/super_mode/n1_test_fd_mode2.py'.split(" "),pass_fds=[er,ew]) +os.close(er) +os.close(ew) + +signal.signal(signal.SIGINT, signal_handler) +signal.pause() + +# tr.join() +# tw.join() +# os.close(pr) +# os.close(pw) \ No newline at end of file diff --git a/example_config/super_mode/n1_test_fd_mode2.py b/example_config/super_mode/n1_test_fd_mode2.py new file mode 100644 index 0000000..69ea5bd --- /dev/null +++ b/example_config/super_mode/n1_test_fd_mode2.py @@ -0,0 +1,51 @@ +import os +import sys +import signal +import subprocess + +er = int( os.environ['EG_FD_RX']) +ew = int( os.environ['EG_FD_TX']) + +import threading +import time + +bufsize=1500 + +def signal_handler(sig, frame): + print('You pressed Ctrl+C!') + os.close(er) + os.close(ew) + sys.exit(0) + +def read_loop(fd): + print("Sub: Start read fd:",fd) + with os.fdopen(fd, 'rb') as fdfile: + while True: + text = fdfile.read() + if len(text) == 0: + print("EOF!!!!!!!!!!!!!!!!!!!!!!!!") + break + print("Sub RECEIVED:",text) + +def write_loop(fd): + with os.fdopen(fd, 'wb') as fdfile: + while True: + print("Sub: Write fd:",fd) + text = b'\xff\xff\xff\xff\xff\xff\xaa\xaa\xaa\xaa\xaa\xaa' + b's'*88 + fdfile.write(text) + fdfile.flush() + time.sleep(1) + +tr = threading.Thread(target = read_loop, args=(er,)) +tr.start() + +tw = threading.Thread(target = write_loop, args=(ew,)) +tw.start() + +signal.signal(signal.SIGINT, signal_handler) +signal.pause() + +# tr.join() +# tw.join() +# os.close(er) +# os.close(ew) \ No newline at end of file diff --git a/example_config/super_mode/s1.yaml b/example_config/super_mode/s1.yaml index 2667a98..84a7d89 100644 --- a/example_config/super_mode/s1.yaml +++ b/example_config/super_mode/s1.yaml @@ -25,6 +25,7 @@ nexthoptable: 2: 1: 1 edgetemplate: example_config/super_mode/n1.yaml +usepskforinteredge: true peers: - nodeid: 1 name: Node_01 diff --git a/main_edge.go b/main_edge.go index f9091ba..5f2b3c2 100644 --- a/main_edge.go +++ b/main_edge.go @@ -10,7 +10,6 @@ package main import ( "errors" "fmt" - "net" "os" "os/signal" "strconv" @@ -190,9 +189,13 @@ func Edge(configPath string, useUAPI bool, printExample bool, bindmode string) ( case "stdio": thetap, err = tap.CreateStdIOTAP(econfig.Interface, econfig.NodeID) case "udpsock": - lis, _ := net.ResolveUDPAddr("udp", econfig.Interface.RecvAddr) - sen, _ := net.ResolveUDPAddr("udp", econfig.Interface.SendAddr) - thetap, err = tap.CreateUDPSockTAP(econfig.Interface, econfig.NodeID, lis, sen) + thetap, err = tap.CreateUDPSockTAP(econfig.Interface, econfig.NodeID) + case "tcpsock": + thetap, err = tap.CreateSockTAP(econfig.Interface, "tcp", econfig.NodeID) + case "unixsock": + thetap, err = tap.CreateSockTAP(econfig.Interface, "unix", econfig.NodeID) + case "fd": + thetap, err = tap.CreateFdTAP(econfig.Interface, econfig.NodeID) case "vpp": thetap, err = tap.CreateVppTAP(econfig.Interface, econfig.NodeID, econfig.LogLevel.LogLevel) case "tap": diff --git a/main_httpserver.go b/main_httpserver.go index 6a8aeff..eac249a 100644 --- a/main_httpserver.go +++ b/main_httpserver.go @@ -32,16 +32,16 @@ var ( http_NhTableStr []byte http_PeerInfo config.API_Peers - http_PeerID2PubKey map[config.Vertex]string - http_passwords config.Passwords http_StateExpire time.Time http_StateString_tmp []byte - http_PeerState map[string]*PeerState //the state hash reported by peer - http_PeerIPs map[string]*HttpPeerLocalIP - http_PeerLastSeen sync.Map // ID -> time.Time - http_sconfig *config.SuperConfig + http_maps_lock sync.RWMutex + http_PeerID2PubKey map[config.Vertex]string + http_PeerState map[string]*PeerState //the state hash reported by peer + http_PeerIPs map[string]*HttpPeerLocalIP + + http_sconfig *config.SuperConfig http_sconfig_path string http_econfig_tmp *config.EdgeConfig @@ -68,6 +68,7 @@ type HttpPeerInfo struct { type PeerState struct { NhTableState [32]byte PeerInfoState [32]byte + LastSeen time.Time } type client struct { @@ -87,8 +88,8 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo config.API_Peers, Stat PSKey: peerinfo.PSKey, Connurl: make(map[string]int), } - lastSeen, has := http_PeerLastSeen.Load(peerinfo.NodeID) - if has && lastSeen.(time.Time).Add(path.S2TD(http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) { + http_maps_lock.RLock() + if http_PeerState[peerinfo.PubKey].LastSeen.Add(path.S2TD(http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) { connV4 := http_device4.GetConnurl(peerinfo.NodeID) connV6 := http_device6.GetConnurl(peerinfo.NodeID) api_peerinfo[peerinfo.PubKey].Connurl[connV4] = 4 @@ -107,6 +108,7 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo config.API_Peers, Stat } delete(api_peerinfo[peerinfo.PubKey].Connurl, "") } + http_maps_lock.RUnlock() } api_peerinfo_str_byte, _ := json.Marshal(&api_peerinfo) hash_raw := md5.Sum(append(api_peerinfo_str_byte, http_HashSalt...)) @@ -147,6 +149,8 @@ func get_peerinfo(w http.ResponseWriter, r *http.Request) { PubKey := PubKeyA[0] State := StateA[0] NodeID := config.Vertex(NID2) + http_maps_lock.RLock() + defer http_maps_lock.RUnlock() if http_PeerID2PubKey[NodeID] != PubKey { w.WriteHeader(http.StatusNotFound) w.Write([]byte("Not found")) @@ -221,6 +225,8 @@ func get_nhtable(w http.ResponseWriter, r *http.Request) { PubKey := PubKeyA[0] State := StateA[0] NodeID := config.Vertex(NID2) + http_maps_lock.RLock() + defer http_maps_lock.RUnlock() if http_PeerID2PubKey[NodeID] != PubKey { w.WriteHeader(http.StatusNotFound) w.Write([]byte("Not found")) @@ -262,16 +268,15 @@ func get_info(w http.ResponseWriter, r *http.Request) { Edges: http_graph.GetEdges(), Dist: http_graph.GetDtst(), } + http_maps_lock.RLock() for _, peerinfo := range http_sconfig.Peers { - LastSeenStr := "" - if lastseen, has := http_PeerLastSeen.Load(peerinfo.NodeID); has { - LastSeenStr = lastseen.(time.Time).String() - } + LastSeenStr := http_PeerState[peerinfo.PubKey].LastSeen.String() hs.PeerInfo[peerinfo.NodeID] = HttpPeerInfo{ Name: peerinfo.Name, LastSeen: LastSeenStr, } } + http_maps_lock.RUnlock() http_StateExpire = time.Now().Add(5 * time.Second) http_StateString_tmp, _ = json.Marshal(hs) } @@ -340,7 +345,7 @@ func peeradd(w http.ResponseWriter, r *http.Request) { //Waiting for test return } } - if http_sconfig.GraphRecalculateSetting.StaticMode == false { + if http_sconfig.GraphRecalculateSetting.StaticMode == true { NhTableStr := r.Form.Get("nexthoptable") if NhTableStr == "" { w.WriteHeader(http.StatusExpectationFailed) diff --git a/main_super.go b/main_super.go index c47c86a..78e7ee8 100644 --- a/main_super.go +++ b/main_super.go @@ -287,15 +287,18 @@ func super_peeradd(peerconf config.SuperPeerInfo) error { peer6.SetPSK(psk) } } + http_maps_lock.Lock() http_PeerID2PubKey[peerconf.NodeID] = peerconf.PubKey http_PeerState[peerconf.PubKey] = &PeerState{} http_PeerIPs[peerconf.PubKey] = &HttpPeerLocalIP{} - http_PeerLastSeen.Store(peerconf.NodeID, time.Time{}) + http_maps_lock.Unlock() return nil } func super_peerdel(toDelete config.Vertex) { + http_maps_lock.RLock() PubKey := http_PeerID2PubKey[toDelete] + http_maps_lock.RUnlock() UpdateErrorMsg := path.UpdateErrorMsg{ Node_id: toDelete, Action: path.Shutdown, @@ -322,11 +325,11 @@ func super_peerdel(toDelete config.Vertex) { http_device4.RemovePeerByID(toDelete) http_device6.RemovePeerByID(toDelete) http_graph.RemoveVirt(toDelete, true, false) - http_PeerLastSeen.Delete(toDelete) - + http_maps_lock.Lock() delete(http_PeerState, PubKey) delete(http_PeerIPs, PubKey) delete(http_PeerID2PubKey, toDelete) + http_maps_lock.Unlock() } func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { @@ -335,7 +338,8 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { case reg_msg := <-events.Event_server_register: var should_push_peer bool var should_push_nh bool - http_PeerLastSeen.Store(reg_msg.Node_id, time.Now()) + http_maps_lock.RLock() + http_PeerState[http_PeerID2PubKey[reg_msg.Node_id]].LastSeen = time.Now() if reg_msg.Node_id < config.Special_NodeID { PubKey := http_PeerID2PubKey[reg_msg.Node_id] if bytes.Equal(http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false { @@ -352,11 +356,12 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { } var peer_state_changed bool http_PeerInfo, http_PeerInfo_hash, peer_state_changed = get_api_peers(http_PeerInfo_hash) + http_maps_lock.RUnlock() if should_push_peer || peer_state_changed { - PushPeerinfo() + PushPeerinfo(false) } if should_push_nh { - PushNhTable() + PushNhTable(false) } case <-events.Event_server_NhTable_changed: NhTable := graph.GetNHTable() @@ -366,7 +371,7 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { new_hash_str_byte := []byte(new_hash_str) copy(http_NhTable_Hash[:], new_hash_str_byte) http_NhTableStr = NhTablestr - PushNhTable() + PushNhTable(false) case pong_msg := <-events.Event_server_pong: changed := graph.UpdateLentancy(pong_msg.Src_nodeID, pong_msg.Dst_nodeID, pong_msg.Timediff, true, true) if changed { @@ -378,21 +383,29 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { copy(http_NhTable_Hash[:], new_hash_str_byte) copy(graph.NhTableHash[:], new_hash_str_byte) http_NhTableStr = NhTablestr - PushNhTable() + PushNhTable(false) } } } } func RoutinePushSettings(interval time.Duration) { + force := false + var lastforce time.Time for { - time.Sleep(interval) - PushNhTable() - PushPeerinfo() + if time.Now().After(lastforce.Add(interval)) { + lastforce = time.Now() + force = true + } else { + force = false + } + PushNhTable(force) + PushPeerinfo(false) + time.Sleep(path.S2TD(1)) } } -func PushNhTable() { +func PushNhTable(force bool) { body, err := path.GetByte(path.UpdateNhTableMsg{ State_hash: http_NhTable_Hash, }) @@ -407,17 +420,22 @@ func PushNhTable() { header.SetSrc(config.SuperNodeMessage) header.SetTTL(0) copy(buf[path.EgHeaderLen:], body) - for pkstr, _ := range http_PeerState { - if peer := http_device4.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { - http_device4.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent) - } - if peer := http_device6.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { - http_device6.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent) + http_maps_lock.RLock() + for pkstr, peerstate := range http_PeerState { + isAlive := peerstate.LastSeen.Add(path.S2TD(http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) + if (force && isAlive) || peerstate.NhTableState != http_NhTable_Hash { + if peer := http_device4.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { + http_device4.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent) + } + if peer := http_device6.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { + http_device6.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent) + } } } + http_maps_lock.RUnlock() } -func PushPeerinfo() { +func PushPeerinfo(force bool) { body, err := path.GetByte(path.UpdatePeerMsg{ State_hash: http_PeerInfo_hash, }) @@ -432,8 +450,9 @@ func PushPeerinfo() { header.SetSrc(config.SuperNodeMessage) header.SetTTL(0) copy(buf[path.EgHeaderLen:], body) + http_maps_lock.RLock() for pkstr, peerstate := range http_PeerState { - if peerstate.PeerInfoState != http_PeerInfo_hash { + if force || peerstate.PeerInfoState != http_PeerInfo_hash { if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { http_device4.SendPacket(peer, path.UpdatePeer, buf, device.MessageTransportOffsetContent) } @@ -442,6 +461,7 @@ func PushPeerinfo() { } } } + http_maps_lock.RUnlock() } func startUAPI(interfaceName string, logger *device.Logger, the_device *device.Device, errs chan error) (net.Listener, error) { diff --git a/orderdmap/orderdmap.go b/orderdmap/orderdmap.go index 54fec54..23325d0 100644 --- a/orderdmap/orderdmap.go +++ b/orderdmap/orderdmap.go @@ -65,6 +65,18 @@ func (o *OrderedMap) Set(key string, value interface{}) { o.values[key] = value } +func (o *OrderedMap) LoadOrStore(key string, value interface{}) (interface{}, bool) { + o.rwLock.Lock() + defer o.rwLock.Unlock() + _, exists := o.values[key] + if !exists { + o.keys = append(o.keys, key) + o.values[key] = value + return nil, false + } + return o.values[key], true +} + func (o *OrderedMap) Delete(key string) { // check key is in use o.rwLock.Lock() diff --git a/tap/tap_fd.go b/tap/tap_fd.go new file mode 100644 index 0000000..6cfce6a --- /dev/null +++ b/tap/tap_fd.go @@ -0,0 +1,88 @@ +package tap + +import ( + "errors" + "os" + "strconv" + + "github.com/KusakabeSi/EtherGuardVPN/config" +) + +type FdTap struct { + name string + mtu int + fileRX *os.File + fileTX *os.File + events chan Event +} + +// New creates and returns a new TUN interface for the application. +func CreateFdTAP(iconfig config.InterfaceConf, NodeID config.Vertex) (tapdev Device, err error) { + // Setup TUN Config + fdRXstr, has := os.LookupEnv("EG_FD_RX") + if !has { + return nil, errors.New("Need Environment Variable EG_FD_RX") + } + fdRX, err := strconv.Atoi(fdRXstr) + if err != nil { + return nil, err + } + + fdTxstr, has := os.LookupEnv("EG_FD_TX") + if !has { + return nil, errors.New("Need Environment Variable EG_FD_TX") + } + fdTX, err := strconv.Atoi(fdTxstr) + if err != nil { + return nil, err + } + + fileRX := os.NewFile(uintptr(fdRX), "pipeRX") + fileTX := os.NewFile(uintptr(fdTX), "pipeTX") + + tapdev = &FdTap{ + name: iconfig.Name, + fileRX: fileRX, + fileTX: fileTX, + events: make(chan Event, 1<<5), + } + tapdev.Events() <- EventUp + return +} + +// SetMTU sets the Maximum Tansmission Unit Size for a +// Packet on the interface. + +func (tap *FdTap) Read(buf []byte, offset int) (int, error) { + size, err := tap.fileRX.Read(buf[offset:]) + return size, err +} // read a packet from the device (without any additional headers) +func (tap *FdTap) Write(buf []byte, offset int) (size int, err error) { + packet := buf[offset:] + size, err = tap.fileRX.Write(packet) + if err != nil { + return 0, err + } + //err = syscall.Fsync(int(tap.fileTX.Fd())) + //err = tap.fileTX.Sync() + return +} // writes a packet to the device (without any additional headers) +func (tap *FdTap) Flush() error { + return nil +} // flush all previous writes to the device +func (tap *FdTap) MTU() (int, error) { + return tap.mtu, nil +} // returns the MTU of the device +func (tap *FdTap) Name() (string, error) { + return tap.name, nil +} // fetches and returns the current name +func (tap *FdTap) Events() chan Event { + return tap.events +} // returns a constant channel of events related to the device +func (tap *FdTap) Close() error { + tap.events <- EventDown + tap.fileRX.Close() + tap.fileTX.Close() + close(tap.events) + return nil +} // stops the device and closes the event channel diff --git a/tap/tap_sock.go b/tap/tap_sock.go new file mode 100644 index 0000000..d0c34ab --- /dev/null +++ b/tap/tap_sock.go @@ -0,0 +1,154 @@ +package tap + +import ( + "errors" + "net" + "time" + + "github.com/KusakabeSi/EtherGuardVPN/config" +) + +type SockServerTap struct { + name string + mtu int + protocol string + server *net.Listener + connRx *net.Conn + connTx *net.Conn + static bool + + closed bool + events chan Event +} + +// New creates and returns a new TUN interface for the application. +func CreateSockTAP(iconfig config.InterfaceConf, protocol string, NodeID config.Vertex) (tapdev Device, err error) { + // Setup TUN Config + + tap := &SockServerTap{ + name: iconfig.Name, + mtu: 1500, + protocol: protocol, + server: nil, + connRx: nil, + connTx: nil, + static: false, + closed: false, + events: make(chan Event, 1<<5), + } + + if iconfig.RecvAddr == "" && iconfig.SendAddr == "" { + return nil, errors.New("At least one of RecvAddr or SendAddr required.") + } + + if iconfig.RecvAddr != "" { + server, err := net.Listen(protocol, iconfig.RecvAddr) + if err != nil { + return nil, err + } + tap.server = &server + go tap.RoutineAcceptConnection() + } + + if iconfig.SendAddr != "" { + client, err := net.Dial(protocol, iconfig.SendAddr) + if err != nil { + return nil, err + } + tap.connTx = &client + tap.static = true + if tap.server == nil { + tap.connRx = &client + } + } + + tapdev = tap + tapdev.Events() <- EventUp + return +} + +func (tap *SockServerTap) RoutineAcceptConnection() { + if tap.server == nil { + return + } + for { + fd, err := (*tap.server).Accept() + if tap.closed == true { + return + } + if err != nil { + println("accept error", err) + return + } + if tap.connRx != nil { + (*tap.connRx).Close() + } + + tap.connRx = &fd + if tap.static == false { + tap.connTx = &fd + } + } +} + +// SetMTU sets the Maximum Tansmission Unit Size for a +// Packet on the interface. + +func (tap *SockServerTap) Read(buf []byte, offset int) (size int, err error) { + if tap.closed { + return 0, errors.New("Tap closed") + } + if tap.connRx == nil { + time.Sleep(time.Second) + return 0, nil + } + size, err = (*tap.connRx).Read(buf[offset:]) + if err != nil && tap.server != nil { + tap.connRx = nil + return 0, nil + } + return +} // read a packet from the device (without any additional headers) +func (tap *SockServerTap) Write(buf []byte, offset int) (size int, err error) { + if tap.closed { + return 0, errors.New("Tap closed") + } + if tap.connTx == nil { + return + } + size, err = (*tap.connTx).Write(buf[offset:]) + if serr, ok := err.(*net.OpError); ok && tap.server != nil { + if serr.Err.Error() == "use of closed network connection" || serr.Err.Error() == "EOF" { + tap.connTx = nil + return 0, nil + } + } + return +} // writes a packet to the device (without any additional headers) +func (tap *SockServerTap) Flush() error { + return nil +} // flush all previous writes to the device +func (tap *SockServerTap) MTU() (int, error) { + return tap.mtu, nil +} // returns the MTU of the device +func (tap *SockServerTap) Name() (string, error) { + return tap.name, nil +} // fetches and returns the current name +func (tap *SockServerTap) Events() chan Event { + return tap.events +} // returns a constant channel of events related to the device +func (tap *SockServerTap) Close() error { + tap.events <- EventDown + tap.closed = true + if tap.connRx != nil { + (*tap.connRx).Close() + } + if tap.connTx != nil { + (*tap.connTx).Close() + } + if tap.server != nil { + (*tap.server).Close() + } + close(tap.events) + return nil +} // stops the device and closes the event channel diff --git a/tap/tap_udpsock.go b/tap/tap_udpsock.go index 5116179..c2f13e5 100644 --- a/tap/tap_udpsock.go +++ b/tap/tap_udpsock.go @@ -1,6 +1,7 @@ package tap import ( + "errors" "fmt" "net" @@ -12,33 +13,55 @@ type UdpSockTap struct { mtu int recv *net.UDPConn send *net.UDPAddr + static bool L2mode L2MODE - macaddr MacAddress events chan Event } // New creates and returns a new TUN interface for the application. -func CreateUDPSockTAP(iconfig config.InterfaceConf,NodeID config.Vertex, listenAddr *net.UDPAddr, sendAddr *net.UDPAddr) (tapdev Device, err error) { +func CreateUDPSockTAP(iconfig config.InterfaceConf, NodeID config.Vertex) (tapdev Device, err error) { // Setup TUN Config - listener, err := net.ListenUDP("udp", listenAddr) - if err != nil { - fmt.Println(err.Error()) - } - macaddr, err := GetMacAddr(iconfig.MacAddrPrefix,uint32(NodeID)) - if err != nil { - fmt.Println("ERROR: Failed parse mac address:", iconfig.MacAddrPrefix) - return nil, err - } - tapdev = &UdpSockTap{ + tap := &UdpSockTap{ name: iconfig.Name, mtu: 1500, - recv: listener, - send: sendAddr, - macaddr: macaddr, + recv: nil, + send: nil, + static: false, L2mode: GetL2Mode(iconfig.L2HeaderMode), events: make(chan Event, 1<<5), } + + if iconfig.RecvAddr == "" && iconfig.SendAddr == "" { + return nil, errors.New("At least one of RecvAddr or SendAddr required.") + } + + if iconfig.RecvAddr == "" { + iconfig.RecvAddr = ":0" + } + listenAddr, err := net.ResolveUDPAddr("udp", iconfig.RecvAddr) + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + listener, err := net.ListenUDP("udp", listenAddr) + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + tap.recv = listener + + if iconfig.SendAddr != "" { + sendAddr, err := net.ResolveUDPAddr("udp", iconfig.SendAddr) + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + tap.send = sendAddr + tap.static = true + } + + tapdev = tap tapdev.Events() <- EventUp return } @@ -47,46 +70,15 @@ func CreateUDPSockTAP(iconfig config.InterfaceConf,NodeID config.Vertex, listenA // Packet on the interface. func (tap *UdpSockTap) Read(buf []byte, offset int) (int, error) { - switch tap.L2mode { - case KeyboardDebug: - size, _, err := tap.recv.ReadFromUDP(buf[offset+10:]) - packet := buf[offset:] - src := Charform2mac(packet[11]) - dst := Charform2mac(packet[10]) - copy(packet[0:6], dst[:]) - copy(packet[6:12], src[:]) - return size + 10, err - case BoardcastAndNodeID: - size, _, err := tap.recv.ReadFromUDP(buf[offset+12:]) - packet := buf[offset:] - src := tap.macaddr - dst := Charform2mac('b') - copy(packet[0:6], dst[:]) - copy(packet[6:12], src[:]) - return size + 12, err - default: - size, _, err := tap.recv.ReadFromUDP(buf[offset:]) - return size, err + size, source, err := tap.recv.ReadFromUDP(buf[offset:]) + if tap.static == false { + tap.send = source } - + return size, err } // read a packet from the device (without any additional headers) func (tap *UdpSockTap) Write(buf []byte, offset int) (size int, err error) { - packet := buf[offset:] - switch tap.L2mode { - case KeyboardDebug: - src := Mac2charForm(packet[6:12]) - dst := Mac2charForm(packet[0:6]) - packet[10] = dst - packet[11] = src - size, err = tap.recv.WriteToUDP(packet[10:], tap.send) - return - case BoardcastAndNodeID: - size, err = tap.recv.WriteToUDP(packet[12:], tap.send) - return - default: - size, err = tap.recv.WriteToUDP(packet, tap.send) - return - } + size, err = tap.recv.WriteToUDP(buf[offset:], tap.send) + return } // writes a packet to the device (without any additional headers) func (tap *UdpSockTap) Flush() error { return nil