This commit is contained in:
hhd
2021-11-24 22:40:07 +08:00
commit 954dfba4e0
47 changed files with 3625 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
__debug_bin
__debug_bin.exe
# .vscode
openp2p
openp2p.exe*
*.log
go.sum
*.tar.gz
*.zip
*.exe

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 OpenP2P.cn
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

119
README-ZH.md Normal file
View File

@@ -0,0 +1,119 @@
[English](/README.md)|中文
## OpenP2P是什么
它是一个开源、免费、轻量级的P2P共享网络。任何设备接入OpenP2P就可以随时随地访问它们。
## 为什么选择OpenP2P
### 免费
完全免费满足大部分用户的核心白票需求。不像其它类似的产品我们不需要有公网IP的服务器不需要花钱买服务。
### 安全
代码开源,接受各位大佬检验。下面详细展开
### 轻量
文件大小2MB+运行内存2MB+;全部在应用层实现,没有虚拟网卡,没有内核程序
### 跨平台
因为轻量所以很容易支持各个平台。支持主流的操作系统Windows,Linux,MacOS和主流的cpu架构386、amd64、arm、arm64、mipsle、mipsle64、mips、mips64
### 高效
P2P直连可以让你的设备跑满带宽。不论你的设备在任何网络环境无论NAT1-4Cone或Symmetric都支持。依靠Quic协议优秀的拥塞算法能在糟糕的网络环境获得高带宽低延时。
### 二次开发
基于OpenP2P只需数行代码就能让原来只能局域网通信的程序变成任何内网都能通信
## 快速入门
以一个最常见的例子说明OpenP2P如何使用远程办公在家里连入办公室Windows电脑。
相信很多人在疫情下远程办公是刚需。
1. 先确认办公室电脑已开启远程桌面功能如何开启参考官方说明https://docs.microsoft.com/zh-cn/windows-server/remote/remote-desktop-services/clients/remote-desktop-allow-access
2. 在办公室下载最新的OpenP2P(补上URL),解压出来,在命令行执行
```
openp2p.exe -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1
```
`切记将标记大写的参数改成自己的`
![image](/doc/images/officelisten.png)
3. 在家里下载最新的OpenP2P(补上URL),解压出来,在命令行执行
```
openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 --peernode OFFICEPC1 --dstip 127.0.0.1 --dstport 3389 --srcport 23389 --protocol tcp
```
`切记将标记大写的参数改成自己的`
![image](/doc/images/homeconnect.png)
![image](/doc/images/mem.png)
`LISTEN ON PORT 23389 START` 看到这行日志表示P2PApp建立成功监听23389端口。只需连接本机的127.0.0.1:23389就相当于连接公司Windows电脑的3389端口。
4. 在家里Windows电脑按Win+R输入mstsc打开远程桌面输入127.0.0.1:23389 /admin
![image](/doc/images/mstscconnect.png)
![image](/doc/images/afterconnect.png)
## [详细使用说明](/USAGE-ZH.md)
## 典型应用场景
特别适合大流量的内网访问
### 远程办公
Windows MSTSC、VNC等远程桌面SSH内网各种ERP系统
### 远程访问NAS
管理大量视频、图片
### 远程监控摄像头
### 远程刷机
### 远程数据备份
---
## 概要设计
### 原型
![image](/doc/images/prototype.png)
### 客户端架构
![image](/doc/images/architecture.png)
### P2PApp
它是项目里最重要的概念一个P2PApp就是把远程的一个服务mstsc/ssh等通过P2P网络映射到本地监听。二次开发或者我们提供的Restful API主要工作就是管理P2PApp
![image](/doc/images/appdetail.png)
## 共享
默认会开启共享限速10mbps只有你用户下提供了共享节点才能使用别人的共享节点。这非常公平也是这个项目的初衷。
我们建议你在带宽足够的地方(比如办公室,家里的百兆光纤)加入共享网络。
如果你仍然不想共享任何节点,请查看运行参数
## 安全性
加入OpenP2P共享网络的节点只能凭授权访问。共享节点只会中转数据别人无法访问内网任何资源。
### TLS1.3+AES
两个节点间通信数据走业界最安全的TLS1.3通道。通信内容还会使用AES加密双重安全密钥是通过服务端作换。有效阻止中间人攻击
### 共享的中转节点是否会获得我的数据
没错中转节点天然就是一个中间人所以才加上AES加密通信内容保证安全。中转节点是无法获取明文的
### 中转节点是如何校验权限的
服务端有个调度模型根据带宽、ping值、稳定性、服务时长尽可能地使共享节点均匀地提供服务。连接共享节点使用TOTP密码hmac-sha256算法校验它是一次性密码和我们平时使用的手机验证码或银行密码器一样的原理。
## 编译
cd到代码根目录执行
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
go build
```
## TODO
近期计划:
1. 支持IPv6
2. 支持随系统自动启动,安装成系统服务
3. 提供一些免费服务器给特别差的网络,如广电网络
4. 建立网站用户可以在网站管理所有P2PApp和设备。查看设备在线状态升级增删查改重启P2PApp等
5. 建立公众号用户可在微信公众号管理所有P2PApp和设备
6. 客户端提供WebUI
7. 支持自有服务器高并发连接
8. 共享节点调度模型优化,对不同的运营商优化
9. 方便二次开发提供API和lib
10. 应用层支持UDP协议实现很简单但UDP应用较少暂不急
11. 底层通信支持KCP协议目前仅支持QuicKCP专门对延时优化被游戏加速器广泛使用可以牺牲一定的带宽降低延时
12. 支持Android系统让旧手机焕发青春变成移动网关
13. 支持Windows网上邻居共享文件
14. 内网直连优化,用处不大,估计就用户测试时用到
远期计划:
1. 彻底地分布式去中心化设计
2. 企业级支持,可以更好地管理大量设备,和更安全更细的权限控制
## 参与贡献
TODO或ISSUE里如果有你擅长的领域或者你有特别好的主意可以加入OpenP2P项目贡献你的代码。待项目茁壮成长后你们就是知名开源项目的主要代码贡献者岂不快哉。
## 商业合作
它是一个中国人发起的项目,更懂国内网络环境,更懂用户需求,更好的企业级支持
## 技术交流
QQ群16947733
邮箱openp2p.cn@gmail.com 271357901@qq.com
第一时间获得最新版本消息以及一些最新IT业界动态
微信公众号openp2p
微博https://weibo.com/openp2p
## 免责声明
本项目开源供大家学习和免费使用,禁止用于非法用途,任何不当使用本项目或意外造成的损失,本项目及相关人员不会承担任何责任。

144
README.md Normal file
View File

@@ -0,0 +1,144 @@
English|[中文](/README-ZH.md)
## What is OpenP2P
It is an open source, free, and lightweight P2P sharing network. As long as any device joins in, you can access them anywhere
## Why OpenP2P
### Free
Totaly free, fullfills most of users(especially free-rider). Unlike other similar products, we don't need a server with public IP, and don't need to pay for services.
### Safe
Open source, trustable(see details below)
### Lightweight
2MB+ filesize, 2MB+ memory. It runs at appllication layer, no vitrual NIC, no kernel driver.
### Cross-platform
Benefit from lightweight, it easily supports most of major OS, like Windows, Linux, MacOS, also most of CPU architecture, like 386、amd64、arm、arm64、mipsle、mipsle64、mips、mips64.
### Efficient
P2P direct connection lets your devices make good use of bandwidth. Your device can be connected in any network environments, even supports NAT1-4 (Cone or Symmetric). Relying on the excellent congestion algorithm of the Quic protocol, high bandwidth and low latency can be obtained in a bad network environment.
### Integration
Your applicaiton can call OpenP2P with a few code to make any internal networks communicate with each other.
## Get Started
A common scenario to introduce OpenP2P: remote work. At home connects to office's Linux PC .
Under the outbreak of covid-19 pandemic, surely remote work becomes a fundamental demand.
1. Make sure your office device(Linux) has opened the access of ssh.
```
netstat -nl | grep 22
```
Output sample
![image](/doc/images/officelisten_linux.png)
2. Download the latest version of OpenP2P(TBC),unzip the downloaded package, and execute below command line.
```
tar xvf openp2p0.95.3.linux-amd64.tar.gz
openp2p -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1
```
`Must change the parameters marked in uppercase to your own`
Output sample
![image](/doc/images/officeexecute_linux.png)
3. Download the same package of OpenP2P(TBC) on your home deviceunzip and execute below command line.
```
openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 --peernode OFFICEPC1 --dstip 127.0.0.1 --dstport 22 --srcport 22022 --protocol tcp
```
`Must change the parameters marked in uppercase to your own`
Output sample
![image](/doc/images/homeconnect_windows.png)
The log of `LISTEN ON PORT 22022 START` indicates P2PApp runs successfully on your home device, listing port is 22022. Once connects to local ip:port,127.0.0.1:22022, it means the home device has conneccted to the office device's port, 22.
![image](/doc/images/officelisten_2_linux.png)
4. Test the connection between office device and home device.In your home deivce, run SSH to login the office device.
```
ssh -p22022 root@127.0.0.1:22022
```
![image](/doc/images/sshconnect.png)
## [Usage](/USAGE.md)
## Scenarios
Especially suitable for large traffic intranet access.
### Remote work
Windows MSTSC, VNC and other remote desktops, SSH, various ERP systems in the intranet
### Remote Access NAS
Manage a large number of videos and pictures
### Remote Access Camera
### Remote Flashing Phone
### Remotely Data Backup
---
## Overview Design
### Prototype
![image](/doc/images/prototype.png)
### Client architecture
![image](/doc/images/architecture.png)
### P2PApp
P2PAPP is the most import concept in this project, one P2PApp is able to map the remote service(mstsc/ssh) to the local listening. The main job of re-development or restful API we provide is to manage P2PApp.
![image](/doc/images/appdetail.png)
## Share
10mbps is its default setting of share speed limit. Only when your users have shared their nodes, they are allowed to use others' shared nodes. This is very fair, and it is also the original intention of this project.
We recommend that you join a shared network in a place with sufficient bandwidth (such as an office or home with 100M optical fiber).
If you are still not willing to contribute any node to the OpenP2P share network, please refer to the operating parameters for your own setting.
## Safety
The nodes which have joined the OpenP2P share network can vist each other by authentications. Shared nodes will only relay data, and others cannot access any resources in the intranet.
### TLS1.3+AES
The communication data between the two nodes uses the industry's most secure TLS1.3 channel. The communication content will also use AES encryption, double security, the key is exchanged through the server. Effectively prevent man-in-the-middle attacks.
### Will the shared node capture my data?
That's right, the relay node is naturally an man-in-middle, so AES encryption is added to ensure the security of the communication content. The relay node cannot obtain the plaintext.
### How does the shared relay node verify the authority?
The server side has a scheduling model, which calculate bandwith, ping value,stability and service duration to provide a well-proportioned service to every share node. It uses TOTP(Time-based One-time Password) with hmac-sha256 algorithem, its theory as same as the cellphone validation code or bank cipher coder.
## Build
cd root directory of the socure code and execute
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
go build
```
## TODO
Short-Term:
1. Support IPv6.
2. Support auto run when system boot, setup system service.
3. Provide free servers to some low-performance network.
4. Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp .
5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website.
6. Provide WebUI on client side.
7. Support high concurrency on server side.
8. Optimize our share scheduling model for different network operators.
9. Provide REST APIs and libary for secondary development.
10. Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol.
11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag.
12. Support Android platform, let the phones to be mobile gateway .
13. Support SMB Windows neighborhood.
14. Direct connection on intranet, for testing.
Long-Term:
1. Decentration and distribution.
2. Enterprise-level product can well manage large scale equipment and ACL.
## Contribute
If the items in TODO or ISSUE is your domain, or you have sepical good idea, welcome to join this OpenP2P project and contribute your code. When this project grows stronger, you will be the major outstanding contributors. That's cool.
## Contact
QQ Group: 16947733
Email: openp2p.cn@gmail.com tenderiron@139.com
Get the latest version news, and some of the latest IT industry trends
WeChat public account: openp2p
## Disclaimer
This project is open source for everyone to learn and use for free. It is forbidden to be used for illegal purposes. Any loss caused by improper use of this project or accident, this project and related personnel will not bear any responsibility.

41
USAGE-ZH.md Normal file
View File

@@ -0,0 +1,41 @@
# 详细运行参数说明
## 监听
```
openp2p.exe -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1
```
>* -d daemon模式推荐使用。发现worker进程意外退出就会自动启动新的worker进程
>* -node 独一无二的节点名字,唯一标识
>* -user 独一无二的用户名字该节点属于这个user
>* -password 密码
>* -sharebandwidth 作为共享节点时提供带宽默认10mbps. 如果是光纤大带宽,设置越大效果越好
>* -loglevel 需要查看更多调试日志设置0默认是1
>* -noshare 不共享该节点只在私有的P2P网络使用。不加入共享的P2P网络这样也意味着无法使用别人的共享节点
## 连接
```
openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 -peernode OFFICEPC1 -dstip 127.0.0.1 -dstport 3389 -srcport 23389 -protocol tcp
```
>* -peernode 目标节点名字
>* -dstip 目标服务地址默认本机127.0.0.1
>* -dstport 目标服务端口常见的如windows远程桌面3389Linux ssh 22
>* -protocol 目标服务协议 tcp、udp
>* -peeruser 目标用户,如果是同一个用户下的节点,则无需设置
>* -peerpassword 目标密码,如果是同一个用户下的节点,则无需设置
>* -f 配置文件,如果希望配置多个P2PApp参考[config.json](/config.json)
## 升级客户端
```
# update local client
openp2p update
# update remote client
curl --insecure 'https://openp2p.cn:27182/api/v1/device/YOUR-NODE-NAME/update?user=&password='
```
Windows系统需要设置防火墙放行本程序程序会自动设置如果设置失败会影响连接功能。
Linux系统Ubuntu和CentOS7的防火墙默认配置均不会有影响如果不行可尝试关闭防火墙
```
systemctl stop firewalld.service
systemctl start firewalld.service
firewall-cmd --state
```

41
USAGE.md Normal file
View File

@@ -0,0 +1,41 @@
# Parameters details
## Listen
```
openp2p.exe -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1
```
>* -d daemon mode is recommand. When the worker process is found to exit unexpectedly, a new worker process will be automatically started
>* -node Unique node name, unique identification
>* -user Unique user name, the node belongs to this user
>* -password Password
>* -sharebandwidth Provides bandwidth when used as a shared node, the default is 10mbps. If it is a large bandwidth of optical fiber, the larger the setting, the better the effect
>* -loglevel Need to view more debug logs, set 0; the default is 1
>* -noshare Not shared, the node is only used in a private P2P network. Do not join the shared P2P network, which also means that you CAN NOT use other peoples shared nodes
## Connect
```
openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 -peernode OFFICEPC1 -dstip 127.0.0.1 -dstport 3389 -srcport 23389 -protocol tcp
```
>* -peernode Target node name
>* -dstip Target service address, default local 127.0.0.1
>* -dstport Target service port, such as windows remote desktop 3389, Linux ssh 22
>* -protocol Target service protocol tcp, udp
>* -peeruser The target user, if it is a node under the same user, no need to set
>* -peerpassword The target password, if it is a node under the same user, no need to set
>* -f Configuration file, if you want to configure multiple P2PApp refer to [config.json](/config.json)
## Client update
```
# update local client
openp2p update
# update remote client
curl --insecure 'https://openp2p.cn:27182/api/v1/device/YOUR-NODE-NAME/update?user=&password='
```
Windows system needs to set up firewall for this program, the program will automatically set the firewall, if the setting fails, the UDP punching will be affected.
The default firewall configuration of Linux system (Ubuntu and CentOS7) will not have any effect, if not, you can try to turn off the firewall
```
systemctl stop firewalld.service
systemctl start firewalld.service
firewall-cmd --state
```

45
bandwidthLimit.go Normal file
View File

@@ -0,0 +1,45 @@
package main
import (
"sync"
"time"
)
// BandwidthLimiter ...
type BandwidthLimiter struct {
freeFlowTime time.Time
bandwidth int // mbps
freeFlow int // bytes
maxFreeFlow int // bytes
freeFlowMtx sync.Mutex
}
// mbps
func newBandwidthLimiter(bw int) *BandwidthLimiter {
return &BandwidthLimiter{
bandwidth: bw,
freeFlowTime: time.Now(),
maxFreeFlow: bw * 1024 * 1024 / 8,
freeFlow: bw * 1024 * 1024 / 8,
}
}
// Add ...
func (bl *BandwidthLimiter) Add(bytes int) {
if bl.bandwidth <= 0 {
return
}
bl.freeFlowMtx.Lock()
defer bl.freeFlowMtx.Unlock()
// calc free flow 1000*1000/1024/1024=0.954; 1024*1024/1000/1000=1.048
bl.freeFlow += int(time.Now().Sub(bl.freeFlowTime) * time.Duration(bl.bandwidth) / 8 / 954)
if bl.freeFlow > bl.maxFreeFlow {
bl.freeFlow = bl.maxFreeFlow
}
bl.freeFlow -= bytes
bl.freeFlowTime = time.Now()
if bl.freeFlow < 0 {
// sleep for the overflow
time.Sleep(time.Millisecond * time.Duration(-bl.freeFlow/(bl.bandwidth*1048/8)))
}
}

101
common.go Normal file
View File

@@ -0,0 +1,101 @@
package main
import (
"crypto/aes"
"crypto/cipher"
"fmt"
"net"
)
func getmac(ip string) string {
//get mac relative to the ip address which connected to the mq.
ifaces, err := net.Interfaces()
if err != nil {
return ""
}
firstMac := ""
for _, iface := range ifaces {
addrs, _ := iface.Addrs()
for _, addr := range addrs {
if firstMac == "" {
firstMac = iface.HardwareAddr.String()
}
if ipNet, ok := addr.(*net.IPNet); ok && ipNet.IP.String() == ip {
if iface.HardwareAddr.String() != "" {
return iface.HardwareAddr.String()
}
return firstMac
}
}
}
return firstMac
}
var cbcIVBlock = []byte("UHNJUSBACIJFYSQN")
var paddingArray = [][]byte{
{0},
{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2},
{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
{4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4},
{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5},
{6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6},
{7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7},
{8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8},
{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9},
{10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10},
{11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11},
{12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12},
{13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13},
{14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14},
{15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15},
{16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16},
}
func pkcs7Padding(plainData []byte, dataLen, blockSize int) int {
padLen := blockSize - dataLen%blockSize
pPadding := plainData[dataLen : dataLen+padLen]
copy(pPadding, paddingArray[padLen][:padLen])
return padLen
}
func pkcs7UnPadding(origData []byte, dataLen int) ([]byte, error) {
unPadLen := int(origData[dataLen-1])
if unPadLen <= 0 || unPadLen > 16 {
return nil, fmt.Errorf("wrong pkcs7 padding head size:%d", unPadLen)
}
return origData[:(dataLen - unPadLen)], nil
}
func encryptBytes(key []byte, out, in []byte, plainLen int) ([]byte, error) {
if len(key) == 0 {
return in[:plainLen], nil
}
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
//iv := out[:aes.BlockSize]
//if _, err := io.ReadFull(rand.Reader, iv); err != nil {
// return nil, err
//}
mode := cipher.NewCBCEncrypter(block, cbcIVBlock)
total := pkcs7Padding(in, plainLen, aes.BlockSize) + plainLen
mode.CryptBlocks(out[:total], in[:total])
return out[:total], nil
}
func decryptBytes(key []byte, out, in []byte, dataLen int) ([]byte, error) {
if len(key) == 0 {
return in[:dataLen], nil
}
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
mode := cipher.NewCBCDecrypter(block, cbcIVBlock)
mode.CryptBlocks(out[:dataLen], in[:dataLen])
return pkcs7UnPadding(out, dataLen)
}

41
common_test.go Normal file
View File

@@ -0,0 +1,41 @@
package main
import (
"log"
"testing"
)
func TestAESCBC(t *testing.T) {
for packetSize := 1; packetSize <= 8192; packetSize++ {
log.Println("test packetSize=", packetSize)
data := make([]byte, packetSize)
for i := 0; i < packetSize; i++ {
data[i] = byte('0' + i%10)
}
p2pEncryptBuf := make([]byte, len(data)+PaddingSize)
inBuf := make([]byte, len(data)+PaddingSize)
copy(inBuf, data)
cryptKey := []byte("0123456789ABCDEF")
sendBuf, err := encryptBytes(cryptKey, p2pEncryptBuf, inBuf, len(data))
if err != nil {
t.Errorf("encrypt packet failed:%s", err)
}
log.Printf("encrypt data len=%d\n", len(sendBuf))
decryptBuf := make([]byte, len(sendBuf))
outBuf, err := decryptBytes(cryptKey, decryptBuf, sendBuf, len(sendBuf))
if err != nil {
t.Errorf("decrypt packet failed:%s", err)
}
// log.Printf("len=%d,content=%s\n", len(outBuf), outBuf)
log.Printf("decrypt data len=%d\n", len(outBuf))
log.Println("validate")
for i := 0; i < len(outBuf); i++ {
if outBuf[i] != byte('0'+i%10) {
t.Error("validate failed")
}
}
log.Println("validate ok")
}
}

85
config.go Normal file
View File

@@ -0,0 +1,85 @@
package main
import (
"encoding/json"
"io/ioutil"
"time"
)
var gConf Config
type AppConfig struct {
// required
Protocol string
SrcPort int
PeerNode string
DstPort int
DstHost string
PeerUser string
PeerPassword string
// runtime info
peerToken uint64
peerNatType int
peerIP string
peerConeNatPort int
retryNum int
retryTime time.Time
shareBandwidth int
}
type Config struct {
Network NetworkConfig `json:"network"`
Apps []AppConfig `json:"apps"`
daemonMode bool
}
func (c *Config) add(app AppConfig) {
if app.SrcPort == 0 || app.DstPort == 0 {
return
}
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort {
return
}
}
c.Apps = append(c.Apps, app)
}
// func (c *Config) save() {
// data, _ := json.MarshalIndent(c, "", "")
// ioutil.WriteFile("config.json", data, 0644)
// }
func (c *Config) load() error {
data, err := ioutil.ReadFile("config.json")
if err != nil {
gLog.Println(LevelERROR, "read config.json error:", err)
return err
}
err = json.Unmarshal(data, &c)
if err != nil {
gLog.Println(LevelERROR, "parse config.json error:", err)
}
return err
}
type NetworkConfig struct {
// local info
Node string
User string
Password string
NoShare bool
localIP string
ipv6 string
hostName string
mac string
os string
publicIP string
natType int
shareBandwidth int
// server info
ServerHost string
ServerPort int
UDPPort1 int
UDPPort2 int
}

31
config.json Normal file
View File

@@ -0,0 +1,31 @@
{
"network": {
"Node": "hhd1207-222",
"User": "tenderiron",
"Password": "13760636579",
"ServerHost": "openp2p.cn",
"ServerPort": 27182,
"UDPPort1": 27182,
"UDPPort2": 27183
},
"apps": [
{
"Protocol": "tcp",
"SrcPort": 53389,
"PeerNode": "dell720-902",
"DstPort": 3389,
"DstHost": "10.1.6.36",
"PeerUser": "",
"PeerPassword": ""
},
{
"Protocol": "tcp",
"SrcPort": 22,
"PeerNode": "dell720-902",
"DstPort": 22,
"DstHost": "127.0.0.1",
"PeerUser": "",
"PeerPassword": ""
}
]
}

36
daemon.go Normal file
View File

@@ -0,0 +1,36 @@
package main
import (
"os"
"time"
)
type daemon struct {
}
func (d *daemon) run() {
gLog.Println(LevelINFO, "daemon start")
defer gLog.Println(LevelINFO, "daemon end")
var args []string
// rm -d parameter
for i := 0; i < len(os.Args); i++ {
if os.Args[i] == "-d" {
args = append(os.Args[0:i], os.Args[i+1:]...)
break
}
}
args = append(args, "-bydaemon")
for {
// start worker
gLog.Println(LevelINFO, "start worker process")
execSpec := &os.ProcAttr{Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}}
p, err := os.StartProcess(os.Args[0], args, execSpec)
if err != nil {
gLog.Printf(LevelERROR, "start worker error:%s", err)
return
}
_, _ = p.Wait()
gLog.Printf(LevelERROR, "worker stop, restart it after 10s")
time.Sleep(time.Second * 10)
}
}

BIN
doc/images/afterconnect.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

BIN
doc/images/appdetail.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

BIN
doc/images/architecture.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

BIN
doc/images/homeconnect.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 120 KiB

BIN
doc/images/mem.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

BIN
doc/images/mstscconnect.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

BIN
doc/images/officelisten.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

BIN
doc/images/prototype.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 119 KiB

BIN
doc/images/sshconnect.PNG Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

9
go.mod Normal file
View File

@@ -0,0 +1,9 @@
module openp2p
go 1.16
require (
github.com/gorilla/websocket v1.4.2
github.com/lucas-clemente/quic-go v0.24.0
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34
)

180
holepunch.go Normal file
View File

@@ -0,0 +1,180 @@
package main
import (
"bytes"
"encoding/binary"
"fmt"
"math/rand"
"net"
"sync"
"time"
)
func handshakeC2C(t *P2PTunnel) (err error) {
gLog.Printf(LevelDEBUG, "handshakeC2C %s:%d:%d to %s:%d", t.pn.config.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort)
defer gLog.Printf(LevelDEBUG, "handshakeC2C ok")
conn, err := net.ListenUDP("udp", t.la)
if err != nil {
return err
}
defer conn.Close()
_, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
if err != nil {
gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshake error:", err)
return err
}
ra, head, _, _, err := UDPRead(conn, 5000)
if err != nil {
time.Sleep(time.Millisecond * 200)
gLog.Println(LevelDEBUG, err, ", return this error when ip was not reachable, retry read")
ra, head, _, _, err = UDPRead(conn, 5000)
if err != nil {
gLog.Println(LevelDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err
}
}
t.ra, _ = net.ResolveUDPAddr("udp", ra.String())
// cone server side
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LevelDEBUG, "read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, 5000)
if err != nil {
gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
return err
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LevelDEBUG, "read %d handshake ack ", t.id)
return nil
}
}
// cone client side will only read handshake ack
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LevelDEBUG, "read %d handshake ack ", t.id)
_, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
if err != nil {
gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
}
return err
}
return nil
}
func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LevelDEBUG, "handshakeC2S start")
defer gLog.Printf(LevelDEBUG, "handshakeC2S end")
// even if read timeout, continue handshake
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la)
if err != nil {
return err
}
defer conn.Close()
go func() error {
gLog.Printf(LevelDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort)
for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
time.Sleep(SymmetricHandshakeInterval)
dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2))
if err != nil {
return err
}
_, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
if err != nil {
gLog.Println(LevelDEBUG, "handshakeC2S write MsgPunchHandshake error:", err)
return err
}
}
gLog.Println(LevelDEBUG, "send symmetric handshake end")
return nil
}()
deadline := time.Now().Add(SymmetricHandshakeAckTimeout)
err = conn.SetReadDeadline(deadline)
if err != nil {
gLog.Println(LevelERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error")
return err
}
// read response of the punching hole ok port
result := make([]byte, 1024)
_, dst, err := conn.ReadFrom(result)
if err != nil {
gLog.Println(LevelERROR, "handshakeC2S wait timeout")
return err
}
head := &openP2PHeader{}
err = binary.Read(bytes.NewReader(result[:openP2PHeaderSize]), binary.LittleEndian, head)
if err != nil {
gLog.Println(LevelERROR, "parse p2pheader error:", err)
return err
}
t.ra, _ = net.ResolveUDPAddr("udp", dst.String())
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LevelDEBUG, "handshakeC2S read %d handshake ack %s", t.id, dst.String())
_, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
return err
}
return nil
}
func handshakeS2C(t *P2PTunnel) error {
gLog.Printf(LevelDEBUG, "handshakeS2C start")
defer gLog.Printf(LevelDEBUG, "handshakeS2C end")
gotCh := make(chan *net.UDPAddr, 5)
// sequencely udp send handshake, do not parallel send
gLog.Printf(LevelDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort)
gotIt := false
gotMtx := sync.Mutex{}
for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
time.Sleep(SymmetricHandshakeInterval)
go func(t *P2PTunnel) error {
conn, err := net.ListenUDP("udp", nil)
if err != nil {
gLog.Printf(LevelDEBUG, "listen error")
return err
}
defer conn.Close()
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
_, head, _, _, err := UDPRead(conn, 10000)
if err != nil {
// gLog.Println(LevelDEBUG, "one of the handshake error:", err)
return err
}
gotMtx.Lock()
defer gotMtx.Unlock()
if gotIt {
return nil
}
gotIt = true
t.la, _ = net.ResolveUDPAddr("udp", conn.LocalAddr().String())
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LevelDEBUG, "handshakeS2C read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, 5000)
if err != nil {
gLog.Println(LevelDEBUG, "handshakeS2C handshake error")
return err
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LevelDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String())
gotCh <- t.la
return nil
}
}
return nil
}(t)
}
gLog.Printf(LevelDEBUG, "send symmetric handshake end")
gLog.Println(LevelDEBUG, "handshakeS2C ready, notify peer connect")
t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
select {
case <-time.After(SymmetricHandshakeAckTimeout):
return fmt.Errorf("wait handshake failed")
case la := <-gotCh:
gLog.Println(LevelDEBUG, "symmetric handshake ok", la)
}
return nil
}

198
log.go Normal file
View File

@@ -0,0 +1,198 @@
package main
import (
"fmt"
"log"
"os"
"runtime"
"sync"
"time"
)
// LogLevel ...
type LogLevel int
var gLog *V8log
// LevelDEBUG ...
const (
LevelDEBUG LogLevel = iota
LevelINFO
LevelWARN
LevelERROR
)
var (
logFileNames map[LogLevel]string
loglevel map[LogLevel]string
)
func init() {
logFileNames = make(map[LogLevel]string)
loglevel = make(map[LogLevel]string)
logFileNames[0] = ".log"
loglevel[LevelDEBUG] = "DEBUG"
loglevel[LevelINFO] = "INFO"
loglevel[LevelWARN] = "WARN"
loglevel[LevelERROR] = "ERROR"
}
const (
LogFile = iota
LogConsole
LogFileAndConsole
)
// V8log ...
type V8log struct {
loggers map[LogLevel]*log.Logger
files map[LogLevel]*os.File
llevel LogLevel
stopSig chan bool
logDir string
mtx *sync.Mutex
stoped bool
lineEnding string
pid int
lastError string
maxLogSize int64
mode int
}
// InitLogger ...
func InitLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *V8log {
logger := make(map[LogLevel]*log.Logger)
openedfile := make(map[LogLevel]*os.File)
var (
logdir string
)
if path == "" {
logdir = "log/"
} else {
logdir = path + "/log/"
}
os.MkdirAll(logdir, 0777)
for l := range logFileNames {
logFilePath := logdir + filePrefix + logFileNames[l]
f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatal(err)
}
os.Chmod(logFilePath, 0666)
openedfile[l] = f
logger[l] = log.New(f, "", log.LstdFlags)
}
var le string
if runtime.GOOS == "windows" {
le = "\r\n"
} else {
le = "\n"
}
pLog := &V8log{logger, openedfile, level, make(chan bool, 10), logdir, &sync.Mutex{}, false, le, os.Getpid(), "", maxLogSize, mode}
go pLog.checkFile()
return pLog
}
// UninitLogger ...
func (vl *V8log) UninitLogger() {
if !vl.stoped {
vl.stoped = true
close(vl.stopSig)
for l := range logFileNames {
if l >= vl.llevel {
vl.files[l].Close()
}
}
}
}
func (vl *V8log) checkFile() {
if vl.maxLogSize <= 0 {
return
}
ticker := time.NewTicker(time.Minute)
for {
select {
case <-ticker.C:
vl.mtx.Lock()
for l, logFile := range vl.files {
f, e := logFile.Stat()
if e != nil {
break
}
if f.Size() <= vl.maxLogSize {
break
}
logFile.Close()
fname := f.Name()
backupPath := vl.logDir + fname + ".0"
os.Remove(backupPath)
os.Rename(vl.logDir+fname, backupPath)
newFile, e := os.OpenFile(vl.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if e == nil {
vl.loggers[l].SetOutput(newFile)
vl.files[l] = newFile
}
}
vl.mtx.Unlock()
case <-vl.stopSig:
}
if vl.stoped {
break
}
}
}
// Printf Warning: report error log depends on this Print format.
func (vl *V8log) Printf(level LogLevel, format string, params ...interface{}) {
vl.mtx.Lock()
defer vl.mtx.Unlock()
if vl.stoped {
return
}
if level < vl.llevel {
return
}
if level == LevelERROR {
vl.lastError = fmt.Sprintf(format, params...)
}
pidAndLevel := []interface{}{vl.pid, loglevel[level]}
params = append(pidAndLevel, params...)
if vl.mode == LogFile || vl.mode == LogFileAndConsole {
vl.loggers[0].Printf("%d %s "+format+vl.lineEnding, params...)
}
if vl.mode == LogConsole || vl.mode == LogFileAndConsole {
log.Printf("%d %s "+format+vl.lineEnding, params...)
}
}
// Println ...
func (vl *V8log) Println(level LogLevel, params ...interface{}) {
vl.mtx.Lock()
defer vl.mtx.Unlock()
if vl.stoped {
return
}
if level < vl.llevel {
return
}
if level == LevelERROR {
vl.lastError = fmt.Sprint(params...)
}
pidAndLevel := []interface{}{vl.pid, " ", loglevel[level], " "}
params = append(pidAndLevel, params...)
params = append(params, vl.lineEnding)
vl.loggers[0].Print(params...)
if vl.mode == LogConsole || vl.mode == LogFileAndConsole {
log.Print(params...)
}
}
func (vl *V8log) getLastError() string {
vl.mtx.Lock()
defer vl.mtx.Unlock()
return vl.lastError
}

94
nat.go Normal file
View File

@@ -0,0 +1,94 @@
package main
import (
"encoding/json"
"fmt"
"math/rand"
"net"
"time"
)
func natTest(serverHost string, serverPort int, localPort int) (publicIP string, isPublicIP int, publicPort int, err error) {
conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", localPort))
if err != nil {
return "", 0, 0, err
}
defer conn.Close()
dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", serverHost, serverPort))
if err != nil {
return "", 0, 0, err
}
// The connection can write data to the desired address.
msg, err := newMessage(MsgNATDetect, 0, &NatDetectReq{SrcPort: localPort, EchoPort: EchoPort})
_, err = conn.WriteTo(msg, dst)
if err != nil {
return "", 0, 0, err
}
deadline := time.Now().Add(NatTestTimeout)
err = conn.SetReadDeadline(deadline)
if err != nil {
return "", 0, 0, err
}
buffer := make([]byte, 1024)
nRead, _, err := conn.ReadFrom(buffer)
if err != nil {
gLog.Println(LevelERROR, "NAT detect error:", err)
return "", 0, 0, err
}
natRsp := NatDetectRsp{}
err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
return natRsp.IP, natRsp.IsPublicIP, natRsp.Port, nil
}
func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, err error) {
// the random local port may be used by other.
go echo()
localPort := int(rand.Uint32()%10000 + 50000)
ip1, isPublicIP, port1, err := natTest(host, udp1, localPort)
gLog.Printf(LevelDEBUG, "local port:%d nat port:%d", localPort, port1)
if err != nil {
return "", 0, err
}
if isPublicIP == 1 {
return ip1, NATNone, nil
}
ip2, _, port2, err := natTest(host, udp2, localPort)
gLog.Printf(LevelDEBUG, "local port:%d nat port:%d", localPort, port2)
if err != nil {
return "", 0, err
}
if ip1 != ip2 {
return "", 0, fmt.Errorf("ip have changed, please retry again")
}
natType := NATSymmetric
if port1 == port2 {
natType = NATCone
}
//TODO: NATNone
return ip1, natType, nil
}
const (
UDPPort1 = 27182
UDPPort2 = 27183
EchoPort = 31415
)
func echo() {
conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: EchoPort})
if err != nil {
gLog.Println(LevelERROR, "echo server listen error:", err)
return
}
buf := make([]byte, 1600)
defer conn.Close()
// wait 5s for echo testing
conn.SetReadDeadline(time.Now().Add(time.Second * 5))
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
return
}
conn.WriteToUDP(buf[0:n], addr)
}

158
openp2p.go Normal file
View File

@@ -0,0 +1,158 @@
package main
import (
"flag"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
// TODO: install sub command, deamon process
// groups := flag.String("groups", "", "you could join in several groups. like: GroupName1:Password1;GroupName2:Password2; group name 8-31 characters")
if len(os.Args) > 1 {
switch os.Args[1] {
case "version", "-v", "--version":
fmt.Println(OpenP2PVersion)
return
case "update":
gLog = InitLogger(filepath.Dir(os.Args[0]), "openp2p", LevelDEBUG, 1024*1024, LogConsole)
update()
return
}
}
user := flag.String("user", "", "user name. 8-31 characters")
node := flag.String("node", "", "node name. 8-31 characters")
password := flag.String("password", "", "user password. 8-31 characters")
peerNode := flag.String("peernode", "", "peer node name that you want to connect")
peerUser := flag.String("peeruser", "", "peer node user (default peeruser=user)")
peerPassword := flag.String("peerpassword", "", "peer node password (default peerpassword=password)")
dstIP := flag.String("dstip", "127.0.0.1", "destination ip ")
serverHost := flag.String("serverhost", "openp2p.cn", "server host ")
// serverHost := flag.String("serverhost", "127.0.0.1", "server host ") // for debug
dstPort := flag.Int("dstport", 0, "destination port ")
srcPort := flag.Int("srcport", 0, "source port ")
protocol := flag.String("protocol", "tcp", "tcp or udp")
noShare := flag.Bool("noshare", false, "disable using the huge numbers of shared nodes in OpenP2P network, your connectivity will be weak. also this node will not shared with others")
shareBandwidth := flag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private node no limit")
configFile := flag.Bool("f", false, "config file")
daemonMode := flag.Bool("d", false, "daemonMode")
byDaemon := flag.Bool("bydaemon", false, "start by daemon")
logLevel := flag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
flag.Parse()
gLog = InitLogger(filepath.Dir(os.Args[0]), "openp2p", LogLevel(*logLevel), 1024*1024, LogConsole)
gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion)
if *daemonMode {
d := daemon{}
d.run()
return
}
if !*configFile {
// validate cmd params
if *node == "" {
gLog.Println(LevelERROR, "node name not set", os.Args, len(os.Args), os.Args[0])
return
}
if *user == "" {
gLog.Println(LevelERROR, "user name not set")
return
}
if *password == "" {
gLog.Println(LevelERROR, "password not set")
return
}
if *peerNode != "" {
if *dstPort == 0 {
gLog.Println(LevelERROR, "dstPort not set")
return
}
if *srcPort == 0 {
gLog.Println(LevelERROR, "srcPort not set")
return
}
}
}
config := AppConfig{}
config.PeerNode = *peerNode
config.PeerUser = *peerUser
config.PeerPassword = *peerPassword
config.DstHost = *dstIP
config.DstPort = *dstPort
config.SrcPort = *srcPort
config.Protocol = *protocol
gLog.Println(LevelINFO, config)
if *configFile {
if err := gConf.load(); err != nil {
gLog.Println(LevelERROR, "load config error. exit.")
return
}
} else {
gConf.add(config)
gConf.Network = NetworkConfig{
Node: *node,
User: *user,
Password: *password,
NoShare: *noShare,
ServerHost: *serverHost,
ServerPort: 27182,
UDPPort1: 27182,
UDPPort2: 27183,
ipv6: "240e:3b7:621:def0:fda4:dd7f:36a1:2803", // TODO: detect real ipv6
shareBandwidth: *shareBandwidth,
}
}
// gConf.save() // not change config file
gConf.daemonMode = *byDaemon
gLog.Println(LevelINFO, gConf)
setFirewall()
network := P2PNetworkInstance(&gConf.Network)
if ok := network.Connect(30000); !ok {
gLog.Println(LevelERROR, "P2PNetwork login error")
return
}
for _, app := range gConf.Apps {
// set default peer user password
if app.PeerPassword == "" {
app.PeerPassword = gConf.Network.Password
}
if app.PeerUser == "" {
app.PeerUser = gConf.Network.User
}
err := network.AddApp(app)
if err != nil {
gLog.Println(LevelERROR, "addTunnel error")
}
}
// test
// go func() {
// time.Sleep(time.Second * 30)
// config := AppConfig{}
// config.PeerNode = *peerNode
// config.PeerUser = *peerUser
// config.PeerPassword = *peerPassword
// config.DstHost = *dstIP
// config.DstPort = *dstPort
// config.SrcPort = 32
// config.Protocol = *protocol
// network.AddApp(config)
// // time.Sleep(time.Second * 30)
// // network.DeleteTunnel(config)
// // time.Sleep(time.Second * 30)
// // network.DeleteTunnel(config)
// }()
// // TODO: http api
// api := ClientAPI{}
// go api.run()
gLog.Println(LevelINFO, "waiting for connection...")
forever := make(chan bool)
<-forever
}

85
overlaytcp.go Normal file
View File

@@ -0,0 +1,85 @@
package main
import (
"bytes"
"encoding/binary"
"net"
"time"
)
// implement io.Writer
type overlayTCP struct {
tunnel *P2PTunnel
conn net.Conn
id uint64
rtid uint64
running bool
isClient bool
appID uint64
appKey uint64
appKeyBytes []byte
}
func (otcp *overlayTCP) run() {
gLog.Printf(LevelINFO, "%d overlayTCP run start", otcp.id)
defer gLog.Printf(LevelINFO, "%d overlayTCP run end", otcp.id)
otcp.running = true
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, otcp.rtid)
binary.Write(tunnelHead, binary.LittleEndian, otcp.id)
for otcp.running && otcp.tunnel.isRuning() {
otcp.conn.SetReadDeadline(time.Now().Add(time.Second * 5))
dataLen, err := otcp.conn.Read(readBuf)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
}
// overlay tcp connection normal close, debug log
gLog.Printf(LevelDEBUG, "overlayTCP %d read error:%s,close it", otcp.id, err)
break
} else {
payload := readBuf[:dataLen]
if otcp.appKey != 0 {
payload, _ = encryptBytes(otcp.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if otcp.rtid == 0 {
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
} else {
// write raley data
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
all = append(all, writeBytes...)
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", otcp.rtid, otcp.id, len(writeBytes))
}
}
}
otcp.conn.Close()
otcp.tunnel.overlayConns.Delete(otcp.id)
// notify peer disconnect
if otcp.isClient {
req := OverlayDisconnectReq{ID: otcp.id}
if otcp.rtid == 0 {
otcp.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
}
}
// calling by p2pTunnel
func (otcp *overlayTCP) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
n, err = otcp.conn.Write(buff)
if err != nil {
otcp.tunnel.overlayConns.Delete(otcp.id)
}
return
}

142
p2papp.go Normal file
View File

@@ -0,0 +1,142 @@
package main
import (
"bytes"
"encoding/binary"
"fmt"
"math/rand"
"net"
"sync"
"time"
)
type p2pApp struct {
config AppConfig
listener net.Listener
tunnel *P2PTunnel
rtid uint64
hbTime time.Time
hbMtx sync.Mutex
running bool
id uint64
key uint64
wg sync.WaitGroup
}
func (app *p2pApp) isActive() bool {
if app.rtid == 0 { // direct mode app heartbeat equals to tunnel heartbeat
return app.tunnel.isActive()
}
// relay mode calc app heartbeat
app.hbMtx.Lock()
defer app.hbMtx.Unlock()
return time.Now().Before(app.hbTime.Add(TunnelIdleTimeout))
}
func (app *p2pApp) updateHeartbeat() {
app.hbMtx.Lock()
defer app.hbMtx.Unlock()
app.hbTime = time.Now()
}
func (app *p2pApp) listenTCP() error {
var err error
app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
if err != nil {
gLog.Printf(LevelERROR, "listen error:%s", err)
return err
}
for {
conn, err := app.listener.Accept()
if err != nil {
gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err)
break
}
otcp := overlayTCP{
tunnel: app.tunnel,
conn: conn,
id: rand.Uint64(),
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
}
// calc key bytes for encrypt
if otcp.appKey != 0 {
encryptKey := make([]byte, AESKeySize)
binary.LittleEndian.PutUint64(encryptKey, otcp.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey)
otcp.appKeyBytes = encryptKey
}
app.tunnel.overlayConns.Store(otcp.id, &otcp)
gLog.Printf(LevelINFO, "Accept overlayID:%d", otcp.id)
// tell peer connect
req := OverlayConnectReq{ID: otcp.id,
User: app.config.PeerUser,
Password: app.config.PeerPassword,
DstIP: app.config.DstHost,
DstPort: app.config.DstPort,
Protocol: app.config.Protocol,
AppID: app.id,
}
if app.rtid == 0 {
app.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayConnectReq, &req)
} else {
req.RelayTunnelID = app.tunnel.id
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, app.rtid)
msg, _ := newMessage(MsgP2P, MsgOverlayConnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
go otcp.run()
}
return nil
}
func (app *p2pApp) listen() error {
gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort)
defer gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort)
app.wg.Add(1)
defer app.wg.Done()
app.running = true
if app.rtid != 0 {
go app.relayHeartbeatLoop()
}
for app.running {
if app.config.Protocol == "tcp" {
app.listenTCP()
}
time.Sleep(time.Second * 5)
// TODO: listen UDP
}
return nil
}
func (app *p2pApp) close() {
app.running = false
if app.listener != nil {
app.listener.Close()
}
app.tunnel.closeOverlayConns(app.id)
app.wg.Wait()
}
// TODO: many relay app on the same P2PTunnel will send a lot of relay heartbeat
func (app *p2pApp) relayHeartbeatLoop() {
app.wg.Add(1)
defer app.wg.Done()
gLog.Printf(LevelDEBUG, "relayHeartbeat to %d start", app.rtid)
defer gLog.Printf(LevelDEBUG, "relayHeartbeat to %d end", app.rtid)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, app.rtid)
req := RelayHeartbeat{RelayTunnelID: app.tunnel.id,
AppID: app.id}
msg, _ := newMessage(MsgP2P, MsgRelayHeartbeat, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
for app.tunnel.isRuning() && app.running {
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
time.Sleep(TunnelHeartbeatTime)
}
}

19
p2pappkeys.go Normal file
View File

@@ -0,0 +1,19 @@
package main
import (
"sync"
)
var p2pAppKeys sync.Map
func GetKey(appID uint64) uint64 {
i, ok := p2pAppKeys.Load(appID)
if !ok {
return 0
}
return i.(uint64)
}
func SaveKey(appID uint64, appKey uint64) {
p2pAppKeys.Store(appID, appKey)
}

17
p2pconn.go Normal file
View File

@@ -0,0 +1,17 @@
package main
import (
"time"
)
type p2pConn interface {
ReadMessage() (*openP2PHeader, []byte, error)
WriteBytes(uint16, uint16, []byte) error
WriteBuffer([]byte) error
WriteMessage(uint16, uint16, interface{}) error
Close() error
Accept() error
CloseListener()
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}

691
p2pnetwork.go Normal file
View File

@@ -0,0 +1,691 @@
package main
import (
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
var (
instance *P2PNetwork
once sync.Once
)
type P2PNetwork struct {
conn *websocket.Conn
online bool
running bool
restartCh chan bool
wg sync.WaitGroup
writeMtx sync.Mutex
serverTs uint64
// msgMap sync.Map
msgMap map[uint64]chan []byte //key: nodeID
msgMapMtx sync.Mutex
config NetworkConfig
allTunnels sync.Map
apps sync.Map
limiter *BandwidthLimiter
}
func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
if instance == nil {
once.Do(func() {
instance = &P2PNetwork{
restartCh: make(chan bool, 2),
online: false,
running: true,
msgMap: make(map[uint64]chan []byte),
limiter: newBandwidthLimiter(config.shareBandwidth),
}
instance.msgMap[0] = make(chan []byte) // for gateway
if config != nil {
instance.config = *config
}
instance.init()
go instance.run()
})
}
return instance
}
func (pn *P2PNetwork) run() {
go pn.autoReconnectApp()
heartbeatTimer := time.NewTicker(NetworkHeartbeatTime)
for pn.running {
select {
case <-heartbeatTimer.C: // TODO: deal with connect failed, no send hb
pn.write(MsgHeartbeat, 0, "")
case <-pn.restartCh:
pn.online = false
pn.wg.Wait() // wait read/write goroutine exited
time.Sleep(NetworkHeartbeatTime)
err := pn.init()
if err != nil {
gLog.Println(LevelERROR, "P2PNetwork init error:", err)
}
}
}
}
func (pn *P2PNetwork) Connect(timeout int) bool {
// waiting for login response
for i := 0; i < (timeout / 1000); i++ {
if pn.serverTs != 0 {
return true
}
time.Sleep(time.Second)
}
return false
}
func (pn *P2PNetwork) autoReconnectApp() {
gLog.Println(LevelINFO, "autoReconnectApp start")
retryApps := make([]AppConfig, 0)
for pn.running {
time.Sleep(time.Second)
if !pn.online {
continue
}
if len(retryApps) > 0 {
gLog.Printf(LevelINFO, "retryApps len=%d", len(retryApps))
thisRound := make([]AppConfig, 0)
for i := 0; i < len(retryApps); i++ {
// reset retryNum when running 15min continuously
delay := math.Exp(float64(retryApps[i].retryNum+1)/2) * 5
if delay > 1800 { // max delay 30min
delay = 1800
}
if retryApps[i].retryTime.Add(time.Minute * 15).Before(time.Now()) {
retryApps[i].retryNum = 0
}
retryApps[i].retryNum++
retryApps[i].retryTime = time.Now()
if retryApps[i].retryNum > MaxRetry {
gLog.Printf(LevelERROR, "app %s%d retry more than %d times, exit.", retryApps[i].Protocol, retryApps[i].SrcPort, MaxRetry)
continue
}
pn.DeleteApp(retryApps[i])
if err := pn.AddApp(retryApps[i]); err != nil {
gLog.Printf(LevelERROR, "AddApp %s%d error:%s", retryApps[i].Protocol, retryApps[i].SrcPort, err)
thisRound = append(thisRound, retryApps[i])
time.Sleep(RetryInterval)
}
}
retryApps = thisRound
}
pn.apps.Range(func(_, i interface{}) bool {
app := i.(*p2pApp)
if app.isActive() {
return true
}
gLog.Printf(LevelINFO, "detect app %s%d disconnect,last hb %s reconnecting...", app.config.Protocol, app.config.SrcPort, app.hbTime)
config := app.config
// clear peerinfo
config.peerConeNatPort = 0
config.peerIP = ""
config.peerNatType = 0
config.peerToken = 0
pn.DeleteApp(config)
retryApps = append(retryApps, config)
return true
})
}
gLog.Println(LevelINFO, "autoReconnectApp end")
}
func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint64) (*P2PTunnel, uint64, error) {
gLog.Printf(LevelINFO, "addRelayTunnel to %s start", config.PeerNode)
defer gLog.Printf(LevelINFO, "addRelayTunnel to %s end", config.PeerNode)
pn.write(MsgRelay, MsgRelayNodeReq, nil)
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, time.Second*10)
if head == nil {
return nil, 0, errors.New("read MsgRelayNodeRsp error")
}
rsp := RelayNodeRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, errors.New("unmarshal MsgRelayNodeRsp error")
}
if rsp.RelayName == "" || rsp.RelayToken == 0 {
gLog.Printf(LevelERROR, "MsgRelayNodeReq error")
return nil, 0, errors.New("MsgRelayNodeReq error")
}
gLog.Printf(LevelINFO, "got relay node:%s", rsp.RelayName)
relayConfig := config
relayConfig.PeerNode = rsp.RelayName
relayConfig.peerToken = rsp.RelayToken
t, err := pn.addDirectTunnel(relayConfig, 0)
if err != nil {
gLog.Println(LevelERROR, "direct connect error:", err)
return nil, 0, err
}
// notify peer addRelayTunnel
req := AddRelayTunnelReq{
From: pn.config.Node,
RelayName: rsp.RelayName,
RelayToken: rsp.RelayToken,
AppID: appid,
AppKey: appkey,
}
gLog.Printf(LevelINFO, "push relay %s---------%s", config.PeerNode, rsp.RelayName)
pn.push(config.PeerNode, MsgPushAddRelayTunnelReq, &req)
// wait relay ready
head, body = pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) // TODO: const value
if head == nil {
gLog.Printf(LevelERROR, "read MsgPushAddRelayTunnelRsp error")
return nil, 0, errors.New("read MsgPushAddRelayTunnelRsp error")
}
rspID := TunnelMsg{}
err = json.Unmarshal(body, &rspID)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, errors.New("unmarshal MsgRelayNodeRsp error")
}
return t, rspID.ID, err
}
func (pn *P2PNetwork) AddApp(config AppConfig) error {
gLog.Printf(LevelINFO, "addApp %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LevelINFO, "addApp %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
if !pn.online {
return errors.New("P2PNetwork offline")
}
// check if app already exist?
appExist := false
pn.apps.Range(func(_, i interface{}) bool {
app := i.(*p2pApp)
if app.config.Protocol == config.Protocol && app.config.SrcPort == config.SrcPort {
appExist = true
return false
}
return true
})
if appExist {
return errors.New("P2PApp already exist")
}
appID := rand.Uint64()
appKey := uint64(0)
t, err := pn.addDirectTunnel(config, 0)
var rtid uint64
relayNode := ""
peerNatType := 100
peerIP := ""
errMsg := ""
if err != nil && err == ErrorHandshake {
gLog.Println(LevelERROR, "direct connect failed, try to relay")
appKey = rand.Uint64()
t, rtid, err = pn.addRelayTunnel(config, appID, appKey)
if t != nil {
relayNode = t.config.PeerNode
}
}
if t != nil {
peerNatType = t.config.peerNatType
peerIP = t.config.peerIP
}
if err != nil {
errMsg = err.Error()
}
req := ReportConnect{
Error: errMsg,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
NatType: pn.config.natType,
PeerNode: config.PeerNode,
DstPort: config.DstPort,
DstHost: config.DstHost,
PeerUser: config.PeerUser,
PeerNatType: peerNatType,
PeerIP: peerIP,
ShareBandwidth: pn.config.shareBandwidth,
RelayNode: relayNode,
Version: OpenP2PVersion,
}
pn.write(MsgReport, MsgReportConnect, &req)
if err != nil {
return err
}
app := p2pApp{
id: appID,
key: appKey,
tunnel: t,
config: config,
rtid: rtid,
hbTime: time.Now()}
pn.apps.Store(appID, &app)
go app.listen()
return err
}
func (pn *P2PNetwork) DeleteApp(config AppConfig) {
gLog.Printf(LevelINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort)
defer gLog.Printf(LevelINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort)
// close the apps of this config
pn.apps.Range(func(_, i interface{}) bool {
app := i.(*p2pApp)
if app.config.Protocol == config.Protocol && app.config.SrcPort == config.SrcPort {
gLog.Printf(LevelINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
app := i.(*p2pApp)
app.close()
pn.apps.Delete(app.id)
return false
}
return true
})
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) {
gLog.Printf(LevelINFO, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LevelINFO, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
isClient := false
// client side tid=0, assign random uint64
if tid == 0 {
tid = rand.Uint64()
isClient = true
}
exist := false
// find existing tunnel to peer
var t *P2PTunnel
pn.allTunnels.Range(func(id, i interface{}) bool {
t = i.(*P2PTunnel)
if t.config.PeerNode == config.PeerNode {
// server side force close existing tunnel
if !isClient {
t.close()
return false
}
// client side checking
gLog.Println(LevelINFO, "tunnel already exist ", config.PeerNode)
isActive := t.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LevelINFO, "but it's not active, close it ", config.PeerNode)
t.close()
} else {
// active
exist = true
}
return false
}
return true
})
// create tunnel if not exist
if !exist {
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
pn.msgMapMtx.Lock()
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50)
pn.msgMapMtx.Unlock()
t.init()
if isClient {
if err := t.connect(); err != nil {
gLog.Println(LevelERROR, "p2pTunnel connect error:", err)
return t, err
}
} else {
rsp := PushConnectRsp{
Error: 0,
Detail: "connect ok",
To: t.config.PeerNode,
From: pn.config.Node,
NatType: pn.config.natType,
FromIP: pn.config.publicIP,
ConeNatPort: t.coneNatPort,
ID: t.id}
t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp)
if err := t.listen(); err != nil {
gLog.Println(LevelERROR, "p2pTunnel listen error:", err)
return t, err
}
}
}
// store it when success
gLog.Printf(LevelDEBUG, "store tunnel %d", tid)
pn.allTunnels.Store(tid, t)
return t, nil
}
func (pn *P2PNetwork) init() error {
gLog.Println(LevelINFO, "init start")
var err error
for {
pn.config.hostName, err = os.Hostname()
if err != nil {
break
}
// detect nat type
pn.config.publicIP, pn.config.natType, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2)
// TODO rm test s2s
if pn.config.Node == "hhd1207-222S2S" {
pn.config.natType = NATSymmetric
}
if err != nil {
gLog.Println(LevelINFO, "detect NAT type error:", err)
break
}
gLog.Println(LevelINFO, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
forwardPath := "/openp2p/v1/login"
config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: forwardPath}
q := u.Query()
q.Add("node", pn.config.Node)
q.Add("user", pn.config.User)
q.Add("password", pn.config.Password)
q.Add("version", OpenP2PVersion)
q.Add("nattype", fmt.Sprintf("%d", pn.config.natType))
q.Add("timestamp", fmt.Sprintf("%d", time.Now().Unix()))
noShareStr := "false"
if pn.config.NoShare {
noShareStr = "true"
}
q.Add("noshare", noShareStr)
u.RawQuery = q.Encode()
var ws *websocket.Conn
ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
break
}
pn.online = true
pn.conn = ws
localAddr := strings.Split(ws.LocalAddr().String(), ":")
if len(localAddr) == 2 {
pn.config.localIP = localAddr[0]
} else {
err = errors.New("get local ip failed")
break
}
go pn.readLoop()
pn.config.mac = getmac(pn.config.localIP)
pn.config.os = getOsName()
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
IPv6: pn.config.ipv6,
Version: OpenP2PVersion,
}
pn.write(MsgReport, MsgReportBasic, &req)
gLog.Println(LevelINFO, "P2PNetwork init ok")
break
}
if err != nil {
// init failed, retry
pn.restartCh <- true
gLog.Println(LevelERROR, "P2PNetwork init error:", err)
}
return err
}
func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
head := openP2PHeader{}
err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, &head)
if err != nil {
gLog.Println(LevelERROR, "handleMessage error:", err)
return
}
switch head.MainType {
case MsgLogin:
// gLog.Println(LevelINFO,string(msg))
rsp := LoginRsp{}
err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
gLog.Printf(LevelERROR, "wrong login response:%s", err)
return
}
if rsp.Error != 0 {
gLog.Printf(LevelERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail)
pn.running = false
} else {
gLog.Printf(LevelINFO, "login ok. Server ts=%d, local ts=%d", rsp.Ts, time.Now().Unix())
pn.serverTs = rsp.Ts
}
case MsgHeartbeat:
gLog.Printf(LevelDEBUG, "P2PNetwork heartbeat ok")
case MsgPush:
pn.handlePush(head.SubType, msg)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[0]
pn.msgMapMtx.Unlock()
ch <- msg
return
}
}
func (pn *P2PNetwork) readLoop() {
gLog.Printf(LevelINFO, "P2PNetwork readLoop start")
pn.wg.Add(1)
defer pn.wg.Done()
for pn.running {
pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second))
t, msg, err := pn.conn.ReadMessage()
if err != nil {
gLog.Printf(LevelERROR, "P2PNetwork read error:%s", err)
pn.conn.Close()
pn.restartCh <- true
break
}
pn.handleMessage(t, msg)
}
gLog.Printf(LevelINFO, "P2PNetwork readLoop end")
}
func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) error {
if !pn.online {
return errors.New("P2P network offline")
}
msg, err := newMessage(mainType, subType, packet)
if err != nil {
return err
}
pn.writeMtx.Lock()
defer pn.writeMtx.Unlock()
if err = pn.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
gLog.Printf(LevelERROR, "write msgType %d,%d error:%s", mainType, subType, err)
pn.conn.Close()
}
return err
}
func (pn *P2PNetwork) relay(to uint64, body []byte) error {
gLog.Printf(LevelDEBUG, "relay data to %d", to)
i, ok := pn.allTunnels.Load(to)
if !ok {
return nil
}
tunnel := i.(*P2PTunnel)
if tunnel.config.shareBandwidth > 0 {
pn.limiter.Add(len(body))
}
tunnel.conn.WriteBuffer(body)
return nil
}
func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error {
gLog.Printf(LevelDEBUG, "push msgType %d to %s", subType, to)
if !pn.online {
return errors.New("client offline")
}
pushHead := PushHeader{}
pushHead.From = nodeNameToID(pn.config.Node)
pushHead.To = nodeNameToID(to)
pushHeadBuf := new(bytes.Buffer)
err := binary.Write(pushHeadBuf, binary.LittleEndian, pushHead)
if err != nil {
return err
}
data, err := json.Marshal(packet)
if err != nil {
return err
}
// gLog.Println(LevelINFO,"write packet:", string(data))
pushMsg := append(encodeHeader(MsgPush, subType, uint32(len(data)+PushHeaderSize)), pushHeadBuf.Bytes()...)
pushMsg = append(pushMsg, data...)
pn.writeMtx.Lock()
defer pn.writeMtx.Unlock()
if err = pn.conn.WriteMessage(websocket.BinaryMessage, pushMsg); err != nil {
gLog.Printf(LevelERROR, "push to %s error:%s", to, err)
pn.conn.Close()
}
return err
}
func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout time.Duration) (head *openP2PHeader, body []byte) {
var nodeID uint64
if node == "" {
nodeID = 0
} else {
nodeID = nodeNameToID(node)
}
for {
select {
case <-time.After(timeout):
gLog.Printf(LevelERROR, "wait msg%d:%d timeout", mainType, subType)
return
case msg := <-pn.msgMap[nodeID]:
head = &openP2PHeader{}
err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, head)
if err != nil {
gLog.Println(LevelERROR, "read msg error:", err)
break
}
if head.MainType != mainType || head.SubType != subType {
continue
}
if mainType == MsgPush {
body = msg[openP2PHeaderSize+PushHeaderSize:]
} else {
body = msg[openP2PHeaderSize:]
}
return
}
}
}
func (pn *P2PNetwork) handlePush(subType uint16, msg []byte) error {
pushHead := PushHeader{}
err := binary.Read(bytes.NewReader(msg[openP2PHeaderSize:openP2PHeaderSize+PushHeaderSize]), binary.LittleEndian, &pushHead)
if err != nil {
return err
}
gLog.Printf(LevelDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType {
case MsgPushConnectReq:
req := PushConnectReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LevelERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LevelINFO, "%s is connecting...", req.From)
gLog.Println(LevelDEBUG, "push connect response to ", req.From)
// verify token or name&password
if VerifyTOTP(req.Token, pn.config.User, pn.config.Password, time.Now().Unix()) || (req.User == pn.config.User && req.Password == pn.config.Password) {
gLog.Printf(LevelINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
// share relay node will limit bandwidth
if req.User != pn.config.User || req.Password != pn.config.Password {
gLog.Printf(LevelINFO, "set share bandwidth %d mbps", pn.config.shareBandwidth)
config.shareBandwidth = pn.config.shareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
break
}
gLog.Println(LevelERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
case MsgPushRsp:
rsp := PushRsp{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
gLog.Printf(LevelERROR, "wrong pushRsp:%s", err)
return err
}
if rsp.Error == 0 {
gLog.Printf(LevelDEBUG, "push ok, detail:%s", rsp.Detail)
} else {
gLog.Printf(LevelERROR, "push error:%d, detail:%s", rsp.Error, rsp.Detail)
}
case MsgPushAddRelayTunnelReq:
req := AddRelayTunnelReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err)
return err
}
config := AppConfig{}
config.PeerNode = req.RelayName
config.peerToken = req.RelayToken
// set user password, maybe the relay node is your private node
config.PeerUser = pn.config.User
config.PeerPassword = pn.config.Password
go func(r AddRelayTunnelReq) {
t, errDt := pn.addDirectTunnel(config, 0)
if errDt == nil {
// notify peer relay ready
msg := TunnelMsg{ID: t.id}
pn.push(r.From, MsgPushAddRelayTunnelRsp, msg)
SaveKey(req.AppID, req.AppKey)
}
}(req)
case MsgPushUpdate:
update()
if gConf.daemonMode {
os.Exit(0)
}
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- msg
}
return nil
}
func (pn *P2PNetwork) updateAppHeartbeat(appID uint64) {
pn.apps.Range(func(id, i interface{}) bool {
key := id.(uint64)
if key != appID {
return true
}
app := i.(*p2pApp)
app.updateHeartbeat()
return false
})
}

418
p2ptunnel.go Normal file
View File

@@ -0,0 +1,418 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
)
type P2PTunnel struct {
pn *P2PNetwork
conn p2pConn
hbTime time.Time
hbMtx sync.Mutex
hbTimeRelay time.Time
config AppConfig
la *net.UDPAddr // local hole address
ra *net.UDPAddr // remote hole address
overlayConns sync.Map // both TCP and UDP
id uint64
running bool
runMtx sync.Mutex
isServer bool // 0:server 1:client
coneLocalPort int
coneNatPort int
}
func (t *P2PTunnel) init() {
t.running = true
t.hbMtx.Lock()
t.hbTime = time.Now()
t.hbMtx.Unlock()
t.hbTimeRelay = time.Now().Add(time.Second * 600) // TODO: test fake time
localPort := int(rand.Uint32()%10000 + 50000)
if t.pn.config.natType == NATCone {
// prepare one random cone hole
_, _, port1, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort)
t.coneLocalPort = localPort
t.coneNatPort = port1
t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort}
} else {
t.coneLocalPort = localPort
t.coneNatPort = localPort // NATNONE or symmetric doesn't need coneNatPort
t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort}
}
gLog.Printf(LevelDEBUG, "prepare punching port %d:%d", t.coneLocalPort, t.coneNatPort)
}
func (t *P2PTunnel) connect() error {
gLog.Printf(LevelINFO, "start p2pTunnel to %s ", t.config.PeerNode)
t.isServer = false
req := PushConnectReq{
User: t.config.PeerUser,
Password: t.config.PeerPassword,
Token: t.config.peerToken,
From: t.pn.config.Node,
FromIP: t.pn.config.publicIP,
ConeNatPort: t.coneNatPort,
NatType: t.pn.config.natType,
ID: t.id}
t.pn.push(t.config.PeerNode, MsgPushConnectReq, req)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10)
if head == nil {
return errors.New("connect error")
}
rsp := PushConnectRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LevelERROR, "wrong MsgPushConnectRsp:%s", err)
return err
}
// gLog.Println(LevelINFO, rsp)
if rsp.Error != 0 {
return errors.New(rsp.Detail)
}
t.config.peerNatType = int(rsp.NatType)
t.config.peerConeNatPort = rsp.ConeNatPort
t.config.peerIP = rsp.FromIP
err = t.handshake()
if err != nil {
gLog.Println(LevelERROR, "handshake error:", err)
err = ErrorHandshake
}
return err
}
func (t *P2PTunnel) isRuning() bool {
t.runMtx.Lock()
defer t.runMtx.Unlock()
return t.running
}
func (t *P2PTunnel) setRun(running bool) {
t.runMtx.Lock()
defer t.runMtx.Unlock()
t.running = running
}
func (t *P2PTunnel) isActive() bool {
t.hbMtx.Lock()
defer t.hbMtx.Unlock()
return time.Now().Before(t.hbTime.Add(TunnelIdleTimeout))
}
func (t *P2PTunnel) checkActive() bool {
hbt := time.Now()
t.hbMtx.Lock()
if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) {
t.hbMtx.Unlock()
return false
}
t.hbMtx.Unlock()
// hbtime within TunnelHeartbeatTime, check it now
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil)
isActive := false
// wait at most 5s
for i := 0; i < 50 && !isActive; i++ {
t.hbMtx.Lock()
if t.hbTime.After(hbt) {
isActive = true
}
t.hbMtx.Unlock()
time.Sleep(time.Millisecond * 100)
}
return isActive
}
// call when user delete tunnel
func (t *P2PTunnel) close() {
t.setRun(false)
t.pn.allTunnels.Delete(t.id)
}
func (t *P2PTunnel) handshake() error {
if t.config.peerConeNatPort > 0 {
var err error
t.ra, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort))
if err != nil {
return err
}
}
gLog.Println(LevelINFO, "handshake to ", t.config.PeerNode)
var err error
// TODO: handle NATNone, nodes with public ip has no punching
if (t.pn.config.natType == NATCone && t.config.peerNatType == NATCone) || (t.pn.config.natType == NATNone || t.config.peerNatType == NATNone) {
err = handshakeC2C(t)
} else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATSymmetric {
err = ErrorS2S
t.close()
} else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATCone {
err = handshakeC2S(t)
} else if t.config.peerNatType == NATCone && t.pn.config.natType == NATSymmetric {
err = handshakeS2C(t)
} else {
return errors.New("unknown error")
}
if err != nil {
gLog.Println(LevelERROR, "punch handshake error:", err)
return err
}
gLog.Printf(LevelINFO, "handshake to %s ok", t.config.PeerNode)
err = t.run()
if err != nil {
gLog.Println(LevelERROR, err)
return err
}
return nil
}
func (t *P2PTunnel) run() error {
if t.isServer {
qConn, e := listenQuic(t.la.String(), TunnelIdleTimeout)
if e != nil {
gLog.Println(LevelINFO, "listen quic error:", e, ", retry...")
time.Sleep(time.Millisecond * 10)
qConn, e = listenQuic(t.la.String(), TunnelIdleTimeout)
if e != nil {
return fmt.Errorf("listen quic error:%s", e)
}
}
t.pn.push(t.config.PeerNode, MsgPushQuicConnect, nil)
e = qConn.Accept()
if e != nil {
qConn.CloseListener()
return fmt.Errorf("accept quic error:%s", e)
}
_, buff, err := qConn.ReadMessage()
if e != nil {
qConn.listener.Close()
return fmt.Errorf("read start msg error:%s", err)
}
if buff != nil {
gLog.Println(LevelDEBUG, string(buff))
}
qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2"))
gLog.Println(LevelINFO, "quic connection ok")
t.conn = qConn
t.setRun(true)
go t.readLoop()
go t.writeLoop()
return nil
}
//else
conn, e := net.ListenUDP("udp", t.la)
if e != nil {
time.Sleep(time.Millisecond * 10)
conn, e = net.ListenUDP("udp", t.la)
if e != nil {
return fmt.Errorf("quic listen error:%s", e)
}
}
t.pn.read(t.config.PeerNode, MsgPush, MsgPushQuicConnect, time.Second*5)
gLog.Println(LevelINFO, "quic dial to ", t.ra.String())
qConn, e := dialQuic(conn, t.ra, TunnelIdleTimeout)
if e != nil {
return fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e)
}
handshakeBegin := time.Now()
qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello"))
_, buff, err := qConn.ReadMessage()
if e != nil {
qConn.listener.Close()
return fmt.Errorf("read MsgTunnelHandshake error:%s", err)
}
if buff != nil {
gLog.Println(LevelDEBUG, string(buff))
}
gLog.Println(LevelINFO, "rtt=", time.Since(handshakeBegin))
gLog.Println(LevelINFO, "quic connection ok")
t.conn = qConn
t.setRun(true)
go t.readLoop()
go t.writeLoop()
return nil
}
func (t *P2PTunnel) readLoop() {
decryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
gLog.Printf(LevelINFO, "%d tunnel readloop start", t.id)
for t.isRuning() {
t.conn.SetReadDeadline(time.Now().Add(TunnelIdleTimeout))
head, body, err := t.conn.ReadMessage()
if err != nil {
if t.isRuning() {
gLog.Printf(LevelERROR, "%d tunnel read error:%s", t.id, err)
}
break
}
if head.MainType != MsgP2P {
continue
}
switch head.SubType {
case MsgTunnelHeartbeat:
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil)
gLog.Printf(LevelDEBUG, "%d read tunnel heartbeat", t.id)
case MsgTunnelHeartbeatAck:
t.hbMtx.Lock()
t.hbTime = time.Now()
t.hbMtx.Unlock()
gLog.Printf(LevelDEBUG, "%d read tunnel heartbeat ack", t.id)
case MsgOverlayData:
if len(body) < overlayHeaderSize {
continue
}
overlayID := binary.LittleEndian.Uint64(body[:8])
gLog.Printf(LevelDEBUG, "%d tunnel read overlay data %d", t.id, overlayID)
s, ok := t.overlayConns.Load(overlayID)
if !ok {
// debug level, when overlay connection closed, always has some packet not found tunnel
gLog.Printf(LevelDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID)
continue
}
overlayConn, ok := s.(*overlayTCP)
if !ok {
continue
}
payload := body[overlayHeaderSize:]
var err error
if overlayConn.appKey != 0 {
payload, _ = decryptBytes(overlayConn.appKeyBytes, decryptData, body[overlayHeaderSize:], int(head.DataLen-uint32(overlayHeaderSize)))
}
_, err = overlayConn.Write(payload)
if err != nil {
gLog.Println(LevelERROR, "overlay write error:", err)
}
case MsgRelayData:
gLog.Printf(LevelDEBUG, "got relay data datalen=%d", head.DataLen)
if len(body) < 8 {
continue
}
tunnelID := binary.LittleEndian.Uint64(body[:8])
t.pn.relay(tunnelID, body[8:])
case MsgRelayHeartbeat:
req := RelayHeartbeat{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayHeartbeat:%s", err)
continue
}
gLog.Printf(LevelDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, req.RelayTunnelID)
msg, _ := newMessage(MsgP2P, MsgRelayHeartbeatAck, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
t.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
case MsgRelayHeartbeatAck:
req := RelayHeartbeat{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayHeartbeat:%s", err)
continue
}
gLog.Printf(LevelDEBUG, "got MsgRelayHeartbeatAck to %d", req.AppID)
t.pn.updateAppHeartbeat(req.AppID)
case MsgOverlayConnectReq:
req := OverlayConnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LevelERROR, "wrong MsgOverlayConnectReq:%s", err)
continue
}
// app connect only accept user/password, avoid someone using the share relay node's token
if req.User != t.pn.config.User || req.Password != t.pn.config.Password {
gLog.Println(LevelERROR, "Access Denied:", req.User)
continue
}
overlayID := req.ID
gLog.Printf(LevelINFO, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req)
if req.Protocol == "tcp" {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
if err != nil {
gLog.Println(LevelERROR, err)
continue
}
otcp := overlayTCP{
tunnel: t,
conn: conn,
id: overlayID,
isClient: false,
rtid: req.RelayTunnelID,
appID: req.AppID,
appKey: GetKey(req.AppID),
}
// calc key bytes for encrypt
if otcp.appKey != 0 {
encryptKey := make([]byte, 16)
binary.LittleEndian.PutUint64(encryptKey, otcp.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey)
otcp.appKeyBytes = encryptKey
}
t.overlayConns.Store(otcp.id, &otcp)
go otcp.run()
}
case MsgOverlayDisconnectReq:
req := OverlayDisconnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LevelERROR, "wrong OverlayDisconnectRequest:%s", err)
continue
}
overlayID := req.ID
gLog.Printf(LevelINFO, "%d disconnect overlay connection %d", t.id, overlayID)
i, ok := t.overlayConns.Load(overlayID)
if ok {
otcp := i.(*overlayTCP)
otcp.running = false
}
default:
}
}
t.setRun(false)
t.conn.Close()
gLog.Printf(LevelINFO, "%d tunnel readloop end", t.id)
}
func (t *P2PTunnel) writeLoop() {
tc := time.NewTicker(TunnelHeartbeatTime)
defer tc.Stop()
defer gLog.Printf(LevelINFO, "%d tunnel writeloop end", t.id)
for t.isRuning() {
select {
case <-tc.C:
// tunnel send
err := t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil)
if err != nil {
gLog.Printf(LevelERROR, "%d write tunnel heartbeat error %s", t.id, err)
t.setRun(false)
return
}
gLog.Printf(LevelDEBUG, "%d write tunnel heartbeat ok", t.id)
}
}
}
func (t *P2PTunnel) listen() error {
gLog.Printf(LevelINFO, "p2ptunnel wait for connecting")
t.isServer = true
return t.handshake()
}
func (t *P2PTunnel) closeOverlayConns(appID uint64) {
t.overlayConns.Range(func(_, i interface{}) bool {
otcp := i.(*overlayTCP)
if otcp.appID == appID {
otcp.conn.Close()
}
return true
})
}

280
protocol.go Normal file
View File

@@ -0,0 +1,280 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"hash/crc64"
"time"
)
const OpenP2PVersion = "0.95.5"
const ProducnName string = "openp2p"
type openP2PHeader struct {
DataLen uint32
MainType uint16
SubType uint16
}
var openP2PHeaderSize = binary.Size(openP2PHeader{})
type PushHeader struct {
From uint64
To uint64
}
var PushHeaderSize = binary.Size(PushHeader{})
type overlayHeader struct {
id uint64
}
var overlayHeaderSize = binary.Size(overlayHeader{})
func decodeHeader(data []byte) (*openP2PHeader, error) {
head := openP2PHeader{}
rd := bytes.NewReader(data)
err := binary.Read(rd, binary.LittleEndian, &head)
if err != nil {
return nil, err
}
return &head, nil
}
func encodeHeader(mainType uint16, subType uint16, len uint32) []byte {
head := openP2PHeader{
len,
mainType,
subType,
}
headBuf := new(bytes.Buffer)
err := binary.Write(headBuf, binary.LittleEndian, head)
if err != nil {
return []byte("")
}
return headBuf.Bytes()
}
// Message type
const (
MsgLogin = 0
MsgHeartbeat = 1
MsgNATDetect = 2
MsgPush = 3
MsgP2P = 4
MsgRelay = 5
MsgReport = 6
)
const (
MsgPushRsp = 0
MsgPushConnectReq = 1
MsgPushConnectRsp = 2
MsgPushHandshakeStart = 3
MsgPushAddRelayTunnelReq = 4
MsgPushAddRelayTunnelRsp = 5
MsgPushUpdate = 6
MsgPushReportApps = 7
MsgPushQuicConnect = 8
)
// MsgP2P sub type message
const (
MsgPunchHandshake = iota
MsgPunchHandshakeAck
MsgTunnelHandshake
MsgTunnelHandshakeAck
MsgTunnelHeartbeat
MsgTunnelHeartbeatAck
MsgOverlayConnectReq
MsgOverlayConnectRsp
MsgOverlayDisconnectReq
MsgOverlayData
MsgRelayData
MsgRelayHeartbeat
MsgRelayHeartbeatAck
)
// MsgRelay sub type message
const (
MsgRelayNodeReq = iota
MsgRelayNodeRsp
)
// MsgReport sub type message
const (
MsgReportBasic = iota
MsgReportQuery
MsgReportConnect
)
const (
ReadBuffLen = 1024
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
TunnelHeartbeatTime = time.Second * 15
TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond
SymmetricHandshakeAckTimeout = time.Second * 11
PeerAddRelayTimeount = time.Second * 20
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 5
NatTestTimeout = time.Second * 10
)
// error message
var (
// ErrorS2S string = "s2s is not supported"
// ErrorHandshake string = "handshake error"
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
)
// NATNone has public ip
const (
NATNone = 0
NATCone = 1
NATSymmetric = 2
)
func newMessage(mainType uint16, subType uint16, packet interface{}) ([]byte, error) {
data, err := json.Marshal(packet)
if err != nil {
return nil, err
}
// gLog.Println(LevelINFO,"write packet:", string(data))
head := openP2PHeader{
uint32(len(data)),
mainType,
subType,
}
headBuf := new(bytes.Buffer)
err = binary.Write(headBuf, binary.LittleEndian, head)
if err != nil {
return nil, err
}
writeBytes := append(headBuf.Bytes(), data...)
return writeBytes, nil
}
func nodeNameToID(name string) uint64 {
return crc64.Checksum([]byte(name), crc64.MakeTable(crc64.ISO))
}
type PushConnectReq struct {
From string `json:"from,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Token uint64 `json:"token,omitempty"`
ConeNatPort int `json:"coneNatPort,omitempty"`
NatType int `json:"natType,omitempty"`
FromIP string `json:"fromIP,omitempty"`
ID uint64 `json:"id,omitempty"`
}
type PushConnectRsp struct {
Error int `json:"error,omitempty"`
From string `json:"from,omitempty"`
To string `json:"to,omitempty"`
Detail string `json:"detail,omitempty"`
NatType int `json:"natType,omitempty"`
ConeNatPort int `json:"coneNatPort,omitempty"`
FromIP string `json:"fromIP,omitempty"`
ID uint64 `json:"id,omitempty"`
}
type PushRsp struct {
Error int `json:"error,omitempty"`
Detail string `json:"detail,omitempty"`
}
type LoginRsp struct {
Error int `json:"error,omitempty"`
Detail string `json:"detail,omitempty"`
Ts uint64 `json:"ts,omitempty"`
}
type NatDetectReq struct {
SrcPort int `json:"srcPort,omitempty"`
EchoPort int `json:"echoPort,omitempty"`
}
type NatDetectRsp struct {
IP string `json:"IP,omitempty"`
Port int `json:"port,omitempty"`
IsPublicIP int `json:"isPublicIP,omitempty"`
}
type P2PHandshakeReq struct {
ID uint64 `json:"id,omitempty"`
}
type OverlayConnectReq struct {
ID uint64 `json:"id,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
DstIP string `json:"dstIP,omitempty"`
DstPort int `json:"dstPort,omitempty"`
Protocol string `json:"protocol,omitempty"`
RelayTunnelID uint64 `json:"relayTunnelID,omitempty"` // if not 0 relay
AppID uint64 `json:"appID,omitempty"`
}
type OverlayDisconnectReq struct {
ID uint64 `json:"id,omitempty"`
}
type TunnelMsg struct {
ID uint64 `json:"id,omitempty"`
}
type RelayNodeRsp struct {
RelayName string `json:"relayName,omitempty"`
RelayToken uint64 `json:"relayToken,omitempty"`
}
type AddRelayTunnelReq struct {
From string `json:"from,omitempty"`
RelayName string `json:"relayName,omitempty"`
RelayToken uint64 `json:"relayToken,omitempty"`
AppID uint64 `json:"appID,omitempty"`
AppKey uint64 `json:"appKey,omitempty"`
}
type RelayHeartbeat struct {
RelayTunnelID uint64 `json:"relayTunnelID,omitempty"`
AppID uint64 `json:"appID,omitempty"`
}
type ReportBasic struct {
OS string `json:"os,omitempty"`
Mac string `json:"mac,omitempty"`
LanIP string `json:"lanIP,omitempty"`
IPv6 string `json:"IPv6,omitempty"`
Version string `json:"version,omitempty"`
}
type ReportConnect struct {
Error string `json:"error,omitempty"`
Protocol string `json:"protocol,omitempty"`
SrcPort int `json:"srcPort,omitempty"`
NatType int `json:"natType,omitempty"`
PeerNode string `json:"peerNode,omitempty"`
DstPort int `json:"dstPort,omitempty"`
DstHost string `json:"dsdtHost,omitempty"`
PeerUser string `json:"peerUser,omitempty"`
PeerNatType int `json:"peerNatType,omitempty"`
PeerIP string `json:"peerIP,omitempty"`
ShareBandwidth int `json:"shareBandWidth,omitempty"`
RelayNode string `json:"relayNode,omitempty"`
Version string `json:"version,omitempty"`
}
type UpdateInfo struct {
Error int `json:"error,omitempty"`
ErrorDetail string `json:"errorDetail,omitempty"`
Url string `json:"url,omitempty"`
}

151
quic.go Normal file
View File

@@ -0,0 +1,151 @@
package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"math/big"
"net"
"sync"
"time"
"github.com/lucas-clemente/quic-go"
)
//quic.DialContext do not support version 44,disable it
var quicVersion []quic.VersionNumber
type quicConn struct {
listener quic.Listener
writeMtx *sync.Mutex
quic.Stream
quic.Session
}
func (conn *quicConn) ReadMessage() (*openP2PHeader, []byte, error) {
headBuf := make([]byte, openP2PHeaderSize)
_, err := io.ReadFull(conn, headBuf)
if err != nil {
return nil, nil, err
}
head, err := decodeHeader(headBuf)
if err != nil {
return nil, nil, err
}
dataBuf := make([]byte, head.DataLen)
_, err = io.ReadFull(conn, dataBuf)
return head, dataBuf, err
}
func (conn *quicConn) WriteBytes(mainType uint16, subType uint16, data []byte) error {
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...)
conn.writeMtx.Lock()
_, err := conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
}
func (conn *quicConn) WriteBuffer(data []byte) error {
conn.writeMtx.Lock()
_, err := conn.Write(data)
conn.writeMtx.Unlock()
return err
}
func (conn *quicConn) WriteMessage(mainType uint16, subType uint16, packet interface{}) error {
// TODO: call newMessage
data, err := json.Marshal(packet)
if err != nil {
return err
}
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...)
conn.writeMtx.Lock()
_, err = conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
}
func (conn *quicConn) Close() error {
conn.Stream.CancelRead(1)
conn.Session.CloseWithError(0, "")
return nil
}
func (conn *quicConn) CloseListener() {
if conn.listener != nil {
conn.listener.Close()
}
}
func (conn *quicConn) Accept() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
sess, err := conn.listener.Accept(ctx)
if err != nil {
return err
}
stream, err := sess.AcceptStream(context.Background())
if err != nil {
return err
}
conn.Stream = stream
conn.Session = sess
return nil
}
func listenQuic(addr string, idleTimeout time.Duration) (*quicConn, error) {
gLog.Println(LevelINFO, "quic listen on ", addr)
listener, err := quic.ListenAddr(addr, generateTLSConfig(),
&quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true})
if err != nil {
return nil, fmt.Errorf("quic.ListenAddr error:%s", err)
}
return &quicConn{listener: listener, writeMtx: &sync.Mutex{}}, nil
}
func dialQuic(conn *net.UDPConn, remoteAddr *net.UDPAddr, idleTimeout time.Duration) (*quicConn, error) {
tlsConf := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"openp2pv1"},
}
session, err := quic.DialContext(context.Background(), conn, remoteAddr, conn.LocalAddr().String(), tlsConf,
&quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true})
if err != nil {
return nil, fmt.Errorf("quic.DialContext error:%s", err)
}
stream, err := session.OpenStreamSync(context.Background())
if err != nil {
return nil, fmt.Errorf("OpenStreamSync error:%s", err)
}
qConn := &quicConn{nil, &sync.Mutex{}, stream, session}
return qConn, nil
}
// Setup a bare-bones TLS config for the server
func generateTLSConfig() *tls.Config {
key, err := rsa.GenerateKey(rand.Reader, 1024)
if err != nil {
panic(err)
}
template := x509.Certificate{SerialNumber: big.NewInt(1)}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
if err != nil {
panic(err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
panic(err)
}
return &tls.Config{
Certificates: []tls.Certificate{tlsCert},
NextProtos: []string{"openp2pv1"},
}
}

30
sysinfodarwin.go Normal file
View File

@@ -0,0 +1,30 @@
//go:build darwin
// +build darwin
package main
import (
"strings"
"syscall"
)
func getOsName() (osName string) {
output := execOutput("sw_vers", "-productVersion")
osName = "Mac OS X " + strings.TrimSpace(output)
return
}
func setRLimit() error {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Cur = 10240
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
return nil
}
func setFirewall() {
}

70
sysinfolinux.go Normal file
View File

@@ -0,0 +1,70 @@
//go:build linux
// +build linux
package main
import (
"bufio"
"bytes"
"io/ioutil"
"os"
"strings"
"syscall"
)
func getOsName() (osName string) {
var sysnamePath string
sysnamePath = "/etc/redhat-release"
_, err := os.Stat(sysnamePath)
if err != nil && os.IsNotExist(err) {
str := "PRETTY_NAME="
f, err := os.Open("/etc/os-release")
if err != nil && os.IsNotExist(err) {
str = "DISTRIB_ID="
f, err = os.Open("/etc/openwrt_release")
}
if err == nil {
buf := bufio.NewReader(f)
for {
line, err := buf.ReadString('\n')
if err == nil {
line = strings.TrimSpace(line)
pos := strings.Count(line, str)
if pos > 0 {
len1 := len([]rune(str)) + 1
rs := []rune(line)
osName = string(rs[len1 : (len(rs))-1])
break
}
} else {
break
}
}
}
} else {
buff, err := ioutil.ReadFile(sysnamePath)
if err == nil {
osName = string(bytes.TrimSpace(buff))
}
}
if osName == "" {
osName = "Linux"
}
return
}
func setRLimit() error {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Max = 1024 * 1024
limit.Cur = limit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
return nil
}
func setFirewall() {
}

51
sysinfowin.go Normal file
View File

@@ -0,0 +1,51 @@
//go:build windows
// +build windows
package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"golang.org/x/sys/windows/registry"
)
func getOsName() (osName string) {
k, err := registry.OpenKey(registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Windows NT\CurrentVersion`, registry.QUERY_VALUE|registry.WOW64_64KEY)
if err != nil {
return
}
defer k.Close()
pn, _, err := k.GetStringValue("ProductName")
if err == nil {
osName = pn
}
return
}
func setRLimit() error {
return nil
}
func setFirewall() {
fullPath, err := filepath.Abs(os.Args[0])
if err != nil {
gLog.Println(LevelERROR, "add firewall error:", err)
return
}
isXP := false
osName := getOsName()
if strings.Contains(osName, "XP") || strings.Contains(osName, "2003") {
isXP = true
}
if isXP {
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall del allowedprogram "%s"`, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProducnName, fullPath)).Run()
} else { // win7 or later
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProducnName)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProducnName, fullPath)).Run()
}
}

30
totp.go Normal file
View File

@@ -0,0 +1,30 @@
// Time-based One-time Password
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/binary"
)
const TOTPStep = 30 // 30s
func GenTOTP(user string, password string, ts int64) uint64 {
step := ts / TOTPStep
mac := hmac.New(sha256.New, []byte(user+password))
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(step))
mac.Write(b)
num := binary.LittleEndian.Uint64(mac.Sum(nil)[:8])
// fmt.Printf("%x\n", mac.Sum(nil))
return num
}
func VerifyTOTP(code uint64, user string, password string, ts int64) bool {
if code == 0 {
return false
}
if code == GenTOTP(user, password, ts) || code == GenTOTP(user, password, ts-TOTPStep) || code == GenTOTP(user, password, ts+TOTPStep) {
return true
}
return false
}

36
totp_test.go Normal file
View File

@@ -0,0 +1,36 @@
// Time-based One-time Password
package main
import (
"testing"
"time"
)
func TestTOTP(t *testing.T) {
for i := 0; i < 20; i++ {
ts := time.Now().Unix()
code := GenTOTP("testuser1", "testpassword1", ts)
t.Log(code)
if !VerifyTOTP(code, "testuser1", "testpassword1", ts) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, "testuser1", "testpassword1", ts-10) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, "testuser1", "testpassword1", ts+10) {
t.Error("TOTP error")
}
if VerifyTOTP(code, "testuser1", "testpassword1", ts+60) {
t.Error("TOTP error")
}
if VerifyTOTP(code, "testuser2", "testpassword1", ts+1) {
t.Error("TOTP error")
}
if VerifyTOTP(code, "testuser1", "testpassword2", ts+1) {
t.Error("TOTP error")
}
time.Sleep(time.Second)
t.Log("round", i, " ", ts, " test ok")
}
}

44
udp.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import (
"bytes"
"encoding/binary"
"net"
"time"
)
func UDPWrite(conn *net.UDPConn, dst net.Addr, mainType uint16, subType uint16, packet interface{}) (len int, err error) {
msg, err := newMessage(mainType, subType, packet)
if err != nil {
return 0, err
}
if dst == nil {
return conn.Write(msg)
}
return conn.WriteTo(msg, dst)
}
func UDPRead(conn *net.UDPConn, timeout int) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) {
if timeout > 0 {
deadline := time.Now().Add(time.Millisecond * time.Duration(timeout))
err = conn.SetReadDeadline(deadline)
if err != nil {
gLog.Println(LevelERROR, "SetReadDeadline error")
return nil, nil, nil, 0, err
}
}
result = make([]byte, 1024)
len, ra, err = conn.ReadFrom(result)
if err != nil {
// gLog.Println(LevelDEBUG, "ReadFrom error")
return nil, nil, nil, 0, err
}
head = &openP2PHeader{}
err = binary.Read(bytes.NewReader(result[:openP2PHeaderSize]), binary.LittleEndian, head)
if err != nil {
gLog.Println(LevelERROR, "parse p2pheader error:", err)
return nil, nil, nil, 0, err
}
return
}

207
update.go Normal file
View File

@@ -0,0 +1,207 @@
package main
import (
"archive/tar"
"archive/zip"
"compress/gzip"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"runtime"
"time"
)
// type updateFileInfo struct {
// Name string `json:"name,omitempty"`
// RelativePath string `json:"relativePath,omitempty"`
// Length int64 `json:"length,omitempty"`
// URL string `json:"url,omitempty"`
// Hash string `json:"hash,omitempty"`
// }
func update() {
gLog.Println(LevelINFO, "update start")
defer gLog.Println(LevelINFO, "update end")
// TODO: download from gitee. save flow
c := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
Timeout: time.Second * 30,
}
goos := runtime.GOOS
goarch := runtime.GOARCH
rsp, err := c.Get(fmt.Sprintf("https://openp2p.cn:27182/api/v1/update?fromver=%s&os=%s&arch=%s", OpenP2PVersion, goos, goarch))
if err != nil {
gLog.Println(LevelERROR, "update:query update list failed:", err)
return
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
gLog.Println(LevelERROR, "get update info error:", rsp.Status)
return
}
rspBuf, err := ioutil.ReadAll(rsp.Body)
if err != nil {
gLog.Println(LevelERROR, "update:read update list failed:", err)
return
}
updateInfo := UpdateInfo{}
err = json.Unmarshal(rspBuf, &updateInfo)
if err != nil {
gLog.Println(LevelERROR, rspBuf, " update info decode error:", err)
return
}
if updateInfo.Error != 0 {
gLog.Println(LevelERROR, "update error:", updateInfo.Error, updateInfo.ErrorDetail)
return
}
os.MkdirAll("download", 0666)
err = updateFile(updateInfo.Url, "", "openp2p")
if err != nil {
gLog.Println(LevelERROR, "update: download failed:", err)
return
}
}
// todo rollback on error
func updateFile(url string, checksum string, dst string) error {
gLog.Println(LevelINFO, "download ", url)
tmpFile := filepath.Dir(os.Args[0]) + "/openp2p.tmp"
output, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0776)
if err != nil {
gLog.Printf(LevelERROR, "OpenFile %s error:%s", tmpFile, err)
return err
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
response, err := client.Get(url)
if err != nil {
gLog.Printf(LevelERROR, "download url %s error:%s", url, err)
output.Close()
return err
}
defer response.Body.Close()
n, err := io.Copy(output, response.Body)
if err != nil {
gLog.Printf(LevelERROR, "io.Copy error:%s", err)
output.Close()
return err
}
output.Sync()
output.Close()
gLog.Println(LevelINFO, "download ", url, " ok")
gLog.Printf(LevelINFO, "size: %d bytes", n)
err = os.Rename(os.Args[0], os.Args[0]+"0")
if err != nil && os.IsExist(err) {
gLog.Printf(LevelINFO, " rename %s error:%s", os.Args[0], err)
}
// extract
gLog.Println(LevelINFO, "extract files")
err = extract(filepath.Dir(os.Args[0]), tmpFile)
if err != nil {
gLog.Printf(LevelERROR, "extract error:%s. revert rename", err)
os.Rename(os.Args[0]+"0", os.Args[0])
return err
}
return nil
}
func extract(dst, src string) (err error) {
if runtime.GOOS == "windows" {
return unzip(dst, src)
} else {
return extractTgz(dst, src)
}
}
func unzip(dst, src string) (err error) {
archive, err := zip.OpenReader(src)
if err != nil {
return err
}
defer archive.Close()
for _, f := range archive.File {
filePath := filepath.Join(dst, f.Name)
fmt.Println("unzipping file ", filePath)
// if !strings.HasPrefix(filePath, filepath.Clean(dst)+string(os.PathSeparator)) {
// fmt.Println("invalid file path")
// return
// }
if f.FileInfo().IsDir() {
fmt.Println("creating directory...")
os.MkdirAll(filePath, os.ModePerm)
continue
}
if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
return err
}
dstFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
fileInArchive, err := f.Open()
if err != nil {
return err
}
if _, err := io.Copy(dstFile, fileInArchive); err != nil {
return err
}
dstFile.Close()
fileInArchive.Close()
}
return nil
}
func extractTgz(dst, src string) error {
gzipStream, err := os.Open(src)
if err != nil {
return err
}
uncompressedStream, err := gzip.NewReader(gzipStream)
if err != nil {
return err
}
tarReader := tar.NewReader(uncompressedStream)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
switch header.Typeflag {
case tar.TypeDir:
if err := os.Mkdir(header.Name, 0755); err != nil {
return err
}
case tar.TypeReg:
filePath := filepath.Join(dst, header.Name)
outFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode))
if err != nil {
return err
}
if err != nil {
return err
}
defer outFile.Close()
if _, err := io.Copy(outFile, tarReader); err != nil {
return err
}
default:
return err
}
}
return nil
}