From 0247cff0e9070671d781d997f33c967a50ea4824 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Wed, 9 Jul 2025 16:42:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20jt1078=E6=94=AF=E6=8C=812019=E7=89=88?= =?UTF-8?q?=E6=9C=AC20=E4=BD=8Dsim=E5=8D=A1=E5=8F=B7=E6=8E=A8=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 4 ++- jt1078/jt_demuxer.go | 8 ++++-- jt1078/jt_packet.go | 58 ++++++++++++++++++++++++++++++-------------- jt1078/jt_server.go | 11 ++++++--- jt1078/jt_session.go | 4 +-- jt1078/jt_test.go | 38 +++++++++++++++++++++++------ main.go | 26 +++++++++++++------- stream/config.go | 2 ++ 8 files changed, 107 insertions(+), 44 deletions(-) diff --git a/config.json b/config.json index ba1d9f0..4209336 100644 --- a/config.json +++ b/config.json @@ -45,7 +45,9 @@ "jt1078": { "enable": true, - "port": 1078 + "port": 1078, + "?port_2019": "2019版本协议sim卡号20位, 单独启动一个收流端口", + "port_2019": 1079 }, "record": { diff --git a/jt1078/jt_demuxer.go b/jt1078/jt_demuxer.go index d5d2e58..226be6a 100644 --- a/jt1078/jt_demuxer.go +++ b/jt1078/jt_demuxer.go @@ -11,6 +11,7 @@ type Demuxer struct { sim string channel int lastError string + version int } func (d *Demuxer) ProcessPrevPacket() error { @@ -42,7 +43,9 @@ func (d *Demuxer) ProcessPrevPacket() error { } func (d *Demuxer) Input(data []byte) (int, error) { - packet := Packet{} + packet := Packet{ + version: d.version, + } if err := packet.Unmarshal(data); err != nil { return 0, err } else if len(packet.payload) == 0 { @@ -86,12 +89,13 @@ func (d *Demuxer) Input(data []byte) (int, error) { return len(data), nil } -func NewDemuxer() *Demuxer { +func NewDemuxer(version int) *Demuxer { return &Demuxer{ BaseDemuxer: avformat.BaseDemuxer{ DataPipeline: &avformat.StreamsBuffer{}, Name: "jt1078", // vob AutoFree: false, }, + version: version, } } diff --git a/jt1078/jt_packet.go b/jt1078/jt_packet.go index 2cd7036..8b6d967 100644 --- a/jt1078/jt_packet.go +++ b/jt1078/jt_packet.go @@ -30,8 +30,10 @@ const ( // Packet 1078-2016音视频和透传包. 视频帧包头26个字节, 音频帧22个字节, 透传数据14个字节. type Packet struct { + version int // 2016-sim卡号6字节长度/2019-sim卡号10字节长度 pt byte packetType byte + seq uint16 ts uint64 subMark byte simNumber string @@ -41,46 +43,65 @@ type Packet struct { func (p *Packet) Unmarshal(data []byte) error { length := len(data) - if length < 12 { - return fmt.Errorf("invaild data") + + var packetType byte + if p.version == 2019 { + if length < 16 { + return fmt.Errorf("invaild data") + } + + packetType = data[15] >> 4 & 0x0F + } else { + if length < 12 { + return fmt.Errorf("invaild data") + } + + packetType = data[11] >> 4 & 0x0F } - packetType := data[11] >> 4 & 0x0F + var minSize int if packetType < AudioFrameMark { - if length < 26 { - return fmt.Errorf("invaild data") - } + minSize = 26 } else if AudioFrameMark == packetType { - if length < 22 { - return fmt.Errorf("invaild data") - } + minSize = 22 } else if TransmissionDataMark == packetType { - if length < 14 { - return fmt.Errorf("invaild data") - } + minSize = 14 } else { return fmt.Errorf("unknown packet type %x", packetType) } + simNumberLength := 6 + if p.version == 2019 { + minSize += 4 + simNumberLength += 4 + } + + if length < minSize { + return fmt.Errorf("invaild data") + } + // x扩展位,固定为0 _ = data[0] >> 4 & 0x1 pt := data[1] & 0x7F // seq - _ = binary.BigEndian.Uint16(data[2:]) + seq := binary.BigEndian.Uint16(data[2:]) var simNumber string - for i := 4; i < 10; i++ { - simNumber += fmt.Sprintf("%02x", data[i]) + n := 4 + for ; simNumberLength > 0; simNumberLength-- { + simNumber += fmt.Sprintf("%02x", data[n]) + n++ } simNumber = strings.TrimLeft(simNumber, "0") // channel - channelNumber := data[10] + channelNumber := data[n] + n++ // subMark - subMark := data[11] & 0x0F + subMark := data[n] & 0x0F + n++ // 时间戳,单位ms var ts uint64 - n := 12 // 音视频帧才有时间戳字段 if TransmissionDataMark != packetType { ts = binary.BigEndian.Uint64(data[n:]) @@ -103,6 +124,7 @@ func (p *Packet) Unmarshal(data []byte) error { p.pt = pt p.packetType = packetType + p.seq = seq p.ts = ts p.simNumber = simNumber p.channelNumber = channelNumber diff --git a/jt1078/jt_server.go b/jt1078/jt_server.go index 43ff7d2..671ced9 100644 --- a/jt1078/jt_server.go +++ b/jt1078/jt_server.go @@ -16,11 +16,12 @@ type Server interface { type jtServer struct { stream.StreamServer[*Session] - tcp *transport.TCPServer + tcp *transport.TCPServer + version int } func (s *jtServer) OnNewSession(conn net.Conn) *Session { - return NewSession(conn) + return NewSession(conn, s.version) } func (s *jtServer) OnCloseSession(session *Session) { @@ -57,8 +58,10 @@ func (s *jtServer) Close() { panic("implement me") } -func NewServer() Server { - j := &jtServer{} +func NewServer(version int) Server { + j := &jtServer{ + version: version, + } j.StreamServer = stream.StreamServer[*Session]{ SourceType: stream.SourceType1078, Handler: j, diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index fefcaaf..3e36709 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -65,13 +65,13 @@ func (s *Session) Close() { stream.TCPReceiveBufferPool.Put(s.receiveBuffer[:cap(s.receiveBuffer)]) } -func NewSession(conn net.Conn) *Session { +func NewSession(conn net.Conn, version int) *Session { delimiter := [4]byte{0x30, 0x31, 0x63, 0x64} session := Session{ PublishSource: stream.PublishSource{ Conn: conn, Type: stream.SourceType1078, - TransDemuxer: NewDemuxer(), + TransDemuxer: NewDemuxer(version), }, decoder: transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:]), diff --git a/jt1078/jt_test.go b/jt1078/jt_test.go index 60d648c..9a1993e 100644 --- a/jt1078/jt_test.go +++ b/jt1078/jt_test.go @@ -81,9 +81,9 @@ func (h Handler) OnPacket(packet *avformat.AVPacket) { } } -func publish(path string) { +func publish(path string, port string) { client := transport.TCPClient{} - addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:1078") + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("127.0.0.1", port)) if err != nil { panic(err) } @@ -106,9 +106,27 @@ func publish(path string) { } } +func PackType2String(t int) string { + if t == AudioFrameMark { + return "audio" + } else if t == VideoBFrameMark { + return "b frame" + } else if t == VideoIFrameMark { + return "i frame" + } else if t == VideoPFrameMark { + return "p frame" + } else if t == TransmissionDataMark { + return "transmission" + } else { + return "unknown" + } +} + func TestPublish(t *testing.T) { t.Run("decode_1078_data", func(t *testing.T) { - data, err := os.ReadFile("../dump/jt1078-127.0.0.1.50659") + //data, err := os.ReadFile("../dump/jt1078-127.0.0.1.50659") + data, err := os.ReadFile("../dump/jt1078-127.0.0.1.5472") + if err != nil { panic(err) } @@ -140,7 +158,7 @@ func TestPublish(t *testing.T) { panic(err) } - fmt.Printf("1078 packet ts: %d\r\n", packet.ts) + fmt.Printf("1078 packet seq: %d type: %s ts: %d\r\n", packet.seq, PackType2String(int(packet.packetType)), packet.ts) } j += size @@ -150,8 +168,12 @@ func TestPublish(t *testing.T) { t.Run("publish", func(t *testing.T) { path := "../../source_files/10352264314-2.bin" - //path := "../../source_files/013800138000-1.bin" - publish(path) + publish(path, "1078") + }) + + t.Run("publish_2019", func(t *testing.T) { + path := "../../source_files/jt_1078_2019.raw" + publish(path, "1079") }) // 1078->ps->rtp @@ -176,7 +198,7 @@ func TestPublish(t *testing.T) { panic(err) } - demuxer := NewDemuxer() + demuxer := NewDemuxer(2016) demuxer.SetHandler(&Handler{ muxer: mpeg.NewPsMuxer(), buffer: make([]byte, 1024*1024*2), @@ -240,7 +262,7 @@ func TestPublish(t *testing.T) { } w.WriteHeader(http.StatusOK) - go publish(path) + go publish(path, "1078") }) server := &http.Server{ diff --git a/main.go b/main.go index ae2e3b9..1417dda 100644 --- a/main.go +++ b/main.go @@ -163,18 +163,26 @@ func main() { } if stream.AppConfig.JT1078.Enable { - jtAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(stream.AppConfig.JT1078.Port)) - if err != nil { - panic(err) + // 无法通过包头区分2016和2019, 每个版本创建一个Server + ports := [][2]int{{stream.AppConfig.JT1078.Port, 2016}} + if stream.AppConfig.JT1078.Port2019 > 0 { + ports = append(ports, [2]int{stream.AppConfig.JT1078.Port2019, 2019}) } - server := jt1078.NewServer() - err = server.Start(jtAddr) - if err != nil { - panic(err) - } + for _, port := range ports { + jtAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(port[0])) + if err != nil { + panic(err) + } - log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) + server := jt1078.NewServer(port[1]) + err = server.Start(jtAddr) + if err != nil { + panic(err) + } + + log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) + } } if stream.AppConfig.Hooks.IsEnableOnStarted() { diff --git a/stream/config.go b/stream/config.go index 634f7cd..ab0c2a4 100644 --- a/stream/config.go +++ b/stream/config.go @@ -72,6 +72,8 @@ type HlsConfig struct { type JT1078Config struct { enableConfig portConfig + + Port2019 int `json:"port_2019"` } type RtspConfig struct {