refactor: split the avformat dependency into separate libraries

ydajiang
2025-04-08 09:23:18 +08:00
parent 611812da4c
commit a508ef2838
52 changed files with 1374 additions and 1154 deletions

4
api.go
View File

@@ -43,7 +43,7 @@ func init() {
func filterSourceID(f func(sourceId string, w http.ResponseWriter, req *http.Request), suffix string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
source, err := stream.Path2SourceId(req.URL.Path, suffix)
source, err := stream.Path2SourceID(req.URL.Path, suffix)
if err != nil {
log.Sugar.Errorf("拉流失败 解析流id发生err: %s path: %s", err.Error(), req.URL.Path)
httpResponse(w, http.StatusBadRequest, err.Error())
@@ -398,7 +398,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
var codecs []string
tracks := source.OriginTracks()
for _, track := range tracks {
codecs = append(codecs, track.Stream.CodecId().String())
codecs = append(codecs, track.Stream.CodecID.String())
}
details = append(details, SourceDetails{

View File

@@ -28,7 +28,7 @@ func NewStreamEndInfo(source stream.Source) *stream.StreamEndInfo {
timestamp[0] = track.Dts + int64(track.FrameDuration)
timestamp[1] = track.Pts + int64(track.FrameDuration)
info.Timestamps[track.Stream.CodecId()] = timestamp
info.Timestamps[track.Stream.CodecID] = timestamp
}
for _, transStream := range streams {

View File

@@ -7,9 +7,8 @@ import (
const (
// HttpFlvBlockHeaderSize reserves a fixed number of bytes at the head of each http-flv block to describe the length of the flv data chunk
// http-flv transports the http stream as a file stream, formatted as: length\r\n|flv data\r\n
// http-flv transports the http stream as a file stream, formatted as: length\r\n|flv data\r\n
// our http-flv-block wrapping: |block size[4]|skip count[2]|length\r\n|flv data\r\n
// skip count exists because the length field is variable in size, so a field is needed to state how many bytes to skip before the http-flv data starts
HttpFlvBlockHeaderSize = 20
)
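For orientation, the block layout described in that comment can be sketched in a few lines of Go. This is illustrative only and is not the commit's writeSeparator; httpFLVPayload is a hypothetical name, and the exact meaning of the block-size field and the skip-count origin are assumptions based on the comment above.

package flv

import "encoding/binary"

// httpFLVPayload extracts the part of a prepared block that would actually be
// sent to an http-flv client, using the skip-count field to jump over the
// unused portion of the reserved header.
func httpFLVPayload(block []byte) []byte {
	_ = binary.BigEndian.Uint32(block)              // block size bookkeeping (exact semantics not shown in this hunk)
	skip := int(binary.BigEndian.Uint16(block[4:])) // assumed: bytes to skip, counted after the 6-byte prefix
	return block[6+skip:]                           // "length\r\n|flv data|\r\n"
}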

View File

@@ -1,8 +1,8 @@
package flv
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
)

View File

@@ -2,8 +2,10 @@ package flv
import (
"encoding/binary"
"github.com/lkmio/avformat/libflv"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/flv"
"github.com/lkmio/flv/amf0"
"github.com/lkmio/lkm/rtmp"
"github.com/lkmio/lkm/stream"
)
@@ -11,13 +13,13 @@ import (
type TransStream struct {
stream.TCPTransStream
Muxer libflv.Muxer
Muxer *flv.Muxer
flvHeaderBlock []byte // the 9-byte flv header kept separately; sent only once, and not re-sent when publishing resumes
flvExtraDataBlock []byte // metadata and sequence header
flvExtraDataTagSize int // size of the entire flv tag
flvExtraDataPreTagSize uint32
}
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var flvTagSize int
@@ -26,17 +28,19 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
var dts int64
var pts int64
var keyBuffer bool
var frameType int
dts = packet.ConvertDts(1000)
pts = packet.ConvertPts(1000)
if utils.AVMediaTypeAudio == packet.MediaType() {
flvTagSize = 17 + len(packet.Data())
data = packet.Data()
} else if utils.AVMediaTypeVideo == packet.MediaType() {
flvTagSize = t.Muxer.ComputeVideoDataSize(uint32(pts-dts)) + libflv.TagHeaderSize + len(packet.AVCCPacketData())
data = packet.AVCCPacketData()
videoKey = packet.KeyFrame()
if utils.AVMediaTypeAudio == packet.MediaType {
data = packet.Data
flvTagSize = flv.TagHeaderSize + t.Muxer.ComputeAudioDataHeaderSize() + len(packet.Data)
} else if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AnnexBPacket2AVCC(packet)
flvTagSize = flv.TagHeaderSize + t.Muxer.ComputeVideoDataHeaderSize(uint32(pts-dts)) + len(data)
if videoKey = packet.Key; videoKey {
frameType = flv.FrameTypeKeyFrame
}
}
// keyframes always sit at the head of a segment, so on a keyframe create a new segment and send the remainder of the current one
@@ -64,7 +68,7 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
// allocate a block
bytes := t.MWBuffer.Allocate(separatorSize+flvTagSize, dts, videoKey)
// write the flv tag
n += t.Muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
n += t.Muxer.Input(bytes[n:], packet.MediaType, len(data), dts, pts, false, frameType)
copy(bytes[n:], data)
// send only once the merge-write buffer is full
@@ -82,43 +86,28 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
return err
}
if utils.AVMediaTypeAudio == track.Stream.Type() {
t.Muxer.AddAudioTrack(track.Stream.CodecId(), 0, 0, 0)
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
t.Muxer.AddVideoTrack(track.Stream.CodecId())
if utils.AVMediaTypeAudio == track.Stream.MediaType {
t.Muxer.AddAudioTrack(track.Stream)
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.Muxer.AddVideoTrack(track.Stream)
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters().Width()))
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters().Height()))
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width()))
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height()))
}
return nil
}
func (t *TransStream) WriteHeader() error {
var header [4096]byte
var extraDataSize int
size := t.Muxer.WriteHeader(header[:])
tags := header[9:size]
copy(t.flvHeaderBlock[HttpFlvBlockHeaderSize:], header[:9])
copy(t.flvExtraDataBlock[HttpFlvBlockHeaderSize:], header[9:size])
copy(t.flvExtraDataBlock[HttpFlvBlockHeaderSize:], tags)
extraDataSize = HttpFlvBlockHeaderSize + (size - 9)
for _, track := range t.BaseTransStream.Tracks {
var data []byte
if utils.AVMediaTypeAudio == track.Stream.Type() {
data = track.Stream.Extra()
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
data = track.Stream.CodecParameters().MP4ExtraData()
}
t.flvExtraDataPreTagSize = t.Muxer.PrevTagSize()
n := t.Muxer.Input(t.flvExtraDataBlock[extraDataSize:], track.Stream.Type(), len(data), 0, 0, false, true)
extraDataSize += n
copy(t.flvExtraDataBlock[extraDataSize:], data)
extraDataSize += len(data)
t.flvExtraDataTagSize = n - 15 + len(data) + 11
}
// add the trailing CRLF
extraDataSize += 2
t.flvExtraDataBlock = t.flvExtraDataBlock[:extraDataSize]
// +2 for the trailing CRLF
t.flvExtraDataBlock = t.flvExtraDataBlock[:HttpFlvBlockHeaderSize+size-9+2]
writeSeparator(t.flvHeaderBlock)
writeSeparator(t.flvExtraDataBlock)
@@ -135,9 +124,9 @@ func (t *TransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
// send the merge-write segments already present in the memory pool
t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
if t.OutBufferSize < 1 {
// rewrite the first flv tag's previous tag size to the sequence header tag size
binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], uint32(t.flvExtraDataTagSize))
if t.OutBufferSize < 1 {
binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], t.flvExtraDataPreTagSize)
}
// iterate over and send the merge-write segments
@@ -173,9 +162,9 @@ func (t *TransStream) flushSegment() ([]byte, bool) {
return FormatSegment(segment), key
}
func NewHttpTransStream(metadata *libflv.AMF0Object, prevTagSize uint32) stream.TransStream {
func NewHttpTransStream(metadata *amf0.Object, prevTagSize uint32) stream.TransStream {
return &TransStream{
Muxer: libflv.NewMuxerWithPrevTagSize(metadata, prevTagSize),
Muxer: flv.NewMuxerWithPrevTagSize(metadata, prevTagSize),
flvHeaderBlock: make([]byte, 31),
flvExtraDataBlock: make([]byte, 4096),
}
@@ -183,7 +172,7 @@ func NewHttpTransStream(metadata *libflv.AMF0Object, prevTagSize uint32) stream.
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
var prevTagSize uint32
var metaData *libflv.AMF0Object
var metaData *amf0.Object
endInfo := source.GetStreamEndInfo()
if endInfo != nil {
@@ -191,7 +180,7 @@ func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtoco
}
if stream.SourceTypeRtmp == source.GetType() {
metaData = source.(*rtmp.Publisher).Stack.MetaData()
metaData = source.(*rtmp.Publisher).Stack.Metadata()
}
return NewHttpTransStream(metaData, prevTagSize), nil
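As background for the flvExtraDataPreTagSize bookkeeping above: in the FLV container every tag is followed by a 4-byte PreviousTagSize equal to the 11-byte tag header plus the tag's data size, and the very first PreviousTagSize is 0. The sketch below is illustrative, not code from this commit; previousTagSize and patchPreviousTagSize are hypothetical helpers showing why the first tag of a key-frame segment gets its PreviousTagSize patched to the sequence-header tag size.

package flv

import "encoding/binary"

// previousTagSize returns the value that must follow an FLV tag whose
// payload is dataSize bytes long: 11-byte tag header plus data.
func previousTagSize(dataSize int) uint32 {
	return uint32(11 + dataSize)
}

// patchPreviousTagSize rewrites the 4-byte PreviousTagSize that sits just
// before the first tag of a segment, so a player that joins at a key frame
// still sees a consistent chain after the sequence-header tag it was sent.
func patchPreviousTagSize(segment []byte, offset int, seqHeaderDataSize int) {
	binary.BigEndian.PutUint32(segment[offset:], previousTagSize(seqHeaderDataSize))
}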

View File

@@ -2,7 +2,7 @@ package flv
import (
"github.com/gorilla/websocket"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/transport"
"net"
"time"
)

View File

@@ -1,11 +1,11 @@
package gb28181
import (
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtp"
"github.com/lkmio/transport"
"net"
)
@@ -17,7 +17,7 @@ const (
type ForwardSink struct {
stream.BaseSink
setup SetupType
socket transport.ITransport
socket transport.Transport
ssrc uint32
}
@@ -50,7 +50,7 @@ func (f *ForwardSink) Write(index int, data [][]byte, ts int64) error {
}
// rewrite to the SSRC negotiated with the upstream platform
librtp.ModifySSRC(data[0], f.ssrc)
rtp.ModifySSRC(data[0], f.ssrc)
if SetupUDP == f.setup {
f.socket.(*transport.UDPClient).Write(data[0][2:])
@@ -83,14 +83,14 @@ func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stre
return nil, 0, err
}
client, err := TransportManger.NewUDPClient(stream.AppConfig.ListenIP, remoteAddr)
client, err := TransportManger.NewUDPClient(remoteAddr)
if err != nil {
return nil, 0, err
}
sink.socket = client
} else if SetupActive == setup {
server, err := TransportManger.NewTCPServer(stream.AppConfig.ListenIP)
server, err := TransportManger.NewTCPServer()
if err != nil {
return nil, 0, err
}
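For reference, rtp.ModifySSRC above only needs to overwrite four bytes: in the RTP fixed header the SSRC occupies bytes 8..11. The sketch below is illustrative, not the library's implementation; rewriteSSRC is a hypothetical name, and it assumes the buffer carries the 2-byte RTP-over-TCP length prefix used elsewhere in this package.

package gb28181

import "encoding/binary"

// rewriteSSRC patches the SSRC of an RTP packet framed with a 2-byte length
// prefix: the RTP header starts at offset 2, and its SSRC field occupies
// bytes 8..11 of that header.
func rewriteSSRC(framed []byte, ssrc uint32) {
	binary.BigEndian.PutUint32(framed[2+8:], ssrc)
}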

View File

@@ -3,8 +3,8 @@ package gb28181
import (
"encoding/json"
"fmt"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
"net/http"
"os"
@@ -82,8 +82,8 @@ func closeForwardSink(source, sink string) {
}
}
func createTransport(setup string) (transport.ITransport, *os.File) {
var socket transport.ITransport
func createTransport(setup string) (transport.Transport, *os.File) {
var socket transport.Transport
name := fmt.Sprintf("./gb_forward_ps_%s_%d.raw", setup, time.Now().UnixMilli())
file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
@@ -135,7 +135,7 @@ func TestForwardSink(t *testing.T) {
for {
var ids []string
var transports []transport.ITransport
var transports []transport.Transport
var files []*os.File
// test all three publishing modes

View File

@@ -5,50 +5,20 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/mpeg"
"github.com/lkmio/transport"
"github.com/pion/rtp"
"io"
"net"
"net/http"
"os"
"sort"
"testing"
"time"
)
// takes the path of a file containing a PS stream carried as rtp payload and parses it by ssrc; the rtp headers must not carry extensions
func readRtpRaw(path string, ssrc uint32, tcp bool, cb func([]byte)) {
file, err := os.ReadFile(path)
if err != nil {
panic(err)
}
var offset int
tcpRtp := make([]byte, 1500)
for i := 0; i < len(file)-4; i++ {
if ssrc != binary.BigEndian.Uint32(file[i:]) {
continue
}
if i-8 != 0 {
var err error
rtp := file[offset : i-8]
if tcp {
binary.BigEndian.PutUint16(tcpRtp, uint16(len(rtp)))
copy(tcpRtp[2:], rtp)
cb(tcpRtp[:2+len(rtp)])
} else {
cb(rtp)
}
if err != nil {
panic(err.Error())
}
}
offset = i - 8
}
}
func connectSource(source string, addr string) {
v := &struct {
Source string `json:"source"` //GetSourceID
@@ -131,32 +101,137 @@ func createSource(source, setup string, ssrc uint32) (string, uint16) {
return connectInfo.Data.IP, connectInfo.Data.Port
}
func rtp2overTcp(path string, ssrc uint32) {
file, err := os.OpenFile("./rtp.raw", os.O_CREATE|os.O_RDWR, 0666)
// split the rtp packets and return them framed as rtp-over-tcp packets
func splitPackets(data []byte, ssrc uint32) ([][]byte, uint32) {
tcp := binary.BigEndian.Uint16(data) <= 1500
length := len(data)
var packets [][]byte
if tcp {
var offset int
for i := 0; i < length; i += 2 {
if i > 0 {
packets = append(packets, data[offset:i])
}
offset = i
i += int(binary.BigEndian.Uint16(data[i:]))
}
if len(packets) > 0 {
packet := rtp.Packet{}
err := packet.Unmarshal(packets[0][2:])
if err != nil {
panic(err)
}
readRtpRaw(path, ssrc, true, func(data []byte) {
file.Write(data)
})
return packets, packet.SSRC
}
} else {
// for udp captures, locate packets by ssrc
var offset int
for i := 0; i < length-4; i++ {
if ssrc != binary.BigEndian.Uint32(data[i:]) {
continue
}
file.Close()
if i-8 != 0 {
packet := data[offset : i-8]
bytes := make([]byte, 2+len(packet))
binary.BigEndian.PutUint16(bytes, uint16(len(packet)))
copy(bytes[2:], packet)
packets = append(packets, bytes)
}
offset = i - 8
}
return packets, ssrc
}
return nil, ssrc
}
// udp stream exported directly from wireshark
var ts int64 = -1
func ctrDelay(data []byte) {
packet := rtp.Packet{}
err := packet.Unmarshal(data)
if err != nil {
panic(err)
}
if ts == -1 {
ts = int64(packet.Timestamp)
}
if dis := (int64(packet.Timestamp) - ts) / 90; dis > 0 {
time.Sleep(time.Duration(dis) * time.Millisecond)
}
ts = int64(packet.Timestamp)
}
// rtp stream exported directly from wireshark
// each rtp packet is located by ssrc; the rtp packets must not carry extension fields
func TestUDPRecv(t *testing.T) {
path := "D:\\GOProjects\\avformat\\gb28181_h264.rtp"
ssrc := 0xBEBC201
func TestPublish(t *testing.T) {
path := "../../source_files/gb28181_h264.rtp"
var ssrc uint32 = 0xBEBC201
localAddr := "0.0.0.0:20001"
setup := "udp" //udp/passive/active
id := "hls_mystream"
rtp2overTcp(path, uint32(ssrc))
ip, port := createSource(id, setup, uint32(ssrc))
data, err := os.ReadFile(path)
if err != nil {
panic(err)
}
var packets [][]byte
packets, ssrc = splitPackets(data, ssrc)
utils.Assert(len(packets) > 0)
sort.Slice(packets, func(i, j int) bool {
packet := rtp.Packet{}
if err := packet.Unmarshal(packets[i][2:]); err != nil {
panic(err)
}
packet2 := rtp.Packet{}
if err := packet2.Unmarshal(packets[j][2:]); err != nil {
panic(err)
}
return packet.SequenceNumber < packet2.SequenceNumber
})
t.Run("demux", func(t *testing.T) {
buffer := mpeg.NewProbeBuffer(1024 * 1024 * 2)
demuxer := mpeg.NewPSDemuxer(true)
demuxer.SetHandler(&avformat.OnUnpackStream2FileHandler{
Path: "./ps_demux",
})
file, err := os.OpenFile("./ps_demux.ps", os.O_WRONLY|os.O_CREATE, 132)
if err != nil {
panic(err)
}
for _, packet := range packets {
file.Write(packet[14:])
bytes, err := buffer.Input(packet[14:])
if err != nil {
panic(err)
}
n, err := demuxer.Input(bytes)
if err != nil {
panic(err)
}
buffer.Reset(n)
}
})
t.Run("udp", func(t *testing.T) {
ip, port := createSource(id, "udp", ssrc)
if setup == "udp" {
addr, _ := net.ResolveUDPAddr("udp", localAddr)
remoteAddr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ip, port))
@@ -166,11 +241,15 @@ func TestUDPRecv(t *testing.T) {
panic(err)
}
readRtpRaw(path, uint32(ssrc), false, func(data []byte) {
client.Write(data)
time.Sleep(1 * time.Millisecond)
for _, packet := range packets {
client.Write(packet[2:])
ctrDelay(packet[2:])
}
})
} else if !(setup == "active") {
t.Run("passive", func(t *testing.T) {
ip, port := createSource(id, "passive", ssrc)
addr, _ := net.ResolveTCPAddr("tcp", localAddr)
remoteAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
@@ -181,19 +260,23 @@ func TestUDPRecv(t *testing.T) {
panic(err)
}
readRtpRaw(path, uint32(ssrc), true, func(data []byte) {
client.Write(data)
time.Sleep(1 * time.Millisecond)
for _, packet := range packets {
client.Write(packet)
ctrDelay(packet[2:])
}
})
} else {
t.Run("active", func(t *testing.T) {
ip, port := createSource(id, "active", ssrc)
addr, _ := net.ResolveTCPAddr("tcp", localAddr)
server := transport.TCPServer{}
server.SetHandler2(func(conn net.Conn) []byte {
readRtpRaw(path, uint32(ssrc), true, func(data []byte) {
conn.Write(data)
time.Sleep(1 * time.Millisecond)
})
for _, packet := range packets {
conn.Write(packet)
ctrDelay(packet[2:])
}
return nil
}, nil, nil)
@@ -204,7 +287,5 @@ func TestUDPRecv(t *testing.T) {
}
connectSource(id, fmt.Sprintf("%s:%d", ip, port))
}
select {}
})
}
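The 2-byte big-endian length prefix that splitPackets produces is the standard RTP-over-TCP interleaved framing (RFC 4571), which is also what the passive and active GB28181 transports expect. A minimal sketch of the conversion, mirroring the loop above; frameRTPOverTCP is an illustrative name, not part of the commit.

package gb28181

import "encoding/binary"

// frameRTPOverTCP prefixes a raw RTP packet with its 2-byte big-endian
// length, producing the interleaved framing used for TCP publishing.
func frameRTPOverTCP(rtpPacket []byte) []byte {
	framed := make([]byte, 2+len(rtpPacket))
	binary.BigEndian.PutUint16(framed, uint16(len(rtpPacket)))
	copy(framed[2:], rtpPacket)
	return framed
}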

View File

@@ -2,11 +2,12 @@ package gb28181
import (
"fmt"
"github.com/lkmio/avformat/libmpeg"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"github.com/lkmio/transport"
"github.com/pion/rtp"
"math"
"net"
@@ -49,12 +50,10 @@ type GBSource interface {
type BaseGBSource struct {
stream.PublishSource
deMuxerCtx *libmpeg.PSDeMuxerContext
audioStream utils.AVStream
videoStream utils.AVStream
probeBuffer *mpeg.PSProbeBuffer
ssrc uint32
transport transport.ITransport
transport transport.Transport
audioTimestamp int64
videoTimestamp int64
@@ -64,9 +63,13 @@ type BaseGBSource struct {
}
func (source *BaseGBSource) Init(receiveQueueSize int) {
source.deMuxerCtx = libmpeg.NewPSDeMuxerContext(make([]byte, PsProbeBufferSize))
source.deMuxerCtx.SetHandler(source)
source.TransDemuxer = mpeg.NewPSDemuxer(false)
source.TransDemuxer.SetHandler(source)
source.TransDemuxer.SetOnPreprocessPacketHandler(func(packet *avformat.AVPacket) {
source.correctTimestamp(packet, packet.Dts, packet.Pts)
})
source.SetType(stream.SourceType28181)
source.probeBuffer = mpeg.NewProbeBuffer(PsProbeBufferSize)
source.PublishSource.Init(receiveQueueSize)
}
@@ -85,7 +88,14 @@ func (source *BaseGBSource) Input(data []byte) error {
packet := rtp.Packet{}
_ = packet.Unmarshal(data)
err := source.deMuxerCtx.Input(packet.Payload)
var bytes []byte
var n int
var err error
bytes, err = source.probeBuffer.Input(packet.Payload)
if err == nil {
n, err = source.TransDemuxer.Input(bytes)
}
// errors other than a full probe buffer: continue parsing
if err != nil {
@@ -94,94 +104,25 @@ func (source *BaseGBSource) Input(data []byte) error {
return err
}
}
return nil
}
// OnPartPacket callback for a partial ES stream
func (source *BaseGBSource) OnPartPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, data []byte, first bool) {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
// first ES chunk: mark the start of the memory block
if first {
buffer.Mark()
}
buffer.Write(data)
}
// OnLossPacket callback for dropping an incomplete ES packet; frees the memory block directly
func (source *BaseGBSource) OnLossPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID) {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
buffer.Fetch()
buffer.FreeTail()
}
// OnCompletePacket callback for a complete frame
func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, dts int64, pts int64, key bool) error {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
data := buffer.Fetch()
var packet utils.AVPacket
var stream_ utils.AVStream
var err error
defer func() {
if packet == nil {
buffer.FreeTail()
}
}()
if utils.AVMediaTypeAudio == mediaType {
stream_, packet, err = stream.ExtractAudioPacket(codec, source.audioStream == nil, data, pts, dts, index, 90000)
if err != nil {
return err
}
if stream_ != nil {
source.audioStream = stream_
}
} else {
if source.videoStream == nil && !key {
log.Sugar.Errorf("skip non keyframes conn:%s", source.Conn.RemoteAddr())
return nil
}
stream_, packet, err = stream.ExtractVideoPacket(codec, key, source.videoStream == nil, data, pts, dts, index, 90000)
if err != nil {
return err
}
if stream_ != nil {
source.videoStream = stream_
}
}
if stream_ != nil {
source.OnDeMuxStream(stream_)
if len(source.OriginTracks()) >= source.deMuxerCtx.TrackCount() {
source.OnDeMuxStreamDone()
}
}
source.correctTimestamp(packet, dts, pts)
source.OnDeMuxPacket(packet)
source.probeBuffer.Reset(n)
return nil
}
// correct the timestamps of a GB28181 publish stream
func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int64) {
func (source *BaseGBSource) correctTimestamp(packet *avformat.AVPacket, dts, pts int64) {
// keep dts and pts identical
pts = int64(math.Max(float64(dts), float64(pts)))
dts = pts
packet.SetPts(pts)
packet.SetDts(dts)
packet.Pts = pts
packet.Dts = dts
var lastTimestamp int64
var lastCreatedTime int64
if utils.AVMediaTypeAudio == packet.MediaType() {
if utils.AVMediaTypeAudio == packet.MediaType {
lastTimestamp = source.audioTimestamp
lastCreatedTime = source.audioPacketCreatedTime
} else if utils.AVMediaTypeVideo == packet.MediaType() {
} else if utils.AVMediaTypeVideo == packet.MediaType {
lastTimestamp = source.videoTimestamp
lastCreatedTime = source.videoPacketCreatedTime
}
@@ -193,38 +134,38 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
duration = 0x1FFFFFFFF - lastTimestamp + pts
if duration < 90000 {
// handle a normal rollover
packet.SetDuration(duration)
packet.Duration = duration
} else {
// timestamps are invalid
log.Sugar.Errorf("推流时间戳不正确, 使用系统时钟. ssrc:%d", source.ssrc)
log.Sugar.Errorf("推流时间戳不正确, 使用系统时钟. ssrc: %x", source.ssrc)
source.isSystemClock = true
}
} else {
duration = pts - lastTimestamp
}
packet.SetDuration(duration)
duration = packet.Duration(90000)
packet.Duration = duration
duration = packet.GetDuration(90000)
if duration < 0 || duration < 750 {
log.Sugar.Errorf("推流时间戳不正确, 使用系统时钟. ts: %d duration: %d source: %s ssrc: %d", pts, duration, source.ID, source.ssrc)
log.Sugar.Errorf("推流时间戳不正确, 使用系统时钟. ts: %d duration: %d source: %s ssrc: %x", pts, duration, source.ID, source.ssrc)
source.isSystemClock = true
}
}
// correct the timestamp
if source.isSystemClock && lastTimestamp != -1 {
duration = (packet.CreatedTime() - lastCreatedTime) * 90
packet.SetDts(lastTimestamp + duration)
packet.SetPts(lastTimestamp + duration)
packet.SetDuration(duration)
duration = (packet.CreatedTime - lastCreatedTime) * 90
packet.Dts = lastTimestamp + duration
packet.Pts = lastTimestamp + duration
packet.Duration = duration
}
if utils.AVMediaTypeAudio == packet.MediaType() {
source.audioTimestamp = packet.Pts()
source.audioPacketCreatedTime = packet.CreatedTime()
} else if utils.AVMediaTypeVideo == packet.MediaType() {
source.videoTimestamp = packet.Pts()
source.videoPacketCreatedTime = packet.CreatedTime()
if utils.AVMediaTypeAudio == packet.MediaType {
source.audioTimestamp = packet.Pts
source.audioPacketCreatedTime = packet.CreatedTime
} else if utils.AVMediaTypeVideo == packet.MediaType {
source.videoTimestamp = packet.Pts
source.videoPacketCreatedTime = packet.CreatedTime
}
}
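The rollover branch above relies on PS/RTP timestamps being 33-bit values on a 90 kHz clock, so a long-running stream wraps at 0x1FFFFFFFF. A small sketch of the distance calculation used there; timestampDelta90kHz is an illustrative name, not part of the commit.

package gb28181

// timestampDelta90kHz measures how far pts has advanced past last on the
// 33-bit 90 kHz clock, crossing the 0x1FFFFFFFF wrap point when needed.
// A "normal" rollover is a small positive delta (the code above accepts
// anything below 90000, i.e. one second); a huge delta means the incoming
// timestamps are broken and the source falls back to the system clock.
func timestampDelta90kHz(last, pts int64) int64 {
	if pts >= last {
		return pts - last
	}
	return 0x1FFFFFFFF - last + pts
}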
@@ -249,11 +190,6 @@ func (source *BaseGBSource) Close() {
}
source.PublishSource.Close()
if source.deMuxerCtx != nil {
source.deMuxerCtx.Close()
source.deMuxerCtx = nil
}
}
func (source *BaseGBSource) SetConn(conn net.Conn) {

View File

@@ -1,6 +1,7 @@
package gb28181
import (
"github.com/lkmio/transport"
"net"
)
@@ -12,8 +13,8 @@ type ActiveSource struct {
tcp *TCPClient
}
func (a ActiveSource) Connect(remoteAddr *net.TCPAddr) error {
client, err := NewTCPClient(a.port, remoteAddr, &a)
func (a *ActiveSource) Connect(remoteAddr *net.TCPAddr) error {
client, err := NewTCPClient(a.port, remoteAddr, a)
if err != nil {
return err
}
@@ -22,7 +23,7 @@ func (a ActiveSource) Connect(remoteAddr *net.TCPAddr) error {
return nil
}
func (a ActiveSource) SetupType() SetupType {
func (a *ActiveSource) SetupType() SetupType {
return SetupActive
}
@@ -33,5 +34,10 @@ func NewActiveSource() (*ActiveSource, int, error) {
return nil
})
return &ActiveSource{port: port}, port, nil
return &ActiveSource{
PassiveSource: PassiveSource{
decoder: transport.NewLengthFieldFrameDecoder(0xFFFF, 2),
},
port: port,
}, port, nil
}

View File

@@ -1,21 +1,24 @@
package gb28181
import "github.com/lkmio/avformat/transport"
import "github.com/lkmio/transport"
type PassiveSource struct {
BaseGBSource
decoder *transport.LengthFieldFrameDecoder
}
// Input overrides stream.Source's Input; the main parser will hand the published stream data to PassiveSource
func (p PassiveSource) Input(data []byte) error {
return p.decoder.Input(data)
// Input overrides stream.Source's Input; the main goroutine hands the published stream data to PassiveSource
func (p *PassiveSource) Input(data []byte) error {
_, err := DecodeGBRTPOverTCPPacket(data, p, p.decoder, nil, p.Conn)
return err
}
func (p PassiveSource) SetupType() SetupType {
func (p *PassiveSource) SetupType() SetupType {
return SetupPassive
}
func NewPassiveSource() *PassiveSource {
return &PassiveSource{}
return &PassiveSource{
decoder: transport.NewLengthFieldFrameDecoder(0xFFFF, 2),
}
}

View File

@@ -1,8 +1,8 @@
package gb28181
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
)

View File

@@ -1,9 +1,10 @@
package gb28181
import (
"github.com/lkmio/avformat/transport"
"encoding/hex"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
"runtime"
)
@@ -49,11 +50,16 @@ func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte {
// in single-port mode, first parse out the SSRC to find the GBSource; subsequent stream data is then handed to stream.Source
if session.source == nil {
if err := session.decoder.Input(data); err != nil {
log.Sugar.Errorf("解析粘包数据失败 err: %s", err.Error())
conn.Close()
source, err := DecodeGBRTPOverTCPPacket(data, nil, session.decoder, T.filter, conn)
if err != nil {
log.Sugar.Errorf("解析rtp失败 err: %s conn: %s data: %s", err.Error(), conn.RemoteAddr().String(), hex.EncodeToString(data))
_ = conn.Close()
return nil
}
if source != nil {
session.Init(source)
}
} else {
// hand the stream to the Source's main goroutine; the main goroutine eventually calls PassiveSource's Input to process it
if session.source.SetupType() == SetupPassive {
@@ -80,7 +86,7 @@ func NewTCPServer(filter Filter) (*TCPServer, error) {
var err error
if stream.AppConfig.GB28181.IsMultiPort() {
tcp = &transport.TCPServer{}
tcp, err = TransportManger.NewTCPServer(stream.AppConfig.ListenIP)
tcp, err = TransportManger.NewTCPServer()
if err != nil {
return nil, err
}

View File

@@ -1,10 +1,9 @@
package gb28181
import (
"encoding/hex"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/log"
"fmt"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"github.com/pion/rtp"
"net"
)
@@ -13,20 +12,14 @@ import (
type TCPSession struct {
conn net.Conn
source GBSource
decoder *transport.LengthFieldFrameDecoder
receiveBuffer *stream.ReceiveBuffer
decoder *transport.LengthFieldFrameDecoder
}
func (t *TCPSession) Init(source GBSource) {
t.source = source
// create the receive buffer
t.receiveBuffer = stream.NewTCPReceiveBuffer()
// once the session is bound to a source, all subsequent stream data is handled by the source
if source.SetupType() == SetupPassive {
source.(*PassiveSource).decoder = t.decoder
} else {
source.(*ActiveSource).decoder = t.decoder
}
}
func (t *TCPSession) Close() {
@@ -35,52 +28,55 @@ func (t *TCPSession) Close() {
t.source.Close()
t.source = nil
}
}
if t.decoder != nil {
t.decoder.Close()
t.decoder = nil
func DecodeGBRTPOverTCPPacket(data []byte, source GBSource, decoder *transport.LengthFieldFrameDecoder, filter Filter, conn net.Conn) (GBSource, error) {
length := len(data)
for i := 0; i < length; {
n, bytes, err := decoder.Input(data[i:])
if err != nil {
return source, err
}
i += n
// single-port mode: match the source by ssrc
if source == nil || stream.SessionStateHandshakeSuccess == source.State() {
packet := rtp.Packet{}
if err := packet.Unmarshal(bytes); err != nil {
return nil, err
} else if source == nil {
source = filter.FindSource(packet.SSRC)
}
if source == nil {
// no Source matched for this ssrc
return nil, fmt.Errorf("gb28181推流失败 ssrc: %x 匹配不到source", packet.SSRC)
}
if stream.SessionStateHandshakeSuccess == source.State() {
source.PreparePublish(conn, packet.SSRC, source)
}
}
// for single-port publishing, if the source was just bound we are still on the network receive goroutine at this point; otherwise this runs on the main goroutine
if source.SetupType() == SetupPassive {
source.(*PassiveSource).BaseGBSource.Input(bytes)
} else {
source.(*ActiveSource).BaseGBSource.Input(bytes)
}
}
return source, nil
}
func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
session := &TCPSession{
conn: conn,
// filter: filter,
decoder: transport.NewLengthFieldFrameDecoder(0xFFFF, 2),
}
// create the frame decoder for TCP sticky packets and set its reassembly callback
session.decoder = transport.NewLengthFieldFrameDecoder(0xFFFF, 2, func(bytes []byte) {
// single-port mode: match the source by ssrc
if session.source == nil || stream.SessionStateHandshakeSuccess == session.source.State() {
packet := rtp.Packet{}
if err := packet.Unmarshal(bytes); err != nil {
log.Sugar.Errorf("解析rtp失败 err: %s conn: %s data: %s", err.Error(), conn.RemoteAddr().String(), hex.EncodeToString(bytes))
conn.Close()
return
}
source := filter.FindSource(packet.SSRC)
if source == nil {
// no matching Source, close the connection
log.Sugar.Errorf("gb28181推流失败 ssrc: %x 匹配不到source conn: %s data: %s", packet.SSRC, session.conn.RemoteAddr().String(), hex.EncodeToString(bytes))
conn.Close()
return
}
session.Init(source)
if stream.SessionStateHandshakeSuccess == session.source.State() {
session.source.PreparePublish(session.conn, packet.SSRC, session.source)
}
}
// for single-port publishing, if the source was just bound we are still on the network receive goroutine at this point; otherwise this runs on the main goroutine
if session.source.SetupType() == SetupPassive {
session.source.(*PassiveSource).BaseGBSource.Input(bytes)
} else {
session.source.(*ActiveSource).BaseGBSource.Input(bytes)
}
})
// multi-port receiving: the Source is already known, initialize the Session directly
if stream.AppConfig.GB28181.IsMultiPort() {
session.Init(filter.(*singleFilter).source)
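A hedged usage sketch of the decoding path above: DecodeGBRTPOverTCPPacket drains one TCP read through the 2-byte length-field decoder, so frames that arrive stuck together or split across reads come out whole. consumeSegment and handleRTP below are hypothetical names, not part of the commit; the decoder type and its Input signature are taken from the code above.

package gb28181

import "github.com/lkmio/transport"

// consumeSegment feeds one TCP read into the length-field decoder and hands
// every complete RTP frame to handleRTP; an incomplete tail stays buffered in
// the decoder until the next read.
func consumeSegment(decoder *transport.LengthFieldFrameDecoder, segment []byte, handleRTP func([]byte)) error {
	for i := 0; i < len(segment); {
		n, frame, err := decoder.Input(segment[i:])
		if err != nil {
			return err // bad framing; the server closes the connection
		}
		i += n
		if len(frame) > 0 {
			handleRTP(frame)
		}
	}
	return nil
}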

View File

@@ -1,9 +1,9 @@
package gb28181
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"github.com/pion/rtp"
"net"
"runtime"
@@ -64,7 +64,7 @@ func NewUDPServer(filter Filter) (*UDPServer, error) {
var udp *transport.UDPServer
var err error
if stream.AppConfig.GB28181.IsMultiPort() {
udp, err = TransportManger.NewUDPServer(stream.AppConfig.ListenIP)
udp, err = TransportManger.NewUDPServer()
if err != nil {
return nil, err
}

25
go.mod
View File

@@ -1,13 +1,19 @@
module github.com/lkmio/lkm
require github.com/lkmio/avformat v0.0.0
require (
github.com/lkmio/mpeg v0.0.0
github.com/lkmio/flv v0.0.0
github.com/lkmio/rtmp v0.0.0
github.com/lkmio/transport v0.0.0
github.com/lkmio/rtp v0.0.0
)
require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/lkmio/avformat v0.0.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.5
github.com/pion/interceptor v0.1.25
github.com/pion/webrtc/v3 v3.2.29
github.com/sirupsen/logrus v1.9.3
github.com/x-cray/logrus-prefixed-formatter v0.5.2
@@ -24,10 +30,11 @@ require (
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/dtls/v2 v2.2.7 // indirect
github.com/pion/ice/v2 v2.3.13 // indirect
github.com/pion/interceptor v0.1.25 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.12 // indirect
github.com/pion/rtp v1.8.5 // indirect
github.com/pion/sctp v1.8.12 // indirect
github.com/pion/sdp/v3 v3.0.8 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
@@ -47,4 +54,14 @@ require (
replace github.com/lkmio/avformat => ../avformat
replace github.com/lkmio/mpeg => ../mpeg
replace github.com/lkmio/flv => ../flv
replace github.com/lkmio/rtmp => ../rtmp
replace github.com/lkmio/transport => ../transport
replace github.com/lkmio/rtp => ../rtp
go 1.19

View File

@@ -2,32 +2,29 @@ package hls
import (
"fmt"
"github.com/lkmio/avformat/libhls"
"github.com/lkmio/avformat/libmpeg"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"os"
"path/filepath"
"strconv"
)
type tsContext struct {
type TransStream struct {
stream.BaseTransStream
muxer *mpeg.TSMuxer
ctx struct {
segmentSeq int // segment sequence number
writeBuffer []byte // buffer for the TS stream, used by TSMuxer to cut down user/kernel switches and disk IO frequency
writeBufferSize int // amount of TS data already buffered
url string // @See TransStream.tsUrl
path string // absolute path of the ts segment on disk
file *os.File // ts segment file handle
}
}
type TransStream struct {
stream.BaseTransStream
muxer libmpeg.TSMuxer
context *tsContext
M3U8Writer libhls.M3U8Writer
M3U8Writer stream.M3U8Writer
m3u8Name string // m3u8 file name
m3u8File *os.File // m3u8 file handle
dir string // parent directory of the m3u8 file
@@ -40,16 +37,12 @@ type TransStream struct {
PlaylistFormat *string // in-memory playlist; every sink references this pointer address.
}
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
if packet.Index() >= t.muxer.TrackCount() {
return nil, -1, false, fmt.Errorf("track not available")
}
func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) {
// create the next segment
// when the buffered duration >= the target duration; if video is present, also wait for a keyframe before cutting
if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
// finish the current segment file
if t.context.file != nil {
if t.ctx.file != nil {
err := t.flushSegment(false)
if err != nil {
return nil, -1, false, err
@@ -64,12 +57,23 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
pts := packet.ConvertPts(90000)
dts := packet.ConvertDts(90000)
if utils.AVMediaTypeVideo == packet.MediaType() {
t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()].Stream), pts, dts, packet.KeyFrame())
} else {
t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
}
length := len(data)
capacity := cap(t.ctx.writeBuffer)
for i := 0; i < length; {
if capacity-t.ctx.writeBufferSize < mpeg.TsPacketSize {
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
t.ctx.writeBufferSize = 0
}
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
i += t.muxer.Input(bytes, packet.Index, data[i:], length, dts, pts, packet.Key, i == 0)
t.ctx.writeBufferSize += mpeg.TsPacketSize
}
return nil, -1, true, nil
}
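The loop above turns each demuxed frame into fixed-size TS packets and stages them in a megabyte-sized buffer, flushing to the segment file only when the buffer cannot hold another packet; this keeps syscalls and disk writes infrequent. A simplified sketch of that pattern follows; stageTSPacket and fillPacket are hypothetical names (fillPacket stands in for the muxer.Input call), while mpeg.TsPacketSize is the constant used above.

package hls

import (
	"os"

	"github.com/lkmio/mpeg"
)

// stageTSPacket reserves one 188-byte TS packet slot in buf, flushing the
// buffered packets to the segment file first if there is no room left.
func stageTSPacket(file *os.File, buf []byte, used *int, fillPacket func(dst []byte)) {
	if cap(buf)-*used < mpeg.TsPacketSize {
		_, _ = file.Write(buf[:*used])
		*used = 0
	}
	fillPacket(buf[*used : *used+mpeg.TsPacketSize])
	*used += mpeg.TsPacketSize
}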
@@ -79,42 +83,31 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
}
var err error
if utils.AVMediaTypeVideo == track.Stream.Type() {
data := track.Stream.CodecParameters().AnnexBExtraData()
_, err = t.muxer.AddTrack(track.Stream.Type(), track.Stream.CodecId(), data)
if utils.AVMediaTypeVideo == track.Stream.MediaType {
data := track.Stream.CodecParameters.AnnexBExtraData()
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
} else {
_, err = t.muxer.AddTrack(track.Stream.Type(), track.Stream.CodecId(), track.Stream.Extra())
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
}
return err
}
func (t *TransStream) WriteHeader() error {
//if packet.Index >= t.muxer.TrackCount() {
// return nil, -1, false, fmt.Errorf("track not available")
//}
return t.createSegment()
}
func (t *TransStream) onTSWrite(data []byte) {
t.context.writeBufferSize += len(data)
}
func (t *TransStream) onTSAlloc(size int) []byte {
n := len(t.context.writeBuffer) - t.context.writeBufferSize
if n < size {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
t.context.writeBufferSize = 0
}
return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size]
}
// write out the TS segment and update the M3U8
func (t *TransStream) flushSegment(end bool) error {
// write the remaining TS packets
if t.context.writeBufferSize > 0 {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
t.context.writeBufferSize = 0
if t.ctx.writeBufferSize > 0 {
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
t.ctx.writeBufferSize = 0
}
if err := t.context.file.Close(); err != nil {
if err := t.ctx.file.Close(); err != nil {
return err
}
@@ -125,7 +118,7 @@ func (t *TransStream) flushSegment(end bool) error {
// update the m3u8 playlist
duration := float32(t.muxer.Duration()) / 90000
t.M3U8Writer.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path)
t.M3U8Writer.AddSegment(duration, t.ctx.url, t.ctx.segmentSeq, t.ctx.path)
m3u8Txt := t.M3U8Writer.String()
//if end {
@@ -163,21 +156,21 @@ func (t *TransStream) createSegment() error {
t.muxer.Reset()
var tsFile *os.File
startSeq := t.context.segmentSeq + 1
startSeq := t.ctx.segmentSeq + 1
for {
tsName := fmt.Sprintf(t.tsFormat, startSeq)
// ts file
t.context.path = fmt.Sprintf("%s/%s", t.dir, tsName)
t.ctx.path = fmt.Sprintf("%s/%s", t.dir, tsName)
// url of the segment in the m3u8 playlist
t.context.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
t.ctx.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
file, err := os.OpenFile(t.context.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
file, err := os.OpenFile(t.ctx.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err == nil {
tsFile = file
break
}
log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.context.path)
log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.ctx.path)
if os.IsPermission(err) || os.IsTimeout(err) || os.IsNotExist(err) {
return err
}
@@ -186,19 +179,24 @@ func (t *TransStream) createSegment() error {
startSeq++
}
t.context.segmentSeq = startSeq
t.context.file = tsFile
_ = t.muxer.WriteHeader()
t.ctx.segmentSeq = startSeq
t.ctx.file = tsFile
n, err := t.muxer.WriteHeader(t.ctx.writeBuffer)
if err != nil {
return err
}
t.ctx.writeBufferSize = n
return nil
}
func (t *TransStream) Close() ([][]byte, int64, error) {
var err error
if t.context.file != nil {
if t.ctx.file != nil {
err = t.flushSegment(true)
err = t.context.file.Close()
t.context.file = nil
err = t.ctx.file.Close()
t.ctx.file = nil
}
if t.muxer != nil {
@@ -244,7 +242,7 @@ func DeleteOldSegments(id string) {
// @Params parentDir absolute path where segments are saved. The m3u8 and ts segments live in the same directory, addressed as parentDir+urlPrefix
// @Params segmentDuration duration of a single segment
// @Params playlistLength how many segments to keep in the playlist
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int, seq int, playlistFormat *string, writer libhls.M3U8Writer) (stream.TransStream, error) {
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int, seq int, playlistFormat *string, writer stream.M3U8Writer) (stream.TransStream, error) {
// create the directory
m3u8Path := fmt.Sprintf("%s/%s", dir, m3u8Name)
if err := os.MkdirAll(filepath.Dir(m3u8Path), 0666); err != nil {
@@ -271,7 +269,7 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
if writer != nil {
transStream.M3U8Writer = writer
} else {
transStream.M3U8Writer = libhls.NewM3U8Writer(playlistLength)
transStream.M3U8Writer = stream.NewM3U8Writer(playlistLength)
}
if playlistFormat != nil {
@@ -281,16 +279,11 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
}
// create the TS muxer
muxer := libmpeg.NewTSMuxer()
muxer.SetWriteHandler(transStream.onTSWrite)
muxer.SetAllocHandler(transStream.onTSAlloc)
muxer := mpeg.NewTSMuxer()
// ts muxing context
transStream.context = &tsContext{
segmentSeq: seq,
writeBuffer: make([]byte, 1024*1024),
writeBufferSize: 0,
}
// initialize the ts muxing context
transStream.ctx.segmentSeq = seq
transStream.ctx.writeBuffer = make([]byte, 1024*1024)
transStream.muxer = muxer
transStream.m3u8File = file
@@ -301,7 +294,7 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
id := source.GetID()
var writer libhls.M3U8Writer
var writer stream.M3U8Writer
var playlistFormat *string
startSeq := -1

97
jt1078/jt_demuxer.go Normal file
View File

@@ -0,0 +1,97 @@
package jt1078
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
)
type Demuxer struct {
avformat.BaseDemuxer
prevPacket *Packet
sim string
channel int
lastError string
}
func (d *Demuxer) ProcessPrevPacket() error {
var codec utils.AVCodecID
index := d.FindBufferIndex(int(d.prevPacket.pt))
bytes, err := d.BaseDemuxer.DataPipeline.Feat(index)
if err != nil {
return err
} else /*if d.prevPacket.packetType > AudioFrameMark {
// pass-through data, discard it
d.DataPipeline.DiscardBackPacket(index)
return nil
} else*/if err, codec = PT2CodecID(d.prevPacket.pt); err != nil {
d.BaseDemuxer.DataPipeline.DiscardBackPacket(index)
return err
}
if d.prevPacket.packetType == AudioFrameMark {
d.OnAudioPacket(index, codec, bytes, int64(d.prevPacket.ts))
} else if d.prevPacket.packetType < AudioFrameMark {
// video frame
d.OnVideoPacket(index, codec, bytes, avformat.IsKeyFrame(codec, bytes), int64(d.prevPacket.ts), int64(d.prevPacket.ts), avformat.PacketTypeAnnexB)
}
if !d.Completed && d.Tracks.Size() > 1 {
d.ProbeComplete()
}
return nil
}
func (d *Demuxer) Input(data []byte) (int, error) {
packet := Packet{}
if err := packet.Unmarshal(data); err != nil {
return 0, err
} else if len(packet.payload) == 0 {
// filter out empty data
return 0, nil
}
// if the timestamp or payload type changes, treat it as a new audio/video frame: process the previous packet, build an AVPacket and hand it to PublishSource.
// the sub-packet mark is not always reliable
if d.prevPacket != nil && (d.prevPacket.ts != packet.ts || d.prevPacket.pt != packet.pt) {
err := d.ProcessPrevPacket()
if err != nil && err.Error() != d.lastError {
println(err.Error())
d.lastError = err.Error()
}
}
if d.prevPacket == nil {
d.prevPacket = &Packet{}
d.sim = packet.simNumber
d.channel = int(packet.channelNumber)
}
var mediaType utils.AVMediaType
if packet.packetType < AudioFrameMark {
mediaType = utils.AVMediaTypeVideo
} else if packet.packetType == AudioFrameMark {
mediaType = utils.AVMediaTypeAudio
} else {
// pass-through data, discard it
return len(data), nil
}
index := d.FindBufferIndex(int(packet.pt))
_, err := d.DataPipeline.Write(packet.payload, index, mediaType)
if err != nil {
panic(err)
}
*d.prevPacket = packet
return len(data), nil
}
func NewDemuxer() *Demuxer {
return &Demuxer{
BaseDemuxer: avformat.BaseDemuxer{
DataPipeline: &avformat.StreamsBuffer{},
Name: "jt1078", // vob
AutoFree: false,
},
}
}
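A hedged usage sketch of this demuxer, mirroring how Session.Input drives it later in this commit: the TCP stream is split on the 0x30 0x31 0x63 0x64 frame-head delimiter and every complete JT/T 1078 packet is pushed into Demuxer.Input, which closes out a frame whenever the timestamp or payload type changes. feedDemuxer is an illustrative name, not part of the commit; the decoder type and Input signature come from the transport package used above.

package jt1078

import "github.com/lkmio/transport"

// feedDemuxer splits raw TCP bytes on the JT/T 1078 frame-head delimiter and
// forwards each complete packet to the demuxer.
func feedDemuxer(demuxer *Demuxer, decoder *transport.DelimiterFrameDecoder, data []byte) error {
	for n := 0; n < len(data); {
		i, packet, err := decoder.Input(data[n:])
		if err != nil {
			return err
		} else if len(packet) < 1 {
			break // incomplete frame, wait for more bytes
		}
		n += i
		if _, err = demuxer.Input(packet); err != nil {
			return err
		}
	}
	return nil
}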

120
jt1078/jt_packet.go Normal file
View File

@@ -0,0 +1,120 @@
package jt1078
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/utils"
)
const (
VideoIFrameMark = 0b000
VideoPFrameMark = 0b001
VideoBFrameMark = 0b010
AudioFrameMark = 0b011
TransmissionDataMark = 0b100
PTVideoH264 = 98
PTVideoH265 = 99
PTVideoAVS = 100
PTVideoSVAC = 101
PTAudioG711A = 6
PTAudioG711U = 7
PTAudioG726 = 8
PTAudioG729A = 9
PTAudioAAC = 19
PTAudioMP3 = 25
PTAudioADPCMA = 26
)
type Packet struct {
pt byte
packetType byte
ts uint64
subMark byte
simNumber string
channelNumber byte
payload []byte
}
func (p *Packet) Unmarshal(data []byte) error {
if len(data) < 12 {
return fmt.Errorf("invaild data")
}
packetType := data[11] >> 4 & 0x0F
// ignore pass-through data
if TransmissionDataMark == packetType {
return fmt.Errorf("invaild data")
}
// ignore packets shorter than the minimum length
if (AudioFrameMark == packetType && len(data) < 26) || (AudioFrameMark == packetType && len(data) < 22) {
return fmt.Errorf("invaild data")
}
// x extension bit, fixed at 0
_ = data[0] >> 4 & 0x1
pt := data[1] & 0x7F
// seq
_ = binary.BigEndian.Uint16(data[2:])
var simNumber string
for i := 4; i < 10; i++ {
simNumber += fmt.Sprintf("%02d", data[i])
}
// channel
channelNumber := data[10]
// subMark
subMark := data[11] & 0x0F
// timestamp, in ms
var ts uint64
n := 12
if TransmissionDataMark != packetType {
ts = binary.BigEndian.Uint64(data[n:])
n += 8
}
if AudioFrameMark > packetType {
// iFrameInterval
_ = binary.BigEndian.Uint16(data[n:])
n += 2
// lastFrameInterval
_ = binary.BigEndian.Uint16(data[n:])
n += 2
}
// size
_ = binary.BigEndian.Uint16(data[n:])
n += 2
p.pt = pt
p.packetType = packetType
p.ts = ts
p.simNumber = simNumber
p.channelNumber = channelNumber
p.subMark = subMark
p.payload = data[n:]
return nil
}
func PT2CodecID(pt byte) (error, utils.AVCodecID) {
switch pt {
case PTVideoH264:
return nil, utils.AVCodecIdH264
case PTVideoH265:
return nil, utils.AVCodecIdH265
case PTAudioG711A:
return nil, utils.AVCodecIdPCMALAW
case PTAudioG711U:
return nil, utils.AVCodecIdPCMMULAW
case PTAudioAAC:
return nil, utils.AVCodecIdAAC
//case PTAudioADPCMA:
// return nil, utils.AVCodecIdADPCMAFC
default:
return fmt.Errorf("the codec %d is not implemented", pt), utils.AVCodecIdNONE
}
}
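A worked sketch of the header layout that Unmarshal reads: a 12-byte fixed part (flag byte, payload type, sequence number, six SIM digit-pair bytes, logical channel, frame/sub-packet marks), an 8-byte millisecond timestamp, two 2-byte frame-interval fields for video frames only, a 2-byte body length, then the payload. buildAudioPacket is illustrative only and not part of the commit; the constant names are the ones defined above.

package jt1078

import "encoding/binary"

// buildAudioPacket assembles a minimal audio packet matching the layout
// parsed by Packet.Unmarshal (audio frames carry no frame-interval fields).
func buildAudioPacket(payload []byte) []byte {
	buf := make([]byte, 22+len(payload))
	buf[0] = 0x81                                // V=2, P=0, X=0, CC=1
	buf[1] = PTAudioG711A                        // M=0, payload type
	binary.BigEndian.PutUint16(buf[2:], 1)       // packet sequence number
	copy(buf[4:10], []byte{1, 38, 0, 13, 80, 0}) // SIM number bytes; Unmarshal prints each as a two-digit decimal
	buf[10] = 1                                  // logical channel number
	buf[11] = AudioFrameMark<<4 | 0x0            // data type | sub-packet mark
	binary.BigEndian.PutUint64(buf[12:], 40)     // timestamp, ms
	binary.BigEndian.PutUint16(buf[20:], uint16(len(payload))) // body length
	copy(buf[22:], payload)
	return buf
}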

View File

@@ -1,9 +1,9 @@
package jt1078
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
"runtime"
)

View File

@@ -1,86 +1,46 @@
package jt1078
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
"strconv"
)
const (
VideoIFrameMark = 0b000
VideoPFrameMark = 0b001
VideoBFrameMark = 0b010
AudioFrameMark = 0b011
TransmissionDataMark = 0b100
PTVideoH264 = 98
PTVideoH265 = 99
PTVideoAVS = 100
PTVideoSVAC = 101
PTAudioG711A = 6
PTAudioG711U = 7
PTAudioG726 = 8
PTAudioG729A = 9
PTAudioAAC = 19
PTAudioMP3 = 25
PTAudioADPCMA = 26
)
type Session struct {
stream.PublishSource
phone string
decoder *transport.DelimiterFrameDecoder
audioIndex int
videoIndex int
audioStream utils.AVStream
videoStream utils.AVStream
audioBuffer collections.MemoryPool
videoBuffer collections.MemoryPool
rtpPacket *RtpPacket
receiveBuffer *stream.ReceiveBuffer
}
type RtpPacket struct {
pt byte
packetType byte
ts uint64
subMark byte
simNumber string
channelNumber byte
payload []byte
}
func (s *Session) OnJtPTPPacket(data []byte) {
packet, err := read1078RTPPacket(data)
func (s *Session) Input(data []byte) error {
var n int
for length := len(data); n < length; {
i, bytes, err := s.decoder.Input(data[n:])
if err != nil {
return
return err
} else if len(bytes) < 1 {
break
}
// filter out empty data
if len(packet.payload) == 0 {
return
n += i
demuxer := s.TransDemuxer.(*Demuxer)
firstOfPacket := demuxer.prevPacket == nil
_, err = demuxer.Input(bytes)
if err != nil {
return err
}
// first-packet handling: notify via hook
if s.rtpPacket == nil {
s.SetID(packet.simNumber + "/" + strconv.Itoa(int(packet.channelNumber)))
s.rtpPacket = &RtpPacket{}
*s.rtpPacket = packet
if firstOfPacket && demuxer.prevPacket != nil {
s.SetID(demuxer.sim + "/" + strconv.Itoa(demuxer.channel))
go func() {
_, state := stream.PreparePublishSource(s, true)
if utils.HookStateOK != state {
log.Sugar.Errorf("1078推流失败 source: %s", s.phone)
log.Sugar.Errorf("1078推流失败 source: %s", demuxer.sim)
if s.Conn != nil {
s.Conn.Close()
@@ -88,59 +48,9 @@ func (s *Session) OnJtPTPPacket(data []byte) {
}
}()
}
// if the timestamp or payload type changes, treat it as a new audio/video frame: process the previous packet, build an AVPacket and hand it to PublishSource.
// the sub-packet mark is not always reliable
if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt {
if s.rtpPacket.packetType == AudioFrameMark && s.audioBuffer != nil {
if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil {
log.Sugar.Errorf("处理音频包失败 phone:%s err:%s", s.phone, err.Error())
s.audioBuffer.FreeTail()
}
*s.rtpPacket = packet
} else if s.rtpPacket.packetType < AudioFrameMark && s.videoBuffer != nil {
if err := s.processVideoPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.videoBuffer.Fetch(), s.videoIndex); err != nil {
log.Sugar.Errorf("处理视频包失败 phone:%s err:%s", s.phone, err.Error())
s.videoBuffer.FreeTail()
}
*s.rtpPacket = packet
}
}
// partial audio/video frame
if packet.packetType == AudioFrameMark {
if s.audioBuffer == nil {
if s.videoIndex == 0 && s.audioIndex == 0 {
s.videoIndex = 1
}
// create the audio AVPacket buffer
s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio)
}
// write the partial audio frame into the buffer
s.audioBuffer.TryMark()
s.audioBuffer.Write(packet.payload)
} else {
if s.videoBuffer == nil {
if s.videoIndex == 0 && s.audioIndex == 0 {
s.audioIndex = 1
}
// create the video AVPacket buffer
s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo)
}
// write the partial video frame into the buffer
s.videoBuffer.TryMark()
s.videoBuffer.Write(packet.payload)
}
}
func (s *Session) Input(data []byte) error {
return s.decoder.Input(data)
return nil
}
func (s *Session) Close() {
@@ -151,149 +61,23 @@ func (s *Session) Close() {
s.Conn = nil
}
if s.decoder != nil {
s.decoder.Close()
s.decoder = nil
}
s.PublishSource.Close()
}
// extract the AVPacket and AVStream from a video frame and hand them to PublishSource
func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
var codecId utils.AVCodecID
if PTVideoH264 == pt {
if s.videoStream == nil && VideoIFrameMark != pktType {
log.Sugar.Errorf("skip non keyframes conn:%s", s.Conn.RemoteAddr())
return nil
}
codecId = utils.AVCodecIdH264
} else if PTVideoH265 == pt {
if s.videoStream == nil && VideoIFrameMark != pktType {
log.Sugar.Errorf("skip non keyframes conn:%s", s.Conn.RemoteAddr())
return nil
}
codecId = utils.AVCodecIdH265
} else {
return fmt.Errorf("the codec %d is not implemented", pt)
}
videoStream, videoPacket, err := stream.ExtractVideoPacket(codecId, VideoIFrameMark == pktType, s.videoStream == nil, data, int64(ts), int64(ts), index, 1000)
if err != nil {
return err
}
if videoStream != nil {
s.videoStream = videoStream
s.OnDeMuxStream(videoStream)
if s.videoStream != nil && s.audioStream != nil {
s.OnDeMuxStreamDone()
}
}
s.OnDeMuxPacket(videoPacket)
return nil
}
// extract the AVPacket and AVStream from an audio frame and hand them to PublishSource
func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error {
var codecId utils.AVCodecID
if PTAudioG711A == pt {
codecId = utils.AVCodecIdPCMALAW
} else if PTAudioG711U == pt {
codecId = utils.AVCodecIdPCMMULAW
} else if PTAudioAAC == pt {
codecId = utils.AVCodecIdAAC
} else {
return fmt.Errorf("the codec %d is not implemented", pt)
}
audioStream, audioPacket, err := stream.ExtractAudioPacket(codecId, s.audioStream == nil, data, int64(ts), int64(ts), index, 1000)
if err != nil {
return err
}
if audioStream != nil {
s.audioStream = audioStream
s.OnDeMuxStream(audioStream)
if s.videoStream != nil && s.audioStream != nil {
s.OnDeMuxStreamDone()
}
}
s.OnDeMuxPacket(audioPacket)
return nil
}
// read a JT1078 rtp packet; returns the data type, payload type, timestamp and payload
func read1078RTPPacket(data []byte) (RtpPacket, error) {
if len(data) < 12 {
return RtpPacket{}, fmt.Errorf("invaild data")
}
packetType := data[11] >> 4 & 0x0F
// ignore pass-through data
if TransmissionDataMark == packetType {
return RtpPacket{}, fmt.Errorf("invaild data")
}
// ignore packets shorter than the minimum length
if (AudioFrameMark == packetType && len(data) < 26) || (AudioFrameMark == packetType && len(data) < 22) {
return RtpPacket{}, fmt.Errorf("invaild data")
}
// x extension bit, fixed at 0
_ = data[0] >> 4 & 0x1
pt := data[1] & 0x7F
// seq
_ = binary.BigEndian.Uint16(data[2:])
var simNumber string
for i := 4; i < 10; i++ {
simNumber += fmt.Sprintf("%02d", data[i])
}
// channel
channelNumber := data[10]
// subMark
subMark := data[11] & 0x0F
// timestamp, in ms
var ts uint64
n := 12
if TransmissionDataMark != packetType {
ts = binary.BigEndian.Uint64(data[n:])
n += 8
}
if AudioFrameMark > packetType {
// iFrameInterval
_ = binary.BigEndian.Uint16(data[n:])
n += 2
// lastFrameInterval
_ = binary.BigEndian.Uint16(data[n:])
n += 2
}
// size
_ = binary.BigEndian.Uint16(data[n:])
n += 2
return RtpPacket{pt: pt, packetType: packetType, ts: ts, simNumber: simNumber, channelNumber: channelNumber, subMark: subMark, payload: data[n:]}, nil
}
func NewSession(conn net.Conn) *Session {
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
session := Session{
PublishSource: stream.PublishSource{
Conn: conn,
Type: stream.SourceType1078,
TransDemuxer: NewDemuxer(),
},
}
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
session.decoder = transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:], session.OnJtPTPPacket)
session.receiveBuffer = stream.NewTCPReceiveBuffer()
decoder: transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:]),
receiveBuffer: stream.NewTCPReceiveBuffer(),
}
session.TransDemuxer.SetHandler(&session)
session.Init(stream.ReceiveBufferTCPBlockCount)
go stream.LoopEvent(&session)
return &session

View File

@@ -1,8 +1,10 @@
package jt1078
import (
"github.com/lkmio/avformat/libbufio"
"github.com/lkmio/avformat/transport"
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/bufio"
"github.com/lkmio/transport"
"net"
"os"
"testing"
@@ -10,7 +12,52 @@ import (
)
func TestPublish(t *testing.T) {
path := "D:\\GOProjects\\avformat\\10352264314-2.bin"
t.Run("decode_1078_data", func(t *testing.T) {
data, err := os.ReadFile("../dump/jt1078-127.0.0.1.50659")
if err != nil {
panic(err)
}
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
decoder := transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:])
length := len(data)
for j := 0; j < length; j += 4 {
size := int(binary.BigEndian.Uint32(data[j:]))
if 4+size > length-j {
break
}
rtp := data[j+4 : j+4+size]
var n int
for length := len(rtp); n < length; {
i, bytes, err := decoder.Input(rtp[n:])
if err != nil {
panic(err)
} else if len(bytes) < 1 {
break
}
n += i
packet := Packet{}
err = packet.Unmarshal(bytes)
if err != nil {
panic(err)
}
fmt.Printf("1078 packet ts: %d\r\n", packet.ts)
}
j += size
}
})
t.Run("publish", func(t *testing.T) {
//path := "../../source_files/10352264314-2.bin"
path := "../../source_files/013800138000-1.bin"
client := transport.TCPClient{}
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:1078")
@@ -29,11 +76,10 @@ func TestPublish(t *testing.T) {
index := 0
for index < len(file) {
n := libbufio.MinInt(len(file)-index, 1500)
n := bufio.MinInt(len(file)-index, 1500)
client.Write(file[index : index+n])
index += n
time.Sleep(1 * time.Millisecond)
}
println("end")
})
}

21
main.go
View File

@@ -2,16 +2,18 @@ package main
import (
"encoding/json"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/flv"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/hls"
"github.com/lkmio/lkm/jt1078"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/record"
"github.com/lkmio/lkm/rtsp"
"github.com/lkmio/transport"
"os"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/rtc"
"github.com/lkmio/lkm/rtmp"
"github.com/lkmio/lkm/rtsp"
"github.com/lkmio/lkm/stream"
"go.uber.org/zap/zapcore"
"net"
@@ -67,11 +69,18 @@ func init() {
log.InitLogger(config.Log.FileLogging, zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress)
if stream.AppConfig.GB28181.Enable && stream.AppConfig.GB28181.IsMultiPort() {
gb28181.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.GB28181.Port[0]), uint16(stream.AppConfig.GB28181.Port[1]))
gb28181.TransportManger = transport.NewTransportManager(config.ListenIP, uint16(stream.AppConfig.GB28181.Port[0]), uint16(stream.AppConfig.GB28181.Port[1]))
}
if stream.AppConfig.Rtsp.Enable && stream.AppConfig.Rtsp.IsMultiPort() {
rtsp.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.Rtsp.Port[1]), uint16(stream.AppConfig.Rtsp.Port[2]))
rtsp.TransportManger = transport.NewTransportManager(config.ListenIP, uint16(stream.AppConfig.Rtsp.Port[1]), uint16(stream.AppConfig.Rtsp.Port[2]))
}
// create the dump directory
if stream.AppConfig.Debug {
if err := os.MkdirAll("dump", 0666); err != nil {
panic(err)
}
}
// print the configuration

View File

@@ -14,12 +14,13 @@ type FLVFileSink struct {
}
// Input consumes http-flv data
func (f *FLVFileSink) Input(data []byte) error {
func (f *FLVFileSink) Write(index int, blocks [][]byte, ts int64) error {
if f.fail {
return nil
}
// strip the unwanted CRLF separators
for _, data := range blocks {
// strip the unwanted CRLF separators
var offset int
for i := 2; i < len(data); i++ {
if data[i-2] == 0x0D && data[i-1] == 0x0A {
@@ -30,11 +31,13 @@ func (f *FLVFileSink) Input(data []byte) error {
_, err := f.file.Write(data[offset : len(data)-2])
if err != nil {
// after a single write failure, stop writing; playback is not affected
// after a single write failure, stop writing; playback is not affected
f.fail = true
return err
}
}
return err
return nil
}
func (f *FLVFileSink) Close() {
@@ -50,20 +53,20 @@ func NewFLVFileSink(sourceId string) (stream.Sink, string, error) {
now := time.Now().Format("2006-01-02/15-04-05")
path := filepath.Join(stream.AppConfig.Record.Dir, sourceId, now+".flv")
// create the directory
// create the directory
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0666); err != nil {
return nil, "", err
}
// create the flv file
// create the flv file
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, "", err
}
return &FLVFileSink{
BaseSink: stream.BaseSink{ID: "record-sink-flv", SourceID: sourceId, Protocol: stream.TransStreamFlv},
BaseSink: stream.BaseSink{ID: "record-sink-flv", SourceID: sourceId, Protocol: stream.TransStreamFlv, Ready: true},
file: file,
}, path, nil
}
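The Write method above receives http-flv blocks that still carry the leading "length\r\n" line and a trailing "\r\n"; stripping both recovers the raw FLV tag bytes that belong in the .flv file. A compact sketch of that trim, equivalent in spirit to the loop above; stripHTTPFLVFraming is an illustrative name, not part of the commit.

package record

// stripHTTPFLVFraming drops the leading "length\r\n" line and the trailing
// "\r\n" of an http-flv block, returning only the raw FLV tag bytes.
func stripHTTPFLVFraming(block []byte) []byte {
	offset := 0
	for i := 2; i < len(block); i++ {
		if block[i-2] == 0x0D && block[i-1] == 0x0A {
			offset = i
			break
		}
	}
	return block[offset : len(block)-2]
}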

View File

@@ -42,7 +42,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
for index, track := range tracks {
var mimeType string
var id string
codecId := track.Stream.CodecId()
codecId := track.Stream.CodecID
if utils.AVCodecIdH264 == codecId {
mimeType = webrtc.MimeTypeH264
} else if utils.AVCodecIdH265 == codecId {
@@ -64,7 +64,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
continue
}
if utils.AVMediaTypeAudio == track.Stream.Type() {
if utils.AVMediaTypeAudio == track.Stream.MediaType {
id = "audio"
} else {
id = "video"

View File

@@ -1,6 +1,7 @@
package rtc
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"github.com/pion/interceptor"
@@ -16,21 +17,23 @@ type transStream struct {
stream.BaseTransStream
}
func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
if utils.AVMediaTypeAudio == packet.MediaType() {
t.AppendOutStreamBuffer(packet.Data())
} else if utils.AVMediaTypeVideo == packet.MediaType() {
if packet.KeyFrame() {
extra := t.BaseTransStream.Tracks[packet.Index()].Stream.CodecParameters().AnnexBExtraData()
if utils.AVMediaTypeAudio == packet.MediaType {
t.AppendOutStreamBuffer(packet.Data)
} else if utils.AVMediaTypeVideo == packet.MediaType {
avStream := t.BaseTransStream.Tracks[packet.Index].Stream
if packet.Key {
extra := avStream.CodecParameters.AnnexBExtraData()
t.AppendOutStreamBuffer(extra)
}
t.AppendOutStreamBuffer(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()].Stream))
data := avformat.AVCCPacket2AnnexB(avStream, packet)
t.AppendOutStreamBuffer(data)
}
return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.Duration(1000))), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.GetDuration(1000))), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
}
func (t *transStream) WriteHeader() error {

View File

@@ -2,7 +2,7 @@ package rtmp
import (
"encoding/binary"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/transport"
"net"
"os"
"testing"
@@ -19,7 +19,7 @@ func TestName(t *testing.T) {
}
client := transport.TCPClient{}
if err := client.Connect(nil, addr); err != nil {
if _, err := client.Connect(nil, addr); err != nil {
panic(err)
}

View File

@@ -1,50 +1,19 @@
package rtmp
import (
"github.com/lkmio/avformat/libflv"
"github.com/lkmio/avformat/librtmp"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtmp"
"net"
)
// Publisher RTMP推流Source
type Publisher struct {
stream.PublishSource
Stack *librtmp.Stack
Stack *rtmp.ServerStack
}
func (p *Publisher) Input(data []byte) error {
return p.Stack.Input(data)
}
func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
// AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存
p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail()
p.PublishSource.OnDeMuxStream(stream)
}
// OnVideo 解析出来的完整视频包
func (p *Publisher) OnVideo(index int, data []byte, ts uint32) {
data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeVideo).Fetch()
// 交给flv解复用器, 解析出AVPacket
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
}
func (p *Publisher) OnAudio(index int, data []byte, ts uint32) {
data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeAudio).Fetch()
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
}
// OnPartPacket AVPacket的部分数据包
func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data []byte, first bool) {
buffer := p.FindOrCreatePacketBuffer(index, mediaType)
if first {
buffer.Mark()
}
buffer.Write(data)
return p.Stack.Input(p.Conn, data)
}
func (p *Publisher) Close() {
@@ -52,10 +21,11 @@ func (p *Publisher) Close() {
p.Stack = nil
}
func NewPublisher(source string, stack *librtmp.Stack, conn net.Conn) *Publisher {
deMuxer := libflv.NewDeMuxer()
publisher := &Publisher{PublishSource: stream.PublishSource{ID: source, Type: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, Stack: stack}
func NewPublisher(source string, stack *rtmp.ServerStack, conn net.Conn) *Publisher {
demuxer := stack.FLV
publisher := &Publisher{PublishSource: stream.PublishSource{ID: source, Type: stream.SourceTypeRtmp, TransDemuxer: demuxer, Conn: conn}, Stack: stack}
// 设置回调, 接收从DeMuxer解析出来的音视频包
deMuxer.SetHandler(publisher)
demuxer.SetHandler(publisher)
demuxer.AutoFree = false
return publisher
}
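
重构后Publisher不再维护AVPacket内存池, 推流字节直接交给rtmp协议栈, 由栈内置的flv解复用器回调音视频数据. 大致调用链如下(按本次diff梳理, 仅为示意):

// Session.Input -> PublishSource.Input            写入streamPipe, 由事件循环(LoopEvent)异步消费
//   -> Publisher.Input -> Stack.Input(conn, data)  解析rtmp chunk/message
//   -> Stack.FLV (flv解复用器, AutoFree=false)       解析出track与AVPacket
//   -> PublishSource.OnNewTrack / OnPacket          建track、写GOP缓存并分发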

View File

@@ -6,8 +6,8 @@ import (
"net"
"runtime"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/transport"
)
type Server interface {

View File

@@ -1,16 +1,16 @@
package rtmp
import (
"github.com/lkmio/avformat/librtmp"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/avformat"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtmp"
"net"
"testing"
)
func CreateTransStream(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) stream.TransStream {
func CreateTransStream(source stream.Source, protocol stream.TransStreamProtocol, streams []*avformat.AVStream) stream.TransStream {
if stream.TransStreamRtmp == protocol {
return NewTransStream(librtmp.ChunkSize)
return NewTransStream(rtmp.MaxChunkSize, nil)
}
return nil

View File

@@ -1,20 +1,20 @@
package rtmp
import (
"github.com/lkmio/avformat/librtmp"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtmp"
"net"
)
// Session RTMP会话, 解析处理Message
type Session struct {
stack *librtmp.Stack // rtmp协议栈, 解析message
handle interface{} // 持有具体会话句柄(推流端/拉流端) 在@see OnPublish @see OnPlay回调中赋值
isPublisher bool // 是否时推流会话
conn net.Conn
stack *rtmp.ServerStack // rtmp协议栈, 解析message
handle interface{} // 持有具体会话句柄(推流端/拉流端) 在@see OnPublish @see OnPlay回调中赋值
isPublisher bool // 是否是推流会话
receiveBuffer *stream.ReceiveBuffer // 推流源收流队列
}
@@ -29,14 +29,12 @@ func (s *Session) generateSourceID(app, stream string) string {
}
func (s *Session) OnPublish(app, stream_ string) utils.HookState {
log.Sugar.Infof("rtmp onpublish app:%s stream:%s conn:%s", app, stream_, s.conn.RemoteAddr().String())
log.Sugar.Infof("rtmp onpublish app: %s stream: %s conn: %s", app, stream_, s.conn.RemoteAddr().String())
streamName, values := stream.ParseUrl(stream_)
sourceId := s.generateSourceID(app, streamName)
source := NewPublisher(sourceId, s.stack, s.conn)
// 设置推流的音视频回调
s.stack.SetOnPublishHandler(source)
// 初始化放在add source前面, 以防add后再init, 空窗期拉流队列空指针.
source.Init(stream.ReceiveBufferTCPBlockCount)
@@ -45,7 +43,7 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState {
// 统一处理source推流事件, source是否已经存在, hook回调....
_, state := stream.PreparePublishSource(source, true)
if utils.HookStateOK != state {
log.Sugar.Errorf("rtmp推流失败 source:%s", sourceId)
log.Sugar.Errorf("rtmp推流失败 source: %s", sourceId)
} else {
s.handle = source
s.isPublisher = true
@@ -81,12 +79,12 @@ func (s *Session) Input(data []byte) error {
if s.isPublisher {
return s.handle.(*Publisher).PublishSource.Input(data)
} else {
return s.stack.Input(data)
return s.stack.Input(s.conn, data)
}
}
func (s *Session) Close() {
// session/conn/stack相互引用, go释放不了...手动赋值为nil
// session/conn/stack相互引用, 关闭时手动赋值为nil, 尽快解除引用
s.conn = nil
defer func() {
@@ -120,8 +118,9 @@ func (s *Session) Close() {
func NewSession(conn net.Conn) *Session {
session := &Session{}
stack := librtmp.NewStack(conn, session)
session.stack = stack
stackServer := rtmp.NewStackServer(false)
stackServer.SetOnStreamHandler(session)
session.stack = stackServer
session.conn = conn
return session
}

View File

@@ -1,23 +1,23 @@
package rtmp
import (
"github.com/lkmio/avformat/librtmp"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtmp"
"net"
)
type Sink struct {
stream.BaseSink
stack *librtmp.Stack
stack *rtmp.ServerStack
}
func (s *Sink) StartStreaming(_ stream.TransStream) error {
return s.stack.SendStreamBeginChunk()
return s.stack.SendStreamBeginChunk(s.Conn)
}
func (s *Sink) StopStreaming(stream stream.TransStream) {
_ = s.stack.SendStreamEOFChunk()
_ = s.stack.SendStreamEOFChunk(s.Conn)
s.BaseSink.StopStreaming(stream)
}
@@ -26,7 +26,7 @@ func (s *Sink) Close() {
s.stack = nil
}
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *rtmp.ServerStack) stream.Sink {
return &Sink{
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE, TCPStreaming: true},
stack: stack,

View File

@@ -1,59 +1,63 @@
package rtmp
import (
"github.com/lkmio/avformat/libflv"
"github.com/lkmio/avformat/librtmp"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/flv"
"github.com/lkmio/flv/amf0"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtmp"
)
type transStream struct {
stream.TCPTransStream
sequenceHeader []byte // sequence header
muxer *flv.Muxer
audioChunk rtmp.Chunk
videoChunk rtmp.Chunk
chunkSize int
header []byte //sequence header
headerSize int
muxer libflv.Muxer
audioChunk librtmp.Chunk
videoChunk librtmp.Chunk
metaData *libflv.AMF0Object // 推流方携带的元数据
metaData *amf0.Object // 推流方携带的元数据
}
func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var data []byte
var chunk *librtmp.Chunk
var chunk *rtmp.Chunk
var videoPkt bool
var videoKey bool
// rtmp chunk消息体的数据大小
// chunk负载消息体的数据大小
var payloadSize int
// 先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
var chunkPayloadOffset int
// flv data header
var dataHeaderSize int
var dts int64
var pts int64
var keyBuffer bool
var frameType int
dts = packet.ConvertDts(1000)
pts = packet.ConvertPts(1000)
ct := pts - dts
if utils.AVMediaTypeAudio == packet.MediaType() {
data = packet.Data()
// chunk = header+payload(audio data / video data)
if utils.AVMediaTypeAudio == packet.MediaType {
data = packet.Data
chunk = &t.audioChunk
chunkPayloadOffset = 2
payloadSize += chunkPayloadOffset + len(data)
} else if utils.AVMediaTypeVideo == packet.MediaType() {
dataHeaderSize = t.muxer.ComputeAudioDataHeaderSize()
} else if utils.AVMediaTypeVideo == packet.MediaType {
videoPkt = true
videoKey = packet.KeyFrame()
data = packet.AVCCPacketData()
chunk = &t.videoChunk
chunkPayloadOffset = t.muxer.ComputeVideoDataSize(uint32(ct))
payloadSize += chunkPayloadOffset + len(data)
if videoKey = packet.Key; videoKey {
frameType = flv.FrameTypeKeyFrame
}
data = avformat.AnnexBPacket2AVCC(packet)
chunk = &t.videoChunk
dataHeaderSize = t.muxer.ComputeVideoDataHeaderSize(uint32(ct))
}
payloadSize += dataHeaderSize + len(data)
// 遇到视频关键帧, 发送剩余的流, 创建新切片
if videoKey {
if segment, key := t.MWBuffer.FlushSegment(); len(segment) > 0 {
@@ -66,30 +70,31 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
chunkHeaderSize := 12
// type为3的chunk数量
numChunks := (payloadSize - 1) / t.chunkSize
rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks
// 分块后的rtmp消息总大小
totalSize := chunkHeaderSize + payloadSize + numChunks
// 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
if dts >= 0xFFFFFF && dts <= 0xFFFFFFFF {
rtmpMsgSize += (1 + numChunks) * 4
if dts >= 0xFFFFFF && dts <= 0xFFFFFFFF {
totalSize += (1 + numChunks) * 4
}
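// 举例说明上面的大小计算(数值为假设): 设chunkSize=60000, payloadSize=150000,
// 则numChunks=(150000-1)/60000=2, totalSize=12+150000+2=150014;
// 若dts落在[0xFFFFFF, 0xFFFFFFFF]区间, 首个chunk和2个type3 chunk各多4字节扩展时间戳, 共再加(1+2)*4=12字节.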
// 分配指定大小的内存
allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey)
bytes := t.MWBuffer.Allocate(totalSize, dts, videoKey)
// 写第一个type为0的chunk header
// 写第一个type为0的chunk header
chunk.Length = payloadSize
chunk.Timestamp = uint32(dts)
n := chunk.MarshalHeader(allocate)
n := chunk.MarshalHeader(bytes)
// 封装成flv
if videoPkt {
n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false)
n += t.muxer.VideoData.Marshal(bytes[n:], uint32(ct), frameType, false)
} else {
n += t.muxer.WriteAudioData(allocate[n:], false)
n += t.muxer.AudioData.Marshal(bytes[n:], false)
}
// 将flv data写入chunk body
n += chunk.WriteBody(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
utils.Assert(len(allocate) == n)
n += chunk.WriteBody(bytes[n:], data, t.chunkSize, dataHeaderSize)
utils.Assert(len(bytes) == n)
// 合并写满了再发
if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
@@ -101,10 +106,10 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
}
func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
utils.Assert(t.headerSize > 0)
utils.Assert(len(t.sequenceHeader) > 0)
// 发送sequence header
return [][]byte{t.header[:t.headerSize]}, 0, nil
// 发送sequence header
return [][]byte{t.sequenceHeader}, 0, nil
}
func (t *transStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
@@ -122,20 +127,20 @@ func (t *transStream) WriteHeader() error {
utils.Assert(t.Tracks != nil)
utils.Assert(!t.BaseTransStream.Completed)
var audioStream utils.AVStream
var videoStream utils.AVStream
var audioStream *avformat.AVStream
var videoStream *avformat.AVStream
var audioCodecId utils.AVCodecID
var videoCodecId utils.AVCodecID
for _, track := range t.Tracks {
if utils.AVMediaTypeAudio == track.Stream.Type() {
if utils.AVMediaTypeAudio == track.Stream.MediaType {
audioStream = track.Stream
audioCodecId = audioStream.CodecId()
t.audioChunk = librtmp.NewAudioChunk()
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
audioCodecId = audioStream.CodecID
t.audioChunk = rtmp.NewAudioChunk()
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
videoStream = track.Stream
videoCodecId = videoStream.CodecId()
t.videoChunk = librtmp.NewVideoChunk()
videoCodecId = videoStream.CodecID
t.videoChunk = rtmp.NewVideoChunk()
}
}
@@ -143,67 +148,67 @@ func (t *transStream) WriteHeader() error {
// 初始化
t.BaseTransStream.Completed = true
t.muxer = libflv.NewMuxer(t.metaData)
t.muxer = flv.NewMuxer(t.metaData)
if utils.AVCodecIdNONE != audioCodecId {
t.muxer.AddAudioTrack(audioCodecId, 0, 0, 0)
t.muxer.AddAudioTrack(audioStream)
}
if utils.AVCodecIdNONE != videoCodecId {
t.muxer.AddVideoTrack(videoCodecId)
t.muxer.AddVideoTrack(videoStream)
}
var n int
t.header = make([]byte, 4096)
t.sequenceHeader = make([]byte, 4096)
// 生成推流的数据头(chunk+sequence header)
// 生成数据头
if audioStream != nil {
n += t.muxer.WriteAudioData(t.header[12:], true)
extra := audioStream.Extra()
copy(t.header[n+12:], extra)
n += t.muxer.AudioData.Marshal(t.sequenceHeader[12:], true)
extra := audioStream.Data
copy(t.sequenceHeader[n+12:], extra)
n += len(extra)
t.audioChunk.Length = n
t.audioChunk.MarshalHeader(t.header)
t.audioChunk.MarshalHeader(t.sequenceHeader)
n += 12
}
if videoStream != nil {
tmp := n
n += t.muxer.WriteVideoData(t.header[n+12:], 0, false, true)
extra := videoStream.CodecParameters().MP4ExtraData()
copy(t.header[n+12:], extra)
n += t.muxer.VideoData.Marshal(t.sequenceHeader[n+12:], 0, 0, true)
extra := videoStream.CodecParameters.MP4ExtraData()
copy(t.sequenceHeader[n+12:], extra)
n += len(extra)
t.videoChunk.Length = 5 + len(extra)
t.videoChunk.MarshalHeader(t.header[tmp:])
t.videoChunk.MarshalHeader(t.sequenceHeader[tmp:])
n += 12
}
// 创建元数据chunk
var body [1024]byte
amf0 := libflv.AMF0{}
amf0.AddString("onMetaData")
amf0.Add(t.muxer.MetaData())
length, _ := amf0.Marshal(body[:])
metadata := amf0.Data{}
metadata.AddString("onMetaData")
metadata.Add(t.muxer.MetaData())
length, _ := metadata.Marshal(body[:])
metaData := librtmp.Chunk{
Type: librtmp.ChunkType0,
ChunkStreamID_: 5,
metaDataChunk := rtmp.Chunk{
Type: rtmp.ChunkType0,
ChunkStreamID: 5,
Timestamp: 0,
TypeID: librtmp.MessageTypeIDDataAMF0,
TypeID: rtmp.MessageTypeIDDataAMF0,
StreamID: 1,
Body: body[:length],
Length: length,
}
var tmp [1600]byte
size := metaData.Marshal(tmp[:], librtmp.ChunkSize)
// metadata 放在sequence之前
copy(t.header[size:], t.header[:n])
copy(t.header, tmp[:][:size])
size := metaDataChunk.Marshal(tmp[:], rtmp.MaxChunkSize)
// metadata放在sequence之前
copy(t.sequenceHeader[size:], t.sequenceHeader[:n])
copy(t.sequenceHeader, tmp[:][:size])
n += size
t.headerSize = n
t.sequenceHeader = t.sequenceHeader[:n]
t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil
}
@@ -219,15 +224,15 @@ func (t *transStream) Close() ([][]byte, int64, error) {
return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func NewTransStream(chunkSize int, metaData *libflv.AMF0Object) stream.TransStream {
func NewTransStream(chunkSize int, metaData *amf0.Object) stream.TransStream {
return &transStream{chunkSize: chunkSize, metaData: metaData}
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
// 获取推流的元数据
var metaData *libflv.AMF0Object
var metaData *amf0.Object
if stream.SourceTypeRtmp == source.GetType() {
metaData = source.(*Publisher).Stack.MetaData()
metaData = source.(*Publisher).Stack.Metadata()
}
return NewTransStream(librtmp.ChunkSize, metaData), nil
return NewTransStream(rtmp.MaxChunkSize, metaData), nil
}
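
上面WriteHeader生成的sequenceHeader缓冲区布局大致如下(以同时存在AAC与H264为例, 仅为示意, 具体字节数以muxer实现为准):

// | metadata chunk (onMetaData, AMF0)                        |
// | 12字节chunk header | AudioData头 | AAC AudioSpecificConfig |
// | 12字节chunk header | VideoData头(5字节) | avcC extra data   |
// ReadExtraData把整个缓冲区原样下发给新接入的sink.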

View File

@@ -1,5 +1,6 @@
package rtsp
import (
"fmt"
"github.com/lkmio/avformat/utils"
@@ -70,7 +71,7 @@ func (h handler) Process(session *session, method string, url_ *url.URL, headers
return fmt.Errorf("please establish a session first")
}
source, _ := stream.Path2SourceId(url_.Path, "")
source, _ := stream.Path2SourceID(url_.Path, "")
//反射调用各个处理函数
results := m.Call([]reflect.Value{

View File

@@ -1,9 +1,10 @@
package rtsp
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/transport"
"net"
"runtime"
)

View File

@@ -1,5 +1,6 @@
package rtsp
import (
"bufio"
"bytes"

View File

@@ -1,11 +1,11 @@
package rtsp
import (
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtp"
"github.com/lkmio/transport"
"github.com/pion/rtcp"
"net"
"time"
@@ -21,7 +21,7 @@ var (
type Sink struct {
stream.BaseSink
senders []*librtp.RtpSender // 一个rtsp源, 可能存在多个流, 每个流都需要拉取
senders []*rtp.RtpSender // 一个rtsp源, 可能存在多个流, 每个流都需要拉取
cb func(sdp string) // sdp回调, 响应describe
}
@@ -31,7 +31,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
return nil
}
s.senders = make([]*librtp.RtpSender, transStream.TrackSize())
s.senders = make([]*rtp.RtpSender, transStream.TrackSize())
// sdp回调给sink, sink应答给describe请求
if s.cb != nil {
s.cb(transStream.(*TransStream).sdp)
@@ -47,19 +47,19 @@ func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
var rtpPort uint16
var rtcpPort uint16
sender := librtp.RtpSender{
sender := rtp.RtpSender{
SSRC: ssrc,
}
if tcp {
s.TCPStreaming = true
} else {
sender.Rtp, err = TransportManger.NewUDPServer("0.0.0.0")
sender.Rtp, err = TransportManger.NewUDPServer()
if err != nil {
return 0, 0, err
}
sender.Rtcp, err = TransportManger.NewUDPServer("0.0.0.0")
sender.Rtcp, err = TransportManger.NewUDPServer()
if err != nil {
sender.Rtp.Close()
sender.Rtp = nil

View File

@@ -3,11 +3,12 @@ package rtsp
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/libavc"
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/librtsp/sdp"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/avc"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtp"
"github.com/pion/sdp/v3"
"net"
"strconv"
)
@@ -38,21 +39,22 @@ func (t *TransStream) OverTCP(data []byte, channel int) {
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
}
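// 补充示意: rtsp over tcp的interleaved帧格式为 '$' + 通道号 + 2字节大端长度 + rtp包,
// 上面OverTCP即是在预留好的4字节头部回填长度. 独立打包写法大致如下(packInterleaved为假设的函数名):
//
//	func packInterleaved(channel byte, rtpPacket []byte) []byte {
//		buf := make([]byte, 4+len(rtpPacket))
//		buf[0] = '$'
//		buf[1] = channel
//		binary.BigEndian.PutUint16(buf[2:], uint16(len(rtpPacket)))
//		copy(buf[4:], rtpPacket)
//		return buf
//	}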
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var ts uint32
track := t.RtspTracks[packet.Index()]
if utils.AVMediaTypeAudio == packet.MediaType() {
track := t.RtspTracks[packet.Index]
if utils.AVMediaTypeAudio == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate))
t.PackRtpPayload(track, packet.Index(), packet.Data(), ts)
} else if utils.AVMediaTypeVideo == packet.MediaType() {
t.PackRtpPayload(track, packet.Index, packet.Data, ts)
} else if utils.AVMediaTypeVideo == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate))
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()].Stream))
t.PackRtpPayload(track, packet.Index(), data, ts)
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
data := avc.RemoveStartCode(annexBData)
t.PackRtpPayload(track, packet.Index, data, ts)
}
return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
}
func (t *TransStream) ReadExtraData(ts int64) ([][]byte, int64, error) {
@@ -65,7 +67,7 @@ func (t *TransStream) ReadExtraData(ts int64) ([][]byte, int64, error) {
// 回滚序号和时间戳
index := int(track.StartSeq) - len(track.ExtraDataBuffer)
for i, bytes := range track.ExtraDataBuffer {
librtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1)
rtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1)
binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts))
}
@@ -98,9 +100,9 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
return err
}
payloadType, ok := librtp.CodecIdPayloads[track.Stream.CodecId()]
payloadType, ok := rtp.CodecIdPayloads[track.Stream.CodecID]
if !ok {
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecId())
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecID)
}
// 恢复上次拉流的序号
@@ -111,35 +113,34 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
}
// 创建RTP封装器
var muxer librtp.Muxer
if utils.AVCodecIdH264 == track.Stream.CodecId() {
muxer = librtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdH265 == track.Stream.CodecId() {
muxer = librtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == track.Stream.CodecId() {
muxer = librtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecId() || utils.AVCodecIdPCMMULAW == track.Stream.CodecId() {
muxer = librtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
var muxer rtp.Muxer
if utils.AVCodecIdH264 == track.Stream.CodecID {
muxer = rtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdH265 == track.Stream.CodecID {
muxer = rtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == track.Stream.CodecID {
muxer = rtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
muxer = rtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
}
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.Type())
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType)
t.RtspTracks = append(t.RtspTracks, rtspTrack)
index := len(t.RtspTracks) - 1
// 将sps和pps按照单一模式打包
bufferIndex := t.buffer.Index()
if utils.AVMediaTypeVideo == track.Stream.Type() {
parameters := track.Stream.CodecParameters()
if utils.AVCodecIdH265 == track.Stream.CodecId() {
bytes := parameters.(*utils.HEVCCodecData).VPS()
t.PackRtpPayload(rtspTrack, index, libavc.RemoveStartCode(bytes[0]), 0)
if utils.AVMediaTypeVideo == track.Stream.MediaType {
parameters := track.Stream.CodecParameters
if utils.AVCodecIdH265 == track.Stream.CodecID {
bytes := parameters.(*avformat.HEVCCodecData).VPS()
t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(bytes[0]), 0)
}
spsBytes := parameters.SPS()
ppsBytes := parameters.PPS()
t.PackRtpPayload(rtspTrack, index, libavc.RemoveStartCode(spsBytes[0]), 0)
t.PackRtpPayload(rtspTrack, index, libavc.RemoveStartCode(ppsBytes[0]), 0)
t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(spsBytes[0]), 0)
t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(ppsBytes[0]), 0)
// 拷贝扩展数据的rtp包
size := t.buffer.Index() - bufferIndex
@@ -191,7 +192,7 @@ func (t *TransStream) WriteHeader() error {
}
for i, track := range t.Tracks {
payloadType, _ := librtp.CodecIdPayloads[track.Stream.CodecId()]
payloadType, _ := rtp.CodecIdPayloads[track.Stream.CodecID]
mediaDescription := sdp.MediaDescription{
ConnectionInformation: &sdp.ConnectionInformation{
NetworkType: "IN",
@@ -209,10 +210,10 @@ func (t *TransStream) WriteHeader() error {
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
mediaDescription.MediaName.Formats = []string{strconv.Itoa(payloadType.Pt)}
if utils.AVMediaTypeAudio == track.Stream.Type() {
if utils.AVMediaTypeAudio == track.Stream.MediaType {
mediaDescription.MediaName.Media = "audio"
if utils.AVCodecIdAAC == track.Stream.CodecId() {
if utils.AVCodecIdAAC == track.Stream.CodecID {
//[14496-3], [RFC6416] profile-level-id:
//1 : Main Audio Profile Level 1
//9 : Speech Audio Profile Level 1

View File

@@ -1,8 +1,9 @@
package rtsp
import (
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/rtp"
)
// Track RtspTrack 对rtsp每路输出流的封装
@@ -13,14 +14,14 @@ type Track struct {
StartSeq uint16
EndSeq uint16
Muxer librtp.Muxer
Muxer rtp.Muxer
ExtraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用
}
func (r *Track) Close() {
}
func NewRTSPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
func NewRTSPTrack(muxer rtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
stream := &Track{
PT: pt,
Rate: rate,

View File

@@ -232,11 +232,6 @@ func GetStreamPlayUrls(source string) []string {
// DumpStream2File 保存推流到文件, 用4字节帧长分割
func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte) {
if err := os.MkdirAll("dump", 0666); err != nil {
log.Sugar.Errorf("创建dump文件夹失败 err:%s", err.Error())
return
}
path := fmt.Sprintf("dump/%s-%s", sourceType.String(), conn.RemoteAddr().String())
path = strings.ReplaceAll(path, ":", ".")

View File

@@ -1,6 +1,7 @@
package stream
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
)
@@ -9,14 +10,14 @@ import (
type GOPBuffer interface {
// AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败
AddPacket(packet utils.AVPacket) bool
AddPacket(packet *avformat.AVPacket) bool
// SetDiscardHandler 设置丢弃帧时的回调
SetDiscardHandler(handler func(packet utils.AVPacket))
SetDiscardHandler(handler func(packet *avformat.AVPacket))
PeekAll(handler func(packet utils.AVPacket))
PeekAll(handler func(packet *avformat.AVPacket))
Peek(index int) utils.AVPacket
Peek(index int) *avformat.AVPacket
Size() int
@@ -26,24 +27,24 @@ type GOPBuffer interface {
}
type streamBuffer struct {
buffer collections.RingBuffer
buffer collections.RingBuffer[*avformat.AVPacket]
existVideoKeyFrame bool
discardHandler func(packet utils.AVPacket)
discardHandler func(packet *avformat.AVPacket)
}
func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool {
func (s *streamBuffer) AddPacket(packet *avformat.AVPacket) bool {
// 缓存满,清空
if s.Size()+1 == s.buffer.Capacity() {
s.Clear()
}
// 丢弃首帧视频非关键帧
if utils.AVMediaTypeVideo == packet.MediaType() && !s.existVideoKeyFrame && !packet.KeyFrame() {
if utils.AVMediaTypeVideo == packet.MediaType && !s.existVideoKeyFrame && !packet.Key {
return false
}
// 丢弃前一组GOP
videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()
videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType && packet.Key
if videoKeyFrame {
if s.existVideoKeyFrame {
s.discard()
@@ -56,7 +57,7 @@ func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool {
return true
}
func (s *streamBuffer) SetDiscardHandler(handler func(packet utils.AVPacket)) {
func (s *streamBuffer) SetDiscardHandler(handler func(packet *avformat.AVPacket)) {
s.discardHandler = handler
}
@@ -65,36 +66,36 @@ func (s *streamBuffer) discard() {
pkt := s.buffer.Pop()
if s.discardHandler != nil {
s.discardHandler(pkt.(utils.AVPacket))
s.discardHandler(pkt)
}
}
s.existVideoKeyFrame = false
}
func (s *streamBuffer) Peek(index int) utils.AVPacket {
func (s *streamBuffer) Peek(index int) *avformat.AVPacket {
utils.Assert(index < s.buffer.Size())
head, tail := s.buffer.Data()
if index < len(head) {
return head[index].(utils.AVPacket)
return head[index]
} else {
return tail[index-len(head)].(utils.AVPacket)
return tail[index-len(head)]
}
}
func (s *streamBuffer) PeekAll(handler func(packet utils.AVPacket)) {
func (s *streamBuffer) PeekAll(handler func(packet *avformat.AVPacket)) {
head, tail := s.buffer.Data()
if head != nil {
for _, value := range head {
handler(value.(utils.AVPacket))
handler(value)
}
}
if tail != nil {
for _, value := range tail {
handler(value.(utils.AVPacket))
handler(value)
}
}
}
@@ -112,5 +113,5 @@ func (s *streamBuffer) Close() {
}
func NewStreamBuffer() GOPBuffer {
return &streamBuffer{buffer: collections.NewRingBuffer(1000), existVideoKeyFrame: false}
return &streamBuffer{buffer: collections.NewRingBuffer[*avformat.AVPacket](1000), existVideoKeyFrame: false}
}
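
GOPBuffer以视频关键帧为界缓存一组GOP, 新建传输流时先灌入整组缓存, 缩短拉流端等待首个关键帧的时间. 一个最小使用示意(pkt、demuxer、transStream为假设的上下文变量):

buf := NewStreamBuffer()
// 缓存溢出丢帧时, 释放解复用器中对应的缓存
buf.SetDiscardHandler(func(p *avformat.AVPacket) { demuxer.DiscardHeadPacket(p.BufferIndex) })
// 每解析出一个AVPacket就尝试入缓存(首帧视频非关键帧会被拒绝)
_ = buf.AddPacket(pkt)
// 新建传输流时, 把缓存的整组GOP依次送入
buf.PeekAll(func(p *avformat.AVPacket) { _, _, _, _ = transStream.Input(p) })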

173
stream/m3u8_writer.go Normal file
View File

@@ -0,0 +1,173 @@
package stream
import (
"bytes"
"github.com/lkmio/avformat/collections"
"math"
"strconv"
)
const (
ExtM3u = "EXTM3U"
ExtXVersion = "EXT-X-VERSION" //在文件中唯一
ExtINF = "EXTINF" //<duration>(浮点类型, 版本小于3用整型),[<title>]
ExtXByteRange = "EXT-X-BYTERANGE" //版本4及以上,分片位置
ExtXDiscontinuity = "EXT-X-DISCONTINUITY" //后面的切片不连续, 文件格式、时间戳发生变化
ExtXKey = "EXT-X-KEY" //加密使用
ExtXMap = "EXT-X-MAP" //音视频的元数据
ExtXProgramDateTime = "EXT-X-PROGRAM-DATE-TIME"
ExtXDateRange = "EXT-X-DATERANGE"
ExtXTargetDuration = "EXT-X-TARGETDURATION" //切片最大时长, 整型单位秒
ExtXMediaSequence = "EXT-X-MEDIA-SEQUENCE" //第一个切片序号
ExtXDiscontinuitySequence = "EXT-X-DISCONTINUITY-SEQUENCE"
ExtXEndList = "EXT-X-ENDLIST"
ExtXPlaylistType = "EXT-X-PLAYLIST-TYPE"
ExtXIFramesOnly = "EXT-X-I-FRAMES-ONLY"
ExtXMedia = "EXT-X-MEDIA"
ExtXStreamINF = "EXT-X-STREAM-INF"
ExtXIFrameStreamINF = "EXT-X-I-FRAME-STREAM-INF"
ExtXSessionData = "EXT-X-SESSION-DATA"
ExtXSessionKey = "EXT-X-SESSION-KEY"
ExtXIndependentSegments = "EXT-X-INDEPENDENT-SEGMENTS"
ExtXStart = "EXT-X-START"
)
// HTTP响应的Content-Type头必须是"application/vnd.apple.mpegurl"或"audio/mpegurl"
// 文件开头不能带BOM
type Segment struct {
Duration float32
Url string
Sequence int
Path string
}
type M3U8Writer interface {
// AddSegment 添加切片
//@Params duration 切片时长
//@Params url m3u8列表中切片的url
//@Params sequence m3u8列表中的切片序号
//@Params path 切片位于磁盘中的绝对路径
AddSegment(duration float32, url string, sequence int, path string)
String() string
// Size 返回切片文件数量
Size() int
// Get 返回指定索引的切片文件
Get(index int) Segment
}
type m3u8Writer struct {
stringBuffer *bytes.Buffer
segments *collections.Queue[Segment]
}
func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) {
if m.segments.IsFull() {
m.segments.Pop()
}
m.segments.Push(Segment{Duration: duration, Url: url, Sequence: sequence, Path: path})
}
// 返回切片时长最长的值(秒)
func (m *m3u8Writer) targetDuration() int {
var targetDuration int
head, tail := m.segments.Data()
compute := func(playlist []Segment) {
for _, segment := range playlist {
// 会影响播放器缓存.
round := int(math.Ceil(float64(segment.Duration)))
if round > targetDuration {
targetDuration = round
}
}
}
if head != nil {
compute(head)
}
if tail != nil {
compute(tail)
}
return targetDuration
}
func (m *m3u8Writer) String() string {
// 仅实现简单的播放列表
head, tail := m.segments.Data()
if head == nil {
return ""
}
m.stringBuffer.Reset()
m.stringBuffer.WriteString("#EXTM3U\r\n")
// 仅实现第三个版本
m.stringBuffer.WriteString("#EXT-X-VERSION:3\r\n")
m.stringBuffer.WriteString("#EXT-X-TARGETDURATION:")
m.stringBuffer.WriteString(strconv.Itoa(m.targetDuration()))
m.stringBuffer.WriteString("\r\n")
m.stringBuffer.WriteString("#EXT-X-MEDIA-SEQUENCE:")
m.stringBuffer.WriteString(strconv.Itoa(head[0].Sequence))
m.stringBuffer.WriteString("\r\n")
appendSegments := func(playlist []Segment) {
for _, segment := range playlist {
m.stringBuffer.WriteString("#EXTINF:")
m.stringBuffer.WriteString(strconv.FormatFloat(float64(segment.Duration), 'f', -1, 32))
m.stringBuffer.WriteString(",\r\n")
m.stringBuffer.WriteString(segment.Url + "%s") // %s用于替换每个sink的拉流key
m.stringBuffer.WriteString("\r\n")
}
}
if head != nil {
appendSegments(head)
}
if tail != nil {
appendSegments(tail)
}
return m.stringBuffer.String()
}
func (m *m3u8Writer) Size() int {
var size int
head, tail := m.segments.Data()
if head != nil {
size += len(head)
}
if tail != nil {
size += len(tail)
}
return size
}
func (m *m3u8Writer) Get(index int) Segment {
head, tail := m.segments.Data()
if index >= len(head) {
return tail[index-len(head)]
} else {
return head[index]
}
}
func NewM3U8Writer(size int) M3U8Writer {
return &m3u8Writer{
stringBuffer: bytes.NewBuffer(make([]byte, 0, 1024*10)),
segments: collections.NewQueue[Segment](size),
}
}
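
新增的M3U8Writer只维护一个固定窗口的直播播放列表. 一个最小使用示意(切片时长、url和路径均为假设值; 注意列表中的url会追加"%s"占位符, 由每个sink替换成自己的拉流key):

w := NewM3U8Writer(3)
w.AddSegment(4.0, "0.ts?", 0, "/data/hls/source/0.ts")
w.AddSegment(3.96, "1.ts?", 1, "/data/hls/source/1.ts")
playlist := w.String()
// playlist内容大致为:
// #EXTM3U
// #EXT-X-VERSION:3
// #EXT-X-TARGETDURATION:4        (所有切片时长向上取整后的最大值)
// #EXT-X-MEDIA-SEQUENCE:0
// #EXTINF:4,
// 0.ts?%s
// #EXTINF:3.96,
// 1.ts?%s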

View File

@@ -3,9 +3,9 @@ package stream
import (
"context"
"fmt"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/transport"
"net"
"net/url"
"sync"

View File

@@ -2,16 +2,16 @@ package stream
import (
"fmt"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat"
"github.com/lkmio/lkm/log"
"github.com/lkmio/transport"
"net"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/lkmio/avformat/stream"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/transcode"
)
@@ -64,23 +64,8 @@ type Source interface {
// IsCompleted 所有推流track是否解析完毕
IsCompleted() bool
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool
// OnDiscardPacket GOP缓存溢出回调, 释放AVPacket
OnDiscardPacket(pkt utils.AVPacket)
// OnDeMuxStream 解析出AVStream回调
OnDeMuxStream(stream utils.AVStream)
// OnDeMuxStreamDone 所有track解析完毕回调, 后续的OnDeMuxStream回调不再处理
OnDeMuxStreamDone()
// OnDeMuxPacket 解析出AvPacket回调
OnDeMuxPacket(packet utils.AVPacket)
// OnDeMuxDone 所有流解析完毕回调
OnDeMuxDone()
OnDiscardPacket(pkt *avformat.AVPacket)
Init(receiveQueueSize int)
@@ -135,7 +120,7 @@ type PublishSource struct {
state SessionState
Conn net.Conn
TransDeMuxer stream.DeMuxer // 负责从推流协议中解析出AVStream和AVPacket
TransDemuxer avformat.Demuxer // 负责从推流协议中解析出AVStream和AVPacket
recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径
hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟.
@@ -143,7 +128,7 @@ type PublishSource struct {
videoTranscoders []transcode.Transcoder // 视频解码器
originTracks TrackManager // 推流的音视频Streams
allStreamTracks TrackManager // 推流Streams+转码器获得的Stream
pktBuffers [8]collections.MemoryPool // 推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
//pktBuffers [8]collections.MemoryPool // 推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
closed atomic.Bool // source是否已经关闭
@@ -152,7 +137,7 @@ type PublishSource struct {
probeTimer *time.Timer // track解析超时计时器, 触发时执行@see writeHeader
TransStreams map[TransStreamID]TransStream // 所有输出流, 持有Sink
TransStreams map[TransStreamID]TransStream // 所有输出流
sinks map[SinkID]Sink // 保存所有Sink
slowSinks map[SinkID]Sink // 因推流慢被挂起的sink队列
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
@@ -169,6 +154,7 @@ type PublishSource struct {
urlValues url.Values // 推流url携带的参数
createTime time.Time // source创建时间
statistics *BitrateStatistics // 码流统计
streamLogger avformat.OnUnpackStream2FileHandler
}
func (s *PublishSource) SetLastPacketTime(time2 time.Time) {
@@ -220,6 +206,17 @@ func (s *PublishSource) Init(receiveQueueSize int) {
s.slowSinks = make(map[SinkID]Sink, 12)
s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
s.statistics = NewBitrateStatistics()
// 此处设置的探测时长, 只是为了保证在probeTimeout触发前, 一直在探测
s.TransDemuxer.SetProbeDuration(60000)
// 启动探测超时计时器
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, func() {
s.PostEvent(func() {
// s.writeHeader()
s.TransDemuxer.ProbeComplete()
})
})
}
func (s *PublishSource) CreateDefaultOutStreams() {
@@ -255,28 +252,6 @@ func (s *PublishSource) CreateDefaultOutStreams() {
}
}
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool {
if index >= cap(s.pktBuffers) {
panic("流路数过多...")
}
if s.pktBuffers[index] == nil {
if utils.AVMediaTypeAudio == mediaType {
s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 12)
} else if AppConfig.GOPCache {
// 开启GOP缓存
s.pktBuffers[index] = collections.NewRbMemoryPool(AppConfig.GOPBufferSize)
} else {
// 未开启GOP缓存
// 1M缓存大小, 单帧绰绰有余
s.pktBuffers[index] = collections.NewRbMemoryPool(1024 * 1000)
}
}
return s.pktBuffers[index]
}
func (s *PublishSource) Input(data []byte) error {
s.streamPipe <- data
s.statistics.Input(len(data))
@@ -328,20 +303,20 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream
func (s *PublishSource) DispatchGOPBuffer(transStream TransStream) {
if s.gopBuffer != nil {
s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
s.gopBuffer.PeekAll(func(packet *avformat.AVPacket) {
s.DispatchPacket(transStream, packet)
})
}
}
// DispatchPacket 分发AVPacket
func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket) {
func (s *PublishSource) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) {
data, timestamp, videoKey, err := transStream.Input(packet)
if err != nil || len(data) < 1 {
return
}
s.DispatchBuffer(transStream, packet.Index(), data, timestamp, videoKey)
s.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey)
}
// DispatchBuffer 分发传输流
@@ -427,26 +402,26 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
}
if !disableAudio && utils.AVCodecIdNONE == audioCodecId {
audioCodecId = audioTrack.Stream.CodecId()
audioCodecId = audioTrack.Stream.CodecID
}
if !disableVideo && utils.AVCodecIdNONE == videoCodecId {
videoCodecId = videoTrack.Stream.CodecId()
videoCodecId = videoTrack.Stream.CodecID
}
// 创建音频转码器
if !disableAudio && audioCodecId != audioTrack.Stream.CodecId() {
if !disableAudio && audioCodecId != audioTrack.Stream.CodecID {
utils.Assert(false)
}
// 创建视频转码器
if !disableVideo && videoCodecId != videoTrack.Stream.CodecId() {
if !disableVideo && videoCodecId != videoTrack.Stream.CodecID {
utils.Assert(false)
}
// 查找传输流需要的所有track
var tracks []*Track
for _, track := range s.originTracks.All() {
if disableVideo && track.Stream.Type() == utils.AVMediaTypeVideo {
if disableVideo && track.Stream.MediaType == utils.AVMediaTypeVideo {
continue
}
@@ -504,7 +479,6 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
_, ok := sink.GetConn().(*transport.Conn)
if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
length := transStream.OutStreamBufferCapacity() - 2
fmt.Printf("发送缓冲区容量: %d\r\n", length)
sink.EnableAsyncWriteMode(length)
}
@@ -617,17 +591,17 @@ func (s *PublishSource) DoClose() {
s.closed.Store(true)
if s.TransDeMuxer != nil {
s.TransDeMuxer.Close()
s.TransDeMuxer = nil
if s.TransDemuxer != nil {
s.TransDemuxer.Close()
s.TransDemuxer = nil
}
// 清空未写完的buffer
for _, buffer := range s.pktBuffers {
if buffer != nil {
buffer.Reset()
}
}
//for _, buffer := range s.pktBuffers {
// if buffer != nil {
// buffer.Reset()
// }
//}
// 释放GOP缓存
if s.gopBuffer != nil {
@@ -735,40 +709,9 @@ func (s *PublishSource) Close() {
}
// OnDiscardPacket GOP缓存溢出丢弃回调
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeHead()
}
func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
if s.completed {
log.Sugar.Warnf("添加%s track失败,已经WriteHeader. source: %s", stream.Type().ToString(), s.ID)
return
} else if !s.NotTrackAdded(stream.Index()) {
log.Sugar.Warnf("添加%s track失败,已经添加索引为%d的track. source: %s", stream.Type().ToString(), stream.Index(), s.ID)
return
}
s.originTracks.Add(NewTrack(stream, 0, 0))
s.allStreamTracks.Add(NewTrack(stream, 0, 0))
// 启动track解析超时计时器
if len(s.originTracks.All()) == 1 {
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, func() {
s.PostEvent(func() {
s.writeHeader()
})
})
}
if utils.AVMediaTypeVideo == stream.Type() {
s.existVideo = true
}
// 创建GOPBuffer
if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil {
s.gopBuffer = NewStreamBuffer()
// 设置GOP缓存溢出回调
s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket)
func (s *PublishSource) OnDiscardPacket(packet *avformat.AVPacket) {
if s.TransDemuxer != nil {
s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex)
}
}
@@ -797,7 +740,7 @@ func (s *PublishSource) writeHeader() {
// 恢复每路track的时间戳
tracks := s.originTracks.All()
for _, track := range tracks {
timestamps := streamInfo.Timestamps[track.Stream.CodecId()]
timestamps := streamInfo.Timestamps[track.Stream.CodecID]
track.Dts = timestamps[0]
track.Pts = timestamps[1]
}
@@ -805,7 +748,7 @@ func (s *PublishSource) writeHeader() {
// 纠正GOP中的时间戳
if s.gopBuffer != nil && s.gopBuffer.Size() != 0 {
s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
s.gopBuffer.PeekAll(func(packet *avformat.AVPacket) {
s.CorrectTimestamp(packet)
})
}
@@ -833,7 +776,7 @@ func (s *PublishSource) IsCompleted() bool {
// NotTrackAdded 返回该index对应的track是否没有添加
func (s *PublishSource) NotTrackAdded(index int) bool {
for _, track := range s.originTracks.All() {
if track.Stream.Index() == index {
if track.Stream.Index == index {
return false
}
}
@@ -841,39 +784,84 @@ func (s *PublishSource) NotTrackAdded(index int) bool {
return true
}
func (s *PublishSource) OnDeMuxStreamDone() {
s.writeHeader()
}
func (s *PublishSource) CorrectTimestamp(packet utils.AVPacket) {
func (s *PublishSource) CorrectTimestamp(packet *avformat.AVPacket) {
// 对比第一包的时间戳和上次推流的最后时间戳。如果小于上次的推流时间戳,则在原来的基础上累加。
if s.streamEndInfo != nil && !s.timestampModeDecided {
s.timestampModeDecided = true
timestamps := s.streamEndInfo.Timestamps[packet.CodecId()]
timestamps := s.streamEndInfo.Timestamps[packet.CodecID]
s.accumulateTimestamps = true
log.Sugar.Infof("累加时间戳 上次推流dts: %d, pts: %d", timestamps[0], timestamps[1])
}
track := s.originTracks.Find(packet.CodecId())
duration := packet.Duration(packet.Timebase())
track := s.originTracks.Find(packet.CodecID)
duration := packet.GetDuration(packet.Timebase)
// 根据duration来累加时间戳
if s.accumulateTimestamps {
offset := packet.Pts() - packet.Dts()
packet.SetDts(track.Dts + duration)
packet.SetPts(packet.Dts() + offset)
offset := packet.Pts - packet.Dts
packet.Dts = track.Dts + duration
packet.Pts = packet.Dts + offset
}
track.Dts = packet.Dts()
track.Pts = packet.Pts()
track.Dts = packet.Dts
track.Pts = packet.Pts
track.FrameDuration = int(duration)
}
func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
func (s *PublishSource) OnNewTrack(track avformat.Track) {
if AppConfig.Debug {
s.streamLogger.Path = "dump/" + strings.ReplaceAll(s.ID, "/", "_")
s.streamLogger.OnNewTrack(track)
}
stream := track.GetStream()
if s.completed {
log.Sugar.Warnf("添加%s track失败,已经WriteHeader. source: %s", stream.MediaType, s.ID)
return
} else if !s.NotTrackAdded(stream.Index) {
log.Sugar.Warnf("添加%s track失败,已经添加索引为%d的track. source: %s", stream.MediaType, stream.Index, s.ID)
return
}
s.originTracks.Add(NewTrack(stream, 0, 0))
s.allStreamTracks.Add(NewTrack(stream, 0, 0))
if utils.AVMediaTypeVideo == stream.MediaType {
s.existVideo = true
}
// 创建GOPBuffer
if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil {
s.gopBuffer = NewStreamBuffer()
// 设置GOP缓存溢出回调
s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket)
}
}
func (s *PublishSource) OnTrackComplete() {
if AppConfig.Debug {
s.streamLogger.OnTrackComplete()
}
s.writeHeader()
}
func (s *PublishSource) OnTrackNotFind() {
if AppConfig.Debug {
s.streamLogger.OnTrackNotFind()
}
}
func (s *PublishSource) OnPacket(packet *avformat.AVPacket) {
if AppConfig.Debug {
s.streamLogger.OnPacket(packet)
}
// track超时忽略推流数据
if s.NotTrackAdded(packet.Index()) {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
if s.NotTrackAdded(packet.Index) {
s.TransDemuxer.DiscardBackPacket(packet.BufferIndex)
return
}
@@ -883,7 +871,7 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
}
// 遇到关键帧, 恢复推流
if utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame() && len(s.slowSinks) > 0 {
if utils.AVMediaTypeVideo == packet.MediaType && packet.Key && len(s.slowSinks) > 0 {
s.ResumeStreaming()
}
@@ -899,14 +887,10 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
// 未开启GOP缓存或只存在音频流, 释放掉内存
if !AppConfig.GOPCache || !s.existVideo {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
s.TransDemuxer.DiscardBackPacket(packet.BufferIndex)
}
}
func (s *PublishSource) OnDeMuxDone() {
}
func (s *PublishSource) GetType() SourceType {
return s.Type
}
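
上文CorrectTimestamp在断流重推且仍有拉流端未断开时, 会以单帧时长为步长, 在上次推流的结束时间戳上继续累加, 避免拉流端时间戳回跳. 一个数值示例(数值为假设, 时间基以毫秒计):

// 上次推流结束: track.Dts = 9000, 单帧时长 duration = 40
// 新推流首包: dts = 0, pts = 80, 则 offset = pts - dts = 80
// 累加后: packet.Dts = 9000 + 40 = 9040, packet.Pts = 9040 + 80 = 9120
// 随后track.Dts/track.Pts更新为9040/9120, 供下一包继续累加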

View File

@@ -1,10 +1,7 @@
package stream
import (
"encoding/hex"
"fmt"
"github.com/lkmio/avformat/libavc"
"github.com/lkmio/avformat/libhevc"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"net/url"
@@ -94,7 +91,7 @@ func (s SessionState) String() string {
panic(fmt.Sprintf("unknown session state %d", s))
}
func Path2SourceId(path string, suffix string) (string, error) {
func Path2SourceID(path string, suffix string) (string, error) {
source := strings.TrimSpace(path)
if strings.HasPrefix(source, "/") {
source = source[1:]
@@ -129,95 +126,96 @@ func ParseUrl(name string) (string, url.Values) {
return name, nil
}
func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error) {
var stream utils.AVStream
if utils.AVCodecIdH264 == codec {
//从关键帧中解析出sps和pps
if key && extractStream {
sps, pps, err := libavc.ParseExtraDataFromKeyNALU(data)
if err != nil {
log.Sugar.Errorf("从关键帧中解析sps pps失败 data:%s", hex.EncodeToString(data))
return nil, nil, err
}
codecData, err := utils.NewAVCCodecData(sps, pps)
if err != nil {
log.Sugar.Errorf("解析sps pps失败 data:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps))
return nil, nil, err
}
stream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
}
} else if utils.AVCodecIdH265 == codec {
if key && extractStream {
vps, sps, pps, err := libhevc.ParseExtraDataFromKeyNALU(data)
if err != nil {
log.Sugar.Errorf("从关键帧中解析vps sps pps失败 data:%s", hex.EncodeToString(data))
return nil, nil, err
}
codecData, err := utils.NewHEVCCodecData(vps, sps, pps)
if err != nil {
log.Sugar.Errorf("解析sps pps失败 data:%s vps:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps))
return nil, nil, err
}
stream = utils.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
}
}
packet := utils.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, timebase)
return stream, packet, nil
}
func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error) {
var stream utils.AVStream
var packet utils.AVPacket
if utils.AVCodecIdAAC == codec {
//必须包含ADTSHeader
if len(data) < 7 {
return nil, nil, fmt.Errorf("need more data")
}
var skip int
header, err := utils.ReadADtsFixedHeader(data)
if err != nil {
log.Sugar.Errorf("读取ADTSHeader失败 data:%s", hex.EncodeToString(data[:7]))
return nil, nil, err
} else {
skip = 7
//跳过ADtsHeader长度
if header.ProtectionAbsent() == 0 {
skip += 2
}
}
if extractStream {
configData, err := utils.ADtsHeader2MpegAudioConfigData(header)
config, err := utils.ParseMpeg4AudioConfig(configData)
println(config)
if err != nil {
log.Sugar.Errorf("adt头转m4ac失败 data:%s", hex.EncodeToString(data[:7]))
return nil, nil, err
}
stream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil)
}
packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, timebase)
} else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec {
if extractStream {
stream = utils.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil)
}
packet = utils.NewAudioPacket(data, dts, pts, codec, index, timebase)
}
return stream, packet, nil
}
//
//func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, index, timebase int) (*avformat.AVStream, *avformat.AVPacket, error) {
// var stream *avformat.AVStream
//
// if utils.AVCodecIdH264 == codec {
// //从关键帧中解析出sps和pps
// if key && extractStream {
// sps, pps, err := avc.ParseExtraDataFromKeyNALU(data)
// if err != nil {
// log.Sugar.Errorf("从关键帧中解析sps pps失败 data:%s", hex.EncodeToString(data))
// return nil, nil, err
// }
//
// codecData, err := utils.NewAVCCodecData(sps, pps)
// if err != nil {
// log.Sugar.Errorf("解析sps pps失败 data:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps))
// return nil, nil, err
// }
//
// stream = avformat.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
// }
//
// } else if utils.AVCodecIdH265 == codec {
// if key && extractStream {
// vps, sps, pps, err := hevc.ParseExtraDataFromKeyNALU(data)
// if err != nil {
// log.Sugar.Errorf("从关键帧中解析vps sps pps失败 data:%s", hex.EncodeToString(data))
// return nil, nil, err
// }
//
// codecData, err := utils.NewHEVCCodecData(vps, sps, pps)
// if err != nil {
// log.Sugar.Errorf("解析sps pps失败 data:%s vps:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps))
// return nil, nil, err
// }
//
// stream = avformat.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData)
// }
//
// }
//
// packet := avformat.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, timebase)
// return stream, packet, nil
//}
//
//func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, index, timebase int) (*avformat.AVStream, *avformat.AVPacket, error) {
// var stream *avformat.AVStream
// var packet *avformat.AVPacket
// if utils.AVCodecIdAAC == codec {
// //必须包含ADTSHeader
// if len(data) < 7 {
// return nil, nil, fmt.Errorf("need more data")
// }
//
// var skip int
// header, err := utils.ReadADtsFixedHeader(data)
// if err != nil {
// log.Sugar.Errorf("读取ADTSHeader失败 data:%s", hex.EncodeToString(data[:7]))
// return nil, nil, err
// } else {
// skip = 7
// //跳过ADtsHeader长度
// if header.ProtectionAbsent() == 0 {
// skip += 2
// }
// }
//
// if extractStream {
// configData, err := utils.ADtsHeader2MpegAudioConfigData(header)
// config, err := utils.ParseMpeg4AudioConfig(configData)
// println(config)
// if err != nil {
// log.Sugar.Errorf("adt头转m4ac失败 data:%s", hex.EncodeToString(data[:7]))
// return nil, nil, err
// }
//
// stream = avformat.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil)
// }
//
// packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, timebase)
// } else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec {
// if extractStream {
// stream = avformat.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil)
// }
//
// packet = utils.NewAudioPacket(data, dts, pts, codec, index, timebase)
// }
//
// return stream, packet, nil
//}
// StartReceiveDataTimer 启动收流超时计时器
func StartReceiveDataTimer(source Source) *time.Timer {
@@ -308,7 +306,7 @@ func LoopEvent(source Source) {
}
if err := source.Input(data); err != nil {
log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", source.GetID(), err.Error())
log.Sugar.Errorf("解析推流数据发生err: %s 释放source: %s", err.Error(), source.GetID())
source.DoClose()
return
}

View File

@@ -1,7 +1,6 @@
package stream
import (
"github.com/lkmio/avformat/libhls"
"github.com/lkmio/avformat/utils"
"sync"
)
@@ -14,13 +13,13 @@ func init() {
streamEndInfoManager = &StreamEndInfoManager{sources: make(map[string]*StreamEndInfo, 32)}
}
// StreamEndInfo 保留结束推流Source的推流信息
// 在结束推流时,如果还有拉流端没有断开,则保留一些推流信息(目前有时间戳、ts切片序号等等)。在下次推流时,使用该时间戳作为新传输流的起始时间戳,保证拉流端在拉流时不会现pts和dts错误.
// 如果在此之前陆续有拉流端断开直至sink计数为0则会不再保留该信息。
// StreamEndInfo 保存Source结束推流时的推流信息
// 在结束推流时,如果还有拉流端没有断开,则保留一些推流信息(时间戳、ts切片序号等等)。在下次推流时,使用该时间戳作为新传输流的起始时间戳,保证拉流端在拉流时不会出现pts和dts错误。
// 如果重新推流之前陆续有拉流端断开直至sink计数为0,则删除保存的推流信息。
type StreamEndInfo struct {
ID string
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
M3U8Writer libhls.M3U8Writer // 保存M3U8生成器
M3U8Writer M3U8Writer // 保存M3U8生成器
PlaylistFormat *string // M3U8播放列表
RtspTracks map[byte]uint16 // rtsp每路track的结束序号
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
@@ -32,7 +31,7 @@ func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool {
}
for _, track := range tracks {
if _, ok := info.Timestamps[track.Stream.CodecId()]; !ok {
if _, ok := info.Timestamps[track.Stream.CodecID]; !ok {
return false
}
}

View File

@@ -1,8 +1,8 @@
package stream
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/log"
"github.com/lkmio/transport"
"net"
)

View File

@@ -1,14 +1,16 @@
package stream
import "github.com/lkmio/avformat/utils"
import (
"github.com/lkmio/avformat"
)
type Track struct {
Stream utils.AVStream
Stream *avformat.AVStream
Pts int64 // 最新的PTS
Dts int64 // 最新的DTS
FrameDuration int // 单帧时长, timebase和推流一致
}
func NewTrack(stream utils.AVStream, dts, pts int64) *Track {
func NewTrack(stream *avformat.AVStream, dts, pts int64) *Track {
return &Track{stream, dts, pts, 0}
}

View File

@@ -10,8 +10,8 @@ type TrackManager struct {
func (s *TrackManager) Add(track *Track) {
for _, t := range s.tracks {
utils.Assert(t.Stream.Type() != track.Stream.Type())
utils.Assert(t.Stream.CodecId() != track.Stream.CodecId())
utils.Assert(t.Stream.MediaType != track.Stream.MediaType)
utils.Assert(t.Stream.CodecID != track.Stream.CodecID)
}
s.tracks = append(s.tracks, track)
@@ -19,7 +19,7 @@ func (s *TrackManager) Add(track *Track) {
func (s *TrackManager) Find(id utils.AVCodecID) *Track {
for _, track := range s.tracks {
if track.Stream.CodecId() == id {
if track.Stream.CodecID == id {
return track
}
}
@@ -29,7 +29,7 @@ func (s *TrackManager) Find(id utils.AVCodecID) *Track {
func (s *TrackManager) FindWithType(mediaType utils.AVMediaType) *Track {
for _, track := range s.tracks {
if track.Stream.Type() == mediaType {
if track.Stream.MediaType == mediaType {
return track
}
}
@@ -40,7 +40,7 @@ func (s *TrackManager) FindWithType(mediaType utils.AVMediaType) *Track {
func (s *TrackManager) FindTracks(id utils.AVCodecID) []*Track {
var tracks []*Track
for _, track := range s.tracks {
if track.Stream.CodecId() == id {
if track.Stream.CodecID == id {
tracks = append(tracks, track)
}
}
@@ -51,7 +51,7 @@ func (s *TrackManager) FindTracks(id utils.AVCodecID) []*Track {
func (s *TrackManager) FindTracksWithType(mediaType utils.AVMediaType) []*Track {
var tracks []*Track
for _, track := range s.tracks {
if track.Stream.Type() == mediaType {
if track.Stream.MediaType == mediaType {
tracks = append(tracks, track)
}
}

View File

@@ -1,6 +1,7 @@
package stream
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
)
@@ -11,7 +12,7 @@ type TransStream interface {
SetID(id TransStreamID)
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
Input(packet utils.AVPacket) ([][]byte, int64, bool, error)
Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error)
AddTrack(track *Track) error
@@ -67,13 +68,13 @@ func (t *BaseTransStream) SetID(id TransStreamID) {
t.ID = id
}
func (t *BaseTransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) {
return nil, -1, false, nil
}
func (t *BaseTransStream) AddTrack(track *Track) error {
t.Tracks = append(t.Tracks, track)
if utils.AVMediaTypeVideo == track.Stream.Type() {
if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.ExistVideo = true
}
return nil

View File

@@ -57,7 +57,7 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans
streamId = uint64(protocol) << 56
for i, track := range tracks {
id, ok := narrowCodecIds[int(track.Stream.CodecId())]
id, ok := narrowCodecIds[int(track.Stream.CodecID)]
utils.Assert(ok)
streamId |= uint64(id) << (48 - i*8)
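
GenerateTransStreamID把输出协议和各track的窄化codec id打包进一个64位ID, 相同组合即可复用同一路传输流. 位布局示意(窄化id的取值为假设):

// | 高8位: protocol | 次8位: track0窄化codec id | 再8位: track1 | ... 低位补0 |
// 例如 protocol=rtmp(假设值1), track0=h264(假设id为1), track1=aac(假设id为2):
// streamId = 1<<56 | 1<<48 | 2<<40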