支持恢复上次推流的时间戳和序号

This commit is contained in:
yangjiechina
2024-11-30 17:48:39 +08:00
parent 62542c3e4f
commit 92bd005fc4
39 changed files with 674 additions and 1228 deletions

18
api.go
View File

@@ -232,7 +232,7 @@ func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.
}
func (api *ApiServer) onTS(source string, w http.ResponseWriter, r *http.Request) {
sid := r.URL.Query().Get(hls.SessionIdKey)
sid := r.URL.Query().Get(hls.SessionIDKey)
var sink stream.Sink
if sid != "" {
sink = stream.SinkManager.Find(stream.SinkID(sid))
@@ -266,12 +266,12 @@ func (api *ApiServer) onHLS(source string, w http.ResponseWriter, r *http.Reques
// 如果没有携带会话ID, 认为是首次拉流. Server将生成会话ID, 应答给拉流端, 后续拉流请求(.M3U8和.TS的HTTP请求)都将携带该会话ID.
// 会话ID的Key为"hls_sid", 为避免冲突, 播放端和hook server不要再使用, 否则会一直拉流失败.
sid := r.URL.Query().Get(hls.SessionIdKey)
sid := r.URL.Query().Get(hls.SessionIDKey)
if sid == "" {
sid = utils.RandStringBytes(10)
query := r.URL.Query()
query.Add(hls.SessionIdKey, sid)
query.Add(hls.SessionIDKey, sid)
path := fmt.Sprintf("/%s.m3u8?%s", source, query.Encode())
response := "#EXTM3U\r\n" +
@@ -284,7 +284,7 @@ func (api *ApiServer) onHLS(source string, w http.ResponseWriter, r *http.Reques
sink := stream.SinkManager.Find(sid)
// 更新最近的M3U8文件
if sink != nil {
w.Write([]byte(sink.(*hls.M3U8Sink).GetM3U8String()))
w.Write([]byte(sink.(*hls.M3U8Sink).GetPlaylist()))
return
}
@@ -394,10 +394,10 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
var details []SourceDetails
for _, source := range sources {
var tracks []string
streams := source.OriginStreams()
for _, avStream := range streams {
tracks = append(tracks, avStream.CodecId().String())
var codecs []string
tracks := source.OriginTracks()
for _, track := range tracks {
codecs = append(codecs, track.Stream.CodecId().String())
}
details = append(details, SourceDetails{
@@ -406,7 +406,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
Time: source.CreateTime(),
SinkCount: source.SinkCount(),
Bitrate: strconv.Itoa(source.GetBitrateStatistics().PreviousSecond()/1024) + "KBS", // 后续开发
Tracks: tracks,
Tracks: codecs,
})
}

93
args.go Normal file
View File

@@ -0,0 +1,93 @@
package main
import (
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"os"
"strconv"
"strings"
)
func readRunArgs() (map[string]string, map[string]string) {
args := os.Args
// 运行参数项优先级高于config.json参数项
// --disable-rtmp --enable-rtmp=11935
// --disable-rtsp --enable-rtsp
// --disable-hls --enable-hls
// --disable-webrtc --enable-webrtc=18000
// --disable-gb28181 --enable-gb28181
// --disable-jt1078 --enable-jt1078=11078
// --disable-hooks --enable-hooks
// --disable-record --enable-record
disableOptions := map[string]string{}
enableOptions := map[string]string{}
for _, arg := range args {
// 参数忽略大小写
arg = strings.ToLower(arg)
var option string
var enable bool
if strings.HasPrefix(arg, "--disable-") {
option = arg[len("--disable-"):]
} else if strings.HasPrefix(arg, "--enable-") {
option = arg[len("--enable-"):]
enable = true
} else {
continue
}
pair := strings.Split(option, "=")
var value string
if len(pair) > 1 {
value = pair[1]
}
if enable {
enableOptions[pair[0]] = value
} else {
disableOptions[pair[0]] = value
}
}
// 删除重叠参数, 禁用和开启同时声明时, 以开启为准.
for k := range enableOptions {
if _, ok := disableOptions[k]; ok {
delete(disableOptions, k)
}
}
return disableOptions, enableOptions
}
func mergeArgs(options map[string]stream.EnableConfig, disableOptions, enableOptions map[string]string) {
for k := range disableOptions {
option, ok := options[k]
utils.Assert(ok)
option.SetEnable(false)
}
for k, v := range enableOptions {
var port int
if len(v) > 0 {
atoi, err := strconv.Atoi(v)
if err == nil && atoi > 0 {
port = atoi
}
}
option, ok := options[k]
utils.Assert(ok)
option.SetEnable(true)
if port > 0 {
if config, ok := option.(stream.PortConfig); ok {
config.SetPort(port)
}
}
}
}

51
bridge.go Normal file
View File

@@ -0,0 +1,51 @@
package main
import (
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/hls"
"github.com/lkmio/lkm/rtsp"
"github.com/lkmio/lkm/stream"
)
// 处理不同包不能相互引用的需求
func NewStreamEndInfo(source stream.Source) *stream.StreamEndInfo {
tracks := source.OriginTracks()
streams := source.GetTransStreams()
if len(tracks) < 1 || len(streams) < 1 {
return nil
}
info := stream.StreamEndInfo{
ID: source.GetID(),
Timestamps: make(map[utils.AVCodecID][2]int64, len(tracks)),
}
for _, track := range tracks {
var timestamp [2]int64
timestamp[0] = track.Dts
timestamp[1] = track.Pts
info.Timestamps[track.Stream.CodecId()] = timestamp
}
for _, transStream := range streams {
// 获取ts切片序号
if stream.TransStreamHls == transStream.GetProtocol() {
if hls := transStream.(*hls.TransStream); hls.M3U8Writer.Size() > 0 {
info.M3U8Writer = hls.M3U8Writer
info.PlaylistFormat = hls.PlaylistFormat
}
} else if stream.TransStreamRtsp == transStream.GetProtocol() {
if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 {
info.RtspTracks = make(map[byte]uint16, len(tracks))
for _, track := range rtsp.RtspTracks {
info.RtspTracks[track.PT] = track.EndSeq
}
}
}
}
return &info
}

View File

@@ -1,78 +0,0 @@
package collections
import "github.com/lkmio/avformat/utils"
type Node[T any] struct {
data T
next *Node[T]
}
type LinkedList[T any] struct {
fist *Node[T]
last *Node[T]
size int
}
func (l *LinkedList[T]) Add(data T) {
node := &Node[T]{data: data, next: nil}
if l.last != nil {
l.last.next = node
l.last = node
} else {
l.fist = node
l.last = node
}
l.size++
}
func (l *LinkedList[T]) Remove(index int) T {
utils.Assert(index < l.size)
prevNode := l.fist
offsetNode := l.fist
for i := 0; i < l.size; i++ {
if i == index {
break
}
prevNode = offsetNode
offsetNode = offsetNode.next
}
if offsetNode == l.fist {
//删除第一个node
l.fist = l.fist.next
} else if offsetNode == l.last {
//删除最后一个node
l.last = prevNode
} else {
prevNode.next = offsetNode.next
}
if l.size--; l.size == 0 {
l.fist = nil
l.last = nil
}
return offsetNode.data
}
func (l *LinkedList[T]) Get(index int) T {
utils.Assert(index < l.size)
offsetNode := l.fist
for i := 0; i < l.size; i++ {
if i == index {
break
}
offsetNode = offsetNode.next
}
return offsetNode.data
}
func (l *LinkedList[T]) Size() int {
return l.size
}

View File

@@ -1,24 +0,0 @@
package collections
import (
"github.com/lkmio/avformat/utils"
"testing"
)
func TestLinkedList(t *testing.T) {
l := LinkedList[int]{}
for i := 0; i < 100; i++ {
l.Add(i)
}
for i := 0; i < 100; i++ {
utils.Assert(l.Get(i) == i)
}
for i := 0; i < 100; i++ {
utils.Assert(l.Remove(0) == i)
}
utils.Assert(l.Size() == 0)
}

View File

@@ -1,235 +0,0 @@
package collections
import (
"github.com/lkmio/avformat/utils"
)
// MemoryPool 从解复用阶段拼凑成完整的AVPacket开始(写)到GOP缓存结束(释放),整个过程都使用池中内存
// 类似环形缓冲区, 区别在于,内存块是连续的、整块内存.
// AVPacket缓存使用memorypool_rb, 允许回环(内存必须完整). tranStream使用memorypool_direct, 连续一块完整的内存, 否则与合并缓存写的观念背道而驰.
// 两种使用方式:
// 1. 已知需要分配内存大小, 直接使用Allocate()函数分配, 并且外部自行操作内存块
// 2. 未知分配内存大小, 先使用Mark()函数,标记内存起始偏移量, 再通过Write()函数将数据拷贝进内存块最后调用Fetch/Reset函数完成或释放内存块
//
// 两种使用方式互斥,不能同时使用.
type MemoryPool interface {
// Allocate 分配指定大小的内存块
Allocate(size int) []byte
// Mark 标记内存块起始位置
Mark()
TryMark()
// Write 向内存块中写入数据, 必须先调用Mark函数
Write(data []byte)
// Fetch 获取当前内存块必须先调用Mark函数
Fetch() []byte
// Reset 清空本次流程写入的还未生效内存块
Reset()
// Reserve 预留指定大小的内存块
//主要是为了和实现和Write相似功能但是不拷贝, 所以使用流程和Write一样.
Reserve(size int)
// FreeHead 释放头部一块内存
FreeHead()
// FreeTail 释放尾部一块内存
FreeTail()
// Data 返回头尾已使用的内存块
Data() ([]byte, []byte)
// Clear 清空所有内存块
Clear()
IsEmpty() bool
Capacity() int
Size() int
}
type memoryPool struct {
data []byte
capacity int //实际的可用容量当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
head int //起始索引
tail int //末尾索引, 当形成回环时, 会小于起始索引
markIndex int //分配内存块的起始索引, 一定小于末尾索引, data[markIndex:tail]此次分配的内存块
marked bool
blockQueue *Queue
discardBlockCount int //扩容时, 丢弃之前的内存块数量
recopy bool //扩容时,是否拷贝旧数据. 缓存AVPacket时, 内存已经被Data引用所以不需要再拷贝旧数据. 用作合并写缓存时, 流还没有发送使用, 需要拷贝旧数据.
isFull func(int) bool
}
func (m *memoryPool) grow(size int) {
//1.5倍扩容
newData := make([]byte, (cap(m.data)+size)*3/2)
//未写入缓冲区大小
flushSize := m.tail - m.markIndex
//拷贝之前的数据
if m.recopy {
head, tail := m.Data()
copy(newData, head)
copy(newData[len(head):], tail)
m.head = 0
m.tail = len(head) + len(tail)
m.markIndex = m.tail - flushSize
} else {
//只拷贝本回合数据
copy(newData, m.data[m.tail-flushSize:m.tail])
//丢弃之前的内存块
m.discardBlockCount += m.blockQueue.Size()
m.blockQueue.Clear()
m.head = 0
m.tail = flushSize
m.markIndex = 0
}
m.data = newData
m.capacity = cap(newData)
}
// 根据head和tail计算出可用的内存地址
func (m *memoryPool) allocate(size int) []byte {
if m.isFull(size) {
m.grow(size)
}
bytes := m.data[m.tail : m.tail+size]
m.tail += size
return bytes
}
func (m *memoryPool) Mark() {
utils.Assert(!m.marked)
m.markIndex = m.tail
m.marked = true
}
func (m *memoryPool) TryMark() {
if !m.marked {
m.markIndex = m.tail
m.marked = true
}
}
func (m *memoryPool) Write(data []byte) {
utils.Assert(m.marked)
allocate := m.allocate(len(data))
copy(allocate, data)
}
func (m *memoryPool) Reserve(size int) {
utils.Assert(m.marked)
_ = m.allocate(size)
}
func (m *memoryPool) Allocate(size int) []byte {
m.Mark()
_ = m.allocate(size)
return m.Fetch()
}
func (m *memoryPool) Fetch() []byte {
utils.Assert(m.marked)
m.marked = false
size := m.tail - m.markIndex
m.blockQueue.Push(size)
return m.data[m.markIndex:m.tail]
}
func (m *memoryPool) Reset() {
m.marked = false
m.tail = m.markIndex
}
func (m *memoryPool) freeOldBlocks() bool {
utils.Assert(!m.marked)
if m.discardBlockCount > 0 {
m.discardBlockCount--
return true
}
return false
}
func (m *memoryPool) FreeHead() {
if m.freeOldBlocks() /*|| m.blockQueue.IsEmpty()*/ {
return
}
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.Pop().(int)
m.head += size
if m.blockQueue.IsEmpty() {
m.Clear()
} else if m.head >= m.capacity {
//清空末尾, 从头开始
m.head = 0
}
}
func (m *memoryPool) FreeTail() {
if m.freeOldBlocks() /*|| m.blockQueue.IsEmpty()*/ {
return
}
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.PopBack().(int)
m.tail -= size
if m.blockQueue.IsEmpty() {
m.Clear()
} else if m.tail == 0 {
//回环回到线性
m.tail = m.capacity
m.capacity = cap(m.data)
}
}
func (m *memoryPool) Data() ([]byte, []byte) {
if m.tail <= m.head && !m.blockQueue.IsEmpty() {
return m.data[m.head:m.capacity], m.data[:m.tail]
} else {
return m.data[m.head:m.tail], nil
}
}
func (m *memoryPool) Clear() {
m.capacity = cap(m.data)
m.head = 0
m.tail = 0
m.markIndex = 0
m.marked = false
m.blockQueue.Clear()
m.discardBlockCount = 0
}
func (m *memoryPool) IsEmpty() bool {
utils.Assert(!m.marked)
return m.blockQueue.Size() < 1
}
func (m *memoryPool) Capacity() int {
return m.capacity
}
func (m *memoryPool) Size() int {
head, tail := m.Data()
return len(head) + len(tail)
}

View File

@@ -1,32 +0,0 @@
package collections
import (
"encoding/hex"
"github.com/lkmio/avformat/utils"
"testing"
"unsafe"
)
func TestMemoryPool(t *testing.T) {
bytes := make([]byte, 10)
for i := 0; i < 10; i++ {
bytes[i] = byte(i)
}
pool := NewDirectMemoryPool(5)
last := uintptr(0)
for i := 0; i < 10; i++ {
pool.Mark()
pool.Write(bytes)
fetch := pool.Fetch()
addr := *(*uintptr)(unsafe.Pointer(&fetch))
if last != 0 {
utils.Assert(last == addr)
}
last = addr
println(hex.Dump(fetch))
pool.FreeTail()
}
}

View File

@@ -1,23 +0,0 @@
package collections
type directMemoryPool struct {
*memoryPool
}
func (m *directMemoryPool) isFull(size int) bool {
//尾部没有大小合适的内存空间
return m.capacity-m.tail < size
}
func NewDirectMemoryPool(capacity int) MemoryPool {
pool := &directMemoryPool{}
pool.memoryPool = &memoryPool{
data: make([]byte, capacity),
capacity: capacity,
blockQueue: NewQueue(2048),
recopy: true,
isFull: pool.isFull,
}
return pool
}

View File

@@ -1,42 +0,0 @@
package collections
type rbMemoryPool struct {
*memoryPool
}
func (m *rbMemoryPool) isFull(size int) bool {
//已经回环
over := m.tail < m.head
if over && m.head-m.tail >= size {
//头部有大小合适的内存空间
} else if !over && m.capacity-m.tail >= size {
//尾部有大小合适的内存空间
} else if !over && m.head > size {
//形成回环
//修改有效内存容量大小
m.capacity = m.markIndex
//拷贝之前的数据
incompleteBlockSize := m.tail - m.markIndex
copy(m.data, m.data[m.markIndex:m.tail])
m.markIndex = 0
m.tail = incompleteBlockSize
} else {
return true
}
return false
}
func NewRbMemoryPool(capacity int) MemoryPool {
pool := &rbMemoryPool{}
pool.memoryPool = &memoryPool{
data: make([]byte, capacity),
capacity: capacity,
blockQueue: NewQueue(2048),
recopy: false,
isFull: pool.isFull,
}
return pool
}

View File

@@ -1,51 +0,0 @@
package collections
import (
"github.com/lkmio/avformat/libbufio"
"github.com/lkmio/avformat/utils"
)
type Queue struct {
*ringBuffer
}
func NewQueue(capacity int) *Queue {
utils.Assert(capacity > 0)
return &Queue{ringBuffer: &ringBuffer{
data: make([]interface{}, capacity),
head: 0,
tail: 0,
size: 0,
}}
}
func (q *Queue) Push(value interface{}) {
if q.ringBuffer.IsFull() {
newArray := make([]interface{}, q.ringBuffer.Size()*2)
head, tail := q.ringBuffer.Data()
copy(newArray, head)
if tail != nil {
copy(newArray[len(head):], tail)
}
q.data = newArray
q.head = 0
q.tail = q.size
}
q.data[q.tail] = value
q.tail = (q.tail + 1) % cap(q.data)
q.size++
}
func (q *Queue) PopBack() interface{} {
utils.Assert(q.size > 0)
value := q.ringBuffer.Tail()
q.size--
q.tail = libbufio.MaxInt(0, q.tail-1)
return value
}

View File

@@ -1,19 +0,0 @@
package collections
import (
"fmt"
"testing"
)
func TestQueue(t *testing.T) {
queue := NewQueue(1)
for i := 0; i < 100; i++ {
queue.Push(i)
}
for i := 0; i < 100; i++ {
pop := queue.PopBack()
println(fmt.Sprintf("element:%d", pop.(int)))
}
}

View File

@@ -1,119 +0,0 @@
package collections
import (
"github.com/lkmio/avformat/utils"
)
type RingBuffer interface {
IsEmpty() bool
IsFull() bool
Push(value interface{})
Pop() interface{}
Head() interface{}
Tail() interface{}
Size() int
Capacity() int
Data() ([]interface{}, []interface{})
Clear()
}
func NewRingBuffer(capacity int) RingBuffer {
utils.Assert(capacity > 0)
r := &ringBuffer{
data: make([]interface{}, capacity),
head: 0,
tail: 0,
size: 0,
capacity: capacity,
}
return r
}
type ringBuffer struct {
data []interface{}
head int
tail int
size int
capacity int
}
func (r *ringBuffer) IsEmpty() bool {
return r.size == 0
}
func (r *ringBuffer) IsFull() bool {
return r.size == cap(r.data)
}
func (r *ringBuffer) Push(value interface{}) {
if r.IsFull() {
r.Pop()
}
r.data[r.tail] = value
r.tail = (r.tail + 1) % cap(r.data)
r.size++
}
func (r *ringBuffer) Pop() interface{} {
if r.IsEmpty() {
return nil
}
element := r.data[r.head]
r.data[r.head] = nil
r.head = (r.head + 1) % cap(r.data)
r.size--
return element
}
func (r *ringBuffer) Head() interface{} {
utils.Assert(!r.IsEmpty())
return r.data[r.head]
}
func (r *ringBuffer) Tail() interface{} {
utils.Assert(!r.IsEmpty())
if r.tail > 0 {
return r.data[r.tail-1]
} else {
return r.data[cap(r.data)-1]
}
}
func (r *ringBuffer) Size() int {
return r.size
}
func (r *ringBuffer) Capacity() int {
return r.capacity
}
func (r *ringBuffer) Data() ([]interface{}, []interface{}) {
if r.size == 0 {
return nil, nil
}
if r.tail <= r.head {
return r.data[r.head:], r.data[:r.tail]
} else {
return r.data[r.head:r.tail], nil
}
}
func (r *ringBuffer) Clear() {
r.size = 0
r.head = 0
r.tail = 0
}

View File

@@ -1,29 +0,0 @@
package collections
import (
"fmt"
"testing"
)
func TestRingBuffer(t *testing.T) {
buffer := NewRingBuffer(10)
full := buffer.IsFull()
empty := buffer.IsEmpty()
head := buffer.Head()
tail := buffer.Tail()
pop := buffer.Pop()
println(full)
println(empty)
println(head)
println(tail)
println(pop)
for i := 0; i < 100; i++ {
buffer.Push(i)
}
for !buffer.IsEmpty() {
i := buffer.Pop()
println(fmt.Sprintf("element:%d", i.(int)))
}
}

View File

@@ -81,18 +81,18 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}
func (t *TransStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
func (t *TransStream) AddTrack(track *stream.Track) error {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
if utils.AVMediaTypeAudio == stream.Type() {
t.muxer.AddAudioTrack(stream.CodecId(), 0, 0, 0)
} else if utils.AVMediaTypeVideo == stream.Type() {
t.muxer.AddVideoTrack(stream.CodecId())
if utils.AVMediaTypeAudio == track.Stream.Type() {
t.muxer.AddAudioTrack(track.Stream.CodecId(), 0, 0, 0)
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
t.muxer.AddVideoTrack(track.Stream.CodecId())
t.muxer.AddProperty("width", stream.CodecParameters().Width())
t.muxer.AddProperty("height", stream.CodecParameters().Height())
t.muxer.AddProperty("width", track.Stream.CodecParameters().Width())
t.muxer.AddProperty("height", track.Stream.CodecParameters().Height())
}
return nil
}
@@ -102,13 +102,13 @@ func (t *TransStream) WriteHeader() error {
for _, track := range t.BaseTransStream.Tracks {
var data []byte
if utils.AVMediaTypeAudio == track.Type() {
data = track.Extra()
} else if utils.AVMediaTypeVideo == track.Type() {
data = track.CodecParameters().MP4ExtraData()
if utils.AVMediaTypeAudio == track.Stream.Type() {
data = track.Stream.Extra()
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
data = track.Stream.CodecParameters().MP4ExtraData()
}
n := t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true)
n := t.muxer.Input(t.header[t.headerSize:], track.Stream.Type(), len(data), 0, 0, false, true)
t.headerSize += n
copy(t.header[t.headerSize:], data)
t.headerSize += len(data)
@@ -226,6 +226,6 @@ func NewHttpTransStream() stream.TransStream {
}
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewHttpTransStream(), nil
}

View File

@@ -2,7 +2,6 @@ package gb28181
import (
"encoding/binary"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
)
@@ -34,6 +33,6 @@ func NewTransStream() (stream.TransStream, error) {
}, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewTransStream()
}

View File

@@ -148,7 +148,7 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
if stream_ != nil {
source.OnDeMuxStream(stream_)
if len(source.OriginStreams()) >= source.deMuxerCtx.TrackCount() {
if len(source.OriginTracks()) >= source.deMuxerCtx.TrackCount() {
source.OnDeMuxStreamDone()
}
}
@@ -196,7 +196,7 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
packet.SetDuration(duration)
duration = packet.Duration(90000)
if duration < 0 || duration < 750 {
log.Sugar.Errorf("推流时间戳不正确, 使用系统时钟. ssrc:%d", source.ssrc)
log.Sugar.Errorf("推流时间戳不正确, 使用系统时钟. source: %s ssrc: %d", source.ID, source.ssrc)
source.isSystemClock = true
}
}

View File

@@ -9,22 +9,22 @@ import (
)
const (
SessionIdKey = "hls_sid"
SessionIDKey = "hls_sid"
)
type M3U8Sink struct {
stream.BaseSink
cb func(m3u8 []byte) // 生成m3u8文件的发送回调
sessionId string
playtime time.Time
playTimer *time.Timer
m3u8StringFormat *string
cb func(m3u8 []byte) // 生成m3u8文件的发送回调
sessionId string // 拉流会话ID
playtime time.Time
playTimer *time.Timer
playlistFormat *string
}
// SendM3U8Data 首次向拉流端应答M3U8文件 后续更新M3U8文件, 通过调用@see GetM3U8String 函数获取最新的M3U8文件.
// SendM3U8Data 首次向拉流端应答M3U8文件 后续更新M3U8文件, 通过调用@see GetPlaylist 函数获取最新的M3U8文件.
func (s *M3U8Sink) SendM3U8Data(data *string) error {
s.m3u8StringFormat = data
s.cb([]byte(s.GetM3U8String()))
s.playlistFormat = data
s.cb([]byte(s.GetPlaylist()))
// 开启计时器, 长时间没有拉流关闭sink
timeout := time.Duration(stream.AppConfig.IdleTimeout)
@@ -50,8 +50,8 @@ func (s *M3U8Sink) SendM3U8Data(data *string) error {
func (s *M3U8Sink) StartStreaming(transStream stream.TransStream) error {
hls := transStream.(*TransStream)
if hls.m3u8.Size() > 0 {
if err := s.SendM3U8Data(&hls.m3u8StringFormat); err != nil {
if hls.M3U8Writer.Size() > 0 {
if err := s.SendM3U8Data(hls.PlaylistFormat); err != nil {
return err
}
} else {
@@ -62,13 +62,14 @@ func (s *M3U8Sink) StartStreaming(transStream stream.TransStream) error {
return nil
}
func (s *M3U8Sink) GetM3U8String() string {
func (s *M3U8Sink) GetPlaylist() string {
// 更新拉流时间
//s.RefreshPlayTime()
param := fmt.Sprintf("?%s=%s", SessionIdKey, s.sessionId)
m3u8 := strings.ReplaceAll(*s.m3u8StringFormat, "%s", param)
return m3u8
// 替换每个sink唯一的拉流会话ID
param := fmt.Sprintf("?%s=%s", SessionIDKey, s.sessionId)
playlist := strings.ReplaceAll(*s.playlistFormat, "%s", param)
return playlist
}
func (s *M3U8Sink) RefreshPlayTime() {

View File

@@ -2,6 +2,7 @@ package hls
import (
"fmt"
"github.com/lkmio/avformat/libhls"
"github.com/lkmio/avformat/libmpeg"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
@@ -26,7 +27,7 @@ type TransStream struct {
muxer libmpeg.TSMuxer
context *tsContext
m3u8 M3U8Writer
M3U8Writer libhls.M3U8Writer
m3u8Name string // m3u8文件名
m3u8File *os.File // m3u8文件句柄
dir string // m3u8文件父目录
@@ -35,8 +36,8 @@ type TransStream struct {
duration int // 切片时长, 单位秒
playlistLength int // 最大切片文件个数
m3u8Sinks map[stream.SinkID]*M3U8Sink // 等待响应m3u8文件的sink队列
m3u8StringFormat string // 一个协程写, 多个协程读, 不用加锁保护
m3u8Sinks map[stream.SinkID]*M3U8Sink // 保存还未生成mu38文件就拉流的sink, 发送一次后就删除.
PlaylistFormat *string // 位于内存中的播放列表每个sink都引用指针地址.
}
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
@@ -64,7 +65,7 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
pts := packet.ConvertPts(90000)
dts := packet.ConvertDts(90000)
if utils.AVMediaTypeVideo == packet.MediaType() {
t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()].Stream), pts, dts, packet.KeyFrame())
} else {
t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
}
@@ -72,17 +73,17 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
return nil, -1, true, nil
}
func (t *TransStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
func (t *TransStream) AddTrack(track *stream.Track) error {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
var err error
if utils.AVMediaTypeVideo == stream.Type() {
data := stream.CodecParameters().AnnexBExtraData()
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), data)
if utils.AVMediaTypeVideo == track.Stream.Type() {
data := track.Stream.CodecParameters().AnnexBExtraData()
_, err = t.muxer.AddTrack(track.Stream.Type(), track.Stream.CodecId(), data)
} else {
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), stream.Extra())
_, err = t.muxer.AddTrack(track.Stream.Type(), track.Stream.CodecId(), track.Stream.Extra())
}
return err
}
@@ -105,12 +106,9 @@ func (t *TransStream) onTSAlloc(size int) []byte {
return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size]
}
// 写入新的TS切片更新M3U8
func (t *TransStream) flushSegment(end bool) error {
defer func() {
t.context.segmentSeq++
}()
// 将剩余数据写入缓冲区
// 写入剩余TS包
if t.context.writeBufferSize > 0 {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
t.context.writeBufferSize = 0
@@ -121,37 +119,42 @@ func (t *TransStream) flushSegment(end bool) error {
}
// 删除多余的ts切片文件
if t.m3u8.Size() >= t.playlistLength {
_ = os.Remove(t.m3u8.Head().path)
if t.M3U8Writer.Size() >= t.playlistLength {
_ = os.Remove(t.M3U8Writer.Get(0).Path)
}
// 更新m3u8
// 更新m3u8列表
duration := float32(t.muxer.Duration()) / 90000
t.M3U8Writer.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path)
m3u8Txt := t.M3U8Writer.String()
t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path)
m3u8Txt := t.m3u8.ToString()
if end {
m3u8Txt += "#EXT-X-ENDLIST"
}
t.m3u8StringFormat = m3u8Txt
//if end {
// m3u8Txt += "#EXT-X-ENDLIST"
//}
if _, err := t.m3u8File.Seek(0, 0); err != nil {
return err
} else if err := t.m3u8File.Truncate(0); err != nil {
return err
} else if _, err := t.m3u8File.Write([]byte(m3u8Txt)); err != nil {
return err
*t.PlaylistFormat = m3u8Txt
// 写入最新的m3u8到文件
if t.m3u8File != nil {
if _, err := t.m3u8File.Seek(0, 0); err != nil {
return err
} else if err = t.m3u8File.Truncate(0); err != nil {
return err
} else if _, err = t.m3u8File.Write([]byte(m3u8Txt)); err != nil {
return err
}
}
// 通知等待m3u8的sink
// 缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿.
if len(t.m3u8Sinks) > 0 && t.m3u8.Size() > 1 {
if len(t.m3u8Sinks) > 0 && t.M3U8Writer.Size() > 1 {
for _, sink := range t.m3u8Sinks {
sink.SendM3U8Data(&t.m3u8StringFormat)
sink.SendM3U8Data(t.PlaylistFormat)
}
t.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 0)
}
return nil
}
@@ -160,8 +163,9 @@ func (t *TransStream) createSegment() error {
t.muxer.Reset()
var tsFile *os.File
startSeq := t.context.segmentSeq + 1
for {
tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq)
tsName := fmt.Sprintf(t.tsFormat, startSeq)
// ts文件
t.context.path = fmt.Sprintf("%s/%s", t.dir, tsName)
// m3u8列表中切片的url
@@ -179,9 +183,10 @@ func (t *TransStream) createSegment() error {
}
// 继续创建TS文件, 认为是文件名冲突, 并且文件已经被打开.
t.context.segmentSeq++
startSeq++
}
t.context.segmentSeq = startSeq
t.context.file = tsFile
_ = t.muxer.WriteHeader()
return nil
@@ -239,19 +244,19 @@ func DeleteOldSegments(id string) {
// @Params parentDir 保存切片的绝对路径. mu38和ts切片放在同一目录下, 目录地址使用parentDir+urlPrefix
// @Params segmentDuration 单个切片时长
// @Params playlistLength 缓存多少个切片
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int) (stream.TransStream, error) {
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int, seq int, playlistFormat *string, writer libhls.M3U8Writer) (stream.TransStream, error) {
// 创建文件夹
m3u8Path := fmt.Sprintf("%s/%s", dir, m3u8Name)
if err := os.MkdirAll(filepath.Dir(m3u8Path), 0666); err != nil {
log.Sugar.Errorf("创建目录失败 err:%s path:%s", err.Error(), m3u8Path)
log.Sugar.Errorf("创建HLS目录失败 err: %s path: %s", err.Error(), m3u8Path)
return nil, err
}
// 创建m3u8文件
file, err := os.OpenFile(m3u8Path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
log.Sugar.Errorf("创建m3u8文件失败 err:%s path:%s", err.Error(), m3u8Path)
return nil, err
log.Sugar.Errorf("创建m3u8文件失败 err: %s path: %s", err.Error(), m3u8Path)
//return nil, err
}
transStream := &TransStream{
@@ -263,6 +268,18 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
playlistLength: playlistLength,
}
if writer != nil {
transStream.M3U8Writer = writer
} else {
transStream.M3U8Writer = libhls.NewM3U8Writer(playlistLength)
}
if playlistFormat != nil {
transStream.PlaylistFormat = playlistFormat
} else {
transStream.PlaylistFormat = new(string)
}
// 创建TS封装器
muxer := libmpeg.NewTSMuxer()
muxer.SetWriteHandler(transStream.onTSWrite)
@@ -270,24 +287,35 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
// ts封装上下文对象
transStream.context = &tsContext{
segmentSeq: 0,
segmentSeq: seq,
writeBuffer: make([]byte, 1024*1024),
writeBufferSize: 0,
}
transStream.muxer = muxer
transStream.m3u8 = NewM3U8Writer(playlistLength)
transStream.m3u8File = file
transStream.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 24)
return transStream, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
id := source.GetID()
// 先删除旧的m3u8文件
_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
var writer libhls.M3U8Writer
var playlistFormat *string
startSeq := -1
endInfo := source.GetStreamEndInfo()
if endInfo != nil && endInfo.M3U8Writer != nil {
writer = endInfo.M3U8Writer
playlistFormat = endInfo.PlaylistFormat
startSeq = writer.Get(writer.Size() - 1).Sequence
}
// 删除旧的m3u8文件
//_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
// 删除旧的切片文件
go DeleteOldSegments(id)
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength)
//go DeleteOldSegments(id)
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength, startSeq, playlistFormat, writer)
}

View File

@@ -1,172 +0,0 @@
package hls
import (
"bytes"
"github.com/lkmio/lkm/collections"
"math"
"strconv"
)
const (
ExtM3u = "EXTM3U"
ExtXVersion = "EXT-X-VERSION" //在文件中唯一
ExtINF = "EXTINF" //<duration>(浮点类型, 版本小于3用整型),[<title>]
ExXByteRange = "EXT-X-BYTERANGE" //版本4及以上,分片位置
ExtXDiscontinuity = "EXT-X-DISCONTINUITY" //后面的切片不连续, 文件格式、时间戳发生变化
ExtXKey = "EXT-X-KEY" //加密使用
ExtXMap = "EXT-X-MAP" //音视频的元数据
ExtXProgramDateTime = "EXT-X-PROGRAM-DATE-TIME"
ExtXDateRange = "EXT-X-DATERANGE"
ExtXTargetDuration = "EXT-X-TARGETDURATION" //切片最大时长, 整型单位秒
ExtXMediaSequence = "EXT-X-MEDIA-SEQUENCE" //第一个切片序号
ExtXDiscontinuitySequence = "EXT-X-DISCONTINUITY-SEQUENCE"
ExtXEndList = "EXT-X-ENDLIST"
ExtXPlaylistType = "EXT-X-PLAYLIST-TYPE"
ExtXIFramesOnly = "EXT-X-I-FRAMES-ONLY"
ExtXMedia = "EXT-X-MEDIA"
ExtXStreamINF = "EXT-X-STREAM-INF"
ExtXIFrameStreamINF = "EXT-X-I-FRAME-STREAM-INF"
ExtXSessionData = "EXT-X-SESSION-DATA"
ExtXSessionKey = "EXT-X-SESSION-KEY"
ExtXIndependentSegments = "EXT-X-INDEPENDENT-SEGMENTS"
ExtXStart = "EXT-X-START"
)
//HttpContent-Type头必须是"application/vnd.apple.mpegurl"或"audio/mpegurl"
//无BOM
type M3U8Writer interface {
// AddSegment 添加切片
//@Params duration 切片时长
//@Params url m3u8列表中切片的url
//@Params sequence m3u8列表中的切片序号
//@Params path 切片位于磁盘中的绝对路径
AddSegment(duration float32, url string, sequence int, path string)
ToString() string
// Size 返回切片文件个数
Size() int
// Head 返回第一个切片文件
Head() Segment
}
func NewM3U8Writer(len int) M3U8Writer {
return &m3u8Writer{
stringBuffer: bytes.NewBuffer(make([]byte, 0, 1024*10)),
playlist: collections.NewQueue(len),
}
}
type Segment struct {
duration float32
url string
sequence int
path string
}
type m3u8Writer struct {
stringBuffer *bytes.Buffer
playlist *collections.Queue
}
func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) {
if m.playlist.IsFull() {
m.playlist.Pop()
}
m.playlist.Push(Segment{duration: duration, url: url, sequence: sequence, path: path})
}
func (m *m3u8Writer) targetDuration() int {
var targetDuration int
head, tail := m.playlist.Data()
compute := func(playlist []interface{}) {
for _, segment := range playlist {
//影响播放器缓存.
round := int(math.Ceil(float64(segment.(Segment).duration)))
if round > targetDuration {
targetDuration = round
}
}
}
if head != nil {
compute(head)
}
if tail != nil {
compute(tail)
}
return targetDuration
}
func (m *m3u8Writer) ToString() string {
//暂时只实现简单的播放列表
head, tail := m.playlist.Data()
if head == nil {
return ""
}
m.stringBuffer.Reset()
m.stringBuffer.WriteString("#EXTM3U\r\n")
//暂时只实现第三个版本
m.stringBuffer.WriteString("#EXT-X-VERSION:3\r\n")
m.stringBuffer.WriteString("#EXT-X-TARGETDURATION:")
m.stringBuffer.WriteString(strconv.Itoa(m.targetDuration()))
m.stringBuffer.WriteString("\r\n")
m.stringBuffer.WriteString("#EXT-X-MEDIA-SEQUENCE:")
m.stringBuffer.WriteString(strconv.Itoa(head[0].(Segment).sequence))
m.stringBuffer.WriteString("\r\n")
appendSegments := func(playlist []interface{}) {
for _, segment := range playlist {
m.stringBuffer.WriteString("#EXTINF:")
m.stringBuffer.WriteString(strconv.FormatFloat(float64(segment.(Segment).duration), 'f', -1, 32))
m.stringBuffer.WriteString(",\r\n")
m.stringBuffer.WriteString(segment.(Segment).url + "%s")
m.stringBuffer.WriteString("\r\n")
}
}
if head != nil {
appendSegments(head)
}
if tail != nil {
appendSegments(tail)
}
return m.stringBuffer.String()
}
func (m *m3u8Writer) Size() int {
var size int
head, tail := m.playlist.Data()
if head != nil {
size += len(head)
}
if tail != nil {
size += len(tail)
}
return size
}
func (m *m3u8Writer) Head() Segment {
head, _ := m.playlist.Data()
if head != nil {
return head[0].(Segment)
}
return Segment{}
}

View File

@@ -3,9 +3,9 @@ package jt1078
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/collections"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"net"

93
main.go
View File

@@ -3,7 +3,6 @@ package main
import (
"encoding/json"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/flv"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/hls"
@@ -11,103 +10,16 @@ import (
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/record"
"github.com/lkmio/lkm/rtc"
"github.com/lkmio/lkm/rtmp"
"github.com/lkmio/lkm/rtsp"
"github.com/lkmio/lkm/stream"
"go.uber.org/zap/zapcore"
"net"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
"github.com/lkmio/lkm/rtmp"
"github.com/lkmio/lkm/stream"
)
func readRunArgs() (map[string]string, map[string]string) {
args := os.Args
// 运行参数项优先级高于config.json参数项
// --disable-rtmp --enable-rtmp=11935
// --disable-rtsp --enable-rtsp
// --disable-hls --enable-hls
// --disable-webrtc --enable-webrtc=18000
// --disable-gb28181 --enable-gb28181
// --disable-jt1078 --enable-jt1078=11078
// --disable-hooks --enable-hooks
// --disable-record --enable-record
disableOptions := map[string]string{}
enableOptions := map[string]string{}
for _, arg := range args {
// 参数忽略大小写
arg = strings.ToLower(arg)
var option string
var enable bool
if strings.HasPrefix(arg, "--disable-") {
option = arg[len("--disable-"):]
} else if strings.HasPrefix(arg, "--enable-") {
option = arg[len("--enable-"):]
enable = true
} else {
continue
}
pair := strings.Split(option, "=")
var value string
if len(pair) > 1 {
value = pair[1]
}
if enable {
enableOptions[pair[0]] = value
} else {
disableOptions[pair[0]] = value
}
}
// 删除重叠参数, 禁用和开启同时声明时, 以开启为准.
for k := range enableOptions {
if _, ok := disableOptions[k]; ok {
delete(disableOptions, k)
}
}
return disableOptions, enableOptions
}
func mergeArgs(options map[string]stream.EnableConfig, disableOptions, enableOptions map[string]string) {
for k := range disableOptions {
option, ok := options[k]
utils.Assert(ok)
option.SetEnable(false)
}
for k, v := range enableOptions {
var port int
if len(v) > 0 {
atoi, err := strconv.Atoi(v)
if err == nil && atoi > 0 {
port = atoi
}
}
option, ok := options[k]
utils.Assert(ok)
option.SetEnable(true)
if port > 0 {
if config, ok := option.(stream.PortConfig); ok {
config.SetPort(port)
}
}
}
}
func init() {
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory)
@@ -116,6 +28,7 @@ func init() {
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBStreamForward, gb28181.TransStreamFactory)
stream.SetRecordStreamFactory(record.NewFLVFileSink)
stream.StreamEndInfoBride = NewStreamEndInfo
config, err := stream.LoadConfigFile("./config.json")
if err != nil {

View File

@@ -37,28 +37,29 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
for index, track := range tracks {
var mimeType string
var id string
if utils.AVCodecIdH264 == track.CodecId() {
codecId := track.Stream.CodecId()
if utils.AVCodecIdH264 == codecId {
mimeType = webrtc.MimeTypeH264
} else if utils.AVCodecIdH265 == track.CodecId() {
} else if utils.AVCodecIdH265 == codecId {
mimeType = webrtc.MimeTypeH265
} else if utils.AVCodecIdAV1 == track.CodecId() {
} else if utils.AVCodecIdAV1 == codecId {
mimeType = webrtc.MimeTypeAV1
} else if utils.AVCodecIdVP8 == track.CodecId() {
} else if utils.AVCodecIdVP8 == codecId {
mimeType = webrtc.MimeTypeVP8
} else if utils.AVCodecIdVP9 == track.CodecId() {
} else if utils.AVCodecIdVP9 == codecId {
mimeType = webrtc.MimeTypeVP9
} else if utils.AVCodecIdOPUS == track.CodecId() {
} else if utils.AVCodecIdOPUS == codecId {
mimeType = webrtc.MimeTypeOpus
} else if utils.AVCodecIdPCMALAW == track.CodecId() {
} else if utils.AVCodecIdPCMALAW == codecId {
mimeType = webrtc.MimeTypePCMA
} else if utils.AVCodecIdPCMMULAW == track.CodecId() {
} else if utils.AVCodecIdPCMMULAW == codecId {
mimeType = webrtc.MimeTypePCMU
} else {
log.Sugar.Errorf("codec %s not compatible with webrtc", track.CodecId())
log.Sugar.Errorf("codec %s not compatible with webrtc", codecId)
continue
}
if utils.AVMediaTypeAudio == track.Type() {
if utils.AVMediaTypeAudio == track.Stream.Type() {
id = "audio"
} else {
id = "video"

View File

@@ -23,11 +23,11 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
t.AppendOutStreamBuffer(packet.Data())
} else if utils.AVMediaTypeVideo == packet.MediaType() {
if packet.KeyFrame() {
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
extra := t.BaseTransStream.Tracks[packet.Index()].Stream.CodecParameters().AnnexBExtraData()
t.AppendOutStreamBuffer(extra)
}
t.AppendOutStreamBuffer(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
t.AppendOutStreamBuffer(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()].Stream))
}
return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.Duration(1000))), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
@@ -74,6 +74,6 @@ func NewTransStream() stream.TransStream {
return t
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewTransStream(), nil
}

View File

@@ -123,12 +123,12 @@ func (t *transStream) WriteHeader() error {
var videoCodecId utils.AVCodecID
for _, track := range t.Tracks {
if utils.AVMediaTypeAudio == track.Type() {
audioStream = track
if utils.AVMediaTypeAudio == track.Stream.Type() {
audioStream = track.Stream
audioCodecId = audioStream.CodecId()
t.audioChunk = librtmp.NewAudioChunk()
} else if utils.AVMediaTypeVideo == track.Type() {
videoStream = track
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
videoStream = track.Stream
videoCodecId = videoStream.CodecId()
t.videoChunk = librtmp.NewVideoChunk()
}
@@ -193,6 +193,6 @@ func NewTransStream(chunkSize int) stream.TransStream {
return &transStream{chunkSize: chunkSize}
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewTransStream(librtmp.ChunkSize), nil
}

View File

@@ -32,7 +32,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
// sdp回调给sink, sink应答给describe请求
if s.sdpCb != nil {
s.sdpCb(transStream.(*TranStream).sdp)
s.sdpCb(transStream.(*TransStream).sdp)
s.sdpCb = nil
}
@@ -126,6 +126,7 @@ func (s *Sink) Write(index int, data [][]byte, rtpTime int64) error {
func (s *Sink) isConnected(index int) bool {
return s.TCPStreaming || (s.senders[index] != nil && s.senders[index].RtpConn != nil)
}
func (s *Sink) Close() {
s.BaseSink.Close()

View File

@@ -17,115 +17,129 @@ const (
OverTcpMagic = 0x24
)
// TranStream rtsp传输流封装
// TransStream rtsp传输流封装
// 低延迟是rtsp特性, 所以不考虑实现GOP缓存
type TranStream struct {
type TransStream struct {
stream.BaseTransStream
addr net.IPAddr
addrType string
urlFormat string
rtpTracks []*Track
RtspTracks []*Track
//oldTracks []*Track
oldTracks map[byte]uint16
sdp string
buffer *stream.ReceiveBuffer
buffer *stream.ReceiveBuffer // 保存封装后的rtp包
}
func (t *TranStream) OverTCP(data []byte, channel int) {
func (t *TransStream) OverTCP(data []byte, channel int) {
data[0] = OverTcpMagic
data[1] = byte(channel)
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
}
func (t *TranStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var ts uint32
track := t.rtpTracks[packet.Index()]
track.seq = track.muxer.GetHeader().Seq
track := t.RtspTracks[packet.Index()]
if utils.AVMediaTypeAudio == packet.MediaType() {
ts = uint32(packet.ConvertPts(track.rate))
t.PackRtpPayload(track.muxer, packet.Index(), packet.Data(), ts)
ts = uint32(packet.ConvertPts(track.Rate))
t.PackRtpPayload(track, packet.Index(), packet.Data(), ts)
} else if utils.AVMediaTypeVideo == packet.MediaType() {
ts = uint32(packet.ConvertPts(track.rate))
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
t.PackRtpPayload(track.muxer, packet.Index(), data, ts)
ts = uint32(packet.ConvertPts(track.Rate))
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()].Stream))
t.PackRtpPayload(track, packet.Index(), data, ts)
}
return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
}
func (t *TranStream) ReadExtraData(ts int64) ([][]byte, int64, error) {
func (t *TransStream) ReadExtraData(ts int64) ([][]byte, int64, error) {
// 返回视频编码数据的rtp包
for _, track := range t.rtpTracks {
if utils.AVMediaTypeVideo != track.mediaType {
for _, track := range t.RtspTracks {
if utils.AVMediaTypeVideo != track.MediaType {
continue
}
// 回滚序号和时间戳
index := int(track.seq) - len(track.extraDataBuffer)
for i, bytes := range track.extraDataBuffer {
index := int(track.StartSeq) - len(track.ExtraDataBuffer)
for i, bytes := range track.ExtraDataBuffer {
librtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1)
binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts))
}
return track.extraDataBuffer, ts, nil
return track.ExtraDataBuffer, ts, nil
}
return nil, ts, nil
}
func (t *TranStream) PackRtpPayload(muxer librtp.Muxer, channel int, data []byte, timestamp uint32) {
func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) {
var index int
muxer.Input(data, timestamp, func() []byte {
// 保存开始序号
track.StartSeq = track.Muxer.GetHeader().Seq
track.Muxer.Input(data, timestamp, func() []byte {
index = t.buffer.Index()
block := t.buffer.GetBlock()
return block[OverTcpHeaderSize:]
}, func(bytes []byte) {
track.EndSeq = track.Muxer.GetHeader().Seq
packet := t.buffer.Get(index)[:OverTcpHeaderSize+len(bytes)]
t.OverTCP(packet, channel)
t.AppendOutStreamBuffer(packet)
})
}
func (t *TranStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
func (t *TransStream) AddTrack(track *stream.Track) error {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
payloadType, ok := librtp.CodecIdPayloads[stream.CodecId()]
payloadType, ok := librtp.CodecIdPayloads[track.Stream.CodecId()]
if !ok {
return fmt.Errorf("no payload type was found for codecid: %d", stream.CodecId())
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecId())
}
// 恢复上次拉流的序号
var startSeq uint16
if t.oldTracks != nil {
startSeq, ok = t.oldTracks[byte(payloadType.Pt)]
utils.Assert(ok)
}
// 创建RTP封装器
var muxer librtp.Muxer
if utils.AVCodecIdH264 == stream.CodecId() {
muxer = librtp.NewH264Muxer(payloadType.Pt, 0, 0xFFFFFFFF)
} else if utils.AVCodecIdH265 == stream.CodecId() {
muxer = librtp.NewH265Muxer(payloadType.Pt, 0, 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == stream.CodecId() {
muxer = librtp.NewAACMuxer(payloadType.Pt, 0, 0xFFFFFFFF)
} else if utils.AVCodecIdPCMALAW == stream.CodecId() || utils.AVCodecIdPCMMULAW == stream.CodecId() {
muxer = librtp.NewMuxer(payloadType.Pt, 0, 0xFFFFFFFF)
if utils.AVCodecIdH264 == track.Stream.CodecId() {
muxer = librtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdH265 == track.Stream.CodecId() {
muxer = librtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == track.Stream.CodecId() {
muxer = librtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecId() || utils.AVCodecIdPCMMULAW == track.Stream.CodecId() {
muxer = librtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
}
t.rtpTracks = append(t.rtpTracks, NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, stream.Type()))
index := len(t.rtpTracks) - 1
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.Type())
t.RtspTracks = append(t.RtspTracks, rtspTrack)
index := len(t.RtspTracks) - 1
// 将sps和pps按照单一模式打包
bufferIndex := t.buffer.Index()
if utils.AVMediaTypeVideo == stream.Type() {
parameters := stream.CodecParameters()
if utils.AVMediaTypeVideo == track.Stream.Type() {
parameters := track.Stream.CodecParameters()
if utils.AVCodecIdH265 == stream.CodecId() {
if utils.AVCodecIdH265 == track.Stream.CodecId() {
bytes := parameters.(*utils.HEVCCodecData).VPS()
t.PackRtpPayload(muxer, index, bytes[0], 0)
t.PackRtpPayload(rtspTrack, index, bytes[0], 0)
}
spsBytes := parameters.SPS()
ppsBytes := parameters.PPS()
t.PackRtpPayload(muxer, index, spsBytes[0], 0)
t.PackRtpPayload(muxer, index, ppsBytes[0], 0)
t.PackRtpPayload(rtspTrack, index, spsBytes[0], 0)
t.PackRtpPayload(rtspTrack, index, ppsBytes[0], 0)
// 拷贝扩展数据的rtp包
size := t.buffer.Index() - bufferIndex
@@ -137,14 +151,14 @@ func (t *TranStream) AddTrack(stream utils.AVStream) error {
extraRtpBuffer[i] = dst[:OverTcpHeaderSize+binary.BigEndian.Uint16(dst[2:])]
}
t.rtpTracks[index].extraDataBuffer = extraRtpBuffer
t.RtspTracks[index].ExtraDataBuffer = extraRtpBuffer
}
return nil
}
func (t *TranStream) Close() ([][]byte, int64, error) {
for _, track := range t.rtpTracks {
func (t *TransStream) Close() ([][]byte, int64, error) {
for _, track := range t.RtspTracks {
if track != nil {
track.Close()
}
@@ -153,7 +167,7 @@ func (t *TranStream) Close() ([][]byte, int64, error) {
return nil, 0, nil
}
func (t *TranStream) WriteHeader() error {
func (t *TransStream) WriteHeader() error {
description := sdp.SessionDescription{
Version: 0,
Origin: sdp.Origin{
@@ -177,7 +191,7 @@ func (t *TranStream) WriteHeader() error {
}
for i, track := range t.Tracks {
payloadType, _ := librtp.CodecIdPayloads[track.CodecId()]
payloadType, _ := librtp.CodecIdPayloads[track.Stream.CodecId()]
mediaDescription := sdp.MediaDescription{
ConnectionInformation: &sdp.ConnectionInformation{
NetworkType: "IN",
@@ -195,10 +209,10 @@ func (t *TranStream) WriteHeader() error {
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
mediaDescription.MediaName.Formats = []string{strconv.Itoa(payloadType.Pt)}
if utils.AVMediaTypeAudio == track.Type() {
if utils.AVMediaTypeAudio == track.Stream.Type() {
mediaDescription.MediaName.Media = "audio"
if utils.AVCodecIdAAC == track.CodecId() {
if utils.AVCodecIdAAC == track.Stream.CodecId() {
//[14496-3], [RFC6416] profile-level-id:
//1 : Main Audio Profile Level 1
//9 : Speech Audio Profile Level 1
@@ -237,12 +251,13 @@ func (t *TranStream) WriteHeader() error {
return nil
}
func NewTransStream(addr net.IPAddr, urlFormat string) stream.TransStream {
t := &TranStream{
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream {
t := &TransStream{
addr: addr,
urlFormat: urlFormat,
// 在将AVPacket打包rtp时, 会使用多个buffer块, 回环覆盖多个rtp块, 如果是TCP拉流并且网络不好, 推流的数据会错乱.
buffer: stream.NewReceiveBuffer(1500, 1024),
buffer: stream.NewReceiveBuffer(1500, 1024),
oldTracks: oldTracks,
}
if addr.IP.To4() != nil {
@@ -254,10 +269,15 @@ func NewTransStream(addr net.IPAddr, urlFormat string) stream.TransStream {
return t
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
trackFormat := "?track=%d"
var oldTracks map[byte]uint16
if endInfo := source.GetStreamEndInfo(); endInfo != nil {
oldTracks = endInfo.RtspTracks
}
return NewTransStream(net.IPAddr{
IP: net.ParseIP(stream.AppConfig.PublicIP),
Zone: "",
}, trackFormat), nil
}, trackFormat, oldTracks), nil
}

View File

@@ -7,13 +7,14 @@ import (
// Track RtspTrack 对rtsp每路输出流的封装
type Track struct {
pt byte
rate int
mediaType utils.AVMediaType
seq uint16
PT byte
Rate int
MediaType utils.AVMediaType
StartSeq uint16
EndSeq uint16
muxer librtp.Muxer
extraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用
Muxer librtp.Muxer
ExtraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用
}
func (r *Track) Close() {
@@ -21,10 +22,10 @@ func (r *Track) Close() {
func NewRTSPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
stream := &Track{
pt: pt,
rate: rate,
muxer: muxer,
mediaType: mediaType,
PT: pt,
Rate: rate,
Muxer: muxer,
MediaType: mediaType,
}
return stream

View File

@@ -1,8 +1,8 @@
package stream
import (
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/collections"
)
// GOPBuffer GOP缓存

View File

@@ -1,8 +1,8 @@
package stream
import (
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/collections"
)
// MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存

View File

@@ -223,6 +223,11 @@ func (s *BaseSink) Close() {
// 从等待队列中删除Sink
RemoveSinkFromWaitingQueue(s.SourceID, s.ID)
go HookPlayDoneEvent(s)
// 等待队列为空, 不再保留推流源信息
if !ExistSourceInWaitingQueue(s.SourceID) {
streamEndInfoManager.Remove(s.SourceID)
}
}
}
@@ -251,7 +256,7 @@ func (s *BaseSink) StartStreaming(stream TransStream) error {
}
func (s *BaseSink) StopStreaming(stream TransStream) {
s.SentPacketCount = 0
}
func (s *BaseSink) GetConn() net.Conn {

View File

@@ -76,6 +76,14 @@ func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool {
return ok
}
func ExistSourceInWaitingQueue(id string) bool {
mutex.RLock()
defer mutex.RUnlock()
_, ok := waitingSinks[id]
return ok
}
func ExistSink(sourceId string, sinkId SinkID) bool {
if sourceId != "" {
if exist := ExistSinkInWaitingQueue(sourceId, sinkId); exist {

View File

@@ -2,8 +2,8 @@ package stream
import (
"fmt"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/collections"
"github.com/lkmio/lkm/log"
"net"
"net/url"
@@ -16,7 +16,11 @@ import (
"github.com/lkmio/lkm/transcode"
)
// Source 对推流源的封装, 处理除解析流以外的所有事情
var (
StreamEndInfoBride func(s Source) *StreamEndInfo
)
// Source 对推流源的封装
type Source interface {
// GetID 返回SourceID
GetID() string
@@ -31,11 +35,11 @@ type Source interface {
SetType(sourceType SourceType)
// OriginStreams 返回推流的原始Streams
OriginStreams() []utils.AVStream
// OriginTracks 返回所有的推流track
OriginTracks() []*Track
// TranscodeStreams 返回转码的Streams
TranscodeStreams() []utils.AVStream
// TranscodeTracks 返回所有的转码track
TranscodeTracks() []*Track
// AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader先将Sink添加到等待队列.
// 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink
@@ -119,6 +123,10 @@ type Source interface {
Sinks() []Sink
GetBitrateStatistics() *BitrateStatistics
GetTransStreams() map[TransStreamID]TransStream
GetStreamEndInfo() *StreamEndInfo
}
type PublishSource struct {
@@ -133,8 +141,8 @@ type PublishSource struct {
hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟.
audioTranscoders []transcode.Transcoder // 音频解码器
videoTranscoders []transcode.Transcoder // 视频解码器
originStreams StreamManager // 推流的音视频Streams
allStreams StreamManager // 推流Streams+转码器获得的Stream
originTracks TrackManager // 推流的音视频Streams
allStreamTracks TrackManager // 推流Streams+转码器获得的Stream
pktBuffers [8]collections.MemoryPool // 推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
@@ -144,9 +152,12 @@ type PublishSource struct {
probeTimer *time.Timer // track解析超时计时器, 触发时执行@see writeHeader
TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
sinks map[SinkID]Sink // 保存所有Sink
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
sinks map[SinkID]Sink // 保存所有Sink
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
streamEndInfo *StreamEndInfo // 之前推流源信息
accumulateTimestamps bool // 是否累加时间戳
timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳
streamPipe chan []byte // 推流数据管道
mainContextEvents chan func() // 切换到主协程执行函数的事件管道
@@ -227,7 +238,7 @@ func (s *PublishSource) CreateDefaultOutStreams() {
// 创建HLS输出流
if AppConfig.Hls.Enable {
streams := s.OriginStreams()
streams := s.OriginTracks()
utils.Assert(len(streams) > 0)
id := GenerateTransStreamID(TransStreamHls, streams...)
@@ -270,12 +281,12 @@ func (s *PublishSource) Input(data []byte) error {
return nil
}
func (s *PublishSource) OriginStreams() []utils.AVStream {
return s.originStreams.All()
func (s *PublishSource) OriginTracks() []*Track {
return s.originTracks.All()
}
func (s *PublishSource) TranscodeStreams() []utils.AVStream {
return s.allStreams.All()
func (s *PublishSource) TranscodeTracks() []*Track {
return s.allStreamTracks.All()
}
func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool {
@@ -286,20 +297,24 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils
return true
}
func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source: %s", protocol.String(), s.ID)
transStream, err := CreateTransStream(s, protocol, streams)
transStream, err := CreateTransStream(s, protocol, tracks)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return nil, err
}
for _, track := range streams {
transStream.AddTrack(track)
for _, track := range tracks {
// 重新拷贝一个track传输流内部使用track的时间戳
newTrack := *track
transStream.AddTrack(&newTrack)
}
transStream.SetID(id)
transStream.SetTransStreamProtocol(protocol)
// 创建输出流对应的拉流队列
s.TransStreamSinks[id] = make(map[SinkID]Sink, 128)
_ = transStream.WriteHeader()
@@ -365,11 +380,11 @@ func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int
func (s *PublishSource) doAddSink(sink Sink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
audioStream := s.originStreams.FindStreamWithType(utils.AVMediaTypeAudio)
videoStream := s.originStreams.FindStreamWithType(utils.AVMediaTypeVideo)
audioTrack := s.originTracks.FindWithType(utils.AVMediaTypeAudio)
videoTrack := s.originTracks.FindWithType(utils.AVMediaTypeVideo)
disableAudio := audioStream == nil
disableVideo := videoStream == nil || !sink.EnableVideo()
disableAudio := audioTrack == nil
disableVideo := videoTrack == nil || !sink.EnableVideo()
if disableAudio && disableVideo {
return false
}
@@ -381,39 +396,37 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
}
if !disableAudio && utils.AVCodecIdNONE == audioCodecId {
audioCodecId = audioStream.CodecId()
audioCodecId = audioTrack.Stream.CodecId()
}
if !disableVideo && utils.AVCodecIdNONE == videoCodecId {
videoCodecId = videoStream.CodecId()
videoCodecId = videoTrack.Stream.CodecId()
}
// 创建音频转码器
if !disableAudio && audioCodecId != audioStream.CodecId() {
if !disableAudio && audioCodecId != audioTrack.Stream.CodecId() {
utils.Assert(false)
}
// 创建视频转码器
if !disableVideo && videoCodecId != videoStream.CodecId() {
if !disableVideo && videoCodecId != videoTrack.Stream.CodecId() {
utils.Assert(false)
}
var streams [5]utils.AVStream
var size int
for _, stream_ := range s.originStreams.All() {
if disableVideo && stream_.Type() == utils.AVMediaTypeVideo {
// 查找传输流需要的所有track
var tracks []*Track
for _, track := range s.originTracks.All() {
if disableVideo && track.Stream.Type() == utils.AVMediaTypeVideo {
continue
}
streams[size] = stream_
size++
tracks = append(tracks, track)
}
transStreamId := GenerateTransStreamID(sink.GetProtocol(), streams[:size]...)
transStreamId := GenerateTransStreamID(sink.GetProtocol(), tracks...)
transStream, exist := s.TransStreams[transStreamId]
if !exist {
var err error
transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), streams[:size])
transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), tracks)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return false
@@ -597,6 +610,12 @@ func (s *PublishSource) DoClose() {
return
}
// 保留推流信息
if s.sinkCount > 0 && len(s.originTracks.All()) > 0 {
sourceHistory := StreamEndInfoBride(s)
streamEndInfoManager.Add(sourceHistory)
}
// 关闭所有输出流
for _, transStream := range s.TransStreams {
// 发送剩余包
@@ -669,6 +688,7 @@ func (s *PublishSource) Close() {
group.Wait()
}
// OnDiscardPacket GOP缓存溢出丢弃回调
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeHead()
}
@@ -682,11 +702,11 @@ func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
return
}
s.originStreams.Add(stream)
s.allStreams.Add(stream)
s.originTracks.Add(NewTrack(stream, 0, 0))
s.allStreamTracks.Add(NewTrack(stream, 0, 0))
// 启动track解析超时计时器
if len(s.originStreams.All()) == 1 {
if len(s.originTracks.All()) == 1 {
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, func() {
s.PostEvent(func() {
s.writeHeader()
@@ -718,12 +738,32 @@ func (s *PublishSource) writeHeader() {
s.probeTimer.Stop()
}
if len(s.originStreams.All()) == 0 {
if len(s.originTracks.All()) == 0 {
log.Sugar.Errorf("没有一路track, 删除source: %s", s.ID)
s.DoClose()
return
}
// 尝试恢复上次推流的会话
if streamInfo := streamEndInfoManager.Remove(s.ID); streamInfo != nil && EqualsTracks(streamInfo, s.originTracks.All()) {
s.streamEndInfo = streamInfo
// 恢复每路track的时间戳
tracks := s.originTracks.All()
for _, track := range tracks {
timestamps := streamInfo.Timestamps[track.Stream.CodecId()]
track.Dts = timestamps[0]
track.Pts = timestamps[1]
}
}
// 纠正GOP中的时间戳
if s.gopBuffer.Size() != 0 {
s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
s.CorrectTimestamp(packet)
})
}
// 创建录制流和HLS
s.CreateDefaultOutStreams()
@@ -744,10 +784,10 @@ func (s *PublishSource) IsCompleted() bool {
return s.completed
}
// NotTrackAdded 是否没有添加该index对应的track
// NotTrackAdded 返回该index对应的track是否没有添加
func (s *PublishSource) NotTrackAdded(index int) bool {
for _, avStream := range s.originStreams.All() {
if avStream.Index() == index {
for _, track := range s.originTracks.All() {
if track.Stream.Index() == index {
return false
}
}
@@ -759,6 +799,31 @@ func (s *PublishSource) OnDeMuxStreamDone() {
s.writeHeader()
}
func (s *PublishSource) CorrectTimestamp(packet utils.AVPacket) {
// 对比第一包的时间戳和上次推流的最后时间戳。如果小于上次的推流时间戳,则在原来的基础上累加。
if s.streamEndInfo != nil && !s.timestampModeDecided {
s.timestampModeDecided = true
timestamps := s.streamEndInfo.Timestamps[packet.CodecId()]
if packet.Dts() < timestamps[0] || packet.Pts() < timestamps[1] {
s.accumulateTimestamps = true
log.Sugar.Infof("累加时间戳 上次推流dts: %d, pts: %d", timestamps[0], timestamps[1])
}
}
// 累加时间戳
if s.accumulateTimestamps {
timestamps := s.streamEndInfo.Timestamps[packet.CodecId()]
packet.SetDts(timestamps[0] + packet.Dts())
packet.SetPts(timestamps[1] + packet.Pts())
}
// 更新track最近的时间戳
track := s.originTracks.Find(packet.CodecId())
track.Dts = packet.Dts()
track.Pts = packet.Pts()
}
func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
// track超时忽略推流数据
if s.NotTrackAdded(packet.Index()) {
@@ -766,13 +831,19 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
return
}
// 保存到GOP缓存
if AppConfig.GOPCache && s.existVideo {
s.gopBuffer.AddPacket(packet)
}
// 分发给各个传输流
for _, transStream := range s.TransStreams {
s.DispatchPacket(transStream, packet)
// track解析完毕后才能生成传输流
if s.completed {
s.CorrectTimestamp(packet)
// 分发给各个传输流
for _, transStream := range s.TransStreams {
s.DispatchPacket(transStream, packet)
}
}
// 未开启GOP缓存或只存在音频流, 释放掉内存
@@ -849,3 +920,11 @@ func (s *PublishSource) Sinks() []Sink {
func (s *PublishSource) GetBitrateStatistics() *BitrateStatistics {
return s.statistics
}
func (s *PublishSource) GetTransStreams() map[TransStreamID]TransStream {
return s.TransStreams
}
func (s *PublishSource) GetStreamEndInfo() *StreamEndInfo {
return s.streamEndInfo
}

64
stream/stream_endinfo.go Normal file
View File

@@ -0,0 +1,64 @@
package stream
import (
"github.com/lkmio/avformat/libhls"
"github.com/lkmio/avformat/utils"
"sync"
)
var (
streamEndInfoManager *StreamEndInfoManager
)
func init() {
streamEndInfoManager = &StreamEndInfoManager{sources: make(map[string]*StreamEndInfo, 32)}
}
// StreamEndInfo 保留结束推流Source的推流信息
// 在结束推流时,如果还有拉流端没有断开,则保留一些推流信息(目前有时间戳、ts切片序号等等)。在下次推流时使用该时间戳作为新传输流的起始时间戳保证拉流端在拉流时不会重现pts和dts错误.
// 如果在此之前陆续有拉流端断开直至sink计数为0则会不再保留该信息。
type StreamEndInfo struct {
ID string
Timestamps map[utils.AVCodecID][2]int64 // 每路track最后的时间戳
M3U8Writer libhls.M3U8Writer
PlaylistFormat *string
RtspTracks map[byte]uint16
}
func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool {
if len(info.Timestamps) != len(tracks) {
return false
}
for _, track := range tracks {
if _, ok := info.Timestamps[track.Stream.CodecId()]; !ok {
return false
}
}
return true
}
type StreamEndInfoManager struct {
sources map[string]*StreamEndInfo
lock sync.RWMutex
}
func (s *StreamEndInfoManager) Add(history *StreamEndInfo) {
s.lock.Lock()
defer s.lock.Unlock()
_, ok := s.sources[history.ID]
utils.Assert(!ok)
s.sources[history.ID] = history
}
func (s *StreamEndInfoManager) Remove(id string) *StreamEndInfo {
s.lock.Lock()
defer s.lock.Unlock()
history := s.sources[id]
delete(s.sources, id)
return history
}

View File

@@ -2,10 +2,9 @@ package stream
import (
"fmt"
"github.com/lkmio/avformat/utils"
)
type TransStreamFactory func(source Source, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error)
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)
type RecordStreamFactory func(source string) (Sink, string, error)
@@ -36,13 +35,13 @@ func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, e
return f, nil
}
func CreateTransStream(source Source, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
func CreateTransStream(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) {
factory, err := FindTransStreamFactory(protocol)
if err != nil {
return nil, err
}
return factory(source, protocol, streams)
return factory(source, protocol, tracks)
}
func SetRecordStreamFactory(factory RecordStreamFactory) {

View File

@@ -1,75 +0,0 @@
package stream
import (
"github.com/lkmio/avformat/utils"
)
type StreamManager struct {
streams []utils.AVStream
}
func (s *StreamManager) Add(stream utils.AVStream) {
for _, stream_ := range s.streams {
utils.Assert(stream_.Type() != stream.Type())
utils.Assert(stream_.CodecId() != stream.CodecId())
}
s.streams = append(s.streams, stream)
//按照AVCodecId升序排序
for i := 0; i < len(s.streams); i++ {
for j := 1; j < len(s.streams)-i; j++ {
tmp := s.streams[j-1]
if s.streams[j].CodecId() < tmp.CodecId() {
s.streams[j-1] = s.streams[j]
s.streams[j] = tmp
}
}
}
}
func (s *StreamManager) FindStream(id utils.AVCodecID) utils.AVStream {
for _, stream_ := range s.streams {
if stream_.CodecId() == id {
return stream_
}
}
return nil
}
func (s *StreamManager) FindStreamWithType(mediaType utils.AVMediaType) utils.AVStream {
for _, stream_ := range s.streams {
if stream_.Type() == mediaType {
return stream_
}
}
return nil
}
func (s *StreamManager) FindStreams(id utils.AVCodecID) []utils.AVStream {
var streams []utils.AVStream
for _, stream_ := range s.streams {
if stream_.CodecId() == id {
streams = append(streams, stream_)
}
}
return streams
}
func (s *StreamManager) FindStreamsWithType(mediaType utils.AVMediaType) []utils.AVStream {
var streams []utils.AVStream
for _, stream_ := range s.streams {
if stream_.Type() == mediaType {
streams = append(streams, stream_)
}
}
return streams
}
func (s *StreamManager) All() []utils.AVStream {
return s.streams
}

13
stream/track.go Normal file
View File

@@ -0,0 +1,13 @@
package stream
import "github.com/lkmio/avformat/utils"
type Track struct {
Stream utils.AVStream
Pts int64 // 最新的PTS
Dts int64 // 最新的DTS
}
func NewTrack(stream utils.AVStream, dts, pts int64) *Track {
return &Track{stream, dts, pts}
}

64
stream/track_manager.go Normal file
View File

@@ -0,0 +1,64 @@
package stream
import (
"github.com/lkmio/avformat/utils"
)
type TrackManager struct {
tracks []*Track
}
func (s *TrackManager) Add(track *Track) {
for _, t := range s.tracks {
utils.Assert(t.Stream.Type() != track.Stream.Type())
utils.Assert(t.Stream.CodecId() != track.Stream.CodecId())
}
s.tracks = append(s.tracks, track)
}
func (s *TrackManager) Find(id utils.AVCodecID) *Track {
for _, track := range s.tracks {
if track.Stream.CodecId() == id {
return track
}
}
return nil
}
func (s *TrackManager) FindWithType(mediaType utils.AVMediaType) *Track {
for _, track := range s.tracks {
if track.Stream.Type() == mediaType {
return track
}
}
return nil
}
func (s *TrackManager) FindTracks(id utils.AVCodecID) []*Track {
var tracks []*Track
for _, track := range s.tracks {
if track.Stream.CodecId() == id {
tracks = append(tracks, track)
}
}
return tracks
}
func (s *TrackManager) FindTracksWithType(mediaType utils.AVMediaType) []*Track {
var tracks []*Track
for _, track := range s.tracks {
if track.Stream.Type() == mediaType {
tracks = append(tracks, track)
}
}
return tracks
}
func (s *TrackManager) All() []*Track {
return s.tracks
}

View File

@@ -12,11 +12,11 @@ type TransStream interface {
Input(packet utils.AVPacket) ([][]byte, int64, bool, error)
AddTrack(stream utils.AVStream) error
AddTrack(track *Track) error
TrackCount() int
GetTracks() []utils.AVStream
GetTracks() []*Track
WriteHeader() error
@@ -39,17 +39,18 @@ type TransStream interface {
OutStreamBufferCapacity() int
IsExistVideo() bool
SetTransStreamProtocol(protocol TransStreamProtocol)
}
type BaseTransStream struct {
//muxer stream.Muxer
ID TransStreamID
Tracks []utils.AVStream
Tracks []*Track
Completed bool
ExistVideo bool
Protocol TransStreamProtocol
OutBuffer [][]byte // 完成封装的输出流队列
OutBuffer [][]byte // 输出流的返回队列
OutBufferSize int
}
@@ -65,9 +66,9 @@ func (t *BaseTransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, e
return nil, -1, false, nil
}
func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
t.Tracks = append(t.Tracks, stream)
if utils.AVMediaTypeVideo == stream.Type() {
func (t *BaseTransStream) AddTrack(track *Track) error {
t.Tracks = append(t.Tracks, track)
if utils.AVMediaTypeVideo == track.Stream.Type() {
t.ExistVideo = true
}
return nil
@@ -109,7 +110,7 @@ func (t *BaseTransStream) TrackCount() int {
return len(t.Tracks)
}
func (t *BaseTransStream) GetTracks() []utils.AVStream {
func (t *BaseTransStream) GetTracks() []*Track {
return t.Tracks
}
@@ -125,6 +126,10 @@ func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
return nil, 0, nil
}
func (t *BaseTransStream) SetTransStreamProtocol(protocol TransStreamProtocol) {
t.Protocol = protocol
}
type TCPTransStream struct {
BaseTransStream

View File

@@ -49,18 +49,18 @@ func init() {
}*/
// GenerateTransStreamID 根据输出流协议和输出流包含的音视频编码器ID生成流ID
func GenerateTransStreamID(protocol TransStreamProtocol, ids ...utils.AVStream) TransStreamID {
len_ := len(ids)
func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) TransStreamID {
len_ := len(tracks)
utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id.CodecId())]
for i, track := range tracks {
id, ok := narrowCodecIds[int(track.Stream.CodecId())]
utils.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
streamId |= uint64(id) << (48 - i*8)
}
return TransStreamID(streamId)