mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
Compare commits
3 Commits
059bc20018
...
7155b866c2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7155b866c2 | ||
![]() |
1e982fce5f | ||
![]() |
93ea8f2fc2 |
@@ -37,9 +37,9 @@ func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream.
|
|||||||
}
|
}
|
||||||
} else if stream.TransStreamRtsp == transStream.GetProtocol() {
|
} else if stream.TransStreamRtsp == transStream.GetProtocol() {
|
||||||
if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 {
|
if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 {
|
||||||
info.RtspTracks = make(map[byte]uint16, len(tracks))
|
info.RtspTracks = make(map[int]uint16, len(tracks))
|
||||||
for _, track := range rtsp.RtspTracks {
|
for _, track := range rtsp.RtspTracks {
|
||||||
info.RtspTracks[track.PT] = track.EndSeq
|
info.RtspTracks[int(track.CodecID)] = track.EndSeq
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if stream.TransStreamFlv == transStream.GetProtocol() {
|
} else if stream.TransStreamFlv == transStream.GetProtocol() {
|
||||||
|
@@ -18,7 +18,7 @@ type TransStream struct {
|
|||||||
flvExtraDataBlock []byte // metadata和sequence header
|
flvExtraDataBlock []byte // metadata和sequence header
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (t *TransStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
t.ClearOutStreamBuffer()
|
t.ClearOutStreamBuffer()
|
||||||
|
|
||||||
var flvTagSize int
|
var flvTagSize int
|
||||||
@@ -94,20 +94,19 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
|||||||
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
|
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) AddTrack(track *stream.Track) error {
|
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
||||||
if err := t.BaseTransStream.AddTrack(track); err != nil {
|
var index int
|
||||||
return err
|
var err error
|
||||||
}
|
|
||||||
|
|
||||||
if utils.AVMediaTypeAudio == track.Stream.MediaType {
|
if utils.AVMediaTypeAudio == track.Stream.MediaType {
|
||||||
t.Muxer.AddAudioTrack(track.Stream)
|
index, err = t.Muxer.AddAudioTrack(track.Stream)
|
||||||
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
||||||
t.Muxer.AddVideoTrack(track.Stream)
|
index, err = t.Muxer.AddVideoTrack(track.Stream)
|
||||||
|
|
||||||
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width()))
|
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width()))
|
||||||
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height()))
|
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height()))
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
return index, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) WriteHeader() error {
|
func (t *TransStream) WriteHeader() error {
|
||||||
|
@@ -2,11 +2,9 @@ package gb28181
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
|
||||||
"github.com/lkmio/avformat"
|
"github.com/lkmio/avformat"
|
||||||
"github.com/lkmio/avformat/collections"
|
"github.com/lkmio/avformat/collections"
|
||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
"github.com/lkmio/lkm/log"
|
|
||||||
"github.com/lkmio/lkm/stream"
|
"github.com/lkmio/lkm/stream"
|
||||||
"github.com/lkmio/mpeg"
|
"github.com/lkmio/mpeg"
|
||||||
"github.com/lkmio/rtp"
|
"github.com/lkmio/rtp"
|
||||||
@@ -17,49 +15,25 @@ type GBGateway struct {
|
|||||||
ps *mpeg.PSMuxer
|
ps *mpeg.PSMuxer
|
||||||
rtp rtp.Muxer
|
rtp rtp.Muxer
|
||||||
psBuffer []byte
|
psBuffer []byte
|
||||||
tracks map[utils.AVCodecID]int // codec->track index
|
|
||||||
rtpBuffer *stream.RtpBuffer
|
rtpBuffer *stream.RtpBuffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GBGateway) WriteHeader() error {
|
func (s *GBGateway) AddTrack(track *stream.Track) (int, error) {
|
||||||
if len(s.tracks) == 0 {
|
|
||||||
return fmt.Errorf("no tracks available")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *GBGateway) AddTrack(track *stream.Track) error {
|
|
||||||
s.BaseTransStream.AddTrack(track)
|
|
||||||
|
|
||||||
if utils.AVCodecIdH264 == track.Stream.CodecID || utils.AVCodecIdH265 == track.Stream.CodecID || utils.AVCodecIdAAC == track.Stream.CodecID || utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
|
|
||||||
} else {
|
|
||||||
log.Sugar.Errorf("不支持的编码格式: %d", track.Stream.CodecID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID)
|
index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Sugar.Error("添加%s到ps muxer失败", track.Stream.CodecID)
|
return -1, err
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.tracks[track.Stream.CodecID] = index
|
return index, nil
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (s *GBGateway) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
trackIndex, ok := s.tracks[packet.CodecID]
|
|
||||||
if !ok {
|
|
||||||
log.Sugar.Errorf("未找到对应的track: %d", packet.CodecID)
|
|
||||||
return nil, 0, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
dts := packet.ConvertDts(90000)
|
dts := packet.ConvertDts(90000)
|
||||||
pts := packet.ConvertPts(90000)
|
pts := packet.ConvertPts(90000)
|
||||||
|
|
||||||
data := packet.Data
|
data := packet.Data
|
||||||
if utils.AVMediaTypeVideo == packet.MediaType {
|
if utils.AVMediaTypeVideo == packet.MediaType {
|
||||||
data = avformat.AVCCPacket2AnnexB(s.BaseTransStream.Tracks[packet.Index].Stream, packet)
|
data = avformat.AVCCPacket2AnnexB(s.FindTrackWithStreamIndex(packet.Index).Stream, packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 扩容ps buffer
|
// 扩容ps buffer
|
||||||
@@ -67,7 +41,7 @@ func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCo
|
|||||||
s.psBuffer = make([]byte, len(data)*2)
|
s.psBuffer = make([]byte, len(data)*2)
|
||||||
}
|
}
|
||||||
|
|
||||||
n := s.ps.Input(s.psBuffer, trackIndex, packet.Key, data, &pts, &dts)
|
n := s.ps.Input(s.psBuffer, index, packet.Key, data, &pts, &dts)
|
||||||
|
|
||||||
var result []*collections.ReferenceCounter[[]byte]
|
var result []*collections.ReferenceCounter[[]byte]
|
||||||
var rtpBuffer []byte
|
var rtpBuffer []byte
|
||||||
@@ -101,7 +75,6 @@ func NewGBGateway(ssrc uint32) *GBGateway {
|
|||||||
ps: mpeg.NewPsMuxer(),
|
ps: mpeg.NewPsMuxer(),
|
||||||
rtp: rtp.NewMuxer(96, 0, ssrc),
|
rtp: rtp.NewMuxer(96, 0, ssrc),
|
||||||
psBuffer: make([]byte, 1024*1024*2),
|
psBuffer: make([]byte, 1024*1024*2),
|
||||||
tracks: make(map[utils.AVCodecID]int),
|
|
||||||
rtpBuffer: stream.NewRtpBuffer(1024),
|
rtpBuffer: stream.NewRtpBuffer(1024),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -17,7 +17,7 @@ func (s *TalkStream) WriteHeader() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (s *TalkStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
var size int
|
var size int
|
||||||
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
|
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
|
||||||
return s.packet
|
return s.packet
|
||||||
@@ -26,7 +26,7 @@ func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceC
|
|||||||
})
|
})
|
||||||
|
|
||||||
packet = &avformat.AVPacket{Data: s.packet[:size]}
|
packet = &avformat.AVPacket{Data: s.packet[:size]}
|
||||||
return s.RtpStream.Input(packet)
|
return s.RtpStream.Input(packet, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTalkTransStream(ssrc uint32) (stream.TransStream, error) {
|
func NewTalkTransStream(ssrc uint32) (stream.TransStream, error) {
|
||||||
|
@@ -39,7 +39,7 @@ type TransStream struct {
|
|||||||
PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
|
PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
// 创建一下个切片
|
// 创建一下个切片
|
||||||
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
|
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
|
||||||
var newSegment bool
|
var newSegment bool
|
||||||
@@ -64,7 +64,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
|||||||
dts := packet.ConvertDts(90000)
|
dts := packet.ConvertDts(90000)
|
||||||
data := packet.Data
|
data := packet.Data
|
||||||
if utils.AVMediaTypeVideo == packet.MediaType {
|
if utils.AVMediaTypeVideo == packet.MediaType {
|
||||||
data = avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
|
data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 写入ts切片
|
// 写入ts切片
|
||||||
@@ -77,7 +77,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
|||||||
}
|
}
|
||||||
|
|
||||||
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
|
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
|
||||||
i += t.muxer.Input(bytes, packet.Index, data[i:], length, dts, pts, packet.Key, i == 0)
|
i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
|
||||||
t.ctx.writeBufferSize += mpeg.TsPacketSize
|
t.ctx.writeBufferSize += mpeg.TsPacketSize
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,25 +89,20 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
|||||||
return nil, -1, true, nil
|
return nil, -1, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) AddTrack(track *stream.Track) error {
|
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
||||||
if err := t.BaseTransStream.AddTrack(track); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
var trackIndex int
|
||||||
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
||||||
data := track.Stream.CodecParameters.AnnexBExtraData()
|
data := track.Stream.CodecParameters.AnnexBExtraData()
|
||||||
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
|
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
|
||||||
} else {
|
} else {
|
||||||
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
|
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
return trackIndex, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) WriteHeader() error {
|
func (t *TransStream) WriteHeader() error {
|
||||||
//if packet.Index >= t.muxer.TrackCount() {
|
|
||||||
// return nil, -1, false, fmt.Errorf("track not available")
|
|
||||||
//}
|
|
||||||
return t.createSegment()
|
return t.createSegment()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -67,7 +67,7 @@ func (h Handler) OnPacket(packet *avformat.AVPacket) {
|
|||||||
h.fos.Write(h.buffer[:n])
|
h.fos.Write(h.buffer[:n])
|
||||||
}
|
}
|
||||||
|
|
||||||
packets, _, _, err := h.gateway.Input(packet)
|
packets, _, _, err := h.gateway.Input(packet, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
19
main.go
19
main.go
@@ -2,11 +2,14 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
flv2 "github.com/lkmio/flv"
|
||||||
"github.com/lkmio/lkm/flv"
|
"github.com/lkmio/lkm/flv"
|
||||||
"github.com/lkmio/lkm/hls"
|
"github.com/lkmio/lkm/hls"
|
||||||
"github.com/lkmio/lkm/jt1078"
|
"github.com/lkmio/lkm/jt1078"
|
||||||
"github.com/lkmio/lkm/record"
|
"github.com/lkmio/lkm/record"
|
||||||
"github.com/lkmio/lkm/rtsp"
|
"github.com/lkmio/lkm/rtsp"
|
||||||
|
"github.com/lkmio/mpeg"
|
||||||
|
"github.com/lkmio/rtp"
|
||||||
"github.com/lkmio/transport"
|
"github.com/lkmio/transport"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@@ -24,14 +27,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory, mpeg.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory, mpeg.SupportedCodecs)
|
||||||
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory)
|
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory, mpeg.SupportedCodecs)
|
||||||
stream.SetRecordStreamFactory(record.NewFLVFileSink)
|
stream.SetRecordStreamFactory(record.NewFLVFileSink)
|
||||||
stream.StreamEndInfoBride = NewStreamEndInfo
|
stream.StreamEndInfoBride = NewStreamEndInfo
|
||||||
|
|
||||||
|
@@ -41,27 +41,11 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
|
|||||||
|
|
||||||
tracks := transStream.GetTracks()
|
tracks := transStream.GetTracks()
|
||||||
for index, track := range tracks {
|
for index, track := range tracks {
|
||||||
var mimeType string
|
|
||||||
var id string
|
var id string
|
||||||
codecId := track.Stream.CodecID
|
codecId := track.Stream.CodecID
|
||||||
if utils.AVCodecIdH264 == codecId {
|
mimeType, ok := SupportedCodecs[codecId]
|
||||||
mimeType = webrtc.MimeTypeH264
|
if !ok {
|
||||||
} else if utils.AVCodecIdH265 == codecId {
|
log.Sugar.Errorf("unsupported codec: %s", codecId)
|
||||||
mimeType = webrtc.MimeTypeH265
|
|
||||||
} else if utils.AVCodecIdAV1 == codecId {
|
|
||||||
mimeType = webrtc.MimeTypeAV1
|
|
||||||
} else if utils.AVCodecIdVP8 == codecId {
|
|
||||||
mimeType = webrtc.MimeTypeVP8
|
|
||||||
} else if utils.AVCodecIdVP9 == codecId {
|
|
||||||
mimeType = webrtc.MimeTypeVP9
|
|
||||||
} else if utils.AVCodecIdOPUS == codecId {
|
|
||||||
mimeType = webrtc.MimeTypeOpus
|
|
||||||
} else if utils.AVCodecIdPCMALAW == codecId {
|
|
||||||
mimeType = webrtc.MimeTypePCMA
|
|
||||||
} else if utils.AVCodecIdPCMMULAW == codecId {
|
|
||||||
mimeType = webrtc.MimeTypePCMU
|
|
||||||
} else {
|
|
||||||
log.Sugar.Errorf("codec %s not compatible with webrtc", codecId)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,7 +55,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
|
|||||||
id = "video"
|
id = "video"
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
|
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType.(string)}, id, "pion")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -12,19 +12,31 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
webrtcApi *webrtc.API
|
webrtcApi *webrtc.API
|
||||||
|
|
||||||
|
SupportedCodecs = map[utils.AVCodecID]interface{}{
|
||||||
|
utils.AVCodecIdH264: webrtc.MimeTypeH264,
|
||||||
|
utils.AVCodecIdH265: webrtc.MimeTypeH265,
|
||||||
|
utils.AVCodecIdAV1: webrtc.MimeTypeAV1,
|
||||||
|
utils.AVCodecIdVP8: webrtc.MimeTypeVP8,
|
||||||
|
utils.AVCodecIdVP9: webrtc.MimeTypeVP9,
|
||||||
|
utils.AVCodecIdOPUS: webrtc.MimeTypeOpus,
|
||||||
|
utils.AVCodecIdPCMALAW: webrtc.MimeTypePCMA,
|
||||||
|
utils.AVCodecIdPCMMULAW: webrtc.MimeTypePCMU,
|
||||||
|
utils.AVCodecIdADPCMG722: webrtc.MimeTypeG722,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
type transStream struct {
|
type transStream struct {
|
||||||
stream.BaseTransStream
|
stream.BaseTransStream
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
t.ClearOutStreamBuffer()
|
t.ClearOutStreamBuffer()
|
||||||
|
|
||||||
if utils.AVMediaTypeAudio == packet.MediaType {
|
if utils.AVMediaTypeAudio == packet.MediaType {
|
||||||
t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data))
|
t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data))
|
||||||
} else if utils.AVMediaTypeVideo == packet.MediaType {
|
} else if utils.AVMediaTypeVideo == packet.MediaType {
|
||||||
avStream := t.BaseTransStream.Tracks[packet.Index].Stream
|
avStream := t.FindTrackWithStreamIndex(packet.Index).Stream
|
||||||
if packet.Key {
|
if packet.Key {
|
||||||
extra := avStream.CodecParameters.AnnexBExtraData()
|
extra := avStream.CodecParameters.AnnexBExtraData()
|
||||||
t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra))
|
t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra))
|
||||||
|
@@ -21,7 +21,7 @@ type transStream struct {
|
|||||||
metaData *amf0.Object // 推流方携带的元数据
|
metaData *amf0.Object // 推流方携带的元数据
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
t.ClearOutStreamBuffer()
|
t.ClearOutStreamBuffer()
|
||||||
|
|
||||||
var data []byte
|
var data []byte
|
||||||
|
@@ -29,7 +29,7 @@ type TransStream struct {
|
|||||||
|
|
||||||
RtspTracks []*Track
|
RtspTracks []*Track
|
||||||
//oldTracks []*Track
|
//oldTracks []*Track
|
||||||
oldTracks map[byte]uint16
|
oldTracks map[int]uint16
|
||||||
sdp string
|
sdp string
|
||||||
|
|
||||||
rtpBuffer *stream.RtpBuffer
|
rtpBuffer *stream.RtpBuffer
|
||||||
@@ -41,18 +41,18 @@ func (t *TransStream) OverTCP(data []byte, channel int) {
|
|||||||
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
|
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
var ts uint32
|
var ts uint32
|
||||||
var result []*collections.ReferenceCounter[[]byte]
|
var result []*collections.ReferenceCounter[[]byte]
|
||||||
track := t.RtspTracks[packet.Index]
|
track := t.RtspTracks[trackIndex]
|
||||||
if utils.AVMediaTypeAudio == packet.MediaType {
|
if utils.AVMediaTypeAudio == packet.MediaType {
|
||||||
ts = uint32(packet.ConvertPts(track.Rate))
|
ts = uint32(packet.ConvertPts(track.payload.ClockRate))
|
||||||
result = t.PackRtpPayload(track, packet.Index, packet.Data, ts)
|
result = t.PackRtpPayload(track, trackIndex, packet.Data, ts)
|
||||||
} else if utils.AVMediaTypeVideo == packet.MediaType {
|
} else if utils.AVMediaTypeVideo == packet.MediaType {
|
||||||
ts = uint32(packet.ConvertPts(track.Rate))
|
ts = uint32(packet.ConvertPts(track.payload.ClockRate))
|
||||||
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
|
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet)
|
||||||
data := avc.RemoveStartCode(annexBData)
|
data := avc.RemoveStartCode(annexBData)
|
||||||
result = t.PackRtpPayload(track, packet.Index, data, ts)
|
result = t.PackRtpPayload(track, trackIndex, data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
|
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
|
||||||
@@ -110,36 +110,24 @@ func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, tim
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) AddTrack(track *stream.Track) error {
|
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
||||||
if err := t.BaseTransStream.AddTrack(track); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
payloadType, ok := rtp.CodecIdPayloads[track.Stream.CodecID]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 恢复上次拉流的序号
|
// 恢复上次拉流的序号
|
||||||
var startSeq uint16
|
var startSeq uint16
|
||||||
if t.oldTracks != nil {
|
if t.oldTracks != nil {
|
||||||
startSeq, ok = t.oldTracks[byte(payloadType.Pt)]
|
var ok bool
|
||||||
|
startSeq, ok = t.oldTracks[int(track.Stream.CodecID)]
|
||||||
utils.Assert(ok)
|
utils.Assert(ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建RTP封装器
|
// 查找RTP封装器和PayloadType
|
||||||
var muxer rtp.Muxer
|
newMuxerFunc := rtp.SupportedCodecs[track.Stream.CodecID]
|
||||||
if utils.AVCodecIdH264 == track.Stream.CodecID {
|
utils.Assert(newMuxerFunc != nil)
|
||||||
muxer = rtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
|
||||||
} else if utils.AVCodecIdH265 == track.Stream.CodecID {
|
|
||||||
muxer = rtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
|
||||||
} else if utils.AVCodecIdAAC == track.Stream.CodecID {
|
|
||||||
muxer = rtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
|
||||||
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
|
|
||||||
muxer = rtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
|
||||||
}
|
|
||||||
|
|
||||||
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType)
|
// 创建RTP封装器
|
||||||
|
muxer, payload := newMuxerFunc.(func(seq int, ssrc uint32) (rtp.Muxer, rtp.PayloadType))(int(startSeq), 0xFFFFFFFF)
|
||||||
|
|
||||||
|
// 创建track
|
||||||
|
rtspTrack := NewRTSPTrack(muxer, payload, track.Stream.MediaType, track.Stream.CodecID)
|
||||||
t.RtspTracks = append(t.RtspTracks, rtspTrack)
|
t.RtspTracks = append(t.RtspTracks, rtspTrack)
|
||||||
trackIndex := len(t.RtspTracks) - 1
|
trackIndex := len(t.RtspTracks) - 1
|
||||||
|
|
||||||
@@ -170,7 +158,7 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
|
|||||||
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
|
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return trackIndex, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
|
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
|
||||||
@@ -206,8 +194,7 @@ func (t *TransStream) WriteHeader() error {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, track := range t.Tracks {
|
for i, track := range t.RtspTracks {
|
||||||
payloadType, _ := rtp.CodecIdPayloads[track.Stream.CodecID]
|
|
||||||
mediaDescription := sdp.MediaDescription{
|
mediaDescription := sdp.MediaDescription{
|
||||||
ConnectionInformation: &sdp.ConnectionInformation{
|
ConnectionInformation: &sdp.ConnectionInformation{
|
||||||
NetworkType: "IN",
|
NetworkType: "IN",
|
||||||
@@ -218,17 +205,17 @@ func (t *TransStream) WriteHeader() error {
|
|||||||
Attributes: []sdp.Attribute{
|
Attributes: []sdp.Attribute{
|
||||||
sdp.NewAttribute("recvonly", ""),
|
sdp.NewAttribute("recvonly", ""),
|
||||||
sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""),
|
sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""),
|
||||||
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", payloadType.Pt, payloadType.Encoding, payloadType.ClockRate), ""),
|
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", track.payload.Pt, track.payload.Encoding, track.payload.ClockRate), ""),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
|
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
|
||||||
mediaDescription.MediaName.Formats = []string{strconv.Itoa(payloadType.Pt)}
|
mediaDescription.MediaName.Formats = []string{strconv.Itoa(track.payload.Pt)}
|
||||||
|
|
||||||
if utils.AVMediaTypeAudio == track.Stream.MediaType {
|
if utils.AVMediaTypeAudio == track.MediaType {
|
||||||
mediaDescription.MediaName.Media = "audio"
|
mediaDescription.MediaName.Media = "audio"
|
||||||
|
|
||||||
if utils.AVCodecIdAAC == track.Stream.CodecID {
|
if utils.AVCodecIdAAC == track.CodecID {
|
||||||
//[14496-3], [RFC6416] profile-level-id:
|
//[14496-3], [RFC6416] profile-level-id:
|
||||||
//1 : Main Audio Profile Level 1
|
//1 : Main Audio Profile Level 1
|
||||||
//9 : Speech Audio Profile Level 1
|
//9 : Speech Audio Profile Level 1
|
||||||
@@ -267,7 +254,7 @@ func (t *TransStream) WriteHeader() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream {
|
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[int]uint16) stream.TransStream {
|
||||||
t := &TransStream{
|
t := &TransStream{
|
||||||
addr: addr,
|
addr: addr,
|
||||||
urlFormat: urlFormat,
|
urlFormat: urlFormat,
|
||||||
@@ -286,7 +273,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16
|
|||||||
|
|
||||||
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
|
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
|
||||||
trackFormat := "?track=%d"
|
trackFormat := "?track=%d"
|
||||||
var oldTracks map[byte]uint16
|
var oldTracks map[int]uint16
|
||||||
if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil {
|
if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil {
|
||||||
oldTracks = endInfo.RtspTracks
|
oldTracks = endInfo.RtspTracks
|
||||||
}
|
}
|
||||||
|
@@ -8,11 +8,11 @@ import (
|
|||||||
|
|
||||||
// Track rtsp每路输出流的封装
|
// Track rtsp每路输出流的封装
|
||||||
type Track struct {
|
type Track struct {
|
||||||
PT byte
|
payload rtp.PayloadType
|
||||||
Rate int
|
|
||||||
MediaType utils.AVMediaType
|
MediaType utils.AVMediaType
|
||||||
StartSeq uint16
|
StartSeq uint16
|
||||||
EndSeq uint16
|
EndSeq uint16
|
||||||
|
CodecID utils.AVCodecID
|
||||||
|
|
||||||
Muxer rtp.Muxer
|
Muxer rtp.Muxer
|
||||||
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
|
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
|
||||||
@@ -21,12 +21,12 @@ type Track struct {
|
|||||||
func (r *Track) Close() {
|
func (r *Track) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRTSPTrack(muxer rtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
|
func NewRTSPTrack(muxer rtp.Muxer, payload rtp.PayloadType, mediaType utils.AVMediaType, id utils.AVCodecID) *Track {
|
||||||
stream := &Track{
|
stream := &Track{
|
||||||
PT: pt,
|
payload: payload,
|
||||||
Rate: rate,
|
|
||||||
Muxer: muxer,
|
Muxer: muxer,
|
||||||
MediaType: mediaType,
|
MediaType: mediaType,
|
||||||
|
CodecID: id,
|
||||||
}
|
}
|
||||||
|
|
||||||
return stream
|
return stream
|
||||||
|
@@ -12,11 +12,7 @@ type RtpStream struct {
|
|||||||
rtpBuffer *RtpBuffer
|
rtpBuffer *RtpBuffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *RtpStream) WriteHeader() error {
|
func (f *RtpStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RtpStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
|
||||||
size := 2 + uint16(len(packet.Data))
|
size := 2 + uint16(len(packet.Data))
|
||||||
if size > UDPReceiveBufferSize {
|
if size > UDPReceiveBufferSize {
|
||||||
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)
|
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)
|
||||||
|
@@ -21,7 +21,7 @@ type StreamEndInfo struct {
|
|||||||
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
|
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
|
||||||
M3U8Writer M3U8Writer // 保存M3U8生成器
|
M3U8Writer M3U8Writer // 保存M3U8生成器
|
||||||
PlaylistFormat *string // M3U8播放列表
|
PlaylistFormat *string // M3U8播放列表
|
||||||
RtspTracks map[byte]uint16 // rtsp每路track的结束序号
|
RtspTracks map[int]uint16 // rtsp每路track的结束序号
|
||||||
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
|
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -2,6 +2,7 @@ package stream
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/lkmio/avformat/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error)
|
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error)
|
||||||
@@ -17,12 +18,15 @@ func init() {
|
|||||||
transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8)
|
transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8)
|
||||||
}
|
}
|
||||||
|
|
||||||
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) {
|
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory, supportedCodes map[utils.AVCodecID]interface{}) {
|
||||||
_, ok := transStreamFactories[protocol]
|
_, ok := transStreamFactories[protocol]
|
||||||
if ok {
|
if ok {
|
||||||
panic(fmt.Sprintf("%s has been registered", protocol.String()))
|
panic(fmt.Sprintf("%s has been registered", protocol.String()))
|
||||||
|
} else if _, ok = SupportedCodes[protocol]; ok {
|
||||||
|
panic(fmt.Sprintf("%s has been registered", protocol.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SupportedCodes[protocol] = supportedCodes
|
||||||
transStreamFactories[protocol] = streamFunc
|
transStreamFactories[protocol] = streamFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/lkmio/avformat"
|
"github.com/lkmio/avformat"
|
||||||
"github.com/lkmio/avformat/collections"
|
"github.com/lkmio/avformat/collections"
|
||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
@@ -190,7 +191,8 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
|
|||||||
id := GenerateTransStreamID(TransStreamHls, streams...)
|
id := GenerateTransStreamID(TransStreamHls, streams...)
|
||||||
hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil)
|
hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
t.DispatchGOPBuffer(hlsStream)
|
t.DispatchGOPBuffer(hlsStream)
|
||||||
@@ -199,14 +201,6 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool {
|
|
||||||
if TransStreamRtmp == protocol || TransStreamFlv == protocol {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) {
|
func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) {
|
||||||
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source)
|
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source)
|
||||||
|
|
||||||
@@ -214,16 +208,35 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
|
|||||||
utils.Assert(source != nil)
|
utils.Assert(source != nil)
|
||||||
transStream, err := CreateTransStream(source, protocol, tracks, sink)
|
transStream, err := CreateTransStream(source, protocol, tracks, sink)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), t.source)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, track := range tracks {
|
for _, track := range tracks {
|
||||||
|
supportedCodecs, ok := SupportedCodes[protocol]
|
||||||
|
if !ok {
|
||||||
|
panic(fmt.Sprintf("unknown protocol %s", protocol.String()))
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok = supportedCodecs[track.Stream.CodecID]
|
||||||
|
if !ok {
|
||||||
|
log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var index int
|
||||||
// 重新拷贝一个track,传输流内部使用track的时间戳,
|
// 重新拷贝一个track,传输流内部使用track的时间戳,
|
||||||
newTrack := *track
|
newTrack := *track
|
||||||
if err = transStream.AddTrack(&newTrack); err != nil {
|
if index, err = transStream.AddTrack(&newTrack); err != nil {
|
||||||
return nil, err
|
log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stream index->muxer track index
|
||||||
|
transStream.SetMuxerTrack(index, &newTrack)
|
||||||
|
}
|
||||||
|
|
||||||
|
if transStream.TrackSize() == 0 {
|
||||||
|
return nil, fmt.Errorf("not found track")
|
||||||
}
|
}
|
||||||
|
|
||||||
transStream.SetID(id)
|
transStream.SetID(id)
|
||||||
@@ -238,7 +251,7 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
|
|||||||
t.forwardTransStream = transStream
|
t.forwardTransStream = transStream
|
||||||
}
|
}
|
||||||
|
|
||||||
return transStream, err
|
return transStream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
|
func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
|
||||||
@@ -251,12 +264,17 @@ func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
|
|||||||
|
|
||||||
// DispatchPacket 分发AVPacket
|
// DispatchPacket 分发AVPacket
|
||||||
func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) {
|
func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) {
|
||||||
data, timestamp, videoKey, err := transStream.Input(packet)
|
trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
data, timestamp, videoKey, err := transStream.Input(packet, trackIndex)
|
||||||
if err != nil || len(data) < 1 {
|
if err != nil || len(data) < 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
t.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey)
|
t.DispatchBuffer(transStream, trackIndex, data, timestamp, videoKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DispatchBuffer 分发传输流
|
// DispatchBuffer 分发传输流
|
||||||
@@ -321,7 +339,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 不支持对期望编码的流封装. 降级
|
// 不支持对期望编码的流封装. 降级
|
||||||
if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.GetProtocol(), audioCodecId, videoCodecId) {
|
if utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId {
|
||||||
audioCodecId = utils.AVCodecIdNONE
|
audioCodecId = utils.AVCodecIdNONE
|
||||||
videoCodecId = utils.AVCodecIdNONE
|
videoCodecId = utils.AVCodecIdNONE
|
||||||
}
|
}
|
||||||
@@ -370,14 +388,14 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
|||||||
|
|
||||||
{
|
{
|
||||||
sink.Lock()
|
sink.Lock()
|
||||||
defer sink.UnLock()
|
|
||||||
|
|
||||||
if SessionStateClosed == sink.GetState() {
|
if SessionStateClosed == sink.GetState() {
|
||||||
|
sink.UnLock()
|
||||||
log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String())
|
log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String())
|
||||||
return false
|
return false
|
||||||
} else {
|
} else {
|
||||||
sink.SetState(SessionStateTransferring)
|
sink.SetState(SessionStateTransferring)
|
||||||
}
|
}
|
||||||
|
sink.UnLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sink.StartStreaming(transStream)
|
err := sink.StartStreaming(transStream)
|
||||||
@@ -408,13 +426,13 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
|||||||
|
|
||||||
// 发送已有的缓存数据
|
// 发送已有的缓存数据
|
||||||
// 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。
|
// 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。
|
||||||
data, timestamp, _ := transStream.ReadKeyFrameBuffer()
|
keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
|
||||||
if len(data) > 0 {
|
if len(keyBuffer) > 0 {
|
||||||
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
|
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
|
||||||
t.write(sink, 0, extraData, timestamp, false)
|
t.write(sink, 0, extraData, timestamp, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.write(sink, 0, data, timestamp, true)
|
t.write(sink, 0, keyBuffer, timestamp, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 新建传输流,发送已经缓存的音视频帧
|
// 新建传输流,发送已经缓存的音视频帧
|
||||||
|
@@ -6,6 +6,10 @@ import (
|
|||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
SupportedCodes = map[TransStreamProtocol]map[utils.AVCodecID]interface{}{}
|
||||||
|
)
|
||||||
|
|
||||||
// TransStream 将AVPacket封装成传输流
|
// TransStream 将AVPacket封装成传输流
|
||||||
type TransStream interface {
|
type TransStream interface {
|
||||||
GetID() TransStreamID
|
GetID() TransStreamID
|
||||||
@@ -13,14 +17,20 @@ type TransStream interface {
|
|||||||
SetID(id TransStreamID)
|
SetID(id TransStreamID)
|
||||||
|
|
||||||
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
|
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
|
||||||
Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
|
Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
|
||||||
|
|
||||||
AddTrack(track *Track) error
|
AddTrack(track *Track) (int, error)
|
||||||
|
|
||||||
|
SetMuxerTrack(muxerIndex int, track *Track)
|
||||||
|
|
||||||
|
FindMuxerTrackIndex(streamIndex int) (int, bool)
|
||||||
|
|
||||||
TrackSize() int
|
TrackSize() int
|
||||||
|
|
||||||
GetTracks() []*Track
|
GetTracks() []*Track
|
||||||
|
|
||||||
|
FindTrackWithStreamIndex(streamIndex int) *Track
|
||||||
|
|
||||||
// WriteHeader track添加完毕, 通过调用此函数告知
|
// WriteHeader track添加完毕, 通过调用此函数告知
|
||||||
WriteHeader() error
|
WriteHeader() error
|
||||||
|
|
||||||
@@ -48,6 +58,7 @@ type TransStream interface {
|
|||||||
type BaseTransStream struct {
|
type BaseTransStream struct {
|
||||||
ID TransStreamID
|
ID TransStreamID
|
||||||
Tracks []*Track
|
Tracks []*Track
|
||||||
|
MuxerIndex map[int]int // stream index->muxer track index
|
||||||
Completed bool
|
Completed bool
|
||||||
ExistVideo bool
|
ExistVideo bool
|
||||||
Protocol TransStreamProtocol
|
Protocol TransStreamProtocol
|
||||||
@@ -64,15 +75,47 @@ func (t *BaseTransStream) SetID(id TransStreamID) {
|
|||||||
t.ID = id
|
t.ID = id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
func (t *BaseTransStream) WriteHeader() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BaseTransStream) Input(trackIndex, packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||||
return nil, -1, false, nil
|
return nil, -1, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTransStream) AddTrack(track *Track) error {
|
func (t *BaseTransStream) AddTrack(track *Track) (int, error) {
|
||||||
|
return len(t.Tracks), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) {
|
||||||
t.Tracks = append(t.Tracks, track)
|
t.Tracks = append(t.Tracks, track)
|
||||||
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
||||||
t.ExistVideo = true
|
t.ExistVideo = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if t.MuxerIndex == nil {
|
||||||
|
t.MuxerIndex = make(map[int]int)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果muxerIndex为-1, 意味着复用器封装流时并不需要指定track index, 比如flv.
|
||||||
|
if muxerIndex > -1 {
|
||||||
|
t.MuxerIndex[track.Stream.Index] = muxerIndex
|
||||||
|
} else {
|
||||||
|
t.MuxerIndex[track.Stream.Index] = len(t.Tracks) - 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BaseTransStream) FindMuxerTrackIndex(streamIndex int) (int, bool) {
|
||||||
|
index, ok := t.MuxerIndex[streamIndex]
|
||||||
|
return index, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BaseTransStream) FindTrackWithStreamIndex(streamIndex int) *Track {
|
||||||
|
for _, track := range t.Tracks {
|
||||||
|
if track.Stream.Index == streamIndex {
|
||||||
|
return track
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -21,11 +21,12 @@ func init() {
|
|||||||
int(utils.AVCodecIdVP8): 0x5,
|
int(utils.AVCodecIdVP8): 0x5,
|
||||||
int(utils.AVCodecIdVP9): 0x6,
|
int(utils.AVCodecIdVP9): 0x6,
|
||||||
|
|
||||||
int(utils.AVCodecIdAAC): 101,
|
int(utils.AVCodecIdAAC): 101,
|
||||||
int(utils.AVCodecIdMP3): 102,
|
int(utils.AVCodecIdMP3): 102,
|
||||||
int(utils.AVCodecIdOPUS): 103,
|
int(utils.AVCodecIdOPUS): 103,
|
||||||
int(utils.AVCodecIdPCMALAW): 104,
|
int(utils.AVCodecIdPCMALAW): 104,
|
||||||
int(utils.AVCodecIdPCMMULAW): 105,
|
int(utils.AVCodecIdPCMMULAW): 105,
|
||||||
|
int(utils.AVCodecIdADPCMG722): 106,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -58,7 +59,9 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans
|
|||||||
|
|
||||||
for i, track := range tracks {
|
for i, track := range tracks {
|
||||||
id, ok := narrowCodecIds[int(track.Stream.CodecID)]
|
id, ok := narrowCodecIds[int(track.Stream.CodecID)]
|
||||||
utils.Assert(ok)
|
if ok {
|
||||||
|
id = byte(track.Stream.CodecID)
|
||||||
|
}
|
||||||
|
|
||||||
streamId |= uint64(id) << (48 - i*8)
|
streamId |= uint64(id) << (48 - i*8)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user