实现v4版本推流逻辑

This commit is contained in:
tianmiao
2022-04-07 21:31:45 +08:00
parent 01c36d591c
commit b8fcfb0213
7 changed files with 30 additions and 32 deletions

View File

@@ -203,7 +203,7 @@ func (channel *Channel) Invite(start, end string) (code int) {
fmt.Sprintf("o=%s 0 0 IN IP4 %s", d.Serial, d.SipIP),
"s=" + s,
"u=" + channel.DeviceID + ":0",
"c=IN IP4 " + d.SipIP,
"c=IN IP4 " + d.MediaIP,
fmt.Sprintf("t=%d %d", sint, eint),
fmt.Sprintf("m=video %d %sRTP/AVP 96", port, protocol),
"a=recvonly",
@@ -224,7 +224,7 @@ func (channel *Channel) Invite(start, end string) (code int) {
if response == nil {
return http.StatusRequestTimeout
}
fmt.Printf("Channel :%s invite response status code: %d\n", channel.DeviceID, response.GetStatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, response.GetStatusCode()))
if response.GetStatusCode() == 200 {
ds := strings.Split(response.Body, "\r\n")
@@ -241,10 +241,11 @@ func (channel *Channel) Invite(start, end string) (code int) {
}
publisher := &GBPublisher{
StreamPath: streamPath,
config: config,
}
if config.UdpCacheSize > 0 && !config.IsMediaNetworkTCP() {
publisher.udpCache = utils.NewPqRtp()
}
// if config.UdpCacheSize > 0 && !config.IsMediaNetworkTCP() {
// publisher.udpCache = utils.NewPqRtp()
// }
publishers := &config.publishers
if start == "" {
publisher.Type = "GB28181 Live"

View File

@@ -59,6 +59,7 @@ type Device struct {
to *sip.Contact
Addr string
SipIP string //暴露的IP
MediaIP string //Media Server 暴露的IP
SourceAddr net.Addr
channelMap map[string]*Channel
channelMutex sync.RWMutex
@@ -76,7 +77,7 @@ func (config *GB28181Config) StoreDevice(id string, s *transaction.Core, req *si
d.UpdateTime = time.Now()
d.from = &sip.Contact{Uri: req.StartLine.Uri, Params: make(map[string]string)}
d.to = req.To
d.Addr = req.Via.GetSendBy()
d.Addr = req.SourceAdd.String()
//TODO: Should we send GetDeviceInf request?
//message := d.CreateMessage(sip.MESSAGE)
//message.Body = sip.GetDeviceInfoXML(d.ID)
@@ -97,8 +98,9 @@ func (config *GB28181Config) StoreDevice(id string, s *transaction.Core, req *si
Core: s,
from: &sip.Contact{Uri: req.StartLine.Uri, Params: make(map[string]string)},
to: req.To,
Addr: req.Via.GetSendBy(),
Addr: req.SourceAdd.String(),
SipIP: config.SipIP,
MediaIP: config.MediaIP,
channelMap: make(map[string]*Channel),
config: config,
}
@@ -276,7 +278,7 @@ func (d *Device) Catalog() int {
func (d *Device) QueryDeviceInfo(req *sip.Request) {
for i := time.Duration(5); i < 100; i++ {
fmt.Printf("device.QueryDeviceInfo:%s ipaddr:%s", d.ID, d.Addr)
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s", d.ID, d.Addr))
time.Sleep(time.Second * i)
requestMsg := d.CreateMessage(sip.MESSAGE)
requestMsg.ContentType = "Application/MANSCDP+xml"
@@ -291,7 +293,7 @@ func (d *Device) QueryDeviceInfo(req *sip.Request) {
d.SipIP = response.Via.Params["received"]
}
if response.GetStatusCode() != 200 {
fmt.Printf("device %s send Catalog : %d\n", d.ID, response.GetStatusCode())
plugin.Error(fmt.Sprintf("device %s send Catalog : %d\n", d.ID, response.GetStatusCode()))
} else {
d.Subscribe()
break

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"github.com/logrusorgru/aurora"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/sip"
"m7s.live/plugin/gb28181/v4/transaction"
"m7s.live/plugin/gb28181/v4/utils"
@@ -71,9 +70,6 @@ func (config *GB28181Config) OnRegister(req *sip.Request, tx *transaction.GBTx)
}
}
func (config *GB28181Config) OnMessage(req *sip.Request, tx *transaction.GBTx) {
plugin.Debug("OnMessage", zap.String("event", req.Message.Event))
if v, ok := Devices.Load(req.From.Uri.UserInfo()); ok {
d := v.(*Device)
d.SourceAddr = req.SourceAdd

View File

@@ -15,6 +15,7 @@ type GB28181Config struct {
AutoInvite bool
AutoCloseAfter int
PreFetchRecord bool
UdpCacheSize int
config.Publish
Server
transaction.Config
@@ -52,6 +53,7 @@ var conf = &GB28181Config{
AutoInvite: true,
AutoCloseAfter: -1,
PreFetchRecord: false,
UdpCacheSize: 0,
Server: Server{
MediaNetwork: "udp",
},

View File

@@ -23,12 +23,12 @@ type GBPublisher struct {
}
func (p *GBPublisher) PushVideo(ts uint32, cts uint32, payload []byte) {
p.VideoTrack.WriteAnnexB(ts, cts, payload)
//p.pushVideo(ts, cts, payload)
//p.VideoTrack.WriteAnnexB(ts, cts, payload)
p.pushVideo(ts, cts, payload)
}
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
p.VideoTrack.WriteAVCC(ts, payload)
//p.pushAudio(ts, payload)
//p.VideoTrack.WriteAVCC(ts, payload)
p.pushAudio(ts, payload)
}
func (p *GBPublisher) Publish() (result bool) {
@@ -49,6 +49,7 @@ func (p *GBPublisher) Publish() (result bool) {
p.pushAudio = func(ts uint32, payload common.AVCCFrame) {
switch p.parser.AudioStreamType {
case utils.G711A:
case utils.G711A + 1:
at := NewG711(p.Stream, true)
at.SampleRate = 8000
at.SampleSize = 16
@@ -64,7 +65,6 @@ func (p *GBPublisher) Publish() (result bool) {
}
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
plugin.Debug("收到了推送流")
originRtp := *rtp
if p.config.UdpCacheSize > 0 && !p.config.IsMediaNetworkTCP() {
//序号小于第一个包的丢弃,rtp包序号达到65535后会从0开始所以这里需要判断一下

View File

@@ -9,6 +9,7 @@ import (
. "github.com/logrusorgru/aurora"
"github.com/pion/rtp"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/sip"
"m7s.live/plugin/gb28181/v4/transaction"
)
@@ -55,7 +56,7 @@ func (p *Publishers) Get(key uint32) *GBPublisher {
func (config *GB28181Config) startServer() {
config.publishers.data = make(map[uint32]*GBPublisher)
fmt.Println(Green("server gb28181 start at"), BrightBlue(config.SipIP+":"+strconv.Itoa(int(config.SipPort))))
plugin.Info(fmt.Sprint(Green("Server gb28181 start at"), BrightBlue(config.SipIP+":"+strconv.Itoa(int(config.SipPort)))))
s := transaction.NewCore(&config.Config)
s.RegistHandler(sip.REGISTER, config.OnRegister)
@@ -80,7 +81,7 @@ func (config *GB28181Config) startServer() {
go removeBanDevice(config)
}
s.StartAndWait()
go s.StartAndWait()
}
func (config *GB28181Config) startMediaServer() {
@@ -127,16 +128,17 @@ func listenMediaUDP(config *GB28181Config) {
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
fmt.Printf("listen udp %s err: %v", addr, err)
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return
}
bufUDP := make([]byte, networkBuffer)
fmt.Printf("udp server start listen video port[%d]", config.MediaPort)
defer fmt.Printf("udp server stop listen video port[%d]", config.MediaPort)
plugin.Info("Media udp server start.", zap.Uint16("port", config.MediaPort))
defer plugin.Info("Media udp server stop", zap.Uint16("port", config.MediaPort))
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
fmt.Println("gb28181 decode rtp error:", err)
plugin.Error("Decode rtp error:", zap.Error(err))
}
if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Err() == nil {
publisher.PushPS(&rtpPacket)

View File

@@ -159,7 +159,7 @@ again:
return err
}
var video []byte
var nextStartCode, videoTs, videoCts uint32
var nextStartCode uint32
loop:
for err == nil {
if nextStartCode, err = dec.Uint32(); err != nil {
@@ -172,19 +172,14 @@ loop:
case StartCodeMAP:
err = dec.decProgramStreamMap()
case StartCodeVideo:
var cts uint32
if err = dec.decPESPacket(); err == nil {
if len(video) == 0 {
if dec.PTS == 0 {
dec.PTS = ts
}
if dec.DTS != 0 {
cts = dec.PTS - dec.DTS
} else {
if dec.DTS == 0 {
dec.DTS = dec.PTS
}
videoTs = dec.DTS / 90
videoCts = cts / 90
}
video = append(video, dec.Payload...)
} else {
@@ -207,7 +202,7 @@ loop:
}
}
if len(video) > 0 {
pusher.PushVideo(videoTs, videoCts, video)
pusher.PushVideo(dec.PTS, dec.DTS, video)
}
if nextStartCode == StartCodePS {
fmt.Println(aurora.Red("StartCodePS recursion..."), err)