Compare commits

...

3 Commits

Author SHA1 Message Date
langhuihui
438a8ddee1 fix: rtmp play write timeout 2025-09-08 13:28:52 +08:00
langhuihui
4e68cfccba fix: reorder udp 2025-09-08 13:00:04 +08:00
百川8488
21b3bd053a feat: 海康SDK插件 (#331) 2025-09-08 12:42:57 +08:00
69 changed files with 3189 additions and 9 deletions

View File

@@ -13,7 +13,6 @@ import (
_ "m7s.live/v5/plugin/flv"
_ "m7s.live/v5/plugin/gb28181"
_ "m7s.live/v5/plugin/logrotate"
_ "m7s.live/v5/plugin/monitor"
_ "m7s.live/v5/plugin/mp4"
mp4 "m7s.live/v5/plugin/mp4/pkg"
_ "m7s.live/v5/plugin/preview"

View File

@@ -9,7 +9,6 @@ import (
_ "m7s.live/v5/plugin/debug"
_ "m7s.live/v5/plugin/flv"
_ "m7s.live/v5/plugin/logrotate"
_ "m7s.live/v5/plugin/monitor"
_ "m7s.live/v5/plugin/rtmp"
_ "m7s.live/v5/plugin/rtsp"
_ "m7s.live/v5/plugin/test"

45
plugin/hiksdk/client.go Normal file
View File

@@ -0,0 +1,45 @@
package plugin_hiksdk
import (
"m7s.live/v5"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
)
const (
DIRECTION_PULL = "pull"
DIRECTION_PUSH = "push"
)
type ClientPlugin struct {
task.Job
conf *HikPlugin
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string
}
func (c *ClientPlugin) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *ClientPlugin) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}
func NewPuller(_ config.Pull) m7s.IPuller {
client := &ClientPlugin{
direction: DIRECTION_PULL,
}
client.SetDescription(task.OwnerTypeKey, "HikPuller")
return client
}
func NewPusher() m7s.IPusher {
client := &ClientPlugin{
direction: DIRECTION_PUSH,
}
client.SetDescription(task.OwnerTypeKey, "HikPusher")
return client
}

80
plugin/hiksdk/device.go Normal file
View File

@@ -0,0 +1,80 @@
package plugin_hiksdk
import (
"fmt"
"strings"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/hiksdk/pkg"
"github.com/prometheus/client_golang/prometheus"
)
type HikDevice struct {
task.Job
IP string
UserName string
Password string
Port int
Device pkg.Device
Conf *HikPlugin
}
func (d *HikDevice) Start() (err error) {
info := pkg.DeviceInfo{
IP: d.IP,
UserName: d.UserName,
Password: d.Password,
Port: d.Port,
}
d.Device = pkg.NewHKDevice(info)
return
}
func (d *HikDevice) Run() (err error) {
if _, err := d.Device.Login(); err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("success login")
}
deviceInfo, err := d.Device.GetDeiceInfo() // 获取设备参数
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(deviceInfo)
}
channelNames, err := d.Device.GetChannelName() // 获取通道
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("通道:", channelNames)
}
d.AutoPullStream()
return
}
func (d *HikDevice) AutoPullStream() {
deviceInfo, _ := d.Device.GetDeiceInfo() // 获取设备参数
channelNames, _ := d.Device.GetChannelName() // 获取通道
for i := 1; i <= int(deviceInfo.ByChanNum); i++ {
d.PullStream(deviceInfo.DeviceID, channelNames[i], i)
}
}
func (d *HikDevice) PullStream(ifname string, channelName string, channelId int) error {
// 生成流路径
ifname = strings.ReplaceAll(ifname, "-", "_")
streamPath := fmt.Sprintf("%s/%s", ifname, channelName)
receiver := &pkg.PSReceiver{}
receiver.Device = d.Device
receiver.ChannelId = channelId
receiver.Publisher, _ = d.Conf.Publish(d.Conf, streamPath)
go d.Conf.RunTask(receiver)
return nil
}
func (d *HikDevice) Describe(ch chan<- *prometheus.Desc) {
d.Device.Logout()
}

File diff suppressed because it is too large Load Diff

54
plugin/hiksdk/index.go Normal file
View File

@@ -0,0 +1,54 @@
package plugin_hiksdk
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
m7s "m7s.live/v5"
"m7s.live/v5/plugin/hiksdk/pkg"
)
type HikPlugin struct {
m7s.Plugin
Clients []Client `yaml:"client,omitempty"` //共享的通道格式为GBID流地址
list []HikDevice
}
type Client struct {
IP string `yaml:"ip"`
UserName string `yaml:"username"`
Password string `yaml:"password"`
Port int `yaml:"port"`
}
var _ = m7s.InstallPlugin[HikPlugin](m7s.PluginMeta{
NewPuller: NewPuller,
NewPusher: NewPusher,
})
func init() {
fmt.Println("success 初始化海康SDK")
pkg.InitHikSDK()
}
func (hik *HikPlugin) Start() (err error) {
for i, client := range hik.Clients {
fmt.Printf("Client[%d]: IP=%s, UserName=%s, Password=%s, Port=%d\n", i, client.IP, client.UserName, client.Password, client.Port)
device := HikDevice{
IP: client.IP,
UserName: client.UserName,
Password: client.Password,
Port: client.Port,
Conf: hik,
}
hik.list = append(hik.list, device)
}
for _, device := range hik.list {
go hik.AddTask(&device)
}
return
}
func (hik *HikPlugin) Describe(ch chan<- *prometheus.Desc) {
pkg.HKExit()
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,26 @@
#ȱʡ<C8B1><CAA1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̨
#FATAL<41><4C>ERROR<4F><52>WARN<52><4E>INFO<46><4F>DEBUG <20><><EFBFBD>ȼ<EFBFBD>˳<EFBFBD><CBB3> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ģ<EFBFBD><C4A3><EFBFBD>͸<EFBFBD>ģ<EFBFBD>鶼ƥ<E9B6BC><EFBFBD><E4A3AC>ô<EFBFBD><C3B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
log4j.rootLogger=DEBUG, stdout
#log4j.rootLogger=DEBUG
##hlog.async=false
##hlog.secret.show=true
##hlog.secret.encrypt=false
#log4j.logger<65><72><EFBFBD>ڿ<EFBFBD><DABF><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD>ɼ<EFBFBD><C9BC><EFBFBD><EFBFBD>𼰲ɼ<F0BCB0B2><C9BC><EFBFBD><EFBFBD>ݣ<EFBFBD>Threshold<6C><64><EFBFBD>ڿ<EFBFBD><DABF><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
<><D3A6><EFBFBD>ڿ<EFBFBD><DABF><EFBFBD>̨
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d][%t][%-5p]- %m%n
log4j.logger.NPQ=ERROR, NPQ
log4j.appender.NPQ=org.apache.log4j.RollingFileAppender
log4j.appender.NPQ.File=./NPQLog/NPQ.log
log4j.appender.NPQ.MaxFileSize=80MB
log4j.appender.NPQ.MaxBackupIndex=12
log4j.appender.NPQ.Append=true
log4j.appender.NPQ.Threshold=TRACE
log4j.appender.NPQ.layout=org.apache.log4j.PatternLayout
log4j.appender.NPQ.layout.ConversionPattern=[%d][%t][%-5p]- %m%n
log4j.additivity.NPQ = false
#<23><><EFBFBD><EFBFBD>һλ<D2BB>޸<EFBFBD>Ϊtrue <20>ȿ<EFBFBD><C8BF>Կ<EFBFBD><D4BF><EFBFBD>̨<EFBFBD><CCA8><EFBFBD><EFBFBD><EFBFBD>ֿ<EFBFBD><D6BF><EFBFBD><EFBFBD>ļ<EFBFBD><C4BC><EFBFBD><EFBFBD><EFBFBD>

Binary file not shown.

View File

@@ -0,0 +1,506 @@
package pkg
/*
#cgo CFLAGS: -I../include
// Linux平台的链接配置
#cgo linux LDFLAGS: -L../lib/Linux -lHCCore -lhpr -lhcnetsdk
// Windows平台的链接配置
#cgo windows LDFLAGS: -L../lib/Windows -lHCCore -lHCNetSDK
#include <stdio.h>
#include <stdlib.h>
#include "HCNetSDK.h"
extern void AlarmCallBack(LONG lCommand, NET_DVR_ALARMER *pAlarmer, char *pAlarmInfo, DWORD dwBufLen, void* pUser);
extern void RealDataCallBack_V30(LONG lRealHandle, DWORD dwDataType, BYTE *pBuffer, DWORD dwBufSize, void *pUser);
*/
import "C"
import (
"errors"
"fmt"
"log"
"strings"
"sync"
"unsafe"
"golang.org/x/text/encoding/simplifiedchinese"
)
// 全局指针映射用于安全地在CGO中传递Go指针
var (
pointerMap = make(map[uintptr]*Receiver)
pointerMutex sync.RWMutex
pointerCounter uintptr = 1
)
// 存储Go指针返回一个唯一的标识符
func storePointer(receiver *Receiver) uintptr {
pointerMutex.Lock()
defer pointerMutex.Unlock()
id := pointerCounter
pointerCounter++
pointerMap[id] = receiver
return id
}
// 根据标识符获取Go指针
func getPointer(id uintptr) *Receiver {
pointerMutex.RLock()
defer pointerMutex.RUnlock()
return pointerMap[id]
}
// 删除存储的指针
func removePointer(id uintptr) {
pointerMutex.Lock()
defer pointerMutex.Unlock()
delete(pointerMap, id)
}
/*************************参数配置命令 begin*******************************/
//用于NET_DVR_SetDVRConfig和NET_DVR_GetDVRConfig,注意其对应的配置结构
const (
NET_DVR_GET_DEVICECFG = 100 // 获取设备参数
NET_DVR_SET_DEVICECFG = 101 // 设置设备参数
NET_DVR_GET_NETCFG = 102 // 获取网络参数
NET_DVR_SET_NETCFG = 103 // 设置网络参数
NET_DVR_GET_PICCFG = 104 // 获取图象参数
NET_DVR_SET_PICCFG = 105 // 设置图象参数
NET_DVR_GET_COMPRESSCFG = 106 // 获取压缩参数
NET_DVR_SET_COMPRESSCFG = 107 // 设置压缩参数
NET_DVR_GET_RECORDCFG = 108 // 获取录像时间参数
NET_DVR_SET_RECORDCFG = 109 // 设置录像时间参数
NET_DVR_GET_DECODERCFG = 110 // 获取解码器参数
NET_DVR_SET_DECODERCFG = 111 // 设置解码器参数
NET_DVR_GET_RS232CFG = 112 // 获取232串口参数
NET_DVR_SET_RS232CFG = 113 // 设置232串口参数
NET_DVR_GET_ALARMINCFG = 114 // 获取报警输入参数
NET_DVR_SET_ALARMINCFG = 115 // 设置报警输入参数
NET_DVR_GET_ALARMOUTCFG = 116 // 获取报警输出参数
NET_DVR_SET_ALARMOUTCFG = 117 // 设置报警输出参数
NET_DVR_GET_TIMECFG = 118 // 获取DVR时间
NET_DVR_SET_TIMECFG = 119 // 设置DVR时间
NET_DVR_GET_PREVIEWCFG = 120 // 获取预览参数
NET_DVR_SET_PREVIEWCFG = 121 // 设置预览参数
NET_DVR_GET_VIDEOOUTCFG = 122 // 获取视频输出参数
NET_DVR_SET_VIDEOOUTCFG = 123 // 设置视频输出参数
NET_DVR_GET_USERCFG = 124 // 获取用户参数
NET_DVR_SET_USERCFG = 125 // 设置用户参数
NET_DVR_GET_EXCEPTIONCFG = 126 // 获取异常参数
NET_DVR_SET_EXCEPTIONCFG = 127 // 设置异常参数
NET_DVR_GET_ZONEANDDST = 128 // 获取时区和夏时制参数
NET_DVR_SET_ZONEANDDST = 129 // 设置时区和夏时制参数
NET_DVR_GET_DEVICECFG_V40 = 1100 // 获取设备参数
NET_DVR_SET_PTZPOS = 292 //云台设置PTZ位置
NET_DVR_GET_PTZPOS = 293 //云台获取PTZ位置
NET_DVR_GET_PTZSCOPE = 294 //云台获取PTZ范围
)
// strcpy safely copies a Go string to a C character array
func strcpy(dst unsafe.Pointer, src string, maxLen int) {
if len(src) >= maxLen {
copy((*[1 << 30]byte)(dst)[:maxLen-1], src)
(*[1 << 30]byte)(dst)[maxLen-1] = 0
} else {
copy((*[1 << 30]byte)(dst)[:len(src)], src)
(*[1 << 30]byte)(dst)[len(src)] = 0
}
}
// GBK → UTF-8
func GBKToUTF8(b []byte) (string, error) {
r, err := simplifiedchinese.GBK.NewDecoder().Bytes(b)
return string(r), err
}
// UTF-8 → GBK
func UTF8ToGBK(s string) ([]byte, error) {
return simplifiedchinese.GBK.NewEncoder().Bytes([]byte(s))
}
//export AlarmCallBack
func AlarmCallBack(command C.LONG, alarm *C.NET_DVR_ALARMER, info *C.char, len C.DWORD, user unsafe.Pointer) {
fmt.Println("receive alarm")
}
//export RealDataCallBack_V30
func RealDataCallBack_V30(lRealHandle C.LONG, dwDataType C.DWORD, pBuffer *C.BYTE, dwBufSize C.DWORD, pUser unsafe.Pointer) {
// 从指针映射中获取Receiver
receiverID := uintptr(pUser)
receiver := getPointer(receiverID)
if receiver == nil {
fmt.Println("Error: receiver not found for ID", receiverID)
return
}
size := int(dwBufSize)
if size > 0 && pBuffer != nil {
// 将C指针转换为Go的byte切片
buffer := (*[1 << 30]C.BYTE)(unsafe.Pointer(pBuffer))[:size:size]
// 使用unsafe.Pointer高效转换[]C.BYTE为[]byte
goBuffer := (*[1 << 30]byte)(unsafe.Pointer(&buffer[0]))[:len(buffer):len(buffer)]
receiver.ReadPSData(goBuffer)
}
}
type HKDevice struct {
ip string
port int
username string
password string
loginId int
alarmHandle int
lRealHandle int
byChanNum int
receiverID uintptr // 存储指针映射ID
}
// InitHikSDK hk sdk init
func InitHikSDK() {
// 初始化SDK
C.NET_DVR_Init()
C.NET_DVR_SetConnectTime(2000, 5)
C.NET_DVR_SetReconnect(10000, 1)
}
// HKExit hk sdk clean
func HKExit() {
C.NET_DVR_Cleanup()
}
// NewHKDevice new hk-device instance
func NewHKDevice(info DeviceInfo) Device {
return &HKDevice{
ip: info.IP,
port: info.Port,
username: info.UserName,
password: info.Password}
}
// Login hk device loin
func (device *HKDevice) Login() (int, error) {
// init data
var deviceInfoV30 C.NET_DVR_DEVICEINFO_V30
ip := C.CString(device.ip)
usr := C.CString(device.username)
passwd := C.CString(device.password)
defer func() {
C.free(unsafe.Pointer(ip))
C.free(unsafe.Pointer(usr))
C.free(unsafe.Pointer(passwd))
}()
device.loginId = int(C.NET_DVR_Login_V30(ip, C.WORD(device.port), usr, passwd,
(*C.NET_DVR_DEVICEINFO_V30)(unsafe.Pointer(&deviceInfoV30)),
))
// 正确地将_Ctype_BYTE数组转换为string
serialNumber := string(unsafe.Slice(&deviceInfoV30.sSerialNumber[0], len(deviceInfoV30.sSerialNumber)))
// 去除可能的空字符
serialNumber = strings.Trim(serialNumber, "\x00")
fmt.Println("设备序列号:", serialNumber)
fmt.Println("登录成功,设备信息已获取")
if device.loginId < 0 {
return -1, device.HKErr("login ")
}
log.Println("login success")
return device.loginId, nil
}
// Logout hk device logout
func (device *HKDevice) Logout() error {
C.NET_DVR_Logout_V30(C.LONG(device.loginId))
if err := device.HKErr("NVRLogout"); err != nil {
return err
}
return nil
}
// Login hk device loin
func (device *HKDevice) LoginV4() (int, error) {
// init data
var deviceInfoV40 C.NET_DVR_DEVICEINFO_V40
var userLoginInfo C.NET_DVR_USER_LOGIN_INFO
// 使用strcpy函数将字符串复制到C结构体的字符数组中
strcpy(unsafe.Pointer(&userLoginInfo.sDeviceAddress[0]), device.ip, len(userLoginInfo.sDeviceAddress))
userLoginInfo.wPort = C.WORD(device.port)
strcpy(unsafe.Pointer(&userLoginInfo.sUserName[0]), device.username, len(userLoginInfo.sUserName))
strcpy(unsafe.Pointer(&userLoginInfo.sPassword[0]), device.password, len(userLoginInfo.sPassword))
// 正确调用NET_DVR_Login_V40函数
device.loginId = int(C.NET_DVR_Login_V40(&userLoginInfo, (*C.NET_DVR_DEVICEINFO_V40)(unsafe.Pointer(&deviceInfoV40))))
// 正确地将_Ctype_BYTE数组转换为string
serialNumber := string(unsafe.Slice(&deviceInfoV40.struDeviceV30.sSerialNumber[0], len(deviceInfoV40.struDeviceV30.sSerialNumber)))
// 去除可能的空字符
serialNumber = strings.Trim(serialNumber, "\x00")
// fmt.Println("设备序列号:", serialNumber)
// bySupportDev5是一个字节值而不是数组直接输出其数值
// fmt.Println("支持的设备类型标志:", int(deviceInfoV40.bySupportDev5))
// fmt.Println("登录成功,设备信息已获取")
if device.loginId < 0 {
return -1, device.HKErr("login ")
}
log.Println("login success")
return device.loginId, nil
}
func (device *HKDevice) GetDeiceInfo() (*DeviceInfo, error) {
// BOOL NET_DVR_GetDVRConfig(LONG lUserID, DWORD dwCommand,LONG lChannel, LPVOID lpOutBuffer, DWORD dwOutBufferSize, LPDWORD lpBytesReturned);
var deviceInfo C.NET_DVR_DEVICECFG
var bytesReturned C.DWORD
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_DEVICECFG), C.LONG(0), (C.LPVOID)(unsafe.Pointer(&deviceInfo)), C.DWORD(unsafe.Sizeof(deviceInfo)), &bytesReturned) != C.TRUE {
// fmt.Println("获取设备信息失败")
}
sDVRName := string(unsafe.Slice(&deviceInfo.sDVRName[0], len(deviceInfo.sDVRName)))
sSerialNumber := string(unsafe.Slice(&deviceInfo.sSerialNumber[0], len(deviceInfo.sSerialNumber)))
// 清理字符串中的空字符和空格
sDVRName = strings.TrimRight(sDVRName, "\x00")
sDVRName = strings.TrimSpace(sDVRName)
sSerialNumber = strings.TrimRight(sSerialNumber, "\x00")
sSerialNumber = strings.TrimSpace(sSerialNumber)
sDVRName, _ = GBKToUTF8([]byte(sDVRName))
device.byChanNum = int(deviceInfo.byChanNum)
return &DeviceInfo{
IP: device.ip,
Port: device.port,
UserName: device.username,
Password: device.password,
DeviceID: sSerialNumber,
DeviceName: sDVRName,
ByChanNum: int(deviceInfo.byChanNum),
}, nil
}
// 获取通道名称,俯仰角,横滚角
func (device *HKDevice) GetChannelName() (map[int]string, error) {
channelNames := make(map[int]string)
for i := 1; i <= int(device.byChanNum); i++ {
var channelInfo C.NET_DVR_PICCFG
var bytesReturned C.DWORD
var sDVRName string
// if C.NET_DVR_GetChannelInfo(C.LONG(device.loginId), C.LONG(i), (*C.NET_DVR_CHANNELINFO)(unsafe.Pointer(&channelInfo))) != C.TRUE {
// return nil, device.HKErr("get channel info")
// }
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_PICCFG), C.LONG(i), (C.LPVOID)(unsafe.Pointer(&channelInfo)), C.DWORD(unsafe.Sizeof(channelInfo)), &bytesReturned) != C.TRUE {
// fmt.Println("获取通道名称失败")
// return nil, device.HKErr("get device info")
sDVRName = "camera" + fmt.Sprintf("%d", i)
} else {
sDVRName = string(unsafe.Slice(&channelInfo.sChanName[0], len(channelInfo.sChanName)))
// 清理字符串中的空字符和空格
sDVRName = strings.TrimRight(sDVRName, "\x00")
sDVRName = strings.TrimSpace(sDVRName)
sDVRName, _ = GBKToUTF8([]byte(sDVRName))
}
channelNames[i] = sDVRName
}
return channelNames, nil
}
// 获取通道名称,俯仰角,横滚角
func (device *HKDevice) GetChannelPTZ(channel int) {
var ptzPos C.NET_DVR_PTZPOS
var ptzScope C.NET_DVR_PTZSCOPE
var bytesReturned C.DWORD
// if C.NET_DVR_GetChannelInfo(C.LONG(device.loginId), C.LONG(i), (*C.NET_DVR_CHANNELINFO)(unsafe.Pointer(&channelInfo))) != C.TRUE {
// return nil, device.HKErr("get channel info")
// }
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_PTZPOS), C.LONG(channel), (C.LPVOID)(unsafe.Pointer(&ptzPos)), C.DWORD(unsafe.Sizeof(ptzPos)), &bytesReturned) != C.TRUE {
// fmt.Println("获取PTZ位置信息失败")
}
// fmt.Println("PTZ位置信息:", ptzPos)
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_PTZSCOPE), C.LONG(channel), (C.LPVOID)(unsafe.Pointer(&ptzScope)), C.DWORD(unsafe.Sizeof(ptzScope)), &bytesReturned) != C.TRUE {
// fmt.Println("获取PTZ范围信息失败")
}
fmt.Println("PTZ范围信息:", ptzScope)
// 计算PTZ位置 - 显示原始值用于调试
fmt.Println("原始PTZ值 - wPanPos:", ptzPos.wPanPos, "wPanPosMin:", ptzScope.wPanPosMin, "wPanPosMax:", ptzScope.wPanPosMax)
fmt.Println("原始PTZ值 - wTiltPos:", ptzPos.wTiltPos, "wTiltPosMin:", ptzScope.wTiltPosMin, "wTiltPosMax:", ptzScope.wTiltPosMax)
fmt.Println("原始PTZ值 - wZoomPos:", ptzPos.wZoomPos, "wZoomPosMin:", ptzScope.wZoomPosMin, "wZoomPosMax:", ptzScope.wZoomPosMax)
// 计算差值
deltaPan := ptzScope.wPanPosMax - ptzScope.wPanPosMin
deltaTilt := ptzScope.wTiltPosMax - ptzScope.wTiltPosMin
deltaZoom := ptzScope.wZoomPosMax - ptzScope.wZoomPosMin
fmt.Println("差值计算 - deltaPan:", deltaPan, "deltaTilt:", deltaTilt, "deltaZoom:", deltaZoom)
fmt.Println("位置差值 - Pan:", ptzPos.wPanPos-ptzScope.wPanPosMin, "Tilt:", ptzPos.wTiltPos-ptzScope.wTiltPosMin, "Zoom:", ptzPos.wZoomPos-ptzScope.wZoomPosMin)
// 添加除零检查和边界处理
// 计算水平位置 (Pan)
if deltaPan > 0 {
// 计算实际比例
panRatio := float64(ptzPos.wPanPos-ptzScope.wPanPosMin) / float64(deltaPan)
fmt.Println("Pan比例:", panRatio)
// 计算Pan位置0-360度
panPos := int(panRatio * 360)
// 确保结果在0-360范围内
if panPos < 0 {
panPos = 0
} else if panPos > 360 {
panPos = 360
}
ptzPos.wPanPos = C.WORD(panPos)
fmt.Println("计算后Pan位置:", panPos)
} else {
ptzPos.wPanPos = 0 // 当范围无效时设置默认值
fmt.Println("警告: PTZ水平范围无效设置为默认值")
}
// 计算垂直位置 (Tilt)
if deltaTilt > 0 {
// 计算实际比例
tiltRatio := float64(ptzPos.wTiltPos-ptzScope.wTiltPosMin) / float64(deltaTilt)
fmt.Println("Tilt比例:", tiltRatio)
// 计算Tilt位置0-360度
tiltPos := int(tiltRatio * 90)
// 确保结果在0-360范围内
if tiltPos < 0 {
tiltPos = 0
} else if tiltPos > 90 {
tiltPos = 90
}
ptzPos.wTiltPos = C.WORD(tiltPos)
fmt.Println("计算后Tilt位置:", tiltPos)
} else {
ptzPos.wTiltPos = 0 // 当范围无效时设置默认值
fmt.Println("警告: PTZ垂直范围无效设置为默认值")
}
// 计算缩放位置 (Zoom)
if deltaZoom > 0 {
// 计算实际比例
zoomRatio := float64(ptzPos.wZoomPos-ptzScope.wZoomPosMin) / float64(deltaZoom)
fmt.Println("Zoom比例:", zoomRatio)
// 计算Zoom位置0-100%
zoomPos := int(zoomRatio * 100)
// 确保结果在0-100范围内
if zoomPos < 0 {
zoomPos = 0
} else if zoomPos > 100 {
zoomPos = 100
}
ptzPos.wZoomPos = C.WORD(zoomPos)
fmt.Println("计算后Zoom位置:", zoomPos)
} else {
ptzPos.wZoomPos = 0 // 当范围无效时设置默认值
fmt.Println("警告: PTZ缩放范围无效设置为默认值")
}
fmt.Println("PTZ位置信息:", ptzPos)
}
func (device *HKDevice) SetAlarmCallBack() error { //监听报警信息
if C.NET_DVR_SetDVRMessageCallBack_V30(C.MSGCallBack(C.AlarmCallBack), C.NULL) != C.TRUE {
return device.HKErr(device.ip + ":set alarm callback")
}
return nil
}
func (device *HKDevice) StartListenAlarmMsg() error {
var struAlarmParam C.NET_DVR_SETUPALARM_PARAM
// 根据平台使用正确的类型
struAlarmParam.dwSize = C.DWORD(unsafe.Sizeof(struAlarmParam))
struAlarmParam.byAlarmInfoType = 0
// 转换为正确的类型
device.alarmHandle = int(C.NET_DVR_SetupAlarmChan_V41(C.LONG(device.loginId), &struAlarmParam))
if device.alarmHandle < 0 {
return device.HKErr("setup alarm chan")
}
return nil
}
func (device *HKDevice) StopListenAlarmMsg() error {
if C.NET_DVR_CloseAlarmChan_V30(C.LONG(device.alarmHandle)) != C.TRUE {
return device.HKErr("stop alarm chan")
}
return nil
}
// HKErr Detect success of operation
func (device *HKDevice) HKErr(operation string) error {
errno := int64(C.NET_DVR_GetLastError())
if errno > 0 {
reMsg := fmt.Sprintf("%s:%s摄像头失败,失败代码号:%d", device.ip, operation, errno)
return errors.New(reMsg)
}
return nil
}
// // Login hk device loin
func (device *HKDevice) PTZControlWithSpeed(dwPTZCommand, dwStop, dwSpeed int) (bool, error) {
// init data
if C.NET_DVR_PTZControlWithSpeed(C.LONG(device.loginId), C.DWORD(dwPTZCommand), C.DWORD(dwStop), C.DWORD(dwSpeed)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
// // Login hk device loin
func (device *HKDevice) PTZControlWithSpeed_Other(lChannel, dwPTZCommand, dwStop, dwSpeed int) (bool, error) {
if C.NET_DVR_PTZControlWithSpeed_Other(C.LONG(device.loginId), C.LONG(lChannel), C.DWORD(dwPTZCommand), C.DWORD(dwStop), C.DWORD(dwSpeed)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
// // Login hk device loin
func (device *HKDevice) PTZControl(dwPTZCommand, dwStop int) (bool, error) {
// init data
if C.NET_DVR_PTZControl(C.LONG(device.loginId), C.DWORD(dwPTZCommand), C.DWORD(dwStop)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
// // Login hk device loin
func (device *HKDevice) PTZControl_Other(lChannel, dwPTZCommand, dwStop int) (bool, error) {
// init data
if C.NET_DVR_PTZControl_Other(C.LONG(device.loginId), C.LONG(lChannel), C.DWORD(dwPTZCommand), C.DWORD(dwStop)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
func (device *HKDevice) RealPlay_V40(ChannelId int, receiver *Receiver) (int, error) {
previewInfo := C.NET_DVR_PREVIEWINFO{}
previewInfo.hPlayWnd = nil
previewInfo.lChannel = C.LONG(ChannelId)
previewInfo.dwStreamType = C.DWORD(0)
previewInfo.dwLinkMode = C.DWORD(0)
previewInfo.bBlocked = C.DWORD(0)
previewInfo.byProtoType = C.BYTE(0)
// 存储receiver指针并获取安全ID
receiverID := storePointer(receiver)
device.receiverID = receiverID
//LONG NET_DVR_RealPlay_V40(LONG lUserID, LPNET_DVR_PREVIEWINFO lpPreviewInfo, REALDATACALLBACK fRealDataCallBack_V30, void* pUser);
device.lRealHandle = int(C.NET_DVR_RealPlay_V40(C.LONG(device.loginId), &previewInfo, C.REALDATACALLBACK(C.RealDataCallBack_V30), unsafe.Pointer(receiverID)))
log.Println("Play success", device.lRealHandle)
return device.lRealHandle, nil
}
// StopRealPlay 停止预览
func (device *HKDevice) StopRealPlay() {
if device.lRealHandle != 0 {
C.NET_DVR_StopRealPlay(C.LONG(device.lRealHandle))
device.lRealHandle = 0
// 清理指针映射
if device.receiverID != 0 {
removePointer(device.receiverID)
device.receiverID = 0
}
log.Println("Stop preview success")
}
}

416
plugin/hiksdk/pkg/audio.go Normal file
View File

@@ -0,0 +1,416 @@
package pkg
import (
"encoding/hex"
"fmt"
"strings"
"time"
"unsafe"
"github.com/bluenviron/mediacommon/pkg/bits"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
. "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
)
type RTPData struct {
Sample
Packets util.ReuseArray[rtp.Packet]
}
func (r *RTPData) Recycle() {
r.RecyclableMemory.Recycle()
r.Packets.Reset()
}
func (r *RTPData) String() (s string) {
for p := range r.Packets.RangePoint {
s += fmt.Sprintf("t: %d, s: %d, p: %02X %d\n", p.Timestamp, p.SequenceNumber, p.Payload[0:2], len(p.Payload))
}
return
}
func (r *RTPData) GetSize() (s int) {
for p := range r.Packets.RangePoint {
s += p.MarshalSize()
}
return
}
type (
RTPCtx struct {
webrtc.RTPCodecParameters
Fmtp map[string]string
SequenceNumber uint16
SSRC uint32
}
PCMACtx struct {
RTPCtx
*codec.PCMACtx
}
PCMUCtx struct {
RTPCtx
*codec.PCMUCtx
}
OPUSCtx struct {
RTPCtx
*codec.OPUSCtx
}
AACCtx struct {
RTPCtx
*codec.AACCtx
SizeLength int // 通常为13
IndexLength int
IndexDeltaLength int
}
IRTPCtx interface {
GetRTPCodecParameter() webrtc.RTPCodecParameters
}
)
func (r *RTPCtx) ParseFmtpLine(cp *webrtc.RTPCodecParameters) {
r.RTPCodecParameters = *cp
r.Fmtp = make(map[string]string)
kvs := strings.Split(r.SDPFmtpLine, ";")
for _, kv := range kvs {
if kv = strings.TrimSpace(kv); kv == "" {
continue
}
if key, value, found := strings.Cut(kv, "="); found {
r.Fmtp[strings.TrimSpace(key)] = strings.TrimSpace(value)
}
}
}
func (r *RTPCtx) GetInfo() string {
return r.GetRTPCodecParameter().SDPFmtpLine
}
func (r *AACCtx) GetInfo() string {
return r.AACCtx.GetInfo()
}
func (r *OPUSCtx) GetInfo() string {
return r.OPUSCtx.GetInfo()
}
func (r *RTPCtx) GetRTPCodecParameter() webrtc.RTPCodecParameters {
return r.RTPCodecParameters
}
func (r *RTPData) Append(ctx *RTPCtx, ts uint32, payload []byte) *rtp.Packet {
ctx.SequenceNumber++
r.Packets = append(r.Packets, rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: ts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: payload,
})
return &r.Packets[len(r.Packets)-1]
}
var _ IAVFrame = (*AudioFrame)(nil)
type AudioFrame struct {
RTPData
}
func (r *AudioFrame) Parse(data IAVFrame) (err error) {
input := data.(*AudioFrame)
r.Packets = append(r.Packets[:0], input.Packets...)
return
}
func payloadLengthInfoDecode(buf []byte) (int, int, error) {
lb := len(buf)
l := 0
n := 0
for {
if (lb - n) == 0 {
return 0, 0, fmt.Errorf("not enough bytes")
}
b := buf[n]
n++
l += int(b)
if b != 255 {
break
}
}
return l, n, nil
}
func (r *AudioFrame) Demux() (err error) {
if len(r.Packets) == 0 {
return ErrSkip
}
data := r.GetAudioData()
// 从编解码器上下文获取 MimeType
var mimeType string
if rtpCtx, ok := r.ICodecCtx.(IRTPCtx); ok {
mimeType = rtpCtx.GetRTPCodecParameter().MimeType
}
switch mimeType {
case "audio/MP4A-LATM":
var fragments util.Memory
var fragmentsExpected int
var fragmentsSize int
for packet := range r.Packets.RangePoint {
if len(packet.Payload) == 0 {
continue
}
if packet.Padding {
packet.Padding = false
}
buf := packet.Payload
if fragments.Size == 0 {
pl, n, err := payloadLengthInfoDecode(buf)
if err != nil {
return err
}
buf = buf[n:]
bl := len(buf)
if pl <= bl {
data.PushOne(buf[:pl])
// there could be other data, due to otherDataPresent. Ignore it.
} else {
if pl > 5*1024 {
fragments = util.Memory{} // discard pending fragments
return fmt.Errorf("access unit size (%d) is too big, maximum is %d",
pl, 5*1024)
}
fragments.PushOne(buf)
fragmentsSize = pl
fragmentsExpected = pl - bl
continue
}
} else {
bl := len(buf)
if fragmentsExpected > bl {
fragments.PushOne(buf)
fragmentsExpected -= bl
continue
}
fragments.PushOne(buf[:fragmentsExpected])
// there could be other data, due to otherDataPresent. Ignore it.
data.Push(fragments.Buffers...)
if fragments.Size != fragmentsSize {
return fmt.Errorf("fragmented AU size is not correct %d != %d", data.Size, fragmentsSize)
}
fragments = util.Memory{}
}
}
case "audio/MPEG4-GENERIC":
var fragments util.Memory
for packet := range r.Packets.RangePoint {
if len(packet.Payload) < 2 {
continue
}
auHeaderLen := util.ReadBE[int](packet.Payload[:2])
if auHeaderLen == 0 {
data.PushOne(packet.Payload)
} else {
dataLens, err := r.readAUHeaders(r.ICodecCtx.(*AACCtx), packet.Payload[2:], auHeaderLen)
if err != nil {
return err
}
payload := packet.Payload[2:]
pos := auHeaderLen >> 3
if (auHeaderLen % 8) != 0 {
pos++
}
payload = payload[pos:]
if fragments.Size == 0 {
if packet.Marker {
for _, dataLen := range dataLens {
if len(payload) < int(dataLen) {
return fmt.Errorf("invalid data len %d", dataLen)
}
data.PushOne(payload[:dataLen])
payload = payload[dataLen:]
}
} else {
if len(dataLens) != 1 {
return fmt.Errorf("a fragmented packet can only contain one AU")
}
fragments.PushOne(payload)
}
} else {
if len(dataLens) != 1 {
return fmt.Errorf("a fragmented packet can only contain one AU")
}
fragments.PushOne(payload)
if !packet.Header.Marker {
continue
}
if uint64(fragments.Size) != dataLens[0] {
return fmt.Errorf("fragmented AU size is not correct %d != %d", dataLens[0], fragments.Size)
}
data.Push(fragments.Buffers...)
fragments = util.Memory{}
}
}
break
}
default:
for packet := range r.Packets.RangePoint {
data.PushOne(packet.Payload)
}
}
return nil
}
func (r *AudioFrame) Mux(from *Sample) (err error) {
data := from.Raw.(*AudioData)
var ctx *RTPCtx
var lastPacket *rtp.Packet
switch base := from.GetBase().(type) {
case *codec.AACCtx:
var c *AACCtx
if r.ICodecCtx == nil {
c = &AACCtx{}
c.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
c.AACCtx = base
c.MimeType = "audio/MPEG4-GENERIC"
c.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=%s", hex.EncodeToString(c.ConfigBytes))
c.IndexLength = 3
c.IndexDeltaLength = 3
c.SizeLength = 13
c.RTPCtx.Channels = uint16(base.GetChannels())
c.PayloadType = 97
c.ClockRate = uint32(base.CodecData.SampleRate())
r.ICodecCtx = c
} else {
c = r.ICodecCtx.(*AACCtx)
}
ctx = &c.RTPCtx
pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second)
//AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度又因为单个auheader字节长度2字节所以再除以2就是auheader的个数。
auHeaderLen := []byte{0x00, 0x10, (byte)((data.Size & 0x1fe0) >> 5), (byte)((data.Size & 0x1f) << 3)} // 3 = 16-13, 5 = 8-3
for reader := data.NewReader(); reader.Length > 0; {
payloadLen := MTUSize
if reader.Length+4 < MTUSize {
payloadLen = reader.Length + 4
}
mem := r.NextN(payloadLen)
copy(mem, auHeaderLen)
reader.Read(mem[4:])
lastPacket = r.Append(ctx, pts, mem)
}
lastPacket.Header.Marker = true
return
case *codec.PCMACtx:
if r.ICodecCtx == nil {
var ctx PCMACtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMACtx = base
ctx.MimeType = webrtc.MimeTypePCMA
ctx.PayloadType = 8
ctx.ClockRate = uint32(ctx.SampleRate)
r.ICodecCtx = &ctx
} else {
ctx = &r.ICodecCtx.(*PCMACtx).RTPCtx
}
case *codec.PCMUCtx:
if r.ICodecCtx == nil {
var ctx PCMUCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMUCtx = base
ctx.MimeType = webrtc.MimeTypePCMU
ctx.PayloadType = 0
ctx.ClockRate = uint32(ctx.SampleRate)
r.ICodecCtx = &ctx
} else {
ctx = &r.ICodecCtx.(*PCMUCtx).RTPCtx
}
}
pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second)
if reader := data.NewReader(); reader.Length > MTUSize {
for reader.Length > 0 {
payloadLen := MTUSize
if reader.Length < MTUSize {
payloadLen = reader.Length
}
mem := r.NextN(payloadLen)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
} else {
mem := r.NextN(reader.Length)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
lastPacket.Header.Marker = true
return
}
func (r *AudioFrame) readAUHeaders(ctx *AACCtx, buf []byte, headersLen int) ([]uint64, error) {
firstRead := false
count := 0
for i := 0; i < headersLen; {
if i == 0 {
i += ctx.SizeLength
i += ctx.IndexLength
} else {
i += ctx.SizeLength
i += ctx.IndexDeltaLength
}
count++
}
dataLens := make([]uint64, count)
pos := 0
i := 0
for headersLen > 0 {
dataLen, err := bits.ReadBits(buf, &pos, ctx.SizeLength)
if err != nil {
return nil, err
}
headersLen -= ctx.SizeLength
if !firstRead {
firstRead = true
if ctx.IndexLength > 0 {
auIndex, err := bits.ReadBits(buf, &pos, ctx.IndexLength)
if err != nil {
return nil, err
}
headersLen -= ctx.IndexLength
if auIndex != 0 {
return nil, fmt.Errorf("AU-index different than zero is not supported")
}
}
} else if ctx.IndexDeltaLength > 0 {
auIndexDelta, err := bits.ReadBits(buf, &pos, ctx.IndexDeltaLength)
if err != nil {
return nil, err
}
headersLen -= ctx.IndexDeltaLength
if auIndexDelta != 0 {
return nil, fmt.Errorf("AU-index-delta different than zero is not supported")
}
}
dataLens[i] = dataLen
i++
}
return dataLens, nil
}

View File

@@ -0,0 +1,28 @@
package pkg
type Device interface {
Login() (int, error)
LoginV4() (int, error)
GetDeiceInfo() (*DeviceInfo, error)
GetChannelName() (map[int]string, error)
Logout() error
SetAlarmCallBack() error
StartListenAlarmMsg() error
StopListenAlarmMsg() error
PTZControlWithSpeed(dwPTZCommand, dwStop, dwSpeed int) (bool, error)
PTZControlWithSpeed_Other(lChannel, dwPTZCommand, dwStop, dwSpeed int) (bool, error)
PTZControl(dwPTZCommand, dwStop int) (bool, error)
PTZControl_Other(lChannel, dwPTZCommand, dwStop int) (bool, error)
GetChannelPTZ(channel int)
RealPlay_V40(ChannelId int,receiver *Receiver) (int, error)
StopRealPlay()
}
type DeviceInfo struct {
IP string
Port int
UserName string
Password string
DeviceID string //序列号
DeviceName string //DVR名称
ByChanNum int //通道数量
}

View File

@@ -0,0 +1,164 @@
package pkg
import (
"fmt"
"io"
mpegps "m7s.live/v5/pkg/format/ps"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
type ChanReader chan []byte
func (r ChanReader) Read(buf []byte) (n int, err error) {
b, ok := <-r
if !ok {
return 0, io.EOF
}
copy(buf, b)
return len(b), nil
}
type Receiver struct {
task.Task
*util.BufReader
SSRC uint32 // RTP SSRC
PSMouth chan []byte // 直接处理PS数据的通道
psBuffer []byte // PS数据缓冲区用于处理跨包的PS起始码
}
type PSReceiver struct {
Device Device // 设备
ChannelId int // 通道号
Receiver
mpegps.MpegPsDemuxer
}
func (p *PSReceiver) Start() error {
err := p.Receiver.Start()
if err == nil {
p.Using(p.Publisher)
}
// 初始化播放控制通道(如果未初始化)
p.Device.RealPlay_V40(p.ChannelId, &p.Receiver)
return err
}
func (p *PSReceiver) Run() error {
p.MpegPsDemuxer.Allocator = util.NewScalableMemoryAllocator(1 << util.MinPowerOf2)
p.Using(p.MpegPsDemuxer.Allocator)
// 确保Publisher已设置
if p.MpegPsDemuxer.Publisher == nil {
return fmt.Errorf("Publisher未设置")
}
err := p.MpegPsDemuxer.Feed(p.BufReader)
return err
}
func (p *PSReceiver) Dispose() {
p.Device.StopRealPlay()
// 停止设备播放
}
func (p *Receiver) Start() (err error) {
p.PSMouth = make(chan []byte, 500) // 增加PS数据通道缓冲区到500避免数据丢失
psReader := (ChanReader)(p.PSMouth) // 直接使用PS数据通道
p.BufReader = util.NewBufReader(psReader)
return
}
func (p *Receiver) ReadPSData(data util.Buffer) (err error) {
// 将新数据添加到缓冲区
p.psBuffer = append(p.psBuffer, data...)
// 处理缓冲区中的完整PS包
for {
syncedData, remaining := p.extractSynchronizedPSData(p.psBuffer)
if syncedData == nil {
// 没有找到完整的PS包保留剩余数据等待更多数据
p.psBuffer = remaining
break
}
// 发送同步后的PS数据到处理通道
select {
case p.PSMouth <- syncedData:
// 成功发送数据到PS处理通道
// fmt.Printf("发送同步PS数据到通道长度: %d\n", len(syncedData))
default:
// 通道满了,跳过这个数据包
fmt.Printf("PS通道满了跳过数据包当前缓冲区大小: %d/%d\n", len(p.PSMouth), cap(p.PSMouth))
// 跳过当前数据包,但不返回错误,避免阻塞
}
// 更新缓冲区为剩余数据
p.psBuffer = remaining
}
return nil
}
// extractSynchronizedPSData 从缓冲区中提取同步的PS数据包
func (p *Receiver) extractSynchronizedPSData(buffer []byte) ([]byte, []byte) {
if len(buffer) < 4 {
return nil, buffer // 数据不足,返回所有数据等待更多
}
// 寻找PS起始码
startIndex := -1
for i := 0; i <= len(buffer)-4; i++ {
if buffer[i] == 0x00 && buffer[i+1] == 0x00 && buffer[i+2] == 0x01 {
// 检查第四个字节是否为有效的PS起始码
startCode := uint32(buffer[i])<<24 | uint32(buffer[i+1])<<16 |
uint32(buffer[i+2])<<8 | uint32(buffer[i+3])
switch startCode {
case mpegps.StartCodePS, mpegps.StartCodeVideo, mpegps.StartCodeVideo1,
mpegps.StartCodeVideo2, mpegps.StartCodeAudio, mpegps.StartCodeMAP,
mpegps.StartCodeSYS, mpegps.PrivateStreamCode:
startIndex = i
// fmt.Println("在数据源头找到PS起始码:", fmt.Sprintf("0x%08x", startCode))
goto found
}
}
}
found:
if startIndex == -1 {
// 没有找到有效起始码
if len(buffer) > 3 {
// 保留最后3个字节丢弃其余数据
return nil, buffer[len(buffer)-3:]
}
return nil, buffer
}
// 寻找下一个起始码来确定当前包的结束位置
nextStartIndex := -1
for i := startIndex + 4; i <= len(buffer)-4; i++ {
if buffer[i] == 0x00 && buffer[i+1] == 0x00 && buffer[i+2] == 0x01 {
startCode := uint32(buffer[i])<<24 | uint32(buffer[i+1])<<16 |
uint32(buffer[i+2])<<8 | uint32(buffer[i+3])
switch startCode {
case mpegps.StartCodePS, mpegps.StartCodeVideo, mpegps.StartCodeVideo1,
mpegps.StartCodeVideo2, mpegps.StartCodeAudio, mpegps.StartCodeMAP,
mpegps.StartCodeSYS, mpegps.PrivateStreamCode:
nextStartIndex = i
goto nextFound
}
}
}
nextFound:
if nextStartIndex == -1 {
// 没有找到下一个起始码,返回从当前起始码到缓冲区末尾的所有数据
return buffer[startIndex:], nil
}
// 返回从当前起始码到下一个起始码之间的数据
return buffer[startIndex:nextStartIndex], buffer[nextStartIndex:]
}

493
plugin/hiksdk/pkg/video.go Normal file
View File

@@ -0,0 +1,493 @@
package pkg
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"slices"
"time"
"unsafe"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
. "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
)
type (
H26xCtx struct {
RTPCtx
seq uint16
dtsEst util.DTSEstimator
}
H264Ctx struct {
H26xCtx
*codec.H264Ctx
}
H265Ctx struct {
H26xCtx
*codec.H265Ctx
DONL bool
}
AV1Ctx struct {
RTPCtx
*codec.AV1Ctx
}
VP9Ctx struct {
RTPCtx
}
VideoFrame struct {
RTPData
}
)
var (
_ IAVFrame = (*VideoFrame)(nil)
_ IVideoCodecCtx = (*H264Ctx)(nil)
_ IVideoCodecCtx = (*H265Ctx)(nil)
_ IVideoCodecCtx = (*AV1Ctx)(nil)
)
const (
H265_NALU_AP = h265parser.NAL_UNIT_UNSPECIFIED_48
H265_NALU_FU = h265parser.NAL_UNIT_UNSPECIFIED_49
startBit = 1 << 7
endBit = 1 << 6
MTUSize = 1460
)
func (r *VideoFrame) Parse(data IAVFrame) (err error) {
input := data.(*VideoFrame)
r.Packets = append(r.Packets[:0], input.Packets...)
return
}
func (r *VideoFrame) Recycle() {
r.RecyclableMemory.Recycle()
r.Packets.Reset()
}
func (r *VideoFrame) CheckCodecChange() (err error) {
if len(r.Packets) == 0 {
return ErrSkip
}
old := r.ICodecCtx
// 解复用数据
if err = r.Demux(); err != nil {
return
}
// 处理时间戳和序列号
pts := r.Packets[0].Timestamp
nalus := r.Raw.(*Nalus)
switch ctx := old.(type) {
case *H264Ctx:
dts := ctx.dtsEst.Feed(pts)
r.SetDTS(time.Duration(dts))
r.SetPTS(time.Duration(pts))
// 检查 SPS、PPS 和 IDR 帧
var sps, pps []byte
var hasSPSPPS bool
for nalu := range nalus.RangePoint {
nalType := codec.ParseH264NALUType(nalu.Buffers[0][0])
switch nalType {
case h264parser.NALU_SPS:
sps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h264parser.NALU_PPS:
pps = nalu.ToBytes()
defer nalus.Remove(nalu)
case codec.NALU_IDR_Picture:
r.IDR = true
}
}
// 如果发现新的 SPS/PPS更新编解码器上下文
if hasSPSPPS = sps != nil && pps != nil; hasSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) {
var newCodecData h264parser.CodecData
if newCodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil {
return
}
newCtx := &H264Ctx{
H26xCtx: ctx.H26xCtx,
H264Ctx: &codec.H264Ctx{
CodecData: newCodecData,
},
}
// 保持原有的 RTP 参数
if oldCtx, ok := old.(*H264Ctx); ok {
newCtx.RTPCtx = oldCtx.RTPCtx
}
r.ICodecCtx = newCtx
} else {
// 如果是 IDR 帧但没有 SPS/PPS需要插入
if r.IDR && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 {
spsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
}
ppsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
}
r.Packets = slices.Insert(r.Packets, 0, spsRTP, ppsRTP)
}
}
// 更新序列号
for p := range r.Packets.RangePoint {
p.SequenceNumber = ctx.seq
ctx.seq++
}
case *H265Ctx:
dts := ctx.dtsEst.Feed(pts)
r.SetDTS(time.Duration(dts))
r.SetPTS(time.Duration(pts))
// 检查 VPS、SPS、PPS 和 IDR 帧
var vps, sps, pps []byte
var hasVPSSPSPPS bool
for nalu := range nalus.RangePoint {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
vps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h265parser.NAL_UNIT_SPS:
sps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h265parser.NAL_UNIT_PPS:
pps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
r.IDR = true
}
}
// 如果发现新的 VPS/SPS/PPS更新编解码器上下文
if hasVPSSPSPPS = vps != nil && sps != nil && pps != nil; hasVPSSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(vps, ctx.VPS()) || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) {
var newCodecData h265parser.CodecData
if newCodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps); err != nil {
return
}
newCtx := &H265Ctx{
H26xCtx: ctx.H26xCtx,
H265Ctx: &codec.H265Ctx{
CodecData: newCodecData,
},
}
// 保持原有的 RTP 参数
if oldCtx, ok := old.(*H265Ctx); ok {
newCtx.RTPCtx = oldCtx.RTPCtx
}
r.ICodecCtx = newCtx
} else {
// 如果是 IDR 帧但没有 VPS/SPS/PPS需要插入
if r.IDR && len(ctx.VPS()) > 0 && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 {
vpsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.VPS(),
}
spsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
}
ppsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
}
r.Packets = slices.Insert(r.Packets, 0, vpsRTP, spsRTP, ppsRTP)
}
}
// 更新序列号
for p := range r.Packets.RangePoint {
p.SequenceNumber = ctx.seq
ctx.seq++
}
}
return
}
func (h264 *H264Ctx) GetInfo() string {
return h264.SDPFmtpLine
}
func (h265 *H265Ctx) GetInfo() string {
return h265.SDPFmtpLine
}
func (av1 *AV1Ctx) GetInfo() string {
return av1.SDPFmtpLine
}
func (r *VideoFrame) Mux(baseFrame *Sample) error {
// 获取编解码器上下文
codecCtx := r.ICodecCtx
if codecCtx == nil {
switch base := baseFrame.GetBase().(type) {
case *codec.H264Ctx:
var ctx H264Ctx
ctx.H264Ctx = base
ctx.PayloadType = 96
ctx.MimeType = webrtc.MimeTypeH264
ctx.ClockRate = 90000
spsInfo := ctx.SPSInfo
ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc)
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
codecCtx = &ctx
case *codec.H265Ctx:
var ctx H265Ctx
ctx.H265Ctx = base
ctx.PayloadType = 98
ctx.MimeType = webrtc.MimeTypeH265
ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), base64.StdEncoding.EncodeToString(ctx.VPS()))
ctx.ClockRate = 90000
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
codecCtx = &ctx
}
r.ICodecCtx = codecCtx
}
// 获取时间戳信息
pts := uint32(baseFrame.GetPTS())
switch c := codecCtx.(type) {
case *H264Ctx:
ctx := &c.RTPCtx
var lastPacket *rtp.Packet
if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 {
r.Append(ctx, pts, c.SPS())
r.Append(ctx, pts, c.PPS())
}
for nalu := range baseFrame.Raw.(*Nalus).RangePoint {
if reader := nalu.NewReader(); reader.Length > MTUSize {
payloadLen := MTUSize
if reader.Length+1 < payloadLen {
payloadLen = reader.Length + 1
}
//fu-a
mem := r.NextN(payloadLen)
reader.Read(mem[1:])
fuaHead, naluType := codec.NALU_FUA.Or(mem[1]&0x60), mem[1]&0x1f
mem[0], mem[1] = fuaHead, naluType|startBit
lastPacket = r.Append(ctx, pts, mem)
for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) {
if reader.Length+2 < payloadLen {
payloadLen = reader.Length + 2
}
mem = r.NextN(payloadLen)
reader.Read(mem[2:])
mem[0], mem[1] = fuaHead, naluType
}
lastPacket.Payload[1] |= endBit
} else {
mem := r.NextN(reader.Length)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
}
lastPacket.Header.Marker = true
case *H265Ctx:
ctx := &c.RTPCtx
var lastPacket *rtp.Packet
if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 && len(c.RecordInfo.VPS) > 0 {
r.Append(ctx, pts, c.VPS())
r.Append(ctx, pts, c.SPS())
r.Append(ctx, pts, c.PPS())
}
for nalu := range baseFrame.Raw.(*Nalus).RangePoint {
if reader := nalu.NewReader(); reader.Length > MTUSize {
var b0, b1 byte
_ = reader.ReadByteTo(&b0, &b1)
//fu
naluType := byte(codec.ParseH265NALUType(b0))
b0 = (byte(H265_NALU_FU) << 1) | (b0 & 0b10000001)
payloadLen := MTUSize
if reader.Length+3 < payloadLen {
payloadLen = reader.Length + 3
}
mem := r.NextN(payloadLen)
reader.Read(mem[3:])
mem[0], mem[1], mem[2] = b0, b1, naluType|startBit
lastPacket = r.Append(ctx, pts, mem)
for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) {
if reader.Length+3 < payloadLen {
payloadLen = reader.Length + 3
}
mem = r.NextN(payloadLen)
reader.Read(mem[3:])
mem[0], mem[1], mem[2] = b0, b1, naluType
}
lastPacket.Payload[2] |= endBit
} else {
mem := r.NextN(reader.Length)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
}
lastPacket.Header.Marker = true
}
return nil
}
func (r *VideoFrame) Demux() (err error) {
if len(r.Packets) == 0 {
return ErrSkip
}
switch c := r.ICodecCtx.(type) {
case *H264Ctx:
nalus := r.GetNalus()
nalu := nalus.GetNextPointer()
var naluType codec.H264NALUType
gotNalu := func() {
if nalu.Size > 0 {
nalu = nalus.GetNextPointer()
}
}
for packet := range r.Packets.RangePoint {
if len(packet.Payload) < 2 {
continue
}
if packet.Padding {
packet.Padding = false
}
if len(packet.Payload) == 0 {
continue
}
b0 := packet.Payload[0]
if t := codec.ParseH264NALUType(b0); t < 24 {
nalu.PushOne(packet.Payload)
gotNalu()
} else {
offset := t.Offset()
switch t {
case codec.NALU_STAPA, codec.NALU_STAPB:
if len(packet.Payload) <= offset {
return fmt.Errorf("invalid nalu size %d", len(packet.Payload))
}
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalu.PushOne(buffer.ReadN(nextSize))
gotNalu()
} else {
return fmt.Errorf("invalid nalu size %d", nextSize)
}
}
case codec.NALU_FUA, codec.NALU_FUB:
b1 := packet.Payload[1]
if util.Bit1(b1, 0) {
naluType.Parse(b1)
nalu.PushOne([]byte{naluType.Or(b0 & 0x60)})
}
if nalu.Size > 0 {
nalu.PushOne(packet.Payload[offset:])
} else {
continue
}
if util.Bit1(b1, 1) {
gotNalu()
}
default:
return fmt.Errorf("unsupported nalu type %d", t)
}
}
}
nalus.Reduce()
return nil
case *H265Ctx:
nalus := r.GetNalus()
nalu := nalus.GetNextPointer()
gotNalu := func() {
if nalu.Size > 0 {
nalu = nalus.GetNextPointer()
}
}
for _, packet := range r.Packets {
if len(packet.Payload) == 0 {
continue
}
b0 := packet.Payload[0]
if t := codec.ParseH265NALUType(b0); t < H265_NALU_AP {
nalu.PushOne(packet.Payload)
gotNalu()
} else {
var buffer = util.Buffer(packet.Payload)
switch t {
case H265_NALU_AP:
buffer.ReadUint16()
if c.DONL {
buffer.ReadUint16()
}
for buffer.CanRead() {
nalu.PushOne(buffer.ReadN(int(buffer.ReadUint16())))
gotNalu()
}
if c.DONL {
buffer.ReadByte()
}
case H265_NALU_FU:
if buffer.Len() < 3 {
return io.ErrShortBuffer
}
first3 := buffer.ReadN(3)
fuHeader := first3[2]
if c.DONL {
buffer.ReadUint16()
}
if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) {
nalu.PushOne([]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]})
}
nalu.PushOne(buffer)
if util.Bit1(fuHeader, 1) {
gotNalu()
}
default:
return fmt.Errorf("unsupported nalu type %d", t)
}
}
}
nalus.Reduce()
return nil
}
return ErrUnsupportCodec
}

15
plugin/hiksdk/readme.md Normal file
View File

@@ -0,0 +1,15 @@
需要将lib下的linux或windows下的文件复制到main.go同级目录下,才能正常运行
配置文件
hik:
loglevel: debug //根据实际情况修改
client:
- ip: "172.16.9.35"
username: "admin"
password: "123456"
port: 8000
- ip: "172.16.9.200"
username: "admin"
password: "123456"
port: 8000

View File

@@ -449,11 +449,11 @@ func (nc *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
if sid, ok := msg.(HaveStreamID); ok {
head.MessageStreamID = sid.GetStreamID()
}
nc.SetWriteDeadline(time.Now().Add(time.Second * 5)) // 设置写入超时时间为5秒
return nc.sendChunk(util.NewMemory(nc.tmpBuf), head, RTMP_CHUNK_HEAD_12)
}
func (nc *NetConnection) sendChunk(mem util.Memory, head *ChunkHeader, headType byte) (err error) {
nc.SetWriteDeadline(time.Now().Add(time.Second * 5)) // 设置写入超时时间为5秒
head.WriteTo(headType, &nc.chunkHeaderBuf)
defer func(reuse net.Buffers) {
nc.sendBuffers = reuse

View File

@@ -16,7 +16,6 @@ type IRTPReader interface {
type RTPUDPReader struct {
io.Reader
RTPReorder[*rtp.Packet]
UdpCacheSize int
}
func NewRTPUDPReader(r io.Reader) *RTPUDPReader {
@@ -24,11 +23,11 @@ func NewRTPUDPReader(r io.Reader) *RTPUDPReader {
}
func (r *RTPUDPReader) Read(packet *rtp.Packet) error {
for {
ordered := r.Pop()
var ordered *rtp.Packet
for ordered == nil {
ordered = r.Pop()
if ordered != nil {
*packet = *ordered
return nil
break
}
var buf [MTUSize]byte
var pack rtp.Packet
@@ -40,8 +39,10 @@ func (r *RTPUDPReader) Read(packet *rtp.Packet) error {
if err != nil {
return err
}
r.Push(pack.SequenceNumber, &pack)
ordered = r.Push(pack.SequenceNumber, &pack)
}
*packet = *ordered
return nil
}
type RTPTCPReader struct {