gb使用receivebuffer

This commit is contained in:
yangjiechina
2024-06-17 22:59:03 +08:00
parent 89de34bd98
commit cb4eed8389
12 changed files with 289 additions and 267 deletions

15
api.go
View File

@@ -98,7 +98,6 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
//请求参数
v := &struct {
Source string `json:"source"` //SourceId
Transport string `json:"transport,omitempty"`
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
}{}
@@ -129,9 +128,17 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
return
}
tcp := strings.Contains(v.Transport, "tcp")
tcp := true
var active bool
if tcp && "active" == v.Setup {
if v.Setup == "passive" {
} else if v.Setup == "active" {
active = true
} else {
tcp = false
//udp收流
}
if tcp && active {
if !stream.AppConfig.GB28181.IsMultiPort() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 单端口模式下不能主动拉流"}
} else if !tcp {
@@ -143,8 +150,6 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
active = true
}
_, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active)

View File

@@ -40,9 +40,7 @@ type GBSource interface {
TransportType() TransportType
PrepareTransDeMuxer(id string, ssrc uint32)
PreparePublishSource(conn net.Conn, ssrc uint32, source GBSource)
PreparePublish(conn net.Conn, ssrc uint32, source GBSource)
SetConn(conn net.Conn)
@@ -58,106 +56,6 @@ type BaseGBSource struct {
ssrc uint32
transport transport.ITransport
receiveBuffer *stream.ReceiveBuffer
}
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint16, error) {
if tcp {
utils.Assert(stream.AppConfig.GB28181.EnableTCP())
} else {
utils.Assert(stream.AppConfig.GB28181.EnableUDP())
}
if active {
utils.Assert(tcp && stream.AppConfig.GB28181.EnableTCP() && stream.AppConfig.GB28181.IsMultiPort())
}
var source GBSource
var port uint16
var err error
if active {
source, port, err = NewActiveSource()
} else if tcp {
source = NewPassiveSource()
} else {
source = NewUDPSource()
}
if err != nil {
return nil, 0, err
}
//单端口模式绑定ssrc
if !stream.AppConfig.GB28181.IsMultiPort() {
var success bool
if tcp {
success = SharedTCPServer.filter.AddSource(ssrc, source)
} else {
success = SharedUDPServer.filter.AddSource(ssrc, source)
}
if !success {
return nil, 0, fmt.Errorf("ssrc conflict")
}
port = stream.AppConfig.GB28181.Port[0]
} else if !active {
if tcp {
err := TransportManger.AllocTransport(true, func(port_ uint16) error {
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_))
server, err := NewTCPServer(addr, NewSingleFilter(source))
if err != nil {
return err
}
source.(*PassiveSource).transport = server.tcp
port = port_
return nil
})
if err != nil {
return nil, 0, err
}
} else {
err := TransportManger.AllocTransport(false, func(port_ uint16) error {
addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_))
server, err := NewUDPServer(addr, NewSingleFilter(source))
if err != nil {
return err
}
source.(*UDPSource).transport = server.udp
port = port_
return nil
})
if err != nil {
return nil, 0, err
}
}
}
source.PrepareTransDeMuxer(id, ssrc)
_, state := stream.PreparePublishSource(source, false)
if utils.HookStateOK != state {
return nil, 0, fmt.Errorf("error code %d", state)
}
var bufferBlockCount int
if active || tcp {
bufferBlockCount = stream.ReceiveBufferTCPBlockCount
} else {
bufferBlockCount = stream.ReceiveBufferUdpBlockCount
}
source.SetType(stream.SourceType28181)
source.Init(source.Input, source.Close, bufferBlockCount)
go source.LoopEvent()
return source, port, err
}
func (source *BaseGBSource) InputRtp(pkt *rtp.Packet) error {
@@ -168,14 +66,14 @@ func (source *BaseGBSource) Transport() TransportType {
panic("implement me")
}
func (source *BaseGBSource) PrepareTransDeMuxer(id string, ssrc uint32) {
source.Id_ = id
source.ssrc = ssrc
func (source *BaseGBSource) Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int) {
source.deMuxerCtx = libmpeg.NewPSDeMuxerContext(make([]byte, PsProbeBufferSize))
source.deMuxerCtx.SetHandler(source)
source.SetType(stream.SourceType28181)
source.PublishSource.Init(inputCB, closeCB, receiveQueueSize)
}
// Input 输入PS流
// Input 解析PS流, 确保在loop event协程调用此函数
func (source *BaseGBSource) Input(data []byte) error {
return source.deMuxerCtx.Input(data)
}
@@ -345,18 +243,9 @@ func (source *BaseGBSource) SetSSRC(ssrc uint32) {
source.ssrc = ssrc
}
func (source *BaseGBSource) SetReceiveBuffer(buffer *stream.ReceiveBuffer) {
source.receiveBuffer = buffer
}
func (source *BaseGBSource) ReceiveBuffer() *stream.ReceiveBuffer {
return source.receiveBuffer
}
func (source *BaseGBSource) PreparePublishSource(conn net.Conn, ssrc uint32, source_ GBSource) {
func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ GBSource) {
source.SetConn(conn)
source.SetSSRC(ssrc)
source.SetState(stream.SessionStateTransferring)
if stream.AppConfig.Hook.EnablePublishEvent() {
@@ -372,3 +261,102 @@ func (source *BaseGBSource) PreparePublishSource(conn net.Conn, ssrc uint32, sou
}()
}
}
// NewGBSource 创建gb源,返回监听的收流端口
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint16, error) {
if tcp {
utils.Assert(stream.AppConfig.GB28181.EnableTCP())
} else {
utils.Assert(stream.AppConfig.GB28181.EnableUDP())
}
if active {
utils.Assert(tcp && stream.AppConfig.GB28181.EnableTCP() && stream.AppConfig.GB28181.IsMultiPort())
}
var source GBSource
var port uint16
var err error
if active {
source, port, err = NewActiveSource()
} else if tcp {
source = NewPassiveSource()
} else {
source = NewUDPSource()
}
if err != nil {
return nil, 0, err
}
//单端口模式绑定ssrc
if !stream.AppConfig.GB28181.IsMultiPort() {
var success bool
if tcp {
success = SharedTCPServer.filter.AddSource(ssrc, source)
} else {
success = SharedUDPServer.filter.AddSource(ssrc, source)
}
if !success {
return nil, 0, fmt.Errorf("ssrc conflict")
}
port = stream.AppConfig.GB28181.Port[0]
} else if !active {
if tcp {
err := TransportManger.AllocTransport(true, func(port_ uint16) error {
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_))
server, err := NewTCPServer(addr, NewSingleFilter(source))
if err != nil {
return err
}
source.(*PassiveSource).transport = server.tcp
port = port_
return nil
})
if err != nil {
return nil, 0, err
}
} else {
err := TransportManger.AllocTransport(false, func(port_ uint16) error {
addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_))
server, err := NewUDPServer(addr, NewSingleFilter(source))
if err != nil {
return err
}
source.(*UDPSource).transport = server.udp
port = port_
return nil
})
if err != nil {
return nil, 0, err
}
}
}
var bufferBlockCount int
if active || tcp {
bufferBlockCount = stream.ReceiveBufferTCPBlockCount
} else {
bufferBlockCount = stream.ReceiveBufferUdpBlockCount
}
source.SetId(id)
source.SetSSRC(ssrc)
source.Init(source.Input, source.Close, bufferBlockCount)
if _, state := stream.PreparePublishSource(source, false); utils.HookStateOK != state {
return nil, 0, fmt.Errorf("error code %d", state)
}
go source.LoopEvent()
return source, port, err
}

View File

@@ -16,8 +16,8 @@ func (t PassiveSource) TransportType() TransportType {
return TransportTypeTCPPassive
}
// InputRtp tcp收流,直接解析ps流.
func (t PassiveSource) InputRtp(pkt *rtp.Packet) error {
//TCP收流, 解析rtp后直接送给ps解析
t.Input(pkt.Payload)
return nil
}

View File

@@ -25,7 +25,7 @@ func (u UDPSource) TransportType() TransportType {
return TransportTypeUDP
}
// InputRtp UDP收流会先拷贝rtp包,交给jitter buffer处理后再发给source
// InputRtp udp收流会先拷贝rtp包,交给jitter buffer处理后再发给source
func (u UDPSource) InputRtp(pkt *rtp.Packet) error {
block := u.receiveBuffer.GetBlock()
@@ -33,12 +33,9 @@ func (u UDPSource) InputRtp(pkt *rtp.Packet) error {
pkt.Payload = block[:len(pkt.Payload)]
u.jitterBuffer.Push(pkt)
for {
pkt, _ := u.jitterBuffer.Pop()
if pkt == nil {
return nil
for rtp, _ := u.jitterBuffer.Pop(); rtp != nil; rtp, _ = u.jitterBuffer.Pop() {
u.PublishSource.Input(rtp.Payload)
}
u.PublishSource.Input(pkt.Payload)
}
return nil
}

View File

@@ -1,10 +1,8 @@
package gb28181
import (
"github.com/pion/rtp"
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
@@ -13,10 +11,46 @@ type TCPServer struct {
filter Filter
}
type TCPSession struct {
source GBSource
decoder *transport.LengthFieldFrameDecoder
receiveBuffer *stream.ReceiveBuffer
func (T *TCPServer) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String())
con := conn.(*transport.Conn)
session := NewTCPSession(conn, T.filter)
con.Data = session
//TCP使用ReceiveBuffer区别在于,多端口模式从第一包就使用ReceiveBuffer, 单端口模式先解析出ssrc, 找到source. 后续再使用ReceiveBuffer.
if session.source != nil {
return session.receiveBuffer.GetBlock()
}
return nil
}
func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte {
session := conn.(*transport.Conn).Data.(*TCPSession)
//单端口收流
if session.source == nil {
//直接传给解码器, 先根据ssrc找到source. 后续还是会直接传给source
session.Input(data)
} else {
session.source.(*PassiveSource).PublishSource.Input(data)
}
if session.source != nil {
return session.receiveBuffer.GetBlock()
}
return nil
}
func (T *TCPServer) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String())
con := conn.(*transport.Conn)
if con.Data != nil && con.Data.(*TCPSession).source != nil {
con.Data.(*TCPSession).source.Close()
}
con.Data = nil
}
func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) {
@@ -33,81 +67,3 @@ func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) {
server.tcp = tcp
return server, nil
}
func (T *TCPServer) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String())
con := conn.(*transport.Conn)
session := &TCPSession{}
if stream.AppConfig.GB28181.IsMultiPort() {
session.source = T.filter.(*singleFilter).source
session.source.SetConn(con)
session.receiveBuffer = stream.NewTCPReceiveBuffer()
}
session.decoder = transport.NewLengthFieldFrameDecoder(0xFFFF, 2, func(bytes []byte) {
packet := rtp.Packet{}
err := packet.Unmarshal(bytes)
if err != nil {
log.Sugar.Errorf("解析rtp失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
return
}
//单端口模式,ssrc匹配source
if session.source == nil {
//匹配不到直接关闭链接
source := T.filter.FindSource(packet.SSRC)
if source == nil {
conn.Close()
return
}
session.source = source
session.receiveBuffer = stream.NewTCPReceiveBuffer()
session.source.SetConn(con)
//直接丢给ps解析器, 虽然是非线程安全, 但是是阻塞执行的, 不会和后续走loop event的包冲突
session.source.InputRtp(&packet)
}
if stream.SessionStateHandshakeDone == session.source.State() {
session.source.PreparePublishSource(conn, packet.SSRC, session.source)
}
session.source.InputRtp(&packet)
})
con.Data = session
if session.source != nil {
return session.receiveBuffer.GetBlock()
}
return nil
}
func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte {
session := conn.(*transport.Conn).Data.(*TCPSession)
//单端口收流
if session.source == nil {
//直接传给解码器, 先根据ssrc找到source. 后续还是会直接传给source
if err := session.decoder.Input(data); err != nil {
conn.Close()
}
} else {
session.source.Input(data)
}
return session.receiveBuffer.GetBlock()
}
func (T *TCPServer) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String())
con := conn.(*transport.Conn)
if con.Data != nil && con.Data.(*TCPSession).source != nil {
con.Data.(*TCPSession).source.Close()
}
con.Data = nil
}

77
gb28181/tcp_session.go Normal file
View File

@@ -0,0 +1,77 @@
package gb28181
import (
"encoding/hex"
"github.com/pion/rtp"
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
type TCPSession struct {
conn net.Conn
source GBSource
decoder *transport.LengthFieldFrameDecoder
receiveBuffer *stream.ReceiveBuffer
}
// Input 处理source收到的流
func (t *TCPSession) Input(data []byte) error {
if err := t.decoder.Input(data); err != nil {
t.conn.Close()
}
return nil
}
func (t *TCPSession) Init(source GBSource) {
t.source = source
t.source.SetConn(t.conn)
//重新设置收流回调
t.source.SetInputCb(t.Input)
t.receiveBuffer = stream.NewTCPReceiveBuffer()
}
func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
session := &TCPSession{
conn: conn,
}
if stream.AppConfig.GB28181.IsMultiPort() {
session.Init(filter.(*singleFilter).source)
}
session.decoder = transport.NewLengthFieldFrameDecoder(0xFFFF, 2, func(bytes []byte) {
packet := rtp.Packet{}
err := packet.Unmarshal(bytes)
if err != nil {
log.Sugar.Errorf("解析rtp失败 err:%s conn:%s data:%s", err.Error(), conn.RemoteAddr().String(), hex.EncodeToString(bytes))
conn.Close()
return
}
//单端口模式,ssrc匹配source
if session.source == nil {
//匹配不到直接关闭链接
source := filter.FindSource(packet.SSRC)
if source == nil {
log.Sugar.Errorf("gb28181推流失败 ssrc:%x配置不到source conn:%s data:%s", packet.SSRC, session.conn.RemoteAddr().String(), hex.EncodeToString(bytes))
conn.Close()
return
}
session.Init(source)
}
if stream.SessionStateHandshakeDone == session.source.State() {
session.source.PreparePublish(conn, packet.SSRC, session.source)
}
session.source.InputRtp(&packet)
})
return session
}

View File

@@ -13,6 +13,36 @@ type UDPServer struct {
filter Filter
}
func (U UDPServer) OnConnected(conn net.Conn) []byte {
return nil
}
func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
packet := rtp.Packet{}
err := packet.Unmarshal(data)
if err != nil {
log.Sugar.Errorf("解析rtp失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
return nil
}
source := U.filter.FindSource(packet.SSRC)
if source == nil {
log.Sugar.Errorf("ssrc匹配source失败 ssrc:%x conn:%s", packet.SSRC, conn.RemoteAddr().String())
return nil
}
if stream.SessionStateHandshakeDone == source.State() {
source.PreparePublish(conn, packet.SSRC, source)
}
source.InputRtp(&packet)
return nil
}
func (U UDPServer) OnDisConnected(conn net.Conn, err error) {
}
func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) {
server := &UDPServer{
filter: filter,
@@ -26,34 +56,3 @@ func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) {
server.udp = udp
return server, nil
}
func (U UDPServer) OnConnected(conn net.Conn) []byte {
return nil
}
func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
packet := rtp.Packet{}
err := packet.Unmarshal(data)
if err != nil {
log.Sugar.Errorf("解析rtp失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
return nil
}
source := U.filter.FindSource(packet.SSRC)
if source == nil {
log.Sugar.Errorf("ssrc匹配source失败 ssrc:%x conn:%s", packet.SSRC, conn.RemoteAddr().String())
return nil
}
if stream.SessionStateHandshakeDone == source.State() {
source.PreparePublishSource(conn, packet.SSRC, source)
}
source.InputRtp(&packet)
return nil
}
func (U UDPServer) OnDisConnected(conn net.Conn, err error) {
}

View File

@@ -50,8 +50,6 @@ func (s *server) OnConnected(conn net.Conn) []byte {
}
func (s *server) OnPacket(conn net.Conn, data []byte) []byte {
log.Sugar.Infof("rtmp包大小:%d", len(data))
t := conn.(*transport.Conn)
session := t.Data.(*Session)
err := session.Input(conn, data)

View File

@@ -38,7 +38,7 @@ func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState)
//设置推流的音视频回调
s.stack.SetOnPublishHandler(source)
//初始化放在add source前面, 以防add-init空窗期, 拉流队列空指针.
//初始化放在add source前面, 以防add后再init,空窗期拉流队列空指针.
source.Init(source.Input, source.Close, stream.ReceiveBufferTCPBlockCount)
//推流事件Source统一处理, 是否已经存在, Hook回调....

View File

@@ -66,7 +66,7 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
if err == nil {
//创建rtp udp server
sender.Rtp = &transport.UDPTransport{}
sender.Rtp = &transport.UDPServer{}
sender.Rtp.SetHandler2(nil, sender.OnRTPPacket, nil)
err = sender.Rtp.Bind(addr)
}
@@ -80,7 +80,7 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
if err == nil {
//创建rtcp udp server
sender.Rtcp = &transport.UDPTransport{}
sender.Rtcp = &transport.UDPServer{}
sender.Rtcp.SetHandler2(nil, sender.OnRTCPPacket, nil)
err = sender.Rtcp.Bind(addr)
} else {

View File

@@ -1,9 +1,9 @@
package stream
const (
ReceiveBufferUdpBlockCount = 200
ReceiveBufferUdpBlockCount = 300
ReceiveBufferTCPBlockCount = 100
ReceiveBufferTCPBlockCount = 50
)
// ReceiveBuffer 收流缓冲区. 网络收流->解析流->封装流->发送流是同步的,从解析到发送可能耗时,从而影响读取网络流. 使用收流缓冲区,可有效降低出现此问题的概率.
@@ -19,7 +19,7 @@ type ReceiveBuffer struct {
func (r *ReceiveBuffer) GetBlock() []byte {
bytes := r.data[r.index*r.blockSize:]
r.index = r.index + 1%r.blockCount
r.index = (r.index + 1) % r.blockCount
return bytes[:r.blockSize]
}

View File

@@ -3,7 +3,6 @@ package stream
import (
"fmt"
"github.com/yangjiechina/lkm/log"
"math"
"net"
"time"
@@ -56,6 +55,8 @@ type Source interface {
// Id Source的唯一ID/**
Id() string
SetId(id string)
// Input 输入推流数据
//@Return bool fatal error.释放Source
Input(data []byte) error
@@ -173,6 +174,10 @@ func (s *PublishSource) Id() string {
return s.Id_
}
func (s *PublishSource) SetId(id string) {
s.Id_ = id
}
func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int) {
s.inputCB = inputCB
s.closeCB = closeCB
@@ -234,7 +239,7 @@ func (s *PublishSource) LoopEvent() {
for {
select {
case data := <-s.inputDataEvent:
if !s.closed {
if s.closed {
break
}
@@ -242,10 +247,6 @@ func (s *PublishSource) LoopEvent() {
s.lastPacketTime = time.Now()
}
if s.state == SessionStateHandshakeDone {
s.state = SessionStateTransferring
}
if err := s.inputCB(data); err != nil {
log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error())
s.Close()
@@ -622,7 +623,8 @@ func (s *PublishSource) StartReceiveDataTimer() {
}
}
s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis))))
//对精度没要求
s.receiveDataTimer.Reset(time.Duration(AppConfig.ReceiveTimeout))
})
}
@@ -643,7 +645,7 @@ func (s *PublishSource) StartIdleTimer() {
}
}
s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis)))))
s.idleTimer.Reset(time.Duration(AppConfig.IdleTimeout))
})
}