fix: readd device no pull start

This commit is contained in:
langhuihui
2024-11-07 18:51:45 +08:00
parent 4ac22f13e0
commit e1180173d6
18 changed files with 137 additions and 53 deletions

40
api.go
View File

@@ -396,6 +396,46 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res
return &pb.SuccessResponse{}, err return &pb.SuccessResponse{}, err
} }
func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
s.Pause()
}
return nil
})
return &pb.SuccessResponse{}, err
}
func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
s.Resume()
}
return nil
})
return &pb.SuccessResponse{}, err
}
func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
s.Speed = float64(req.Speed)
}
return nil
})
return &pb.SuccessResponse{}, err
}
func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
s.Seek(time.Unix(int64(req.TimeStamp), 0))
}
return nil
})
return &pb.SuccessResponse{}, err
}
func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error { s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok { if s, ok := s.Streams.Get(req.StreamPath); ok {

View File

@@ -11,5 +11,5 @@ mp4:
onpub: onpub:
record: record:
^live/.+: ^live/.+:
fragment: 10s fragment: 1m
filepath: record/$0 filepath: record/$0

View File

@@ -5,7 +5,7 @@ import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto"; import "google/protobuf/duration.proto";
import "google/protobuf/any.proto"; import "google/protobuf/any.proto";
package global; package global;
option go_package="m7s.live/m7s/v5/pb"; option go_package="m7s.live/pro/pb";
service api { service api {
rpc SysInfo (google.protobuf.Empty) returns (SysInfoResponse) { rpc SysInfo (google.protobuf.Empty) returns (SysInfoResponse) {

View File

@@ -37,9 +37,9 @@ func (m *RootManager[K, T]) Init() {
m.Context, m.CancelCauseFunc = context.WithCancelCause(context.Background()) m.Context, m.CancelCauseFunc = context.WithCancelCause(context.Background())
m.handler = m m.handler = m
m.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) m.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
m.state = TASK_STATE_STARTED
m.StartTime = time.Now() m.StartTime = time.Now()
m.AddTask(&OSSignal{root: m}) m.AddTask(&OSSignal{root: m}).WaitStarted()
m.state = TASK_STATE_STARTED
} }
func (m *RootManager[K, T]) Shutdown() { func (m *RootManager[K, T]) Shutdown() {

View File

@@ -19,9 +19,9 @@ const (
) )
var ( var (
unixTimeReg = regexp.MustCompile(`^\d+$`) UnixTimeReg = regexp.MustCompile(`^\d+$`)
unixTimeRangeReg = regexp.MustCompile(`^(\d+)(~|-)(\d+)$`) UnixTimeRangeReg = regexp.MustCompile(`^(\d+)(~|-)(\d+)$`)
timeStrRangeReg = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})~(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})$`) TimeStrRangeReg = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})~(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})$`)
) )
func TimeRangeQueryParse(query url.Values) (startTime, endTime time.Time, err error) { func TimeRangeQueryParse(query url.Values) (startTime, endTime time.Time, err error) {
@@ -29,9 +29,21 @@ func TimeRangeQueryParse(query url.Values) (startTime, endTime time.Time, err er
if rangeStr == "" { if rangeStr == "" {
startTimeStr := query.Get(StartKey) startTimeStr := query.Get(StartKey)
endTimeStr := query.Get(EndKey) endTimeStr := query.Get(EndKey)
if endTimeStr == "" { if startTimeStr == "" {
startTime = time.Time{}
if endTimeStr == "" {
endTime = time.Now()
} else {
if UnixTimeReg.MatchString(endTimeStr) {
endTime, err = UnixTimeQueryParse(endTimeStr)
} else {
endTime, err = TimeQueryParse(endTimeStr)
}
}
return
} else if endTimeStr == "" {
endTime = time.Now() endTime = time.Now()
if unixTimeReg.MatchString(startTimeStr) { if UnixTimeReg.MatchString(startTimeStr) {
startTime, err = UnixTimeQueryParse(startTimeStr) startTime, err = UnixTimeQueryParse(startTimeStr)
} else { } else {
startTime, err = TimeQueryParse(startTimeStr) startTime, err = TimeQueryParse(startTimeStr)
@@ -40,13 +52,13 @@ func TimeRangeQueryParse(query url.Values) (startTime, endTime time.Time, err er
} }
rangeStr = startTimeStr + "~" + endTimeStr rangeStr = startTimeStr + "~" + endTimeStr
} }
if match := unixTimeRangeReg.FindStringSubmatch(rangeStr); len(match) == 4 { if match := UnixTimeRangeReg.FindStringSubmatch(rangeStr); len(match) == 4 {
startTime, err = UnixTimeQueryParse(match[1]) startTime, err = UnixTimeQueryParse(match[1])
if err != nil { if err != nil {
return return
} }
endTime, err = UnixTimeQueryParseRefer(match[3], startTime) endTime, err = UnixTimeQueryParseRefer(match[3], startTime)
} else if match := timeStrRangeReg.FindStringSubmatch(rangeStr); len(match) == 3 { } else if match := TimeStrRangeReg.FindStringSubmatch(rangeStr); len(match) == 3 {
startTime, err = TimeQueryParse(match[1]) startTime, err = TimeQueryParse(match[1])
if err != nil { if err != nil {
return return

View File

@@ -31,7 +31,7 @@ $ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
```go ```go
package plugin_myplugin package plugin_myplugin
import ( import (
"m7s.live/m7s/v5" "m7s.live/pro"
) )
var _ = m7s.InstallPlugin[MyPlugin]() var _ = m7s.InstallPlugin[MyPlugin]()
@@ -152,7 +152,7 @@ syntax = "proto3";
import "google/api/annotations.proto"; import "google/api/annotations.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
package myplugin; package myplugin;
option go_package="m7s.live/m7s/v5/plugin/myplugin/pb"; option go_package="m7s.live/pro/plugin/myplugin/pb";
service api { service api {
rpc MyMethod (MyRequest) returns (MyResponse) { rpc MyMethod (MyRequest) returns (MyResponse) {
@@ -217,8 +217,8 @@ func (config *MyPlugin) MyMethod(ctx context.Context, req *pb.MyRequest) (*pb.My
```go ```go
package plugin_myplugin package plugin_myplugin
import ( import (
"m7s.live/m7s/v5" "m7s.live/pro"
"m7s.live/m7s/v5/plugin/myplugin/pb" "m7s.live/pro/plugin/myplugin/pb"
) )
var _ = m7s.InstallPlugin[MyPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler) var _ = m7s.InstallPlugin[MyPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)

View File

@@ -48,6 +48,9 @@ func (p *Puller) Run() (err error) {
} }
allocator := util.NewScalableMemoryAllocator(1 << 10) allocator := util.NewScalableMemoryAllocator(1 << 10)
for offsetTs := absTS; err == nil; _, err = reader.ReadBE(4) { for offsetTs := absTS; err == nil; _, err = reader.ReadBE(4) {
if p.IsStopped() {
return p.StopReason()
}
t, err := reader.ReadByte() t, err := reader.ReadByte()
if err != nil { if err != nil {
return err return err
@@ -83,9 +86,17 @@ func (p *Puller) Run() (err error) {
//fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS) //fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS)
switch t { switch t {
case FLV_TAG_TYPE_AUDIO: case FLV_TAG_TYPE_AUDIO:
err = publisher.WriteAudio(frame.WrapAudio()) if publisher.PubAudio {
if err = publisher.WriteAudio(frame.WrapAudio()); err != nil {
return err
}
}
case FLV_TAG_TYPE_VIDEO: case FLV_TAG_TYPE_VIDEO:
err = publisher.WriteVideo(frame.WrapVideo()) if publisher.PubVideo {
if err = publisher.WriteVideo(frame.WrapVideo()); err != nil {
return err
}
}
case FLV_TAG_TYPE_SCRIPT: case FLV_TAG_TYPE_SCRIPT:
r := frame.NewReader() r := frame.NewReader()
amf := &rtmp.AMF{ amf := &rtmp.AMF{

View File

@@ -50,8 +50,10 @@ type GB28181Plugin struct {
Sip SipConfig Sip SipConfig
MediaPort util.Range[uint16] `default:"10000-20000" desc:"媒体端口范围"` //媒体端口范围 MediaPort util.Range[uint16] `default:"10000-20000" desc:"媒体端口范围"` //媒体端口范围
Position PositionConfig Position PositionConfig
Parent string `desc:"父级设备"`
ua *sipgo.UserAgent ua *sipgo.UserAgent
server *sipgo.Server server *sipgo.Server
client *sipgo.Client
devices util.Collection[string, *Device] devices util.Collection[string, *Device]
dialogs util.Collection[uint32, *Dialog] dialogs util.Collection[uint32, *Dialog]
tcpPorts chan uint16 tcpPorts chan uint16
@@ -68,37 +70,51 @@ func (gb *GB28181Plugin) OnInit() (err error) {
logger := zerolog.New(os.Stdout) logger := zerolog.New(os.Stdout)
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
// Creating client handle for ua // Creating client handle for ua
gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua if len(gb.Sip.ListenAddr) > 0 {
gb.server.OnRegister(gb.OnRegister) gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua
gb.server.OnMessage(gb.OnMessage) gb.server.OnRegister(gb.OnRegister)
gb.server.OnBye(gb.OnBye) gb.server.OnMessage(gb.OnMessage)
gb.devices.L = new(sync.RWMutex) gb.server.OnBye(gb.OnBye)
gb.devices.L = new(sync.RWMutex)
if gb.MediaPort.Valid() { if gb.MediaPort.Valid() {
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
for i := range gb.MediaPort.Size() { for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i gb.tcpPorts <- gb.MediaPort[0] + i
}
} else {
tcpConfig := &gb.GetCommonConf().TCP
tcpConfig.ListenAddr = fmt.Sprintf(":%d", gb.MediaPort[0])
}
for _, addr := range gb.Sip.ListenAddr {
netWork, addr, _ := strings.Cut(addr, ":")
go gb.server.ListenAndServe(gb, netWork, addr)
}
if len(gb.Sip.ListenTLSAddr) > 0 {
if tslConfig, err := config.GetTLSConfig(gb.Sip.CertFile, gb.Sip.KeyFile); err == nil {
for _, addr := range gb.Sip.ListenTLSAddr {
netWork, addr, _ := strings.Cut(addr, ":")
go gb.server.ListenAndServeTLS(gb, netWork, addr, tslConfig)
} }
} else { } else {
return err tcpConfig := &gb.GetCommonConf().TCP
tcpConfig.ListenAddr = fmt.Sprintf(":%d", gb.MediaPort[0])
}
for _, addr := range gb.Sip.ListenAddr {
netWork, addr, _ := strings.Cut(addr, ":")
go gb.server.ListenAndServe(gb, netWork, addr)
}
if len(gb.Sip.ListenTLSAddr) > 0 {
if tslConfig, err := config.GetTLSConfig(gb.Sip.CertFile, gb.Sip.KeyFile); err == nil {
for _, addr := range gb.Sip.ListenTLSAddr {
netWork, addr, _ := strings.Cut(addr, ":")
go gb.server.ListenAndServeTLS(gb, netWork, addr, tslConfig)
}
} else {
return err
}
}
if gb.DB != nil {
gb.DB.AutoMigrate(&Device{})
} }
} }
if gb.DB != nil { if gb.Parent != "" {
gb.DB.AutoMigrate(&Device{}) host, portStr, _ := net.SplitHostPort(gb.Parent)
if portStr != "" {
portStr = "5060"
}
port, _ := strconv.Atoi(portStr)
gb.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(host), sipgo.WithClientPort(port))
gb.client.Do(gb, sip.NewRequest("REGISTER", sip.Uri{
Host: host,
Port: port,
}))
} }
return return
} }

View File

@@ -4,7 +4,7 @@ import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
//import "global.proto"; //import "global.proto";
package gb28181; package gb28181;
option go_package="m7s.live/m7s/v5/plugin/gb28181/pb"; option go_package="m7s.live/pro/plugin/gb28181/pb";
service api { service api {
rpc List (google.protobuf.Empty) returns (ResponseList) { rpc List (google.protobuf.Empty) returns (ResponseList) {

View File

@@ -2,7 +2,7 @@ syntax = "proto3";
import "google/api/annotations.proto"; import "google/api/annotations.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
package logrotate; package logrotate;
option go_package="m7s.live/m7s/v5/plugin/logrotate/pb"; option go_package="m7s.live/pro/plugin/logrotate/pb";
service api { service api {
rpc List (google.protobuf.Empty) returns (ResponseFileInfo) { rpc List (google.protobuf.Empty) returns (ResponseFileInfo) {

View File

@@ -3,7 +3,7 @@ import "google/api/annotations.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
package monitor; package monitor;
option go_package="m7s.live/m7s/v5/plugin/monitor/pb"; option go_package="m7s.live/pro/plugin/monitor/pb";
service api { service api {
rpc SearchTask (SearchTaskRequest) returns (SearchTaskResponse) { rpc SearchTask (SearchTaskRequest) returns (SearchTaskResponse) {

View File

@@ -3,7 +3,7 @@ import "google/api/annotations.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
package mp4; package mp4;
option go_package="m7s.live/m7s/v5/plugin/mp4/pb"; option go_package="m7s.live/pro/plugin/mp4/pb";
service api { service api {
rpc List (ReqRecordList) returns (ResponseList) { rpc List (ReqRecordList) returns (ResponseList) {

View File

@@ -3,6 +3,7 @@ package mp4
import ( import (
"io" "io"
"os" "os"
"strconv"
"strings" "strings"
"time" "time"
@@ -43,7 +44,11 @@ func (p *RecordReader) Run() (err error) {
defer allocator.Recycle() defer allocator.Recycle()
publisher.OnSeek = func(seekTime time.Time) { publisher.OnSeek = func(seekTime time.Time) {
pullStartTime = seekTime pullStartTime = seekTime
pullJob.Args.Set(util.StartKey, pullStartTime.Local().Format(util.LocalTimeFormat)) if util.UnixTimeReg.MatchString(pullJob.Args.Get(util.EndKey)) {
pullJob.Args.Set(util.StartKey, strconv.FormatInt(pullStartTime.Unix(), 10))
} else {
pullJob.Args.Set(util.EndKey, pullStartTime.Local().Format(util.LocalTimeFormat))
}
} }
for i, stream := range p.Streams { for i, stream := range p.Streams {
tsOffset = ts tsOffset = ts

View File

@@ -3,7 +3,7 @@ import "google/api/annotations.proto";
//import "google/protobuf/empty.proto"; //import "google/protobuf/empty.proto";
import "global.proto"; import "global.proto";
package rtmp; package rtmp;
option go_package="m7s.live/m7s/v5/plugin/rtmp/pb"; option go_package="m7s.live/pro/plugin/rtmp/pb";
service api { service api {
rpc PushOut (PushRequest) returns (global.SuccessResponse) { rpc PushOut (PushRequest) returns (global.SuccessResponse) {

View File

@@ -3,7 +3,7 @@ import "google/api/annotations.proto";
//import "google/protobuf/empty.proto"; //import "google/protobuf/empty.proto";
import "global.proto"; import "global.proto";
package sei; package sei;
option go_package="m7s.live/m7s/v5/plugin/sei/pb"; option go_package="m7s.live/pro/plugin/sei/pb";
service api { service api {
rpc insert (InsertRequest) returns (global.SuccessResponse) { rpc insert (InsertRequest) returns (global.SuccessResponse) {

View File

@@ -3,7 +3,7 @@ import "google/api/annotations.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
import "global.proto"; import "global.proto";
package stress; package stress;
option go_package="m7s.live/m7s/v5/plugin/stress/pb"; option go_package="m7s.live/pro/plugin/stress/pb";
service api { service api {
rpc PushRTMP (PushRequest) returns (m7s.SuccessResponse) { rpc PushRTMP (PushRequest) returns (m7s.SuccessResponse) {

View File

@@ -4,7 +4,7 @@ import "google/protobuf/empty.proto";
import "global.proto"; import "global.proto";
package transcode; package transcode;
option go_package="m7s.live/m7s/v5/plugin/transcode/pb"; option go_package="m7s.live/pro/plugin/transcode/pb";
service api { service api {
rpc launch (TransRequest) returns (global.SuccessResponse) { rpc launch (TransRequest) returns (global.SuccessResponse) {

View File

@@ -682,7 +682,7 @@ func (p *Publisher) Resume() {
} }
func (p *Publisher) Seek(ts time.Time) { func (p *Publisher) Seek(ts time.Time) {
p.Info("seek", "offset", ts) p.Info("seek", "time", ts)
if p.OnSeek != nil { if p.OnSeek != nil {
p.OnSeek(ts) p.OnSeek(ts)
} }