This commit is contained in:
Ingo Oppermann
2022-07-28 20:27:27 +02:00
parent f8ac5e22e7
commit 542653d3e2
34 changed files with 730 additions and 702 deletions

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/99designs/gqlgen v0.17.12 github.com/99designs/gqlgen v0.17.12
github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/datarhei/gosrt v0.1.2 github.com/datarhei/gosrt v0.1.2
github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce github.com/datarhei/joy4 v0.0.0-20220728180719-f752080f4a36
github.com/go-playground/validator/v10 v10.11.0 github.com/go-playground/validator/v10 v10.11.0
github.com/golang-jwt/jwt/v4 v4.4.2 github.com/golang-jwt/jwt/v4 v4.4.2
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0

4
go.sum
View File

@@ -76,8 +76,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datarhei/gosrt v0.1.2 h1:rGOP2Xkbi52z4tLzBwCBw2TKt7BrfTO2LmEVY+yWf1M= github.com/datarhei/gosrt v0.1.2 h1:rGOP2Xkbi52z4tLzBwCBw2TKt7BrfTO2LmEVY+yWf1M=
github.com/datarhei/gosrt v0.1.2/go.mod h1:IftDbZGIIC9OvQO5on5ZpU0iB/JX/PFOqGXORbwHYQM= github.com/datarhei/gosrt v0.1.2/go.mod h1:IftDbZGIIC9OvQO5on5ZpU0iB/JX/PFOqGXORbwHYQM=
github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce h1:bg/OE9GfGK6d/XbqiMq8YaGQzw1Ul3Y3qiGMzU1G4HQ= github.com/datarhei/joy4 v0.0.0-20220728180719-f752080f4a36 h1:ppjcv7wazy4d7vANREERXkSAUnhV/nfT2a+13u4ZijQ=
github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= github.com/datarhei/joy4 v0.0.0-20220728180719-f752080f4a36/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -1,4 +1,3 @@
// Package av defines basic interfaces and data structures of container demux/mux and audio encode/decode. // Package av defines basic interfaces and data structures of container demux/mux and audio encode/decode.
package av package av
@@ -294,12 +293,12 @@ type AudioEncoder interface {
CodecData() (AudioCodecData, error) // encoder's codec data can put into container CodecData() (AudioCodecData, error) // encoder's codec data can put into container
Encode(AudioFrame) ([][]byte, error) // encode raw audio frame into compressed pakcet(s) Encode(AudioFrame) ([][]byte, error) // encode raw audio frame into compressed pakcet(s)
Close() // close encoder, free cgo contexts Close() // close encoder, free cgo contexts
SetSampleRate(int) (error) // set encoder sample rate SetSampleRate(int) error // set encoder sample rate
SetChannelLayout(ChannelLayout) (error) // set encoder channel layout SetChannelLayout(ChannelLayout) error // set encoder channel layout
SetSampleFormat(SampleFormat) (error) // set encoder sample format SetSampleFormat(SampleFormat) error // set encoder sample format
SetBitrate(int) (error) // set encoder bitrate SetBitrate(int) error // set encoder bitrate
SetOption(string,interface{}) (error) // encoder setopt, in ffmpeg is av_opt_set_dict() SetOption(string, interface{}) error // encoder setopt, in ffmpeg is av_opt_set_dict()
GetOption(string,interface{}) (error) // encoder getopt GetOption(string, interface{}) error // encoder getopt
} }
// AudioDecoder can decode compressed audio packets into raw audio frame. // AudioDecoder can decode compressed audio packets into raw audio frame.
@@ -313,4 +312,3 @@ type AudioDecoder interface {
type AudioResampler interface { type AudioResampler interface {
Resample(AudioFrame) (AudioFrame, error) // convert raw audio frames Resample(AudioFrame) (AudioFrame, error) // convert raw audio frames
} }

View File

@@ -1,14 +1,15 @@
package avutil package avutil
import ( import (
"io"
"strings"
"fmt"
"bytes" "bytes"
"github.com/datarhei/joy4/av" "fmt"
"io"
"net/url" "net/url"
"os" "os"
"path" "path"
"strings"
"github.com/datarhei/joy4/av"
) )
type HandlerDemuxer struct { type HandlerDemuxer struct {
@@ -108,7 +109,7 @@ func (self *Handlers) NewAudioEncoder(typ av.CodecType) (enc av.AudioEncoder, er
} }
} }
} }
err = fmt.Errorf("avutil: encoder", typ, "not found") err = fmt.Errorf("avutil: encoder %s not found", typ)
return return
} }
@@ -120,7 +121,7 @@ func (self *Handlers) NewAudioDecoder(codec av.AudioCodecData) (dec av.AudioDeco
} }
} }
} }
err = fmt.Errorf("avutil: decoder", codec.Type(), "not found") err = fmt.Errorf("avutil: decoder %s not found", codec.Type())
return return
} }

View File

@@ -1,10 +1,9 @@
// Package pktque provides packet Filter interface and structures used by other components. // Package pktque provides packet Filter interface and structures used by other components.
package pktque package pktque
import ( import (
"time"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"time"
) )
type Filter interface { type Filter interface {
@@ -188,4 +187,3 @@ func (self *Walltime) ModifyPacket(pkt *av.Packet, streams []av.CodecData, video
} }
return return
} }

View File

@@ -58,4 +58,3 @@ func (self *Timeline) Pop(dur time.Duration) (tm time.Duration) {
return return
} }

View File

@@ -2,11 +2,12 @@
package pubsub package pubsub
import ( import (
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/pktque"
"io" "io"
"sync" "sync"
"time" "time"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/pktque"
) )
// time // time
@@ -97,7 +98,6 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) {
break break
} }
} }
//println("shrink", self.curgopcount, self.maxgopcount, self.buf.Head, self.buf.Tail, "count", self.buf.Count, "size", self.buf.Size)
self.cond.Broadcast() self.cond.Broadcast()

View File

@@ -1,12 +1,12 @@
package aacparser package aacparser
import ( import (
"github.com/datarhei/joy4/utils/bits"
"github.com/datarhei/joy4/av"
"time"
"fmt"
"bytes" "bytes"
"fmt"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/utils/bits"
"io" "io"
"time"
) )
// copied from libavcodec/mpeg4audio.h // copied from libavcodec/mpeg4audio.h
@@ -308,4 +308,3 @@ func NewCodecDataFromMPEG4AudioConfigBytes(config []byte) (self CodecData, err e
} }
return return
} }

View File

@@ -61,4 +61,3 @@ func NewSpeexCodecData(sr int, cl av.ChannelLayout) SpeexCodecData {
codec.ChannelLayout_ = cl codec.ChannelLayout_ = cl
return codec return codec
} }

View File

@@ -26,4 +26,3 @@ func (self CodecData) ChannelLayout() av.ChannelLayout {
func (self CodecData) SampleRate() int { func (self CodecData) SampleRate() int {
return self.SampleRate_ return self.SampleRate_
} }

View File

@@ -1,12 +1,11 @@
package h264parser package h264parser
import ( import (
"bytes"
"fmt"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/utils/bits" "github.com/datarhei/joy4/utils/bits"
"github.com/datarhei/joy4/utils/bits/pio" "github.com/datarhei/joy4/utils/bits/pio"
"fmt"
"bytes"
) )
const ( const (
@@ -740,4 +739,3 @@ func ParseSliceHeaderFromNALU(packet []byte) (sliceType SliceType, err error) {
return return
} }

View File

@@ -1,14 +1,13 @@
package aac package aac
import ( import (
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/codec/aacparser"
"time"
"fmt"
"io"
"bufio" "bufio"
"fmt"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/codec/aacparser"
"io"
"time"
) )
type Muxer struct { type Muxer struct {

View File

@@ -3,7 +3,6 @@ package flv
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/datarhei/joy4/utils/bits/pio"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/codec" "github.com/datarhei/joy4/codec"
@@ -11,6 +10,7 @@ import (
"github.com/datarhei/joy4/codec/fake" "github.com/datarhei/joy4/codec/fake"
"github.com/datarhei/joy4/codec/h264parser" "github.com/datarhei/joy4/codec/h264parser"
"github.com/datarhei/joy4/format/flv/flvio" "github.com/datarhei/joy4/format/flv/flvio"
"github.com/datarhei/joy4/utils/bits/pio"
"io" "io"
) )

View File

@@ -1,11 +1,11 @@
package flvio package flvio
import ( import (
"strings"
"math"
"fmt" "fmt"
"time"
"github.com/datarhei/joy4/utils/bits/pio" "github.com/datarhei/joy4/utils/bits/pio"
"math"
"strings"
"time"
) )
type AMF0ParseError struct { type AMF0ParseError struct {
@@ -278,7 +278,6 @@ func FillAMF0Val(b []byte, _val interface{}) (n int) {
return return
} }
func ParseAMF0Val(b []byte) (val interface{}, n int, err error) { func ParseAMF0Val(b []byte) (val interface{}, n int, err error) {
return parseAMF0Val(b, 0) return parseAMF0Val(b, 0)
} }
@@ -465,4 +464,3 @@ func parseAMF0Val(b []byte, offset int) (val interface{}, n int, err error) {
return return
} }

View File

@@ -2,8 +2,8 @@ package flvio
import ( import (
"fmt" "fmt"
"github.com/datarhei/joy4/utils/bits/pio"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/utils/bits/pio"
"io" "io"
"time" "time"
) )

View File

@@ -1,13 +1,13 @@
package format package format
import ( import (
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/format/aac"
"github.com/datarhei/joy4/format/flv"
"github.com/datarhei/joy4/format/mp4" "github.com/datarhei/joy4/format/mp4"
"github.com/datarhei/joy4/format/ts"
"github.com/datarhei/joy4/format/rtmp" "github.com/datarhei/joy4/format/rtmp"
"github.com/datarhei/joy4/format/rtsp" "github.com/datarhei/joy4/format/rtsp"
"github.com/datarhei/joy4/format/flv" "github.com/datarhei/joy4/format/ts"
"github.com/datarhei/joy4/format/aac"
"github.com/datarhei/joy4/av/avutil"
) )
func RegisterAll() { func RegisterAll() {
@@ -18,4 +18,3 @@ func RegisterAll() {
avutil.DefaultHandlers.Add(flv.Handler) avutil.DefaultHandlers.Add(flv.Handler)
avutil.DefaultHandlers.Add(aac.Handler) avutil.DefaultHandlers.Add(aac.Handler)
} }

View File

@@ -1,9 +1,9 @@
package mp4 package mp4
import ( import (
"io"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"io"
) )
var CodecTypes = []av.CodecType{av.H264, av.AAC} var CodecTypes = []av.CodecType{av.H264, av.AAC}
@@ -29,4 +29,3 @@ func Handler(h *avutil.RegisterHandler) {
h.CodecTypes = CodecTypes h.CodecTypes = CodecTypes
} }

View File

@@ -1,14 +1,13 @@
package mp4io package mp4io
import ( import (
"github.com/datarhei/joy4/utils/bits/pio"
"os"
"io"
"fmt" "fmt"
"time" "github.com/datarhei/joy4/utils/bits/pio"
"io"
"math" "math"
"os"
"strings" "strings"
"time"
) )
type ParseError struct { type ParseError struct {
@@ -500,4 +499,3 @@ func (self *Track) GetElemStreamDesc() (esds *ElemStreamDesc) {
esds, _ = atom.(*ElemStreamDesc) esds, _ = atom.(*ElemStreamDesc)
return return
} }

View File

@@ -1,15 +1,15 @@
package mp4 package mp4
import ( import (
"bufio"
"fmt" "fmt"
"time"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/codec/aacparser" "github.com/datarhei/joy4/codec/aacparser"
"github.com/datarhei/joy4/codec/h264parser" "github.com/datarhei/joy4/codec/h264parser"
"github.com/datarhei/joy4/format/mp4/mp4io" "github.com/datarhei/joy4/format/mp4/mp4io"
"github.com/datarhei/joy4/utils/bits/pio" "github.com/datarhei/joy4/utils/bits/pio"
"io" "io"
"bufio" "time"
) )
type Muxer struct { type Muxer struct {

View File

@@ -10,16 +10,17 @@ import (
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
"github.com/datarhei/joy4/utils/bits/pio"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/format/flv"
"github.com/datarhei/joy4/format/flv/flvio"
"io" "io"
"net" "net"
"net/url" "net/url"
"strings" "strings"
"time" "time"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/format/flv"
"github.com/datarhei/joy4/format/flv/flvio"
"github.com/datarhei/joy4/utils/bits/pio"
) )
var Debug bool var Debug bool
@@ -55,7 +56,7 @@ func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) {
return return
} }
var ErrServerClosed = errors.New("rtmp: Server closed") var ErrServerClosed = errors.New("server closed")
type Server struct { type Server struct {
Addr string Addr string
@@ -98,7 +99,7 @@ func (self *Server) ListenAndServe() error {
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
return fmt.Errorf("rtmp: %w", err) return err
} }
return self.Serve(listener) return self.Serve(listener)
@@ -112,7 +113,7 @@ func (self *Server) ListenAndServeTLS(certFile, keyFile string) error {
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
return fmt.Errorf("rtmp: %w", err) return err
} }
return self.ServeTLS(listener, certFile, keyFile) return self.ServeTLS(listener, certFile, keyFile)
@@ -135,7 +136,7 @@ func (self *Server) ServeTLS(listener net.Listener, certFile, keyFile string) er
var err error var err error
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil { if err != nil {
return fmt.Errorf("rtmp: %w", err) return err
} }
} }
@@ -179,8 +180,6 @@ func (self *Server) Serve(listener net.Listener) error {
} }
}() }()
} }
return nil
} }
func (self *Server) Close() { func (self *Server) Close() {
@@ -322,6 +321,8 @@ const (
eventtypeStreamBegin = 0 eventtypeStreamBegin = 0
eventtypeSetBufferLength = 3 eventtypeSetBufferLength = 3
eventtypeStreamIsRecorded = 4 eventtypeStreamIsRecorded = 4
eventtypePingRequest = 6
eventtypePingResponse = 7
) )
func (self *Conn) NetConn() net.Conn { func (self *Conn) NetConn() net.Conn {
@@ -429,11 +430,11 @@ func (self *Conn) writeBasicConf() (err error) {
return return
} }
// > WindowAckSize // > WindowAckSize
if err = self.writeWindowAckSize(5000000); err != nil { if err = self.writeWindowAckSize(1024 * 1024 * 3); err != nil {
return return
} }
// > SetPeerBandwidth // > SetPeerBandwidth
if err = self.writeSetPeerBandwidth(5000000, 2); err != nil { if err = self.writeSetPeerBandwidth(1024*1024*3, 0); err != nil {
return return
} }
return return
@@ -447,18 +448,18 @@ func (self *Conn) readConnect() (err error) {
return return
} }
if self.commandname != "connect" { if self.commandname != "connect" {
err = fmt.Errorf("rtmp: first command is not connect") err = fmt.Errorf("first command is not connect")
return return
} }
if self.commandobj == nil { if self.commandobj == nil {
err = fmt.Errorf("rtmp: connect command params invalid") err = fmt.Errorf("connect command params invalid")
return return
} }
var ok bool var ok bool
var _app, _tcurl interface{} var _app, _tcurl interface{}
if _app, ok = self.commandobj["app"]; !ok { if _app, ok = self.commandobj["app"]; !ok {
err = fmt.Errorf("rtmp: `connect` params missing `app`") err = fmt.Errorf("the `connect` params missing `app`")
return return
} }
connectpath, _ = _app.(string) connectpath, _ = _app.(string)
@@ -521,7 +522,7 @@ func (self *Conn) readConnect() (err error) {
} }
if len(self.commandparams) < 1 { if len(self.commandparams) < 1 {
err = fmt.Errorf("rtmp: publish params invalid") err = fmt.Errorf("publish params invalid")
return return
} }
publishpath, _ := self.commandparams[0].(string) publishpath, _ := self.commandparams[0].(string)
@@ -547,7 +548,7 @@ func (self *Conn) readConnect() (err error) {
} }
if cberr != nil { if cberr != nil {
err = fmt.Errorf("rtmp: OnPlayOrPublish check failed") err = fmt.Errorf("OnPlayOrPublish check failed")
return return
} }
@@ -564,7 +565,7 @@ func (self *Conn) readConnect() (err error) {
} }
if len(self.commandparams) < 1 { if len(self.commandparams) < 1 {
err = fmt.Errorf("rtmp: command play params invalid") err = fmt.Errorf("command play params invalid")
return return
} }
playpath, _ := self.commandparams[0].(string) playpath, _ := self.commandparams[0].(string)
@@ -606,8 +607,6 @@ func (self *Conn) readConnect() (err error) {
} }
} }
return
} }
func (self *Conn) checkConnectResult() (ok bool, errmsg string) { func (self *Conn) checkConnectResult() (ok bool, errmsg string) {
@@ -657,7 +656,7 @@ func (self *Conn) probe() (err error) {
} }
if err = self.prober.PushTag(tag, int32(self.timestamp)); err != nil { if err = self.prober.PushTag(tag, int32(self.timestamp)); err != nil {
if Debug { if Debug {
fmt.Printf("error probing tag: %s\n", err.Error()) fmt.Printf("rtmp: error probing tag: %s\n", err.Error())
} }
} }
} }
@@ -705,7 +704,7 @@ func (self *Conn) writeConnect(path string) (err error) {
var ok bool var ok bool
var errmsg string var errmsg string
if ok, errmsg = self.checkConnectResult(); !ok { if ok, errmsg = self.checkConnectResult(); !ok {
err = fmt.Errorf("rtmp: command connect failed: %s", errmsg) err = fmt.Errorf("command connect failed: %s", errmsg)
return return
} }
if Debug { if Debug {
@@ -718,9 +717,9 @@ func (self *Conn) writeConnect(path string) (err error) {
if len(self.msgdata) == 4 { if len(self.msgdata) == 4 {
self.readAckSize = pio.U32BE(self.msgdata) self.readAckSize = pio.U32BE(self.msgdata)
} }
if err = self.writeWindowAckSize(0xffffffff); err != nil { //if err = self.writeWindowAckSize(0xffffffff); err != nil {
return // return
} //}
} }
} }
} }
@@ -759,7 +758,7 @@ func (self *Conn) connectPublish() (err error) {
if self.commandname == "_result" { if self.commandname == "_result" {
var ok bool var ok bool
if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok { if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok {
err = fmt.Errorf("rtmp: createStream command failed") err = fmt.Errorf("createStream command failed")
return return
} }
break break
@@ -819,7 +818,7 @@ func (self *Conn) connectPlay() (err error) {
if self.commandname == "_result" { if self.commandname == "_result" {
var ok bool var ok bool
if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok { if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok {
err = fmt.Errorf("rtmp: createStream command failed") err = fmt.Errorf("createStream command failed")
return return
} }
break break
@@ -862,11 +861,9 @@ func (self *Conn) ReadPacket() (pkt av.Packet, err error) {
var ok bool var ok bool
if pkt, ok = self.prober.TagToPacket(tag, int32(self.timestamp)); ok { if pkt, ok = self.prober.TagToPacket(tag, int32(self.timestamp)); ok {
return return pkt, nil
} }
} }
return
} }
func (self *Conn) Prepare() (err error) { func (self *Conn) Prepare() (err error) {
@@ -910,7 +907,7 @@ func (self *Conn) prepare(stage int, flags int) (err error) {
return return
} }
} else { } else {
err = fmt.Errorf("rtmp: call WriteHeader() before WritePacket()") err = fmt.Errorf("call WriteHeader() before WritePacket()")
return return
} }
} }
@@ -1135,6 +1132,17 @@ func (self *Conn) writeSetBufferLength(msgsid uint32, timestamp uint32) (err err
return return
} }
func (self *Conn) writePingResponse(timestamp uint32) (err error) {
b := self.tmpwbuf(chunkHeaderLength + 10)
n := self.fillChunkHeader(b, 2, 0, msgtypeidUserControl, 0, 6)
pio.PutU16BE(b[n:], eventtypePingResponse)
n += 2
pio.PutU32BE(b[n:], timestamp)
n += 4
_, err = self.bufw.Write(b[:n])
return
}
const chunkHeaderLength = 12 const chunkHeaderLength = 12
const FlvTimestampMax = 0xFFFFFF const FlvTimestampMax = 0xFFFFFF
@@ -1203,13 +1211,13 @@ func (self *Conn) readChunk() (err error) {
default: // Chunk basic header 1 default: // Chunk basic header 1
case 0: // Chunk basic header 2 case 0: // Chunk basic header 2
if _, err = io.ReadFull(self.bufr, b[:1]); err != nil { if _, err = io.ReadFull(self.bufr, b[:1]); err != nil {
return return fmt.Errorf("chunk basic header 2: %w", err)
} }
n += 1 n += 1
csid = uint32(b[0]) + 64 csid = uint32(b[0]) + 64
case 1: // Chunk basic header 3 case 1: // Chunk basic header 3
if _, err = io.ReadFull(self.bufr, b[:2]); err != nil { if _, err = io.ReadFull(self.bufr, b[:2]); err != nil {
return return fmt.Errorf("chunk basic header 3: %w", err)
} }
n += 2 n += 2
csid = uint32(pio.U16BE(b)) + 64 csid = uint32(pio.U16BE(b)) + 64
@@ -1237,7 +1245,7 @@ func (self *Conn) readChunk() (err error) {
// //
// Figure 9 Chunk Message Header Type 0 // Figure 9 Chunk Message Header Type 0
if cs.msgdataleft != 0 { if cs.msgdataleft != 0 {
err = fmt.Errorf("rtmp: chunk msgdataleft=%d invalid", cs.msgdataleft) err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft)
return return
} }
h := b[:11] h := b[:11]
@@ -1274,7 +1282,7 @@ func (self *Conn) readChunk() (err error) {
// //
// Figure 10 Chunk Message Header Type 1 // Figure 10 Chunk Message Header Type 1
if cs.msgdataleft != 0 { if cs.msgdataleft != 0 {
err = fmt.Errorf("rtmp: chunk msgdataleft=%d invalid", cs.msgdataleft) err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft)
return return
} }
h := b[:7] h := b[:7]
@@ -1309,7 +1317,7 @@ func (self *Conn) readChunk() (err error) {
// //
// Figure 11 Chunk Message Header Type 2 // Figure 11 Chunk Message Header Type 2
if cs.msgdataleft != 0 { if cs.msgdataleft != 0 {
err = fmt.Errorf("rtmp: chunk msgdataleft=%d invalid", cs.msgdataleft) err = fmt.Errorf("chunk msgdataleft=%d invalid", cs.msgdataleft)
return return
} }
h := b[:3] h := b[:3]
@@ -1361,7 +1369,7 @@ func (self *Conn) readChunk() (err error) {
} }
default: default:
err = fmt.Errorf("rtmp: invalid chunk msg header type=%d", msghdrtype) err = fmt.Errorf("invalid chunk msg header type=%d", msghdrtype)
return return
} }
@@ -1389,15 +1397,17 @@ func (self *Conn) readChunk() (err error) {
} }
if err = self.handleMsg(cs.timenow, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil { if err = self.handleMsg(cs.timenow, cs.msgsid, cs.msgtypeid, cs.msgdata); err != nil {
return return fmt.Errorf("handleMsg: %w", err)
} }
} }
self.ackn += uint32(n) self.ackn += uint32(n)
if self.readAckSize != 0 && self.ackn > self.readAckSize { if self.readAckSize != 0 && self.ackn > self.readAckSize {
if err = self.writeAck(self.ackn); err != nil { if err = self.writeAck(self.ackn); err != nil {
return return fmt.Errorf("writeACK: %w", err)
} }
self.flushWrite()
self.ackn = 0 self.ackn = 0
} }
@@ -1423,7 +1433,7 @@ func (self *Conn) handleCommandMsgAMF0(b []byte) (n int, err error) {
var ok bool var ok bool
if self.commandname, ok = name.(string); !ok { if self.commandname, ok = name.(string); !ok {
err = fmt.Errorf("rtmp: CommandMsgAMF0 command is not string") err = fmt.Errorf("CommandMsgAMF0 command is not string")
return return
} }
self.commandtransid, _ = transid.(float64) self.commandtransid, _ = transid.(float64)
@@ -1438,7 +1448,7 @@ func (self *Conn) handleCommandMsgAMF0(b []byte) (n int, err error) {
self.commandparams = append(self.commandparams, obj) self.commandparams = append(self.commandparams, obj)
} }
if n < len(b) { if n < len(b) {
err = fmt.Errorf("rtmp: CommandMsgAMF0 left bytes=%d", len(b)-n) err = fmt.Errorf("CommandMsgAMF0 left bytes=%d", len(b)-n)
return return
} }
@@ -1459,7 +1469,7 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
case msgtypeidCommandMsgAMF3: case msgtypeidCommandMsgAMF3:
if len(msgdata) < 1 { if len(msgdata) < 1 {
err = fmt.Errorf("rtmp: short packet of CommandMsgAMF3") err = fmt.Errorf("short packet of CommandMsgAMF3")
return return
} }
// skip first byte // skip first byte
@@ -1469,11 +1479,21 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
case msgtypeidUserControl: case msgtypeidUserControl:
if len(msgdata) < 2 { if len(msgdata) < 2 {
err = fmt.Errorf("rtmp: short packet of UserControl") err = fmt.Errorf("short packet of UserControl")
return return
} }
self.eventtype = pio.U16BE(msgdata) self.eventtype = pio.U16BE(msgdata)
if self.eventtype == eventtypePingRequest {
if len(msgdata) != 6 {
err = fmt.Errorf("wrong length for UserControl.PingRequest")
return
}
pingtimestamp := pio.U32BE(msgdata[2:])
self.writePingResponse(pingtimestamp)
self.flushWrite()
}
case msgtypeidDataMsgAMF0: case msgtypeidDataMsgAMF0:
b := msgdata b := msgdata
n := 0 n := 0
@@ -1487,7 +1507,7 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
self.datamsgvals = append(self.datamsgvals, obj) self.datamsgvals = append(self.datamsgvals, obj)
} }
if n < len(b) { if n < len(b) {
err = fmt.Errorf("rtmp: DataMsgAMF0 left bytes=%d", len(b)-n) err = fmt.Errorf("DataMsgAMF0 left bytes=%d", len(b)-n)
return return
} }
@@ -1537,11 +1557,17 @@ func (self *Conn) handleMsg(timestamp uint32, msgsid uint32, msgtypeid uint8, ms
case msgtypeidSetChunkSize: case msgtypeidSetChunkSize:
if len(msgdata) < 4 { if len(msgdata) < 4 {
err = fmt.Errorf("rtmp: short packet of SetChunkSize") err = fmt.Errorf("short packet of SetChunkSize")
return return
} }
self.readMaxChunkSize = int(pio.U32BE(msgdata)) self.readMaxChunkSize = int(pio.U32BE(msgdata))
return return
case msgtypeidWindowAckSize:
if len(self.msgdata) != 4 {
return fmt.Errorf("invalid packet of WindowAckSize")
}
self.readAckSize = pio.U32BE(self.msgdata)
return
} }
self.gotmsg = true self.gotmsg = true
@@ -1660,7 +1686,7 @@ func (self *Conn) handshakeClient() (err error) {
} }
if Debug { if Debug {
fmt.Println("rtmp: handshakeClient: server version", S1[4], S1[5], S1[6], S1[7]) fmt.Println("handshakeClient: server version", S1[4], S1[5], S1[6], S1[7])
} }
if ver := pio.U32BE(S1[4:8]); ver != 0 { if ver := pio.U32BE(S1[4:8]); ver != 0 {
@@ -1698,7 +1724,7 @@ func (self *Conn) handshakeServer() (err error) {
return return
} }
if C0[0] != 3 { if C0[0] != 3 {
err = fmt.Errorf("rtmp: handshake version=%d invalid", C0[0]) err = fmt.Errorf("handshake version=%d invalid", C0[0])
return return
} }
@@ -1713,7 +1739,7 @@ func (self *Conn) handshakeServer() (err error) {
var ok bool var ok bool
var digest []byte var digest []byte
if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); !ok { if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); !ok {
err = fmt.Errorf("rtmp: handshake server: C1 invalid") err = fmt.Errorf("handshake server: C1 invalid")
return return
} }
hsCreate01(S0S1, srvtime, srvver, hsServerPartialKey) hsCreate01(S0S1, srvtime, srvver, hsServerPartialKey)

View File

@@ -8,13 +8,13 @@ import (
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"github.com/datarhei/joy4/utils/bits/pio"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/codec" "github.com/datarhei/joy4/codec"
"github.com/datarhei/joy4/codec/aacparser" "github.com/datarhei/joy4/codec/aacparser"
"github.com/datarhei/joy4/codec/h264parser" "github.com/datarhei/joy4/codec/h264parser"
"github.com/datarhei/joy4/format/rtsp/sdp" "github.com/datarhei/joy4/format/rtsp/sdp"
"github.com/datarhei/joy4/utils/bits/pio"
"io" "io"
"net" "net"
"net/textproto" "net/textproto"
@@ -1236,4 +1236,3 @@ func Handler(h *avutil.RegisterHandler) {
return return
} }
} }

View File

@@ -26,4 +26,3 @@ type Stream struct {
lasttime time.Duration lasttime time.Duration
} }

View File

@@ -3,13 +3,13 @@ package ts
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"time"
"github.com/datarhei/joy4/utils/bits/pio"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/format/ts/tsio"
"github.com/datarhei/joy4/codec/aacparser" "github.com/datarhei/joy4/codec/aacparser"
"github.com/datarhei/joy4/codec/h264parser" "github.com/datarhei/joy4/codec/h264parser"
"github.com/datarhei/joy4/format/ts/tsio"
"github.com/datarhei/joy4/utils/bits/pio"
"io" "io"
"time"
) )
type Demuxer struct { type Demuxer struct {

View File

@@ -1,9 +1,9 @@
package ts package ts
import ( import (
"io"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"io"
) )
func Handler(h *avutil.RegisterHandler) { func Handler(h *avutil.RegisterHandler) {
@@ -23,4 +23,3 @@ func Handler(h *avutil.RegisterHandler) {
h.CodecTypes = CodecTypes h.CodecTypes = CodecTypes
} }

View File

@@ -1,9 +1,9 @@
package ts package ts
import ( import (
"time"
"github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/format/ts/tsio" "github.com/datarhei/joy4/format/ts/tsio"
"time"
) )
type Stream struct { type Stream struct {
@@ -24,4 +24,3 @@ type Stream struct {
data []byte data []byte
datalen int datalen int
} }

View File

@@ -52,4 +52,3 @@ func calcCRC32(crc uint32, data []byte) uint32 {
} }
return crc return crc
} }

View File

@@ -1,11 +1,10 @@
package tsio package tsio
import ( import (
"io"
"time"
"fmt" "fmt"
"github.com/datarhei/joy4/utils/bits/pio" "github.com/datarhei/joy4/utils/bits/pio"
"io"
"time"
) )
const ( const (
@@ -587,4 +586,3 @@ func ParseTSHeader(tshdr []byte) (pid uint16, start bool, iskeyframe bool, hdrle
} }
return return
} }

View File

@@ -1,5 +1,3 @@
package pio package pio
var RecommendBufioSize = 1024 * 64 var RecommendBufioSize = 1024 * 64

View File

@@ -1,4 +1,3 @@
package pio package pio
func U8(b []byte) (i uint8) { func U8(b []byte) (i uint8) {
@@ -7,85 +6,116 @@ func U8(b []byte) (i uint8) {
func U16BE(b []byte) (i uint16) { func U16BE(b []byte) (i uint16) {
i = uint16(b[0]) i = uint16(b[0])
i <<= 8; i |= uint16(b[1]) i <<= 8
i |= uint16(b[1])
return return
} }
func I16BE(b []byte) (i int16) { func I16BE(b []byte) (i int16) {
i = int16(b[0]) i = int16(b[0])
i <<= 8; i |= int16(b[1]) i <<= 8
i |= int16(b[1])
return return
} }
func I24BE(b []byte) (i int32) { func I24BE(b []byte) (i int32) {
i = int32(int8(b[0])) i = int32(int8(b[0]))
i <<= 8; i |= int32(b[1]) i <<= 8
i <<= 8; i |= int32(b[2]) i |= int32(b[1])
i <<= 8
i |= int32(b[2])
return return
} }
func U24BE(b []byte) (i uint32) { func U24BE(b []byte) (i uint32) {
i = uint32(b[0]) i = uint32(b[0])
i <<= 8; i |= uint32(b[1]) i <<= 8
i <<= 8; i |= uint32(b[2]) i |= uint32(b[1])
i <<= 8
i |= uint32(b[2])
return return
} }
func I32BE(b []byte) (i int32) { func I32BE(b []byte) (i int32) {
i = int32(int8(b[0])) i = int32(int8(b[0]))
i <<= 8; i |= int32(b[1]) i <<= 8
i <<= 8; i |= int32(b[2]) i |= int32(b[1])
i <<= 8; i |= int32(b[3]) i <<= 8
i |= int32(b[2])
i <<= 8
i |= int32(b[3])
return return
} }
func U32LE(b []byte) (i uint32) { func U32LE(b []byte) (i uint32) {
i = uint32(b[3]) i = uint32(b[3])
i <<= 8; i |= uint32(b[2]) i <<= 8
i <<= 8; i |= uint32(b[1]) i |= uint32(b[2])
i <<= 8; i |= uint32(b[0]) i <<= 8
i |= uint32(b[1])
i <<= 8
i |= uint32(b[0])
return return
} }
func U32BE(b []byte) (i uint32) { func U32BE(b []byte) (i uint32) {
i = uint32(b[0]) i = uint32(b[0])
i <<= 8; i |= uint32(b[1]) i <<= 8
i <<= 8; i |= uint32(b[2]) i |= uint32(b[1])
i <<= 8; i |= uint32(b[3]) i <<= 8
i |= uint32(b[2])
i <<= 8
i |= uint32(b[3])
return return
} }
func U40BE(b []byte) (i uint64) { func U40BE(b []byte) (i uint64) {
i = uint64(b[0]) i = uint64(b[0])
i <<= 8; i |= uint64(b[1]) i <<= 8
i <<= 8; i |= uint64(b[2]) i |= uint64(b[1])
i <<= 8; i |= uint64(b[3]) i <<= 8
i <<= 8; i |= uint64(b[4]) i |= uint64(b[2])
i <<= 8
i |= uint64(b[3])
i <<= 8
i |= uint64(b[4])
return return
} }
func U64BE(b []byte) (i uint64) { func U64BE(b []byte) (i uint64) {
i = uint64(b[0]) i = uint64(b[0])
i <<= 8; i |= uint64(b[1]) i <<= 8
i <<= 8; i |= uint64(b[2]) i |= uint64(b[1])
i <<= 8; i |= uint64(b[3]) i <<= 8
i <<= 8; i |= uint64(b[4]) i |= uint64(b[2])
i <<= 8; i |= uint64(b[5]) i <<= 8
i <<= 8; i |= uint64(b[6]) i |= uint64(b[3])
i <<= 8; i |= uint64(b[7]) i <<= 8
i |= uint64(b[4])
i <<= 8
i |= uint64(b[5])
i <<= 8
i |= uint64(b[6])
i <<= 8
i |= uint64(b[7])
return return
} }
func I64BE(b []byte) (i int64) { func I64BE(b []byte) (i int64) {
i = int64(int8(b[0])) i = int64(int8(b[0]))
i <<= 8; i |= int64(b[1]) i <<= 8
i <<= 8; i |= int64(b[2]) i |= int64(b[1])
i <<= 8; i |= int64(b[3]) i <<= 8
i <<= 8; i |= int64(b[4]) i |= int64(b[2])
i <<= 8; i |= int64(b[5]) i <<= 8
i <<= 8; i |= int64(b[6]) i |= int64(b[3])
i <<= 8; i |= int64(b[7]) i <<= 8
i |= int64(b[4])
i <<= 8
i |= int64(b[5])
i <<= 8
i |= int64(b[6])
i <<= 8
i |= int64(b[7])
return return
} }

View File

@@ -66,4 +66,3 @@ func VecSlice(in [][]byte, s int, e int) (out [][]byte) {
out = out[:n] out = out[:n]
return return
} }

View File

@@ -1,4 +1,3 @@
package pio package pio
func PutU8(b []byte, v uint8) { func PutU8(b []byte, v uint8) {
@@ -86,4 +85,3 @@ func PutI64BE(b []byte, v int64) {
b[6] = byte(v >> 8) b[6] = byte(v >> 8)
b[7] = byte(v) b[7] = byte(v)
} }

2
vendor/modules.txt vendored
View File

@@ -53,7 +53,7 @@ github.com/datarhei/gosrt/internal/congestion
github.com/datarhei/gosrt/internal/crypto github.com/datarhei/gosrt/internal/crypto
github.com/datarhei/gosrt/internal/net github.com/datarhei/gosrt/internal/net
github.com/datarhei/gosrt/internal/packet github.com/datarhei/gosrt/internal/packet
# github.com/datarhei/joy4 v0.0.0-20210125162555-2102a8289cce # github.com/datarhei/joy4 v0.0.0-20220728180719-f752080f4a36
## explicit; go 1.14 ## explicit; go 1.14
github.com/datarhei/joy4/av github.com/datarhei/joy4/av
github.com/datarhei/joy4/av/avutil github.com/datarhei/joy4/av/avutil