mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-09-27 03:25:56 +08:00
Compare commits
3 Commits
48a17138d0
...
dev
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a1b40bd7b8 | ||
![]() |
827f6eac8d | ||
![]() |
ee056144a8 |
6
api.go
6
api.go
@@ -184,7 +184,7 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
|
||||
if record.StreamPath == req.StreamPath {
|
||||
recordings = append(recordings, &pb.RecordingDetail{
|
||||
FilePath: record.RecConf.FilePath,
|
||||
Mode: record.Mode,
|
||||
Mode: record.RecConf.Mode,
|
||||
Fragment: durationpb.New(record.RecConf.Fragment),
|
||||
Append: record.RecConf.Append,
|
||||
PluginName: record.Plugin.Meta.Name,
|
||||
@@ -554,7 +554,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
|
||||
for record := range s.Records.SafeRange {
|
||||
recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{
|
||||
FilePath: record.RecConf.FilePath,
|
||||
Mode: record.Mode,
|
||||
Mode: record.RecConf.Mode,
|
||||
Fragment: durationpb.New(record.RecConf.Fragment),
|
||||
Append: record.RecConf.Append,
|
||||
PluginName: record.Plugin.Meta.Name,
|
||||
@@ -750,7 +750,7 @@ func (s *Server) GetRecordList(ctx context.Context, req *pb.ReqRecordList) (resp
|
||||
offset := (req.PageNum - 1) * req.PageSize // 计算偏移量
|
||||
var totalCount int64 //总条数
|
||||
|
||||
var result []*RecordStream
|
||||
var result []*EventRecordStream
|
||||
query := s.DB.Model(&RecordStream{})
|
||||
if strings.Contains(req.StreamPath, "*") {
|
||||
query = query.Where("stream_path like ?", strings.ReplaceAll(req.StreamPath, "*", "%"))
|
||||
|
@@ -664,7 +664,7 @@ message ReqRecordList {
|
||||
string end = 4;
|
||||
uint32 pageNum = 5;
|
||||
uint32 pageSize = 6;
|
||||
string mode = 7;
|
||||
string eventId = 7;
|
||||
string type = 8;
|
||||
string eventLevel = 9;
|
||||
}
|
||||
|
@@ -16,6 +16,9 @@ const (
|
||||
RelayModeRelay = "relay"
|
||||
RelayModeMix = "mix"
|
||||
|
||||
RecordModeAuto RecordMode = "auto"
|
||||
RecordModeEvent RecordMode = "event"
|
||||
|
||||
HookOnServerKeepAlive HookType = "server_keep_alive"
|
||||
HookOnPublishStart HookType = "publish_start"
|
||||
HookOnPublishEnd HookType = "publish_end"
|
||||
@@ -29,11 +32,16 @@ const (
|
||||
HookOnRecordEnd HookType = "record_end"
|
||||
HookOnTransformStart HookType = "transform_start"
|
||||
HookOnTransformEnd HookType = "transform_end"
|
||||
|
||||
EventLevelLow EventLevel = "low"
|
||||
EventLevelHigh EventLevel = "high"
|
||||
)
|
||||
|
||||
type (
|
||||
HookType string
|
||||
Publish struct {
|
||||
EventLevel = string
|
||||
RecordMode = string
|
||||
HookType string
|
||||
Publish struct {
|
||||
MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量
|
||||
PubAudio bool `default:"true" desc:"是否发布音频"`
|
||||
PubVideo bool `default:"true" desc:"是否发布视频"`
|
||||
@@ -84,11 +92,21 @@ type (
|
||||
Proxy string `desc:"代理地址"` // 代理地址
|
||||
Header HTTPValues
|
||||
}
|
||||
RecordEvent struct {
|
||||
EventId string
|
||||
BeforeDuration uint32 `json:"beforeDuration" desc:"事件前缓存时长" gorm:"comment:事件前缓存时长;default:30000"`
|
||||
AfterDuration uint32 `json:"afterDuration" desc:"事件后缓存时长" gorm:"comment:事件后缓存时长;default:30000"`
|
||||
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"`
|
||||
EventLevel EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件,无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
|
||||
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"`
|
||||
}
|
||||
Record struct {
|
||||
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
|
||||
FilePath string `desc:"录制文件路径"` // 录制文件路径
|
||||
Fragment time.Duration `desc:"分片时长"` // 分片时长
|
||||
Append bool `desc:"是否追加录制"` // 是否追加录制
|
||||
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式,event=事件录像模式;default:'auto'"`
|
||||
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
|
||||
FilePath string `desc:"录制文件路径"` // 录制文件路径
|
||||
Fragment time.Duration `desc:"分片时长"` // 分片时长
|
||||
Append bool `desc:"是否追加录制"` // 是否追加录制
|
||||
Event *RecordEvent `json:"event" desc:"事件录像配置" gorm:"-"` // 事件录像配置
|
||||
}
|
||||
TransfromOutput struct {
|
||||
Target string `desc:"转码目标"` // 转码目标
|
||||
|
133
pkg/port.go
133
pkg/port.go
@@ -13,6 +13,7 @@ type (
|
||||
Port struct {
|
||||
Protocol string
|
||||
Ports [2]int
|
||||
Map [2]int // 映射端口范围,通常用于 NAT 或端口转发
|
||||
}
|
||||
IPort interface {
|
||||
IsTCP() bool
|
||||
@@ -22,10 +23,23 @@ type (
|
||||
)
|
||||
|
||||
func (p Port) String() string {
|
||||
var result string
|
||||
if p.Ports[0] == p.Ports[1] {
|
||||
return p.Protocol + ":" + strconv.Itoa(p.Ports[0])
|
||||
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0])
|
||||
} else {
|
||||
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
|
||||
}
|
||||
return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
|
||||
|
||||
// 如果有端口映射,添加映射信息
|
||||
if p.HasMapping() {
|
||||
if p.Map[0] == p.Map[1] {
|
||||
result += ":" + strconv.Itoa(p.Map[0])
|
||||
} else {
|
||||
result += ":" + strconv.Itoa(p.Map[0]) + "-" + strconv.Itoa(p.Map[1])
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (p Port) IsTCP() bool {
|
||||
@@ -40,6 +54,36 @@ func (p Port) IsRange() bool {
|
||||
return p.Ports[0] != p.Ports[1]
|
||||
}
|
||||
|
||||
func (p Port) HasMapping() bool {
|
||||
return p.Map[0] > 0 || p.Map[1] > 0
|
||||
}
|
||||
|
||||
func (p Port) IsRangeMapping() bool {
|
||||
return p.HasMapping() && p.Map[0] != p.Map[1]
|
||||
}
|
||||
|
||||
// ParsePort2 解析端口配置字符串并返回对应的端口类型实例
|
||||
// 根据协议类型和端口范围返回不同的类型:
|
||||
// - TCP单端口:返回 TCPPort
|
||||
// - TCP端口范围:返回 TCPRangePort
|
||||
// - UDP单端口:返回 UDPPort
|
||||
// - UDP端口范围:返回 UDPRangePort
|
||||
//
|
||||
// 参数:
|
||||
//
|
||||
// conf - 端口配置字符串,格式:protocol:port 或 protocol:port1-port2
|
||||
//
|
||||
// 返回值:
|
||||
//
|
||||
// ret - 端口实例 (TCPPort/UDPPort/TCPRangePort/UDPRangePort)
|
||||
// err - 解析错误
|
||||
//
|
||||
// 示例:
|
||||
//
|
||||
// ParsePort2("tcp:8080") // 返回 TCPPort(8080)
|
||||
// ParsePort2("tcp:8080-8090") // 返回 TCPRangePort([2]int{8080, 8090})
|
||||
// ParsePort2("udp:5000") // 返回 UDPPort(5000)
|
||||
// ParsePort2("udp:5000-5010") // 返回 UDPRangePort([2]int{5000, 5010})
|
||||
func ParsePort2(conf string) (ret any, err error) {
|
||||
var port Port
|
||||
port, err = ParsePort(conf)
|
||||
@@ -58,10 +102,84 @@ func ParsePort2(conf string) (ret any, err error) {
|
||||
return UDPPort(port.Ports[0]), nil
|
||||
}
|
||||
|
||||
// ParsePort 解析端口配置字符串为 Port 结构体
|
||||
// 支持协议前缀、端口号/端口范围以及端口映射的解析
|
||||
//
|
||||
// 参数:
|
||||
//
|
||||
// conf - 端口配置字符串,格式:
|
||||
// - "protocol:port" 单端口,如 "tcp:8080"
|
||||
// - "protocol:port1-port2" 端口范围,如 "tcp:8080-8090"
|
||||
// - "protocol:port:mapPort" 单端口映射,如 "tcp:8080:9090"
|
||||
// - "protocol:port:mapPort1-mapPort2" 单端口映射到端口范围,如 "tcp:8080:9000-9010"
|
||||
// - "protocol:port1-port2:mapPort1-mapPort2" 端口范围映射,如 "tcp:8080-8090:9000-9010"
|
||||
//
|
||||
// 返回值:
|
||||
//
|
||||
// ret - Port 结构体,包含协议、端口和映射端口信息
|
||||
// err - 解析错误
|
||||
//
|
||||
// 注意:
|
||||
// - 如果端口范围中 min > max,会自动交换顺序
|
||||
// - 单端口时,Ports[0] 和 Ports[1] 值相同
|
||||
// - 端口映射时,Map[0] 和 Map[1] 存储映射的目标端口范围
|
||||
// - 单个映射端口时,Map[0] 和 Map[1] 值相同
|
||||
//
|
||||
// 示例:
|
||||
//
|
||||
// ParsePort("tcp:8080") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{0, 0}}
|
||||
// ParsePort("tcp:8080-8090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{0, 0}}
|
||||
// ParsePort("tcp:8080:9090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9090, 9090}}
|
||||
// ParsePort("tcp:8080:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9000, 9010}}
|
||||
// ParsePort("tcp:8080-8090:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{9000, 9010}}
|
||||
// ParsePort("udp:5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5000}, Map:[2]int{0, 0}}
|
||||
// ParsePort("udp:5010-5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5010}, Map:[2]int{0, 0}}
|
||||
func ParsePort(conf string) (ret Port, err error) {
|
||||
var port string
|
||||
var port, mapPort string
|
||||
var min, max int
|
||||
ret.Protocol, port, _ = strings.Cut(conf, ":")
|
||||
|
||||
// 按冒号分割,支持端口映射
|
||||
parts := strings.Split(conf, ":")
|
||||
if len(parts) < 2 || len(parts) > 3 {
|
||||
err = strconv.ErrSyntax
|
||||
return
|
||||
}
|
||||
|
||||
ret.Protocol = parts[0]
|
||||
port = parts[1]
|
||||
|
||||
// 处理端口映射
|
||||
if len(parts) == 3 {
|
||||
mapPort = parts[2]
|
||||
// 解析映射端口,支持单端口和端口范围
|
||||
if mapRange := strings.Split(mapPort, "-"); len(mapRange) == 2 {
|
||||
// 映射端口范围
|
||||
var mapMin, mapMax int
|
||||
mapMin, err = strconv.Atoi(mapRange[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mapMax, err = strconv.Atoi(mapRange[1])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if mapMin < mapMax {
|
||||
ret.Map[0], ret.Map[1] = mapMin, mapMax
|
||||
} else {
|
||||
ret.Map[0], ret.Map[1] = mapMax, mapMin
|
||||
}
|
||||
} else {
|
||||
// 单个映射端口
|
||||
var mapPortNum int
|
||||
mapPortNum, err = strconv.Atoi(mapPort)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ret.Map[0], ret.Map[1] = mapPortNum, mapPortNum
|
||||
}
|
||||
}
|
||||
|
||||
// 处理端口范围
|
||||
if r := strings.Split(port, "-"); len(r) == 2 {
|
||||
min, err = strconv.Atoi(r[0])
|
||||
if err != nil {
|
||||
@@ -76,7 +194,12 @@ func ParsePort(conf string) (ret Port, err error) {
|
||||
} else {
|
||||
ret.Ports[0], ret.Ports[1] = max, min
|
||||
}
|
||||
} else if p, err := strconv.Atoi(port); err == nil {
|
||||
} else {
|
||||
var p int
|
||||
p, err = strconv.Atoi(port)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ret.Ports[0], ret.Ports[1] = p, p
|
||||
}
|
||||
return
|
||||
|
370
pkg/port_test.go
Normal file
370
pkg/port_test.go
Normal file
@@ -0,0 +1,370 @@
|
||||
package pkg
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParsePort(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expected Port
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "TCP单端口",
|
||||
input: "tcp:8080",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围",
|
||||
input: "tcp:8080-8090",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围(反序)",
|
||||
input: "tcp:8090-8080",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP单端口映射到单端口",
|
||||
input: "tcp:8080:9090",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{9090, 9090},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP单端口映射到端口范围",
|
||||
input: "tcp:8080:9000-9010",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{9000, 9010},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围映射到端口范围",
|
||||
input: "tcp:8080-8090:9000-9010",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{9000, 9010},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP单端口",
|
||||
input: "udp:5000",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5000},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口范围",
|
||||
input: "udp:5000-5010",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5010},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口映射",
|
||||
input: "udp:5000:6000",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5000},
|
||||
Map: [2]int{6000, 6000},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口范围映射(映射范围反序)",
|
||||
input: "udp:5000-5010:6010-6000",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5010},
|
||||
Map: [2]int{6000, 6010},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
// 错误情况
|
||||
{
|
||||
name: "缺少协议",
|
||||
input: "8080",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "过多冒号",
|
||||
input: "tcp:8080:9090:extra",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效端口号",
|
||||
input: "tcp:abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效映射端口号",
|
||||
input: "tcp:8080:abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效端口范围",
|
||||
input: "tcp:8080-abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效映射端口范围",
|
||||
input: "tcp:8080:9000-abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := ParsePort(tt.input)
|
||||
|
||||
if tt.hasError {
|
||||
if err == nil {
|
||||
t.Errorf("期望有错误,但没有错误")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("意外的错误: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if result.Protocol != tt.expected.Protocol {
|
||||
t.Errorf("协议不匹配: 期望 %s, 得到 %s", tt.expected.Protocol, result.Protocol)
|
||||
}
|
||||
|
||||
if result.Ports != tt.expected.Ports {
|
||||
t.Errorf("端口不匹配: 期望 %v, 得到 %v", tt.expected.Ports, result.Ports)
|
||||
}
|
||||
|
||||
if result.Map != tt.expected.Map {
|
||||
t.Errorf("映射端口不匹配: 期望 %v, 得到 %v", tt.expected.Map, result.Map)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPortMethods(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
port Port
|
||||
expectTCP bool
|
||||
expectUDP bool
|
||||
expectRange bool
|
||||
expectMapping bool
|
||||
expectRangeMap bool
|
||||
expectString string
|
||||
}{
|
||||
{
|
||||
name: "TCP单端口",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: false,
|
||||
expectMapping: false,
|
||||
expectRangeMap: false,
|
||||
expectString: "tcp:8080",
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: true,
|
||||
expectMapping: false,
|
||||
expectRangeMap: false,
|
||||
expectString: "tcp:8080-8090",
|
||||
},
|
||||
{
|
||||
name: "TCP单端口映射",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{9090, 9090},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: false,
|
||||
expectMapping: true,
|
||||
expectRangeMap: false,
|
||||
expectString: "tcp:8080:9090",
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围映射",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{9000, 9010},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: true,
|
||||
expectMapping: true,
|
||||
expectRangeMap: true,
|
||||
expectString: "tcp:8080-8090:9000-9010",
|
||||
},
|
||||
{
|
||||
name: "UDP单端口映射到端口范围",
|
||||
port: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5000},
|
||||
Map: [2]int{6000, 6010},
|
||||
},
|
||||
expectTCP: false,
|
||||
expectUDP: true,
|
||||
expectRange: false,
|
||||
expectMapping: true,
|
||||
expectRangeMap: true,
|
||||
expectString: "udp:5000:6000-6010",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.port.IsTCP() != tt.expectTCP {
|
||||
t.Errorf("IsTCP(): 期望 %v, 得到 %v", tt.expectTCP, tt.port.IsTCP())
|
||||
}
|
||||
|
||||
if tt.port.IsUDP() != tt.expectUDP {
|
||||
t.Errorf("IsUDP(): 期望 %v, 得到 %v", tt.expectUDP, tt.port.IsUDP())
|
||||
}
|
||||
|
||||
if tt.port.IsRange() != tt.expectRange {
|
||||
t.Errorf("IsRange(): 期望 %v, 得到 %v", tt.expectRange, tt.port.IsRange())
|
||||
}
|
||||
|
||||
if tt.port.HasMapping() != tt.expectMapping {
|
||||
t.Errorf("HasMapping(): 期望 %v, 得到 %v", tt.expectMapping, tt.port.HasMapping())
|
||||
}
|
||||
|
||||
if tt.port.IsRangeMapping() != tt.expectRangeMap {
|
||||
t.Errorf("IsRangeMapping(): 期望 %v, 得到 %v", tt.expectRangeMap, tt.port.IsRangeMapping())
|
||||
}
|
||||
|
||||
if tt.port.String() != tt.expectString {
|
||||
t.Errorf("String(): 期望 %s, 得到 %s", tt.expectString, tt.port.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePort2(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expectedType string
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "TCP单端口",
|
||||
input: "tcp:8080",
|
||||
expectedType: "TCPPort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围",
|
||||
input: "tcp:8080-8090",
|
||||
expectedType: "TCPRangePort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP单端口",
|
||||
input: "udp:5000",
|
||||
expectedType: "UDPPort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口范围",
|
||||
input: "udp:5000-5010",
|
||||
expectedType: "UDPRangePort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "无效输入",
|
||||
input: "invalid",
|
||||
hasError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := ParsePort2(tt.input)
|
||||
|
||||
if tt.hasError {
|
||||
if err == nil {
|
||||
t.Errorf("期望有错误,但没有错误")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("意外的错误: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
switch tt.expectedType {
|
||||
case "TCPPort":
|
||||
if _, ok := result.(TCPPort); !ok {
|
||||
t.Errorf("期望类型 TCPPort, 得到 %T", result)
|
||||
}
|
||||
case "TCPRangePort":
|
||||
if _, ok := result.(TCPRangePort); !ok {
|
||||
t.Errorf("期望类型 TCPRangePort, 得到 %T", result)
|
||||
}
|
||||
case "UDPPort":
|
||||
if _, ok := result.(UDPPort); !ok {
|
||||
t.Errorf("期望类型 UDPPort, 得到 %T", result)
|
||||
}
|
||||
case "UDPRangePort":
|
||||
if _, ok := result.(UDPRangePort); !ok {
|
||||
t.Errorf("期望类型 UDPRangePort, 得到 %T", result)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@@ -156,6 +156,10 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
|
||||
p.disable(fmt.Sprintf("auto migrate record stream failed %v", err))
|
||||
return
|
||||
}
|
||||
if err = p.DB.AutoMigrate(&EventRecordStream{}); err != nil {
|
||||
p.disable(fmt.Sprintf("auto migrate event record stream failed %v", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := s.AddTask(instance).WaitStarted(); err != nil {
|
||||
p.disable(instance.StopReason().Error())
|
||||
|
@@ -8,7 +8,6 @@ import (
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"m7s.live/v5"
|
||||
"m7s.live/v5/pkg"
|
||||
"m7s.live/v5/pkg/config"
|
||||
@@ -144,7 +143,6 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
|
||||
|
||||
type Recorder struct {
|
||||
m7s.DefaultRecorder
|
||||
stream m7s.RecordStream
|
||||
}
|
||||
|
||||
var CustomFileName = func(job *m7s.RecordJob) string {
|
||||
@@ -155,48 +153,21 @@ var CustomFileName = func(job *m7s.RecordJob) string {
|
||||
}
|
||||
|
||||
func (r *Recorder) createStream(start time.Time) (err error) {
|
||||
recordJob := &r.RecordJob
|
||||
sub := recordJob.Subscriber
|
||||
r.stream = m7s.RecordStream{
|
||||
StartTime: start,
|
||||
StreamPath: sub.StreamPath,
|
||||
FilePath: CustomFileName(&r.RecordJob),
|
||||
EventId: recordJob.EventId,
|
||||
EventDesc: recordJob.EventDesc,
|
||||
EventName: recordJob.EventName,
|
||||
EventLevel: recordJob.EventLevel,
|
||||
BeforeDuration: recordJob.BeforeDuration,
|
||||
AfterDuration: recordJob.AfterDuration,
|
||||
Mode: recordJob.Mode,
|
||||
Type: "flv",
|
||||
}
|
||||
dir := filepath.Dir(r.stream.FilePath)
|
||||
if err = os.MkdirAll(dir, 0755); err != nil {
|
||||
return
|
||||
}
|
||||
if sub.Publisher.HasAudioTrack() {
|
||||
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
|
||||
}
|
||||
if sub.Publisher.HasVideoTrack() {
|
||||
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
|
||||
}
|
||||
if recordJob.Plugin.DB != nil {
|
||||
recordJob.Plugin.DB.Save(&r.stream)
|
||||
}
|
||||
return
|
||||
return r.CreateStream(start, CustomFileName)
|
||||
}
|
||||
|
||||
func (r *Recorder) writeTailer(end time.Time) {
|
||||
if r.stream.EndTime.After(r.stream.StartTime) {
|
||||
if r.Event.EndTime.After(r.Event.StartTime) {
|
||||
return
|
||||
}
|
||||
r.stream.EndTime = end
|
||||
r.Event.EndTime = end
|
||||
if r.RecordJob.Plugin.DB != nil {
|
||||
r.RecordJob.Plugin.DB.Save(&r.stream)
|
||||
writeMetaTagQueueTask.AddTask(&eventRecordCheck{
|
||||
DB: r.RecordJob.Plugin.DB,
|
||||
streamPath: r.stream.StreamPath,
|
||||
})
|
||||
if r.RecordJob.Event != nil {
|
||||
r.RecordJob.Plugin.DB.Save(&r.Event)
|
||||
} else {
|
||||
r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
|
||||
}
|
||||
writeMetaTagQueueTask.AddTask(m7s.NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,40 +175,6 @@ func (r *Recorder) Dispose() {
|
||||
r.writeTailer(time.Now())
|
||||
}
|
||||
|
||||
type eventRecordCheck struct {
|
||||
task.Task
|
||||
DB *gorm.DB
|
||||
streamPath string
|
||||
}
|
||||
|
||||
func (t *eventRecordCheck) Run() (err error) {
|
||||
var eventRecordStreams []m7s.RecordStream
|
||||
queryRecord := m7s.RecordStream{
|
||||
EventLevel: m7s.EventLevelHigh,
|
||||
Mode: m7s.RecordModeEvent,
|
||||
Type: "flv",
|
||||
}
|
||||
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
|
||||
if len(eventRecordStreams) > 0 {
|
||||
for _, recordStream := range eventRecordStreams {
|
||||
var unimportantEventRecordStreams []m7s.RecordStream
|
||||
queryRecord.EventLevel = m7s.EventLevelLow
|
||||
query := `(start_time BETWEEN ? AND ?)
|
||||
OR (end_time BETWEEN ? AND ?)
|
||||
OR (? BETWEEN start_time AND end_time)
|
||||
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
|
||||
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
|
||||
if len(unimportantEventRecordStreams) > 0 {
|
||||
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
|
||||
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
|
||||
t.DB.Save(&unimportantEventRecordStream)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Recorder) Run() (err error) {
|
||||
var file *os.File
|
||||
var filepositions []uint64
|
||||
@@ -248,14 +185,14 @@ func (r *Recorder) Run() (err error) {
|
||||
suber := ctx.Subscriber
|
||||
noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append
|
||||
startTime := time.Now()
|
||||
if ctx.BeforeDuration > 0 {
|
||||
startTime = startTime.Add(-ctx.BeforeDuration)
|
||||
if ctx.Event.BeforeDuration > 0 {
|
||||
startTime = startTime.Add(-time.Duration(ctx.Event.BeforeDuration) * time.Millisecond)
|
||||
}
|
||||
if err = r.createStream(startTime); err != nil {
|
||||
return
|
||||
}
|
||||
if noFragment {
|
||||
file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666)
|
||||
file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -291,7 +228,7 @@ func (r *Recorder) Run() (err error) {
|
||||
} else if ctx.RecConf.Fragment == 0 {
|
||||
_, err = file.Write(FLVHead)
|
||||
} else {
|
||||
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
|
||||
if file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
|
||||
return
|
||||
}
|
||||
_, err = file.Write(FLVHead)
|
||||
@@ -307,7 +244,7 @@ func (r *Recorder) Run() (err error) {
|
||||
if err = r.createStream(time.Now()); err != nil {
|
||||
return
|
||||
}
|
||||
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
|
||||
if file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
|
||||
return
|
||||
}
|
||||
_, err = file.Write(FLVHead)
|
||||
|
@@ -2,16 +2,13 @@ package hls
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"m7s.live/v5"
|
||||
"m7s.live/v5/pkg"
|
||||
"m7s.live/v5/pkg/codec"
|
||||
"m7s.live/v5/pkg/config"
|
||||
"m7s.live/v5/pkg/task"
|
||||
"m7s.live/v5/pkg/util"
|
||||
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
|
||||
)
|
||||
@@ -22,7 +19,6 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
|
||||
|
||||
type Recorder struct {
|
||||
m7s.DefaultRecorder
|
||||
stream m7s.RecordStream
|
||||
ts *TsInFile
|
||||
pesAudio *mpegts.MpegtsPESFrame
|
||||
pesVideo *mpegts.MpegtsPESFrame
|
||||
@@ -39,81 +35,11 @@ var CustomFileName = func(job *m7s.RecordJob) string {
|
||||
}
|
||||
|
||||
func (r *Recorder) createStream(start time.Time) (err error) {
|
||||
recordJob := &r.RecordJob
|
||||
sub := recordJob.Subscriber
|
||||
r.stream = m7s.RecordStream{
|
||||
StartTime: start,
|
||||
StreamPath: sub.StreamPath,
|
||||
FilePath: CustomFileName(&r.RecordJob),
|
||||
EventId: recordJob.EventId,
|
||||
EventDesc: recordJob.EventDesc,
|
||||
EventName: recordJob.EventName,
|
||||
EventLevel: recordJob.EventLevel,
|
||||
BeforeDuration: recordJob.BeforeDuration,
|
||||
AfterDuration: recordJob.AfterDuration,
|
||||
Mode: recordJob.Mode,
|
||||
Type: "hls",
|
||||
}
|
||||
dir := filepath.Dir(r.stream.FilePath)
|
||||
dir = filepath.Clean(dir)
|
||||
if err = os.MkdirAll(dir, 0755); err != nil {
|
||||
r.Error("create directory failed", "err", err, "dir", dir)
|
||||
return
|
||||
}
|
||||
if sub.Publisher.HasAudioTrack() {
|
||||
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
|
||||
}
|
||||
if sub.Publisher.HasVideoTrack() {
|
||||
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
|
||||
}
|
||||
if recordJob.Plugin.DB != nil {
|
||||
recordJob.Plugin.DB.Save(&r.stream)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type eventRecordCheck struct {
|
||||
task.Task
|
||||
DB *gorm.DB
|
||||
streamPath string
|
||||
}
|
||||
|
||||
func (t *eventRecordCheck) Run() (err error) {
|
||||
var eventRecordStreams []m7s.RecordStream
|
||||
queryRecord := m7s.RecordStream{
|
||||
EventLevel: m7s.EventLevelHigh,
|
||||
Mode: m7s.RecordModeEvent,
|
||||
Type: "hls",
|
||||
}
|
||||
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
|
||||
if len(eventRecordStreams) > 0 {
|
||||
for _, recordStream := range eventRecordStreams {
|
||||
var unimportantEventRecordStreams []m7s.RecordStream
|
||||
queryRecord.EventLevel = m7s.EventLevelLow
|
||||
query := `(start_time BETWEEN ? AND ?)
|
||||
OR (end_time BETWEEN ? AND ?)
|
||||
OR (? BETWEEN start_time AND end_time)
|
||||
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
|
||||
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
|
||||
if len(unimportantEventRecordStreams) > 0 {
|
||||
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
|
||||
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
|
||||
t.DB.Save(&unimportantEventRecordStream)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return r.CreateStream(start, CustomFileName)
|
||||
}
|
||||
|
||||
func (r *Recorder) writeTailer(end time.Time) {
|
||||
if r.stream.EndTime.After(r.stream.StartTime) {
|
||||
return
|
||||
}
|
||||
r.stream.EndTime = end
|
||||
if r.RecordJob.Plugin.DB != nil {
|
||||
r.RecordJob.Plugin.DB.Save(&r.stream)
|
||||
}
|
||||
r.WriteTail(end, nil)
|
||||
}
|
||||
|
||||
func (r *Recorder) Dispose() {
|
||||
@@ -131,9 +57,9 @@ func (r *Recorder) createNewTs() {
|
||||
r.ts.Close()
|
||||
}
|
||||
var err error
|
||||
r.ts, err = NewTsInFile(r.stream.FilePath)
|
||||
r.ts, err = NewTsInFile(r.Event.FilePath)
|
||||
if err != nil {
|
||||
r.Error("create ts file failed", "err", err, "path", r.stream.FilePath)
|
||||
r.Error("create ts file failed", "err", err, "path", r.Event.FilePath)
|
||||
return
|
||||
}
|
||||
if oldPMT.Len() > 0 {
|
||||
@@ -175,8 +101,8 @@ func (r *Recorder) Run() (err error) {
|
||||
ctx := &r.RecordJob
|
||||
suber := ctx.Subscriber
|
||||
startTime := time.Now()
|
||||
if ctx.BeforeDuration > 0 {
|
||||
startTime = startTime.Add(-ctx.BeforeDuration)
|
||||
if ctx.Event.BeforeDuration > 0 {
|
||||
startTime = startTime.Add(-time.Duration(ctx.Event.BeforeDuration) * time.Millisecond)
|
||||
}
|
||||
|
||||
// 创建第一个片段记录
|
||||
|
@@ -165,10 +165,9 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// 构建查询条件,查找指定时间范围内的录制记录
|
||||
queryRecord := m7s.RecordStream{
|
||||
Mode: m7s.RecordModeAuto,
|
||||
Type: "mp4",
|
||||
}
|
||||
p.DB.Where(&queryRecord).Find(&streams, "end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath)
|
||||
p.DB.Where(&queryRecord).Find(&streams, "event_id=0 AND end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath)
|
||||
|
||||
// 创建 MP4 混合器
|
||||
muxer := mp4.NewMuxer(flag)
|
||||
@@ -533,42 +532,44 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) (
|
||||
Append: false,
|
||||
Fragment: 0,
|
||||
FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")),
|
||||
Mode: config.RecordModeEvent,
|
||||
Event: &config.RecordEvent{
|
||||
EventId: req.EventId,
|
||||
EventLevel: req.EventLevel,
|
||||
EventName: req.EventName,
|
||||
EventDesc: req.EventDesc,
|
||||
BeforeDuration: uint32(beforeDuration / time.Millisecond),
|
||||
AfterDuration: uint32(afterDuration / time.Millisecond),
|
||||
},
|
||||
}
|
||||
//recordJob := recorder.GetRecordJob()
|
||||
var subconfig config.Subscribe
|
||||
defaults.SetDefaults(&subconfig)
|
||||
subconfig.BufferTime = beforeDuration
|
||||
recordJob := p.Record(stream, recordConf, &subconfig)
|
||||
recordJob.EventId = req.EventId
|
||||
recordJob.EventLevel = req.EventLevel
|
||||
recordJob.EventName = req.EventName
|
||||
recordJob.EventDesc = req.EventDesc
|
||||
recordJob.AfterDuration = afterDuration
|
||||
recordJob.BeforeDuration = beforeDuration
|
||||
recordJob.Mode = m7s.RecordModeEvent
|
||||
p.Record(stream, recordConf, &subconfig)
|
||||
}
|
||||
} else {
|
||||
if tmpJob.AfterDuration != 0 { //当前有事件录像正在录制,则更新该录像的结束时间
|
||||
tmpJob.AfterDuration = time.Duration(tmpJob.Subscriber.VideoReader.AbsTime)*time.Millisecond + afterDuration
|
||||
if tmpJob.Event != nil { //当前有事件录像正在录制,则更新该录像的结束时间
|
||||
tmpJob.Event.AfterDuration = tmpJob.Subscriber.VideoReader.AbsTime + uint32(afterDuration/time.Millisecond)
|
||||
if p.DB != nil {
|
||||
p.DB.Save(&tmpJob.Event)
|
||||
}
|
||||
} else { //当前有自动录像正在录制,则生成事件录像的记录,而不去生成事件录像的文件
|
||||
recordStream := &m7s.RecordStream{
|
||||
StreamPath: req.StreamPath,
|
||||
newEvent := &config.RecordEvent{
|
||||
EventId: req.EventId,
|
||||
EventLevel: req.EventLevel,
|
||||
EventDesc: req.EventDesc,
|
||||
EventName: req.EventName,
|
||||
Mode: m7s.RecordModeEvent,
|
||||
BeforeDuration: beforeDuration,
|
||||
AfterDuration: afterDuration,
|
||||
Type: "mp4",
|
||||
EventDesc: req.EventDesc,
|
||||
BeforeDuration: uint32(beforeDuration / time.Millisecond),
|
||||
AfterDuration: uint32(afterDuration / time.Millisecond),
|
||||
}
|
||||
now := time.Now()
|
||||
startTime := now.Add(-beforeDuration)
|
||||
endTime := now.Add(afterDuration)
|
||||
recordStream.StartTime = startTime
|
||||
recordStream.EndTime = endTime
|
||||
if p.DB != nil {
|
||||
p.DB.Save(&recordStream)
|
||||
p.DB.Save(&m7s.EventRecordStream{
|
||||
RecordEvent: newEvent,
|
||||
RecordStream: m7s.RecordStream{
|
||||
StreamPath: req.StreamPath,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -91,9 +91,6 @@ func (p *DeleteRecordTask) deleteOldestFile() {
|
||||
}
|
||||
for _, filePath := range filePaths {
|
||||
for p.getDiskOutOfSpace(filePath) {
|
||||
queryRecord := m7s.RecordStream{
|
||||
EventLevel: m7s.EventLevelLow, // 查询条件:event_level = 1,非重要事件
|
||||
}
|
||||
var eventRecords []m7s.RecordStream
|
||||
// 使用不同的方法进行路径匹配,避免ESCAPE语法问题
|
||||
// 解决方案:用MySQL能理解的简单方式匹配路径前缀
|
||||
@@ -103,7 +100,7 @@ func (p *DeleteRecordTask) deleteOldestFile() {
|
||||
searchPattern := basePath + "%"
|
||||
p.Info("deleteOldestFile", "searching with path pattern", searchPattern)
|
||||
|
||||
err := p.DB.Where(&queryRecord).Where("end_time IS NOT NULL").
|
||||
err := p.DB.Where("event_id=0 AND end_time IS NOT NULL").
|
||||
Where("file_path LIKE ?", searchPattern).
|
||||
Order("end_time ASC").Find(&eventRecords).Error
|
||||
if err == nil {
|
||||
@@ -149,14 +146,11 @@ func (t *DeleteRecordTask) Tick(any) {
|
||||
if t.RecordFileExpireDays <= 0 {
|
||||
return
|
||||
}
|
||||
//搜索event_records表中event_level值为1的(非重要)数据,并将其create_time与当前时间比对,大于RecordFileExpireDays则进行删除,数据库标记is_delete为1,磁盘上删除录像文件
|
||||
//搜索event_records表中event_id值为0的(非事件)录像,并将其create_time与当前时间比对,大于RecordFileExpireDays则进行删除,数据库标记is_delete为1,磁盘上删除录像文件
|
||||
var eventRecords []m7s.RecordStream
|
||||
expireTime := time.Now().AddDate(0, 0, -t.RecordFileExpireDays)
|
||||
t.Debug("RecordFileExpireDays is set to auto delete oldestfile", "expireTime", expireTime.Format("2006-01-02 15:04:05"))
|
||||
queryRecord := m7s.RecordStream{
|
||||
EventLevel: m7s.EventLevelLow, // 查询条件:event_level = low,非重要事件
|
||||
}
|
||||
err := t.DB.Where(&queryRecord).Find(&eventRecords, "end_time < ? AND end_time IS NOT NULL", expireTime).Error
|
||||
err := t.DB.Find(&eventRecords, "event_id=0 AND end_time < ? AND end_time IS NOT NULL", expireTime).Error
|
||||
if err == nil {
|
||||
for _, record := range eventRecords {
|
||||
t.Info("RecordFileExpireDays is set to auto delete oldestfile", "ID", record.ID, "create time", record.EndTime, "filepath", record.FilePath)
|
||||
|
@@ -7,7 +7,6 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
m7s "m7s.live/v5"
|
||||
"m7s.live/v5/pkg"
|
||||
"m7s.live/v5/pkg/codec"
|
||||
@@ -107,39 +106,6 @@ func (t *writeTrailerTask) Run() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
type eventRecordCheck struct {
|
||||
task.Task
|
||||
DB *gorm.DB
|
||||
streamPath string
|
||||
}
|
||||
|
||||
func (t *eventRecordCheck) Run() (err error) {
|
||||
var eventRecordStreams []m7s.RecordStream
|
||||
queryRecord := m7s.RecordStream{
|
||||
EventLevel: m7s.EventLevelHigh,
|
||||
Mode: m7s.RecordModeEvent,
|
||||
Type: "mp4",
|
||||
StreamPath: t.streamPath,
|
||||
}
|
||||
t.DB.Where(&queryRecord).Find(&eventRecordStreams) //搜索事件录像,且为重要事件(无法自动删除)
|
||||
if len(eventRecordStreams) > 0 {
|
||||
for _, recordStream := range eventRecordStreams {
|
||||
var unimportantEventRecordStreams []m7s.RecordStream
|
||||
queryRecord.EventLevel = m7s.EventLevelLow
|
||||
queryRecord.Mode = m7s.RecordModeAuto
|
||||
query := `start_time <= ? and end_time >= ?`
|
||||
t.DB.Where(&queryRecord).Where(query, recordStream.EndTime, recordStream.StartTime).Find(&unimportantEventRecordStreams)
|
||||
if len(unimportantEventRecordStreams) > 0 {
|
||||
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
|
||||
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
|
||||
t.DB.Save(&unimportantEventRecordStream)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func init() {
|
||||
m7s.Servers.AddTask(&writeTrailerQueueTask)
|
||||
}
|
||||
@@ -150,20 +116,12 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
|
||||
|
||||
type Recorder struct {
|
||||
m7s.DefaultRecorder
|
||||
muxer *Muxer
|
||||
file *os.File
|
||||
stream m7s.RecordStream
|
||||
muxer *Muxer
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func (r *Recorder) writeTailer(end time.Time) {
|
||||
r.stream.EndTime = end
|
||||
if r.RecordJob.Plugin.DB != nil {
|
||||
r.RecordJob.Plugin.DB.Save(&r.stream)
|
||||
writeTrailerQueueTask.AddTask(&eventRecordCheck{
|
||||
DB: r.RecordJob.Plugin.DB,
|
||||
streamPath: r.stream.StreamPath,
|
||||
})
|
||||
}
|
||||
r.WriteTail(end, &writeTrailerQueueTask)
|
||||
writeTrailerQueueTask.AddTask(&writeTrailerTask{
|
||||
muxer: r.muxer,
|
||||
file: r.file,
|
||||
@@ -178,46 +136,7 @@ var CustomFileName = func(job *m7s.RecordJob) string {
|
||||
}
|
||||
|
||||
func (r *Recorder) createStream(start time.Time) (err error) {
|
||||
recordJob := &r.RecordJob
|
||||
sub := recordJob.Subscriber
|
||||
r.stream = m7s.RecordStream{
|
||||
StartTime: start,
|
||||
StreamPath: sub.StreamPath,
|
||||
FilePath: CustomFileName(&r.RecordJob),
|
||||
EventId: recordJob.EventId,
|
||||
EventDesc: recordJob.EventDesc,
|
||||
EventName: recordJob.EventName,
|
||||
EventLevel: recordJob.EventLevel,
|
||||
BeforeDuration: recordJob.BeforeDuration,
|
||||
AfterDuration: recordJob.AfterDuration,
|
||||
Mode: recordJob.Mode,
|
||||
Type: "mp4",
|
||||
}
|
||||
dir := filepath.Dir(r.stream.FilePath)
|
||||
if err = os.MkdirAll(dir, 0755); err != nil {
|
||||
return
|
||||
}
|
||||
r.file, err = os.Create(r.stream.FilePath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if recordJob.RecConf.Type == "fmp4" {
|
||||
r.stream.Type = "fmp4"
|
||||
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.stream.StreamPath)
|
||||
} else {
|
||||
r.muxer = NewMuxerWithStreamPath(0, r.stream.StreamPath)
|
||||
}
|
||||
r.muxer.WriteInitSegment(r.file)
|
||||
if sub.Publisher.HasAudioTrack() {
|
||||
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
|
||||
}
|
||||
if sub.Publisher.HasVideoTrack() {
|
||||
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
|
||||
}
|
||||
if recordJob.Plugin.DB != nil {
|
||||
recordJob.Plugin.DB.Save(&r.stream)
|
||||
}
|
||||
return
|
||||
return r.CreateStream(start, CustomFileName)
|
||||
}
|
||||
|
||||
func (r *Recorder) Dispose() {
|
||||
@@ -231,17 +150,28 @@ func (r *Recorder) Run() (err error) {
|
||||
sub := recordJob.Subscriber
|
||||
var audioTrack, videoTrack *Track
|
||||
startTime := time.Now()
|
||||
if recordJob.BeforeDuration > 0 {
|
||||
startTime = startTime.Add(-recordJob.BeforeDuration)
|
||||
if recordJob.Event != nil {
|
||||
startTime = startTime.Add(-time.Duration(recordJob.Event.BeforeDuration) * time.Millisecond)
|
||||
}
|
||||
err = r.createStream(startTime)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
r.file, err = os.Create(r.Event.FilePath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if recordJob.RecConf.Type == "fmp4" {
|
||||
r.Event.Type = "fmp4"
|
||||
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.Event.StreamPath)
|
||||
} else {
|
||||
r.muxer = NewMuxerWithStreamPath(0, r.Event.StreamPath)
|
||||
}
|
||||
r.muxer.WriteInitSegment(r.file)
|
||||
var at, vt *pkg.AVTrack
|
||||
|
||||
checkEventRecordStop := func(absTime uint32) (err error) {
|
||||
if duration := int64(absTime); time.Duration(duration)*time.Millisecond >= recordJob.AfterDuration+recordJob.BeforeDuration {
|
||||
if absTime >= recordJob.Event.AfterDuration+recordJob.Event.BeforeDuration {
|
||||
r.RecordJob.Stop(task.ErrStopByUser)
|
||||
}
|
||||
return
|
||||
@@ -269,9 +199,9 @@ func (r *Recorder) Run() (err error) {
|
||||
}
|
||||
|
||||
return m7s.PlayBlock(sub, func(audio *pkg.RawAudio) error {
|
||||
r.stream.Duration = sub.AudioReader.AbsTime
|
||||
r.Event.Duration = sub.AudioReader.AbsTime
|
||||
if sub.VideoReader == nil {
|
||||
if recordJob.AfterDuration != 0 {
|
||||
if recordJob.Event != nil {
|
||||
err := checkEventRecordStop(sub.VideoReader.AbsTime)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -314,9 +244,9 @@ func (r *Recorder) Run() (err error) {
|
||||
Timestamp: uint32(dts),
|
||||
})
|
||||
}, func(video *rtmp.RTMPVideo) error {
|
||||
r.stream.Duration = sub.VideoReader.AbsTime
|
||||
r.Event.Duration = sub.VideoReader.AbsTime
|
||||
if sub.VideoReader.Value.IDR {
|
||||
if recordJob.AfterDuration != 0 {
|
||||
if recordJob.Event != nil {
|
||||
err := checkEventRecordStop(sub.VideoReader.AbsTime)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@@ -185,8 +185,6 @@ func (t *RecordRecoveryTask) recoverRecordFromFile(filePath string) error {
|
||||
FilePath: filePath,
|
||||
StreamPath: streamPath,
|
||||
Type: "mp4",
|
||||
Mode: m7s.RecordModeAuto, // 默认为自动录制模式
|
||||
EventLevel: m7s.EventLevelLow, // 默认为低级别事件
|
||||
}
|
||||
|
||||
// 设置开始和结束时间
|
||||
|
@@ -37,12 +37,13 @@ var (
|
||||
|
||||
type WebRTCPlugin struct {
|
||||
m7s.Plugin
|
||||
ICEServers []ICEServer `desc:"ice服务器配置"`
|
||||
Port string `default:"tcp:9000" desc:"监听端口"`
|
||||
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后,发送PLI请求
|
||||
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
|
||||
MimeType []string `desc:"MimeType过滤列表,为空则不过滤"` // MimeType过滤列表,支持的格式如:video/H264, audio/opus
|
||||
s SettingEngine
|
||||
ICEServers []ICEServer `desc:"ice服务器配置"`
|
||||
Port string `default:"tcp:9000" desc:"监听端口"`
|
||||
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后,发送PLI请求
|
||||
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
|
||||
MimeType []string `desc:"MimeType过滤列表,为空则不过滤"` // MimeType过滤列表,支持的格式如:video/H264, audio/opus
|
||||
s SettingEngine
|
||||
portMapping map[int]int // 内部端口到外部端口的映射
|
||||
}
|
||||
|
||||
func (p *WebRTCPlugin) RegisterHandler() map[string]http.HandlerFunc {
|
||||
@@ -306,50 +307,90 @@ func (p *WebRTCPlugin) initSettingEngine() error {
|
||||
|
||||
// configurePort 配置端口设置
|
||||
func (p *WebRTCPlugin) configurePort() error {
|
||||
ports, err := ParsePort2(p.Port)
|
||||
// 使用 ParsePort 而不是 ParsePort2 来获取端口映射信息
|
||||
portInfo, err := ParsePort(p.Port)
|
||||
if err != nil {
|
||||
p.Error("webrtc port config error", "error", err, "port", p.Port)
|
||||
return err
|
||||
}
|
||||
|
||||
switch v := ports.(type) {
|
||||
case TCPPort:
|
||||
tcpport := int(v)
|
||||
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: tcpport,
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = tcpl.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener tcp", "error", err)
|
||||
// 初始化端口映射
|
||||
p.portMapping = make(map[int]int)
|
||||
|
||||
// 如果有端口映射,存储映射关系
|
||||
if portInfo.HasMapping() {
|
||||
if portInfo.IsRange() {
|
||||
// 端口范围映射
|
||||
for i := 0; i <= portInfo.Ports[1]-portInfo.Ports[0]; i++ {
|
||||
internalPort := portInfo.Ports[0] + i
|
||||
var externalPort int
|
||||
if portInfo.IsRangeMapping() {
|
||||
// 映射端口也是范围
|
||||
externalPort = portInfo.Map[0] + i
|
||||
} else {
|
||||
// 映射端口是单个端口
|
||||
externalPort = portInfo.Map[0]
|
||||
}
|
||||
p.portMapping[internalPort] = externalPort
|
||||
}
|
||||
} else {
|
||||
// 单端口映射
|
||||
p.portMapping[portInfo.Ports[0]] = portInfo.Map[0]
|
||||
}
|
||||
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
|
||||
p.Info("webrtc start listen", "port", tcpport)
|
||||
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
|
||||
p.s.DisableSRTPReplayProtection(true)
|
||||
case UDPRangePort:
|
||||
p.s.SetEphemeralUDPPortRange(uint16(v[0]), uint16(v[1]))
|
||||
p.SetDescription("udp", fmt.Sprintf("%d-%d", v[0], v[1]))
|
||||
case UDPPort:
|
||||
// 创建共享WEBRTC端口 默认9000
|
||||
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: int(v),
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = udpListener.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener udp", "error", err)
|
||||
return err
|
||||
p.Info("Port mapping configured", "mapping", p.portMapping)
|
||||
}
|
||||
|
||||
// 根据协议类型进行配置
|
||||
if portInfo.IsTCP() {
|
||||
if portInfo.IsRange() {
|
||||
// TCP端口范围,这里可能需要特殊处理
|
||||
p.Error("TCP port range not supported in current implementation")
|
||||
return fmt.Errorf("TCP port range not supported")
|
||||
} else {
|
||||
// TCP单端口
|
||||
tcpport := portInfo.Ports[0]
|
||||
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: tcpport,
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = tcpl.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener tcp", "error", err)
|
||||
return err
|
||||
}
|
||||
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
|
||||
p.Info("webrtc start listen", "port", tcpport)
|
||||
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
|
||||
p.s.DisableSRTPReplayProtection(true)
|
||||
}
|
||||
} else {
|
||||
// UDP配置
|
||||
if portInfo.IsRange() {
|
||||
// UDP端口范围
|
||||
p.s.SetEphemeralUDPPortRange(uint16(portInfo.Ports[0]), uint16(portInfo.Ports[1]))
|
||||
p.SetDescription("udp", fmt.Sprintf("%d-%d", portInfo.Ports[0], portInfo.Ports[1]))
|
||||
} else {
|
||||
// UDP单端口
|
||||
udpport := portInfo.Ports[0]
|
||||
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: udpport,
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = udpListener.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener udp", "error", err)
|
||||
return err
|
||||
}
|
||||
p.SetDescription("udp", fmt.Sprintf("%d", udpport))
|
||||
p.Info("webrtc start listen", "port", udpport)
|
||||
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
|
||||
}
|
||||
p.SetDescription("udp", fmt.Sprintf("%d", v))
|
||||
p.Info("webrtc start listen", "port", v)
|
||||
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -368,9 +409,33 @@ func (p *WebRTCPlugin) CreatePC(sd SessionDescription, conf Configuration) (pc *
|
||||
return
|
||||
}
|
||||
pc, err = api.NewPeerConnection(conf)
|
||||
if err == nil {
|
||||
err = pc.SetRemoteDescription(sd)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 如果有端口映射配置,记录 ICE 候选者信息以供调试
|
||||
if len(p.portMapping) > 0 {
|
||||
pc.OnICECandidate(func(candidate *ICECandidate) {
|
||||
if candidate != nil {
|
||||
// 记录端口映射信息(用于调试和监控)
|
||||
if mappedPort, exists := p.portMapping[int(candidate.Port)]; exists {
|
||||
p.Debug("ICE candidate with port mapping detected",
|
||||
"original_port", candidate.Port,
|
||||
"mapped_port", mappedPort,
|
||||
"candidate_address", candidate.Address,
|
||||
"candidate_type", candidate.Typ)
|
||||
candidate.Port = uint16(mappedPort) // 更新候选者端口为映射后的端口
|
||||
} else {
|
||||
p.Debug("ICE candidate generated",
|
||||
"port", candidate.Port,
|
||||
"address", candidate.Address,
|
||||
"type", candidate.Typ)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
err = pc.SetRemoteDescription(sd)
|
||||
return
|
||||
}
|
||||
|
||||
|
@@ -238,10 +238,9 @@ func (p *RecordFilePuller) queryRecordStreams(startTime, endTime time.Time) (err
|
||||
return pkg.ErrNoDB
|
||||
}
|
||||
queryRecord := RecordStream{
|
||||
Mode: RecordModeAuto,
|
||||
Type: p.Type,
|
||||
}
|
||||
tx := p.PullJob.Plugin.DB.Where(&queryRecord).Find(&p.Streams, "end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL)
|
||||
tx := p.PullJob.Plugin.DB.Where(&queryRecord).Find(&p.Streams, "event_id=0 AND end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL)
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
|
140
recoder.go
140
recoder.go
@@ -1,6 +1,8 @@
|
||||
package m7s
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
@@ -12,58 +14,46 @@ import (
|
||||
"m7s.live/v5/pkg"
|
||||
)
|
||||
|
||||
const (
|
||||
RecordModeAuto RecordMode = "auto"
|
||||
RecordModeEvent RecordMode = "event"
|
||||
EventLevelLow EventLevel = "low"
|
||||
EventLevelHigh EventLevel = "high"
|
||||
)
|
||||
|
||||
type (
|
||||
EventLevel = string
|
||||
RecordMode = string
|
||||
IRecorder interface {
|
||||
IRecorder interface {
|
||||
task.ITask
|
||||
GetRecordJob() *RecordJob
|
||||
}
|
||||
RecorderFactory = func(config.Record) IRecorder
|
||||
RecordJob struct {
|
||||
// RecordEvent 包含录像事件的公共字段
|
||||
|
||||
EventRecordStream struct {
|
||||
CreatedAt time.Time
|
||||
*config.RecordEvent
|
||||
RecordStream
|
||||
}
|
||||
RecordJob struct {
|
||||
task.Job
|
||||
StreamPath string // 对应本地流
|
||||
Plugin *Plugin
|
||||
Subscriber *Subscriber
|
||||
SubConf *config.Subscribe
|
||||
RecConf *config.Record
|
||||
recorder IRecorder
|
||||
EventId string `json:"eventId" desc:"事件编号"`
|
||||
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式"`
|
||||
BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长"`
|
||||
AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长"`
|
||||
EventDesc string `json:"eventDesc" desc:"事件描述"`
|
||||
EventLevel EventLevel `json:"eventLevel" desc:"事件级别"`
|
||||
EventName string `json:"eventName" desc:"事件名称"`
|
||||
Event *config.RecordEvent
|
||||
StreamPath string // 对应本地流
|
||||
Plugin *Plugin
|
||||
Subscriber *Subscriber
|
||||
SubConf *config.Subscribe
|
||||
RecConf *config.Record
|
||||
recorder IRecorder
|
||||
}
|
||||
DefaultRecorder struct {
|
||||
task.Task
|
||||
RecordJob RecordJob
|
||||
Event EventRecordStream
|
||||
}
|
||||
RecordStream struct {
|
||||
ID uint `gorm:"primarykey"`
|
||||
StartTime, EndTime time.Time `gorm:"type:datetime;default:NULL"`
|
||||
Duration uint32 `gorm:"comment:录像时长;default:0"`
|
||||
EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"`
|
||||
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式,event=事件录像模式;default:'auto'"`
|
||||
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"`
|
||||
BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:BIGINT;comment:事件前缓存时长;default:30000000000"`
|
||||
AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:BIGINT;comment:事件后缓存时长;default:30000000000"`
|
||||
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
|
||||
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"`
|
||||
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
|
||||
EventLevel EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件,无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
|
||||
FilePath string
|
||||
StreamPath string
|
||||
AudioCodec, VideoCodec string
|
||||
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
|
||||
ID uint `gorm:"primarykey"`
|
||||
StartTime time.Time `gorm:"type:datetime;default:NULL"`
|
||||
EndTime time.Time `gorm:"type:datetime;default:NULL"`
|
||||
Duration uint32 `gorm:"comment:录像时长;default:0"`
|
||||
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
|
||||
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
|
||||
FilePath string
|
||||
StreamPath string
|
||||
AudioCodec string
|
||||
VideoCodec string
|
||||
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
|
||||
}
|
||||
)
|
||||
|
||||
@@ -75,6 +65,52 @@ func (r *DefaultRecorder) Start() (err error) {
|
||||
return r.RecordJob.Subscribe()
|
||||
}
|
||||
|
||||
func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*RecordJob) string) (err error) {
|
||||
recordJob := &r.RecordJob
|
||||
sub := recordJob.Subscriber
|
||||
r.Event.RecordStream = RecordStream{
|
||||
StartTime: start,
|
||||
StreamPath: sub.StreamPath,
|
||||
FilePath: customFileName(recordJob),
|
||||
Type: recordJob.RecConf.Type,
|
||||
}
|
||||
dir := filepath.Dir(r.Event.FilePath)
|
||||
if err = os.MkdirAll(dir, 0755); err != nil {
|
||||
return
|
||||
}
|
||||
if sub.Publisher.HasAudioTrack() {
|
||||
r.Event.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
|
||||
}
|
||||
if sub.Publisher.HasVideoTrack() {
|
||||
r.Event.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
|
||||
}
|
||||
if recordJob.Plugin.DB != nil {
|
||||
if recordJob.Event != nil {
|
||||
r.Event.RecordEvent = recordJob.Event
|
||||
recordJob.Plugin.DB.Save(&r.Event)
|
||||
} else {
|
||||
recordJob.Plugin.DB.Save(&r.Event.RecordStream)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *DefaultRecorder) WriteTail(end time.Time, tailJob task.IJob) {
|
||||
r.Event.EndTime = end
|
||||
if r.RecordJob.Plugin.DB != nil {
|
||||
// 将事件和录像记录关联
|
||||
if r.RecordJob.Event != nil {
|
||||
r.RecordJob.Plugin.DB.Save(&r.Event)
|
||||
} else {
|
||||
r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
|
||||
}
|
||||
}
|
||||
if tailJob == nil {
|
||||
return
|
||||
}
|
||||
tailJob.AddTask(NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
|
||||
}
|
||||
|
||||
func (p *RecordJob) GetKey() string {
|
||||
return p.RecConf.FilePath
|
||||
}
|
||||
@@ -150,3 +186,27 @@ func (p *RecordJob) Start() (err error) {
|
||||
p.AddTask(p.recorder, p.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
func NewEventRecordCheck(t string, streamPath string, db *gorm.DB) *eventRecordCheck {
|
||||
return &eventRecordCheck{
|
||||
DB: db,
|
||||
streamPath: streamPath,
|
||||
Type: t,
|
||||
}
|
||||
}
|
||||
|
||||
type eventRecordCheck struct {
|
||||
task.Task
|
||||
DB *gorm.DB
|
||||
streamPath string
|
||||
Type string
|
||||
}
|
||||
|
||||
func (t *eventRecordCheck) Run() (err error) {
|
||||
var eventRecordStreams []EventRecordStream
|
||||
t.DB.Find(&eventRecordStreams, "type=? AND level=high AND stream_path=?", t.Type, t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
|
||||
for _, recordStream := range eventRecordStreams {
|
||||
t.DB.Model(&EventRecordStream{}).Where(`level=low AND start_time <= ? and end_time >= ?`, recordStream.EndTime, recordStream.StartTime).Update("level", config.EventLevelHigh)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user