内存复用

This commit is contained in:
langhuihui
2020-05-24 22:58:38 +08:00
parent eaddc60775
commit f5bdd6a298
7 changed files with 87 additions and 108 deletions

View File

@@ -307,38 +307,30 @@ func (client *RTSP) startStream() {
Printf("io.ReadFull err:%v", err)
return
}
rtpBuf := content
var pack *RTPPack
switch channel {
case client.aRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIO,
Buffer: rtpBuf,
Type: RTP_TYPE_AUDIO,
}
case client.aRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL,
Buffer: rtpBuf,
Type: RTP_TYPE_AUDIOCONTROL,
}
case client.vRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEO,
Buffer: rtpBuf,
Type: RTP_TYPE_VIDEO,
}
case client.vRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL,
Buffer: rtpBuf,
Type: RTP_TYPE_VIDEOCONTROL,
}
default:
Printf("unknow rtp pack type, channel:%v", channel)
continue
}
if pack == nil {
Printf("session tcp got nil rtp pack")
continue
}
pack.Unmarshal(content)
//if client.debugLogEnable {
// rtp := ParseRTP(pack.Buffer)
// if rtp != nil {
@@ -357,7 +349,7 @@ func (client *RTSP) startStream() {
//}
client.InBytes += int(length + 4)
client.handleRTP(pack)
client.HandleRTP(pack)
default: // rtsp
builder := bytes.Buffer{}

6
go.mod
View File

@@ -3,12 +3,14 @@ module github.com/Monibuca/plugin-rtsp
go 1.13
require (
github.com/Monibuca/engine/v2 v2.0.0
github.com/Monibuca/engine/v2 v2.0.1
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
github.com/gobwas/pool v0.2.0 // indirect
github.com/gobwas/ws v1.0.3 // indirect
github.com/jinzhu/gorm v1.9.12 // indirect
github.com/pixelbender/go-sdp v1.0.0
github.com/pion/rtcp v1.2.2 // indirect
github.com/pion/rtp v1.5.4
github.com/pixelbender/go-sdp v1.0.0 // indirect
github.com/reactivex/rxgo v1.0.0 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
)

6
go.sum
View File

@@ -7,6 +7,8 @@ github.com/Monibuca/engine v1.2.1/go.mod h1:WbDkXENLjcPjyjCR1Mix1GA+uAlwORkv/+8a
github.com/Monibuca/engine v1.2.2 h1:hNjsrZpOmui8lYhgCJ5ltJU8g/k0Rrdysx2tHNGGnbI=
github.com/Monibuca/engine/v2 v2.0.0 h1:8FjaScrtN8QdbcxO9zZYABMC0Re3I1O1T4p94zAXYb0=
github.com/Monibuca/engine/v2 v2.0.0/go.mod h1:34EYjjV15G6myuHOKaJkO7y5tJ1Arq/NfC9Weacr2mc=
github.com/Monibuca/engine/v2 v2.0.1 h1:z27YwXVSelYgGfbrx5RmJ+ROveIf5QWGnwZqfOdqhM0=
github.com/Monibuca/engine/v2 v2.0.1/go.mod h1:34EYjjV15G6myuHOKaJkO7y5tJ1Arq/NfC9Weacr2mc=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
@@ -50,6 +52,10 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v2.0.1+incompatible h1:xQ15muvnzGBHpIpdrNi1DA5x0+TcBZzsIDwmw9uTHzw=
github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/pion/rtcp v1.2.2 h1:q0aGpfC8ahggd4ASrLudlecf99BUOGKk8IUrQvHY42Y=
github.com/pion/rtcp v1.2.2/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pion/rtp v1.5.4 h1:PuNg6xqV3brIUihatcKZj1YDUs+M45L0ZbrZWYtkDxY=
github.com/pion/rtp v1.5.4/go.mod h1:bg60AL5GotNOlYZsqycbhDtEV3TkfbpXG0KBiUq29Mg=
github.com/pixelbender/go-sdp v1.0.0 h1:hLP2ALBN4sLpgp2r3EDcFUSN3AyOkg1jonuWEJniotY=
github.com/pixelbender/go-sdp v1.0.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

67
main.go
View File

@@ -192,25 +192,21 @@ func (conn *RichConn) Write(b []byte) (n int, err error) {
}
return conn.Conn.Write(b)
}
func (rtsp *RTSP) handleNALU(nalType byte, payload []byte, ts int64) {
func (rtsp *RTSP) handleNALU(nalType byte, payload []byte, ts uint32) {
rtsp.SyncCount++
vl := len(payload)
switch nalType {
// case NALU_SPS:
// r := bytes.NewBuffer([]byte{})
// r.Write(RTMP_AVC_HEAD)
// util.BigEndian.PutUint16(spsHead[1:], uint16(vl))
// r.Write(spsHead)
// r.Write(payload)
// case NALU_PPS:
// r := bytes.NewBuffer([]byte{})
// util.BigEndian.PutUint16(ppsHead[1:], uint16(vl))
// r.Write(ppsHead)
// r.Write(payload)
// rtsp.PushVideo(0, r.Bytes())
// avcsent = true
case NALU_SPS:
rtsp.SPS = append([]byte{}, payload...)
case NALU_PPS:
rtsp.PPS = append([]byte{}, payload...)
case NALU_Access_Unit_Delimiter:
case NALU_IDR_Picture:
if !rtsp.avcsent {
if rtsp.SPS == nil || rtsp.PPS == nil {
break
}
r := bytes.NewBuffer([]byte{})
r.Write(RTMP_AVC_HEAD)
spsHead := []byte{0xE1, 0, 0}
@@ -224,7 +220,7 @@ func (rtsp *RTSP) handleNALU(nalType byte, payload []byte, ts int64) {
rtsp.PushVideo(0, r.Bytes())
rtsp.avcsent = true
}
r := bytes.NewBuffer([]byte{})
r := rtsp.GetBuffer()
iframeHead := []byte{0x17, 0x01, 0, 0, 0}
util.BigEndian.PutUint24(iframeHead[2:], 0)
r.Write(iframeHead)
@@ -232,10 +228,10 @@ func (rtsp *RTSP) handleNALU(nalType byte, payload []byte, ts int64) {
util.BigEndian.PutUint32(nalLength, uint32(vl))
r.Write(nalLength)
r.Write(payload)
rtsp.PushVideo(uint32(ts), r.Bytes())
rtsp.PushVideo(ts, r.Bytes())
case NALU_Non_IDR_Picture:
if rtsp.avcsent {
r := bytes.NewBuffer([]byte{})
r := rtsp.GetBuffer()
pframeHead := []byte{0x27, 0x01, 0, 0, 0}
util.BigEndian.PutUint24(pframeHead[2:], 0)
r.Write(pframeHead)
@@ -243,21 +239,21 @@ func (rtsp *RTSP) handleNALU(nalType byte, payload []byte, ts int64) {
util.BigEndian.PutUint32(nalLength, uint32(vl))
r.Write(nalLength)
r.Write(payload)
rtsp.PushVideo(uint32(ts), r.Bytes())
rtsp.PushVideo(ts, r.Bytes())
}
default:
Println(nalType)
}
}
func (rtsp *RTSP) handleRTP(pack *RTPPack) {
data := pack.Buffer
func (rtsp *RTSP) HandleRTP(pack *RTPPack) {
switch pack.Type {
case RTP_TYPE_AUDIO:
if !rtsp.aacsent {
rtsp.PushAudio(0, append([]byte{0xAF, 0x00}, rtsp.AudioSpecificConfig...))
rtsp.aacsent = true
}
cc := data[0] & 0xF
rtphdr := 12 + cc*4
payload := data[rtphdr:]
payload := pack.Payload
auHeaderLen := (int16(payload[0]) << 8) + int16(payload[1])
auHeaderLen = auHeaderLen >> 3
auHeaderCount := int(auHeaderLen / 2)
@@ -275,21 +271,13 @@ func (rtsp *RTSP) handleRTP(pack *RTPPack) {
startOffset = startOffset + auLen
}
case RTP_TYPE_VIDEO:
cc := data[0] & 0xF
//rtp header
rtphdr := 12 + cc*4
//packet time
ts := (int64(data[4]) << 24) + (int64(data[5]) << 16) + (int64(data[6]) << 8) + (int64(data[7]))
//packet number
//packno := (int64(data[6]) << 8) + int64(data[7])
data = data[rtphdr:]
ts := pack.Timestamp
data := pack.Payload
Println(len(data))
nalType := data[0] & 0x1F
if nalType >= 1 && nalType <= 23 {
rtsp.handleNALU(nalType, data, ts)
} else if nalType == 28 {
} else if nalType == 28 { //FU-A
isStart := data[1]&0x80 != 0
isEnd := data[1]&0x40 != 0
nalType := data[1] & 0x1F
@@ -303,6 +291,15 @@ func (rtsp *RTSP) handleRTP(pack *RTPPack) {
rtsp.fuBuffer[0] = nal
rtsp.handleNALU(nalType, rtsp.fuBuffer, ts)
}
} else if nalType == 24 { //STAP-A
var naluLen uint16
for data = data[1:]; len(data) > 3; data = data[naluLen+2:] {
naluLen = (uint16(data[0]) << 8) + uint16(data[1])
nalType = data[2] & 0x1F
rtsp.handleNALU(nalType, data[3:naluLen+2], ts)
}
} else {
Println("not support yet ", nalType)
}
}
}

View File

@@ -13,12 +13,13 @@ import (
"time"
. "github.com/Monibuca/engine/v2"
"github.com/pion/rtp"
"github.com/teris-io/shortid"
)
type RTPPack struct {
Type RTPType
Buffer []byte
Type RTPType
rtp.Packet
}
type SessionType int
@@ -128,53 +129,41 @@ func (session *RTSP) AcceptPush() {
}
channel := int(buf1)
rtpLen := int(binary.BigEndian.Uint16(buf2))
pack := new(RTPPack)
rtpBytes := make([]byte, rtpLen)
if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil {
Println(err)
return
}
var pack *RTPPack
if err = pack.Unmarshal(rtpBytes); err != nil {
Println(err)
return
}
switch channel {
case session.aRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIO,
Buffer: rtpBytes,
}
pack.Type = RTP_TYPE_AUDIO
elapsed := time.Now().Sub(timer)
if elapsed >= 30*time.Second {
Println("Recv an audio RTP package")
timer = time.Now()
}
case session.aRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL,
Buffer: rtpBytes,
}
pack.Type = RTP_TYPE_AUDIOCONTROL
case session.vRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEO,
Buffer: rtpBytes,
}
pack.Type = RTP_TYPE_VIDEO
elapsed := time.Now().Sub(timer)
if elapsed >= 30*time.Second {
Println("Recv an video RTP package")
timer = time.Now()
}
case session.vRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL,
Buffer: rtpBytes,
}
pack.Type = RTP_TYPE_VIDEOCONTROL
default:
Printf("unknow rtp pack type, %v", pack.Type)
continue
}
if pack == nil {
Printf("session tcp got nil rtp pack")
continue
}
session.InBytes += rtpLen + 4
session.handleRTP(pack)
session.HandleRTP(pack)
} else { // rtsp cmd
reqBuf := bytes.NewBuffer(nil)
reqBuf.WriteByte(buf1)
@@ -577,7 +566,7 @@ func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
return
}
err = session.UDPClient.SendRTP(pack)
session.OutBytes += len(pack.Buffer)
session.OutBytes += len(pack.Raw)
return
}
switch pack.Type {
@@ -588,12 +577,12 @@ func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer)))
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw)))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer)
session.connRW.Write(pack.Raw)
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += len(pack.Buffer) + 4
session.OutBytes += len(pack.Raw) + 4
case RTP_TYPE_AUDIOCONTROL:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
@@ -601,12 +590,12 @@ func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer)))
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw)))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer)
session.connRW.Write(pack.Raw)
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += len(pack.Buffer) + 4
session.OutBytes += len(pack.Raw) + 4
case RTP_TYPE_VIDEO:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
@@ -614,12 +603,12 @@ func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer)))
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw)))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer)
session.connRW.Write(pack.Raw)
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += len(pack.Buffer) + 4
session.OutBytes += len(pack.Raw) + 4
case RTP_TYPE_VIDEOCONTROL:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
@@ -627,12 +616,12 @@ func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer)))
binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw)))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer)
session.connRW.Write(pack.Raw)
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += len(pack.Buffer) + 4
session.OutBytes += len(pack.Raw) + 4
default:
err = fmt.Errorf("session tcp send rtp got unkown pack type[%v]", pack.Type)
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"net"
"strings"
. "github.com/Monibuca/engine/v2"
)
@@ -151,7 +152,7 @@ func (c *UDPClient) SendRTP(pack *RTPPack) (err error) {
return
}
if _, err = conn.Write(pack.Buffer);err != nil {
if _, err = conn.Write(pack.Raw); err != nil {
err = fmt.Errorf("udp client write bytes error, %v", err)
return
}

View File

@@ -12,7 +12,7 @@ import (
)
type UDPServer struct {
Session *RTSP
Session *RTSP
UDPClient
sync.Mutex
}
@@ -29,7 +29,7 @@ func (s *UDPServer) HandleRTP(pack *RTPPack) {
s.Lock()
defer s.Unlock()
if s.Session != nil {
s.Session.handleRTP(pack)
s.Session.HandleRTP(pack)
}
}
@@ -90,13 +90,11 @@ func (s *UDPServer) SetupAudio() (err error) {
Printf("Package recv from AConn.len:%d\n", n)
timer = time.Now()
}
rtpBytes := make([]byte, n)
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_AUDIO,
Buffer: rtpBytes,
Type: RTP_TYPE_AUDIO,
}
pack.Unmarshal(bufUDP)
s.HandleRTP(pack)
} else {
Println("udp server read audio pack error", err)
@@ -131,13 +129,11 @@ func (s *UDPServer) SetupAudio() (err error) {
for !s.Stoped {
if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil {
//Printf("Package recv from AControlConn.len:%d\n", n)
rtpBytes := make([]byte, n)
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL,
Buffer: rtpBytes,
Type: RTP_TYPE_AUDIOCONTROL,
}
pack.Unmarshal(bufUDP)
s.HandleRTP(pack)
} else {
Println("udp server read audio control pack error", err)
@@ -182,13 +178,11 @@ func (s *UDPServer) SetupVideo() (err error) {
Printf("Package recv from VConn.len:%d\n", n)
timer = time.Now()
}
rtpBytes := make([]byte, n)
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_VIDEO,
Buffer: rtpBytes,
Type: RTP_TYPE_VIDEO,
}
pack.Unmarshal(bufUDP)
s.HandleRTP(pack)
} else {
Println("udp server read video pack error", err)
@@ -224,13 +218,11 @@ func (s *UDPServer) SetupVideo() (err error) {
for !s.Stoped {
if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil {
//Printf("Package recv from VControlConn.len:%d\n", n)
rtpBytes := make([]byte, n)
s.AddInputBytes(n)
copy(rtpBytes, bufUDP)
pack := &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL,
Buffer: rtpBytes,
Type: RTP_TYPE_VIDEOCONTROL,
}
pack.Unmarshal(bufUDP)
s.HandleRTP(pack)
} else {
Println("udp server read video control pack error", err)