feat: play rtsp h264

This commit is contained in:
langhuihui
2024-06-27 20:42:14 +08:00
parent 2dbb40dc78
commit 0e28086d02
9 changed files with 153 additions and 54 deletions

View File

@@ -81,7 +81,7 @@ func (a *Annexb264Ctx) CreateFrame(frame *AVFrame) (IAVFrame, error) {
if i > 0 {
annexb.Append(codec.NALU_Delimiter1)
}
annexb.Append(nalu...)
annexb.Append(nalu.Buffers...)
}
return &annexb, nil
}

View File

@@ -126,6 +126,12 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) {
if err = r.readFrame(conf.SubMode); err != nil {
return
}
if conf.SubMode != SUBMODE_REAL {
// 防止过快消费
if fast := r.Value.Timestamp - r.FirstTs - time.Since(r.startTime); fast > 0 && fast < time.Second {
time.Sleep(fast)
}
}
}
r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds())
if r.AbsTime == 0 {

View File

@@ -42,12 +42,10 @@ type (
Dump(byte, io.Writer)
}
Nalu = [][]byte
Nalus struct {
PTS time.Duration
DTS time.Duration
Nalus []Nalu
Nalus []util.Memory
}
AVFrame struct {
DataFrame
@@ -97,15 +95,15 @@ func (df *DataFrame) Ready() {
}
func (nalus *Nalus) H264Type() codec.H264NALUType {
return codec.ParseH264NALUType(nalus.Nalus[0][0][0])
return codec.ParseH264NALUType(nalus.Nalus[0].Buffers[0][0])
}
func (nalus *Nalus) H265Type() codec.H265NALUType {
return codec.ParseH265NALUType(nalus.Nalus[0][0][0])
return codec.ParseH265NALUType(nalus.Nalus[0].Buffers[0][0])
}
func (nalus *Nalus) Append(bytes ...[]byte) {
nalus.Nalus = append(nalus.Nalus, bytes)
func (nalus *Nalus) Append(bytes []byte) {
nalus.Nalus = append(nalus.Nalus, util.Memory{Buffers: net.Buffers{bytes}, Size: len(bytes)})
}
func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error {
@@ -114,9 +112,7 @@ func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error
if err != nil {
return err
}
reader.RangeN(l, func(nalu []byte) {
nalus.Append(nalu)
})
reader.RangeN(l, nalus.Append)
}
return nil
}

View File

@@ -1,7 +1,9 @@
package util
import (
"math/rand"
"testing"
"time"
)
func TestBuffer(t *testing.T) {
@@ -16,3 +18,32 @@ func TestBuffer(t *testing.T) {
}
})
}
func TestReadBytesTo(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
s := RandomString(100)
t.Logf("s:%s", s)
var m Memory
m.Append([]byte(s))
r := m.NewReader()
seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
var total []byte
for {
i := seededRand.Intn(10)
if i == 0 {
continue
}
buf := make([]byte, i)
n := r.ReadBytesTo(buf)
t.Logf("n:%d buf:%s", n, string(buf))
total = append(total, buf[:n]...)
if n == 0 {
if string(total) != s {
t.Logf("total:%s", total)
t.Fail()
}
return
}
}
})
}

View File

@@ -18,10 +18,6 @@ type MemoryReader struct {
offset1 int
}
func NewMemoryFromBytes(b ...[]byte) *Memory {
return NewMemory(b)
}
func NewReadableBuffersFromBytes(b ...[]byte) *MemoryReader {
buf := NewMemory(b)
return &MemoryReader{Memory: buf, Length: buf.Size}
@@ -57,6 +53,12 @@ func (m *Memory) CopyTo(buf []byte) {
}
}
func (m *Memory) ToBytes() []byte {
buf := make([]byte, m.Size)
m.CopyTo(buf)
return buf
}
func (m *Memory) Append(b ...[]byte) {
m.Buffers = append(m.Buffers, b...)
for _, level0 := range b {
@@ -96,6 +98,9 @@ func (r *MemoryReader) MoveToEnd() {
}
func (r *MemoryReader) ReadBytesTo(buf []byte) (actual int) {
if r.Length == 0 {
return 0
}
n := len(buf)
curBuf := r.GetCurrent()
curBufLen := len(curBuf)

View File

@@ -2,8 +2,10 @@ package util
import (
"log"
"math/rand"
"os"
"path/filepath"
"time"
)
func Conditoinal[T any](cond bool, t, f T) T {
@@ -19,11 +21,15 @@ func Bit1(b byte, index int) bool {
return b&(1<<(7-index)) != 0
}
func LenOfBuffers(b [][]byte) (n int) {
for _, bb := range b {
n += len(bb)
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
func RandomString(length int) string {
seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
b := make([]byte, length)
for i := range b {
b[i] = charset[seededRand.Intn(len(charset))]
}
return
return string(b)
}
func initFatalLog() *os.File {

View File

@@ -239,9 +239,9 @@ func createH26xFrame(from *AVFrame, codecID VideoCodecID) (frame IAVFrame, err e
util.PutBE(head[2:5], (nalus.PTS-nalus.DTS)/90) // cts
for _, nalu := range nalus.Nalus {
naluLenM := rtmpVideo.NextN(4)
naluLen := uint32(util.LenOfBuffers(nalu))
naluLen := uint32(nalu.Size)
binary.BigEndian.PutUint32(naluLenM, naluLen)
rtmpVideo.Append(nalu...)
rtmpVideo.Append(nalu.Buffers...)
}
frame = &rtmpVideo
return

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"regexp"
"slices"
"time"
"github.com/pion/rtp"
@@ -76,15 +75,17 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
}
nalus := raw.(Nalus)
for _, nalu := range nalus.Nalus {
switch codec.ParseH264NALUType(nalu[0][0]) {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case codec.NALU_SPS:
ctx = &RTPH264Ctx{}
ctx.SPS = [][]byte{slices.Concat(nalu...)}
ctx.SPSInfo.Unmarshal(ctx.SPS[0])
ctx.SPS = [][]byte{nalu.ToBytes()}
if err = ctx.SPSInfo.Unmarshal(ctx.SPS[0]); err != nil {
return
}
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = ctx
case codec.NALU_PPS:
ctx.PPS = [][]byte{slices.Concat(nalu...)}
ctx.PPS = [][]byte{nalu.ToBytes()}
case codec.NALU_IDR_Picture:
isIDR = true
}
@@ -114,17 +115,17 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
}
nalus := raw.(Nalus)
for _, nalu := range nalus.Nalus {
switch codec.ParseH265NALUType(nalu[0][0]) {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case codec.NAL_UNIT_SPS:
ctx = &RTPH265Ctx{}
ctx.SPS = [][]byte{slices.Concat(nalu...)}
ctx.SPS = [][]byte{nalu.ToBytes()}
ctx.SPSInfo.Unmarshal(ctx.SPS[0])
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = ctx
case codec.NAL_UNIT_PPS:
ctx.PPS = [][]byte{slices.Concat(nalu...)}
ctx.PPS = [][]byte{nalu.ToBytes()}
case codec.NAL_UNIT_VPS:
ctx.VPS = [][]byte{slices.Concat(nalu...)}
ctx.VPS = [][]byte{nalu.ToBytes()}
case codec.NAL_UNIT_CODED_SLICE_BLA,
codec.NAL_UNIT_CODED_SLICE_BLANT,
codec.NAL_UNIT_CODED_SLICE_BLA_N_LP,
@@ -135,10 +136,9 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
}
}
case "audio/MPEG4-GENERIC", "audio/AAC":
// var ctx RTPAACCtx
// ctx.FourCC = codec.FourCC_MP4A
// ctx.RTPCodecParameters = *r.RTPCodecParameters
// codecCtx = &ctx
var ctx RTPAACCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = &ctx
default:
err = ErrUnsupportCodec
}
@@ -155,9 +155,11 @@ func (h265 *RTPH265Ctx) GetInfo() string {
func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
var r RTPVideo
//r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
r.RTPCodecParameters = &h264.RTPCodecParameters
if len(from.Wraps) > 0 {
r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
}
nalus := from.Raw.(Nalus)
nalutype := nalus.H264Type()
var lastPacket *rtp.Packet
createPacket := func(payload []byte) *rtp.Packet {
h264.SequenceNumber++
@@ -173,22 +175,30 @@ func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
}
return lastPacket
}
if nalutype == codec.NALU_IDR_Picture {
if nalus.H264Type() == codec.NALU_IDR_Picture && len(h264.SPS) > 0 && len(h264.PPS) > 0 {
r.Packets = append(r.Packets, createPacket(h264.SPS[0]), createPacket(h264.PPS[0]))
}
for _, nalu := range nalus.Nalus {
reader := util.NewReadableBuffersFromBytes(nalu...)
if startIndex := len(r.Packets); reader.Length > MTUSize {
if reader := nalu.NewReader(); reader.Length > MTUSize {
//fu-a
for reader.Length > 0 {
mem := r.Malloc(MTUSize)
n := reader.ReadBytesTo(mem[1:])
mem[0] = codec.NALU_FUA.Or(mem[1] & 0x60)
fuaHead := codec.NALU_FUA.Or(mem[1] & 0x60)
mem[0] = fuaHead
naluType := mem[1] & 0x1f
mem[1] = naluType | startBit
r.FreeRest(&mem, n+1)
r.AddRecycleBytes(mem)
r.Packets = append(r.Packets, createPacket(mem))
for reader.Length > 0 {
mem = r.Malloc(MTUSize)
n = reader.ReadBytesTo(mem[2:])
mem[0] = fuaHead
mem[1] = naluType
r.FreeRest(&mem, n+2)
r.AddRecycleBytes(mem)
r.Packets = append(r.Packets, createPacket(mem))
}
r.Packets[startIndex].Payload[1] |= startBit
lastPacket.Payload[1] |= endBit
} else {
mem := r.NextN(reader.Length)
@@ -205,21 +215,21 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
switch ictx.(type) {
case *RTPH264Ctx:
var nalus Nalus
var nalu Nalu
var nalu util.Memory
var naluType codec.H264NALUType
gotNalu := func() {
if len(nalu) > 0 {
if nalu.Size > 0 {
nalus.Nalus = append(nalus.Nalus, nalu)
nalu = nil
nalu = util.Memory{}
}
}
for i := 0; i < len(r.Packets); i++ {
packet := r.Packets[i]
for _, packet := range r.Packets {
nalus.PTS = time.Duration(packet.Timestamp)
// TODO: B-frame
nalus.DTS = nalus.PTS
b0 := packet.Payload[0]
if t := codec.ParseH264NALUType(b0); t < 24 {
nalu = [][]byte{packet.Payload}
nalu.Append(packet.Payload)
gotNalu()
} else {
offset := t.Offset()
@@ -230,7 +240,7 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
}
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalu = [][]byte{buffer.ReadN(nextSize)}
nalu.Append(buffer.ReadN(nextSize))
gotNalu()
} else {
return nil, fmt.Errorf("invalid nalu size %d", nextSize)
@@ -240,10 +250,10 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
b1 := packet.Payload[1]
if util.Bit1(b1, 0) {
naluType.Parse(b1)
nalu = [][]byte{{naluType.Or(b0 & 0x60)}}
nalu.Append([]byte{naluType.Or(b0 & 0x60)})
}
if len(nalu) > 0 {
nalu = append(nalu, packet.Payload[offset:])
if nalu.Size > 0 {
nalu.Append(packet.Payload[offset:])
} else {
return nil, errors.New("fu have no start")
}

View File

@@ -0,0 +1,45 @@
package rtp
import (
"github.com/pion/webrtc/v4"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
"testing"
)
func TestRTPH264Ctx_CreateFrame(t *testing.T) {
var ctx = &RTPH264Ctx{
RTPCtx: RTPCtx{
RTPCodecParameters: webrtc.RTPCodecParameters{
PayloadType: 96,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "packetization-mode=1; sprop-parameter-sets=J2QAKaxWgHgCJ+WagICAgQ==,KO48sA==; profile-level-id=640029",
},
},
},
}
var randStr = util.RandomString(1500)
var avFrame = &pkg.AVFrame{}
var mem util.Memory
mem.Append([]byte(randStr))
avFrame.Raw = pkg.Nalus{
Nalus: []util.Memory{mem},
}
f, err := ctx.CreateFrame(avFrame)
if err != nil {
t.Error(err)
return
}
frame := f.(*RTPVideo)
var track = &pkg.AVTrack{}
_, _, raw, err := frame.Parse(track)
if err != nil {
t.Error(err)
return
}
if s := string(raw.(pkg.Nalus).Nalus[0].ToBytes()); s != randStr {
t.Error("not equal", len(s), len(randStr))
}
}