diff --git a/internal/rtmp/rawmessage/reader.go b/internal/rtmp/rawmessage/reader.go index 12b34da4..f7b293b9 100644 --- a/internal/rtmp/rawmessage/reader.go +++ b/internal/rtmp/rawmessage/reader.go @@ -201,6 +201,11 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { v1 := *rc.curTimestamp + *rc.curTimestampDelta rc.curTimestamp = &v1 + if *rc.curBodyLen != uint32(len(c3.Body)) { + rc.curBody = &c3.Body + return nil, errMoreChunksNeeded + } + return &Message{ Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond, Type: *rc.curType, diff --git a/internal/rtmp/rawmessage/reader_test.go b/internal/rtmp/rawmessage/reader_test.go index 927b40d1..8e770073 100644 --- a/internal/rtmp/rawmessage/reader_test.go +++ b/internal/rtmp/rawmessage/reader_test.go @@ -11,154 +11,191 @@ import ( "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" ) -func TestReader(t *testing.T) { - type sequenceEntry struct { - chunk chunk.Chunk - msg *Message - } +var cases = []struct { + name string + messages []*Message + chunks []chunk.Chunk + chunkSizes []uint32 +}{ + { + "(chunk0) + (chunk1)", + []*Message{ + { + ChunkStreamID: 27, + Timestamp: 18576 * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x03}, 64), + }, + { + ChunkStreamID: 27, + Timestamp: (18576 + 15) * time.Millisecond, + Type: chunk.MessageTypeSetWindowAckSize, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x04}, 64), + }, + }, + []chunk.Chunk{ + &chunk.Chunk0{ + ChunkStreamID: 27, + Timestamp: 18576, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + BodyLen: 64, + Body: bytes.Repeat([]byte{0x03}, 64), + }, + &chunk.Chunk1{ + ChunkStreamID: 27, + TimestampDelta: 15, + Type: chunk.MessageTypeSetWindowAckSize, + BodyLen: 64, + Body: bytes.Repeat([]byte{0x04}, 64), + }, + }, + []uint32{ + 128, + 128, + }, + }, + { + "(chunk0) + (chunk2) + (chunk3)", + []*Message{ + { + ChunkStreamID: 27, + Timestamp: 18576 * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x03}, 64), + }, + { + ChunkStreamID: 27, + Timestamp: (18576 + 15) * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x04}, 64), + }, + { + ChunkStreamID: 27, + Timestamp: (18576 + 15 + 15) * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x05}, 64), + }, + }, + []chunk.Chunk{ + &chunk.Chunk0{ + ChunkStreamID: 27, + Timestamp: 18576, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + BodyLen: 64, + Body: bytes.Repeat([]byte{0x03}, 64), + }, + &chunk.Chunk2{ + ChunkStreamID: 27, + TimestampDelta: 15, + Body: bytes.Repeat([]byte{0x04}, 64), + }, + &chunk.Chunk3{ + ChunkStreamID: 27, + Body: bytes.Repeat([]byte{0x05}, 64), + }, + }, + []uint32{ + 128, + 64, + 64, + }, + }, + { + "(chunk0 + chunk3) + (chunk1 + chunk3) + (chunk2 + chunk3) + (chunk3 + chunk3)", + []*Message{ + { + ChunkStreamID: 27, + Timestamp: 18576 * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x03}, 190), + }, + { + ChunkStreamID: 27, + Timestamp: 18576 * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x04}, 192), + }, + { + ChunkStreamID: 27, + Timestamp: (18576 + 15) * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x05}, 192), + }, + { + ChunkStreamID: 27, + Timestamp: (18576 + 15 + 15) * time.Millisecond, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + Body: bytes.Repeat([]byte{0x06}, 192), + }, + }, + []chunk.Chunk{ + &chunk.Chunk0{ + ChunkStreamID: 27, + Timestamp: 18576, + Type: chunk.MessageTypeSetPeerBandwidth, + MessageStreamID: 3123, + BodyLen: 190, + Body: bytes.Repeat([]byte{0x03}, 128), + }, + &chunk.Chunk3{ + ChunkStreamID: 27, + Body: bytes.Repeat([]byte{0x03}, 62), + }, + &chunk.Chunk1{ + ChunkStreamID: 27, + TimestampDelta: 0, + Type: chunk.MessageTypeSetPeerBandwidth, + BodyLen: 192, + Body: bytes.Repeat([]byte{0x04}, 128), + }, + &chunk.Chunk3{ + ChunkStreamID: 27, + Body: bytes.Repeat([]byte{0x04}, 64), + }, + &chunk.Chunk2{ + ChunkStreamID: 27, + TimestampDelta: 15, + Body: bytes.Repeat([]byte{0x05}, 128), + }, + &chunk.Chunk3{ + ChunkStreamID: 27, + Body: bytes.Repeat([]byte{0x05}, 64), + }, + &chunk.Chunk3{ + ChunkStreamID: 27, + Body: bytes.Repeat([]byte{0x06}, 128), + }, + &chunk.Chunk3{ + ChunkStreamID: 27, + Body: bytes.Repeat([]byte{0x06}, 64), + }, + }, + []uint32{ + 128, + 62, + 128, + 64, + 128, + 64, + 128, + 64, + }, + }, +} - for _, ca := range []struct { - name string - sequence []sequenceEntry - }{ - { - "chunk0 + chunk1", - []sequenceEntry{ - { - &chunk.Chunk0{ - ChunkStreamID: 27, - Timestamp: 18576, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - BodyLen: 64, - Body: bytes.Repeat([]byte{0x02}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: 18576 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x02}, 64), - }, - }, - { - &chunk.Chunk1{ - ChunkStreamID: 27, - TimestampDelta: 15, - Type: chunk.MessageTypeSetPeerBandwidth, - BodyLen: 64, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: (18576 + 15) * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - }, - }, - }, - { - "chunk0 + chunk2 + chunk3", - []sequenceEntry{ - { - &chunk.Chunk0{ - ChunkStreamID: 27, - Timestamp: 18576, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - BodyLen: 64, - Body: bytes.Repeat([]byte{0x02}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: 18576 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x02}, 64), - }, - }, - { - &chunk.Chunk2{ - ChunkStreamID: 27, - TimestampDelta: 15, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: (18576 + 15) * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - }, - { - &chunk.Chunk3{ - ChunkStreamID: 27, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: (18576 + 15 + 15) * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - }, - }, - }, - { - "chunk0 + chunk3 + chunk2 + chunk3", - []sequenceEntry{ - { - &chunk.Chunk0{ - ChunkStreamID: 27, - Timestamp: 18576, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - BodyLen: 192, - Body: bytes.Repeat([]byte{0x03}, 128), - }, - nil, - }, - { - &chunk.Chunk3{ - ChunkStreamID: 27, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: 18576 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x03}, 192), - }, - }, - { - &chunk.Chunk2{ - ChunkStreamID: 27, - TimestampDelta: 15, - Body: bytes.Repeat([]byte{0x04}, 128), - }, - nil, - }, - { - &chunk.Chunk3{ - ChunkStreamID: 27, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - &Message{ - ChunkStreamID: 27, - Timestamp: 18591 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x04}, 192), - }, - }, - }, - }, - } { +func TestReader(t *testing.T) { + for _, ca := range cases { t.Run(ca.name, func(t *testing.T) { var buf bytes.Buffer bcr := bytecounter.NewReader(&buf) @@ -166,16 +203,16 @@ func TestReader(t *testing.T) { return nil }) - for _, entry := range ca.sequence { - buf2, err := entry.chunk.Marshal() + for _, cach := range ca.chunks { + buf2, err := cach.Marshal() require.NoError(t, err) buf.Write(buf2) + } - if entry.msg != nil { - msg, err := r.Read() - require.NoError(t, err) - require.Equal(t, entry.msg, msg) - } + for _, camsg := range ca.messages { + msg, err := r.Read() + require.NoError(t, err) + require.Equal(t, camsg, msg) } }) } diff --git a/internal/rtmp/rawmessage/writer_test.go b/internal/rtmp/rawmessage/writer_test.go index 1e0b8745..36fbae41 100644 --- a/internal/rtmp/rawmessage/writer_test.go +++ b/internal/rtmp/rawmessage/writer_test.go @@ -12,151 +12,7 @@ import ( ) func TestWriter(t *testing.T) { - for _, ca := range []struct { - name string - messages []*Message - chunks []chunk.Chunk - chunkSizes []uint32 - }{ - { - "chunk0 + chunk1", - []*Message{ - { - ChunkStreamID: 27, - Timestamp: 18576 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - { - ChunkStreamID: 27, - Timestamp: (18576 + 15) * time.Millisecond, - Type: chunk.MessageTypeSetWindowAckSize, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - }, - []chunk.Chunk{ - &chunk.Chunk0{ - ChunkStreamID: 27, - Timestamp: 18576, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - BodyLen: 64, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - &chunk.Chunk1{ - ChunkStreamID: 27, - TimestampDelta: 15, - Type: chunk.MessageTypeSetWindowAckSize, - BodyLen: 64, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - }, - []uint32{ - 128, - 128, - }, - }, - { - "chunk0 + chunk2 + chunk3", - []*Message{ - { - ChunkStreamID: 27, - Timestamp: 18576 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - { - ChunkStreamID: 27, - Timestamp: (18576 + 15) * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - { - ChunkStreamID: 27, - Timestamp: (18576 + 15 + 15) * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x05}, 64), - }, - }, - []chunk.Chunk{ - &chunk.Chunk0{ - ChunkStreamID: 27, - Timestamp: 18576, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - BodyLen: 64, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - &chunk.Chunk2{ - ChunkStreamID: 27, - TimestampDelta: 15, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - &chunk.Chunk3{ - ChunkStreamID: 27, - Body: bytes.Repeat([]byte{0x05}, 64), - }, - }, - []uint32{ - 128, - 64, - 64, - }, - }, - { - "chunk0 + chunk3 + chunk2 + chunk3", - []*Message{ - { - ChunkStreamID: 27, - Timestamp: 18576 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x03}, 192), - }, - { - ChunkStreamID: 27, - Timestamp: 18591 * time.Millisecond, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - Body: bytes.Repeat([]byte{0x04}, 192), - }, - }, - []chunk.Chunk{ - &chunk.Chunk0{ - ChunkStreamID: 27, - Timestamp: 18576, - Type: chunk.MessageTypeSetPeerBandwidth, - MessageStreamID: 3123, - BodyLen: 192, - Body: bytes.Repeat([]byte{0x03}, 128), - }, - &chunk.Chunk3{ - ChunkStreamID: 27, - Body: bytes.Repeat([]byte{0x03}, 64), - }, - &chunk.Chunk2{ - ChunkStreamID: 27, - TimestampDelta: 15, - Body: bytes.Repeat([]byte{0x04}, 128), - }, - &chunk.Chunk3{ - ChunkStreamID: 27, - Body: bytes.Repeat([]byte{0x04}, 64), - }, - }, - []uint32{ - 128, - 64, - 128, - 64, - }, - }, - } { + for _, ca := range cases { t.Run(ca.name, func(t *testing.T) { var buf bytes.Buffer w := NewWriter(bytecounter.NewWriter(&buf), true)