mirror of
				https://github.com/langhuihui/monibuca.git
				synced 2025-10-31 18:22:34 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			945 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			945 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package m7s
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"reflect"
 | |
| 	"runtime"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"m7s.live/v5/pkg/task"
 | |
| 
 | |
| 	myip "github.com/husanpao/ip"
 | |
| 	"github.com/shirou/gopsutil/v4/cpu"
 | |
| 	"github.com/shirou/gopsutil/v4/disk"
 | |
| 	"github.com/shirou/gopsutil/v4/mem"
 | |
| 	. "github.com/shirou/gopsutil/v4/net"
 | |
| 	"google.golang.org/protobuf/types/known/durationpb"
 | |
| 	"google.golang.org/protobuf/types/known/emptypb"
 | |
| 	"google.golang.org/protobuf/types/known/timestamppb"
 | |
| 	"gopkg.in/yaml.v3"
 | |
| 	"m7s.live/v5/pb"
 | |
| 	"m7s.live/v5/pkg"
 | |
| 	"m7s.live/v5/pkg/config"
 | |
| 	"m7s.live/v5/pkg/util"
 | |
| )
 | |
| 
 | |
| var localIP string
 | |
| 
 | |
| func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoResponse, err error) {
 | |
| 	if localIP == "" {
 | |
| 		localIP = myip.LocalIP()
 | |
| 		// if conn, err := net.Dial("udp", "114.114.114.114:80"); err == nil {
 | |
| 		// 	localIP, _, _ = strings.Cut(conn.LocalAddr().String(), ":")
 | |
| 		// }
 | |
| 	}
 | |
| 	res = &pb.SysInfoResponse{
 | |
| 		Code:    0,
 | |
| 		Message: "success",
 | |
| 		Data: &pb.SysInfoData{
 | |
| 			Version:   Version,
 | |
| 			LocalIP:   localIP,
 | |
| 			PublicIP:  util.GetPublicIP(""),
 | |
| 			StartTime: timestamppb.New(s.StartTime),
 | |
| 			GoVersion: runtime.Version(),
 | |
| 			Os:        runtime.GOOS,
 | |
| 			Arch:      runtime.GOARCH,
 | |
| 			Cpus:      int32(runtime.NumCPU()),
 | |
| 		},
 | |
| 	}
 | |
| 	for p := range s.Plugins.Range {
 | |
| 		res.Data.Plugins = append(res.Data.Plugins, &pb.PluginInfo{
 | |
| 			Name:        p.Meta.Name,
 | |
| 			PushAddr:    p.PushAddr,
 | |
| 			PlayAddr:    p.PlayAddr,
 | |
| 			Description: p.GetDescriptions(),
 | |
| 		})
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) DisabledPlugins(ctx context.Context, _ *emptypb.Empty) (res *pb.DisabledPluginsResponse, err error) {
 | |
| 	res = &pb.DisabledPluginsResponse{
 | |
| 		Data: make([]*pb.PluginInfo, len(s.disabledPlugins)),
 | |
| 	}
 | |
| 	for i, p := range s.disabledPlugins {
 | |
| 		res.Data[i] = &pb.PluginInfo{
 | |
| 			Name:        p.Meta.Name,
 | |
| 			Description: p.GetDescriptions(),
 | |
| 		}
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // /api/stream/annexb/{streamPath}
 | |
| func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) {
 | |
| 	publisher, ok := s.Streams.SafeGet(r.PathValue("streamPath"))
 | |
| 	if !ok || publisher.VideoTrack.AVTrack == nil {
 | |
| 		http.Error(rw, pkg.ErrNotFound.Error(), http.StatusNotFound)
 | |
| 		return
 | |
| 	}
 | |
| 	err := publisher.VideoTrack.WaitReady()
 | |
| 	if err != nil {
 | |
| 		http.Error(rw, err.Error(), http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 	rw.Header().Set("Content-Type", "application/octet-stream")
 | |
| 	reader := pkg.NewAVRingReader(publisher.VideoTrack.AVTrack, "Origin")
 | |
| 	err = reader.StartRead(publisher.VideoTrack.GetIDR())
 | |
| 	if err != nil {
 | |
| 		http.Error(rw, err.Error(), http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 	defer reader.StopRead()
 | |
| 	var annexb *pkg.AnnexB
 | |
| 	var converter = pkg.NewAVFrameConvert[*pkg.AnnexB](publisher.VideoTrack.AVTrack, nil)
 | |
| 	annexb, err = converter.ConvertFromAVFrame(&reader.Value)
 | |
| 	if err != nil {
 | |
| 		http.Error(rw, err.Error(), http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 	annexb.WriteTo(rw)
 | |
| }
 | |
| 
 | |
| func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err error) {
 | |
| 	tmp, _ := json.Marshal(pub.GetDescriptions())
 | |
| 	res = &pb.StreamInfoResponse{
 | |
| 		Data: &pb.StreamInfo{
 | |
| 			Meta:      string(tmp),
 | |
| 			Path:      pub.StreamPath,
 | |
| 			State:     int32(pub.State),
 | |
| 			StartTime: timestamppb.New(pub.StartTime),
 | |
| 			// Subscribers: int32(pub.Subscribers.Length),
 | |
| 			PluginName: pub.Plugin.Meta.Name,
 | |
| 			Type:       pub.Type,
 | |
| 			Speed:      float32(pub.Speed),
 | |
| 			StopOnIdle: pub.DelayCloseTimeout > 0,
 | |
| 			IsPaused:   pub.Paused != nil,
 | |
| 			Gop:        int32(pub.GOP),
 | |
| 			BufferTime: durationpb.New(pub.BufferTime),
 | |
| 		},
 | |
| 	}
 | |
| 	var audioBpsOut, videoBpsOut uint32
 | |
| 	var serverSubCount int32
 | |
| 	for sub := range pub.Subscribers.Range {
 | |
| 		if sub.AudioReader != nil {
 | |
| 			audioBpsOut += sub.AudioReader.BPS
 | |
| 		}
 | |
| 		if sub.VideoReader != nil {
 | |
| 			videoBpsOut += sub.VideoReader.BPS
 | |
| 		}
 | |
| 		if sub.Type == SubscribeTypeServer {
 | |
| 			serverSubCount++
 | |
| 		}
 | |
| 	}
 | |
| 	res.Data.Subscribers = serverSubCount
 | |
| 	if t := pub.AudioTrack.AVTrack; t != nil {
 | |
| 		if t.ICodecCtx != nil {
 | |
| 			res.Data.AudioTrack = &pb.AudioTrackInfo{
 | |
| 				Codec:  t.FourCC().String(),
 | |
| 				Meta:   t.GetInfo(),
 | |
| 				Bps:    uint32(t.BPS),
 | |
| 				BpsOut: audioBpsOut,
 | |
| 				Fps:    uint32(t.FPS),
 | |
| 				Delta:  pub.AudioTrack.Delta.String(),
 | |
| 			}
 | |
| 			res.Data.AudioTrack.SampleRate = uint32(t.ICodecCtx.(pkg.IAudioCodecCtx).GetSampleRate())
 | |
| 			res.Data.AudioTrack.Channels = uint32(t.ICodecCtx.(pkg.IAudioCodecCtx).GetChannels())
 | |
| 		}
 | |
| 	}
 | |
| 	if t := pub.VideoTrack.AVTrack; t != nil {
 | |
| 		if t.ICodecCtx != nil {
 | |
| 			res.Data.VideoTrack = &pb.VideoTrackInfo{
 | |
| 				Codec:  t.FourCC().String(),
 | |
| 				Meta:   t.GetInfo(),
 | |
| 				Bps:    uint32(t.BPS),
 | |
| 				BpsOut: videoBpsOut,
 | |
| 				Fps:    uint32(t.FPS),
 | |
| 				Delta:  pub.VideoTrack.Delta.String(),
 | |
| 				Gop:    uint32(pub.GOP),
 | |
| 			}
 | |
| 			res.Data.VideoTrack.Width = uint32(t.ICodecCtx.(pkg.IVideoCodecCtx).Width())
 | |
| 			res.Data.VideoTrack.Height = uint32(t.ICodecCtx.(pkg.IVideoCodecCtx).Height())
 | |
| 		}
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) {
 | |
| 	var recordings []*pb.RecordingDetail
 | |
| 	s.Records.SafeRange(func(record *RecordJob) bool {
 | |
| 		if record.StreamPath == req.StreamPath {
 | |
| 			recordings = append(recordings, &pb.RecordingDetail{
 | |
| 				FilePath:   record.RecConf.FilePath,
 | |
| 				Mode:       record.RecConf.Mode,
 | |
| 				Fragment:   durationpb.New(record.RecConf.Fragment),
 | |
| 				Append:     record.RecConf.Append,
 | |
| 				PluginName: record.Plugin.Meta.Name,
 | |
| 			})
 | |
| 		}
 | |
| 		return true
 | |
| 	})
 | |
| 	if pub, ok := s.Streams.SafeGet(req.StreamPath); ok {
 | |
| 		res, err = s.getStreamInfo(pub)
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		res.Data.Recording = recordings
 | |
| 	} else {
 | |
| 		err = pkg.ErrNotFound
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
 | |
| 	var fillData func(m task.ITask) *pb.TaskTreeData
 | |
| 	fillData = func(m task.ITask) (res *pb.TaskTreeData) {
 | |
| 		if m == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		t := m.GetTask()
 | |
| 		res = &pb.TaskTreeData{
 | |
| 			Id:          m.GetTaskID(),
 | |
| 			Pointer:     uint64(t.GetTaskPointer()),
 | |
| 			State:       uint32(m.GetState()),
 | |
| 			Type:        uint32(m.GetTaskType()),
 | |
| 			Owner:       m.GetOwnerType(),
 | |
| 			StartTime:   timestamppb.New(t.StartTime),
 | |
| 			Description: m.GetDescriptions(),
 | |
| 			StartReason: t.StartReason,
 | |
| 		}
 | |
| 		if job, ok := m.(task.IJob); ok {
 | |
| 			if blockedTask := job.Blocked(); blockedTask != nil {
 | |
| 				res.Blocked = fillData(blockedTask)
 | |
| 			}
 | |
| 			for t := range job.RangeSubTask {
 | |
| 				child := fillData(t)
 | |
| 				if child == nil {
 | |
| 					continue
 | |
| 				}
 | |
| 				res.Children = append(res.Children, child)
 | |
| 			}
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 	res = &pb.TaskTreeResponse{Data: fillData(&Servers)}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) StopTask(ctx context.Context, req *pb.RequestWithId64) (resp *pb.SuccessResponse, err error) {
 | |
| 	t := task.FromPointer(uintptr(req.Id))
 | |
| 	if t == nil {
 | |
| 		return nil, pkg.ErrNotFound
 | |
| 	}
 | |
| 	t.Stop(task.ErrStopByUser)
 | |
| 	return &pb.SuccessResponse{}, nil
 | |
| }
 | |
| 
 | |
| func (s *Server) RestartTask(ctx context.Context, req *pb.RequestWithId64) (resp *pb.SuccessResponse, err error) {
 | |
| 	t := task.FromPointer(uintptr(req.Id))
 | |
| 	if t == nil {
 | |
| 		return nil, pkg.ErrNotFound
 | |
| 	}
 | |
| 	t.Stop(task.ErrRestart)
 | |
| 	return &pb.SuccessResponse{}, nil
 | |
| }
 | |
| 
 | |
| func (s *Server) GetRecording(ctx context.Context, req *emptypb.Empty) (resp *pb.RecordingListResponse, err error) {
 | |
| 	resp = &pb.RecordingListResponse{}
 | |
| 	s.Records.SafeRange(func(record *RecordJob) bool {
 | |
| 		resp.Data = append(resp.Data, &pb.Recording{
 | |
| 			StreamPath: record.StreamPath,
 | |
| 			StartTime:  timestamppb.New(record.StartTime),
 | |
| 			Type:       reflect.TypeOf(record.recorder).String(),
 | |
| 			Pointer:    uint64(record.GetTaskPointer()),
 | |
| 		})
 | |
| 		return true
 | |
| 	})
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) GetSubscribers(context.Context, *pb.SubscribersRequest) (res *pb.SubscribersResponse, err error) {
 | |
| 	s.Streams.Call(func() error {
 | |
| 		var subscribers []*pb.SubscriberSnapShot
 | |
| 		for subscriber := range s.Subscribers.Range {
 | |
| 			meta, _ := json.Marshal(subscriber.GetDescriptions())
 | |
| 			snap := &pb.SubscriberSnapShot{
 | |
| 				Id:         subscriber.ID,
 | |
| 				StartTime:  timestamppb.New(subscriber.StartTime),
 | |
| 				Meta:       string(meta),
 | |
| 				Type:       subscriber.Type,
 | |
| 				PluginName: subscriber.Plugin.Meta.Name,
 | |
| 				SubMode:    int32(subscriber.SubMode),
 | |
| 				SyncMode:   int32(subscriber.SyncMode),
 | |
| 				BufferTime: durationpb.New(subscriber.BufferTime),
 | |
| 				RemoteAddr: subscriber.RemoteAddr,
 | |
| 			}
 | |
| 			if ar := subscriber.AudioReader; ar != nil {
 | |
| 				snap.AudioReader = &pb.RingReaderSnapShot{
 | |
| 					Sequence:  ar.Value.Sequence,
 | |
| 					Timestamp: ar.AbsTime,
 | |
| 					Delay:     ar.Delay,
 | |
| 					State:     int32(ar.State),
 | |
| 					Bps:       ar.BPS,
 | |
| 				}
 | |
| 			}
 | |
| 			if vr := subscriber.VideoReader; vr != nil {
 | |
| 				snap.VideoReader = &pb.RingReaderSnapShot{
 | |
| 					Sequence:  vr.Value.Sequence,
 | |
| 					Timestamp: vr.AbsTime,
 | |
| 					Delay:     vr.Delay,
 | |
| 					State:     int32(vr.State),
 | |
| 					Bps:       vr.BPS,
 | |
| 				}
 | |
| 			}
 | |
| 			subscribers = append(subscribers, snap)
 | |
| 		}
 | |
| 		res = &pb.SubscribersResponse{
 | |
| 			Data:  subscribers,
 | |
| 			Total: int32(s.Subscribers.Length),
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	return
 | |
| }
 | |
| func (s *Server) AudioTrackSnap(_ context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
 | |
| 	if pub, ok := s.Streams.SafeGet(req.StreamPath); ok && pub.HasAudioTrack() {
 | |
| 		data := &pb.TrackSnapShotData{}
 | |
| 		if pub.AudioTrack.Allocator != nil {
 | |
| 			for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
 | |
| 				var list []*pb.MemoryBlock
 | |
| 				for _, block := range memlist.GetBlocks() {
 | |
| 					list = append(list, &pb.MemoryBlock{
 | |
| 						S: uint32(block.Start),
 | |
| 						E: uint32(block.End),
 | |
| 					})
 | |
| 				}
 | |
| 				data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
 | |
| 			}
 | |
| 		}
 | |
| 		pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) {
 | |
| 			if len(v.Wraps) > 0 {
 | |
| 				var snap pb.TrackSnapShot
 | |
| 				snap.Sequence = v.Sequence
 | |
| 				snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
 | |
| 				snap.WriteTime = timestamppb.New(v.WriteTime)
 | |
| 				snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
 | |
| 				snap.KeyFrame = v.IDR
 | |
| 				data.RingDataSize += uint32(v.Wraps[0].GetSize())
 | |
| 				for i, wrap := range v.Wraps {
 | |
| 					snap.Wrap[i] = &pb.Wrap{
 | |
| 						Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
 | |
| 						Size:      uint32(wrap.GetSize()),
 | |
| 						Data:      wrap.String(),
 | |
| 					}
 | |
| 				}
 | |
| 				data.Ring = append(data.Ring, &snap)
 | |
| 			}
 | |
| 		})
 | |
| 		res = &pb.TrackSnapShotResponse{
 | |
| 			Code:    0,
 | |
| 			Message: "success",
 | |
| 			Data:    data,
 | |
| 		}
 | |
| 	} else {
 | |
| 		err = pkg.ErrNotFound
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
 | |
| 	streamPath := r.PathValue("streamPath")
 | |
| 	if r.URL.RawQuery != "" {
 | |
| 		streamPath += "?" + r.URL.RawQuery
 | |
| 	}
 | |
| 	suber, err := s.SubscribeWithConfig(r.Context(), streamPath, config.Subscribe{
 | |
| 		SubVideo: true,
 | |
| 		SubType:  SubscribeTypeAPI,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		http.Error(rw, err.Error(), http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 	util.NewSSE(rw, r.Context(), func(sse *util.SSE) {
 | |
| 		PlayBlock(suber, (func(frame *pkg.AVFrame) (err error))(nil), func(frame *pkg.AVFrame) (err error) {
 | |
| 			var snap pb.TrackSnapShot
 | |
| 			snap.Sequence = frame.Sequence
 | |
| 			snap.Timestamp = uint32(frame.Timestamp / time.Millisecond)
 | |
| 			snap.WriteTime = timestamppb.New(frame.WriteTime)
 | |
| 			snap.Wrap = make([]*pb.Wrap, len(frame.Wraps))
 | |
| 			snap.KeyFrame = frame.IDR
 | |
| 			for i, wrap := range frame.Wraps {
 | |
| 				snap.Wrap[i] = &pb.Wrap{
 | |
| 					Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
 | |
| 					Size:      uint32(wrap.GetSize()),
 | |
| 					Data:      wrap.String(),
 | |
| 				}
 | |
| 			}
 | |
| 			return sse.WriteJSON(&snap)
 | |
| 		})
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (s *Server) api_AudioTrack_SSE(rw http.ResponseWriter, r *http.Request) {
 | |
| 	streamPath := r.PathValue("streamPath")
 | |
| 	if r.URL.RawQuery != "" {
 | |
| 		streamPath += "?" + r.URL.RawQuery
 | |
| 	}
 | |
| 	suber, err := s.SubscribeWithConfig(r.Context(), streamPath, config.Subscribe{
 | |
| 		SubAudio: true,
 | |
| 		SubType:  SubscribeTypeAPI,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		http.Error(rw, err.Error(), http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 	util.NewSSE(rw, r.Context(), func(sse *util.SSE) {
 | |
| 		PlayBlock(suber, func(frame *pkg.AVFrame) (err error) {
 | |
| 			var snap pb.TrackSnapShot
 | |
| 			snap.Sequence = frame.Sequence
 | |
| 			snap.Timestamp = uint32(frame.Timestamp / time.Millisecond)
 | |
| 			snap.WriteTime = timestamppb.New(frame.WriteTime)
 | |
| 			snap.Wrap = make([]*pb.Wrap, len(frame.Wraps))
 | |
| 			snap.KeyFrame = frame.IDR
 | |
| 			for i, wrap := range frame.Wraps {
 | |
| 				snap.Wrap[i] = &pb.Wrap{
 | |
| 					Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
 | |
| 					Size:      uint32(wrap.GetSize()),
 | |
| 					Data:      wrap.String(),
 | |
| 				}
 | |
| 			}
 | |
| 			return sse.WriteJSON(&snap)
 | |
| 		}, (func(frame *pkg.AVFrame) (err error))(nil))
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
 | |
| 	if pub, ok := s.Streams.SafeGet(req.StreamPath); ok && pub.HasVideoTrack() {
 | |
| 		data := &pb.TrackSnapShotData{}
 | |
| 		if pub.VideoTrack.Allocator != nil {
 | |
| 			for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
 | |
| 				var list []*pb.MemoryBlock
 | |
| 				for _, block := range memlist.GetBlocks() {
 | |
| 					list = append(list, &pb.MemoryBlock{
 | |
| 						S: uint32(block.Start),
 | |
| 						E: uint32(block.End),
 | |
| 					})
 | |
| 				}
 | |
| 				data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
 | |
| 			}
 | |
| 		}
 | |
| 		pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) {
 | |
| 			if len(v.Wraps) > 0 {
 | |
| 				var snap pb.TrackSnapShot
 | |
| 				snap.Sequence = v.Sequence
 | |
| 				snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
 | |
| 				snap.WriteTime = timestamppb.New(v.WriteTime)
 | |
| 				snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
 | |
| 				snap.KeyFrame = v.IDR
 | |
| 				data.RingDataSize += uint32(v.Wraps[0].GetSize())
 | |
| 				for i, wrap := range v.Wraps {
 | |
| 					snap.Wrap[i] = &pb.Wrap{
 | |
| 						Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
 | |
| 						Size:      uint32(wrap.GetSize()),
 | |
| 						Data:      wrap.String(),
 | |
| 					}
 | |
| 				}
 | |
| 				data.Ring = append(data.Ring, &snap)
 | |
| 			}
 | |
| 		})
 | |
| 		res = &pb.TrackSnapShotResponse{
 | |
| 			Code:    0,
 | |
| 			Message: "success",
 | |
| 			Data:    data,
 | |
| 		}
 | |
| 	} else {
 | |
| 		err = pkg.ErrNotFound
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Restart stops the server with a restart error and returns
 | |
| // a success response. This method is used to restart the server
 | |
| // gracefully.
 | |
| func (s *Server) Restart(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
 | |
| 	s.Stop(pkg.ErrRestart)
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
 | |
| 	s.Stop(task.ErrStopByUser)
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	s.Streams.Call(func() error {
 | |
| 		if subscriber, ok := s.Subscribers.Get(req.Id); ok {
 | |
| 			if pub, ok := s.Streams.Get(req.StreamPath); ok {
 | |
| 				subscriber.Publisher.RemoveSubscriber(subscriber)
 | |
| 				subscriber.StreamPath = req.StreamPath
 | |
| 				pub.AddSubscriber(subscriber)
 | |
| 				return nil
 | |
| 			}
 | |
| 		}
 | |
| 		err = pkg.ErrNotFound
 | |
| 		return nil
 | |
| 	})
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
 | |
| 	s.Streams.Call(func() error {
 | |
| 		if subscriber, ok := s.Subscribers.Get(req.Id); ok {
 | |
| 			subscriber.Stop(errors.New("stop by api"))
 | |
| 		} else {
 | |
| 			err = pkg.ErrNotFound
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
 | |
| 		s.Pause()
 | |
| 	}
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
 | |
| 		s.Resume()
 | |
| 	}
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
 | |
| 		s.Speed = float64(req.Speed)
 | |
| 		s.Scale = float64(req.Speed)
 | |
| 		s.Info("set stream speed", "speed", req.Speed)
 | |
| 	}
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
 | |
| 		s.Seek(time.Unix(int64(req.TimeStamp), 0))
 | |
| 	}
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
 | |
| 		s.Stop(task.ErrStopByUser)
 | |
| 	}
 | |
| 	return &pb.SuccessResponse{}, err
 | |
| }
 | |
| 
 | |
| // /api/stream/list
 | |
| func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
 | |
| 	recordingMap := make(map[string][]*pb.RecordingDetail)
 | |
| 	for record := range s.Records.SafeRange {
 | |
| 		recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{
 | |
| 			FilePath:   record.RecConf.FilePath,
 | |
| 			Mode:       record.RecConf.Mode,
 | |
| 			Fragment:   durationpb.New(record.RecConf.Fragment),
 | |
| 			Append:     record.RecConf.Append,
 | |
| 			PluginName: record.Plugin.Meta.Name,
 | |
| 			Pointer:    uint64(record.GetTaskPointer()),
 | |
| 		})
 | |
| 	}
 | |
| 	var streams []*pb.StreamInfo
 | |
| 	for publisher := range s.Streams.SafeRange {
 | |
| 		info, err := s.getStreamInfo(publisher)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		info.Data.Recording = recordingMap[info.Data.Path]
 | |
| 		streams = append(streams, info.Data)
 | |
| 	}
 | |
| 	res = &pb.StreamListResponse{Data: streams, Total: int32(s.Streams.Length), PageNum: req.PageNum, PageSize: req.PageSize}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) WaitList(context.Context, *emptypb.Empty) (res *pb.StreamWaitListResponse, err error) {
 | |
| 	s.Streams.Call(func() error {
 | |
| 		res = &pb.StreamWaitListResponse{
 | |
| 			List: make(map[string]int32),
 | |
| 		}
 | |
| 		for subs := range s.Waiting.Range {
 | |
| 			res.List[subs.StreamPath] = int32(subs.Length)
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) Api_Summary_SSE(rw http.ResponseWriter, r *http.Request) {
 | |
| 	util.ReturnFetchValue(func() *pb.SummaryResponse {
 | |
| 		ret, _ := s.Summary(r.Context(), nil)
 | |
| 		return ret
 | |
| 	}, rw, r)
 | |
| }
 | |
| 
 | |
| func (s *Server) Api_Stream_Position_SSE(rw http.ResponseWriter, r *http.Request) {
 | |
| 	streamPath := r.URL.Query().Get("streamPath")
 | |
| 	util.ReturnFetchValue(func() (t time.Time) {
 | |
| 		if pub, ok := s.Streams.SafeGet(streamPath); ok {
 | |
| 			t = pub.GetPosition()
 | |
| 		}
 | |
| 		return
 | |
| 	}, rw, r)
 | |
| }
 | |
| 
 | |
| // func (s *Server) Api_Vod_Position(rw http.ResponseWriter, r *http.Request) {
 | |
| // 	streamPath := r.URL.Query().Get("streamPath")
 | |
| // 	if pub, ok := s.Streams.SafeGet(streamPath); ok {
 | |
| // 		t = pub.GetPosition()
 | |
| // 	}
 | |
| // }
 | |
| 
 | |
| func (s *Server) Summary(context.Context, *emptypb.Empty) (res *pb.SummaryResponse, err error) {
 | |
| 	dur := time.Since(s.lastSummaryTime)
 | |
| 	if dur < time.Second {
 | |
| 		res = s.lastSummary
 | |
| 		return
 | |
| 	}
 | |
| 	v, _ := mem.VirtualMemory()
 | |
| 	d, _ := disk.Usage("/")
 | |
| 	nv, _ := IOCounters(true)
 | |
| 	res = &pb.SummaryResponse{
 | |
| 		Memory: &pb.Usage{
 | |
| 			Total: v.Total >> 20,
 | |
| 			Free:  v.Available >> 20,
 | |
| 			Used:  v.Used >> 20,
 | |
| 			Usage: float32(v.UsedPercent),
 | |
| 		},
 | |
| 		HardDisk: &pb.Usage{
 | |
| 			Total: d.Total >> 30,
 | |
| 			Free:  d.Free >> 30,
 | |
| 			Used:  d.Used >> 30,
 | |
| 			Usage: float32(d.UsedPercent),
 | |
| 		},
 | |
| 	}
 | |
| 	if cc, _ := cpu.Percent(0, false); len(cc) > 0 {
 | |
| 		res.CpuUsage = float32(cc[0])
 | |
| 	}
 | |
| 	netWorks := []*pb.NetWorkInfo{}
 | |
| 	for i, n := range nv {
 | |
| 		info := &pb.NetWorkInfo{
 | |
| 			Name:    n.Name,
 | |
| 			Receive: n.BytesRecv,
 | |
| 			Sent:    n.BytesSent,
 | |
| 		}
 | |
| 		if s.lastSummary != nil && len(s.lastSummary.NetWork) > i {
 | |
| 			info.ReceiveSpeed = (n.BytesRecv - s.lastSummary.NetWork[i].Receive) / uint64(dur.Seconds())
 | |
| 			info.SentSpeed = (n.BytesSent - s.lastSummary.NetWork[i].Sent) / uint64(dur.Seconds())
 | |
| 		}
 | |
| 		netWorks = append(netWorks, info)
 | |
| 	}
 | |
| 	res.StreamCount = int32(s.Streams.Length)
 | |
| 	res.PullCount = int32(s.Pulls.Length)
 | |
| 	res.PushCount = int32(s.Pushs.Length)
 | |
| 	res.SubscribeCount = int32(s.Subscribers.Length)
 | |
| 	res.RecordCount = int32(s.Records.Length)
 | |
| 	res.TransformCount = int32(s.Transforms.Length)
 | |
| 	res.NetWork = netWorks
 | |
| 	s.lastSummary = res
 | |
| 	s.lastSummaryTime = time.Now()
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // /api/config/json/{name}
 | |
| func (s *Server) api_Config_JSON_(rw http.ResponseWriter, r *http.Request) {
 | |
| 	name := r.PathValue("name")
 | |
| 	var conf *config.Config
 | |
| 	if name == "global" {
 | |
| 		conf = &s.Config
 | |
| 	} else {
 | |
| 		p, ok := s.Plugins.Get(name)
 | |
| 		if !ok {
 | |
| 			http.Error(rw, pkg.ErrNotFound.Error(), http.StatusNotFound)
 | |
| 			return
 | |
| 		}
 | |
| 		conf = &p.Config
 | |
| 	}
 | |
| 	rw.Header().Set("Content-Type", "application/json")
 | |
| 	err := json.NewEncoder(rw).Encode(conf.GetMap())
 | |
| 	if err != nil {
 | |
| 		http.Error(rw, err.Error(), http.StatusInternalServerError)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Server) GetConfigFile(_ context.Context, req *emptypb.Empty) (res *pb.GetConfigFileResponse, err error) {
 | |
| 	res = &pb.GetConfigFileResponse{}
 | |
| 	res.Data = string(s.configFileContent)
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) UpdateConfigFile(_ context.Context, req *pb.UpdateConfigFileRequest) (res *pb.SuccessResponse, err error) {
 | |
| 	if s.configFileContent != nil {
 | |
| 		s.configFileContent = []byte(req.Content)
 | |
| 		os.WriteFile(s.configFilePath, s.configFileContent, 0644)
 | |
| 		res = &pb.SuccessResponse{}
 | |
| 	} else {
 | |
| 		err = pkg.ErrNotFound
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) GetConfig(_ context.Context, req *pb.GetConfigRequest) (res *pb.GetConfigResponse, err error) {
 | |
| 	res = &pb.GetConfigResponse{
 | |
| 		Data: &pb.ConfigData{},
 | |
| 	}
 | |
| 	var conf *config.Config
 | |
| 	if req.Name == "global" {
 | |
| 		conf = &s.Config
 | |
| 	} else {
 | |
| 		p, ok := s.Plugins.Get(req.Name)
 | |
| 		if !ok {
 | |
| 			err = pkg.ErrNotFound
 | |
| 			return
 | |
| 		}
 | |
| 		conf = &p.Config
 | |
| 	}
 | |
| 	var mm []byte
 | |
| 	mm, err = yaml.Marshal(conf.File)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	res.Data.File = string(mm)
 | |
| 
 | |
| 	mm, err = yaml.Marshal(conf.Modify)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	res.Data.Modified = string(mm)
 | |
| 
 | |
| 	mm, err = yaml.Marshal(conf.GetMap())
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	res.Data.Merged = string(mm)
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) GetRecordList(ctx context.Context, req *pb.ReqRecordList) (resp *pb.RecordResponseList, err error) {
 | |
| 	if s.DB == nil {
 | |
| 		err = pkg.ErrNoDB
 | |
| 		return
 | |
| 	}
 | |
| 	if req.PageSize == 0 {
 | |
| 		req.PageSize = 10
 | |
| 	}
 | |
| 	if req.PageNum == 0 {
 | |
| 		req.PageNum = 1
 | |
| 	}
 | |
| 	offset := (req.PageNum - 1) * req.PageSize // 计算偏移量
 | |
| 	var totalCount int64                       //总条数
 | |
| 
 | |
| 	var result []*RecordStream
 | |
| 	query := s.DB.Model(&RecordStream{})
 | |
| 	if strings.Contains(req.StreamPath, "*") {
 | |
| 		query = query.Where("stream_path like ?", strings.ReplaceAll(req.StreamPath, "*", "%"))
 | |
| 	} else if req.StreamPath != "" {
 | |
| 		query = query.Where("stream_path = ?", req.StreamPath)
 | |
| 	}
 | |
| 	if req.Type != "" {
 | |
| 		query = query.Where("type = ?", req.Type)
 | |
| 	}
 | |
| 	startTime, endTime, err := util.TimeRangeQueryParse(url.Values{"range": []string{req.Range}, "start": []string{req.Start}, "end": []string{req.End}})
 | |
| 	if err == nil {
 | |
| 		if !startTime.IsZero() {
 | |
| 			query = query.Where("start_time >= ?", startTime)
 | |
| 		}
 | |
| 		if !endTime.IsZero() {
 | |
| 			query = query.Where("end_time <= ?", endTime)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	query.Count(&totalCount)
 | |
| 	err = query.Offset(int(offset)).Limit(int(req.PageSize)).Order("start_time desc").Find(&result).Error
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	resp = &pb.RecordResponseList{
 | |
| 		Total:    uint32(totalCount),
 | |
| 		PageNum:  req.PageNum,
 | |
| 		PageSize: req.PageSize,
 | |
| 	}
 | |
| 	for _, recordFile := range result {
 | |
| 		resp.Data = append(resp.Data, &pb.RecordFile{
 | |
| 			Id:         uint32(recordFile.ID),
 | |
| 			StartTime:  timestamppb.New(recordFile.StartTime),
 | |
| 			EndTime:    timestamppb.New(recordFile.EndTime),
 | |
| 			FilePath:   recordFile.FilePath,
 | |
| 			StreamPath: recordFile.StreamPath,
 | |
| 		})
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) GetEventRecordList(ctx context.Context, req *pb.ReqRecordList) (resp *pb.EventRecordResponseList, err error) {
 | |
| 	if s.DB == nil {
 | |
| 		err = pkg.ErrNoDB
 | |
| 		return
 | |
| 	}
 | |
| 	if req.PageSize == 0 {
 | |
| 		req.PageSize = 10
 | |
| 	}
 | |
| 	if req.PageNum == 0 {
 | |
| 		req.PageNum = 1
 | |
| 	}
 | |
| 	offset := (req.PageNum - 1) * req.PageSize // 计算偏移量
 | |
| 	var totalCount int64                       //总条数
 | |
| 
 | |
| 	var result []*EventRecordStream
 | |
| 	query := s.DB.Model(&EventRecordStream{})
 | |
| 	if strings.Contains(req.StreamPath, "*") {
 | |
| 		query = query.Where("stream_path like ?", strings.ReplaceAll(req.StreamPath, "*", "%"))
 | |
| 	} else if req.StreamPath != "" {
 | |
| 		query = query.Where("stream_path = ?", req.StreamPath)
 | |
| 	}
 | |
| 	if req.Type != "" {
 | |
| 		query = query.Where("type = ?", req.Type)
 | |
| 	}
 | |
| 	startTime, endTime, err := util.TimeRangeQueryParse(url.Values{"range": []string{req.Range}, "start": []string{req.Start}, "end": []string{req.End}})
 | |
| 	if err == nil {
 | |
| 		if !startTime.IsZero() {
 | |
| 			query = query.Where("start_time >= ?", startTime)
 | |
| 		}
 | |
| 		if !endTime.IsZero() {
 | |
| 			query = query.Where("end_time <= ?", endTime)
 | |
| 		}
 | |
| 	}
 | |
| 	if req.EventLevel != "" {
 | |
| 		query = query.Where("event_level = ?", req.EventLevel)
 | |
| 	}
 | |
| 
 | |
| 	query.Count(&totalCount)
 | |
| 	err = query.Offset(int(offset)).Limit(int(req.PageSize)).Order("start_time desc").Find(&result).Error
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	resp = &pb.EventRecordResponseList{
 | |
| 		Total:    uint32(totalCount),
 | |
| 		PageNum:  req.PageNum,
 | |
| 		PageSize: req.PageSize,
 | |
| 	}
 | |
| 	for _, recordFile := range result {
 | |
| 		resp.Data = append(resp.Data, &pb.EventRecordFile{
 | |
| 			Id:         uint32(recordFile.ID),
 | |
| 			StartTime:  timestamppb.New(recordFile.StartTime),
 | |
| 			EndTime:    timestamppb.New(recordFile.EndTime),
 | |
| 			FilePath:   recordFile.FilePath,
 | |
| 			StreamPath: recordFile.StreamPath,
 | |
| 			EventLevel: recordFile.EventLevel,
 | |
| 			EventId:    recordFile.EventId,
 | |
| 			EventName:  recordFile.EventName,
 | |
| 			EventDesc:  recordFile.EventDesc,
 | |
| 		})
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) GetRecordCatalog(ctx context.Context, req *pb.ReqRecordCatalog) (resp *pb.ResponseCatalog, err error) {
 | |
| 	if s.DB == nil {
 | |
| 		err = pkg.ErrNoDB
 | |
| 		return
 | |
| 	}
 | |
| 	resp = &pb.ResponseCatalog{}
 | |
| 	var result []struct {
 | |
| 		StreamPath string
 | |
| 		Count      uint
 | |
| 		StartTime  time.Time
 | |
| 		EndTime    time.Time
 | |
| 	}
 | |
| 	query := s.DB.Model(&RecordStream{})
 | |
| 	if req.Type != "" {
 | |
| 		query = query.Where("type = ?", req.Type)
 | |
| 	}
 | |
| 	err = query.Select("stream_path,count(id) as count,min(start_time) as start_time,max(end_time) as end_time").Group("stream_path").Find(&result).Error
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	for _, row := range result {
 | |
| 		resp.Data = append(resp.Data, &pb.Catalog{
 | |
| 			StreamPath: row.StreamPath,
 | |
| 			Count:      uint32(row.Count),
 | |
| 			StartTime:  timestamppb.New(row.StartTime),
 | |
| 			EndTime:    timestamppb.New(row.EndTime),
 | |
| 		})
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) DeleteRecord(ctx context.Context, req *pb.ReqRecordDelete) (resp *pb.ResponseDelete, err error) {
 | |
| 	if s.DB == nil {
 | |
| 		err = pkg.ErrNoDB
 | |
| 		return
 | |
| 	}
 | |
| 	ids := req.GetIds()
 | |
| 	var result []*RecordStream
 | |
| 	if len(ids) > 0 {
 | |
| 		s.DB.Find(&result, "stream_path=? AND type=? AND id IN ?", req.StreamPath, req.Type, ids)
 | |
| 	} else {
 | |
| 		startTime, endTime, err := util.TimeRangeQueryParse(url.Values{"range": []string{req.Range}, "start": []string{req.StartTime}, "end": []string{req.EndTime}})
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		s.DB.Find(&result, "stream_path=? AND type=? AND start_time>=? AND end_time<=?", req.StreamPath, req.Type, startTime, endTime)
 | |
| 	}
 | |
| 	err = s.DB.Delete(result).Error
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	var apiResult []*pb.RecordFile
 | |
| 	for _, recordFile := range result {
 | |
| 		apiResult = append(apiResult, &pb.RecordFile{
 | |
| 			Id:         uint32(recordFile.ID),
 | |
| 			StartTime:  timestamppb.New(recordFile.StartTime),
 | |
| 			EndTime:    timestamppb.New(recordFile.EndTime),
 | |
| 			FilePath:   recordFile.FilePath,
 | |
| 			StreamPath: recordFile.StreamPath,
 | |
| 		})
 | |
| 		err = os.Remove(recordFile.FilePath)
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	resp = &pb.ResponseDelete{
 | |
| 		Data: apiResult,
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (s *Server) GetTransformList(ctx context.Context, req *emptypb.Empty) (res *pb.TransformListResponse, err error) {
 | |
| 	res = &pb.TransformListResponse{}
 | |
| 	s.Transforms.Call(func() error {
 | |
| 		for transform := range s.Transforms.Range {
 | |
| 			info := &pb.Transform{
 | |
| 				StreamPath: transform.StreamPath,
 | |
| 				Target:     transform.Target,
 | |
| 			}
 | |
| 			if transform.TransformJob != nil {
 | |
| 				info.PluginName = transform.TransformJob.Plugin.Meta.Name
 | |
| 				var result []byte
 | |
| 				result, err = yaml.Marshal(transform.TransformJob.Config)
 | |
| 				if err != nil {
 | |
| 					s.Error("marshal transform config failed", "error", err)
 | |
| 					return err
 | |
| 				}
 | |
| 				info.Config = string(result)
 | |
| 			}
 | |
| 			res.Data = append(res.Data, info)
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	return
 | |
| }
 | 
