refactor: device change to pullproxy

This commit is contained in:
langhuihui
2024-12-04 14:07:22 +08:00
parent 645596d319
commit 04fbefd537
34 changed files with 3678 additions and 1562 deletions

172
api.go
View File

@@ -47,6 +47,7 @@ func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoRespon
Data: &pb.SysInfoData{
Version: Version,
LocalIP: localIP,
PublicIP: util.GetPublicIP(localIP),
StartTime: timestamppb.New(s.StartTime),
GoVersion: runtime.Version(),
Os: runtime.GOOS,
@@ -57,13 +58,27 @@ func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoRespon
for p := range s.Plugins.Range {
res.Data.Plugins = append(res.Data.Plugins, &pb.PluginInfo{
Name: p.Meta.Name,
Disabled: p.Disabled,
PushAddr: p.PushAddr,
PlayAddr: p.PlayAddr,
Description: p.GetDescriptions(),
})
}
return
}
func (s *Server) DisabledPlugins(ctx context.Context, _ *emptypb.Empty) (res *pb.DisabledPluginsResponse, err error) {
res = &pb.DisabledPluginsResponse{
Data: make([]*pb.PluginInfo, len(s.disabledPlugins)),
}
for i, p := range s.disabledPlugins {
res.Data[i] = &pb.PluginInfo{
Name: p.Meta.Name,
Description: p.GetDescriptions(),
}
}
return
}
// /api/stream/annexb/{streamPath}
func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) {
publisher, ok := s.Streams.Get(r.PathValue("streamPath"))
@@ -166,7 +181,17 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
var fillData func(m task.ITask) *pb.TaskTreeData
fillData = func(m task.ITask) (res *pb.TaskTreeData) {
res = &pb.TaskTreeData{Id: m.GetTaskID(), Pointer: uint64(uintptr(unsafe.Pointer(m.GetTask()))), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: m.GetDescriptions()}
t := m.GetTask()
res = &pb.TaskTreeData{
Id: m.GetTaskID(),
Pointer: uint64(uintptr(unsafe.Pointer(t))),
State: uint32(m.GetState()),
Type: uint32(m.GetTaskType()),
Owner: m.GetOwnerType(),
StartTime: timestamppb.New(t.StartTime),
Description: m.GetDescriptions(),
StartReason: t.StartReason,
}
if job, ok := m.(task.IJob); ok {
if blockedTask := job.Blocked(); blockedTask != nil {
res.Blocked = fillData(blockedTask)
@@ -353,7 +378,6 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
}
}
pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) {
//if v.TryRLock() {
if len(v.Wraps) > 0 {
var snap pb.TrackSnapShot
snap.Sequence = v.Sequence
@@ -371,8 +395,6 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
}
data.Ring = append(data.Ring, &snap)
}
//v.RUnlock()
//}
})
res = &pb.TrackSnapShotResponse{
Code: 0,
@@ -676,10 +698,10 @@ func (s *Server) ModifyConfig(_ context.Context, req *pb.ModifyConfigRequest) (r
return
}
func (s *Server) GetDeviceList(ctx context.Context, req *emptypb.Empty) (res *pb.DeviceListResponse, err error) {
res = &pb.DeviceListResponse{}
for device := range s.Devices.Range {
res.Data = append(res.Data, &pb.DeviceInfo{
func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PullProxyListResponse, err error) {
res = &pb.PullProxyListResponse{}
for device := range s.PullProxies.Range {
res.Data = append(res.Data, &pb.PullProxyInfo{
Name: device.Name,
CreateTime: timestamppb.New(device.CreatedAt),
UpdateTime: timestamppb.New(device.UpdatedAt),
@@ -701,8 +723,8 @@ func (s *Server) GetDeviceList(ctx context.Context, req *emptypb.Empty) (res *pb
return
}
func (s *Server) AddDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.SuccessResponse, err error) {
device := &Device{
func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) {
device := &PullProxy{
server: s,
Name: req.Name,
Type: req.Type,
@@ -723,17 +745,17 @@ func (s *Server) AddDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.Suc
return
}
s.DB.Create(device)
s.Devices.Add(device)
s.PullProxies.Add(device)
res = &pb.SuccessResponse{}
return
}
func (s *Server) UpdateDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.SuccessResponse, err error) {
func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
target := &Device{}
target := &PullProxy{}
s.DB.First(target, req.ID)
target.Name = req.Name
target.URL = req.PullURL
@@ -752,33 +774,33 @@ func (s *Server) UpdateDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.
return
}
func (s *Server) RemoveDevice(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
func (s *Server) RemovePullProxy(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
res = &pb.SuccessResponse{}
if req.Id > 0 {
tx := s.DB.Delete(&Device{
tx := s.DB.Delete(&PullProxy{
ID: uint(req.Id),
})
err = tx.Error
s.Devices.Call(func() error {
if device, ok := s.Devices.Get(uint(req.Id)); ok {
s.PullProxies.Call(func() error {
if device, ok := s.PullProxies.Get(uint(req.Id)); ok {
device.Stop(task.ErrStopByUser)
}
return nil
})
return
} else if req.StreamPath != "" {
var deviceList []Device
var deviceList []PullProxy
s.DB.Find(&deviceList, "stream_path=?", req.StreamPath)
if len(deviceList) > 0 {
for _, device := range deviceList {
tx := s.DB.Delete(&Device{}, device.ID)
tx := s.DB.Delete(&PullProxy{}, device.ID)
err = tx.Error
s.Devices.Call(func() error {
if device, ok := s.Devices.Get(uint(device.ID)); ok {
s.PullProxies.Call(func() error {
if device, ok := s.PullProxies.Get(uint(device.ID)); ok {
device.Stop(task.ErrStopByUser)
}
return nil
@@ -889,3 +911,109 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
})
return
}
func (s *Server) GetPushProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PushProxyListResponse, err error) {
res = &pb.PushProxyListResponse{}
for device := range s.PushProxies.Range {
res.Data = append(res.Data, &pb.PushProxyInfo{
Name: device.Name,
CreateTime: timestamppb.New(device.CreatedAt),
UpdateTime: timestamppb.New(device.UpdatedAt),
Type: device.Type,
PushURL: device.URL,
ParentID: uint32(device.ParentID),
Status: uint32(device.Status),
ID: uint32(device.ID),
PushOnStart: device.PushOnStart,
Audio: device.Audio,
Description: device.Description,
Rtt: uint32(device.RTT.Milliseconds()),
StreamPath: device.GetStreamPath(),
})
}
return
}
func (s *Server) AddPushProxy(ctx context.Context, req *pb.PushProxyInfo) (res *pb.SuccessResponse, err error) {
device := &PushProxy{
server: s,
Name: req.Name,
Type: req.Type,
ParentID: uint(req.ParentID),
PushOnStart: req.PushOnStart,
Description: req.Description,
StreamPath: req.StreamPath,
}
defaults.SetDefaults(&device.Push)
device.URL = req.PushURL
device.Audio = req.Audio
if s.DB == nil {
err = pkg.ErrNoDB
return
}
s.DB.Create(device)
s.PushProxies.Add(device)
res = &pb.SuccessResponse{}
return
}
func (s *Server) UpdatePushProxy(ctx context.Context, req *pb.PushProxyInfo) (res *pb.SuccessResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
target := &PushProxy{}
s.DB.First(target, req.ID)
target.Name = req.Name
target.URL = req.PushURL
target.ParentID = uint(req.ParentID)
target.Type = req.Type
target.PushOnStart = req.PushOnStart
target.Audio = req.Audio
target.Description = req.Description
target.RTT = time.Duration(int(req.Rtt)) * time.Millisecond
target.StreamPath = req.StreamPath
s.DB.Save(target)
res = &pb.SuccessResponse{}
return
}
func (s *Server) RemovePushProxy(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
res = &pb.SuccessResponse{}
if req.Id > 0 {
tx := s.DB.Delete(&PushProxy{
ID: uint(req.Id),
})
err = tx.Error
s.PushProxies.Call(func() error {
if device, ok := s.PushProxies.Get(uint(req.Id)); ok {
device.Stop(task.ErrStopByUser)
}
return nil
})
return
} else if req.StreamPath != "" {
var deviceList []PushProxy
s.DB.Find(&deviceList, "stream_path=?", req.StreamPath)
if len(deviceList) > 0 {
for _, device := range deviceList {
tx := s.DB.Delete(&PushProxy{}, device.ID)
err = tx.Error
s.PushProxies.Call(func() error {
if device, ok := s.PushProxies.Get(uint(device.ID)); ok {
device.Stop(task.ErrStopByUser)
}
return nil
})
}
}
return
} else {
res.Message = "parameter wrong"
return
}
}

175
device.go
View File

@@ -1,175 +0,0 @@
package m7s
import (
"fmt"
"net"
"net/url"
"strings"
"time"
"gorm.io/gorm"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
const (
DeviceStatusOffline byte = iota
DeviceStatusOnline
DeviceStatusPulling
DeviceStatusDisabled
)
type (
IDevice interface {
Pull()
}
Device struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all" yaml:"-"`
ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `yaml:"-"`
Name string
StreamPath string
PullOnStart, Audio, StopOnIdle bool
config.Pull `gorm:"embedded;embeddedPrefix:pull_"`
config.Record `gorm:"embedded;embeddedPrefix:record_"`
ParentID uint
Type string
Status byte
Description string
RTT time.Duration
Handler IDevice `gorm:"-:all" yaml:"-"`
}
DeviceManager struct {
task.Manager[uint, *Device]
}
DeviceTask struct {
task.TickTask
Device *Device
Plugin *Plugin
}
HTTPDevice struct {
DeviceTask
tcpAddr *net.TCPAddr
url *url.URL
}
)
func (d *Device) GetKey() uint {
return d.ID
}
func (d *Device) GetStreamPath() string {
if d.StreamPath == "" {
return fmt.Sprintf("device/%s/%d", d.Type, d.ID)
}
return d.StreamPath
}
func (d *Device) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if devicePlugin, ok := plugin.handler.(IDevicePlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
deviceTask := devicePlugin.OnDeviceAdd(d)
if deviceTask == nil {
continue
}
if deviceTask, ok := deviceTask.(IDevice); ok {
d.Handler = deviceTask
}
if t, ok := deviceTask.(task.ITask); ok {
if ticker, ok := t.(task.IChannelTask); ok {
t.OnStart(func() {
ticker.Tick(nil)
})
}
d.AddTask(t)
} else {
d.ChangeStatus(DeviceStatusOnline)
}
}
}
return
}
func (d *Device) ChangeStatus(status byte) {
if d.Status == status {
return
}
from := d.Status
d.Info("device status changed", "from", from, "to", status)
d.Status = status
d.Update()
switch status {
case DeviceStatusOnline:
if d.PullOnStart && from == DeviceStatusOffline {
d.Handler.Pull()
}
}
}
func (d *Device) Update() {
if d.server.DB != nil {
d.server.DB.Omit("deleted_at").Save(d)
}
}
func (d *DeviceTask) Dispose() {
d.Device.ChangeStatus(DeviceStatusOffline)
d.TickTask.Dispose()
d.Plugin.Server.Streams.Call(func() error {
if stream, ok := d.Plugin.Server.Streams.Get(d.Device.GetStreamPath()); ok {
stream.Stop(task.ErrStopByUser)
}
return nil
})
}
func (d *DeviceTask) Pull() {
var pubConf = d.Plugin.config.Publish
pubConf.PubAudio = d.Device.Audio
pubConf.DelayCloseTimeout = util.Conditional(d.Device.StopOnIdle, time.Second*5, 0)
d.Plugin.handler.Pull(d.Device.GetStreamPath(), d.Device.Pull, &pubConf)
}
func (d *HTTPDevice) Start() (err error) {
d.url, err = url.Parse(d.Device.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.url.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.url.Hostname())
} else {
d.tcpAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.url.Port()))
if err != nil {
return err
}
if d.tcpAddr.Port == 0 {
if d.url.Scheme == "https" || d.url.Scheme == "wss" {
d.tcpAddr.Port = 443
} else {
d.tcpAddr.Port = 80
}
}
}
return d.DeviceTask.Start()
}
func (d *HTTPDevice) GetTickInterval() time.Duration {
return time.Second * 10
}
func (d *HTTPDevice) Tick(any) {
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.tcpAddr)
if err != nil {
d.Device.ChangeStatus(DeviceStatusOffline)
return
}
conn.Close()
d.Device.RTT = time.Since(startTime)
d.Device.ChangeStatus(DeviceStatusOnline)
}

File diff suppressed because it is too large Load Diff

View File

@@ -50,6 +50,24 @@ func local_request_Api_SysInfo_0(ctx context.Context, marshaler runtime.Marshale
}
func request_Api_DisabledPlugins_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.DisabledPlugins(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_DisabledPlugins_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.DisabledPlugins(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_Summary_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
@@ -1234,51 +1252,51 @@ func local_request_Api_ModifyConfig_0(ctx context.Context, marshaler runtime.Mar
}
func request_Api_GetDeviceList_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func request_Api_GetPullProxyList_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.GetDeviceList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
msg, err := client.GetPullProxyList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_GetDeviceList_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func local_request_Api_GetPullProxyList_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.GetDeviceList(ctx, &protoReq)
msg, err := server.GetPullProxyList(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_AddDevice_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq DeviceInfo
func request_Api_AddPullProxy_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PullProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.AddDevice(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
msg, err := client.AddPullProxy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_AddDevice_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq DeviceInfo
func local_request_Api_AddPullProxy_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PullProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.AddDevice(ctx, &protoReq)
msg, err := server.AddPullProxy(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_RemoveDevice_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func request_Api_RemovePullProxy_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestWithId
var metadata runtime.ServerMetadata
@@ -1303,12 +1321,12 @@ func request_Api_RemoveDevice_0(ctx context.Context, marshaler runtime.Marshaler
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.RemoveDevice(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
msg, err := client.RemovePullProxy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_RemoveDevice_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func local_request_Api_RemovePullProxy_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestWithId
var metadata runtime.ServerMetadata
@@ -1333,33 +1351,163 @@ func local_request_Api_RemoveDevice_0(ctx context.Context, marshaler runtime.Mar
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.RemoveDevice(ctx, &protoReq)
msg, err := server.RemovePullProxy(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_UpdateDevice_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq DeviceInfo
func request_Api_UpdatePullProxy_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PullProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.UpdateDevice(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
msg, err := client.UpdatePullProxy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_UpdateDevice_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq DeviceInfo
func local_request_Api_UpdatePullProxy_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PullProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.UpdateDevice(ctx, &protoReq)
msg, err := server.UpdatePullProxy(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_GetPushProxyList_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.GetPushProxyList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_GetPushProxyList_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.GetPushProxyList(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_AddPushProxy_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.AddPushProxy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_AddPushProxy_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.AddPushProxy(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_RemovePushProxy_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestWithId
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.RemovePushProxy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_RemovePushProxy_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestWithId
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.RemovePushProxy(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_UpdatePushProxy_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.UpdatePushProxy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_UpdatePushProxy_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushProxyInfo
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.UpdatePushProxy(ctx, &protoReq)
return msg, metadata, err
}
@@ -1382,6 +1530,50 @@ func local_request_Api_GetRecording_0(ctx context.Context, marshaler runtime.Mar
}
func request_Api_GetPushes_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.GetPushes(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_GetPushes_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.GetPushes(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_AddPush_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq AddPushRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.AddPush(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_AddPush_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq AddPushRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.AddPush(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@@ -1413,6 +1605,31 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
})
mux.Handle("GET", pattern_Api_DisabledPlugins_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/DisabledPlugins", runtime.WithHTTPPathPattern("/api/plugins/disabled"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_DisabledPlugins_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_DisabledPlugins_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_Summary_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -2063,7 +2280,7 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
})
mux.Handle("GET", pattern_Api_GetDeviceList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("GET", pattern_Api_GetPullProxyList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
@@ -2071,12 +2288,12 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/GetDeviceList", runtime.WithHTTPPathPattern("/api/device/list"))
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/GetPullProxyList", runtime.WithHTTPPathPattern("/api/proxy/pull/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_GetDeviceList_0(annotatedContext, inboundMarshaler, server, req, pathParams)
resp, md, err := local_request_Api_GetPullProxyList_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
@@ -2084,11 +2301,11 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
return
}
forward_Api_GetDeviceList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_GetPullProxyList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_AddDevice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_AddPullProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
@@ -2096,12 +2313,12 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/AddDevice", runtime.WithHTTPPathPattern("/api/device/add"))
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/AddPullProxy", runtime.WithHTTPPathPattern("/api/proxy/pull/add"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_AddDevice_0(annotatedContext, inboundMarshaler, server, req, pathParams)
resp, md, err := local_request_Api_AddPullProxy_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
@@ -2109,11 +2326,11 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
return
}
forward_Api_AddDevice_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_AddPullProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_RemoveDevice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_RemovePullProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
@@ -2121,12 +2338,12 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/RemoveDevice", runtime.WithHTTPPathPattern("/api/device/remove/{id}"))
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/RemovePullProxy", runtime.WithHTTPPathPattern("/api/proxy/pull/remove/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_RemoveDevice_0(annotatedContext, inboundMarshaler, server, req, pathParams)
resp, md, err := local_request_Api_RemovePullProxy_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
@@ -2134,11 +2351,11 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
return
}
forward_Api_RemoveDevice_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_RemovePullProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_UpdateDevice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_UpdatePullProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
@@ -2146,12 +2363,12 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/UpdateDevice", runtime.WithHTTPPathPattern("/api/device/update"))
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/UpdatePullProxy", runtime.WithHTTPPathPattern("/api/proxy/pull/update"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_UpdateDevice_0(annotatedContext, inboundMarshaler, server, req, pathParams)
resp, md, err := local_request_Api_UpdatePullProxy_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
@@ -2159,7 +2376,107 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
return
}
forward_Api_UpdateDevice_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_UpdatePullProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_GetPushProxyList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/GetPushProxyList", runtime.WithHTTPPathPattern("/api/proxy/push/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_GetPushProxyList_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetPushProxyList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_AddPushProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/AddPushProxy", runtime.WithHTTPPathPattern("/api/proxy/push/add"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_AddPushProxy_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_AddPushProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_RemovePushProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/RemovePushProxy", runtime.WithHTTPPathPattern("/api/proxy/push/remove/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_RemovePushProxy_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_RemovePushProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_UpdatePushProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/UpdatePushProxy", runtime.WithHTTPPathPattern("/api/proxy/push/update"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_UpdatePushProxy_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_UpdatePushProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
@@ -2188,6 +2505,56 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
})
mux.Handle("GET", pattern_Api_GetPushes_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/GetPushes", runtime.WithHTTPPathPattern("/api/push/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_GetPushes_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetPushes_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_AddPush_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/AddPush", runtime.WithHTTPPathPattern("/api/push/add"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_AddPush_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_AddPush_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@@ -2251,6 +2618,28 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
})
mux.Handle("GET", pattern_Api_DisabledPlugins_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/DisabledPlugins", runtime.WithHTTPPathPattern("/api/plugins/disabled"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_DisabledPlugins_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_DisabledPlugins_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_Summary_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -2823,91 +3212,179 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
})
mux.Handle("GET", pattern_Api_GetDeviceList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("GET", pattern_Api_GetPullProxyList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/GetDeviceList", runtime.WithHTTPPathPattern("/api/device/list"))
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/GetPullProxyList", runtime.WithHTTPPathPattern("/api/proxy/pull/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_GetDeviceList_0(annotatedContext, inboundMarshaler, client, req, pathParams)
resp, md, err := request_Api_GetPullProxyList_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetDeviceList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_GetPullProxyList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_AddDevice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_AddPullProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/AddDevice", runtime.WithHTTPPathPattern("/api/device/add"))
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/AddPullProxy", runtime.WithHTTPPathPattern("/api/proxy/pull/add"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_AddDevice_0(annotatedContext, inboundMarshaler, client, req, pathParams)
resp, md, err := request_Api_AddPullProxy_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_AddDevice_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_AddPullProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_RemoveDevice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_RemovePullProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/RemoveDevice", runtime.WithHTTPPathPattern("/api/device/remove/{id}"))
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/RemovePullProxy", runtime.WithHTTPPathPattern("/api/proxy/pull/remove/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_RemoveDevice_0(annotatedContext, inboundMarshaler, client, req, pathParams)
resp, md, err := request_Api_RemovePullProxy_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_RemoveDevice_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_RemovePullProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_UpdateDevice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_UpdatePullProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/UpdateDevice", runtime.WithHTTPPathPattern("/api/device/update"))
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/UpdatePullProxy", runtime.WithHTTPPathPattern("/api/proxy/pull/update"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_UpdateDevice_0(annotatedContext, inboundMarshaler, client, req, pathParams)
resp, md, err := request_Api_UpdatePullProxy_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_UpdateDevice_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_UpdatePullProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_GetPushProxyList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/GetPushProxyList", runtime.WithHTTPPathPattern("/api/proxy/push/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_GetPushProxyList_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetPushProxyList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_AddPushProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/AddPushProxy", runtime.WithHTTPPathPattern("/api/proxy/push/add"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_AddPushProxy_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_AddPushProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_RemovePushProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/RemovePushProxy", runtime.WithHTTPPathPattern("/api/proxy/push/remove/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_RemovePushProxy_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_RemovePushProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_UpdatePushProxy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/UpdatePushProxy", runtime.WithHTTPPathPattern("/api/proxy/push/update"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_UpdatePushProxy_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_UpdatePushProxy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
@@ -2933,12 +3410,58 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
})
mux.Handle("GET", pattern_Api_GetPushes_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/GetPushes", runtime.WithHTTPPathPattern("/api/push/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_GetPushes_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetPushes_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_AddPush_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/AddPush", runtime.WithHTTPPathPattern("/api/push/add"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_AddPush_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_AddPush_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Api_SysInfo_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"api", "sysinfo"}, ""))
pattern_Api_DisabledPlugins_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "plugins", "disabled"}, ""))
pattern_Api_Summary_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"api", "summary"}, ""))
pattern_Api_Shutdown_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"api", "shutdown"}, ""))
@@ -2991,20 +3514,34 @@ var (
pattern_Api_ModifyConfig_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "config", "modify", "name"}, ""))
pattern_Api_GetDeviceList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "device", "list"}, ""))
pattern_Api_GetPullProxyList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "proxy", "pull", "list"}, ""))
pattern_Api_AddDevice_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "device", "add"}, ""))
pattern_Api_AddPullProxy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "proxy", "pull", "add"}, ""))
pattern_Api_RemoveDevice_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "device", "remove", "id"}, ""))
pattern_Api_RemovePullProxy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3, 1, 0, 4, 1, 5, 4}, []string{"api", "proxy", "pull", "remove", "id"}, ""))
pattern_Api_UpdateDevice_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "device", "update"}, ""))
pattern_Api_UpdatePullProxy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "proxy", "pull", "update"}, ""))
pattern_Api_GetPushProxyList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "proxy", "push", "list"}, ""))
pattern_Api_AddPushProxy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "proxy", "push", "add"}, ""))
pattern_Api_RemovePushProxy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3, 1, 0, 4, 1, 5, 4}, []string{"api", "proxy", "push", "remove", "id"}, ""))
pattern_Api_UpdatePushProxy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"api", "proxy", "push", "update"}, ""))
pattern_Api_GetRecording_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "record", "list"}, ""))
pattern_Api_GetPushes_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "push", "list"}, ""))
pattern_Api_AddPush_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "push", "add"}, ""))
)
var (
forward_Api_SysInfo_0 = runtime.ForwardResponseMessage
forward_Api_DisabledPlugins_0 = runtime.ForwardResponseMessage
forward_Api_Summary_0 = runtime.ForwardResponseMessage
forward_Api_Shutdown_0 = runtime.ForwardResponseMessage
@@ -3057,13 +3594,25 @@ var (
forward_Api_ModifyConfig_0 = runtime.ForwardResponseMessage
forward_Api_GetDeviceList_0 = runtime.ForwardResponseMessage
forward_Api_GetPullProxyList_0 = runtime.ForwardResponseMessage
forward_Api_AddDevice_0 = runtime.ForwardResponseMessage
forward_Api_AddPullProxy_0 = runtime.ForwardResponseMessage
forward_Api_RemoveDevice_0 = runtime.ForwardResponseMessage
forward_Api_RemovePullProxy_0 = runtime.ForwardResponseMessage
forward_Api_UpdateDevice_0 = runtime.ForwardResponseMessage
forward_Api_UpdatePullProxy_0 = runtime.ForwardResponseMessage
forward_Api_GetPushProxyList_0 = runtime.ForwardResponseMessage
forward_Api_AddPushProxy_0 = runtime.ForwardResponseMessage
forward_Api_RemovePushProxy_0 = runtime.ForwardResponseMessage
forward_Api_UpdatePushProxy_0 = runtime.ForwardResponseMessage
forward_Api_GetRecording_0 = runtime.ForwardResponseMessage
forward_Api_GetPushes_0 = runtime.ForwardResponseMessage
forward_Api_AddPush_0 = runtime.ForwardResponseMessage
)

View File

@@ -13,6 +13,11 @@ service api {
get: "/api/sysinfo"
};
}
rpc DisabledPlugins (google.protobuf.Empty) returns (DisabledPluginsResponse) {
option (google.api.http) = {
get: "/api/plugins/disabled"
};
}
rpc Summary (google.protobuf.Empty) returns (SummaryResponse) {
option (google.api.http) = {
get: "/api/summary"
@@ -153,26 +158,49 @@ service api {
body: "yaml"
};
}
rpc GetDeviceList (google.protobuf.Empty) returns (DeviceListResponse) {
rpc GetPullProxyList (google.protobuf.Empty) returns (PullProxyListResponse) {
option (google.api.http) = {
get: "/api/device/list"
get: "/api/proxy/pull/list"
};
}
rpc AddDevice (DeviceInfo) returns (SuccessResponse) {
rpc AddPullProxy (PullProxyInfo) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/device/add"
post: "/api/proxy/pull/add"
body: "*"
};
}
rpc RemoveDevice (RequestWithId) returns (SuccessResponse) {
rpc RemovePullProxy (RequestWithId) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/device/remove/{id}"
post: "/api/proxy/pull/remove/{id}"
body: "*"
};
}
rpc UpdateDevice (DeviceInfo) returns (SuccessResponse) {
rpc UpdatePullProxy (PullProxyInfo) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/device/update"
post: "/api/proxy/pull/update"
body: "*"
};
}
rpc GetPushProxyList (google.protobuf.Empty) returns (PushProxyListResponse) {
option (google.api.http) = {
get: "/api/proxy/push/list"
};
}
rpc AddPushProxy (PushProxyInfo) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/proxy/push/add"
body: "*"
};
}
rpc RemovePushProxy (RequestWithId) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/proxy/push/remove/{id}"
body: "*"
};
}
rpc UpdatePushProxy (PushProxyInfo) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/proxy/push/update"
body: "*"
};
}
@@ -181,6 +209,23 @@ service api {
get: "/api/record/list"
};
}
rpc GetPushes (google.protobuf.Empty) returns (PushListResponse) {
option (google.api.http) = {
get: "/api/push/list"
};
}
rpc AddPush (AddPushRequest) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/push/add"
body: "*"
};
}
}
message DisabledPluginsResponse {
int32 code = 1;
string message = 2;
repeated PluginInfo data = 3;
}
message GetConfigRequest {
@@ -257,19 +302,21 @@ message SummaryResponse {
message PluginInfo {
string name = 1;
bool disabled = 2;
map<string, string> description = 3;
repeated string pushAddr = 2;
repeated string playAddr = 3;
map<string, string> description = 4;
}
message SysInfoData {
google.protobuf.Timestamp startTime = 1;
string localIP = 2;
string version = 3;
string goVersion = 4;
string os = 5;
string arch = 6;
int32 cpus = 7;
repeated PluginInfo plugins = 8;
string publicIP = 3;
string version = 4;
string goVersion = 5;
string os = 6;
string arch = 7;
int32 cpus = 8;
repeated PluginInfo plugins = 9;
}
message SysInfoResponse {
@@ -288,6 +335,7 @@ message TaskTreeData {
uint32 state = 7;
TaskTreeData blocked = 8;
uint64 pointer = 9;
string startReason = 10;
}
message TaskTreeResponse {
@@ -454,13 +502,13 @@ message SubscribersResponse {
repeated SubscriberSnapShot data = 6;
}
message DeviceListResponse {
message PullProxyListResponse {
int32 code = 1;
string message = 2;
repeated DeviceInfo data = 3;
repeated PullProxyInfo data = 3;
}
message DeviceInfo {
message PullProxyInfo {
uint32 ID = 1;
google.protobuf.Timestamp createTime = 2;
google.protobuf.Timestamp updateTime = 3; // 更新时间
@@ -480,6 +528,29 @@ message DeviceInfo {
string streamPath = 16; // 流路径
}
message PushProxyInfo {
uint32 ID = 1;
google.protobuf.Timestamp createTime = 2;
google.protobuf.Timestamp updateTime = 3;
uint32 parentID = 4; // 父设备ID
string name = 5; // 设备名称
string type = 6; // 设备类型
uint32 status = 7; // 设备状态
string pushURL = 8; // 推流地址
bool pushOnStart = 9; // 启动时推流
bool audio = 10; // 是否推音频
string description = 11; // 设备描述
uint32 rtt = 12; // 平均RTT
string streamPath = 13; // 流路径
}
message PushProxyListResponse {
int32 code = 1;
string message = 2;
repeated PushProxyInfo data = 3;
}
message SetStreamAliasRequest {
string streamPath = 1;
string alias = 2;
@@ -519,4 +590,22 @@ message RecordingListResponse {
int32 code = 1;
string message = 2;
repeated Recording data = 3;
}
message PushInfo {
string streamPath = 1;
string targetURL = 2;
google.protobuf.Timestamp startTime = 3;
string status = 4;
}
message PushListResponse {
int32 code = 1;
string message = 2;
repeated PushInfo data = 3;
}
message AddPushRequest {
string streamPath = 1;
string targetURL = 2;
}

View File

@@ -24,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ApiClient interface {
SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SysInfoResponse, error)
DisabledPlugins(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DisabledPluginsResponse, error)
Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error)
Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
@@ -50,11 +51,17 @@ type ApiClient interface {
GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*SuccessResponse, error)
GetDeviceList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DeviceListResponse, error)
AddDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
RemoveDevice(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
UpdateDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
GetPullProxyList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PullProxyListResponse, error)
AddPullProxy(ctx context.Context, in *PullProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
RemovePullProxy(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
UpdatePullProxy(ctx context.Context, in *PullProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
GetPushProxyList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PushProxyListResponse, error)
AddPushProxy(ctx context.Context, in *PushProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
RemovePushProxy(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
UpdatePushProxy(ctx context.Context, in *PushProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
GetRecording(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*RecordingListResponse, error)
GetPushes(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PushListResponse, error)
AddPush(ctx context.Context, in *AddPushRequest, opts ...grpc.CallOption) (*SuccessResponse, error)
}
type apiClient struct {
@@ -74,6 +81,15 @@ func (c *apiClient) SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc
return out, nil
}
func (c *apiClient) DisabledPlugins(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DisabledPluginsResponse, error) {
out := new(DisabledPluginsResponse)
err := c.cc.Invoke(ctx, "/global.api/DisabledPlugins", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error) {
out := new(SummaryResponse)
err := c.cc.Invoke(ctx, "/global.api/Summary", in, out, opts...)
@@ -308,36 +324,72 @@ func (c *apiClient) ModifyConfig(ctx context.Context, in *ModifyConfigRequest, o
return out, nil
}
func (c *apiClient) GetDeviceList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DeviceListResponse, error) {
out := new(DeviceListResponse)
err := c.cc.Invoke(ctx, "/global.api/GetDeviceList", in, out, opts...)
func (c *apiClient) GetPullProxyList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PullProxyListResponse, error) {
out := new(PullProxyListResponse)
err := c.cc.Invoke(ctx, "/global.api/GetPullProxyList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) AddDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
func (c *apiClient) AddPullProxy(ctx context.Context, in *PullProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/AddDevice", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/AddPullProxy", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) RemoveDevice(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
func (c *apiClient) RemovePullProxy(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/RemoveDevice", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/RemovePullProxy", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) UpdateDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
func (c *apiClient) UpdatePullProxy(ctx context.Context, in *PullProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/UpdateDevice", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/UpdatePullProxy", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) GetPushProxyList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PushProxyListResponse, error) {
out := new(PushProxyListResponse)
err := c.cc.Invoke(ctx, "/global.api/GetPushProxyList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) AddPushProxy(ctx context.Context, in *PushProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/AddPushProxy", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) RemovePushProxy(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/RemovePushProxy", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) UpdatePushProxy(ctx context.Context, in *PushProxyInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/UpdatePushProxy", in, out, opts...)
if err != nil {
return nil, err
}
@@ -353,11 +405,30 @@ func (c *apiClient) GetRecording(ctx context.Context, in *emptypb.Empty, opts ..
return out, nil
}
func (c *apiClient) GetPushes(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PushListResponse, error) {
out := new(PushListResponse)
err := c.cc.Invoke(ctx, "/global.api/GetPushes", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) AddPush(ctx context.Context, in *AddPushRequest, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/AddPush", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility
type ApiServer interface {
SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error)
DisabledPlugins(context.Context, *emptypb.Empty) (*DisabledPluginsResponse, error)
Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error)
Shutdown(context.Context, *RequestWithId) (*SuccessResponse, error)
Restart(context.Context, *RequestWithId) (*SuccessResponse, error)
@@ -384,11 +455,17 @@ type ApiServer interface {
GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error)
GetDeviceList(context.Context, *emptypb.Empty) (*DeviceListResponse, error)
AddDevice(context.Context, *DeviceInfo) (*SuccessResponse, error)
RemoveDevice(context.Context, *RequestWithId) (*SuccessResponse, error)
UpdateDevice(context.Context, *DeviceInfo) (*SuccessResponse, error)
GetPullProxyList(context.Context, *emptypb.Empty) (*PullProxyListResponse, error)
AddPullProxy(context.Context, *PullProxyInfo) (*SuccessResponse, error)
RemovePullProxy(context.Context, *RequestWithId) (*SuccessResponse, error)
UpdatePullProxy(context.Context, *PullProxyInfo) (*SuccessResponse, error)
GetPushProxyList(context.Context, *emptypb.Empty) (*PushProxyListResponse, error)
AddPushProxy(context.Context, *PushProxyInfo) (*SuccessResponse, error)
RemovePushProxy(context.Context, *RequestWithId) (*SuccessResponse, error)
UpdatePushProxy(context.Context, *PushProxyInfo) (*SuccessResponse, error)
GetRecording(context.Context, *emptypb.Empty) (*RecordingListResponse, error)
GetPushes(context.Context, *emptypb.Empty) (*PushListResponse, error)
AddPush(context.Context, *AddPushRequest) (*SuccessResponse, error)
mustEmbedUnimplementedApiServer()
}
@@ -399,6 +476,9 @@ type UnimplementedApiServer struct {
func (UnimplementedApiServer) SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SysInfo not implemented")
}
func (UnimplementedApiServer) DisabledPlugins(context.Context, *emptypb.Empty) (*DisabledPluginsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DisabledPlugins not implemented")
}
func (UnimplementedApiServer) Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Summary not implemented")
}
@@ -477,21 +557,39 @@ func (UnimplementedApiServer) GetFormily(context.Context, *GetConfigRequest) (*G
func (UnimplementedApiServer) ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ModifyConfig not implemented")
}
func (UnimplementedApiServer) GetDeviceList(context.Context, *emptypb.Empty) (*DeviceListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetDeviceList not implemented")
func (UnimplementedApiServer) GetPullProxyList(context.Context, *emptypb.Empty) (*PullProxyListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetPullProxyList not implemented")
}
func (UnimplementedApiServer) AddDevice(context.Context, *DeviceInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddDevice not implemented")
func (UnimplementedApiServer) AddPullProxy(context.Context, *PullProxyInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddPullProxy not implemented")
}
func (UnimplementedApiServer) RemoveDevice(context.Context, *RequestWithId) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveDevice not implemented")
func (UnimplementedApiServer) RemovePullProxy(context.Context, *RequestWithId) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemovePullProxy not implemented")
}
func (UnimplementedApiServer) UpdateDevice(context.Context, *DeviceInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateDevice not implemented")
func (UnimplementedApiServer) UpdatePullProxy(context.Context, *PullProxyInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdatePullProxy not implemented")
}
func (UnimplementedApiServer) GetPushProxyList(context.Context, *emptypb.Empty) (*PushProxyListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetPushProxyList not implemented")
}
func (UnimplementedApiServer) AddPushProxy(context.Context, *PushProxyInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddPushProxy not implemented")
}
func (UnimplementedApiServer) RemovePushProxy(context.Context, *RequestWithId) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemovePushProxy not implemented")
}
func (UnimplementedApiServer) UpdatePushProxy(context.Context, *PushProxyInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdatePushProxy not implemented")
}
func (UnimplementedApiServer) GetRecording(context.Context, *emptypb.Empty) (*RecordingListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetRecording not implemented")
}
func (UnimplementedApiServer) GetPushes(context.Context, *emptypb.Empty) (*PushListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetPushes not implemented")
}
func (UnimplementedApiServer) AddPush(context.Context, *AddPushRequest) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddPush not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
@@ -523,6 +621,24 @@ func _Api_SysInfo_Handler(srv interface{}, ctx context.Context, dec func(interfa
return interceptor(ctx, in, info, handler)
}
func _Api_DisabledPlugins_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).DisabledPlugins(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/DisabledPlugins",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).DisabledPlugins(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Api_Summary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
@@ -991,74 +1107,146 @@ func _Api_ModifyConfig_Handler(srv interface{}, ctx context.Context, dec func(in
return interceptor(ctx, in, info, handler)
}
func _Api_GetDeviceList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_GetPullProxyList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).GetDeviceList(ctx, in)
return srv.(ApiServer).GetPullProxyList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/GetDeviceList",
FullMethod: "/global.api/GetPullProxyList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).GetDeviceList(ctx, req.(*emptypb.Empty))
return srv.(ApiServer).GetPullProxyList(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Api_AddDevice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeviceInfo)
func _Api_AddPullProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PullProxyInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).AddDevice(ctx, in)
return srv.(ApiServer).AddPullProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/AddDevice",
FullMethod: "/global.api/AddPullProxy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).AddDevice(ctx, req.(*DeviceInfo))
return srv.(ApiServer).AddPullProxy(ctx, req.(*PullProxyInfo))
}
return interceptor(ctx, in, info, handler)
}
func _Api_RemoveDevice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_RemovePullProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).RemoveDevice(ctx, in)
return srv.(ApiServer).RemovePullProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/RemoveDevice",
FullMethod: "/global.api/RemovePullProxy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).RemoveDevice(ctx, req.(*RequestWithId))
return srv.(ApiServer).RemovePullProxy(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
func _Api_UpdateDevice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeviceInfo)
func _Api_UpdatePullProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PullProxyInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).UpdateDevice(ctx, in)
return srv.(ApiServer).UpdatePullProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/UpdateDevice",
FullMethod: "/global.api/UpdatePullProxy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).UpdateDevice(ctx, req.(*DeviceInfo))
return srv.(ApiServer).UpdatePullProxy(ctx, req.(*PullProxyInfo))
}
return interceptor(ctx, in, info, handler)
}
func _Api_GetPushProxyList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).GetPushProxyList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/GetPushProxyList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).GetPushProxyList(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Api_AddPushProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PushProxyInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).AddPushProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/AddPushProxy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).AddPushProxy(ctx, req.(*PushProxyInfo))
}
return interceptor(ctx, in, info, handler)
}
func _Api_RemovePushProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).RemovePushProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/RemovePushProxy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).RemovePushProxy(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
func _Api_UpdatePushProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PushProxyInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).UpdatePushProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/UpdatePushProxy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).UpdatePushProxy(ctx, req.(*PushProxyInfo))
}
return interceptor(ctx, in, info, handler)
}
@@ -1081,6 +1269,42 @@ func _Api_GetRecording_Handler(srv interface{}, ctx context.Context, dec func(in
return interceptor(ctx, in, info, handler)
}
func _Api_GetPushes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).GetPushes(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/GetPushes",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).GetPushes(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Api_AddPush_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddPushRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).AddPush(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/AddPush",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).AddPush(ctx, req.(*AddPushRequest))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -1092,6 +1316,10 @@ var Api_ServiceDesc = grpc.ServiceDesc{
MethodName: "SysInfo",
Handler: _Api_SysInfo_Handler,
},
{
MethodName: "DisabledPlugins",
Handler: _Api_DisabledPlugins_Handler,
},
{
MethodName: "Summary",
Handler: _Api_Summary_Handler,
@@ -1197,25 +1425,49 @@ var Api_ServiceDesc = grpc.ServiceDesc{
Handler: _Api_ModifyConfig_Handler,
},
{
MethodName: "GetDeviceList",
Handler: _Api_GetDeviceList_Handler,
MethodName: "GetPullProxyList",
Handler: _Api_GetPullProxyList_Handler,
},
{
MethodName: "AddDevice",
Handler: _Api_AddDevice_Handler,
MethodName: "AddPullProxy",
Handler: _Api_AddPullProxy_Handler,
},
{
MethodName: "RemoveDevice",
Handler: _Api_RemoveDevice_Handler,
MethodName: "RemovePullProxy",
Handler: _Api_RemovePullProxy_Handler,
},
{
MethodName: "UpdateDevice",
Handler: _Api_UpdateDevice_Handler,
MethodName: "UpdatePullProxy",
Handler: _Api_UpdatePullProxy_Handler,
},
{
MethodName: "GetPushProxyList",
Handler: _Api_GetPushProxyList_Handler,
},
{
MethodName: "AddPushProxy",
Handler: _Api_AddPushProxy_Handler,
},
{
MethodName: "RemovePushProxy",
Handler: _Api_RemovePushProxy_Handler,
},
{
MethodName: "UpdatePushProxy",
Handler: _Api_UpdatePushProxy_Handler,
},
{
MethodName: "GetRecording",
Handler: _Api_GetRecording_Handler,
},
{
MethodName: "GetPushes",
Handler: _Api_GetPushes_Handler,
},
{
MethodName: "AddPush",
Handler: _Api_AddPush_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "global.proto",

View File

@@ -58,6 +58,7 @@ type (
OnInit() error
OnStop()
Pull(string, config.Pull, *config.Publish)
Push(string, config.Push, *config.Subscribe)
Transform(*Publisher, config.Transform)
OnPublish(*Publisher)
}
@@ -81,9 +82,11 @@ type (
IQUICPlugin interface {
OnQUICConnect(quic.Connection) task.ITask
}
IDevicePlugin interface {
OnDeviceAdd(device *Device) any
IPullProxyPlugin interface {
OnPullProxyAdd(pullProxy *PullProxy) any
}
IPushProxyPlugin interface {
OnPushProxyAdd(pushProxy *PushProxy) any
}
)
@@ -101,7 +104,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
p.Logger = s.Logger.With("plugin", plugin.Name)
upperName := strings.ToUpper(plugin.Name)
if os.Getenv(upperName+"_ENABLE") == "false" {
p.Disabled = true
p.disable("env")
p.Warn("disabled by env")
return
}
@@ -137,6 +140,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
p.Disabled = false
}
if p.Disabled {
p.disable("config")
p.Warn("plugin disabled")
return
} else {
@@ -151,7 +155,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
s.DB, err = gorm.Open(factory(p.config.DSN), &gorm.Config{})
if err != nil {
s.Error("failed to connect database", "error", err, "dsn", s.config.DSN, "type", s.config.DBType)
p.Disabled = true
p.disable(fmt.Sprintf("database %v", err))
return
}
}
@@ -209,13 +213,14 @@ var _ IPlugin = (*Plugin)(nil)
type Plugin struct {
task.Work
Disabled bool
Meta *PluginMeta
config config.Common
config.Config
handler IPlugin
Server *Server
DB *gorm.DB
Disabled bool
Meta *PluginMeta
PushAddr, PlayAddr []string
config config.Common
handler IPlugin
Server *Server
DB *gorm.DB
}
func (Plugin) nothing() {
@@ -256,6 +261,12 @@ func (p *Plugin) settingPath() string {
return filepath.Join(p.Server.SettingDir, strings.ToLower(p.Meta.Name)+".yaml")
}
func (p *Plugin) disable(reason string) {
p.Disabled = true
p.SetDescription("disableReason", reason)
p.Server.disabledPlugins = append(p.Server.disabledPlugins, p)
}
func (p *Plugin) assign() {
f, err := os.Open(p.settingPath())
defer f.Close()
@@ -280,6 +291,7 @@ func (p *Plugin) Start() (err error) {
s.grpcServer.RegisterService(p.Meta.ServiceDesc, p.handler)
if p.Meta.RegisterGRPCHandler != nil {
if err = p.Meta.RegisterGRPCHandler(p.Context, s.config.HTTP.GetGRPCMux(), s.grpcClientConn); err != nil {
p.disable(fmt.Sprintf("grpc %v", err))
return
} else {
p.Info("grpc handler registered")
@@ -288,9 +300,11 @@ func (p *Plugin) Start() (err error) {
}
s.Plugins.Add(p)
if err = p.listen(); err != nil {
p.disable(fmt.Sprintf("listen %v", err))
return
}
if err = p.handler.OnInit(); err != nil {
p.disable(fmt.Sprintf("init %v", err))
return
}
return
@@ -374,7 +388,7 @@ func (p *Plugin) OnPublish(pub *Publisher) {
if p.Meta.Pusher != nil {
for r, pushConf := range onPublish.Push {
if pushConf.URL = r.Replace(pub.StreamPath, pushConf.URL); pushConf.URL != "" {
p.Push(pub, pushConf)
p.Push(pub.StreamPath, pushConf, nil)
}
}
}
@@ -505,10 +519,9 @@ func (p *Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publi
puller.GetPullJob().Init(puller, p, streamPath, conf, pubConf)
}
func (p *Plugin) Push(pub *Publisher, conf config.Push) {
func (p *Plugin) Push(streamPath string, conf config.Push, subConf *config.Subscribe) {
pusher := p.Meta.Pusher()
job := pusher.GetPushJob().Init(pusher, p, pub.StreamPath, conf)
job.Depend(pub)
pusher.GetPushJob().Init(pusher, p, streamPath, conf, subConf)
}
func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) {

View File

@@ -1,6 +1,7 @@
package plugin_flv
import (
"fmt"
"net"
"net/http"
"strings"
@@ -24,6 +25,22 @@ const defaultConfig m7s.DefaultYaml = `publish:
var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, NewRecorder)
func (plugin *FLVPlugin) OnInit() (err error) {
_, port, _ := strings.Cut(plugin.GetCommonConf().HTTP.ListenAddr, ":")
if port == "80" {
plugin.PlayAddr = append(plugin.PlayAddr, "http://{hostName}/flv/{streamPath}", "ws://{hostName}/flv/{streamPath}")
} else if port != "" {
plugin.PlayAddr = append(plugin.PlayAddr, fmt.Sprintf("http://{hostName}:%s/flv/{streamPath}", port), fmt.Sprintf("ws://{hostName}:%s/flv/{streamPath}", port))
}
_, port, _ = strings.Cut(plugin.GetCommonConf().HTTP.ListenAddrTLS, ":")
if port == "443" {
plugin.PlayAddr = append(plugin.PlayAddr, "https://{hostName}/flv/{streamPath}", "wss://{hostName}/flv/{streamPath}")
} else if port != "" {
plugin.PlayAddr = append(plugin.PlayAddr, fmt.Sprintf("https://{hostName}:%s/flv/{streamPath}", port), fmt.Sprintf("wss://{hostName}:%s/flv/{streamPath}", port))
}
return
}
func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv")
var err error
@@ -79,9 +96,9 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = live.Run()
}
func (plugin *FLVPlugin) OnDeviceAdd(device *m7s.Device) any {
d := &FLVDevice{}
d.Device = device
func (plugin *FLVPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
d := &m7s.HTTPPullProxy{}
d.PullProxy = pullProxy
d.Plugin = &plugin.Plugin
return d
}

View File

@@ -1,7 +0,0 @@
package flv
import (
"m7s.live/v5"
)
type FLVDevice = m7s.HTTPDevice

View File

@@ -28,7 +28,7 @@ type Channel struct {
RecordReqs util.Collection[int, *RecordRequest]
*slog.Logger
gb28181.ChannelInfo
AbstractDevice *m7s.Device
AbstractDevice *m7s.PullProxy
}
func (c *Channel) GetKey() string {

View File

@@ -116,11 +116,11 @@ func (gb *GB28181Plugin) OnInit() (err error) {
return
}
func (p *GB28181Plugin) OnDeviceAdd(device *m7s.Device) any {
deviceID, channelID, _ := strings.Cut(device.URL, "/")
func (p *GB28181Plugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
deviceID, channelID, _ := strings.Cut(pullProxy.URL, "/")
if d, ok := p.devices.Get(deviceID); ok {
if channel, ok := d.channels.Get(channelID); ok {
channel.AbstractDevice = device
channel.AbstractDevice = pullProxy
return channel
}
}
@@ -336,12 +336,12 @@ func (gb *GB28181Plugin) StoreDevice(id string, req *sip.Request) (d *Device) {
d.OnStart(func() {
gb.devices.Add(d)
d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.Devices.Find(func(absDevice *m7s.Device) bool {
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool {
return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID)
}); ok {
c.AbstractDevice = absDevice
absDevice.Handler = c
absDevice.ChangeStatus(m7s.DeviceStatusOnline)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
if gb.AutoInvite {
gb.Pull(fmt.Sprintf("%s/%s", d.ID, c.DeviceID), config.Pull{
@@ -355,7 +355,7 @@ func (gb *GB28181Plugin) StoreDevice(id string, req *sip.Request) (d *Device) {
if gb.devices.RemoveByKey(d.ID) {
for c := range d.channels.Range {
if c.AbstractDevice != nil {
c.AbstractDevice.ChangeStatus(m7s.DeviceStatusOffline)
c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}

View File

@@ -2,12 +2,14 @@ package plugin_hls
import (
"embed"
"m7s.live/v5/pkg/util"
"fmt"
"net/http"
"path"
"strings"
"time"
"m7s.live/v5/pkg/util"
"m7s.live/v5"
hls "m7s.live/v5/plugin/hls/pkg"
)
@@ -21,9 +23,25 @@ type HLSPlugin struct {
m7s.Plugin
}
func (p *HLSPlugin) OnDeviceAdd(device *m7s.Device) any {
d := &hls.HLSDevice{}
d.Device = device
func (p *HLSPlugin) OnInit() (err error) {
_, port, _ := strings.Cut(p.GetCommonConf().HTTP.ListenAddr, ":")
if port == "80" {
p.PlayAddr = append(p.PlayAddr, "http://{hostName}/hls/{streamPath}.m3u8")
} else if port != "" {
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("http://{hostName}:%s/hls/{streamPath}.m3u8", port))
}
_, port, _ = strings.Cut(p.GetCommonConf().HTTP.ListenAddrTLS, ":")
if port == "443" {
p.PlayAddr = append(p.PlayAddr, "https://{hostName}/hls/{streamPath}.m3u8")
} else if port != "" {
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("https://{hostName}:%s/hls/{streamPath}.m3u8", port))
}
return
}
func (p *HLSPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
d := &m7s.HTTPPullProxy{}
d.PullProxy = pullProxy
d.Plugin = &p.Plugin
return d
}

View File

@@ -35,6 +35,22 @@ type LLHLSPlugin struct {
Plugin
}
func (c *LLHLSPlugin) OnInit() (err error) {
_, port, _ := strings.Cut(c.GetCommonConf().HTTP.ListenAddr, ":")
if port == "80" {
c.PlayAddr = append(c.PlayAddr, "http://{hostName}/llhls/{streamPath}/index.m3u8")
} else if port != "" {
c.PlayAddr = append(c.PlayAddr, fmt.Sprintf("http://{hostName}:%s/llhls/{streamPath}/index.m3u8", port))
}
_, port, _ = strings.Cut(c.GetCommonConf().HTTP.ListenAddrTLS, ":")
if port == "443" {
c.PlayAddr = append(c.PlayAddr, "https://{hostName}/llhls/{streamPath}/index.m3u8")
} else if port != "" {
c.PlayAddr = append(c.PlayAddr, fmt.Sprintf("https://{hostName}:%s/llhls/{streamPath}/index.m3u8", port))
}
return
}
func (c *LLHLSPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, ".html") {
w.Write([]byte(`<html><body><video src="/llhls/` + strings.TrimSuffix(r.URL.Path, ".html") + `/index.m3u8"></video></body></html>`))

View File

@@ -1,5 +0,0 @@
package hls
import "m7s.live/v5"
type HLSDevice = m7s.HTTPDevice

View File

@@ -1,6 +1,7 @@
package plugin_mp4
import (
"fmt"
"io"
"net"
"net/http"
@@ -103,6 +104,18 @@ func (p *MP4Plugin) OnInit() (err error) {
// p.SendToThirdPartyAPI(exception)
// }
// }()
_, port, _ := strings.Cut(p.GetCommonConf().HTTP.ListenAddr, ":")
if port == "80" {
p.PlayAddr = append(p.PlayAddr, "http://{hostName}/mp4/{streamPath}.mp4")
} else if port != "" {
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("http://{hostName}:%s/mp4/{streamPath}.mp4", port))
}
_, port, _ = strings.Cut(p.GetCommonConf().HTTP.ListenAddrTLS, ":")
if port == "443" {
p.PlayAddr = append(p.PlayAddr, "https://{hostName}/mp4/{streamPath}.mp4")
} else if port != "" {
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("https://{hostName}:%s/mp4/{streamPath}.mp4", port))
}
return
}
func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {

View File

@@ -10,6 +10,6 @@ import (
func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *gpb.SuccessResponse, err error) {
pusher := rtmp.NewPusher()
err = pusher.GetPushJob().Init(pusher, &r.Plugin, req.StreamPath, config.Push{URL: req.RemoteURL}).WaitStarted()
err = pusher.GetPushJob().Init(pusher, &r.Plugin, req.StreamPath, config.Push{URL: req.RemoteURL}, nil).WaitStarted()
return &gpb.SuccessResponse{}, err
}

View File

@@ -1,57 +0,0 @@
package plugin_rtmp
import (
"fmt"
"net"
"net/url"
"time"
"m7s.live/v5"
)
type RTMPDevice struct {
m7s.DeviceTask
tcpAddr *net.TCPAddr
url *url.URL
}
func (d *RTMPDevice) Start() (err error) {
d.url, err = url.Parse(d.Device.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.url.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.url.Hostname())
} else {
d.tcpAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.url.Port()))
if err != nil {
return err
}
if d.tcpAddr.Port == 0 {
if d.url.Scheme == "rtmps" {
d.tcpAddr.Port = 443
} else {
d.tcpAddr.Port = 1935
}
}
}
return d.DeviceTask.Start()
}
func (d *RTMPDevice) GetTickInterval() time.Duration {
return time.Second * 5
}
func (d *RTMPDevice) Tick(any) {
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.tcpAddr)
if err != nil {
d.Device.ChangeStatus(m7s.DeviceStatusOffline)
return
}
conn.Close()
d.Device.RTT = time.Since(startTime)
d.Device.ChangeStatus(m7s.DeviceStatusOnline)
}

View File

@@ -2,8 +2,10 @@ package plugin_rtmp
import (
"errors"
"fmt"
"io"
"net"
"strings"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
@@ -180,10 +182,42 @@ func (task *RTMPServer) Go() (err error) {
return
}
func (p *RTMPPlugin) OnDeviceAdd(device *m7s.Device) any {
ret := &RTMPDevice{}
ret.Device = device
func (p *RTMPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
ret := &RTMPPullProxy{}
ret.PullProxy = pullProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("device", device.Name)
ret.Logger = p.With("pullProxy", pullProxy.Name)
return ret
}
func (p *RTMPPlugin) OnPushProxyAdd(pushProxy *m7s.PushProxy) any {
ret := &RTMPPushProxy{}
ret.PushProxy = pushProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("pushProxy", pushProxy.Name)
return ret
}
func (p *RTMPPlugin) OnInit() (err error) {
if tcpAddr := p.GetCommonConf().TCP.ListenAddr; tcpAddr != "" {
_, port, _ := strings.Cut(tcpAddr, ":")
if port == "1935" {
p.PushAddr = append(p.PushAddr, "rtmp://{hostName}/{streamPath}")
p.PlayAddr = append(p.PlayAddr, "rtmp://{hostName}/{streamPath}")
} else {
p.PushAddr = append(p.PushAddr, fmt.Sprintf("rtmp://{hostName}:%s/{streamPath}", port))
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("rtmp://{hostName}:%s/{streamPath}", port))
}
}
if tcpAddrTLS := p.GetCommonConf().TCP.ListenAddrTLS; tcpAddrTLS != "" {
_, port, _ := strings.Cut(tcpAddrTLS, ":")
if port == "443" {
p.PushAddr = append(p.PushAddr, "rtmps://{hostName}/{streamPath}")
p.PlayAddr = append(p.PlayAddr, "rtmps://{hostName}/{streamPath}")
} else {
p.PushAddr = append(p.PushAddr, fmt.Sprintf("rtmps://{hostName}:%s/{streamPath}", port))
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("rtmps://{hostName}:%s/{streamPath}", port))
}
}
return
}

38
plugin/rtmp/pull-proxy.go Normal file
View File

@@ -0,0 +1,38 @@
package plugin_rtmp
import (
"fmt"
"net"
"net/url"
"m7s.live/v5"
)
type RTMPPullProxy struct {
m7s.TCPPullProxy
}
func (d *RTMPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxy.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.URL.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.URL.Hostname())
} else {
d.TCPAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.URL.Port()))
if err != nil {
return err
}
if d.TCPAddr.Port == 0 {
if d.URL.Scheme == "rtmps" {
d.TCPAddr.Port = 443
} else {
d.TCPAddr.Port = 1935
}
}
}
return d.TCPPullProxy.Start()
}

38
plugin/rtmp/push-proxy.go Normal file
View File

@@ -0,0 +1,38 @@
package plugin_rtmp
import (
"fmt"
"net"
"net/url"
"m7s.live/v5"
)
type RTMPPushProxy struct {
m7s.TCPPushProxy
}
func (d *RTMPPushProxy) Start() (err error) {
d.URL, err = url.Parse(d.PushProxy.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.URL.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.URL.Hostname())
} else {
d.TCPAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.URL.Port()))
if err != nil {
return err
}
if d.TCPAddr.Port == 0 {
if d.URL.Scheme == "rtmps" {
d.TCPAddr.Port = 443
} else {
d.TCPAddr.Port = 1935
}
}
}
return d.TCPPushProxy.Start()
}

View File

@@ -1,45 +0,0 @@
package plugin_rtsp
import (
"time"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
. "m7s.live/v5/plugin/rtsp/pkg"
)
type RTSPDevice struct {
m7s.DeviceTask
conn Stream
}
func (d *RTSPDevice) Start() (err error) {
d.conn.NetConnection = &NetConnection{
MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
UserAgent: "monibuca" + m7s.Version,
}
d.conn.Logger = d.Plugin.Logger
return d.TickTask.Start()
}
func (d *RTSPDevice) GetTickInterval() time.Duration {
return time.Second * 5
}
func (d *RTSPDevice) Tick(any) {
switch d.Device.Status {
case m7s.DeviceStatusOffline:
err := d.conn.Connect(d.Device.URL)
if err != nil {
return
}
d.Device.ChangeStatus(m7s.DeviceStatusOnline)
case m7s.DeviceStatusOnline, m7s.DeviceStatusPulling:
t := time.Now()
err := d.conn.Options()
d.Device.RTT = time.Since(t)
if err != nil {
d.Device.ChangeStatus(m7s.DeviceStatusOffline)
}
}
}

View File

@@ -1,7 +1,9 @@
package plugin_rtsp
import (
"fmt"
"net"
"strings"
"m7s.live/v5/pkg/task"
@@ -24,10 +26,32 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
return ret
}
func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) any {
ret := &RTSPDevice{}
ret.Device = device
func (p *RTSPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
ret := &RTSPPullProxy{}
ret.PullProxy = pullProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("device", device.Name)
ret.Logger = p.With("pullProxy", pullProxy.Name)
return ret
}
func (p *RTSPPlugin) OnPushProxyAdd(pushProxy *m7s.PushProxy) any {
ret := &RTSPPushProxy{}
ret.PushProxy = pushProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("pushProxy", pushProxy.Name)
return ret
}
func (p *RTSPPlugin) OnInit() (err error) {
if tcpAddr := p.GetCommonConf().TCP.ListenAddr; tcpAddr != "" {
_, port, _ := strings.Cut(tcpAddr, ":")
if port == "554" {
p.PlayAddr = append(p.PlayAddr, "rtsp://{hostName}/{streamPath}")
p.PushAddr = append(p.PushAddr, "rtsp://{hostName}/{streamPath}")
} else {
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("rtsp://{hostName}:%s/{streamPath}", port))
p.PushAddr = append(p.PushAddr, fmt.Sprintf("rtsp://{hostName}:%s/{streamPath}", port))
}
}
return
}

66
plugin/rtsp/pull-proxy.go Normal file
View File

@@ -0,0 +1,66 @@
package plugin_rtsp
import (
"fmt"
"net"
"net/url"
"time"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
. "m7s.live/v5/plugin/rtsp/pkg"
)
type RTSPPullProxy struct {
m7s.TCPPullProxy
conn Stream
}
func (d *RTSPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxy.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.URL.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.URL.Hostname())
} else {
d.TCPAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.URL.Port()))
if err != nil {
return err
}
if d.TCPAddr.Port == 0 {
d.TCPAddr.Port = 554 // Default RTSP port
}
}
d.conn.NetConnection = &NetConnection{
MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
UserAgent: "monibuca" + m7s.Version,
}
d.conn.Logger = d.Plugin.Logger
return d.TCPPullProxy.Start()
}
func (d *RTSPPullProxy) GetTickInterval() time.Duration {
return time.Second * 5
}
func (d *RTSPPullProxy) Tick(any) {
switch d.PullProxy.Status {
case m7s.PullProxyStatusOffline:
err := d.conn.Connect(d.PullProxy.URL)
if err != nil {
return
}
d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline)
case m7s.PullProxyStatusOnline, m7s.PullProxyStatusPulling:
t := time.Now()
err := d.conn.Options()
d.PullProxy.RTT = time.Since(t)
if err != nil {
d.PullProxy.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}

62
plugin/rtsp/push-proxy.go Normal file
View File

@@ -0,0 +1,62 @@
package plugin_rtsp
import (
"fmt"
"net"
"net/url"
"time"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
. "m7s.live/v5/plugin/rtsp/pkg"
)
type RTSPPushProxy struct {
m7s.TCPPushProxy
conn Stream
}
func (d *RTSPPushProxy) Start() (err error) {
d.URL, err = url.Parse(d.PushProxy.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.URL.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.URL.Hostname())
} else {
d.TCPAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.URL.Port()))
if err != nil {
return err
}
if d.TCPAddr.Port == 0 {
d.TCPAddr.Port = 554 // Default RTSP port
}
}
d.conn.NetConnection = &NetConnection{
MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
UserAgent: "monibuca" + m7s.Version,
}
d.conn.Logger = d.Plugin.Logger
return d.TCPPushProxy.Start()
}
func (d *RTSPPushProxy) Tick(any) {
switch d.PushProxy.Status {
case m7s.PushProxyStatusOffline:
err := d.conn.Connect(d.PushProxy.URL)
if err != nil {
return
}
d.PushProxy.ChangeStatus(m7s.PushProxyStatusOnline)
case m7s.PushProxyStatusOnline, m7s.PushProxyStatusPushing:
t := time.Now()
err := d.conn.Options()
d.PushProxy.RTT = time.Since(t)
if err != nil {
d.PushProxy.ChangeStatus(m7s.PushProxyStatusOffline)
}
}
}

View File

@@ -1,6 +1,7 @@
package plugin_srt
import (
"fmt"
"strings"
srt "github.com/datarhei/gosrt"
@@ -30,6 +31,14 @@ func (p *SRTPlugin) OnInit() error {
t.server.Addr = p.ListenAddr
t.plugin = p
p.AddTask(&t)
_, port, _ := strings.Cut(p.ListenAddr, ":")
if port == "6000" {
p.PushAddr = append(p.PushAddr, "srt://{hostName}?streamid=publish:/{streamPath}")
p.PlayAddr = append(p.PlayAddr, "srt://{hostName}?streamid=subscribe:/{streamPath}")
} else if port != "" {
p.PushAddr = append(p.PushAddr, fmt.Sprintf("srt://{hostName}:%s?streamid=publish:/{streamPath}", port))
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("srt://{hostName}:%s?streamid=subscribe:/{streamPath}", port))
}
return nil
}

View File

@@ -43,7 +43,7 @@ func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pu
if i := r.pushers.Length; count > i {
for j := i; j < count; j++ {
p := pusher()
ctx := p.GetPushJob().Init(p, &r.Plugin, streamPath, config.Push{URL: fmt.Sprintf(format, remoteHost, j)})
ctx := p.GetPushJob().Init(p, &r.Plugin, streamPath, config.Push{URL: fmt.Sprintf(format, remoteHost, j)}, nil)
if err = ctx.WaitStarted(); err != nil {
return
}

View File

@@ -2,14 +2,15 @@ package plugin_vmlog
import (
"fmt"
"log/slog"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"log/slog"
"m7s.live/v5"
"net/http"
//"m7s.live/m7s/v5/plugin/logrotate/pb"
)
@@ -37,7 +38,7 @@ func (config *VmLogPlugin) OnInit() (err error) {
vlinsert.Init()
config.handler, err = NewVmLogHandler(nil, nil)
if err == nil {
config.AddLogHandler(config.handler)
config.Server.LogHandler.Add(config.handler)
}
return
}

View File

@@ -136,6 +136,23 @@ func (p *WebRTCPlugin) OnInit() (err error) {
}
p.api = NewAPI(WithMediaEngine(&p.m),
WithInterceptorRegistry(i), WithSettingEngine(p.s))
_, port, _ := strings.Cut(p.GetCommonConf().HTTP.ListenAddr, ":")
if port == "80" {
p.PushAddr = append(p.PushAddr, "http://{hostName}/webrtc/push")
p.PlayAddr = append(p.PlayAddr, "http://{hostName}/webrtc/play")
} else if port != "" {
p.PushAddr = append(p.PushAddr, fmt.Sprintf("http://{hostName}:%s/webrtc/push", port))
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("http://{hostName}:%s/webrtc/play", port))
}
_, port, _ = strings.Cut(p.GetCommonConf().HTTP.ListenAddrTLS, ":")
if port == "443" {
p.PushAddr = append(p.PushAddr, "https://{hostName}/webrtc/push")
p.PlayAddr = append(p.PlayAddr, "https://{hostName}/webrtc/play")
} else if port != "" {
p.PushAddr = append(p.PushAddr, fmt.Sprintf("https://{hostName}:%s/webrtc/push", port))
p.PlayAddr = append(p.PlayAddr, fmt.Sprintf("https://{hostName}:%s/webrtc/play", port))
}
return
}

View File

@@ -143,7 +143,7 @@ type Publisher struct {
GOP int
OnSeek func(time.Time)
OnGetPosition func() time.Time
Device *Device
PullProxy *PullProxy
dumpFile *os.File
}
@@ -189,14 +189,14 @@ func (p *Publisher) Start() (err error) {
}
s.Streams.Set(p)
p.Info("publish")
if device, ok := s.Devices.Find(func(device *Device) bool {
return device.GetStreamPath() == p.StreamPath
if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool {
return pullProxy.GetStreamPath() == p.StreamPath
}); ok {
p.Device = device
if device.Status == DeviceStatusOnline {
device.ChangeStatus(DeviceStatusPulling)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && device.FilePath != "" {
mp4Plugin.Record(p, device.Record, nil)
p.PullProxy = pullProxy
if pullProxy.Status == PullProxyStatusOnline {
pullProxy.ChangeStatus(PullProxyStatusPulling)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" {
mp4Plugin.Record(p, pullProxy.Record, nil)
}
}
}
@@ -627,8 +627,8 @@ func (p *Publisher) Dispose() {
p.dumpFile.Close()
}
p.State = PublisherStateDisposed
if p.Device != nil && p.Device.Status == DeviceStatusPulling && s.Devices.Has(p.Device.GetKey()) {
p.Device.ChangeStatus(DeviceStatusOnline)
if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) {
p.PullProxy.ChangeStatus(PullProxyStatusOnline)
}
}

178
pull-proxy.go Normal file
View File

@@ -0,0 +1,178 @@
package m7s
import (
"fmt"
"net"
"net/url"
"strings"
"time"
"gorm.io/gorm"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
const (
PullProxyStatusOffline byte = iota
PullProxyStatusOnline
PullProxyStatusPulling
PullProxyStatusDisabled
)
type (
IPullProxy interface {
Pull()
}
PullProxy struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all" yaml:"-"`
ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `yaml:"-"`
Name string
StreamPath string
PullOnStart, Audio, StopOnIdle bool
config.Pull `gorm:"embedded;embeddedPrefix:pull_"`
config.Record `gorm:"embedded;embeddedPrefix:record_"`
ParentID uint
Type string
Status byte
Description string
RTT time.Duration
Handler IPullProxy `gorm:"-:all" yaml:"-"`
}
PullProxyManager struct {
task.Manager[uint, *PullProxy]
}
PullProxyTask struct {
task.TickTask
PullProxy *PullProxy
Plugin *Plugin
}
HTTPPullProxy struct {
TCPPullProxy
}
TCPPullProxy struct {
PullProxyTask
TCPAddr *net.TCPAddr
URL *url.URL
}
)
func (d *PullProxy) GetKey() uint {
return d.ID
}
func (d *PullProxy) GetStreamPath() string {
if d.StreamPath == "" {
return fmt.Sprintf("pull/%s/%d", d.Type, d.ID)
}
return d.StreamPath
}
func (d *PullProxy) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if pullPlugin, ok := plugin.handler.(IPullProxyPlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
pullTask := pullPlugin.OnPullProxyAdd(d)
if pullTask == nil {
continue
}
if pullTask, ok := pullTask.(IPullProxy); ok {
d.Handler = pullTask
}
if t, ok := pullTask.(task.ITask); ok {
if ticker, ok := t.(task.IChannelTask); ok {
t.OnStart(func() {
ticker.Tick(nil)
})
}
d.AddTask(t)
} else {
d.ChangeStatus(PullProxyStatusOnline)
}
}
}
return
}
func (d *PullProxy) ChangeStatus(status byte) {
if d.Status == status {
return
}
from := d.Status
d.Info("device status changed", "from", from, "to", status)
d.Status = status
d.Update()
switch status {
case PullProxyStatusOnline:
if d.PullOnStart && from == PullProxyStatusOffline {
d.Handler.Pull()
}
}
}
func (d *PullProxy) Update() {
if d.server.DB != nil {
d.server.DB.Omit("deleted_at").Save(d)
}
}
func (d *PullProxyTask) Dispose() {
d.PullProxy.ChangeStatus(PullProxyStatusOffline)
d.TickTask.Dispose()
d.Plugin.Server.Streams.Call(func() error {
if stream, ok := d.Plugin.Server.Streams.Get(d.PullProxy.GetStreamPath()); ok {
stream.Stop(task.ErrStopByUser)
}
return nil
})
}
func (d *PullProxyTask) Pull() {
var pubConf = d.Plugin.config.Publish
pubConf.PubAudio = d.PullProxy.Audio
pubConf.DelayCloseTimeout = util.Conditional(d.PullProxy.StopOnIdle, time.Second*5, 0)
d.Plugin.handler.Pull(d.PullProxy.GetStreamPath(), d.PullProxy.Pull, &pubConf)
}
func (d *HTTPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxy.URL)
if err != nil {
return
}
if ips, err := net.LookupIP(d.URL.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.URL.Hostname())
} else {
d.TCPAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.URL.Port()))
if err != nil {
return err
}
if d.TCPAddr.Port == 0 {
if d.URL.Scheme == "https" || d.URL.Scheme == "wss" {
d.TCPAddr.Port = 443
} else {
d.TCPAddr.Port = 80
}
}
}
return d.PullProxyTask.Start()
}
func (d *TCPPullProxy) GetTickInterval() time.Duration {
return time.Second * 10
}
func (d *TCPPullProxy) Tick(any) {
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.TCPAddr)
if err != nil {
d.PullProxy.ChangeStatus(PullProxyStatusOffline)
return
}
conn.Close()
d.PullProxy.RTT = time.Since(startTime)
d.PullProxy.ChangeStatus(PullProxyStatusOnline)
}

141
push-proxy.go Normal file
View File

@@ -0,0 +1,141 @@
package m7s
import (
"fmt"
"net"
"net/url"
"strings"
"time"
"gorm.io/gorm"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
)
const (
PushProxyStatusOffline byte = iota
PushProxyStatusOnline
PushProxyStatusPushing
PushProxyStatusDisabled
)
type (
IPushProxy interface {
Push()
}
PushProxy struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all" yaml:"-"`
ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `yaml:"-"`
Name string
StreamPath string
Audio, PushOnStart bool
config.Push `gorm:"embedded;embeddedPrefix:push_"`
ParentID uint
Type string
Status byte
Description string
RTT time.Duration
Handler IPushProxy `gorm:"-:all" yaml:"-"`
}
PushProxyManager struct {
task.Manager[uint, *PushProxy]
}
PushProxyTask struct {
task.TickTask
PushProxy *PushProxy
Plugin *Plugin
}
TCPPushProxy struct {
PushProxyTask
TCPAddr *net.TCPAddr
URL *url.URL
}
)
func (d *PushProxy) GetKey() uint {
return d.ID
}
func (d *PushProxy) GetStreamPath() string {
if d.StreamPath == "" {
return fmt.Sprintf("push/%s/%d", d.Type, d.ID)
}
return d.StreamPath
}
func (d *PushProxy) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if pushPlugin, ok := plugin.handler.(IPushProxyPlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
pushTask := pushPlugin.OnPushProxyAdd(d)
if pushTask == nil {
continue
}
if pushTask, ok := pushTask.(IPushProxy); ok {
d.Handler = pushTask
}
if t, ok := pushTask.(task.ITask); ok {
if ticker, ok := t.(task.IChannelTask); ok {
t.OnStart(func() {
ticker.Tick(nil)
})
}
d.AddTask(t)
} else {
d.ChangeStatus(PushProxyStatusOnline)
}
}
}
return
}
func (d *PushProxy) ChangeStatus(status byte) {
if d.Status == status {
return
}
from := d.Status
d.Info("device status changed", "from", from, "to", status)
d.Status = status
d.Update()
switch status {
case PushProxyStatusOnline:
if d.PushOnStart && from == PushProxyStatusOffline {
d.Handler.Push()
}
}
}
func (d *PushProxy) Update() {
if d.server.DB != nil {
d.server.DB.Omit("deleted_at").Save(d)
}
}
func (d *PushProxyTask) Dispose() {
d.PushProxy.ChangeStatus(PushProxyStatusOffline)
d.TickTask.Dispose()
}
func (d *PushProxyTask) Push() {
var subConf = d.Plugin.config.Subscribe
subConf.SubAudio = d.PushProxy.Audio
d.Plugin.handler.Push(d.PushProxy.GetStreamPath(), d.PushProxy.Push, &subConf)
}
func (d *TCPPushProxy) GetTickInterval() time.Duration {
return time.Second * 10
}
func (d *TCPPushProxy) Tick(any) {
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.TCPAddr)
if err != nil {
d.PushProxy.ChangeStatus(PushProxyStatusOffline)
return
}
conn.Close()
d.PushProxy.RTT = time.Since(startTime)
d.PushProxy.ChangeStatus(PushProxyStatusOnline)
}

View File

@@ -19,6 +19,7 @@ type Pusher = func() IPusher
type PushJob struct {
Connection
Subscriber *Subscriber
SubConf *config.Subscribe
pusher IPusher
}
@@ -26,9 +27,10 @@ func (p *PushJob) GetKey() string {
return p.Connection.RemoteURL
}
func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push) *PushJob {
func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push, subConf *config.Subscribe) *PushJob {
p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy, http.Header(conf.Header))
p.pusher = pusher
p.SubConf = subConf
p.SetDescriptions(task.Description{
"plugin": plugin.Meta.Name,
"streamPath": streamPath,
@@ -41,7 +43,11 @@ func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf c
}
func (p *PushJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.pusher.GetTask().Context, p.StreamPath)
if p.SubConf != nil {
p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.pusher.GetTask().Context, p.StreamPath, *p.SubConf)
} else {
p.Subscriber, err = p.Plugin.Subscribe(p.pusher.GetTask().Context, p.StreamPath)
}
if p.Subscriber != nil {
p.Subscriber.Internal = true
}

0
scripts/protoc_global.sh Normal file → Executable file
View File

View File

@@ -59,7 +59,7 @@ type (
PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔
DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件
StreamAlias map[config.Regexp]string `desc:"流别名"`
Device []*Device
Device []*PullProxy
}
WaitStream struct {
StreamPath string
@@ -77,7 +77,8 @@ type (
Pushs task.Manager[string, *PushJob]
Records task.Manager[string, *RecordJob]
Transforms Transforms
Devices DeviceManager
PullProxies PullProxyManager
PushProxies PushProxyManager
Subscribers SubscriberCollection
LogHandler MultiLogHandler
apiList []string
@@ -87,6 +88,7 @@ type (
lastSummary *pb.SummaryResponse
conf any
configFileContent []byte
disabledPlugins []*Plugin
prometheusDesc prometheusDesc
}
CheckSubWaitTimeout struct {
@@ -107,7 +109,8 @@ func (w *WaitStream) GetKey() string {
func NewServer(conf any) (s *Server) {
s = &Server{
conf: conf,
conf: conf,
disabledPlugins: make([]*Plugin, 0),
}
s.ID = task.GetNextTaskID()
s.Meta = &serverMeta
@@ -266,7 +269,7 @@ func (s *Server) Start() (err error) {
s.AddTask(&s.Pulls)
s.AddTask(&s.Pushs)
s.AddTask(&s.Transforms)
s.AddTask(&s.Devices)
s.AddTask(&s.PullProxies)
promReg := prometheus.NewPedanticRegistry()
promReg.MustRegister(s)
for _, plugin := range plugins {
@@ -309,7 +312,7 @@ func (s *Server) Start() (err error) {
}
}
if s.DB != nil {
s.DB.AutoMigrate(&Device{})
s.DB.AutoMigrate(&PullProxy{})
}
for _, d := range s.Device {
if d.ID != 0 {
@@ -338,18 +341,18 @@ func (s *Server) Start() (err error) {
if s.DB != nil {
s.DB.Save(d)
} else {
s.Devices.Add(d, s.Logger.With("device", d.ID, "type", d.Type, "name", d.Name))
s.PullProxies.Add(d, s.Logger.With("pullProxy", d.ID, "type", d.Type, "name", d.Name))
}
}
}
if s.DB != nil {
var devices []*Device
s.DB.Find(&devices)
for _, d := range devices {
var pullProxies []*PullProxy
s.DB.Find(&pullProxies)
for _, d := range pullProxies {
d.server = s
d.Logger = s.Logger.With("device", d.ID, "type", d.Type, "name", d.Name)
d.ChangeStatus(DeviceStatusOffline)
s.Devices.Add(d)
d.Logger = s.Logger.With("pullProxy", d.ID, "type", d.Type, "name", d.Name)
d.ChangeStatus(PullProxyStatusOffline)
s.PullProxies.Add(d)
}
}
return nil
@@ -397,9 +400,9 @@ func (s *Server) OnSubscribe(streamPath string, args url.Values) {
for plugin := range s.Plugins.Range {
plugin.OnSubscribe(streamPath, args)
}
for device := range s.Devices.Range {
if device.Status == DeviceStatusOnline && device.GetStreamPath() == streamPath && !device.PullOnStart {
device.Handler.Pull()
for pullProxy := range s.PullProxies.Range {
if pullProxy.Status == PullProxyStatusOnline && pullProxy.GetStreamPath() == streamPath && !pullProxy.PullOnStart {
pullProxy.Handler.Pull()
}
}
}