From aab75eecfd964a1dde3a87d1a7effdfaae28eefb Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Fri, 15 Nov 2024 20:05:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8E=A8=E6=B5=81=E7=A0=81?= =?UTF-8?q?=E6=B5=81=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 2 +- main.go | 1 + stream/bitrate_statistics.go | 56 ++++++++++++++++++++++++++++++++++++ stream/source.go | 13 +++++++-- 4 files changed, 69 insertions(+), 3 deletions(-) create mode 100644 stream/bitrate_statistics.go diff --git a/api.go b/api.go index 145168d..0c3c3a5 100644 --- a/api.go +++ b/api.go @@ -405,7 +405,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) { Protocol: source.GetType().String(), Time: source.CreateTime(), SinkCount: source.SinkCount(), - Bitrate: "", // 后续开发 + Bitrate: strconv.Itoa(source.GetBitrateStatistics().PreviousSecond()/1024) + "KBS", // 后续开发 Tracks: tracks, }) } diff --git a/main.go b/main.go index 3d3c35f..acd8787 100644 --- a/main.go +++ b/main.go @@ -135,6 +135,7 @@ func init() { "record": &config.Record, } + // 读取运行参数 disableOptions, enableOptions := readRunArgs() mergeArgs(options, disableOptions, enableOptions) diff --git a/stream/bitrate_statistics.go b/stream/bitrate_statistics.go new file mode 100644 index 0000000..bc3190a --- /dev/null +++ b/stream/bitrate_statistics.go @@ -0,0 +1,56 @@ +package stream + +import "time" + +// BitrateStatistics 码流统计, 单位Byte +type BitrateStatistics struct { + totalBytes int64 // 总共传输的字节数 + elapsedSeconds int // 经过的秒数 + currentSecond int // 当前秒数 + + previousSecondBytes int // 前一秒传输的字节数 + latestSecondBytes int // 当前秒正在传输的字节数 +} + +func (b *BitrateStatistics) Input(size int) { + b.totalBytes += int64(size) + + second := time.Now().Second() + if b.currentSecond == -1 { + b.currentSecond = second + } + + if second != b.currentSecond { + b.elapsedSeconds++ + b.currentSecond = second + b.previousSecondBytes = b.latestSecondBytes + b.latestSecondBytes = 0 + } + + b.latestSecondBytes += size +} + +// Average 返回每秒平均码流大小 +func (b *BitrateStatistics) Average() int { + if b.elapsedSeconds < 1 { + return b.latestSecondBytes + } + + return int((b.totalBytes - int64(b.latestSecondBytes)) / int64(b.elapsedSeconds)) +} + +// Total 返回总码流大小 +func (b *BitrateStatistics) Total() int64 { + return b.totalBytes +} + +// PreviousSecond 返回前一秒的码流大小 +func (b *BitrateStatistics) PreviousSecond() int { + return b.previousSecondBytes +} + +func NewBitrateStatistics() *BitrateStatistics { + return &BitrateStatistics{ + currentSecond: -1, + } +} diff --git a/stream/source.go b/stream/source.go index a32b4d6..5d0e46f 100644 --- a/stream/source.go +++ b/stream/source.go @@ -117,6 +117,8 @@ type Source interface { SetCreateTime(time time.Time) Sinks() []Sink + + GetBitrateStatistics() *BitrateStatistics } type PublishSource struct { @@ -154,7 +156,8 @@ type PublishSource struct { sinkCount int // 拉流端计数 urlValues url.Values // 推流url携带的参数 timeoutTracks []int - createTime time.Time // source创建时间 + createTime time.Time // source创建时间 + statistics *BitrateStatistics // 码流统计 } func (s *PublishSource) SetLastPacketTime(time2 time.Time) { @@ -204,6 +207,7 @@ func (s *PublishSource) Init(receiveQueueSize int) { s.TransStreams = make(map[TransStreamID]TransStream, 10) s.sinks = make(map[SinkID]Sink, 128) s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) + s.statistics = NewBitrateStatistics() } func (s *PublishSource) CreateDefaultOutStreams() { @@ -263,6 +267,7 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe func (s *PublishSource) Input(data []byte) error { s.streamPipe <- data + s.statistics.Input(len(data)) return nil } @@ -348,7 +353,7 @@ func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int //return } - // 推流失败, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞. + // 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞. // 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流. _, ok := err.(*transport.ZeroWindowSizeError) if ok { @@ -845,3 +850,7 @@ func (s *PublishSource) Sinks() []Sink { group.Wait() return sinks } + +func (s *PublishSource) GetBitrateStatistics() *BitrateStatistics { + return s.statistics +}