mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
Compare commits
3 Commits
059bc20018
...
7155b866c2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7155b866c2 | ||
![]() |
1e982fce5f | ||
![]() |
93ea8f2fc2 |
@@ -37,9 +37,9 @@ func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream.
|
||||
}
|
||||
} else if stream.TransStreamRtsp == transStream.GetProtocol() {
|
||||
if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 {
|
||||
info.RtspTracks = make(map[byte]uint16, len(tracks))
|
||||
info.RtspTracks = make(map[int]uint16, len(tracks))
|
||||
for _, track := range rtsp.RtspTracks {
|
||||
info.RtspTracks[track.PT] = track.EndSeq
|
||||
info.RtspTracks[int(track.CodecID)] = track.EndSeq
|
||||
}
|
||||
}
|
||||
} else if stream.TransStreamFlv == transStream.GetProtocol() {
|
||||
|
@@ -18,7 +18,7 @@ type TransStream struct {
|
||||
flvExtraDataBlock []byte // metadata和sequence header
|
||||
}
|
||||
|
||||
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (t *TransStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
t.ClearOutStreamBuffer()
|
||||
|
||||
var flvTagSize int
|
||||
@@ -94,20 +94,19 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
||||
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
|
||||
}
|
||||
|
||||
func (t *TransStream) AddTrack(track *stream.Track) error {
|
||||
if err := t.BaseTransStream.AddTrack(track); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
||||
var index int
|
||||
var err error
|
||||
if utils.AVMediaTypeAudio == track.Stream.MediaType {
|
||||
t.Muxer.AddAudioTrack(track.Stream)
|
||||
index, err = t.Muxer.AddAudioTrack(track.Stream)
|
||||
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
||||
t.Muxer.AddVideoTrack(track.Stream)
|
||||
index, err = t.Muxer.AddVideoTrack(track.Stream)
|
||||
|
||||
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width()))
|
||||
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height()))
|
||||
}
|
||||
return nil
|
||||
|
||||
return index, err
|
||||
}
|
||||
|
||||
func (t *TransStream) WriteHeader() error {
|
||||
|
@@ -2,11 +2,9 @@ package gb28181
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"github.com/lkmio/lkm/stream"
|
||||
"github.com/lkmio/mpeg"
|
||||
"github.com/lkmio/rtp"
|
||||
@@ -17,49 +15,25 @@ type GBGateway struct {
|
||||
ps *mpeg.PSMuxer
|
||||
rtp rtp.Muxer
|
||||
psBuffer []byte
|
||||
tracks map[utils.AVCodecID]int // codec->track index
|
||||
rtpBuffer *stream.RtpBuffer
|
||||
}
|
||||
|
||||
func (s *GBGateway) WriteHeader() error {
|
||||
if len(s.tracks) == 0 {
|
||||
return fmt.Errorf("no tracks available")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GBGateway) AddTrack(track *stream.Track) error {
|
||||
s.BaseTransStream.AddTrack(track)
|
||||
|
||||
if utils.AVCodecIdH264 == track.Stream.CodecID || utils.AVCodecIdH265 == track.Stream.CodecID || utils.AVCodecIdAAC == track.Stream.CodecID || utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
|
||||
} else {
|
||||
log.Sugar.Errorf("不支持的编码格式: %d", track.Stream.CodecID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GBGateway) AddTrack(track *stream.Track) (int, error) {
|
||||
index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID)
|
||||
if err != nil {
|
||||
log.Sugar.Error("添加%s到ps muxer失败", track.Stream.CodecID)
|
||||
return nil
|
||||
return -1, err
|
||||
}
|
||||
|
||||
s.tracks[track.Stream.CodecID] = index
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
trackIndex, ok := s.tracks[packet.CodecID]
|
||||
if !ok {
|
||||
log.Sugar.Errorf("未找到对应的track: %d", packet.CodecID)
|
||||
return nil, 0, false, nil
|
||||
return index, nil
|
||||
}
|
||||
|
||||
func (s *GBGateway) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
dts := packet.ConvertDts(90000)
|
||||
pts := packet.ConvertPts(90000)
|
||||
|
||||
data := packet.Data
|
||||
if utils.AVMediaTypeVideo == packet.MediaType {
|
||||
data = avformat.AVCCPacket2AnnexB(s.BaseTransStream.Tracks[packet.Index].Stream, packet)
|
||||
data = avformat.AVCCPacket2AnnexB(s.FindTrackWithStreamIndex(packet.Index).Stream, packet)
|
||||
}
|
||||
|
||||
// 扩容ps buffer
|
||||
@@ -67,7 +41,7 @@ func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCo
|
||||
s.psBuffer = make([]byte, len(data)*2)
|
||||
}
|
||||
|
||||
n := s.ps.Input(s.psBuffer, trackIndex, packet.Key, data, &pts, &dts)
|
||||
n := s.ps.Input(s.psBuffer, index, packet.Key, data, &pts, &dts)
|
||||
|
||||
var result []*collections.ReferenceCounter[[]byte]
|
||||
var rtpBuffer []byte
|
||||
@@ -101,7 +75,6 @@ func NewGBGateway(ssrc uint32) *GBGateway {
|
||||
ps: mpeg.NewPsMuxer(),
|
||||
rtp: rtp.NewMuxer(96, 0, ssrc),
|
||||
psBuffer: make([]byte, 1024*1024*2),
|
||||
tracks: make(map[utils.AVCodecID]int),
|
||||
rtpBuffer: stream.NewRtpBuffer(1024),
|
||||
}
|
||||
}
|
||||
|
@@ -17,7 +17,7 @@ func (s *TalkStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (s *TalkStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
var size int
|
||||
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
|
||||
return s.packet
|
||||
@@ -26,7 +26,7 @@ func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceC
|
||||
})
|
||||
|
||||
packet = &avformat.AVPacket{Data: s.packet[:size]}
|
||||
return s.RtpStream.Input(packet)
|
||||
return s.RtpStream.Input(packet, index)
|
||||
}
|
||||
|
||||
func NewTalkTransStream(ssrc uint32) (stream.TransStream, error) {
|
||||
|
@@ -39,7 +39,7 @@ type TransStream struct {
|
||||
PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
|
||||
}
|
||||
|
||||
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
// 创建一下个切片
|
||||
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
|
||||
var newSegment bool
|
||||
@@ -64,7 +64,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
||||
dts := packet.ConvertDts(90000)
|
||||
data := packet.Data
|
||||
if utils.AVMediaTypeVideo == packet.MediaType {
|
||||
data = avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
|
||||
data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
|
||||
}
|
||||
|
||||
// 写入ts切片
|
||||
@@ -77,7 +77,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
||||
}
|
||||
|
||||
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
|
||||
i += t.muxer.Input(bytes, packet.Index, data[i:], length, dts, pts, packet.Key, i == 0)
|
||||
i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
|
||||
t.ctx.writeBufferSize += mpeg.TsPacketSize
|
||||
}
|
||||
|
||||
@@ -89,25 +89,20 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
|
||||
return nil, -1, true, nil
|
||||
}
|
||||
|
||||
func (t *TransStream) AddTrack(track *stream.Track) error {
|
||||
if err := t.BaseTransStream.AddTrack(track); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
||||
var err error
|
||||
var trackIndex int
|
||||
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
||||
data := track.Stream.CodecParameters.AnnexBExtraData()
|
||||
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
|
||||
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
|
||||
} else {
|
||||
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
|
||||
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
|
||||
}
|
||||
return err
|
||||
|
||||
return trackIndex, err
|
||||
}
|
||||
|
||||
func (t *TransStream) WriteHeader() error {
|
||||
//if packet.Index >= t.muxer.TrackCount() {
|
||||
// return nil, -1, false, fmt.Errorf("track not available")
|
||||
//}
|
||||
return t.createSegment()
|
||||
}
|
||||
|
||||
|
@@ -67,7 +67,7 @@ func (h Handler) OnPacket(packet *avformat.AVPacket) {
|
||||
h.fos.Write(h.buffer[:n])
|
||||
}
|
||||
|
||||
packets, _, _, err := h.gateway.Input(packet)
|
||||
packets, _, _, err := h.gateway.Input(packet, i)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
19
main.go
19
main.go
@@ -2,11 +2,14 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
flv2 "github.com/lkmio/flv"
|
||||
"github.com/lkmio/lkm/flv"
|
||||
"github.com/lkmio/lkm/hls"
|
||||
"github.com/lkmio/lkm/jt1078"
|
||||
"github.com/lkmio/lkm/record"
|
||||
"github.com/lkmio/lkm/rtsp"
|
||||
"github.com/lkmio/mpeg"
|
||||
"github.com/lkmio/rtp"
|
||||
"github.com/lkmio/transport"
|
||||
"os"
|
||||
"time"
|
||||
@@ -24,14 +27,14 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory, mpeg.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory, mpeg.SupportedCodecs)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory, mpeg.SupportedCodecs)
|
||||
stream.SetRecordStreamFactory(record.NewFLVFileSink)
|
||||
stream.StreamEndInfoBride = NewStreamEndInfo
|
||||
|
||||
|
@@ -41,27 +41,11 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
|
||||
|
||||
tracks := transStream.GetTracks()
|
||||
for index, track := range tracks {
|
||||
var mimeType string
|
||||
var id string
|
||||
codecId := track.Stream.CodecID
|
||||
if utils.AVCodecIdH264 == codecId {
|
||||
mimeType = webrtc.MimeTypeH264
|
||||
} else if utils.AVCodecIdH265 == codecId {
|
||||
mimeType = webrtc.MimeTypeH265
|
||||
} else if utils.AVCodecIdAV1 == codecId {
|
||||
mimeType = webrtc.MimeTypeAV1
|
||||
} else if utils.AVCodecIdVP8 == codecId {
|
||||
mimeType = webrtc.MimeTypeVP8
|
||||
} else if utils.AVCodecIdVP9 == codecId {
|
||||
mimeType = webrtc.MimeTypeVP9
|
||||
} else if utils.AVCodecIdOPUS == codecId {
|
||||
mimeType = webrtc.MimeTypeOpus
|
||||
} else if utils.AVCodecIdPCMALAW == codecId {
|
||||
mimeType = webrtc.MimeTypePCMA
|
||||
} else if utils.AVCodecIdPCMMULAW == codecId {
|
||||
mimeType = webrtc.MimeTypePCMU
|
||||
} else {
|
||||
log.Sugar.Errorf("codec %s not compatible with webrtc", codecId)
|
||||
mimeType, ok := SupportedCodecs[codecId]
|
||||
if !ok {
|
||||
log.Sugar.Errorf("unsupported codec: %s", codecId)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -71,7 +55,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
|
||||
id = "video"
|
||||
}
|
||||
|
||||
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
|
||||
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType.(string)}, id, "pion")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -12,19 +12,31 @@ import (
|
||||
|
||||
var (
|
||||
webrtcApi *webrtc.API
|
||||
|
||||
SupportedCodecs = map[utils.AVCodecID]interface{}{
|
||||
utils.AVCodecIdH264: webrtc.MimeTypeH264,
|
||||
utils.AVCodecIdH265: webrtc.MimeTypeH265,
|
||||
utils.AVCodecIdAV1: webrtc.MimeTypeAV1,
|
||||
utils.AVCodecIdVP8: webrtc.MimeTypeVP8,
|
||||
utils.AVCodecIdVP9: webrtc.MimeTypeVP9,
|
||||
utils.AVCodecIdOPUS: webrtc.MimeTypeOpus,
|
||||
utils.AVCodecIdPCMALAW: webrtc.MimeTypePCMA,
|
||||
utils.AVCodecIdPCMMULAW: webrtc.MimeTypePCMU,
|
||||
utils.AVCodecIdADPCMG722: webrtc.MimeTypeG722,
|
||||
}
|
||||
)
|
||||
|
||||
type transStream struct {
|
||||
stream.BaseTransStream
|
||||
}
|
||||
|
||||
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
t.ClearOutStreamBuffer()
|
||||
|
||||
if utils.AVMediaTypeAudio == packet.MediaType {
|
||||
t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data))
|
||||
} else if utils.AVMediaTypeVideo == packet.MediaType {
|
||||
avStream := t.BaseTransStream.Tracks[packet.Index].Stream
|
||||
avStream := t.FindTrackWithStreamIndex(packet.Index).Stream
|
||||
if packet.Key {
|
||||
extra := avStream.CodecParameters.AnnexBExtraData()
|
||||
t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra))
|
||||
|
@@ -21,7 +21,7 @@ type transStream struct {
|
||||
metaData *amf0.Object // 推流方携带的元数据
|
||||
}
|
||||
|
||||
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
t.ClearOutStreamBuffer()
|
||||
|
||||
var data []byte
|
||||
|
@@ -29,7 +29,7 @@ type TransStream struct {
|
||||
|
||||
RtspTracks []*Track
|
||||
//oldTracks []*Track
|
||||
oldTracks map[byte]uint16
|
||||
oldTracks map[int]uint16
|
||||
sdp string
|
||||
|
||||
rtpBuffer *stream.RtpBuffer
|
||||
@@ -41,18 +41,18 @@ func (t *TransStream) OverTCP(data []byte, channel int) {
|
||||
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
|
||||
}
|
||||
|
||||
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
var ts uint32
|
||||
var result []*collections.ReferenceCounter[[]byte]
|
||||
track := t.RtspTracks[packet.Index]
|
||||
track := t.RtspTracks[trackIndex]
|
||||
if utils.AVMediaTypeAudio == packet.MediaType {
|
||||
ts = uint32(packet.ConvertPts(track.Rate))
|
||||
result = t.PackRtpPayload(track, packet.Index, packet.Data, ts)
|
||||
ts = uint32(packet.ConvertPts(track.payload.ClockRate))
|
||||
result = t.PackRtpPayload(track, trackIndex, packet.Data, ts)
|
||||
} else if utils.AVMediaTypeVideo == packet.MediaType {
|
||||
ts = uint32(packet.ConvertPts(track.Rate))
|
||||
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
|
||||
ts = uint32(packet.ConvertPts(track.payload.ClockRate))
|
||||
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet)
|
||||
data := avc.RemoveStartCode(annexBData)
|
||||
result = t.PackRtpPayload(track, packet.Index, data, ts)
|
||||
result = t.PackRtpPayload(track, trackIndex, data, ts)
|
||||
}
|
||||
|
||||
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
|
||||
@@ -110,36 +110,24 @@ func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, tim
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *TransStream) AddTrack(track *stream.Track) error {
|
||||
if err := t.BaseTransStream.AddTrack(track); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
payloadType, ok := rtp.CodecIdPayloads[track.Stream.CodecID]
|
||||
if !ok {
|
||||
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecID)
|
||||
}
|
||||
|
||||
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
||||
// 恢复上次拉流的序号
|
||||
var startSeq uint16
|
||||
if t.oldTracks != nil {
|
||||
startSeq, ok = t.oldTracks[byte(payloadType.Pt)]
|
||||
var ok bool
|
||||
startSeq, ok = t.oldTracks[int(track.Stream.CodecID)]
|
||||
utils.Assert(ok)
|
||||
}
|
||||
|
||||
// 创建RTP封装器
|
||||
var muxer rtp.Muxer
|
||||
if utils.AVCodecIdH264 == track.Stream.CodecID {
|
||||
muxer = rtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
||||
} else if utils.AVCodecIdH265 == track.Stream.CodecID {
|
||||
muxer = rtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
||||
} else if utils.AVCodecIdAAC == track.Stream.CodecID {
|
||||
muxer = rtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
||||
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
|
||||
muxer = rtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
|
||||
}
|
||||
// 查找RTP封装器和PayloadType
|
||||
newMuxerFunc := rtp.SupportedCodecs[track.Stream.CodecID]
|
||||
utils.Assert(newMuxerFunc != nil)
|
||||
|
||||
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType)
|
||||
// 创建RTP封装器
|
||||
muxer, payload := newMuxerFunc.(func(seq int, ssrc uint32) (rtp.Muxer, rtp.PayloadType))(int(startSeq), 0xFFFFFFFF)
|
||||
|
||||
// 创建track
|
||||
rtspTrack := NewRTSPTrack(muxer, payload, track.Stream.MediaType, track.Stream.CodecID)
|
||||
t.RtspTracks = append(t.RtspTracks, rtspTrack)
|
||||
trackIndex := len(t.RtspTracks) - 1
|
||||
|
||||
@@ -170,7 +158,7 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
|
||||
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
|
||||
}
|
||||
|
||||
return nil
|
||||
return trackIndex, nil
|
||||
}
|
||||
|
||||
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
|
||||
@@ -206,8 +194,7 @@ func (t *TransStream) WriteHeader() error {
|
||||
},
|
||||
}
|
||||
|
||||
for i, track := range t.Tracks {
|
||||
payloadType, _ := rtp.CodecIdPayloads[track.Stream.CodecID]
|
||||
for i, track := range t.RtspTracks {
|
||||
mediaDescription := sdp.MediaDescription{
|
||||
ConnectionInformation: &sdp.ConnectionInformation{
|
||||
NetworkType: "IN",
|
||||
@@ -218,17 +205,17 @@ func (t *TransStream) WriteHeader() error {
|
||||
Attributes: []sdp.Attribute{
|
||||
sdp.NewAttribute("recvonly", ""),
|
||||
sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""),
|
||||
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", payloadType.Pt, payloadType.Encoding, payloadType.ClockRate), ""),
|
||||
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", track.payload.Pt, track.payload.Encoding, track.payload.ClockRate), ""),
|
||||
},
|
||||
}
|
||||
|
||||
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
|
||||
mediaDescription.MediaName.Formats = []string{strconv.Itoa(payloadType.Pt)}
|
||||
mediaDescription.MediaName.Formats = []string{strconv.Itoa(track.payload.Pt)}
|
||||
|
||||
if utils.AVMediaTypeAudio == track.Stream.MediaType {
|
||||
if utils.AVMediaTypeAudio == track.MediaType {
|
||||
mediaDescription.MediaName.Media = "audio"
|
||||
|
||||
if utils.AVCodecIdAAC == track.Stream.CodecID {
|
||||
if utils.AVCodecIdAAC == track.CodecID {
|
||||
//[14496-3], [RFC6416] profile-level-id:
|
||||
//1 : Main Audio Profile Level 1
|
||||
//9 : Speech Audio Profile Level 1
|
||||
@@ -267,7 +254,7 @@ func (t *TransStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream {
|
||||
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[int]uint16) stream.TransStream {
|
||||
t := &TransStream{
|
||||
addr: addr,
|
||||
urlFormat: urlFormat,
|
||||
@@ -286,7 +273,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16
|
||||
|
||||
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
|
||||
trackFormat := "?track=%d"
|
||||
var oldTracks map[byte]uint16
|
||||
var oldTracks map[int]uint16
|
||||
if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil {
|
||||
oldTracks = endInfo.RtspTracks
|
||||
}
|
||||
|
@@ -8,11 +8,11 @@ import (
|
||||
|
||||
// Track rtsp每路输出流的封装
|
||||
type Track struct {
|
||||
PT byte
|
||||
Rate int
|
||||
payload rtp.PayloadType
|
||||
MediaType utils.AVMediaType
|
||||
StartSeq uint16
|
||||
EndSeq uint16
|
||||
CodecID utils.AVCodecID
|
||||
|
||||
Muxer rtp.Muxer
|
||||
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
|
||||
@@ -21,12 +21,12 @@ type Track struct {
|
||||
func (r *Track) Close() {
|
||||
}
|
||||
|
||||
func NewRTSPTrack(muxer rtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
|
||||
func NewRTSPTrack(muxer rtp.Muxer, payload rtp.PayloadType, mediaType utils.AVMediaType, id utils.AVCodecID) *Track {
|
||||
stream := &Track{
|
||||
PT: pt,
|
||||
Rate: rate,
|
||||
payload: payload,
|
||||
Muxer: muxer,
|
||||
MediaType: mediaType,
|
||||
CodecID: id,
|
||||
}
|
||||
|
||||
return stream
|
||||
|
@@ -12,11 +12,7 @@ type RtpStream struct {
|
||||
rtpBuffer *RtpBuffer
|
||||
}
|
||||
|
||||
func (f *RtpStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *RtpStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (f *RtpStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
size := 2 + uint16(len(packet.Data))
|
||||
if size > UDPReceiveBufferSize {
|
||||
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)
|
||||
|
@@ -21,7 +21,7 @@ type StreamEndInfo struct {
|
||||
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
|
||||
M3U8Writer M3U8Writer // 保存M3U8生成器
|
||||
PlaylistFormat *string // M3U8播放列表
|
||||
RtspTracks map[byte]uint16 // rtsp每路track的结束序号
|
||||
RtspTracks map[int]uint16 // rtsp每路track的结束序号
|
||||
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
|
||||
}
|
||||
|
||||
|
@@ -2,6 +2,7 @@ package stream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
)
|
||||
|
||||
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error)
|
||||
@@ -17,12 +18,15 @@ func init() {
|
||||
transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8)
|
||||
}
|
||||
|
||||
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) {
|
||||
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory, supportedCodes map[utils.AVCodecID]interface{}) {
|
||||
_, ok := transStreamFactories[protocol]
|
||||
if ok {
|
||||
panic(fmt.Sprintf("%s has been registered", protocol.String()))
|
||||
} else if _, ok = SupportedCodes[protocol]; ok {
|
||||
panic(fmt.Sprintf("%s has been registered", protocol.String()))
|
||||
}
|
||||
|
||||
SupportedCodes[protocol] = supportedCodes
|
||||
transStreamFactories[protocol] = streamFunc
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
@@ -190,7 +191,8 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
|
||||
id := GenerateTransStreamID(TransStreamHls, streams...)
|
||||
hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
t.DispatchGOPBuffer(hlsStream)
|
||||
@@ -199,14 +201,6 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
|
||||
}
|
||||
}
|
||||
|
||||
func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool {
|
||||
if TransStreamRtmp == protocol || TransStreamFlv == protocol {
|
||||
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) {
|
||||
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source)
|
||||
|
||||
@@ -214,16 +208,35 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
|
||||
utils.Assert(source != nil)
|
||||
transStream, err := CreateTransStream(source, protocol, tracks, sink)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), t.source)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, track := range tracks {
|
||||
supportedCodecs, ok := SupportedCodes[protocol]
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("unknown protocol %s", protocol.String()))
|
||||
}
|
||||
|
||||
_, ok = supportedCodecs[track.Stream.CodecID]
|
||||
if !ok {
|
||||
log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID)
|
||||
continue
|
||||
}
|
||||
|
||||
var index int
|
||||
// 重新拷贝一个track,传输流内部使用track的时间戳,
|
||||
newTrack := *track
|
||||
if err = transStream.AddTrack(&newTrack); err != nil {
|
||||
return nil, err
|
||||
if index, err = transStream.AddTrack(&newTrack); err != nil {
|
||||
log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID)
|
||||
continue
|
||||
}
|
||||
|
||||
// stream index->muxer track index
|
||||
transStream.SetMuxerTrack(index, &newTrack)
|
||||
}
|
||||
|
||||
if transStream.TrackSize() == 0 {
|
||||
return nil, fmt.Errorf("not found track")
|
||||
}
|
||||
|
||||
transStream.SetID(id)
|
||||
@@ -238,7 +251,7 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
|
||||
t.forwardTransStream = transStream
|
||||
}
|
||||
|
||||
return transStream, err
|
||||
return transStream, nil
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
|
||||
@@ -251,12 +264,17 @@ func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
|
||||
|
||||
// DispatchPacket 分发AVPacket
|
||||
func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) {
|
||||
data, timestamp, videoKey, err := transStream.Input(packet)
|
||||
trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
data, timestamp, videoKey, err := transStream.Input(packet, trackIndex)
|
||||
if err != nil || len(data) < 1 {
|
||||
return
|
||||
}
|
||||
|
||||
t.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey)
|
||||
t.DispatchBuffer(transStream, trackIndex, data, timestamp, videoKey)
|
||||
}
|
||||
|
||||
// DispatchBuffer 分发传输流
|
||||
@@ -321,7 +339,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
||||
}
|
||||
|
||||
// 不支持对期望编码的流封装. 降级
|
||||
if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.GetProtocol(), audioCodecId, videoCodecId) {
|
||||
if utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId {
|
||||
audioCodecId = utils.AVCodecIdNONE
|
||||
videoCodecId = utils.AVCodecIdNONE
|
||||
}
|
||||
@@ -370,14 +388,14 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
||||
|
||||
{
|
||||
sink.Lock()
|
||||
defer sink.UnLock()
|
||||
|
||||
if SessionStateClosed == sink.GetState() {
|
||||
sink.UnLock()
|
||||
log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String())
|
||||
return false
|
||||
} else {
|
||||
sink.SetState(SessionStateTransferring)
|
||||
}
|
||||
sink.UnLock()
|
||||
}
|
||||
|
||||
err := sink.StartStreaming(transStream)
|
||||
@@ -408,13 +426,13 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
||||
|
||||
// 发送已有的缓存数据
|
||||
// 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。
|
||||
data, timestamp, _ := transStream.ReadKeyFrameBuffer()
|
||||
if len(data) > 0 {
|
||||
keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
|
||||
if len(keyBuffer) > 0 {
|
||||
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
|
||||
t.write(sink, 0, extraData, timestamp, false)
|
||||
}
|
||||
|
||||
t.write(sink, 0, data, timestamp, true)
|
||||
t.write(sink, 0, keyBuffer, timestamp, true)
|
||||
}
|
||||
|
||||
// 新建传输流,发送已经缓存的音视频帧
|
||||
|
@@ -6,6 +6,10 @@ import (
|
||||
"github.com/lkmio/avformat/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
SupportedCodes = map[TransStreamProtocol]map[utils.AVCodecID]interface{}{}
|
||||
)
|
||||
|
||||
// TransStream 将AVPacket封装成传输流
|
||||
type TransStream interface {
|
||||
GetID() TransStreamID
|
||||
@@ -13,14 +17,20 @@ type TransStream interface {
|
||||
SetID(id TransStreamID)
|
||||
|
||||
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
|
||||
Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
|
||||
Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
|
||||
|
||||
AddTrack(track *Track) error
|
||||
AddTrack(track *Track) (int, error)
|
||||
|
||||
SetMuxerTrack(muxerIndex int, track *Track)
|
||||
|
||||
FindMuxerTrackIndex(streamIndex int) (int, bool)
|
||||
|
||||
TrackSize() int
|
||||
|
||||
GetTracks() []*Track
|
||||
|
||||
FindTrackWithStreamIndex(streamIndex int) *Track
|
||||
|
||||
// WriteHeader track添加完毕, 通过调用此函数告知
|
||||
WriteHeader() error
|
||||
|
||||
@@ -48,6 +58,7 @@ type TransStream interface {
|
||||
type BaseTransStream struct {
|
||||
ID TransStreamID
|
||||
Tracks []*Track
|
||||
MuxerIndex map[int]int // stream index->muxer track index
|
||||
Completed bool
|
||||
ExistVideo bool
|
||||
Protocol TransStreamProtocol
|
||||
@@ -64,15 +75,47 @@ func (t *BaseTransStream) SetID(id TransStreamID) {
|
||||
t.ID = id
|
||||
}
|
||||
|
||||
func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
func (t *BaseTransStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *BaseTransStream) Input(trackIndex, packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
return nil, -1, false, nil
|
||||
}
|
||||
|
||||
func (t *BaseTransStream) AddTrack(track *Track) error {
|
||||
func (t *BaseTransStream) AddTrack(track *Track) (int, error) {
|
||||
return len(t.Tracks), nil
|
||||
}
|
||||
|
||||
func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) {
|
||||
t.Tracks = append(t.Tracks, track)
|
||||
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
||||
t.ExistVideo = true
|
||||
}
|
||||
|
||||
if t.MuxerIndex == nil {
|
||||
t.MuxerIndex = make(map[int]int)
|
||||
}
|
||||
|
||||
// 如果muxerIndex为-1, 意味着复用器封装流时并不需要指定track index, 比如flv.
|
||||
if muxerIndex > -1 {
|
||||
t.MuxerIndex[track.Stream.Index] = muxerIndex
|
||||
} else {
|
||||
t.MuxerIndex[track.Stream.Index] = len(t.Tracks) - 1
|
||||
}
|
||||
}
|
||||
|
||||
func (t *BaseTransStream) FindMuxerTrackIndex(streamIndex int) (int, bool) {
|
||||
index, ok := t.MuxerIndex[streamIndex]
|
||||
return index, ok
|
||||
}
|
||||
|
||||
func (t *BaseTransStream) FindTrackWithStreamIndex(streamIndex int) *Track {
|
||||
for _, track := range t.Tracks {
|
||||
if track.Stream.Index == streamIndex {
|
||||
return track
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -26,6 +26,7 @@ func init() {
|
||||
int(utils.AVCodecIdOPUS): 103,
|
||||
int(utils.AVCodecIdPCMALAW): 104,
|
||||
int(utils.AVCodecIdPCMMULAW): 105,
|
||||
int(utils.AVCodecIdADPCMG722): 106,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,7 +59,9 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans
|
||||
|
||||
for i, track := range tracks {
|
||||
id, ok := narrowCodecIds[int(track.Stream.CodecID)]
|
||||
utils.Assert(ok)
|
||||
if ok {
|
||||
id = byte(track.Stream.CodecID)
|
||||
}
|
||||
|
||||
streamId |= uint64(id) << (48 - i*8)
|
||||
}
|
||||
|
Reference in New Issue
Block a user