Compare commits

..

8 Commits

Author SHA1 Message Date
langhuihui
77613e52a8 feat: mp4 to ts convert 2025-06-16 09:03:26 +08:00
erroot
ec56bba75a Erroot v5 (#286)
* 插件数据库不同时,新建DB 对象赋值给插件

* MP4 plugin adds extraction, clips, images, compressed video, GOP clicp

* remove mp4/util panic code
2025-06-16 08:29:14 +08:00
pggiroro
b2b511d755 fix: user.LastLogin set gorm type:timestamp, gb28181 api GetGroupChannels modify 2025-06-15 22:19:14 +08:00
pggiroro
42acf47250 feature: gb28181 support single mediaport 2025-06-15 16:58:52 +08:00
langhuihui
6206ee847d fix: record table fit pg database 2025-06-15 15:58:12 +08:00
langhuihui
6cfdc03e4a fix: user mode fit pg database 2025-06-15 15:21:21 +08:00
pggiroro
b425b8da1f fix: ignore RecordEvent in gorm 2025-06-13 12:52:57 +08:00
langhuihui
e105243cd5 refactor: record 2025-06-13 12:52:57 +08:00
19 changed files with 3058 additions and 800 deletions

View File

@@ -9,14 +9,11 @@ import (
// User represents a user in the system
type User struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt gorm.DeletedAt `gorm:"index"`
Username string `gorm:"uniqueIndex;size:64"`
Password string `gorm:"size:60"` // bcrypt hash
Role string `gorm:"size:20;default:'user'"` // admin or user
LastLogin time.Time `gorm:"type:datetime;default:CURRENT_TIMESTAMP"`
gorm.Model
Username string `gorm:"uniqueIndex;size:64"`
Password string `gorm:"size:60"` // bcrypt hash
Role string `gorm:"size:20;default:'user'"` // admin or user
LastLogin time.Time `gorm:"type:timestamp;default:CURRENT_TIMESTAMP"`
}
// BeforeCreate hook to hash password before saving

View File

@@ -13,7 +13,6 @@ type (
Port struct {
Protocol string
Ports [2]int
Map [2]int // 映射端口范围,通常用于 NAT 或端口转发
}
IPort interface {
IsTCP() bool
@@ -23,23 +22,10 @@ type (
)
func (p Port) String() string {
var result string
if p.Ports[0] == p.Ports[1] {
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0])
} else {
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
return p.Protocol + ":" + strconv.Itoa(p.Ports[0])
}
// 如果有端口映射,添加映射信息
if p.HasMapping() {
if p.Map[0] == p.Map[1] {
result += ":" + strconv.Itoa(p.Map[0])
} else {
result += ":" + strconv.Itoa(p.Map[0]) + "-" + strconv.Itoa(p.Map[1])
}
}
return result
return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
}
func (p Port) IsTCP() bool {
@@ -54,36 +40,6 @@ func (p Port) IsRange() bool {
return p.Ports[0] != p.Ports[1]
}
func (p Port) HasMapping() bool {
return p.Map[0] > 0 || p.Map[1] > 0
}
func (p Port) IsRangeMapping() bool {
return p.HasMapping() && p.Map[0] != p.Map[1]
}
// ParsePort2 解析端口配置字符串并返回对应的端口类型实例
// 根据协议类型和端口范围返回不同的类型:
// - TCP单端口返回 TCPPort
// - TCP端口范围返回 TCPRangePort
// - UDP单端口返回 UDPPort
// - UDP端口范围返回 UDPRangePort
//
// 参数:
//
// conf - 端口配置字符串格式protocol:port 或 protocol:port1-port2
//
// 返回值:
//
// ret - 端口实例 (TCPPort/UDPPort/TCPRangePort/UDPRangePort)
// err - 解析错误
//
// 示例:
//
// ParsePort2("tcp:8080") // 返回 TCPPort(8080)
// ParsePort2("tcp:8080-8090") // 返回 TCPRangePort([2]int{8080, 8090})
// ParsePort2("udp:5000") // 返回 UDPPort(5000)
// ParsePort2("udp:5000-5010") // 返回 UDPRangePort([2]int{5000, 5010})
func ParsePort2(conf string) (ret any, err error) {
var port Port
port, err = ParsePort(conf)
@@ -102,84 +58,10 @@ func ParsePort2(conf string) (ret any, err error) {
return UDPPort(port.Ports[0]), nil
}
// ParsePort 解析端口配置字符串为 Port 结构体
// 支持协议前缀、端口号/端口范围以及端口映射的解析
//
// 参数:
//
// conf - 端口配置字符串,格式:
// - "protocol:port" 单端口,如 "tcp:8080"
// - "protocol:port1-port2" 端口范围,如 "tcp:8080-8090"
// - "protocol:port:mapPort" 单端口映射,如 "tcp:8080:9090"
// - "protocol:port:mapPort1-mapPort2" 单端口映射到端口范围,如 "tcp:8080:9000-9010"
// - "protocol:port1-port2:mapPort1-mapPort2" 端口范围映射,如 "tcp:8080-8090:9000-9010"
//
// 返回值:
//
// ret - Port 结构体,包含协议、端口和映射端口信息
// err - 解析错误
//
// 注意:
// - 如果端口范围中 min > max会自动交换顺序
// - 单端口时Ports[0] 和 Ports[1] 值相同
// - 端口映射时Map[0] 和 Map[1] 存储映射的目标端口范围
// - 单个映射端口时Map[0] 和 Map[1] 值相同
//
// 示例:
//
// ParsePort("tcp:8080") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{0, 0}}
// ParsePort("tcp:8080-8090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{0, 0}}
// ParsePort("tcp:8080:9090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9090, 9090}}
// ParsePort("tcp:8080:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9000, 9010}}
// ParsePort("tcp:8080-8090:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{9000, 9010}}
// ParsePort("udp:5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5000}, Map:[2]int{0, 0}}
// ParsePort("udp:5010-5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5010}, Map:[2]int{0, 0}}
func ParsePort(conf string) (ret Port, err error) {
var port, mapPort string
var port string
var min, max int
// 按冒号分割,支持端口映射
parts := strings.Split(conf, ":")
if len(parts) < 2 || len(parts) > 3 {
err = strconv.ErrSyntax
return
}
ret.Protocol = parts[0]
port = parts[1]
// 处理端口映射
if len(parts) == 3 {
mapPort = parts[2]
// 解析映射端口,支持单端口和端口范围
if mapRange := strings.Split(mapPort, "-"); len(mapRange) == 2 {
// 映射端口范围
var mapMin, mapMax int
mapMin, err = strconv.Atoi(mapRange[0])
if err != nil {
return
}
mapMax, err = strconv.Atoi(mapRange[1])
if err != nil {
return
}
if mapMin < mapMax {
ret.Map[0], ret.Map[1] = mapMin, mapMax
} else {
ret.Map[0], ret.Map[1] = mapMax, mapMin
}
} else {
// 单个映射端口
var mapPortNum int
mapPortNum, err = strconv.Atoi(mapPort)
if err != nil {
return
}
ret.Map[0], ret.Map[1] = mapPortNum, mapPortNum
}
}
// 处理端口范围
ret.Protocol, port, _ = strings.Cut(conf, ":")
if r := strings.Split(port, "-"); len(r) == 2 {
min, err = strconv.Atoi(r[0])
if err != nil {
@@ -194,12 +76,7 @@ func ParsePort(conf string) (ret Port, err error) {
} else {
ret.Ports[0], ret.Ports[1] = max, min
}
} else {
var p int
p, err = strconv.Atoi(port)
if err != nil {
return
}
} else if p, err := strconv.Atoi(port); err == nil {
ret.Ports[0], ret.Ports[1] = p, p
}
return

View File

@@ -1,370 +0,0 @@
package pkg
import (
"testing"
)
func TestParsePort(t *testing.T) {
tests := []struct {
name string
input string
expected Port
hasError bool
}{
{
name: "TCP单端口",
input: "tcp:8080",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "TCP端口范围",
input: "tcp:8080-8090",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "TCP端口范围反序",
input: "tcp:8090-8080",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "TCP单端口映射到单端口",
input: "tcp:8080:9090",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{9090, 9090},
},
hasError: false,
},
{
name: "TCP单端口映射到端口范围",
input: "tcp:8080:9000-9010",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{9000, 9010},
},
hasError: false,
},
{
name: "TCP端口范围映射到端口范围",
input: "tcp:8080-8090:9000-9010",
expected: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{9000, 9010},
},
hasError: false,
},
{
name: "UDP单端口",
input: "udp:5000",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5000},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "UDP端口范围",
input: "udp:5000-5010",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5010},
Map: [2]int{0, 0},
},
hasError: false,
},
{
name: "UDP端口映射",
input: "udp:5000:6000",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5000},
Map: [2]int{6000, 6000},
},
hasError: false,
},
{
name: "UDP端口范围映射映射范围反序",
input: "udp:5000-5010:6010-6000",
expected: Port{
Protocol: "udp",
Ports: [2]int{5000, 5010},
Map: [2]int{6000, 6010},
},
hasError: false,
},
// 错误情况
{
name: "缺少协议",
input: "8080",
expected: Port{},
hasError: true,
},
{
name: "过多冒号",
input: "tcp:8080:9090:extra",
expected: Port{},
hasError: true,
},
{
name: "无效端口号",
input: "tcp:abc",
expected: Port{},
hasError: true,
},
{
name: "无效映射端口号",
input: "tcp:8080:abc",
expected: Port{},
hasError: true,
},
{
name: "无效端口范围",
input: "tcp:8080-abc",
expected: Port{},
hasError: true,
},
{
name: "无效映射端口范围",
input: "tcp:8080:9000-abc",
expected: Port{},
hasError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ParsePort(tt.input)
if tt.hasError {
if err == nil {
t.Errorf("期望有错误,但没有错误")
}
return
}
if err != nil {
t.Errorf("意外的错误: %v", err)
return
}
if result.Protocol != tt.expected.Protocol {
t.Errorf("协议不匹配: 期望 %s, 得到 %s", tt.expected.Protocol, result.Protocol)
}
if result.Ports != tt.expected.Ports {
t.Errorf("端口不匹配: 期望 %v, 得到 %v", tt.expected.Ports, result.Ports)
}
if result.Map != tt.expected.Map {
t.Errorf("映射端口不匹配: 期望 %v, 得到 %v", tt.expected.Map, result.Map)
}
})
}
}
func TestPortMethods(t *testing.T) {
tests := []struct {
name string
port Port
expectTCP bool
expectUDP bool
expectRange bool
expectMapping bool
expectRangeMap bool
expectString string
}{
{
name: "TCP单端口",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{0, 0},
},
expectTCP: true,
expectUDP: false,
expectRange: false,
expectMapping: false,
expectRangeMap: false,
expectString: "tcp:8080",
},
{
name: "TCP端口范围",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{0, 0},
},
expectTCP: true,
expectUDP: false,
expectRange: true,
expectMapping: false,
expectRangeMap: false,
expectString: "tcp:8080-8090",
},
{
name: "TCP单端口映射",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8080},
Map: [2]int{9090, 9090},
},
expectTCP: true,
expectUDP: false,
expectRange: false,
expectMapping: true,
expectRangeMap: false,
expectString: "tcp:8080:9090",
},
{
name: "TCP端口范围映射",
port: Port{
Protocol: "tcp",
Ports: [2]int{8080, 8090},
Map: [2]int{9000, 9010},
},
expectTCP: true,
expectUDP: false,
expectRange: true,
expectMapping: true,
expectRangeMap: true,
expectString: "tcp:8080-8090:9000-9010",
},
{
name: "UDP单端口映射到端口范围",
port: Port{
Protocol: "udp",
Ports: [2]int{5000, 5000},
Map: [2]int{6000, 6010},
},
expectTCP: false,
expectUDP: true,
expectRange: false,
expectMapping: true,
expectRangeMap: true,
expectString: "udp:5000:6000-6010",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.port.IsTCP() != tt.expectTCP {
t.Errorf("IsTCP(): 期望 %v, 得到 %v", tt.expectTCP, tt.port.IsTCP())
}
if tt.port.IsUDP() != tt.expectUDP {
t.Errorf("IsUDP(): 期望 %v, 得到 %v", tt.expectUDP, tt.port.IsUDP())
}
if tt.port.IsRange() != tt.expectRange {
t.Errorf("IsRange(): 期望 %v, 得到 %v", tt.expectRange, tt.port.IsRange())
}
if tt.port.HasMapping() != tt.expectMapping {
t.Errorf("HasMapping(): 期望 %v, 得到 %v", tt.expectMapping, tt.port.HasMapping())
}
if tt.port.IsRangeMapping() != tt.expectRangeMap {
t.Errorf("IsRangeMapping(): 期望 %v, 得到 %v", tt.expectRangeMap, tt.port.IsRangeMapping())
}
if tt.port.String() != tt.expectString {
t.Errorf("String(): 期望 %s, 得到 %s", tt.expectString, tt.port.String())
}
})
}
}
func TestParsePort2(t *testing.T) {
tests := []struct {
name string
input string
expectedType string
hasError bool
}{
{
name: "TCP单端口",
input: "tcp:8080",
expectedType: "TCPPort",
hasError: false,
},
{
name: "TCP端口范围",
input: "tcp:8080-8090",
expectedType: "TCPRangePort",
hasError: false,
},
{
name: "UDP单端口",
input: "udp:5000",
expectedType: "UDPPort",
hasError: false,
},
{
name: "UDP端口范围",
input: "udp:5000-5010",
expectedType: "UDPRangePort",
hasError: false,
},
{
name: "无效输入",
input: "invalid",
hasError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ParsePort2(tt.input)
if tt.hasError {
if err == nil {
t.Errorf("期望有错误,但没有错误")
}
return
}
if err != nil {
t.Errorf("意外的错误: %v", err)
return
}
switch tt.expectedType {
case "TCPPort":
if _, ok := result.(TCPPort); !ok {
t.Errorf("期望类型 TCPPort, 得到 %T", result)
}
case "TCPRangePort":
if _, ok := result.(TCPRangePort); !ok {
t.Errorf("期望类型 TCPRangePort, 得到 %T", result)
}
case "UDPPort":
if _, ok := result.(UDPPort); !ok {
t.Errorf("期望类型 UDPPort, 得到 %T", result)
}
case "UDPRangePort":
if _, ok := result.(UDPRangePort); !ok {
t.Errorf("期望类型 UDPRangePort, 得到 %T", result)
}
}
})
}
}

View File

@@ -1873,8 +1873,8 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
Select(`
IFNULL(gc.id, 0) AS id,
dc.channel_id,
dc.device_id,
dc.name AS channel_name,
d.device_id AS device_id,
d.name AS device_name,
dc.status AS status,
CASE
@@ -1883,11 +1883,11 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
END AS in_group
`).
Joins("LEFT JOIN "+deviceTable+" AS d ON dc.device_id = d.device_id").
Joins("LEFT JOIN "+groupsChannelTable+" AS gc ON dc.channel_id = gc.channel_id AND gc.group_id = ?", req.GroupId)
Joins("LEFT JOIN "+groupsChannelTable+" AS gc ON dc.channel_id = gc.channel_id AND dc.device_id = gc.device_id AND gc.group_id = ?", req.GroupId)
// 如果有设备ID过滤条件
if req.DeviceId != "" {
baseQuery = baseQuery.Where("d.device_id = ?", req.DeviceId)
baseQuery = baseQuery.Where("dc.device_id = ?", req.DeviceId)
}
// 统计符合条件的通道总数
@@ -1903,7 +1903,7 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
query := baseQuery
// 添加排序
query = query.Order("channel_id ASC")
query = query.Order("dc.device_id ASC, dc.channel_id ASC")
// 如果指定了分页参数,则应用分页
if req.Page > 0 && req.Count > 0 {
@@ -1922,12 +1922,14 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
var pbGroupChannels []*pb.GroupChannel
for _, result := range results {
channelInfo := &pb.GroupChannel{
Id: int32(result.ID),
GroupId: req.GroupId,
ChannelId: result.ChannelID,
DeviceId: result.DeviceID,
ChannelName: result.ChannelName,
DeviceName: result.DeviceName,
Status: result.Status,
InGroup: result.InGroup, // 设置inGroup字段
InGroup: result.InGroup,
}
// 从内存中获取设备信息以获取传输协议
@@ -1935,13 +1937,6 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
channelInfo.StreamMode = device.StreamMode
}
if result.InGroup {
channelInfo.Id = int32(result.ID)
channelInfo.GroupId = int32(req.GroupId)
} else {
channelInfo.Id = 0
}
pbGroupChannels = append(pbGroupChannels, channelInfo)
}
@@ -2082,19 +2077,19 @@ func (gb *GB28181Plugin) getGroupChannels(groupId int32) ([]*pb.GroupChannel, er
InGroup bool `gorm:"column:in_group"`
}
// 构建查询
// 构建优化后的查询
query := gb.DB.Table(groupsChannelTable+" AS gc").
Select(`
gc.id AS id,
gc.channel_id AS channel_id,
gc.device_id AS device_id,
dc.name AS channel_name,
d.name AS device_name,
dc.status AS status,
ch.name AS channel_name,
dev.name AS device_name,
ch.status AS status,
true AS in_group
`).
Joins("LEFT JOIN "+deviceChannelTable+" AS dc ON gc.channel_id = dc.channel_id").
Joins("LEFT JOIN "+deviceTable+" AS d ON gc.device_id = d.device_id").
Joins("LEFT JOIN "+deviceChannelTable+" AS ch ON gc.device_id = ch.device_id AND gc.channel_id = ch.channel_id").
Joins("LEFT JOIN "+deviceTable+" AS dev ON ch.device_id = dev.device_id").
Where("gc.group_id = ?", groupId)
var results []Result
@@ -2107,7 +2102,7 @@ func (gb *GB28181Plugin) getGroupChannels(groupId int32) ([]*pb.GroupChannel, er
for _, result := range results {
channelInfo := &pb.GroupChannel{
Id: int32(result.ID),
GroupId: groupId,
GroupId: groupId, // 使用函数参数 groupId
ChannelId: result.ChannelID,
DeviceId: result.DeviceID,
ChannelName: result.ChannelName,
@@ -2868,7 +2863,7 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
}
// 删除设备关联的通道
if err := tx.Delete(&gb28181.DeviceChannel{DeviceID: req.Id}).Error; err != nil {
if err := tx.Where("device_id = ?", req.Id).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
tx.Rollback()
resp.Code = 500
resp.Message = "删除设备通道失败"

View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"strings"
@@ -99,15 +100,20 @@ func (d *Dialog) Start() (err error) {
d.gb.dialogs.Set(d)
//defer d.gb.dialogs.Remove(d)
if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.tcpPorts:
default:
return fmt.Errorf("no available tcp port")
}
if d.gb.tcpPort > 0 {
d.MediaPort = d.gb.tcpPort
} else {
d.MediaPort = d.gb.MediaPort[0]
if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.tcpPorts:
default:
return fmt.Errorf("no available tcp port")
}
} else {
d.MediaPort = d.gb.MediaPort[0]
}
}
ssrc := d.CreateSSRC(d.gb.Serial)
d.Info("MediaIp is ", device.MediaIp)
@@ -266,7 +272,7 @@ func (d *Dialog) Run() (err error) {
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
d.SSRC = uint32(_ssrc)
} else {
d.gb.Error("read invite response y ", "err", err)
return errors.New("read invite respose y error" + err.Error())
}
}
case "c":
@@ -299,6 +305,18 @@ func (d *Dialog) Run() (err error) {
if d.StreamMode == "TCP-ACTIVE" {
pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
} else {
if d.gb.tcpPort > 0 {
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
if d.gb.netListener != nil {
d.Info("use gb.netListener", d.gb.netListener.Addr())
pub.Receiver.Listener = d.gb.netListener
} else {
d.Info("listen tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
pub.Receiver.Listener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
d.gb.netListener = pub.Receiver.Listener
}
pub.Receiver.SSRC = d.SSRC
}
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
pub.Receiver.StreamMode = d.StreamMode
@@ -316,7 +334,11 @@ func (d *Dialog) GetKey() uint32 {
}
func (d *Dialog) Dispose() {
d.gb.tcpPorts <- d.MediaPort
if d.gb.tcpPort == 0 {
// 如果没有设置tcp端口则将MediaPort设置为0表示不再使用
d.gb.tcpPorts <- d.MediaPort
}
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceID, "channelId", d.Channel.ChannelID)
if d.session != nil {
err := d.session.Bye(d)
if err != nil {

View File

@@ -3,9 +3,9 @@ package plugin_gb28181pro
import (
"errors"
"fmt"
"net"
"net/http"
"os"
"regexp"
"slices"
"strconv"
"strings"
@@ -41,7 +41,7 @@ type GB28181Plugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Password string
Sip SipConfig
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
@@ -55,12 +55,14 @@ type GB28181Plugin struct {
forwardDialogs util.Collection[uint32, *ForwardDialog]
platforms util.Collection[string, *Platform]
tcpPorts chan uint16
tcpPort uint16
sipPorts []int
SipIP string `desc:"sip发送命令的IP一般是本地IP多网卡时需要配置正确的IP"`
MediaIP string `desc:"流媒体IP用于接收流"`
deviceManager task.Manager[string, *DeviceRegisterQueueTask]
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *gb28181.DeviceChannel]
netListener net.Listener
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -75,6 +77,18 @@ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
NewPullProxy: NewPullProxy,
})
func (gb *GB28181Plugin) Dispose() {
if gb.netListener != nil {
gb.Info("gb28181 plugin dispose")
err := gb.netListener.Close()
if err != nil {
gb.Error("Close netListener error", "error", err)
} else {
gb.Info("netListener closed")
}
}
}
func init() {
sip.SIPDebug = true
}
@@ -153,8 +167,16 @@ func (gb *GB28181Plugin) OnInit() (err error) {
if gb.MediaPort.Valid() {
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i
if gb.MediaPort.Size() == 0 {
gb.tcpPort = gb.MediaPort[0]
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
} else if gb.MediaPort.Size() == 1 {
gb.tcpPort = gb.MediaPort[0] + 1
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
} else {
for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i
}
}
} else {
gb.SetDescription("tcp", fmt.Sprintf("%d", gb.MediaPort[0]))
@@ -438,22 +460,9 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
from := req.From()
if from == nil || from.Address.User == "" {
gb.Error("OnRegister", "error", "no user")
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid sip from format", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
deviceId := from.Address.User
// 验证设备ID是否符合GB28181规范(20位数字)
if match, _ := regexp.MatchString(`^\d{20}$`, deviceId); !match {
gb.Error("OnRegister", "error", "invalid device id format, must be 20 digits", "deviceId", deviceId)
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid device ID format", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
registerHandlerTask := registerHandlerTask{
gb: gb,
req: req,

View File

@@ -44,8 +44,9 @@ type Receiver struct {
psAudio PSAudio
RTPReader *rtp2.TCP
ListenAddr string
listener net.Listener
Listener net.Listener
StreamMode string // 数据流传输模式UDP:udp传输/TCP-ACTIVEtcp主动模式/TCP-PASSIVEtcp被动模式
SSRC uint32 // RTP SSRC
}
func NewPSPublisher(puber *m7s.Publisher) *PSPublisher {
@@ -147,9 +148,19 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
p.Error("unmarshal error", "err", err)
return
}
// 如果设置了SSRC过滤只处理匹配的SSRC
if p.SSRC != 0 && p.SSRC != p.Packet.SSRC {
p.Info("into single port mode, ssrc mismatch", "expected", p.SSRC, "actual", p.Packet.SSRC)
if p.TraceEnabled() {
p.Trace("rtp ssrc mismatch, skip", "expected", p.SSRC, "actual", p.Packet.SSRC)
}
return nil
}
if lastSeq == 0 || p.SequenceNumber == lastSeq+1 {
if p.TraceEnabled() {
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC)
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.Packet.SSRC)
}
copyData := make([]byte, len(p.Payload))
copy(copyData, p.Payload)
@@ -172,18 +183,24 @@ func (p *Receiver) Start() (err error) {
return nil
}
// TCP被动模式
p.listener, err = net.Listen("tcp4", p.ListenAddr)
if err != nil {
p.Error("start listen", "err", err)
return errors.New("start listen,err" + err.Error())
if p.Listener == nil {
p.Info("start new listener", "addr", p.ListenAddr)
p.Listener, err = net.Listen("tcp4", p.ListenAddr)
if err != nil {
p.Error("start listen", "err", err)
return errors.New("start listen,err" + err.Error())
}
}
p.Info("start listen", "addr", p.ListenAddr)
return
}
func (p *Receiver) Dispose() {
if p.listener != nil {
p.listener.Close()
if p.SSRC == 0 {
p.Info("into multiport mode ,close listener ", p.SSRC)
if p.Listener != nil {
p.Listener.Close()
}
}
if p.RTPReader != nil {
p.RTPReader.Close()
@@ -216,7 +233,7 @@ func (p *Receiver) Go() error {
}
// TCP被动模式
p.Info("start accept")
conn, err := p.listener.Accept()
conn, err := p.Listener.Accept()
if err != nil {
p.Error("accept", "err", err)
return err

682
plugin/hls/download.go Normal file
View File

@@ -0,0 +1,682 @@
package plugin_hls
import (
"bufio"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
m7s "m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/util"
hls "m7s.live/v5/plugin/hls/pkg"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
mp4 "m7s.live/v5/plugin/mp4/pkg"
"m7s.live/v5/plugin/mp4/pkg/box"
)
// requestParams 包含请求解析后的参数
type requestParams struct {
streamPath string
startTime time.Time
endTime time.Time
timeRange time.Duration
}
// fileInfo 包含文件信息
type fileInfo struct {
filePath string
startTime time.Time
endTime time.Time
startOffsetTime time.Duration
recordType string // "ts", "mp4", "fmp4"
}
// parseRequestParams 解析请求参数
func (plugin *HLSPlugin) parseRequestParams(r *http.Request) (*requestParams, error) {
// 从URL路径中提取流路径去除前缀 "/download/" 和后缀 ".ts"
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/download/"), ".ts")
// 解析URL查询参数中的时间范围start和end参数
startTime, endTime, err := util.TimeRangeQueryParse(r.URL.Query())
if err != nil {
return nil, err
}
return &requestParams{
streamPath: streamPath,
startTime: startTime,
endTime: endTime,
timeRange: endTime.Sub(startTime),
}, nil
}
// queryRecordStreams 从数据库查询录像记录
func (plugin *HLSPlugin) queryRecordStreams(params *requestParams) ([]m7s.RecordStream, error) {
// 检查数据库是否可用
if plugin.DB == nil {
return nil, fmt.Errorf("database not available")
}
var recordStreams []m7s.RecordStream
// 首先查询HLS记录 (ts)
query := plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type = ?", params.streamPath, "hls")
// 添加时间范围查询条件
if !params.startTime.IsZero() && !params.endTime.IsZero() {
query = query.Where("(start_time <= ? AND end_time >= ?) OR (start_time >= ? AND start_time <= ?)",
params.endTime, params.startTime, params.startTime, params.endTime)
}
err := query.Order("start_time ASC").Find(&recordStreams).Error
if err != nil {
return nil, err
}
// 如果没有找到HLS记录尝试查询MP4记录
if len(recordStreams) == 0 {
query = plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type IN (?)", params.streamPath, []string{"mp4", "fmp4"})
if !params.startTime.IsZero() && !params.endTime.IsZero() {
query = query.Where("(start_time <= ? AND end_time >= ?) OR (start_time >= ? AND start_time <= ?)",
params.endTime, params.startTime, params.startTime, params.endTime)
}
err = query.Order("start_time ASC").Find(&recordStreams).Error
if err != nil {
return nil, err
}
}
return recordStreams, nil
}
// buildFileInfoList 构建文件信息列表
func (plugin *HLSPlugin) buildFileInfoList(recordStreams []m7s.RecordStream, startTime, endTime time.Time) ([]*fileInfo, bool) {
var fileInfoList []*fileInfo
var found bool
for _, record := range recordStreams {
// 检查文件是否存在
if !util.Exist(record.FilePath) {
plugin.Warn("Record file not found", "filePath", record.FilePath)
continue
}
var startOffsetTime time.Duration
recordStartTime := record.StartTime
recordEndTime := record.EndTime
// 计算文件内的偏移时间
if startTime.After(recordStartTime) {
startOffsetTime = startTime.Sub(recordStartTime)
}
// 检查是否在时间范围内
if recordEndTime.Before(startTime) || recordStartTime.After(endTime) {
continue
}
fileInfoList = append(fileInfoList, &fileInfo{
filePath: record.FilePath,
startTime: recordStartTime,
endTime: recordEndTime,
startOffsetTime: startOffsetTime,
recordType: record.Type,
})
found = true
}
return fileInfoList, found
}
// hasOnlyMp4Records 检查是否只有MP4记录
func (plugin *HLSPlugin) hasOnlyMp4Records(fileInfoList []*fileInfo) bool {
if len(fileInfoList) == 0 {
return false
}
for _, info := range fileInfoList {
if info.recordType == "hls" {
return false
}
}
return true
}
// filterTsFiles 过滤HLS TS文件
func (plugin *HLSPlugin) filterTsFiles(fileInfoList []*fileInfo) []*fileInfo {
var filteredList []*fileInfo
for _, info := range fileInfoList {
if info.recordType == "hls" {
filteredList = append(filteredList, info)
}
}
plugin.Debug("TS files filtered", "original", len(fileInfoList), "filtered", len(filteredList))
return filteredList
}
// filterMp4Files 过滤MP4文件
func (plugin *HLSPlugin) filterMp4Files(fileInfoList []*fileInfo) []*fileInfo {
var filteredList []*fileInfo
for _, info := range fileInfoList {
if info.recordType == "mp4" || info.recordType == "fmp4" {
filteredList = append(filteredList, info)
}
}
plugin.Debug("MP4 files filtered", "original", len(fileInfoList), "filtered", len(filteredList))
return filteredList
}
// processMp4ToTs 将MP4记录转换为TS输出
func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request, fileInfoList []*fileInfo, params *requestParams) {
plugin.Info("Converting MP4 records to TS", "count", len(fileInfoList))
// 设置HTTP响应头
w.Header().Set("Content-Type", "video/mp2t")
w.Header().Set("Content-Disposition", "attachment")
// 创建一个TS写入器在循环外面所有MP4文件共享同一个TsInMemory
tsWriter := &simpleTsWriter{
TsInMemory: &hls.TsInMemory{},
plugin: plugin,
}
// 对于MP4到TS的转换我们采用简化的方法
// 直接将每个MP4文件转换输出
for _, info := range fileInfoList {
if r.Context().Err() != nil {
return
}
plugin.Debug("Converting MP4 file to TS", "path", info.filePath)
// 创建MP4解复用器
demuxer := &mp4.DemuxerRange{
StartTime: params.startTime,
EndTime: params.endTime,
Streams: []m7s.RecordStream{{
FilePath: info.filePath,
StartTime: info.startTime,
EndTime: info.endTime,
Type: info.recordType,
}},
}
// 设置回调函数
demuxer.OnVideoExtraData = tsWriter.onVideoExtraData
demuxer.OnAudioExtraData = tsWriter.onAudioExtraData
demuxer.OnVideoSample = tsWriter.onVideoSample
demuxer.OnAudioSample = tsWriter.onAudioSample
// 执行解复用和转换
err := demuxer.Demux(r.Context())
if err != nil {
plugin.Error("MP4 to TS conversion failed", "err", err, "file", info.filePath)
if !tsWriter.hasWritten {
http.Error(w, "Conversion failed", http.StatusInternalServerError)
}
return
}
}
// 将所有累积的 TsInMemory 内容写入到响应
_, err := tsWriter.WriteTo(w)
if err != nil {
plugin.Error("Failed to write TS data to response", "error", err)
return
}
plugin.Info("MP4 to TS conversion completed")
}
// simpleTsWriter 简化的TS写入器
type simpleTsWriter struct {
*hls.TsInMemory
plugin *HLSPlugin
hasWritten bool
spsData []byte
ppsData []byte
videoCodec box.MP4_CODEC_TYPE
audioCodec box.MP4_CODEC_TYPE
}
func (w *simpleTsWriter) WritePMT() {
// 初始化 TsInMemory 的 PMT
var videoCodec, audioCodec [4]byte
switch w.videoCodec {
case box.MP4_CODEC_H264:
copy(videoCodec[:], []byte("H264"))
case box.MP4_CODEC_H265:
copy(videoCodec[:], []byte("H265"))
}
switch w.audioCodec {
case box.MP4_CODEC_AAC:
copy(audioCodec[:], []byte("MP4A"))
}
w.WritePMTPacket(audioCodec, videoCodec)
w.hasWritten = true
}
// onVideoExtraData 处理视频序列头
func (w *simpleTsWriter) onVideoExtraData(codecType box.MP4_CODEC_TYPE, data []byte) error {
w.videoCodec = codecType
// 解析并存储SPS/PPS数据
if codecType == box.MP4_CODEC_H264 && len(data) > 0 {
if w.plugin != nil {
w.plugin.Debug("Processing H264 extra data", "size", len(data))
}
// 解析AVCC格式的extra data
if len(data) >= 8 {
// AVCC格式: configurationVersion(1) + AVCProfileIndication(1) + profile_compatibility(1) + AVCLevelIndication(1) +
// lengthSizeMinusOne(1) + numOfSequenceParameterSets(1) + ...
offset := 5 // 跳过前5个字节
if offset < len(data) {
// 读取SPS数量
numSPS := data[offset] & 0x1f
offset++
// 解析SPS
for i := 0; i < int(numSPS) && offset < len(data)-1; i++ {
if offset+1 >= len(data) {
break
}
spsLength := int(data[offset])<<8 | int(data[offset+1])
offset += 2
if offset+spsLength <= len(data) {
// 添加起始码并存储SPS
w.spsData = make([]byte, 4+spsLength)
copy(w.spsData[0:4], []byte{0x00, 0x00, 0x00, 0x01})
copy(w.spsData[4:], data[offset:offset+spsLength])
offset += spsLength
if w.plugin != nil {
w.plugin.Debug("Extracted SPS", "length", spsLength)
}
break // 只取第一个SPS
}
}
// 读取PPS数量
if offset < len(data) {
numPPS := data[offset]
offset++
// 解析PPS
for i := 0; i < int(numPPS) && offset < len(data)-1; i++ {
if offset+1 >= len(data) {
break
}
ppsLength := int(data[offset])<<8 | int(data[offset+1])
offset += 2
if offset+ppsLength <= len(data) {
// 添加起始码并存储PPS
w.ppsData = make([]byte, 4+ppsLength)
copy(w.ppsData[0:4], []byte{0x00, 0x00, 0x00, 0x01})
copy(w.ppsData[4:], data[offset:offset+ppsLength])
if w.plugin != nil {
w.plugin.Debug("Extracted PPS", "length", ppsLength)
}
break // 只取第一个PPS
}
}
}
}
}
}
return nil
}
// onAudioExtraData 处理音频序列头
func (w *simpleTsWriter) onAudioExtraData(codecType box.MP4_CODEC_TYPE, data []byte) error {
w.audioCodec = codecType
w.plugin.Debug("Processing audio extra data", "codec", codecType, "size", len(data))
return nil
}
// onVideoSample 处理视频样本
func (w *simpleTsWriter) onVideoSample(codecType box.MP4_CODEC_TYPE, sample box.Sample) error {
if !w.hasWritten {
w.WritePMT()
}
w.plugin.Debug("Processing video sample", "size", len(sample.Data), "keyFrame", sample.KeyFrame, "timestamp", sample.Timestamp)
// 转换AVCC格式到Annex-B格式
annexBData, err := w.convertAVCCToAnnexB(sample.Data, sample.KeyFrame)
if err != nil {
w.plugin.Error("Failed to convert AVCC to Annex-B", "error", err)
return err
}
if len(annexBData) == 0 {
w.plugin.Warn("Empty Annex-B data after conversion")
return nil
}
// 创建视频帧结构
videoFrame := mpegts.MpegtsPESFrame{
Pid: mpegts.PID_VIDEO,
IsKeyFrame: sample.KeyFrame,
}
// 创建 AnnexB 帧
annexBFrame := &pkg.AnnexB{
PTS: (time.Duration(sample.Timestamp) + time.Duration(sample.CTS)) * 90,
DTS: time.Duration(sample.Timestamp) * 90, // 对于MP4转换假设PTS=DTS
}
// 根据编解码器类型设置 Hevc 标志
if codecType == box.MP4_CODEC_H265 {
annexBFrame.Hevc = true
}
annexBFrame.AppendOne(annexBData)
// 使用 WriteVideoFrame 写入TS包
err = w.WriteVideoFrame(annexBFrame, &videoFrame)
if err != nil {
w.plugin.Error("Failed to write video frame", "error", err)
return err
}
return nil
}
// convertAVCCToAnnexB 将AVCC格式转换为Annex-B格式
func (w *simpleTsWriter) convertAVCCToAnnexB(avccData []byte, isKeyFrame bool) ([]byte, error) {
if len(avccData) == 0 {
return nil, fmt.Errorf("empty AVCC data")
}
var annexBBuffer []byte
// 如果是关键帧先添加SPS和PPS
if isKeyFrame {
if len(w.spsData) > 0 {
annexBBuffer = append(annexBBuffer, w.spsData...)
w.plugin.Debug("Added SPS to key frame", "spsSize", len(w.spsData))
}
if len(w.ppsData) > 0 {
annexBBuffer = append(annexBBuffer, w.ppsData...)
w.plugin.Debug("Added PPS to key frame", "ppsSize", len(w.ppsData))
}
}
// 解析AVCC格式的NAL单元
offset := 0
nalCount := 0
for offset < len(avccData) {
// AVCC格式4字节长度 + NAL数据
if offset+4 > len(avccData) {
break
}
// 读取NAL单元长度大端序
nalLength := int(avccData[offset])<<24 |
int(avccData[offset+1])<<16 |
int(avccData[offset+2])<<8 |
int(avccData[offset+3])
offset += 4
if nalLength <= 0 || offset+nalLength > len(avccData) {
w.plugin.Warn("Invalid NAL length", "length", nalLength, "remaining", len(avccData)-offset)
break
}
nalData := avccData[offset : offset+nalLength]
offset += nalLength
nalCount++
if len(nalData) > 0 {
nalType := nalData[0] & 0x1f
w.plugin.Debug("Converting NAL unit", "type", nalType, "length", nalLength)
// 添加起始码前缀
annexBBuffer = append(annexBBuffer, []byte{0x00, 0x00, 0x00, 0x01}...)
annexBBuffer = append(annexBBuffer, nalData...)
}
}
if nalCount == 0 {
return nil, fmt.Errorf("no NAL units found in AVCC data")
}
w.plugin.Debug("AVCC to Annex-B conversion completed",
"inputSize", len(avccData),
"outputSize", len(annexBBuffer),
"nalUnits", nalCount)
return annexBBuffer, nil
}
// onAudioSample 处理音频样本
func (w *simpleTsWriter) onAudioSample(codecType box.MP4_CODEC_TYPE, sample box.Sample) error {
if !w.hasWritten {
w.WritePMT()
}
w.plugin.Debug("Processing audio sample", "codec", codecType, "size", len(sample.Data), "timestamp", sample.Timestamp)
// 创建音频帧结构
audioFrame := mpegts.MpegtsPESFrame{
Pid: mpegts.PID_AUDIO,
}
// 根据编解码器类型处理音频数据
switch codecType {
case box.MP4_CODEC_AAC: // AAC
// 创建 ADTS 帧
adtsFrame := &pkg.ADTS{
DTS: time.Duration(sample.Timestamp) * 90,
}
// 将音频数据添加到帧中
copy(adtsFrame.NextN(len(sample.Data)), sample.Data)
// 使用 WriteAudioFrame 写入TS包
err := w.WriteAudioFrame(adtsFrame, &audioFrame)
if err != nil {
w.plugin.Error("Failed to write audio frame", "error", err)
return err
}
default:
// 对于非AAC音频暂时使用原来的PES包方式
pesPacket := mpegts.MpegTsPESPacket{
Header: mpegts.MpegTsPESHeader{
PacketStartCodePrefix: 0x000001,
StreamID: mpegts.STREAM_ID_AUDIO,
},
}
// 设置可选字段
pesPacket.Header.ConstTen = 0x80
pesPacket.Header.PtsDtsFlags = 0x80 // 只有PTS
pesPacket.Header.PesHeaderDataLength = 5
pesPacket.Header.Pts = uint64(sample.Timestamp)
pesPacket.Buffers = append(pesPacket.Buffers, sample.Data)
// 写入TS包
err := w.WritePESPacket(&audioFrame, pesPacket)
if err != nil {
w.plugin.Error("Failed to write audio PES packet", "error", err)
return err
}
}
return nil
}
// processTsFiles 处理原生TS文件拼接
func (plugin *HLSPlugin) processTsFiles(w http.ResponseWriter, r *http.Request, fileInfoList []*fileInfo, params *requestParams) {
plugin.Info("Processing TS files", "count", len(fileInfoList))
// 设置HTTP响应头
w.Header().Set("Content-Type", "video/mp2t")
w.Header().Set("Content-Disposition", "attachment")
var writer io.Writer = w
var totalSize uint64
// 第一次遍历:计算总大小
for _, info := range fileInfoList {
if r.Context().Err() != nil {
return
}
fileInfo, err := os.Stat(info.filePath)
if err != nil {
plugin.Error("Failed to stat file", "path", info.filePath, "err", err)
continue
}
totalSize += uint64(fileInfo.Size())
}
// 设置内容长度
w.Header().Set("Content-Length", strconv.FormatUint(totalSize, 10))
w.WriteHeader(http.StatusOK)
// 第二次遍历:写入数据
for i, info := range fileInfoList {
if r.Context().Err() != nil {
return
}
plugin.Debug("Processing TS file", "path", info.filePath)
file, err := os.Open(info.filePath)
if err != nil {
plugin.Error("Failed to open file", "path", info.filePath, "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reader := bufio.NewReader(file)
if i == 0 {
// 第一个文件,直接拷贝
_, err = io.Copy(writer, reader)
} else {
// 后续文件跳过PAT/PMT包只拷贝媒体数据
err = plugin.copyTsFileSkipHeaders(writer, reader)
}
file.Close()
if err != nil {
plugin.Error("Failed to copy file", "path", info.filePath, "err", err)
return
}
}
plugin.Info("TS download completed")
}
// copyTsFileSkipHeaders 拷贝TS文件跳过PAT/PMT包
func (plugin *HLSPlugin) copyTsFileSkipHeaders(writer io.Writer, reader *bufio.Reader) error {
buffer := make([]byte, mpegts.TS_PACKET_SIZE)
for {
n, err := io.ReadFull(reader, buffer)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return err
}
if n != mpegts.TS_PACKET_SIZE {
continue
}
// 检查同步字节
if buffer[0] != 0x47 {
continue
}
// 提取PID
pid := uint16(buffer[1]&0x1f)<<8 | uint16(buffer[2])
// 跳过PAT(PID=0)和PMT(PID=256)包
if pid == mpegts.PID_PAT || pid == mpegts.PID_PMT {
continue
}
// 写入媒体数据包
_, err = writer.Write(buffer)
if err != nil {
return err
}
}
return nil
}
// download 下载处理函数
func (plugin *HLSPlugin) download(w http.ResponseWriter, r *http.Request) {
// 解析请求参数
params, err := plugin.parseRequestParams(r)
if err != nil {
plugin.Error("Failed to parse request params", "err", err)
http.Error(w, "Invalid parameters", http.StatusBadRequest)
return
}
plugin.Info("TS download request", "streamPath", params.streamPath, "timeRange", params.timeRange)
// 查询录像记录
recordStreams, err := plugin.queryRecordStreams(params)
if err != nil {
plugin.Error("Failed to query record streams", "err", err)
http.Error(w, "Database error", http.StatusInternalServerError)
return
}
if len(recordStreams) == 0 {
plugin.Warn("No records found", "streamPath", params.streamPath)
http.Error(w, "No records found", http.StatusNotFound)
return
}
// 构建文件信息列表
fileInfoList, found := plugin.buildFileInfoList(recordStreams, params.startTime, params.endTime)
if !found {
plugin.Warn("No valid files found", "streamPath", params.streamPath)
http.Error(w, "No valid files found", http.StatusNotFound)
return
}
// 检查文件类型并处理
if plugin.hasOnlyMp4Records(fileInfoList) {
// 只有MP4记录转换为TS
mp4Files := plugin.filterMp4Files(fileInfoList)
plugin.processMp4ToTs(w, r, mp4Files, params)
} else {
// 有TS记录优先使用TS文件
tsFiles := plugin.filterTsFiles(fileInfoList)
if len(tsFiles) > 0 {
plugin.processTsFiles(w, r, tsFiles, params)
} else {
// 没有TS文件使用MP4转换
mp4Files := plugin.filterMp4Files(fileInfoList)
plugin.processMp4ToTs(w, r, mp4Files, params)
}
}
}

View File

@@ -59,6 +59,7 @@ func (p *HLSPlugin) OnInit() (err error) {
func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/vod/{streamPath...}": p.vod,
"/download/{streamPath...}": p.download,
"/api/record/start/{streamPath...}": p.API_record_start,
"/api/record/stop/{id}": p.API_record_stop,
}

1209
plugin/mp4/api_extract.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -76,7 +76,11 @@ var _ = m7s.InstallPlugin[MP4Plugin](m7s.PluginMeta{
func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/download/{streamPath...}": p.download,
"/download/{streamPath...}": p.download,
"/extractClip/{streamPath...}": p.extractClipToFileHandel,
"/extractCompressed/{streamPath...}": p.extractCompressedVideoHandel,
"/extractGop/{streamPath...}": p.extractGopVideoHandel,
"/snap/{streamPath...}": p.snapHandel,
}
}

View File

@@ -54,8 +54,16 @@ func (t *TrakBox) Unmarshal(buf []byte) (b IBox, err error) {
return t, err
}
// SampleCallback 定义样本处理回调函数类型
type SampleCallback func(sample *Sample, sampleIndex int) error
// ParseSamples parses the sample table and builds the sample list
func (t *TrakBox) ParseSamples() (samplelist []Sample) {
return t.ParseSamplesWithCallback(nil)
}
// ParseSamplesWithCallback parses the sample table and builds the sample list with optional callback
func (t *TrakBox) ParseSamplesWithCallback(callback SampleCallback) (samplelist []Sample) {
stbl := t.MDIA.MINF.STBL
var chunkOffsets []uint64
if stbl.STCO != nil {
@@ -150,6 +158,17 @@ func (t *TrakBox) ParseSamples() (samplelist []Sample) {
}
}
// 调用回调函数处理每个样本
if callback != nil {
for i := range samplelist {
if err := callback(&samplelist[i], i); err != nil {
// 如果回调返回错误,可以选择记录或处理,但不中断解析
// 这里为了保持向后兼容性,我们继续处理
continue
}
}
}
return samplelist
}

View File

@@ -6,8 +6,10 @@ import (
"slices"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
"m7s.live/v5/plugin/mp4/pkg/box"
. "m7s.live/v5/plugin/mp4/pkg/box"
rtmp "m7s.live/v5/plugin/rtmp/pkg"
)
type (
@@ -30,7 +32,7 @@ type (
Number uint32
CryptByteBlock uint8
SkipByteBlock uint8
PsshBoxes []*PsshBox
PsshBoxes []*box.PsshBox
}
SubSamplePattern struct {
BytesClear uint16
@@ -43,16 +45,28 @@ type (
chunkoffset uint64
}
RTMPFrame struct {
Frame any // 可以是 *rtmp.RTMPVideo 或 *rtmp.RTMPAudio
}
Demuxer struct {
reader io.ReadSeeker
Tracks []*Track
ReadSampleIdx []uint32
IsFragment bool
// pssh []*PsshBox
moov *MoovBox
mdat *MediaDataBox
// pssh []*box.PsshBox
moov *box.MoovBox
mdat *box.MediaDataBox
mdatOffset uint64
QuicTime bool
// 预生成的 RTMP 帧
RTMPVideoSequence *rtmp.RTMPVideo
RTMPAudioSequence *rtmp.RTMPAudio
RTMPFrames []RTMPFrame
// RTMP 帧生成配置
RTMPAllocator *util.ScalableMemoryAllocator
}
)
@@ -63,6 +77,10 @@ func NewDemuxer(r io.ReadSeeker) *Demuxer {
}
func (d *Demuxer) Demux() (err error) {
return d.DemuxWithAllocator(nil)
}
func (d *Demuxer) DemuxWithAllocator(allocator *util.ScalableMemoryAllocator) (err error) {
// decodeVisualSampleEntry := func() (offset int, err error) {
// var encv VisualSampleEntry
@@ -96,7 +114,7 @@ func (d *Demuxer) Demux() (err error) {
// }
// return
// }
var b IBox
var b box.IBox
var offset uint64
for {
b, err = box.ReadFrom(d.reader)
@@ -107,53 +125,59 @@ func (d *Demuxer) Demux() (err error) {
return err
}
offset += b.Size()
switch box := b.(type) {
case *FileTypeBox:
if slices.Contains(box.CompatibleBrands, [4]byte{'q', 't', ' ', ' '}) {
switch boxData := b.(type) {
case *box.FileTypeBox:
if slices.Contains(boxData.CompatibleBrands, [4]byte{'q', 't', ' ', ' '}) {
d.QuicTime = true
}
case *FreeBox:
case *MediaDataBox:
d.mdat = box
d.mdatOffset = offset - b.Size() + uint64(box.HeaderSize())
case *MoovBox:
if box.MVEX != nil {
case *box.FreeBox:
case *box.MediaDataBox:
d.mdat = boxData
d.mdatOffset = offset - b.Size() + uint64(boxData.HeaderSize())
case *box.MoovBox:
if boxData.MVEX != nil {
d.IsFragment = true
}
for _, trak := range box.Tracks {
for _, trak := range boxData.Tracks {
track := &Track{}
track.TrackId = trak.TKHD.TrackID
track.Duration = uint32(trak.TKHD.Duration)
track.Timescale = trak.MDIA.MDHD.Timescale
track.Samplelist = trak.ParseSamples()
// 创建RTMP样本处理回调
var sampleCallback box.SampleCallback
if d.RTMPAllocator != nil {
sampleCallback = d.createRTMPSampleCallback(track, trak)
}
track.Samplelist = trak.ParseSamplesWithCallback(sampleCallback)
if len(trak.MDIA.MINF.STBL.STSD.Entries) > 0 {
entryBox := trak.MDIA.MINF.STBL.STSD.Entries[0]
switch entry := entryBox.(type) {
case *AudioSampleEntry:
case *box.AudioSampleEntry:
switch entry.Type() {
case TypeMP4A:
track.Cid = MP4_CODEC_AAC
case TypeALAW:
track.Cid = MP4_CODEC_G711A
case TypeULAW:
track.Cid = MP4_CODEC_G711U
case TypeOPUS:
track.Cid = MP4_CODEC_OPUS
case box.TypeMP4A:
track.Cid = box.MP4_CODEC_AAC
case box.TypeALAW:
track.Cid = box.MP4_CODEC_G711A
case box.TypeULAW:
track.Cid = box.MP4_CODEC_G711U
case box.TypeOPUS:
track.Cid = box.MP4_CODEC_OPUS
}
track.SampleRate = entry.Samplerate
track.ChannelCount = uint8(entry.ChannelCount)
track.SampleSize = entry.SampleSize
switch extra := entry.ExtraData.(type) {
case *ESDSBox:
track.Cid, track.ExtraData = DecodeESDescriptor(extra.Data)
case *box.ESDSBox:
track.Cid, track.ExtraData = box.DecodeESDescriptor(extra.Data)
}
case *VisualSampleEntry:
track.ExtraData = entry.ExtraData.(*DataBox).Data
case *box.VisualSampleEntry:
track.ExtraData = entry.ExtraData.(*box.DataBox).Data
switch entry.Type() {
case TypeAVC1:
track.Cid = MP4_CODEC_H264
case TypeHVC1, TypeHEV1:
track.Cid = MP4_CODEC_H265
case box.TypeAVC1:
track.Cid = box.MP4_CODEC_H264
case box.TypeHVC1, box.TypeHEV1:
track.Cid = box.MP4_CODEC_H265
}
track.Width = uint32(entry.Width)
track.Height = uint32(entry.Height)
@@ -161,9 +185,9 @@ func (d *Demuxer) Demux() (err error) {
}
d.Tracks = append(d.Tracks, track)
}
d.moov = box
case *MovieFragmentBox:
for _, traf := range box.TRAFs {
d.moov = boxData
case *box.MovieFragmentBox:
for _, traf := range boxData.TRAFs {
track := d.Tracks[traf.TFHD.TrackID-1]
track.defaultSize = traf.TFHD.DefaultSampleSize
track.defaultDuration = traf.TFHD.DefaultSampleDuration
@@ -171,6 +195,7 @@ func (d *Demuxer) Demux() (err error) {
}
}
d.ReadSampleIdx = make([]uint32, len(d.Tracks))
// for _, track := range d.Tracks {
// if len(track.Samplelist) > 0 {
// track.StartDts = uint64(track.Samplelist[0].DTS) * 1000 / uint64(track.Timescale)
@@ -180,7 +205,7 @@ func (d *Demuxer) Demux() (err error) {
return nil
}
func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
func (d *Demuxer) SeekTime(dts uint64) (sample *box.Sample, err error) {
var audioTrack, videoTrack *Track
for _, track := range d.Tracks {
if track.Cid.IsAudio() {
@@ -218,6 +243,54 @@ func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
return
}
/**
* @brief 函数跳帧到dts 前面的第一个关键帧位置
*
* @param 参数名dts 跳帧位置
*
* @todo 待实现的功能或改进点 audioTrack 没有同步改进
* @author erroot
* @date 250614
*
**/
func (d *Demuxer) SeekTimePreIDR(dts uint64) (sample *Sample, err error) {
var audioTrack, videoTrack *Track
for _, track := range d.Tracks {
if track.Cid.IsAudio() {
audioTrack = track
} else if track.Cid.IsVideo() {
videoTrack = track
}
}
if videoTrack != nil {
idx := videoTrack.SeekPreIDR(dts)
if idx == -1 {
return nil, errors.New("seek failed")
}
d.ReadSampleIdx[videoTrack.TrackId-1] = uint32(idx)
sample = &videoTrack.Samplelist[idx]
if audioTrack != nil {
for i, sample := range audioTrack.Samplelist {
if sample.Offset < int64(videoTrack.Samplelist[idx].Offset) {
continue
}
d.ReadSampleIdx[audioTrack.TrackId-1] = uint32(i)
break
}
}
} else if audioTrack != nil {
idx := audioTrack.Seek(dts)
if idx == -1 {
return nil, errors.New("seek failed")
}
d.ReadSampleIdx[audioTrack.TrackId-1] = uint32(idx)
sample = &audioTrack.Samplelist[idx]
} else {
return nil, pkg.ErrNoTrack
}
return
}
// func (d *Demuxer) decodeTRUN(trun *TrackRunBox) {
// dataOffset := trun.Dataoffset
// nextDts := d.currentTrack.StartDts
@@ -377,10 +450,10 @@ func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
// return nil
// }
func (d *Demuxer) ReadSample(yield func(*Track, Sample) bool) {
func (d *Demuxer) ReadSample(yield func(*Track, box.Sample) bool) {
for {
maxdts := int64(-1)
minTsSample := Sample{Timestamp: uint32(maxdts)}
minTsSample := box.Sample{Timestamp: uint32(maxdts)}
var whichTrack *Track
whichTracki := 0
for i, track := range d.Tracks {
@@ -414,9 +487,9 @@ func (d *Demuxer) ReadSample(yield func(*Track, Sample) bool) {
}
}
func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) {
func (d *Demuxer) RangeSample(yield func(*Track, *box.Sample) bool) {
for {
var minTsSample *Sample
var minTsSample *box.Sample
var whichTrack *Track
whichTracki := 0
for i, track := range d.Tracks {
@@ -448,6 +521,244 @@ func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) {
}
// GetMoovBox returns the Movie Box from the demuxer
func (d *Demuxer) GetMoovBox() *MoovBox {
func (d *Demuxer) GetMoovBox() *box.MoovBox {
return d.moov
}
// CreateRTMPSequenceFrame 创建 RTMP 序列帧
func (d *Demuxer) CreateRTMPSequenceFrame(track *Track, allocator *util.ScalableMemoryAllocator) (videoSeq *rtmp.RTMPVideo, audioSeq *rtmp.RTMPAudio, err error) {
switch track.Cid {
case box.MP4_CODEC_H264:
videoSeq = &rtmp.RTMPVideo{}
videoSeq.SetAllocator(allocator)
videoSeq.Append([]byte{0x17, 0x00, 0x00, 0x00, 0x00}, track.ExtraData)
case box.MP4_CODEC_H265:
videoSeq = &rtmp.RTMPVideo{}
videoSeq.SetAllocator(allocator)
videoSeq.Append([]byte{0b1001_0000 | rtmp.PacketTypeSequenceStart}, codec.FourCC_H265[:], track.ExtraData)
case box.MP4_CODEC_AAC:
audioSeq = &rtmp.RTMPAudio{}
audioSeq.SetAllocator(allocator)
audioSeq.Append([]byte{0xaf, 0x00}, track.ExtraData)
}
return
}
// ConvertSampleToRTMP 将 MP4 sample 转换为 RTMP 格式
func (d *Demuxer) ConvertSampleToRTMP(track *Track, sample box.Sample, allocator *util.ScalableMemoryAllocator, timestampOffset uint64) (videoFrame *rtmp.RTMPVideo, audioFrame *rtmp.RTMPAudio, err error) {
switch track.Cid {
case box.MP4_CODEC_H264:
videoFrame = &rtmp.RTMPVideo{}
videoFrame.SetAllocator(allocator)
videoFrame.CTS = sample.CTS
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
videoFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_H265:
videoFrame = &rtmp.RTMPVideo{}
videoFrame.SetAllocator(allocator)
videoFrame.CTS = uint32(sample.CTS)
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
var head []byte
var b0 byte = 0b1010_0000
if sample.KeyFrame {
b0 = 0b1001_0000
}
if videoFrame.CTS == 0 {
head = videoFrame.NextN(5)
head[0] = b0 | rtmp.PacketTypeCodedFramesX
} else {
head = videoFrame.NextN(8)
head[0] = b0 | rtmp.PacketTypeCodedFrames
util.PutBE(head[5:8], videoFrame.CTS) // cts
}
copy(head[1:], codec.FourCC_H265[:])
videoFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_AAC:
audioFrame = &rtmp.RTMPAudio{}
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0xaf, 0x01})
audioFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_G711A:
audioFrame = &rtmp.RTMPAudio{}
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x72})
audioFrame.AddRecycleBytes(sample.Data)
case box.MP4_CODEC_G711U:
audioFrame = &rtmp.RTMPAudio{}
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x82})
audioFrame.AddRecycleBytes(sample.Data)
}
return
}
// GetRTMPSequenceFrames 获取预生成的 RTMP 序列帧
func (d *Demuxer) GetRTMPSequenceFrames() (videoSeq *rtmp.RTMPVideo, audioSeq *rtmp.RTMPAudio) {
return d.RTMPVideoSequence, d.RTMPAudioSequence
}
// IterateRTMPFrames 迭代预生成的 RTMP 帧
func (d *Demuxer) IterateRTMPFrames(timestampOffset uint64, yield func(*RTMPFrame) bool) {
for i := range d.RTMPFrames {
frame := &d.RTMPFrames[i]
// 应用时间戳偏移
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
f.Timestamp += uint32(timestampOffset)
case *rtmp.RTMPAudio:
f.Timestamp += uint32(timestampOffset)
}
if !yield(frame) {
return
}
}
}
// GetMaxTimestamp 获取所有帧中的最大时间戳
func (d *Demuxer) GetMaxTimestamp() uint64 {
var maxTimestamp uint64
for _, frame := range d.RTMPFrames {
var timestamp uint64
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
timestamp = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timestamp = uint64(f.Timestamp)
}
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
return maxTimestamp
}
// generateRTMPFrames 生成RTMP序列帧和所有帧数据
func (d *Demuxer) generateRTMPFrames(allocator *util.ScalableMemoryAllocator) (err error) {
// 生成序列帧
for _, track := range d.Tracks {
if track.Cid.IsVideo() && d.RTMPVideoSequence == nil {
d.RTMPVideoSequence, _, err = d.CreateRTMPSequenceFrame(track, allocator)
if err != nil {
return err
}
} else if track.Cid.IsAudio() && d.RTMPAudioSequence == nil {
_, d.RTMPAudioSequence, err = d.CreateRTMPSequenceFrame(track, allocator)
if err != nil {
return err
}
}
}
// 预生成所有 RTMP 帧
d.RTMPFrames = make([]RTMPFrame, 0)
// 收集所有样本并按时间戳排序
type sampleInfo struct {
track *Track
sample box.Sample
sampleIndex uint32
trackIndex int
}
var allSamples []sampleInfo
for trackIdx, track := range d.Tracks {
for sampleIdx, sample := range track.Samplelist {
// 读取样本数据
if _, err = d.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return err
}
sample.Data = allocator.Malloc(sample.Size)
if _, err = io.ReadFull(d.reader, sample.Data); err != nil {
allocator.Free(sample.Data)
return err
}
allSamples = append(allSamples, sampleInfo{
track: track,
sample: sample,
sampleIndex: uint32(sampleIdx),
trackIndex: trackIdx,
})
}
}
// 按时间戳排序样本
slices.SortFunc(allSamples, func(a, b sampleInfo) int {
timeA := uint64(a.sample.Timestamp) * uint64(d.moov.MVHD.Timescale) / uint64(a.track.Timescale)
timeB := uint64(b.sample.Timestamp) * uint64(d.moov.MVHD.Timescale) / uint64(b.track.Timescale)
if timeA < timeB {
return -1
} else if timeA > timeB {
return 1
}
return 0
})
// 预生成 RTMP 帧
for _, sampleInfo := range allSamples {
videoFrame, audioFrame, err := d.ConvertSampleToRTMP(sampleInfo.track, sampleInfo.sample, allocator, 0)
if err != nil {
return err
}
if videoFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: videoFrame})
}
if audioFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: audioFrame})
}
}
return nil
}
// createRTMPSampleCallback 创建RTMP样本处理回调函数
func (d *Demuxer) createRTMPSampleCallback(track *Track, trak *box.TrakBox) box.SampleCallback {
// 首先生成序列帧
if track.Cid.IsVideo() && d.RTMPVideoSequence == nil {
videoSeq, _, err := d.CreateRTMPSequenceFrame(track, d.RTMPAllocator)
if err == nil {
d.RTMPVideoSequence = videoSeq
}
} else if track.Cid.IsAudio() && d.RTMPAudioSequence == nil {
_, audioSeq, err := d.CreateRTMPSequenceFrame(track, d.RTMPAllocator)
if err == nil {
d.RTMPAudioSequence = audioSeq
}
}
return func(sample *box.Sample, sampleIndex int) error {
// 读取样本数据
if _, err := d.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return err
}
sample.Data = d.RTMPAllocator.Malloc(sample.Size)
if _, err := io.ReadFull(d.reader, sample.Data); err != nil {
d.RTMPAllocator.Free(sample.Data)
return err
}
// 转换为 RTMP 格式
videoFrame, audioFrame, err := d.ConvertSampleToRTMP(track, *sample, d.RTMPAllocator, 0)
if err != nil {
return err
}
// 内部收集RTMP帧
if videoFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: videoFrame})
}
if audioFrame != nil {
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: audioFrame})
}
return nil
}
}

View File

@@ -3,13 +3,12 @@ package mp4
import (
"errors"
"io"
"slices"
"strings"
"time"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
"m7s.live/v5/plugin/mp4/pkg/box"
rtmp "m7s.live/v5/plugin/rtmp/pkg"
)
@@ -35,9 +34,40 @@ func (p *HTTPReader) Run() (err error) {
content, err = io.ReadAll(p.ReadCloser)
demuxer = NewDemuxer(strings.NewReader(string(content)))
}
if err = demuxer.Demux(); err != nil {
// 设置RTMP分配器以启用RTMP帧收集
demuxer.RTMPAllocator = allocator
if err = demuxer.DemuxWithAllocator(allocator); err != nil {
return
}
// 获取demuxer内部收集的RTMP帧
rtmpFrames := demuxer.RTMPFrames
// 按时间戳排序所有帧
slices.SortFunc(rtmpFrames, func(a, b RTMPFrame) int {
var timeA, timeB uint64
switch f := a.Frame.(type) {
case *rtmp.RTMPVideo:
timeA = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timeA = uint64(f.Timestamp)
}
switch f := b.Frame.(type) {
case *rtmp.RTMPVideo:
timeB = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timeB = uint64(f.Timestamp)
}
if timeA < timeB {
return -1
} else if timeA > timeB {
return 1
}
return 0
})
publisher.OnSeek = func(seekTime time.Time) {
p.Stop(errors.New("seek"))
pullJob.Connection.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
@@ -48,103 +78,61 @@ func (p *HTTPReader) Run() (err error) {
seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Connection.Args.Get(util.StartKey))
demuxer.SeekTime(uint64(seekTime.UnixMilli()))
}
for _, track := range demuxer.Tracks {
switch track.Cid {
case box.MP4_CODEC_H264:
var sequence rtmp.RTMPVideo
sequence.SetAllocator(allocator)
sequence.Append([]byte{0x17, 0x00, 0x00, 0x00, 0x00}, track.ExtraData)
err = publisher.WriteVideo(&sequence)
case box.MP4_CODEC_H265:
var sequence rtmp.RTMPVideo
sequence.SetAllocator(allocator)
sequence.Append([]byte{0b1001_0000 | rtmp.PacketTypeSequenceStart}, codec.FourCC_H265[:], track.ExtraData)
err = publisher.WriteVideo(&sequence)
case box.MP4_CODEC_AAC:
var sequence rtmp.RTMPAudio
sequence.SetAllocator(allocator)
sequence.Append([]byte{0xaf, 0x00}, track.ExtraData)
err = publisher.WriteAudio(&sequence)
// 读取预生成的 RTMP 序列帧
videoSeq, audioSeq := demuxer.GetRTMPSequenceFrames()
if videoSeq != nil {
err = publisher.WriteVideo(videoSeq)
if err != nil {
return err
}
}
if audioSeq != nil {
err = publisher.WriteAudio(audioSeq)
if err != nil {
return err
}
}
// 计算最大时间戳用于累计偏移
var maxTimestamp uint64
for track, sample := range demuxer.ReadSample {
timestamp := uint64(sample.Timestamp) * 1000 / uint64(track.Timescale)
for _, frame := range rtmpFrames {
var timestamp uint64
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
timestamp = uint64(f.Timestamp)
case *rtmp.RTMPAudio:
timestamp = uint64(f.Timestamp)
}
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
var timestampOffset uint64
loop := p.PullJob.Loop
for {
demuxer.ReadSampleIdx = make([]uint32, len(demuxer.Tracks))
for track, sample := range demuxer.ReadSample {
// 使用预生成的 RTMP 帧进行播放
for _, frame := range rtmpFrames {
if p.IsStopped() {
return
return nil
}
if _, err = demuxer.reader.Seek(sample.Offset, io.SeekStart); err != nil {
return
// 应用时间戳偏移
switch f := frame.Frame.(type) {
case *rtmp.RTMPVideo:
f.Timestamp += uint32(timestampOffset)
err = publisher.WriteVideo(f)
case *rtmp.RTMPAudio:
f.Timestamp += uint32(timestampOffset)
err = publisher.WriteAudio(f)
}
sample.Data = allocator.Malloc(sample.Size)
if _, err = io.ReadFull(demuxer.reader, sample.Data); err != nil {
allocator.Free(sample.Data)
return
}
switch track.Cid {
case box.MP4_CODEC_H264:
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.CTS = sample.CTS
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
videoFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteVideo(&videoFrame)
case box.MP4_CODEC_H265:
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.CTS = uint32(sample.CTS)
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
var head []byte
var b0 byte = 0b1010_0000
if sample.KeyFrame {
b0 = 0b1001_0000
}
if videoFrame.CTS == 0 {
head = videoFrame.NextN(5)
head[0] = b0 | rtmp.PacketTypeCodedFramesX
} else {
head = videoFrame.NextN(8)
head[0] = b0 | rtmp.PacketTypeCodedFrames
util.PutBE(head[5:8], videoFrame.CTS) // cts
}
copy(head[1:], codec.FourCC_H265[:])
videoFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteVideo(&videoFrame)
case box.MP4_CODEC_AAC:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0xaf, 0x01})
audioFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteAudio(&audioFrame)
case box.MP4_CODEC_G711A:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x72})
audioFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteAudio(&audioFrame)
case box.MP4_CODEC_G711U:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
audioFrame.AppendOne([]byte{0x82})
audioFrame.AddRecycleBytes(sample.Data)
err = publisher.WriteAudio(&audioFrame)
if err != nil {
return err
}
}
if loop >= 0 {
loop--
if loop == -1 {

View File

@@ -102,6 +102,28 @@ func (track *Track) Seek(dts uint64) int {
return -1
}
/**
* @brief 函数跳帧到dts 前面的第一个关键帧位置
*
* @param 参数名dts 跳帧位置
*
* @author erroot
* @date 250614
*
**/
func (track *Track) SeekPreIDR(dts uint64) int {
idx := 0
for i, sample := range track.Samplelist {
if track.Cid.IsVideo() && sample.KeyFrame {
idx = i
}
if sample.Timestamp*1000/uint32(track.Timescale) > uint32(dts) {
break
}
}
return idx
}
func (track *Track) makeEdtsBox() *ContainerBox {
return CreateContainerBox(TypeEDTS, track.makeElstBox())
}

338
plugin/mp4/util.go Normal file
View File

@@ -0,0 +1,338 @@
package plugin_mp4
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"image"
"image/color"
"image/jpeg"
"io"
"log"
"os"
"os/exec"
mp4 "m7s.live/v5/plugin/mp4/pkg"
"m7s.live/v5/plugin/mp4/pkg/box"
)
func saveAsJPG(img image.Image, path string) error {
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
opt := jpeg.Options{Quality: 90}
return jpeg.Encode(file, img, &opt)
}
func ExtractH264SPSPPS(extraData []byte) (sps, pps []byte, err error) {
if len(extraData) < 7 {
return nil, nil, fmt.Errorf("extradata too short")
}
// 解析 SPS 数量 (第6字节低5位)
spsCount := int(extraData[5] & 0x1F)
offset := 6 // 当前解析位置
// 提取 SPS
for i := 0; i < spsCount; i++ {
if offset+2 > len(extraData) {
return nil, nil, fmt.Errorf("invalid sps length")
}
spsLen := int(binary.BigEndian.Uint16(extraData[offset : offset+2]))
offset += 2
if offset+spsLen > len(extraData) {
return nil, nil, fmt.Errorf("sps data overflow")
}
sps = extraData[offset : offset+spsLen]
offset += spsLen
}
// 提取 PPS 数量
if offset >= len(extraData) {
return nil, nil, fmt.Errorf("missing pps count")
}
ppsCount := int(extraData[offset])
offset++
// 提取 PPS
for i := 0; i < ppsCount; i++ {
if offset+2 > len(extraData) {
return nil, nil, fmt.Errorf("invalid pps length")
}
ppsLen := int(binary.BigEndian.Uint16(extraData[offset : offset+2]))
offset += 2
if offset+ppsLen > len(extraData) {
return nil, nil, fmt.Errorf("pps data overflow")
}
pps = extraData[offset : offset+ppsLen]
offset += ppsLen
}
return sps, pps, nil
}
// 转换函数(支持动态插入参数集)
func ConvertAVCCH264ToAnnexB(data []byte, extraData []byte, isFirst *bool) ([]byte, error) {
var buf bytes.Buffer
pos := 0
for pos < len(data) {
if pos+4 > len(data) {
break
}
nalSize := binary.BigEndian.Uint32(data[pos : pos+4])
pos += 4
nalStart := pos
pos += int(nalSize)
if pos > len(data) {
break
}
nalu := data[nalStart:pos]
nalType := nalu[0] & 0x1F
// 关键帧前插入SPS/PPS仅需执行一次
if *isFirst && nalType == 5 {
sps, pps, err := ExtractH264SPSPPS(extraData)
if err != nil {
//panic(err)
return nil, err
}
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(sps)
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(pps)
//buf.Write(videoTrack.ExtraData)
*isFirst = false // 仅首帧插入
}
// 保留SEI单元类型6和所有其他单元
if nalType == 5 || nalType == 6 { // IDR/SEI用4字节起始码
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
} else {
buf.Write([]byte{0x00, 0x00, 0x01}) // 其他用3字节
}
buf.Write(nalu)
}
return buf.Bytes(), nil
}
/*
H.264与H.265的AVCC格式差异
VPS引入H.265新增视频参数集VPS用于描述多层编码、时序等信息
*/
// 提取H.265的VPS/SPS/PPSHEVCDecoderConfigurationRecord格式
func ExtractHEVCParams(extraData []byte) (vps, sps, pps []byte, err error) {
if len(extraData) < 22 {
return nil, nil, nil, errors.New("extra data too short")
}
// HEVC的extradata格式参考ISO/IEC 14496-15
offset := 22 // 跳过头部22字节
if offset+2 > len(extraData) {
return nil, nil, nil, errors.New("invalid extra data")
}
numOfArrays := int(extraData[offset])
offset++
for i := 0; i < numOfArrays; i++ {
if offset+3 > len(extraData) {
break
}
naluType := extraData[offset] & 0x3F
offset++
count := int(binary.BigEndian.Uint16(extraData[offset:]))
offset += 2
for j := 0; j < count; j++ {
if offset+2 > len(extraData) {
break
}
naluSize := int(binary.BigEndian.Uint16(extraData[offset:]))
offset += 2
if offset+naluSize > len(extraData) {
break
}
naluData := extraData[offset : offset+naluSize]
offset += naluSize
// 根据类型存储参数集
switch naluType {
case 32: // VPS
if vps == nil {
vps = make([]byte, len(naluData))
copy(vps, naluData)
}
case 33: // SPS
if sps == nil {
sps = make([]byte, len(naluData))
copy(sps, naluData)
}
case 34: // PPS
if pps == nil {
pps = make([]byte, len(naluData))
copy(pps, naluData)
}
}
}
}
if vps == nil || sps == nil || pps == nil {
return nil, nil, nil, errors.New("missing required parameter sets")
}
return vps, sps, pps, nil
}
// H.265的AVCC转Annex B
func ConvertAVCCHEVCToAnnexB(data []byte, extraData []byte, isFirst *bool) ([]byte, error) {
var buf bytes.Buffer
pos := 0
// 首帧插入VPS/SPS/PPS
if *isFirst {
vps, sps, pps, err := ExtractHEVCParams(extraData)
if err == nil {
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(vps)
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(sps)
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
buf.Write(pps)
} else {
return nil, err
}
}
// 处理NALU
for pos < len(data) {
if pos+4 > len(data) {
break
}
nalSize := binary.BigEndian.Uint32(data[pos : pos+4])
pos += 4
nalStart := pos
pos += int(nalSize)
if pos > len(data) {
break
}
nalu := data[nalStart:pos]
nalType := (nalu[0] >> 1) & 0x3F // H.265的NALU类型在头部的第2-7位
// 关键帧或参数集使用4字节起始码
if nalType == 19 || nalType == 20 || nalType >= 32 && nalType <= 34 {
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
} else {
buf.Write([]byte{0x00, 0x00, 0x01})
}
buf.Write(nalu)
}
return buf.Bytes(), nil
}
// ffmpeg -hide_banner -i gop.mp4 -vf "select=eq(n\,15)" -vframes 1 -f image2 -pix_fmt bgr24 output.bmp
func ProcessWithFFmpeg(samples []box.Sample, index int, videoTrack *mp4.Track) (image.Image, error) {
// code := "h264"
// if videoTrack.Cid == box.MP4_CODEC_H265 {
// code = "hevc"
// }
cmd := exec.Command("ffmpeg",
"-hide_banner",
//"-f", code, //"h264" 强制指定输入格式为H.264裸流
"-i", "pipe:0",
"-vf", fmt.Sprintf("select=eq(n\\,%d)", index),
"-vframes", "1",
"-pix_fmt", "bgr24",
"-f", "rawvideo",
"pipe:1")
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
go func() {
errOutput, _ := io.ReadAll(stderr)
log.Printf("FFmpeg stderr: %s", errOutput)
}()
if err = cmd.Start(); err != nil {
log.Printf("cmd.Start失败: %v", err)
return nil, err
}
go func() {
defer stdin.Close()
isFirst := true
for _, sample := range samples {
if videoTrack.Cid == box.MP4_CODEC_H264 {
annexb, _ := ConvertAVCCH264ToAnnexB(sample.Data, videoTrack.ExtraData, &isFirst)
if _, err := stdin.Write(annexb); err != nil {
log.Printf("写入失败: %v", err)
break
}
} else {
annexb, _ := ConvertAVCCHEVCToAnnexB(sample.Data, videoTrack.ExtraData, &isFirst)
if _, err := stdin.Write(annexb); err != nil {
log.Printf("写入失败: %v", err)
break
}
}
}
}()
// 读取原始RGB数据
var buf bytes.Buffer
if _, err = io.Copy(&buf, stdout); err != nil {
log.Printf("读取失败: %v", err)
return nil, err
}
if err = cmd.Wait(); err != nil {
log.Printf("cmd.Wait失败: %v", err)
return nil, err
}
//log.Printf("ffmpeg 提取成功: data size:%v", buf.Len())
// 转换为image.Image对象
data := buf.Bytes()
//width, height := parseBMPDimensions(data)
width := int(videoTrack.Width)
height := int(videoTrack.Height)
log.Printf("ffmpeg size: %v,%v", width, height)
//FFmpeg的 rawvideo 输出默认采用​​从上到下​​的扫描方式
img := image.NewRGBA(image.Rect(0, 0, width, height))
for y := 0; y < height; y++ {
for x := 0; x < width; x++ {
//pos := (height-y-1)*width*3 + x*3
pos := (y*width + x) * 3 // 关键修复:按行顺序读取
img.Set(x, y, color.RGBA{
R: data[pos+2],
G: data[pos+1],
B: data[pos],
A: 255,
})
}
}
return img, nil
}

View File

@@ -37,13 +37,12 @@ var (
type WebRTCPlugin struct {
m7s.Plugin
ICEServers []ICEServer `desc:"ice服务器配置"`
Port string `default:"tcp:9000" desc:"监听端口"`
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后发送PLI请求
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
MimeType []string `desc:"MimeType过滤列表为空则不过滤"` // MimeType过滤列表支持的格式如video/H264, audio/opus
s SettingEngine
portMapping map[int]int // 内部端口到外部端口的映射
ICEServers []ICEServer `desc:"ice服务器配置"`
Port string `default:"tcp:9000" desc:"监听端口"`
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后发送PLI请求
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
MimeType []string `desc:"MimeType过滤列表为空则不过滤"` // MimeType过滤列表支持的格式如video/H264, audio/opus
s SettingEngine
}
func (p *WebRTCPlugin) RegisterHandler() map[string]http.HandlerFunc {
@@ -307,90 +306,50 @@ func (p *WebRTCPlugin) initSettingEngine() error {
// configurePort 配置端口设置
func (p *WebRTCPlugin) configurePort() error {
// 使用 ParsePort 而不是 ParsePort2 来获取端口映射信息
portInfo, err := ParsePort(p.Port)
ports, err := ParsePort2(p.Port)
if err != nil {
p.Error("webrtc port config error", "error", err, "port", p.Port)
return err
}
// 初始化端口映射
p.portMapping = make(map[int]int)
// 如果有端口映射,存储映射关系
if portInfo.HasMapping() {
if portInfo.IsRange() {
// 端口范围映射
for i := 0; i <= portInfo.Ports[1]-portInfo.Ports[0]; i++ {
internalPort := portInfo.Ports[0] + i
var externalPort int
if portInfo.IsRangeMapping() {
// 映射端口也是范围
externalPort = portInfo.Map[0] + i
} else {
// 映射端口是单个端口
externalPort = portInfo.Map[0]
}
p.portMapping[internalPort] = externalPort
}
} else {
// 单端口映射
p.portMapping[portInfo.Ports[0]] = portInfo.Map[0]
switch v := ports.(type) {
case TCPPort:
tcpport := int(v)
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: tcpport,
})
p.OnDispose(func() {
_ = tcpl.Close()
})
if err != nil {
p.Error("webrtc listener tcp", "error", err)
}
p.Info("Port mapping configured", "mapping", p.portMapping)
}
// 根据协议类型进行配置
if portInfo.IsTCP() {
if portInfo.IsRange() {
// TCP端口范围这里可能需要特殊处理
p.Error("TCP port range not supported in current implementation")
return fmt.Errorf("TCP port range not supported")
} else {
// TCP单端口
tcpport := portInfo.Ports[0]
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: tcpport,
})
p.OnDispose(func() {
_ = tcpl.Close()
})
if err != nil {
p.Error("webrtc listener tcp", "error", err)
return err
}
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
p.Info("webrtc start listen", "port", tcpport)
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
p.s.DisableSRTPReplayProtection(true)
}
} else {
// UDP配置
if portInfo.IsRange() {
// UDP端口范围
p.s.SetEphemeralUDPPortRange(uint16(portInfo.Ports[0]), uint16(portInfo.Ports[1]))
p.SetDescription("udp", fmt.Sprintf("%d-%d", portInfo.Ports[0], portInfo.Ports[1]))
} else {
// UDP单端口
udpport := portInfo.Ports[0]
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: udpport,
})
p.OnDispose(func() {
_ = udpListener.Close()
})
if err != nil {
p.Error("webrtc listener udp", "error", err)
return err
}
p.SetDescription("udp", fmt.Sprintf("%d", udpport))
p.Info("webrtc start listen", "port", udpport)
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
p.Info("webrtc start listen", "port", tcpport)
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
p.s.DisableSRTPReplayProtection(true)
case UDPRangePort:
p.s.SetEphemeralUDPPortRange(uint16(v[0]), uint16(v[1]))
p.SetDescription("udp", fmt.Sprintf("%d-%d", v[0], v[1]))
case UDPPort:
// 创建共享WEBRTC端口 默认9000
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: int(v),
})
p.OnDispose(func() {
_ = udpListener.Close()
})
if err != nil {
p.Error("webrtc listener udp", "error", err)
return err
}
p.SetDescription("udp", fmt.Sprintf("%d", v))
p.Info("webrtc start listen", "port", v)
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
}
return nil
@@ -409,33 +368,9 @@ func (p *WebRTCPlugin) CreatePC(sd SessionDescription, conf Configuration) (pc *
return
}
pc, err = api.NewPeerConnection(conf)
if err != nil {
return
if err == nil {
err = pc.SetRemoteDescription(sd)
}
// 如果有端口映射配置,记录 ICE 候选者信息以供调试
if len(p.portMapping) > 0 {
pc.OnICECandidate(func(candidate *ICECandidate) {
if candidate != nil {
// 记录端口映射信息(用于调试和监控)
if mappedPort, exists := p.portMapping[int(candidate.Port)]; exists {
p.Debug("ICE candidate with port mapping detected",
"original_port", candidate.Port,
"mapped_port", mappedPort,
"candidate_address", candidate.Address,
"candidate_type", candidate.Typ)
candidate.Port = uint16(mappedPort) // 更新候选者端口为映射后的端口
} else {
p.Debug("ICE candidate generated",
"port", candidate.Port,
"address", candidate.Address,
"type", candidate.Typ)
}
}
})
}
err = pc.SetRemoteDescription(sd)
return
}

View File

@@ -44,8 +44,8 @@ type (
}
RecordStream struct {
ID uint `gorm:"primarykey"`
StartTime time.Time `gorm:"type:datetime;default:NULL"`
EndTime time.Time `gorm:"type:datetime;default:NULL"`
StartTime time.Time `gorm:"default:NULL"`
EndTime time.Time `gorm:"default:NULL"`
Duration uint32 `gorm:"comment:录像时长;default:0"`
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`

202
scripts/packet_replayer.py Normal file
View File

@@ -0,0 +1,202 @@
#!/usr/bin/env python3
import argparse
from scapy.all import rdpcap, IP, TCP, UDP, Raw, send, sr1, sr, PcapReader
import sys
import time
from collections import defaultdict
import random
import threading
import queue
import socket
class PacketReplayer:
def __init__(self, pcap_file, target_ip, target_port):
self.pcap_file = pcap_file
self.target_ip = target_ip
self.target_port = target_port
self.connections = defaultdict(list) # 存储每个连接的包序列
self.response_queue = queue.Queue()
self.stop_reading = threading.Event()
self.socket = None
def establish_tcp_connection(self, src_port):
"""建立TCP连接"""
print(f"正在建立TCP连接 {self.target_ip}:{self.target_port}...")
try:
# 创建socket对象
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定源端口(如果指定了端口)
if src_port > 0:
try:
self.socket.bind(('0.0.0.0', src_port))
except socket.error as e:
print(f"指定端口 {src_port} 被占用,将使用随机端口")
self.socket.bind(('0.0.0.0', 0)) # 使用随机可用端口
else:
self.socket.bind(('0.0.0.0', 0)) # 使用随机可用端口
# 获取实际使用的端口
actual_port = self.socket.getsockname()[1]
print(f"使用本地端口: {actual_port}")
# 设置超时
self.socket.settimeout(5)
# 连接目标
self.socket.connect((self.target_ip, self.target_port))
print("TCP连接已建立")
return True
except Exception as e:
print(f"建立连接失败: {e}")
if self.socket:
self.socket.close()
self.socket = None
return False
def process_packet(self, packet, src_ip=None, src_port=None, protocol=None):
"""处理单个数据包"""
if IP not in packet:
return
# 检查源IP
if src_ip and packet[IP].src != src_ip:
return
# 检查协议和源端口
if protocol == 'tcp' and TCP in packet:
if src_port and packet[TCP].sport != src_port:
return
conn_id = (packet[IP].src, packet[TCP].sport)
self.connections[conn_id].append(packet)
elif protocol == 'udp' and UDP in packet:
if src_port and packet[UDP].sport != src_port:
return
conn_id = (packet[IP].src, packet[UDP].sport)
self.connections[conn_id].append(packet)
elif not protocol: # 如果没有指定协议则包含所有IP包
if TCP in packet:
if src_port and packet[TCP].sport != src_port:
return
conn_id = (packet[IP].src, packet[TCP].sport)
self.connections[conn_id].append(packet)
elif UDP in packet:
if src_port and packet[UDP].sport != src_port:
return
conn_id = (packet[IP].src, packet[UDP].sport)
self.connections[conn_id].append(packet)
def response_reader(self, src_port):
"""持续读取服务器响应的线程函数"""
while not self.stop_reading.is_set() and self.socket:
try:
# 使用socket接收数据
data = self.socket.recv(4096)
if data:
self.response_queue.put(data)
print(f"收到响应: {len(data)} 字节")
except socket.timeout:
continue
except Exception as e:
if not self.stop_reading.is_set():
print(f"读取响应时出错: {e}")
break
time.sleep(0.1)
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
"""边读取边重放数据包"""
print(f"开始读取并重放数据包到 {self.target_ip}:{self.target_port}")
try:
# 使用PcapReader逐包读取
reader = PcapReader(self.pcap_file)
packet_count = 0
connection_established = False
# 读取并处理数据包
for packet in reader:
packet_count += 1
if IP not in packet:
continue
# 检查源IP
if src_ip and packet[IP].src != src_ip:
continue
# 检查协议和源端口
current_src_port = None
if protocol == 'tcp' and TCP in packet:
if src_port and packet[TCP].sport != src_port:
continue
current_src_port = packet[TCP].sport
elif protocol == 'udp' and UDP in packet:
if src_port and packet[UDP].sport != src_port:
continue
current_src_port = packet[UDP].sport
elif not protocol: # 如果没有指定协议则包含所有IP包
if TCP in packet:
if src_port and packet[TCP].sport != src_port:
continue
current_src_port = packet[TCP].sport
elif UDP in packet:
if src_port and packet[UDP].sport != src_port:
continue
current_src_port = packet[UDP].sport
else:
continue
else:
continue
# 找到第一个符合条件的包,建立连接
if not connection_established:
if not self.establish_tcp_connection(current_src_port):
print("无法建立连接,退出")
return
# 启动响应读取线程
self.stop_reading.clear()
reader_thread = threading.Thread(target=self.response_reader, args=(current_src_port,))
reader_thread.daemon = True
reader_thread.start()
connection_established = True
# 发送当前数据包
try:
if Raw in packet:
self.socket.send(packet[Raw].load)
packet_time = time.strftime("%H:%M:%S", time.localtime(float(packet.time)))
print(f"[{packet_time}] [序号:{packet_count}] 已发送数据包 (负载大小: {len(packet[Raw].load)} 字节)")
if delay > 0:
time.sleep(delay)
except Exception as e:
print(f"发送数据包 {packet_count} 时出错: {e}")
sys.exit(1) # 发送失败直接退出进程
print(f"总共处理了 {packet_count} 个数据包")
except Exception as e:
print(f"处理数据包时出错: {e}")
sys.exit(1) # 其他错误也直接退出进程
finally:
# 关闭连接和停止读取线程
self.stop_reading.set()
if self.socket:
self.socket.close()
self.socket = None
reader.close()
def main():
parser = argparse.ArgumentParser(description='Wireshark数据包重放工具')
parser.add_argument('pcap_file', help='pcap文件路径')
parser.add_argument('target_ip', help='目标IP地址')
parser.add_argument('target_port', type=int, help='目标端口')
parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)')
parser.add_argument('--src-ip', help='过滤源IP地址')
parser.add_argument('--src-port', type=int, help='过滤源端口')
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
args = parser.parse_args()
replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port)
replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay)
if __name__ == '__main__':
main()