mirror of
https://github.com/lkmio/lkm.git
synced 2025-10-07 08:00:59 +08:00
feat: 适配livegbs一对一对讲接口
This commit is contained in:
2
api.go
2
api.go
@@ -132,7 +132,7 @@ func startApiServer(addr string) {
|
|||||||
|
|
||||||
if stream.AppConfig.GB28181.Enable {
|
if stream.AppConfig.GB28181.Enable {
|
||||||
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||||
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnLiveGBSTalk) // livegbs一对一对讲
|
||||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源
|
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源
|
||||||
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
|
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
|
||||||
apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{}))
|
apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{}))
|
||||||
|
@@ -310,9 +310,7 @@ func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取id
|
|
||||||
id := device + "/" + channel + ".broadcast"
|
id := device + "/" + channel + ".broadcast"
|
||||||
|
|
||||||
talkSource := gb28181.NewTalkSource(id, conn)
|
talkSource := gb28181.NewTalkSource(id, conn)
|
||||||
talkSource.Init()
|
talkSource.Init()
|
||||||
talkSource.SetUrlValues(r.Form)
|
talkSource.SetUrlValues(r.Form)
|
||||||
@@ -349,9 +347,9 @@ func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// base64解密
|
// base64解密
|
||||||
var pcmN int
|
var pcmN int
|
||||||
pcmN, err = base64.StdEncoding.Decode(bytes, pcm)
|
pcmN, err = base64.StdEncoding.Decode(pcm, bytes)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
log.Sugar.Errorf(err.Error())
|
log.Sugar.Errorf("base64解密失败, source: %s err: %s", id, err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -85,6 +85,7 @@ type transStreamPublisher struct {
|
|||||||
source string
|
source string
|
||||||
streamEvents *NonBlockingChannel[*StreamEvent]
|
streamEvents *NonBlockingChannel[*StreamEvent]
|
||||||
mainContextEvents chan func()
|
mainContextEvents chan func()
|
||||||
|
earlyEvents collections.LinkedList[func()] // 早于启动前的事件, 等待启动后执行
|
||||||
|
|
||||||
sinkCount int // 拉流计数
|
sinkCount int // 拉流计数
|
||||||
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
|
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
|
||||||
@@ -108,6 +109,8 @@ type transStreamPublisher struct {
|
|||||||
streamEndInfo *StreamEndInfo // 上次结束推流的信息
|
streamEndInfo *StreamEndInfo // 上次结束推流的信息
|
||||||
lastStreamEndTime time.Time // 最近结束拉流的时间
|
lastStreamEndTime time.Time // 最近结束拉流的时间
|
||||||
bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区
|
bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区
|
||||||
|
mute sync.Mutex
|
||||||
|
started bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStreamPublisher) Post(event *StreamEvent) {
|
func (t *transStreamPublisher) Post(event *StreamEvent) {
|
||||||
@@ -157,6 +160,9 @@ func (t *transStreamPublisher) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStreamPublisher) start() {
|
func (t *transStreamPublisher) start() {
|
||||||
|
t.mute.Lock()
|
||||||
|
defer t.mute.Unlock()
|
||||||
|
|
||||||
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
|
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
|
||||||
t.mainContextEvents = make(chan func(), 256)
|
t.mainContextEvents = make(chan func(), 256)
|
||||||
|
|
||||||
@@ -166,10 +172,26 @@ func (t *transStreamPublisher) start() {
|
|||||||
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
|
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
|
||||||
|
|
||||||
go t.run()
|
go t.run()
|
||||||
|
t.started = true
|
||||||
|
|
||||||
|
// 放置先于启动的事件到主管道
|
||||||
|
for t.earlyEvents.Size() > 0 {
|
||||||
|
t.mainContextEvents <- t.earlyEvents.Remove(0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStreamPublisher) PostEvent(cb func()) {
|
func (t *transStreamPublisher) PostEvent(cb func()) {
|
||||||
|
if t.started {
|
||||||
t.mainContextEvents <- cb
|
t.mainContextEvents <- cb
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 早于启动前的事件, 添加到等待队列
|
||||||
|
t.mute.Lock()
|
||||||
|
defer t.mute.Unlock()
|
||||||
|
if !t.started {
|
||||||
|
t.earlyEvents.Add(cb)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) {
|
func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) {
|
||||||
|
Reference in New Issue
Block a user