mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3046bcde3 | ||
|
|
9c970ad282 | ||
|
|
c4de92e9f6 | ||
|
|
cf5a803971 | ||
|
|
f487be5fdb | ||
|
|
bd70d24a16 | ||
|
|
708cd042df |
132
channel.go
132
channel.go
@@ -1,9 +1,7 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -155,64 +153,8 @@ func (channel *Channel) Control(PTZCmd string) int {
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
type InviteOptions struct {
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
return o.Start == 0 || o.End == 0
|
||||
}
|
||||
|
||||
func (o InviteOptions) Record() bool {
|
||||
return !o.IsLive()
|
||||
}
|
||||
|
||||
func (o *InviteOptions) Validate(start, end string) error {
|
||||
if start != "" {
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
o.Start = int(sint)
|
||||
}
|
||||
if end != "" {
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
o.End = int(eint)
|
||||
}
|
||||
if o.Start >= o.End {
|
||||
return errors.New("start < end")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o InviteOptions) String() string {
|
||||
return fmt.Sprintf("t=%d %d", o.Start, o.End)
|
||||
}
|
||||
|
||||
func (o *InviteOptions) CreateSSRC() {
|
||||
ssrc := make([]byte, 10)
|
||||
if o.IsLive() {
|
||||
ssrc[0] = '0'
|
||||
} else {
|
||||
ssrc[0] = '1'
|
||||
}
|
||||
copy(ssrc[1:6], conf.Serial[3:8])
|
||||
randNum := 1000 + rand.Intn(8999)
|
||||
copy(ssrc[6:], strconv.Itoa(randNum))
|
||||
o.ssrc = string(ssrc)
|
||||
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
|
||||
o.SSRC = uint32(_ssrc)
|
||||
}
|
||||
|
||||
//Invite 发送Invite报文,注意里面的锁保证不同时发送invite报文,该锁由channel持有
|
||||
// Invite 发送Invite报文 invites a channel to play
|
||||
// 注意里面的锁保证不同时发送invite报文,该锁由channel持有
|
||||
/***
|
||||
f字段: f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
|
||||
各项具体含义:
|
||||
@@ -257,13 +199,13 @@ f = v/a/编码格式/码率大小/采样率
|
||||
f字段中视、音频参数段之间不需空格分割。
|
||||
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
|
||||
*/
|
||||
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
if opt.IsLive() {
|
||||
if !channel.liveInviteLock.TryLock() {
|
||||
return 304, nil
|
||||
}
|
||||
defer func() {
|
||||
if code != 200 {
|
||||
if code != OK {
|
||||
channel.liveInviteLock.Unlock()
|
||||
}
|
||||
}()
|
||||
@@ -280,10 +222,6 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
// size := 1
|
||||
// fps := 15
|
||||
// bitrate := 200
|
||||
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
|
||||
publisher := &GBPublisher{
|
||||
InviteOptions: opt,
|
||||
channel: channel,
|
||||
@@ -294,35 +232,27 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if conf.tcpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenTCP()
|
||||
if err != nil {
|
||||
return 500, err
|
||||
return ServerInternalError, err
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
publisher.DisableReorder = true
|
||||
} else {
|
||||
if conf.udpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenUDP()
|
||||
if err != nil {
|
||||
code = 500
|
||||
code = ServerInternalError
|
||||
return
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
}
|
||||
// if opt.MediaPort == 0 {
|
||||
// opt.MediaPort = conf.MediaPort
|
||||
// if conf.IsMediaNetworkTCP() {
|
||||
// protocol = "TCP/"
|
||||
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
|
||||
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
|
||||
// channel.tcpPortIndex = 0
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
sdpInfo := []string{
|
||||
"v=0",
|
||||
fmt.Sprintf("o=%s 0 0 IN IP4 %s", d.ID, d.mediaIP),
|
||||
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
|
||||
"s=" + s,
|
||||
"u=" + channel.DeviceID + ":0",
|
||||
"c=IN IP4 " + d.mediaIP,
|
||||
@@ -353,7 +283,8 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
}
|
||||
code = int(publisher.inviteRes.StatusCode())
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
|
||||
if code == 200 {
|
||||
|
||||
if code == OK {
|
||||
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
|
||||
for _, l := range ds {
|
||||
if ls := strings.Split(l, "="); len(ls) > 1 {
|
||||
@@ -371,14 +302,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
publisher.udpCache = utils.NewPqRtp()
|
||||
}
|
||||
if err = plugin.Publish(streamPath, publisher); err != nil {
|
||||
code = 403
|
||||
code = ServerInternalError
|
||||
return
|
||||
}
|
||||
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
|
||||
srv.Send(ack)
|
||||
} else if opt.IsLive() && conf.AutoInvite {
|
||||
} else if channel.CanInvite() {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
channel.Invite(InviteOptions{})
|
||||
channel.TryAutoInvite()
|
||||
})
|
||||
}
|
||||
return
|
||||
@@ -398,3 +329,38 @@ func (channel *Channel) Bye(live bool) int {
|
||||
}
|
||||
return 404
|
||||
}
|
||||
|
||||
func (channel *Channel) TryAutoInvite() {
|
||||
if conf.AutoInvite && channel.CanInvite() {
|
||||
go channel.Invite(&InviteOptions{})
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *Channel) CanInvite() bool {
|
||||
if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
|
||||
return false
|
||||
}
|
||||
|
||||
if conf.InviteIDs == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
// 11~13位是设备类型编码
|
||||
typeID := channel.DeviceID[10:13]
|
||||
|
||||
// format: start-end,type1,type2
|
||||
tokens := strings.Split(conf.InviteIDs, ",")
|
||||
for _, tok := range tokens {
|
||||
if first, second, ok := strings.Cut(tok, "-"); ok {
|
||||
if typeID >= first && typeID <= second {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if typeID == first {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
115
const.go
Normal file
115
const.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package gb28181
|
||||
|
||||
const (
|
||||
Trying = 100
|
||||
Ringing = 180
|
||||
CallIsBeingForwarded = 181
|
||||
Queued = 182
|
||||
SessionProgress = 183
|
||||
OK = 200
|
||||
Accepted = 202
|
||||
MultipleChoices = 300
|
||||
MovedPermanently = 301
|
||||
MovedTemporarily = 302
|
||||
UseProxy = 305
|
||||
AlternativeService = 380
|
||||
BadRequest = 400
|
||||
Unauthorized = 401
|
||||
PaymentRequired = 402
|
||||
Forbidden = 403
|
||||
NotFound = 404
|
||||
MethodNotAllowed = 405
|
||||
NotAcceptable = 406
|
||||
ProxyAuthenticationRequired = 407
|
||||
RequestTimeout = 408
|
||||
Gone = 410
|
||||
RequestEntityTooLarge = 413
|
||||
RequestURITooLong = 414
|
||||
UnsupportedMediaType = 415
|
||||
UnsupportedURIScheme = 416
|
||||
BadExtension = 420
|
||||
ExtensionRequired = 421
|
||||
IntervalTooBrief = 423
|
||||
TemporarilyUnavailable = 480
|
||||
CallTransactionDoesNotExist = 481
|
||||
LoopDetected = 482
|
||||
TooManyHops = 483
|
||||
AddressIncomplete = 484
|
||||
Ambiguous = 485
|
||||
BusyHere = 486
|
||||
RequestTerminated = 487
|
||||
NotAcceptableHere = 488
|
||||
BadEvent = 489
|
||||
RequestPending = 491
|
||||
Undecipherable = 493
|
||||
ServerInternalError = 500
|
||||
NotImplemented = 501
|
||||
BadGateway = 502
|
||||
ServiceUnavailable = 503
|
||||
ServerTim = 504
|
||||
VersionNotSupported = 505
|
||||
MessageTooLarge = 513
|
||||
BusyEverywhere = 600
|
||||
Decline = 603
|
||||
DoesNotExistAnywhere = 604
|
||||
SessionNotAcceptable = 606
|
||||
)
|
||||
|
||||
var reasons = map[int]string{
|
||||
100: "Trying",
|
||||
180: "Ringing",
|
||||
181: "Call Is Being Forwarded",
|
||||
182: "Queued",
|
||||
183: "Session Progress",
|
||||
200: "OK",
|
||||
202: "Accepted",
|
||||
300: "Multiple Choices",
|
||||
301: "Moved Permanently",
|
||||
302: "Moved Temporarily",
|
||||
305: "Use Proxy",
|
||||
380: "Alternative Service",
|
||||
400: "Bad Request",
|
||||
401: "Unauthorized",
|
||||
402: "Payment Required",
|
||||
403: "Forbidden",
|
||||
404: "Not Found",
|
||||
405: "Method Not Allowed",
|
||||
406: "Not Acceptable",
|
||||
407: "Proxy Authentication Required",
|
||||
408: "Request Timeout",
|
||||
410: "Gone",
|
||||
413: "Request Entity Too Large",
|
||||
414: "Request-URI Too Long",
|
||||
415: "Unsupported Media Type",
|
||||
416: "Unsupported URI Scheme",
|
||||
420: "Bad Extension",
|
||||
421: "Extension Required",
|
||||
423: "Interval Too Brief",
|
||||
480: "Temporarily Unavailable",
|
||||
481: "Call transaction Does Not Exist",
|
||||
482: "Loop Detected",
|
||||
483: "Too Many Hops",
|
||||
484: "Address Incomplete",
|
||||
485: "Ambiguous",
|
||||
486: "Busy Here",
|
||||
487: "Request Terminated",
|
||||
488: "Not Acceptable Here",
|
||||
489: "Bad Event",
|
||||
491: "Request Pending",
|
||||
493: "Undecipherable",
|
||||
500: "Server Internal Error",
|
||||
501: "Not Implemented",
|
||||
502: "Bad Gateway",
|
||||
503: "Service Unavailable",
|
||||
504: "Server Tim",
|
||||
505: "Version Not Supported",
|
||||
513: "message Too Large",
|
||||
600: "Busy Everywhere",
|
||||
603: "Decline",
|
||||
604: "Does Not Exist Anywhere",
|
||||
606: "SESSION NOT ACCEPTABLE",
|
||||
}
|
||||
|
||||
func Explain(statusCode int) string {
|
||||
return reasons[statusCode]
|
||||
}
|
||||
27
device.go
27
device.go
@@ -4,13 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"golang.org/x/exp/maps"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -240,12 +241,14 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
if c.ParentID != "" {
|
||||
path := strings.Split(c.ParentID, "/")
|
||||
parentId := path[len(path)-1]
|
||||
if c.DeviceID != parentId {
|
||||
if v, ok := Devices.Load(parentId); ok {
|
||||
parent := v.(*Device)
|
||||
parent.addOrUpdateChannel(c)
|
||||
continue
|
||||
}
|
||||
//如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。
|
||||
// 暂时不考虑级联目录的实现
|
||||
if d.ID != parentId {
|
||||
//if v, ok := Devices.Load(parentId); ok {
|
||||
// parent := v.(*Device)
|
||||
// parent.addOrUpdateChannel(c)
|
||||
continue
|
||||
//}
|
||||
}
|
||||
}
|
||||
//本设备增加通道
|
||||
@@ -260,9 +263,7 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
}
|
||||
if conf.AutoInvite && (c.LivePublisher == nil) {
|
||||
go c.Invite(InviteOptions{})
|
||||
}
|
||||
c.TryAutoInvite()
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
@@ -360,7 +361,7 @@ func (d *Device) Subscribe() int {
|
||||
|
||||
response, err := d.SipRequestForResponse(request)
|
||||
if err == nil && response != nil {
|
||||
if response.StatusCode() == 200 {
|
||||
if response.StatusCode() == OK {
|
||||
callId, _ := request.CallID()
|
||||
d.subscriber.CallID = string(*callId)
|
||||
} else {
|
||||
@@ -408,7 +409,7 @@ func (d *Device) QueryDeviceInfo() {
|
||||
// d.SipIP = received.String()
|
||||
// }
|
||||
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s response code:%d", d.ID, d.NetAddr, response.StatusCode()))
|
||||
if response.StatusCode() == 200 {
|
||||
if response.StatusCode() == OK {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -436,7 +437,7 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
|
||||
|
||||
response, err := d.SipRequestForResponse(mobilePosition)
|
||||
if err == nil && response != nil {
|
||||
if response.StatusCode() == 200 {
|
||||
if response.StatusCode() == OK {
|
||||
callId, _ := mobilePosition.CallID()
|
||||
d.subscriber.CallID = callId.String()
|
||||
} else {
|
||||
|
||||
@@ -185,9 +185,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
go d.syncChannels()
|
||||
} else {
|
||||
for _, ch := range d.channelMap {
|
||||
if c.AutoInvite && (ch.LivePublisher == nil) {
|
||||
ch.Invite(InviteOptions{})
|
||||
}
|
||||
ch.TryAutoInvite()
|
||||
}
|
||||
}
|
||||
//为什么要查找子码流?
|
||||
|
||||
65
inviteoption.go
Normal file
65
inviteoption.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type InviteOptions struct {
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
return o.Start == 0 || o.End == 0
|
||||
}
|
||||
|
||||
func (o InviteOptions) Record() bool {
|
||||
return !o.IsLive()
|
||||
}
|
||||
|
||||
func (o *InviteOptions) Validate(start, end string) error {
|
||||
if start != "" {
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
o.Start = int(sint)
|
||||
}
|
||||
if end != "" {
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
o.End = int(eint)
|
||||
}
|
||||
if o.Start >= o.End {
|
||||
return errors.New("start < end")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o InviteOptions) String() string {
|
||||
return fmt.Sprintf("t=%d %d", o.Start, o.End)
|
||||
}
|
||||
|
||||
func (o *InviteOptions) CreateSSRC() {
|
||||
ssrc := make([]byte, 10)
|
||||
if o.IsLive() {
|
||||
ssrc[0] = '0'
|
||||
} else {
|
||||
ssrc[0] = '1'
|
||||
}
|
||||
copy(ssrc[1:6], conf.Serial[3:8])
|
||||
randNum := 1000 + rand.Intn(8999)
|
||||
copy(ssrc[6:], strconv.Itoa(randNum))
|
||||
o.ssrc = string(ssrc)
|
||||
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
|
||||
o.SSRC = uint32(_ssrc)
|
||||
}
|
||||
58
main.go
58
main.go
@@ -12,44 +12,45 @@ import (
|
||||
|
||||
type GB28181PositionConfig struct {
|
||||
AutosubPosition bool //是否自动订阅定位
|
||||
Expires time.Duration //订阅周期(单位:秒)
|
||||
Interval time.Duration //订阅间隔(单位:秒)
|
||||
Expires time.Duration `default:"3600s"` //订阅周期(单位:秒)
|
||||
Interval time.Duration `default:"6s"` //订阅间隔(单位:秒)
|
||||
}
|
||||
|
||||
type GB28181Config struct {
|
||||
AutoInvite bool
|
||||
AutoInvite bool `default:"true"`
|
||||
PreFetchRecord bool
|
||||
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
|
||||
|
||||
//sip服务器的配置
|
||||
SipNetwork string //传输协议,默认UDP,可选TCP
|
||||
SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP
|
||||
SipIP string //sip 服务器公网IP
|
||||
SipPort uint16 //sip 服务器端口,默认 5060
|
||||
Serial string //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string //sip 服务器域,默认 3402000000
|
||||
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
|
||||
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
|
||||
Username string //sip 服务器账号
|
||||
Password string //sip 服务器密码
|
||||
|
||||
// AckTimeout uint16 //sip 服务应答超时,单位秒
|
||||
RegisterValidity time.Duration //注册有效期,单位秒,默认 3600
|
||||
RegisterValidity time.Duration `default:"60s"` //注册有效期,单位秒,默认 3600
|
||||
// RegisterInterval int //注册间隔,单位秒,默认 60
|
||||
HeartbeatInterval time.Duration //心跳间隔,单位秒,默认 60
|
||||
HeartbeatInterval time.Duration `default:"60s"` //心跳间隔,单位秒,默认 60
|
||||
// HeartbeatRetry int //心跳超时次数,默认 3
|
||||
|
||||
//媒体服务器配置
|
||||
MediaIP string //媒体服务器地址
|
||||
MediaPort uint16 //媒体服务器端口
|
||||
MediaNetwork string //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPort uint16 `default:"58200"` //媒体服务器端口
|
||||
MediaNetwork string `default:"udp"` //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPortMin uint16
|
||||
MediaPortMax uint16
|
||||
// MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
|
||||
|
||||
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
|
||||
RemoveBanInterval time.Duration //移除禁止设备间隔
|
||||
RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔
|
||||
UdpCacheSize int //udp缓存大小
|
||||
|
||||
config.Publish
|
||||
Server
|
||||
LogLevel string //trace, debug, info, warn, error, fatal, panic
|
||||
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
|
||||
routes map[string]string
|
||||
DumpPath string //dump PS流本地文件路径
|
||||
|
||||
@@ -80,33 +81,6 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
|
||||
return strings.ToLower(c.MediaNetwork) == "tcp"
|
||||
}
|
||||
|
||||
var conf = &GB28181Config{
|
||||
AutoInvite: true,
|
||||
PreFetchRecord: false,
|
||||
UdpCacheSize: 0,
|
||||
SipNetwork: "udp",
|
||||
SipIP: "",
|
||||
SipPort: 5060,
|
||||
Serial: "34020000002000000001",
|
||||
Realm: "3402000000",
|
||||
Username: "",
|
||||
Password: "",
|
||||
var conf GB28181Config
|
||||
|
||||
// AckTimeout: 10,
|
||||
RegisterValidity: 60 * time.Second,
|
||||
// RegisterInterval: 60,
|
||||
HeartbeatInterval: 60 * time.Second,
|
||||
// HeartbeatRetry: 3,
|
||||
|
||||
MediaIP: "",
|
||||
MediaPort: 58200,
|
||||
// MediaIdleTimeout: 30,
|
||||
MediaNetwork: "udp",
|
||||
|
||||
RemoveBanInterval: 600 * time.Second,
|
||||
LogLevel: "info",
|
||||
// WaitKeyFrame: true,
|
||||
Position: GB28181PositionConfig{AutosubPosition: false, Expires: 3600 * time.Second, Interval: 6 * time.Second},
|
||||
}
|
||||
|
||||
var plugin = InstallPlugin(conf)
|
||||
var plugin = InstallPlugin(&conf)
|
||||
|
||||
104
publisher.go
104
publisher.go
@@ -12,26 +12,19 @@ import (
|
||||
"github.com/pion/rtp/v2"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
. "m7s.live/engine/v4/codec"
|
||||
"m7s.live/engine/v4/codec/mpegps"
|
||||
"m7s.live/engine/v4/codec/mpegts"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
)
|
||||
|
||||
type GBPublisher struct {
|
||||
Publisher
|
||||
InviteOptions
|
||||
PSPublisher
|
||||
*InviteOptions
|
||||
channel *Channel
|
||||
inviteRes sip.Response
|
||||
parser mpegps.MpegPsStream
|
||||
lastSeq uint16
|
||||
udpCache *utils.PriorityQueueRtp
|
||||
dumpFile *os.File
|
||||
dumpPrint io.Writer
|
||||
lastReceive time.Time
|
||||
reorder util.RTPReorder[*rtp.Packet]
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PrintDump(s string) {
|
||||
@@ -69,7 +62,7 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
go p.channel.Invite(InviteOptions{})
|
||||
go p.channel.Invite(&InviteOptions{})
|
||||
}
|
||||
case SEclose, SEKick:
|
||||
if p.IsLive() {
|
||||
@@ -106,100 +99,11 @@ func (p *GBPublisher) Bye() int {
|
||||
resp, err := p.channel.device.SipRequestForResponse(bye)
|
||||
if err != nil {
|
||||
p.Error("Bye", zap.Error(err))
|
||||
return 500
|
||||
return ServerInternalError
|
||||
}
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
func (p *GBPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
|
||||
if p.VideoTrack == nil {
|
||||
switch es.Type {
|
||||
case mpegts.STREAM_TYPE_H264:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
case mpegts.STREAM_TYPE_H265:
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
default:
|
||||
//推测编码类型
|
||||
var maybe264 H264NALUType
|
||||
maybe264 = maybe264.Parse(es.Buffer[4])
|
||||
switch maybe264 {
|
||||
case NALU_Non_IDR_Picture,
|
||||
NALU_IDR_Picture,
|
||||
NALU_SEI,
|
||||
NALU_SPS,
|
||||
NALU_PPS,
|
||||
NALU_Access_Unit_Delimiter:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
default:
|
||||
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
}
|
||||
}
|
||||
}
|
||||
payload, pts, dts := es.Buffer, es.PTS, es.DTS
|
||||
if len(payload) > 10 {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
} else {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
|
||||
}
|
||||
if dts == 0 {
|
||||
dts = pts
|
||||
}
|
||||
// if binary.BigEndian.Uint32(payload) != 1 {
|
||||
// panic("not annexb")
|
||||
// }
|
||||
p.WriteAnnexB(pts, dts, payload)
|
||||
}
|
||||
func (p *GBPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
|
||||
ts, payload := es.PTS, es.Buffer
|
||||
if p.AudioTrack == nil {
|
||||
switch es.Type {
|
||||
case mpegts.STREAM_TYPE_G711A:
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, true)
|
||||
case mpegts.STREAM_TYPE_G711U:
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, false)
|
||||
case mpegts.STREAM_TYPE_AAC:
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(ts, payload)
|
||||
case 0: //推测编码类型
|
||||
if payload[0] == 0xff && payload[1]>>4 == 0xf {
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(ts, payload)
|
||||
}
|
||||
default:
|
||||
p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
|
||||
}
|
||||
} else if es.Type == mpegts.STREAM_TYPE_AAC {
|
||||
p.WriteADTS(ts, payload)
|
||||
} else {
|
||||
p.WriteRaw(ts, payload)
|
||||
}
|
||||
}
|
||||
|
||||
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
|
||||
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
if p.parser.EsHandler == nil {
|
||||
p.parser.EsHandler = p
|
||||
p.lastSeq = rtp.SequenceNumber - 1
|
||||
}
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
p.parser.Feed(rtp.Payload)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
} else {
|
||||
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
|
||||
if rtp.SequenceNumber != p.lastSeq+1 {
|
||||
fmt.Println("drop", rtp.SequenceNumber, p.lastSeq)
|
||||
p.parser.Drop()
|
||||
if p.VideoTrack != nil {
|
||||
p.SetLostFlag()
|
||||
}
|
||||
}
|
||||
p.parser.Feed(rtp.Payload)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
|
||||
@@ -62,7 +62,7 @@ func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
http.NotFound(w, r)
|
||||
} else if opt.IsLive() && c.LivePublisher != nil {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else if code, err := c.Invite(opt); err == nil {
|
||||
} else if code, err := c.Invite(&opt); err == nil {
|
||||
w.WriteHeader(code)
|
||||
} else {
|
||||
http.Error(w, err.Error(), code)
|
||||
|
||||
@@ -31,7 +31,7 @@ type PortManager struct {
|
||||
}
|
||||
|
||||
func (pm *PortManager) Init(start, end uint16) {
|
||||
pm.pos = start
|
||||
pm.pos = start - 1
|
||||
pm.max = end
|
||||
if pm.pos > 0 && pm.max > pm.pos {
|
||||
pm.Valid = true
|
||||
@@ -146,11 +146,13 @@ func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
|
||||
lenBuf := make([]byte, 2)
|
||||
defer conn.Close()
|
||||
var err error
|
||||
ps := make(util.Buffer, 1024)
|
||||
for err == nil {
|
||||
if _, err = io.ReadFull(reader, lenBuf); err != nil {
|
||||
return
|
||||
}
|
||||
ps := make([]byte, binary.BigEndian.Uint16(lenBuf))
|
||||
ps.Reset()
|
||||
ps.Glow(int(binary.BigEndian.Uint16(lenBuf)))
|
||||
if _, err = io.ReadFull(reader, ps); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user