From 669effd680d1d92f12a95fce68b407e827eddd3b Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 8 Jul 2022 20:06:28 +0200 Subject: [PATCH] Update srt dependency --- go.mod | 2 +- go.sum | 4 +- .../datarhei/gosrt/CODE_OF_CONDUCT.md | 33 ++++ vendor/github.com/datarhei/gosrt/README.md | 4 +- vendor/github.com/datarhei/gosrt/SECURITY.md | 16 ++ vendor/github.com/datarhei/gosrt/dial.go | 13 +- .../gosrt/internal/congestion/live.go | 178 ++++++++++++++++++ .../datarhei/gosrt/internal/packet/packet.go | 15 +- vendor/github.com/datarhei/gosrt/listen.go | 13 +- vendor/modules.txt | 2 +- 10 files changed, 268 insertions(+), 12 deletions(-) create mode 100644 vendor/github.com/datarhei/gosrt/CODE_OF_CONDUCT.md create mode 100644 vendor/github.com/datarhei/gosrt/SECURITY.md diff --git a/go.mod b/go.mod index 36f81664..11eb7ec8 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/99designs/gqlgen v0.17.12 github.com/atrox/haikunatorgo/v2 v2.0.1 - github.com/datarhei/gosrt v0.1.1 + github.com/datarhei/gosrt v0.1.2 github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce github.com/go-openapi/spec v0.20.6 // indirect github.com/go-openapi/swag v0.21.1 // indirect diff --git a/go.sum b/go.sum index ea831c74..2822dcfa 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/datarhei/gosrt v0.1.1 h1:y5v0CNmT8FFxgJnr2a0+RzoNS5t4OKJBkKtXovmfsFE= -github.com/datarhei/gosrt v0.1.1/go.mod h1:IftDbZGIIC9OvQO5on5ZpU0iB/JX/PFOqGXORbwHYQM= +github.com/datarhei/gosrt v0.1.2 h1:rGOP2Xkbi52z4tLzBwCBw2TKt7BrfTO2LmEVY+yWf1M= +github.com/datarhei/gosrt v0.1.2/go.mod h1:IftDbZGIIC9OvQO5on5ZpU0iB/JX/PFOqGXORbwHYQM= github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce h1:bg/OE9GfGK6d/XbqiMq8YaGQzw1Ul3Y3qiGMzU1G4HQ= github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/vendor/github.com/datarhei/gosrt/CODE_OF_CONDUCT.md b/vendor/github.com/datarhei/gosrt/CODE_OF_CONDUCT.md new file mode 100644 index 00000000..0db593af --- /dev/null +++ b/vendor/github.com/datarhei/gosrt/CODE_OF_CONDUCT.md @@ -0,0 +1,33 @@ +Contributor Code of Conduct +=========================== + +As contributors and maintainers of this project, we pledge to respect all +people who contribute through reporting issues, posting feature requests, +updating documentation, submitting pull requests or patches, and other +activities. + +Examples of unacceptable behavior by participants include: +- Sexual language or imagery. +- Derogatory comments or personal attacks. +- Trolling, public or private harassment. +- Insults. +- Other unprofessional conduct. + +Examples of unacceptable behavior by participants include sexual +language or imagery, derogatory comments or personal attacks, trolling, public +or private harassment, insults, or other unprofessional conduct. + +Project maintainers have the right and responsibility to remove, edit, or reject +comments, commits, code, wiki edits, issues, and other contributions that are not +aligned with this Code of Conduct. In addition, project maintainers who do not +follow the Code of Conduct may be removed from the project team. + +This code of conduct applies both within a project and in public spaces when an +individual represents the project or its community. + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by opening an issue or contacting one or more of the project maintainers. + +This Code of Conduct is adapted from the [Contributor +Covenant](https://contributor-covenant.org/), version 1.1.0, available at +[https://contributor-covenant.org/version/1/1/0/](https://contributor-covenant.org/version/1/1/0/) diff --git a/vendor/github.com/datarhei/gosrt/README.md b/vendor/github.com/datarhei/gosrt/README.md index f20be5df..bfd98b56 100644 --- a/vendor/github.com/datarhei/gosrt/README.md +++ b/vendor/github.com/datarhei/gosrt/README.md @@ -42,7 +42,7 @@ go get github.com/datarhei/gosrt # Caller example ``` -import github.com/datarhei/gosrt +import "github.com/datarhei/gosrt" conn, err := srt.Dial("srt", "golang.org:6000", srt.Config{ StreamId: "...", @@ -70,7 +70,7 @@ In the `contrib/client` directory you'll find a complete example of a SRT client # Listener example ``` -import github.com/datarhei/gosrt +import "github.com/datarhei/gosrt" ln, err := srt.Listen("srt", ":6000", srt.Config{...}) if err != nil { diff --git a/vendor/github.com/datarhei/gosrt/SECURITY.md b/vendor/github.com/datarhei/gosrt/SECURITY.md new file mode 100644 index 00000000..07b352c7 --- /dev/null +++ b/vendor/github.com/datarhei/gosrt/SECURITY.md @@ -0,0 +1,16 @@ +# Security Policy + +## Supported Versions + +All versions in the list receive security updates. + +| Version | Supported | +| ------- | ------------------ | +| 0.1.1 | :white_check_mark: | +| - | :x: | + +## Reporting a Vulnerability + +If you have found or just suspect a security problem somewhere in Restreamer or Core, report it on support@datarhei.com. + +We treat security issues with confidentiality until controlled and disclosed responsibly. diff --git a/vendor/github.com/datarhei/gosrt/dial.go b/vendor/github.com/datarhei/gosrt/dial.go index 85596995..4176147e 100644 --- a/vendor/github.com/datarhei/gosrt/dial.go +++ b/vendor/github.com/datarhei/gosrt/dial.go @@ -167,7 +167,12 @@ func Dial(network, address string, config Config) (Conn, error) { continue } - dl.rcvQueue <- p + // non-blocking + select { + case dl.rcvQueue <- p: + default: + dl.log("dial", func() string { return "receive queue is full" }) + } } }() @@ -285,7 +290,11 @@ func (dl *dialer) writer(ctx context.Context) { case p := <-dl.sndQueue: data.Reset() - p.Marshal(&data) + if err := p.Marshal(&data); err != nil { + p.Decommission() + dl.log("packet:send:error", func() string { return "marshalling packet failed" }) + continue + } buffer := data.Bytes() diff --git a/vendor/github.com/datarhei/gosrt/internal/congestion/live.go b/vendor/github.com/datarhei/gosrt/internal/congestion/live.go index dbfedd56..fd57b717 100644 --- a/vendor/github.com/datarhei/gosrt/internal/congestion/live.go +++ b/vendor/github.com/datarhei/gosrt/internal/congestion/live.go @@ -618,3 +618,181 @@ func (r *liveReceive) String(t uint64) string { return b.String() } + +type fakeLiveReceive struct { + maxSeenSequenceNumber circular.Number + lastACKSequenceNumber circular.Number + lastDeliveredSequenceNumber circular.Number + + nPackets uint + + periodicACKInterval uint64 // config + periodicNAKInterval uint64 // config + + lastPeriodicACK uint64 + lastPeriodicNAK uint64 + + avgPayloadSize float64 // bytes + + rate struct { + last time.Time + period time.Duration + + packets uint64 + prevPackets uint64 + bytes uint64 + prevBytes uint64 + + pps uint32 + bps uint32 + } + + sendACK func(seq circular.Number, light bool) + sendNAK func(from, to circular.Number) + deliver func(p packet.Packet) + + lock sync.RWMutex +} + +func NewFakeLiveReceive(config ReceiveConfig) Receiver { + r := &fakeLiveReceive{ + maxSeenSequenceNumber: config.InitialSequenceNumber.Dec(), + lastACKSequenceNumber: config.InitialSequenceNumber.Dec(), + lastDeliveredSequenceNumber: config.InitialSequenceNumber.Dec(), + + periodicACKInterval: config.PeriodicACKInterval, + periodicNAKInterval: config.PeriodicNAKInterval, + + avgPayloadSize: 1456, // 5.1.2. SRT's Default LiveCC Algorithm + + sendACK: config.OnSendACK, + sendNAK: config.OnSendNAK, + deliver: config.OnDeliver, + } + + if r.sendACK == nil { + r.sendACK = func(seq circular.Number, light bool) {} + } + + if r.sendNAK == nil { + r.sendNAK = func(from, to circular.Number) {} + } + + if r.deliver == nil { + r.deliver = func(p packet.Packet) {} + } + + r.rate.last = time.Now() + r.rate.period = time.Second + + return r +} + +func (r *fakeLiveReceive) Stats() ReceiveStats { return ReceiveStats{} } +func (r *fakeLiveReceive) PacketRate() (pps, bps uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + tdiff := time.Since(r.rate.last) + + if tdiff < r.rate.period { + pps = r.rate.pps + bps = r.rate.bps + + return + } + + pdiff := r.rate.packets - r.rate.prevPackets + bdiff := r.rate.bytes - r.rate.prevBytes + + r.rate.pps = uint32(float64(pdiff) / tdiff.Seconds()) + r.rate.bps = uint32(float64(bdiff) / tdiff.Seconds()) + + r.rate.prevPackets, r.rate.prevBytes = r.rate.packets, r.rate.bytes + r.rate.last = time.Now() + + pps = r.rate.pps + bps = r.rate.bps + + return +} + +func (r *fakeLiveReceive) Flush() {} + +func (r *fakeLiveReceive) Push(pkt packet.Packet) { + r.lock.Lock() + defer r.lock.Unlock() + + if pkt == nil { + return + } + + r.nPackets++ + + pktLen := pkt.Len() + + r.rate.packets++ + r.rate.bytes += pktLen + + // 5.1.2. SRT's Default LiveCC Algorithm + r.avgPayloadSize = 0.875*r.avgPayloadSize + 0.125*float64(pktLen) + + if pkt.Header().PacketSequenceNumber.Lte(r.lastDeliveredSequenceNumber) { + // too old, because up until r.lastDeliveredSequenceNumber, we already delivered + return + } + + if pkt.Header().PacketSequenceNumber.Lt(r.lastACKSequenceNumber) { + // already acknowledged, ignoring + return + } + + if pkt.Header().PacketSequenceNumber.Lte(r.maxSeenSequenceNumber) { + return + } + + r.maxSeenSequenceNumber = pkt.Header().PacketSequenceNumber +} + +func (r *fakeLiveReceive) periodicACK(now uint64) (ok bool, sequenceNumber circular.Number, lite bool) { + r.lock.RLock() + defer r.lock.RUnlock() + + // 4.8.1. Packet Acknowledgement (ACKs, ACKACKs) + if now-r.lastPeriodicACK < r.periodicACKInterval { + if r.nPackets >= 64 { + lite = true // send light ACK + } else { + return + } + } + + ok = true + sequenceNumber = r.maxSeenSequenceNumber.Inc() + + r.lastACKSequenceNumber = r.maxSeenSequenceNumber + + r.lastPeriodicACK = now + r.nPackets = 0 + + return +} + +func (r *fakeLiveReceive) Tick(now uint64) { + if ok, sequenceNumber, lite := r.periodicACK(now); ok { + r.sendACK(sequenceNumber, lite) + } + + // deliver packets whose PktTsbpdTime is ripe + r.lock.Lock() + defer r.lock.Unlock() + + r.lastDeliveredSequenceNumber = r.lastACKSequenceNumber +} + +func (r *fakeLiveReceive) SetNAKInterval(nakInterval uint64) { + r.lock.Lock() + defer r.lock.Unlock() + + r.periodicNAKInterval = nakInterval +} diff --git a/vendor/github.com/datarhei/gosrt/internal/packet/packet.go b/vendor/github.com/datarhei/gosrt/internal/packet/packet.go index 20670cef..ed8dab9a 100644 --- a/vendor/github.com/datarhei/gosrt/internal/packet/packet.go +++ b/vendor/github.com/datarhei/gosrt/internal/packet/packet.go @@ -177,7 +177,7 @@ type Packet interface { SetData([]byte) Len() uint64 Unmarshal(data []byte) error - Marshal(w io.Writer) + Marshal(w io.Writer) error Dump() string MarshalCIF(c CIF) UnmarshalCIF(c CIF) error @@ -259,6 +259,7 @@ func NewPacket(addr net.Addr, rawdata []byte) Packet { if len(rawdata) != 0 { if err := p.Unmarshal(rawdata); err != nil { + p.Decommission() return nil } } @@ -267,6 +268,10 @@ func NewPacket(addr net.Addr, rawdata []byte) Packet { } func (p *pkt) Decommission() { + if p.payload == nil { + return + } + payloadPool.Put(p.payload) p.payload = nil } @@ -351,9 +356,13 @@ func (p *pkt) Unmarshal(data []byte) error { return nil } -func (p *pkt) Marshal(w io.Writer) { +func (p *pkt) Marshal(w io.Writer) error { var buffer [16]byte + if p.payload == nil { + return fmt.Errorf("invalid payload") + } + if p.header.IsControlPacket { binary.BigEndian.PutUint16(buffer[0:], p.header.ControlType) // control type binary.BigEndian.PutUint16(buffer[2:], p.header.SubType) // sub type @@ -384,6 +393,8 @@ func (p *pkt) Marshal(w io.Writer) { w.Write(buffer[0:]) w.Write(p.payload.Bytes()) + + return nil } func (p *pkt) Dump() string { diff --git a/vendor/github.com/datarhei/gosrt/listen.go b/vendor/github.com/datarhei/gosrt/listen.go index 042b27b2..32907618 100644 --- a/vendor/github.com/datarhei/gosrt/listen.go +++ b/vendor/github.com/datarhei/gosrt/listen.go @@ -262,7 +262,12 @@ func Listen(network, address string, config Config) (Listener, error) { continue } - ln.rcvQueue <- p + // non-blocking + select { + case ln.rcvQueue <- p: + default: + ln.log("listen", func() string { return "receive queue is full" }) + } } }() @@ -503,7 +508,11 @@ func (ln *listener) writer(ctx context.Context) { case p := <-ln.sndQueue: data.Reset() - p.Marshal(&data) + if err := p.Marshal(&data); err != nil { + p.Decommission() + ln.log("packet:send:error", func() string { return "marshalling packet failed" }) + continue + } buffer := data.Bytes() diff --git a/vendor/modules.txt b/vendor/modules.txt index c6074365..02ed5f0c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -39,7 +39,7 @@ github.com/beorn7/perks/quantile github.com/cespare/xxhash/v2 # github.com/cpuguy83/go-md2man/v2 v2.0.1 github.com/cpuguy83/go-md2man/v2/md2man -# github.com/datarhei/gosrt v0.1.1 +# github.com/datarhei/gosrt v0.1.2 ## explicit github.com/datarhei/gosrt github.com/datarhei/gosrt/internal/circular