diff --git a/go.mod b/go.mod index acdcce5..f630a0a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Monibuca/plugin-rtmp/v3 go 1.13 require ( - github.com/Monibuca/engine/v3 v3.0.0-alpha3 - github.com/Monibuca/utils/v3 v3.0.0-alpha3 + github.com/Monibuca/engine/v3 v3.0.0-alpha4 + github.com/Monibuca/utils/v3 v3.0.0-alpha4 github.com/logrusorgru/aurora v2.0.3+incompatible ) diff --git a/go.sum b/go.sum index 7bbe7e5..8df1fd4 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,12 @@ github.com/Monibuca/engine/v3 v3.0.0-alpha2 h1:11rWc6Qnp3kuVwGh8XDXqRivUSWMyv+Au github.com/Monibuca/engine/v3 v3.0.0-alpha2/go.mod h1:K5FB3wk+iS/nPc+NS4XwObYQV4gtF6klEwDxaVM0BsQ= github.com/Monibuca/engine/v3 v3.0.0-alpha3 h1:NtFBMsu1nvEA09q64sW9xNzLdQ9RCKQXJlESM3GmGxU= github.com/Monibuca/engine/v3 v3.0.0-alpha3/go.mod h1:K5FB3wk+iS/nPc+NS4XwObYQV4gtF6klEwDxaVM0BsQ= +github.com/Monibuca/engine/v3 v3.0.0-alpha4 h1:kAStDd1p9tlSQeNyAAmb7vrPL2UCz7LFTzw5LdbGxBI= +github.com/Monibuca/engine/v3 v3.0.0-alpha4/go.mod h1:V0/kfen6K5O/RLXHPsZj4DF/LboDZ0OqfeCfn35bWMo= github.com/Monibuca/utils/v3 v3.0.0-alpha3 h1:n4Sq7mS1Iz8oBj2BcV4sXgKbZgix0fFLvjAfXYoiXl0= github.com/Monibuca/utils/v3 v3.0.0-alpha3/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= +github.com/Monibuca/utils/v3 v3.0.0-alpha4 h1:pecYA89kWmtGOeY6R99d4T1epPJ1wc+jFrrJY13VD04= +github.com/Monibuca/utils/v3 v3.0.0-alpha4/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= diff --git a/netStream.go b/netStream.go index b1dc76b..74d79a1 100644 --- a/netStream.go +++ b/netStream.go @@ -3,14 +3,15 @@ package rtmp import ( "bufio" "fmt" - "github.com/Monibuca/engine/v3" - "github.com/Monibuca/utils/v3" - "github.com/Monibuca/utils/v3/codec" "log" "net" "strings" "sync/atomic" "time" + + "github.com/Monibuca/engine/v3" + "github.com/Monibuca/utils/v3" + "github.com/Monibuca/utils/v3/codec" ) func ListenRtmp(addr string) error { @@ -83,14 +84,20 @@ func processRtmp(conn net.Conn) { return } var rec_audio, rec_video func(*Chunk) + var abslouteTs uint32 rec_audio = func(msg *Chunk) { - var ts_audio uint32 - va := stream.AudioTracks[0] + va := engine.NewAudioTrack() + stream.OriginAudioTrack = va + var acodec string tmp := msg.Body[0] - if va.SoundFormat = tmp >> 4; va.SoundFormat == 10 { + soundFormat := tmp >> 4 + switch soundFormat { + case 10: if msg.Body[1] != 0 { return } + acodec = "aac" + va.SoundFormat = soundFormat config1, config2 := msg.Body[2], msg.Body[3] //audioObjectType = (config1 & 0xF8) >> 3 // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 @@ -102,28 +109,48 @@ func processRtmp(conn net.Conn) { //frameLengthFlag = (config2 >> 2) & 0x01 //dependsOnCoreCoder = (config2 >> 1) & 0x01 //extensionFlag = config2 & 0x01 - } else { + va.RtmpTag = msg.Body + rec_audio = func(msg *Chunk) { + if msg.Timestamp == 0xffffff { + abslouteTs += msg.ExtendTimestamp + } else { + abslouteTs += msg.Timestamp // 绝对时间戳 + } + va.Push(abslouteTs, msg.Body[2:]) + } + stream.AddAudioTrack(acodec, va) + return + case 7: + acodec = "pcma" + case 8: + acodec = "pcmu" + } + if acodec != "" { + va.RtmpTag = msg.Body + va.SoundFormat = soundFormat va.SoundRate = codec.SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz va.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples va.SoundType = tmp & 0x01 // 0 单声道,1立体声 - } - va.RtmpTag = msg.Body - rec_audio = func(msg *Chunk) { - if msg.Timestamp == 0xffffff { - ts_audio += msg.ExtendTimestamp - } else { - ts_audio += msg.Timestamp // 绝对时间戳 + rec_audio = func(msg *Chunk) { + if msg.Timestamp == 0xffffff { + abslouteTs += msg.ExtendTimestamp + } else { + abslouteTs += msg.Timestamp // 绝对时间戳 + } + va.Push(abslouteTs, msg.Body[1:]) } - stream.PushAudio(ts_audio, msg.Body[2:]) + stream.AddAudioTrack(acodec, va) } } rec_video = func(msg *Chunk) { + codecId := msg.Body[0] & 0x0F // 等待AVC序列帧 - if msg.Body[1] != 0 { + if codecId != 7 && codecId != 12 || msg.Body[1] != 0 { return } - vt := stream.VideoTracks[0] - var ts_video uint32 + vt := engine.NewVideoTrack() + vt.CodecID = codecId + stream.OriginVideoTrack = vt var info codec.AVCDecoderConfigurationRecord //0:codec,1:IsAVCSequence,2~4:compositionTime if _, err := info.Unmarshal(msg.Body[5:]); err == nil { @@ -133,23 +160,23 @@ func processRtmp(conn net.Conn) { } vt.RtmpTag = msg.Body nalulenSize := int(info.LengthSizeMinusOne&3 + 1) + stream.AddVideoTrack("h264", vt) rec_video = func(msg *Chunk) { nalus := msg.Body[5:] if msg.Timestamp == 0xffffff { - ts_video += msg.ExtendTimestamp + abslouteTs += msg.ExtendTimestamp } else { - ts_video += msg.Timestamp // 绝对时间戳 + abslouteTs += msg.Timestamp // 绝对时间戳 } for len(nalus) > nalulenSize { nalulen := 0 for i := 0; i < nalulenSize; i++ { nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1)) } - vt.Push(ts_video, nalus[nalulenSize:nalulen+nalulenSize]) + vt.Push(abslouteTs, nalus[nalulenSize:nalulen+nalulenSize]) nalus = nalus[nalulen+nalulenSize:] } } - close(vt.WaitFirst) } for { if msg, err := nc.RecvMessage(); err == nil { @@ -185,9 +212,10 @@ func processRtmp(conn net.Conn) { pm := msg.MsgData.(*PlayMessage) streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0] nc.writeChunkSize = config.ChunkSize - var subscriber engine.Subscriber - subscriber.Type = "RTMP" - subscriber.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID) + subscriber := engine.Subscriber{ + Type: "RTMP", + ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID), + } if err = subscriber.Subscribe(streamPath); err == nil { streams[nc.streamID] = &subscriber err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize)) @@ -195,48 +223,40 @@ func processRtmp(conn net.Conn) { err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) - vt, at := subscriber.VideoTracks[0], subscriber.AudioTracks[0] - err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag}) - if at.SoundFormat == 10 { - err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag}) - } - var lastAudioTime, lastVideoTime uint32 - go (&engine.TrackCP{at, vt}).Play(subscriber.Context, func(pack engine.AudioPack) { - if lastAudioTime == 0 { - lastAudioTime = pack.Timestamp - } - t := pack.Timestamp - lastAudioTime - lastAudioTime = pack.Timestamp - l := len(pack.Payload) + 1 - if at.SoundFormat == 10 { - l++ - } - payload := utils.GetSlice(l) - defer utils.RecycleSlice(payload) - payload[0] = at.RtmpTag[0] - if at.SoundFormat == 10 { - payload[1] = 1 - } - copy(payload[2:], pack.Payload) - err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) - }, func(pack engine.VideoPack) { - if lastVideoTime == 0 { + vt, at := subscriber.GetVideoTrack("h264"), subscriber.OriginAudioTrack + if vt != nil { + var lastVideoTime uint32 + err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag}) + subscriber.OnVideo = func(pack engine.VideoPack) { + if lastVideoTime == 0 { + lastVideoTime = pack.Timestamp + } + t := pack.Timestamp - lastVideoTime lastVideoTime = pack.Timestamp + payload := codec.Nalu2RTMPTag(pack.Payload) + defer utils.RecycleSlice(payload) + err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) } - t := pack.Timestamp - lastVideoTime - lastVideoTime = pack.Timestamp - payload := utils.GetSlice(9 + len(pack.Payload)) - defer utils.RecycleSlice(payload) - if pack.NalType == codec.NALU_IDR_Picture { - payload[0] = 0x17 - } else { - payload[0] = 0x27 + } + if at != nil { + var lastAudioTime uint32 + var aac byte + if at.SoundFormat == 10 { + aac = at.RtmpTag[0] + err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag}) } - payload[1] = 0x01 - utils.BigEndian.PutUint32(payload[5:], uint32(len(pack.Payload))) - copy(payload[9:], pack.Payload) - err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) - }) + subscriber.OnAudio = func(pack engine.AudioPack) { + if lastAudioTime == 0 { + lastAudioTime = pack.Timestamp + } + t := pack.Timestamp - lastAudioTime + lastAudioTime = pack.Timestamp + payload := codec.Audio2RTMPTag(aac, pack.Payload) + defer utils.RecycleSlice(payload) + err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) + } + } + go subscriber.Play(subscriber.Context, at, vt) } case "closeStream": cm := msg.MsgData.(*CURDStreamMessage)