Compare commits

...

3 Commits

Author SHA1 Message Date
langhuihui
a1b40bd7b8 feat: add port map to webrtc 2025-06-13 16:54:29 +08:00
pggiroro
827f6eac8d fix: ignore RecordEvent in gorm 2025-06-13 12:51:46 +08:00
langhuihui
ee056144a8 refactor: record 2025-06-13 12:51:41 +08:00
15 changed files with 812 additions and 387 deletions

6
api.go
View File

@@ -184,7 +184,7 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
if record.StreamPath == req.StreamPath {
recordings = append(recordings, &pb.RecordingDetail{
FilePath: record.RecConf.FilePath,
Mode: record.Mode,
Mode: record.RecConf.Mode,
Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name,
@@ -554,7 +554,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
for record := range s.Records.SafeRange {
recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{
FilePath: record.RecConf.FilePath,
Mode: record.Mode,
Mode: record.RecConf.Mode,
Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name,
@@ -750,7 +750,7 @@ func (s *Server) GetRecordList(ctx context.Context, req *pb.ReqRecordList) (resp
offset := (req.PageNum - 1) * req.PageSize // 计算偏移量
var totalCount int64 //总条数
var result []*RecordStream
var result []*EventRecordStream
query := s.DB.Model(&RecordStream{})
if strings.Contains(req.StreamPath, "*") {
query = query.Where("stream_path like ?", strings.ReplaceAll(req.StreamPath, "*", "%"))

View File

@@ -664,7 +664,7 @@ message ReqRecordList {
string end = 4;
uint32 pageNum = 5;
uint32 pageSize = 6;
string mode = 7;
string eventId = 7;
string type = 8;
string eventLevel = 9;
}

View File

@@ -16,6 +16,9 @@ const (
RelayModeRelay = "relay"
RelayModeMix = "mix"
RecordModeAuto RecordMode = "auto"
RecordModeEvent RecordMode = "event"
HookOnServerKeepAlive HookType = "server_keep_alive"
HookOnPublishStart HookType = "publish_start"
HookOnPublishEnd HookType = "publish_end"
@@ -29,11 +32,16 @@ const (
HookOnRecordEnd HookType = "record_end"
HookOnTransformStart HookType = "transform_start"
HookOnTransformEnd HookType = "transform_end"
EventLevelLow EventLevel = "low"
EventLevelHigh EventLevel = "high"
)
type (
HookType string
Publish struct {
EventLevel = string
RecordMode = string
HookType string
Publish struct {
MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量
PubAudio bool `default:"true" desc:"是否发布音频"`
PubVideo bool `default:"true" desc:"是否发布视频"`
@@ -84,11 +92,21 @@ type (
Proxy string `desc:"代理地址"` // 代理地址
Header HTTPValues
}
RecordEvent struct {
EventId string
BeforeDuration uint32 `json:"beforeDuration" desc:"事件前缓存时长" gorm:"comment:事件前缓存时长;default:30000"`
AfterDuration uint32 `json:"afterDuration" desc:"事件后缓存时长" gorm:"comment:事件后缓存时长;default:30000"`
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"`
EventLevel EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"`
}
Record struct {
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
FilePath string `desc:"录制文件路径"` // 录制文件路径
Fragment time.Duration `desc:"分片时长"` // 分片时长
Append bool `desc:"是否追加录制"` // 是否追加录制
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式event=事件录像模式;default:'auto'"`
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
FilePath string `desc:"录制文件路径"` // 录制文件路径
Fragment time.Duration `desc:"分片时长"` // 分片时长
Append bool `desc:"是否追加录制"` // 是否追加录制
Event *RecordEvent `json:"event" desc:"事件录像配置" gorm:"-"` // 事件录像配置
}
TransfromOutput struct {
Target string `desc:"转码目标"` // 转码目标

View File

@@ -13,6 +13,7 @@ type (
Port struct {
Protocol string
Ports [2]int
Map [2]int // 映射端口范围,通常用于 NAT 或端口转发
}
IPort interface {
IsTCP() bool
@@ -22,10 +23,23 @@ type (
)
func (p Port) String() string {
var result string
if p.Ports[0] == p.Ports[1] {
return p.Protocol + ":" + strconv.Itoa(p.Ports[0])
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0])
} else {
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
}
return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
// 如果有端口映射,添加映射信息
if p.HasMapping() {
if p.Map[0] == p.Map[1] {
result += ":" + strconv.Itoa(p.Map[0])
} else {
result += ":" + strconv.Itoa(p.Map[0]) + "-" + strconv.Itoa(p.Map[1])
}
}
return result
}
func (p Port) IsTCP() bool {
@@ -40,6 +54,36 @@ func (p Port) IsRange() bool {
return p.Ports[0] != p.Ports[1]
}
func (p Port) HasMapping() bool {
return p.Map[0] > 0 || p.Map[1] > 0
}
func (p Port) IsRangeMapping() bool {
return p.HasMapping() && p.Map[0] != p.Map[1]
}
// ParsePort2 解析端口配置字符串并返回对应的端口类型实例
// 根据协议类型和端口范围返回不同的类型:
// - TCP单端口返回 TCPPort
// - TCP端口范围返回 TCPRangePort
// - UDP单端口返回 UDPPort
// - UDP端口范围返回 UDPRangePort
//
// 参数:
//
// conf - 端口配置字符串格式protocol:port 或 protocol:port1-port2
//
// 返回值:
//
// ret - 端口实例 (TCPPort/UDPPort/TCPRangePort/UDPRangePort)
// err - 解析错误
//
// 示例:
//
// ParsePort2("tcp:8080") // 返回 TCPPort(8080)
// ParsePort2("tcp:8080-8090") // 返回 TCPRangePort([2]int{8080, 8090})
// ParsePort2("udp:5000") // 返回 UDPPort(5000)
// ParsePort2("udp:5000-5010") // 返回 UDPRangePort([2]int{5000, 5010})
func ParsePort2(conf string) (ret any, err error) {
var port Port
port, err = ParsePort(conf)
@@ -58,10 +102,84 @@ func ParsePort2(conf string) (ret any, err error) {
return UDPPort(port.Ports[0]), nil
}
// ParsePort 解析端口配置字符串为 Port 结构体
// 支持协议前缀、端口号/端口范围以及端口映射的解析
//
// 参数:
//
// conf - 端口配置字符串,格式:
// - "protocol:port" 单端口,如 "tcp:8080"
// - "protocol:port1-port2" 端口范围,如 "tcp:8080-8090"
// - "protocol:port:mapPort" 单端口映射,如 "tcp:8080:9090"
// - "protocol:port:mapPort1-mapPort2" 单端口映射到端口范围,如 "tcp:8080:9000-9010"
// - "protocol:port1-port2:mapPort1-mapPort2" 端口范围映射,如 "tcp:8080-8090:9000-9010"
//
// 返回值:
//
// ret - Port 结构体,包含协议、端口和映射端口信息
// err - 解析错误
//
// 注意:
// - 如果端口范围中 min > max会自动交换顺序
// - 单端口时Ports[0] 和 Ports[1] 值相同
// - 端口映射时Map[0] 和 Map[1] 存储映射的目标端口范围
// - 单个映射端口时Map[0] 和 Map[1] 值相同
//
// 示例:
//
// ParsePort("tcp:8080") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{0, 0}}
// ParsePort("tcp:8080-8090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{0, 0}}
// ParsePort("tcp:8080:9090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9090, 9090}}
// ParsePort("tcp:8080:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9000, 9010}}
// ParsePort("tcp:8080-8090:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{9000, 9010}}
// ParsePort("udp:5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5000}, Map:[2]int{0, 0}}
// ParsePort("udp:5010-5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5010}, Map:[2]int{0, 0}}
func ParsePort(conf string) (ret Port, err error) {
var port string
var port, mapPort string
var min, max int
ret.Protocol, port, _ = strings.Cut(conf, ":")
// 按冒号分割,支持端口映射
parts := strings.Split(conf, ":")
if len(parts) < 2 || len(parts) > 3 {
err = strconv.ErrSyntax
return
}
ret.Protocol = parts[0]
port = parts[1]
// 处理端口映射
if len(parts) == 3 {
mapPort = parts[2]
// 解析映射端口,支持单端口和端口范围
if mapRange := strings.Split(mapPort, "-"); len(mapRange) == 2 {
// 映射端口范围
var mapMin, mapMax int
mapMin, err = strconv.Atoi(mapRange[0])
if err != nil {
return
}
mapMax, err = strconv.Atoi(mapRange[1])
if err != nil {
return
}
if mapMin < mapMax {
ret.Map[0], ret.Map[1] = mapMin, mapMax
} else {
ret.Map[0], ret.Map[1] = mapMax, mapMin
}
} else {
// 单个映射端口
var mapPortNum int
mapPortNum, err = strconv.Atoi(mapPort)
if err != nil {
return
}
ret.Map[0], ret.Map[1] = mapPortNum, mapPortNum
}
}
// 处理端口范围
if r := strings.Split(port, "-"); len(r) == 2 {
min, err = strconv.Atoi(r[0])
if err != nil {
@@ -76,7 +194,12 @@ func ParsePort(conf string) (ret Port, err error) {
} else {
ret.Ports[0], ret.Ports[1] = max, min
}
} else if p, err := strconv.Atoi(port); err == nil {
} else {
var p int
p, err = strconv.Atoi(port)
if err != nil {
return
}
ret.Ports[0], ret.Ports[1] = p, p
}
return

370
pkg/port_test.go Normal file
View File

@@ -0,0 +1,370 @@
package pkg
import (
"testing"
)
func TestParsePort(t *testing.T) {
tests := []struct {
name string
input string
expected Port
hasError bool
}{
{
name: "TCP单端口",
input: "tcp:8080",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "TCP端口范围",
input: "tcp:8080-8090",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "TCP端口范围反序",
input: "tcp:8090-8080",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "TCP单端口映射到单端口",
input: "tcp:8080:9090",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{9090, 9090},
},
hasError: false,
},
{
name: "TCP单端口映射到端口范围",
input: "tcp:8080:9000-9010",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{9000, 9010},
},
hasError: false,
},
{
name: "TCP端口范围映射到端口范围",
input: "tcp:8080-8090:9000-9010",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{9000, 9010},
},
hasError: false,
},
{
name: "UDP单端口",
input: "udp:5000",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5000},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "UDP端口范围",
input: "udp:5000-5010",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5010},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "UDP端口映射",
input: "udp:5000:6000",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5000},
Map: [2]int{6000, 6000},
},
hasError: false,
},
{
name: "UDP端口范围映射映射范围反序",
input: "udp:5000-5010:6010-6000",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5010},
Map: [2]int{6000, 6010},
},
hasError: false,
},
// 错误情况
{
name: "缺少协议",
input: "8080",
expected: Port{},
hasError: true,
},
{
name: "过多冒号",
input: "tcp:8080:9090:extra",
expected: Port{},
hasError: true,
},
{
name: "无效端口号",
input: "tcp:abc",
expected: Port{},
hasError: true,
},
{
name: "无效映射端口号",
input: "tcp:8080:abc",
expected: Port{},
hasError: true,
},
{
name: "无效端口范围",
input: "tcp:8080-abc",
expected: Port{},
hasError: true,
},
{
name: "无效映射端口范围",
input: "tcp:8080:9000-abc",
expected: Port{},
hasError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ParsePort(tt.input)
if tt.hasError {
if err == nil {
t.Errorf("期望有错误,但没有错误")
}
return
}
if err != nil {
t.Errorf("意外的错误: %v", err)
return
}
if result.Protocol != tt.expected.Protocol {
t.Errorf("协议不匹配: 期望 %s, 得到 %s", tt.expected.Protocol, result.Protocol)
}
if result.Ports != tt.expected.Ports {
t.Errorf("端口不匹配: 期望 %v, 得到 %v", tt.expected.Ports, result.Ports)
}
if result.Map != tt.expected.Map {
t.Errorf("映射端口不匹配: 期望 %v, 得到 %v", tt.expected.Map, result.Map)
}
})
}
}
func TestPortMethods(t *testing.T) {
tests := []struct {
name string
port Port
expectTCP bool
expectUDP bool
expectRange bool
expectMapping bool
expectRangeMap bool
expectString string
}{
{
name: "TCP单端口",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{0, 0},
},
expectTCP: true,
expectUDP: false,
expectRange: false,
expectMapping: false,
expectRangeMap: false,
expectString: "tcp:8080",
},
{
name: "TCP端口范围",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{0, 0},
},
expectTCP: true,
expectUDP: false,
expectRange: true,
expectMapping: false,
expectRangeMap: false,
expectString: "tcp:8080-8090",
},
{
name: "TCP单端口映射",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{9090, 9090},
},
expectTCP: true,
expectUDP: false,
expectRange: false,
expectMapping: true,
expectRangeMap: false,
expectString: "tcp:8080:9090",
},
{
name: "TCP端口范围映射",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{9000, 9010},
},
expectTCP: true,
expectUDP: false,
expectRange: true,
expectMapping: true,
expectRangeMap: true,
expectString: "tcp:8080-8090:9000-9010",
},
{
name: "UDP单端口映射到端口范围",
port: Port{
Protocol: "udp",
Ports: [2]int{5000, 5000},
Map: [2]int{6000, 6010},
},
expectTCP: false,
expectUDP: true,
expectRange: false,
expectMapping: true,
expectRangeMap: true,
expectString: "udp:5000:6000-6010",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.port.IsTCP() != tt.expectTCP {
t.Errorf("IsTCP(): 期望 %v, 得到 %v", tt.expectTCP, tt.port.IsTCP())
}
if tt.port.IsUDP() != tt.expectUDP {
t.Errorf("IsUDP(): 期望 %v, 得到 %v", tt.expectUDP, tt.port.IsUDP())
}
if tt.port.IsRange() != tt.expectRange {
t.Errorf("IsRange(): 期望 %v, 得到 %v", tt.expectRange, tt.port.IsRange())
}
if tt.port.HasMapping() != tt.expectMapping {
t.Errorf("HasMapping(): 期望 %v, 得到 %v", tt.expectMapping, tt.port.HasMapping())
}
if tt.port.IsRangeMapping() != tt.expectRangeMap {
t.Errorf("IsRangeMapping(): 期望 %v, 得到 %v", tt.expectRangeMap, tt.port.IsRangeMapping())
}
if tt.port.String() != tt.expectString {
t.Errorf("String(): 期望 %s, 得到 %s", tt.expectString, tt.port.String())
}
})
}
}
func TestParsePort2(t *testing.T) {
tests := []struct {
name string
input string
expectedType string
hasError bool
}{
{
name: "TCP单端口",
input: "tcp:8080",
expectedType: "TCPPort",
hasError: false,
},
{
name: "TCP端口范围",
input: "tcp:8080-8090",
expectedType: "TCPRangePort",
hasError: false,
},
{
name: "UDP单端口",
input: "udp:5000",
expectedType: "UDPPort",
hasError: false,
},
{
name: "UDP端口范围",
input: "udp:5000-5010",
expectedType: "UDPRangePort",
hasError: false,
},
{
name: "无效输入",
input: "invalid",
hasError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ParsePort2(tt.input)
if tt.hasError {
if err == nil {
t.Errorf("期望有错误,但没有错误")
}
return
}
if err != nil {
t.Errorf("意外的错误: %v", err)
return
}
switch tt.expectedType {
case "TCPPort":
if _, ok := result.(TCPPort); !ok {
t.Errorf("期望类型 TCPPort, 得到 %T", result)
}
case "TCPRangePort":
if _, ok := result.(TCPRangePort); !ok {
t.Errorf("期望类型 TCPRangePort, 得到 %T", result)
}
case "UDPPort":
if _, ok := result.(UDPPort); !ok {
t.Errorf("期望类型 UDPPort, 得到 %T", result)
}
case "UDPRangePort":
if _, ok := result.(UDPRangePort); !ok {
t.Errorf("期望类型 UDPRangePort, 得到 %T", result)
}
}
})
}
}

View File

@@ -156,6 +156,10 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
p.disable(fmt.Sprintf("auto migrate record stream failed %v", err))
return
}
if err = p.DB.AutoMigrate(&EventRecordStream{}); err != nil {
p.disable(fmt.Sprintf("auto migrate event record stream failed %v", err))
return
}
}
if err := s.AddTask(instance).WaitStarted(); err != nil {
p.disable(instance.StopReason().Error())

View File

@@ -8,7 +8,6 @@ import (
"slices"
"time"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
@@ -144,7 +143,6 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct {
m7s.DefaultRecorder
stream m7s.RecordStream
}
var CustomFileName = func(job *m7s.RecordJob) string {
@@ -155,48 +153,21 @@ var CustomFileName = func(job *m7s.RecordJob) string {
}
func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "flv",
}
dir := filepath.Dir(r.stream.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
return r.CreateStream(start, CustomFileName)
}
func (r *Recorder) writeTailer(end time.Time) {
if r.stream.EndTime.After(r.stream.StartTime) {
if r.Event.EndTime.After(r.Event.StartTime) {
return
}
r.stream.EndTime = end
r.Event.EndTime = end
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
writeMetaTagQueueTask.AddTask(&eventRecordCheck{
DB: r.RecordJob.Plugin.DB,
streamPath: r.stream.StreamPath,
})
if r.RecordJob.Event != nil {
r.RecordJob.Plugin.DB.Save(&r.Event)
} else {
r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
}
writeMetaTagQueueTask.AddTask(m7s.NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
}
}
@@ -204,40 +175,6 @@ func (r *Recorder) Dispose() {
r.writeTailer(time.Now())
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "flv",
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
query := `(start_time BETWEEN ? AND ?)
OR (end_time BETWEEN ? AND ?)
OR (? BETWEEN start_time AND end_time)
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
}
func (r *Recorder) Run() (err error) {
var file *os.File
var filepositions []uint64
@@ -248,14 +185,14 @@ func (r *Recorder) Run() (err error) {
suber := ctx.Subscriber
noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append
startTime := time.Now()
if ctx.BeforeDuration > 0 {
startTime = startTime.Add(-ctx.BeforeDuration)
if ctx.Event.BeforeDuration > 0 {
startTime = startTime.Add(-time.Duration(ctx.Event.BeforeDuration) * time.Millisecond)
}
if err = r.createStream(startTime); err != nil {
return
}
if noFragment {
file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666)
file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666)
if err != nil {
return
}
@@ -291,7 +228,7 @@ func (r *Recorder) Run() (err error) {
} else if ctx.RecConf.Fragment == 0 {
_, err = file.Write(FLVHead)
} else {
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
if file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
_, err = file.Write(FLVHead)
@@ -307,7 +244,7 @@ func (r *Recorder) Run() (err error) {
if err = r.createStream(time.Now()); err != nil {
return
}
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
if file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
_, err = file.Write(FLVHead)

View File

@@ -2,16 +2,13 @@ package hls
import (
"fmt"
"os"
"path/filepath"
"time"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
)
@@ -22,7 +19,6 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct {
m7s.DefaultRecorder
stream m7s.RecordStream
ts *TsInFile
pesAudio *mpegts.MpegtsPESFrame
pesVideo *mpegts.MpegtsPESFrame
@@ -39,81 +35,11 @@ var CustomFileName = func(job *m7s.RecordJob) string {
}
func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "hls",
}
dir := filepath.Dir(r.stream.FilePath)
dir = filepath.Clean(dir)
if err = os.MkdirAll(dir, 0755); err != nil {
r.Error("create directory failed", "err", err, "dir", dir)
return
}
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "hls",
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
query := `(start_time BETWEEN ? AND ?)
OR (end_time BETWEEN ? AND ?)
OR (? BETWEEN start_time AND end_time)
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
return r.CreateStream(start, CustomFileName)
}
func (r *Recorder) writeTailer(end time.Time) {
if r.stream.EndTime.After(r.stream.StartTime) {
return
}
r.stream.EndTime = end
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
}
r.WriteTail(end, nil)
}
func (r *Recorder) Dispose() {
@@ -131,9 +57,9 @@ func (r *Recorder) createNewTs() {
r.ts.Close()
}
var err error
r.ts, err = NewTsInFile(r.stream.FilePath)
r.ts, err = NewTsInFile(r.Event.FilePath)
if err != nil {
r.Error("create ts file failed", "err", err, "path", r.stream.FilePath)
r.Error("create ts file failed", "err", err, "path", r.Event.FilePath)
return
}
if oldPMT.Len() > 0 {
@@ -175,8 +101,8 @@ func (r *Recorder) Run() (err error) {
ctx := &r.RecordJob
suber := ctx.Subscriber
startTime := time.Now()
if ctx.BeforeDuration > 0 {
startTime = startTime.Add(-ctx.BeforeDuration)
if ctx.Event.BeforeDuration > 0 {
startTime = startTime.Add(-time.Duration(ctx.Event.BeforeDuration) * time.Millisecond)
}
// 创建第一个片段记录

View File

@@ -165,10 +165,9 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
// 构建查询条件,查找指定时间范围内的录制记录
queryRecord := m7s.RecordStream{
Mode: m7s.RecordModeAuto,
Type: "mp4",
}
p.DB.Where(&queryRecord).Find(&streams, "end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath)
p.DB.Where(&queryRecord).Find(&streams, "event_id=0 AND end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath)
// 创建 MP4 混合器
muxer := mp4.NewMuxer(flag)
@@ -533,42 +532,44 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) (
Append: false,
Fragment: 0,
FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")),
Mode: config.RecordModeEvent,
Event: &config.RecordEvent{
EventId: req.EventId,
EventLevel: req.EventLevel,
EventName: req.EventName,
EventDesc: req.EventDesc,
BeforeDuration: uint32(beforeDuration / time.Millisecond),
AfterDuration: uint32(afterDuration / time.Millisecond),
},
}
//recordJob := recorder.GetRecordJob()
var subconfig config.Subscribe
defaults.SetDefaults(&subconfig)
subconfig.BufferTime = beforeDuration
recordJob := p.Record(stream, recordConf, &subconfig)
recordJob.EventId = req.EventId
recordJob.EventLevel = req.EventLevel
recordJob.EventName = req.EventName
recordJob.EventDesc = req.EventDesc
recordJob.AfterDuration = afterDuration
recordJob.BeforeDuration = beforeDuration
recordJob.Mode = m7s.RecordModeEvent
p.Record(stream, recordConf, &subconfig)
}
} else {
if tmpJob.AfterDuration != 0 { //当前有事件录像正在录制,则更新该录像的结束时间
tmpJob.AfterDuration = time.Duration(tmpJob.Subscriber.VideoReader.AbsTime)*time.Millisecond + afterDuration
if tmpJob.Event != nil { //当前有事件录像正在录制,则更新该录像的结束时间
tmpJob.Event.AfterDuration = tmpJob.Subscriber.VideoReader.AbsTime + uint32(afterDuration/time.Millisecond)
if p.DB != nil {
p.DB.Save(&tmpJob.Event)
}
} else { //当前有自动录像正在录制,则生成事件录像的记录,而不去生成事件录像的文件
recordStream := &m7s.RecordStream{
StreamPath: req.StreamPath,
newEvent := &config.RecordEvent{
EventId: req.EventId,
EventLevel: req.EventLevel,
EventDesc: req.EventDesc,
EventName: req.EventName,
Mode: m7s.RecordModeEvent,
BeforeDuration: beforeDuration,
AfterDuration: afterDuration,
Type: "mp4",
EventDesc: req.EventDesc,
BeforeDuration: uint32(beforeDuration / time.Millisecond),
AfterDuration: uint32(afterDuration / time.Millisecond),
}
now := time.Now()
startTime := now.Add(-beforeDuration)
endTime := now.Add(afterDuration)
recordStream.StartTime = startTime
recordStream.EndTime = endTime
if p.DB != nil {
p.DB.Save(&recordStream)
p.DB.Save(&m7s.EventRecordStream{
RecordEvent: newEvent,
RecordStream: m7s.RecordStream{
StreamPath: req.StreamPath,
},
})
}
}
}

View File

@@ -91,9 +91,6 @@ func (p *DeleteRecordTask) deleteOldestFile() {
}
for _, filePath := range filePaths {
for p.getDiskOutOfSpace(filePath) {
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelLow, // 查询条件event_level = 1,非重要事件
}
var eventRecords []m7s.RecordStream
// 使用不同的方法进行路径匹配避免ESCAPE语法问题
// 解决方案用MySQL能理解的简单方式匹配路径前缀
@@ -103,7 +100,7 @@ func (p *DeleteRecordTask) deleteOldestFile() {
searchPattern := basePath + "%"
p.Info("deleteOldestFile", "searching with path pattern", searchPattern)
err := p.DB.Where(&queryRecord).Where("end_time IS NOT NULL").
err := p.DB.Where("event_id=0 AND end_time IS NOT NULL").
Where("file_path LIKE ?", searchPattern).
Order("end_time ASC").Find(&eventRecords).Error
if err == nil {
@@ -149,14 +146,11 @@ func (t *DeleteRecordTask) Tick(any) {
if t.RecordFileExpireDays <= 0 {
return
}
//搜索event_records表中event_level值为1的(非重要)数据并将其create_time与当前时间比对大于RecordFileExpireDays则进行删除数据库标记is_delete为1磁盘上删除录像文件
//搜索event_records表中event_id值为0的(非事件)录像并将其create_time与当前时间比对大于RecordFileExpireDays则进行删除数据库标记is_delete为1磁盘上删除录像文件
var eventRecords []m7s.RecordStream
expireTime := time.Now().AddDate(0, 0, -t.RecordFileExpireDays)
t.Debug("RecordFileExpireDays is set to auto delete oldestfile", "expireTime", expireTime.Format("2006-01-02 15:04:05"))
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelLow, // 查询条件event_level = low,非重要事件
}
err := t.DB.Where(&queryRecord).Find(&eventRecords, "end_time < ? AND end_time IS NOT NULL", expireTime).Error
err := t.DB.Find(&eventRecords, "event_id=0 AND end_time < ? AND end_time IS NOT NULL", expireTime).Error
if err == nil {
for _, record := range eventRecords {
t.Info("RecordFileExpireDays is set to auto delete oldestfile", "ID", record.ID, "create time", record.EndTime, "filepath", record.FilePath)

View File

@@ -7,7 +7,6 @@ import (
"path/filepath"
"time"
"gorm.io/gorm"
m7s "m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
@@ -107,39 +106,6 @@ func (t *writeTrailerTask) Run() (err error) {
return
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "mp4",
StreamPath: t.streamPath,
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
queryRecord.Mode = m7s.RecordModeAuto
query := `start_time <= ? and end_time >= ?`
t.DB.Where(&queryRecord).Where(query, recordStream.EndTime, recordStream.StartTime).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
}
func init() {
m7s.Servers.AddTask(&writeTrailerQueueTask)
}
@@ -150,20 +116,12 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct {
m7s.DefaultRecorder
muxer *Muxer
file *os.File
stream m7s.RecordStream
muxer *Muxer
file *os.File
}
func (r *Recorder) writeTailer(end time.Time) {
r.stream.EndTime = end
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
writeTrailerQueueTask.AddTask(&eventRecordCheck{
DB: r.RecordJob.Plugin.DB,
streamPath: r.stream.StreamPath,
})
}
r.WriteTail(end, &writeTrailerQueueTask)
writeTrailerQueueTask.AddTask(&writeTrailerTask{
muxer: r.muxer,
file: r.file,
@@ -178,46 +136,7 @@ var CustomFileName = func(job *m7s.RecordJob) string {
}
func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "mp4",
}
dir := filepath.Dir(r.stream.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
r.file, err = os.Create(r.stream.FilePath)
if err != nil {
return
}
if recordJob.RecConf.Type == "fmp4" {
r.stream.Type = "fmp4"
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.stream.StreamPath)
} else {
r.muxer = NewMuxerWithStreamPath(0, r.stream.StreamPath)
}
r.muxer.WriteInitSegment(r.file)
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
return r.CreateStream(start, CustomFileName)
}
func (r *Recorder) Dispose() {
@@ -231,17 +150,28 @@ func (r *Recorder) Run() (err error) {
sub := recordJob.Subscriber
var audioTrack, videoTrack *Track
startTime := time.Now()
if recordJob.BeforeDuration > 0 {
startTime = startTime.Add(-recordJob.BeforeDuration)
if recordJob.Event != nil {
startTime = startTime.Add(-time.Duration(recordJob.Event.BeforeDuration) * time.Millisecond)
}
err = r.createStream(startTime)
if err != nil {
return
}
r.file, err = os.Create(r.Event.FilePath)
if err != nil {
return
}
if recordJob.RecConf.Type == "fmp4" {
r.Event.Type = "fmp4"
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.Event.StreamPath)
} else {
r.muxer = NewMuxerWithStreamPath(0, r.Event.StreamPath)
}
r.muxer.WriteInitSegment(r.file)
var at, vt *pkg.AVTrack
checkEventRecordStop := func(absTime uint32) (err error) {
if duration := int64(absTime); time.Duration(duration)*time.Millisecond >= recordJob.AfterDuration+recordJob.BeforeDuration {
if absTime >= recordJob.Event.AfterDuration+recordJob.Event.BeforeDuration {
r.RecordJob.Stop(task.ErrStopByUser)
}
return
@@ -269,9 +199,9 @@ func (r *Recorder) Run() (err error) {
}
return m7s.PlayBlock(sub, func(audio *pkg.RawAudio) error {
r.stream.Duration = sub.AudioReader.AbsTime
r.Event.Duration = sub.AudioReader.AbsTime
if sub.VideoReader == nil {
if recordJob.AfterDuration != 0 {
if recordJob.Event != nil {
err := checkEventRecordStop(sub.VideoReader.AbsTime)
if err != nil {
return err
@@ -314,9 +244,9 @@ func (r *Recorder) Run() (err error) {
Timestamp: uint32(dts),
})
}, func(video *rtmp.RTMPVideo) error {
r.stream.Duration = sub.VideoReader.AbsTime
r.Event.Duration = sub.VideoReader.AbsTime
if sub.VideoReader.Value.IDR {
if recordJob.AfterDuration != 0 {
if recordJob.Event != nil {
err := checkEventRecordStop(sub.VideoReader.AbsTime)
if err != nil {
return err

View File

@@ -185,8 +185,6 @@ func (t *RecordRecoveryTask) recoverRecordFromFile(filePath string) error {
FilePath: filePath,
StreamPath: streamPath,
Type: "mp4",
Mode: m7s.RecordModeAuto, // 默认为自动录制模式
EventLevel: m7s.EventLevelLow, // 默认为低级别事件
}
// 设置开始和结束时间

View File

@@ -37,12 +37,13 @@ var (
type WebRTCPlugin struct {
m7s.Plugin
ICEServers []ICEServer `desc:"ice服务器配置"`
Port string `default:"tcp:9000" desc:"监听端口"`
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后发送PLI请求
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
MimeType []string `desc:"MimeType过滤列表为空则不过滤"` // MimeType过滤列表支持的格式如video/H264, audio/opus
s SettingEngine
ICEServers []ICEServer `desc:"ice服务器配置"`
Port string `default:"tcp:9000" desc:"监听端口"`
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后发送PLI请求
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
MimeType []string `desc:"MimeType过滤列表为空则不过滤"` // MimeType过滤列表支持的格式如video/H264, audio/opus
s SettingEngine
portMapping map[int]int // 内部端口到外部端口的映射
}
func (p *WebRTCPlugin) RegisterHandler() map[string]http.HandlerFunc {
@@ -306,50 +307,90 @@ func (p *WebRTCPlugin) initSettingEngine() error {
// configurePort 配置端口设置
func (p *WebRTCPlugin) configurePort() error {
ports, err := ParsePort2(p.Port)
// 使用 ParsePort 而不是 ParsePort2 来获取端口映射信息
portInfo, err := ParsePort(p.Port)
if err != nil {
p.Error("webrtc port config error", "error", err, "port", p.Port)
return err
}
switch v := ports.(type) {
case TCPPort:
tcpport := int(v)
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: tcpport,
})
p.OnDispose(func() {
_ = tcpl.Close()
})
if err != nil {
p.Error("webrtc listener tcp", "error", err)
// 初始化端口映射
p.portMapping = make(map[int]int)
// 如果有端口映射,存储映射关系
if portInfo.HasMapping() {
if portInfo.IsRange() {
// 端口范围映射
for i := 0; i <= portInfo.Ports[1]-portInfo.Ports[0]; i++ {
internalPort := portInfo.Ports[0] + i
var externalPort int
if portInfo.IsRangeMapping() {
// 映射端口也是范围
externalPort = portInfo.Map[0] + i
} else {
// 映射端口是单个端口
externalPort = portInfo.Map[0]
}
p.portMapping[internalPort] = externalPort
}
} else {
// 单端口映射
p.portMapping[portInfo.Ports[0]] = portInfo.Map[0]
}
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
p.Info("webrtc start listen", "port", tcpport)
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
p.s.DisableSRTPReplayProtection(true)
case UDPRangePort:
p.s.SetEphemeralUDPPortRange(uint16(v[0]), uint16(v[1]))
p.SetDescription("udp", fmt.Sprintf("%d-%d", v[0], v[1]))
case UDPPort:
// 创建共享WEBRTC端口 默认9000
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: int(v),
})
p.OnDispose(func() {
_ = udpListener.Close()
})
if err != nil {
p.Error("webrtc listener udp", "error", err)
return err
p.Info("Port mapping configured", "mapping", p.portMapping)
}
// 根据协议类型进行配置
if portInfo.IsTCP() {
if portInfo.IsRange() {
// TCP端口范围这里可能需要特殊处理
p.Error("TCP port range not supported in current implementation")
return fmt.Errorf("TCP port range not supported")
} else {
// TCP单端口
tcpport := portInfo.Ports[0]
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: tcpport,
})
p.OnDispose(func() {
_ = tcpl.Close()
})
if err != nil {
p.Error("webrtc listener tcp", "error", err)
return err
}
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
p.Info("webrtc start listen", "port", tcpport)
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
p.s.DisableSRTPReplayProtection(true)
}
} else {
// UDP配置
if portInfo.IsRange() {
// UDP端口范围
p.s.SetEphemeralUDPPortRange(uint16(portInfo.Ports[0]), uint16(portInfo.Ports[1]))
p.SetDescription("udp", fmt.Sprintf("%d-%d", portInfo.Ports[0], portInfo.Ports[1]))
} else {
// UDP单端口
udpport := portInfo.Ports[0]
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: udpport,
})
p.OnDispose(func() {
_ = udpListener.Close()
})
if err != nil {
p.Error("webrtc listener udp", "error", err)
return err
}
p.SetDescription("udp", fmt.Sprintf("%d", udpport))
p.Info("webrtc start listen", "port", udpport)
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
}
p.SetDescription("udp", fmt.Sprintf("%d", v))
p.Info("webrtc start listen", "port", v)
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
}
return nil
@@ -368,9 +409,33 @@ func (p *WebRTCPlugin) CreatePC(sd SessionDescription, conf Configuration) (pc *
return
}
pc, err = api.NewPeerConnection(conf)
if err == nil {
err = pc.SetRemoteDescription(sd)
if err != nil {
return
}
// 如果有端口映射配置,记录 ICE 候选者信息以供调试
if len(p.portMapping) > 0 {
pc.OnICECandidate(func(candidate *ICECandidate) {
if candidate != nil {
// 记录端口映射信息(用于调试和监控)
if mappedPort, exists := p.portMapping[int(candidate.Port)]; exists {
p.Debug("ICE candidate with port mapping detected",
"original_port", candidate.Port,
"mapped_port", mappedPort,
"candidate_address", candidate.Address,
"candidate_type", candidate.Typ)
candidate.Port = uint16(mappedPort) // 更新候选者端口为映射后的端口
} else {
p.Debug("ICE candidate generated",
"port", candidate.Port,
"address", candidate.Address,
"type", candidate.Typ)
}
}
})
}
err = pc.SetRemoteDescription(sd)
return
}

View File

@@ -238,10 +238,9 @@ func (p *RecordFilePuller) queryRecordStreams(startTime, endTime time.Time) (err
return pkg.ErrNoDB
}
queryRecord := RecordStream{
Mode: RecordModeAuto,
Type: p.Type,
}
tx := p.PullJob.Plugin.DB.Where(&queryRecord).Find(&p.Streams, "end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL)
tx := p.PullJob.Plugin.DB.Where(&queryRecord).Find(&p.Streams, "event_id=0 AND end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL)
if tx.Error != nil {
return tx.Error
}

View File

@@ -1,6 +1,8 @@
package m7s
import (
"os"
"path/filepath"
"time"
"gorm.io/gorm"
@@ -12,58 +14,46 @@ import (
"m7s.live/v5/pkg"
)
const (
RecordModeAuto RecordMode = "auto"
RecordModeEvent RecordMode = "event"
EventLevelLow EventLevel = "low"
EventLevelHigh EventLevel = "high"
)
type (
EventLevel = string
RecordMode = string
IRecorder interface {
IRecorder interface {
task.ITask
GetRecordJob() *RecordJob
}
RecorderFactory = func(config.Record) IRecorder
RecordJob struct {
// RecordEvent 包含录像事件的公共字段
EventRecordStream struct {
CreatedAt time.Time
*config.RecordEvent
RecordStream
}
RecordJob struct {
task.Job
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
SubConf *config.Subscribe
RecConf *config.Record
recorder IRecorder
EventId string `json:"eventId" desc:"事件编号"`
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式"`
BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长"`
AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长"`
EventDesc string `json:"eventDesc" desc:"事件描述"`
EventLevel EventLevel `json:"eventLevel" desc:"事件级别"`
EventName string `json:"eventName" desc:"事件名称"`
Event *config.RecordEvent
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
SubConf *config.Subscribe
RecConf *config.Record
recorder IRecorder
}
DefaultRecorder struct {
task.Task
RecordJob RecordJob
Event EventRecordStream
}
RecordStream struct {
ID uint `gorm:"primarykey"`
StartTime, EndTime time.Time `gorm:"type:datetime;default:NULL"`
Duration uint32 `gorm:"comment:录像时长;default:0"`
EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"`
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式event=事件录像模式;default:'auto'"`
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"`
BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:BIGINT;comment:事件前缓存时长;default:30000000000"`
AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:BIGINT;comment:事件后缓存时长;default:30000000000"`
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"`
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
EventLevel EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
FilePath string
StreamPath string
AudioCodec, VideoCodec string
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
ID uint `gorm:"primarykey"`
StartTime time.Time `gorm:"type:datetime;default:NULL"`
EndTime time.Time `gorm:"type:datetime;default:NULL"`
Duration uint32 `gorm:"comment:录像时长;default:0"`
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
FilePath string
StreamPath string
AudioCodec string
VideoCodec string
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
}
)
@@ -75,6 +65,52 @@ func (r *DefaultRecorder) Start() (err error) {
return r.RecordJob.Subscribe()
}
func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*RecordJob) string) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.Event.RecordStream = RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: customFileName(recordJob),
Type: recordJob.RecConf.Type,
}
dir := filepath.Dir(r.Event.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
if sub.Publisher.HasAudioTrack() {
r.Event.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.Event.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
if recordJob.Event != nil {
r.Event.RecordEvent = recordJob.Event
recordJob.Plugin.DB.Save(&r.Event)
} else {
recordJob.Plugin.DB.Save(&r.Event.RecordStream)
}
}
return
}
func (r *DefaultRecorder) WriteTail(end time.Time, tailJob task.IJob) {
r.Event.EndTime = end
if r.RecordJob.Plugin.DB != nil {
// 将事件和录像记录关联
if r.RecordJob.Event != nil {
r.RecordJob.Plugin.DB.Save(&r.Event)
} else {
r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
}
}
if tailJob == nil {
return
}
tailJob.AddTask(NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
}
func (p *RecordJob) GetKey() string {
return p.RecConf.FilePath
}
@@ -150,3 +186,27 @@ func (p *RecordJob) Start() (err error) {
p.AddTask(p.recorder, p.Logger)
return
}
func NewEventRecordCheck(t string, streamPath string, db *gorm.DB) *eventRecordCheck {
return &eventRecordCheck{
DB: db,
streamPath: streamPath,
Type: t,
}
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
Type string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []EventRecordStream
t.DB.Find(&eventRecordStreams, "type=? AND level=high AND stream_path=?", t.Type, t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
for _, recordStream := range eventRecordStreams {
t.DB.Model(&EventRecordStream{}).Where(`level=low AND start_time <= ? and end_time >= ?`, recordStream.EndTime, recordStream.StartTime).Update("level", config.EventLevelHigh)
}
return
}