Compare commits

...

2 Commits

Author SHA1 Message Date
ydajiang
ef313b1ea4 feat: 支持国标倍速播放 2025-09-14 21:39:50 +08:00
ydajiang
a5b7fc6f24 feat: 支持国标录像下载 2025-09-14 19:35:39 +08:00
5 changed files with 327 additions and 79 deletions

47
api.go
View File

@@ -131,9 +131,11 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
if stream.AppConfig.GB28181.Enable {
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{}))
}
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
@@ -511,8 +513,22 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request
func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("streamid")
source := stream.SourceManager.Find(id)
if source == nil || !source.IsCompleted() || source.IsClosed() {
if source == nil || source.IsClosed() {
w.WriteHeader(http.StatusBadRequest)
httpResponseJson(w, "stream not found")
return
} else if !source.IsCompleted() {
// 在请求结束前, 每隔1秒检查track探测是否完成
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for !source.IsClosed() && !source.IsCompleted() && r.Context().Err() == nil {
select {
case <-ticker.C:
break
case <-r.Context().Done():
break
}
}
}
tracks := source.OriginTracks()
@@ -572,8 +588,26 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
recordStartTime = startTime.Format("2006-01-02 15:04:05")
}
gbSource := Source2GBSource(source)
var downloadInfo *DownloadInfo
if gbSource != nil && InviteTypeDownload == gbSource.GetSessionName() {
progress := gbSource.GetPlaybackProgress()
gbSource.GetTransStreamPublisher()
downloadInfo = &DownloadInfo{
PlaybackDuration: gbSource.GetDuration(),
PlaybackSpeed: gbSource.GetSpeed(),
PlaybackFileSize: gbSource.GetFileSize(),
PlaybackStartTime: gbSource.GetStartTime(),
PlaybackEndTime: gbSource.GetEndTime(),
PlaybackFileURL: gbSource.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
PlaybackProgress: progress,
Progress: progress,
}
}
statistics := source.GetBitrateStatistics()
response := struct {
*DownloadInfo
AudioEnable bool `json:"AudioEnable"`
CDN string `json:"CDN"`
CascadeSize int `json:"CascadeSize"`
@@ -612,6 +646,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
WEBRTC string `json:"WEBRTC"`
WS_FLV string `json:"WS_FLV"`
}{
DownloadInfo: downloadInfo,
AudioEnable: true,
CDN: "",
CascadeSize: 0,
@@ -669,14 +704,14 @@ func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) {
if source == nil {
log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId)
w.WriteHeader(http.StatusNotFound)
} else if url, ok := source.GetTransStreamPublisher().StartRecord(); !ok {
} else if ok := source.GetTransStreamPublisher().StartRecord(); !ok {
w.WriteHeader(http.StatusBadRequest)
} else {
// 返回拉流地址
httpResponseJson(w, &struct {
DownloadURL string `json:"DownloadURL"`
}{
DownloadURL: url,
DownloadURL: source.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
})
}

196
api_gb.go
View File

@@ -1,7 +1,10 @@
package main
import (
"encoding/base64"
"fmt"
"github.com/gorilla/mux"
audio_transcoder "github.com/lkmio/audio-transcoder"
"github.com/lkmio/avformat/bufio"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/log"
@@ -9,6 +12,7 @@ import (
"net"
"net/http"
"strconv"
"time"
)
const (
@@ -20,11 +24,26 @@ const (
)
type SDP struct {
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
Addr string `json:"addr,omitempty"` // 连接地址
SSRC string `json:"ssrc,omitempty"`
Setup string `json:"setup,omitempty"` // active/passive
Transport string `json:"transport,omitempty"` // tcp/udp
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
Addr string `json:"addr,omitempty"` // 连接地址
SSRC string `json:"ssrc,omitempty"`
Setup string `json:"setup,omitempty"` // active/passive
Transport string `json:"transport,omitempty"` // tcp/udp
Speed float64 `json:"speed,omitempty"`
StartTime int `json:"start_time,omitempty"`
EndTime int `json:"end_time,omitempty"`
FileSize int `json:"file_size,omitempty"`
}
type DownloadInfo struct {
PlaybackDuration int // 回放/下载时长
PlaybackSpeed float64 // 回放/下载速度
PlaybackFileURL string // 回放/下载文件URL
PlaybackStartTime string // 回放/下载开始时间
PlaybackEndTime string // 回放/下载结束时间
PlaybackFileSize int // 回放/下载文件大小
PlaybackProgress float64 // 1-下载完成
Progress float64
}
type SourceSDP struct {
@@ -38,6 +57,18 @@ type GBOffer struct {
TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"`
}
func Source2GBSource(source stream.Source) gb28181.GBSource {
if gbSource, ok := source.(*gb28181.PassiveSource); ok {
return gbSource
} else if gbSource, ok := source.(*gb28181.ActiveSource); ok {
return gbSource
} else if gbSource, ok := source.(*gb28181.PassiveSource); ok {
return gbSource
}
return nil
}
func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建国标源: %v", v)
@@ -73,18 +104,29 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h
}
var ssrc string
if v.SessionName == InviteTypeDownload || v.SessionName == InviteTypePlayback {
if InviteTypeDownload == v.SessionName || InviteTypePlayback == v.SessionName {
ssrc = gb28181.GetVodSSRC()
} else {
ssrc = gb28181.GetLiveSSRC()
}
ssrcValue, _ := strconv.Atoi(ssrc)
_, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
gbSource, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
if err != nil {
return
} else if InviteTypeDownload == v.SessionName {
// 开启录制
gbSource.GetTransStreamPublisher().StartRecord()
}
startTime := time.Unix(int64(v.StartTime), 0).Format("2006-01-02T15:04:05")
endTime := time.Unix(int64(v.EndTime), 0).Format("2006-01-02T15:04:05")
gbSource.SetSessionName(v.SessionName)
gbSource.SetStartTime(startTime)
gbSource.SetEndTime(endTime)
gbSource.SetSpeed(v.Speed)
gbSource.SetDuration(v.EndTime - v.StartTime)
response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))
response.Urls = stream.GetStreamPlayUrls(v.Source)
response.SSRC = ssrc
@@ -94,13 +136,13 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h
}
func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v)
log.Sugar.Infof("设置国标应答: %v", v)
var err error
// 响应错误消息
defer func() {
if err != nil {
log.Sugar.Errorf("设置国标主动拉流失败 err: %s", err.Error())
log.Sugar.Errorf("设置国标应答失败 err: %s", err.Error())
httpResponseError(w, err.Error())
}
}()
@@ -108,22 +150,24 @@ func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *
source := stream.SourceManager.Find(v.Source)
if source == nil {
err = fmt.Errorf("%s 源不存在", v.Source)
return
}
} else if stream.SourceType28181 != source.GetType() {
err = fmt.Errorf("%s 源不是28181类型", v.Source)
} else if activeSource, ok := source.(*gb28181.ActiveSource); ok {
activeSource.SetFileSize(v.FileSize)
// 主动连接取流
var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr("tcp", v.Addr)
if err != nil {
return
}
activeSource, ok := source.(*gb28181.ActiveSource)
if !ok {
err = fmt.Errorf("%s 源不是Active拉流类型", v.Source)
return
}
addr, err := net.ResolveTCPAddr("tcp", v.Addr)
if err != nil {
return
}
if err = activeSource.Connect(addr); err == nil {
httpResponseOK(w, nil)
if err = activeSource.Connect(addr); err == nil {
httpResponseOK(w, nil)
}
} else if passiveSource, ok := source.(*gb28181.PassiveSource); ok {
passiveSource.SetFileSize(v.FileSize)
} else if udpSource, ok := source.(*gb28181.UDPSource); ok {
udpSource.SetFileSize(v.FileSize)
}
}
@@ -185,6 +229,21 @@ func (api *ApiServer) AddForwardSink(protocol stream.TransStreamProtocol, transp
httpResponseOK(w, &response)
}
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("添加sink: %v", *v)
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
httpResponseError(w, "不支持的协议")
return
}
setup := gb28181.SetupTypeFromString(v.Setup)
if v.AnswerSetup != "" {
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
}
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r)
}
// OnGBTalk 国标广播/对讲流程:
// 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接
// 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求
@@ -237,17 +296,90 @@ func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
talkSource.Close()
}
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("添加sink: %v", *v)
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
httpResponseError(w, "不支持的协议")
// OnLiveGBSTalk liveGBS前端对讲
func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
device := vars["device"]
channel := vars["channel"]
_ = r.URL.Query().Get("format")
conn, err := api.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
conn.Close()
return
}
setup := gb28181.SetupTypeFromString(v.Setup)
if v.AnswerSetup != "" {
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
// 获取id
id := device + "/" + channel + ".broadcast"
talkSource := gb28181.NewTalkSource(id, conn)
talkSource.Init()
talkSource.SetUrlValues(r.Form)
_, err = stream.PreparePublishSource(talkSource, true)
if err != nil {
log.Sugar.Errorf("对讲失败, err: %s source: %s", err, talkSource)
conn.Close()
return
}
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r)
log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)
stream.LoopEvent(talkSource)
data := stream.UDPReceiveBufferPool.Get().([]byte)
pcm := make([]byte, 32000)
g711aPacket := make([]byte, stream.UDPReceiveBufferSize/2)
for {
_, bytes, err := conn.ReadMessage()
length := len(bytes)
if err != nil {
log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
break
} else if length < 1 {
continue
}
// 扩容
if int(float64(len(bytes))*1.4) > len(pcm) {
pcm = make([]byte, len(bytes)*2)
}
// base64解密
var pcmN int
pcmN, err = base64.StdEncoding.Decode(bytes, pcm)
if err == nil {
log.Sugar.Errorf(err.Error())
continue
}
for i := 0; i < pcmN; {
// 控制每包大小
n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
copy(data, pcm[:n])
// 编码成G711A
audio_transcoder.EncodeAlawToBuffer(data, g711aPacket)
_, _ = talkSource.PublishSource.Input(g711aPacket[:n/2])
i += n
}
}
talkSource.Close()
}
func (api *ApiServer) OnGBSpeedSet(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
source := stream.SourceManager.Find(v.Source)
if source == nil {
w.WriteHeader(http.StatusBadRequest)
httpResponseError(w, "stream not found")
} else if stream.SourceType28181 != source.GetType() {
w.WriteHeader(http.StatusBadRequest)
httpResponseError(w, "stream type not support")
} else if gbSource := Source2GBSource(source); gbSource != nil {
gbSource.SetSpeed(v.Speed)
}
}

View File

@@ -78,6 +78,21 @@ type GBSource interface {
ProcessPacket(data []byte) error
SetTransport(transport transport.Transport)
GetDuration() int
GetSpeed() float64
GetSessionName() string
GetStartTime() string
GetEndTime() string
GetFileSize() int
GetPlaybackProgress() float64
SetDuration(duration int)
SetSpeed(speed float64)
SetSessionName(sessionName string)
SetStartTime(startTime string)
SetEndTime(endTime string)
SetFileSize(fileSize int)
}
type BaseGBSource struct {
@@ -94,6 +109,15 @@ type BaseGBSource struct {
isSystemClock bool // 推流时间戳不正确, 是否使用系统时间.
lastRtpTimestamp int64
sameTimePackets [][]byte
sessionName string // play/playback/download...
duration int // 回放/下载时长, 单位秒
speed float64 // 回放/下载速度
startTime string // 回放/下载开始时间
endTime string // 回放/下载结束时间
fileSize int // 回放/下载文件大小
playbackProgress float64 // 1-下载完成
playbackDataSize int // 已下载数据大小
}
// ProcessPacket 输入rtp包, 处理PS流, 负责解析->封装->推流
@@ -106,6 +130,13 @@ func (source *BaseGBSource) ProcessPacket(data []byte) error {
source.InitializePublish(packet.SSRC)
}
// 统计下载的进度
source.playbackDataSize += len(data)
source.playbackProgress = float64(source.playbackDataSize) / float64(source.fileSize)
if source.playbackProgress > 1 {
source.playbackProgress = 1
}
// 国标级联转发
if source.GetTransStreamPublisher().GetForwardTransStream() != nil {
if source.lastRtpTimestamp == -1 {
@@ -274,6 +305,58 @@ func (source *BaseGBSource) SetTransport(transport transport.Transport) {
source.transport = transport
}
func (source *BaseGBSource) GetSessionName() string {
return source.sessionName
}
func (source *BaseGBSource) GetStartTime() string {
return source.startTime
}
func (source *BaseGBSource) GetEndTime() string {
return source.endTime
}
func (source *BaseGBSource) GetFileSize() int {
return source.fileSize
}
func (source *BaseGBSource) GetPlaybackProgress() float64 {
return source.playbackProgress
}
func (source *BaseGBSource) SetStartTime(startTime string) {
source.startTime = startTime
}
func (source *BaseGBSource) SetEndTime(endTime string) {
source.endTime = endTime
}
func (source *BaseGBSource) SetFileSize(fileSize int) {
source.fileSize = fileSize
}
func (source *BaseGBSource) SetSessionName(sessionName string) {
// 转小写
source.sessionName = strings.ToLower(sessionName)
}
func (source *BaseGBSource) GetDuration() int {
return source.duration
}
func (source *BaseGBSource) GetSpeed() float64 {
return source.speed
}
func (source *BaseGBSource) SetDuration(duration int) {
source.duration = duration
}
func (source *BaseGBSource) SetSpeed(speed float64) {
source.speed = speed
}
// NewGBSource 创建国标推流源, 返回监听的收流端口
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) {
var transportServer transport.Transport

View File

@@ -205,5 +205,5 @@ func CloseSource(id string) {
// LoopEvent 循环读取事件
func LoopEvent(source Source) {
source.StartTimers(source)
go source.GetTransStreamPublisher().run()
source.GetTransStreamPublisher().start()
}

View File

@@ -1,7 +1,6 @@
package stream
import (
"bytes"
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
@@ -11,8 +10,6 @@ import (
"github.com/lkmio/lkm/transcode"
"github.com/lkmio/transport"
"path/filepath"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -35,7 +32,7 @@ type StreamEvent struct {
type TransStreamPublisher interface {
Post(event *StreamEvent)
run()
start()
close()
@@ -73,13 +70,15 @@ type TransStreamPublisher interface {
// StartRecord 开启录制
// 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false
StartRecord() (string, bool)
StartRecord() bool
// StopRecord 停止录制
// 如果AppConfig已经开启了全局录制, 返回error
StopRecord() error
RecordStartTime() time.Time
GetRecordStreamPlayUrl() string
}
type transStreamPublisher struct {
@@ -90,12 +89,13 @@ type transStreamPublisher struct {
sinkCount int // 拉流计数
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径
recordStartTime time.Time // 开始录制时间
hlsStream TransStream // HLS传输流
originTracks TrackManager // 推流的原始track
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径
recordStartTime time.Time // 开始录制时间
hasManualRecording bool // 是否开启手动录像
hlsStream TransStream // HLS传输流
originTracks TrackManager // 推流的原始track
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
transStreams map[TransStreamID]TransStream // 所有输出流
forwardTransStream TransStream // 转发流
@@ -114,26 +114,7 @@ func (t *transStreamPublisher) Post(event *StreamEvent) {
t.streamEvents.Post(event)
}
func getGoroutineID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func (t *transStreamPublisher) run() {
log.Sugar.Infof("transStreamPublisher run goroutine id: %d", getGoroutineID())
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
t.mainContextEvents = make(chan func(), 256)
t.transStreams = make(map[TransStreamID]TransStream, 10)
t.sinks = make(map[SinkID]Sink, 128)
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
defer func() {
// 清空管道
for event := t.streamEvents.Pop(); event != nil; event = t.streamEvents.Pop() {
@@ -175,6 +156,18 @@ func (t *transStreamPublisher) run() {
}
}
func (t *transStreamPublisher) start() {
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
t.mainContextEvents = make(chan func(), 256)
t.transStreams = make(map[TransStreamID]TransStream, 10)
t.sinks = make(map[SinkID]Sink, 128)
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
go t.run()
}
func (t *transStreamPublisher) PostEvent(cb func()) {
t.mainContextEvents <- cb
}
@@ -210,7 +203,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
}
// 创建录制流
if AppConfig.Record.Enable {
if AppConfig.Record.Enable || t.hasManualRecording {
t.createRecordSink()
}
@@ -906,25 +899,25 @@ func (t *transStreamPublisher) SetSourceID(id string) {
t.source = id
}
func (t *transStreamPublisher) StartRecord() (string, bool) {
func (t *transStreamPublisher) StartRecord() bool {
if AppConfig.Record.Enable || t.recordSink != nil {
return "", false
return false
}
var ok bool
t.ExecuteSyncEvent(func() {
t.hasManualRecording = true
// 如果探测还未结束
if !t.completed.Load() {
return
}
if t.recordSink == nil && t.createRecordSink() {
ok = t.doAddSink(t.recordSink, false)
}
})
var url string
if ok {
// 去掉反斜杠
url = GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
}
return url, ok
return ok
}
func (t *transStreamPublisher) StopRecord() error {
@@ -933,6 +926,7 @@ func (t *transStreamPublisher) StopRecord() error {
}
t.ExecuteSyncEvent(func() {
t.hasManualRecording = false
if t.recordSink != nil {
t.clearSinkStreaming(t.recordSink)
t.recordSink.Close()
@@ -949,6 +943,10 @@ func (t *transStreamPublisher) RecordStartTime() time.Time {
return t.recordStartTime
}
func (t *transStreamPublisher) GetRecordStreamPlayUrl() string {
return GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
}
func NewTransStreamPublisher(source string) TransStreamPublisher {
return &transStreamPublisher{
transStreams: make(map[TransStreamID]TransStream),