mirror of
https://github.com/cnotch/ipchub.git
synced 2025-09-27 03:45:54 +08:00
add media/stream
This commit is contained in:
1
go.mod
1
go.mod
@@ -13,6 +13,7 @@ require (
|
||||
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3
|
||||
github.com/kelindar/rate v1.0.0
|
||||
github.com/pion/rtp v1.6.1
|
||||
github.com/pixelbender/go-sdp v1.1.0
|
||||
github.com/stretchr/testify v1.6.1
|
||||
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
|
2
go.sum
2
go.sum
@@ -33,6 +33,8 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
|
||||
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
|
||||
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
|
||||
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
|
||||
github.com/pixelbender/go-sdp v1.1.0 h1:rkm9aFBNKrnB+YGfhLmAkal3pC8XYXb9h+172PlrCBU=
|
||||
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
|
24
media/cid.go
24
media/cid.go
@@ -8,13 +8,13 @@ import (
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// ConsumerType 消费者类型
|
||||
type ConsumerType uint32
|
||||
// PacketType 消费媒体包类型
|
||||
type PacketType uint32
|
||||
|
||||
// 预定义消费者类型
|
||||
// 预定义消费媒体包类型
|
||||
const (
|
||||
RTPConsumer ConsumerType = iota // 根据 RTP 协议打包的媒体
|
||||
FLVConsumer
|
||||
RTPPacket PacketType = iota // 根据 RTP 协议打包的媒体
|
||||
FLVPacket
|
||||
|
||||
maxConsumerSequence = 0x3fff_ffff
|
||||
)
|
||||
@@ -24,11 +24,11 @@ const (
|
||||
type CID uint32
|
||||
|
||||
// String 类型的字串表示
|
||||
func (t ConsumerType) String() string {
|
||||
func (t PacketType) String() string {
|
||||
switch t {
|
||||
case RTPConsumer:
|
||||
case RTPPacket:
|
||||
return "RTP"
|
||||
case FLVConsumer:
|
||||
case FLVPacket:
|
||||
return "FLV"
|
||||
default:
|
||||
return "Unknown"
|
||||
@@ -36,18 +36,18 @@ func (t ConsumerType) String() string {
|
||||
}
|
||||
|
||||
// NewCID 创建新的流消费ID
|
||||
func NewCID(consumerType ConsumerType, consumerSequenceSeed *uint32) CID {
|
||||
func NewCID(packetType PacketType, consumerSequenceSeed *uint32) CID {
|
||||
localid := atomic.AddUint32(consumerSequenceSeed, 1)
|
||||
if localid >= maxConsumerSequence {
|
||||
localid = 1
|
||||
atomic.StoreUint32(consumerSequenceSeed, localid)
|
||||
}
|
||||
return CID(consumerType<<30) | CID(localid&maxConsumerSequence)
|
||||
return CID(packetType<<30) | CID(localid&maxConsumerSequence)
|
||||
}
|
||||
|
||||
// Type 获取消费者类型
|
||||
func (id CID) Type() ConsumerType {
|
||||
return ConsumerType((id >> 30) & 0x3)
|
||||
func (id CID) Type() PacketType {
|
||||
return PacketType((id >> 30) & 0x3)
|
||||
}
|
||||
|
||||
// Sequence 获取消费者序号
|
||||
|
@@ -13,15 +13,15 @@ func TestCID(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
typ ConsumerType
|
||||
typ PacketType
|
||||
}{
|
||||
{
|
||||
"NewRTPConsumer",
|
||||
RTPConsumer,
|
||||
RTPPacket,
|
||||
},
|
||||
{
|
||||
"NewFLVConsumer",
|
||||
FLVConsumer,
|
||||
FLVPacket,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
121
media/consumption.go
Executable file
121
media/consumption.go
Executable file
@@ -0,0 +1,121 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package media
|
||||
|
||||
import (
|
||||
"io"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/cnotch/ipchub/media/cache"
|
||||
"github.com/cnotch/ipchub/stats"
|
||||
"github.com/cnotch/xlog"
|
||||
)
|
||||
|
||||
// Consumer 消费者接口
|
||||
type Consumer interface {
|
||||
PacketType() PacketType
|
||||
NetType() string
|
||||
Addr() string
|
||||
Consume(pack cache.Pack)
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// consumption 流媒体消费者
|
||||
type consumption struct {
|
||||
startOn time.Time // 启动时间
|
||||
stream *Stream
|
||||
cid CID // 消费ID
|
||||
consumer Consumer // 消费者
|
||||
recvQueue *cache.PackQueue // 接收媒体源数据的队列
|
||||
closed bool // 消费者是否关闭
|
||||
Flow stats.Flow // 流量统计
|
||||
logger *xlog.Logger // 日志对象
|
||||
}
|
||||
|
||||
func (c *consumption) ID() CID {
|
||||
return c.cid
|
||||
}
|
||||
|
||||
// Close 关闭消费者
|
||||
func (c *consumption) Close() error {
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.closed = true
|
||||
c.recvQueue.Signal()
|
||||
return nil
|
||||
}
|
||||
|
||||
// 向消费者发送媒体包
|
||||
func (c *consumption) send(pack cache.Pack) {
|
||||
c.recvQueue.Enqueue(pack)
|
||||
c.Flow.AddIn(int64(pack.Size()))
|
||||
}
|
||||
|
||||
// 向消费者发送一个图像组
|
||||
func (c *consumption) sendGop(packCache cache.PackCache) int {
|
||||
bytes := packCache.EnqueueTo(c.recvQueue)
|
||||
c.Flow.AddIn(int64(bytes))
|
||||
return bytes
|
||||
}
|
||||
|
||||
func (c *consumption) consume() {
|
||||
defer func() {
|
||||
defer func() { // 避免 handler 再 panic
|
||||
recover()
|
||||
}()
|
||||
|
||||
if r := recover(); r != nil {
|
||||
c.logger.Errorf("consume routine panic;r = %v \n %s", r, debug.Stack())
|
||||
}
|
||||
|
||||
// 停止消费
|
||||
c.stream.StopConsume(c.cid)
|
||||
c.consumer.Close()
|
||||
|
||||
// 尽早通知GC,回收内存
|
||||
c.recvQueue.Clear()
|
||||
c.stream = nil
|
||||
}()
|
||||
|
||||
for !c.closed {
|
||||
pack := c.recvQueue.Dequeue()
|
||||
if pack == nil {
|
||||
if !c.closed {
|
||||
c.logger.Warn("receive nil pack")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.consumer.Consume(pack)
|
||||
c.Flow.AddOut(int64(pack.Size()))
|
||||
}
|
||||
}
|
||||
|
||||
// ConsumptionInfo 消费者信息
|
||||
type ConsumptionInfo struct {
|
||||
ID uint32 `json:"id"`
|
||||
StartOn string `json:"start_on"`
|
||||
NetType string `json:"net_type"`
|
||||
Addr string `json:"addr,omitempty"`
|
||||
Flow stats.FlowSample `json:"flow"` // 转换成 K
|
||||
}
|
||||
|
||||
// Info 获取消费者信息
|
||||
func (c *consumption) Info() ConsumptionInfo {
|
||||
flow := c.Flow.GetSample()
|
||||
flow.InBytes /= 1024
|
||||
flow.OutBytes /= 1024
|
||||
|
||||
return ConsumptionInfo{
|
||||
ID: uint32(c.cid),
|
||||
StartOn: c.startOn.Format(time.RFC3339Nano),
|
||||
NetType: c.consumer.NetType(),
|
||||
Addr: c.consumer.Addr(),
|
||||
Flow: flow,
|
||||
}
|
||||
}
|
70
media/consumptions.go
Executable file
70
media/consumptions.go
Executable file
@@ -0,0 +1,70 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package media
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/cnotch/ipchub/media/cache"
|
||||
)
|
||||
|
||||
type consumptions struct {
|
||||
sync.Map
|
||||
count int32
|
||||
}
|
||||
|
||||
func (m *consumptions) SendToAll(p cache.Pack) {
|
||||
m.Range(func(key, value interface{}) bool {
|
||||
c := value.(*consumption)
|
||||
c.send(p)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (m *consumptions) RemoveAndCloseAll() {
|
||||
m.Range(func(key, value interface{}) bool {
|
||||
c := value.(*consumption)
|
||||
m.Delete(key)
|
||||
c.Close()
|
||||
return true
|
||||
})
|
||||
|
||||
atomic.StoreInt32(&m.count, 0)
|
||||
}
|
||||
|
||||
func (m *consumptions) Add(c *consumption) {
|
||||
m.Store(c.cid, c)
|
||||
atomic.AddInt32(&m.count, 1)
|
||||
}
|
||||
|
||||
func (m *consumptions) Remove(cid CID) *consumption {
|
||||
ci, ok := m.Load(cid)
|
||||
if ok {
|
||||
m.Delete(cid)
|
||||
atomic.AddInt32(&m.count, -1)
|
||||
return ci.(*consumption)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *consumptions) Count() int {
|
||||
return int(atomic.LoadInt32(&m.count))
|
||||
}
|
||||
|
||||
func (m *consumptions) Infos() []ConsumptionInfo {
|
||||
cs := make([]ConsumptionInfo, 0, 10)
|
||||
m.Range(func(key, value interface{}) bool {
|
||||
cs = append(cs, value.(*consumption).Info())
|
||||
return true
|
||||
})
|
||||
|
||||
sort.Slice(cs, func(i, j int) bool {
|
||||
return cs[i].ID < cs[j].ID
|
||||
})
|
||||
|
||||
return cs
|
||||
}
|
62
media/options.go
Executable file
62
media/options.go
Executable file
@@ -0,0 +1,62 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package media
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Multicastable 支持组播模式的源
|
||||
type Multicastable interface {
|
||||
AddMember(io.Closer)
|
||||
ReleaseMember(io.Closer)
|
||||
MulticastIP() string
|
||||
Port(index int) int
|
||||
TTL() int
|
||||
SourceIP() string
|
||||
}
|
||||
|
||||
// Hlsable 支持Hls访问
|
||||
type Hlsable interface {
|
||||
WriteM3U8PlayListTo(w io.Writer) error
|
||||
GetTS(seq int) ([]byte, error)
|
||||
LastAccessTime() time.Time
|
||||
}
|
||||
|
||||
// Option 配置 Stream 的选项接口
|
||||
type Option interface {
|
||||
apply(*Stream)
|
||||
}
|
||||
|
||||
// optionFunc 包装函数以便它满足 Option 接口
|
||||
type optionFunc func(*Stream)
|
||||
|
||||
func (f optionFunc) apply(s *Stream) {
|
||||
f(s)
|
||||
}
|
||||
|
||||
// Attr 流属性选项
|
||||
func Attr(k, v string) Option {
|
||||
return optionFunc(func(s *Stream) {
|
||||
k := strings.ToLower(strings.TrimSpace(k))
|
||||
s.attrs[k] = v
|
||||
})
|
||||
}
|
||||
|
||||
// Multicast 流组播选项
|
||||
func Multicast(multicast Multicastable) Option {
|
||||
return optionFunc(func(s *Stream) {
|
||||
s.multicast = multicast
|
||||
})
|
||||
}
|
||||
|
||||
// Hls Hls选项
|
||||
func Hls(hls Hlsable) Option {
|
||||
return optionFunc(func(s *Stream) {
|
||||
s.hls = hls
|
||||
})
|
||||
}
|
139
media/parsemeta.go
Executable file
139
media/parsemeta.go
Executable file
@@ -0,0 +1,139 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package media
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
|
||||
"github.com/cnotch/ipchub/av"
|
||||
"github.com/cnotch/ipchub/av/h264"
|
||||
"github.com/cnotch/ipchub/utils/scan"
|
||||
"github.com/pixelbender/go-sdp/sdp"
|
||||
)
|
||||
|
||||
func parseMeta(rawsdp string, video *av.VideoMeta, audio *av.AudioMeta) {
|
||||
sdp, err := sdp.ParseString(rawsdp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, media := range sdp.Media {
|
||||
switch media.Type {
|
||||
case "video":
|
||||
video.Codec = media.Format[0].Name
|
||||
if video.Codec != "" {
|
||||
for _, bw := range media.Bandwidth {
|
||||
if bw.Type == "AS" {
|
||||
video.DataRate = float64(bw.Value)
|
||||
}
|
||||
}
|
||||
parseVideoMeta(media.Format[0], video)
|
||||
}
|
||||
|
||||
case "audio":
|
||||
audio.Codec = media.Format[0].Name
|
||||
if audio.Codec == "MPEG4-GENERIC" {
|
||||
audio.Codec = "AAC"
|
||||
}
|
||||
|
||||
if audio.Codec != "" {
|
||||
for _, bw := range media.Bandwidth {
|
||||
if bw.Type == "AS" {
|
||||
audio.DataRate = float64(bw.Value)
|
||||
}
|
||||
}
|
||||
parseAudioMeta(media.Format[0], audio)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseAudioMeta(m *sdp.Format, audio *av.AudioMeta) {
|
||||
audio.SampleRate = m.ClockRate
|
||||
audio.Stereo = m.Channels == 2
|
||||
audio.SampleSize = m.Channels * 8
|
||||
|
||||
// parse AAC config
|
||||
if len(m.Params) == 0 {
|
||||
return
|
||||
}
|
||||
if audio.Codec == "AAC" {
|
||||
for _, p := range m.Params {
|
||||
i := strings.Index(p, "config=")
|
||||
if i < 0 {
|
||||
continue
|
||||
}
|
||||
p = p[i+len("config="):]
|
||||
|
||||
endi := strings.IndexByte(p, ';')
|
||||
if endi > -1 {
|
||||
p = p[:endi]
|
||||
}
|
||||
sps, err := hex.DecodeString(p)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
audio.Sps = []byte{0x11, 0x90, 0x56, 0xe5, 0x00}
|
||||
copy(audio.Sps, sps)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseVideoMeta(m *sdp.Format, video *av.VideoMeta) {
|
||||
if len(m.Params) == 0 {
|
||||
return
|
||||
}
|
||||
switch video.Codec {
|
||||
case "h264", "H264":
|
||||
for _, p := range m.Params {
|
||||
i := strings.Index(p, "sprop-parameter-sets=")
|
||||
if i < 0 {
|
||||
continue
|
||||
}
|
||||
p = p[i+len("sprop-parameter-sets="):]
|
||||
|
||||
endi := strings.IndexByte(p, ';')
|
||||
if endi > -1 {
|
||||
p = p[:endi]
|
||||
}
|
||||
parseH264SpsPps(p, video)
|
||||
break
|
||||
}
|
||||
case "h265", "H265", "hevc", "HEVC":
|
||||
// TODO: parse H265 vps sps pps
|
||||
}
|
||||
}
|
||||
|
||||
func parseH264SpsPps(s string, video *av.VideoMeta) {
|
||||
ppsStr, spsStr, ok := scan.Comma.Scan(s)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
sps, err := base64.StdEncoding.DecodeString(spsStr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
pps, err := base64.StdEncoding.DecodeString(ppsStr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var rawSps h264.RawSPS
|
||||
err = rawSps.Decode(sps)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
video.Width = rawSps.Width()
|
||||
video.Height = rawSps.Height()
|
||||
video.FrameRate = rawSps.FrameRate()
|
||||
video.Sps = sps
|
||||
video.Pps = pps
|
||||
}
|
246
media/stream.go
Executable file
246
media/stream.go
Executable file
@@ -0,0 +1,246 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package media
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cnotch/ipchub/av"
|
||||
"github.com/cnotch/ipchub/config"
|
||||
"github.com/cnotch/ipchub/media/cache"
|
||||
"github.com/cnotch/ipchub/protos/rtp"
|
||||
"github.com/cnotch/ipchub/stats"
|
||||
"github.com/cnotch/xlog"
|
||||
)
|
||||
|
||||
// 流状态
|
||||
const (
|
||||
StreamOK int32 = iota
|
||||
StreamClosed // 源关闭
|
||||
StreamReplaced // 流被替换
|
||||
StreamNoConsumer
|
||||
)
|
||||
|
||||
// 错误定义
|
||||
var (
|
||||
// ErrStreamClosed 流被关闭
|
||||
ErrStreamClosed = errors.New("stream is closed")
|
||||
// ErrStreamReplaced 流被替换
|
||||
ErrStreamReplaced = errors.New("stream is replaced")
|
||||
statusErrors = []error{nil, ErrStreamClosed, ErrStreamReplaced}
|
||||
)
|
||||
|
||||
// Stream 媒体流
|
||||
type Stream struct {
|
||||
startOn time.Time // 启动时间
|
||||
path string // 流路径
|
||||
rawsdp string
|
||||
size uint64 // 流已经接收到的输入(字节)
|
||||
status int32 // 流状态
|
||||
consumerSequenceSeed uint32
|
||||
consumptions consumptions // 消费者列表
|
||||
cache cache.PackCache // 媒体包缓存
|
||||
attrs map[string]string // 流属性
|
||||
multicast Multicastable
|
||||
hls Hlsable
|
||||
logger *xlog.Logger // 日志对象
|
||||
Video av.VideoMeta
|
||||
Audio av.AudioMeta
|
||||
}
|
||||
|
||||
// NewStream 创建新的流
|
||||
func NewStream(path string, rawsdp string, options ...Option) *Stream {
|
||||
s := &Stream{
|
||||
startOn: time.Now(),
|
||||
path: path,
|
||||
rawsdp: rawsdp,
|
||||
status: StreamOK,
|
||||
consumerSequenceSeed: 0,
|
||||
attrs: make(map[string]string, 2),
|
||||
logger: xlog.L().With(xlog.Fields(xlog.F("path", path))),
|
||||
}
|
||||
|
||||
// parseMeta
|
||||
parseMeta(rawsdp, &s.Video, &s.Audio)
|
||||
|
||||
// init Cache
|
||||
switch s.Video.Codec {
|
||||
case "h264", "H264":
|
||||
s.cache = cache.NewH264Cache(config.CacheGop())
|
||||
case "h265", "H265", "hevc", "HEVC":
|
||||
s.cache = cache.NewHevcCache(config.CacheGop())
|
||||
default:
|
||||
s.cache = cache.NewEmptyCache()
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
option.apply(s)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Path 流路径
|
||||
func (s *Stream) Path() string {
|
||||
return s.path
|
||||
}
|
||||
|
||||
// Attr 流属性
|
||||
func (s *Stream) Attr(key string) string {
|
||||
return s.attrs[strings.ToLower(strings.TrimSpace(key))]
|
||||
}
|
||||
|
||||
// Close 关闭流
|
||||
func (s *Stream) Close() error {
|
||||
return s.close(StreamClosed)
|
||||
}
|
||||
func (s *Stream) close(status int32) error {
|
||||
if atomic.LoadInt32(&s.status) != StreamOK {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 修改流状态
|
||||
if status != StreamReplaced {
|
||||
status = StreamClosed
|
||||
}
|
||||
atomic.StoreInt32(&s.status, status)
|
||||
|
||||
s.consumptions.RemoveAndCloseAll()
|
||||
|
||||
// 尽早通知GC,回收内存
|
||||
s.cache.Reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
// WritePacket 向流写入一个媒体包
|
||||
func (s *Stream) WritePacket(packet *rtp.Packet) error {
|
||||
status := atomic.LoadInt32(&s.status)
|
||||
if status != StreamOK {
|
||||
return statusErrors[status]
|
||||
}
|
||||
|
||||
atomic.AddUint64(&s.size, uint64(packet.Size()))
|
||||
|
||||
s.cache.CachePack(packet)
|
||||
|
||||
s.consumptions.SendToAll(packet)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Multicastable 返回组播支持能力,不支持返回nil
|
||||
func (s *Stream) Multicastable() Multicastable {
|
||||
return s.multicast
|
||||
}
|
||||
|
||||
// Hlsable 返回支持hls能力,不支持返回nil
|
||||
func (s *Stream) Hlsable() Hlsable {
|
||||
return s.hls
|
||||
}
|
||||
|
||||
func (s *Stream) startConsume(consumer Consumer, useGopCache bool) CID {
|
||||
c := &consumption{
|
||||
startOn: time.Now(),
|
||||
stream: s,
|
||||
cid: NewCID(consumer.PacketType(), &s.consumerSequenceSeed),
|
||||
recvQueue: cache.NewPackQueue(),
|
||||
consumer: consumer,
|
||||
Flow: stats.NewFlow(),
|
||||
}
|
||||
|
||||
c.logger = s.logger.With(xlog.Fields(
|
||||
xlog.F("cid", uint32(c.cid)),
|
||||
xlog.F("packettype", consumer.PacketType().String()),
|
||||
xlog.F("nettype", consumer.NetType())))
|
||||
|
||||
if useGopCache {
|
||||
c.sendGop(s.cache) // 新消费者,先发送gop缓存
|
||||
}
|
||||
|
||||
s.consumptions.Add(c)
|
||||
go c.consume()
|
||||
|
||||
return c.cid
|
||||
}
|
||||
|
||||
// StartConsume 开始消费
|
||||
func (s *Stream) StartConsume(consumer Consumer) CID {
|
||||
return s.startConsume(consumer, true)
|
||||
}
|
||||
|
||||
// StartConsumeNoGopCache 开始消费,不使用GopCahce
|
||||
func (s *Stream) StartConsumeNoGopCache(consumer Consumer) CID {
|
||||
return s.startConsume(consumer, false)
|
||||
}
|
||||
|
||||
// StopConsume 开始消费
|
||||
func (s *Stream) StopConsume(cid CID) {
|
||||
c := s.consumptions.Remove(cid)
|
||||
if c != nil {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) canMigrateFrom(sourceStream *Stream) bool {
|
||||
return s.rawsdp == sourceStream.rawsdp
|
||||
}
|
||||
|
||||
// MigrateFrom 从源流迁移消费者
|
||||
func (s *Stream) migrateFrom(sourceStream *Stream) {
|
||||
sourceStream.consumptions.Range(func(key, value interface{}) bool {
|
||||
c := value.(*consumption)
|
||||
sourceStream.consumptions.Delete(key) // 仅移出,不关闭
|
||||
c.recvQueue.Clear() // 清除已有的缓冲
|
||||
c.stream = s
|
||||
s.consumptions.Add(c) // 添加到新的流中
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// StreamInfo 流信息
|
||||
type StreamInfo struct {
|
||||
StartOn string `json:"start_on"`
|
||||
Path string `json:"path"`
|
||||
Addr string `json:"addr"`
|
||||
Size int `json:"size"`
|
||||
Video *av.VideoMeta `json:"video,omitempty"`
|
||||
Audio *av.AudioMeta `json:"audio,omitempty"`
|
||||
ConsumptionCount int `json:"cc"`
|
||||
Consumptions []ConsumptionInfo `json:"cs,omitempty"`
|
||||
}
|
||||
|
||||
// Info 获取流信息
|
||||
func (s *Stream) Info(includeCS bool) *StreamInfo {
|
||||
si := &StreamInfo{
|
||||
StartOn: s.startOn.Format(time.RFC3339Nano),
|
||||
Path: s.path,
|
||||
Addr: s.Attr("addr"),
|
||||
Size: int(atomic.LoadUint64(&s.size) / 1024),
|
||||
ConsumptionCount: s.consumptions.Count(),
|
||||
}
|
||||
|
||||
if len(s.Video.Codec) != 0 {
|
||||
si.Video = &s.Video
|
||||
}
|
||||
if len(s.Audio.Codec) != 0 {
|
||||
si.Audio = &s.Audio
|
||||
}
|
||||
if includeCS {
|
||||
si.Consumptions = s.consumptions.Infos()
|
||||
}
|
||||
return si
|
||||
}
|
||||
|
||||
// GetConsumption 获取指定消费信息
|
||||
func (s *Stream) GetConsumption(cid CID) (ConsumptionInfo, bool) {
|
||||
c, ok := s.consumptions.Load(cid)
|
||||
if ok {
|
||||
return c.(*consumption).Info(), ok
|
||||
}
|
||||
return ConsumptionInfo{}, false
|
||||
}
|
176
media/stream_test.go
Executable file
176
media/stream_test.go
Executable file
@@ -0,0 +1,176 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package media
|
||||
|
||||
import (
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cnotch/ipchub/media/cache"
|
||||
"github.com/cnotch/ipchub/protos/rtp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type emptyMulticastable struct {
|
||||
}
|
||||
|
||||
func (m emptyMulticastable) AddMember(io.Closer) {}
|
||||
func (m emptyMulticastable) ReleaseMember(io.Closer) {}
|
||||
func (m emptyMulticastable) MulticastIP() string { return "234.0.0.1" }
|
||||
func (m emptyMulticastable) Port(int) int { return 0 }
|
||||
func (m emptyMulticastable) SourceIP() string { return "234.0.0.1" }
|
||||
func (m emptyMulticastable) TTL() int { return 0 }
|
||||
|
||||
type emptyConsumer struct {
|
||||
}
|
||||
|
||||
func (c emptyConsumer) PacketType() PacketType { return RTPPacket }
|
||||
func (c emptyConsumer) NetType() string { return "" }
|
||||
func (c emptyConsumer) Addr() string { return "" }
|
||||
func (c emptyConsumer) Consume(pack cache.Pack) {}
|
||||
func (c emptyConsumer) Close() error { return nil }
|
||||
|
||||
type panicConsumer struct {
|
||||
try int
|
||||
}
|
||||
|
||||
func (c *panicConsumer) PacketType() PacketType { return RTPPacket }
|
||||
func (c *panicConsumer) NetType() string { return "" }
|
||||
func (c *panicConsumer) Addr() string { return "" }
|
||||
func (c *panicConsumer) Consume(pack cache.Pack) {
|
||||
c.try++
|
||||
if c.try > 3 {
|
||||
panic("panicConsumer")
|
||||
}
|
||||
}
|
||||
func (c *panicConsumer) Close() error { return nil }
|
||||
|
||||
const sdpRaw = `v=0
|
||||
o=- 0 0 IN IP4 127.0.0.1
|
||||
s=No Name
|
||||
c=IN IP4 127.0.0.1
|
||||
t=0 0
|
||||
a=tool:libavformat 58.20.100
|
||||
m=video 0 RTP/AVP 96
|
||||
b=AS:2500
|
||||
a=rtpmap:96 H264/90000
|
||||
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAH6zZQFAFuhAAAAMAEAAAAwPI8YMZYA==,aO+8sA==; profile-level-id=64001F
|
||||
a=control:streamid=0
|
||||
m=audio 0 RTP/AVP 97
|
||||
b=AS:160
|
||||
a=rtpmap:97 MPEG4-GENERIC/44100/2
|
||||
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=121056E500
|
||||
a=control:streamid=1
|
||||
`
|
||||
|
||||
func TestNewStream(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
path string
|
||||
options []Option
|
||||
}{
|
||||
{
|
||||
name: "test01",
|
||||
path: "/live/enter",
|
||||
options: []Option{Attr(" ok ", "ok"), Attr("name", "chj"), Multicast(emptyMulticastable{})},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := NewStream(tt.path, sdpRaw, tt.options...)
|
||||
v := got.Attr("Ok")
|
||||
assert.Equal(t, "ok", v, "Must is ok")
|
||||
assert.NotNil(t, got.Multicastable(), "Must is not nil")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Consumption_Consume(t *testing.T) {
|
||||
s := NewStream("live/test", sdpRaw)
|
||||
|
||||
t.Run("Consumption_Consume", func(t *testing.T) {
|
||||
closed := false
|
||||
go func() {
|
||||
for !closed {
|
||||
s.WritePacket(&rtp.Packet{})
|
||||
<-time.After(time.Millisecond * 1)
|
||||
}
|
||||
}()
|
||||
cid := s.StartConsume(emptyConsumer{})
|
||||
assert.Equal(t, 1, s.consumptions.Count(), "must is 1")
|
||||
|
||||
<-time.After(time.Millisecond * 1000)
|
||||
cinfo, ok := s.GetConsumption(cid)
|
||||
assert.True(t, ok, "must is true")
|
||||
assert.NotZero(t, cinfo.Flow.OutBytes, "must > 0")
|
||||
|
||||
s.StopConsume(cid)
|
||||
assert.Equal(t, 0, s.consumptions.Count(), "must is 0")
|
||||
closed = true
|
||||
s.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Consumption_ConsumePanic(t *testing.T) {
|
||||
s := NewStream("live/test", sdpRaw)
|
||||
t.Run("Test_Consumption_ConsumePanic", func(t *testing.T) {
|
||||
closed := false
|
||||
go func() {
|
||||
for !closed {
|
||||
s.WritePacket(&rtp.Packet{})
|
||||
<-time.After(time.Millisecond * 1)
|
||||
}
|
||||
}()
|
||||
s.StartConsume(&panicConsumer{})
|
||||
assert.Equal(t, 1, s.consumptions.Count(), "must is 1")
|
||||
|
||||
<-time.After(time.Millisecond * 100)
|
||||
assert.Equal(t, 0, s.consumptions.Count(), "panic autoclose,must is 0")
|
||||
closed = true
|
||||
s.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func benchDispatch(n int, b *testing.B) {
|
||||
s := NewStream("/live/a", sdpRaw)
|
||||
for i := 0; i < n; i++ {
|
||||
s.StartConsume(emptyConsumer{})
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
s.WritePacket(&rtp.Packet{})
|
||||
}
|
||||
})
|
||||
s.Close()
|
||||
}
|
||||
func Benchmark_Stream_Dispatch1(b *testing.B) {
|
||||
benchDispatch(1, b)
|
||||
}
|
||||
func Benchmark_Stream_Dispatch5(b *testing.B) {
|
||||
benchDispatch(5, b)
|
||||
}
|
||||
func Benchmark_Stream_Dispatch10(b *testing.B) {
|
||||
benchDispatch(10, b)
|
||||
}
|
||||
func Benchmark_Stream_Dispatch50(b *testing.B) {
|
||||
benchDispatch(50, b)
|
||||
}
|
||||
func Benchmark_Stream_Dispatch100(b *testing.B) {
|
||||
benchDispatch(100, b)
|
||||
}
|
||||
func Benchmark_Stream_Dispatch500(b *testing.B) {
|
||||
benchDispatch(500, b)
|
||||
}
|
||||
|
||||
func Benchmark_Stream_Dispatch1000(b *testing.B) {
|
||||
benchDispatch(1000, b)
|
||||
}
|
||||
|
||||
func Benchmark_Stream_Dispatch10000(b *testing.B) {
|
||||
benchDispatch(10000, b)
|
||||
}
|
Reference in New Issue
Block a user