diff --git a/go.mod b/go.mod index f630a0a..922351d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Monibuca/plugin-rtmp/v3 go 1.13 require ( - github.com/Monibuca/engine/v3 v3.0.0-alpha4 + github.com/Monibuca/engine/v3 v3.0.0-alpha5 github.com/Monibuca/utils/v3 v3.0.0-alpha4 github.com/logrusorgru/aurora v2.0.3+incompatible ) diff --git a/go.sum b/go.sum index 8df1fd4..defb7af 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/Monibuca/engine/v3 v3.0.0-alpha3 h1:NtFBMsu1nvEA09q64sW9xNzLdQ9RCKQXJ github.com/Monibuca/engine/v3 v3.0.0-alpha3/go.mod h1:K5FB3wk+iS/nPc+NS4XwObYQV4gtF6klEwDxaVM0BsQ= github.com/Monibuca/engine/v3 v3.0.0-alpha4 h1:kAStDd1p9tlSQeNyAAmb7vrPL2UCz7LFTzw5LdbGxBI= github.com/Monibuca/engine/v3 v3.0.0-alpha4/go.mod h1:V0/kfen6K5O/RLXHPsZj4DF/LboDZ0OqfeCfn35bWMo= +github.com/Monibuca/engine/v3 v3.0.0-alpha5 h1:sXLg39SBkeiVkGcbcinXuRrYyClJvMVirxA5axiUD7s= +github.com/Monibuca/engine/v3 v3.0.0-alpha5/go.mod h1:V0/kfen6K5O/RLXHPsZj4DF/LboDZ0OqfeCfn35bWMo= github.com/Monibuca/utils/v3 v3.0.0-alpha3 h1:n4Sq7mS1Iz8oBj2BcV4sXgKbZgix0fFLvjAfXYoiXl0= github.com/Monibuca/utils/v3 v3.0.0-alpha3/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= github.com/Monibuca/utils/v3 v3.0.0-alpha4 h1:pecYA89kWmtGOeY6R99d4T1epPJ1wc+jFrrJY13VD04= diff --git a/netConnection.go b/netConnection.go index 0fd2376..d4c2d1e 100644 --- a/netConnection.go +++ b/netConnection.go @@ -395,7 +395,7 @@ func (conn *NetConnection) sendAVMessage(ts uint32, payload []byte, isAudio bool return err } - // 如果音视频数据太大,一次发送不完,那么在这里进行分割(data + Chunk Basic Header(1)) + // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) for need != nil && len(need) > 0 { if need, err = conn.encodeChunk1(head, need, conn.writeChunkSize); err != nil { return err diff --git a/netStream.go b/netStream.go index 74d79a1..291b51a 100644 --- a/netStream.go +++ b/netStream.go @@ -56,9 +56,6 @@ func processRtmp(conn net.Conn) { defer func() { conn.Close() if stream != nil { - if stream.Publisher != nil { - stream.Publisher.Dispose() - } stream.Close() } for _, s := range streams { @@ -87,8 +84,6 @@ func processRtmp(conn net.Conn) { var abslouteTs uint32 rec_audio = func(msg *Chunk) { va := engine.NewAudioTrack() - stream.OriginAudioTrack = va - var acodec string tmp := msg.Body[0] soundFormat := tmp >> 4 switch soundFormat { @@ -96,7 +91,6 @@ func processRtmp(conn net.Conn) { if msg.Body[1] != 0 { return } - acodec = "aac" va.SoundFormat = soundFormat config1, config2 := msg.Body[2], msg.Body[3] //audioObjectType = (config1 & 0xF8) >> 3 @@ -118,14 +112,9 @@ func processRtmp(conn net.Conn) { } va.Push(abslouteTs, msg.Body[2:]) } - stream.AddAudioTrack(acodec, va) + stream.SetOriginAT(va) return - case 7: - acodec = "pcma" - case 8: - acodec = "pcmu" - } - if acodec != "" { + case 7, 8: va.RtmpTag = msg.Body va.SoundFormat = soundFormat va.SoundRate = codec.SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz @@ -139,7 +128,7 @@ func processRtmp(conn net.Conn) { } va.Push(abslouteTs, msg.Body[1:]) } - stream.AddAudioTrack(acodec, va) + stream.SetOriginAT(va) } } rec_video = func(msg *Chunk) { @@ -150,17 +139,15 @@ func processRtmp(conn net.Conn) { } vt := engine.NewVideoTrack() vt.CodecID = codecId - stream.OriginVideoTrack = vt + vt.RtmpTag = msg.Body var info codec.AVCDecoderConfigurationRecord //0:codec,1:IsAVCSequence,2~4:compositionTime if _, err := info.Unmarshal(msg.Body[5:]); err == nil { - vt.SPSInfo, err = codec.ParseSPS(info.SequenceParameterSetNALUnit) - vt.SPS = info.SequenceParameterSetNALUnit - vt.PPS = info.PictureParameterSetNALUnit + vt.Push(0, info.SequenceParameterSetNALUnit) + vt.Push(0, info.PictureParameterSetNALUnit) } - vt.RtmpTag = msg.Body nalulenSize := int(info.LengthSizeMinusOne&3 + 1) - stream.AddVideoTrack("h264", vt) + stream.SetOriginVT(vt) rec_video = func(msg *Chunk) { nalus := msg.Body[5:] if msg.Timestamp == 0xffffff { @@ -224,39 +211,38 @@ func processRtmp(conn net.Conn) { err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) vt, at := subscriber.GetVideoTrack("h264"), subscriber.OriginAudioTrack + var lastTimeStamp uint32 if vt != nil { - var lastVideoTime uint32 err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag}) subscriber.OnVideo = func(pack engine.VideoPack) { - if lastVideoTime == 0 { - lastVideoTime = pack.Timestamp + if lastTimeStamp == 0 { + lastTimeStamp = pack.Timestamp } - t := pack.Timestamp - lastVideoTime - lastVideoTime = pack.Timestamp + t := pack.Timestamp - lastTimeStamp + lastTimeStamp = pack.Timestamp payload := codec.Nalu2RTMPTag(pack.Payload) defer utils.RecycleSlice(payload) err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) } } if at != nil { - var lastAudioTime uint32 var aac byte if at.SoundFormat == 10 { aac = at.RtmpTag[0] err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag}) } subscriber.OnAudio = func(pack engine.AudioPack) { - if lastAudioTime == 0 { - lastAudioTime = pack.Timestamp + if lastTimeStamp == 0 { + lastTimeStamp = pack.Timestamp } - t := pack.Timestamp - lastAudioTime - lastAudioTime = pack.Timestamp + t := pack.Timestamp - lastTimeStamp + lastTimeStamp = pack.Timestamp payload := codec.Audio2RTMPTag(aac, pack.Payload) defer utils.RecycleSlice(payload) err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) } } - go subscriber.Play(subscriber.Context, at, vt) + go subscriber.Play(at, vt) } case "closeStream": cm := msg.MsgData.(*CURDStreamMessage)