feat: jt1078支持2019版本20位sim卡号推流

This commit is contained in:
ydajiang
2025-07-09 16:42:16 +08:00
parent 111d2121e2
commit 0247cff0e9
8 changed files with 107 additions and 44 deletions

View File

@@ -45,7 +45,9 @@
"jt1078": { "jt1078": {
"enable": true, "enable": true,
"port": 1078 "port": 1078,
"?port_2019": "2019版本协议sim卡号20位, 单独启动一个收流端口",
"port_2019": 1079
}, },
"record": { "record": {

View File

@@ -11,6 +11,7 @@ type Demuxer struct {
sim string sim string
channel int channel int
lastError string lastError string
version int
} }
func (d *Demuxer) ProcessPrevPacket() error { func (d *Demuxer) ProcessPrevPacket() error {
@@ -42,7 +43,9 @@ func (d *Demuxer) ProcessPrevPacket() error {
} }
func (d *Demuxer) Input(data []byte) (int, error) { func (d *Demuxer) Input(data []byte) (int, error) {
packet := Packet{} packet := Packet{
version: d.version,
}
if err := packet.Unmarshal(data); err != nil { if err := packet.Unmarshal(data); err != nil {
return 0, err return 0, err
} else if len(packet.payload) == 0 { } else if len(packet.payload) == 0 {
@@ -86,12 +89,13 @@ func (d *Demuxer) Input(data []byte) (int, error) {
return len(data), nil return len(data), nil
} }
func NewDemuxer() *Demuxer { func NewDemuxer(version int) *Demuxer {
return &Demuxer{ return &Demuxer{
BaseDemuxer: avformat.BaseDemuxer{ BaseDemuxer: avformat.BaseDemuxer{
DataPipeline: &avformat.StreamsBuffer{}, DataPipeline: &avformat.StreamsBuffer{},
Name: "jt1078", // vob Name: "jt1078", // vob
AutoFree: false, AutoFree: false,
}, },
version: version,
} }
} }

View File

@@ -30,8 +30,10 @@ const (
// Packet 1078-2016音视频和透传包. 视频帧包头26个字节, 音频帧22个字节, 透传数据14个字节. // Packet 1078-2016音视频和透传包. 视频帧包头26个字节, 音频帧22个字节, 透传数据14个字节.
type Packet struct { type Packet struct {
version int // 2016-sim卡号6字节长度/2019-sim卡号10字节长度
pt byte pt byte
packetType byte packetType byte
seq uint16
ts uint64 ts uint64
subMark byte subMark byte
simNumber string simNumber string
@@ -41,46 +43,65 @@ type Packet struct {
func (p *Packet) Unmarshal(data []byte) error { func (p *Packet) Unmarshal(data []byte) error {
length := len(data) length := len(data)
if length < 12 {
return fmt.Errorf("invaild data") var packetType byte
if p.version == 2019 {
if length < 16 {
return fmt.Errorf("invaild data")
}
packetType = data[15] >> 4 & 0x0F
} else {
if length < 12 {
return fmt.Errorf("invaild data")
}
packetType = data[11] >> 4 & 0x0F
} }
packetType := data[11] >> 4 & 0x0F var minSize int
if packetType < AudioFrameMark { if packetType < AudioFrameMark {
if length < 26 { minSize = 26
return fmt.Errorf("invaild data")
}
} else if AudioFrameMark == packetType { } else if AudioFrameMark == packetType {
if length < 22 { minSize = 22
return fmt.Errorf("invaild data")
}
} else if TransmissionDataMark == packetType { } else if TransmissionDataMark == packetType {
if length < 14 { minSize = 14
return fmt.Errorf("invaild data")
}
} else { } else {
return fmt.Errorf("unknown packet type %x", packetType) return fmt.Errorf("unknown packet type %x", packetType)
} }
simNumberLength := 6
if p.version == 2019 {
minSize += 4
simNumberLength += 4
}
if length < minSize {
return fmt.Errorf("invaild data")
}
// x扩展位,固定为0 // x扩展位,固定为0
_ = data[0] >> 4 & 0x1 _ = data[0] >> 4 & 0x1
pt := data[1] & 0x7F pt := data[1] & 0x7F
// seq // seq
_ = binary.BigEndian.Uint16(data[2:]) seq := binary.BigEndian.Uint16(data[2:])
var simNumber string var simNumber string
for i := 4; i < 10; i++ { n := 4
simNumber += fmt.Sprintf("%02x", data[i]) for ; simNumberLength > 0; simNumberLength-- {
simNumber += fmt.Sprintf("%02x", data[n])
n++
} }
simNumber = strings.TrimLeft(simNumber, "0") simNumber = strings.TrimLeft(simNumber, "0")
// channel // channel
channelNumber := data[10] channelNumber := data[n]
n++
// subMark // subMark
subMark := data[11] & 0x0F subMark := data[n] & 0x0F
n++
// 时间戳,单位ms // 时间戳,单位ms
var ts uint64 var ts uint64
n := 12
// 音视频帧才有时间戳字段 // 音视频帧才有时间戳字段
if TransmissionDataMark != packetType { if TransmissionDataMark != packetType {
ts = binary.BigEndian.Uint64(data[n:]) ts = binary.BigEndian.Uint64(data[n:])
@@ -103,6 +124,7 @@ func (p *Packet) Unmarshal(data []byte) error {
p.pt = pt p.pt = pt
p.packetType = packetType p.packetType = packetType
p.seq = seq
p.ts = ts p.ts = ts
p.simNumber = simNumber p.simNumber = simNumber
p.channelNumber = channelNumber p.channelNumber = channelNumber

View File

@@ -16,11 +16,12 @@ type Server interface {
type jtServer struct { type jtServer struct {
stream.StreamServer[*Session] stream.StreamServer[*Session]
tcp *transport.TCPServer tcp *transport.TCPServer
version int
} }
func (s *jtServer) OnNewSession(conn net.Conn) *Session { func (s *jtServer) OnNewSession(conn net.Conn) *Session {
return NewSession(conn) return NewSession(conn, s.version)
} }
func (s *jtServer) OnCloseSession(session *Session) { func (s *jtServer) OnCloseSession(session *Session) {
@@ -57,8 +58,10 @@ func (s *jtServer) Close() {
panic("implement me") panic("implement me")
} }
func NewServer() Server { func NewServer(version int) Server {
j := &jtServer{} j := &jtServer{
version: version,
}
j.StreamServer = stream.StreamServer[*Session]{ j.StreamServer = stream.StreamServer[*Session]{
SourceType: stream.SourceType1078, SourceType: stream.SourceType1078,
Handler: j, Handler: j,

View File

@@ -65,13 +65,13 @@ func (s *Session) Close() {
stream.TCPReceiveBufferPool.Put(s.receiveBuffer[:cap(s.receiveBuffer)]) stream.TCPReceiveBufferPool.Put(s.receiveBuffer[:cap(s.receiveBuffer)])
} }
func NewSession(conn net.Conn) *Session { func NewSession(conn net.Conn, version int) *Session {
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64} delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
session := Session{ session := Session{
PublishSource: stream.PublishSource{ PublishSource: stream.PublishSource{
Conn: conn, Conn: conn,
Type: stream.SourceType1078, Type: stream.SourceType1078,
TransDemuxer: NewDemuxer(), TransDemuxer: NewDemuxer(version),
}, },
decoder: transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:]), decoder: transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:]),

View File

@@ -81,9 +81,9 @@ func (h Handler) OnPacket(packet *avformat.AVPacket) {
} }
} }
func publish(path string) { func publish(path string, port string) {
client := transport.TCPClient{} client := transport.TCPClient{}
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:1078") addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("127.0.0.1", port))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -106,9 +106,27 @@ func publish(path string) {
} }
} }
func PackType2String(t int) string {
if t == AudioFrameMark {
return "audio"
} else if t == VideoBFrameMark {
return "b frame"
} else if t == VideoIFrameMark {
return "i frame"
} else if t == VideoPFrameMark {
return "p frame"
} else if t == TransmissionDataMark {
return "transmission"
} else {
return "unknown"
}
}
func TestPublish(t *testing.T) { func TestPublish(t *testing.T) {
t.Run("decode_1078_data", func(t *testing.T) { t.Run("decode_1078_data", func(t *testing.T) {
data, err := os.ReadFile("../dump/jt1078-127.0.0.1.50659") //data, err := os.ReadFile("../dump/jt1078-127.0.0.1.50659")
data, err := os.ReadFile("../dump/jt1078-127.0.0.1.5472")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -140,7 +158,7 @@ func TestPublish(t *testing.T) {
panic(err) panic(err)
} }
fmt.Printf("1078 packet ts: %d\r\n", packet.ts) fmt.Printf("1078 packet seq: %d type: %s ts: %d\r\n", packet.seq, PackType2String(int(packet.packetType)), packet.ts)
} }
j += size j += size
@@ -150,8 +168,12 @@ func TestPublish(t *testing.T) {
t.Run("publish", func(t *testing.T) { t.Run("publish", func(t *testing.T) {
path := "../../source_files/10352264314-2.bin" path := "../../source_files/10352264314-2.bin"
//path := "../../source_files/013800138000-1.bin" publish(path, "1078")
publish(path) })
t.Run("publish_2019", func(t *testing.T) {
path := "../../source_files/jt_1078_2019.raw"
publish(path, "1079")
}) })
// 1078->ps->rtp // 1078->ps->rtp
@@ -176,7 +198,7 @@ func TestPublish(t *testing.T) {
panic(err) panic(err)
} }
demuxer := NewDemuxer() demuxer := NewDemuxer(2016)
demuxer.SetHandler(&Handler{ demuxer.SetHandler(&Handler{
muxer: mpeg.NewPsMuxer(), muxer: mpeg.NewPsMuxer(),
buffer: make([]byte, 1024*1024*2), buffer: make([]byte, 1024*1024*2),
@@ -240,7 +262,7 @@ func TestPublish(t *testing.T) {
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
go publish(path) go publish(path, "1078")
}) })
server := &http.Server{ server := &http.Server{

26
main.go
View File

@@ -163,18 +163,26 @@ func main() {
} }
if stream.AppConfig.JT1078.Enable { if stream.AppConfig.JT1078.Enable {
jtAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(stream.AppConfig.JT1078.Port)) // 无法通过包头区分2016和2019, 每个版本创建一个Server
if err != nil { ports := [][2]int{{stream.AppConfig.JT1078.Port, 2016}}
panic(err) if stream.AppConfig.JT1078.Port2019 > 0 {
ports = append(ports, [2]int{stream.AppConfig.JT1078.Port2019, 2019})
} }
server := jt1078.NewServer() for _, port := range ports {
err = server.Start(jtAddr) jtAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(port[0]))
if err != nil { if err != nil {
panic(err) panic(err)
} }
log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) server := jt1078.NewServer(port[1])
err = server.Start(jtAddr)
if err != nil {
panic(err)
}
log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String())
}
} }
if stream.AppConfig.Hooks.IsEnableOnStarted() { if stream.AppConfig.Hooks.IsEnableOnStarted() {

View File

@@ -72,6 +72,8 @@ type HlsConfig struct {
type JT1078Config struct { type JT1078Config struct {
enableConfig enableConfig
portConfig portConfig
Port2019 int `json:"port_2019"`
} }
type RtspConfig struct { type RtspConfig struct {