feat: 支持1078流转GB28181

This commit is contained in:
ydajiang
2025-05-31 21:06:33 +08:00
parent 976fd12b4b
commit 3e371c1ac7
18 changed files with 440 additions and 130 deletions

8
api.go
View File

@@ -100,13 +100,13 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/source/close", filterRequestBodyParams(apiServer.OnSourceClose, &IDS{})) // 关闭推流源
apiServer.router.HandleFunc("/api/v1/sink/list", filterRequestBodyParams(apiServer.OnSinkList, &IDS{})) // 查询某个推流源下,所有的拉流端列表
apiServer.router.HandleFunc("/api/v1/sink/close", filterRequestBodyParams(apiServer.OnSinkClose, &IDS{})) // 关闭拉流端
apiServer.router.HandleFunc("/api/v1/sink/add", filterRequestBodyParams(apiServer.OnSinkAdd, &GBOffer{})) // 级联/广播/JT转GB
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
if stream.AppConfig.GB28181.Enable {
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/gb28181/offer/create", filterRequestBodyParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
apiServer.router.HandleFunc("/api/v1/gb28181/answer/create", filterRequestBodyParams(apiServer.OnGBAnswerCreate, &GBOffer{}))
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", filterRequestBodyParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", filterRequestBodyParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址
}
@@ -139,7 +139,7 @@ func (api *ApiServer) generateSinkID(remoteAddr string) stream.SinkID {
panic(err)
}
return stream.NetAddr2SinkId(tcpAddr)
return stream.NetAddr2SinkID(tcpAddr)
}
func (api *ApiServer) onFlv(sourceId string, w http.ResponseWriter, r *http.Request) {
@@ -426,7 +426,7 @@ func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request)
for _, sink := range sinks {
details = append(details,
SinkDetails{
ID: stream.SinkId2String(sink.GetID()),
ID: stream.SinkID2String(sink.GetID()),
Protocol: sink.GetProtocol().String(),
Time: sink.CreateTime(),
},

View File

@@ -36,6 +36,7 @@ type SourceSDP struct {
type GBOffer struct {
SourceSDP
AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"`
}
func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
@@ -152,71 +153,21 @@ func (api *ApiServer) OnGBOfferCreate(v *SourceSDP, w http.ResponseWriter, r *ht
}
}
func (api *ApiServer) OnGBAnswerCreate(v *GBOffer, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建应答 offer: %v", v)
var sink stream.Sink
var err error
// 响应错误消息
defer func() {
if err != nil {
log.Sugar.Errorf("创建应答失败 err: %s", err.Error())
httpResponseError(w, err.Error())
if sink != nil {
sink.Close()
}
}
}()
source := stream.SourceManager.Find(v.Source)
if source == nil {
err = fmt.Errorf("%s 源不存在", v.Source)
return
}
addr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
sinkId := stream.NetAddr2SinkId(addr)
// sinkId添加随机数
if ipv4, ok := sinkId.(uint64); ok {
random := uint64(utils.RandomIntInRange(0x1000, 0xFFFF0000))
sinkId = (ipv4 & 0xFFFFFFFF00000000) | (random << 16) | (ipv4 & 0xFFFF)
}
setup := gb28181.SetupTypeFromString(v.Setup)
if v.AnswerSetup != "" {
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
}
var protocol stream.TransStreamProtocol
// 级联转发
if v.SessionName == "" || v.SessionName == InviteTypePlay ||
v.SessionName == InviteTypePlayback ||
v.SessionName == InviteTypeDownload {
protocol = stream.TransStreamGBCascadedForward
} else {
// 对讲广播转发
protocol = stream.TransStreamGBTalkForward
}
func (api *ApiServer) AddForwardSink(protocol stream.TransStreamProtocol, transport stream.TransportType, sourceId string, remoteAddr string, w http.ResponseWriter, r *http.Request) {
var port int
sink, port, err = stream.NewForwardSink(setup.TransportType(), protocol, sinkId, v.Source, v.Addr, gb28181.TransportManger)
sink, port, err := stream.ForwardStream(protocol, transport, sourceId, r.URL.Query(), remoteAddr, gb28181.TransportManger)
if err != nil {
log.Sugar.Errorf("创建转发sink失败 err: %s", err.Error())
httpResponseError(w, err.Error())
return
}
log.Sugar.Infof("创建转发sink成功, sink: %s port: %d transport: %s", sink.GetID(), port, setup.TransportType())
ok := stream.SubscribeStream(sink, r.URL.Query())
if utils.HookStateOK != ok {
err = fmt.Errorf("failed to prepare play sink")
return
}
log.Sugar.Infof("创建转发sink成功, sink: %s port: %d transport: %s", sink.GetID(), port, transport)
response := struct {
Sink string `json:"sink"` //sink id
Sink string `json:"sink"` // sink id
SDP
}{Sink: stream.SinkId2String(sinkId), SDP: SDP{Addr: net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))}}
}{Sink: stream.SinkID2String(sink.GetID()), SDP: SDP{Addr: net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))}}
httpResponseOK(w, &response)
}
@@ -271,3 +222,18 @@ func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
talkSource.Close()
}
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("添加sink: %v", *v)
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
httpResponseError(w, "不支持的协议")
return
}
setup := gb28181.SetupTypeFromString(v.Setup)
if v.AnswerSetup != "" {
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
}
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, w, r)
}

View File

@@ -1,9 +0,0 @@
package gb28181
import (
"github.com/lkmio/lkm/stream"
)
func CascadedTransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return stream.NewRtpTransStream(stream.TransStreamGBCascadedForward, 1024), nil
}

100
gb28181/gateway.go Normal file
View File

@@ -0,0 +1,100 @@
package gb28181
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"github.com/lkmio/rtp"
)
type GBGateway struct {
stream.BaseTransStream
ps *mpeg.PSMuxer
psBuffer []byte
tracks map[utils.AVCodecID]struct {
index int
rtp rtp.Muxer
}
}
func (s *GBGateway) WriteHeader() error {
if len(s.tracks) == 0 {
return fmt.Errorf("no tracks available")
}
return nil
}
func (s *GBGateway) AddTrack(track *stream.Track) error {
s.BaseTransStream.AddTrack(track)
var muxer rtp.Muxer
if utils.AVCodecIdH264 == track.Stream.CodecID || utils.AVCodecIdH265 == track.Stream.CodecID || utils.AVCodecIdAAC == track.Stream.CodecID || utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
muxer = rtp.NewMuxer(96, 0, 0xFFFFFFFF)
} else {
log.Sugar.Errorf("不支持的编码格式: %d", track.Stream.CodecID)
return nil
}
index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID)
if err != nil {
log.Sugar.Error("添加%s到ps muxer失败", track.Stream.CodecID)
return nil
}
s.tracks[track.Stream.CodecID] = struct {
index int
rtp rtp.Muxer
}{index: index, rtp: muxer}
return nil
}
func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
track, ok := s.tracks[packet.CodecID]
if !ok {
log.Sugar.Errorf("未找到对应的track: %d", packet.CodecID)
return nil, 0, false, nil
}
dts := packet.ConvertDts(90000)
pts := packet.ConvertPts(90000)
data := avformat.AVCCPacket2AnnexB(s.BaseTransStream.Tracks[packet.Index].Stream, packet)
if cap(s.psBuffer) < len(data)+1024*64 {
s.psBuffer = make([]byte, len(data)*2)
}
n := s.ps.Input(s.psBuffer, track.index, packet.Key, data, &pts, &dts)
var result []*collections.ReferenceCounter[[]byte]
var rtpBuffer []byte
track.rtp.Input(s.psBuffer[:n], uint32(dts), func() []byte {
rtpBuffer = stream.UDPReceiveBufferPool.Get().([]byte)
return rtpBuffer[2:]
}, func(bytes []byte) {
binary.BigEndian.PutUint16(rtpBuffer, uint16(len(bytes)))
refPacket := collections.NewReferenceCounter(rtpBuffer[:2+len(bytes)])
result = append(result, refPacket)
})
return result, 0, true, nil
}
func NewGBGateway() *GBGateway {
return &GBGateway{
ps: mpeg.NewPsMuxer(),
psBuffer: make([]byte, 1024*1024*2),
tracks: make(map[utils.AVCodecID]struct {
index int
rtp rtp.Muxer
}),
}
}
func GatewayTransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewGBGateway(), nil
}

View File

@@ -31,7 +31,7 @@ func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceC
func NewTalkTransStream() (stream.TransStream, error) {
return &TalkStream{
RtpStream: stream.NewRtpTransStream(stream.TransStreamGBTalkForward, 1024),
RtpStream: stream.NewRtpTransStream(stream.TransStreamGBTalk, 1024),
muxer: rtp.NewMuxer(8, 0, 0xFFFFFFFF),
packet: make([]byte, 1500),
}, nil

View File

@@ -2,17 +2,114 @@ package jt1078
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/bufio"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"github.com/lkmio/transport"
"net"
"net/http"
"os"
"testing"
"time"
)
func TestPublish(t *testing.T) {
type Handler struct {
muxer *mpeg.PSMuxer
fos *os.File
buffer []byte
tracks map[int]int
gateway *gb28181.GBGateway
udp *transport.UDPClient
}
func (h Handler) OnNewTrack(track avformat.Track) {
addTrack, err := h.muxer.AddTrack(track.GetStream().MediaType, track.GetStream().CodecID)
if err != nil {
println(err.Error())
} else {
h.tracks[track.GetStream().Index] = addTrack
h.gateway.AddTrack(&stream.Track{Stream: track.GetStream()})
}
}
func (h Handler) OnTrackComplete() {
}
func (h Handler) OnTrackNotFind() {
//TODO implement me
panic("implement me")
}
func (h Handler) OnPacket(packet *avformat.AVPacket) {
i, ok := h.tracks[packet.Index]
if !ok {
return
}
dts := packet.ConvertDts(90000)
pts := packet.ConvertPts(90000)
var n int
if packet.MediaType == utils.AVMediaTypeVideo {
// 1078流已经是annexb打包
// annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
n = h.muxer.Input(h.buffer, i, packet.Key, packet.Data, &pts, &dts)
} else {
n = h.muxer.Input(h.buffer, i, true, packet.Data, &pts, &dts)
}
if n > 0 {
h.fos.Write(h.buffer[:n])
}
packets, _, _, err := h.gateway.Input(packet)
if err != nil {
panic(err)
}
for _, refPacket := range packets {
bytes := refPacket.Get()
err = h.udp.Write(bytes[2:])
if err != nil {
panic(err)
}
}
}
func publish() {
//path := "../../source_files/10352264314-2.bin"
path := "../../source_files/013800138000-1.bin"
client := transport.TCPClient{}
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:1078")
if err != nil {
panic(err)
}
_, err = client.Connect(nil, addr)
if err != nil {
panic(err)
}
file, err := os.ReadFile(path)
if err != nil {
panic(err)
}
index := 0
for index < len(file) {
n := bufio.MinInt(len(file)-index, 1500)
client.Write(file[index : index+n])
index += n
time.Sleep(1 * time.Millisecond)
}
}
func TestPublish(t *testing.T) {
t.Run("decode_1078_data", func(t *testing.T) {
data, err := os.ReadFile("../dump/jt1078-127.0.0.1.50659")
if err != nil {
@@ -55,31 +152,98 @@ func TestPublish(t *testing.T) {
})
t.Run("publish", func(t *testing.T) {
publish()
})
// 1078->ps->rtp
// 1078封装成ps流保存到文件, 再用rtp打包发送出去, 用wireshark导出ps流看播放是否正常
t.Run("jt2gb", func(t *testing.T) {
//path := "../../source_files/10352264314-2.bin"
path := "../../source_files/013800138000-1.bin"
client := transport.TCPClient{}
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:1078")
if err != nil {
panic(err)
}
_, err = client.Connect(nil, addr)
if err != nil {
panic(err)
}
file, err := os.ReadFile(path)
if err != nil {
panic(err)
}
index := 0
for index < len(file) {
n := bufio.MinInt(len(file)-index, 1500)
client.Write(file[index : index+n])
index += n
time.Sleep(1 * time.Millisecond)
openFile, err := os.OpenFile(path+".ps", os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
panic(err)
}
client := &transport.UDPClient{}
addr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:10000")
err = client.Connect(nil, addr)
if err != nil {
panic(err)
}
demuxer := NewDemuxer()
demuxer.SetHandler(&Handler{
muxer: mpeg.NewPsMuxer(),
buffer: make([]byte, 1024*1024*2),
fos: openFile,
tracks: make(map[int]int),
gateway: gb28181.NewGBGateway().(*gb28181.GBGateway),
udp: client,
})
defer demuxer.Close()
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
decoder := transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:])
var n int
for {
r, bytes, err := decoder.Input(file[n:])
if err != nil || bytes == nil {
break
}
n += r
_, err = demuxer.Input(bytes)
if err != nil {
panic(err)
}
}
})
// hook gb-cms的on_invite回调, 处理invite请求, 推送本地文件,发送200响应
t.Run("hook_on_invite", func(t *testing.T) {
// 创建http server
router := mux.NewRouter()
// 示例路由
router.HandleFunc("/api/v1/jt1078/on_invite", func(w http.ResponseWriter, r *http.Request) {
v := struct {
SimNumber string `json:"sim_number,omitempty"`
ChannelNumber string `json:"channel_number,omitempty"`
}{}
// 读取请求体
bytes := make([]byte, 1024)
n, err := r.Body.Read(bytes)
if n < 1 {
panic(err)
}
err = json.Unmarshal(bytes[:n], &v)
if err != nil {
panic(err)
}
fmt.Printf("on_invite sim_number: %s, channel_number: %s\r\n", v.SimNumber, v.ChannelNumber)
w.WriteHeader(http.StatusOK)
go publish()
})
server := &http.Server{
Addr: "localhost:8081",
Handler: router,
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
}
err := server.ListenAndServe()
if err != nil {
panic(err)
}
})
}

View File

@@ -28,8 +28,9 @@ func init() {
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBCascadedForward, gb28181.CascadedTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBTalkForward, gb28181.TalkTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory)
stream.SetRecordStreamFactory(record.NewFLVFileSink)
stream.StreamEndInfoBride = NewStreamEndInfo

View File

@@ -56,7 +56,7 @@ func (s *Session) OnPlay(app, stream_ string) utils.HookState {
streamName, values := stream.ParseUrl(stream_)
sourceId := s.generateSourceID(app, streamName)
sinkId := stream.NetAddr2SinkId(s.conn.RemoteAddr())
sinkId := stream.NetAddr2SinkID(s.conn.RemoteAddr())
log.Sugar.Infof("rtmp onplay app: %s stream: %s sink: %v conn: %s", app, stream_, sinkId, s.conn.RemoteAddr().String())
sink := NewSink(sinkId, sourceId, s.conn, s.stack)

View File

@@ -124,7 +124,7 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
}
}
sinkId := stream.NetAddr2SinkId(request.session.conn.RemoteAddr())
sinkId := stream.NetAddr2SinkID(request.session.conn.RemoteAddr())
sink := NewSink(sinkId, request.sourceId, request.session.conn, func(sdp string) {
// 响应sdp回调
response = NewOKResponse(request.headers.Get("Cseq"))
@@ -132,7 +132,7 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
request.session.response(response, []byte(sdp))
})
ok := stream.SubscribeStreamWithRead(sink, request.url.Query(), false)
ok := stream.SubscribeStreamWithOptions(sink, request.url.Query(), false, false)
if utils.HookStateOK != ok {
return nil, nil, fmt.Errorf("hook failed. code: %d", ok)
}

View File

@@ -68,7 +68,9 @@ func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]by
}
if TransportTypeUDP == f.transportType {
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
for _, datum := range data {
f.socket.(*transport.UDPClient).Write(datum.Get()[2:])
}
} else {
return f.BaseSink.Write(index, data, ts, keyVideo)
}
@@ -91,7 +93,7 @@ func (f *ForwardSink) Close() {
// StartReceiveTimer 启动tcp sever计时器, 如果计时器触发, 没有连接, 则关闭流
func (f *ForwardSink) StartReceiveTimer() {
f.receiveTimer = time.AfterFunc(time.Second*10, func() {
f.receiveTimer = time.AfterFunc(ForwardSinkWaitTimeout*time.Second, func() {
if f.Conn == nil {
log.Sugar.Infof("%s 等待连接超时, 关闭sink", f.Protocol)
f.Close()

View File

@@ -13,7 +13,7 @@ import (
// 每个通知事件都需要携带的字段
type eventInfo struct {
Stream string `json:"stream"` //stream GetID
Protocol string `json:"protocol"` //推拉流协议
Protocol int `json:"protocol"` //推拉流协议
RemoteAddr string `json:"remote_addr"` //peer地址
}
@@ -71,11 +71,11 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err
}
func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{Stream: sink.GetSourceID(), Protocol: sink.GetProtocol().String(), RemoteAddr: sink.RemoteAddr()}
return eventInfo{Stream: sink.GetSourceID(), Protocol: int(sink.GetProtocol()), RemoteAddr: sink.RemoteAddr()}
}
func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{Stream: source.GetID(), Protocol: source.GetType().String(), RemoteAddr: source.RemoteAddr()}
return eventInfo{Stream: source.GetID(), Protocol: int(source.GetType()), RemoteAddr: source.RemoteAddr()}
}
func NewRecordEventInfo(source Source, path string) interface{} {

View File

@@ -1,12 +1,18 @@
package stream
import (
"context"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"net/http"
"time"
)
func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
const (
ForwardSinkWaitTimeout = 20
)
func PreparePlaySink(sink Sink, waitTimeout bool) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hooks.IsEnableOnPlay() {
@@ -32,6 +38,16 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
log.Sugar.Warnf("添加到%s sink到等待队列失败, sink已经断开连接 %s", sink.GetProtocol(), sink.GetID())
return response, utils.HookStateFailure
} else {
if waitTimeout {
go func() {
timeout := sink.StartWaitTimer(context.Background(), ForwardSinkWaitTimeout*time.Second)
if timeout {
log.Sugar.Warnf("在等待队列超时, 删除%s sink id: %v source: %s", sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID())
sink.Close()
}
}()
}
sink.SetState(SessionStateWaiting)
AddSinkToWaitingQueue(sink.GetSourceID(), sink)
}
@@ -52,7 +68,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
Sink string `json:"sink"`
}{
eventInfo: NewHookPlayEventInfo(sink),
Sink: SinkId2String(sink.GetID()),
Sink: SinkID2String(sink.GetID()),
}
hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), body)

View File

@@ -54,3 +54,7 @@ func NewRtpTransStream(protocol TransStreamProtocol, capacity int) *RtpStream {
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](capacity),
}
}
func GBCascadedTransStreamFactory(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) {
return NewRtpTransStream(TransStreamGBCascaded, 1024), nil
}

View File

@@ -97,6 +97,10 @@ type Sink interface {
// EnableAsyncWriteMode 开启异步发送
EnableAsyncWriteMode(queueSize int)
StartWaitTimer(ctx context.Context, duration time.Duration) bool
StopWaitTimer()
}
type BaseSink struct {
@@ -126,6 +130,9 @@ type BaseSink struct {
cancelFunc func()
cancelCtx context.Context
waitCtx context.Context
waitCancelFunc context.CancelFunc
}
func (s *BaseSink) GetID() SinkID {
@@ -326,7 +333,7 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
// 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink.
// 2. 发送PlayDoneHook事件
func (s *BaseSink) Close() {
log.Sugar.Debugf("closing the %s sink. id: %s. current session state: %s", s.Protocol, SinkId2String(s.ID), s.State)
log.Sugar.Debugf("closing the %s sink. id: %s. current session state: %s", s.Protocol, SinkID2String(s.ID), s.State)
s.Lock()
defer func() {
@@ -433,3 +440,19 @@ func (s *BaseSink) CreateTime() time.Time {
func (s *BaseSink) SetCreateTime(time time.Time) {
s.createTime = time
}
func (s *BaseSink) StartWaitTimer(ctx context.Context, duration time.Duration) bool {
s.waitCtx, s.waitCancelFunc = context.WithCancel(ctx)
select {
case <-time.After(duration):
return true
case <-s.waitCtx.Done():
return false
}
}
func (s *BaseSink) StopWaitTimer() {
if s.waitCancelFunc != nil {
s.waitCancelFunc()
}
}

View File

@@ -4,9 +4,11 @@ import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/transport"
"net"
"net/url"
"strconv"
"time"
)
// SinkID IPV4使用uint64、IPV6使用string作为ID类型
@@ -20,8 +22,8 @@ func ipv4Addr2UInt64(ip uint32, port int) uint64 {
return (uint64(ip) << 32) | uint64(port)
}
// NetAddr2SinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
func NetAddr2SinkId(addr net.Addr) SinkID {
// NetAddr2SinkID 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
func NetAddr2SinkID(addr net.Addr) SinkID {
network := addr.Network()
if "tcp" == network {
to4 := addr.(*net.TCPAddr).IP.To4()
@@ -44,7 +46,7 @@ func NetAddr2SinkId(addr net.Addr) SinkID {
return addr.String()
}
func SinkId2String(id SinkID) string {
func SinkID2String(id SinkID) string {
if i, ok := id.(uint64); ok {
return strconv.FormatUint(i, 10)
}
@@ -52,6 +54,10 @@ func SinkId2String(id SinkID) string {
return id.(string)
}
func GenerateUint64SinkID() SinkID {
return uint64(time.Now().UnixNano()&0xFFFFFFFF)<<32 | uint64(utils.RandomIntInRange(0, 0xFFFFFFFF))
}
func CreateSinkDisconnectionMessage(sink Sink) string {
return fmt.Sprintf("%s sink断开连接. id: %s", sink.GetProtocol(), sink.GetID())
}
@@ -67,12 +73,34 @@ func ExecuteSyncEventOnTransStreamPublisher(sourceId string, event func()) bool
}
func SubscribeStream(sink Sink, values url.Values) utils.HookState {
return SubscribeStreamWithRead(sink, values, true)
return SubscribeStreamWithOptions(sink, values, true, false)
}
func SubscribeStreamWithRead(sink Sink, values url.Values, ready bool) utils.HookState {
func SubscribeStreamWithOptions(sink Sink, values url.Values, ready bool, timeout bool) utils.HookState {
sink.SetReady(ready)
sink.SetUrlValues(values)
_, state := PreparePlaySink(sink)
_, state := PreparePlaySink(sink, timeout)
return state
}
func ForwardStream(protocol TransStreamProtocol, transport TransportType, sourceId string, values url.Values, remoteAddr string, manager transport.Manager) (Sink, int, error) {
//source := SourceManager.Find(sourceId)
//if source == nil {
// return nil, 0, fmt.Errorf("source %s 不存在", sourceId)
//}
sinkId := GenerateUint64SinkID()
var port int
sink, port, err := NewForwardSink(transport, protocol, sinkId, sourceId, remoteAddr, manager)
if err != nil {
return nil, 0, err
}
state := SubscribeStreamWithOptions(sink, values, true, true)
if utils.HookStateOK != state {
sink.Close()
return nil, 0, fmt.Errorf("failed to prepare play sink")
}
return sink, port, nil
}

View File

@@ -27,15 +27,21 @@ func AddSinkToWaitingQueue(streamId string, sink Sink) {
}
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkID) (Sink, bool) {
var sink Sink
mutex.Lock()
defer mutex.Unlock()
defer func() {
mutex.Unlock()
if sink != nil {
sink.StopWaitTimer()
}
}()
m, ok := waitingSinks[sourceId]
if !ok {
return nil, false
}
sink, ok := m[sinkId]
sink, ok = m[sinkId]
if ok {
delete(m, sinkId)
}
@@ -44,15 +50,21 @@ func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkID) (Sink, bool) {
}
func PopWaitingSinks(sourceId string) []Sink {
var sinks []Sink
mutex.Lock()
defer mutex.Unlock()
defer func() {
mutex.Unlock()
for _, sink := range sinks {
sink.StopWaitTimer()
}
}()
source, ok := waitingSinks[sourceId]
if !ok {
return nil
}
sinks := make([]Sink, len(source))
sinks = make([]Sink, len(source))
var index = 0
for _, sink := range source {
sinks[index] = sink

View File

@@ -30,8 +30,9 @@ const (
TransStreamRtsp = TransStreamProtocol(3)
TransStreamHls = TransStreamProtocol(4)
TransStreamRtc = TransStreamProtocol(5)
TransStreamGBCascadedForward = TransStreamProtocol(6) // 国标级联转发
TransStreamGBTalkForward = TransStreamProtocol(7) // 国标广播/对讲转发
TransStreamGBCascaded = TransStreamProtocol(6) // 国标级联转发
TransStreamGBTalk = TransStreamProtocol(7) // 国标广播/对讲转发
TransStreamGBGateway = TransStreamProtocol(8) // 国标网关
)
const (
@@ -48,7 +49,7 @@ func (s SourceType) String() string {
if SourceTypeRtmp == s {
return "rtmp"
} else if SourceType28181 == s {
return "28181"
return "gb28181"
} else if SourceType1078 == s {
return "jt1078"
} else if SourceTypeGBTalk == s {
@@ -69,10 +70,12 @@ func (p TransStreamProtocol) String() string {
return "hls"
} else if TransStreamRtc == p {
return "rtc"
} else if TransStreamGBCascadedForward == p {
return "gb_cascaded_forward"
} else if TransStreamGBTalkForward == p {
return "gb_talk_forward"
} else if TransStreamGBCascaded == p {
return "gb_cascaded"
} else if TransStreamGBTalk == p {
return "gb_talk"
} else if TransStreamGBGateway == p {
return "gb_gateway"
}
panic(fmt.Sprintf("unknown stream protocol %d", p))

View File

@@ -132,7 +132,7 @@ func (t *transStreamPublisher) run() {
t.OnPacket(event.Data.(*collections.ReferenceCounter[*avformat.AVPacket]))
case StreamEventTypeRawPacket:
// 发送原始数据包, 目前仅用于国标级联转发
if t.forwardTransStream != nil && t.forwardTransStream.GetProtocol() == TransStreamGBCascadedForward {
if t.forwardTransStream != nil && t.forwardTransStream.GetProtocol() == TransStreamGBCascaded {
packets := event.Data.([][]byte)
for _, data := range packets {
t.DispatchPacket(t.forwardTransStream, &avformat.AVPacket{Data: data[2:]})
@@ -233,7 +233,7 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
_ = transStream.WriteHeader()
// 设置转发流
if TransStreamGBCascadedForward == transStream.GetProtocol() {
if TransStreamGBCascaded == transStream.GetProtocol() {
t.forwardTransStream = transStream
}
@@ -381,7 +381,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
err := sink.StartStreaming(transStream)
if err != nil {
log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), t.source)
log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkID2String(sink.GetID()), t.source)
return false
}
@@ -417,7 +417,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
}
// 新建传输流,发送已经缓存的音视频帧
if !exist && AppConfig.GOPCache && t.existVideo && TransStreamGBCascadedForward != transStream.GetProtocol() {
if !exist && AppConfig.GOPCache && t.existVideo && TransStreamGBCascaded != transStream.GetProtocol() {
t.DispatchGOPBuffer(transStream)
}
@@ -641,7 +641,7 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av
// 分发给各个传输流
for _, transStream := range t.transStreams {
if TransStreamGBCascadedForward != transStream.GetProtocol() {
if TransStreamGBCascaded != transStream.GetProtocol() {
t.DispatchPacket(transStream, packet.Get())
}
}