fix: device pull

This commit is contained in:
langhuihui
2024-10-24 14:51:05 +08:00
parent 2cbd351a8b
commit 5795d921f5
12 changed files with 77 additions and 48 deletions

View File

@@ -1,5 +1,5 @@
# Compile Stage
FROM golang:1.23.1-bullseye AS builder
FROM golang:1.23.2-bullseye AS builder
LABEL stage=gobuilder
@@ -22,7 +22,7 @@ RUN go build -tags sqlite -o ./build/monibuca ./example/default/main.go
RUN cp -r /monibuca/example/default/config.yaml /monibuca/build
# Running Stage
FROM alpine:latest
FROM alpine:3.20
WORKDIR /monibuca
COPY --from=builder /monibuca/build /monibuca/

23
api.go
View File

@@ -664,30 +664,35 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
if !canReplace {
defer s.OnSubscribe(req.StreamPath, u.Query())
}
if aliasStream, ok := s.AliasStreams.Get(req.Alias); ok { //modify alias
aliasStream.AutoRemove = req.AutoRemove
if aliasStream.StreamPath != req.StreamPath {
aliasStream.StreamPath = req.StreamPath
if aliasInfo, ok := s.AliasStreams.Get(req.Alias); ok { //modify alias
aliasInfo.AutoRemove = req.AutoRemove
if aliasInfo.StreamPath != req.StreamPath {
aliasInfo.StreamPath = req.StreamPath
if canReplace {
if aliasStream.Publisher != nil {
aliasStream.TransferSubscribers(publisher) // replace stream
if aliasInfo.Publisher != nil {
aliasInfo.TransferSubscribers(publisher) // replace stream
} else {
s.Waiting.WakeUp(req.Alias, publisher)
}
}
}
} else { // create alias
s.AliasStreams.Add(&AliasStream{
aliasInfo := &AliasStream{
AutoRemove: req.AutoRemove,
StreamPath: req.StreamPath,
Alias: req.Alias,
})
}
s.AliasStreams.Add(aliasInfo)
aliasStream, ok := s.Streams.Get(aliasInfo.Alias)
if canReplace {
if aliasStream, ok := s.Streams.Get(req.Alias); ok {
aliasInfo.Publisher = publisher
if ok {
aliasStream.TransferSubscribers(publisher) // replace stream
} else {
s.Waiting.WakeUp(req.Alias, publisher)
}
} else {
aliasInfo.Publisher = aliasStream
}
}
} else {

View File

@@ -66,8 +66,21 @@ func (d *Device) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if devicePlugin, ok := plugin.handler.(IDevicePlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
deviceTask := devicePlugin.OnDeviceAdd(d)
if deviceTask != nil {
d.AddTask(deviceTask)
if deviceTask == nil {
continue
}
if deviceTask, ok := deviceTask.(IDevice); ok {
d.Handler = deviceTask
}
if t, ok := deviceTask.(task.ITask); ok {
if ticker, ok := t.(task.IChannelTask); ok {
t.OnStart(func() {
ticker.Tick(nil)
})
}
d.AddTask(t)
} else {
d.ChangeStatus(DeviceStatusOnline)
}
}
}
@@ -104,7 +117,7 @@ func (d *Device) Update() {
func (d *HTTPDevice) Start() (err error) {
d.url, err = url.Parse(d.Device.URL)
return
return d.DeviceTask.Start()
}
func (d *HTTPDevice) GetTickInterval() time.Duration {

View File

@@ -11,3 +11,7 @@ gb28181:
onsub:
pull:
.* : $0
mp4:
onsub:
pull:
^vod/(.+)$: live/$1

View File

@@ -4,6 +4,7 @@ import (
"database/sql/driver"
"fmt"
"net/http"
"net/url"
"time"
"github.com/mcuadros/go-defaults"
@@ -48,8 +49,8 @@ type (
RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔
Proxy string `desc:"代理地址"` // 代理地址
Header HTTPValus
Args HTTPValus
PubConf *Publish `gorm:"-:all"`
Args HTTPValus `gorm:"-:all"` // 拉流参数
PubConf *Publish `gorm:"-:all"`
}
Push struct {
URL string `desc:"推送地址"` // 推送地址
@@ -125,3 +126,7 @@ func (v *HTTPValus) Scan(value any) error {
func (v HTTPValus) Value() (driver.Value, error) {
return yaml.Marshal(v)
}
func (v HTTPValus) Get(key string) string {
return url.Values(v).Get(key)
}

View File

@@ -84,7 +84,7 @@ type (
}
IDevicePlugin interface {
OnDeviceAdd(device *Device) task.ITask
OnDeviceAdd(device *Device) any
}
)
@@ -411,11 +411,7 @@ func (p *Plugin) OnSubscribe(streamPath string, args url.Values) {
p.handler.Pull(streamPath, conf)
}
}
for device := range p.Server.Devices.Range {
if device.Status == DeviceStatusOnline && device.GetStreamPath() == streamPath && !device.PullOnStart {
device.Handler.Pull()
}
}
//if !avoidTrans {
// for reg, conf := range plugin.GetCommonConf().OnSub.Transform {
// if plugin.Meta.Transformer != nil {
@@ -488,6 +484,9 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber *
func (p *Plugin) Pull(streamPath string, conf config.Pull) {
puller := p.Meta.Puller(conf)
if puller == nil {
return
}
puller.GetPullJob().Init(puller, p, streamPath, conf)
}

View File

@@ -10,7 +10,6 @@ import (
"github.com/gobwas/ws/wsutil"
m7s "m7s.live/pro"
"m7s.live/pro/pkg/task"
"m7s.live/pro/pkg/util"
. "m7s.live/pro/plugin/flv/pkg"
)
@@ -79,7 +78,7 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = live.Run()
}
func (plugin *FLVPlugin) OnDeviceAdd(device *m7s.Device) (ret task.ITask) {
func (plugin *FLVPlugin) OnDeviceAdd(device *m7s.Device) any {
d := &FLVDevice{}
d.Device = device
d.Plugin = &plugin.Plugin

View File

@@ -103,16 +103,15 @@ func (gb *GB28181Plugin) OnInit() (err error) {
return
}
func (p *GB28181Plugin) OnDeviceAdd(device *m7s.Device) (ret task.ITask) {
func (p *GB28181Plugin) OnDeviceAdd(device *m7s.Device) any {
deviceID, channelID, _ := strings.Cut(device.URL, "/")
if d, ok := p.devices.Get(deviceID); ok {
if channel, ok := d.channels.Get(channelID); ok {
channel.AbstractDevice = device
device.Handler = channel
device.ChangeStatus(m7s.DeviceStatusOnline)
return channel
}
}
return
return nil
}
func (gb *GB28181Plugin) RegisterHandler() map[string]http.HandlerFunc {

View File

@@ -7,7 +7,6 @@ import (
"strings"
"time"
"github.com/deepch/vdk/codec/h265parser"
m7s "m7s.live/pro"
"m7s.live/pro/pkg/codec"
"m7s.live/pro/pkg/config"
@@ -27,7 +26,10 @@ func NewPuller(conf config.Pull) m7s.IPuller {
if strings.HasPrefix(conf.URL, "http") || strings.HasSuffix(conf.URL, ".mp4") {
return &HTTPReader{}
}
return &RecordReader{}
if conf.Args.Get(m7s.StartKey) != "" {
return &RecordReader{}
}
return nil
}
func (p *RecordReader) Run() (err error) {
@@ -46,6 +48,9 @@ func (p *RecordReader) Run() (err error) {
}
for i, stream := range p.Streams {
tsOffset = ts
if p.File != nil {
p.File.Close()
}
p.File, err = os.Open(stream.FilePath)
if err != nil {
return
@@ -76,7 +81,8 @@ func (p *RecordReader) Run() (err error) {
}
startTimestamp := p.PullStartTime.Sub(stream.StartTime).Milliseconds()
if _, err = p.demuxer.SeekTime(uint64(startTimestamp)); err != nil {
return
tsOffset = 0
continue
}
tsOffset = -startTimestamp
}
@@ -88,10 +94,10 @@ func (p *RecordReader) Run() (err error) {
if publisher.Paused != nil {
publisher.Paused.Await()
}
if _, err = p.demuxer.reader.Seek(int64(sample.Offset), io.SeekStart); err != nil {
if _, err = p.demuxer.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return
}
sample.Data = allocator.Malloc(int(sample.Size))
sample.Data = allocator.Malloc(sample.Size)
if _, err = io.ReadFull(p.demuxer.reader, sample.Data); err != nil {
allocator.Free(sample.Data)
return
@@ -102,32 +108,21 @@ func (p *RecordReader) Run() (err error) {
}
switch track.Cid {
case box.MP4_CODEC_H264:
keyFrame := codec.ParseH264NALUType(sample.Data[5]) == codec.NALU_IDR_Picture
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.CTS = uint32(sample.PTS - sample.DTS)
videoFrame.Timestamp = uint32(ts)
videoFrame.AppendOne([]byte{util.Conditional[byte](keyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
videoFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteVideo(&videoFrame)
case box.MP4_CODEC_H265:
var keyFrame bool
switch codec.ParseH265NALUType(sample.Data[5]) {
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
keyFrame = true
}
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.CTS = uint32(sample.PTS - sample.DTS)
videoFrame.Timestamp = uint32(ts)
var head []byte
var b0 byte = 0b1010_0000
if keyFrame {
if sample.KeyFrame {
b0 = 0b1001_0000
}
if videoFrame.CTS == 0 {

View File

@@ -24,11 +24,10 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
return ret
}
func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) task.ITask {
func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) any {
ret := &RTSPDevice{}
ret.Device = device
ret.Plugin = &p.Plugin
ret.Logger = p.With("device", device.Name)
device.Handler = ret
return ret
}

View File

@@ -217,3 +217,9 @@ func (p *RecordFilePuller) Start() (err error) {
p.Info("vod", "streams", p.Streams)
return
}
func (p *RecordFilePuller) Dispose() {
if p.File != nil {
p.File.Close()
}
}

View File

@@ -393,6 +393,11 @@ func (s *Server) OnSubscribe(streamPath string, args url.Values) {
for plugin := range s.Plugins.Range {
plugin.OnSubscribe(streamPath, args)
}
for device := range s.Devices.Range {
if device.Status == DeviceStatusOnline && device.GetStreamPath() == streamPath && !device.PullOnStart {
device.Handler.Pull()
}
}
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {