mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
Compare commits
2 Commits
ef313b1ea4
...
6ff273c9e5
Author | SHA1 | Date | |
---|---|---|---|
![]() |
6ff273c9e5 | ||
![]() |
fc5b8f5a1b |
2
api.go
2
api.go
@@ -132,7 +132,7 @@ func startApiServer(addr string) {
|
||||
|
||||
if stream.AppConfig.GB28181.Enable {
|
||||
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnLiveGBSTalk) // livegbs一对一对讲
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{}))
|
||||
|
@@ -310,9 +310,7 @@ func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取id
|
||||
id := device + "/" + channel + ".broadcast"
|
||||
|
||||
talkSource := gb28181.NewTalkSource(id, conn)
|
||||
talkSource.Init()
|
||||
talkSource.SetUrlValues(r.Form)
|
||||
@@ -349,9 +347,9 @@ func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// base64解密
|
||||
var pcmN int
|
||||
pcmN, err = base64.StdEncoding.Decode(bytes, pcm)
|
||||
if err == nil {
|
||||
log.Sugar.Errorf(err.Error())
|
||||
pcmN, err = base64.StdEncoding.Decode(pcm, bytes)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("base64解密失败, source: %s err: %s", id, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
|
@@ -2,6 +2,7 @@ package stream
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"github.com/lkmio/transport"
|
||||
@@ -34,6 +35,7 @@ func (t TransportType) String() string {
|
||||
type ForwardSink struct {
|
||||
BaseSink
|
||||
socket transport.Transport
|
||||
spareSocket transport.Transport // 对讲备选udp发送
|
||||
transportType TransportType
|
||||
receiveTimer *time.Timer
|
||||
ssrc uint32
|
||||
@@ -68,7 +70,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
|
||||
|
||||
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
|
||||
// TCP等待连接后再转发数据
|
||||
if TransportTypeUDP != f.transportType && f.Conn == nil {
|
||||
if TransportTypeUDP != f.transportType && f.Conn == nil && f.spareSocket == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -121,9 +123,15 @@ func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]by
|
||||
processedData = data
|
||||
}
|
||||
|
||||
if TransportTypeUDP == f.transportType {
|
||||
spare := f.Conn == nil && f.spareSocket != nil
|
||||
if TransportTypeUDP == f.transportType || spare {
|
||||
var sender = f.socket
|
||||
if spare {
|
||||
sender = f.spareSocket
|
||||
}
|
||||
|
||||
for _, datum := range processedData {
|
||||
f.socket.(*transport.UDPClient).Write(datum.Get()[2:])
|
||||
sender.(*transport.UDPClient).Write(datum.Get()[2:])
|
||||
}
|
||||
} else {
|
||||
return f.BaseSink.Write(index, processedData, ts, keyVideo)
|
||||
@@ -140,6 +148,10 @@ func (f *ForwardSink) Close() {
|
||||
f.socket.Close()
|
||||
}
|
||||
|
||||
if f.spareSocket != nil {
|
||||
f.spareSocket.Close()
|
||||
}
|
||||
|
||||
if f.receiveTimer != nil {
|
||||
f.receiveTimer.Stop()
|
||||
}
|
||||
@@ -175,14 +187,12 @@ func NewForwardSink(transportType TransportType, protocol TransStreamProtocol, s
|
||||
remoteAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
client, err := manager.NewUDPClient(remoteAddr)
|
||||
if err != nil {
|
||||
} else if client, err := manager.NewUDPClient(remoteAddr); err != nil {
|
||||
return nil, 0, err
|
||||
} else {
|
||||
sink.socket = client
|
||||
}
|
||||
|
||||
sink.socket = client
|
||||
} else if transportType == TransportTypeTCPClient {
|
||||
client := transport.TCPClient{}
|
||||
err := manager.AllocPort(true, func(port uint16) error {
|
||||
@@ -220,7 +230,20 @@ func NewForwardSink(transportType TransportType, protocol TransStreamProtocol, s
|
||||
tcpServer.SetHandler(sink)
|
||||
tcpServer.Accept()
|
||||
sink.socket = tcpServer
|
||||
sink.StartReceiveTimer()
|
||||
|
||||
// 同时创建udp发送器, 兼容不支持tcp对讲的设备
|
||||
if TransStreamGBTalk == protocol {
|
||||
localAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", AppConfig.ListenIP, tcpServer.ListenPort()))
|
||||
remoteAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
|
||||
udp := &transport.UDPClient{}
|
||||
err = udp.Connect(localAddr, remoteAddr)
|
||||
if err == nil {
|
||||
sink.spareSocket = udp
|
||||
}
|
||||
} else {
|
||||
sink.StartReceiveTimer()
|
||||
}
|
||||
}
|
||||
|
||||
return sink, sink.socket.ListenPort(), nil
|
||||
|
@@ -85,6 +85,7 @@ type transStreamPublisher struct {
|
||||
source string
|
||||
streamEvents *NonBlockingChannel[*StreamEvent]
|
||||
mainContextEvents chan func()
|
||||
earlyEvents collections.LinkedList[func()] // 早于启动前的事件, 等待启动后执行
|
||||
|
||||
sinkCount int // 拉流计数
|
||||
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
|
||||
@@ -108,6 +109,8 @@ type transStreamPublisher struct {
|
||||
streamEndInfo *StreamEndInfo // 上次结束推流的信息
|
||||
lastStreamEndTime time.Time // 最近结束拉流的时间
|
||||
bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区
|
||||
mute sync.Mutex
|
||||
started bool
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) Post(event *StreamEvent) {
|
||||
@@ -157,6 +160,9 @@ func (t *transStreamPublisher) run() {
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) start() {
|
||||
t.mute.Lock()
|
||||
defer t.mute.Unlock()
|
||||
|
||||
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
|
||||
t.mainContextEvents = make(chan func(), 256)
|
||||
|
||||
@@ -166,10 +172,26 @@ func (t *transStreamPublisher) start() {
|
||||
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
|
||||
|
||||
go t.run()
|
||||
t.started = true
|
||||
|
||||
// 放置先于启动的事件到主管道
|
||||
for t.earlyEvents.Size() > 0 {
|
||||
t.mainContextEvents <- t.earlyEvents.Remove(0)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) PostEvent(cb func()) {
|
||||
t.mainContextEvents <- cb
|
||||
if t.started {
|
||||
t.mainContextEvents <- cb
|
||||
return
|
||||
}
|
||||
|
||||
// 早于启动前的事件, 添加到等待队列
|
||||
t.mute.Lock()
|
||||
defer t.mute.Unlock()
|
||||
if !t.started {
|
||||
t.earlyEvents.Add(cb)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) {
|
||||
|
Reference in New Issue
Block a user