mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
Compare commits
2 Commits
69308c466b
...
ef313b1ea4
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ef313b1ea4 | ||
![]() |
a5b7fc6f24 |
47
api.go
47
api.go
@@ -131,9 +131,11 @@ func startApiServer(addr string) {
|
||||
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
|
||||
|
||||
if stream.AppConfig.GB28181.Enable {
|
||||
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址
|
||||
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{}))
|
||||
}
|
||||
|
||||
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
|
||||
@@ -511,8 +513,22 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request
|
||||
func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("streamid")
|
||||
source := stream.SourceManager.Find(id)
|
||||
if source == nil || !source.IsCompleted() || source.IsClosed() {
|
||||
if source == nil || source.IsClosed() {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
httpResponseJson(w, "stream not found")
|
||||
return
|
||||
} else if !source.IsCompleted() {
|
||||
// 在请求结束前, 每隔1秒检查track探测是否完成
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for !source.IsClosed() && !source.IsCompleted() && r.Context().Err() == nil {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
break
|
||||
case <-r.Context().Done():
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracks := source.OriginTracks()
|
||||
@@ -572,8 +588,26 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
|
||||
recordStartTime = startTime.Format("2006-01-02 15:04:05")
|
||||
}
|
||||
|
||||
gbSource := Source2GBSource(source)
|
||||
var downloadInfo *DownloadInfo
|
||||
if gbSource != nil && InviteTypeDownload == gbSource.GetSessionName() {
|
||||
progress := gbSource.GetPlaybackProgress()
|
||||
gbSource.GetTransStreamPublisher()
|
||||
downloadInfo = &DownloadInfo{
|
||||
PlaybackDuration: gbSource.GetDuration(),
|
||||
PlaybackSpeed: gbSource.GetSpeed(),
|
||||
PlaybackFileSize: gbSource.GetFileSize(),
|
||||
PlaybackStartTime: gbSource.GetStartTime(),
|
||||
PlaybackEndTime: gbSource.GetEndTime(),
|
||||
PlaybackFileURL: gbSource.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
|
||||
PlaybackProgress: progress,
|
||||
Progress: progress,
|
||||
}
|
||||
|
||||
}
|
||||
statistics := source.GetBitrateStatistics()
|
||||
response := struct {
|
||||
*DownloadInfo
|
||||
AudioEnable bool `json:"AudioEnable"`
|
||||
CDN string `json:"CDN"`
|
||||
CascadeSize int `json:"CascadeSize"`
|
||||
@@ -612,6 +646,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
|
||||
WEBRTC string `json:"WEBRTC"`
|
||||
WS_FLV string `json:"WS_FLV"`
|
||||
}{
|
||||
DownloadInfo: downloadInfo,
|
||||
AudioEnable: true,
|
||||
CDN: "",
|
||||
CascadeSize: 0,
|
||||
@@ -669,14 +704,14 @@ func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) {
|
||||
if source == nil {
|
||||
log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId)
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
} else if url, ok := source.GetTransStreamPublisher().StartRecord(); !ok {
|
||||
} else if ok := source.GetTransStreamPublisher().StartRecord(); !ok {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
} else {
|
||||
// 返回拉流地址
|
||||
httpResponseJson(w, &struct {
|
||||
DownloadURL string `json:"DownloadURL"`
|
||||
}{
|
||||
DownloadURL: url,
|
||||
DownloadURL: source.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
|
||||
})
|
||||
}
|
||||
|
||||
|
196
api_gb.go
196
api_gb.go
@@ -1,7 +1,10 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
audio_transcoder "github.com/lkmio/audio-transcoder"
|
||||
"github.com/lkmio/avformat/bufio"
|
||||
"github.com/lkmio/lkm/gb28181"
|
||||
"github.com/lkmio/lkm/log"
|
||||
@@ -9,6 +12,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -20,11 +24,26 @@ const (
|
||||
)
|
||||
|
||||
type SDP struct {
|
||||
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
|
||||
Addr string `json:"addr,omitempty"` // 连接地址
|
||||
SSRC string `json:"ssrc,omitempty"`
|
||||
Setup string `json:"setup,omitempty"` // active/passive
|
||||
Transport string `json:"transport,omitempty"` // tcp/udp
|
||||
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
|
||||
Addr string `json:"addr,omitempty"` // 连接地址
|
||||
SSRC string `json:"ssrc,omitempty"`
|
||||
Setup string `json:"setup,omitempty"` // active/passive
|
||||
Transport string `json:"transport,omitempty"` // tcp/udp
|
||||
Speed float64 `json:"speed,omitempty"`
|
||||
StartTime int `json:"start_time,omitempty"`
|
||||
EndTime int `json:"end_time,omitempty"`
|
||||
FileSize int `json:"file_size,omitempty"`
|
||||
}
|
||||
|
||||
type DownloadInfo struct {
|
||||
PlaybackDuration int // 回放/下载时长
|
||||
PlaybackSpeed float64 // 回放/下载速度
|
||||
PlaybackFileURL string // 回放/下载文件URL
|
||||
PlaybackStartTime string // 回放/下载开始时间
|
||||
PlaybackEndTime string // 回放/下载结束时间
|
||||
PlaybackFileSize int // 回放/下载文件大小
|
||||
PlaybackProgress float64 // 1-下载完成
|
||||
Progress float64
|
||||
}
|
||||
|
||||
type SourceSDP struct {
|
||||
@@ -38,6 +57,18 @@ type GBOffer struct {
|
||||
TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"`
|
||||
}
|
||||
|
||||
func Source2GBSource(source stream.Source) gb28181.GBSource {
|
||||
if gbSource, ok := source.(*gb28181.PassiveSource); ok {
|
||||
return gbSource
|
||||
} else if gbSource, ok := source.(*gb28181.ActiveSource); ok {
|
||||
return gbSource
|
||||
} else if gbSource, ok := source.(*gb28181.PassiveSource); ok {
|
||||
return gbSource
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("创建国标源: %v", v)
|
||||
|
||||
@@ -73,18 +104,29 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h
|
||||
}
|
||||
|
||||
var ssrc string
|
||||
if v.SessionName == InviteTypeDownload || v.SessionName == InviteTypePlayback {
|
||||
if InviteTypeDownload == v.SessionName || InviteTypePlayback == v.SessionName {
|
||||
ssrc = gb28181.GetVodSSRC()
|
||||
} else {
|
||||
ssrc = gb28181.GetLiveSSRC()
|
||||
}
|
||||
|
||||
ssrcValue, _ := strconv.Atoi(ssrc)
|
||||
_, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
|
||||
gbSource, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
|
||||
if err != nil {
|
||||
return
|
||||
} else if InviteTypeDownload == v.SessionName {
|
||||
// 开启录制
|
||||
gbSource.GetTransStreamPublisher().StartRecord()
|
||||
}
|
||||
|
||||
startTime := time.Unix(int64(v.StartTime), 0).Format("2006-01-02T15:04:05")
|
||||
endTime := time.Unix(int64(v.EndTime), 0).Format("2006-01-02T15:04:05")
|
||||
gbSource.SetSessionName(v.SessionName)
|
||||
gbSource.SetStartTime(startTime)
|
||||
gbSource.SetEndTime(endTime)
|
||||
gbSource.SetSpeed(v.Speed)
|
||||
gbSource.SetDuration(v.EndTime - v.StartTime)
|
||||
|
||||
response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))
|
||||
response.Urls = stream.GetStreamPlayUrls(v.Source)
|
||||
response.SSRC = ssrc
|
||||
@@ -94,13 +136,13 @@ func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *h
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v)
|
||||
log.Sugar.Infof("设置国标应答: %v", v)
|
||||
|
||||
var err error
|
||||
// 响应错误消息
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("设置国标主动拉流失败 err: %s", err.Error())
|
||||
log.Sugar.Errorf("设置国标应答失败 err: %s", err.Error())
|
||||
httpResponseError(w, err.Error())
|
||||
}
|
||||
}()
|
||||
@@ -108,22 +150,24 @@ func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *
|
||||
source := stream.SourceManager.Find(v.Source)
|
||||
if source == nil {
|
||||
err = fmt.Errorf("%s 源不存在", v.Source)
|
||||
return
|
||||
}
|
||||
} else if stream.SourceType28181 != source.GetType() {
|
||||
err = fmt.Errorf("%s 源不是28181类型", v.Source)
|
||||
} else if activeSource, ok := source.(*gb28181.ActiveSource); ok {
|
||||
activeSource.SetFileSize(v.FileSize)
|
||||
// 主动连接取流
|
||||
var addr *net.TCPAddr
|
||||
addr, err = net.ResolveTCPAddr("tcp", v.Addr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
activeSource, ok := source.(*gb28181.ActiveSource)
|
||||
if !ok {
|
||||
err = fmt.Errorf("%s 源不是Active拉流类型", v.Source)
|
||||
return
|
||||
}
|
||||
|
||||
addr, err := net.ResolveTCPAddr("tcp", v.Addr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = activeSource.Connect(addr); err == nil {
|
||||
httpResponseOK(w, nil)
|
||||
if err = activeSource.Connect(addr); err == nil {
|
||||
httpResponseOK(w, nil)
|
||||
}
|
||||
} else if passiveSource, ok := source.(*gb28181.PassiveSource); ok {
|
||||
passiveSource.SetFileSize(v.FileSize)
|
||||
} else if udpSource, ok := source.(*gb28181.UDPSource); ok {
|
||||
udpSource.SetFileSize(v.FileSize)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,6 +229,21 @@ func (api *ApiServer) AddForwardSink(protocol stream.TransStreamProtocol, transp
|
||||
httpResponseOK(w, &response)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("添加sink: %v", *v)
|
||||
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
|
||||
httpResponseError(w, "不支持的协议")
|
||||
return
|
||||
}
|
||||
|
||||
setup := gb28181.SetupTypeFromString(v.Setup)
|
||||
if v.AnswerSetup != "" {
|
||||
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
|
||||
}
|
||||
|
||||
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r)
|
||||
}
|
||||
|
||||
// OnGBTalk 国标广播/对讲流程:
|
||||
// 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接
|
||||
// 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求
|
||||
@@ -237,17 +296,90 @@ func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
|
||||
talkSource.Close()
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("添加sink: %v", *v)
|
||||
if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
|
||||
httpResponseError(w, "不支持的协议")
|
||||
// OnLiveGBSTalk liveGBS前端对讲
|
||||
func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
device := vars["device"]
|
||||
channel := vars["channel"]
|
||||
_ = r.URL.Query().Get("format")
|
||||
|
||||
conn, err := api.upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
setup := gb28181.SetupTypeFromString(v.Setup)
|
||||
if v.AnswerSetup != "" {
|
||||
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
|
||||
// 获取id
|
||||
id := device + "/" + channel + ".broadcast"
|
||||
|
||||
talkSource := gb28181.NewTalkSource(id, conn)
|
||||
talkSource.Init()
|
||||
talkSource.SetUrlValues(r.Form)
|
||||
|
||||
_, err = stream.PreparePublishSource(talkSource, true)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("对讲失败, err: %s source: %s", err, talkSource)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r)
|
||||
log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)
|
||||
|
||||
stream.LoopEvent(talkSource)
|
||||
|
||||
data := stream.UDPReceiveBufferPool.Get().([]byte)
|
||||
pcm := make([]byte, 32000)
|
||||
g711aPacket := make([]byte, stream.UDPReceiveBufferSize/2)
|
||||
|
||||
for {
|
||||
_, bytes, err := conn.ReadMessage()
|
||||
length := len(bytes)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
|
||||
break
|
||||
} else if length < 1 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 扩容
|
||||
if int(float64(len(bytes))*1.4) > len(pcm) {
|
||||
pcm = make([]byte, len(bytes)*2)
|
||||
}
|
||||
|
||||
// base64解密
|
||||
var pcmN int
|
||||
pcmN, err = base64.StdEncoding.Decode(bytes, pcm)
|
||||
if err == nil {
|
||||
log.Sugar.Errorf(err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
for i := 0; i < pcmN; {
|
||||
// 控制每包大小
|
||||
n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
|
||||
copy(data, pcm[:n])
|
||||
|
||||
// 编码成G711A
|
||||
audio_transcoder.EncodeAlawToBuffer(data, g711aPacket)
|
||||
|
||||
_, _ = talkSource.PublishSource.Input(g711aPacket[:n/2])
|
||||
i += n
|
||||
}
|
||||
}
|
||||
|
||||
talkSource.Close()
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBSpeedSet(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
|
||||
source := stream.SourceManager.Find(v.Source)
|
||||
if source == nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
httpResponseError(w, "stream not found")
|
||||
} else if stream.SourceType28181 != source.GetType() {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
httpResponseError(w, "stream type not support")
|
||||
} else if gbSource := Source2GBSource(source); gbSource != nil {
|
||||
gbSource.SetSpeed(v.Speed)
|
||||
}
|
||||
}
|
||||
|
@@ -78,6 +78,21 @@ type GBSource interface {
|
||||
ProcessPacket(data []byte) error
|
||||
|
||||
SetTransport(transport transport.Transport)
|
||||
|
||||
GetDuration() int
|
||||
GetSpeed() float64
|
||||
GetSessionName() string
|
||||
GetStartTime() string
|
||||
GetEndTime() string
|
||||
GetFileSize() int
|
||||
GetPlaybackProgress() float64
|
||||
|
||||
SetDuration(duration int)
|
||||
SetSpeed(speed float64)
|
||||
SetSessionName(sessionName string)
|
||||
SetStartTime(startTime string)
|
||||
SetEndTime(endTime string)
|
||||
SetFileSize(fileSize int)
|
||||
}
|
||||
|
||||
type BaseGBSource struct {
|
||||
@@ -94,6 +109,15 @@ type BaseGBSource struct {
|
||||
isSystemClock bool // 推流时间戳不正确, 是否使用系统时间.
|
||||
lastRtpTimestamp int64
|
||||
sameTimePackets [][]byte
|
||||
|
||||
sessionName string // play/playback/download...
|
||||
duration int // 回放/下载时长, 单位秒
|
||||
speed float64 // 回放/下载速度
|
||||
startTime string // 回放/下载开始时间
|
||||
endTime string // 回放/下载结束时间
|
||||
fileSize int // 回放/下载文件大小
|
||||
playbackProgress float64 // 1-下载完成
|
||||
playbackDataSize int // 已下载数据大小
|
||||
}
|
||||
|
||||
// ProcessPacket 输入rtp包, 处理PS流, 负责解析->封装->推流
|
||||
@@ -106,6 +130,13 @@ func (source *BaseGBSource) ProcessPacket(data []byte) error {
|
||||
source.InitializePublish(packet.SSRC)
|
||||
}
|
||||
|
||||
// 统计下载的进度
|
||||
source.playbackDataSize += len(data)
|
||||
source.playbackProgress = float64(source.playbackDataSize) / float64(source.fileSize)
|
||||
if source.playbackProgress > 1 {
|
||||
source.playbackProgress = 1
|
||||
}
|
||||
|
||||
// 国标级联转发
|
||||
if source.GetTransStreamPublisher().GetForwardTransStream() != nil {
|
||||
if source.lastRtpTimestamp == -1 {
|
||||
@@ -274,6 +305,58 @@ func (source *BaseGBSource) SetTransport(transport transport.Transport) {
|
||||
source.transport = transport
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) GetSessionName() string {
|
||||
return source.sessionName
|
||||
}
|
||||
func (source *BaseGBSource) GetStartTime() string {
|
||||
return source.startTime
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) GetEndTime() string {
|
||||
return source.endTime
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) GetFileSize() int {
|
||||
return source.fileSize
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) GetPlaybackProgress() float64 {
|
||||
return source.playbackProgress
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) SetStartTime(startTime string) {
|
||||
source.startTime = startTime
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) SetEndTime(endTime string) {
|
||||
source.endTime = endTime
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) SetFileSize(fileSize int) {
|
||||
source.fileSize = fileSize
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) SetSessionName(sessionName string) {
|
||||
// 转小写
|
||||
source.sessionName = strings.ToLower(sessionName)
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) GetDuration() int {
|
||||
return source.duration
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) GetSpeed() float64 {
|
||||
return source.speed
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) SetDuration(duration int) {
|
||||
source.duration = duration
|
||||
}
|
||||
|
||||
func (source *BaseGBSource) SetSpeed(speed float64) {
|
||||
source.speed = speed
|
||||
}
|
||||
|
||||
// NewGBSource 创建国标推流源, 返回监听的收流端口
|
||||
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) {
|
||||
var transportServer transport.Transport
|
||||
|
@@ -205,5 +205,5 @@ func CloseSource(id string) {
|
||||
// LoopEvent 循环读取事件
|
||||
func LoopEvent(source Source) {
|
||||
source.StartTimers(source)
|
||||
go source.GetTransStreamPublisher().run()
|
||||
source.GetTransStreamPublisher().start()
|
||||
}
|
||||
|
@@ -1,7 +1,6 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
@@ -11,8 +10,6 @@ import (
|
||||
"github.com/lkmio/lkm/transcode"
|
||||
"github.com/lkmio/transport"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -35,7 +32,7 @@ type StreamEvent struct {
|
||||
type TransStreamPublisher interface {
|
||||
Post(event *StreamEvent)
|
||||
|
||||
run()
|
||||
start()
|
||||
|
||||
close()
|
||||
|
||||
@@ -73,13 +70,15 @@ type TransStreamPublisher interface {
|
||||
|
||||
// StartRecord 开启录制
|
||||
// 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false
|
||||
StartRecord() (string, bool)
|
||||
StartRecord() bool
|
||||
|
||||
// StopRecord 停止录制
|
||||
// 如果AppConfig已经开启了全局录制, 返回error
|
||||
StopRecord() error
|
||||
|
||||
RecordStartTime() time.Time
|
||||
|
||||
GetRecordStreamPlayUrl() string
|
||||
}
|
||||
|
||||
type transStreamPublisher struct {
|
||||
@@ -90,12 +89,13 @@ type transStreamPublisher struct {
|
||||
sinkCount int // 拉流计数
|
||||
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
|
||||
|
||||
recordSink Sink // 每个Source的录制流
|
||||
recordFilePath string // 录制流文件路径
|
||||
recordStartTime time.Time // 开始录制时间
|
||||
hlsStream TransStream // HLS传输流
|
||||
originTracks TrackManager // 推流的原始track
|
||||
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
|
||||
recordSink Sink // 每个Source的录制流
|
||||
recordFilePath string // 录制流文件路径
|
||||
recordStartTime time.Time // 开始录制时间
|
||||
hasManualRecording bool // 是否开启手动录像
|
||||
hlsStream TransStream // HLS传输流
|
||||
originTracks TrackManager // 推流的原始track
|
||||
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
|
||||
|
||||
transStreams map[TransStreamID]TransStream // 所有输出流
|
||||
forwardTransStream TransStream // 转发流
|
||||
@@ -114,26 +114,7 @@ func (t *transStreamPublisher) Post(event *StreamEvent) {
|
||||
t.streamEvents.Post(event)
|
||||
}
|
||||
|
||||
func getGoroutineID() uint64 {
|
||||
b := make([]byte, 64)
|
||||
b = b[:runtime.Stack(b, false)]
|
||||
b = bytes.TrimPrefix(b, []byte("goroutine "))
|
||||
b = b[:bytes.IndexByte(b, ' ')]
|
||||
n, _ := strconv.ParseUint(string(b), 10, 64)
|
||||
return n
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) run() {
|
||||
log.Sugar.Infof("transStreamPublisher run goroutine id: %d", getGoroutineID())
|
||||
|
||||
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
|
||||
t.mainContextEvents = make(chan func(), 256)
|
||||
|
||||
t.transStreams = make(map[TransStreamID]TransStream, 10)
|
||||
t.sinks = make(map[SinkID]Sink, 128)
|
||||
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
|
||||
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
|
||||
|
||||
defer func() {
|
||||
// 清空管道
|
||||
for event := t.streamEvents.Pop(); event != nil; event = t.streamEvents.Pop() {
|
||||
@@ -175,6 +156,18 @@ func (t *transStreamPublisher) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) start() {
|
||||
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
|
||||
t.mainContextEvents = make(chan func(), 256)
|
||||
|
||||
t.transStreams = make(map[TransStreamID]TransStream, 10)
|
||||
t.sinks = make(map[SinkID]Sink, 128)
|
||||
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
|
||||
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
|
||||
|
||||
go t.run()
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) PostEvent(cb func()) {
|
||||
t.mainContextEvents <- cb
|
||||
}
|
||||
@@ -210,7 +203,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
|
||||
}
|
||||
|
||||
// 创建录制流
|
||||
if AppConfig.Record.Enable {
|
||||
if AppConfig.Record.Enable || t.hasManualRecording {
|
||||
t.createRecordSink()
|
||||
}
|
||||
|
||||
@@ -906,25 +899,25 @@ func (t *transStreamPublisher) SetSourceID(id string) {
|
||||
t.source = id
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) StartRecord() (string, bool) {
|
||||
func (t *transStreamPublisher) StartRecord() bool {
|
||||
if AppConfig.Record.Enable || t.recordSink != nil {
|
||||
return "", false
|
||||
return false
|
||||
}
|
||||
|
||||
var ok bool
|
||||
t.ExecuteSyncEvent(func() {
|
||||
t.hasManualRecording = true
|
||||
// 如果探测还未结束
|
||||
if !t.completed.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
if t.recordSink == nil && t.createRecordSink() {
|
||||
ok = t.doAddSink(t.recordSink, false)
|
||||
}
|
||||
})
|
||||
|
||||
var url string
|
||||
if ok {
|
||||
// 去掉反斜杠
|
||||
url = GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
|
||||
}
|
||||
|
||||
return url, ok
|
||||
return ok
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) StopRecord() error {
|
||||
@@ -933,6 +926,7 @@ func (t *transStreamPublisher) StopRecord() error {
|
||||
}
|
||||
|
||||
t.ExecuteSyncEvent(func() {
|
||||
t.hasManualRecording = false
|
||||
if t.recordSink != nil {
|
||||
t.clearSinkStreaming(t.recordSink)
|
||||
t.recordSink.Close()
|
||||
@@ -949,6 +943,10 @@ func (t *transStreamPublisher) RecordStartTime() time.Time {
|
||||
return t.recordStartTime
|
||||
}
|
||||
|
||||
func (t *transStreamPublisher) GetRecordStreamPlayUrl() string {
|
||||
return GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
|
||||
}
|
||||
|
||||
func NewTransStreamPublisher(source string) TransStreamPublisher {
|
||||
return &transStreamPublisher{
|
||||
transStreams: make(map[TransStreamID]TransStream),
|
||||
|
Reference in New Issue
Block a user