Files
monibuca/device.go
2024-10-24 14:51:05 +08:00

152 lines
3.3 KiB
Go

package m7s
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/go-ping/ping"
"gorm.io/gorm"
"m7s.live/pro/pkg/config"
"m7s.live/pro/pkg/task"
)
const (
DeviceStatusOffline byte = iota
DeviceStatusOnline
DeviceStatusPulling
DeviceStatusDisabled
)
type (
IDevice interface {
Pull()
}
Device struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all" yaml:"-"`
ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
Name string
StreamPath string
PullOnStart bool
config.Pull `gorm:"embedded;embeddedPrefix:pull_"`
config.Record `gorm:"embedded;embeddedPrefix:record_"`
ParentID uint
Type string
Status byte
Description string
RTT time.Duration
Handler IDevice `gorm:"-:all" yaml:"-"`
}
DeviceManager struct {
task.Manager[uint32, *Device]
}
DeviceTask struct {
task.TickTask
Device *Device
Plugin *Plugin
}
HTTPDevice struct {
DeviceTask
url *url.URL
}
)
func (d *Device) GetStreamPath() string {
if d.StreamPath == "" {
return fmt.Sprintf("device/%s/%d", d.Type, d.ID)
}
return d.StreamPath
}
func (d *Device) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if devicePlugin, ok := plugin.handler.(IDevicePlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
deviceTask := devicePlugin.OnDeviceAdd(d)
if deviceTask == nil {
continue
}
if deviceTask, ok := deviceTask.(IDevice); ok {
d.Handler = deviceTask
}
if t, ok := deviceTask.(task.ITask); ok {
if ticker, ok := t.(task.IChannelTask); ok {
t.OnStart(func() {
ticker.Tick(nil)
})
}
d.AddTask(t)
} else {
d.ChangeStatus(DeviceStatusOnline)
}
}
}
return
}
func (d *Device) ChangeStatus(status byte) {
if d.Status == status {
return
}
from := d.Status
d.Info("device status changed", "from", from, "to", status)
d.Status = status
d.Update()
switch status {
case DeviceStatusOnline:
if d.PullOnStart && from == DeviceStatusOffline {
d.Handler.Pull()
}
case DeviceStatusPulling:
if from == DeviceStatusOnline && d.FilePath != "" {
if mp4Plugin, ok := d.server.Plugins.Get("MP4"); ok {
mp4Plugin.Record(d.GetStreamPath(), d.Record)
}
}
}
}
func (d *Device) Update() {
if d.server.DB != nil {
d.server.DB.Save(d)
}
}
func (d *HTTPDevice) Start() (err error) {
d.url, err = url.Parse(d.Device.URL)
return d.DeviceTask.Start()
}
func (d *HTTPDevice) GetTickInterval() time.Duration {
return time.Second * 10
}
func (d *HTTPDevice) Tick(any) {
pinger, err := ping.NewPinger(d.url.Hostname())
if err != nil {
d.Device.ChangeStatus(DeviceStatusOffline)
return
}
pinger.Count = 1
err = pinger.Run() // Blocks until finished.
if err != nil {
d.Device.ChangeStatus(DeviceStatusOffline)
return
}
stats := pinger.Statistics()
d.Device.RTT = stats.AvgRtt
d.Device.ChangeStatus(DeviceStatusOnline)
}
func (d *DeviceTask) Dispose() {
d.Device.ChangeStatus(DeviceStatusOffline)
d.TickTask.Dispose()
}
func (d *DeviceTask) Pull() {
d.Plugin.handler.Pull(d.Device.GetStreamPath(), d.Device.Pull)
}