fix: flv record

This commit is contained in:
langhuihui
2024-08-13 20:11:47 +08:00
parent 78e8d74fec
commit 24fa98bdf3
32 changed files with 339 additions and 252 deletions

16
api.go
View File

@@ -131,7 +131,7 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err
}
func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
res, err = s.getStreamInfo(pub)
} else {
@@ -142,7 +142,7 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
return
}
func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest) (res *pb.SubscribersResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
var subscribers []*pb.SubscriberSnapShot
for subscriber := range s.Subscribers.Range {
meta, _ := json.Marshal(subscriber.Description)
@@ -178,7 +178,7 @@ func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest)
return
}
func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() {
res = &pb.TrackSnapShotResponse{}
for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
@@ -257,7 +257,7 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
}
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() {
res = &pb.TrackSnapShotResponse{}
for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
@@ -324,7 +324,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *empt
}
func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
if subscriber, ok := s.Subscribers.Get(req.Id); ok {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
subscriber.Publisher.RemoveSubscriber(subscriber)
@@ -340,7 +340,7 @@ func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeReq
}
func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
if subscriber, ok := s.Subscribers.Get(req.Id); ok {
subscriber.Stop(errors.New("stop by api"))
} else {
@@ -353,7 +353,7 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res
// /api/stream/list
func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
var streams []*pb.StreamInfoResponse
for publisher := range s.Streams.Range {
info, err := s.getStreamInfo(publisher)
@@ -369,7 +369,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
}
func (s *Server) WaitList(context.Context, *emptypb.Empty) (res *pb.StreamWaitListResponse, err error) {
s.streamTask.Call(func(*pkg.Task) error {
s.streamTask.Call(func(*util.Task) error {
res = &pb.StreamWaitListResponse{
List: make(map[string]int32),
}

View File

@@ -25,10 +25,10 @@ rtmp:
pull:
pullonsub:
# live/pull: rtmp://localhost/live/test
# flv:
# pull:
# pullonstart:
# live/test: /Users/dexter/Movies/jb-demo.flv
flv:
pull:
pullonstart:
live/test: /Users/dexter/project/v5/monibuca/example/default/record/live/test.flv
gb28181:
sip:
listenaddr:

View File

@@ -3,6 +3,6 @@ global:
flv:
record:
enableregexp: true
fragment: 10s
# fragment: 10s
recordlist:
.+: record/$0.flv
.+: record/$0

View File

@@ -5,6 +5,7 @@ import (
"log/slog"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
"time"
)
@@ -22,6 +23,7 @@ const (
)
type AVRingReader struct {
*slog.Logger
RingReader
Track *AVTrack
State byte
@@ -34,7 +36,6 @@ type AVRingReader struct {
startTime time.Time
AbsTime uint32
Delay uint32
*slog.Logger
}
func (r *AVRingReader) DecConfChanged() bool {
@@ -160,7 +161,7 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) {
r.AbsTime = 1
}
r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
r.Log(context.TODO(), TraceLevel, r.Track.FourCC().String(), "delay", r.Delay)
r.Log(context.TODO(), util.TraceLevel, r.Track.FourCC().String(), "delay", r.Delay)
return
}

View File

@@ -11,6 +11,7 @@ var (
ErrPublishTimeout = errors.New("publish timeout")
ErrPublishIdleTimeout = errors.New("publish idle timeout")
ErrPublishDelayCloseTimeout = errors.New("publish delay close timeout")
ErrPublishWaitCloseTimeout = errors.New("publish wait close timeout")
ErrPushRemoteURLExist = errors.New("push remote url exist")
ErrSubscribeTimeout = errors.New("subscribe timeout")
ErrRestart = errors.New("restart")
@@ -18,6 +19,6 @@ var (
ErrUnsupportCodec = errors.New("unsupport codec")
ErrMuted = errors.New("muted")
ErrLost = errors.New("lost")
ErrRetryRunOut = errors.New("retry run out")
ErrRecordSamePath = errors.New("record same path")
ErrRecordSamePath = errors.New("record same path")
)

View File

@@ -3,6 +3,7 @@ package pkg
import (
"context"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"slices"
)
@@ -11,7 +12,7 @@ var _ slog.Handler = (*MultiLogHandler)(nil)
func ParseLevel(level string) slog.Level {
var lv slog.LevelVar
if level == "trace" {
lv.Set(TraceLevel)
lv.Set(util.TraceLevel)
} else {
lv.UnmarshalText([]byte(level))
}

View File

@@ -139,7 +139,7 @@ func (rb *RingWriter) Step() (normal bool) {
isIDR := rb.Value.IDR
next := rb.Next()
if isIDR {
rb.SLogger.Log(nil, TraceLevel, "add idr")
rb.SLogger.Log(nil, util.TraceLevel, "add idr")
rb.PushIDR()
}
if rb.IDRingList.Len() > 0 {
@@ -153,12 +153,12 @@ func (rb *RingWriter) Step() (normal bool) {
}
} else if next == oldIDR.Value {
if nextOld := oldIDR.Next(); nextOld != nil && rb.durationFrom(nextOld.Value) > rb.BufferRange[0] {
rb.SLogger.Log(nil, TraceLevel, "remove old idr")
rb.SLogger.Log(nil, util.TraceLevel, "remove old idr")
rb.Lock()
rb.IDRingList.Remove(oldIDR)
rb.Unlock()
} else {
rb.SLogger.Log(nil, TraceLevel, "not enough buffer")
rb.SLogger.Log(nil, util.TraceLevel, "not enough buffer")
rb.glow(5)
next = rb.Next()
}

View File

@@ -116,5 +116,5 @@ func (t *Track) WaitReady() error {
}
func (t *Track) Trace(msg string, fields ...any) {
t.Log(context.TODO(), TraceLevel, msg, fields...)
t.Log(context.TODO(), util.TraceLevel, msg, fields...)
}

View File

@@ -1,4 +1,4 @@
package pkg
package util
import "reflect"

View File

@@ -1,9 +1,8 @@
package pkg
package util
import (
"context"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"os"
"reflect"
"slices"
@@ -86,7 +85,7 @@ func (mt *MarcoTask) lazyStart(t ITask) {
task.disposeHandler = EmptyDispose
}
mt.lazyRun.Do(func() {
mt.shutdown = util.NewPromise(context.Background())
mt.shutdown = NewPromise(context.Background())
go mt.run()
})
mt.addSub <- t

View File

@@ -1,11 +1,14 @@
package pkg
package util
import (
"errors"
"fmt"
"reflect"
"time"
)
var ErrRetryRunOut = errors.New("retry run out")
type RetryTask struct {
Task
MaxRetry int

View File

@@ -1,4 +1,4 @@
package pkg
package util
import (
"context"
@@ -6,8 +6,6 @@ import (
"log/slog"
"reflect"
"time"
"m7s.live/m7s/v5/pkg/util"
)
const TraceLevel = slog.Level(-8)
@@ -50,7 +48,7 @@ type (
afterStartListeners, afterDisposeListeners []func()
disposeHandler func()
Description map[string]any
startup, shutdown *util.Promise
startup, shutdown *Promise
parent *MarcoTask
parentCtx context.Context
}
@@ -137,6 +135,6 @@ func (task *Task) dispose() {
func (task *Task) init(ctx context.Context) {
task.parentCtx = ctx
task.Context, task.CancelCauseFunc = context.WithCancelCause(ctx)
task.startup = util.NewPromise(task.Context)
task.shutdown = util.NewPromise(context.Background())
task.startup = NewPromise(task.Context)
task.shutdown = NewPromise(context.Background())
}

View File

@@ -1,8 +1,9 @@
package pkg
package util
import (
"context"
"log/slog"
"m7s.live/m7s/v5/pkg"
"os"
"testing"
"time"
@@ -33,7 +34,7 @@ type retryDemoTask struct {
}
func (task *retryDemoTask) Start() error {
return ErrRestart
return pkg.ErrRestart
}
func Test_RetryTask(t *testing.T) {

View File

@@ -116,8 +116,9 @@ type iPlugin interface {
}
type IPlugin interface {
ITask
util.ITask
OnInit() error
OnStop()
}
type IRegisterHandler interface {
@@ -182,7 +183,7 @@ func InstallPlugin[C iPlugin](options ...any) error {
}
type Plugin struct {
MarcoLongTask
util.MarcoLongTask
Disabled bool
Meta *PluginMeta
config config.Common
@@ -250,16 +251,10 @@ func (p *Plugin) assign() {
func (p *Plugin) Start() (err error) {
s := p.Server
err = p.handler.OnInit()
if err != nil {
p.Error("init", "error", err)
return
}
if p.Meta.ServiceDesc != nil && s.grpcServer != nil {
s.grpcServer.RegisterService(p.Meta.ServiceDesc, p.handler)
if p.Meta.RegisterGRPCHandler != nil {
if err = p.Meta.RegisterGRPCHandler(p.Context, s.config.HTTP.GetGRPCMux(), s.grpcClientConn); err != nil {
p.Error("init", "error", err)
return
} else {
p.Info("grpc handler registered")
@@ -267,17 +262,25 @@ func (p *Plugin) Start() (err error) {
}
}
s.Plugins.Add(p)
p.listen()
err = p.listen()
if err != nil {
return
}
err = p.handler.OnInit()
if err != nil {
return
}
return
}
func (p *Plugin) Dispose() {
p.Server.Plugins.Remove(p)
p.handler.OnStop()
p.config.HTTP.StopListen()
p.config.TCP.StopListen()
p.Server.Plugins.Remove(p)
}
func (p *Plugin) listen() {
func (p *Plugin) listen() (err error) {
httpConf := &p.config.HTTP
if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.Server.config.HTTP.ListenAddrTLS) {
p.Info("https listen at ", "addr", httpConf.ListenAddrTLS)
@@ -292,44 +295,49 @@ func (p *Plugin) listen() {
}()
}
defer func() {
if err != nil {
p.config.HTTP.StopListen()
}
}()
if tcphandler, ok := p.handler.(ITCPPlugin); ok {
tcpConf := &p.config.TCP
if tcpConf.ListenAddr != "" && tcpConf.AutoListen {
p.Info("listen tcp", "addr", tcpConf.ListenAddr)
go func() {
err := tcpConf.Listen(tcphandler.OnTCPConnect)
if err != nil {
p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err)
p.Stop(err)
}
}()
err = tcpConf.Listen(tcphandler.OnTCPConnect)
if err != nil {
p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err)
return
}
}
if tcpConf.ListenAddrTLS != "" && tcpConf.AutoListen {
p.Info("listen tcp tls", "addr", tcpConf.ListenAddrTLS)
go func() {
err := tcpConf.ListenTLS(tcphandler.OnTCPConnect)
if err != nil {
p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err)
p.Stop(err)
}
}()
err = tcpConf.ListenTLS(tcphandler.OnTCPConnect)
if err != nil {
p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err)
return
}
}
defer func() {
if err != nil {
p.config.TCP.StopListen()
}
}()
}
if udpHandler, ok := p.handler.(IUDPPlugin); ok {
udpConf := &p.config.UDP
if udpConf.ListenAddr != "" && udpConf.AutoListen {
p.Info("listen udp", "addr", udpConf.ListenAddr)
go func() {
err := udpConf.Listen(udpHandler.OnUDPConnect)
if err != nil {
p.Error("listen udp", "addr", udpConf.ListenAddr, "error", err)
p.Stop(err)
}
}()
err = udpConf.Listen(udpHandler.OnUDPConnect)
if err != nil {
p.Error("listen udp", "addr", udpConf.ListenAddr, "error", err)
return
}
}
}
return
}
func (p *Plugin) OnInit() error {
@@ -340,6 +348,10 @@ func (p *Plugin) OnExit() {
}
func (p *Plugin) OnStop() {
}
func (p *Plugin) PublishWithConfig(ctx context.Context, streamPath string, conf config.Publish) (publisher *Publisher, err error) {
publisher = createPublisher(p, streamPath, conf)
if p.config.EnableAuth {
@@ -458,7 +470,7 @@ func (p *Plugin) AddLogHandler(handler slog.Handler) {
}
func (p *Plugin) SaveConfig() (err error) {
p.Server.Call(func(*Task) (err error) {
p.Server.Call(func(*util.Task) (err error) {
if p.Modify == nil {
os.Remove(p.settingPath())
return

View File

@@ -2,10 +2,10 @@ package flv
import (
"bufio"
"encoding/binary"
"io"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
"net"
)
const (
@@ -17,28 +17,51 @@ const (
var FLVHead = []byte{'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 9, 0, 0, 0, 0}
func AVCC2FLV(t byte, ts uint32, avcc ...[]byte) (flv net.Buffers) {
b := util.Buffer(make([]byte, 0, 15))
b.WriteByte(t)
dataSize := util.SizeOfBuffers(avcc)
b.WriteUint24(uint32(dataSize))
b.WriteUint24(ts)
b.WriteByte(byte(ts >> 24))
b.WriteUint24(0)
return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11))
type FlvWriter struct {
io.Writer
buf [15]byte
}
func NewFlvWriter(w io.Writer) *FlvWriter {
return &FlvWriter{Writer: w}
}
func (w *FlvWriter) WriteTag(t byte, ts, dataSize uint32, payload ...[]byte) (err error) {
WriteFLVTagHead(t, ts, dataSize, w.buf[:])
if _, err = w.Write(w.buf[:11]); err != nil {
return
}
for _, p := range payload {
if _, err = w.Write(p); err != nil {
return
}
}
binary.BigEndian.PutUint32(w.buf[11:], dataSize+11)
_, err = w.Write(w.buf[11:])
return
}
//func AVCC2FLV(t byte, ts uint32, avcc ...[]byte) (flv net.Buffers) {
// b := util.Buffer(make([]byte, 0, 15))
// b.WriteByte(t)
// dataSize := util.SizeOfBuffers(avcc)
// b.WriteUint24(uint32(dataSize))
// b.WriteUint24(ts)
// b.WriteByte(byte(ts >> 24))
// b.WriteUint24(0)
// return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11))
//}
func WriteFLVTagHead(t uint8, ts, dataSize uint32, b []byte) {
b[0] = t
b[1], b[2], b[3] = byte(dataSize>>16), byte(dataSize>>8), byte(dataSize)
b[4], b[5], b[6], b[7] = byte(ts>>16), byte(ts>>8), byte(ts), byte(ts>>24)
}
func WriteFLVTag(w io.Writer, t byte, timestamp uint32, payload ...[]byte) (err error) {
buffers := AVCC2FLV(t, timestamp, payload...)
_, err = buffers.WriteTo(w)
return
}
//func WriteFLVTag(w io.Writer, t byte, timestamp uint32, payload ...[]byte) (n int64, err error) {
// buffers := AVCC2FLV(t, timestamp, payload...)
// return buffers.WriteTo(w)
//}
func ReadMetaData(reader io.Reader) (metaData rtmp.EcmaArray, err error) {
r := bufio.NewReader(reader)

View File

@@ -93,13 +93,9 @@ func PullFLV(p *m7s.PullContext) (err error) {
return err
}
var frame rtmp.RTMPData
switch ds := int(dataSize); t {
case FLV_TAG_TYPE_AUDIO, FLV_TAG_TYPE_VIDEO:
frame.SetAllocator(allocator)
err = reader.ReadNto(ds, frame.NextN(ds))
default:
err = reader.Skip(ds)
}
ds := int(dataSize)
frame.SetAllocator(allocator)
err = reader.ReadNto(ds, frame.NextN(ds))
if err != nil {
return err
}
@@ -112,7 +108,20 @@ func PullFLV(p *m7s.PullContext) (err error) {
case FLV_TAG_TYPE_VIDEO:
err = p.Publisher.WriteVideo(frame.WrapVideo())
case FLV_TAG_TYPE_SCRIPT:
p.Info("script")
r := frame.NewReader()
amf := &rtmp.AMF{
Buffer: util.Buffer(r.ToBytes()),
}
var obj any
obj, err = amf.Unmarshal()
name := obj
obj, err = amf.Unmarshal()
metaData := obj
frame.Recycle()
if err != nil {
return err
}
p.Info("script", name, metaData)
}
}
return

View File

@@ -1,25 +1,29 @@
package flv
import (
"fmt"
"io"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
"os"
"path/filepath"
"slices"
"time"
)
var writeMetaTagQueueTask pkg.MarcoLongTask
var writeMetaTagQueueTask util.MarcoLongTask
func init() {
pkg.RootTask.AddTask(&writeMetaTagQueueTask)
writeMetaTagQueueTask.Name = "writeMetaTagQueue"
util.RootTask.AddTask(&writeMetaTagQueueTask)
}
type writeMetaTagTask struct {
pkg.Task
util.Task
file *os.File
writer *FlvWriter
flags byte
metaData []byte
}
@@ -46,7 +50,8 @@ func (task *writeMetaTagTask) Start() (err error) {
task.Error(err.Error())
return
}
err = WriteFLVTag(tempFile, FLV_TAG_TYPE_SCRIPT, 0, task.metaData)
task.writer = NewFlvWriter(tempFile)
err = task.writer.WriteTag(FLV_TAG_TYPE_SCRIPT, 0, uint32(len(task.metaData)), task.metaData)
_, err = task.file.Seek(13, io.SeekStart)
if err != nil {
task.Error("writeMetaData Seek failed", "err", err)
@@ -67,71 +72,78 @@ func (task *writeMetaTagTask) Start() (err error) {
}
}
func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64, times []float64, duration *int64) {
ar, vr := suber.AudioReader, suber.VideoReader
hasAudio, hasVideo := ar != nil, vr != nil
var amf rtmp.AMF
metaData := rtmp.EcmaArray{
"MetaDataCreator": "m7s/" + m7s.Version,
"hasVideo": hasVideo,
"hasAudio": hasAudio,
"hasMatadata": true,
"canSeekToEnd": true,
"duration": float64(*duration) / 1000,
"hasKeyFrames": len(filepositions) > 0,
"filesize": 0,
}
var flags byte
if hasAudio {
ctx := ar.Track.ICodecCtx.GetBase().(pkg.IAudioCodecCtx)
flags |= (1 << 2)
metaData["audiocodecid"] = int(rtmp.ParseAudioCodec(ctx.FourCC()))
metaData["audiosamplerate"] = ctx.GetSampleRate()
metaData["audiosamplesize"] = ctx.GetSampleSize()
metaData["stereo"] = ctx.GetChannels() == 2
}
if hasVideo {
ctx := vr.Track.ICodecCtx.GetBase().(pkg.IVideoCodecCtx)
flags |= 1
metaData["videocodecid"] = int(rtmp.ParseVideoCodec(ctx.FourCC()))
metaData["width"] = ctx.Width()
metaData["height"] = ctx.Height()
metaData["framerate"] = vr.Track.FPS
metaData["videodatarate"] = vr.Track.BPS
metaData["keyframes"] = map[string]any{
"filepositions": filepositions,
"times": times,
}
}
amf.Marshals("onMetaData", metaData)
offset := amf.Len() + 13 + 15
if keyframesCount := len(filepositions); keyframesCount > 0 {
metaData["filesize"] = uint64(offset) + filepositions[keyframesCount-1]
for i := range filepositions {
filepositions[i] += uint64(offset)
}
metaData["keyframes"] = map[string]any{
"filepositions": filepositions,
"times": times,
}
}
amf.Reset()
marshals := amf.Marshals("onMetaData", metaData)
task := &writeMetaTagTask{
file: file,
flags: flags,
metaData: marshals,
}
task.Logger = suber.Logger.With("file", file.Name())
writeMetaTagQueueTask.AddTask(task)
}
func RecordFlv(ctx *m7s.RecordContext) (err error) {
var file *os.File
var filepositions []uint64
var times []float64
var offset int64
var duration int64
if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_RDWR|util.Conditoinal(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil {
return
}
suber := ctx.Subscriber
ar, vr := suber.AudioReader, suber.VideoReader
hasAudio, hasVideo := ar != nil, vr != nil
writeMetaTag := func(file *os.File, filepositions []uint64, times []float64) {
var amf rtmp.AMF
metaData := rtmp.EcmaArray{
"MetaDataCreator": "m7s/" + m7s.Version,
"hasVideo": hasVideo,
"hasAudio": hasAudio,
"hasMatadata": true,
"canSeekToEnd": true,
"duration": float64(duration) / 1000,
"hasKeyFrames": len(filepositions) > 0,
"filesize": 0,
noFragment := ctx.Fragment == 0 || ctx.Append
if noFragment {
if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_RDWR|util.Conditoinal(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil {
return
}
var flags byte
if hasAudio {
ctx := ar.Track.ICodecCtx.GetBase().(pkg.IAudioCodecCtx)
flags |= (1 << 2)
metaData["audiocodecid"] = int(rtmp.ParseAudioCodec(ctx.FourCC()))
metaData["audiosamplerate"] = ctx.GetSampleRate()
metaData["audiosamplesize"] = ctx.GetSampleSize()
metaData["stereo"] = ctx.GetChannels() == 2
}
if hasVideo {
ctx := vr.Track.ICodecCtx.GetBase().(pkg.IVideoCodecCtx)
flags |= 1
metaData["videocodecid"] = int(rtmp.ParseVideoCodec(ctx.FourCC()))
metaData["width"] = ctx.Width()
metaData["height"] = ctx.Height()
metaData["framerate"] = vr.Track.FPS
metaData["videodatarate"] = vr.Track.BPS
metaData["keyframes"] = map[string]any{
"filepositions": filepositions,
"times": times,
}
}
amf.Marshals("onMetaData", metaData)
offset := amf.Len() + 13 + 15
if keyframesCount := len(filepositions); keyframesCount > 0 {
metaData["filesize"] = uint64(offset) + filepositions[keyframesCount-1]
for i := range filepositions {
filepositions[i] += uint64(offset)
}
metaData["keyframes"] = map[string]any{
"filepositions": filepositions,
"times": times,
}
}
amf.Reset()
marshals := amf.Marshals("onMetaData", metaData)
writeMetaTagQueueTask.AddTask(&writeMetaTagTask{
file: file,
flags: flags,
metaData: marshals,
})
defer writeMetaTag(file, suber, filepositions, times, &duration)
}
if ctx.Append {
var metaData rtmp.EcmaArray
@@ -145,60 +157,67 @@ func RecordFlv(ctx *m7s.RecordContext) (err error) {
times = keyframes["times"].([]float64)
if _, err = file.Seek(-4, io.SeekEnd); err != nil {
ctx.Error("seek file failed", "err", err)
file.Write(FLVHead)
_, err = file.Write(FLVHead)
} else {
tmp := make(util.Buffer, 4)
tmp2 := tmp
file.Read(tmp)
_, err = file.Read(tmp)
tagSize := tmp.ReadUint32()
tmp = tmp2
file.Seek(int64(tagSize), io.SeekEnd)
file.Read(tmp2)
_, err = file.Seek(int64(tagSize), io.SeekEnd)
_, err = file.Read(tmp2)
ts := tmp2.ReadUint24() | (uint32(tmp[3]) << 24)
ctx.Info("append flv", "last tagSize", tagSize, "last ts", ts)
if hasVideo {
vr.StartTs = time.Duration(ts) * time.Millisecond
}
if hasAudio {
ar.StartTs = time.Duration(ts) * time.Millisecond
}
file.Seek(0, io.SeekEnd)
suber.StartAudioTS = time.Duration(ts) * time.Millisecond
suber.StartVideoTS = time.Duration(ts) * time.Millisecond
offset, err = file.Seek(0, io.SeekEnd)
}
} else {
} else if ctx.Fragment == 0 {
file.Write(FLVHead)
}
if ctx.Fragment == 0 {
defer writeMetaTag(file, filepositions, times)
}
checkFragment := func(absTime uint32) {
if ctx.Fragment == 0 {
} else {
if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
_, err = file.Write(FLVHead)
}
writer := NewFlvWriter(file)
checkFragment := func(absTime uint32) {
if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.Fragment {
writeMetaTag(file, filepositions, times)
writeMetaTag(file, suber, filepositions, times, &duration)
filepositions = []uint64{0}
times = []float64{0}
offset = 0
if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
file.Write(FLVHead)
if vr != nil {
_, err = file.Write(FLVHead)
writer = NewFlvWriter(file)
if vr := suber.VideoReader; vr != nil {
vr.ResetAbsTime()
err = WriteFLVTag(file, FLV_TAG_TYPE_VIDEO, 0, vr.Track.SequenceFrame.(*rtmp.RTMPVideo).Buffers...)
seq := vr.Track.SequenceFrame.(*rtmp.RTMPVideo)
err = writer.WriteTag(FLV_TAG_TYPE_VIDEO, 0, uint32(seq.Size), seq.Buffers...)
offset = int64(seq.Size + 15)
}
}
}
return m7s.PlayBlock(ctx.Subscriber, func(audio *rtmp.RTMPAudio) (err error) {
if !hasVideo {
checkFragment(ar.AbsTime)
if suber.VideoReader == nil && !noFragment {
checkFragment(suber.AudioReader.AbsTime)
}
return WriteFLVTag(file, FLV_TAG_TYPE_AUDIO, vr.AbsTime, audio.Buffers...)
err = writer.WriteTag(FLV_TAG_TYPE_AUDIO, suber.AudioReader.AbsTime, uint32(audio.Size), audio.Buffers...)
offset += int64(audio.Size + 15)
return
}, func(video *rtmp.RTMPVideo) (err error) {
if vr.Value.IDR {
if suber.VideoReader.Value.IDR {
filepositions = append(filepositions, uint64(offset))
times = append(times, float64(vr.AbsTime)/1000)
times = append(times, float64(suber.VideoReader.AbsTime)/1000)
if !noFragment {
checkFragment(suber.VideoReader.AbsTime)
}
}
return WriteFLVTag(file, FLV_TAG_TYPE_VIDEO, vr.AbsTime, video.Buffers...)
err = writer.WriteTag(FLV_TAG_TYPE_VIDEO, suber.VideoReader.AbsTime, uint32(video.Size), video.Buffers...)
offset += int64(video.Size + 15)
return
})
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/emiago/sipgo/sip"
"log/slog"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg"
"net/http"
@@ -24,7 +23,7 @@ const (
)
type Device struct {
pkg.Task
util.Task
ID string
Name string
Manufacturer string

View File

@@ -3,13 +3,13 @@ package plugin_preview
import (
"embed"
"fmt"
"m7s.live/m7s/v5/pkg/util"
"mime"
"net/http"
"path/filepath"
"strings"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
)
//go:embed ui
@@ -24,14 +24,14 @@ var _ = m7s.InstallPlugin[PreviewPlugin]()
func (p *PreviewPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
s := "<h1><h1><h2>Live Streams 引擎中正在发布的流</h2>"
p.Server.CallOnStreamTask(func(*pkg.Task) error {
p.Server.CallOnStreamTask(func(*util.Task) error {
for publisher := range p.Server.Streams.Range {
s += fmt.Sprintf("<a href='%s'>%s</a> [ %s ]<br>", publisher.StreamPath, publisher.StreamPath, publisher.Plugin.Meta.Name)
}
s += "<h2>pull stream on subscribe 订阅时才会触发拉流的流</h2>"
return nil
})
p.Server.Call(func(*pkg.Task) error {
p.Server.Call(func(*util.Task) error {
for plugin := range p.Server.Plugins.Range {
if pullPlugin, ok := plugin.GetHandler().(m7s.IPullerPlugin); ok {
s += fmt.Sprintf("<h3>%s</h3>", plugin.Meta.Name)

View File

@@ -7,7 +7,6 @@ import (
"runtime"
"sync/atomic"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
)
@@ -44,7 +43,7 @@ const (
)
type NetConnection struct {
pkg.MarcoTask
util.MarcoTask
*util.BufReader
net.Conn
bandwidth uint32
@@ -73,6 +72,7 @@ func NewNetConnection(conn net.Conn) (ret *NetConnection) {
chunkHeaderBuf: make(util.Buffer, 0, 20),
Receivers: make(map[uint32]*m7s.Publisher),
}
ret.Name = "NetConnection"
ret.mediaDataPool.SetAllocator(util.NewScalableMemoryAllocator(1 << util.MinPowerOf2))
return
}

View File

@@ -2,7 +2,7 @@ package rtmp
import (
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
)
type NetStream struct {
@@ -69,7 +69,7 @@ func (ns *NetStream) BeginPlay(tid uint64) (err error) {
}
func (ns *NetStream) Subscribe(suber *m7s.Subscriber) {
ns.AddCall(func(task *pkg.Task) error {
ns.AddCall(func(task *util.Task) error {
audio, video := ns.CreateSender(false)
return m7s.PlayBlock(suber, audio.HandleAudio, video.HandleVideo)
}, nil)

View File

@@ -11,7 +11,6 @@ import (
"time"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
)
@@ -27,7 +26,7 @@ func NewNetConnection(conn net.Conn) *NetConnection {
}
type NetConnection struct {
pkg.MarcoTask
util.MarcoTask
*util.BufReader
Backchannel bool
Media string

View File

@@ -8,8 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
@@ -114,15 +112,6 @@ func (h *VmLogHandler) WithGroup(name string) slog.Handler {
}
}
func init() {
envflag.Parse()
buildinfo.Init()
logger.Init()
vlstorage.Init()
vlselect.Init()
vlinsert.Init()
}
func NewVmLogHandler(opts *slog.HandlerOptions, converter Converter) (*VmLogHandler, error) {
if err := vlstorage.CanWriteData(); err != nil {

View File

@@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"log/slog"
"m7s.live/m7s/v5"
"net/http"
@@ -26,7 +27,14 @@ type VmLogPlugin struct {
var _ = m7s.InstallPlugin[VmLogPlugin]()
func init() {
logger.Init()
}
func (config *VmLogPlugin) OnInit() (err error) {
vlstorage.Init()
vlselect.Init()
vlinsert.Init()
config.handler, err = NewVmLogHandler(nil, nil)
if err == nil {
config.AddLogHandler(config.handler)
@@ -34,7 +42,7 @@ func (config *VmLogPlugin) OnInit() (err error) {
return
}
func (config *VmLogPlugin) OnExit() {
func (config *VmLogPlugin) OnStop() {
vlinsert.Stop()
vlselect.Stop()
vlstorage.Stop()
@@ -42,6 +50,10 @@ func (config *VmLogPlugin) OnExit() {
fmt.Print("VmLogPlugin OnClose")
}
func (config *VmLogPlugin) OnExit() {
}
func (config *VmLogPlugin) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
requestHandler(rw, r)
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/pion/rtp"
. "github.com/pion/webrtc/v3"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
@@ -426,7 +425,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) {
if videoSender == nil {
suber.SubVideo = false
}
conn.AddCall(func(task *pkg.Task) error {
conn.AddCall(func(task *util.Task) error {
return m7s.PlayBlock(suber, func(frame *mrtp.RTPAudio) (err error) {
for _, p := range frame.Packets {
if err = audioTLSRTP.WriteRTP(p); err != nil {

View File

@@ -2,11 +2,11 @@ package webrtc
import (
. "github.com/pion/webrtc/v3"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
)
type Connection struct {
pkg.MarcoTask
util.MarcoTask
*PeerConnection
SDP string
// LocalSDP *sdp.SessionDescription

View File

@@ -91,7 +91,8 @@ func (p *Publisher) GetKey() string {
func createPublisher(p *Plugin, streamPath string, conf config.Publish) (publisher *Publisher) {
publisher = &Publisher{Publish: conf}
publisher.ID = GetNextTaskID()
publisher.ID = util.GetNextTaskID()
publisher.Name = "publisher"
publisher.Plugin = p
publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout)
publisher.Logger = p.Logger.With("streamPath", streamPath, "pId", publisher.ID)
@@ -104,8 +105,8 @@ func (p *Publisher) Start() (err error) {
if oldPublisher, ok := s.Streams.Get(p.StreamPath); ok {
if p.KickExist {
p.Warn("kick")
oldPublisher.Stop(ErrKick)
p.TakeOver(oldPublisher)
oldPublisher.Stop(ErrKick)
} else {
return ErrStreamExist
}
@@ -159,6 +160,10 @@ func (p *Publisher) timeout() (err error) {
if p.Publish.DelayCloseTimeout > 0 {
err = ErrPublishDelayCloseTimeout
}
case PublisherStateDisposed:
if p.Publish.WaitCloseTimeout > 0 {
err = ErrPublishWaitCloseTimeout
}
}
return
}
@@ -255,7 +260,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
}
}
p.lastTs = frame.Timestamp
if p.Enabled(p, TraceLevel) {
if p.Enabled(p, util.TraceLevel) {
codec := t.FourCC().String()
data := frame.Wraps[0].String()
p.Trace("write", "seq", frame.Sequence, "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data)
@@ -510,20 +515,9 @@ func (p *Publisher) Dispose() {
}
subscriber.TimeoutTimer.Reset(waitCloseTimeout)
}
p.Lock()
defer p.Unlock()
if p.dumpFile != nil {
p.dumpFile.Close()
}
if p.State == PublisherStateDisposed {
panic("disposed")
}
if p.HasAudioTrack() {
p.AudioTrack.Dispose()
}
if p.HasVideoTrack() {
p.VideoTrack.Dispose()
}
p.State = PublisherStateDisposed
}
@@ -533,7 +527,12 @@ func (p *Publisher) TakeOver(old *Publisher) {
for subscriber := range old.SubscriberRange {
p.AddSubscriber(subscriber)
}
old.Stop(ErrKick)
if old.HasAudioTrack() {
old.AudioTrack.Dispose()
}
if old.HasVideoTrack() {
old.VideoTrack.Dispose()
}
old.Subscribers = SubscriberCollection{}
}

View File

@@ -3,10 +3,11 @@ package m7s
import (
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
type Connection struct {
pkg.MarcoTask
util.MarcoTask
Plugin *Plugin
StreamPath string // 对应本地流
RemoteURL string // 远程服务器地址(用于推拉)
@@ -22,6 +23,7 @@ func createPullContext(p *Plugin, streamPath string, url string) (pullCtx *PullC
Pull: p.config.Pull,
publishConfig: &publishConfig,
}
pullCtx.Name = "pull"
pullCtx.Plugin = p
pullCtx.ConnectProxy = p.config.Pull.Proxy
pullCtx.RemoteURL = url
@@ -42,7 +44,7 @@ func (p *PullContext) GetKey() string {
}
type PullSubTask struct {
pkg.RetryTask
util.RetryTask
ctx *PullContext
Puller
}

View File

@@ -2,6 +2,7 @@ package m7s
import (
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/config"
)
@@ -10,6 +11,7 @@ type Pusher = func(*PushContext) error
func createPushContext(p *Plugin, streamPath string, url string) (pushCtx *PushContext) {
pushCtx = &PushContext{Push: p.config.Push}
pushCtx.Name = "push"
pushCtx.Plugin = p
pushCtx.RemoteURL = url
pushCtx.StreamPath = streamPath
@@ -29,7 +31,7 @@ func (p *PushContext) GetKey() string {
}
type PushSubTask struct {
pkg.RetryTask
util.RetryTask
ctx *PushContext
Pusher
}

View File

@@ -1,6 +1,7 @@
package m7s
import (
"m7s.live/m7s/v5/pkg/util"
"os"
"path/filepath"
"time"
@@ -18,12 +19,13 @@ func createRecoder(p *Plugin, streamPath string, filePath string) (recorder *Rec
FilePath: filePath,
StreamPath: streamPath,
}
recorder.Name = "record"
recorder.Logger = p.Logger.With("filePath", filePath, "streamPath", streamPath)
return
}
type RecordContext struct {
pkg.MarcoTask
util.MarcoTask
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
@@ -37,9 +39,12 @@ func (p *RecordContext) GetKey() string {
}
func (p *RecordContext) Do(recorder Recorder) {
p.AddCall(func(tmpTask *pkg.Task) (err error) {
p.AddCall(func(tmpTask *util.Task) (err error) {
dir := p.FilePath
if filepath.Ext(p.FilePath) != "" {
if p.Fragment == 0 || p.Append {
if filepath.Ext(p.FilePath) == "" {
p.FilePath += ".flv"
}
dir = filepath.Dir(p.FilePath)
}
if err = os.MkdirAll(dir, 0755); err != nil {

View File

@@ -66,13 +66,13 @@ type Server struct {
tcplis net.Listener
lastSummaryTime time.Time
lastSummary *pb.SummaryResponse
streamTask, pullTask, pushTask, recordTask MarcoLongTask
streamTask, pullTask, pushTask, recordTask util.MarcoLongTask
conf any
}
func NewServer() (s *Server) {
s = &Server{}
s.ID = GetNextTaskID()
s.ID = util.GetNextTaskID()
s.Meta = &serverMeta
return
}
@@ -86,7 +86,7 @@ type rawconfig = map[string]map[string]any
func init() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
RootTask.AddChan(signalChan, func(os.Signal) {
util.RootTask.AddChan(signalChan, func(os.Signal) {
for _, meta := range plugins {
if meta.OnExit != nil {
meta.OnExit()
@@ -237,13 +237,13 @@ func (s *Server) Start() (err error) {
}
}
for publisher := range s.Waiting.Range {
// TODO: ?
//if publisher.Plugin != nil {
// if err := publisher.checkTimeout(); err != nil {
// publisher.Stop(err)
// s.createWait(publisher.StreamPath)
// }
//}
if publisher.Plugin != nil {
if err := publisher.checkTimeout(); err != nil {
if publisher.Plugin != nil {
s.createWait(publisher.StreamPath).TakeOver(publisher)
}
}
}
for sub := range publisher.SubscriberRange {
select {
case <-sub.TimeoutTimer.C:
@@ -258,7 +258,7 @@ func (s *Server) Start() (err error) {
return
}
func (s *Server) CallOnStreamTask(callback func(*Task) error) {
func (s *Server) CallOnStreamTask(callback func(*util.Task) error) {
s.streamTask.Call(callback)
}
@@ -272,7 +272,7 @@ func (s *Server) Dispose() {
func (s *Server) Run(ctx context.Context, conf any) (err error) {
for {
s.Init(conf)
RootTask.AddTaskWithContext(ctx, s)
util.RootTask.AddTaskWithContext(ctx, s)
if err = s.WaitStarted(); err != nil {
return
}

View File

@@ -5,6 +5,7 @@ import (
"net/url"
"reflect"
"runtime"
"runtime/debug"
"strings"
"time"
@@ -16,7 +17,7 @@ import (
var AVFrameType = reflect.TypeOf((*AVFrame)(nil))
type PubSubBase struct {
Task
util.Task
Plugin *Plugin
StreamPath string
Args url.Values
@@ -51,14 +52,15 @@ type SubscriberCollection = util.Collection[uint32, *Subscriber]
type Subscriber struct {
PubSubBase
config.Subscribe
Publisher *Publisher
AudioReader *AVRingReader
VideoReader *AVRingReader
Publisher *Publisher
AudioReader, VideoReader *AVRingReader
StartAudioTS, StartVideoTS time.Duration
}
func createSubscriber(p *Plugin, streamPath string, conf config.Subscribe) *Subscriber {
subscriber := &Subscriber{Subscribe: conf}
subscriber.ID = GetNextTaskID()
subscriber.ID = util.GetNextTaskID()
subscriber.Name = "subscriber"
subscriber.Plugin = p
subscriber.TimeoutTimer = time.NewTimer(subscriber.WaitTimeout)
subscriber.Logger = p.Logger.With("streamPath", streamPath, "sId", subscriber.ID)
@@ -168,7 +170,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (err error) {
var a1, v1 reflect.Type
var startAudioTs, startVideoTs time.Duration
startAudioTs, startVideoTs := s.StartAudioTS, s.StartVideoTS
var initState = 0
prePublisher := s.Publisher
var audioFrame, videoFrame *AVFrame
@@ -181,7 +183,12 @@ func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (er
awi := s.createAudioReader(a1, startAudioTs)
vwi := s.createVideoReader(v1, startVideoTs)
defer func() {
s.Stop(err)
if err != nil {
s.Stop(err)
} else {
err = recover().(error)
s.Error(err.Error(), "stack", string(debug.Stack()))
}
if s.AudioReader != nil {
s.AudioReader.StopRead()
}
@@ -192,7 +199,7 @@ func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (er
sendAudioFrame := func() (err error) {
if awi >= 0 {
if len(audioFrame.Wraps) > awi {
if s.Enabled(s, TraceLevel) {
if s.Enabled(s, util.TraceLevel) {
s.Trace("send audio frame", "seq", audioFrame.Sequence)
}
err = handler.OnAudio(audioFrame.Wraps[awi].(A))
@@ -216,7 +223,7 @@ func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (er
sendVideoFrame := func() (err error) {
if vwi >= 0 {
if len(videoFrame.Wraps) > vwi {
if s.Enabled(s, TraceLevel) {
if s.Enabled(s, util.TraceLevel) {
s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wraps[vwi].String(), "size", videoFrame.Wraps[vwi].GetSize())
}
err = handler.OnVideo(videoFrame.Wraps[vwi].(V))
@@ -261,6 +268,9 @@ func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (er
if vr != nil {
for err == nil {
err = vr.ReadFrame(&s.Subscribe)
if prePublisher != s.Publisher {
break
}
if err == nil {
videoFrame = &vr.Value
err = s.Err()
@@ -315,6 +325,9 @@ func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (er
}
}
if err = ar.ReadFrame(&s.Subscribe); err == nil {
if prePublisher != s.Publisher {
break
}
audioFrame = &ar.Value
err = s.Err()
} else {
@@ -346,6 +359,7 @@ func PlayBlock0[A any, V any](s *Subscriber, handler SubscribeHandler[A, V]) (er
awi = s.createAudioReader(a1, startAudioTs)
}
checkPublisherChange()
// TODO: 优化 select
runtime.Gosched()
}
return