From a5d7eaeed716df1b35b8fea27b10020d21269884 Mon Sep 17 00:00:00 2001 From: shynome Date: Mon, 28 Aug 2023 23:11:24 +0800 Subject: [PATCH] remove err2 --- bind.go | 25 +++-- bind_ierr.go | 216 ++++++++++++++++++++++++++++++++++++++ endpoint/err4.go | 12 +++ endpoint/inbound.go | 32 +++--- endpoint/inbound_ierr.go | 160 ++++++++++++++++++++++++++++ endpoint/outbound.go | 31 +++--- endpoint/outbound_ierr.go | 171 ++++++++++++++++++++++++++++++ err4.go | 3 + 8 files changed, 605 insertions(+), 45 deletions(-) create mode 100755 bind_ierr.go create mode 100644 endpoint/err4.go create mode 100755 endpoint/inbound_ierr.go create mode 100755 endpoint/outbound_ierr.go create mode 100644 err4.go diff --git a/bind.go b/bind.go index 4c8d394..bf36c0d 100644 --- a/bind.go +++ b/bind.go @@ -1,3 +1,5 @@ +//go:build ierr + package wgortc import ( @@ -5,8 +7,6 @@ import ( "net" "sync" - "github.com/lainio/err2" - "github.com/lainio/err2/try" "github.com/pion/ice/v2" "github.com/pion/webrtc/v3" "github.com/shynome/wgortc/endpoint" @@ -42,8 +42,7 @@ func NewBind(signaler signaler.Channel) *Bind { } } -func (b *Bind) Open(port uint16) (fns []conn.ReceiveFunc, actualPort uint16, err error) { - defer err2.Handle(&err) +func (b *Bind) Open(port uint16) (fns []conn.ReceiveFunc, actualPort uint16, ierr error) { b.locker.Lock() defer b.locker.Unlock() @@ -56,12 +55,12 @@ func (b *Bind) Open(port uint16) (fns []conn.ReceiveFunc, actualPort uint16, err settingEngine = b.NewSettingEngine() } if mux.WithUDPMux != nil { - b.mux = try.To1(mux.WithUDPMux(&settingEngine, &port)) + b.mux, ierr = mux.WithUDPMux(&settingEngine, &port) actualPort = port } b.api = webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) - ch := try.To1(b.Accept()) + ch, ierr := b.Accept() go func() { for ev := range ch { go b.handleConnect(ev) @@ -94,16 +93,17 @@ func (b *Bind) receiveFunc(packets [][]byte, sizes []int, eps []conn.Endpoint) ( } func (b *Bind) handleConnect(sess signaler.Session) { - defer err2.Catch() + var ierr error + _ = ierr config := webrtc.Configuration{ ICEServers: b.ICEServers, } - pc := try.To1(b.api.NewPeerConnection(config)) + pc, ierr := b.api.NewPeerConnection(config) defer pc.Close() inbound := endpoint.NewInbound(sess, pc) - initiator := try.To1(inbound.ExtractInitiator()) + initiator, ierr := inbound.ExtractInitiator() b.msgCh <- packetMsg{ data: initiator, ep: inbound, @@ -129,8 +129,7 @@ func (b *Bind) isClosed() bool { return b.closed } -func (b *Bind) Close() (err error) { - defer err2.Handle(&err) +func (b *Bind) Close() (ierr error) { b.locker.Lock() defer b.locker.Unlock() @@ -138,10 +137,10 @@ func (b *Bind) Close() (err error) { b.closed = true if b.mux != nil { - try.To(b.mux.Close()) + ierr = b.mux.Close() } if b.Channel != nil { - try.To(b.Channel.Close()) + ierr = b.Channel.Close() } if b.msgCh != nil { close(b.msgCh) diff --git a/bind_ierr.go b/bind_ierr.go new file mode 100755 index 0000000..69f0958 --- /dev/null +++ b/bind_ierr.go @@ -0,0 +1,216 @@ +//go:build !ierr + +// Code generated by github.com/shynome/err4 DO NOT EDIT + +package wgortc + +import ( + "errors" + "net" + "sync" + + "github.com/pion/ice/v2" + "github.com/pion/webrtc/v3" + "github.com/shynome/wgortc/endpoint" + "github.com/shynome/wgortc/mux" + "github.com/shynome/wgortc/signaler" + "golang.zx2c4.com/wireguard/conn" +) + +type Bind struct { + NewSettingEngine func() webrtc.SettingEngine + + signaler.Channel + + api *webrtc.API + mux ice.UDPMux + + ICEServers []webrtc.ICEServer + + msgCh chan packetMsg + + closed bool + locker *sync.RWMutex +} + +var _ conn.Bind = (*Bind)(nil) + +func NewBind(signaler signaler.Channel) *Bind { + return &Bind{ + Channel: signaler, + + closed: false, + locker: &sync.RWMutex{}, + } +} + +func (b *Bind) Open(port uint16) (fns []conn.ReceiveFunc, actualPort uint16, ierr error) { + b.locker.Lock() + defer b.locker.Unlock() + + fns = append(fns, b.receiveFunc) + + b.msgCh = make(chan packetMsg, b.BatchSize()-1) + + settingEngine := webrtc.SettingEngine{} + if b.NewSettingEngine != nil { + settingEngine = b.NewSettingEngine() + } + if mux.WithUDPMux != nil { + b.mux, ierr = mux.WithUDPMux(&settingEngine, &port) + if ierr != nil { + return + } + actualPort = port + } + b.api = webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) + + ch, ierr := b.Accept() + if ierr != nil { + return + } + go func() { + for ev := range ch { + go b.handleConnect(ev) + } + }() + + b.closed = false + return +} + +type packetMsg struct { + data []byte + ep conn.Endpoint +} + +func (b *Bind) receiveFunc(packets [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) { + if b.isClosed() { + return 0, net.ErrClosed + } + for i := 0; i < b.BatchSize(); i++ { + msg, ok := <-b.msgCh + if !ok { + return 0, net.ErrClosed + } + sizes[i] = copy(packets[i], msg.data) + eps[i] = msg.ep + n += 1 + } + return +} + +func (b *Bind) handleConnect(sess signaler.Session) { + var ierr error + _ = ierr + + config := webrtc.Configuration{ + ICEServers: b.ICEServers, + } + pc, ierr := b.api.NewPeerConnection(config) + if ierr != nil { + return + } + defer pc.Close() + + inbound := endpoint.NewInbound(sess, pc) + initiator, ierr := inbound.ExtractInitiator() + if ierr != nil { + return + } + b.msgCh <- packetMsg{ + data: initiator, + ep: inbound, + } + + ch := inbound.Message() + for d := range ch { + if b.isClosed() { + break + } + b.msgCh <- packetMsg{ + data: d, + ep: inbound, + } + } + + return +} + +func (b *Bind) isClosed() bool { + b.locker.RLock() + defer b.locker.RUnlock() + return b.closed +} + +func (b *Bind) Close() (ierr error) { + + b.locker.Lock() + defer b.locker.Unlock() + + b.closed = true + + if b.mux != nil { + ierr = b.mux.Close() + if ierr != nil { + return + } + } + if b.Channel != nil { + ierr = b.Channel.Close() + if ierr != nil { + return + } + } + if b.msgCh != nil { + close(b.msgCh) + } + return +} + +func (b *Bind) ParseEndpoint(s string) (ep conn.Endpoint, err error) { + outbound := endpoint.NewOutbound(s, b) + go func() { + ch := outbound.Message() + for d := range ch { + if b.isClosed() { + break + } + b.msgCh <- packetMsg{ + data: d, + ep: outbound, + } + } + }() + return outbound, nil +} + +var _ endpoint.Hub = (*Bind)(nil) + +func (b *Bind) NewPeerConnection() (*webrtc.PeerConnection, error) { + config := webrtc.Configuration{ + ICEServers: b.ICEServers, + } + return b.api.NewPeerConnection(config) +} + +func (b *Bind) Send(bufs [][]byte, ep conn.Endpoint) (err error) { + if b.isClosed() { + return net.ErrClosed + } + sender, ok := ep.(endpoint.Sender) + if !ok { + return ErrEndpointImpl + } + for _, buf := range bufs { + if err := sender.Send(buf); err != nil { + return err + } + } + return nil +} + +var ErrEndpointImpl = errors.New("endpoint is not wgortc.Endpoint") + +func (b *Bind) SetMark(mark uint32) error { return nil } +func (b *Bind) BatchSize() int { return 1 } diff --git a/endpoint/err4.go b/endpoint/err4.go new file mode 100644 index 0000000..4d1af96 --- /dev/null +++ b/endpoint/err4.go @@ -0,0 +1,12 @@ +package endpoint + +//go:generate err4gen . + +func then(err *error, ok func(), catch func()) { + switch { + case *err == nil && ok != nil: + ok() + case *err != nil && catch != nil: + catch() + } +} diff --git a/endpoint/inbound.go b/endpoint/inbound.go index 8be90af..a215ce2 100644 --- a/endpoint/inbound.go +++ b/endpoint/inbound.go @@ -1,3 +1,5 @@ +//go:build ierr + package endpoint import ( @@ -7,8 +9,6 @@ import ( "net" "time" - "github.com/lainio/err2" - "github.com/lainio/err2/try" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/shynome/wgortc/signaler" @@ -60,21 +60,20 @@ func (ep *Inbound) dcIsClosed() bool { return ep.dc.ReadyState() != webrtc.DataChannelStateOpen } -func (ep *Inbound) ExtractInitiator() (initiator []byte, err error) { - defer err2.Handle(&err) +func (ep *Inbound) ExtractInitiator() (initiator []byte, ierr error) { offer := ep.sess.Description() - sdp := try.To1(offer.Unmarshal()) + sdp, ierr := offer.Unmarshal() rawStr := sdp.SessionInformation if rawStr == nil { return nil, ErrInitiatorRequired } - initiator = try.To1(base64.StdEncoding.DecodeString(string(*rawStr))) + initiator, ierr = base64.StdEncoding.DecodeString(string(*rawStr)) return initiator, nil } -func (ep *Inbound) HandleConnect(buf []byte) (err error) { - defer err2.Handle(&err, func() { - ep.sess.Reject(err) +func (ep *Inbound) HandleConnect(buf []byte) (ierr error) { + defer then(&ierr, nil, func() { + ep.sess.Reject(ierr) }) pc := ep.pc @@ -85,19 +84,20 @@ func (ep *Inbound) HandleConnect(buf []byte) (err error) { } }) - try.To(pc.SetRemoteDescription(ep.sess.Description())) - answer := try.To1(pc.CreateAnswer(nil)) + ierr = pc.SetRemoteDescription(ep.sess.Description()) + answer, ierr := pc.CreateAnswer(nil) gatherComplete := webrtc.GatheringCompletePromise(pc) - try.To(pc.SetLocalDescription(answer)) + ierr = pc.SetLocalDescription(answer) <-gatherComplete roffer := pc.LocalDescription() responder := sdp.Information(base64.StdEncoding.EncodeToString(buf)) - sdp := try.To1(roffer.Unmarshal()) + sdp, ierr := roffer.Unmarshal() sdp.SessionInformation = &responder - roffer.SDP = string(try.To1(sdp.Marshal())) + rsdp, ierr := sdp.Marshal() + roffer.SDP = string(rsdp) - try.To(ep.sess.Resolve(roffer)) + ierr = ep.sess.Resolve(roffer) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -114,7 +114,7 @@ func (ep *Inbound) HandleConnect(buf []byte) (err error) { }) <-ctx.Done() if err := context.Cause(ctx); err != context.Canceled { - try.To(err) + ierr = err } return diff --git a/endpoint/inbound_ierr.go b/endpoint/inbound_ierr.go new file mode 100755 index 0000000..7ecb4dc --- /dev/null +++ b/endpoint/inbound_ierr.go @@ -0,0 +1,160 @@ +//go:build !ierr + +// Code generated by github.com/shynome/err4 DO NOT EDIT + +package endpoint + +import ( + "context" + "encoding/base64" + "errors" + "net" + "time" + + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" + "github.com/shynome/wgortc/signaler" + "golang.zx2c4.com/wireguard/conn" +) + +type Inbound struct { + baseEndpoint + dc *webrtc.DataChannel + sess signaler.Session + + pc *webrtc.PeerConnection + ch chan []byte +} + +var ( + _ conn.Endpoint = (*Inbound)(nil) + _ Sender = (*Inbound)(nil) +) + +func NewInbound(sess signaler.Session, pc *webrtc.PeerConnection) *Inbound { + return &Inbound{ + baseEndpoint: baseEndpoint{ + id: sess.Description().SDP, + }, + pc: pc, + sess: sess, + ch: make(chan []byte), + } +} + +func (ep *Inbound) Send(buf []byte) (err error) { + closed := ep.dcIsClosed() + if buf[0] == 2 && closed { + go ep.HandleConnect(buf) + return + } + if closed { + return net.ErrClosed + } + go ep.dc.Send(buf) + return +} + +func (ep *Inbound) dcIsClosed() bool { + if ep.dc == nil { + return true + } + return ep.dc.ReadyState() != webrtc.DataChannelStateOpen +} + +func (ep *Inbound) ExtractInitiator() (initiator []byte, ierr error) { + offer := ep.sess.Description() + sdp, ierr := offer.Unmarshal() + if ierr != nil { + return + } + rawStr := sdp.SessionInformation + if rawStr == nil { + return nil, ErrInitiatorRequired + } + initiator, ierr = base64.StdEncoding.DecodeString(string(*rawStr)) + if ierr != nil { + return + } + return initiator, nil +} + +func (ep *Inbound) HandleConnect(buf []byte) (ierr error) { + defer then(&ierr, nil, func() { + ep.sess.Reject(ierr) + }) + + pc := ep.pc + pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { + switch pcs { + case webrtc.PeerConnectionStateDisconnected: + pc.Close() + } + }) + + ierr = pc.SetRemoteDescription(ep.sess.Description()) + if ierr != nil { + return + } + answer, ierr := pc.CreateAnswer(nil) + if ierr != nil { + return + } + gatherComplete := webrtc.GatheringCompletePromise(pc) + ierr = pc.SetLocalDescription(answer) + if ierr != nil { + return + } + <-gatherComplete + roffer := pc.LocalDescription() + + responder := sdp.Information(base64.StdEncoding.EncodeToString(buf)) + sdp, ierr := roffer.Unmarshal() + if ierr != nil { + return + } + sdp.SessionInformation = &responder + rsdp, ierr := sdp.Marshal() + if ierr != nil { + return + } + roffer.SDP = string(rsdp) + + ierr = ep.sess.Resolve(roffer) + if ierr != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ctx, cause := context.WithCancelCause(context.Background()) + pc.OnDataChannel(func(dc *webrtc.DataChannel) { + switch dc.Label() { + case "wgortc": + defer cause(nil) + ep.dc = dc + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + ep.ch <- msg.Data + }) + } + }) + <-ctx.Done() + if err := context.Cause(ctx); err != context.Canceled { + ierr = err + if ierr != nil { + return + } + } + + return +} + +func (ep *Inbound) Message() (ch <-chan []byte) { + return ep.ch +} + +var ErrInitiatorRequired = errors.New("first message initiator is required in webrtc sdp SessionInformation") + +func (ep *Inbound) DstToString() string { + return getPCRemote(ep.pc) +} diff --git a/endpoint/outbound.go b/endpoint/outbound.go index 276d9e7..626f6dd 100644 --- a/endpoint/outbound.go +++ b/endpoint/outbound.go @@ -1,3 +1,5 @@ +//go:build ierr + package endpoint import ( @@ -6,8 +8,6 @@ import ( "net" "time" - "github.com/lainio/err2" - "github.com/lainio/err2/try" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/shynome/wgortc/signaler" @@ -61,15 +61,13 @@ func (ep *Outbound) dcIsClosed() bool { return ep.dc.ReadyState() != webrtc.DataChannelStateOpen } -func (ep *Outbound) Connect(buf []byte) (err error) { - defer err2.Handle(&err) - +func (ep *Outbound) Connect(buf []byte) (ierr error) { var pc *webrtc.PeerConnection = ep.pc if pc != nil { pc.Close() } - pc = try.To1(ep.hub.NewPeerConnection()) + pc, ierr = ep.hub.NewPeerConnection() ep.pc = pc pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { @@ -83,7 +81,7 @@ func (ep *Outbound) Connect(buf []byte) (err error) { Ordered: refVal(false), MaxRetransmits: refVal(uint16(0)), } - dc := try.To1(pc.CreateDataChannel("wgortc", &dcinit)) + dc, ierr := pc.CreateDataChannel("wgortc", &dcinit) ep.dc = dc dc.OnMessage(func(msg webrtc.DataChannelMessage) { @@ -91,27 +89,28 @@ func (ep *Outbound) Connect(buf []byte) (err error) { }) gatherComplete := webrtc.GatheringCompletePromise(pc) - offer := try.To1(pc.CreateOffer(nil)) - try.To(pc.SetLocalDescription(offer)) + offer, ierr := pc.CreateOffer(nil) + ierr = pc.SetLocalDescription(offer) <-gatherComplete offer = *pc.LocalDescription() initiator := sdp.Information(base64.StdEncoding.EncodeToString(buf)) - sdp := try.To1(offer.Unmarshal()) + sdp, ierr := offer.Unmarshal() sdp.SessionInformation = &initiator - offer.SDP = string(try.To1(sdp.Marshal())) + rsdp, ierr := sdp.Marshal() + offer.SDP = string(rsdp) - anwser := try.To1(ep.hub.Handshake(ep.id, offer)) + anwser, ierr := ep.hub.Handshake(ep.id, offer) - try.To(pc.SetRemoteDescription(*anwser)) + ierr = pc.SetRemoteDescription(*anwser) - sdp2 := try.To1(anwser.Unmarshal()) + sdp2, ierr := anwser.Unmarshal() if sdp2.SessionInformation == nil { return ErrInitiatorResponderRequired } - responder := try.To1(base64.StdEncoding.DecodeString(string(*sdp2.SessionInformation))) + responder, ierr := base64.StdEncoding.DecodeString(string(*sdp2.SessionInformation)) - try.To(WaitDC(dc, 5*time.Second)) + ierr = WaitDC(dc, 5*time.Second) ep.ch <- responder return diff --git a/endpoint/outbound_ierr.go b/endpoint/outbound_ierr.go new file mode 100755 index 0000000..bf57ca0 --- /dev/null +++ b/endpoint/outbound_ierr.go @@ -0,0 +1,171 @@ +//go:build !ierr + +// Code generated by github.com/shynome/err4 DO NOT EDIT + +package endpoint + +import ( + "encoding/base64" + "errors" + "net" + "time" + + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" + "github.com/shynome/wgortc/signaler" + "golang.zx2c4.com/wireguard/conn" +) + +type Outbound struct { + baseEndpoint + pc *webrtc.PeerConnection + dc *webrtc.DataChannel + hub Hub + ch chan []byte +} + +var ( + _ conn.Endpoint = (*Outbound)(nil) + _ Sender = (*Outbound)(nil) +) + +type Hub interface { + NewPeerConnection() (*webrtc.PeerConnection, error) + signaler.Channel +} + +func NewOutbound(id string, hub Hub) *Outbound { + return &Outbound{ + baseEndpoint: baseEndpoint{id: id}, + + hub: hub, + ch: make(chan []byte), + } +} + +func (ep *Outbound) Send(buf []byte) (err error) { + closed := ep.dcIsClosed() + if buf[0] == 1 && closed { + go ep.Connect(buf) + return + } + if closed { + return net.ErrClosed + } + go ep.dc.Send(buf) + return +} + +func (ep *Outbound) dcIsClosed() bool { + if ep.dc == nil { + return true + } + return ep.dc.ReadyState() != webrtc.DataChannelStateOpen +} + +func (ep *Outbound) Connect(buf []byte) (ierr error) { + var pc *webrtc.PeerConnection = ep.pc + if pc != nil { + pc.Close() + } + + pc, ierr = ep.hub.NewPeerConnection() + if ierr != nil { + return + } + ep.pc = pc + + pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { + switch pcs { + case webrtc.PeerConnectionStateDisconnected: + pc.Close() + } + }) + + dcinit := webrtc.DataChannelInit{ + Ordered: refVal(false), + MaxRetransmits: refVal(uint16(0)), + } + dc, ierr := pc.CreateDataChannel("wgortc", &dcinit) + if ierr != nil { + return + } + ep.dc = dc + + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + ep.ch <- msg.Data + }) + + gatherComplete := webrtc.GatheringCompletePromise(pc) + offer, ierr := pc.CreateOffer(nil) + if ierr != nil { + return + } + ierr = pc.SetLocalDescription(offer) + if ierr != nil { + return + } + <-gatherComplete + offer = *pc.LocalDescription() + + initiator := sdp.Information(base64.StdEncoding.EncodeToString(buf)) + sdp, ierr := offer.Unmarshal() + if ierr != nil { + return + } + sdp.SessionInformation = &initiator + rsdp, ierr := sdp.Marshal() + if ierr != nil { + return + } + offer.SDP = string(rsdp) + + anwser, ierr := ep.hub.Handshake(ep.id, offer) + if ierr != nil { + return + } + + ierr = pc.SetRemoteDescription(*anwser) + if ierr != nil { + return + } + + sdp2, ierr := anwser.Unmarshal() + if ierr != nil { + return + } + if sdp2.SessionInformation == nil { + return ErrInitiatorResponderRequired + } + responder, ierr := base64.StdEncoding.DecodeString(string(*sdp2.SessionInformation)) + if ierr != nil { + return + } + + ierr = WaitDC(dc, 5*time.Second) + if ierr != nil { + return + } + ep.ch <- responder + + return +} + +var ErrInitiatorResponderRequired = errors.New("first message initiator responder is required in webrtc sdp SessionInformation") + +func (ep *Outbound) Close() (err error) { + if pc := ep.pc; pc != nil { + if err = pc.Close(); err != nil { + return + } + } + return +} + +func (ep *Outbound) Message() (ch <-chan []byte) { + return ep.ch +} + +func (ep *Outbound) DstToString() string { + return getPCRemote(ep.pc) +} diff --git a/err4.go b/err4.go new file mode 100644 index 0000000..7ccd982 --- /dev/null +++ b/err4.go @@ -0,0 +1,3 @@ +package wgortc + +//go:generate err4gen .