mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-05 20:16:53 +08:00
152 lines
3.3 KiB
Go
152 lines
3.3 KiB
Go
package m7s
|
|
|
|
import (
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-ping/ping"
|
|
"gorm.io/gorm"
|
|
"m7s.live/pro/pkg/config"
|
|
"m7s.live/pro/pkg/task"
|
|
)
|
|
|
|
// Device status values stored in Device.Status and passed to
// Device.ChangeStatus.
const (
	DeviceStatusOffline  byte = 0
	DeviceStatusOnline   byte = 1
	DeviceStatusPulling  byte = 2
	DeviceStatusDisabled byte = 3
)
|
|
|
|
type (
|
|
IDevice interface {
|
|
Pull()
|
|
}
|
|
Device struct {
|
|
server *Server `gorm:"-:all"`
|
|
task.Work `gorm:"-:all" yaml:"-"`
|
|
ID uint `gorm:"primarykey"`
|
|
CreatedAt, UpdatedAt time.Time `yaml:"-"`
|
|
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
|
|
Name string
|
|
StreamPath string
|
|
PullOnStart bool
|
|
config.Pull `gorm:"embedded;embeddedPrefix:pull_"`
|
|
config.Record `gorm:"embedded;embeddedPrefix:record_"`
|
|
ParentID uint
|
|
Type string
|
|
Status byte
|
|
Description string
|
|
RTT time.Duration
|
|
Handler IDevice `gorm:"-:all" yaml:"-"`
|
|
}
|
|
DeviceManager struct {
|
|
task.Manager[uint32, *Device]
|
|
}
|
|
DeviceTask struct {
|
|
task.TickTask
|
|
Device *Device
|
|
Plugin *Plugin
|
|
}
|
|
HTTPDevice struct {
|
|
DeviceTask
|
|
url *url.URL
|
|
}
|
|
)
|
|
|
|
func (d *Device) GetStreamPath() string {
|
|
if d.StreamPath == "" {
|
|
return fmt.Sprintf("device/%s/%d", d.Type, d.ID)
|
|
}
|
|
return d.StreamPath
|
|
}
|
|
|
|
func (d *Device) Start() (err error) {
|
|
for plugin := range d.server.Plugins.Range {
|
|
if devicePlugin, ok := plugin.handler.(IDevicePlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
|
|
deviceTask := devicePlugin.OnDeviceAdd(d)
|
|
if deviceTask == nil {
|
|
continue
|
|
}
|
|
if deviceTask, ok := deviceTask.(IDevice); ok {
|
|
d.Handler = deviceTask
|
|
}
|
|
if t, ok := deviceTask.(task.ITask); ok {
|
|
if ticker, ok := t.(task.IChannelTask); ok {
|
|
t.OnStart(func() {
|
|
ticker.Tick(nil)
|
|
})
|
|
}
|
|
d.AddTask(t)
|
|
} else {
|
|
d.ChangeStatus(DeviceStatusOnline)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (d *Device) ChangeStatus(status byte) {
|
|
if d.Status == status {
|
|
return
|
|
}
|
|
from := d.Status
|
|
d.Info("device status changed", "from", from, "to", status)
|
|
d.Status = status
|
|
d.Update()
|
|
switch status {
|
|
case DeviceStatusOnline:
|
|
if d.PullOnStart && from == DeviceStatusOffline {
|
|
d.Handler.Pull()
|
|
}
|
|
case DeviceStatusPulling:
|
|
if from == DeviceStatusOnline && d.FilePath != "" {
|
|
if mp4Plugin, ok := d.server.Plugins.Get("MP4"); ok {
|
|
mp4Plugin.Record(d.GetStreamPath(), d.Record)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Device) Update() {
|
|
if d.server.DB != nil {
|
|
d.server.DB.Save(d)
|
|
}
|
|
}
|
|
|
|
func (d *HTTPDevice) Start() (err error) {
|
|
d.url, err = url.Parse(d.Device.URL)
|
|
return d.DeviceTask.Start()
|
|
}
|
|
|
|
func (d *HTTPDevice) GetTickInterval() time.Duration {
|
|
return time.Second * 10
|
|
}
|
|
|
|
func (d *HTTPDevice) Tick(any) {
|
|
pinger, err := ping.NewPinger(d.url.Hostname())
|
|
if err != nil {
|
|
d.Device.ChangeStatus(DeviceStatusOffline)
|
|
return
|
|
}
|
|
pinger.Count = 1
|
|
err = pinger.Run() // Blocks until finished.
|
|
if err != nil {
|
|
d.Device.ChangeStatus(DeviceStatusOffline)
|
|
return
|
|
}
|
|
stats := pinger.Statistics()
|
|
d.Device.RTT = stats.AvgRtt
|
|
d.Device.ChangeStatus(DeviceStatusOnline)
|
|
}
|
|
|
|
func (d *DeviceTask) Dispose() {
|
|
d.Device.ChangeStatus(DeviceStatusOffline)
|
|
d.TickTask.Dispose()
|
|
}
|
|
|
|
func (d *DeviceTask) Pull() {
|
|
d.Plugin.handler.Pull(d.Device.GetStreamPath(), d.Device.Pull)
|
|
}
|