feat: 支持国标录像下载

This commit is contained in:
ydajiang
2025-09-14 19:35:39 +08:00
parent 69308c466b
commit a5b7fc6f24
5 changed files with 307 additions and 73 deletions

44
api.go
View File

@@ -131,9 +131,10 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流 apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
if stream.AppConfig.GB28181.Enable { if stream.AppConfig.GB28181.Enable {
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接 apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址 apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 应答的sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
} }
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) { apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
@@ -511,8 +512,22 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request
func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("streamid") id := r.URL.Query().Get("streamid")
source := stream.SourceManager.Find(id) source := stream.SourceManager.Find(id)
if source == nil || !source.IsCompleted() || source.IsClosed() { if source == nil || source.IsClosed() {
w.WriteHeader(http.StatusBadRequest)
httpResponseJson(w, "stream not found")
return return
} else if !source.IsCompleted() {
// 在请求结束前, 每隔1秒检查track探测是否完成
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for !source.IsClosed() && !source.IsCompleted() && r.Context().Err() == nil {
select {
case <-ticker.C:
break
case <-r.Context().Done():
break
}
}
} }
tracks := source.OriginTracks() tracks := source.OriginTracks()
@@ -572,8 +587,26 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
recordStartTime = startTime.Format("2006-01-02 15:04:05") recordStartTime = startTime.Format("2006-01-02 15:04:05")
} }
gbSource := Source2GBSource(source)
var downloadInfo *DownloadInfo
if gbSource != nil && InviteTypeDownload == gbSource.GetSessionName() {
progress := gbSource.GetPlaybackProgress()
gbSource.GetTransStreamPublisher()
downloadInfo = &DownloadInfo{
PlaybackDuration: gbSource.GetDuration(),
PlaybackSpeed: gbSource.GetSpeed(),
PlaybackFileSize: gbSource.GetFileSize(),
PlaybackStartTime: gbSource.GetStartTime(),
PlaybackEndTime: gbSource.GetEndTime(),
PlaybackFileURL: gbSource.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
PlaybackProgress: progress,
Progress: progress,
}
}
statistics := source.GetBitrateStatistics() statistics := source.GetBitrateStatistics()
response := struct { response := struct {
*DownloadInfo
AudioEnable bool `json:"AudioEnable"` AudioEnable bool `json:"AudioEnable"`
CDN string `json:"CDN"` CDN string `json:"CDN"`
CascadeSize int `json:"CascadeSize"` CascadeSize int `json:"CascadeSize"`
@@ -612,6 +645,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
WEBRTC string `json:"WEBRTC"` WEBRTC string `json:"WEBRTC"`
WS_FLV string `json:"WS_FLV"` WS_FLV string `json:"WS_FLV"`
}{ }{
DownloadInfo: downloadInfo,
AudioEnable: true, AudioEnable: true,
CDN: "", CDN: "",
CascadeSize: 0, CascadeSize: 0,
@@ -669,14 +703,14 @@ func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) {
if source == nil { if source == nil {
log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId) log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} else if url, ok := source.GetTransStreamPublisher().StartRecord(); !ok { } else if ok := source.GetTransStreamPublisher().StartRecord(); !ok {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
} else { } else {
// 返回拉流地址 // 返回拉流地址
httpResponseJson(w, &struct { httpResponseJson(w, &struct {
DownloadURL string `json:"DownloadURL"` DownloadURL string `json:"DownloadURL"`
}{ }{
DownloadURL: url, DownloadURL: source.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
}) })
} }

173
api_gb.go
View File

@@ -1,7 +1,10 @@
package main package main
import ( import (
"encoding/base64"
"fmt" "fmt"
"github.com/gorilla/mux"
audio_transcoder "github.com/lkmio/audio-transcoder"
"github.com/lkmio/avformat/bufio" "github.com/lkmio/avformat/bufio"
"github.com/lkmio/lkm/gb28181" "github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/log" "github.com/lkmio/lkm/log"
@@ -9,6 +12,7 @@ import (
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"time"
) )
const ( const (
@@ -25,6 +29,21 @@ type SDP struct {
SSRC string `json:"ssrc,omitempty"` SSRC string `json:"ssrc,omitempty"`
Setup string `json:"setup,omitempty"` // active/passive Setup string `json:"setup,omitempty"` // active/passive
Transport string `json:"transport,omitempty"` // tcp/udp Transport string `json:"transport,omitempty"` // tcp/udp
Speed int `json:"speed,omitempty"`
StartTime int `json:"start_time,omitempty"`
EndTime int `json:"end_time,omitempty"`
FileSize int `json:"file_size,omitempty"`
}
type DownloadInfo struct {
PlaybackDuration int // 回放/下载时长
PlaybackSpeed int // 回放/下载速度
PlaybackFileURL string // 回放/下载文件URL
PlaybackStartTime string // 回放/下载开始时间
PlaybackEndTime string // 回放/下载结束时间
PlaybackFileSize int // 回放/下载文件大小
PlaybackProgress float64 // 1-下载完成
Progress float64
} }
type SourceSDP struct { type SourceSDP struct {
@@ -38,6 +57,18 @@ type GBOffer struct {
TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"` TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"`
} }
func Source2GBSource(source stream.Source) gb28181.GBSource {
if gbSource, ok := source.(*gb28181.PassiveSource); ok {
return gbSource
} else if gbSource, ok := source.(*gb28181.ActiveSource); ok {
return gbSource
} else if gbSource, ok := source.(*gb28181.PassiveSource); ok {
return gbSource
}
return nil
}
func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) { func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建国标源: %v", v) log.Sugar.Infof("创建国标源: %v", v)
@@ -73,18 +104,29 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h
} }
var ssrc string var ssrc string
if v.SessionName == InviteTypeDownload || v.SessionName == InviteTypePlayback { if InviteTypeDownload == v.SessionName || InviteTypePlayback == v.SessionName {
ssrc = gb28181.GetVodSSRC() ssrc = gb28181.GetVodSSRC()
} else { } else {
ssrc = gb28181.GetLiveSSRC() ssrc = gb28181.GetLiveSSRC()
} }
ssrcValue, _ := strconv.Atoi(ssrc) ssrcValue, _ := strconv.Atoi(ssrc)
_, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active) gbSource, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
if err != nil { if err != nil {
return return
} else if InviteTypeDownload == v.SessionName {
// 开启录制
gbSource.GetTransStreamPublisher().StartRecord()
} }
startTime := time.Unix(int64(v.StartTime), 0).Format("2006-01-02T15:04:05")
endTime := time.Unix(int64(v.EndTime), 0).Format("2006-01-02T15:04:05")
gbSource.SetSessionName(v.SessionName)
gbSource.SetStartTime(startTime)
gbSource.SetEndTime(endTime)
gbSource.SetSpeed(v.Speed)
gbSource.SetDuration(v.EndTime - v.StartTime)
response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port)) response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))
response.Urls = stream.GetStreamPlayUrls(v.Source) response.Urls = stream.GetStreamPlayUrls(v.Source)
response.SSRC = ssrc response.SSRC = ssrc
@@ -94,13 +136,13 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h
} }
func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) { func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v) log.Sugar.Infof("设置国标应答: %v", v)
var err error var err error
// 响应错误消息 // 响应错误消息
defer func() { defer func() {
if err != nil { if err != nil {
log.Sugar.Errorf("设置国标主动拉流失败 err: %s", err.Error()) log.Sugar.Errorf("设置国标应答失败 err: %s", err.Error())
httpResponseError(w, err.Error()) httpResponseError(w, err.Error())
} }
}() }()
@@ -108,22 +150,24 @@ func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *
source := stream.SourceManager.Find(v.Source) source := stream.SourceManager.Find(v.Source)
if source == nil { if source == nil {
err = fmt.Errorf("%s 源不存在", v.Source) err = fmt.Errorf("%s 源不存在", v.Source)
return } else if stream.SourceType28181 != source.GetType() {
} err = fmt.Errorf("%s 源不是28181类型", v.Source)
} else if activeSource, ok := source.(*gb28181.ActiveSource); ok {
activeSource.SetFileSize(v.FileSize)
// 主动连接取流
var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr("tcp", v.Addr)
if err != nil {
return
}
activeSource, ok := source.(*gb28181.ActiveSource) if err = activeSource.Connect(addr); err == nil {
if !ok { httpResponseOK(w, nil)
err = fmt.Errorf("%s 源不是Active拉流类型", v.Source) }
return } else if passiveSource, ok := source.(*gb28181.PassiveSource); ok {
} passiveSource.SetFileSize(v.FileSize)
} else if udpSource, ok := source.(*gb28181.UDPSource); ok {
addr, err := net.ResolveTCPAddr("tcp", v.Addr) udpSource.SetFileSize(v.FileSize)
if err != nil {
return
}
if err = activeSource.Connect(addr); err == nil {
httpResponseOK(w, nil)
} }
} }
@@ -185,6 +229,21 @@ func (api *ApiServer) AddForwardSink(protocol stream.TransStreamProtocol, transp
httpResponseOK(w, &response) httpResponseOK(w, &response)
} }
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("添加sink: %v", *v)
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
httpResponseError(w, "不支持的协议")
return
}
setup := gb28181.SetupTypeFromString(v.Setup)
if v.AnswerSetup != "" {
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
}
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r)
}
// OnGBTalk 国标广播/对讲流程: // OnGBTalk 国标广播/对讲流程:
// 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接 // 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接
// 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求 // 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求
@@ -237,17 +296,77 @@ func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
talkSource.Close() talkSource.Close()
} }
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) { // OnLiveGBSTalk liveGBS前端对讲
log.Sugar.Infof("添加sink: %v", *v) func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol { vars := mux.Vars(r)
httpResponseError(w, "不支持的协议") device := vars["device"]
channel := vars["channel"]
_ = r.URL.Query().Get("format")
conn, err := api.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
conn.Close()
return return
} }
setup := gb28181.SetupTypeFromString(v.Setup) // 获取id
if v.AnswerSetup != "" { id := device + "/" + channel + ".broadcast"
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
talkSource := gb28181.NewTalkSource(id, conn)
talkSource.Init()
talkSource.SetUrlValues(r.Form)
_, err = stream.PreparePublishSource(talkSource, true)
if err != nil {
log.Sugar.Errorf("对讲失败, err: %s source: %s", err, talkSource)
conn.Close()
return
} }
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r) log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)
stream.LoopEvent(talkSource)
data := stream.UDPReceiveBufferPool.Get().([]byte)
pcm := make([]byte, 32000)
g711aPacket := make([]byte, stream.UDPReceiveBufferSize/2)
for {
_, bytes, err := conn.ReadMessage()
length := len(bytes)
if err != nil {
log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
break
} else if length < 1 {
continue
}
// 扩容
if int(float64(len(bytes))*1.4) > len(pcm) {
pcm = make([]byte, len(bytes)*2)
}
// base64解密
var pcmN int
pcmN, err = base64.StdEncoding.Decode(bytes, pcm)
if err == nil {
log.Sugar.Errorf(err.Error())
continue
}
for i := 0; i < pcmN; {
// 控制每包大小
n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
copy(data, pcm[:n])
// 编码成G711A
audio_transcoder.EncodeAlawToBuffer(data, g711aPacket)
_, _ = talkSource.PublishSource.Input(g711aPacket[:n/2])
i += n
}
}
talkSource.Close()
} }

View File

@@ -78,6 +78,21 @@ type GBSource interface {
ProcessPacket(data []byte) error ProcessPacket(data []byte) error
SetTransport(transport transport.Transport) SetTransport(transport transport.Transport)
GetDuration() int
GetSpeed() int
GetSessionName() string
GetStartTime() string
GetEndTime() string
GetFileSize() int
GetPlaybackProgress() float64
SetDuration(duration int)
SetSpeed(speed int)
SetSessionName(sessionName string)
SetStartTime(startTime string)
SetEndTime(endTime string)
SetFileSize(fileSize int)
} }
type BaseGBSource struct { type BaseGBSource struct {
@@ -94,6 +109,15 @@ type BaseGBSource struct {
isSystemClock bool // 推流时间戳不正确, 是否使用系统时间. isSystemClock bool // 推流时间戳不正确, 是否使用系统时间.
lastRtpTimestamp int64 lastRtpTimestamp int64
sameTimePackets [][]byte sameTimePackets [][]byte
sessionName string // play/playback/download...
duration int // 回放/下载时长, 单位秒
speed int // 回放/下载速度
startTime string // 回放/下载开始时间
endTime string // 回放/下载结束时间
fileSize int // 回放/下载文件大小
playbackProgress float64 // 1-下载完成
playbackDataSize int // 已下载数据大小
} }
// ProcessPacket 输入rtp包, 处理PS流, 负责解析->封装->推流 // ProcessPacket 输入rtp包, 处理PS流, 负责解析->封装->推流
@@ -106,6 +130,13 @@ func (source *BaseGBSource) ProcessPacket(data []byte) error {
source.InitializePublish(packet.SSRC) source.InitializePublish(packet.SSRC)
} }
// 统计下载的进度
source.playbackDataSize += len(data)
source.playbackProgress = float64(source.playbackDataSize) / float64(source.fileSize)
if source.playbackProgress > 1 {
source.playbackProgress = 1
}
// 国标级联转发 // 国标级联转发
if source.GetTransStreamPublisher().GetForwardTransStream() != nil { if source.GetTransStreamPublisher().GetForwardTransStream() != nil {
if source.lastRtpTimestamp == -1 { if source.lastRtpTimestamp == -1 {
@@ -274,6 +305,58 @@ func (source *BaseGBSource) SetTransport(transport transport.Transport) {
source.transport = transport source.transport = transport
} }
func (source *BaseGBSource) GetSessionName() string {
return source.sessionName
}
func (source *BaseGBSource) GetStartTime() string {
return source.startTime
}
func (source *BaseGBSource) GetEndTime() string {
return source.endTime
}
func (source *BaseGBSource) GetFileSize() int {
return source.fileSize
}
func (source *BaseGBSource) GetPlaybackProgress() float64 {
return source.playbackProgress
}
func (source *BaseGBSource) SetStartTime(startTime string) {
source.startTime = startTime
}
func (source *BaseGBSource) SetEndTime(endTime string) {
source.endTime = endTime
}
func (source *BaseGBSource) SetFileSize(fileSize int) {
source.fileSize = fileSize
}
func (source *BaseGBSource) SetSessionName(sessionName string) {
// 转小写
source.sessionName = strings.ToLower(sessionName)
}
func (source *BaseGBSource) GetDuration() int {
return source.duration
}
func (source *BaseGBSource) GetSpeed() int {
return source.speed
}
func (source *BaseGBSource) SetDuration(duration int) {
source.duration = duration
}
func (source *BaseGBSource) SetSpeed(speed int) {
source.speed = speed
}
// NewGBSource 创建国标推流源, 返回监听的收流端口 // NewGBSource 创建国标推流源, 返回监听的收流端口
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) { func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) {
var transportServer transport.Transport var transportServer transport.Transport

View File

@@ -205,5 +205,5 @@ func CloseSource(id string) {
// LoopEvent 循环读取事件 // LoopEvent 循环读取事件
func LoopEvent(source Source) { func LoopEvent(source Source) {
source.StartTimers(source) source.StartTimers(source)
go source.GetTransStreamPublisher().run() source.GetTransStreamPublisher().start()
} }

View File

@@ -1,7 +1,6 @@
package stream package stream
import ( import (
"bytes"
"fmt" "fmt"
"github.com/lkmio/avformat" "github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/collections"
@@ -11,8 +10,6 @@ import (
"github.com/lkmio/lkm/transcode" "github.com/lkmio/lkm/transcode"
"github.com/lkmio/transport" "github.com/lkmio/transport"
"path/filepath" "path/filepath"
"runtime"
"strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -35,7 +32,7 @@ type StreamEvent struct {
type TransStreamPublisher interface { type TransStreamPublisher interface {
Post(event *StreamEvent) Post(event *StreamEvent)
run() start()
close() close()
@@ -73,13 +70,15 @@ type TransStreamPublisher interface {
// StartRecord 开启录制 // StartRecord 开启录制
// 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false // 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false
StartRecord() (string, bool) StartRecord() bool
// StopRecord 停止录制 // StopRecord 停止录制
// 如果AppConfig已经开启了全局录制, 返回error // 如果AppConfig已经开启了全局录制, 返回error
StopRecord() error StopRecord() error
RecordStartTime() time.Time RecordStartTime() time.Time
GetRecordStreamPlayUrl() string
} }
type transStreamPublisher struct { type transStreamPublisher struct {
@@ -90,12 +89,13 @@ type transStreamPublisher struct {
sinkCount int // 拉流计数 sinkCount int // 拉流计数
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
recordSink Sink // 每个Source的录制流 recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径 recordFilePath string // 录制流文件路径
recordStartTime time.Time // 开始录制时间 recordStartTime time.Time // 开始录制时间
hlsStream TransStream // HLS传输流 hasManualRecording bool // 是否开启手动录像
originTracks TrackManager // 推流的原始track hlsStream TransStream // HLS传输流
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track originTracks TrackManager // 推流的原始track
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
transStreams map[TransStreamID]TransStream // 所有输出流 transStreams map[TransStreamID]TransStream // 所有输出流
forwardTransStream TransStream // 转发流 forwardTransStream TransStream // 转发流
@@ -114,26 +114,7 @@ func (t *transStreamPublisher) Post(event *StreamEvent) {
t.streamEvents.Post(event) t.streamEvents.Post(event)
} }
func getGoroutineID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func (t *transStreamPublisher) run() { func (t *transStreamPublisher) run() {
log.Sugar.Infof("transStreamPublisher run goroutine id: %d", getGoroutineID())
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
t.mainContextEvents = make(chan func(), 256)
t.transStreams = make(map[TransStreamID]TransStream, 10)
t.sinks = make(map[SinkID]Sink, 128)
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
defer func() { defer func() {
// 清空管道 // 清空管道
for event := t.streamEvents.Pop(); event != nil; event = t.streamEvents.Pop() { for event := t.streamEvents.Pop(); event != nil; event = t.streamEvents.Pop() {
@@ -175,6 +156,18 @@ func (t *transStreamPublisher) run() {
} }
} }
func (t *transStreamPublisher) start() {
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
t.mainContextEvents = make(chan func(), 256)
t.transStreams = make(map[TransStreamID]TransStream, 10)
t.sinks = make(map[SinkID]Sink, 128)
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
go t.run()
}
func (t *transStreamPublisher) PostEvent(cb func()) { func (t *transStreamPublisher) PostEvent(cb func()) {
t.mainContextEvents <- cb t.mainContextEvents <- cb
} }
@@ -210,7 +203,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
} }
// 创建录制流 // 创建录制流
if AppConfig.Record.Enable { if AppConfig.Record.Enable || t.hasManualRecording {
t.createRecordSink() t.createRecordSink()
} }
@@ -906,25 +899,25 @@ func (t *transStreamPublisher) SetSourceID(id string) {
t.source = id t.source = id
} }
func (t *transStreamPublisher) StartRecord() (string, bool) { func (t *transStreamPublisher) StartRecord() bool {
if AppConfig.Record.Enable || t.recordSink != nil { if AppConfig.Record.Enable || t.recordSink != nil {
return "", false return false
} }
var ok bool var ok bool
t.ExecuteSyncEvent(func() { t.ExecuteSyncEvent(func() {
t.hasManualRecording = true
// 如果探测还未结束
if !t.completed.Load() {
return
}
if t.recordSink == nil && t.createRecordSink() { if t.recordSink == nil && t.createRecordSink() {
ok = t.doAddSink(t.recordSink, false) ok = t.doAddSink(t.recordSink, false)
} }
}) })
var url string return ok
if ok {
// 去掉反斜杠
url = GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
}
return url, ok
} }
func (t *transStreamPublisher) StopRecord() error { func (t *transStreamPublisher) StopRecord() error {
@@ -933,6 +926,7 @@ func (t *transStreamPublisher) StopRecord() error {
} }
t.ExecuteSyncEvent(func() { t.ExecuteSyncEvent(func() {
t.hasManualRecording = false
if t.recordSink != nil { if t.recordSink != nil {
t.clearSinkStreaming(t.recordSink) t.clearSinkStreaming(t.recordSink)
t.recordSink.Close() t.recordSink.Close()
@@ -949,6 +943,10 @@ func (t *transStreamPublisher) RecordStartTime() time.Time {
return t.recordStartTime return t.recordStartTime
} }
func (t *transStreamPublisher) GetRecordStreamPlayUrl() string {
return GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
}
func NewTransStreamPublisher(source string) TransStreamPublisher { func NewTransStreamPublisher(source string) TransStreamPublisher {
return &transStreamPublisher{ return &transStreamPublisher{
transStreams: make(map[TransStreamID]TransStream), transStreams: make(map[TransStreamID]TransStream),