feat: add publish type

This commit is contained in:
langhuihui
2024-10-22 14:32:08 +08:00
parent 4217713c77
commit a3ad3f8e81
14 changed files with 808 additions and 683 deletions

24
api.go
View File

@@ -38,16 +38,20 @@ func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoRespon
} }
} }
res = &pb.SysInfoResponse{ res = &pb.SysInfoResponse{
Version: Version, Code: 0,
LocalIP: localIP, Message: "success",
StartTime: timestamppb.New(s.StartTime), Data: &pb.SysInfoData{
GoVersion: runtime.Version(), Version: Version,
Os: runtime.GOOS, LocalIP: localIP,
Arch: runtime.GOARCH, StartTime: timestamppb.New(s.StartTime),
Cpus: int32(runtime.NumCPU()), GoVersion: runtime.Version(),
Os: runtime.GOOS,
Arch: runtime.GOARCH,
Cpus: int32(runtime.NumCPU()),
},
} }
for p := range s.Plugins.Range { for p := range s.Plugins.Range {
res.Plugins = append(res.Plugins, &pb.PluginInfo{ res.Data.Plugins = append(res.Data.Plugins, &pb.PluginInfo{
Name: p.Meta.Name, Name: p.Meta.Name,
Version: p.Meta.Version, Version: p.Meta.Version,
Disabled: p.Disabled, Disabled: p.Disabled,
@@ -102,7 +106,7 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err
State: int32(pub.State), State: int32(pub.State),
StartTime: timestamppb.New(pub.StartTime), StartTime: timestamppb.New(pub.StartTime),
Subscribers: int32(pub.Subscribers.Length), Subscribers: int32(pub.Subscribers.Length),
Type: pub.Plugin.Meta.Name, PluginName: pub.Plugin.Meta.Name,
} }
if t := pub.AudioTrack.AVTrack; t != nil { if t := pub.AudioTrack.AVTrack; t != nil {
@@ -390,7 +394,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
} }
streams = append(streams, info) streams = append(streams, info)
} }
res = &pb.StreamListResponse{List: streams, Total: int32(s.Streams.Length), PageNum: req.PageNum, PageSize: req.PageSize} res = &pb.StreamListResponse{Data: streams, Total: int32(s.Streams.Length), PageNum: req.PageNum, PageSize: req.PageSize}
return nil return nil
}) })
return return

View File

@@ -5,6 +5,7 @@ import (
"flag" "flag"
"m7s.live/pro" "m7s.live/pro"
_ "m7s.live/pro/plugin/admin"
_ "m7s.live/pro/plugin/console" _ "m7s.live/pro/plugin/console"
_ "m7s.live/pro/plugin/debug" _ "m7s.live/pro/plugin/debug"
_ "m7s.live/pro/plugin/flv" _ "m7s.live/pro/plugin/flv"

File diff suppressed because it is too large Load Diff

View File

@@ -184,7 +184,7 @@ message PluginInfo {
bool disabled = 3; bool disabled = 3;
} }
message SysInfoResponse { message SysInfoData {
google.protobuf.Timestamp startTime = 1; google.protobuf.Timestamp startTime = 1;
string localIP = 2; string localIP = 2;
string version = 3; string version = 3;
@@ -195,6 +195,12 @@ message SysInfoResponse {
repeated PluginInfo plugins = 8; repeated PluginInfo plugins = 8;
} }
message SysInfoResponse {
int32 code = 1;
string message = 2;
SysInfoData data = 3;
}
message TaskTreeResponse { message TaskTreeResponse {
uint32 id = 1; uint32 id = 1;
uint32 type = 2; uint32 type = 2;
@@ -212,10 +218,12 @@ message StreamListRequest {
} }
message StreamListResponse { message StreamListResponse {
int32 total = 1; int32 code = 1;
int32 pageNum = 2; string message = 2;
int32 pageSize = 3; int32 total = 3;
repeated StreamInfoResponse list = 4; int32 pageNum = 4;
int32 pageSize = 5;
repeated StreamInfoResponse data = 6;
} }
message StreamWaitListResponse { message StreamWaitListResponse {
@@ -233,8 +241,9 @@ message StreamInfoResponse {
AudioTrackInfo audioTrack = 4; AudioTrackInfo audioTrack = 4;
VideoTrackInfo videoTrack = 5; VideoTrackInfo videoTrack = 5;
google.protobuf.Timestamp startTime = 6; google.protobuf.Timestamp startTime = 6;
string type = 7; string pluginName = 7;
string meta = 8; string type = 8;
string meta = 9;
} }
message Wrap { message Wrap {

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"time" "time"
"m7s.live/m7s/v5" m7s "m7s.live/pro"
) )
type AdminPlugin struct { type AdminPlugin struct {

View File

@@ -6,7 +6,7 @@ import (
"strings" "strings"
"time" "time"
"m7s.live/pro" m7s "m7s.live/pro"
"m7s.live/pro/pkg/task" "m7s.live/pro/pkg/task"
. "m7s.live/pro/plugin/flv/pkg" . "m7s.live/pro/plugin/flv/pkg"
@@ -24,31 +24,13 @@ var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, NewRecorder)
func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv") streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv")
//query := r.URL.Query()
//speedStr := query.Get("speed")
//speed, err := strconv.ParseFloat(speedStr, 64)
var err error var err error
defer func() { defer func() {
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
} }
}() }()
//if err != nil { var conn net.Conn
// speed = 1
//}
//if startTime, err := util.TimeQueryParse(query.Get("start")); err == nil {
// var vod Vod
// if err = vod.Init(startTime, filepath.Join(plugin.Path, streamPath)); err != nil {
// http.Error(w, err.Error(), http.StatusBadRequest)
// return
// }
// vod.Writer = w
// vod.SetSpeed(speed)
// plugin.Info("vod start", "streamPath", streamPath, "startTime", startTime, "speed", speed)
// err = vod.Run(r.Context())
// plugin.Info("vod done", "streamPath", streamPath, "err", err)
// return
//}
var live Live var live Live
if r.URL.RawQuery != "" { if r.URL.RawQuery != "" {
streamPath += "?" + r.URL.RawQuery streamPath += "?" + r.URL.RawQuery
@@ -57,7 +39,6 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
return return
} }
var conn net.Conn
conn, err = live.Subscriber.CheckWebSocket(w, r) conn, err = live.Subscriber.CheckWebSocket(w, r)
if err != nil { if err != nil {
return return

View File

@@ -104,6 +104,7 @@ func (gb *GB28181Plugin) api_ps_replay(w http.ResponseWriter, r *http.Request) {
} }
var pub *m7s.Publisher var pub *m7s.Publisher
if pub, err = gb.Publish(gb.Context, streamPath); err == nil { if pub, err = gb.Publish(gb.Context, streamPath); err == nil {
pub.Type = m7s.PublishTypeReplay
go gb.replayPS(pub, f) go gb.replayPS(pub, f)
util.ReturnOK(w, r) util.ReturnOK(w, r)
} else { } else {

View File

@@ -35,6 +35,9 @@ func (d *Dialog) Start() (err error) {
if err != nil { if err != nil {
return return
} }
if !d.IsLive() {
d.pullCtx.Publisher.Type = m7s.PublishTypeVod
}
sss := strings.Split(d.pullCtx.RemoteURL, "/") sss := strings.Split(d.pullCtx.RemoteURL, "/")
deviceId, channelId := sss[0], sss[1] deviceId, channelId := sss[0], sss[1]
if len(sss) == 2 { if len(sss) == 2 {

View File

@@ -33,6 +33,7 @@ func NewPuller(conf config.Pull) m7s.IPuller {
func (p *RecordReader) Run() (err error) { func (p *RecordReader) Run() (err error) {
pullJob := &p.PullJob pullJob := &p.PullJob
publisher := pullJob.Publisher publisher := pullJob.Publisher
publisher.Type = m7s.PublishTypeVod
allocator := util.NewScalableMemoryAllocator(1 << 10) allocator := util.NewScalableMemoryAllocator(1 << 10)
var ts, tsOffset int64 var ts, tsOffset int64
defer allocator.Recycle() defer allocator.Recycle()

View File

@@ -27,6 +27,14 @@ const (
PublisherStateDisposed PublisherStateDisposed
) )
const (
PublishTypePull = "pull"
PublishTypeServer = "server"
PublishTypeVod = "vod"
PublishTypeTransform = "transform"
PublishTypeReplay = "replay"
)
const threshold = 10 * time.Millisecond const threshold = 10 * time.Millisecond
type SpeedControl struct { type SpeedControl struct {
@@ -157,6 +165,7 @@ func (p *Publisher) GetKey() string {
// createPublisher -> Start -> WriteAudio/WriteVideo -> Dispose // createPublisher -> Start -> WriteAudio/WriteVideo -> Dispose
func createPublisher(p *Plugin, streamPath string, conf config.Publish) (publisher *Publisher) { func createPublisher(p *Plugin, streamPath string, conf config.Publish) (publisher *Publisher) {
publisher = &Publisher{Publish: conf} publisher = &Publisher{Publish: conf}
publisher.Type = PublishTypeServer
publisher.ID = task.GetNextTaskID() publisher.ID = task.GetNextTaskID()
publisher.Plugin = p publisher.Plugin = p
publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout) publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout)

View File

@@ -131,6 +131,7 @@ func (p *PullJob) Publish() (err error) {
streamPath += "?" + p.Args.Encode() streamPath += "?" + p.Args.Encode()
} }
p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, *p.publishConfig) p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, *p.publishConfig)
p.Publisher.Type = PublishTypePull
if err == nil && p.conf.MaxRetry != 0 { if err == nil && p.conf.MaxRetry != 0 {
p.Publisher.OnDispose(func() { p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout) { if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout) {

View File

@@ -393,15 +393,16 @@ func (s *Server) OnSubscribe(streamPath string, args url.Values) {
} }
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/favicon.ico" { http.Redirect(w, r, "/admin", http.StatusPermanentRedirect)
http.ServeFile(w, r, "favicon.ico") // if r.URL.Path == "/favicon.ico" {
return // http.ServeFile(w, r, "favicon.ico")
} // return
_, _ = fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime) // }
for plugin := range s.Plugins.Range { // _, _ = fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime)
_, _ = fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version) // for plugin := range s.Plugins.Range {
} // _, _ = fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version)
for _, api := range s.apiList { // }
_, _ = fmt.Fprintf(w, "%s\n", api) // for _, api := range s.apiList {
} // _, _ = fmt.Fprintf(w, "%s\n", api)
// }
} }

View File

@@ -26,6 +26,7 @@ var Owner task.TaskContextKey = "owner"
type PubSubBase struct { type PubSubBase struct {
task.Job task.Job
Plugin *Plugin Plugin *Plugin
Type string
StreamPath string StreamPath string
Args url.Values Args url.Values
TimeoutTimer *time.Timer TimeoutTimer *time.Timer

View File

@@ -77,6 +77,7 @@ func (p *TransformJob) Subscribe() (err error) {
func (p *TransformJob) Publish(streamPath string) (err error) { func (p *TransformJob) Publish(streamPath string) (err error) {
p.Publisher, err = p.Plugin.Publish(context.WithValue(p.Transformer, Owner, p.Transformer), streamPath) p.Publisher, err = p.Plugin.Publish(context.WithValue(p.Transformer, Owner, p.Transformer), streamPath)
p.Publisher.Type = PublishTypeTransform
return return
} }