diff --git a/api.go b/api.go index 0c4a0d2..de1d4ef 100644 --- a/api.go +++ b/api.go @@ -131,9 +131,10 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流 if stream.AppConfig.GB28181.Enable { - apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接 + apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接 + apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接 apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) - apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址 + apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 应答的sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小 } apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) { @@ -511,8 +512,22 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("streamid") source := stream.SourceManager.Find(id) - if source == nil || !source.IsCompleted() || source.IsClosed() { + if source == nil || source.IsClosed() { + w.WriteHeader(http.StatusBadRequest) + httpResponseJson(w, "stream not found") return + } else if !source.IsCompleted() { + // 在请求结束前, 每隔1秒检查track探测是否完成 + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for !source.IsClosed() && !source.IsCompleted() && r.Context().Err() == nil { + select { + case <-ticker.C: + break + case <-r.Context().Done(): + break + } + } } tracks := source.OriginTracks() @@ -572,8 +587,26 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { recordStartTime = startTime.Format("2006-01-02 15:04:05") } + gbSource := Source2GBSource(source) + var downloadInfo *DownloadInfo + if gbSource != nil && InviteTypeDownload == gbSource.GetSessionName() { + progress := gbSource.GetPlaybackProgress() + gbSource.GetTransStreamPublisher() + downloadInfo = &DownloadInfo{ + PlaybackDuration: gbSource.GetDuration(), + PlaybackSpeed: gbSource.GetSpeed(), + PlaybackFileSize: gbSource.GetFileSize(), + PlaybackStartTime: gbSource.GetStartTime(), + PlaybackEndTime: gbSource.GetEndTime(), + PlaybackFileURL: gbSource.GetTransStreamPublisher().GetRecordStreamPlayUrl(), + PlaybackProgress: progress, + Progress: progress, + } + + } statistics := source.GetBitrateStatistics() response := struct { + *DownloadInfo AudioEnable bool `json:"AudioEnable"` CDN string `json:"CDN"` CascadeSize int `json:"CascadeSize"` @@ -612,6 +645,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { WEBRTC string `json:"WEBRTC"` WS_FLV string `json:"WS_FLV"` }{ + DownloadInfo: downloadInfo, AudioEnable: true, CDN: "", CascadeSize: 0, @@ -669,14 +703,14 @@ func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) { if source == nil { log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId) w.WriteHeader(http.StatusNotFound) - } else if url, ok := source.GetTransStreamPublisher().StartRecord(); !ok { + } else if ok := source.GetTransStreamPublisher().StartRecord(); !ok { w.WriteHeader(http.StatusBadRequest) } else { // 返回拉流地址 httpResponseJson(w, &struct { DownloadURL string `json:"DownloadURL"` }{ - DownloadURL: url, + DownloadURL: source.GetTransStreamPublisher().GetRecordStreamPlayUrl(), }) } diff --git a/api_gb.go b/api_gb.go index 41c3054..afbe86e 100644 --- a/api_gb.go +++ b/api_gb.go @@ -1,7 +1,10 @@ package main import ( + "encoding/base64" "fmt" + "github.com/gorilla/mux" + audio_transcoder "github.com/lkmio/audio-transcoder" "github.com/lkmio/avformat/bufio" "github.com/lkmio/lkm/gb28181" "github.com/lkmio/lkm/log" @@ -9,6 +12,7 @@ import ( "net" "net/http" "strconv" + "time" ) const ( @@ -25,6 +29,21 @@ type SDP struct { SSRC string `json:"ssrc,omitempty"` Setup string `json:"setup,omitempty"` // active/passive Transport string `json:"transport,omitempty"` // tcp/udp + Speed int `json:"speed,omitempty"` + StartTime int `json:"start_time,omitempty"` + EndTime int `json:"end_time,omitempty"` + FileSize int `json:"file_size,omitempty"` +} + +type DownloadInfo struct { + PlaybackDuration int // 回放/下载时长 + PlaybackSpeed int // 回放/下载速度 + PlaybackFileURL string // 回放/下载文件URL + PlaybackStartTime string // 回放/下载开始时间 + PlaybackEndTime string // 回放/下载结束时间 + PlaybackFileSize int // 回放/下载文件大小 + PlaybackProgress float64 // 1-下载完成 + Progress float64 } type SourceSDP struct { @@ -38,6 +57,18 @@ type GBOffer struct { TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"` } +func Source2GBSource(source stream.Source) gb28181.GBSource { + if gbSource, ok := source.(*gb28181.PassiveSource); ok { + return gbSource + } else if gbSource, ok := source.(*gb28181.ActiveSource); ok { + return gbSource + } else if gbSource, ok := source.(*gb28181.PassiveSource); ok { + return gbSource + } + + return nil +} + func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) { log.Sugar.Infof("创建国标源: %v", v) @@ -73,18 +104,29 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h } var ssrc string - if v.SessionName == InviteTypeDownload || v.SessionName == InviteTypePlayback { + if InviteTypeDownload == v.SessionName || InviteTypePlayback == v.SessionName { ssrc = gb28181.GetVodSSRC() } else { ssrc = gb28181.GetLiveSSRC() } ssrcValue, _ := strconv.Atoi(ssrc) - _, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active) + gbSource, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active) if err != nil { return + } else if InviteTypeDownload == v.SessionName { + // 开启录制 + gbSource.GetTransStreamPublisher().StartRecord() } + startTime := time.Unix(int64(v.StartTime), 0).Format("2006-01-02T15:04:05") + endTime := time.Unix(int64(v.EndTime), 0).Format("2006-01-02T15:04:05") + gbSource.SetSessionName(v.SessionName) + gbSource.SetStartTime(startTime) + gbSource.SetEndTime(endTime) + gbSource.SetSpeed(v.Speed) + gbSource.SetDuration(v.EndTime - v.StartTime) + response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port)) response.Urls = stream.GetStreamPlayUrls(v.Source) response.SSRC = ssrc @@ -94,13 +136,13 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h } func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) { - log.Sugar.Infof("设置国标主动拉流连接地址: %v", v) + log.Sugar.Infof("设置国标应答: %v", v) var err error // 响应错误消息 defer func() { if err != nil { - log.Sugar.Errorf("设置国标主动拉流失败 err: %s", err.Error()) + log.Sugar.Errorf("设置国标应答失败 err: %s", err.Error()) httpResponseError(w, err.Error()) } }() @@ -108,22 +150,24 @@ func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r * source := stream.SourceManager.Find(v.Source) if source == nil { err = fmt.Errorf("%s 源不存在", v.Source) - return - } + } else if stream.SourceType28181 != source.GetType() { + err = fmt.Errorf("%s 源不是28181类型", v.Source) + } else if activeSource, ok := source.(*gb28181.ActiveSource); ok { + activeSource.SetFileSize(v.FileSize) + // 主动连接取流 + var addr *net.TCPAddr + addr, err = net.ResolveTCPAddr("tcp", v.Addr) + if err != nil { + return + } - activeSource, ok := source.(*gb28181.ActiveSource) - if !ok { - err = fmt.Errorf("%s 源不是Active拉流类型", v.Source) - return - } - - addr, err := net.ResolveTCPAddr("tcp", v.Addr) - if err != nil { - return - } - - if err = activeSource.Connect(addr); err == nil { - httpResponseOK(w, nil) + if err = activeSource.Connect(addr); err == nil { + httpResponseOK(w, nil) + } + } else if passiveSource, ok := source.(*gb28181.PassiveSource); ok { + passiveSource.SetFileSize(v.FileSize) + } else if udpSource, ok := source.(*gb28181.UDPSource); ok { + udpSource.SetFileSize(v.FileSize) } } @@ -185,6 +229,21 @@ func (api *ApiServer) AddForwardSink(protocol stream.TransStreamProtocol, transp httpResponseOK(w, &response) } +func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) { + log.Sugar.Infof("添加sink: %v", *v) + if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol { + httpResponseError(w, "不支持的协议") + return + } + + setup := gb28181.SetupTypeFromString(v.Setup) + if v.AnswerSetup != "" { + setup = gb28181.SetupTypeFromString(v.AnswerSetup) + } + + api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r) +} + // OnGBTalk 国标广播/对讲流程: // 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接 // 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求 @@ -237,17 +296,77 @@ func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) { talkSource.Close() } -func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) { - log.Sugar.Infof("添加sink: %v", *v) - if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol { - httpResponseError(w, "不支持的协议") +// OnLiveGBSTalk liveGBS前端对讲 +func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + device := vars["device"] + channel := vars["channel"] + _ = r.URL.Query().Get("format") + + conn, err := api.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error()) + conn.Close() return } - setup := gb28181.SetupTypeFromString(v.Setup) - if v.AnswerSetup != "" { - setup = gb28181.SetupTypeFromString(v.AnswerSetup) + // 获取id + id := device + "/" + channel + ".broadcast" + + talkSource := gb28181.NewTalkSource(id, conn) + talkSource.Init() + talkSource.SetUrlValues(r.Form) + + _, err = stream.PreparePublishSource(talkSource, true) + if err != nil { + log.Sugar.Errorf("对讲失败, err: %s source: %s", err, talkSource) + conn.Close() + return } - api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r) + log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource) + + stream.LoopEvent(talkSource) + + data := stream.UDPReceiveBufferPool.Get().([]byte) + pcm := make([]byte, 32000) + g711aPacket := make([]byte, stream.UDPReceiveBufferSize/2) + + for { + _, bytes, err := conn.ReadMessage() + length := len(bytes) + if err != nil { + log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error()) + break + } else if length < 1 { + continue + } + + // 扩容 + if int(float64(len(bytes))*1.4) > len(pcm) { + pcm = make([]byte, len(bytes)*2) + } + + // base64解密 + var pcmN int + pcmN, err = base64.StdEncoding.Decode(bytes, pcm) + if err == nil { + log.Sugar.Errorf(err.Error()) + continue + } + + for i := 0; i < pcmN; { + // 控制每包大小 + n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i) + copy(data, pcm[:n]) + + // 编码成G711A + audio_transcoder.EncodeAlawToBuffer(data, g711aPacket) + + _, _ = talkSource.PublishSource.Input(g711aPacket[:n/2]) + i += n + } + } + + talkSource.Close() } diff --git a/gb28181/source.go b/gb28181/source.go index a219c15..ce0c371 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -78,6 +78,21 @@ type GBSource interface { ProcessPacket(data []byte) error SetTransport(transport transport.Transport) + + GetDuration() int + GetSpeed() int + GetSessionName() string + GetStartTime() string + GetEndTime() string + GetFileSize() int + GetPlaybackProgress() float64 + + SetDuration(duration int) + SetSpeed(speed int) + SetSessionName(sessionName string) + SetStartTime(startTime string) + SetEndTime(endTime string) + SetFileSize(fileSize int) } type BaseGBSource struct { @@ -94,6 +109,15 @@ type BaseGBSource struct { isSystemClock bool // 推流时间戳不正确, 是否使用系统时间. lastRtpTimestamp int64 sameTimePackets [][]byte + + sessionName string // play/playback/download... + duration int // 回放/下载时长, 单位秒 + speed int // 回放/下载速度 + startTime string // 回放/下载开始时间 + endTime string // 回放/下载结束时间 + fileSize int // 回放/下载文件大小 + playbackProgress float64 // 1-下载完成 + playbackDataSize int // 已下载数据大小 } // ProcessPacket 输入rtp包, 处理PS流, 负责解析->封装->推流 @@ -106,6 +130,13 @@ func (source *BaseGBSource) ProcessPacket(data []byte) error { source.InitializePublish(packet.SSRC) } + // 统计下载的进度 + source.playbackDataSize += len(data) + source.playbackProgress = float64(source.playbackDataSize) / float64(source.fileSize) + if source.playbackProgress > 1 { + source.playbackProgress = 1 + } + // 国标级联转发 if source.GetTransStreamPublisher().GetForwardTransStream() != nil { if source.lastRtpTimestamp == -1 { @@ -274,6 +305,58 @@ func (source *BaseGBSource) SetTransport(transport transport.Transport) { source.transport = transport } +func (source *BaseGBSource) GetSessionName() string { + return source.sessionName +} +func (source *BaseGBSource) GetStartTime() string { + return source.startTime +} + +func (source *BaseGBSource) GetEndTime() string { + return source.endTime +} + +func (source *BaseGBSource) GetFileSize() int { + return source.fileSize +} + +func (source *BaseGBSource) GetPlaybackProgress() float64 { + return source.playbackProgress +} + +func (source *BaseGBSource) SetStartTime(startTime string) { + source.startTime = startTime +} + +func (source *BaseGBSource) SetEndTime(endTime string) { + source.endTime = endTime +} + +func (source *BaseGBSource) SetFileSize(fileSize int) { + source.fileSize = fileSize +} + +func (source *BaseGBSource) SetSessionName(sessionName string) { + // 转小写 + source.sessionName = strings.ToLower(sessionName) +} + +func (source *BaseGBSource) GetDuration() int { + return source.duration +} + +func (source *BaseGBSource) GetSpeed() int { + return source.speed +} + +func (source *BaseGBSource) SetDuration(duration int) { + source.duration = duration +} + +func (source *BaseGBSource) SetSpeed(speed int) { + source.speed = speed +} + // NewGBSource 创建国标推流源, 返回监听的收流端口 func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) { var transportServer transport.Transport diff --git a/stream/source_utils.go b/stream/source_utils.go index ec5f44b..25059f8 100644 --- a/stream/source_utils.go +++ b/stream/source_utils.go @@ -205,5 +205,5 @@ func CloseSource(id string) { // LoopEvent 循环读取事件 func LoopEvent(source Source) { source.StartTimers(source) - go source.GetTransStreamPublisher().run() + source.GetTransStreamPublisher().start() } diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index b326109..48b921d 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -1,7 +1,6 @@ package stream import ( - "bytes" "fmt" "github.com/lkmio/avformat" "github.com/lkmio/avformat/collections" @@ -11,8 +10,6 @@ import ( "github.com/lkmio/lkm/transcode" "github.com/lkmio/transport" "path/filepath" - "runtime" - "strconv" "sync" "sync/atomic" "time" @@ -35,7 +32,7 @@ type StreamEvent struct { type TransStreamPublisher interface { Post(event *StreamEvent) - run() + start() close() @@ -73,13 +70,15 @@ type TransStreamPublisher interface { // StartRecord 开启录制 // 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false - StartRecord() (string, bool) + StartRecord() bool // StopRecord 停止录制 // 如果AppConfig已经开启了全局录制, 返回error StopRecord() error RecordStartTime() time.Time + + GetRecordStreamPlayUrl() string } type transStreamPublisher struct { @@ -90,12 +89,13 @@ type transStreamPublisher struct { sinkCount int // 拉流计数 gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop - recordSink Sink // 每个Source的录制流 - recordFilePath string // 录制流文件路径 - recordStartTime time.Time // 开始录制时间 - hlsStream TransStream // HLS传输流 - originTracks TrackManager // 推流的原始track - transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track + recordSink Sink // 每个Source的录制流 + recordFilePath string // 录制流文件路径 + recordStartTime time.Time // 开始录制时间 + hasManualRecording bool // 是否开启手动录像 + hlsStream TransStream // HLS传输流 + originTracks TrackManager // 推流的原始track + transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track transStreams map[TransStreamID]TransStream // 所有输出流 forwardTransStream TransStream // 转发流 @@ -114,26 +114,7 @@ func (t *transStreamPublisher) Post(event *StreamEvent) { t.streamEvents.Post(event) } -func getGoroutineID() uint64 { - b := make([]byte, 64) - b = b[:runtime.Stack(b, false)] - b = bytes.TrimPrefix(b, []byte("goroutine ")) - b = b[:bytes.IndexByte(b, ' ')] - n, _ := strconv.ParseUint(string(b), 10, 64) - return n -} - func (t *transStreamPublisher) run() { - log.Sugar.Infof("transStreamPublisher run goroutine id: %d", getGoroutineID()) - - t.streamEvents = NewNonBlockingChannel[*StreamEvent](256) - t.mainContextEvents = make(chan func(), 256) - - t.transStreams = make(map[TransStreamID]TransStream, 10) - t.sinks = make(map[SinkID]Sink, 128) - t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) - t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4) - defer func() { // 清空管道 for event := t.streamEvents.Pop(); event != nil; event = t.streamEvents.Pop() { @@ -175,6 +156,18 @@ func (t *transStreamPublisher) run() { } } +func (t *transStreamPublisher) start() { + t.streamEvents = NewNonBlockingChannel[*StreamEvent](256) + t.mainContextEvents = make(chan func(), 256) + + t.transStreams = make(map[TransStreamID]TransStream, 10) + t.sinks = make(map[SinkID]Sink, 128) + t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) + t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4) + + go t.run() +} + func (t *transStreamPublisher) PostEvent(cb func()) { t.mainContextEvents <- cb } @@ -210,7 +203,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() { } // 创建录制流 - if AppConfig.Record.Enable { + if AppConfig.Record.Enable || t.hasManualRecording { t.createRecordSink() } @@ -906,25 +899,25 @@ func (t *transStreamPublisher) SetSourceID(id string) { t.source = id } -func (t *transStreamPublisher) StartRecord() (string, bool) { +func (t *transStreamPublisher) StartRecord() bool { if AppConfig.Record.Enable || t.recordSink != nil { - return "", false + return false } var ok bool t.ExecuteSyncEvent(func() { + t.hasManualRecording = true + // 如果探测还未结束 + if !t.completed.Load() { + return + } + if t.recordSink == nil && t.createRecordSink() { ok = t.doAddSink(t.recordSink, false) } }) - var url string - if ok { - // 去掉反斜杠 - url = GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath)) - } - - return url, ok + return ok } func (t *transStreamPublisher) StopRecord() error { @@ -933,6 +926,7 @@ func (t *transStreamPublisher) StopRecord() error { } t.ExecuteSyncEvent(func() { + t.hasManualRecording = false if t.recordSink != nil { t.clearSinkStreaming(t.recordSink) t.recordSink.Close() @@ -949,6 +943,10 @@ func (t *transStreamPublisher) RecordStartTime() time.Time { return t.recordStartTime } +func (t *transStreamPublisher) GetRecordStreamPlayUrl() string { + return GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath)) +} + func NewTransStreamPublisher(source string) TransStreamPublisher { return &transStreamPublisher{ transStreams: make(map[TransStreamID]TransStream),