Compare commits

...

8 Commits

Author SHA1 Message Date
langhuihui
77613e52a8 feat: mp4 to ts convert 2025-06-16 09:03:26 +08:00
erroot
ec56bba75a Erroot v5 (#286)
* 插件数据库不同时,新建DB 对象赋值给插件

* MP4 plugin adds extraction, clips, images, compressed video, GOP clicp

* remove mp4/util panic code
2025-06-16 08:29:14 +08:00
pggiroro
b2b511d755 fix: user.LastLogin set gorm type:timestamp, gb28181 api GetGroupChannels modify 2025-06-15 22:19:14 +08:00
pggiroro
42acf47250 feature: gb28181 support single mediaport 2025-06-15 16:58:52 +08:00
langhuihui
6206ee847d fix: record table fit pg database 2025-06-15 15:58:12 +08:00
langhuihui
6cfdc03e4a fix: user mode fit pg database 2025-06-15 15:21:21 +08:00
pggiroro
b425b8da1f fix: ignore RecordEvent in gorm 2025-06-13 12:52:57 +08:00
langhuihui
e105243cd5 refactor: record 2025-06-13 12:52:57 +08:00
27 changed files with 3210 additions and 527 deletions

6
api.go
View File

@@ -184,7 +184,7 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
if record.StreamPath == req.StreamPath { if record.StreamPath == req.StreamPath {
recordings = append(recordings, &pb.RecordingDetail{ recordings = append(recordings, &pb.RecordingDetail{
FilePath: record.RecConf.FilePath, FilePath: record.RecConf.FilePath,
Mode: record.Mode, Mode: record.RecConf.Mode,
Fragment: durationpb.New(record.RecConf.Fragment), Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append, Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name, PluginName: record.Plugin.Meta.Name,
@@ -554,7 +554,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
for record := range s.Records.SafeRange { for record := range s.Records.SafeRange {
recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{ recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{
FilePath: record.RecConf.FilePath, FilePath: record.RecConf.FilePath,
Mode: record.Mode, Mode: record.RecConf.Mode,
Fragment: durationpb.New(record.RecConf.Fragment), Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append, Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name, PluginName: record.Plugin.Meta.Name,
@@ -750,7 +750,7 @@ func (s *Server) GetRecordList(ctx context.Context, req *pb.ReqRecordList) (resp
offset := (req.PageNum - 1) * req.PageSize // 计算偏移量 offset := (req.PageNum - 1) * req.PageSize // 计算偏移量
var totalCount int64 //总条数 var totalCount int64 //总条数
var result []*RecordStream var result []*EventRecordStream
query := s.DB.Model(&RecordStream{}) query := s.DB.Model(&RecordStream{})
if strings.Contains(req.StreamPath, "*") { if strings.Contains(req.StreamPath, "*") {
query = query.Where("stream_path like ?", strings.ReplaceAll(req.StreamPath, "*", "%")) query = query.Where("stream_path like ?", strings.ReplaceAll(req.StreamPath, "*", "%"))

View File

@@ -664,7 +664,7 @@ message ReqRecordList {
string end = 4; string end = 4;
uint32 pageNum = 5; uint32 pageNum = 5;
uint32 pageSize = 6; uint32 pageSize = 6;
string mode = 7; string eventId = 7;
string type = 8; string type = 8;
string eventLevel = 9; string eventLevel = 9;
} }

View File

@@ -16,6 +16,9 @@ const (
RelayModeRelay = "relay" RelayModeRelay = "relay"
RelayModeMix = "mix" RelayModeMix = "mix"
RecordModeAuto RecordMode = "auto"
RecordModeEvent RecordMode = "event"
HookOnServerKeepAlive HookType = "server_keep_alive" HookOnServerKeepAlive HookType = "server_keep_alive"
HookOnPublishStart HookType = "publish_start" HookOnPublishStart HookType = "publish_start"
HookOnPublishEnd HookType = "publish_end" HookOnPublishEnd HookType = "publish_end"
@@ -29,11 +32,16 @@ const (
HookOnRecordEnd HookType = "record_end" HookOnRecordEnd HookType = "record_end"
HookOnTransformStart HookType = "transform_start" HookOnTransformStart HookType = "transform_start"
HookOnTransformEnd HookType = "transform_end" HookOnTransformEnd HookType = "transform_end"
EventLevelLow EventLevel = "low"
EventLevelHigh EventLevel = "high"
) )
type ( type (
HookType string EventLevel = string
Publish struct { RecordMode = string
HookType string
Publish struct {
MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量 MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量
PubAudio bool `default:"true" desc:"是否发布音频"` PubAudio bool `default:"true" desc:"是否发布音频"`
PubVideo bool `default:"true" desc:"是否发布视频"` PubVideo bool `default:"true" desc:"是否发布视频"`
@@ -84,11 +92,21 @@ type (
Proxy string `desc:"代理地址"` // 代理地址 Proxy string `desc:"代理地址"` // 代理地址
Header HTTPValues Header HTTPValues
} }
RecordEvent struct {
EventId string
BeforeDuration uint32 `json:"beforeDuration" desc:"事件前缓存时长" gorm:"comment:事件前缓存时长;default:30000"`
AfterDuration uint32 `json:"afterDuration" desc:"事件后缓存时长" gorm:"comment:事件后缓存时长;default:30000"`
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"`
EventLevel EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"`
}
Record struct { Record struct {
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7 Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式event=事件录像模式;default:'auto'"`
FilePath string `desc:"录制文件路径"` // 录制文件路径 Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
Fragment time.Duration `desc:"分片时长"` // 分片时长 FilePath string `desc:"录制文件路径"` // 录制文件路径
Append bool `desc:"是否追加录制"` // 是否追加录制 Fragment time.Duration `desc:"分片时长"` // 分片时长
Append bool `desc:"是否追加录制"` // 是否追加录制
Event *RecordEvent `json:"event" desc:"事件录像配置" gorm:"-"` // 事件录像配置
} }
TransfromOutput struct { TransfromOutput struct {
Target string `desc:"转码目标"` // 转码目标 Target string `desc:"转码目标"` // 转码目标

View File

@@ -9,14 +9,11 @@ import (
// User represents a user in the system // User represents a user in the system
type User struct { type User struct {
ID uint `gorm:"primarykey"` gorm.Model
CreatedAt time.Time Username string `gorm:"uniqueIndex;size:64"`
UpdatedAt time.Time Password string `gorm:"size:60"` // bcrypt hash
DeletedAt gorm.DeletedAt `gorm:"index"` Role string `gorm:"size:20;default:'user'"` // admin or user
Username string `gorm:"uniqueIndex;size:64"` LastLogin time.Time `gorm:"type:timestamp;default:CURRENT_TIMESTAMP"`
Password string `gorm:"size:60"` // bcrypt hash
Role string `gorm:"size:20;default:'user'"` // admin or user
LastLogin time.Time `gorm:"type:datetime;default:CURRENT_TIMESTAMP"`
} }
// BeforeCreate hook to hash password before saving // BeforeCreate hook to hash password before saving

View File

@@ -156,6 +156,10 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
p.disable(fmt.Sprintf("auto migrate record stream failed %v", err)) p.disable(fmt.Sprintf("auto migrate record stream failed %v", err))
return return
} }
if err = p.DB.AutoMigrate(&EventRecordStream{}); err != nil {
p.disable(fmt.Sprintf("auto migrate event record stream failed %v", err))
return
}
} }
if err := s.AddTask(instance).WaitStarted(); err != nil { if err := s.AddTask(instance).WaitStarted(); err != nil {
p.disable(instance.StopReason().Error()) p.disable(instance.StopReason().Error())

View File

@@ -8,7 +8,6 @@ import (
"slices" "slices"
"time" "time"
"gorm.io/gorm"
"m7s.live/v5" "m7s.live/v5"
"m7s.live/v5/pkg" "m7s.live/v5/pkg"
"m7s.live/v5/pkg/config" "m7s.live/v5/pkg/config"
@@ -144,7 +143,6 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct { type Recorder struct {
m7s.DefaultRecorder m7s.DefaultRecorder
stream m7s.RecordStream
} }
var CustomFileName = func(job *m7s.RecordJob) string { var CustomFileName = func(job *m7s.RecordJob) string {
@@ -155,48 +153,21 @@ var CustomFileName = func(job *m7s.RecordJob) string {
} }
func (r *Recorder) createStream(start time.Time) (err error) { func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob return r.CreateStream(start, CustomFileName)
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "flv",
}
dir := filepath.Dir(r.stream.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
} }
func (r *Recorder) writeTailer(end time.Time) { func (r *Recorder) writeTailer(end time.Time) {
if r.stream.EndTime.After(r.stream.StartTime) { if r.Event.EndTime.After(r.Event.StartTime) {
return return
} }
r.stream.EndTime = end r.Event.EndTime = end
if r.RecordJob.Plugin.DB != nil { if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream) if r.RecordJob.Event != nil {
writeMetaTagQueueTask.AddTask(&eventRecordCheck{ r.RecordJob.Plugin.DB.Save(&r.Event)
DB: r.RecordJob.Plugin.DB, } else {
streamPath: r.stream.StreamPath, r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
}) }
writeMetaTagQueueTask.AddTask(m7s.NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
} }
} }
@@ -204,40 +175,6 @@ func (r *Recorder) Dispose() {
r.writeTailer(time.Now()) r.writeTailer(time.Now())
} }
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "flv",
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
query := `(start_time BETWEEN ? AND ?)
OR (end_time BETWEEN ? AND ?)
OR (? BETWEEN start_time AND end_time)
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
}
func (r *Recorder) Run() (err error) { func (r *Recorder) Run() (err error) {
var file *os.File var file *os.File
var filepositions []uint64 var filepositions []uint64
@@ -248,14 +185,14 @@ func (r *Recorder) Run() (err error) {
suber := ctx.Subscriber suber := ctx.Subscriber
noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append
startTime := time.Now() startTime := time.Now()
if ctx.BeforeDuration > 0 { if ctx.Event.BeforeDuration > 0 {
startTime = startTime.Add(-ctx.BeforeDuration) startTime = startTime.Add(-time.Duration(ctx.Event.BeforeDuration) * time.Millisecond)
} }
if err = r.createStream(startTime); err != nil { if err = r.createStream(startTime); err != nil {
return return
} }
if noFragment { if noFragment {
file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666) file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666)
if err != nil { if err != nil {
return return
} }
@@ -291,7 +228,7 @@ func (r *Recorder) Run() (err error) {
} else if ctx.RecConf.Fragment == 0 { } else if ctx.RecConf.Fragment == 0 {
_, err = file.Write(FLVHead) _, err = file.Write(FLVHead)
} else { } else {
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil { if file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
return return
} }
_, err = file.Write(FLVHead) _, err = file.Write(FLVHead)
@@ -307,7 +244,7 @@ func (r *Recorder) Run() (err error) {
if err = r.createStream(time.Now()); err != nil { if err = r.createStream(time.Now()); err != nil {
return return
} }
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil { if file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
return return
} }
_, err = file.Write(FLVHead) _, err = file.Write(FLVHead)

View File

@@ -1873,8 +1873,8 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
Select(` Select(`
IFNULL(gc.id, 0) AS id, IFNULL(gc.id, 0) AS id,
dc.channel_id, dc.channel_id,
dc.device_id,
dc.name AS channel_name, dc.name AS channel_name,
d.device_id AS device_id,
d.name AS device_name, d.name AS device_name,
dc.status AS status, dc.status AS status,
CASE CASE
@@ -1883,11 +1883,11 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
END AS in_group END AS in_group
`). `).
Joins("LEFT JOIN "+deviceTable+" AS d ON dc.device_id = d.device_id"). Joins("LEFT JOIN "+deviceTable+" AS d ON dc.device_id = d.device_id").
Joins("LEFT JOIN "+groupsChannelTable+" AS gc ON dc.channel_id = gc.channel_id AND gc.group_id = ?", req.GroupId) Joins("LEFT JOIN "+groupsChannelTable+" AS gc ON dc.channel_id = gc.channel_id AND dc.device_id = gc.device_id AND gc.group_id = ?", req.GroupId)
// 如果有设备ID过滤条件 // 如果有设备ID过滤条件
if req.DeviceId != "" { if req.DeviceId != "" {
baseQuery = baseQuery.Where("d.device_id = ?", req.DeviceId) baseQuery = baseQuery.Where("dc.device_id = ?", req.DeviceId)
} }
// 统计符合条件的通道总数 // 统计符合条件的通道总数
@@ -1903,7 +1903,7 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
query := baseQuery query := baseQuery
// 添加排序 // 添加排序
query = query.Order("channel_id ASC") query = query.Order("dc.device_id ASC, dc.channel_id ASC")
// 如果指定了分页参数,则应用分页 // 如果指定了分页参数,则应用分页
if req.Page > 0 && req.Count > 0 { if req.Page > 0 && req.Count > 0 {
@@ -1922,12 +1922,14 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
var pbGroupChannels []*pb.GroupChannel var pbGroupChannels []*pb.GroupChannel
for _, result := range results { for _, result := range results {
channelInfo := &pb.GroupChannel{ channelInfo := &pb.GroupChannel{
Id: int32(result.ID),
GroupId: req.GroupId,
ChannelId: result.ChannelID, ChannelId: result.ChannelID,
DeviceId: result.DeviceID, DeviceId: result.DeviceID,
ChannelName: result.ChannelName, ChannelName: result.ChannelName,
DeviceName: result.DeviceName, DeviceName: result.DeviceName,
Status: result.Status, Status: result.Status,
InGroup: result.InGroup, // 设置inGroup字段 InGroup: result.InGroup,
} }
// 从内存中获取设备信息以获取传输协议 // 从内存中获取设备信息以获取传输协议
@@ -1935,13 +1937,6 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
channelInfo.StreamMode = device.StreamMode channelInfo.StreamMode = device.StreamMode
} }
if result.InGroup {
channelInfo.Id = int32(result.ID)
channelInfo.GroupId = int32(req.GroupId)
} else {
channelInfo.Id = 0
}
pbGroupChannels = append(pbGroupChannels, channelInfo) pbGroupChannels = append(pbGroupChannels, channelInfo)
} }
@@ -2082,19 +2077,19 @@ func (gb *GB28181Plugin) getGroupChannels(groupId int32) ([]*pb.GroupChannel, er
InGroup bool `gorm:"column:in_group"` InGroup bool `gorm:"column:in_group"`
} }
// 构建查询 // 构建优化后的查询
query := gb.DB.Table(groupsChannelTable+" AS gc"). query := gb.DB.Table(groupsChannelTable+" AS gc").
Select(` Select(`
gc.id AS id, gc.id AS id,
gc.channel_id AS channel_id, gc.channel_id AS channel_id,
gc.device_id AS device_id, gc.device_id AS device_id,
dc.name AS channel_name, ch.name AS channel_name,
d.name AS device_name, dev.name AS device_name,
dc.status AS status, ch.status AS status,
true AS in_group true AS in_group
`). `).
Joins("LEFT JOIN "+deviceChannelTable+" AS dc ON gc.channel_id = dc.channel_id"). Joins("LEFT JOIN "+deviceChannelTable+" AS ch ON gc.device_id = ch.device_id AND gc.channel_id = ch.channel_id").
Joins("LEFT JOIN "+deviceTable+" AS d ON gc.device_id = d.device_id"). Joins("LEFT JOIN "+deviceTable+" AS dev ON ch.device_id = dev.device_id").
Where("gc.group_id = ?", groupId) Where("gc.group_id = ?", groupId)
var results []Result var results []Result
@@ -2107,7 +2102,7 @@ func (gb *GB28181Plugin) getGroupChannels(groupId int32) ([]*pb.GroupChannel, er
for _, result := range results { for _, result := range results {
channelInfo := &pb.GroupChannel{ channelInfo := &pb.GroupChannel{
Id: int32(result.ID), Id: int32(result.ID),
GroupId: groupId, GroupId: groupId, // 使用函数参数 groupId
ChannelId: result.ChannelID, ChannelId: result.ChannelID,
DeviceId: result.DeviceID, DeviceId: result.DeviceID,
ChannelName: result.ChannelName, ChannelName: result.ChannelName,
@@ -2868,7 +2863,7 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
} }
// 删除设备关联的通道 // 删除设备关联的通道
if err := tx.Delete(&gb28181.DeviceChannel{DeviceID: req.Id}).Error; err != nil { if err := tx.Where("device_id = ?", req.Id).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
tx.Rollback() tx.Rollback()
resp.Code = 500 resp.Code = 500
resp.Message = "删除设备通道失败" resp.Message = "删除设备通道失败"

View File

@@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
@@ -99,15 +100,20 @@ func (d *Dialog) Start() (err error) {
d.gb.dialogs.Set(d) d.gb.dialogs.Set(d)
//defer d.gb.dialogs.Remove(d) //defer d.gb.dialogs.Remove(d)
if d.gb.MediaPort.Valid() { if d.gb.tcpPort > 0 {
select { d.MediaPort = d.gb.tcpPort
case d.MediaPort = <-d.gb.tcpPorts:
default:
return fmt.Errorf("no available tcp port")
}
} else { } else {
d.MediaPort = d.gb.MediaPort[0] if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.tcpPorts:
default:
return fmt.Errorf("no available tcp port")
}
} else {
d.MediaPort = d.gb.MediaPort[0]
}
} }
ssrc := d.CreateSSRC(d.gb.Serial) ssrc := d.CreateSSRC(d.gb.Serial)
d.Info("MediaIp is ", device.MediaIp) d.Info("MediaIp is ", device.MediaIp)
@@ -266,7 +272,7 @@ func (d *Dialog) Run() (err error) {
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil { if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
d.SSRC = uint32(_ssrc) d.SSRC = uint32(_ssrc)
} else { } else {
d.gb.Error("read invite response y ", "err", err) return errors.New("read invite respose y error" + err.Error())
} }
} }
case "c": case "c":
@@ -299,6 +305,18 @@ func (d *Dialog) Run() (err error) {
if d.StreamMode == "TCP-ACTIVE" { if d.StreamMode == "TCP-ACTIVE" {
pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort) pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
} else { } else {
if d.gb.tcpPort > 0 {
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
if d.gb.netListener != nil {
d.Info("use gb.netListener", d.gb.netListener.Addr())
pub.Receiver.Listener = d.gb.netListener
} else {
d.Info("listen tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
pub.Receiver.Listener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
d.gb.netListener = pub.Receiver.Listener
}
pub.Receiver.SSRC = d.SSRC
}
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
} }
pub.Receiver.StreamMode = d.StreamMode pub.Receiver.StreamMode = d.StreamMode
@@ -316,7 +334,11 @@ func (d *Dialog) GetKey() uint32 {
} }
func (d *Dialog) Dispose() { func (d *Dialog) Dispose() {
d.gb.tcpPorts <- d.MediaPort if d.gb.tcpPort == 0 {
// 如果没有设置tcp端口则将MediaPort设置为0表示不再使用
d.gb.tcpPorts <- d.MediaPort
}
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceID, "channelId", d.Channel.ChannelID)
if d.session != nil { if d.session != nil {
err := d.session.Bye(d) err := d.session.Bye(d)
if err != nil { if err != nil {

View File

@@ -3,9 +3,9 @@ package plugin_gb28181pro
import ( import (
"errors" "errors"
"fmt" "fmt"
"net"
"net/http" "net/http"
"os" "os"
"regexp"
"slices" "slices"
"strconv" "strconv"
"strings" "strings"
@@ -41,7 +41,7 @@ type GB28181Plugin struct {
pb.UnimplementedApiServer pb.UnimplementedApiServer
m7s.Plugin m7s.Plugin
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Password string Password string
Sip SipConfig Sip SipConfig
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围 MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
@@ -55,12 +55,14 @@ type GB28181Plugin struct {
forwardDialogs util.Collection[uint32, *ForwardDialog] forwardDialogs util.Collection[uint32, *ForwardDialog]
platforms util.Collection[string, *Platform] platforms util.Collection[string, *Platform]
tcpPorts chan uint16 tcpPorts chan uint16
tcpPort uint16
sipPorts []int sipPorts []int
SipIP string `desc:"sip发送命令的IP一般是本地IP多网卡时需要配置正确的IP"` SipIP string `desc:"sip发送命令的IP一般是本地IP多网卡时需要配置正确的IP"`
MediaIP string `desc:"流媒体IP用于接收流"` MediaIP string `desc:"流媒体IP用于接收流"`
deviceManager task.Manager[string, *DeviceRegisterQueueTask] deviceManager task.Manager[string, *DeviceRegisterQueueTask]
Platforms []*gb28181.PlatformModel Platforms []*gb28181.PlatformModel
channels util.Collection[string, *gb28181.DeviceChannel] channels util.Collection[string, *gb28181.DeviceChannel]
netListener net.Listener
} }
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -75,6 +77,18 @@ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
NewPullProxy: NewPullProxy, NewPullProxy: NewPullProxy,
}) })
func (gb *GB28181Plugin) Dispose() {
if gb.netListener != nil {
gb.Info("gb28181 plugin dispose")
err := gb.netListener.Close()
if err != nil {
gb.Error("Close netListener error", "error", err)
} else {
gb.Info("netListener closed")
}
}
}
func init() { func init() {
sip.SIPDebug = true sip.SIPDebug = true
} }
@@ -153,8 +167,16 @@ func (gb *GB28181Plugin) OnInit() (err error) {
if gb.MediaPort.Valid() { if gb.MediaPort.Valid() {
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1])) gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
for i := range gb.MediaPort.Size() { if gb.MediaPort.Size() == 0 {
gb.tcpPorts <- gb.MediaPort[0] + i gb.tcpPort = gb.MediaPort[0]
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
} else if gb.MediaPort.Size() == 1 {
gb.tcpPort = gb.MediaPort[0] + 1
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
} else {
for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i
}
} }
} else { } else {
gb.SetDescription("tcp", fmt.Sprintf("%d", gb.MediaPort[0])) gb.SetDescription("tcp", fmt.Sprintf("%d", gb.MediaPort[0]))
@@ -438,22 +460,9 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
from := req.From() from := req.From()
if from == nil || from.Address.User == "" { if from == nil || from.Address.User == "" {
gb.Error("OnRegister", "error", "no user") gb.Error("OnRegister", "error", "no user")
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid sip from format", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return return
} }
deviceId := from.Address.User deviceId := from.Address.User
// 验证设备ID是否符合GB28181规范(20位数字)
if match, _ := regexp.MatchString(`^\d{20}$`, deviceId); !match {
gb.Error("OnRegister", "error", "invalid device id format, must be 20 digits", "deviceId", deviceId)
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid device ID format", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
registerHandlerTask := registerHandlerTask{ registerHandlerTask := registerHandlerTask{
gb: gb, gb: gb,
req: req, req: req,

View File

@@ -44,8 +44,9 @@ type Receiver struct {
psAudio PSAudio psAudio PSAudio
RTPReader *rtp2.TCP RTPReader *rtp2.TCP
ListenAddr string ListenAddr string
listener net.Listener Listener net.Listener
StreamMode string // 数据流传输模式UDP:udp传输/TCP-ACTIVEtcp主动模式/TCP-PASSIVEtcp被动模式 StreamMode string // 数据流传输模式UDP:udp传输/TCP-ACTIVEtcp主动模式/TCP-PASSIVEtcp被动模式
SSRC uint32 // RTP SSRC
} }
func NewPSPublisher(puber *m7s.Publisher) *PSPublisher { func NewPSPublisher(puber *m7s.Publisher) *PSPublisher {
@@ -147,9 +148,19 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
p.Error("unmarshal error", "err", err) p.Error("unmarshal error", "err", err)
return return
} }
// 如果设置了SSRC过滤只处理匹配的SSRC
if p.SSRC != 0 && p.SSRC != p.Packet.SSRC {
p.Info("into single port mode, ssrc mismatch", "expected", p.SSRC, "actual", p.Packet.SSRC)
if p.TraceEnabled() {
p.Trace("rtp ssrc mismatch, skip", "expected", p.SSRC, "actual", p.Packet.SSRC)
}
return nil
}
if lastSeq == 0 || p.SequenceNumber == lastSeq+1 { if lastSeq == 0 || p.SequenceNumber == lastSeq+1 {
if p.TraceEnabled() { if p.TraceEnabled() {
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC) p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.Packet.SSRC)
} }
copyData := make([]byte, len(p.Payload)) copyData := make([]byte, len(p.Payload))
copy(copyData, p.Payload) copy(copyData, p.Payload)
@@ -172,18 +183,24 @@ func (p *Receiver) Start() (err error) {
return nil return nil
} }
// TCP被动模式 // TCP被动模式
p.listener, err = net.Listen("tcp4", p.ListenAddr) if p.Listener == nil {
if err != nil { p.Info("start new listener", "addr", p.ListenAddr)
p.Error("start listen", "err", err) p.Listener, err = net.Listen("tcp4", p.ListenAddr)
return errors.New("start listen,err" + err.Error()) if err != nil {
p.Error("start listen", "err", err)
return errors.New("start listen,err" + err.Error())
}
} }
p.Info("start listen", "addr", p.ListenAddr) p.Info("start listen", "addr", p.ListenAddr)
return return
} }
func (p *Receiver) Dispose() { func (p *Receiver) Dispose() {
if p.listener != nil { if p.SSRC == 0 {
p.listener.Close() p.Info("into multiport mode ,close listener ", p.SSRC)
if p.Listener != nil {
p.Listener.Close()
}
} }
if p.RTPReader != nil { if p.RTPReader != nil {
p.RTPReader.Close() p.RTPReader.Close()
@@ -216,7 +233,7 @@ func (p *Receiver) Go() error {
} }
// TCP被动模式 // TCP被动模式
p.Info("start accept") p.Info("start accept")
conn, err := p.listener.Accept() conn, err := p.Listener.Accept()
if err != nil { if err != nil {
p.Error("accept", "err", err) p.Error("accept", "err", err)
return err return err

682
plugin/hls/download.go Normal file
View File

@@ -0,0 +1,682 @@
package plugin_hls
import (
"bufio"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
m7s "m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/util"
hls "m7s.live/v5/plugin/hls/pkg"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
mp4 "m7s.live/v5/plugin/mp4/pkg"
"m7s.live/v5/plugin/mp4/pkg/box"
)
// requestParams 包含请求解析后的参数
type requestParams struct {
streamPath string
startTime time.Time
endTime time.Time
timeRange time.Duration
}
// fileInfo 包含文件信息
type fileInfo struct {
filePath string
startTime time.Time
endTime time.Time
startOffsetTime time.Duration
recordType string // "ts", "mp4", "fmp4"
}
// parseRequestParams 解析请求参数
func (plugin *HLSPlugin) parseRequestParams(r *http.Request) (*requestParams, error) {
// 从URL路径中提取流路径去除前缀 "/download/" 和后缀 ".ts"
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/download/"), ".ts")
// 解析URL查询参数中的时间范围start和end参数
startTime, endTime, err := util.TimeRangeQueryParse(r.URL.Query())
if err != nil {
return nil, err
}
return &requestParams{
streamPath: streamPath,
startTime: startTime,
endTime: endTime,
timeRange: endTime.Sub(startTime),
}, nil
}
// queryRecordStreams 从数据库查询录像记录
func (plugin *HLSPlugin) queryRecordStreams(params *requestParams) ([]m7s.RecordStream, error) {
// 检查数据库是否可用
if plugin.DB == nil {
return nil, fmt.Errorf("database not available")
}
var recordStreams []m7s.RecordStream
// 首先查询HLS记录 (ts)
query := plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type = ?", params.streamPath, "hls")
// 添加时间范围查询条件
if !params.startTime.IsZero() && !params.endTime.IsZero() {
query = query.Where("(start_time <= ? AND end_time >= ?) OR (start_time >= ? AND start_time <= ?)",
params.endTime, params.startTime, params.startTime, params.endTime)
}
err := query.Order("start_time ASC").Find(&recordStreams).Error
if err != nil {
return nil, err
}
// 如果没有找到HLS记录尝试查询MP4记录
if len(recordStreams) == 0 {
query = plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type IN (?)", params.streamPath, []string{"mp4", "fmp4"})
if !params.startTime.IsZero() && !params.endTime.IsZero() {
query = query.Where("(start_time <= ? AND end_time >= ?) OR (start_time >= ? AND start_time <= ?)",
params.endTime, params.startTime, params.startTime, params.endTime)
}
err = query.Order("start_time ASC").Find(&recordStreams).Error
if err != nil {
return nil, err
}
}
return recordStreams, nil
}
// buildFileInfoList 构建文件信息列表
func (plugin *HLSPlugin) buildFileInfoList(recordStreams []m7s.RecordStream, startTime, endTime time.Time) ([]*fileInfo, bool) {
var fileInfoList []*fileInfo
var found bool
for _, record := range recordStreams {
// 检查文件是否存在
if !util.Exist(record.FilePath) {
plugin.Warn("Record file not found", "filePath", record.FilePath)
continue
}
var startOffsetTime time.Duration
recordStartTime := record.StartTime
recordEndTime := record.EndTime
// 计算文件内的偏移时间
if startTime.After(recordStartTime) {
startOffsetTime = startTime.Sub(recordStartTime)
}
// 检查是否在时间范围内
if recordEndTime.Before(startTime) || recordStartTime.After(endTime) {
continue
}
fileInfoList = append(fileInfoList, &fileInfo{
filePath: record.FilePath,
startTime: recordStartTime,
endTime: recordEndTime,
startOffsetTime: startOffsetTime,
recordType: record.Type,
})
found = true
}
return fileInfoList, found
}
// hasOnlyMp4Records 检查是否只有MP4记录
func (plugin *HLSPlugin) hasOnlyMp4Records(fileInfoList []*fileInfo) bool {
if len(fileInfoList) == 0 {
return false
}
for _, info := range fileInfoList {
if info.recordType == "hls" {
return false
}
}
return true
}
// filterTsFiles 过滤HLS TS文件
func (plugin *HLSPlugin) filterTsFiles(fileInfoList []*fileInfo) []*fileInfo {
var filteredList []*fileInfo
for _, info := range fileInfoList {
if info.recordType == "hls" {
filteredList = append(filteredList, info)
}
}
plugin.Debug("TS files filtered", "original", len(fileInfoList), "filtered", len(filteredList))
return filteredList
}
// filterMp4Files 过滤MP4文件
func (plugin *HLSPlugin) filterMp4Files(fileInfoList []*fileInfo) []*fileInfo {
var filteredList []*fileInfo
for _, info := range fileInfoList {
if info.recordType == "mp4" || info.recordType == "fmp4" {
filteredList = append(filteredList, info)
}
}
plugin.Debug("MP4 files filtered", "original", len(fileInfoList), "filtered", len(filteredList))
return filteredList
}
// processMp4ToTs 将MP4记录转换为TS输出
func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request, fileInfoList []*fileInfo, params *requestParams) {
plugin.Info("Converting MP4 records to TS", "count", len(fileInfoList))
// 设置HTTP响应头
w.Header().Set("Content-Type", "video/mp2t")
w.Header().Set("Content-Disposition", "attachment")
// 创建一个TS写入器在循环外面所有MP4文件共享同一个TsInMemory
tsWriter := &simpleTsWriter{
TsInMemory: &hls.TsInMemory{},
plugin: plugin,
}
// 对于MP4到TS的转换我们采用简化的方法
// 直接将每个MP4文件转换输出
for _, info := range fileInfoList {
if r.Context().Err() != nil {
return
}
plugin.Debug("Converting MP4 file to TS", "path", info.filePath)
// 创建MP4解复用器
demuxer := &mp4.DemuxerRange{
StartTime: params.startTime,
EndTime: params.endTime,
Streams: []m7s.RecordStream{{
FilePath: info.filePath,
StartTime: info.startTime,
EndTime: info.endTime,
Type: info.recordType,
}},
}
// 设置回调函数
demuxer.OnVideoExtraData = tsWriter.onVideoExtraData
demuxer.OnAudioExtraData = tsWriter.onAudioExtraData
demuxer.OnVideoSample = tsWriter.onVideoSample
demuxer.OnAudioSample = tsWriter.onAudioSample
// 执行解复用和转换
err := demuxer.Demux(r.Context())
if err != nil {
plugin.Error("MP4 to TS conversion failed", "err", err, "file", info.filePath)
if !tsWriter.hasWritten {
http.Error(w, "Conversion failed", http.StatusInternalServerError)
}
return
}
}
// 将所有累积的 TsInMemory 内容写入到响应
_, err := tsWriter.WriteTo(w)
if err != nil {
plugin.Error("Failed to write TS data to response", "error", err)
return
}
plugin.Info("MP4 to TS conversion completed")
}
// simpleTsWriter 简化的TS写入器
type simpleTsWriter struct {
*hls.TsInMemory
plugin *HLSPlugin
hasWritten bool
spsData []byte
ppsData []byte
videoCodec box.MP4_CODEC_TYPE
audioCodec box.MP4_CODEC_TYPE
}
func (w *simpleTsWriter) WritePMT() {
// 初始化 TsInMemory 的 PMT
var videoCodec, audioCodec [4]byte
switch w.videoCodec {
case box.MP4_CODEC_H264:
copy(videoCodec[:], []byte("H264"))
case box.MP4_CODEC_H265:
copy(videoCodec[:], []byte("H265"))
}
switch w.audioCodec {
case box.MP4_CODEC_AAC:
copy(audioCodec[:], []byte("MP4A"))
}
w.WritePMTPacket(audioCodec, videoCodec)
w.hasWritten = true
}
// onVideoExtraData 处理视频序列头
func (w *simpleTsWriter) onVideoExtraData(codecType box.MP4_CODEC_TYPE, data []byte) error {
w.videoCodec = codecType
// 解析并存储SPS/PPS数据
if codecType == box.MP4_CODEC_H264 && len(data) > 0 {
if w.plugin != nil {
w.plugin.Debug("Processing H264 extra data", "size", len(data))
}
// 解析AVCC格式的extra data
if len(data) >= 8 {
// AVCC格式: configurationVersion(1) + AVCProfileIndication(1) + profile_compatibility(1) + AVCLevelIndication(1) +
// lengthSizeMinusOne(1) + numOfSequenceParameterSets(1) + ...
offset := 5 // 跳过前5个字节
if offset < len(data) {
// 读取SPS数量
numSPS := data[offset] & 0x1f
offset++
// 解析SPS
for i := 0; i < int(numSPS) && offset < len(data)-1; i++ {
if offset+1 >= len(data) {
break
}
spsLength := int(data[offset])<<8 | int(data[offset+1])
offset += 2
if offset+spsLength <= len(data) {
// 添加起始码并存储SPS
w.spsData = make([]byte, 4+spsLength)
copy(w.spsData[0:4], []byte{0x00, 0x00, 0x00, 0x01})
copy(w.spsData[4:], data[offset:offset+spsLength])
offset += spsLength
if w.plugin != nil {
w.plugin.Debug("Extracted SPS", "length", spsLength)
}
break // 只取第一个SPS
}
}
// 读取PPS数量
if offset < len(data) {
numPPS := data[offset]
offset++
// 解析PPS
for i := 0; i < int(numPPS) && offset < len(data)-1; i++ {
if offset+1 >= len(data) {
break
}
ppsLength := int(data[offset])<<8 | int(data[offset+1])
offset += 2
if offset+ppsLength <= len(data) {
// 添加起始码并存储PPS
w.ppsData = make([]byte, 4+ppsLength)
copy(w.ppsData[0:4], []byte{0x00, 0x00, 0x00, 0x01})
copy(w.ppsData[4:], data[offset:offset+ppsLength])
if w.plugin != nil {
w.plugin.Debug("Extracted PPS", "length", ppsLength)
}
break // 只取第一个PPS
}
}
}
}
}
}
return nil
}
// onAudioExtraData 处理音频序列头
func (w *simpleTsWriter) onAudioExtraData(codecType box.MP4_CODEC_TYPE, data []byte) error {
w.audioCodec = codecType
w.plugin.Debug("Processing audio extra data", "codec", codecType, "size", len(data))
return nil
}
// onVideoSample 处理视频样本
func (w *simpleTsWriter) onVideoSample(codecType box.MP4_CODEC_TYPE, sample box.Sample) error {
if !w.hasWritten {
w.WritePMT()
}
w.plugin.Debug("Processing video sample", "size", len(sample.Data), "keyFrame", sample.KeyFrame, "timestamp", sample.Timestamp)
// 转换AVCC格式到Annex-B格式
annexBData, err := w.convertAVCCToAnnexB(sample.Data, sample.KeyFrame)
if err != nil {
w.plugin.Error("Failed to convert AVCC to Annex-B", "error", err)
return err
}
if len(annexBData) == 0 {
w.plugin.Warn("Empty Annex-B data after conversion")
return nil
}
// 创建视频帧结构
videoFrame := mpegts.MpegtsPESFrame{
Pid: mpegts.PID_VIDEO,
IsKeyFrame: sample.KeyFrame,
}
// 创建 AnnexB 帧
annexBFrame := &pkg.AnnexB{
PTS: (time.Duration(sample.Timestamp) + time.Duration(sample.CTS)) * 90,
DTS: time.Duration(sample.Timestamp) * 90, // 对于MP4转换假设PTS=DTS
}
// 根据编解码器类型设置 Hevc 标志
if codecType == box.MP4_CODEC_H265 {
annexBFrame.Hevc = true
}
annexBFrame.AppendOne(annexBData)
// 使用 WriteVideoFrame 写入TS包
err = w.WriteVideoFrame(annexBFrame, &videoFrame)
if err != nil {
w.plugin.Error("Failed to write video frame", "error", err)
return err
}
return nil
}
// convertAVCCToAnnexB 将AVCC格式转换为Annex-B格式
func (w *simpleTsWriter) convertAVCCToAnnexB(avccData []byte, isKeyFrame bool) ([]byte, error) {
if len(avccData) == 0 {
return nil, fmt.Errorf("empty AVCC data")
}
var annexBBuffer []byte
// 如果是关键帧先添加SPS和PPS
if isKeyFrame {
if len(w.spsData) > 0 {
annexBBuffer = append(annexBBuffer, w.spsData...)
w.plugin.Debug("Added SPS to key frame", "spsSize", len(w.spsData))
}
if len(w.ppsData) > 0 {
annexBBuffer = append(annexBBuffer, w.ppsData...)
w.plugin.Debug("Added PPS to key frame", "ppsSize", len(w.ppsData))
}
}
// 解析AVCC格式的NAL单元
offset := 0
nalCount := 0
for offset < len(avccData) {
// AVCC格式4字节长度 + NAL数据
if offset+4 > len(avccData) {
break
}
// 读取NAL单元长度大端序
nalLength := int(avccData[offset])<<24 |
int(avccData[offset+1])<<16 |
int(avccData[offset+2])<<8 |
int(avccData[offset+3])
offset += 4
if nalLength <= 0 || offset+nalLength > len(avccData) {
w.plugin.Warn("Invalid NAL length", "length", nalLength, "remaining", len(avccData)-offset)
break
}
nalData := avccData[offset : offset+nalLength]
offset += nalLength
nalCount++
if len(nalData) > 0 {
nalType := nalData[0] & 0x1f
w.plugin.Debug("Converting NAL unit", "type", nalType, "length", nalLength)
// 添加起始码前缀
annexBBuffer = append(annexBBuffer, []byte{0x00, 0x00, 0x00, 0x01}...)
annexBBuffer = append(annexBBuffer, nalData...)
}
}
if nalCount == 0 {
return nil, fmt.Errorf("no NAL units found in AVCC data")
}
w.plugin.Debug("AVCC to Annex-B conversion completed",
"inputSize", len(avccData),
"outputSize", len(annexBBuffer),
"nalUnits", nalCount)
return annexBBuffer, nil
}
// onAudioSample 处理音频样本
func (w *simpleTsWriter) onAudioSample(codecType box.MP4_CODEC_TYPE, sample box.Sample) error {
if !w.hasWritten {
w.WritePMT()
}
w.plugin.Debug("Processing audio sample", "codec", codecType, "size", len(sample.Data), "timestamp", sample.Timestamp)
// 创建音频帧结构
audioFrame := mpegts.MpegtsPESFrame{
Pid: mpegts.PID_AUDIO,
}
// 根据编解码器类型处理音频数据
switch codecType {
case box.MP4_CODEC_AAC: // AAC
// 创建 ADTS 帧
adtsFrame := &pkg.ADTS{
DTS: time.Duration(sample.Timestamp) * 90,
}
// 将音频数据添加到帧中
copy(adtsFrame.NextN(len(sample.Data)), sample.Data)
// 使用 WriteAudioFrame 写入TS包
err := w.WriteAudioFrame(adtsFrame, &audioFrame)
if err != nil {
w.plugin.Error("Failed to write audio frame", "error", err)
return err
}
default:
// 对于非AAC音频暂时使用原来的PES包方式
pesPacket := mpegts.MpegTsPESPacket{
Header: mpegts.MpegTsPESHeader{
PacketStartCodePrefix: 0x000001,
StreamID: mpegts.STREAM_ID_AUDIO,
},
}
// 设置可选字段
pesPacket.Header.ConstTen = 0x80
pesPacket.Header.PtsDtsFlags = 0x80 // 只有PTS
pesPacket.Header.PesHeaderDataLength = 5
pesPacket.Header.Pts = uint64(sample.Timestamp)
pesPacket.Buffers = append(pesPacket.Buffers, sample.Data)
// 写入TS包
err := w.WritePESPacket(&audioFrame, pesPacket)
if err != nil {
w.plugin.Error("Failed to write audio PES packet", "error", err)
return err
}
}
return nil
}
// processTsFiles 处理原生TS文件拼接
func (plugin *HLSPlugin) processTsFiles(w http.ResponseWriter, r *http.Request, fileInfoList []*fileInfo, params *requestParams) {
plugin.Info("Processing TS files", "count", len(fileInfoList))
// 设置HTTP响应头
w.Header().Set("Content-Type", "video/mp2t")
w.Header().Set("Content-Disposition", "attachment")
var writer io.Writer = w
var totalSize uint64
// 第一次遍历:计算总大小
for _, info := range fileInfoList {
if r.Context().Err() != nil {
return
}
fileInfo, err := os.Stat(info.filePath)
if err != nil {
plugin.Error("Failed to stat file", "path", info.filePath, "err", err)
continue
}
totalSize += uint64(fileInfo.Size())
}
// 设置内容长度
w.Header().Set("Content-Length", strconv.FormatUint(totalSize, 10))
w.WriteHeader(http.StatusOK)
// 第二次遍历:写入数据
for i, info := range fileInfoList {
if r.Context().Err() != nil {
return
}
plugin.Debug("Processing TS file", "path", info.filePath)
file, err := os.Open(info.filePath)
if err != nil {
plugin.Error("Failed to open file", "path", info.filePath, "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reader := bufio.NewReader(file)
if i == 0 {
// 第一个文件,直接拷贝
_, err = io.Copy(writer, reader)
} else {
// 后续文件跳过PAT/PMT包只拷贝媒体数据
err = plugin.copyTsFileSkipHeaders(writer, reader)
}
file.Close()
if err != nil {
plugin.Error("Failed to copy file", "path", info.filePath, "err", err)
return
}
}
plugin.Info("TS download completed")
}
// copyTsFileSkipHeaders 拷贝TS文件跳过PAT/PMT包
func (plugin *HLSPlugin) copyTsFileSkipHeaders(writer io.Writer, reader *bufio.Reader) error {
buffer := make([]byte, mpegts.TS_PACKET_SIZE)
for {
n, err := io.ReadFull(reader, buffer)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return err
}
if n != mpegts.TS_PACKET_SIZE {
continue
}
// 检查同步字节
if buffer[0] != 0x47 {
continue
}
// 提取PID
pid := uint16(buffer[1]&0x1f)<<8 | uint16(buffer[2])
// 跳过PAT(PID=0)和PMT(PID=256)包
if pid == mpegts.PID_PAT || pid == mpegts.PID_PMT {
continue
}
// 写入媒体数据包
_, err = writer.Write(buffer)
if err != nil {
return err
}
}
return nil
}
// download 下载处理函数
func (plugin *HLSPlugin) download(w http.ResponseWriter, r *http.Request) {
// 解析请求参数
params, err := plugin.parseRequestParams(r)
if err != nil {
plugin.Error("Failed to parse request params", "err", err)
http.Error(w, "Invalid parameters", http.StatusBadRequest)
return
}
plugin.Info("TS download request", "streamPath", params.streamPath, "timeRange", params.timeRange)
// 查询录像记录
recordStreams, err := plugin.queryRecordStreams(params)
if err != nil {
plugin.Error("Failed to query record streams", "err", err)
http.Error(w, "Database error", http.StatusInternalServerError)
return
}
if len(recordStreams) == 0 {
plugin.Warn("No records found", "streamPath", params.streamPath)
http.Error(w, "No records found", http.StatusNotFound)
return
}
// 构建文件信息列表
fileInfoList, found := plugin.buildFileInfoList(recordStreams, params.startTime, params.endTime)
if !found {
plugin.Warn("No valid files found", "streamPath", params.streamPath)
http.Error(w, "No valid files found", http.StatusNotFound)
return
}
// 检查文件类型并处理
if plugin.hasOnlyMp4Records(fileInfoList) {
// 只有MP4记录转换为TS
mp4Files := plugin.filterMp4Files(fileInfoList)
plugin.processMp4ToTs(w, r, mp4Files, params)
} else {
// 有TS记录优先使用TS文件
tsFiles := plugin.filterTsFiles(fileInfoList)
if len(tsFiles) > 0 {
plugin.processTsFiles(w, r, tsFiles, params)
} else {
// 没有TS文件使用MP4转换
mp4Files := plugin.filterMp4Files(fileInfoList)
plugin.processMp4ToTs(w, r, mp4Files, params)
}
}
}

View File

@@ -59,6 +59,7 @@ func (p *HLSPlugin) OnInit() (err error) {
func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc { func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{ return map[string]http.HandlerFunc{
"/vod/{streamPath...}": p.vod, "/vod/{streamPath...}": p.vod,
"/download/{streamPath...}": p.download,
"/api/record/start/{streamPath...}": p.API_record_start, "/api/record/start/{streamPath...}": p.API_record_start,
"/api/record/stop/{id}": p.API_record_stop, "/api/record/stop/{id}": p.API_record_stop,
} }

View File

@@ -2,16 +2,13 @@ package hls
import ( import (
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"time" "time"
"gorm.io/gorm"
"m7s.live/v5" "m7s.live/v5"
"m7s.live/v5/pkg" "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/config" "m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util" "m7s.live/v5/pkg/util"
mpegts "m7s.live/v5/plugin/hls/pkg/ts" mpegts "m7s.live/v5/plugin/hls/pkg/ts"
) )
@@ -22,7 +19,6 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct { type Recorder struct {
m7s.DefaultRecorder m7s.DefaultRecorder
stream m7s.RecordStream
ts *TsInFile ts *TsInFile
pesAudio *mpegts.MpegtsPESFrame pesAudio *mpegts.MpegtsPESFrame
pesVideo *mpegts.MpegtsPESFrame pesVideo *mpegts.MpegtsPESFrame
@@ -39,81 +35,11 @@ var CustomFileName = func(job *m7s.RecordJob) string {
} }
func (r *Recorder) createStream(start time.Time) (err error) { func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob return r.CreateStream(start, CustomFileName)
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "hls",
}
dir := filepath.Dir(r.stream.FilePath)
dir = filepath.Clean(dir)
if err = os.MkdirAll(dir, 0755); err != nil {
r.Error("create directory failed", "err", err, "dir", dir)
return
}
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "hls",
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
query := `(start_time BETWEEN ? AND ?)
OR (end_time BETWEEN ? AND ?)
OR (? BETWEEN start_time AND end_time)
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
} }
func (r *Recorder) writeTailer(end time.Time) { func (r *Recorder) writeTailer(end time.Time) {
if r.stream.EndTime.After(r.stream.StartTime) { r.WriteTail(end, nil)
return
}
r.stream.EndTime = end
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
}
} }
func (r *Recorder) Dispose() { func (r *Recorder) Dispose() {
@@ -131,9 +57,9 @@ func (r *Recorder) createNewTs() {
r.ts.Close() r.ts.Close()
} }
var err error var err error
r.ts, err = NewTsInFile(r.stream.FilePath) r.ts, err = NewTsInFile(r.Event.FilePath)
if err != nil { if err != nil {
r.Error("create ts file failed", "err", err, "path", r.stream.FilePath) r.Error("create ts file failed", "err", err, "path", r.Event.FilePath)
return return
} }
if oldPMT.Len() > 0 { if oldPMT.Len() > 0 {
@@ -175,8 +101,8 @@ func (r *Recorder) Run() (err error) {
ctx := &r.RecordJob ctx := &r.RecordJob
suber := ctx.Subscriber suber := ctx.Subscriber
startTime := time.Now() startTime := time.Now()
if ctx.BeforeDuration > 0 { if ctx.Event.BeforeDuration > 0 {
startTime = startTime.Add(-ctx.BeforeDuration) startTime = startTime.Add(-time.Duration(ctx.Event.BeforeDuration) * time.Millisecond)
} }
// 创建第一个片段记录 // 创建第一个片段记录

View File

@@ -165,10 +165,9 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
// 构建查询条件,查找指定时间范围内的录制记录 // 构建查询条件,查找指定时间范围内的录制记录
queryRecord := m7s.RecordStream{ queryRecord := m7s.RecordStream{
Mode: m7s.RecordModeAuto,
Type: "mp4", Type: "mp4",
} }
p.DB.Where(&queryRecord).Find(&streams, "end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath) p.DB.Where(&queryRecord).Find(&streams, "event_id=0 AND end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath)
// 创建 MP4 混合器 // 创建 MP4 混合器
muxer := mp4.NewMuxer(flag) muxer := mp4.NewMuxer(flag)
@@ -533,42 +532,44 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) (
Append: false, Append: false,
Fragment: 0, Fragment: 0,
FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")), FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")),
Mode: config.RecordModeEvent,
Event: &config.RecordEvent{
EventId: req.EventId,
EventLevel: req.EventLevel,
EventName: req.EventName,
EventDesc: req.EventDesc,
BeforeDuration: uint32(beforeDuration / time.Millisecond),
AfterDuration: uint32(afterDuration / time.Millisecond),
},
} }
//recordJob := recorder.GetRecordJob() //recordJob := recorder.GetRecordJob()
var subconfig config.Subscribe var subconfig config.Subscribe
defaults.SetDefaults(&subconfig) defaults.SetDefaults(&subconfig)
subconfig.BufferTime = beforeDuration subconfig.BufferTime = beforeDuration
recordJob := p.Record(stream, recordConf, &subconfig) p.Record(stream, recordConf, &subconfig)
recordJob.EventId = req.EventId
recordJob.EventLevel = req.EventLevel
recordJob.EventName = req.EventName
recordJob.EventDesc = req.EventDesc
recordJob.AfterDuration = afterDuration
recordJob.BeforeDuration = beforeDuration
recordJob.Mode = m7s.RecordModeEvent
} }
} else { } else {
if tmpJob.AfterDuration != 0 { //当前有事件录像正在录制,则更新该录像的结束时间 if tmpJob.Event != nil { //当前有事件录像正在录制,则更新该录像的结束时间
tmpJob.AfterDuration = time.Duration(tmpJob.Subscriber.VideoReader.AbsTime)*time.Millisecond + afterDuration tmpJob.Event.AfterDuration = tmpJob.Subscriber.VideoReader.AbsTime + uint32(afterDuration/time.Millisecond)
if p.DB != nil {
p.DB.Save(&tmpJob.Event)
}
} else { //当前有自动录像正在录制,则生成事件录像的记录,而不去生成事件录像的文件 } else { //当前有自动录像正在录制,则生成事件录像的记录,而不去生成事件录像的文件
recordStream := &m7s.RecordStream{ newEvent := &config.RecordEvent{
StreamPath: req.StreamPath,
EventId: req.EventId, EventId: req.EventId,
EventLevel: req.EventLevel, EventLevel: req.EventLevel,
EventDesc: req.EventDesc,
EventName: req.EventName, EventName: req.EventName,
Mode: m7s.RecordModeEvent, EventDesc: req.EventDesc,
BeforeDuration: beforeDuration, BeforeDuration: uint32(beforeDuration / time.Millisecond),
AfterDuration: afterDuration, AfterDuration: uint32(afterDuration / time.Millisecond),
Type: "mp4",
} }
now := time.Now()
startTime := now.Add(-beforeDuration)
endTime := now.Add(afterDuration)
recordStream.StartTime = startTime
recordStream.EndTime = endTime
if p.DB != nil { if p.DB != nil {
p.DB.Save(&recordStream) p.DB.Save(&m7s.EventRecordStream{
RecordEvent: newEvent,
RecordStream: m7s.RecordStream{
StreamPath: req.StreamPath,
},
})
} }
} }
} }

1209
plugin/mp4/api_extract.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -91,9 +91,6 @@ func (p *DeleteRecordTask) deleteOldestFile() {
} }
for _, filePath := range filePaths { for _, filePath := range filePaths {
for p.getDiskOutOfSpace(filePath) { for p.getDiskOutOfSpace(filePath) {
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelLow, // 查询条件event_level = 1,非重要事件
}
var eventRecords []m7s.RecordStream var eventRecords []m7s.RecordStream
// 使用不同的方法进行路径匹配避免ESCAPE语法问题 // 使用不同的方法进行路径匹配避免ESCAPE语法问题
// 解决方案用MySQL能理解的简单方式匹配路径前缀 // 解决方案用MySQL能理解的简单方式匹配路径前缀
@@ -103,7 +100,7 @@ func (p *DeleteRecordTask) deleteOldestFile() {
searchPattern := basePath + "%" searchPattern := basePath + "%"
p.Info("deleteOldestFile", "searching with path pattern", searchPattern) p.Info("deleteOldestFile", "searching with path pattern", searchPattern)
err := p.DB.Where(&queryRecord).Where("end_time IS NOT NULL"). err := p.DB.Where("event_id=0 AND end_time IS NOT NULL").
Where("file_path LIKE ?", searchPattern). Where("file_path LIKE ?", searchPattern).
Order("end_time ASC").Find(&eventRecords).Error Order("end_time ASC").Find(&eventRecords).Error
if err == nil { if err == nil {
@@ -149,14 +146,11 @@ func (t *DeleteRecordTask) Tick(any) {
if t.RecordFileExpireDays <= 0 { if t.RecordFileExpireDays <= 0 {
return return
} }
//搜索event_records表中event_level值为1的(非重要)数据并将其create_time与当前时间比对大于RecordFileExpireDays则进行删除数据库标记is_delete为1磁盘上删除录像文件 //搜索event_records表中event_id值为0的(非事件)录像并将其create_time与当前时间比对大于RecordFileExpireDays则进行删除数据库标记is_delete为1磁盘上删除录像文件
var eventRecords []m7s.RecordStream var eventRecords []m7s.RecordStream
expireTime := time.Now().AddDate(0, 0, -t.RecordFileExpireDays) expireTime := time.Now().AddDate(0, 0, -t.RecordFileExpireDays)
t.Debug("RecordFileExpireDays is set to auto delete oldestfile", "expireTime", expireTime.Format("2006-01-02 15:04:05")) t.Debug("RecordFileExpireDays is set to auto delete oldestfile", "expireTime", expireTime.Format("2006-01-02 15:04:05"))
queryRecord := m7s.RecordStream{ err := t.DB.Find(&eventRecords, "event_id=0 AND end_time < ? AND end_time IS NOT NULL", expireTime).Error
EventLevel: m7s.EventLevelLow, // 查询条件event_level = low,非重要事件
}
err := t.DB.Where(&queryRecord).Find(&eventRecords, "end_time < ? AND end_time IS NOT NULL", expireTime).Error
if err == nil { if err == nil {
for _, record := range eventRecords { for _, record := range eventRecords {
t.Info("RecordFileExpireDays is set to auto delete oldestfile", "ID", record.ID, "create time", record.EndTime, "filepath", record.FilePath) t.Info("RecordFileExpireDays is set to auto delete oldestfile", "ID", record.ID, "create time", record.EndTime, "filepath", record.FilePath)

View File

@@ -76,7 +76,11 @@ var _ = m7s.InstallPlugin[MP4Plugin](m7s.PluginMeta{
func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc { func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{ return map[string]http.HandlerFunc{
"/download/{streamPath...}": p.download, "/download/{streamPath...}": p.download,
"/extractClip/{streamPath...}": p.extractClipToFileHandel,
"/extractCompressed/{streamPath...}": p.extractCompressedVideoHandel,
"/extractGop/{streamPath...}": p.extractGopVideoHandel,
"/snap/{streamPath...}": p.snapHandel,
} }
} }

View File

@@ -54,8 +54,16 @@ func (t *TrakBox) Unmarshal(buf []byte) (b IBox, err error) {
return t, err return t, err
} }
// SampleCallback 定义样本处理回调函数类型
type SampleCallback func(sample *Sample, sampleIndex int) error
// ParseSamples parses the sample table and builds the sample list // ParseSamples parses the sample table and builds the sample list
func (t *TrakBox) ParseSamples() (samplelist []Sample) { func (t *TrakBox) ParseSamples() (samplelist []Sample) {
return t.ParseSamplesWithCallback(nil)
}
// ParseSamplesWithCallback parses the sample table and builds the sample list with optional callback
func (t *TrakBox) ParseSamplesWithCallback(callback SampleCallback) (samplelist []Sample) {
stbl := t.MDIA.MINF.STBL stbl := t.MDIA.MINF.STBL
var chunkOffsets []uint64 var chunkOffsets []uint64
if stbl.STCO != nil { if stbl.STCO != nil {
@@ -150,6 +158,17 @@ func (t *TrakBox) ParseSamples() (samplelist []Sample) {
} }
} }
// 调用回调函数处理每个样本
if callback != nil {
for i := range samplelist {
if err := callback(&samplelist[i], i); err != nil {
// 如果回调返回错误,可以选择记录或处理,但不中断解析
// 这里为了保持向后兼容性,我们继续处理
continue
}
}
}
return samplelist return samplelist
} }

View File

@@ -6,8 +6,10 @@ import (
"slices" "slices"
"m7s.live/v5/pkg" "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
"m7s.live/v5/plugin/mp4/pkg/box" "m7s.live/v5/plugin/mp4/pkg/box"
. "m7s.live/v5/plugin/mp4/pkg/box" rtmp "m7s.live/v5/plugin/rtmp/pkg"
) )
type ( type (
@@ -30,7 +32,7 @@ type (
Number uint32 Number uint32
CryptByteBlock uint8 CryptByteBlock uint8
SkipByteBlock uint8 SkipByteBlock uint8
PsshBoxes []*PsshBox PsshBoxes []*box.PsshBox
} }
SubSamplePattern struct { SubSamplePattern struct {
BytesClear uint16 BytesClear uint16
@@ -43,16 +45,28 @@ type (
chunkoffset uint64 chunkoffset uint64
} }
RTMPFrame struct {
Frame any // 可以是 *rtmp.RTMPVideo 或 *rtmp.RTMPAudio
}
Demuxer struct { Demuxer struct {
reader io.ReadSeeker reader io.ReadSeeker
Tracks []*Track Tracks []*Track
ReadSampleIdx []uint32 ReadSampleIdx []uint32
IsFragment bool IsFragment bool
// pssh []*PsshBox // pssh []*box.PsshBox
moov *MoovBox moov *box.MoovBox
mdat *MediaDataBox mdat *box.MediaDataBox
mdatOffset uint64 mdatOffset uint64
QuicTime bool QuicTime bool
// 预生成的 RTMP 帧
RTMPVideoSequence *rtmp.RTMPVideo
RTMPAudioSequence *rtmp.RTMPAudio
RTMPFrames []RTMPFrame
// RTMP 帧生成配置
RTMPAllocator *util.ScalableMemoryAllocator
} }
) )
@@ -63,6 +77,10 @@ func NewDemuxer(r io.ReadSeeker) *Demuxer {
} }
func (d *Demuxer) Demux() (err error) { func (d *Demuxer) Demux() (err error) {
return d.DemuxWithAllocator(nil)
}
func (d *Demuxer) DemuxWithAllocator(allocator *util.ScalableMemoryAllocator) (err error) {
// decodeVisualSampleEntry := func() (offset int, err error) { // decodeVisualSampleEntry := func() (offset int, err error) {
// var encv VisualSampleEntry // var encv VisualSampleEntry
@@ -96,7 +114,7 @@ func (d *Demuxer) Demux() (err error) {
// } // }
// return // return
// } // }
var b IBox var b box.IBox
var offset uint64 var offset uint64
for { for {
b, err = box.ReadFrom(d.reader) b, err = box.ReadFrom(d.reader)
@@ -107,53 +125,59 @@ func (d *Demuxer) Demux() (err error) {
return err return err
} }
offset += b.Size() offset += b.Size()
switch box := b.(type) { switch boxData := b.(type) {
case *FileTypeBox: case *box.FileTypeBox:
if slices.Contains(box.CompatibleBrands, [4]byte{'q', 't', ' ', ' '}) { if slices.Contains(boxData.CompatibleBrands, [4]byte{'q', 't', ' ', ' '}) {
d.QuicTime = true d.QuicTime = true
} }
case *FreeBox: case *box.FreeBox:
case *MediaDataBox: case *box.MediaDataBox:
d.mdat = box d.mdat = boxData
d.mdatOffset = offset - b.Size() + uint64(box.HeaderSize()) d.mdatOffset = offset - b.Size() + uint64(boxData.HeaderSize())
case *MoovBox: case *box.MoovBox:
if box.MVEX != nil { if boxData.MVEX != nil {
d.IsFragment = true d.IsFragment = true
} }
for _, trak := range box.Tracks { for _, trak := range boxData.Tracks {
track := &Track{} track := &Track{}
track.TrackId = trak.TKHD.TrackID track.TrackId = trak.TKHD.TrackID
track.Duration = uint32(trak.TKHD.Duration) track.Duration = uint32(trak.TKHD.Duration)
track.Timescale = trak.MDIA.MDHD.Timescale track.Timescale = trak.MDIA.MDHD.Timescale
track.Samplelist = trak.ParseSamples() // 创建RTMP样本处理回调
var sampleCallback box.SampleCallback
if d.RTMPAllocator != nil {
sampleCallback = d.createRTMPSampleCallback(track, trak)
}
track.Samplelist = trak.ParseSamplesWithCallback(sampleCallback)
if len(trak.MDIA.MINF.STBL.STSD.Entries) > 0 { if len(trak.MDIA.MINF.STBL.STSD.Entries) > 0 {
entryBox := trak.MDIA.MINF.STBL.STSD.Entries[0] entryBox := trak.MDIA.MINF.STBL.STSD.Entries[0]
switch entry := entryBox.(type) { switch entry := entryBox.(type) {
case *AudioSampleEntry: case *box.AudioSampleEntry:
switch entry.Type() { switch entry.Type() {
case TypeMP4A: case box.TypeMP4A:
track.Cid = MP4_CODEC_AAC track.Cid = box.MP4_CODEC_AAC
case TypeALAW: case box.TypeALAW:
track.Cid = MP4_CODEC_G711A track.Cid = box.MP4_CODEC_G711A
case TypeULAW: case box.TypeULAW:
track.Cid = MP4_CODEC_G711U track.Cid = box.MP4_CODEC_G711U
case TypeOPUS: case box.TypeOPUS:
track.Cid = MP4_CODEC_OPUS track.Cid = box.MP4_CODEC_OPUS
} }
track.SampleRate = entry.Samplerate track.SampleRate = entry.Samplerate
track.ChannelCount = uint8(entry.ChannelCount) track.ChannelCount = uint8(entry.ChannelCount)
track.SampleSize = entry.SampleSize track.SampleSize = entry.SampleSize
switch extra := entry.ExtraData.(type) { switch extra := entry.ExtraData.(type) {
case *ESDSBox: case *box.ESDSBox:
track.Cid, track.ExtraData = DecodeESDescriptor(extra.Data) track.Cid, track.ExtraData = box.DecodeESDescriptor(extra.Data)
} }
case *VisualSampleEntry: case *box.VisualSampleEntry:
track.ExtraData = entry.ExtraData.(*DataBox).Data track.ExtraData = entry.ExtraData.(*box.DataBox).Data
switch entry.Type() { switch entry.Type() {
case TypeAVC1: case box.TypeAVC1:
track.Cid = MP4_CODEC_H264 track.Cid = box.MP4_CODEC_H264
case TypeHVC1, TypeHEV1: case box.TypeHVC1, box.TypeHEV1:
track.Cid = MP4_CODEC_H265 track.Cid = box.MP4_CODEC_H265
} }
track.Width = uint32(entry.Width) track.Width = uint32(entry.Width)
track.Height = uint32(entry.Height) track.Height = uint32(entry.Height)
@@ -161,9 +185,9 @@ func (d *Demuxer) Demux() (err error) {
} }
d.Tracks = append(d.Tracks, track) d.Tracks = append(d.Tracks, track)
} }
d.moov = box d.moov = boxData
case *MovieFragmentBox: case *box.MovieFragmentBox:
for _, traf := range box.TRAFs { for _, traf := range boxData.TRAFs {
track := d.Tracks[traf.TFHD.TrackID-1] track := d.Tracks[traf.TFHD.TrackID-1]
track.defaultSize = traf.TFHD.DefaultSampleSize track.defaultSize = traf.TFHD.DefaultSampleSize
track.defaultDuration = traf.TFHD.DefaultSampleDuration track.defaultDuration = traf.TFHD.DefaultSampleDuration
@@ -171,6 +195,7 @@ func (d *Demuxer) Demux() (err error) {
} }
} }
d.ReadSampleIdx = make([]uint32, len(d.Tracks)) d.ReadSampleIdx = make([]uint32, len(d.Tracks))
// for _, track := range d.Tracks { // for _, track := range d.Tracks {
// if len(track.Samplelist) > 0 { // if len(track.Samplelist) > 0 {
// track.StartDts = uint64(track.Samplelist[0].DTS) * 1000 / uint64(track.Timescale) // track.StartDts = uint64(track.Samplelist[0].DTS) * 1000 / uint64(track.Timescale)
@@ -180,7 +205,7 @@ func (d *Demuxer) Demux() (err error) {
return nil return nil
} }
func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) { func (d *Demuxer) SeekTime(dts uint64) (sample *box.Sample, err error) {
var audioTrack, videoTrack *Track var audioTrack, videoTrack *Track
for _, track := range d.Tracks { for _, track := range d.Tracks {
if track.Cid.IsAudio() { if track.Cid.IsAudio() {
@@ -218,6 +243,54 @@ func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
return return
} }
/**
* @brief 函数跳帧到dts 前面的第一个关键帧位置
*
* @param 参数名dts 跳帧位置
*
* @todo 待实现的功能或改进点 audioTrack 没有同步改进
* @author erroot
* @date 250614
*
**/
func (d *Demuxer) SeekTimePreIDR(dts uint64) (sample *Sample, err error) {
var audioTrack, videoTrack *Track
for _, track := range d.Tracks {
if track.Cid.IsAudio() {
audioTrack = track
} else if track.Cid.IsVideo() {
videoTrack = track
}
}
if videoTrack != nil {
idx := videoTrack.SeekPreIDR(dts)
if idx == -1 {
return nil, errors.New("seek failed")
}
d.ReadSampleIdx[videoTrack.TrackId-1] = uint32(idx)
sample = &videoTrack.Samplelist[idx]
if audioTrack != nil {
for i, sample := range audioTrack.Samplelist {
if sample.Offset < int64(videoTrack.Samplelist[idx].Offset) {
continue
}
d.ReadSampleIdx[audioTrack.TrackId-1] = uint32(i)
break
}
}
} else if audioTrack != nil {
idx := audioTrack.Seek(dts)
if idx == -1 {
return nil, errors.New("seek failed")
}
d.ReadSampleIdx[audioTrack.TrackId-1] = uint32(idx)
sample = &audioTrack.Samplelist[idx]
} else {
return nil, pkg.ErrNoTrack
}
return
}
// func (d *Demuxer) decodeTRUN(trun *TrackRunBox) { // func (d *Demuxer) decodeTRUN(trun *TrackRunBox) {
// dataOffset := trun.Dataoffset // dataOffset := trun.Dataoffset
// nextDts := d.currentTrack.StartDts // nextDts := d.currentTrack.StartDts
@@ -377,10 +450,10 @@ func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
// return nil // return nil
// } // }
func (d *Demuxer) ReadSample(yield func(*Track, Sample) bool) { func (d *Demuxer) ReadSample(yield func(*Track, box.Sample) bool) {
for { for {
maxdts := int64(-1) maxdts := int64(-1)
minTsSample := Sample{Timestamp: uint32(maxdts)} minTsSample := box.Sample{Timestamp: uint32(maxdts)}
var whichTrack *Track var whichTrack *Track
whichTracki := 0 whichTracki := 0
for i, track := range d.Tracks { for i, track := range d.Tracks {
@@ -414,9 +487,9 @@ func (d *Demuxer) ReadSample(yield func(*Track, Sample) bool) {
} }
} }
func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) { func (d *Demuxer) RangeSample(yield func(*Track, *box.Sample) bool) {
for { for {
var minTsSample *Sample var minTsSample *box.Sample
var whichTrack *Track var whichTrack *Track
whichTracki := 0 whichTracki := 0
for i, track := range d.Tracks { for i, track := range d.Tracks {
@@ -448,6 +521,244 @@ func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) {
} }
// GetMoovBox returns the Movie Box from the demuxer // GetMoovBox returns the Movie Box from the demuxer
func (d *Demuxer) GetMoovBox() *MoovBox { func (d *Demuxer) GetMoovBox() *box.MoovBox {
return d.moov return d.moov
} }
// CreateRTMPSequenceFrame 创建 RTMP 序列帧
func (d *Demuxer) CreateRTMPSequenceFrame(track *Track, allocator *util.ScalableMemoryAllocator) (videoSeq *rtmp.RTMPVideo, audioSeq *rtmp.RTMPAudio, err error) {
switch track.Cid {
case box.MP4_CODEC_H264:
videoSeq = &rtmp.RTMPVideo{}
videoSeq.SetAllocator(allocator)
videoSeq.Append([]byte{0x17, 0x00, 0x00, 0x00, 0x00}, track.ExtraData)
case box.MP4_CODEC_H265:
videoSeq = &rtmp.RTMPVideo{}
videoSeq.SetAllocator(allocator)
videoSeq.Append([]byte{0b1001_0000 | rtmp.PacketTypeSequenceStart}, codec.FourCC_H265[:], track.ExtraData)
case box.MP4_CODEC_AAC:
audioSeq = &rtmp.RTMPAudio{}
audioSeq.SetAllocator(allocator)
audioSeq.Append([]byte{0xaf, 0x00}, track.ExtraData)
}
return
}
// ConvertSampleToRTMP 将 MP4 sample 转换为 RTMP 格式
func (d *Demuxer) ConvertSampleToRTMP(track *Track, sample box.Sample, allocator *util.ScalableMemoryAllocator, timestampOffset uint64) (videoFrame *rtmp.RTMPVideo, audioFrame *rtmp.RTMPAudio, err error) {
switch track.Cid {
case box.MP4_CODEC_H264:
videoFrame = &rtmp.RTMPVideo{}
videoFrame.SetAllocator(allocator)
videoFrame.CTS = sample.CTS
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
videoFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_H265:
videoFrame = &rtmp.RTMPVideo{}
videoFrame.SetAllocator(allocator)
videoFrame.CTS = uint32(sample.CTS)
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
var head []byte
var b0 byte = 0b1010_0000
if sample.KeyFrame {
b0 = 0b1001_0000
}
if videoFrame.CTS == 0 {
head = videoFrame.NextN(5)
head[0] = b0 | rtmp.PacketTypeCodedFramesX
} else {
head = videoFrame.NextN(8)
head[0] = b0 | rtmp.PacketTypeCodedFrames
util.PutBE(head[5:8], videoFrame.CTS) // cts
}
copy(head[1:], codec.FourCC_H265[:])
videoFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_AAC:
audioFrame = &rtmp.RTMPAudio{}
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0xaf, 0x01})
audioFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_G711A:
audioFrame = &rtmp.RTMPAudio{}
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x72})
audioFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_G711U:
audioFrame = &rtmp.RTMPAudio{}
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x82})
audioFrame.AddRecycleBytes(sample.Data)
}
return
}
// GetRTMPSequenceFrames 获取预生成的 RTMP 序列帧
func (d *Demuxer) GetRTMPSequenceFrames() (videoSeq *rtmp.RTMPVideo, audioSeq *rtmp.RTMPAudio) {
return d.RTMPVideoSequence, d.RTMPAudioSequence
}
// IterateRTMPFrames 迭代预生成的 RTMP 帧
func (d *Demuxer) IterateRTMPFrames(timestampOffset uint64, yield func(*RTMPFrame) bool) {
for i := range d.RTMPFrames {
frame := &d.RTMPFrames[i]
// 应用时间戳偏移
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
f.Timestamp += uint32(timestampOffset)
case *rtmp.RTMPAudio:
f.Timestamp += uint32(timestampOffset)
}
if !yield(frame) {
return
}
}
}
// GetMaxTimestamp 获取所有帧中的最大时间戳
func (d *Demuxer) GetMaxTimestamp() uint64 {
var maxTimestamp uint64
for _, frame := range d.RTMPFrames {
var timestamp uint64
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
timestamp = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timestamp = uint64(f.Timestamp)
}
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
return maxTimestamp
}
// generateRTMPFrames 生成RTMP序列帧和所有帧数据
func (d *Demuxer) generateRTMPFrames(allocator *util.ScalableMemoryAllocator) (err error) {
// 生成序列帧
for _, track := range d.Tracks {
if track.Cid.IsVideo() && d.RTMPVideoSequence == nil {
d.RTMPVideoSequence, _, err = d.CreateRTMPSequenceFrame(track, allocator)
if err != nil {
return err
}
} else if track.Cid.IsAudio() && d.RTMPAudioSequence == nil {
_, d.RTMPAudioSequence, err = d.CreateRTMPSequenceFrame(track, allocator)
if err != nil {
return err
}
}
}
// 预生成所有 RTMP 帧
d.RTMPFrames = make([]RTMPFrame, 0)
// 收集所有样本并按时间戳排序
type sampleInfo struct {
track *Track
sample box.Sample
sampleIndex uint32
trackIndex int
}
var allSamples []sampleInfo
for trackIdx, track := range d.Tracks {
for sampleIdx, sample := range track.Samplelist {
// 读取样本数据
if _, err = d.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return err
}
sample.Data = allocator.Malloc(sample.Size)
if _, err = io.ReadFull(d.reader, sample.Data); err != nil {
allocator.Free(sample.Data)
return err
}
allSamples = append(allSamples, sampleInfo{
track: track,
sample: sample,
sampleIndex: uint32(sampleIdx),
trackIndex: trackIdx,
})
}
}
// 按时间戳排序样本
slices.SortFunc(allSamples, func(a, b sampleInfo) int {
timeA := uint64(a.sample.Timestamp) * uint64(d.moov.MVHD.Timescale) / uint64(a.track.Timescale)
timeB := uint64(b.sample.Timestamp) * uint64(d.moov.MVHD.Timescale) / uint64(b.track.Timescale)
if timeA < timeB {
return -1
} else if timeA > timeB {
return 1
}
return 0
})
// 预生成 RTMP 帧
for _, sampleInfo := range allSamples {
videoFrame, audioFrame, err := d.ConvertSampleToRTMP(sampleInfo.track, sampleInfo.sample, allocator, 0)
if err != nil {
return err
}
if videoFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: videoFrame})
}
if audioFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: audioFrame})
}
}
return nil
}
// createRTMPSampleCallback 创建RTMP样本处理回调函数
func (d *Demuxer) createRTMPSampleCallback(track *Track, trak *box.TrakBox) box.SampleCallback {
// 首先生成序列帧
if track.Cid.IsVideo() && d.RTMPVideoSequence == nil {
videoSeq, _, err := d.CreateRTMPSequenceFrame(track, d.RTMPAllocator)
if err == nil {
d.RTMPVideoSequence = videoSeq
}
} else if track.Cid.IsAudio() && d.RTMPAudioSequence == nil {
_, audioSeq, err := d.CreateRTMPSequenceFrame(track, d.RTMPAllocator)
if err == nil {
d.RTMPAudioSequence = audioSeq
}
}
return func(sample *box.Sample, sampleIndex int) error {
// 读取样本数据
if _, err := d.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return err
}
sample.Data = d.RTMPAllocator.Malloc(sample.Size)
if _, err := io.ReadFull(d.reader, sample.Data); err != nil {
d.RTMPAllocator.Free(sample.Data)
return err
}
// 转换为 RTMP 格式
videoFrame, audioFrame, err := d.ConvertSampleToRTMP(track, *sample, d.RTMPAllocator, 0)
if err != nil {
return err
}
// 内部收集RTMP帧
if videoFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: videoFrame})
}
if audioFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: audioFrame})
}
return nil
}
}

View File

@@ -3,13 +3,12 @@ package mp4
import ( import (
"errors" "errors"
"io" "io"
"slices"
"strings" "strings"
"time" "time"
m7s "m7s.live/v5" m7s "m7s.live/v5"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util" "m7s.live/v5/pkg/util"
"m7s.live/v5/plugin/mp4/pkg/box"
rtmp "m7s.live/v5/plugin/rtmp/pkg" rtmp "m7s.live/v5/plugin/rtmp/pkg"
) )
@@ -35,9 +34,40 @@ func (p *HTTPReader) Run() (err error) {
content, err = io.ReadAll(p.ReadCloser) content, err = io.ReadAll(p.ReadCloser)
demuxer = NewDemuxer(strings.NewReader(string(content))) demuxer = NewDemuxer(strings.NewReader(string(content)))
} }
if err = demuxer.Demux(); err != nil {
// 设置RTMP分配器以启用RTMP帧收集
demuxer.RTMPAllocator = allocator
if err = demuxer.DemuxWithAllocator(allocator); err != nil {
return return
} }
// 获取demuxer内部收集的RTMP帧
rtmpFrames := demuxer.RTMPFrames
// 按时间戳排序所有帧
slices.SortFunc(rtmpFrames, func(a, b RTMPFrame) int {
var timeA, timeB uint64
switch f := a.Frame.(type) {
case *rtmp.RTMPVideo:
timeA = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timeA = uint64(f.Timestamp)
}
switch f := b.Frame.(type) {
case *rtmp.RTMPVideo:
timeB = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timeB = uint64(f.Timestamp)
}
if timeA < timeB {
return -1
} else if timeA > timeB {
return 1
}
return 0
})
publisher.OnSeek = func(seekTime time.Time) { publisher.OnSeek = func(seekTime time.Time) {
p.Stop(errors.New("seek")) p.Stop(errors.New("seek"))
pullJob.Connection.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat)) pullJob.Connection.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
@@ -48,103 +78,61 @@ func (p *HTTPReader) Run() (err error) {
seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Connection.Args.Get(util.StartKey)) seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Connection.Args.Get(util.StartKey))
demuxer.SeekTime(uint64(seekTime.UnixMilli())) demuxer.SeekTime(uint64(seekTime.UnixMilli()))
} }
for _, track := range demuxer.Tracks {
switch track.Cid { // 读取预生成的 RTMP 序列帧
case box.MP4_CODEC_H264: videoSeq, audioSeq := demuxer.GetRTMPSequenceFrames()
var sequence rtmp.RTMPVideo if videoSeq != nil {
sequence.SetAllocator(allocator) err = publisher.WriteVideo(videoSeq)
sequence.Append([]byte{0x17, 0x00, 0x00, 0x00, 0x00}, track.ExtraData) if err != nil {
err = publisher.WriteVideo(&sequence) return err
case box.MP4_CODEC_H265: }
var sequence rtmp.RTMPVideo }
sequence.SetAllocator(allocator) if audioSeq != nil {
sequence.Append([]byte{0b1001_0000 | rtmp.PacketTypeSequenceStart}, codec.FourCC_H265[:], track.ExtraData) err = publisher.WriteAudio(audioSeq)
err = publisher.WriteVideo(&sequence) if err != nil {
case box.MP4_CODEC_AAC: return err
var sequence rtmp.RTMPAudio
sequence.SetAllocator(allocator)
sequence.Append([]byte{0xaf, 0x00}, track.ExtraData)
err = publisher.WriteAudio(&sequence)
} }
} }
// 计算最大时间戳用于累计偏移 // 计算最大时间戳用于累计偏移
var maxTimestamp uint64 var maxTimestamp uint64
for track, sample := range demuxer.ReadSample { for _, frame := range rtmpFrames {
timestamp := uint64(sample.Timestamp) * 1000 / uint64(track.Timescale) var timestamp uint64
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
timestamp = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timestamp = uint64(f.Timestamp)
}
if timestamp > maxTimestamp { if timestamp > maxTimestamp {
maxTimestamp = timestamp maxTimestamp = timestamp
} }
} }
var timestampOffset uint64 var timestampOffset uint64
loop := p.PullJob.Loop loop := p.PullJob.Loop
for { for {
demuxer.ReadSampleIdx = make([]uint32, len(demuxer.Tracks)) // 使用预生成的 RTMP 帧进行播放
for track, sample := range demuxer.ReadSample { for _, frame := range rtmpFrames {
if p.IsStopped() { if p.IsStopped() {
return return nil
} }
if _, err = demuxer.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return // 应用时间戳偏移
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
f.Timestamp += uint32(timestampOffset)
err = publisher.WriteVideo(f)
case *rtmp.RTMPAudio:
f.Timestamp += uint32(timestampOffset)
err = publisher.WriteAudio(f)
} }
sample.Data = allocator.Malloc(sample.Size)
if _, err = io.ReadFull(demuxer.reader, sample.Data); err != nil { if err != nil {
allocator.Free(sample.Data) return err
return
}
switch track.Cid {
case box.MP4_CODEC_H264:
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.CTS = sample.CTS
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
videoFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteVideo(&videoFrame)
case box.MP4_CODEC_H265:
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.CTS = uint32(sample.CTS)
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
var head []byte
var b0 byte = 0b1010_0000
if sample.KeyFrame {
b0 = 0b1001_0000
}
if videoFrame.CTS == 0 {
head = videoFrame.NextN(5)
head[0] = b0 | rtmp.PacketTypeCodedFramesX
} else {
head = videoFrame.NextN(8)
head[0] = b0 | rtmp.PacketTypeCodedFrames
util.PutBE(head[5:8], videoFrame.CTS) // cts
}
copy(head[1:], codec.FourCC_H265[:])
videoFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteVideo(&videoFrame)
case box.MP4_CODEC_AAC:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0xaf, 0x01})
audioFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteAudio(&audioFrame)
case box.MP4_CODEC_G711A:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x72})
audioFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteAudio(&audioFrame)
case box.MP4_CODEC_G711U:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x82})
audioFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteAudio(&audioFrame)
} }
} }
if loop >= 0 { if loop >= 0 {
loop-- loop--
if loop == -1 { if loop == -1 {

View File

@@ -7,7 +7,6 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"gorm.io/gorm"
m7s "m7s.live/v5" m7s "m7s.live/v5"
"m7s.live/v5/pkg" "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/codec"
@@ -107,39 +106,6 @@ func (t *writeTrailerTask) Run() (err error) {
return return
} }
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "mp4",
StreamPath: t.streamPath,
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
queryRecord.Mode = m7s.RecordModeAuto
query := `start_time <= ? and end_time >= ?`
t.DB.Where(&queryRecord).Where(query, recordStream.EndTime, recordStream.StartTime).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
}
func init() { func init() {
m7s.Servers.AddTask(&writeTrailerQueueTask) m7s.Servers.AddTask(&writeTrailerQueueTask)
} }
@@ -150,20 +116,12 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct { type Recorder struct {
m7s.DefaultRecorder m7s.DefaultRecorder
muxer *Muxer muxer *Muxer
file *os.File file *os.File
stream m7s.RecordStream
} }
func (r *Recorder) writeTailer(end time.Time) { func (r *Recorder) writeTailer(end time.Time) {
r.stream.EndTime = end r.WriteTail(end, &writeTrailerQueueTask)
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
writeTrailerQueueTask.AddTask(&eventRecordCheck{
DB: r.RecordJob.Plugin.DB,
streamPath: r.stream.StreamPath,
})
}
writeTrailerQueueTask.AddTask(&writeTrailerTask{ writeTrailerQueueTask.AddTask(&writeTrailerTask{
muxer: r.muxer, muxer: r.muxer,
file: r.file, file: r.file,
@@ -178,46 +136,7 @@ var CustomFileName = func(job *m7s.RecordJob) string {
} }
func (r *Recorder) createStream(start time.Time) (err error) { func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob return r.CreateStream(start, CustomFileName)
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "mp4",
}
dir := filepath.Dir(r.stream.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
r.file, err = os.Create(r.stream.FilePath)
if err != nil {
return
}
if recordJob.RecConf.Type == "fmp4" {
r.stream.Type = "fmp4"
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.stream.StreamPath)
} else {
r.muxer = NewMuxerWithStreamPath(0, r.stream.StreamPath)
}
r.muxer.WriteInitSegment(r.file)
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
} }
func (r *Recorder) Dispose() { func (r *Recorder) Dispose() {
@@ -231,17 +150,28 @@ func (r *Recorder) Run() (err error) {
sub := recordJob.Subscriber sub := recordJob.Subscriber
var audioTrack, videoTrack *Track var audioTrack, videoTrack *Track
startTime := time.Now() startTime := time.Now()
if recordJob.BeforeDuration > 0 { if recordJob.Event != nil {
startTime = startTime.Add(-recordJob.BeforeDuration) startTime = startTime.Add(-time.Duration(recordJob.Event.BeforeDuration) * time.Millisecond)
} }
err = r.createStream(startTime) err = r.createStream(startTime)
if err != nil { if err != nil {
return return
} }
r.file, err = os.Create(r.Event.FilePath)
if err != nil {
return
}
if recordJob.RecConf.Type == "fmp4" {
r.Event.Type = "fmp4"
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.Event.StreamPath)
} else {
r.muxer = NewMuxerWithStreamPath(0, r.Event.StreamPath)
}
r.muxer.WriteInitSegment(r.file)
var at, vt *pkg.AVTrack var at, vt *pkg.AVTrack
checkEventRecordStop := func(absTime uint32) (err error) { checkEventRecordStop := func(absTime uint32) (err error) {
if duration := int64(absTime); time.Duration(duration)*time.Millisecond >= recordJob.AfterDuration+recordJob.BeforeDuration { if absTime >= recordJob.Event.AfterDuration+recordJob.Event.BeforeDuration {
r.RecordJob.Stop(task.ErrStopByUser) r.RecordJob.Stop(task.ErrStopByUser)
} }
return return
@@ -269,9 +199,9 @@ func (r *Recorder) Run() (err error) {
} }
return m7s.PlayBlock(sub, func(audio *pkg.RawAudio) error { return m7s.PlayBlock(sub, func(audio *pkg.RawAudio) error {
r.stream.Duration = sub.AudioReader.AbsTime r.Event.Duration = sub.AudioReader.AbsTime
if sub.VideoReader == nil { if sub.VideoReader == nil {
if recordJob.AfterDuration != 0 { if recordJob.Event != nil {
err := checkEventRecordStop(sub.VideoReader.AbsTime) err := checkEventRecordStop(sub.VideoReader.AbsTime)
if err != nil { if err != nil {
return err return err
@@ -314,9 +244,9 @@ func (r *Recorder) Run() (err error) {
Timestamp: uint32(dts), Timestamp: uint32(dts),
}) })
}, func(video *rtmp.RTMPVideo) error { }, func(video *rtmp.RTMPVideo) error {
r.stream.Duration = sub.VideoReader.AbsTime r.Event.Duration = sub.VideoReader.AbsTime
if sub.VideoReader.Value.IDR { if sub.VideoReader.Value.IDR {
if recordJob.AfterDuration != 0 { if recordJob.Event != nil {
err := checkEventRecordStop(sub.VideoReader.AbsTime) err := checkEventRecordStop(sub.VideoReader.AbsTime)
if err != nil { if err != nil {
return err return err

View File

@@ -102,6 +102,28 @@ func (track *Track) Seek(dts uint64) int {
return -1 return -1
} }
/**
* @brief 函数跳帧到dts 前面的第一个关键帧位置
*
* @param 参数名dts 跳帧位置
*
* @author erroot
* @date 250614
*
**/
func (track *Track) SeekPreIDR(dts uint64) int {
idx := 0
for i, sample := range track.Samplelist {
if track.Cid.IsVideo() && sample.KeyFrame {
idx = i
}
if sample.Timestamp*1000/uint32(track.Timescale) > uint32(dts) {
break
}
}
return idx
}
func (track *Track) makeEdtsBox() *ContainerBox { func (track *Track) makeEdtsBox() *ContainerBox {
return CreateContainerBox(TypeEDTS, track.makeElstBox()) return CreateContainerBox(TypeEDTS, track.makeElstBox())
} }

View File

@@ -185,8 +185,6 @@ func (t *RecordRecoveryTask) recoverRecordFromFile(filePath string) error {
FilePath: filePath, FilePath: filePath,
StreamPath: streamPath, StreamPath: streamPath,
Type: "mp4", Type: "mp4",
Mode: m7s.RecordModeAuto, // 默认为自动录制模式
EventLevel: m7s.EventLevelLow, // 默认为低级别事件
} }
// 设置开始和结束时间 // 设置开始和结束时间

338
plugin/mp4/util.go Normal file
View File

@@ -0,0 +1,338 @@
package plugin_mp4
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"image"
"image/color"
"image/jpeg"
"io"
"log"
"os"
"os/exec"
mp4 "m7s.live/v5/plugin/mp4/pkg"
"m7s.live/v5/plugin/mp4/pkg/box"
)
func saveAsJPG(img image.Image, path string) error {
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
opt := jpeg.Options{Quality: 90}
return jpeg.Encode(file, img, &opt)
}
func ExtractH264SPSPPS(extraData []byte) (sps, pps []byte, err error) {
if len(extraData) < 7 {
return nil, nil, fmt.Errorf("extradata too short")
}
// 解析 SPS 数量 (第6字节低5位)
spsCount := int(extraData[5] & 0x1F)
offset := 6 // 当前解析位置
// 提取 SPS
for i := 0; i < spsCount; i++ {
if offset+2 > len(extraData) {
return nil, nil, fmt.Errorf("invalid sps length")
}
spsLen := int(binary.BigEndian.Uint16(extraData[offset : offset+2]))
offset += 2
if offset+spsLen > len(extraData) {
return nil, nil, fmt.Errorf("sps data overflow")
}
sps = extraData[offset : offset+spsLen]
offset += spsLen
}
// 提取 PPS 数量
if offset >= len(extraData) {
return nil, nil, fmt.Errorf("missing pps count")
}
ppsCount := int(extraData[offset])
offset++
// 提取 PPS
for i := 0; i < ppsCount; i++ {
if offset+2 > len(extraData) {
return nil, nil, fmt.Errorf("invalid pps length")
}
ppsLen := int(binary.BigEndian.Uint16(extraData[offset : offset+2]))
offset += 2
if offset+ppsLen > len(extraData) {
return nil, nil, fmt.Errorf("pps data overflow")
}
pps = extraData[offset : offset+ppsLen]
offset += ppsLen
}
return sps, pps, nil
}
// 转换函数(支持动态插入参数集)
func ConvertAVCCH264ToAnnexB(data []byte, extraData []byte, isFirst *bool) ([]byte, error) {
var buf bytes.Buffer
pos := 0
for pos < len(data) {
if pos+4 > len(data) {
break
}
nalSize := binary.BigEndian.Uint32(data[pos : pos+4])
pos += 4
nalStart := pos
pos += int(nalSize)
if pos > len(data) {
break
}
nalu := data[nalStart:pos]
nalType := nalu[0] & 0x1F
// 关键帧前插入SPS/PPS仅需执行一次
if *isFirst && nalType == 5 {
sps, pps, err := ExtractH264SPSPPS(extraData)
if err != nil {
//panic(err)
return nil, err
}
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(sps)
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(pps)
//buf.Write(videoTrack.ExtraData)
*isFirst = false // 仅首帧插入
}
// 保留SEI单元类型6和所有其他单元
if nalType == 5 || nalType == 6 { // IDR/SEI用4字节起始码
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
} else {
buf.Write([]byte{0x00, 0x00, 0x01}) // 其他用3字节
}
buf.Write(nalu)
}
return buf.Bytes(), nil
}
/*
H.264与H.265的AVCC格式差异
VPS引入H.265新增视频参数集VPS用于描述多层编码、时序等信息
*/
// 提取H.265的VPS/SPS/PPSHEVCDecoderConfigurationRecord格式
func ExtractHEVCParams(extraData []byte) (vps, sps, pps []byte, err error) {
if len(extraData) < 22 {
return nil, nil, nil, errors.New("extra data too short")
}
// HEVC的extradata格式参考ISO/IEC 14496-15
offset := 22 // 跳过头部22字节
if offset+2 > len(extraData) {
return nil, nil, nil, errors.New("invalid extra data")
}
numOfArrays := int(extraData[offset])
offset++
for i := 0; i < numOfArrays; i++ {
if offset+3 > len(extraData) {
break
}
naluType := extraData[offset] & 0x3F
offset++
count := int(binary.BigEndian.Uint16(extraData[offset:]))
offset += 2
for j := 0; j < count; j++ {
if offset+2 > len(extraData) {
break
}
naluSize := int(binary.BigEndian.Uint16(extraData[offset:]))
offset += 2
if offset+naluSize > len(extraData) {
break
}
naluData := extraData[offset : offset+naluSize]
offset += naluSize
// 根据类型存储参数集
switch naluType {
case 32: // VPS
if vps == nil {
vps = make([]byte, len(naluData))
copy(vps, naluData)
}
case 33: // SPS
if sps == nil {
sps = make([]byte, len(naluData))
copy(sps, naluData)
}
case 34: // PPS
if pps == nil {
pps = make([]byte, len(naluData))
copy(pps, naluData)
}
}
}
}
if vps == nil || sps == nil || pps == nil {
return nil, nil, nil, errors.New("missing required parameter sets")
}
return vps, sps, pps, nil
}
// H.265的AVCC转Annex B
func ConvertAVCCHEVCToAnnexB(data []byte, extraData []byte, isFirst *bool) ([]byte, error) {
var buf bytes.Buffer
pos := 0
// 首帧插入VPS/SPS/PPS
if *isFirst {
vps, sps, pps, err := ExtractHEVCParams(extraData)
if err == nil {
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(vps)
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(sps)
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(pps)
} else {
return nil, err
}
}
// 处理NALU
for pos < len(data) {
if pos+4 > len(data) {
break
}
nalSize := binary.BigEndian.Uint32(data[pos : pos+4])
pos += 4
nalStart := pos
pos += int(nalSize)
if pos > len(data) {
break
}
nalu := data[nalStart:pos]
nalType := (nalu[0] >> 1) & 0x3F // H.265的NALU类型在头部的第2-7位
// 关键帧或参数集使用4字节起始码
if nalType == 19 || nalType == 20 || nalType >= 32 && nalType <= 34 {
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
} else {
buf.Write([]byte{0x00, 0x00, 0x01})
}
buf.Write(nalu)
}
return buf.Bytes(), nil
}
// ffmpeg -hide_banner -i gop.mp4 -vf "select=eq(n\,15)" -vframes 1 -f image2 -pix_fmt bgr24 output.bmp
func ProcessWithFFmpeg(samples []box.Sample, index int, videoTrack *mp4.Track) (image.Image, error) {
// code := "h264"
// if videoTrack.Cid == box.MP4_CODEC_H265 {
// code = "hevc"
// }
cmd := exec.Command("ffmpeg",
"-hide_banner",
//"-f", code, //"h264" 强制指定输入格式为H.264裸流
"-i", "pipe:0",
"-vf", fmt.Sprintf("select=eq(n\\,%d)", index),
"-vframes", "1",
"-pix_fmt", "bgr24",
"-f", "rawvideo",
"pipe:1")
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
go func() {
errOutput, _ := io.ReadAll(stderr)
log.Printf("FFmpeg stderr: %s", errOutput)
}()
if err = cmd.Start(); err != nil {
log.Printf("cmd.Start失败: %v", err)
return nil, err
}
go func() {
defer stdin.Close()
isFirst := true
for _, sample := range samples {
if videoTrack.Cid == box.MP4_CODEC_H264 {
annexb, _ := ConvertAVCCH264ToAnnexB(sample.Data, videoTrack.ExtraData, &isFirst)
if _, err := stdin.Write(annexb); err != nil {
log.Printf("写入失败: %v", err)
break
}
} else {
annexb, _ := ConvertAVCCHEVCToAnnexB(sample.Data, videoTrack.ExtraData, &isFirst)
if _, err := stdin.Write(annexb); err != nil {
log.Printf("写入失败: %v", err)
break
}
}
}
}()
// 读取原始RGB数据
var buf bytes.Buffer
if _, err = io.Copy(&buf, stdout); err != nil {
log.Printf("读取失败: %v", err)
return nil, err
}
if err = cmd.Wait(); err != nil {
log.Printf("cmd.Wait失败: %v", err)
return nil, err
}
//log.Printf("ffmpeg 提取成功: data size:%v", buf.Len())
// 转换为image.Image对象
data := buf.Bytes()
//width, height := parseBMPDimensions(data)
width := int(videoTrack.Width)
height := int(videoTrack.Height)
log.Printf("ffmpeg size: %v,%v", width, height)
//FFmpeg的 rawvideo 输出默认采用​​从上到下​​的扫描方式
img := image.NewRGBA(image.Rect(0, 0, width, height))
for y := 0; y < height; y++ {
for x := 0; x < width; x++ {
//pos := (height-y-1)*width*3 + x*3
pos := (y*width + x) * 3 // 关键修复:按行顺序读取
img.Set(x, y, color.RGBA{
R: data[pos+2],
G: data[pos+1],
B: data[pos],
A: 255,
})
}
}
return img, nil
}

View File

@@ -238,10 +238,9 @@ func (p *RecordFilePuller) queryRecordStreams(startTime, endTime time.Time) (err
return pkg.ErrNoDB return pkg.ErrNoDB
} }
queryRecord := RecordStream{ queryRecord := RecordStream{
Mode: RecordModeAuto,
Type: p.Type, Type: p.Type,
} }
tx := p.PullJob.Plugin.DB.Where(&queryRecord).Find(&p.Streams, "end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL) tx := p.PullJob.Plugin.DB.Where(&queryRecord).Find(&p.Streams, "event_id=0 AND end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL)
if tx.Error != nil { if tx.Error != nil {
return tx.Error return tx.Error
} }

View File

@@ -1,6 +1,8 @@
package m7s package m7s
import ( import (
"os"
"path/filepath"
"time" "time"
"gorm.io/gorm" "gorm.io/gorm"
@@ -12,58 +14,46 @@ import (
"m7s.live/v5/pkg" "m7s.live/v5/pkg"
) )
const (
RecordModeAuto RecordMode = "auto"
RecordModeEvent RecordMode = "event"
EventLevelLow EventLevel = "low"
EventLevelHigh EventLevel = "high"
)
type ( type (
EventLevel = string IRecorder interface {
RecordMode = string
IRecorder interface {
task.ITask task.ITask
GetRecordJob() *RecordJob GetRecordJob() *RecordJob
} }
RecorderFactory = func(config.Record) IRecorder RecorderFactory = func(config.Record) IRecorder
RecordJob struct { // RecordEvent 包含录像事件的公共字段
EventRecordStream struct {
CreatedAt time.Time
*config.RecordEvent
RecordStream
}
RecordJob struct {
task.Job task.Job
StreamPath string // 对应本地流 Event *config.RecordEvent
Plugin *Plugin StreamPath string // 对应本地流
Subscriber *Subscriber Plugin *Plugin
SubConf *config.Subscribe Subscriber *Subscriber
RecConf *config.Record SubConf *config.Subscribe
recorder IRecorder RecConf *config.Record
EventId string `json:"eventId" desc:"事件编号"` recorder IRecorder
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式"`
BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长"`
AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长"`
EventDesc string `json:"eventDesc" desc:"事件描述"`
EventLevel EventLevel `json:"eventLevel" desc:"事件级别"`
EventName string `json:"eventName" desc:"事件名称"`
} }
DefaultRecorder struct { DefaultRecorder struct {
task.Task task.Task
RecordJob RecordJob RecordJob RecordJob
Event EventRecordStream
} }
RecordStream struct { RecordStream struct {
ID uint `gorm:"primarykey"` ID uint `gorm:"primarykey"`
StartTime, EndTime time.Time `gorm:"type:datetime;default:NULL"` StartTime time.Time `gorm:"default:NULL"`
Duration uint32 `gorm:"comment:录像时长;default:0"` EndTime time.Time `gorm:"default:NULL"`
EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"` Duration uint32 `gorm:"comment:录像时长;default:0"`
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式event=事件录像模式;default:'auto'"` Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"` Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:BIGINT;comment:事件前缓存时长;default:30000000000"` FilePath string
AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:BIGINT;comment:事件后缓存时长;default:30000000000"` StreamPath string
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"` AudioCodec string
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"` VideoCodec string
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"` DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
EventLevel EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
FilePath string
StreamPath string
AudioCodec, VideoCodec string
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
} }
) )
@@ -75,6 +65,52 @@ func (r *DefaultRecorder) Start() (err error) {
return r.RecordJob.Subscribe() return r.RecordJob.Subscribe()
} }
func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*RecordJob) string) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.Event.RecordStream = RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: customFileName(recordJob),
Type: recordJob.RecConf.Type,
}
dir := filepath.Dir(r.Event.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
if sub.Publisher.HasAudioTrack() {
r.Event.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
if sub.Publisher.HasVideoTrack() {
r.Event.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
}
if recordJob.Plugin.DB != nil {
if recordJob.Event != nil {
r.Event.RecordEvent = recordJob.Event
recordJob.Plugin.DB.Save(&r.Event)
} else {
recordJob.Plugin.DB.Save(&r.Event.RecordStream)
}
}
return
}
func (r *DefaultRecorder) WriteTail(end time.Time, tailJob task.IJob) {
r.Event.EndTime = end
if r.RecordJob.Plugin.DB != nil {
// 将事件和录像记录关联
if r.RecordJob.Event != nil {
r.RecordJob.Plugin.DB.Save(&r.Event)
} else {
r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
}
}
if tailJob == nil {
return
}
tailJob.AddTask(NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
}
func (p *RecordJob) GetKey() string { func (p *RecordJob) GetKey() string {
return p.RecConf.FilePath return p.RecConf.FilePath
} }
@@ -150,3 +186,27 @@ func (p *RecordJob) Start() (err error) {
p.AddTask(p.recorder, p.Logger) p.AddTask(p.recorder, p.Logger)
return return
} }
func NewEventRecordCheck(t string, streamPath string, db *gorm.DB) *eventRecordCheck {
return &eventRecordCheck{
DB: db,
streamPath: streamPath,
Type: t,
}
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
Type string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []EventRecordStream
t.DB.Find(&eventRecordStreams, "type=? AND level=high AND stream_path=?", t.Type, t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
for _, recordStream := range eventRecordStreams {
t.DB.Model(&EventRecordStream{}).Where(`level=low AND start_time <= ? and end_time >= ?`, recordStream.EndTime, recordStream.StartTime).Update("level", config.EventLevelHigh)
}
return
}

202
scripts/packet_replayer.py Normal file
View File

@@ -0,0 +1,202 @@
#!/usr/bin/env python3
import argparse
from scapy.all import rdpcap, IP, TCP, UDP, Raw, send, sr1, sr, PcapReader
import sys
import time
from collections import defaultdict
import random
import threading
import queue
import socket
class PacketReplayer:
def __init__(self, pcap_file, target_ip, target_port):
self.pcap_file = pcap_file
self.target_ip = target_ip
self.target_port = target_port
self.connections = defaultdict(list) # 存储每个连接的包序列
self.response_queue = queue.Queue()
self.stop_reading = threading.Event()
self.socket = None
def establish_tcp_connection(self, src_port):
"""建立TCP连接"""
print(f"正在建立TCP连接 {self.target_ip}:{self.target_port}...")
try:
# 创建socket对象
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定源端口(如果指定了端口)
if src_port > 0:
try:
self.socket.bind(('0.0.0.0', src_port))
except socket.error as e:
print(f"指定端口 {src_port} 被占用,将使用随机端口")
self.socket.bind(('0.0.0.0', 0)) # 使用随机可用端口
else:
self.socket.bind(('0.0.0.0', 0)) # 使用随机可用端口
# 获取实际使用的端口
actual_port = self.socket.getsockname()[1]
print(f"使用本地端口: {actual_port}")
# 设置超时
self.socket.settimeout(5)
# 连接目标
self.socket.connect((self.target_ip, self.target_port))
print("TCP连接已建立")
return True
except Exception as e:
print(f"建立连接失败: {e}")
if self.socket:
self.socket.close()
self.socket = None
return False
def process_packet(self, packet, src_ip=None, src_port=None, protocol=None):
"""处理单个数据包"""
if IP not in packet:
return
# 检查源IP
if src_ip and packet[IP].src != src_ip:
return
# 检查协议和源端口
if protocol == 'tcp' and TCP in packet:
if src_port and packet[TCP].sport != src_port:
return
conn_id = (packet[IP].src, packet[TCP].sport)
self.connections[conn_id].append(packet)
elif protocol == 'udp' and UDP in packet:
if src_port and packet[UDP].sport != src_port:
return
conn_id = (packet[IP].src, packet[UDP].sport)
self.connections[conn_id].append(packet)
elif not protocol: # 如果没有指定协议则包含所有IP包
if TCP in packet:
if src_port and packet[TCP].sport != src_port:
return
conn_id = (packet[IP].src, packet[TCP].sport)
self.connections[conn_id].append(packet)
elif UDP in packet:
if src_port and packet[UDP].sport != src_port:
return
conn_id = (packet[IP].src, packet[UDP].sport)
self.connections[conn_id].append(packet)
def response_reader(self, src_port):
"""持续读取服务器响应的线程函数"""
while not self.stop_reading.is_set() and self.socket:
try:
# 使用socket接收数据
data = self.socket.recv(4096)
if data:
self.response_queue.put(data)
print(f"收到响应: {len(data)} 字节")
except socket.timeout:
continue
except Exception as e:
if not self.stop_reading.is_set():
print(f"读取响应时出错: {e}")
break
time.sleep(0.1)
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
"""边读取边重放数据包"""
print(f"开始读取并重放数据包到 {self.target_ip}:{self.target_port}")
try:
# 使用PcapReader逐包读取
reader = PcapReader(self.pcap_file)
packet_count = 0
connection_established = False
# 读取并处理数据包
for packet in reader:
packet_count += 1
if IP not in packet:
continue
# 检查源IP
if src_ip and packet[IP].src != src_ip:
continue
# 检查协议和源端口
current_src_port = None
if protocol == 'tcp' and TCP in packet:
if src_port and packet[TCP].sport != src_port:
continue
current_src_port = packet[TCP].sport
elif protocol == 'udp' and UDP in packet:
if src_port and packet[UDP].sport != src_port:
continue
current_src_port = packet[UDP].sport
elif not protocol: # 如果没有指定协议则包含所有IP包
if TCP in packet:
if src_port and packet[TCP].sport != src_port:
continue
current_src_port = packet[TCP].sport
elif UDP in packet:
if src_port and packet[UDP].sport != src_port:
continue
current_src_port = packet[UDP].sport
else:
continue
else:
continue
# 找到第一个符合条件的包,建立连接
if not connection_established:
if not self.establish_tcp_connection(current_src_port):
print("无法建立连接,退出")
return
# 启动响应读取线程
self.stop_reading.clear()
reader_thread = threading.Thread(target=self.response_reader, args=(current_src_port,))
reader_thread.daemon = True
reader_thread.start()
connection_established = True
# 发送当前数据包
try:
if Raw in packet:
self.socket.send(packet[Raw].load)
packet_time = time.strftime("%H:%M:%S", time.localtime(float(packet.time)))
print(f"[{packet_time}] [序号:{packet_count}] 已发送数据包 (负载大小: {len(packet[Raw].load)} 字节)")
if delay > 0:
time.sleep(delay)
except Exception as e:
print(f"发送数据包 {packet_count} 时出错: {e}")
sys.exit(1) # 发送失败直接退出进程
print(f"总共处理了 {packet_count} 个数据包")
except Exception as e:
print(f"处理数据包时出错: {e}")
sys.exit(1) # 其他错误也直接退出进程
finally:
# 关闭连接和停止读取线程
self.stop_reading.set()
if self.socket:
self.socket.close()
self.socket = None
reader.close()
def main():
parser = argparse.ArgumentParser(description='Wireshark数据包重放工具')
parser.add_argument('pcap_file', help='pcap文件路径')
parser.add_argument('target_ip', help='目标IP地址')
parser.add_argument('target_port', type=int, help='目标端口')
parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)')
parser.add_argument('--src-ip', help='过滤源IP地址')
parser.add_argument('--src-port', type=int, help='过滤源端口')
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
args = parser.parse_args()
replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port)
replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay)
if __name__ == '__main__':
main()