mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-05 11:26:52 +08:00
Compare commits
8 Commits
dev
...
feat-mp42t
Author | SHA1 | Date | |
---|---|---|---|
![]() |
77613e52a8 | ||
![]() |
ec56bba75a | ||
![]() |
b2b511d755 | ||
![]() |
42acf47250 | ||
![]() |
6206ee847d | ||
![]() |
6cfdc03e4a | ||
![]() |
b425b8da1f | ||
![]() |
e105243cd5 |
@@ -9,14 +9,11 @@ import (
|
||||
|
||||
// User represents a user in the system
|
||||
type User struct {
|
||||
ID uint `gorm:"primarykey"`
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
DeletedAt gorm.DeletedAt `gorm:"index"`
|
||||
Username string `gorm:"uniqueIndex;size:64"`
|
||||
Password string `gorm:"size:60"` // bcrypt hash
|
||||
Role string `gorm:"size:20;default:'user'"` // admin or user
|
||||
LastLogin time.Time `gorm:"type:datetime;default:CURRENT_TIMESTAMP"`
|
||||
gorm.Model
|
||||
Username string `gorm:"uniqueIndex;size:64"`
|
||||
Password string `gorm:"size:60"` // bcrypt hash
|
||||
Role string `gorm:"size:20;default:'user'"` // admin or user
|
||||
LastLogin time.Time `gorm:"type:timestamp;default:CURRENT_TIMESTAMP"`
|
||||
}
|
||||
|
||||
// BeforeCreate hook to hash password before saving
|
||||
|
133
pkg/port.go
133
pkg/port.go
@@ -13,7 +13,6 @@ type (
|
||||
Port struct {
|
||||
Protocol string
|
||||
Ports [2]int
|
||||
Map [2]int // 映射端口范围,通常用于 NAT 或端口转发
|
||||
}
|
||||
IPort interface {
|
||||
IsTCP() bool
|
||||
@@ -23,23 +22,10 @@ type (
|
||||
)
|
||||
|
||||
func (p Port) String() string {
|
||||
var result string
|
||||
if p.Ports[0] == p.Ports[1] {
|
||||
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0])
|
||||
} else {
|
||||
result = p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
|
||||
return p.Protocol + ":" + strconv.Itoa(p.Ports[0])
|
||||
}
|
||||
|
||||
// 如果有端口映射,添加映射信息
|
||||
if p.HasMapping() {
|
||||
if p.Map[0] == p.Map[1] {
|
||||
result += ":" + strconv.Itoa(p.Map[0])
|
||||
} else {
|
||||
result += ":" + strconv.Itoa(p.Map[0]) + "-" + strconv.Itoa(p.Map[1])
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
|
||||
}
|
||||
|
||||
func (p Port) IsTCP() bool {
|
||||
@@ -54,36 +40,6 @@ func (p Port) IsRange() bool {
|
||||
return p.Ports[0] != p.Ports[1]
|
||||
}
|
||||
|
||||
func (p Port) HasMapping() bool {
|
||||
return p.Map[0] > 0 || p.Map[1] > 0
|
||||
}
|
||||
|
||||
func (p Port) IsRangeMapping() bool {
|
||||
return p.HasMapping() && p.Map[0] != p.Map[1]
|
||||
}
|
||||
|
||||
// ParsePort2 解析端口配置字符串并返回对应的端口类型实例
|
||||
// 根据协议类型和端口范围返回不同的类型:
|
||||
// - TCP单端口:返回 TCPPort
|
||||
// - TCP端口范围:返回 TCPRangePort
|
||||
// - UDP单端口:返回 UDPPort
|
||||
// - UDP端口范围:返回 UDPRangePort
|
||||
//
|
||||
// 参数:
|
||||
//
|
||||
// conf - 端口配置字符串,格式:protocol:port 或 protocol:port1-port2
|
||||
//
|
||||
// 返回值:
|
||||
//
|
||||
// ret - 端口实例 (TCPPort/UDPPort/TCPRangePort/UDPRangePort)
|
||||
// err - 解析错误
|
||||
//
|
||||
// 示例:
|
||||
//
|
||||
// ParsePort2("tcp:8080") // 返回 TCPPort(8080)
|
||||
// ParsePort2("tcp:8080-8090") // 返回 TCPRangePort([2]int{8080, 8090})
|
||||
// ParsePort2("udp:5000") // 返回 UDPPort(5000)
|
||||
// ParsePort2("udp:5000-5010") // 返回 UDPRangePort([2]int{5000, 5010})
|
||||
func ParsePort2(conf string) (ret any, err error) {
|
||||
var port Port
|
||||
port, err = ParsePort(conf)
|
||||
@@ -102,84 +58,10 @@ func ParsePort2(conf string) (ret any, err error) {
|
||||
return UDPPort(port.Ports[0]), nil
|
||||
}
|
||||
|
||||
// ParsePort 解析端口配置字符串为 Port 结构体
|
||||
// 支持协议前缀、端口号/端口范围以及端口映射的解析
|
||||
//
|
||||
// 参数:
|
||||
//
|
||||
// conf - 端口配置字符串,格式:
|
||||
// - "protocol:port" 单端口,如 "tcp:8080"
|
||||
// - "protocol:port1-port2" 端口范围,如 "tcp:8080-8090"
|
||||
// - "protocol:port:mapPort" 单端口映射,如 "tcp:8080:9090"
|
||||
// - "protocol:port:mapPort1-mapPort2" 单端口映射到端口范围,如 "tcp:8080:9000-9010"
|
||||
// - "protocol:port1-port2:mapPort1-mapPort2" 端口范围映射,如 "tcp:8080-8090:9000-9010"
|
||||
//
|
||||
// 返回值:
|
||||
//
|
||||
// ret - Port 结构体,包含协议、端口和映射端口信息
|
||||
// err - 解析错误
|
||||
//
|
||||
// 注意:
|
||||
// - 如果端口范围中 min > max,会自动交换顺序
|
||||
// - 单端口时,Ports[0] 和 Ports[1] 值相同
|
||||
// - 端口映射时,Map[0] 和 Map[1] 存储映射的目标端口范围
|
||||
// - 单个映射端口时,Map[0] 和 Map[1] 值相同
|
||||
//
|
||||
// 示例:
|
||||
//
|
||||
// ParsePort("tcp:8080") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{0, 0}}
|
||||
// ParsePort("tcp:8080-8090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{0, 0}}
|
||||
// ParsePort("tcp:8080:9090") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9090, 9090}}
|
||||
// ParsePort("tcp:8080:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8080}, Map:[2]int{9000, 9010}}
|
||||
// ParsePort("tcp:8080-8090:9000-9010") // Port{Protocol:"tcp", Ports:[2]int{8080, 8090}, Map:[2]int{9000, 9010}}
|
||||
// ParsePort("udp:5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5000}, Map:[2]int{0, 0}}
|
||||
// ParsePort("udp:5010-5000") // Port{Protocol:"udp", Ports:[2]int{5000, 5010}, Map:[2]int{0, 0}}
|
||||
func ParsePort(conf string) (ret Port, err error) {
|
||||
var port, mapPort string
|
||||
var port string
|
||||
var min, max int
|
||||
|
||||
// 按冒号分割,支持端口映射
|
||||
parts := strings.Split(conf, ":")
|
||||
if len(parts) < 2 || len(parts) > 3 {
|
||||
err = strconv.ErrSyntax
|
||||
return
|
||||
}
|
||||
|
||||
ret.Protocol = parts[0]
|
||||
port = parts[1]
|
||||
|
||||
// 处理端口映射
|
||||
if len(parts) == 3 {
|
||||
mapPort = parts[2]
|
||||
// 解析映射端口,支持单端口和端口范围
|
||||
if mapRange := strings.Split(mapPort, "-"); len(mapRange) == 2 {
|
||||
// 映射端口范围
|
||||
var mapMin, mapMax int
|
||||
mapMin, err = strconv.Atoi(mapRange[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mapMax, err = strconv.Atoi(mapRange[1])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if mapMin < mapMax {
|
||||
ret.Map[0], ret.Map[1] = mapMin, mapMax
|
||||
} else {
|
||||
ret.Map[0], ret.Map[1] = mapMax, mapMin
|
||||
}
|
||||
} else {
|
||||
// 单个映射端口
|
||||
var mapPortNum int
|
||||
mapPortNum, err = strconv.Atoi(mapPort)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ret.Map[0], ret.Map[1] = mapPortNum, mapPortNum
|
||||
}
|
||||
}
|
||||
|
||||
// 处理端口范围
|
||||
ret.Protocol, port, _ = strings.Cut(conf, ":")
|
||||
if r := strings.Split(port, "-"); len(r) == 2 {
|
||||
min, err = strconv.Atoi(r[0])
|
||||
if err != nil {
|
||||
@@ -194,12 +76,7 @@ func ParsePort(conf string) (ret Port, err error) {
|
||||
} else {
|
||||
ret.Ports[0], ret.Ports[1] = max, min
|
||||
}
|
||||
} else {
|
||||
var p int
|
||||
p, err = strconv.Atoi(port)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
} else if p, err := strconv.Atoi(port); err == nil {
|
||||
ret.Ports[0], ret.Ports[1] = p, p
|
||||
}
|
||||
return
|
||||
|
370
pkg/port_test.go
370
pkg/port_test.go
@@ -1,370 +0,0 @@
|
||||
package pkg
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParsePort(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expected Port
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "TCP单端口",
|
||||
input: "tcp:8080",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围",
|
||||
input: "tcp:8080-8090",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围(反序)",
|
||||
input: "tcp:8090-8080",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP单端口映射到单端口",
|
||||
input: "tcp:8080:9090",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{9090, 9090},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP单端口映射到端口范围",
|
||||
input: "tcp:8080:9000-9010",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{9000, 9010},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围映射到端口范围",
|
||||
input: "tcp:8080-8090:9000-9010",
|
||||
expected: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{9000, 9010},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP单端口",
|
||||
input: "udp:5000",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5000},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口范围",
|
||||
input: "udp:5000-5010",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5010},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口映射",
|
||||
input: "udp:5000:6000",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5000},
|
||||
Map: [2]int{6000, 6000},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口范围映射(映射范围反序)",
|
||||
input: "udp:5000-5010:6010-6000",
|
||||
expected: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5010},
|
||||
Map: [2]int{6000, 6010},
|
||||
},
|
||||
hasError: false,
|
||||
},
|
||||
// 错误情况
|
||||
{
|
||||
name: "缺少协议",
|
||||
input: "8080",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "过多冒号",
|
||||
input: "tcp:8080:9090:extra",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效端口号",
|
||||
input: "tcp:abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效映射端口号",
|
||||
input: "tcp:8080:abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效端口范围",
|
||||
input: "tcp:8080-abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
{
|
||||
name: "无效映射端口范围",
|
||||
input: "tcp:8080:9000-abc",
|
||||
expected: Port{},
|
||||
hasError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := ParsePort(tt.input)
|
||||
|
||||
if tt.hasError {
|
||||
if err == nil {
|
||||
t.Errorf("期望有错误,但没有错误")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("意外的错误: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if result.Protocol != tt.expected.Protocol {
|
||||
t.Errorf("协议不匹配: 期望 %s, 得到 %s", tt.expected.Protocol, result.Protocol)
|
||||
}
|
||||
|
||||
if result.Ports != tt.expected.Ports {
|
||||
t.Errorf("端口不匹配: 期望 %v, 得到 %v", tt.expected.Ports, result.Ports)
|
||||
}
|
||||
|
||||
if result.Map != tt.expected.Map {
|
||||
t.Errorf("映射端口不匹配: 期望 %v, 得到 %v", tt.expected.Map, result.Map)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPortMethods(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
port Port
|
||||
expectTCP bool
|
||||
expectUDP bool
|
||||
expectRange bool
|
||||
expectMapping bool
|
||||
expectRangeMap bool
|
||||
expectString string
|
||||
}{
|
||||
{
|
||||
name: "TCP单端口",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: false,
|
||||
expectMapping: false,
|
||||
expectRangeMap: false,
|
||||
expectString: "tcp:8080",
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{0, 0},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: true,
|
||||
expectMapping: false,
|
||||
expectRangeMap: false,
|
||||
expectString: "tcp:8080-8090",
|
||||
},
|
||||
{
|
||||
name: "TCP单端口映射",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8080},
|
||||
Map: [2]int{9090, 9090},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: false,
|
||||
expectMapping: true,
|
||||
expectRangeMap: false,
|
||||
expectString: "tcp:8080:9090",
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围映射",
|
||||
port: Port{
|
||||
Protocol: "tcp",
|
||||
Ports: [2]int{8080, 8090},
|
||||
Map: [2]int{9000, 9010},
|
||||
},
|
||||
expectTCP: true,
|
||||
expectUDP: false,
|
||||
expectRange: true,
|
||||
expectMapping: true,
|
||||
expectRangeMap: true,
|
||||
expectString: "tcp:8080-8090:9000-9010",
|
||||
},
|
||||
{
|
||||
name: "UDP单端口映射到端口范围",
|
||||
port: Port{
|
||||
Protocol: "udp",
|
||||
Ports: [2]int{5000, 5000},
|
||||
Map: [2]int{6000, 6010},
|
||||
},
|
||||
expectTCP: false,
|
||||
expectUDP: true,
|
||||
expectRange: false,
|
||||
expectMapping: true,
|
||||
expectRangeMap: true,
|
||||
expectString: "udp:5000:6000-6010",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.port.IsTCP() != tt.expectTCP {
|
||||
t.Errorf("IsTCP(): 期望 %v, 得到 %v", tt.expectTCP, tt.port.IsTCP())
|
||||
}
|
||||
|
||||
if tt.port.IsUDP() != tt.expectUDP {
|
||||
t.Errorf("IsUDP(): 期望 %v, 得到 %v", tt.expectUDP, tt.port.IsUDP())
|
||||
}
|
||||
|
||||
if tt.port.IsRange() != tt.expectRange {
|
||||
t.Errorf("IsRange(): 期望 %v, 得到 %v", tt.expectRange, tt.port.IsRange())
|
||||
}
|
||||
|
||||
if tt.port.HasMapping() != tt.expectMapping {
|
||||
t.Errorf("HasMapping(): 期望 %v, 得到 %v", tt.expectMapping, tt.port.HasMapping())
|
||||
}
|
||||
|
||||
if tt.port.IsRangeMapping() != tt.expectRangeMap {
|
||||
t.Errorf("IsRangeMapping(): 期望 %v, 得到 %v", tt.expectRangeMap, tt.port.IsRangeMapping())
|
||||
}
|
||||
|
||||
if tt.port.String() != tt.expectString {
|
||||
t.Errorf("String(): 期望 %s, 得到 %s", tt.expectString, tt.port.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePort2(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expectedType string
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "TCP单端口",
|
||||
input: "tcp:8080",
|
||||
expectedType: "TCPPort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "TCP端口范围",
|
||||
input: "tcp:8080-8090",
|
||||
expectedType: "TCPRangePort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP单端口",
|
||||
input: "udp:5000",
|
||||
expectedType: "UDPPort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "UDP端口范围",
|
||||
input: "udp:5000-5010",
|
||||
expectedType: "UDPRangePort",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "无效输入",
|
||||
input: "invalid",
|
||||
hasError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := ParsePort2(tt.input)
|
||||
|
||||
if tt.hasError {
|
||||
if err == nil {
|
||||
t.Errorf("期望有错误,但没有错误")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("意外的错误: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
switch tt.expectedType {
|
||||
case "TCPPort":
|
||||
if _, ok := result.(TCPPort); !ok {
|
||||
t.Errorf("期望类型 TCPPort, 得到 %T", result)
|
||||
}
|
||||
case "TCPRangePort":
|
||||
if _, ok := result.(TCPRangePort); !ok {
|
||||
t.Errorf("期望类型 TCPRangePort, 得到 %T", result)
|
||||
}
|
||||
case "UDPPort":
|
||||
if _, ok := result.(UDPPort); !ok {
|
||||
t.Errorf("期望类型 UDPPort, 得到 %T", result)
|
||||
}
|
||||
case "UDPRangePort":
|
||||
if _, ok := result.(UDPRangePort); !ok {
|
||||
t.Errorf("期望类型 UDPRangePort, 得到 %T", result)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@@ -1873,8 +1873,8 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
|
||||
Select(`
|
||||
IFNULL(gc.id, 0) AS id,
|
||||
dc.channel_id,
|
||||
dc.device_id,
|
||||
dc.name AS channel_name,
|
||||
d.device_id AS device_id,
|
||||
d.name AS device_name,
|
||||
dc.status AS status,
|
||||
CASE
|
||||
@@ -1883,11 +1883,11 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
|
||||
END AS in_group
|
||||
`).
|
||||
Joins("LEFT JOIN "+deviceTable+" AS d ON dc.device_id = d.device_id").
|
||||
Joins("LEFT JOIN "+groupsChannelTable+" AS gc ON dc.channel_id = gc.channel_id AND gc.group_id = ?", req.GroupId)
|
||||
Joins("LEFT JOIN "+groupsChannelTable+" AS gc ON dc.channel_id = gc.channel_id AND dc.device_id = gc.device_id AND gc.group_id = ?", req.GroupId)
|
||||
|
||||
// 如果有设备ID过滤条件
|
||||
if req.DeviceId != "" {
|
||||
baseQuery = baseQuery.Where("d.device_id = ?", req.DeviceId)
|
||||
baseQuery = baseQuery.Where("dc.device_id = ?", req.DeviceId)
|
||||
}
|
||||
|
||||
// 统计符合条件的通道总数
|
||||
@@ -1903,7 +1903,7 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
|
||||
query := baseQuery
|
||||
|
||||
// 添加排序
|
||||
query = query.Order("channel_id ASC")
|
||||
query = query.Order("dc.device_id ASC, dc.channel_id ASC")
|
||||
|
||||
// 如果指定了分页参数,则应用分页
|
||||
if req.Page > 0 && req.Count > 0 {
|
||||
@@ -1922,12 +1922,14 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
|
||||
var pbGroupChannels []*pb.GroupChannel
|
||||
for _, result := range results {
|
||||
channelInfo := &pb.GroupChannel{
|
||||
Id: int32(result.ID),
|
||||
GroupId: req.GroupId,
|
||||
ChannelId: result.ChannelID,
|
||||
DeviceId: result.DeviceID,
|
||||
ChannelName: result.ChannelName,
|
||||
DeviceName: result.DeviceName,
|
||||
Status: result.Status,
|
||||
InGroup: result.InGroup, // 设置inGroup字段
|
||||
InGroup: result.InGroup,
|
||||
}
|
||||
|
||||
// 从内存中获取设备信息以获取传输协议
|
||||
@@ -1935,13 +1937,6 @@ func (gb *GB28181Plugin) GetGroupChannels(ctx context.Context, req *pb.GetGroupC
|
||||
channelInfo.StreamMode = device.StreamMode
|
||||
}
|
||||
|
||||
if result.InGroup {
|
||||
channelInfo.Id = int32(result.ID)
|
||||
channelInfo.GroupId = int32(req.GroupId)
|
||||
} else {
|
||||
channelInfo.Id = 0
|
||||
}
|
||||
|
||||
pbGroupChannels = append(pbGroupChannels, channelInfo)
|
||||
}
|
||||
|
||||
@@ -2082,19 +2077,19 @@ func (gb *GB28181Plugin) getGroupChannels(groupId int32) ([]*pb.GroupChannel, er
|
||||
InGroup bool `gorm:"column:in_group"`
|
||||
}
|
||||
|
||||
// 构建查询
|
||||
// 构建优化后的查询
|
||||
query := gb.DB.Table(groupsChannelTable+" AS gc").
|
||||
Select(`
|
||||
gc.id AS id,
|
||||
gc.channel_id AS channel_id,
|
||||
gc.device_id AS device_id,
|
||||
dc.name AS channel_name,
|
||||
d.name AS device_name,
|
||||
dc.status AS status,
|
||||
ch.name AS channel_name,
|
||||
dev.name AS device_name,
|
||||
ch.status AS status,
|
||||
true AS in_group
|
||||
`).
|
||||
Joins("LEFT JOIN "+deviceChannelTable+" AS dc ON gc.channel_id = dc.channel_id").
|
||||
Joins("LEFT JOIN "+deviceTable+" AS d ON gc.device_id = d.device_id").
|
||||
Joins("LEFT JOIN "+deviceChannelTable+" AS ch ON gc.device_id = ch.device_id AND gc.channel_id = ch.channel_id").
|
||||
Joins("LEFT JOIN "+deviceTable+" AS dev ON ch.device_id = dev.device_id").
|
||||
Where("gc.group_id = ?", groupId)
|
||||
|
||||
var results []Result
|
||||
@@ -2107,7 +2102,7 @@ func (gb *GB28181Plugin) getGroupChannels(groupId int32) ([]*pb.GroupChannel, er
|
||||
for _, result := range results {
|
||||
channelInfo := &pb.GroupChannel{
|
||||
Id: int32(result.ID),
|
||||
GroupId: groupId,
|
||||
GroupId: groupId, // 使用函数参数 groupId
|
||||
ChannelId: result.ChannelID,
|
||||
DeviceId: result.DeviceID,
|
||||
ChannelName: result.ChannelName,
|
||||
@@ -2868,7 +2863,7 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
|
||||
}
|
||||
|
||||
// 删除设备关联的通道
|
||||
if err := tx.Delete(&gb28181.DeviceChannel{DeviceID: req.Id}).Error; err != nil {
|
||||
if err := tx.Where("device_id = ?", req.Id).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
|
||||
tx.Rollback()
|
||||
resp.Code = 500
|
||||
resp.Message = "删除设备通道失败"
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -99,15 +100,20 @@ func (d *Dialog) Start() (err error) {
|
||||
|
||||
d.gb.dialogs.Set(d)
|
||||
//defer d.gb.dialogs.Remove(d)
|
||||
if d.gb.MediaPort.Valid() {
|
||||
select {
|
||||
case d.MediaPort = <-d.gb.tcpPorts:
|
||||
default:
|
||||
return fmt.Errorf("no available tcp port")
|
||||
}
|
||||
if d.gb.tcpPort > 0 {
|
||||
d.MediaPort = d.gb.tcpPort
|
||||
} else {
|
||||
d.MediaPort = d.gb.MediaPort[0]
|
||||
if d.gb.MediaPort.Valid() {
|
||||
select {
|
||||
case d.MediaPort = <-d.gb.tcpPorts:
|
||||
default:
|
||||
return fmt.Errorf("no available tcp port")
|
||||
}
|
||||
} else {
|
||||
d.MediaPort = d.gb.MediaPort[0]
|
||||
}
|
||||
}
|
||||
|
||||
ssrc := d.CreateSSRC(d.gb.Serial)
|
||||
d.Info("MediaIp is ", device.MediaIp)
|
||||
|
||||
@@ -266,7 +272,7 @@ func (d *Dialog) Run() (err error) {
|
||||
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
|
||||
d.SSRC = uint32(_ssrc)
|
||||
} else {
|
||||
d.gb.Error("read invite response y ", "err", err)
|
||||
return errors.New("read invite respose y error" + err.Error())
|
||||
}
|
||||
}
|
||||
case "c":
|
||||
@@ -299,6 +305,18 @@ func (d *Dialog) Run() (err error) {
|
||||
if d.StreamMode == "TCP-ACTIVE" {
|
||||
pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
|
||||
} else {
|
||||
if d.gb.tcpPort > 0 {
|
||||
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
|
||||
if d.gb.netListener != nil {
|
||||
d.Info("use gb.netListener", d.gb.netListener.Addr())
|
||||
pub.Receiver.Listener = d.gb.netListener
|
||||
} else {
|
||||
d.Info("listen tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
|
||||
pub.Receiver.Listener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
|
||||
d.gb.netListener = pub.Receiver.Listener
|
||||
}
|
||||
pub.Receiver.SSRC = d.SSRC
|
||||
}
|
||||
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
|
||||
}
|
||||
pub.Receiver.StreamMode = d.StreamMode
|
||||
@@ -316,7 +334,11 @@ func (d *Dialog) GetKey() uint32 {
|
||||
}
|
||||
|
||||
func (d *Dialog) Dispose() {
|
||||
d.gb.tcpPorts <- d.MediaPort
|
||||
if d.gb.tcpPort == 0 {
|
||||
// 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用
|
||||
d.gb.tcpPorts <- d.MediaPort
|
||||
}
|
||||
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceID, "channelId", d.Channel.ChannelID)
|
||||
if d.session != nil {
|
||||
err := d.session.Bye(d)
|
||||
if err != nil {
|
||||
|
@@ -3,9 +3,9 @@ package plugin_gb28181pro
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -41,7 +41,7 @@ type GB28181Plugin struct {
|
||||
pb.UnimplementedApiServer
|
||||
m7s.Plugin
|
||||
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
|
||||
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
|
||||
Password string
|
||||
Sip SipConfig
|
||||
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
|
||||
@@ -55,12 +55,14 @@ type GB28181Plugin struct {
|
||||
forwardDialogs util.Collection[uint32, *ForwardDialog]
|
||||
platforms util.Collection[string, *Platform]
|
||||
tcpPorts chan uint16
|
||||
tcpPort uint16
|
||||
sipPorts []int
|
||||
SipIP string `desc:"sip发送命令的IP,一般是本地IP,多网卡时需要配置正确的IP"`
|
||||
MediaIP string `desc:"流媒体IP,用于接收流"`
|
||||
deviceManager task.Manager[string, *DeviceRegisterQueueTask]
|
||||
Platforms []*gb28181.PlatformModel
|
||||
channels util.Collection[string, *gb28181.DeviceChannel]
|
||||
netListener net.Listener
|
||||
}
|
||||
|
||||
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
|
||||
@@ -75,6 +77,18 @@ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
|
||||
NewPullProxy: NewPullProxy,
|
||||
})
|
||||
|
||||
func (gb *GB28181Plugin) Dispose() {
|
||||
if gb.netListener != nil {
|
||||
gb.Info("gb28181 plugin dispose")
|
||||
err := gb.netListener.Close()
|
||||
if err != nil {
|
||||
gb.Error("Close netListener error", "error", err)
|
||||
} else {
|
||||
gb.Info("netListener closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
sip.SIPDebug = true
|
||||
}
|
||||
@@ -153,8 +167,16 @@ func (gb *GB28181Plugin) OnInit() (err error) {
|
||||
if gb.MediaPort.Valid() {
|
||||
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
|
||||
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
|
||||
for i := range gb.MediaPort.Size() {
|
||||
gb.tcpPorts <- gb.MediaPort[0] + i
|
||||
if gb.MediaPort.Size() == 0 {
|
||||
gb.tcpPort = gb.MediaPort[0]
|
||||
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
|
||||
} else if gb.MediaPort.Size() == 1 {
|
||||
gb.tcpPort = gb.MediaPort[0] + 1
|
||||
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
|
||||
} else {
|
||||
for i := range gb.MediaPort.Size() {
|
||||
gb.tcpPorts <- gb.MediaPort[0] + i
|
||||
}
|
||||
}
|
||||
} else {
|
||||
gb.SetDescription("tcp", fmt.Sprintf("%d", gb.MediaPort[0]))
|
||||
@@ -438,22 +460,9 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
|
||||
from := req.From()
|
||||
if from == nil || from.Address.User == "" {
|
||||
gb.Error("OnRegister", "error", "no user")
|
||||
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid sip from format", nil)
|
||||
if err := tx.Respond(response); err != nil {
|
||||
gb.Error("respond BadRequest", "error", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
deviceId := from.Address.User
|
||||
// 验证设备ID是否符合GB28181规范(20位数字)
|
||||
if match, _ := regexp.MatchString(`^\d{20}$`, deviceId); !match {
|
||||
gb.Error("OnRegister", "error", "invalid device id format, must be 20 digits", "deviceId", deviceId)
|
||||
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid device ID format", nil)
|
||||
if err := tx.Respond(response); err != nil {
|
||||
gb.Error("respond BadRequest", "error", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
registerHandlerTask := registerHandlerTask{
|
||||
gb: gb,
|
||||
req: req,
|
||||
|
@@ -44,8 +44,9 @@ type Receiver struct {
|
||||
psAudio PSAudio
|
||||
RTPReader *rtp2.TCP
|
||||
ListenAddr string
|
||||
listener net.Listener
|
||||
Listener net.Listener
|
||||
StreamMode string // 数据流传输模式(UDP:udp传输/TCP-ACTIVE:tcp主动模式/TCP-PASSIVE:tcp被动模式)
|
||||
SSRC uint32 // RTP SSRC
|
||||
}
|
||||
|
||||
func NewPSPublisher(puber *m7s.Publisher) *PSPublisher {
|
||||
@@ -147,9 +148,19 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
|
||||
p.Error("unmarshal error", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 如果设置了SSRC过滤,只处理匹配的SSRC
|
||||
if p.SSRC != 0 && p.SSRC != p.Packet.SSRC {
|
||||
p.Info("into single port mode, ssrc mismatch", "expected", p.SSRC, "actual", p.Packet.SSRC)
|
||||
if p.TraceEnabled() {
|
||||
p.Trace("rtp ssrc mismatch, skip", "expected", p.SSRC, "actual", p.Packet.SSRC)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if lastSeq == 0 || p.SequenceNumber == lastSeq+1 {
|
||||
if p.TraceEnabled() {
|
||||
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC)
|
||||
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.Packet.SSRC)
|
||||
}
|
||||
copyData := make([]byte, len(p.Payload))
|
||||
copy(copyData, p.Payload)
|
||||
@@ -172,18 +183,24 @@ func (p *Receiver) Start() (err error) {
|
||||
return nil
|
||||
}
|
||||
// TCP被动模式
|
||||
p.listener, err = net.Listen("tcp4", p.ListenAddr)
|
||||
if err != nil {
|
||||
p.Error("start listen", "err", err)
|
||||
return errors.New("start listen,err" + err.Error())
|
||||
if p.Listener == nil {
|
||||
p.Info("start new listener", "addr", p.ListenAddr)
|
||||
p.Listener, err = net.Listen("tcp4", p.ListenAddr)
|
||||
if err != nil {
|
||||
p.Error("start listen", "err", err)
|
||||
return errors.New("start listen,err" + err.Error())
|
||||
}
|
||||
}
|
||||
p.Info("start listen", "addr", p.ListenAddr)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Receiver) Dispose() {
|
||||
if p.listener != nil {
|
||||
p.listener.Close()
|
||||
if p.SSRC == 0 {
|
||||
p.Info("into multiport mode ,close listener ", p.SSRC)
|
||||
if p.Listener != nil {
|
||||
p.Listener.Close()
|
||||
}
|
||||
}
|
||||
if p.RTPReader != nil {
|
||||
p.RTPReader.Close()
|
||||
@@ -216,7 +233,7 @@ func (p *Receiver) Go() error {
|
||||
}
|
||||
// TCP被动模式
|
||||
p.Info("start accept")
|
||||
conn, err := p.listener.Accept()
|
||||
conn, err := p.Listener.Accept()
|
||||
if err != nil {
|
||||
p.Error("accept", "err", err)
|
||||
return err
|
||||
|
682
plugin/hls/download.go
Normal file
682
plugin/hls/download.go
Normal file
@@ -0,0 +1,682 @@
|
||||
package plugin_hls
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
m7s "m7s.live/v5"
|
||||
"m7s.live/v5/pkg"
|
||||
"m7s.live/v5/pkg/util"
|
||||
hls "m7s.live/v5/plugin/hls/pkg"
|
||||
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
|
||||
mp4 "m7s.live/v5/plugin/mp4/pkg"
|
||||
"m7s.live/v5/plugin/mp4/pkg/box"
|
||||
)
|
||||
|
||||
// requestParams 包含请求解析后的参数
|
||||
type requestParams struct {
|
||||
streamPath string
|
||||
startTime time.Time
|
||||
endTime time.Time
|
||||
timeRange time.Duration
|
||||
}
|
||||
|
||||
// fileInfo 包含文件信息
|
||||
type fileInfo struct {
|
||||
filePath string
|
||||
startTime time.Time
|
||||
endTime time.Time
|
||||
startOffsetTime time.Duration
|
||||
recordType string // "ts", "mp4", "fmp4"
|
||||
}
|
||||
|
||||
// parseRequestParams 解析请求参数
|
||||
func (plugin *HLSPlugin) parseRequestParams(r *http.Request) (*requestParams, error) {
|
||||
// 从URL路径中提取流路径,去除前缀 "/download/" 和后缀 ".ts"
|
||||
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/download/"), ".ts")
|
||||
|
||||
// 解析URL查询参数中的时间范围(start和end参数)
|
||||
startTime, endTime, err := util.TimeRangeQueryParse(r.URL.Query())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &requestParams{
|
||||
streamPath: streamPath,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
timeRange: endTime.Sub(startTime),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// queryRecordStreams 从数据库查询录像记录
|
||||
func (plugin *HLSPlugin) queryRecordStreams(params *requestParams) ([]m7s.RecordStream, error) {
|
||||
// 检查数据库是否可用
|
||||
if plugin.DB == nil {
|
||||
return nil, fmt.Errorf("database not available")
|
||||
}
|
||||
|
||||
var recordStreams []m7s.RecordStream
|
||||
|
||||
// 首先查询HLS记录 (ts)
|
||||
query := plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type = ?", params.streamPath, "hls")
|
||||
|
||||
// 添加时间范围查询条件
|
||||
if !params.startTime.IsZero() && !params.endTime.IsZero() {
|
||||
query = query.Where("(start_time <= ? AND end_time >= ?) OR (start_time >= ? AND start_time <= ?)",
|
||||
params.endTime, params.startTime, params.startTime, params.endTime)
|
||||
}
|
||||
|
||||
err := query.Order("start_time ASC").Find(&recordStreams).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 如果没有找到HLS记录,尝试查询MP4记录
|
||||
if len(recordStreams) == 0 {
|
||||
query = plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type IN (?)", params.streamPath, []string{"mp4", "fmp4"})
|
||||
|
||||
if !params.startTime.IsZero() && !params.endTime.IsZero() {
|
||||
query = query.Where("(start_time <= ? AND end_time >= ?) OR (start_time >= ? AND start_time <= ?)",
|
||||
params.endTime, params.startTime, params.startTime, params.endTime)
|
||||
}
|
||||
|
||||
err = query.Order("start_time ASC").Find(&recordStreams).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return recordStreams, nil
|
||||
}
|
||||
|
||||
// buildFileInfoList 构建文件信息列表
|
||||
func (plugin *HLSPlugin) buildFileInfoList(recordStreams []m7s.RecordStream, startTime, endTime time.Time) ([]*fileInfo, bool) {
|
||||
var fileInfoList []*fileInfo
|
||||
var found bool
|
||||
|
||||
for _, record := range recordStreams {
|
||||
// 检查文件是否存在
|
||||
if !util.Exist(record.FilePath) {
|
||||
plugin.Warn("Record file not found", "filePath", record.FilePath)
|
||||
continue
|
||||
}
|
||||
|
||||
var startOffsetTime time.Duration
|
||||
recordStartTime := record.StartTime
|
||||
recordEndTime := record.EndTime
|
||||
|
||||
// 计算文件内的偏移时间
|
||||
if startTime.After(recordStartTime) {
|
||||
startOffsetTime = startTime.Sub(recordStartTime)
|
||||
}
|
||||
|
||||
// 检查是否在时间范围内
|
||||
if recordEndTime.Before(startTime) || recordStartTime.After(endTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
fileInfoList = append(fileInfoList, &fileInfo{
|
||||
filePath: record.FilePath,
|
||||
startTime: recordStartTime,
|
||||
endTime: recordEndTime,
|
||||
startOffsetTime: startOffsetTime,
|
||||
recordType: record.Type,
|
||||
})
|
||||
|
||||
found = true
|
||||
}
|
||||
|
||||
return fileInfoList, found
|
||||
}
|
||||
|
||||
// hasOnlyMp4Records 检查是否只有MP4记录
|
||||
func (plugin *HLSPlugin) hasOnlyMp4Records(fileInfoList []*fileInfo) bool {
|
||||
if len(fileInfoList) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, info := range fileInfoList {
|
||||
if info.recordType == "hls" {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// filterTsFiles 过滤HLS TS文件
|
||||
func (plugin *HLSPlugin) filterTsFiles(fileInfoList []*fileInfo) []*fileInfo {
|
||||
var filteredList []*fileInfo
|
||||
|
||||
for _, info := range fileInfoList {
|
||||
if info.recordType == "hls" {
|
||||
filteredList = append(filteredList, info)
|
||||
}
|
||||
}
|
||||
|
||||
plugin.Debug("TS files filtered", "original", len(fileInfoList), "filtered", len(filteredList))
|
||||
return filteredList
|
||||
}
|
||||
|
||||
// filterMp4Files 过滤MP4文件
|
||||
func (plugin *HLSPlugin) filterMp4Files(fileInfoList []*fileInfo) []*fileInfo {
|
||||
var filteredList []*fileInfo
|
||||
|
||||
for _, info := range fileInfoList {
|
||||
if info.recordType == "mp4" || info.recordType == "fmp4" {
|
||||
filteredList = append(filteredList, info)
|
||||
}
|
||||
}
|
||||
|
||||
plugin.Debug("MP4 files filtered", "original", len(fileInfoList), "filtered", len(filteredList))
|
||||
return filteredList
|
||||
}
|
||||
|
||||
// processMp4ToTs 将MP4记录转换为TS输出
|
||||
func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request, fileInfoList []*fileInfo, params *requestParams) {
|
||||
plugin.Info("Converting MP4 records to TS", "count", len(fileInfoList))
|
||||
|
||||
// 设置HTTP响应头
|
||||
w.Header().Set("Content-Type", "video/mp2t")
|
||||
w.Header().Set("Content-Disposition", "attachment")
|
||||
|
||||
// 创建一个TS写入器,在循环外面,所有MP4文件共享同一个TsInMemory
|
||||
tsWriter := &simpleTsWriter{
|
||||
TsInMemory: &hls.TsInMemory{},
|
||||
plugin: plugin,
|
||||
}
|
||||
|
||||
// 对于MP4到TS的转换,我们采用简化的方法
|
||||
// 直接将每个MP4文件转换输出
|
||||
for _, info := range fileInfoList {
|
||||
if r.Context().Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
plugin.Debug("Converting MP4 file to TS", "path", info.filePath)
|
||||
|
||||
// 创建MP4解复用器
|
||||
demuxer := &mp4.DemuxerRange{
|
||||
StartTime: params.startTime,
|
||||
EndTime: params.endTime,
|
||||
Streams: []m7s.RecordStream{{
|
||||
FilePath: info.filePath,
|
||||
StartTime: info.startTime,
|
||||
EndTime: info.endTime,
|
||||
Type: info.recordType,
|
||||
}},
|
||||
}
|
||||
|
||||
// 设置回调函数
|
||||
demuxer.OnVideoExtraData = tsWriter.onVideoExtraData
|
||||
demuxer.OnAudioExtraData = tsWriter.onAudioExtraData
|
||||
demuxer.OnVideoSample = tsWriter.onVideoSample
|
||||
demuxer.OnAudioSample = tsWriter.onAudioSample
|
||||
|
||||
// 执行解复用和转换
|
||||
err := demuxer.Demux(r.Context())
|
||||
if err != nil {
|
||||
plugin.Error("MP4 to TS conversion failed", "err", err, "file", info.filePath)
|
||||
if !tsWriter.hasWritten {
|
||||
http.Error(w, "Conversion failed", http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 将所有累积的 TsInMemory 内容写入到响应
|
||||
_, err := tsWriter.WriteTo(w)
|
||||
if err != nil {
|
||||
plugin.Error("Failed to write TS data to response", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
plugin.Info("MP4 to TS conversion completed")
|
||||
}
|
||||
|
||||
// simpleTsWriter 简化的TS写入器
|
||||
type simpleTsWriter struct {
|
||||
*hls.TsInMemory
|
||||
plugin *HLSPlugin
|
||||
hasWritten bool
|
||||
spsData []byte
|
||||
ppsData []byte
|
||||
videoCodec box.MP4_CODEC_TYPE
|
||||
audioCodec box.MP4_CODEC_TYPE
|
||||
}
|
||||
|
||||
func (w *simpleTsWriter) WritePMT() {
|
||||
// 初始化 TsInMemory 的 PMT
|
||||
var videoCodec, audioCodec [4]byte
|
||||
switch w.videoCodec {
|
||||
case box.MP4_CODEC_H264:
|
||||
copy(videoCodec[:], []byte("H264"))
|
||||
case box.MP4_CODEC_H265:
|
||||
copy(videoCodec[:], []byte("H265"))
|
||||
}
|
||||
switch w.audioCodec {
|
||||
case box.MP4_CODEC_AAC:
|
||||
copy(audioCodec[:], []byte("MP4A"))
|
||||
|
||||
}
|
||||
w.WritePMTPacket(audioCodec, videoCodec)
|
||||
w.hasWritten = true
|
||||
}
|
||||
|
||||
// onVideoExtraData 处理视频序列头
|
||||
func (w *simpleTsWriter) onVideoExtraData(codecType box.MP4_CODEC_TYPE, data []byte) error {
|
||||
w.videoCodec = codecType
|
||||
// 解析并存储SPS/PPS数据
|
||||
if codecType == box.MP4_CODEC_H264 && len(data) > 0 {
|
||||
if w.plugin != nil {
|
||||
w.plugin.Debug("Processing H264 extra data", "size", len(data))
|
||||
}
|
||||
|
||||
// 解析AVCC格式的extra data
|
||||
if len(data) >= 8 {
|
||||
// AVCC格式: configurationVersion(1) + AVCProfileIndication(1) + profile_compatibility(1) + AVCLevelIndication(1) +
|
||||
// lengthSizeMinusOne(1) + numOfSequenceParameterSets(1) + ...
|
||||
|
||||
offset := 5 // 跳过前5个字节
|
||||
if offset < len(data) {
|
||||
// 读取SPS数量
|
||||
numSPS := data[offset] & 0x1f
|
||||
offset++
|
||||
|
||||
// 解析SPS
|
||||
for i := 0; i < int(numSPS) && offset < len(data)-1; i++ {
|
||||
if offset+1 >= len(data) {
|
||||
break
|
||||
}
|
||||
spsLength := int(data[offset])<<8 | int(data[offset+1])
|
||||
offset += 2
|
||||
|
||||
if offset+spsLength <= len(data) {
|
||||
// 添加起始码并存储SPS
|
||||
w.spsData = make([]byte, 4+spsLength)
|
||||
copy(w.spsData[0:4], []byte{0x00, 0x00, 0x00, 0x01})
|
||||
copy(w.spsData[4:], data[offset:offset+spsLength])
|
||||
offset += spsLength
|
||||
|
||||
if w.plugin != nil {
|
||||
w.plugin.Debug("Extracted SPS", "length", spsLength)
|
||||
}
|
||||
break // 只取第一个SPS
|
||||
}
|
||||
}
|
||||
|
||||
// 读取PPS数量
|
||||
if offset < len(data) {
|
||||
numPPS := data[offset]
|
||||
offset++
|
||||
|
||||
// 解析PPS
|
||||
for i := 0; i < int(numPPS) && offset < len(data)-1; i++ {
|
||||
if offset+1 >= len(data) {
|
||||
break
|
||||
}
|
||||
ppsLength := int(data[offset])<<8 | int(data[offset+1])
|
||||
offset += 2
|
||||
|
||||
if offset+ppsLength <= len(data) {
|
||||
// 添加起始码并存储PPS
|
||||
w.ppsData = make([]byte, 4+ppsLength)
|
||||
copy(w.ppsData[0:4], []byte{0x00, 0x00, 0x00, 0x01})
|
||||
copy(w.ppsData[4:], data[offset:offset+ppsLength])
|
||||
|
||||
if w.plugin != nil {
|
||||
w.plugin.Debug("Extracted PPS", "length", ppsLength)
|
||||
}
|
||||
break // 只取第一个PPS
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// onAudioExtraData 处理音频序列头
|
||||
func (w *simpleTsWriter) onAudioExtraData(codecType box.MP4_CODEC_TYPE, data []byte) error {
|
||||
w.audioCodec = codecType
|
||||
w.plugin.Debug("Processing audio extra data", "codec", codecType, "size", len(data))
|
||||
return nil
|
||||
}
|
||||
|
||||
// onVideoSample 处理视频样本
|
||||
func (w *simpleTsWriter) onVideoSample(codecType box.MP4_CODEC_TYPE, sample box.Sample) error {
|
||||
if !w.hasWritten {
|
||||
w.WritePMT()
|
||||
}
|
||||
|
||||
w.plugin.Debug("Processing video sample", "size", len(sample.Data), "keyFrame", sample.KeyFrame, "timestamp", sample.Timestamp)
|
||||
|
||||
// 转换AVCC格式到Annex-B格式
|
||||
annexBData, err := w.convertAVCCToAnnexB(sample.Data, sample.KeyFrame)
|
||||
if err != nil {
|
||||
w.plugin.Error("Failed to convert AVCC to Annex-B", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if len(annexBData) == 0 {
|
||||
w.plugin.Warn("Empty Annex-B data after conversion")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建视频帧结构
|
||||
videoFrame := mpegts.MpegtsPESFrame{
|
||||
Pid: mpegts.PID_VIDEO,
|
||||
IsKeyFrame: sample.KeyFrame,
|
||||
}
|
||||
|
||||
// 创建 AnnexB 帧
|
||||
annexBFrame := &pkg.AnnexB{
|
||||
PTS: (time.Duration(sample.Timestamp) + time.Duration(sample.CTS)) * 90,
|
||||
DTS: time.Duration(sample.Timestamp) * 90, // 对于MP4转换,假设PTS=DTS
|
||||
}
|
||||
|
||||
// 根据编解码器类型设置 Hevc 标志
|
||||
if codecType == box.MP4_CODEC_H265 {
|
||||
annexBFrame.Hevc = true
|
||||
}
|
||||
|
||||
annexBFrame.AppendOne(annexBData)
|
||||
|
||||
// 使用 WriteVideoFrame 写入TS包
|
||||
err = w.WriteVideoFrame(annexBFrame, &videoFrame)
|
||||
if err != nil {
|
||||
w.plugin.Error("Failed to write video frame", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertAVCCToAnnexB 将AVCC格式转换为Annex-B格式
|
||||
func (w *simpleTsWriter) convertAVCCToAnnexB(avccData []byte, isKeyFrame bool) ([]byte, error) {
|
||||
if len(avccData) == 0 {
|
||||
return nil, fmt.Errorf("empty AVCC data")
|
||||
}
|
||||
|
||||
var annexBBuffer []byte
|
||||
|
||||
// 如果是关键帧,先添加SPS和PPS
|
||||
if isKeyFrame {
|
||||
if len(w.spsData) > 0 {
|
||||
annexBBuffer = append(annexBBuffer, w.spsData...)
|
||||
w.plugin.Debug("Added SPS to key frame", "spsSize", len(w.spsData))
|
||||
}
|
||||
if len(w.ppsData) > 0 {
|
||||
annexBBuffer = append(annexBBuffer, w.ppsData...)
|
||||
w.plugin.Debug("Added PPS to key frame", "ppsSize", len(w.ppsData))
|
||||
}
|
||||
}
|
||||
|
||||
// 解析AVCC格式的NAL单元
|
||||
offset := 0
|
||||
nalCount := 0
|
||||
|
||||
for offset < len(avccData) {
|
||||
// AVCC格式:4字节长度 + NAL数据
|
||||
if offset+4 > len(avccData) {
|
||||
break
|
||||
}
|
||||
|
||||
// 读取NAL单元长度(大端序)
|
||||
nalLength := int(avccData[offset])<<24 |
|
||||
int(avccData[offset+1])<<16 |
|
||||
int(avccData[offset+2])<<8 |
|
||||
int(avccData[offset+3])
|
||||
offset += 4
|
||||
|
||||
if nalLength <= 0 || offset+nalLength > len(avccData) {
|
||||
w.plugin.Warn("Invalid NAL length", "length", nalLength, "remaining", len(avccData)-offset)
|
||||
break
|
||||
}
|
||||
|
||||
nalData := avccData[offset : offset+nalLength]
|
||||
offset += nalLength
|
||||
nalCount++
|
||||
|
||||
if len(nalData) > 0 {
|
||||
nalType := nalData[0] & 0x1f
|
||||
w.plugin.Debug("Converting NAL unit", "type", nalType, "length", nalLength)
|
||||
|
||||
// 添加起始码前缀
|
||||
annexBBuffer = append(annexBBuffer, []byte{0x00, 0x00, 0x00, 0x01}...)
|
||||
annexBBuffer = append(annexBBuffer, nalData...)
|
||||
}
|
||||
}
|
||||
|
||||
if nalCount == 0 {
|
||||
return nil, fmt.Errorf("no NAL units found in AVCC data")
|
||||
}
|
||||
|
||||
w.plugin.Debug("AVCC to Annex-B conversion completed",
|
||||
"inputSize", len(avccData),
|
||||
"outputSize", len(annexBBuffer),
|
||||
"nalUnits", nalCount)
|
||||
|
||||
return annexBBuffer, nil
|
||||
}
|
||||
|
||||
// onAudioSample 处理音频样本
|
||||
func (w *simpleTsWriter) onAudioSample(codecType box.MP4_CODEC_TYPE, sample box.Sample) error {
|
||||
if !w.hasWritten {
|
||||
w.WritePMT()
|
||||
}
|
||||
|
||||
w.plugin.Debug("Processing audio sample", "codec", codecType, "size", len(sample.Data), "timestamp", sample.Timestamp)
|
||||
|
||||
// 创建音频帧结构
|
||||
audioFrame := mpegts.MpegtsPESFrame{
|
||||
Pid: mpegts.PID_AUDIO,
|
||||
}
|
||||
|
||||
// 根据编解码器类型处理音频数据
|
||||
switch codecType {
|
||||
case box.MP4_CODEC_AAC: // AAC
|
||||
// 创建 ADTS 帧
|
||||
adtsFrame := &pkg.ADTS{
|
||||
DTS: time.Duration(sample.Timestamp) * 90,
|
||||
}
|
||||
|
||||
// 将音频数据添加到帧中
|
||||
copy(adtsFrame.NextN(len(sample.Data)), sample.Data)
|
||||
|
||||
// 使用 WriteAudioFrame 写入TS包
|
||||
err := w.WriteAudioFrame(adtsFrame, &audioFrame)
|
||||
if err != nil {
|
||||
w.plugin.Error("Failed to write audio frame", "error", err)
|
||||
return err
|
||||
}
|
||||
default:
|
||||
// 对于非AAC音频,暂时使用原来的PES包方式
|
||||
pesPacket := mpegts.MpegTsPESPacket{
|
||||
Header: mpegts.MpegTsPESHeader{
|
||||
PacketStartCodePrefix: 0x000001,
|
||||
StreamID: mpegts.STREAM_ID_AUDIO,
|
||||
},
|
||||
}
|
||||
// 设置可选字段
|
||||
pesPacket.Header.ConstTen = 0x80
|
||||
pesPacket.Header.PtsDtsFlags = 0x80 // 只有PTS
|
||||
pesPacket.Header.PesHeaderDataLength = 5
|
||||
pesPacket.Header.Pts = uint64(sample.Timestamp)
|
||||
|
||||
pesPacket.Buffers = append(pesPacket.Buffers, sample.Data)
|
||||
|
||||
// 写入TS包
|
||||
err := w.WritePESPacket(&audioFrame, pesPacket)
|
||||
if err != nil {
|
||||
w.plugin.Error("Failed to write audio PES packet", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processTsFiles 处理原生TS文件拼接
|
||||
func (plugin *HLSPlugin) processTsFiles(w http.ResponseWriter, r *http.Request, fileInfoList []*fileInfo, params *requestParams) {
|
||||
plugin.Info("Processing TS files", "count", len(fileInfoList))
|
||||
|
||||
// 设置HTTP响应头
|
||||
w.Header().Set("Content-Type", "video/mp2t")
|
||||
w.Header().Set("Content-Disposition", "attachment")
|
||||
|
||||
var writer io.Writer = w
|
||||
var totalSize uint64
|
||||
|
||||
// 第一次遍历:计算总大小
|
||||
for _, info := range fileInfoList {
|
||||
if r.Context().Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
fileInfo, err := os.Stat(info.filePath)
|
||||
if err != nil {
|
||||
plugin.Error("Failed to stat file", "path", info.filePath, "err", err)
|
||||
continue
|
||||
}
|
||||
totalSize += uint64(fileInfo.Size())
|
||||
}
|
||||
|
||||
// 设置内容长度
|
||||
w.Header().Set("Content-Length", strconv.FormatUint(totalSize, 10))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
// 第二次遍历:写入数据
|
||||
for i, info := range fileInfoList {
|
||||
if r.Context().Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
plugin.Debug("Processing TS file", "path", info.filePath)
|
||||
file, err := os.Open(info.filePath)
|
||||
if err != nil {
|
||||
plugin.Error("Failed to open file", "path", info.filePath, "err", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reader := bufio.NewReader(file)
|
||||
|
||||
if i == 0 {
|
||||
// 第一个文件,直接拷贝
|
||||
_, err = io.Copy(writer, reader)
|
||||
} else {
|
||||
// 后续文件,跳过PAT/PMT包,只拷贝媒体数据
|
||||
err = plugin.copyTsFileSkipHeaders(writer, reader)
|
||||
}
|
||||
|
||||
file.Close()
|
||||
|
||||
if err != nil {
|
||||
plugin.Error("Failed to copy file", "path", info.filePath, "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
plugin.Info("TS download completed")
|
||||
}
|
||||
|
||||
// copyTsFileSkipHeaders 拷贝TS文件,跳过PAT/PMT包
|
||||
func (plugin *HLSPlugin) copyTsFileSkipHeaders(writer io.Writer, reader *bufio.Reader) error {
|
||||
buffer := make([]byte, mpegts.TS_PACKET_SIZE)
|
||||
|
||||
for {
|
||||
n, err := io.ReadFull(reader, buffer)
|
||||
if err != nil {
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if n != mpegts.TS_PACKET_SIZE {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查同步字节
|
||||
if buffer[0] != 0x47 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 提取PID
|
||||
pid := uint16(buffer[1]&0x1f)<<8 | uint16(buffer[2])
|
||||
|
||||
// 跳过PAT(PID=0)和PMT(PID=256)包
|
||||
if pid == mpegts.PID_PAT || pid == mpegts.PID_PMT {
|
||||
continue
|
||||
}
|
||||
|
||||
// 写入媒体数据包
|
||||
_, err = writer.Write(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// download 下载处理函数
|
||||
func (plugin *HLSPlugin) download(w http.ResponseWriter, r *http.Request) {
|
||||
// 解析请求参数
|
||||
params, err := plugin.parseRequestParams(r)
|
||||
if err != nil {
|
||||
plugin.Error("Failed to parse request params", "err", err)
|
||||
http.Error(w, "Invalid parameters", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
plugin.Info("TS download request", "streamPath", params.streamPath, "timeRange", params.timeRange)
|
||||
|
||||
// 查询录像记录
|
||||
recordStreams, err := plugin.queryRecordStreams(params)
|
||||
if err != nil {
|
||||
plugin.Error("Failed to query record streams", "err", err)
|
||||
http.Error(w, "Database error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if len(recordStreams) == 0 {
|
||||
plugin.Warn("No records found", "streamPath", params.streamPath)
|
||||
http.Error(w, "No records found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// 构建文件信息列表
|
||||
fileInfoList, found := plugin.buildFileInfoList(recordStreams, params.startTime, params.endTime)
|
||||
if !found {
|
||||
plugin.Warn("No valid files found", "streamPath", params.streamPath)
|
||||
http.Error(w, "No valid files found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查文件类型并处理
|
||||
if plugin.hasOnlyMp4Records(fileInfoList) {
|
||||
// 只有MP4记录,转换为TS
|
||||
mp4Files := plugin.filterMp4Files(fileInfoList)
|
||||
plugin.processMp4ToTs(w, r, mp4Files, params)
|
||||
} else {
|
||||
// 有TS记录,优先使用TS文件
|
||||
tsFiles := plugin.filterTsFiles(fileInfoList)
|
||||
if len(tsFiles) > 0 {
|
||||
plugin.processTsFiles(w, r, tsFiles, params)
|
||||
} else {
|
||||
// 没有TS文件,使用MP4转换
|
||||
mp4Files := plugin.filterMp4Files(fileInfoList)
|
||||
plugin.processMp4ToTs(w, r, mp4Files, params)
|
||||
}
|
||||
}
|
||||
}
|
@@ -59,6 +59,7 @@ func (p *HLSPlugin) OnInit() (err error) {
|
||||
func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc {
|
||||
return map[string]http.HandlerFunc{
|
||||
"/vod/{streamPath...}": p.vod,
|
||||
"/download/{streamPath...}": p.download,
|
||||
"/api/record/start/{streamPath...}": p.API_record_start,
|
||||
"/api/record/stop/{id}": p.API_record_stop,
|
||||
}
|
||||
|
1209
plugin/mp4/api_extract.go
Normal file
1209
plugin/mp4/api_extract.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -76,7 +76,11 @@ var _ = m7s.InstallPlugin[MP4Plugin](m7s.PluginMeta{
|
||||
|
||||
func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc {
|
||||
return map[string]http.HandlerFunc{
|
||||
"/download/{streamPath...}": p.download,
|
||||
"/download/{streamPath...}": p.download,
|
||||
"/extractClip/{streamPath...}": p.extractClipToFileHandel,
|
||||
"/extractCompressed/{streamPath...}": p.extractCompressedVideoHandel,
|
||||
"/extractGop/{streamPath...}": p.extractGopVideoHandel,
|
||||
"/snap/{streamPath...}": p.snapHandel,
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -54,8 +54,16 @@ func (t *TrakBox) Unmarshal(buf []byte) (b IBox, err error) {
|
||||
return t, err
|
||||
}
|
||||
|
||||
// SampleCallback 定义样本处理回调函数类型
|
||||
type SampleCallback func(sample *Sample, sampleIndex int) error
|
||||
|
||||
// ParseSamples parses the sample table and builds the sample list
|
||||
func (t *TrakBox) ParseSamples() (samplelist []Sample) {
|
||||
return t.ParseSamplesWithCallback(nil)
|
||||
}
|
||||
|
||||
// ParseSamplesWithCallback parses the sample table and builds the sample list with optional callback
|
||||
func (t *TrakBox) ParseSamplesWithCallback(callback SampleCallback) (samplelist []Sample) {
|
||||
stbl := t.MDIA.MINF.STBL
|
||||
var chunkOffsets []uint64
|
||||
if stbl.STCO != nil {
|
||||
@@ -150,6 +158,17 @@ func (t *TrakBox) ParseSamples() (samplelist []Sample) {
|
||||
}
|
||||
}
|
||||
|
||||
// 调用回调函数处理每个样本
|
||||
if callback != nil {
|
||||
for i := range samplelist {
|
||||
if err := callback(&samplelist[i], i); err != nil {
|
||||
// 如果回调返回错误,可以选择记录或处理,但不中断解析
|
||||
// 这里为了保持向后兼容性,我们继续处理
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return samplelist
|
||||
}
|
||||
|
||||
|
@@ -6,8 +6,10 @@ import (
|
||||
"slices"
|
||||
|
||||
"m7s.live/v5/pkg"
|
||||
"m7s.live/v5/pkg/codec"
|
||||
"m7s.live/v5/pkg/util"
|
||||
"m7s.live/v5/plugin/mp4/pkg/box"
|
||||
. "m7s.live/v5/plugin/mp4/pkg/box"
|
||||
rtmp "m7s.live/v5/plugin/rtmp/pkg"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -30,7 +32,7 @@ type (
|
||||
Number uint32
|
||||
CryptByteBlock uint8
|
||||
SkipByteBlock uint8
|
||||
PsshBoxes []*PsshBox
|
||||
PsshBoxes []*box.PsshBox
|
||||
}
|
||||
SubSamplePattern struct {
|
||||
BytesClear uint16
|
||||
@@ -43,16 +45,28 @@ type (
|
||||
chunkoffset uint64
|
||||
}
|
||||
|
||||
RTMPFrame struct {
|
||||
Frame any // 可以是 *rtmp.RTMPVideo 或 *rtmp.RTMPAudio
|
||||
}
|
||||
|
||||
Demuxer struct {
|
||||
reader io.ReadSeeker
|
||||
Tracks []*Track
|
||||
ReadSampleIdx []uint32
|
||||
IsFragment bool
|
||||
// pssh []*PsshBox
|
||||
moov *MoovBox
|
||||
mdat *MediaDataBox
|
||||
// pssh []*box.PsshBox
|
||||
moov *box.MoovBox
|
||||
mdat *box.MediaDataBox
|
||||
mdatOffset uint64
|
||||
QuicTime bool
|
||||
|
||||
// 预生成的 RTMP 帧
|
||||
RTMPVideoSequence *rtmp.RTMPVideo
|
||||
RTMPAudioSequence *rtmp.RTMPAudio
|
||||
RTMPFrames []RTMPFrame
|
||||
|
||||
// RTMP 帧生成配置
|
||||
RTMPAllocator *util.ScalableMemoryAllocator
|
||||
}
|
||||
)
|
||||
|
||||
@@ -63,6 +77,10 @@ func NewDemuxer(r io.ReadSeeker) *Demuxer {
|
||||
}
|
||||
|
||||
func (d *Demuxer) Demux() (err error) {
|
||||
return d.DemuxWithAllocator(nil)
|
||||
}
|
||||
|
||||
func (d *Demuxer) DemuxWithAllocator(allocator *util.ScalableMemoryAllocator) (err error) {
|
||||
|
||||
// decodeVisualSampleEntry := func() (offset int, err error) {
|
||||
// var encv VisualSampleEntry
|
||||
@@ -96,7 +114,7 @@ func (d *Demuxer) Demux() (err error) {
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
var b IBox
|
||||
var b box.IBox
|
||||
var offset uint64
|
||||
for {
|
||||
b, err = box.ReadFrom(d.reader)
|
||||
@@ -107,53 +125,59 @@ func (d *Demuxer) Demux() (err error) {
|
||||
return err
|
||||
}
|
||||
offset += b.Size()
|
||||
switch box := b.(type) {
|
||||
case *FileTypeBox:
|
||||
if slices.Contains(box.CompatibleBrands, [4]byte{'q', 't', ' ', ' '}) {
|
||||
switch boxData := b.(type) {
|
||||
case *box.FileTypeBox:
|
||||
if slices.Contains(boxData.CompatibleBrands, [4]byte{'q', 't', ' ', ' '}) {
|
||||
d.QuicTime = true
|
||||
}
|
||||
case *FreeBox:
|
||||
case *MediaDataBox:
|
||||
d.mdat = box
|
||||
d.mdatOffset = offset - b.Size() + uint64(box.HeaderSize())
|
||||
case *MoovBox:
|
||||
if box.MVEX != nil {
|
||||
case *box.FreeBox:
|
||||
case *box.MediaDataBox:
|
||||
d.mdat = boxData
|
||||
d.mdatOffset = offset - b.Size() + uint64(boxData.HeaderSize())
|
||||
case *box.MoovBox:
|
||||
if boxData.MVEX != nil {
|
||||
d.IsFragment = true
|
||||
}
|
||||
for _, trak := range box.Tracks {
|
||||
for _, trak := range boxData.Tracks {
|
||||
track := &Track{}
|
||||
track.TrackId = trak.TKHD.TrackID
|
||||
track.Duration = uint32(trak.TKHD.Duration)
|
||||
track.Timescale = trak.MDIA.MDHD.Timescale
|
||||
track.Samplelist = trak.ParseSamples()
|
||||
// 创建RTMP样本处理回调
|
||||
var sampleCallback box.SampleCallback
|
||||
if d.RTMPAllocator != nil {
|
||||
sampleCallback = d.createRTMPSampleCallback(track, trak)
|
||||
}
|
||||
|
||||
track.Samplelist = trak.ParseSamplesWithCallback(sampleCallback)
|
||||
if len(trak.MDIA.MINF.STBL.STSD.Entries) > 0 {
|
||||
entryBox := trak.MDIA.MINF.STBL.STSD.Entries[0]
|
||||
switch entry := entryBox.(type) {
|
||||
case *AudioSampleEntry:
|
||||
case *box.AudioSampleEntry:
|
||||
switch entry.Type() {
|
||||
case TypeMP4A:
|
||||
track.Cid = MP4_CODEC_AAC
|
||||
case TypeALAW:
|
||||
track.Cid = MP4_CODEC_G711A
|
||||
case TypeULAW:
|
||||
track.Cid = MP4_CODEC_G711U
|
||||
case TypeOPUS:
|
||||
track.Cid = MP4_CODEC_OPUS
|
||||
case box.TypeMP4A:
|
||||
track.Cid = box.MP4_CODEC_AAC
|
||||
case box.TypeALAW:
|
||||
track.Cid = box.MP4_CODEC_G711A
|
||||
case box.TypeULAW:
|
||||
track.Cid = box.MP4_CODEC_G711U
|
||||
case box.TypeOPUS:
|
||||
track.Cid = box.MP4_CODEC_OPUS
|
||||
}
|
||||
track.SampleRate = entry.Samplerate
|
||||
track.ChannelCount = uint8(entry.ChannelCount)
|
||||
track.SampleSize = entry.SampleSize
|
||||
switch extra := entry.ExtraData.(type) {
|
||||
case *ESDSBox:
|
||||
track.Cid, track.ExtraData = DecodeESDescriptor(extra.Data)
|
||||
case *box.ESDSBox:
|
||||
track.Cid, track.ExtraData = box.DecodeESDescriptor(extra.Data)
|
||||
}
|
||||
case *VisualSampleEntry:
|
||||
track.ExtraData = entry.ExtraData.(*DataBox).Data
|
||||
case *box.VisualSampleEntry:
|
||||
track.ExtraData = entry.ExtraData.(*box.DataBox).Data
|
||||
switch entry.Type() {
|
||||
case TypeAVC1:
|
||||
track.Cid = MP4_CODEC_H264
|
||||
case TypeHVC1, TypeHEV1:
|
||||
track.Cid = MP4_CODEC_H265
|
||||
case box.TypeAVC1:
|
||||
track.Cid = box.MP4_CODEC_H264
|
||||
case box.TypeHVC1, box.TypeHEV1:
|
||||
track.Cid = box.MP4_CODEC_H265
|
||||
}
|
||||
track.Width = uint32(entry.Width)
|
||||
track.Height = uint32(entry.Height)
|
||||
@@ -161,9 +185,9 @@ func (d *Demuxer) Demux() (err error) {
|
||||
}
|
||||
d.Tracks = append(d.Tracks, track)
|
||||
}
|
||||
d.moov = box
|
||||
case *MovieFragmentBox:
|
||||
for _, traf := range box.TRAFs {
|
||||
d.moov = boxData
|
||||
case *box.MovieFragmentBox:
|
||||
for _, traf := range boxData.TRAFs {
|
||||
track := d.Tracks[traf.TFHD.TrackID-1]
|
||||
track.defaultSize = traf.TFHD.DefaultSampleSize
|
||||
track.defaultDuration = traf.TFHD.DefaultSampleDuration
|
||||
@@ -171,6 +195,7 @@ func (d *Demuxer) Demux() (err error) {
|
||||
}
|
||||
}
|
||||
d.ReadSampleIdx = make([]uint32, len(d.Tracks))
|
||||
|
||||
// for _, track := range d.Tracks {
|
||||
// if len(track.Samplelist) > 0 {
|
||||
// track.StartDts = uint64(track.Samplelist[0].DTS) * 1000 / uint64(track.Timescale)
|
||||
@@ -180,7 +205,7 @@ func (d *Demuxer) Demux() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
|
||||
func (d *Demuxer) SeekTime(dts uint64) (sample *box.Sample, err error) {
|
||||
var audioTrack, videoTrack *Track
|
||||
for _, track := range d.Tracks {
|
||||
if track.Cid.IsAudio() {
|
||||
@@ -218,6 +243,54 @@ func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 函数跳帧到dts 前面的第一个关键帧位置
|
||||
*
|
||||
* @param 参数名dts 跳帧位置
|
||||
*
|
||||
* @todo 待实现的功能或改进点 audioTrack 没有同步改进
|
||||
* @author erroot
|
||||
* @date 250614
|
||||
*
|
||||
**/
|
||||
func (d *Demuxer) SeekTimePreIDR(dts uint64) (sample *Sample, err error) {
|
||||
var audioTrack, videoTrack *Track
|
||||
for _, track := range d.Tracks {
|
||||
if track.Cid.IsAudio() {
|
||||
audioTrack = track
|
||||
} else if track.Cid.IsVideo() {
|
||||
videoTrack = track
|
||||
}
|
||||
}
|
||||
if videoTrack != nil {
|
||||
idx := videoTrack.SeekPreIDR(dts)
|
||||
if idx == -1 {
|
||||
return nil, errors.New("seek failed")
|
||||
}
|
||||
d.ReadSampleIdx[videoTrack.TrackId-1] = uint32(idx)
|
||||
sample = &videoTrack.Samplelist[idx]
|
||||
if audioTrack != nil {
|
||||
for i, sample := range audioTrack.Samplelist {
|
||||
if sample.Offset < int64(videoTrack.Samplelist[idx].Offset) {
|
||||
continue
|
||||
}
|
||||
d.ReadSampleIdx[audioTrack.TrackId-1] = uint32(i)
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if audioTrack != nil {
|
||||
idx := audioTrack.Seek(dts)
|
||||
if idx == -1 {
|
||||
return nil, errors.New("seek failed")
|
||||
}
|
||||
d.ReadSampleIdx[audioTrack.TrackId-1] = uint32(idx)
|
||||
sample = &audioTrack.Samplelist[idx]
|
||||
} else {
|
||||
return nil, pkg.ErrNoTrack
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// func (d *Demuxer) decodeTRUN(trun *TrackRunBox) {
|
||||
// dataOffset := trun.Dataoffset
|
||||
// nextDts := d.currentTrack.StartDts
|
||||
@@ -377,10 +450,10 @@ func (d *Demuxer) SeekTime(dts uint64) (sample *Sample, err error) {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
func (d *Demuxer) ReadSample(yield func(*Track, Sample) bool) {
|
||||
func (d *Demuxer) ReadSample(yield func(*Track, box.Sample) bool) {
|
||||
for {
|
||||
maxdts := int64(-1)
|
||||
minTsSample := Sample{Timestamp: uint32(maxdts)}
|
||||
minTsSample := box.Sample{Timestamp: uint32(maxdts)}
|
||||
var whichTrack *Track
|
||||
whichTracki := 0
|
||||
for i, track := range d.Tracks {
|
||||
@@ -414,9 +487,9 @@ func (d *Demuxer) ReadSample(yield func(*Track, Sample) bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) {
|
||||
func (d *Demuxer) RangeSample(yield func(*Track, *box.Sample) bool) {
|
||||
for {
|
||||
var minTsSample *Sample
|
||||
var minTsSample *box.Sample
|
||||
var whichTrack *Track
|
||||
whichTracki := 0
|
||||
for i, track := range d.Tracks {
|
||||
@@ -448,6 +521,244 @@ func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) {
|
||||
}
|
||||
|
||||
// GetMoovBox returns the Movie Box from the demuxer
|
||||
func (d *Demuxer) GetMoovBox() *MoovBox {
|
||||
func (d *Demuxer) GetMoovBox() *box.MoovBox {
|
||||
return d.moov
|
||||
}
|
||||
|
||||
// CreateRTMPSequenceFrame 创建 RTMP 序列帧
|
||||
func (d *Demuxer) CreateRTMPSequenceFrame(track *Track, allocator *util.ScalableMemoryAllocator) (videoSeq *rtmp.RTMPVideo, audioSeq *rtmp.RTMPAudio, err error) {
|
||||
switch track.Cid {
|
||||
case box.MP4_CODEC_H264:
|
||||
videoSeq = &rtmp.RTMPVideo{}
|
||||
videoSeq.SetAllocator(allocator)
|
||||
videoSeq.Append([]byte{0x17, 0x00, 0x00, 0x00, 0x00}, track.ExtraData)
|
||||
case box.MP4_CODEC_H265:
|
||||
videoSeq = &rtmp.RTMPVideo{}
|
||||
videoSeq.SetAllocator(allocator)
|
||||
videoSeq.Append([]byte{0b1001_0000 | rtmp.PacketTypeSequenceStart}, codec.FourCC_H265[:], track.ExtraData)
|
||||
case box.MP4_CODEC_AAC:
|
||||
audioSeq = &rtmp.RTMPAudio{}
|
||||
audioSeq.SetAllocator(allocator)
|
||||
audioSeq.Append([]byte{0xaf, 0x00}, track.ExtraData)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ConvertSampleToRTMP 将 MP4 sample 转换为 RTMP 格式
|
||||
func (d *Demuxer) ConvertSampleToRTMP(track *Track, sample box.Sample, allocator *util.ScalableMemoryAllocator, timestampOffset uint64) (videoFrame *rtmp.RTMPVideo, audioFrame *rtmp.RTMPAudio, err error) {
|
||||
switch track.Cid {
|
||||
case box.MP4_CODEC_H264:
|
||||
videoFrame = &rtmp.RTMPVideo{}
|
||||
videoFrame.SetAllocator(allocator)
|
||||
videoFrame.CTS = sample.CTS
|
||||
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
|
||||
videoFrame.AddRecycleBytes(sample.Data)
|
||||
case box.MP4_CODEC_H265:
|
||||
videoFrame = &rtmp.RTMPVideo{}
|
||||
videoFrame.SetAllocator(allocator)
|
||||
videoFrame.CTS = uint32(sample.CTS)
|
||||
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
var head []byte
|
||||
var b0 byte = 0b1010_0000
|
||||
if sample.KeyFrame {
|
||||
b0 = 0b1001_0000
|
||||
}
|
||||
if videoFrame.CTS == 0 {
|
||||
head = videoFrame.NextN(5)
|
||||
head[0] = b0 | rtmp.PacketTypeCodedFramesX
|
||||
} else {
|
||||
head = videoFrame.NextN(8)
|
||||
head[0] = b0 | rtmp.PacketTypeCodedFrames
|
||||
util.PutBE(head[5:8], videoFrame.CTS) // cts
|
||||
}
|
||||
copy(head[1:], codec.FourCC_H265[:])
|
||||
videoFrame.AddRecycleBytes(sample.Data)
|
||||
case box.MP4_CODEC_AAC:
|
||||
audioFrame = &rtmp.RTMPAudio{}
|
||||
audioFrame.SetAllocator(allocator)
|
||||
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
audioFrame.AppendOne([]byte{0xaf, 0x01})
|
||||
audioFrame.AddRecycleBytes(sample.Data)
|
||||
case box.MP4_CODEC_G711A:
|
||||
audioFrame = &rtmp.RTMPAudio{}
|
||||
audioFrame.SetAllocator(allocator)
|
||||
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
audioFrame.AppendOne([]byte{0x72})
|
||||
audioFrame.AddRecycleBytes(sample.Data)
|
||||
case box.MP4_CODEC_G711U:
|
||||
audioFrame = &rtmp.RTMPAudio{}
|
||||
audioFrame.SetAllocator(allocator)
|
||||
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
audioFrame.AppendOne([]byte{0x82})
|
||||
audioFrame.AddRecycleBytes(sample.Data)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetRTMPSequenceFrames 获取预生成的 RTMP 序列帧
|
||||
func (d *Demuxer) GetRTMPSequenceFrames() (videoSeq *rtmp.RTMPVideo, audioSeq *rtmp.RTMPAudio) {
|
||||
return d.RTMPVideoSequence, d.RTMPAudioSequence
|
||||
}
|
||||
|
||||
// IterateRTMPFrames 迭代预生成的 RTMP 帧
|
||||
func (d *Demuxer) IterateRTMPFrames(timestampOffset uint64, yield func(*RTMPFrame) bool) {
|
||||
for i := range d.RTMPFrames {
|
||||
frame := &d.RTMPFrames[i]
|
||||
|
||||
// 应用时间戳偏移
|
||||
switch f := frame.Frame.(type) {
|
||||
case *rtmp.RTMPVideo:
|
||||
f.Timestamp += uint32(timestampOffset)
|
||||
case *rtmp.RTMPAudio:
|
||||
f.Timestamp += uint32(timestampOffset)
|
||||
}
|
||||
|
||||
if !yield(frame) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetMaxTimestamp 获取所有帧中的最大时间戳
|
||||
func (d *Demuxer) GetMaxTimestamp() uint64 {
|
||||
var maxTimestamp uint64
|
||||
for _, frame := range d.RTMPFrames {
|
||||
var timestamp uint64
|
||||
switch f := frame.Frame.(type) {
|
||||
case *rtmp.RTMPVideo:
|
||||
timestamp = uint64(f.Timestamp)
|
||||
case *rtmp.RTMPAudio:
|
||||
timestamp = uint64(f.Timestamp)
|
||||
}
|
||||
if timestamp > maxTimestamp {
|
||||
maxTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
return maxTimestamp
|
||||
}
|
||||
|
||||
// generateRTMPFrames 生成RTMP序列帧和所有帧数据
|
||||
func (d *Demuxer) generateRTMPFrames(allocator *util.ScalableMemoryAllocator) (err error) {
|
||||
// 生成序列帧
|
||||
for _, track := range d.Tracks {
|
||||
if track.Cid.IsVideo() && d.RTMPVideoSequence == nil {
|
||||
d.RTMPVideoSequence, _, err = d.CreateRTMPSequenceFrame(track, allocator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if track.Cid.IsAudio() && d.RTMPAudioSequence == nil {
|
||||
_, d.RTMPAudioSequence, err = d.CreateRTMPSequenceFrame(track, allocator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 预生成所有 RTMP 帧
|
||||
d.RTMPFrames = make([]RTMPFrame, 0)
|
||||
|
||||
// 收集所有样本并按时间戳排序
|
||||
type sampleInfo struct {
|
||||
track *Track
|
||||
sample box.Sample
|
||||
sampleIndex uint32
|
||||
trackIndex int
|
||||
}
|
||||
|
||||
var allSamples []sampleInfo
|
||||
for trackIdx, track := range d.Tracks {
|
||||
for sampleIdx, sample := range track.Samplelist {
|
||||
// 读取样本数据
|
||||
if _, err = d.reader.Seek(sample.Offset, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
sample.Data = allocator.Malloc(sample.Size)
|
||||
if _, err = io.ReadFull(d.reader, sample.Data); err != nil {
|
||||
allocator.Free(sample.Data)
|
||||
return err
|
||||
}
|
||||
|
||||
allSamples = append(allSamples, sampleInfo{
|
||||
track: track,
|
||||
sample: sample,
|
||||
sampleIndex: uint32(sampleIdx),
|
||||
trackIndex: trackIdx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 按时间戳排序样本
|
||||
slices.SortFunc(allSamples, func(a, b sampleInfo) int {
|
||||
timeA := uint64(a.sample.Timestamp) * uint64(d.moov.MVHD.Timescale) / uint64(a.track.Timescale)
|
||||
timeB := uint64(b.sample.Timestamp) * uint64(d.moov.MVHD.Timescale) / uint64(b.track.Timescale)
|
||||
if timeA < timeB {
|
||||
return -1
|
||||
} else if timeA > timeB {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
// 预生成 RTMP 帧
|
||||
for _, sampleInfo := range allSamples {
|
||||
videoFrame, audioFrame, err := d.ConvertSampleToRTMP(sampleInfo.track, sampleInfo.sample, allocator, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if videoFrame != nil {
|
||||
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: videoFrame})
|
||||
}
|
||||
|
||||
if audioFrame != nil {
|
||||
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: audioFrame})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createRTMPSampleCallback 创建RTMP样本处理回调函数
|
||||
func (d *Demuxer) createRTMPSampleCallback(track *Track, trak *box.TrakBox) box.SampleCallback {
|
||||
// 首先生成序列帧
|
||||
if track.Cid.IsVideo() && d.RTMPVideoSequence == nil {
|
||||
videoSeq, _, err := d.CreateRTMPSequenceFrame(track, d.RTMPAllocator)
|
||||
if err == nil {
|
||||
d.RTMPVideoSequence = videoSeq
|
||||
}
|
||||
} else if track.Cid.IsAudio() && d.RTMPAudioSequence == nil {
|
||||
_, audioSeq, err := d.CreateRTMPSequenceFrame(track, d.RTMPAllocator)
|
||||
if err == nil {
|
||||
d.RTMPAudioSequence = audioSeq
|
||||
}
|
||||
}
|
||||
|
||||
return func(sample *box.Sample, sampleIndex int) error {
|
||||
// 读取样本数据
|
||||
if _, err := d.reader.Seek(sample.Offset, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
sample.Data = d.RTMPAllocator.Malloc(sample.Size)
|
||||
if _, err := io.ReadFull(d.reader, sample.Data); err != nil {
|
||||
d.RTMPAllocator.Free(sample.Data)
|
||||
return err
|
||||
}
|
||||
|
||||
// 转换为 RTMP 格式
|
||||
videoFrame, audioFrame, err := d.ConvertSampleToRTMP(track, *sample, d.RTMPAllocator, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 内部收集RTMP帧
|
||||
if videoFrame != nil {
|
||||
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: videoFrame})
|
||||
}
|
||||
if audioFrame != nil {
|
||||
d.RTMPFrames = append(d.RTMPFrames, RTMPFrame{Frame: audioFrame})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@@ -3,13 +3,12 @@ package mp4
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
m7s "m7s.live/v5"
|
||||
"m7s.live/v5/pkg/codec"
|
||||
"m7s.live/v5/pkg/util"
|
||||
"m7s.live/v5/plugin/mp4/pkg/box"
|
||||
rtmp "m7s.live/v5/plugin/rtmp/pkg"
|
||||
)
|
||||
|
||||
@@ -35,9 +34,40 @@ func (p *HTTPReader) Run() (err error) {
|
||||
content, err = io.ReadAll(p.ReadCloser)
|
||||
demuxer = NewDemuxer(strings.NewReader(string(content)))
|
||||
}
|
||||
if err = demuxer.Demux(); err != nil {
|
||||
|
||||
// 设置RTMP分配器以启用RTMP帧收集
|
||||
demuxer.RTMPAllocator = allocator
|
||||
|
||||
if err = demuxer.DemuxWithAllocator(allocator); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取demuxer内部收集的RTMP帧
|
||||
rtmpFrames := demuxer.RTMPFrames
|
||||
|
||||
// 按时间戳排序所有帧
|
||||
slices.SortFunc(rtmpFrames, func(a, b RTMPFrame) int {
|
||||
var timeA, timeB uint64
|
||||
switch f := a.Frame.(type) {
|
||||
case *rtmp.RTMPVideo:
|
||||
timeA = uint64(f.Timestamp)
|
||||
case *rtmp.RTMPAudio:
|
||||
timeA = uint64(f.Timestamp)
|
||||
}
|
||||
switch f := b.Frame.(type) {
|
||||
case *rtmp.RTMPVideo:
|
||||
timeB = uint64(f.Timestamp)
|
||||
case *rtmp.RTMPAudio:
|
||||
timeB = uint64(f.Timestamp)
|
||||
}
|
||||
if timeA < timeB {
|
||||
return -1
|
||||
} else if timeA > timeB {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
publisher.OnSeek = func(seekTime time.Time) {
|
||||
p.Stop(errors.New("seek"))
|
||||
pullJob.Connection.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
|
||||
@@ -48,103 +78,61 @@ func (p *HTTPReader) Run() (err error) {
|
||||
seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Connection.Args.Get(util.StartKey))
|
||||
demuxer.SeekTime(uint64(seekTime.UnixMilli()))
|
||||
}
|
||||
for _, track := range demuxer.Tracks {
|
||||
switch track.Cid {
|
||||
case box.MP4_CODEC_H264:
|
||||
var sequence rtmp.RTMPVideo
|
||||
sequence.SetAllocator(allocator)
|
||||
sequence.Append([]byte{0x17, 0x00, 0x00, 0x00, 0x00}, track.ExtraData)
|
||||
err = publisher.WriteVideo(&sequence)
|
||||
case box.MP4_CODEC_H265:
|
||||
var sequence rtmp.RTMPVideo
|
||||
sequence.SetAllocator(allocator)
|
||||
sequence.Append([]byte{0b1001_0000 | rtmp.PacketTypeSequenceStart}, codec.FourCC_H265[:], track.ExtraData)
|
||||
err = publisher.WriteVideo(&sequence)
|
||||
case box.MP4_CODEC_AAC:
|
||||
var sequence rtmp.RTMPAudio
|
||||
sequence.SetAllocator(allocator)
|
||||
sequence.Append([]byte{0xaf, 0x00}, track.ExtraData)
|
||||
err = publisher.WriteAudio(&sequence)
|
||||
|
||||
// 读取预生成的 RTMP 序列帧
|
||||
videoSeq, audioSeq := demuxer.GetRTMPSequenceFrames()
|
||||
if videoSeq != nil {
|
||||
err = publisher.WriteVideo(videoSeq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if audioSeq != nil {
|
||||
err = publisher.WriteAudio(audioSeq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 计算最大时间戳用于累计偏移
|
||||
var maxTimestamp uint64
|
||||
for track, sample := range demuxer.ReadSample {
|
||||
timestamp := uint64(sample.Timestamp) * 1000 / uint64(track.Timescale)
|
||||
for _, frame := range rtmpFrames {
|
||||
var timestamp uint64
|
||||
switch f := frame.Frame.(type) {
|
||||
case *rtmp.RTMPVideo:
|
||||
timestamp = uint64(f.Timestamp)
|
||||
case *rtmp.RTMPAudio:
|
||||
timestamp = uint64(f.Timestamp)
|
||||
}
|
||||
if timestamp > maxTimestamp {
|
||||
maxTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
var timestampOffset uint64
|
||||
loop := p.PullJob.Loop
|
||||
for {
|
||||
demuxer.ReadSampleIdx = make([]uint32, len(demuxer.Tracks))
|
||||
for track, sample := range demuxer.ReadSample {
|
||||
// 使用预生成的 RTMP 帧进行播放
|
||||
for _, frame := range rtmpFrames {
|
||||
if p.IsStopped() {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
if _, err = demuxer.reader.Seek(sample.Offset, io.SeekStart); err != nil {
|
||||
return
|
||||
|
||||
// 应用时间戳偏移
|
||||
switch f := frame.Frame.(type) {
|
||||
case *rtmp.RTMPVideo:
|
||||
f.Timestamp += uint32(timestampOffset)
|
||||
err = publisher.WriteVideo(f)
|
||||
case *rtmp.RTMPAudio:
|
||||
f.Timestamp += uint32(timestampOffset)
|
||||
err = publisher.WriteAudio(f)
|
||||
}
|
||||
sample.Data = allocator.Malloc(sample.Size)
|
||||
if _, err = io.ReadFull(demuxer.reader, sample.Data); err != nil {
|
||||
allocator.Free(sample.Data)
|
||||
return
|
||||
}
|
||||
switch track.Cid {
|
||||
case box.MP4_CODEC_H264:
|
||||
var videoFrame rtmp.RTMPVideo
|
||||
videoFrame.SetAllocator(allocator)
|
||||
videoFrame.CTS = sample.CTS
|
||||
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
videoFrame.AppendOne([]byte{util.Conditional[byte](sample.KeyFrame, 0x17, 0x27), 0x01, byte(videoFrame.CTS >> 24), byte(videoFrame.CTS >> 8), byte(videoFrame.CTS)})
|
||||
videoFrame.AddRecycleBytes(sample.Data)
|
||||
err = publisher.WriteVideo(&videoFrame)
|
||||
case box.MP4_CODEC_H265:
|
||||
var videoFrame rtmp.RTMPVideo
|
||||
videoFrame.SetAllocator(allocator)
|
||||
videoFrame.CTS = uint32(sample.CTS)
|
||||
videoFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
var head []byte
|
||||
var b0 byte = 0b1010_0000
|
||||
if sample.KeyFrame {
|
||||
b0 = 0b1001_0000
|
||||
}
|
||||
if videoFrame.CTS == 0 {
|
||||
head = videoFrame.NextN(5)
|
||||
head[0] = b0 | rtmp.PacketTypeCodedFramesX
|
||||
} else {
|
||||
head = videoFrame.NextN(8)
|
||||
head[0] = b0 | rtmp.PacketTypeCodedFrames
|
||||
util.PutBE(head[5:8], videoFrame.CTS) // cts
|
||||
}
|
||||
copy(head[1:], codec.FourCC_H265[:])
|
||||
videoFrame.AddRecycleBytes(sample.Data)
|
||||
err = publisher.WriteVideo(&videoFrame)
|
||||
case box.MP4_CODEC_AAC:
|
||||
var audioFrame rtmp.RTMPAudio
|
||||
audioFrame.SetAllocator(allocator)
|
||||
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
audioFrame.AppendOne([]byte{0xaf, 0x01})
|
||||
audioFrame.AddRecycleBytes(sample.Data)
|
||||
err = publisher.WriteAudio(&audioFrame)
|
||||
case box.MP4_CODEC_G711A:
|
||||
var audioFrame rtmp.RTMPAudio
|
||||
audioFrame.SetAllocator(allocator)
|
||||
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
audioFrame.AppendOne([]byte{0x72})
|
||||
audioFrame.AddRecycleBytes(sample.Data)
|
||||
err = publisher.WriteAudio(&audioFrame)
|
||||
case box.MP4_CODEC_G711U:
|
||||
var audioFrame rtmp.RTMPAudio
|
||||
audioFrame.SetAllocator(allocator)
|
||||
audioFrame.Timestamp = uint32(uint64(sample.Timestamp)*1000/uint64(track.Timescale) + timestampOffset)
|
||||
audioFrame.AppendOne([]byte{0x82})
|
||||
audioFrame.AddRecycleBytes(sample.Data)
|
||||
err = publisher.WriteAudio(&audioFrame)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if loop >= 0 {
|
||||
loop--
|
||||
if loop == -1 {
|
||||
|
@@ -102,6 +102,28 @@ func (track *Track) Seek(dts uint64) int {
|
||||
return -1
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 函数跳帧到dts 前面的第一个关键帧位置
|
||||
*
|
||||
* @param 参数名dts 跳帧位置
|
||||
*
|
||||
* @author erroot
|
||||
* @date 250614
|
||||
*
|
||||
**/
|
||||
func (track *Track) SeekPreIDR(dts uint64) int {
|
||||
idx := 0
|
||||
for i, sample := range track.Samplelist {
|
||||
if track.Cid.IsVideo() && sample.KeyFrame {
|
||||
idx = i
|
||||
}
|
||||
if sample.Timestamp*1000/uint32(track.Timescale) > uint32(dts) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return idx
|
||||
}
|
||||
|
||||
func (track *Track) makeEdtsBox() *ContainerBox {
|
||||
return CreateContainerBox(TypeEDTS, track.makeElstBox())
|
||||
}
|
||||
|
338
plugin/mp4/util.go
Normal file
338
plugin/mp4/util.go
Normal file
@@ -0,0 +1,338 @@
|
||||
package plugin_mp4
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"image"
|
||||
"image/color"
|
||||
"image/jpeg"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
|
||||
mp4 "m7s.live/v5/plugin/mp4/pkg"
|
||||
"m7s.live/v5/plugin/mp4/pkg/box"
|
||||
)
|
||||
|
||||
func saveAsJPG(img image.Image, path string) error {
|
||||
file, err := os.Create(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
opt := jpeg.Options{Quality: 90}
|
||||
return jpeg.Encode(file, img, &opt)
|
||||
}
|
||||
|
||||
func ExtractH264SPSPPS(extraData []byte) (sps, pps []byte, err error) {
|
||||
if len(extraData) < 7 {
|
||||
return nil, nil, fmt.Errorf("extradata too short")
|
||||
}
|
||||
|
||||
// 解析 SPS 数量 (第6字节低5位)
|
||||
spsCount := int(extraData[5] & 0x1F)
|
||||
offset := 6 // 当前解析位置
|
||||
|
||||
// 提取 SPS
|
||||
for i := 0; i < spsCount; i++ {
|
||||
if offset+2 > len(extraData) {
|
||||
return nil, nil, fmt.Errorf("invalid sps length")
|
||||
}
|
||||
spsLen := int(binary.BigEndian.Uint16(extraData[offset : offset+2]))
|
||||
offset += 2
|
||||
if offset+spsLen > len(extraData) {
|
||||
return nil, nil, fmt.Errorf("sps data overflow")
|
||||
}
|
||||
sps = extraData[offset : offset+spsLen]
|
||||
offset += spsLen
|
||||
}
|
||||
|
||||
// 提取 PPS 数量
|
||||
if offset >= len(extraData) {
|
||||
return nil, nil, fmt.Errorf("missing pps count")
|
||||
}
|
||||
ppsCount := int(extraData[offset])
|
||||
offset++
|
||||
|
||||
// 提取 PPS
|
||||
for i := 0; i < ppsCount; i++ {
|
||||
if offset+2 > len(extraData) {
|
||||
return nil, nil, fmt.Errorf("invalid pps length")
|
||||
}
|
||||
ppsLen := int(binary.BigEndian.Uint16(extraData[offset : offset+2]))
|
||||
offset += 2
|
||||
if offset+ppsLen > len(extraData) {
|
||||
return nil, nil, fmt.Errorf("pps data overflow")
|
||||
}
|
||||
pps = extraData[offset : offset+ppsLen]
|
||||
offset += ppsLen
|
||||
}
|
||||
return sps, pps, nil
|
||||
}
|
||||
|
||||
// 转换函数(支持动态插入参数集)
|
||||
func ConvertAVCCH264ToAnnexB(data []byte, extraData []byte, isFirst *bool) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
pos := 0
|
||||
|
||||
for pos < len(data) {
|
||||
if pos+4 > len(data) {
|
||||
break
|
||||
}
|
||||
nalSize := binary.BigEndian.Uint32(data[pos : pos+4])
|
||||
pos += 4
|
||||
nalStart := pos
|
||||
pos += int(nalSize)
|
||||
if pos > len(data) {
|
||||
break
|
||||
}
|
||||
nalu := data[nalStart:pos]
|
||||
nalType := nalu[0] & 0x1F
|
||||
|
||||
// 关键帧前插入SPS/PPS(仅需执行一次)
|
||||
if *isFirst && nalType == 5 {
|
||||
sps, pps, err := ExtractH264SPSPPS(extraData)
|
||||
if err != nil {
|
||||
//panic(err)
|
||||
return nil, err
|
||||
}
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
buf.Write(sps)
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
buf.Write(pps)
|
||||
//buf.Write(videoTrack.ExtraData)
|
||||
*isFirst = false // 仅首帧插入
|
||||
}
|
||||
|
||||
// 保留SEI单元(类型6)和所有其他单元
|
||||
if nalType == 5 || nalType == 6 { // IDR/SEI用4字节起始码
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
} else {
|
||||
buf.Write([]byte{0x00, 0x00, 0x01}) // 其他用3字节
|
||||
}
|
||||
buf.Write(nalu)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
/*
|
||||
H.264与H.265的AVCC格式差异
|
||||
VPS引入:H.265新增视频参数集(VPS),用于描述多层编码、时序等信息
|
||||
*/
|
||||
// 提取H.265的VPS/SPS/PPS(HEVCDecoderConfigurationRecord格式)
|
||||
func ExtractHEVCParams(extraData []byte) (vps, sps, pps []byte, err error) {
|
||||
if len(extraData) < 22 {
|
||||
return nil, nil, nil, errors.New("extra data too short")
|
||||
}
|
||||
|
||||
// HEVC的extradata格式参考ISO/IEC 14496-15
|
||||
offset := 22 // 跳过头部22字节
|
||||
if offset+2 > len(extraData) {
|
||||
return nil, nil, nil, errors.New("invalid extra data")
|
||||
}
|
||||
|
||||
numOfArrays := int(extraData[offset])
|
||||
offset++
|
||||
|
||||
for i := 0; i < numOfArrays; i++ {
|
||||
if offset+3 > len(extraData) {
|
||||
break
|
||||
}
|
||||
|
||||
naluType := extraData[offset] & 0x3F
|
||||
offset++
|
||||
count := int(binary.BigEndian.Uint16(extraData[offset:]))
|
||||
offset += 2
|
||||
|
||||
for j := 0; j < count; j++ {
|
||||
if offset+2 > len(extraData) {
|
||||
break
|
||||
}
|
||||
|
||||
naluSize := int(binary.BigEndian.Uint16(extraData[offset:]))
|
||||
offset += 2
|
||||
|
||||
if offset+naluSize > len(extraData) {
|
||||
break
|
||||
}
|
||||
|
||||
naluData := extraData[offset : offset+naluSize]
|
||||
offset += naluSize
|
||||
|
||||
// 根据类型存储参数集
|
||||
switch naluType {
|
||||
case 32: // VPS
|
||||
if vps == nil {
|
||||
vps = make([]byte, len(naluData))
|
||||
copy(vps, naluData)
|
||||
}
|
||||
case 33: // SPS
|
||||
if sps == nil {
|
||||
sps = make([]byte, len(naluData))
|
||||
copy(sps, naluData)
|
||||
}
|
||||
case 34: // PPS
|
||||
if pps == nil {
|
||||
pps = make([]byte, len(naluData))
|
||||
copy(pps, naluData)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if vps == nil || sps == nil || pps == nil {
|
||||
return nil, nil, nil, errors.New("missing required parameter sets")
|
||||
}
|
||||
|
||||
return vps, sps, pps, nil
|
||||
}
|
||||
|
||||
// H.265的AVCC转Annex B
|
||||
func ConvertAVCCHEVCToAnnexB(data []byte, extraData []byte, isFirst *bool) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
pos := 0
|
||||
|
||||
// 首帧插入VPS/SPS/PPS
|
||||
if *isFirst {
|
||||
vps, sps, pps, err := ExtractHEVCParams(extraData)
|
||||
if err == nil {
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
buf.Write(vps)
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
buf.Write(sps)
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
buf.Write(pps)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 处理NALU
|
||||
for pos < len(data) {
|
||||
if pos+4 > len(data) {
|
||||
break
|
||||
}
|
||||
nalSize := binary.BigEndian.Uint32(data[pos : pos+4])
|
||||
pos += 4
|
||||
nalStart := pos
|
||||
pos += int(nalSize)
|
||||
if pos > len(data) {
|
||||
break
|
||||
}
|
||||
nalu := data[nalStart:pos]
|
||||
nalType := (nalu[0] >> 1) & 0x3F // H.265的NALU类型在头部的第2-7位
|
||||
|
||||
// 关键帧或参数集使用4字节起始码
|
||||
if nalType == 19 || nalType == 20 || nalType >= 32 && nalType <= 34 {
|
||||
buf.Write([]byte{0x00, 0x00, 0x00, 0x01})
|
||||
} else {
|
||||
buf.Write([]byte{0x00, 0x00, 0x01})
|
||||
}
|
||||
buf.Write(nalu)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// ffmpeg -hide_banner -i gop.mp4 -vf "select=eq(n\,15)" -vframes 1 -f image2 -pix_fmt bgr24 output.bmp
|
||||
func ProcessWithFFmpeg(samples []box.Sample, index int, videoTrack *mp4.Track) (image.Image, error) {
|
||||
// code := "h264"
|
||||
// if videoTrack.Cid == box.MP4_CODEC_H265 {
|
||||
// code = "hevc"
|
||||
// }
|
||||
cmd := exec.Command("ffmpeg",
|
||||
"-hide_banner",
|
||||
//"-f", code, //"h264" 强制指定输入格式为H.264裸流
|
||||
"-i", "pipe:0",
|
||||
"-vf", fmt.Sprintf("select=eq(n\\,%d)", index),
|
||||
"-vframes", "1",
|
||||
"-pix_fmt", "bgr24",
|
||||
"-f", "rawvideo",
|
||||
"pipe:1")
|
||||
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
errOutput, _ := io.ReadAll(stderr)
|
||||
log.Printf("FFmpeg stderr: %s", errOutput)
|
||||
}()
|
||||
|
||||
if err = cmd.Start(); err != nil {
|
||||
log.Printf("cmd.Start失败: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer stdin.Close()
|
||||
isFirst := true
|
||||
for _, sample := range samples {
|
||||
|
||||
if videoTrack.Cid == box.MP4_CODEC_H264 {
|
||||
annexb, _ := ConvertAVCCH264ToAnnexB(sample.Data, videoTrack.ExtraData, &isFirst)
|
||||
if _, err := stdin.Write(annexb); err != nil {
|
||||
log.Printf("写入失败: %v", err)
|
||||
break
|
||||
}
|
||||
} else {
|
||||
annexb, _ := ConvertAVCCHEVCToAnnexB(sample.Data, videoTrack.ExtraData, &isFirst)
|
||||
if _, err := stdin.Write(annexb); err != nil {
|
||||
log.Printf("写入失败: %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// 读取原始RGB数据
|
||||
var buf bytes.Buffer
|
||||
if _, err = io.Copy(&buf, stdout); err != nil {
|
||||
log.Printf("读取失败: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
if err = cmd.Wait(); err != nil {
|
||||
log.Printf("cmd.Wait失败: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//log.Printf("ffmpeg 提取成功: data size:%v", buf.Len())
|
||||
|
||||
// 转换为image.Image对象
|
||||
data := buf.Bytes()
|
||||
//width, height := parseBMPDimensions(data)
|
||||
|
||||
width := int(videoTrack.Width)
|
||||
height := int(videoTrack.Height)
|
||||
|
||||
log.Printf("ffmpeg size: %v,%v", width, height)
|
||||
|
||||
//FFmpeg的 rawvideo 输出默认采用从上到下的扫描方式
|
||||
|
||||
img := image.NewRGBA(image.Rect(0, 0, width, height))
|
||||
for y := 0; y < height; y++ {
|
||||
for x := 0; x < width; x++ {
|
||||
//pos := (height-y-1)*width*3 + x*3
|
||||
pos := (y*width + x) * 3 // 关键修复:按行顺序读取
|
||||
img.Set(x, y, color.RGBA{
|
||||
R: data[pos+2],
|
||||
G: data[pos+1],
|
||||
B: data[pos],
|
||||
A: 255,
|
||||
})
|
||||
}
|
||||
}
|
||||
return img, nil
|
||||
}
|
@@ -37,13 +37,12 @@ var (
|
||||
|
||||
type WebRTCPlugin struct {
|
||||
m7s.Plugin
|
||||
ICEServers []ICEServer `desc:"ice服务器配置"`
|
||||
Port string `default:"tcp:9000" desc:"监听端口"`
|
||||
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后,发送PLI请求
|
||||
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
|
||||
MimeType []string `desc:"MimeType过滤列表,为空则不过滤"` // MimeType过滤列表,支持的格式如:video/H264, audio/opus
|
||||
s SettingEngine
|
||||
portMapping map[int]int // 内部端口到外部端口的映射
|
||||
ICEServers []ICEServer `desc:"ice服务器配置"`
|
||||
Port string `default:"tcp:9000" desc:"监听端口"`
|
||||
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后,发送PLI请求
|
||||
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
|
||||
MimeType []string `desc:"MimeType过滤列表,为空则不过滤"` // MimeType过滤列表,支持的格式如:video/H264, audio/opus
|
||||
s SettingEngine
|
||||
}
|
||||
|
||||
func (p *WebRTCPlugin) RegisterHandler() map[string]http.HandlerFunc {
|
||||
@@ -307,90 +306,50 @@ func (p *WebRTCPlugin) initSettingEngine() error {
|
||||
|
||||
// configurePort 配置端口设置
|
||||
func (p *WebRTCPlugin) configurePort() error {
|
||||
// 使用 ParsePort 而不是 ParsePort2 来获取端口映射信息
|
||||
portInfo, err := ParsePort(p.Port)
|
||||
ports, err := ParsePort2(p.Port)
|
||||
if err != nil {
|
||||
p.Error("webrtc port config error", "error", err, "port", p.Port)
|
||||
return err
|
||||
}
|
||||
|
||||
// 初始化端口映射
|
||||
p.portMapping = make(map[int]int)
|
||||
|
||||
// 如果有端口映射,存储映射关系
|
||||
if portInfo.HasMapping() {
|
||||
if portInfo.IsRange() {
|
||||
// 端口范围映射
|
||||
for i := 0; i <= portInfo.Ports[1]-portInfo.Ports[0]; i++ {
|
||||
internalPort := portInfo.Ports[0] + i
|
||||
var externalPort int
|
||||
if portInfo.IsRangeMapping() {
|
||||
// 映射端口也是范围
|
||||
externalPort = portInfo.Map[0] + i
|
||||
} else {
|
||||
// 映射端口是单个端口
|
||||
externalPort = portInfo.Map[0]
|
||||
}
|
||||
p.portMapping[internalPort] = externalPort
|
||||
}
|
||||
} else {
|
||||
// 单端口映射
|
||||
p.portMapping[portInfo.Ports[0]] = portInfo.Map[0]
|
||||
switch v := ports.(type) {
|
||||
case TCPPort:
|
||||
tcpport := int(v)
|
||||
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: tcpport,
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = tcpl.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener tcp", "error", err)
|
||||
}
|
||||
p.Info("Port mapping configured", "mapping", p.portMapping)
|
||||
}
|
||||
|
||||
// 根据协议类型进行配置
|
||||
if portInfo.IsTCP() {
|
||||
if portInfo.IsRange() {
|
||||
// TCP端口范围,这里可能需要特殊处理
|
||||
p.Error("TCP port range not supported in current implementation")
|
||||
return fmt.Errorf("TCP port range not supported")
|
||||
} else {
|
||||
// TCP单端口
|
||||
tcpport := portInfo.Ports[0]
|
||||
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: tcpport,
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = tcpl.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener tcp", "error", err)
|
||||
return err
|
||||
}
|
||||
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
|
||||
p.Info("webrtc start listen", "port", tcpport)
|
||||
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
|
||||
p.s.DisableSRTPReplayProtection(true)
|
||||
}
|
||||
} else {
|
||||
// UDP配置
|
||||
if portInfo.IsRange() {
|
||||
// UDP端口范围
|
||||
p.s.SetEphemeralUDPPortRange(uint16(portInfo.Ports[0]), uint16(portInfo.Ports[1]))
|
||||
p.SetDescription("udp", fmt.Sprintf("%d-%d", portInfo.Ports[0], portInfo.Ports[1]))
|
||||
} else {
|
||||
// UDP单端口
|
||||
udpport := portInfo.Ports[0]
|
||||
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: udpport,
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = udpListener.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener udp", "error", err)
|
||||
return err
|
||||
}
|
||||
p.SetDescription("udp", fmt.Sprintf("%d", udpport))
|
||||
p.Info("webrtc start listen", "port", udpport)
|
||||
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
|
||||
p.SetDescription("tcp", fmt.Sprintf("%d", tcpport))
|
||||
p.Info("webrtc start listen", "port", tcpport)
|
||||
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
|
||||
p.s.DisableSRTPReplayProtection(true)
|
||||
case UDPRangePort:
|
||||
p.s.SetEphemeralUDPPortRange(uint16(v[0]), uint16(v[1]))
|
||||
p.SetDescription("udp", fmt.Sprintf("%d-%d", v[0], v[1]))
|
||||
case UDPPort:
|
||||
// 创建共享WEBRTC端口 默认9000
|
||||
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
|
||||
IP: net.IP{0, 0, 0, 0},
|
||||
Port: int(v),
|
||||
})
|
||||
p.OnDispose(func() {
|
||||
_ = udpListener.Close()
|
||||
})
|
||||
if err != nil {
|
||||
p.Error("webrtc listener udp", "error", err)
|
||||
return err
|
||||
}
|
||||
p.SetDescription("udp", fmt.Sprintf("%d", v))
|
||||
p.Info("webrtc start listen", "port", v)
|
||||
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
|
||||
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -409,33 +368,9 @@ func (p *WebRTCPlugin) CreatePC(sd SessionDescription, conf Configuration) (pc *
|
||||
return
|
||||
}
|
||||
pc, err = api.NewPeerConnection(conf)
|
||||
if err != nil {
|
||||
return
|
||||
if err == nil {
|
||||
err = pc.SetRemoteDescription(sd)
|
||||
}
|
||||
|
||||
// 如果有端口映射配置,记录 ICE 候选者信息以供调试
|
||||
if len(p.portMapping) > 0 {
|
||||
pc.OnICECandidate(func(candidate *ICECandidate) {
|
||||
if candidate != nil {
|
||||
// 记录端口映射信息(用于调试和监控)
|
||||
if mappedPort, exists := p.portMapping[int(candidate.Port)]; exists {
|
||||
p.Debug("ICE candidate with port mapping detected",
|
||||
"original_port", candidate.Port,
|
||||
"mapped_port", mappedPort,
|
||||
"candidate_address", candidate.Address,
|
||||
"candidate_type", candidate.Typ)
|
||||
candidate.Port = uint16(mappedPort) // 更新候选者端口为映射后的端口
|
||||
} else {
|
||||
p.Debug("ICE candidate generated",
|
||||
"port", candidate.Port,
|
||||
"address", candidate.Address,
|
||||
"type", candidate.Typ)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
err = pc.SetRemoteDescription(sd)
|
||||
return
|
||||
}
|
||||
|
||||
|
@@ -44,8 +44,8 @@ type (
|
||||
}
|
||||
RecordStream struct {
|
||||
ID uint `gorm:"primarykey"`
|
||||
StartTime time.Time `gorm:"type:datetime;default:NULL"`
|
||||
EndTime time.Time `gorm:"type:datetime;default:NULL"`
|
||||
StartTime time.Time `gorm:"default:NULL"`
|
||||
EndTime time.Time `gorm:"default:NULL"`
|
||||
Duration uint32 `gorm:"comment:录像时长;default:0"`
|
||||
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
|
||||
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
|
||||
|
202
scripts/packet_replayer.py
Normal file
202
scripts/packet_replayer.py
Normal file
@@ -0,0 +1,202 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
from scapy.all import rdpcap, IP, TCP, UDP, Raw, send, sr1, sr, PcapReader
|
||||
import sys
|
||||
import time
|
||||
from collections import defaultdict
|
||||
import random
|
||||
import threading
|
||||
import queue
|
||||
import socket
|
||||
|
||||
class PacketReplayer:
|
||||
def __init__(self, pcap_file, target_ip, target_port):
|
||||
self.pcap_file = pcap_file
|
||||
self.target_ip = target_ip
|
||||
self.target_port = target_port
|
||||
self.connections = defaultdict(list) # 存储每个连接的包序列
|
||||
self.response_queue = queue.Queue()
|
||||
self.stop_reading = threading.Event()
|
||||
self.socket = None
|
||||
|
||||
def establish_tcp_connection(self, src_port):
|
||||
"""建立TCP连接"""
|
||||
print(f"正在建立TCP连接 {self.target_ip}:{self.target_port}...")
|
||||
try:
|
||||
# 创建socket对象
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# 绑定源端口(如果指定了端口)
|
||||
if src_port > 0:
|
||||
try:
|
||||
self.socket.bind(('0.0.0.0', src_port))
|
||||
except socket.error as e:
|
||||
print(f"指定端口 {src_port} 被占用,将使用随机端口")
|
||||
self.socket.bind(('0.0.0.0', 0)) # 使用随机可用端口
|
||||
else:
|
||||
self.socket.bind(('0.0.0.0', 0)) # 使用随机可用端口
|
||||
|
||||
# 获取实际使用的端口
|
||||
actual_port = self.socket.getsockname()[1]
|
||||
print(f"使用本地端口: {actual_port}")
|
||||
|
||||
# 设置超时
|
||||
self.socket.settimeout(5)
|
||||
# 连接目标
|
||||
self.socket.connect((self.target_ip, self.target_port))
|
||||
print("TCP连接已建立")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"建立连接失败: {e}")
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
return False
|
||||
|
||||
def process_packet(self, packet, src_ip=None, src_port=None, protocol=None):
|
||||
"""处理单个数据包"""
|
||||
if IP not in packet:
|
||||
return
|
||||
|
||||
# 检查源IP
|
||||
if src_ip and packet[IP].src != src_ip:
|
||||
return
|
||||
|
||||
# 检查协议和源端口
|
||||
if protocol == 'tcp' and TCP in packet:
|
||||
if src_port and packet[TCP].sport != src_port:
|
||||
return
|
||||
conn_id = (packet[IP].src, packet[TCP].sport)
|
||||
self.connections[conn_id].append(packet)
|
||||
elif protocol == 'udp' and UDP in packet:
|
||||
if src_port and packet[UDP].sport != src_port:
|
||||
return
|
||||
conn_id = (packet[IP].src, packet[UDP].sport)
|
||||
self.connections[conn_id].append(packet)
|
||||
elif not protocol: # 如果没有指定协议,则包含所有IP包
|
||||
if TCP in packet:
|
||||
if src_port and packet[TCP].sport != src_port:
|
||||
return
|
||||
conn_id = (packet[IP].src, packet[TCP].sport)
|
||||
self.connections[conn_id].append(packet)
|
||||
elif UDP in packet:
|
||||
if src_port and packet[UDP].sport != src_port:
|
||||
return
|
||||
conn_id = (packet[IP].src, packet[UDP].sport)
|
||||
self.connections[conn_id].append(packet)
|
||||
|
||||
def response_reader(self, src_port):
|
||||
"""持续读取服务器响应的线程函数"""
|
||||
while not self.stop_reading.is_set() and self.socket:
|
||||
try:
|
||||
# 使用socket接收数据
|
||||
data = self.socket.recv(4096)
|
||||
if data:
|
||||
self.response_queue.put(data)
|
||||
print(f"收到响应: {len(data)} 字节")
|
||||
except socket.timeout:
|
||||
continue
|
||||
except Exception as e:
|
||||
if not self.stop_reading.is_set():
|
||||
print(f"读取响应时出错: {e}")
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
|
||||
"""边读取边重放数据包"""
|
||||
print(f"开始读取并重放数据包到 {self.target_ip}:{self.target_port}")
|
||||
|
||||
try:
|
||||
# 使用PcapReader逐包读取
|
||||
reader = PcapReader(self.pcap_file)
|
||||
packet_count = 0
|
||||
connection_established = False
|
||||
|
||||
# 读取并处理数据包
|
||||
for packet in reader:
|
||||
packet_count += 1
|
||||
|
||||
if IP not in packet:
|
||||
continue
|
||||
|
||||
# 检查源IP
|
||||
if src_ip and packet[IP].src != src_ip:
|
||||
continue
|
||||
|
||||
# 检查协议和源端口
|
||||
current_src_port = None
|
||||
if protocol == 'tcp' and TCP in packet:
|
||||
if src_port and packet[TCP].sport != src_port:
|
||||
continue
|
||||
current_src_port = packet[TCP].sport
|
||||
elif protocol == 'udp' and UDP in packet:
|
||||
if src_port and packet[UDP].sport != src_port:
|
||||
continue
|
||||
current_src_port = packet[UDP].sport
|
||||
elif not protocol: # 如果没有指定协议,则包含所有IP包
|
||||
if TCP in packet:
|
||||
if src_port and packet[TCP].sport != src_port:
|
||||
continue
|
||||
current_src_port = packet[TCP].sport
|
||||
elif UDP in packet:
|
||||
if src_port and packet[UDP].sport != src_port:
|
||||
continue
|
||||
current_src_port = packet[UDP].sport
|
||||
else:
|
||||
continue
|
||||
else:
|
||||
continue
|
||||
|
||||
# 找到第一个符合条件的包,建立连接
|
||||
if not connection_established:
|
||||
if not self.establish_tcp_connection(current_src_port):
|
||||
print("无法建立连接,退出")
|
||||
return
|
||||
# 启动响应读取线程
|
||||
self.stop_reading.clear()
|
||||
reader_thread = threading.Thread(target=self.response_reader, args=(current_src_port,))
|
||||
reader_thread.daemon = True
|
||||
reader_thread.start()
|
||||
connection_established = True
|
||||
|
||||
# 发送当前数据包
|
||||
try:
|
||||
if Raw in packet:
|
||||
self.socket.send(packet[Raw].load)
|
||||
packet_time = time.strftime("%H:%M:%S", time.localtime(float(packet.time)))
|
||||
print(f"[{packet_time}] [序号:{packet_count}] 已发送数据包 (负载大小: {len(packet[Raw].load)} 字节)")
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
except Exception as e:
|
||||
print(f"发送数据包 {packet_count} 时出错: {e}")
|
||||
sys.exit(1) # 发送失败直接退出进程
|
||||
|
||||
print(f"总共处理了 {packet_count} 个数据包")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理数据包时出错: {e}")
|
||||
sys.exit(1) # 其他错误也直接退出进程
|
||||
finally:
|
||||
# 关闭连接和停止读取线程
|
||||
self.stop_reading.set()
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
reader.close()
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Wireshark数据包重放工具')
|
||||
parser.add_argument('pcap_file', help='pcap文件路径')
|
||||
parser.add_argument('target_ip', help='目标IP地址')
|
||||
parser.add_argument('target_port', type=int, help='目标端口')
|
||||
parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)')
|
||||
parser.add_argument('--src-ip', help='过滤源IP地址')
|
||||
parser.add_argument('--src-port', type=int, help='过滤源端口')
|
||||
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port)
|
||||
replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user