From 1133c734ab66c3be02dd3232ccd4cefd2f9995b2 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 22 Aug 2023 22:56:23 +0200 Subject: [PATCH] support publishing AV1/H265 with OBS 30 (#2217) (#2234) --- internal/rtmp/message/audio.go | 10 +- .../rtmp/message/extended_coded_frames.go | 19 ++- internal/rtmp/message/extended_frames_x.go | 20 ++- internal/rtmp/message/extended_metadata.go | 8 +- .../extended_mpeg2ts_sequence_start.go | 8 +- .../rtmp/message/extended_sequence_end.go | 4 +- .../rtmp/message/extended_sequence_start.go | 21 ++- internal/rtmp/message/message.go | 8 +- internal/rtmp/message/reader.go | 3 +- internal/rtmp/message/reader_test.go | 14 ++ internal/rtmp/message/video.go | 6 +- internal/rtmp/reader.go | 3 +- internal/rtmp/reader_test.go | 145 ++++++++++++------ mediamtx.yml | 4 +- 14 files changed, 193 insertions(+), 80 deletions(-) diff --git a/internal/rtmp/message/audio.go b/internal/rtmp/message/audio.go index 6d611403..59725e02 100644 --- a/internal/rtmp/message/audio.go +++ b/internal/rtmp/message/audio.go @@ -77,15 +77,19 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { return nil } -// Marshal implements Message. -func (m Audio) Marshal() (*rawmessage.Message, error) { +func (m Audio) marshalBodySize() int { var l int if m.Codec == CodecMPEG1Audio { l = 1 + len(m.Payload) } else { l = 2 + len(m.Payload) } - body := make([]byte, l) + return l +} + +// Marshal implements Message. +func (m Audio) Marshal() (*rawmessage.Message, error) { + body := make([]byte, m.marshalBodySize()) body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels diff --git a/internal/rtmp/message/extended_coded_frames.go b/internal/rtmp/message/extended_coded_frames.go index 297d0337..c25118c3 100644 --- a/internal/rtmp/message/extended_coded_frames.go +++ b/internal/rtmp/message/extended_coded_frames.go @@ -12,7 +12,7 @@ type ExtendedCodedFrames struct { ChunkStreamID byte DTS time.Duration MessageStreamID uint32 - FourCC [4]byte + FourCC FourCC PTSDelta time.Duration Payload []byte } @@ -26,7 +26,7 @@ func (m *ExtendedCodedFrames) Unmarshal(raw *rawmessage.Message) error { m.ChunkStreamID = raw.ChunkStreamID m.DTS = raw.Timestamp m.MessageStreamID = raw.MessageStreamID - copy(m.FourCC[:], raw.Body[1:5]) + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) if m.FourCC == FourCCHEVC { m.PTSDelta = time.Duration(uint32(raw.Body[5])<<16|uint32(raw.Body[6])<<8|uint32(raw.Body[7])) * time.Millisecond @@ -38,18 +38,25 @@ func (m *ExtendedCodedFrames) Unmarshal(raw *rawmessage.Message) error { return nil } -// Marshal implements Message. -func (m ExtendedCodedFrames) Marshal() (*rawmessage.Message, error) { +func (m ExtendedCodedFrames) marshalBodySize() int { var l int if m.FourCC == FourCCHEVC { l = 8 + len(m.Payload) } else { l = 5 + len(m.Payload) } - body := make([]byte, l) + return l +} + +// Marshal implements Message. +func (m ExtendedCodedFrames) Marshal() (*rawmessage.Message, error) { + body := make([]byte, m.marshalBodySize()) body[0] = 0b10000000 | byte(ExtendedTypeCodedFrames) - copy(body[1:5], m.FourCC[:]) + body[1] = uint8(m.FourCC >> 24) + body[2] = uint8(m.FourCC >> 16) + body[3] = uint8(m.FourCC >> 8) + body[4] = uint8(m.FourCC) if m.FourCC == FourCCHEVC { tmp := uint32(m.PTSDelta / time.Millisecond) diff --git a/internal/rtmp/message/extended_frames_x.go b/internal/rtmp/message/extended_frames_x.go index 49b48cac..53c823bf 100644 --- a/internal/rtmp/message/extended_frames_x.go +++ b/internal/rtmp/message/extended_frames_x.go @@ -1,6 +1,7 @@ package message import ( + "fmt" "time" "github.com/bluenviron/mediamtx/internal/rtmp/rawmessage" @@ -11,27 +12,38 @@ type ExtendedFramesX struct { ChunkStreamID byte DTS time.Duration MessageStreamID uint32 - FourCC [4]byte + FourCC FourCC Payload []byte } // Unmarshal implements Message. func (m *ExtendedFramesX) Unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 6 { + return fmt.Errorf("not enough bytes") + } + m.ChunkStreamID = raw.ChunkStreamID m.DTS = raw.Timestamp m.MessageStreamID = raw.MessageStreamID - copy(m.FourCC[:], raw.Body[1:5]) + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) m.Payload = raw.Body[5:] return nil } +func (m ExtendedFramesX) marshalBodySize() int { + return 5 + len(m.Payload) +} + // Marshal implements Message. func (m ExtendedFramesX) Marshal() (*rawmessage.Message, error) { - body := make([]byte, 5+len(m.Payload)) + body := make([]byte, m.marshalBodySize()) body[0] = 0b10000000 | byte(ExtendedTypeFramesX) - copy(body[1:5], m.FourCC[:]) + body[1] = uint8(m.FourCC >> 24) + body[2] = uint8(m.FourCC >> 16) + body[3] = uint8(m.FourCC >> 8) + body[4] = uint8(m.FourCC) copy(body[5:], m.Payload) return &rawmessage.Message{ diff --git a/internal/rtmp/message/extended_metadata.go b/internal/rtmp/message/extended_metadata.go index e03226e0..2a35cf79 100644 --- a/internal/rtmp/message/extended_metadata.go +++ b/internal/rtmp/message/extended_metadata.go @@ -8,12 +8,16 @@ import ( // ExtendedMetadata is a metadata extended message. type ExtendedMetadata struct { - FourCC [4]byte + FourCC FourCC } // Unmarshal implements Message. func (m *ExtendedMetadata) Unmarshal(raw *rawmessage.Message) error { - copy(m.FourCC[:], raw.Body[1:5]) + if len(raw.Body) != 5 { + return fmt.Errorf("invalid body size") + } + + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) return fmt.Errorf("ExtendedMetadata is not implemented yet") } diff --git a/internal/rtmp/message/extended_mpeg2ts_sequence_start.go b/internal/rtmp/message/extended_mpeg2ts_sequence_start.go index 773863cb..d0546f23 100644 --- a/internal/rtmp/message/extended_mpeg2ts_sequence_start.go +++ b/internal/rtmp/message/extended_mpeg2ts_sequence_start.go @@ -8,12 +8,16 @@ import ( // ExtendedMPEG2TSSequenceStart is a MPEG2-TS sequence start extended message. type ExtendedMPEG2TSSequenceStart struct { - FourCC [4]byte + FourCC FourCC } // Unmarshal implements Message. func (m *ExtendedMPEG2TSSequenceStart) Unmarshal(raw *rawmessage.Message) error { - copy(m.FourCC[:], raw.Body[1:5]) + if len(raw.Body) != 5 { + return fmt.Errorf("invalid body size") + } + + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) return fmt.Errorf("ExtendedMPEG2TSSequenceStart is not implemented yet") } diff --git a/internal/rtmp/message/extended_sequence_end.go b/internal/rtmp/message/extended_sequence_end.go index 79971e9f..c36997ba 100644 --- a/internal/rtmp/message/extended_sequence_end.go +++ b/internal/rtmp/message/extended_sequence_end.go @@ -8,7 +8,7 @@ import ( // ExtendedSequenceEnd is a sequence end extended message. type ExtendedSequenceEnd struct { - FourCC [4]byte + FourCC FourCC } // Unmarshal implements Message. @@ -17,7 +17,7 @@ func (m *ExtendedSequenceEnd) Unmarshal(raw *rawmessage.Message) error { return fmt.Errorf("invalid body size") } - copy(m.FourCC[:], raw.Body[1:5]) + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) return nil } diff --git a/internal/rtmp/message/extended_sequence_start.go b/internal/rtmp/message/extended_sequence_start.go index e75560da..87c62b83 100644 --- a/internal/rtmp/message/extended_sequence_start.go +++ b/internal/rtmp/message/extended_sequence_start.go @@ -1,6 +1,8 @@ package message import ( + "fmt" + "github.com/bluenviron/mediamtx/internal/rtmp/rawmessage" ) @@ -8,26 +10,37 @@ import ( type ExtendedSequenceStart struct { ChunkStreamID byte MessageStreamID uint32 - FourCC [4]byte + FourCC FourCC Config []byte } // Unmarshal implements Message. func (m *ExtendedSequenceStart) Unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 6 { + return fmt.Errorf("not enough bytes") + } + m.ChunkStreamID = raw.ChunkStreamID m.MessageStreamID = raw.MessageStreamID - copy(m.FourCC[:], raw.Body[1:5]) + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) m.Config = raw.Body[5:] return nil } +func (m ExtendedSequenceStart) marshalBodySize() int { + return 5 + len(m.Config) +} + // Marshal implements Message. func (m ExtendedSequenceStart) Marshal() (*rawmessage.Message, error) { - body := make([]byte, 5+len(m.Config)) + body := make([]byte, m.marshalBodySize()) body[0] = 0b10000000 | byte(ExtendedTypeSequenceStart) - copy(body[1:5], m.FourCC[:]) + body[1] = uint8(m.FourCC >> 24) + body[2] = uint8(m.FourCC >> 16) + body[3] = uint8(m.FourCC >> 8) + body[4] = uint8(m.FourCC) copy(body[5:], m.Config) return &rawmessage.Message{ diff --git a/internal/rtmp/message/message.go b/internal/rtmp/message/message.go index fc39dedd..92df1f09 100644 --- a/internal/rtmp/message/message.go +++ b/internal/rtmp/message/message.go @@ -61,13 +61,13 @@ const ( ) // FourCC is an identifier of a video codec. -type FourCC [4]byte +type FourCC uint32 // video codec identifiers. var ( - FourCCAV1 FourCC = [4]byte{'a', 'v', '0', '1'} - FourCCVP9 FourCC = [4]byte{'v', 'p', '0', '9'} - FourCCHEVC FourCC = [4]byte{'h', 'v', 'c', '1'} + FourCCAV1 FourCC = 'a'<<24 | 'v'<<16 | '0'<<8 | '1' + FourCCVP9 FourCC = 'v'<<24 | 'p'<<16 | '0'<<8 | '9' + FourCCHEVC FourCC = 'h'<<24 | 'v'<<16 | 'c'<<8 | '1' ) // Message is a message. diff --git a/internal/rtmp/message/reader.go b/internal/rtmp/message/reader.go index 0aad13c0..39e9a1ca 100644 --- a/internal/rtmp/message/reader.go +++ b/internal/rtmp/message/reader.go @@ -70,8 +70,7 @@ func allocateMessage(raw *rawmessage.Message) (Message, error) { } if (raw.Body[0] & 0b10000000) != 0 { - var fourCC [4]byte - copy(fourCC[:], raw.Body[1:5]) + fourCC := FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) switch fourCC { case FourCCAV1, FourCCVP9, FourCCHEVC: diff --git a/internal/rtmp/message/reader_test.go b/internal/rtmp/message/reader_test.go index dd18d379..cef73456 100644 --- a/internal/rtmp/message/reader_test.go +++ b/internal/rtmp/message/reader_test.go @@ -232,6 +232,20 @@ var readWriterCases = []struct { 0x0a, 0x01, 0x02, 0x03, }, }, + { + "extended sequence start", + &ExtendedSequenceStart{ + ChunkStreamID: 4, + MessageStreamID: 0x1000000, + FourCC: FourCCHEVC, + Config: []byte{0x01, 0x02, 0x03}, + }, + []byte{ + 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x09, + 0x01, 0x00, 0x00, 0x00, 0x80, 0x68, 0x76, 0x63, + 0x31, 0x01, 0x02, 0x03, + }, + }, { "extended coded frames", &ExtendedCodedFrames{ diff --git a/internal/rtmp/message/video.go b/internal/rtmp/message/video.go index 9504a735..8401a5a4 100644 --- a/internal/rtmp/message/video.go +++ b/internal/rtmp/message/video.go @@ -74,9 +74,13 @@ func (m *Video) Unmarshal(raw *rawmessage.Message) error { return nil } +func (m Video) marshalBodySize() int { + return 5 + len(m.Payload) +} + // Marshal implements Message. func (m Video) Marshal() (*rawmessage.Message, error) { - body := make([]byte, 5+len(m.Payload)) + body := make([]byte, m.marshalBodySize()) if m.IsKeyFrame { body[0] = flvio.FRAME_KEY << 4 diff --git a/internal/rtmp/reader.go b/internal/rtmp/reader.go index 49c66cfc..b73eacbe 100644 --- a/internal/rtmp/reader.go +++ b/internal/rtmp/reader.go @@ -98,7 +98,8 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (formats.Format, form case 0: return false, nil - case message.CodecH264: + case message.CodecH264, float64(message.FourCCAV1), + float64(message.FourCCVP9), float64(message.FourCCHEVC): return true, nil } diff --git a/internal/rtmp/reader_test.go b/internal/rtmp/reader_test.go index 05208dab..fee6c168 100644 --- a/internal/rtmp/reader_test.go +++ b/internal/rtmp/reader_test.go @@ -50,6 +50,58 @@ func TestReadTracks(t *testing.T) { 0x44, 0x01, 0xc0, 0xf7, 0xc0, 0xcc, 0x90, } + var spsp h265.SPS + err := spsp.Unmarshal(h265SPS) + require.NoError(t, err) + + hvcc := &mp4.HvcC{ + ConfigurationVersion: 1, + GeneralProfileIdc: spsp.ProfileTierLevel.GeneralProfileIdc, + GeneralProfileCompatibility: spsp.ProfileTierLevel.GeneralProfileCompatibilityFlag, + GeneralConstraintIndicator: [6]uint8{ + h265SPS[7], h265SPS[8], h265SPS[9], + h265SPS[10], h265SPS[11], h265SPS[12], + }, + GeneralLevelIdc: spsp.ProfileTierLevel.GeneralLevelIdc, + // MinSpatialSegmentationIdc + // ParallelismType + ChromaFormatIdc: uint8(spsp.ChromaFormatIdc), + BitDepthLumaMinus8: uint8(spsp.BitDepthLumaMinus8), + BitDepthChromaMinus8: uint8(spsp.BitDepthChromaMinus8), + // AvgFrameRate + // ConstantFrameRate + NumTemporalLayers: 1, + // TemporalIdNested + LengthSizeMinusOne: 3, + NumOfNaluArrays: 3, + NaluArrays: []mp4.HEVCNaluArray{ + { + NaluType: byte(h265.NALUType_VPS_NUT), + NumNalus: 1, + Nalus: []mp4.HEVCNalu{{ + Length: uint16(len(h265VPS)), + NALUnit: h265VPS, + }}, + }, + { + NaluType: byte(h265.NALUType_SPS_NUT), + NumNalus: 1, + Nalus: []mp4.HEVCNalu{{ + Length: uint16(len(h265SPS)), + NALUnit: h265SPS, + }}, + }, + { + NaluType: byte(h265.NALUType_PPS_NUT), + NumNalus: 1, + Nalus: []mp4.HEVCNalu{{ + Length: uint16(len(h265PPS)), + NALUnit: h265PPS, + }}, + }, + }, + } + for _, ca := range []struct { name string videoTrack formats.Format @@ -180,6 +232,16 @@ func TestReadTracks(t *testing.T) { }, nil, }, + { + "obs 30", + &formats.H265{ + PayloadTyp: 96, + VPS: h265VPS, + SPS: h265SPS, + PPS: h265PPS, + }, + nil, + }, } { t.Run(ca.name, func(t *testing.T) { var buf bytes.Buffer @@ -566,57 +628,46 @@ func TestReadTracks(t *testing.T) { }) require.NoError(t, err) - var spsp h265.SPS - err = spsp.Unmarshal(h265SPS) + var buf bytes.Buffer + _, err = mp4.Marshal(&buf, hvcc, mp4.Context{}) require.NoError(t, err) - hvcc := &mp4.HvcC{ - ConfigurationVersion: 1, - GeneralProfileIdc: spsp.ProfileTierLevel.GeneralProfileIdc, - GeneralProfileCompatibility: spsp.ProfileTierLevel.GeneralProfileCompatibilityFlag, - GeneralConstraintIndicator: [6]uint8{ - h265SPS[7], h265SPS[8], h265SPS[9], - h265SPS[10], h265SPS[11], h265SPS[12], - }, - GeneralLevelIdc: spsp.ProfileTierLevel.GeneralLevelIdc, - // MinSpatialSegmentationIdc - // ParallelismType - ChromaFormatIdc: uint8(spsp.ChromaFormatIdc), - BitDepthLumaMinus8: uint8(spsp.BitDepthLumaMinus8), - BitDepthChromaMinus8: uint8(spsp.BitDepthChromaMinus8), - // AvgFrameRate - // ConstantFrameRate - NumTemporalLayers: 1, - // TemporalIdNested - LengthSizeMinusOne: 3, - NumOfNaluArrays: 3, - NaluArrays: []mp4.HEVCNaluArray{ - { - NaluType: byte(h265.NALUType_VPS_NUT), - NumNalus: 1, - Nalus: []mp4.HEVCNalu{{ - Length: uint16(len(h265VPS)), - NALUnit: h265VPS, - }}, - }, - { - NaluType: byte(h265.NALUType_SPS_NUT), - NumNalus: 1, - Nalus: []mp4.HEVCNalu{{ - Length: uint16(len(h265SPS)), - NALUnit: h265SPS, - }}, - }, - { - NaluType: byte(h265.NALUType_PPS_NUT), - NumNalus: 1, - Nalus: []mp4.HEVCNalu{{ - Length: uint16(len(h265PPS)), - NALUnit: h265PPS, - }}, + err = mrw.Write(&message.ExtendedSequenceStart{ + ChunkStreamID: 4, + MessageStreamID: 0x1000000, + FourCC: message.FourCCHEVC, + Config: buf.Bytes(), + }) + require.NoError(t, err) + + case "obs 30": + err := mrw.Write(&message.DataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + { + K: "videodatarate", + V: float64(0), + }, + { + K: "videocodecid", + V: float64(message.FourCCHEVC), + }, + { + K: "audiodatarate", + V: float64(0), + }, + { + K: "audiocodecid", + V: float64(0), + }, }, }, - } + }) + require.NoError(t, err) var buf bytes.Buffer _, err = mp4.Marshal(&buf, hvcc, mp4.Context{}) diff --git a/mediamtx.yml b/mediamtx.yml index 3c6ac971..730d7f4e 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -14,9 +14,9 @@ readTimeout: 10s # Timeout of write operations. writeTimeout: 10s # Number of read buffers. -# A higher value allows a wider throughput, a lower value allows to save RAM. +# A higher value allows to increase throughput, a lower value allows to save RAM. readBufferCount: 512 -# Maximum size of payload of outgoing UDP packets. +# Maximum size of outgoing UDP packets. # This can be decreased to avoid fragmentation on networks with a low UDP MTU. udpMaxPayloadSize: 1472