Files
rtsp-simple-server/internal/protocols/rtmp/rawmessage/writer.go

208 lines
4.8 KiB
Go

package rawmessage
import (
"bufio"
"fmt"
"io"
"time"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp/bytecounter"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp/chunk"
)
type writerChunkStream struct {
mw *Writer
lastMessageStreamID *uint32
lastType *uint8
lastBodyLen *uint32
lastTimestamp *int64
lastTimestampDelta *int64
hasExtendedTimestamp bool
}
func (wc *writerChunkStream) writeChunk(c chunk.Chunk, hasExtendedTimestamp bool) error {
// check if we received an acknowledge
if wc.mw.checkAcknowledge && wc.mw.ackWindowSize != 0 {
diff := uint32(wc.mw.bcw.Count()) - wc.mw.ackValue
if diff > (wc.mw.ackWindowSize * 3 / 2) {
return fmt.Errorf("no acknowledge received within window")
}
}
buf, err := c.Marshal(hasExtendedTimestamp)
if err != nil {
return err
}
_, err = wc.mw.bw.Write(buf)
if err != nil {
return err
}
return nil
}
func (wc *writerChunkStream) writeMessage(msg *Message) error {
bodyLen := uint32(len(msg.Body))
pos := uint32(0)
firstChunk := true
// convert timestamp to milliseconds before splitting message in chunks
/// otherwise timestampDelta gets messed up.
timestamp := int64(msg.Timestamp / time.Millisecond)
var timestampDelta *int64
if wc.lastTimestamp != nil {
diff := timestamp - *wc.lastTimestamp
// use delta only if it is positive
if diff >= 0 {
timestampDelta = &diff
}
}
for {
chunkBodyLen := bodyLen - pos
if chunkBodyLen > wc.mw.chunkSize {
chunkBodyLen = wc.mw.chunkSize
}
if firstChunk {
firstChunk = false
switch {
case wc.lastMessageStreamID == nil || timestampDelta == nil || *wc.lastMessageStreamID != msg.MessageStreamID:
ts := uint32(timestamp)
err := wc.writeChunk(&chunk.Chunk0{
ChunkStreamID: msg.ChunkStreamID,
Timestamp: ts,
Type: msg.Type,
MessageStreamID: msg.MessageStreamID,
BodyLen: (bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}, false)
if err != nil {
return err
}
wc.hasExtendedTimestamp = ts >= 0xFFFFFF
case *wc.lastType != msg.Type || *wc.lastBodyLen != bodyLen:
ts := uint32(*timestampDelta)
err := wc.writeChunk(&chunk.Chunk1{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: ts,
Type: msg.Type,
BodyLen: (bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}, false)
if err != nil {
return err
}
wc.hasExtendedTimestamp = ts >= 0xFFFFFF
case wc.lastTimestampDelta == nil || *wc.lastTimestampDelta != *timestampDelta:
ts := uint32(*timestampDelta)
err := wc.writeChunk(&chunk.Chunk2{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: ts,
Body: msg.Body[pos : pos+chunkBodyLen],
}, false)
if err != nil {
return err
}
wc.hasExtendedTimestamp = ts >= 0xFFFFFF
default:
err := wc.writeChunk(&chunk.Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}, wc.hasExtendedTimestamp)
if err != nil {
return err
}
}
v1 := msg.MessageStreamID
wc.lastMessageStreamID = &v1
v2 := msg.Type
wc.lastType = &v2
v3 := bodyLen
wc.lastBodyLen = &v3
v4 := timestamp
wc.lastTimestamp = &v4
if timestampDelta != nil {
v5 := *timestampDelta
wc.lastTimestampDelta = &v5
}
} else {
err := wc.writeChunk(&chunk.Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}, wc.hasExtendedTimestamp)
if err != nil {
return err
}
}
pos += chunkBodyLen
if (bodyLen - pos) == 0 {
return wc.mw.bw.Flush()
}
}
}
// Writer is a raw message writer.
type Writer struct {
bcw *bytecounter.Writer
bw *bufio.Writer
checkAcknowledge bool
chunkSize uint32
ackWindowSize uint32
ackValue uint32
chunkStreams map[byte]*writerChunkStream
}
// NewWriter allocates a Writer.
func NewWriter(
w io.Writer,
bcw *bytecounter.Writer,
checkAcknowledge bool,
) *Writer {
return &Writer{
bcw: bcw,
bw: bufio.NewWriter(w),
checkAcknowledge: checkAcknowledge,
chunkSize: 128,
chunkStreams: make(map[byte]*writerChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (w *Writer) SetChunkSize(v uint32) {
w.chunkSize = v
}
// SetWindowAckSize sets the window acknowledgement size.
func (w *Writer) SetWindowAckSize(v uint32) {
w.ackWindowSize = v
}
// SetAcknowledgeValue sets the acknowledge sequence number.
func (w *Writer) SetAcknowledgeValue(v uint32) {
w.ackValue = v
}
// Write writes a Message.
func (w *Writer) Write(msg *Message) error {
wc, ok := w.chunkStreams[msg.ChunkStreamID]
if !ok {
wc = &writerChunkStream{mw: w}
w.chunkStreams[msg.ChunkStreamID] = wc
}
return wc.writeMessage(msg)
}