Files
plugin-rtmp/netStream.go
2021-02-14 23:04:30 +08:00

291 lines
8.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rtmp
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync/atomic"
"time"
"github.com/Monibuca/engine/v3"
"github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec"
)
func ListenRtmp(addr string) error {
defer log.Println("rtmp server start!")
// defer fmt.Println("server start!")
listener, err := net.Listen("tcp", addr)
if err != nil {
return err
}
var tempDelay time.Duration
for {
conn, err := listener.Accept()
conn.(*net.TCPConn).SetNoDelay(false)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
fmt.Printf("rtmp: Accept error: %v; retrying in %v", err, tempDelay)
time.Sleep(tempDelay)
continue
}
return err
}
tempDelay = 0
go processRtmp(conn)
}
return nil
}
var gstreamid = uint32(64)
func processRtmp(conn net.Conn) {
var stream *engine.Stream
streams := make(map[uint32]*engine.Subscriber)
defer func() {
conn.Close()
if stream != nil {
if stream.Publisher != nil {
stream.Publisher.Dispose()
}
stream.Close()
}
for _, s := range streams {
s.Close()
}
}()
nc := NetConnection{
ReadWriter: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
rtmpHeader: make(map[uint32]*ChunkHeader),
incompleteRtmpBody: make(map[uint32][]byte),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
nextStreamID: func() uint32 {
return atomic.AddUint32(&gstreamid, 1)
},
}
/* Handshake */
if utils.MayBeError(Handshake(nc.ReadWriter)) {
return
}
if utils.MayBeError(nc.OnConnect()) {
return
}
var rec_audio, rec_video func(*Chunk)
var abslouteTs uint32
rec_audio = func(msg *Chunk) {
va := engine.NewAudioTrack()
stream.OriginAudioTrack = va
var acodec string
tmp := msg.Body[0]
soundFormat := tmp >> 4
switch soundFormat {
case 10:
if msg.Body[1] != 0 {
return
}
acodec = "aac"
va.SoundFormat = soundFormat
config1, config2 := msg.Body[2], msg.Body[3]
//audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
va.SoundRate = codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]
va.SoundType = (config2 >> 3) & 0x0F //声道
//frameLengthFlag = (config2 >> 2) & 0x01
//dependsOnCoreCoder = (config2 >> 1) & 0x01
//extensionFlag = config2 & 0x01
va.RtmpTag = msg.Body
rec_audio = func(msg *Chunk) {
if msg.Timestamp == 0xffffff {
abslouteTs += msg.ExtendTimestamp
} else {
abslouteTs += msg.Timestamp // 绝对时间戳
}
va.Push(abslouteTs, msg.Body[2:])
}
stream.AddAudioTrack(acodec, va)
return
case 7:
acodec = "pcma"
case 8:
acodec = "pcmu"
}
if acodec != "" {
va.RtmpTag = msg.Body
va.SoundFormat = soundFormat
va.SoundRate = codec.SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz
va.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples
va.SoundType = tmp & 0x01 // 0 单声道1立体声
rec_audio = func(msg *Chunk) {
if msg.Timestamp == 0xffffff {
abslouteTs += msg.ExtendTimestamp
} else {
abslouteTs += msg.Timestamp // 绝对时间戳
}
va.Push(abslouteTs, msg.Body[1:])
}
stream.AddAudioTrack(acodec, va)
}
}
rec_video = func(msg *Chunk) {
codecId := msg.Body[0] & 0x0F
// 等待AVC序列帧
if codecId != 7 && codecId != 12 || msg.Body[1] != 0 {
return
}
vt := engine.NewVideoTrack()
vt.CodecID = codecId
stream.OriginVideoTrack = vt
var info codec.AVCDecoderConfigurationRecord
//0:codec,1:IsAVCSequence,2~4:compositionTime
if _, err := info.Unmarshal(msg.Body[5:]); err == nil {
vt.SPSInfo, err = codec.ParseSPS(info.SequenceParameterSetNALUnit)
vt.SPS = info.SequenceParameterSetNALUnit
vt.PPS = info.PictureParameterSetNALUnit
}
vt.RtmpTag = msg.Body
nalulenSize := int(info.LengthSizeMinusOne&3 + 1)
stream.AddVideoTrack("h264", vt)
rec_video = func(msg *Chunk) {
nalus := msg.Body[5:]
if msg.Timestamp == 0xffffff {
abslouteTs += msg.ExtendTimestamp
} else {
abslouteTs += msg.Timestamp // 绝对时间戳
}
for len(nalus) > nalulenSize {
nalulen := 0
for i := 0; i < nalulenSize; i++ {
nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1))
}
vt.Push(abslouteTs, nalus[nalulenSize:nalulen+nalulenSize])
nalus = nalus[nalulen+nalulenSize:]
}
}
}
for {
if msg, err := nc.RecvMessage(); err == nil {
if msg.MessageLength <= 0 {
continue
}
switch msg.MessageTypeID {
case RTMP_MSG_AMF0_COMMAND:
if msg.MsgData == nil {
break
}
cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName {
case "createStream":
nc.streamID = nc.nextStreamID()
log.Println("createStream:", nc.streamID)
err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId)
if utils.MayBeError(err) {
return
}
case "publish":
pm := msg.MsgData.(*PublishMessage)
streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0]
if pub := new(engine.Publisher); pub.Publish(streamPath) {
pub.Type = "RTMP"
stream = pub.Stream
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
} else {
err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
}
case "play":
pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0]
nc.writeChunkSize = config.ChunkSize
subscriber := engine.Subscriber{
Type: "RTMP",
ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID),
}
if err = subscriber.Subscribe(streamPath); err == nil {
streams[nc.streamID] = &subscriber
err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize))
err = nc.SendMessage(SEND_STREAM_IS_RECORDED_MESSAGE, nil)
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
vt, at := subscriber.GetVideoTrack("h264"), subscriber.OriginAudioTrack
if vt != nil {
var lastVideoTime uint32
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag})
subscriber.OnVideo = func(pack engine.VideoPack) {
if lastVideoTime == 0 {
lastVideoTime = pack.Timestamp
}
t := pack.Timestamp - lastVideoTime
lastVideoTime = pack.Timestamp
payload := codec.Nalu2RTMPTag(pack.Payload)
defer utils.RecycleSlice(payload)
err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload})
}
}
if at != nil {
var lastAudioTime uint32
var aac byte
if at.SoundFormat == 10 {
aac = at.RtmpTag[0]
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag})
}
subscriber.OnAudio = func(pack engine.AudioPack) {
if lastAudioTime == 0 {
lastAudioTime = pack.Timestamp
}
t := pack.Timestamp - lastAudioTime
lastAudioTime = pack.Timestamp
payload := codec.Audio2RTMPTag(aac, pack.Payload)
defer utils.RecycleSlice(payload)
err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload})
}
}
go subscriber.Play(subscriber.Context, at, vt)
}
case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := streams[cm.StreamId]; ok {
stream.Close()
delete(streams, cm.StreamId)
}
case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage)
streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0]
amfobj := newAMFObjects()
if s := engine.FindStream(streamPath); s != nil {
amfobj["level"] = "_result"
s.Close()
} else {
amfobj["level"] = "_error"
}
amfobj["tid"] = cm.TransactionId
err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
}
case RTMP_MSG_AUDIO:
rec_audio(msg)
case RTMP_MSG_VIDEO:
rec_video(msg)
}
msg.Recycle()
} else {
return
}
}
}