feat: add subscriber type

This commit is contained in:
langhuihui
2024-12-18 13:23:10 +08:00
parent 0042568dff
commit 6037cbe18d
19 changed files with 1309 additions and 722 deletions

74
api.go
View File

@@ -48,7 +48,7 @@ func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoRespon
Data: &pb.SysInfoData{
Version: Version,
LocalIP: localIP,
PublicIP: util.GetPublicIP(localIP),
PublicIP: util.GetPublicIP(""),
StartTime: timestamppb.New(s.StartTime),
GoVersion: runtime.Version(),
Os: runtime.GOOS,
@@ -168,9 +168,23 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err
}
func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) {
recording := false
s.Records.Call(func() error {
for record := range s.Records.Range {
if record.StreamPath == req.StreamPath {
recording = true
break
}
}
return nil
})
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
res, err = s.getStreamInfo(pub)
if err != nil {
return err
}
res.Data.Recording = recording
} else {
err = pkg.ErrNotFound
}
@@ -233,10 +247,6 @@ func (s *Server) RestartTask(ctx context.Context, req *pb.RequestWithId64) (resp
}
func (s *Server) GetRecording(ctx context.Context, req *emptypb.Empty) (resp *pb.RecordingListResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
s.Records.Call(func() error {
resp = &pb.RecordingListResponse{}
for record := range s.Records.Range {
@@ -244,6 +254,7 @@ func (s *Server) GetRecording(ctx context.Context, req *emptypb.Empty) (resp *pb
StreamPath: record.StreamPath,
StartTime: timestamppb.New(record.StartTime),
Type: reflect.TypeOf(record.recorder).String(),
Pointer: uint64(uintptr(unsafe.Pointer(record.GetTask()))),
})
}
return nil
@@ -345,7 +356,14 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
if r.URL.RawQuery != "" {
streamPath += "?" + r.URL.RawQuery
}
suber, err := s.Subscribe(r.Context(), streamPath)
suber, err := s.SubscribeWithConfig(r.Context(), streamPath, config.Subscribe{
SubVideo: true,
})
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
suber.Type = SubscribeTypeAPI
sse := util.NewSSE(rw, r.Context())
PlayBlock(suber, (func(frame *pkg.AVFrame) (err error))(nil), func(frame *pkg.AVFrame) (err error) {
var snap pb.TrackSnapShot
@@ -369,6 +387,42 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
}
}
func (s *Server) api_AudioTrack_SSE(rw http.ResponseWriter, r *http.Request) {
streamPath := r.PathValue("streamPath")
if r.URL.RawQuery != "" {
streamPath += "?" + r.URL.RawQuery
}
suber, err := s.SubscribeWithConfig(r.Context(), streamPath, config.Subscribe{
SubAudio: true,
})
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
suber.Type = SubscribeTypeAPI
sse := util.NewSSE(rw, r.Context())
PlayBlock(suber, func(frame *pkg.AVFrame) (err error) {
var snap pb.TrackSnapShot
snap.Sequence = frame.Sequence
snap.Timestamp = uint32(frame.Timestamp / time.Millisecond)
snap.WriteTime = timestamppb.New(frame.WriteTime)
snap.Wrap = make([]*pb.Wrap, len(frame.Wraps))
snap.KeyFrame = frame.IDR
for i, wrap := range frame.Wraps {
snap.Wrap[i] = &pb.Wrap{
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
Size: uint32(wrap.GetSize()),
Data: wrap.String(),
}
}
return sse.WriteJSON(&snap)
}, (func(frame *pkg.AVFrame) (err error))(nil))
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
}
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() {
@@ -510,6 +564,13 @@ func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (re
// /api/stream/list
func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
recordingSet := make(map[string]struct{})
s.Records.Call(func() error {
for record := range s.Records.Range {
recordingSet[record.StreamPath] = struct{}{}
}
return nil
})
s.Streams.Call(func() error {
var streams []*pb.StreamInfo
for publisher := range s.Streams.Range {
@@ -517,6 +578,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
if err != nil {
continue
}
_, info.Data.Recording = recordingSet[info.Data.Path]
streams = append(streams, info.Data)
}
res = &pb.StreamListResponse{Data: streams, Total: int32(s.Streams.Length), PageNum: req.PageNum, PageSize: req.PageSize}