diff --git a/example/default/config.yaml b/example/default/config.yaml index f088e57..33c780c 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -1,8 +1,8 @@ global: -# loglevel: debug - enableauth: true - tcp: - listenaddr: :50051 +# loglevel: trace +# enableauth: true +# tcp: +# listenaddr: :50051 # ringsize: 20-250 # buffertime: 10s # speed: 1 diff --git a/example/default/main.go b/example/default/main.go index 21c674d..4b51e6b 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -7,6 +7,7 @@ import ( "m7s.live/m7s/v5" _ "m7s.live/m7s/v5/plugin/console" _ "m7s.live/m7s/v5/plugin/debug" + _ "m7s.live/m7s/v5/plugin/gb28181" _ "m7s.live/m7s/v5/plugin/hdl" _ "m7s.live/m7s/v5/plugin/logrotate" _ "m7s.live/m7s/v5/plugin/mp4" diff --git a/example/default/recordmp4.yaml b/example/default/recordmp4.yaml new file mode 100644 index 0000000..d8ebe41 --- /dev/null +++ b/example/default/recordmp4.yaml @@ -0,0 +1,7 @@ +global: + loglevel: debug +mp4: + record: + enableregexp: true + recordlist: + .+: record/$0.mp4 \ No newline at end of file diff --git a/go.mod b/go.mod index b049178..9f2c2b0 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,10 @@ require ( github.com/cilium/ebpf v0.15.0 github.com/deepch/vdk v0.0.27 github.com/emiago/sipgo v0.22.0 + github.com/glebarez/sqlite v1.11.0 github.com/google/gopacket v1.1.19 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 + github.com/icholy/digest v0.1.22 github.com/pion/interceptor v0.1.29 github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.6 @@ -19,11 +21,13 @@ require ( github.com/pion/webrtc/v3 v3.2.12 github.com/polarsignals/frostdb v0.0.0-20240613134636-1d823f7d7299 github.com/quic-go/quic-go v0.43.1 + github.com/rs/zerolog v1.33.0 github.com/vishvananda/netlink v1.1.0 github.com/yapingcat/gomedia v0.0.0-20240601043430-920523f8e5c7 google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.33.0 + gorm.io/gorm v1.25.11 ) require ( @@ -44,6 +48,7 @@ require ( github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/efficientgo/core v1.0.0-rc.2 // indirect + github.com/glebarez/go-sqlite v1.21.2 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -56,7 +61,8 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hamba/avro/v2 v2.20.1 // indirect - github.com/icholy/digest v0.1.22 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.8 // indirect @@ -70,6 +76,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/oklog/ulid/v2 v2.1.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect @@ -96,9 +103,9 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - github.com/rs/zerolog v1.33.0 // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect github.com/segmentio/encoding v0.3.6 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect @@ -116,6 +123,10 @@ require ( golang.org/x/text v0.16.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + modernc.org/libc v1.41.0 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.29.5 // indirect ) require ( diff --git a/go.sum b/go.sum index 011323b..b8b01f3 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,10 @@ github.com/emiago/sipgo v0.22.0/go.mod h1:a77FgPEEjJvfYWYfP3p53u+dNhWEMb/VGVS6gu github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= +github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k= +github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GMw= +github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= @@ -123,6 +127,10 @@ github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSo github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/icholy/digest v0.1.22 h1:dRIwCjtAcXch57ei+F0HSb5hmprL873+q7PoVojdMzM= github.com/icholy/digest v0.1.22/go.mod h1:uLAeDdWKIWNFMH0wqbwchbTQOmJWhzSnL7zmqSPqEEc= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -164,6 +172,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= @@ -269,6 +279,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/quic-go/quic-go v0.43.1 h1:fLiMNfQVe9q2JvSsiXo4fXOEguXHGGl9+6gLp4RPeZQ= github.com/quic-go/quic-go v0.43.1/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -509,6 +521,16 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= +gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY= gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= +modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk= +modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= +modernc.org/sqlite v1.29.5 h1:8l/SQKAjDtZFo9lkJLdk8g9JEOeYRG4/ghStDCCTiTE= +modernc.org/sqlite v1.29.5/go.mod h1:S02dvcmm7TnTRvGhv8IGYyLnIt7AS2KPaB1F/71p75U= diff --git a/pkg/adts.go b/pkg/adts.go new file mode 100644 index 0000000..2f70155 --- /dev/null +++ b/pkg/adts.go @@ -0,0 +1,80 @@ +package pkg + +import ( + "github.com/deepch/vdk/codec/aacparser" + "io" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" + "time" +) + +var _ IAVFrame = (*ADTS)(nil) + +type ADTS struct { + DTS time.Duration + util.RecyclableMemory +} + +func (A *ADTS) Parse(track *AVTrack) (err error) { + if track.ICodecCtx == nil { + var ctx = &codec.AACCtx{} + var reader = A.NewReader() + var adts []byte + adts, err = reader.ReadBytes(7) + if err != nil { + return err + } + var hdrlen, framelen, samples int + ctx.Config, hdrlen, framelen, samples, err = aacparser.ParseADTSHeader(adts) + if err != nil { + return err + } + track.ICodecCtx = ctx + track.Info("ADTS", "hdrlen", hdrlen, "framelen", framelen, "samples", samples) + } + track.Value.Raw, err = A.Demux(track.ICodecCtx) + return +} + +func (A *ADTS) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) { + return ctx.GetBase(), nil, nil +} + +func (A *ADTS) Demux(ctx codec.ICodecCtx) (any, error) { + var reader = A.NewReader() + err := reader.Skip(7) + var mem util.Memory + reader.Range(mem.AppendOne) + return mem, err +} + +func (A *ADTS) Mux(ctx codec.ICodecCtx, frame *AVFrame) { + aacCtx := ctx.GetBase().(*codec.AACCtx) + A.InitRecycleIndexes(1) + adts := A.NextN(7) + raw := frame.Raw.(util.Memory) + aacparser.FillADTSHeader(adts, aacCtx.Config, raw.Size/aacCtx.GetSampleSize(), raw.Size) + A.Append(raw.Buffers...) +} + +func (A *ADTS) GetTimestamp() time.Duration { + return A.DTS * time.Millisecond / 90 +} + +func (A *ADTS) GetCTS() time.Duration { + return 0 +} + +func (A *ADTS) GetSize() int { + return A.Size +} + +func (A *ADTS) String() string { + //TODO implement me + panic("implement me") +} + +func (A *ADTS) Dump(b byte, writer io.Writer) { + //TODO implement me + panic("implement me") +} diff --git a/pkg/annexb.go b/pkg/annexb.go index 19d4f67..cdef087 100644 --- a/pkg/annexb.go +++ b/pkg/annexb.go @@ -3,6 +3,8 @@ package pkg import ( "encoding/binary" "fmt" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/codec/h265parser" "io" "time" @@ -10,9 +12,12 @@ import ( "m7s.live/m7s/v5/pkg/util" ) +var _ IAVFrame = (*AnnexB)(nil) + type AnnexB struct { - PTS time.Duration - DTS time.Duration + Hevc bool + PTS time.Duration + DTS time.Duration util.RecyclableMemory } @@ -42,7 +47,51 @@ func (a *AnnexB) GetCTS() time.Duration { // Parse implements pkg.IAVFrame. func (a *AnnexB) Parse(t *AVTrack) (err error) { - panic("unimplemented") + if a.Hevc { + if t.ICodecCtx == nil { + t.ICodecCtx = &codec.H265Ctx{} + } + } else { + if t.ICodecCtx == nil { + t.ICodecCtx = &codec.H264Ctx{} + } + } + if t.Value.Raw, err = a.Demux(t.ICodecCtx); err != nil { + return + } + for _, nalu := range t.Value.Raw.(Nalus) { + if a.Hevc { + ctx := t.ICodecCtx.(*codec.H265Ctx) + switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { + case h265parser.NAL_UNIT_VPS: + ctx.RecordInfo.VPS = [][]byte{nalu.ToBytes()} + case h265parser.NAL_UNIT_SPS: + ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()} + case h265parser.NAL_UNIT_PPS: + ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()} + ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.VPS(), ctx.SPS(), ctx.PPS()) + case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP, + h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL, + h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP, + h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL, + h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP, + h265parser.NAL_UNIT_CODED_SLICE_CRA: + t.Value.IDR = true + } + } else { + ctx := t.ICodecCtx.(*codec.H264Ctx) + switch codec.ParseH264NALUType(nalu.Buffers[0][0]) { + case codec.NALU_SPS: + ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()} + case codec.NALU_PPS: + ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()} + ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS()) + case codec.NALU_IDR_Picture: + t.Value.IDR = true + } + } + } + return } // String implements pkg.IAVFrame. @@ -51,23 +100,69 @@ func (a *AnnexB) String() string { } // Demux implements pkg.IAVFrame. -func (a *AnnexB) Demux(ctx codec.ICodecCtx) (any, error) { - panic("unimplemented") +func (a *AnnexB) Demux(codecCtx codec.ICodecCtx) (ret any, err error) { + var nalus Nalus + var lastFourBytes [4]byte + var b byte + var shallow util.Memory + shallow.Append(a.Buffers...) + reader := shallow.NewReader() + + gotNalu := func() { + var nalu util.Memory + for buf := range reader.ClipFront { + nalu.AppendOne(buf) + } + nalus = append(nalus, nalu) + + } + + for { + b, err = reader.ReadByte() + if err == nil { + copy(lastFourBytes[:], lastFourBytes[1:]) + lastFourBytes[3] = b + var startCode = 0 + if lastFourBytes == codec.NALU_Delimiter2 { + startCode = 4 + } else if [3]byte(lastFourBytes[1:]) == codec.NALU_Delimiter1 { + startCode = 3 + } + if startCode > 0 { + reader.Unread(startCode) + if reader.Offset() > 0 { + gotNalu() + } + reader.Skip(startCode) + for _ = range reader.ClipFront { + } + } + } else if err == io.EOF { + if reader.Offset() > 0 { + gotNalu() + } + err = nil + break + } + } + ret = nalus + return } func (a *AnnexB) Mux(codecCtx codec.ICodecCtx, frame *AVFrame) { - a.AppendOne(codec.NALU_Delimiter2) + delimiter2 := codec.NALU_Delimiter2[:] + a.AppendOne(delimiter2) if frame.IDR { switch ctx := codecCtx.(type) { case *codec.H264Ctx: - a.Append(ctx.SPS(), codec.NALU_Delimiter2, ctx.PPS(), codec.NALU_Delimiter2) + a.Append(ctx.SPS(), delimiter2, ctx.PPS(), delimiter2) case *codec.H265Ctx: - a.Append(ctx.SPS(), codec.NALU_Delimiter2, ctx.PPS(), codec.NALU_Delimiter2, ctx.VPS(), codec.NALU_Delimiter2) + a.Append(ctx.SPS(), delimiter2, ctx.PPS(), delimiter2, ctx.VPS(), delimiter2) } } for i, nalu := range frame.Raw.(Nalus) { if i > 0 { - a.AppendOne(codec.NALU_Delimiter1) + a.AppendOne(codec.NALU_Delimiter1[:]) } a.Append(nalu.Buffers...) } diff --git a/pkg/av-reader.go b/pkg/av-reader.go index 92e587f..e71d118 100644 --- a/pkg/av-reader.go +++ b/pkg/av-reader.go @@ -12,11 +12,13 @@ const ( READSTATE_INIT = iota READSTATE_FIRST READSTATE_NORMAL + READSTATE_WAITKEY ) const ( SUBMODE_REAL = iota SUBMODE_NOJUMP SUBMODE_BUFFER + SUBMODE_WAITKEY ) type AVRingReader struct { @@ -94,6 +96,13 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) { startRing = idr } r.State = READSTATE_NORMAL + case SUBMODE_WAITKEY: + startRing = r.Track.Ring + if startRing == r.Track.GetIDR() { + r.State = READSTATE_NORMAL + } else { + r.State = READSTATE_WAITKEY + } } if err = r.StartRead(startRing); err != nil { return @@ -133,6 +142,18 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) { time.Sleep(fast) } } + case READSTATE_WAITKEY: + r.Info("wait key frame", "seq", r.Value.Sequence) + for { + if err = r.readFrame(conf.SubMode); err != nil { + return + } + if r.Value.IDR { + r.Info("key frame read", "seq", r.Value.Sequence) + r.State = READSTATE_NORMAL + break + } + } } r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds()) if r.AbsTime == 0 { diff --git a/pkg/codec/h264.go b/pkg/codec/h264.go index fff0ec1..f09e0f7 100644 --- a/pkg/codec/h264.go +++ b/pkg/codec/h264.go @@ -1,7 +1,6 @@ package codec import ( - "bytes" "fmt" "github.com/deepch/vdk/codec/h264parser" ) @@ -87,8 +86,8 @@ const ( var ( NALU_AUD_BYTE = []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0xF0} - NALU_Delimiter1 = []byte{0x00, 0x00, 0x01} - NALU_Delimiter2 = []byte{0x00, 0x00, 0x00, 0x01} + NALU_Delimiter1 = [3]byte{0x00, 0x00, 0x01} + NALU_Delimiter2 = [4]byte{0x00, 0x00, 0x00, 0x01} ) // H.264/AVC视频编码标准中,整个系统框架被分为了两个层面:视频编码层面(VCL)和网络抽象层面(NAL) @@ -96,15 +95,15 @@ var ( // raw byte sequence payload (RBSP) 原始字节序列载荷 // SplitH264 以0x00000001分割H264裸数据 -func SplitH264(payload []byte) (nalus [][]byte) { - for _, v := range bytes.SplitN(payload, NALU_Delimiter2, -1) { - if len(v) == 0 { - continue - } - nalus = append(nalus, bytes.SplitN(v, NALU_Delimiter1, -1)...) - } - return -} +//func SplitH264(payload []byte) (nalus [][]byte) { +// for _, v := range bytes.SplitN(payload, NALU_Delimiter2, -1) { +// if len(v) == 0 { +// continue +// } +// nalus = append(nalus, bytes.SplitN(v, NALU_Delimiter1, -1)...) +// } +// return +//} type ( H264Ctx struct { diff --git a/pkg/config/db.go b/pkg/config/db.go new file mode 100644 index 0000000..cadbcd3 --- /dev/null +++ b/pkg/config/db.go @@ -0,0 +1,6 @@ +package config + +type DB struct { + DBType string `default:"sqlite" desc:"数据库类型"` + DSN string `default:"cascade.db" desc:"数据库文件路径"` +} diff --git a/pkg/config/types.go b/pkg/config/types.go index 57e6965..a4ccaa7 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -156,6 +156,33 @@ func (p *Push) CheckPush(streamPath string) string { return url } +type Record struct { + EnableRegexp bool `desc:"是否启用正则表达式"` // 是否启用正则表达式 + RecordList map[string]string +} + +func (p *Record) GetRecordConfig() *Record { + return p +} + +func (p *Record) CheckRecord(streamPath string) string { + url, ok := p.RecordList[streamPath] + if !ok && p.EnableRegexp { + for k, url := range p.RecordList { + if r, err := regexp.Compile(k); err == nil { + if group := r.FindStringSubmatch(streamPath); group != nil { + for i, value := range group { + url = strings.Replace(url, fmt.Sprintf("$%d", i), value, -1) + } + return url + } + } + return "" + } + } + return url +} + type Common struct { PublicIP string LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别 @@ -168,6 +195,8 @@ type Common struct { UDP Pull Push + Record + DB } type ICommonConf interface { diff --git a/pkg/db/index.go b/pkg/db/index.go new file mode 100644 index 0000000..855aa5a --- /dev/null +++ b/pkg/db/index.go @@ -0,0 +1,5 @@ +package db + +import "gorm.io/gorm" + +var Factory = map[string]func(string) gorm.Dialector{} diff --git a/pkg/db/sqlite.go b/pkg/db/sqlite.go new file mode 100644 index 0000000..a5a29bd --- /dev/null +++ b/pkg/db/sqlite.go @@ -0,0 +1,11 @@ +//go:build sqlite + +package db + +import "github.com/glebarez/sqlite" + +func init() { + Factory["sqlite"] = func(dsn string) gorm.Dialector { + return gorm.Open(sqlite.Open(dsn), &gorm.Config{}) + } +} diff --git a/pkg/raw.go b/pkg/raw.go new file mode 100644 index 0000000..d925619 --- /dev/null +++ b/pkg/raw.go @@ -0,0 +1,130 @@ +package pkg + +import ( + "fmt" + "io" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" + "time" +) + +var _ IAVFrame = (*RawAudio)(nil) + +type RawAudio struct { + codec.FourCC + Timestamp time.Duration + util.RecyclableMemory +} + +func (r *RawAudio) Parse(track *AVTrack) error { + if track.ICodecCtx == nil { + switch r.FourCC { + case codec.FourCC_ALAW: + track.ICodecCtx = &codec.PCMACtx{ + AudioCtx: codec.AudioCtx{ + SampleRate: 8000, + Channels: 1, + SampleSize: 8, + }, + } + case codec.FourCC_ULAW: + track.ICodecCtx = &codec.PCMUCtx{ + AudioCtx: codec.AudioCtx{ + SampleRate: 8000, + Channels: 1, + SampleSize: 8, + }, + } + } + } + return nil +} + +func (r *RawAudio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) { + return ctx.GetBase(), nil, nil +} + +func (r *RawAudio) Demux(ctx codec.ICodecCtx) (any, error) { + return r.Memory, nil +} + +func (r *RawAudio) Mux(ctx codec.ICodecCtx, frame *AVFrame) { + r.InitRecycleIndexes(0) + r.Memory = frame.Raw.(util.Memory) + r.Timestamp = frame.Timestamp +} + +func (r *RawAudio) GetTimestamp() time.Duration { + return r.Timestamp +} + +func (r *RawAudio) GetCTS() time.Duration { + return 0 +} + +func (r *RawAudio) GetSize() int { + return r.Size +} + +func (r *RawAudio) String() string { + return fmt.Sprintf("RawAudio{FourCC: %s, Timestamp: %s, Size: %d}", r.FourCC, r.Timestamp, r.Size) +} + +func (r *RawAudio) Dump(b byte, writer io.Writer) { + //TODO implement me + panic("implement me") +} + +var _ IAVFrame = (*H26xFrame)(nil) + +type H26xFrame struct { + Timestamp time.Duration + CTS time.Duration + Nalus + util.RecyclableMemory +} + +func (h *H26xFrame) Parse(track *AVTrack) error { + //TODO implement me + panic("implement me") +} + +func (h *H26xFrame) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) { + return ctx.GetBase(), nil, nil +} + +func (h *H26xFrame) Demux(ctx codec.ICodecCtx) (any, error) { + return h.Nalus, nil +} + +func (h *H26xFrame) Mux(ctx codec.ICodecCtx, frame *AVFrame) { + h.Nalus = frame.Raw.(Nalus) + h.Timestamp = frame.Timestamp + h.CTS = frame.CTS +} + +func (h *H26xFrame) GetTimestamp() time.Duration { + return h.Timestamp +} + +func (h *H26xFrame) GetCTS() time.Duration { + return h.CTS +} + +func (h *H26xFrame) GetSize() int { + var size int + for _, nalu := range h.Nalus { + size += nalu.Size + } + return size +} + +func (h *H26xFrame) String() string { + //TODO implement me + panic("implement me") +} + +func (h *H26xFrame) Dump(b byte, writer io.Writer) { + //TODO implement me + panic("implement me") +} diff --git a/pkg/track.go b/pkg/track.go index ee41a8f..673af12 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -55,9 +55,11 @@ func NewAVTrack(args ...any) (t *AVTrack) { t.RingWriter = NewRingWriter(v.RingSize) t.BufferRange[0] = v.BufferTime t.RingWriter.SLogger = t.Logger + case *util.Promise[struct{}]: + t.ready = v } } - t.ready = util.NewPromise(struct{}{}) + //t.ready = util.NewPromise(struct{}{}) t.Info("create") return } diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go index 84d4f2c..faa051a 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf-reader.go @@ -9,29 +9,56 @@ import ( const defaultBufSize = 1 << 14 type BufReader struct { - reader io.Reader Allocator *ScalableMemoryAllocator buf MemoryReader BufLen int + feedData func() error } func NewBufReaderWithBufLen(reader io.Reader, bufLen int) (r *BufReader) { r = &BufReader{ - reader: reader, Allocator: NewScalableMemoryAllocator(bufLen), BufLen: bufLen, + feedData: func() error { + buf, err := r.Allocator.Read(reader, r.BufLen) + if err != nil { + return err + } + n := len(buf) + r.buf.Buffers = append(r.buf.Buffers, buf) + r.buf.Size += n + r.buf.Length += n + return nil + }, } r.buf.Memory = &Memory{} //fmt.Println("NewBufReaderWithBufLen", uintptr(unsafe.Pointer(r.allocator))) return } - +func NewBufReaderChan(feedChan chan []byte) (r *BufReader) { + r = &BufReader{ + Allocator: NewScalableMemoryAllocator(defaultBufSize), + BufLen: defaultBufSize, + feedData: func() error { + data, ok := <-feedChan + if !ok { + return io.EOF + } + n := len(data) + r.buf.Buffers = append(r.buf.Buffers, data) + r.buf.Size += n + r.buf.Length += n + return nil + }, + } + r.buf.Memory = &Memory{} + return +} func NewBufReader(reader io.Reader) (r *BufReader) { return NewBufReaderWithBufLen(reader, defaultBufSize) } func (r *BufReader) Recycle() { - r.reader = nil r.buf = MemoryReader{} r.Allocator.Recycle() } @@ -57,21 +84,9 @@ func (r *BufReader) Peek(n int) (buf []byte, err error) { return } -func (r *BufReader) eat() error { - buf, err := r.Allocator.Read(r.reader, r.BufLen) - if err != nil { - return err - } - n := len(buf) - r.buf.Buffers = append(r.buf.Buffers, buf) - r.buf.Size += n - r.buf.Length += n - return nil -} - func (r *BufReader) ReadByte() (b byte, err error) { for r.buf.Length == 0 { - if err = r.eat(); err != nil { + if err = r.feedData(); err != nil { return } } @@ -116,7 +131,7 @@ func (r *BufReader) Skip(n int) (err error) { } func (r *BufReader) ReadRange(n int, yield func([]byte)) (err error) { - for r.recycleFront(); n > 0 && err == nil; err = r.eat() { + for r.recycleFront(); n > 0 && err == nil; err = r.feedData() { if r.buf.Length > 0 { if r.buf.Length >= n { r.buf.RangeN(n, yield) diff --git a/pkg/util/buffers.go b/pkg/util/buffers.go index 2ccc6f8..6fe7874 100644 --- a/pkg/util/buffers.go +++ b/pkg/util/buffers.go @@ -220,6 +220,15 @@ func (r *MemoryReader) Skip(n int) error { return nil } +func (r *MemoryReader) Unread(n int) { + r.Length += n + r.offset1 -= n + for r.offset1 < 0 { + r.offset0-- + r.offset1 += len(r.Memory.Buffers[r.offset0]) + } +} + func (r *MemoryReader) forward(n int) { r.Length -= n r.offset1 += n diff --git a/pkg/util/collection.go b/pkg/util/collection.go index e417cc2..df981b0 100644 --- a/pkg/util/collection.go +++ b/pkg/util/collection.go @@ -63,11 +63,11 @@ func (c *Collection[K, T]) Range(f func(T) bool) { } } -func (c *Collection[K, T]) Remove(item T) { - c.RemoveByKey(item.GetKey()) +func (c *Collection[K, T]) Remove(item T) bool { + return c.RemoveByKey(item.GetKey()) } -func (c *Collection[K, T]) RemoveByKey(key K) { +func (c *Collection[K, T]) RemoveByKey(key K) bool { if c.L != nil { c.L.Lock() defer c.L.Unlock() @@ -77,9 +77,10 @@ func (c *Collection[K, T]) RemoveByKey(key K) { if c.Items[i].GetKey() == key { c.Items = slices.Delete(c.Items, i, i+1) c.Length-- - break + return true } } + return false } func (c *Collection[K, T]) Get(key K) (item T, ok bool) { diff --git a/pkg/util/convert.go b/pkg/util/convert.go new file mode 100644 index 0000000..d32bd13 --- /dev/null +++ b/pkg/util/convert.go @@ -0,0 +1,560 @@ +package util + +import ( + "encoding/binary" + "errors" + "io" + "strconv" + "strings" +) + +/* +func ReadByteToUintX(r io.Reader, l int) (data uint64, err error) { + if l%8 != 0 || l > 64 { + return 0, errors.New("disable convert") + } + + bb := make([]byte, l) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + switch l / 8 { + case 1: + { + return uint8(bb[0]), nil + } + case 2: + { + return BigEndian.Uint16(bb), nil + } + case 3: + { + return BigEndian.Uint24(bb), nil + } + case 4: + { + return BigEndian.Uint32(bb), nil + } + case 5: + { + //return BigEndian.Uint40(bb), nil + return 0, errors.New("disable convert") + } + case 6: + { + return BigEndian.Uint48(bb), nil + } + case 7: + { + //return BigEndian.Uint56(bb), nil + return 0, errors.New("disable convert") + } + case 8: + { + return BigEndian.Uint64(bb), nil + } + } + + return 0, errors.New("convert not exist") +} +*/ + +// // 千万注意大小端,RTMP是大端 +func ByteToUint32N(data []byte) (ret uint32, err error) { + if len(data) > 4 { + return 0, errors.New("ByteToUint32N error!") + } + + for i := 0; i < len(data); i++ { + ret <<= 8 + ret |= uint32(data[i]) + } + + return +} + +// // 千万注意大小端,RTMP是大端 +func ByteToUint64N(data []byte) (ret uint64, err error) { + if len(data) > 8 { + return 0, errors.New("ByteToUint64N error!") + } + + for i := 0; i < len(data); i++ { + ret <<= 8 + ret |= uint64(data[i]) + } + + return +} + +// 千万注意大小端,RTMP是大端 +func ByteToUint32(data []byte, bigEndian bool) (ret uint32, err error) { + if bigEndian { + return binary.BigEndian.Uint32(data), nil + } else { + return binary.LittleEndian.Uint32(data), nil + } +} + +func Uint32ToByte(data uint32, bigEndian bool) (ret []byte, err error) { + if bigEndian { + return BigEndian.ToUint32(data), nil + } else { + return LittleEndian.ToUint32(data), nil + } +} + +func ReadByteToUint8(r io.Reader) (data uint8, err error) { + bb := make([]byte, 1) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + return uint8(bb[0]), nil +} + +func ReadByteToUint16(r io.Reader, bigEndian bool) (data uint16, err error) { + bb := make([]byte, 2) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + if bigEndian { + return binary.BigEndian.Uint16(bb), nil + } else { + return binary.LittleEndian.Uint16(bb), nil + } +} + +func ReadByteToUint24(r io.Reader, bigEndian bool) (data uint32, err error) { + bb := make([]byte, 3) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + if bigEndian { + return BigEndian.Uint24(bb), nil + } else { + return LittleEndian.Uint24(bb), nil + } +} + +func ReadByteToUint32(r io.Reader, bigEndian bool) (data uint32, err error) { + bb := make([]byte, 4) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + if bigEndian { + return binary.BigEndian.Uint32(bb), nil + } else { + return binary.LittleEndian.Uint32(bb), nil + } +} + +func ReadByteToUint40(r io.Reader, bigEndian bool) (data uint64, err error) { + bb := make([]byte, 5) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + if bigEndian { + return BigEndian.Uint40(bb), nil + } else { + return LittleEndian.Uint40(bb), nil + } +} + +func ReadByteToUint48(r io.Reader, bigEndian bool) (data uint64, err error) { + bb := make([]byte, 6) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + if bigEndian { + return BigEndian.Uint48(bb), nil + } else { + return LittleEndian.Uint48(bb), nil + } +} + +/* + func ReadByteToUint56(r io.Reader) (data uint64, err error) { + bb := make([]byte, 7) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + return uint8(bb[0]), nil + } +*/ +func BigLittleSwap(v uint) uint { + return (v >> 24) | ((v>>16)&0xff)<<8 | ((v>>8)&0xff)<<16 | (v&0xff)<<24 +} + +func ReadByteToUint64(r io.Reader, bigEndian bool) (data uint64, err error) { + bb := make([]byte, 8) + if _, err := io.ReadFull(r, bb); err != nil { + return 0, err + } + + if bigEndian { + return binary.BigEndian.Uint64(bb), nil + } else { + return binary.LittleEndian.Uint64(bb), nil + } +} + +func WriteUint8ToByte(w io.Writer, data uint8) error { + bb := make([]byte, 8) + bb[0] = byte(data) + _, err := w.Write(bb[:1]) + if err != nil { + return err + } + + return nil +} + +func WriteUint16ToByte(w io.Writer, data uint16, bigEndian bool) error { + var bb []byte + if bigEndian { + bb = BigEndian.ToUint16(data) + } else { + bb = LittleEndian.ToUint16(data) + } + + _, err := w.Write(bb) + if err != nil { + return err + } + + return nil +} + +func WriteUint24ToByte(w io.Writer, data uint32, bigEndian bool) error { + var bb []byte + if bigEndian { + bb = BigEndian.ToUint24(data) + } else { + bb = LittleEndian.ToUint24(data) + } + + _, err := w.Write(bb) + if err != nil { + return err + } + + return nil +} + +func WriteUint32ToByte(w io.Writer, data uint32, bigEndian bool) error { + var bb []byte + if bigEndian { + bb = BigEndian.ToUint32(data) + } else { + bb = LittleEndian.ToUint32(data) + } + + _, err := w.Write(bb) + if err != nil { + return err + } + + return nil +} + +func WriteUint40ToByte(w io.Writer, data uint64, bigEndian bool) error { + var bb []byte + if bigEndian { + bb = BigEndian.ToUint40(data) + } else { + bb = LittleEndian.ToUint40(data) + } + + _, err := w.Write(bb) + if err != nil { + return err + } + + return nil +} + +func WriteUint48ToByte(w io.Writer, data uint64, bigEndian bool) error { + var bb []byte + if bigEndian { + bb = BigEndian.ToUint48(data) + } else { + bb = LittleEndian.ToUint48(data) + } + + _, err := w.Write(bb) + if err != nil { + return err + } + + return nil +} + +func WriteUint64ToByte(w io.Writer, data uint64, bigEndian bool) error { + var bb []byte + if bigEndian { + bb = BigEndian.ToUint64(data) + } else { + bb = LittleEndian.ToUint64(data) + } + + _, err := w.Write(bb) + if err != nil { + return err + } + + return nil +} + +func GetPtsDts(v uint64) uint64 { + // 4 + 3 + 1 + 15 + 1 + 15 + 1 + // 0011 + // 0010 + PTS[30-32] + marker_bit + PTS[29-15] + marker_bit + PTS[14-0] + marker_bit + pts1 := ((v >> 33) & 0x7) << 30 + pts2 := ((v >> 17) & 0x7fff) << 15 + pts3 := ((v >> 1) & 0x7fff) + + return pts1 | pts2 | pts3 +} + +func PutPtsDts(v uint64) uint64 { + // 4 + 3 + 1 + 15 + 1 + 15 + 1 + // 0011 + // 0010 + PTS[30-32] + marker_bit + PTS[29-15] + marker_bit + PTS[14-0] + marker_bit + // 0x100010001 + // 0001 0000 0000 0000 0001 0000 0000 0000 0001 + // 3个 market_it + pts1 := (v >> 30) & 0x7 << 33 + pts2 := (v >> 15) & 0x7fff << 17 + pts3 := (v & 0x7fff) << 1 + + return pts1 | pts2 | pts3 | 0x100010001 +} + +func GetPCR(v uint64) uint64 { + // program_clock_reference_base(33) + Reserved(6) + program_clock_reference_extension(9) + base := v >> 15 + ext := v & 0x1ff + return base*300 + ext +} + +func PutPCR(pcr uint64) uint64 { + base := pcr / 300 + ext := pcr % 300 + return base<<15 | 0x3f<<9 | ext +} + +func GetFillBytes(data byte, n int) []byte { + b := make([]byte, n) + for i := range b { + b[i] = data + } + + return b +} +func ToFloat64(num interface{}) float64 { + switch v := num.(type) { + case uint: + return float64(v) + case int: + return float64(v) + case uint8: + return float64(v) + case uint16: + return float64(v) + case uint32: + return float64(v) + case uint64: + return float64(v) + case int8: + return float64(v) + case int16: + return float64(v) + case int32: + return float64(v) + case int64: + return float64(v) + case float64: + return v + case float32: + return float64(v) + } + return 0 +} + +func Conf2Listener(conf string) (protocol string, ports []uint16) { + var port string + protocol, port, _ = strings.Cut(conf, ":") + if r := strings.Split(port, "-"); len(r) == 2 { + min, err := strconv.Atoi(r[0]) + if err != nil { + return + } + max, err := strconv.Atoi(r[1]) + if err != nil { + return + } + if min < max { + ports = append(ports, uint16(min), uint16(max)) + } + } else if p, err := strconv.Atoi(port); err == nil { + ports = append(ports, uint16(p)) + } + return +} + +type littleEndian struct{} + +var LittleEndian littleEndian + +func (littleEndian) Uint16(b []byte) uint16 { return uint16(b[0]) | uint16(b[1])<<8 } +func (littleEndian) Uint24(b []byte) uint32 { return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 } +func (littleEndian) Uint32(b []byte) uint32 { + return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24 +} +func (littleEndian) Uint40(b []byte) uint64 { + return uint64(b[0]) | uint64(b[1])<<8 | + uint64(b[2])<<16 | uint64(b[3])<<24 | uint64(b[4])<<32 +} +func (littleEndian) Uint48(b []byte) uint64 { + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | + uint64(b[3])<<24 | uint64(b[4])<<32 | uint64(b[5])<<40 +} +func (littleEndian) Uint64(b []byte) uint64 { + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | + uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 +} + +func (littleEndian) ToUint16(v uint16) []byte { + b := make([]byte, 2) + b[0] = byte(v) + b[1] = byte(v >> 8) + return b +} +func (littleEndian) ToUint24(v uint32) []byte { + b := make([]byte, 3) + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + return b +} +func (littleEndian) ToUint32(v uint32) []byte { + b := make([]byte, 4) + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + b[3] = byte(v >> 24) + return b +} +func (littleEndian) ToUint40(v uint64) []byte { + b := make([]byte, 5) + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + b[3] = byte(v >> 24) + b[4] = byte(v >> 32) + return b +} +func (littleEndian) ToUint48(v uint64) []byte { + b := make([]byte, 6) + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + b[3] = byte(v >> 24) + b[4] = byte(v >> 32) + b[5] = byte(v >> 40) + return b +} +func (littleEndian) ToUint64(v uint64) []byte { + b := make([]byte, 8) + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + b[3] = byte(v >> 24) + b[4] = byte(v >> 32) + b[5] = byte(v >> 40) + b[6] = byte(v >> 48) + b[7] = byte(v >> 56) + return b +} + +type bigEndian struct{} + +var BigEndian bigEndian + +func (bigEndian) Uint16(b []byte) uint16 { return uint16(b[1]) | uint16(b[0])<<8 } +func (bigEndian) Uint24(b []byte) uint32 { return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16 } +func (bigEndian) Uint32(b []byte) uint32 { + return uint32(b[3]) | uint32(b[2])<<8 | uint32(b[1])<<16 | uint32(b[0])<<24 +} +func (bigEndian) Uint40(b []byte) uint64 { + return uint64(b[4]) | uint64(b[3])<<8 | + uint64(b[2])<<16 | uint64(b[1])<<24 | uint64(b[0])<<32 +} +func (bigEndian) Uint48(b []byte) uint64 { + return uint64(b[5]) | uint64(b[4])<<8 | uint64(b[3])<<16 | + uint64(b[2])<<24 | uint64(b[1])<<32 | uint64(b[0])<<40 +} +func (bigEndian) Uint64(b []byte) uint64 { + return uint64(b[7]) | uint64(b[6])<<8 | uint64(b[5])<<16 | uint64(b[4])<<24 | + uint64(b[3])<<32 | uint64(b[2])<<40 | uint64(b[1])<<48 | uint64(b[0])<<56 +} +func (bigEndian) ToUint16(v uint16) []byte { + b := make([]byte, 2) + b[0] = byte(v >> 8) + b[1] = byte(v) + return b +} +func (bigEndian) ToUint24(v uint32) []byte { + b := make([]byte, 3) + b[0] = byte(v >> 16) + b[1] = byte(v >> 8) + b[2] = byte(v) + return b +} +func (bigEndian) ToUint32(v uint32) []byte { + b := make([]byte, 4) + b[0] = byte(v >> 24) + b[1] = byte(v >> 16) + b[2] = byte(v >> 8) + b[3] = byte(v) + return b +} +func (bigEndian) ToUint40(v uint64) []byte { + b := make([]byte, 5) + b[0] = byte(v >> 32) + b[1] = byte(v >> 24) + b[2] = byte(v >> 16) + b[3] = byte(v >> 8) + b[4] = byte(v) + return b +} +func (bigEndian) ToUint48(v uint64) []byte { + b := make([]byte, 6) + b[0] = byte(v >> 40) + b[1] = byte(v >> 32) + b[2] = byte(v >> 24) + b[3] = byte(v >> 16) + b[4] = byte(v >> 8) + b[5] = byte(v) + return b +} +func (bigEndian) ToUint64(v uint64) []byte { + b := make([]byte, 8) + b[0] = byte(v >> 56) + b[1] = byte(v >> 48) + b[2] = byte(v >> 40) + b[3] = byte(v >> 32) + b[4] = byte(v >> 24) + b[5] = byte(v >> 16) + b[6] = byte(v >> 8) + b[7] = byte(v) + return b +} diff --git a/pkg/util/crc32.go b/pkg/util/crc32.go new file mode 100644 index 0000000..0e2a509 --- /dev/null +++ b/pkg/util/crc32.go @@ -0,0 +1,126 @@ +package util + +import ( + "fmt" + "io" + "io/ioutil" +) + +var Crc32_Table = []uint32{ + 0x00000000, 0x77073096, 0xEE0E612C, 0x990951BA, + 0x076DC419, 0x706AF48F, 0xE963A535, 0x9E6495A3, + 0x0EDB8832, 0x79DCB8A4, 0xE0D5E91E, 0x97D2D988, + 0x09B64C2B, 0x7EB17CBD, 0xE7B82D07, 0x90BF1D91, + 0x1DB71064, 0x6AB020F2, 0xF3B97148, 0x84BE41DE, + 0x1ADAD47D, 0x6DDDE4EB, 0xF4D4B551, 0x83D385C7, + 0x136C9856, 0x646BA8C0, 0xFD62F97A, 0x8A65C9EC, + 0x14015C4F, 0x63066CD9, 0xFA0F3D63, 0x8D080DF5, + 0x3B6E20C8, 0x4C69105E, 0xD56041E4, 0xA2677172, + 0x3C03E4D1, 0x4B04D447, 0xD20D85FD, 0xA50AB56B, + 0x35B5A8FA, 0x42B2986C, 0xDBBBC9D6, 0xACBCF940, + 0x32D86CE3, 0x45DF5C75, 0xDCD60DCF, 0xABD13D59, + 0x26D930AC, 0x51DE003A, 0xC8D75180, 0xBFD06116, + 0x21B4F4B5, 0x56B3C423, 0xCFBA9599, 0xB8BDA50F, + 0x2802B89E, 0x5F058808, 0xC60CD9B2, 0xB10BE924, + 0x2F6F7C87, 0x58684C11, 0xC1611DAB, 0xB6662D3D, + 0x76DC4190, 0x01DB7106, 0x98D220BC, 0xEFD5102A, + 0x71B18589, 0x06B6B51F, 0x9FBFE4A5, 0xE8B8D433, + 0x7807C9A2, 0x0F00F934, 0x9609A88E, 0xE10E9818, + 0x7F6A0DBB, 0x086D3D2D, 0x91646C97, 0xE6635C01, + 0x6B6B51F4, 0x1C6C6162, 0x856530D8, 0xF262004E, + 0x6C0695ED, 0x1B01A57B, 0x8208F4C1, 0xF50FC457, + 0x65B0D9C6, 0x12B7E950, 0x8BBEB8EA, 0xFCB9887C, + 0x62DD1DDF, 0x15DA2D49, 0x8CD37CF3, 0xFBD44C65, + 0x4DB26158, 0x3AB551CE, 0xA3BC0074, 0xD4BB30E2, + 0x4ADFA541, 0x3DD895D7, 0xA4D1C46D, 0xD3D6F4FB, + 0x4369E96A, 0x346ED9FC, 0xAD678846, 0xDA60B8D0, + 0x44042D73, 0x33031DE5, 0xAA0A4C5F, 0xDD0D7CC9, + 0x5005713C, 0x270241AA, 0xBE0B1010, 0xC90C2086, + 0x5768B525, 0x206F85B3, 0xB966D409, 0xCE61E49F, + 0x5EDEF90E, 0x29D9C998, 0xB0D09822, 0xC7D7A8B4, + 0x59B33D17, 0x2EB40D81, 0xB7BD5C3B, 0xC0BA6CAD, + 0xEDB88320, 0x9ABFB3B6, 0x03B6E20C, 0x74B1D29A, + 0xEAD54739, 0x9DD277AF, 0x04DB2615, 0x73DC1683, + 0xE3630B12, 0x94643B84, 0x0D6D6A3E, 0x7A6A5AA8, + 0xE40ECF0B, 0x9309FF9D, 0x0A00AE27, 0x7D079EB1, + 0xF00F9344, 0x8708A3D2, 0x1E01F268, 0x6906C2FE, + 0xF762575D, 0x806567CB, 0x196C3671, 0x6E6B06E7, + 0xFED41B76, 0x89D32BE0, 0x10DA7A5A, 0x67DD4ACC, + 0xF9B9DF6F, 0x8EBEEFF9, 0x17B7BE43, 0x60B08ED5, + 0xD6D6A3E8, 0xA1D1937E, 0x38D8C2C4, 0x4FDFF252, + 0xD1BB67F1, 0xA6BC5767, 0x3FB506DD, 0x48B2364B, + 0xD80D2BDA, 0xAF0A1B4C, 0x36034AF6, 0x41047A60, + 0xDF60EFC3, 0xA867DF55, 0x316E8EEF, 0x4669BE79, + 0xCB61B38C, 0xBC66831A, 0x256FD2A0, 0x5268E236, + 0xCC0C7795, 0xBB0B4703, 0x220216B9, 0x5505262F, + 0xC5BA3BBE, 0xB2BD0B28, 0x2BB45A92, 0x5CB36A04, + 0xC2D7FFA7, 0xB5D0CF31, 0x2CD99E8B, 0x5BDEAE1D, + 0x9B64C2B0, 0xEC63F226, 0x756AA39C, 0x026D930A, + 0x9C0906A9, 0xEB0E363F, 0x72076785, 0x05005713, + 0x95BF4A82, 0xE2B87A14, 0x7BB12BAE, 0x0CB61B38, + 0x92D28E9B, 0xE5D5BE0D, 0x7CDCEFB7, 0x0BDBDF21, + 0x86D3D2D4, 0xF1D4E242, 0x68DDB3F8, 0x1FDA836E, + 0x81BE16CD, 0xF6B9265B, 0x6FB077E1, 0x18B74777, + 0x88085AE6, 0xFF0F6A70, 0x66063BCA, 0x11010B5C, + 0x8F659EFF, 0xF862AE69, 0x616BFFD3, 0x166CCF45, + 0xA00AE278, 0xD70DD2EE, 0x4E048354, 0x3903B3C2, + 0xA7672661, 0xD06016F7, 0x4969474D, 0x3E6E77DB, + 0xAED16A4A, 0xD9D65ADC, 0x40DF0B66, 0x37D83BF0, + 0xA9BCAE53, 0xDEBB9EC5, 0x47B2CF7F, 0x30B5FFE9, + 0xBDBDF21C, 0xCABAC28A, 0x53B39330, 0x24B4A3A6, + 0xBAD03605, 0xCDD70693, 0x54DE5729, 0x23D967BF, + 0xB3667A2E, 0xC4614AB8, 0x5D681B02, 0x2A6F2B94, + 0xB40BBE37, 0xC30C8EA1, 0x5A05DF1B, 0x2D02EF8D, +} + +type Crc32Reader struct { + R io.Reader + Crc32 uint32 +} + +type Crc32Writer struct { + W io.Writer + Crc32 uint32 +} + +func (cr *Crc32Reader) Read(b []byte) (n int, err error) { + if n, err = cr.R.Read(b); err != nil { + return + } + + cr.Crc32 = getCrc32(cr.Crc32, b) + + return +} + +func (cr *Crc32Reader) ReadCrc32UIntAndCheck() (err error) { + _, err = io.CopyN(ioutil.Discard, cr, 4) + if err != nil { + return err + } + + if cr.Crc32 != 0 { + err = fmt.Errorf("crc32(%x) != 0", cr.Crc32) + return err + } + + return nil +} + +func (wr *Crc32Writer) Write(b []byte) (n int, err error) { + if n, err = wr.W.Write(b); err != nil { + return + } + + wr.Crc32 = getCrc32(wr.Crc32, b) + + return +} + +func getCrc32(crc uint32, data []byte) uint32 { + for _, v := range data { + crc = Crc32_Table[v^byte(crc)] ^ (crc >> 8) + } + + return crc +} diff --git a/plugin.go b/plugin.go index c33050f..4d44646 100644 --- a/plugin.go +++ b/plugin.go @@ -2,7 +2,9 @@ package m7s import ( "context" + "gorm.io/gorm" "log/slog" + "m7s.live/m7s/v5/pkg/db" "net" "net/http" "os" @@ -81,7 +83,20 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) { p.assign() } p.Info("init", "version", plugin.Version) - err := instance.OnInit() + var err error + if p.config.DSN == s.GetCommonConf().DSN { + p.DB = s.DB + } else if p.config.DSN != "" { + if factory, ok := db.Factory[p.config.DBType]; ok { + s.DB, err = gorm.Open(factory(p.config.DSN), &gorm.Config{}) + if err != nil { + s.Error("failed to connect database", "error", err, "dsn", s.config.DSN, "type", s.config.DBType) + p.Disabled = true + return + } + } + } + err = instance.OnInit() if err != nil { p.Error("init", "error", err) p.Stop(err) @@ -111,6 +126,22 @@ type IPlugin interface { OnEvent(any) } +type IRegisterHandler interface { + RegisterHandler() map[string]http.HandlerFunc +} + +type IPullerPlugin interface { + NewPullHandler() PullHandler +} + +type IPusherPlugin interface { + NewPushHandler() PushHandler +} + +type IRecorderPlugin interface { + NewRecordHandler() RecordHandler +} + type ITCPPlugin interface { OnTCPConnect(*net.TCPConn) } @@ -159,6 +190,7 @@ type Plugin struct { config.Config handler IPlugin Server *Server + DB *gorm.DB } func (Plugin) nothing() { @@ -193,9 +225,7 @@ func (p *Plugin) assign() { p.Config.ParseModifyFile(modifyConfig) } var handlerMap map[string]http.HandlerFunc - if v, ok := p.handler.(interface { - RegisterHandler() map[string]http.HandlerFunc - }); ok { + if v, ok := p.handler.(IRegisterHandler); ok { handlerMap = v.RegisterHandler() } p.registerHandler(handlerMap) @@ -314,13 +344,51 @@ func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Pu } } puller.Init(p, streamPath, &puller.Publish, options...) - _, err = p.Server.Call(puller) - if err == nil && pullHandler != nil { + if _, err = p.Server.Call(puller); err != nil { + return + } + if v, ok := p.handler.(IPullerPlugin); pullHandler == nil && ok { + pullHandler = v.NewPullHandler() + } + if pullHandler != nil { err = puller.Start(pullHandler) } return } +func (p *Plugin) Record(streamPath string, filePath string, options ...any) (recorder *Recorder, err error) { + recorder = &Recorder{ + Record: p.config.Record, + } + if err = os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { + return + } + recorder.StreamPath = streamPath + recorder.Subscribe = p.config.Subscribe + if recorder.File, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666); err != nil { + return + } + defer func() { + err = recorder.File.Close() + if info, err := recorder.File.Stat(); err == nil && info.Size() == 0 { + os.Remove(recorder.File.Name()) + } + }() + recorder.Init(p, streamPath, &recorder.Subscribe, options...) + if _, err = p.Server.Call(recorder); err != nil { + return + } + recorder.Publisher.WaitTrack() + var recordHandler RecordHandler + if v, ok := p.handler.(IRecorderPlugin); recordHandler == nil && ok { + recordHandler = v.NewRecordHandler() + } + if recordHandler != nil { + err = recorder.Start(recordHandler) + } + return +} + func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subscriber, err error) { subscriber = &Subscriber{Subscribe: p.config.Subscribe} if p.config.EnableAuth { @@ -344,6 +412,7 @@ func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subsc subscriber.Subscribe.SubMode = SUBMODE_BUFFER } _, err = p.Server.Call(subscriber) + subscriber.Publisher.WaitTrack() return } @@ -377,18 +446,24 @@ func (p *Plugin) Push(streamPath string, url string, options ...any) (pusher *Pu pusher.Client.RemoteURL = url pusher.Subscribe = p.config.Subscribe pusher.StreamPath = streamPath + var pushHandler PushHandler for _, option := range options { switch v := option.(type) { case PushHandler: - defer func() { - if err == nil { - pusher.Start(v) - } - }() + pushHandler = v } } pusher.Init(p, streamPath, &pusher.Subscribe, options...) - _, err = p.Server.Call(pusher) + if _, err = p.Server.Call(pusher); err != nil { + return + } + pusher.Publisher.WaitTrack() + if v, ok := p.handler.(IPusherPlugin); pushHandler == nil && ok { + pushHandler = v.NewPushHandler() + } + if pushHandler != nil { + err = pusher.Start(pushHandler) + } return } diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go new file mode 100644 index 0000000..227ffec --- /dev/null +++ b/plugin/gb28181/api.go @@ -0,0 +1,64 @@ +package plugin_gb28181 + +import ( + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/util" + gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg" + "net/http" + "os" + "strings" + "time" +) + +func (gb *GB28181Plugin) replayPS(pub *m7s.Publisher, f *os.File) { + defer f.Close() + var t uint16 + receiver := gb28181.NewReceiver(pub) + go receiver.Demux() + defer close(receiver.FeedChan) + for l := make([]byte, 6); pub.State != m7s.PublisherStateDisposed; time.Sleep(time.Millisecond * time.Duration(t)) { + _, err := f.Read(l) + if err != nil { + return + } + payloadLen := util.ReadBE[int](l[:4]) + payload := make([]byte, payloadLen) + t = util.ReadBE[uint16](l[4:]) + _, err = f.Read(payload) + if err != nil { + return + } + err = receiver.Unmarshal(payload) + if err != nil { + return + } + receiver.FeedChan <- receiver.Payload + } +} + +func (gb *GB28181Plugin) api_ps_replay(w http.ResponseWriter, r *http.Request) { + dump := r.URL.Query().Get("dump") + streamPath := r.PathValue("streamPath") + if dump == "" { + dump = "dump/ps" + } + f, err := os.OpenFile(dump, os.O_RDONLY, 0644) + if err != nil { + util.ReturnError(http.StatusInternalServerError, err.Error(), w, r) + } else { + if streamPath == "" { + if strings.HasPrefix(dump, "/") { + streamPath = "replay" + dump + } else { + streamPath = "replay/" + dump + } + } + var pub *m7s.Publisher + if pub, err = gb.Publish(streamPath, f); err == nil { + go gb.replayPS(pub, f) + util.ReturnOK(w, r) + } else { + util.ReturnError(http.StatusInternalServerError, err.Error(), w, r) + } + } +} diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index bf80760..d2c7d67 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -1,9 +1,18 @@ package plugin_gb28181 import ( + "fmt" "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "github.com/icholy/digest" + "github.com/rs/zerolog/log" "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/util" + gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg" + "net/http" + "strconv" + "sync" + "time" ) type SipConfig struct { @@ -13,21 +22,130 @@ type SipConfig struct { type GB28181Plugin struct { m7s.Plugin - Sip SipConfig - ua *sipgo.UserAgent - server *sipgo.Server + Username string + Password string + Sip SipConfig + ua *sipgo.UserAgent + server *sipgo.Server + devices util.Collection[string, *gb28181.Device] } var _ = m7s.InstallPlugin[GB28181Plugin]() func (gb *GB28181Plugin) OnInit() (err error) { - gb.ua, err = sipgo.NewUA() // Build user agent - gb.server, err = sipgo.NewServer(gb.ua) // Creating server handle for ua + gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("monibuca" + m7s.Version)) // Build user agent + gb.server, err = sipgo.NewServer(gb.ua) // Creating server handle for ua gb.server.OnRegister(gb.OnRegister) - gb.server.ListenAndServe(gb, "tcp", "") + gb.server.OnMessage(gb.OnMessage) + gb.devices.L = new(sync.RWMutex) + go gb.server.ListenAndServe(gb, "tcp", "") return } +func (gb *GB28181Plugin) RegisterHandler() map[string]http.HandlerFunc { + return map[string]http.HandlerFunc{ + "/api/ps/replay/{streamPath...}": gb.api_ps_replay, + } +} + func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction) { + from := req.From() + if from.Address.User == "" { + gb.Error("OnRegister", "error", "no user") + return + } + isUnregister := false + id := from.Address.User + exp := req.GetHeader("Expires") + if exp == nil { + gb.Error("OnRegister", "error", "no expires") + return + } + expSec, err := strconv.ParseInt(exp.Value(), 10, 32) + if err != nil { + gb.Error("OnRegister", "error", err.Error()) + return + } + if expSec == 0 { + isUnregister = true + } + // 不需要密码情况 + if gb.Username != "" && gb.Password != "" { + h := req.GetHeader("Authorization") + var chal digest.Challenge + var cred *digest.Credentials + var digCred *digest.Credentials + if h == nil { + chal = digest.Challenge{ + Realm: "monibuca-server", + Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()), + Opaque: "monibuca", + Algorithm: "MD5", + } + + res := sip.NewResponseFromRequest(req, http.StatusUnauthorized, "Unathorized", nil) + res.AppendHeader(sip.NewHeader("WWW-Authenticate", chal.String())) + + err = tx.Respond(res) + return + } + + cred, err = digest.ParseCredentials(h.Value()) + if err != nil { + log.Error().Err(err).Msg("parsing creds failed") + err = tx.Respond(sip.NewResponseFromRequest(req, http.StatusUnauthorized, "Bad credentials", nil)) + return + } + + // Check registry + if cred.Username != gb.Username { + err = tx.Respond(sip.NewResponseFromRequest(req, http.StatusNotFound, "Bad authorization header", nil)) + return + } + + // Make digest and compare response + digCred, err = digest.Digest(&chal, digest.Options{ + Method: "REGISTER", + URI: cred.URI, + Username: gb.Username, + Password: gb.Password, + }) + + if err != nil { + gb.Error("Calc digest failed") + err = tx.Respond(sip.NewResponseFromRequest(req, http.StatusUnauthorized, "Bad credentials", nil)) + return + } + + if cred.Response != digCred.Response { + err = tx.Respond(sip.NewResponseFromRequest(req, http.StatusUnauthorized, "Unathorized", nil)) + return + } + err = tx.Respond(sip.NewResponseFromRequest(req, http.StatusOK, "OK", nil)) + } + var d *gb28181.Device + if isUnregister { + if gb.devices.RemoveByKey(id) { + gb.Info("Unregister Device", "id", id) + } else { + return + } + } else { + var ok bool + if d, ok = gb.devices.Get(id); ok { + gb.RecoverDevice(d, req) + } else { + d = gb.StoreDevice(id, req) + } + } + DeviceNonce.Delete(id) + DeviceRegisterCount.Delete(id) + if !isUnregister { + //订阅设备更新 + go d.syncChannels() + } +} + +func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { } diff --git a/plugin/gb28181/pkg/audio.go b/plugin/gb28181/pkg/audio.go new file mode 100644 index 0000000..5a4fb3e --- /dev/null +++ b/plugin/gb28181/pkg/audio.go @@ -0,0 +1,85 @@ +package gb28181 + +import ( + "io" + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" + mpegts "m7s.live/m7s/v5/plugin/hls/pkg/ts" + "time" +) + +type PSAudio struct { + PTS, DTS uint32 + util.RecyclableMemory + streamType byte +} + +func (es *PSAudio) parsePESPacket(payload util.Memory) (result pkg.IAVFrame, err error) { + if payload.Size < 4 { + err = io.ErrShortBuffer + return + } + var flag, pesHeaderDataLen byte + reader := payload.NewReader() + reader.Skip(1) + //data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1 + err = reader.ReadByteTo(&flag, &pesHeaderDataLen) + if err != nil { + return + } + ptsFlag := flag>>7 == 1 + dtsFlag := (flag&0b0100_0000)>>6 == 1 + if payload.Size < int(pesHeaderDataLen) { + err = io.ErrShortBuffer + return + } + var extraData []byte + extraData, err = reader.ReadBytes(int(pesHeaderDataLen)) + pts, dts := es.PTS, es.DTS + if ptsFlag && len(extraData) > 4 { + pts = uint32(extraData[0]&0b0000_1110) << 29 + pts |= uint32(extraData[1]) << 22 + pts |= uint32(extraData[2]&0b1111_1110) << 14 + pts |= uint32(extraData[3]) << 7 + pts |= uint32(extraData[4]) >> 1 + if dtsFlag && len(extraData) > 9 { + dts = uint32(extraData[5]&0b0000_1110) << 29 + dts |= uint32(extraData[6]) << 22 + dts |= uint32(extraData[7]&0b1111_1110) << 14 + dts |= uint32(extraData[8]) << 7 + dts |= uint32(extraData[9]) >> 1 + } else { + dts = pts + } + } + if pts != es.PTS && es.Memory.Size > 0 { + switch es.streamType { + case mpegts.STREAM_TYPE_AAC: + var adts = &pkg.ADTS{ + DTS: time.Duration(es.PTS), + } + adts.Memory.CopyFrom(&es.Memory) + result = adts + case mpegts.STREAM_TYPE_G711A: + rawAudio := &pkg.RawAudio{ + Timestamp: time.Duration(es.PTS) * time.Millisecond / 90, + FourCC: codec.FourCC_ALAW, + } + rawAudio.Memory.CopyFrom(&es.Memory) + result = rawAudio + case mpegts.STREAM_TYPE_G711U: + rawAudio := &pkg.RawAudio{ + Timestamp: time.Duration(es.PTS) * time.Millisecond / 90, + FourCC: codec.FourCC_ULAW, + } + rawAudio.Memory.CopyFrom(&es.Memory) + result = rawAudio + } + es.Recycle() + es.Memory = util.Memory{} + } + es.PTS, es.DTS = pts, dts + reader.Range(es.AppendOne) + return +} diff --git a/plugin/gb28181/pkg/channel.go b/plugin/gb28181/pkg/channel.go new file mode 100644 index 0000000..6c1ab82 --- /dev/null +++ b/plugin/gb28181/pkg/channel.go @@ -0,0 +1,46 @@ +package gb28181 + +import ( + "log/slog" + "sync/atomic" + "time" +) + +type ChannelStatus string + +const ( + ChannelOnStatus ChannelStatus = "ON" + ChannelOffStatus ChannelStatus = "OFF" +) + +type Channel struct { + Device *Device // 所属设备 + State atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲 + LiveSubSP string // 实时子码流,通过rtsp + GpsTime time.Time // gps时间 + Longitude string // 经度 + Latitude string // 纬度 + *slog.Logger + ChannelInfo +} + +func (c *Channel) GetKey() string { + return c.DeviceID +} + +type ChannelInfo struct { + DeviceID string // 通道ID + ParentID string + Name string + Manufacturer string + Model string + Owner string + CivilCode string + Address string + Port int + Parental int + SafetyWay int + RegisterWay int + Secrecy int + Status ChannelStatus +} diff --git a/plugin/gb28181/pkg/device.go b/plugin/gb28181/pkg/device.go new file mode 100644 index 0000000..2f58396 --- /dev/null +++ b/plugin/gb28181/pkg/device.go @@ -0,0 +1,49 @@ +package gb28181 + +import ( + "github.com/emiago/sipgo/sip" + "log/slog" + "m7s.live/m7s/v5/pkg/util" + "time" +) + +type DeviceStatus string + +const ( + DeviceRegisterStatus DeviceStatus = "REGISTER" + DeviceRecoverStatus DeviceStatus = "RECOVER" + DeviceOnlineStatus DeviceStatus = "ONLINE" + DeviceOfflineStatus DeviceStatus = "OFFLINE" + DeviceAlarmedStatus DeviceStatus = "ALARMED" +) + +type Device struct { + ID string + Name string + Manufacturer string + Model string + Owner string + RegisterTime time.Time + UpdateTime time.Time + LastKeepaliveAt time.Time + Status DeviceStatus + SN int + Addr sip.Addr + SipIP string //设备对应网卡的服务器ip + MediaIP string //设备对应网卡的服务器ip + NetAddr string + channels util.Collection[string, *Channel] + subscriber struct { + CallID string + Timeout time.Time + } + lastSyncTime time.Time + GpsTime time.Time //gps时间 + Longitude string //经度 + Latitude string //纬度 + *slog.Logger +} + +func (d *Device) GetKey() string { + return d.ID +} diff --git a/plugin/gb28181/pkg/transceiver.go b/plugin/gb28181/pkg/transceiver.go new file mode 100644 index 0000000..1305c4a --- /dev/null +++ b/plugin/gb28181/pkg/transceiver.go @@ -0,0 +1,122 @@ +package gb28181 + +import ( + "github.com/pion/rtp" + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/util" + "os" +) + +const ( + StartCodePS = 0x000001ba + StartCodeSYS = 0x000001bb + StartCodeMAP = 0x000001bc + StartCodeVideo = 0x000001e0 + StartCodeAudio = 0x000001c0 + PrivateStreamCode = 0x000001bd + MEPGProgramEndCode = 0x000001b9 +) + +type Receiver struct { + *m7s.Publisher + rtp.Packet + *util.BufReader + FeedChan chan []byte + psm util.Memory + dump *os.File + dumpLen []byte + psVideo PSVideo + psAudio PSAudio +} + +func NewReceiver(puber *m7s.Publisher) *Receiver { + ret := &Receiver{ + Publisher: puber, + FeedChan: make(chan []byte), + } + ret.BufReader = util.NewBufReaderChan(ret.FeedChan) + ret.psVideo.SetAllocator(ret.Allocator) + ret.psAudio.SetAllocator(ret.Allocator) + return ret +} + +func (p *Receiver) ReadPayload() (payload util.Memory, err error) { + payloadlen, err := p.ReadBE(2) + if err != nil { + return + } + return p.ReadBytes(payloadlen) +} + +func (p *Receiver) Demux() { + var payload util.Memory + defer p.Info("demux exit") + for { + code, err := p.ReadBE32(4) + if err != nil { + return + } + p.Debug("demux", "code", code) + switch code { + case StartCodePS: + var psl byte + if err = p.Skip(9); err != nil { + return + } + psl, err = p.ReadByte() + if err != nil { + return + } + psl &= 0x07 + if err = p.Skip(int(psl)); err != nil { + return + } + case StartCodeVideo: + payload, err = p.ReadPayload() + var annexB *pkg.AnnexB + annexB, err = p.psVideo.parsePESPacket(payload) + if annexB != nil { + err = p.WriteVideo(annexB) + } + case StartCodeAudio: + payload, err = p.ReadPayload() + var audioFrame pkg.IAVFrame + audioFrame, err = p.psAudio.parsePESPacket(payload) + if audioFrame != nil { + err = p.WriteAudio(audioFrame) + } + case StartCodeMAP: + p.decProgramStreamMap() + default: + p.ReadPayload() + } + } +} + +func (dec *Receiver) decProgramStreamMap() (err error) { + dec.psm, err = dec.ReadPayload() + if err != nil { + return err + } + var programStreamInfoLen, programStreamMapLen, elementaryStreamInfoLength uint32 + var streamType, elementaryStreamID byte + reader := dec.psm.NewReader() + reader.Skip(2) + programStreamInfoLen, err = reader.ReadBE(2) + reader.Skip(int(programStreamInfoLen)) + programStreamMapLen, err = reader.ReadBE(2) + for programStreamMapLen > 0 { + streamType, err = reader.ReadByte() + elementaryStreamID, err = reader.ReadByte() + if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef { + dec.psVideo.streamType = streamType + } else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf { + dec.psAudio.streamType = streamType + } + elementaryStreamInfoLength, err = reader.ReadBE(2) + reader.Skip(int(elementaryStreamInfoLength)) + programStreamMapLen -= 4 + elementaryStreamInfoLength + } + return nil +} diff --git a/plugin/gb28181/pkg/video.go b/plugin/gb28181/pkg/video.go new file mode 100644 index 0000000..d547515 --- /dev/null +++ b/plugin/gb28181/pkg/video.go @@ -0,0 +1,85 @@ +package gb28181 + +import ( + "io" + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" + mpegts "m7s.live/m7s/v5/plugin/hls/pkg/ts" + "time" +) + +type PSVideo struct { + PSAudio +} + +func (es *PSVideo) parsePESPacket(payload util.Memory) (result *pkg.AnnexB, err error) { + if payload.Size < 4 { + err = io.ErrShortBuffer + return + } + var flag, pesHeaderDataLen byte + reader := payload.NewReader() + reader.Skip(1) + //data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1 + err = reader.ReadByteTo(&flag, &pesHeaderDataLen) + if err != nil { + return + } + ptsFlag := flag>>7 == 1 + dtsFlag := (flag&0b0100_0000)>>6 == 1 + if payload.Size < int(pesHeaderDataLen) { + err = io.ErrShortBuffer + return + } + var extraData []byte + extraData, err = reader.ReadBytes(int(pesHeaderDataLen)) + pts, dts := es.PTS, es.DTS + if ptsFlag && len(extraData) > 4 { + pts = uint32(extraData[0]&0b0000_1110) << 29 + pts |= uint32(extraData[1]) << 22 + pts |= uint32(extraData[2]&0b1111_1110) << 14 + pts |= uint32(extraData[3]) << 7 + pts |= uint32(extraData[4]) >> 1 + if dtsFlag && len(extraData) > 9 { + dts = uint32(extraData[5]&0b0000_1110) << 29 + dts |= uint32(extraData[6]) << 22 + dts |= uint32(extraData[7]&0b1111_1110) << 14 + dts |= uint32(extraData[8]) << 7 + dts |= uint32(extraData[9]) >> 1 + } else { + dts = pts + } + } + if pts != es.PTS && es.Memory.Size > 0 { + result = &pkg.AnnexB{ + PTS: time.Duration(es.PTS), + DTS: time.Duration(es.DTS), + } + switch es.streamType { + case 0: + //推测编码类型 + switch codec.ParseH264NALUType(es.Memory.Buffers[0][4]) { + case codec.NALU_Non_IDR_Picture, + codec.NALU_IDR_Picture, + codec.NALU_SEI, + codec.NALU_SPS, + codec.NALU_PPS, + codec.NALU_Access_Unit_Delimiter: + default: + result.Hevc = true + } + case mpegts.STREAM_TYPE_H265: + result.Hevc = true + } + result.Memory.CopyFrom(&es.Memory) + // fmt.Println("clone", es.PTS, es.Buffer[4]&0x0f) + es.Recycle() + es.Memory = util.Memory{} + } + es.PTS, es.DTS = pts, dts + // fmt.Println("append", es.PTS, payload[pesHeaderDataLen+4]&0x0f) + reader.Range(es.AppendOne) + // es.Buffer = append(es.Buffer, payload[pesHeaderDataLen:]...) + return +} diff --git a/plugin/hdl/index.go b/plugin/hdl/index.go index b3da0e5..1236e8f 100644 --- a/plugin/hdl/index.go +++ b/plugin/hdl/index.go @@ -22,12 +22,12 @@ const defaultConfig m7s.DefaultYaml = `publish: func (p *HDLPlugin) OnInit() error { for streamPath, url := range p.GetCommonConf().PullOnStart { - go p.Pull(streamPath, url, NewHDLPuller()) + go p.Pull(streamPath, url) } return nil } -var _ = m7s.InstallPlugin[HDLPlugin](defaultConfig) +var _ = m7s.InstallPlugin[HDLPlugin](defaultConfig, NewPullHandler) func (p *HDLPlugin) WriteFlvHeader(sub *m7s.Subscriber) (flv net.Buffers) { at, vt := &sub.Publisher.AudioTrack, &sub.Publisher.VideoTrack diff --git a/plugin/mp4/index.go b/plugin/mp4/index.go index 485faab..76f27cd 100644 --- a/plugin/mp4/index.go +++ b/plugin/mp4/index.go @@ -74,13 +74,21 @@ const defaultConfig m7s.DefaultYaml = `publish: func (p *MP4Plugin) OnInit() error { for streamPath, url := range p.GetCommonConf().PullOnStart { - go p.Pull(streamPath, url, pkg.NewMP4Puller()) + go p.Pull(streamPath, url) } return nil } var _ = m7s.InstallPlugin[MP4Plugin](defaultConfig) +func (p *MP4Plugin) NewPullHandler() m7s.PullHandler { + return pkg.NewMP4Puller() +} + +func (p *MP4Plugin) NewRecordHandler() m7s.RecordHandler { + return pkg.NewMP4Recorder() +} + func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".mp4") if r.URL.RawQuery != "" { diff --git a/plugin/mp4/pkg/box/codec.go b/plugin/mp4/pkg/box/codec.go index af93d93..0376d87 100644 --- a/plugin/mp4/pkg/box/codec.go +++ b/plugin/mp4/pkg/box/codec.go @@ -88,14 +88,13 @@ func getCodecIdByObjectType(objType uint8) MP4_CODEC_TYPE { } func isH264NewAccessUnit(nalu []byte) bool { - nalu_type := codec.H264NaluType(nalu) - switch nalu_type { + switch codec.H264_NAL_TYPE(nalu[0] & 0x1F) { case codec.H264_NAL_AUD, codec.H264_NAL_SPS, codec.H264_NAL_PPS, codec.H264_NAL_SEI: return true case codec.H264_NAL_I_SLICE, codec.H264_NAL_P_SLICE, codec.H264_NAL_SLICE_A, codec.H264_NAL_SLICE_B, codec.H264_NAL_SLICE_C: - firstMbInSlice := codec.GetH264FirstMbInSlice(nalu) + firstMbInSlice := GetH264FirstMbInSlice(nalu) if firstMbInSlice == 0 { return true } @@ -104,8 +103,7 @@ func isH264NewAccessUnit(nalu []byte) bool { } func isH265NewAccessUnit(nalu []byte) bool { - nalu_type := codec.H265NaluType(nalu) - switch nalu_type { + switch codec.H265_NAL_TYPE((nalu[0] >> 1) & 0x3F) { case codec.H265_NAL_AUD, codec.H265_NAL_SPS, codec.H265_NAL_PPS, codec.H265_NAL_SEI, codec.H265_NAL_VPS: return true @@ -117,10 +115,24 @@ func isH265NewAccessUnit(nalu []byte) bool { codec.H265_NAL_SLICE_BLA_W_LP, codec.H265_NAL_SLICE_BLA_W_RADL, codec.H265_NAL_SLICE_BLA_N_LP, codec.H265_NAL_SLICE_IDR_W_RADL, codec.H265_NAL_SLICE_IDR_N_LP, codec.H265_NAL_SLICE_CRA: - firstMbInSlice := codec.GetH265FirstMbInSlice(nalu) + firstMbInSlice := GetH265FirstMbInSlice(nalu) if firstMbInSlice == 0 { return true } } return false } + +func GetH264FirstMbInSlice(nalu []byte) uint64 { + bs := codec.NewBitStream(nalu[1:]) + sliceHdr := &codec.SliceHeader{} + sliceHdr.Decode(bs) + return sliceHdr.First_mb_in_slice +} + +func GetH265FirstMbInSlice(nalu []byte) uint64 { + bs := codec.NewBitStream(nalu[2:]) + sliceHdr := &codec.SliceHeader{} + sliceHdr.Decode(bs) + return sliceHdr.First_mb_in_slice +} diff --git a/plugin/mp4/pkg/box/demuxer.go b/plugin/mp4/pkg/box/demuxer.go index cc4a396..5aba3c5 100644 --- a/plugin/mp4/pkg/box/demuxer.go +++ b/plugin/mp4/pkg/box/demuxer.go @@ -1,10 +1,8 @@ package box import ( - "encoding/binary" "errors" "github.com/deepch/vdk/codec/h264parser" - "github.com/yapingcat/gomedia/go-codec" "io" "m7s.live/m7s/v5/pkg/util" ) @@ -495,114 +493,3 @@ func (demuxer *MovDemuxer) buildSampleList() { } } } - -func (demuxer *MovDemuxer) processH264(avcc []byte, extra *h264ExtraData) []byte { - idr := false - vcl := false - spspps := false - h264 := avcc - for len(h264) > 0 { - nalusize := binary.BigEndian.Uint32(h264) - codec.CovertAVCCToAnnexB(h264) - nalType := codec.H264NaluType(h264) - switch { - case nalType == codec.H264_NAL_PPS: - fallthrough - case nalType == codec.H264_NAL_SPS: - spspps = true - case nalType == codec.H264_NAL_I_SLICE: - idr = true - fallthrough - case nalType >= codec.H264_NAL_P_SLICE && nalType <= codec.H264_NAL_SLICE_C: - vcl = true - } - h264 = h264[4+nalusize:] - } - - if !vcl { - if !spspps { - return avcc - } else { - demuxer.mp4out = append(demuxer.mp4out, avcc...) - } - return nil - } - - if spspps { - demuxer.mp4out = demuxer.mp4out[:0] - return avcc - } - if !idr { - return avcc - } - if len(demuxer.mp4out) > 0 { - out := make([]byte, len(demuxer.mp4out)+len(avcc)) - copy(out, demuxer.mp4out) - copy(out[len(demuxer.mp4out):], avcc) - demuxer.mp4out = demuxer.mp4out[:0] - return out - } - - out := make([]byte, 0) - for _, sps := range extra.spss { - out = append(out, sps...) - } - for _, pps := range extra.ppss { - out = append(out, pps...) - } - out = append(out, avcc...) - return out -} - -func (demuxer *MovDemuxer) processH265(hvcc []byte, extra *h265ExtraData) []byte { - idr := false - vcl := false - spsppsvps := false - h265 := hvcc - for len(h265) > 0 { - nalusize := binary.BigEndian.Uint32(h265) - codec.CovertAVCCToAnnexB(h265) - nalType := codec.H265NaluType(h265) - switch { - case nalType == codec.H265_NAL_VPS: - fallthrough - case nalType == codec.H265_NAL_PPS: - fallthrough - case nalType == codec.H265_NAL_SPS: - spsppsvps = true - case nalType >= codec.H265_NAL_SLICE_BLA_W_LP && nalType <= codec.H265_NAL_SLICE_CRA: - idr = true - fallthrough - case nalType >= codec.H265_NAL_Slice_TRAIL_N && nalType <= codec.H265_NAL_SLICE_RASL_R: - vcl = true - } - h265 = h265[4+nalusize:] - } - if !vcl { - if !spsppsvps { - return hvcc - } else { - demuxer.mp4out = append(demuxer.mp4out, hvcc...) - } - return nil - } - - if spsppsvps { - demuxer.mp4out = demuxer.mp4out[:0] - return hvcc - } - if !idr { - return hvcc - } - if len(demuxer.mp4out) > 0 { - out := make([]byte, len(demuxer.mp4out)+len(hvcc)) - copy(out, demuxer.mp4out) - copy(out[len(demuxer.mp4out):], hvcc) - demuxer.mp4out = demuxer.mp4out[:0] - return out - } - - out := extra.hvccExtra.ToNalus() - out = append(out, hvcc...) - return out -} diff --git a/plugin/mp4/pkg/box/mp4track.go b/plugin/mp4/pkg/box/mp4track.go index c6b0f43..b62de76 100644 --- a/plugin/mp4/pkg/box/mp4track.go +++ b/plugin/mp4/pkg/box/mp4track.go @@ -1,10 +1,8 @@ package box import ( - "errors" - "io" - "github.com/yapingcat/gomedia/go-codec" + "io" ) type sampleCache struct { @@ -275,68 +273,19 @@ func (track *mp4track) makeEmptyStblTable() { track.stbltable.stss = &movstss{} } -func (track *mp4track) writeSample(sample []byte, pts, dts uint64) (err error) { - switch track.cid { - case MP4_CODEC_H264: - err = track.writeH264(sample, pts, dts) - case MP4_CODEC_H265: - err = track.writeH265(sample, pts, dts) - case MP4_CODEC_AAC: - err = track.writeAAC(sample, pts, dts) - case MP4_CODEC_G711A, MP4_CODEC_G711U: - err = track.writeG711(sample, pts, dts) - case MP4_CODEC_MP2, MP4_CODEC_MP3: - err = track.writeMP3(sample, pts, dts) - case MP4_CODEC_OPUS: - err = track.writeOPUS(sample, pts, dts) - } - return err -} - -func (track *mp4track) writeH264(h264 []byte, pts, dts uint64) (err error) { - h264extra, ok := track.extra.(*h264ExtraData) - if !ok { - panic("must init h264ExtraData first") - } - codec.SplitFrameWithStartCode(h264, func(nalu []byte) bool { - nalu_type := codec.H264NaluType(nalu) - switch nalu_type { - case codec.H264_NAL_SPS: - spsid := codec.GetSPSIdWithStartCode(nalu) - for _, sps := range h264extra.spss { - if spsid == codec.GetSPSIdWithStartCode(sps) { - return true - } - } - tmp := make([]byte, len(nalu)) - copy(tmp, nalu) - h264extra.spss = append(h264extra.spss, tmp) - if track.width == 0 || track.height == 0 { - width, height := codec.GetH264Resolution(h264extra.spss[0]) - if track.width == 0 { - track.width = width - } - if track.height == 0 { - track.height = height - } - } - case codec.H264_NAL_PPS: - ppsid := codec.GetPPSIdWithStartCode(nalu) - for _, pps := range h264extra.ppss { - if ppsid == codec.GetPPSIdWithStartCode(pps) { - return true - } - } - tmp := make([]byte, len(nalu)) - copy(tmp, nalu) - h264extra.ppss = append(h264extra.ppss, tmp) - } +func (track *mp4track) writeH264(nalus [][]byte, pts, dts uint64) (err error) { + //h264extra, ok := track.extra.(*h264ExtraData) + //if !ok { + // panic("must init h264ExtraData first") + //} + for _, nalu := range nalus { + nalu_type := codec.H264_NAL_TYPE(nalu[0] & 0x1F) //aud/sps/pps/sei 为帧间隔 //通过first_slice_in_mb来判断,改nalu是否为一帧的开头 if track.lastSample.hasVcl && isH264NewAccessUnit(nalu) { var currentOffset int64 if currentOffset, err = track.writer.Seek(0, io.SeekCurrent); err != nil { - return false + return } entry := sampleEntry{ pts: track.lastSample.pts, @@ -348,7 +297,7 @@ func (track *mp4track) writeH264(h264 []byte, pts, dts uint64) (err error) { } n := 0 if n, err = track.writer.Write(track.lastSample.cache); err != nil { - return false + return } entry.size = uint64(n) track.addSampleEntry(entry) @@ -364,41 +313,22 @@ func (track *mp4track) writeH264(h264 []byte, pts, dts uint64) (err error) { track.lastSample.isKey = true } } - track.lastSample.cache = append(track.lastSample.cache, codec.ConvertAnnexBToAVCC(nalu)...) - return true - }) + track.lastSample.cache = append(track.lastSample.cache, nalu...) + } return } -func (track *mp4track) writeH265(h265 []byte, pts, dts uint64) (err error) { - h265extra, ok := track.extra.(*h265ExtraData) - if !ok { - panic("must init h265ExtraData first") - } - codec.SplitFrameWithStartCode(h265, func(nalu []byte) bool { - nalu_type := codec.H265NaluType(nalu) - switch nalu_type { - case codec.H265_NAL_SPS: - h265extra.hvccExtra.UpdateSPS(nalu) - if track.width == 0 || track.height == 0 { - width, height := codec.GetH265Resolution(nalu) - if track.width == 0 { - track.width = width - } - if track.height == 0 { - track.height = height - } - } - case codec.H265_NAL_PPS: - h265extra.hvccExtra.UpdatePPS(nalu) - case codec.H265_NAL_VPS: - h265extra.hvccExtra.UpdateVPS(nalu) - } - +func (track *mp4track) writeH265(nalus [][]byte, pts, dts uint64) (err error) { + //h265extra, ok := track.extra.(*h265ExtraData) + //if !ok { + // panic("must init h265ExtraData first") + //} + for _, nalu := range nalus { + nalu_type := codec.H265_NAL_TYPE((nalu[0] >> 1) & 0x3F) if track.lastSample.hasVcl && isH265NewAccessUnit(nalu) { var currentOffset int64 if currentOffset, err = track.writer.Seek(0, io.SeekCurrent); err != nil { - return false + return } entry := sampleEntry{ pts: track.lastSample.pts, @@ -410,7 +340,7 @@ func (track *mp4track) writeH265(h265 []byte, pts, dts uint64) (err error) { } n := 0 if n, err = track.writer.Write(track.lastSample.cache); err != nil { - return false + return } entry.size = uint64(n) track.addSampleEntry(entry) @@ -426,60 +356,32 @@ func (track *mp4track) writeH265(h265 []byte, pts, dts uint64) (err error) { track.lastSample.isKey = true } } - track.lastSample.cache = append(track.lastSample.cache, codec.ConvertAnnexBToAVCC(nalu)...) - return true - }) + track.lastSample.cache = append(track.lastSample.cache, nalu...) + } + return } func (track *mp4track) writeAAC(aacframes []byte, pts, dts uint64) (err error) { - aacextra, ok := track.extra.(*aacExtraData) - if !ok { - return errors.New("must init aacExtraData first") - } - if aacextra.asc == nil || len(aacextra.asc) <= 0 { - asc, err := codec.ConvertADTSToASC(aacframes) - if err != nil { - return err - } - aacextra.asc = asc.Encode() - - if track.chanelCount == 0 { - track.chanelCount = asc.Channel_configuration - } - if track.sampleRate == 0 { - track.sampleRate = uint32(codec.AACSampleIdxToSample(int(asc.Sample_freq_index))) - } - if track.sampleBits == 0 { - // aac has no fixed bit depth, so we just set it to the default of 16 - // see AudioSampleEntry (stsd-box) and https://superuser.com/a/1173507 - track.sampleBits = 16 - } - } - var currentOffset int64 if currentOffset, err = track.writer.Seek(0, io.SeekCurrent); err != nil { return } - //某些情况下,aacframes 可能由多个aac帧组成需要分帧,否则quicktime 貌似播放有问题 - codec.SplitAACFrame(aacframes, func(aac []byte) { - entry := sampleEntry{ - pts: pts, - dts: dts, - size: 0, - SampleDescriptionIndex: 1, - offset: uint64(currentOffset), - } - n := 0 - n, err = track.writer.Write(aac[7:]) - if err != nil { - return - } - currentOffset += int64(n) - entry.size = uint64(n) - track.addSampleEntry(entry) - }) - + entry := sampleEntry{ + pts: pts, + dts: dts, + size: 0, + SampleDescriptionIndex: 1, + offset: uint64(currentOffset), + } + n := 0 + n, err = track.writer.Write(aacframes) + if err != nil { + return + } + currentOffset += int64(n) + entry.size = uint64(n) + track.addSampleEntry(entry) return } diff --git a/plugin/mp4/pkg/box/muxer.go b/plugin/mp4/pkg/box/muxer.go index 97956c9..91e84fb 100644 --- a/plugin/mp4/pkg/box/muxer.go +++ b/plugin/mp4/pkg/box/muxer.go @@ -161,9 +161,29 @@ func (muxer *Movmuxer) addTrack(cid MP4_CODEC_TYPE, options ...TrackOption) uint return track.trackId } -func (muxer *Movmuxer) Write(track uint32, data []byte, pts uint64, dts uint64) error { +func (muxer *Movmuxer) WriteAudio(track uint32, sample []byte, dts uint64) (err error) { mp4track := muxer.tracks[track] - err := mp4track.writeSample(data, pts, dts) + switch mp4track.cid { + case MP4_CODEC_AAC: + err = mp4track.writeAAC(sample, dts, dts) + case MP4_CODEC_G711A, MP4_CODEC_G711U: + err = mp4track.writeG711(sample, dts, dts) + case MP4_CODEC_MP2, MP4_CODEC_MP3: + err = mp4track.writeMP3(sample, dts, dts) + case MP4_CODEC_OPUS: + err = mp4track.writeOPUS(sample, dts, dts) + } + return err +} + +func (muxer *Movmuxer) WriteVideo(track uint32, nalus [][]byte, pts uint64, dts uint64) (err error) { + mp4track := muxer.tracks[track] + switch mp4track.cid { + case MP4_CODEC_H264: + err = mp4track.writeH264(nalus, pts, dts) + case MP4_CODEC_H265: + err = mp4track.writeH265(nalus, pts, dts) + } if err != nil { return err } @@ -171,11 +191,6 @@ func (muxer *Movmuxer) Write(track uint32, data []byte, pts uint64, dts uint64) if !muxer.movFlag.isFragment() && !muxer.movFlag.isDash() { return err } - - if isAudio(mp4track.cid) { - return nil - } - // isCustion := muxer.movFlag.has(MP4_FLAG_CUSTOM) isKeyFrag := muxer.movFlag.has(MP4_FLAG_KEYFRAME) if isKeyFrag { @@ -194,13 +209,11 @@ func (muxer *Movmuxer) Write(track uint32, data []byte, pts uint64, dts uint64) } func (muxer *Movmuxer) WriteTrailer() (err error) { - for _, track := range muxer.tracks { if err = track.flush(); err != nil { return } } - switch { case muxer.movFlag.isDash(): case muxer.movFlag.isFragment(): diff --git a/plugin/mp4/pkg/pull.go b/plugin/mp4/pkg/pull.go index 73f8d1b..315154f 100644 --- a/plugin/mp4/pkg/pull.go +++ b/plugin/mp4/pkg/pull.go @@ -25,10 +25,6 @@ func NewMP4Puller() *MP4Puller { } } -func NewPullHandler() m7s.PullHandler { - return NewMP4Puller() -} - func (puller *MP4Puller) Connect(p *m7s.Client) (err error) { if strings.HasPrefix(p.RemoteURL, "http") { var res *http.Response diff --git a/plugin/mp4/pkg/write.go b/plugin/mp4/pkg/write.go new file mode 100644 index 0000000..08bf4ec --- /dev/null +++ b/plugin/mp4/pkg/write.go @@ -0,0 +1,69 @@ +package mp4 + +import ( + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/plugin/mp4/pkg/box" + "time" +) + +type Recorder struct { + *m7s.Subscriber + *box.Movmuxer + videoId uint32 + audioId uint32 +} + +func (r *Recorder) Record(recorder *m7s.Recorder) (err error) { + r.Movmuxer, err = box.CreateMp4Muxer(recorder.File) + if recorder.Publisher.HasAudioTrack() { + audioTrack := recorder.Publisher.AudioTrack + switch ctx := audioTrack.ICodecCtx.GetBase().(type) { + case *codec.AACCtx: + r.audioId = r.AddAudioTrack(box.MP4_CODEC_AAC, box.WithExtraData(ctx.ConfigBytes)) + case *codec.PCMACtx: + r.audioId = r.AddAudioTrack(box.MP4_CODEC_G711A, box.WithAudioSampleRate(uint32(ctx.SampleRate)), box.WithAudioChannelCount(uint8(ctx.Channels)), box.WithAudioSampleBits(uint8(ctx.SampleSize))) + case *codec.PCMUCtx: + r.audioId = r.AddAudioTrack(box.MP4_CODEC_G711U, box.WithAudioSampleRate(uint32(ctx.SampleRate)), box.WithAudioChannelCount(uint8(ctx.Channels)), box.WithAudioSampleBits(uint8(ctx.SampleSize))) + } + } + if recorder.Publisher.HasVideoTrack() { + videoTrack := recorder.Publisher.VideoTrack + switch ctx := videoTrack.ICodecCtx.GetBase().(type) { + case *codec.H264Ctx: + r.videoId = r.AddVideoTrack(box.MP4_CODEC_H264, box.WithExtraData(ctx.Record)) + case *codec.H265Ctx: + r.videoId = r.AddVideoTrack(box.MP4_CODEC_H265, box.WithExtraData(ctx.Record)) + } + } + return m7s.PlayBlock(&recorder.Subscriber, func(audio *pkg.RawAudio) error { + return r.WriteAudio(r.audioId, audio.ToBytes(), uint64(audio.Timestamp/time.Millisecond)) + }, func(video *pkg.H26xFrame) error { + var nalus [][]byte + for _, nalu := range video.Nalus { + nalus = append(nalus, nalu.ToBytes()) + } + return r.WriteVideo(r.videoId, nalus, uint64(video.Timestamp/time.Millisecond), uint64(video.CTS/time.Millisecond)) + }) +} + +func (r *Recorder) Close() { + defer func() { + if err := recover(); err != nil { + r.Error("close", "err", err) + } else { + r.Info("close") + } + }() + err := r.WriteTrailer() + if err != nil { + r.Error("write trailer", "err", err) + } else { + r.Info("write trailer") + } +} + +func NewMP4Recorder() *Recorder { + return &Recorder{} +} diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index a095b29..aabc86e 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -24,19 +24,17 @@ var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp: func (p *RTMPPlugin) OnInit() error { for streamPath, url := range p.GetCommonConf().PullOnStart { - go p.Pull(streamPath, url, &Client{}) + go p.Pull(streamPath, url) } return nil } -func (p *RTMPPlugin) OnPull(puller *m7s.Puller) { - p.OnPublish(&puller.Publisher) +func (p *RTMPPlugin) NewPullHandler() m7s.PullHandler { + return &Client{} } -func (p *RTMPPlugin) OnPublish(puber *m7s.Publisher) { - if remoteURL, ok := p.GetCommonConf().PushList[puber.StreamPath]; ok { - go p.Push(puber.StreamPath, remoteURL, &Client{}) - } +func (p *RTMPPlugin) NewPushHandler() m7s.PushHandler { + return &Client{} } func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) { diff --git a/plugin/rtsp/index.go b/plugin/rtsp/index.go index ffc34fa..6bd88f9 100644 --- a/plugin/rtsp/index.go +++ b/plugin/rtsp/index.go @@ -22,23 +22,21 @@ type RTSPPlugin struct { m7s.Plugin } +func (p *RTSPPlugin) NewPullHandler() m7s.PullHandler { + return &Client{} +} + +func (p *RTSPPlugin) NewPushHandler() m7s.PushHandler { + return &Client{} +} + func (p *RTSPPlugin) OnInit() error { for streamPath, url := range p.GetCommonConf().PullOnStart { - go p.Pull(streamPath, url, &Client{}) + go p.Pull(streamPath, url) } return nil } -func (p *RTSPPlugin) OnPull(puller *m7s.Puller) { - p.OnPublish(&puller.Publisher) -} - -func (p *RTSPPlugin) OnPublish(puber *m7s.Publisher) { - if remoteURL, ok := p.GetCommonConf().PushList[puber.StreamPath]; ok { - go p.Push(puber.StreamPath, remoteURL, &Client{}) - } -} - func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { logger := p.Logger.With("remote", conn.RemoteAddr().String()) var receiver *Receiver diff --git a/publisher.go b/publisher.go index 936545c..bf5cfee 100644 --- a/publisher.go +++ b/publisher.go @@ -59,7 +59,7 @@ type AVTracks struct { } func (t *AVTracks) CreateSubTrack(dataType reflect.Type) (track *AVTrack) { - track = NewAVTrack(dataType, t.AVTrack) + track = NewAVTrack(dataType, t.AVTrack, util.NewPromise(struct{}{})) track.WrapIndex = t.Length t.Add(track) return @@ -69,15 +69,14 @@ type Publisher struct { PubSubBase sync.RWMutex `json:"-" yaml:"-"` config.Publish - State PublisherState - VideoTrack AVTracks - AudioTrack AVTracks - DataTrack *DataTrack - Subscribers util.Collection[int, *Subscriber] `json:"-" yaml:"-"` - GOP int - baseTs time.Duration - lastTs time.Duration - dumpFile *os.File + State PublisherState + AudioTrack, VideoTrack AVTracks + audioReady, videoReady *util.Promise[struct{}] + DataTrack *DataTrack + Subscribers util.Collection[int, *Subscriber] `json:"-" yaml:"-"` + GOP int + baseTs, lastTs time.Duration + dumpFile *os.File } func (p *Publisher) SubscriberRange(yield func(sub *Subscriber) bool) { @@ -182,6 +181,8 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) { func (p *Publisher) Start() { p.Info("publish") + p.audioReady = util.NewPromiseWithTimeout(struct{}{}, time.Second*5) + p.videoReady = util.NewPromiseWithTimeout(struct{}{}, time.Second*5) if p.Dump { f := filepath.Join("./dump", p.StreamPath) os.MkdirAll(filepath.Dir(f), 0666) @@ -229,7 +230,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { } t := p.VideoTrack.AVTrack if t == nil { - t = NewAVTrack(data, p.Logger.With("track", "video"), &p.Publish) + t = NewAVTrack(data, p.Logger.With("track", "video"), &p.Publish, p.videoReady) p.Lock() p.VideoTrack.AVTrack = t p.VideoTrack.Add(t) @@ -327,7 +328,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { } t := p.AudioTrack.AVTrack if t == nil { - t = NewAVTrack(data, p.Logger.With("track", "audio"), &p.Publish) + t = NewAVTrack(data, p.Logger.With("track", "audio"), &p.Publish, p.audioReady) p.Lock() p.AudioTrack.AVTrack = t p.AudioTrack.Add(t) @@ -473,3 +474,13 @@ func (p *Publisher) TakeOver(old *Publisher) { } old.Subscribers = util.Collection[int, *Subscriber]{} } + +func (p *Publisher) WaitTrack() (err error) { + if p.PubVideo { + _, err = p.videoReady.Await() + } + if p.PubAudio { + _, err = p.audioReady.Await() + } + return +} diff --git a/recoder.go b/recoder.go new file mode 100644 index 0000000..3d1384b --- /dev/null +++ b/recoder.go @@ -0,0 +1,26 @@ +package m7s + +import ( + "m7s.live/m7s/v5/pkg/config" + "os" +) + +type RecordHandler interface { + Close() + Record(*Recorder) error +} + +type Recorder struct { + File *os.File + Subscriber + config.Record +} + +func (p *Recorder) GetKey() string { + return p.File.Name() +} + +func (p *Recorder) Start(handler RecordHandler) (err error) { + defer handler.Close() + return handler.Record(p) +} diff --git a/server.go b/server.go index 6e2f73b..aad303a 100644 --- a/server.go +++ b/server.go @@ -3,7 +3,17 @@ package m7s import ( "context" "fmt" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/phsym/console-slog" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "gopkg.in/yaml.v3" + "gorm.io/gorm" "log/slog" + "m7s.live/m7s/v5/pb" + . "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/db" + "m7s.live/m7s/v5/pkg/util" "net" "net/http" "os" @@ -13,20 +23,11 @@ import ( "strings" "sync/atomic" "time" - - "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - "github.com/phsym/console-slog" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "gopkg.in/yaml.v3" - "m7s.live/m7s/v5/pb" - . "m7s.live/m7s/v5/pkg" - "m7s.live/m7s/v5/pkg/util" ) var ( Version = "v5.0.0" - MergeConfigs = []string{"Publish", "Subscribe", "HTTP", "PublicIP", "LogLevel", "EnableAuth"} + MergeConfigs = []string{"Publish", "Subscribe", "HTTP", "PublicIP", "LogLevel", "EnableAuth", "DB"} ExecPath = os.Args[0] ExecDir = filepath.Dir(ExecPath) serverIndexG atomic.Uint32 @@ -56,6 +57,7 @@ type Server struct { Streams, Waiting util.Collection[string, *Publisher] Pulls util.Collection[string, *Puller] Pushs util.Collection[string, *Pusher] + Records util.Collection[string, *Recorder] Subscribers util.Collection[int, *Subscriber] LogHandler MultiLogHandler pidG, sidG int @@ -92,6 +94,7 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) { server.Meta = s.Meta server.OnAuthPubs = s.OnAuthPubs server.OnAuthSubs = s.OnAuthSubs + server.DB = s.DB *s = server } return @@ -146,7 +149,15 @@ func (s *Server) run(ctx context.Context, conf any) (err error) { "/api/stream/annexb/{streamPath...}": s.api_Stream_AnnexB_, "/api/videotrack/sse/{streamPath...}": s.api_VideoTrack_SSE, }) - + if s.config.DSN != "" { + if factory, ok := db.Factory[s.config.DBType]; ok { + s.DB, err = gorm.Open(factory(s.config.DSN), &gorm.Config{}) + if err != nil { + s.Error("failed to connect database", "error", err, "dsn", s.config.DSN, "type", s.config.DBType) + return + } + } + } if httpConf.ListenAddrTLS != "" { s.Info("https listen at ", "addr", httpConf.ListenAddrTLS) go func(addr string) { @@ -303,7 +314,7 @@ func (s *Server) eventLoop() { } event = v.Value case *Puller: - if _, ok := s.Pulls.Get(vv.StreamPath); ok { + if _, ok := s.Pulls.Get(vv.GetKey()); ok { v.Fulfill(ErrStreamExist) continue } else { @@ -317,7 +328,7 @@ func (s *Server) eventLoop() { event = v.Value } case *Pusher: - if _, ok := s.Pushs.Get(vv.StreamPath); ok { + if _, ok := s.Pushs.Get(vv.GetKey()); ok { v.Fulfill(ErrStreamExist) continue } else { @@ -330,6 +341,20 @@ func (s *Server) eventLoop() { s.Pushs.Add(vv) event = v.Value } + case *Recorder: + if _, ok := s.Records.Get(vv.GetKey()); ok { + v.Fulfill(ErrStreamExist) + continue + } else { + err := s.OnSubscribe(&vv.Subscriber) + v.Fulfill(err) + if err != nil { + continue + } + subChan <- vv.Done() + s.Records.Add(vv) + event = v.Value + } } case slog.Handler: s.LogHandler.Add(v) @@ -401,6 +426,21 @@ func (s *Server) OnPublish(publisher *Publisher) error { publisher.TakeOver(waiting) s.Waiting.Remove(waiting) } + for plugin := range s.Plugins.Range { + if plugin.Disabled { + continue + } + if remoteURL := plugin.GetCommonConf().CheckPush(publisher.StreamPath); remoteURL != "" { + if _, ok := plugin.handler.(IPusherPlugin); ok { + go plugin.Push(publisher.StreamPath, remoteURL) + } + } + if filePath := plugin.GetCommonConf().CheckRecord(publisher.StreamPath); filePath != "" { + if _, ok := plugin.handler.(IRecorderPlugin); ok { + go plugin.Record(publisher.StreamPath, filePath) + } + } + } return nil } @@ -428,6 +468,16 @@ func (s *Server) OnSubscribe(subscriber *Subscriber) error { publisher.AddSubscriber(subscriber) } else { s.createWait(subscriber.StreamPath).AddSubscriber(subscriber) + for plugin := range s.Plugins.Range { + if plugin.Disabled { + continue + } + if remoteURL := plugin.GetCommonConf().Pull.CheckPullOnSub(subscriber.StreamPath); remoteURL != "" { + if _, ok := plugin.handler.(IPullerPlugin); ok { + go plugin.Pull(subscriber.StreamPath, remoteURL) + } + } + } } return nil }