mirror of
https://github.com/zhangpeihao/gortmp
synced 2025-09-26 20:01:11 +08:00
119 lines
3.1 KiB
Go
119 lines
3.1 KiB
Go
// Copyright 2013, zhangpeihao All rights reserved.
|
|
|
|
package gortmp
|
|
|
|
import (
|
|
"github.com/zhangpeihao/log"
|
|
)
|
|
|
|
// Chunk stream
|
|
//
|
|
// A logical channel of communication that allows flow of chunks in a
|
|
// particular direction. The chunk stream can travel from the client
|
|
// to the server and reverse.
|
|
type OutboundChunkStream struct {
|
|
ID uint32
|
|
|
|
lastHeader *Header
|
|
lastOutAbsoluteTimestamp uint32
|
|
lastInAbsoluteTimestamp uint32
|
|
|
|
// Start at timestamp
|
|
startAt uint32
|
|
}
|
|
|
|
type InboundChunkStream struct {
|
|
ID uint32
|
|
|
|
lastHeader *Header
|
|
lastOutAbsoluteTimestamp uint32
|
|
lastInAbsoluteTimestamp uint32
|
|
|
|
// The unfinished incoming message
|
|
receivedMessage *Message
|
|
}
|
|
|
|
func NewOutboundChunkStream(id uint32) *OutboundChunkStream {
|
|
return &OutboundChunkStream{
|
|
ID: id,
|
|
}
|
|
}
|
|
|
|
func NewInboundChunkStream(id uint32) *InboundChunkStream {
|
|
return &InboundChunkStream{
|
|
ID: id,
|
|
}
|
|
}
|
|
|
|
func (chunkStream *OutboundChunkStream) NewOutboundHeader(message *Message) *Header {
|
|
header := &Header{
|
|
ChunkStreamID: chunkStream.ID,
|
|
MessageLength: uint32(message.Buf.Len()),
|
|
MessageTypeID: message.Type,
|
|
MessageStreamID: message.StreamID,
|
|
}
|
|
timestamp := message.Timestamp
|
|
if timestamp == AUTO_TIMESTAMP {
|
|
timestamp = chunkStream.GetTimestamp()
|
|
message.Timestamp = timestamp
|
|
message.AbsoluteTimestamp = timestamp
|
|
}
|
|
deltaTimestamp := uint32(0)
|
|
if chunkStream.lastOutAbsoluteTimestamp < message.Timestamp {
|
|
deltaTimestamp = message.Timestamp - chunkStream.lastOutAbsoluteTimestamp
|
|
}
|
|
if chunkStream.lastHeader == nil {
|
|
header.Fmt = HEADER_FMT_FULL
|
|
header.Timestamp = timestamp
|
|
} else {
|
|
|
|
if header.MessageStreamID == chunkStream.lastHeader.MessageStreamID {
|
|
if header.MessageTypeID == chunkStream.lastHeader.MessageTypeID &&
|
|
header.MessageLength == chunkStream.lastHeader.MessageLength {
|
|
switch chunkStream.lastHeader.Fmt {
|
|
case HEADER_FMT_FULL:
|
|
header.Fmt = HEADER_FMT_SAME_LENGTH_AND_STREAM
|
|
header.Timestamp = deltaTimestamp
|
|
case HEADER_FMT_SAME_STREAM:
|
|
fallthrough
|
|
case HEADER_FMT_SAME_LENGTH_AND_STREAM:
|
|
fallthrough
|
|
case HEADER_FMT_CONTINUATION:
|
|
if chunkStream.lastHeader.Timestamp == deltaTimestamp {
|
|
header.Fmt = HEADER_FMT_CONTINUATION
|
|
} else {
|
|
header.Fmt = HEADER_FMT_SAME_LENGTH_AND_STREAM
|
|
header.Timestamp = deltaTimestamp
|
|
}
|
|
}
|
|
} else {
|
|
header.Fmt = HEADER_FMT_SAME_STREAM
|
|
header.Timestamp = deltaTimestamp
|
|
}
|
|
} else {
|
|
header.Fmt = HEADER_FMT_FULL
|
|
header.Timestamp = timestamp
|
|
}
|
|
}
|
|
// Check extended timestamp
|
|
if header.Timestamp >= 0xffffff {
|
|
header.ExtendedTimestamp = message.Timestamp
|
|
header.Timestamp = 0xffffff
|
|
} else {
|
|
header.ExtendedTimestamp = 0
|
|
}
|
|
logger.ModulePrintf(logHandler, log.LOG_LEVEL_DEBUG,
|
|
"OutboundChunkStream::NewOutboundHeader() header: %+v\n", header)
|
|
chunkStream.lastHeader = header
|
|
chunkStream.lastOutAbsoluteTimestamp = timestamp
|
|
return header
|
|
}
|
|
|
|
func (chunkStream *OutboundChunkStream) GetTimestamp() uint32 {
|
|
if chunkStream.startAt == uint32(0) {
|
|
chunkStream.startAt = GetTimestamp()
|
|
return uint32(0)
|
|
}
|
|
return GetTimestamp() - chunkStream.startAt
|
|
}
|