From 36de8949304e0274d1338d0759533a9e8995597c Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Sun, 2 Jun 2024 11:58:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=811078=E6=8E=A8=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 4 + go.mod | 10 +- jt1078/jt_server.go | 60 +++++++++ jt1078/jt_session.go | 284 +++++++++++++++++++++++++++++++++++++++++ jt1078/jt_test.go | 39 ++++++ main.go | 23 +++- stream/config.go | 7 + stream/trans_stream.go | 8 +- 8 files changed, 426 insertions(+), 9 deletions(-) create mode 100644 jt1078/jt_server.go create mode 100644 jt1078/jt_session.go create mode 100644 jt1078/jt_test.go diff --git a/config.json b/config.json index 16e3615..8797a7f 100644 --- a/config.json +++ b/config.json @@ -38,6 +38,10 @@ "transport": "UDP|TCP" }, + "1078": { + "addr": "0.0.0.0:1078" + }, + "record": { "format": "mp4", "path": "" diff --git a/go.mod b/go.mod index 3b0cc5c..4f80d52 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,13 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/pion/interceptor v0.1.25 + github.com/pion/logging v0.2.2 + github.com/pion/rtcp v1.2.14 + github.com/pion/rtp v1.8.5 github.com/pion/webrtc/v3 v3.2.29 github.com/sirupsen/logrus v1.9.3 + github.com/stretchr/testify v1.9.0 github.com/x-cray/logrus-prefixed-formatter v0.5.2 go.uber.org/zap v1.27.0 ) @@ -22,12 +27,8 @@ require ( github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.7 // indirect github.com/pion/ice/v2 v2.3.13 // indirect - github.com/pion/interceptor v0.1.25 // indirect - github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.12 // indirect - github.com/pion/rtp v1.8.3 // indirect github.com/pion/sctp v1.8.12 // indirect github.com/pion/sdp/v3 v3.0.8 // indirect github.com/pion/srtp/v2 v2.0.18 // indirect @@ -35,7 +36,6 @@ require ( github.com/pion/transport/v2 v2.2.3 // indirect github.com/pion/turn/v2 v2.1.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/testify v1.9.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.18.0 // indirect golang.org/x/net v0.20.0 // indirect diff --git a/jt1078/jt_server.go b/jt1078/jt_server.go new file mode 100644 index 0000000..7e9e9c3 --- /dev/null +++ b/jt1078/jt_server.go @@ -0,0 +1,60 @@ +package jt1078 + +import ( + "github.com/yangjiechina/avformat/transport" + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/live-server/log" + "github.com/yangjiechina/live-server/stream" + "net" +) + +type Server interface { + Start(addr net.Addr) error + + Close() +} + +type jtServer struct { + tcp *transport.TCPServer +} + +func NewServer() Server { + return &jtServer{} +} + +func (s jtServer) OnConnected(conn net.Conn) { + log.Sugar.Debugf("jtserver连接 conn:%s", conn.RemoteAddr().String()) + + t := conn.(*transport.Conn) + t.Data = NewSession() +} + +func (s jtServer) OnPacket(conn net.Conn, data []byte) { + conn.(*transport.Conn).Data.(*Session).AddEvent(stream.SourceEventInput, data) +} + +func (s jtServer) OnDisConnected(conn net.Conn, err error) { + log.Sugar.Debugf("jtserver断开连接 conn:%s", conn.RemoteAddr().String()) + + t := conn.(*transport.Conn) + t.Data.(*Session).Close() +} + +func (s jtServer) Start(addr net.Addr) error { + utils.Assert(s.tcp == nil) + + server := &transport.TCPServer{} + server.SetHandler(s) + err := server.Bind(addr) + + if err != nil { + return err + } + + s.tcp = server + return nil +} + +func (s jtServer) Close() { + panic("implement me") +} diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go new file mode 100644 index 0000000..3d2947f --- /dev/null +++ b/jt1078/jt_session.go @@ -0,0 +1,284 @@ +package jt1078 + +import ( + "encoding/binary" + "fmt" + "github.com/yangjiechina/avformat/transport" + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/live-server/log" + "github.com/yangjiechina/live-server/stream" +) + +const ( + VideoIFrameMark = 0b000 + VideoPFrameMark = 0b001 + VideoBFrameMark = 0b010 + AudioFrameMark = 0b011 + TransmissionDataMark = 0b1000 + + PTVideoH264 = 98 + PTVideoH265 = 99 + PTVideoAVS = 100 + PTVideoSVAC = 101 + + PTAudioG711A = 6 + PTAudioG711U = 7 + PTAudioG726 = 8 + PTAudioG729A = 9 + PTAudioAAC = 19 + PTAudioMP3 = 25 + PTAudioADPCMA = 26 +) + +type Session struct { + stream.SourceImpl + + phone string + decoder *transport.DelimiterFrameDecoder + + audioIndex int + videoIndex int + audioStream utils.AVStream + videoStream utils.AVStream + audioBuffer stream.MemoryPool + videoBuffer stream.MemoryPool + rtpPacket *RtpPacket +} + +type RtpPacket struct { + pt byte + packetType byte + ts uint64 + subMark byte + simNumber string + + payload []byte +} + +// 读取1078的rtp包, 返回数据类型, 负载类型、时间戳、负载数据 +func read1078RTPPacket(data []byte) (RtpPacket, error) { + if len(data) < 12 { + return RtpPacket{}, fmt.Errorf("invaild data") + } + + packetType := data[11] >> 4 & 0x0F + //忽略透传数据 + if TransmissionDataMark == packetType { + return RtpPacket{}, fmt.Errorf("invaild data") + } + + //忽略低于最低长度的数据包 + if (AudioFrameMark == packetType && len(data) < 26) || (AudioFrameMark == packetType && len(data) < 22) { + return RtpPacket{}, fmt.Errorf("invaild data") + } + + //x扩展位,固定为0 + _ = data[0] >> 4 & 0x1 + pt := data[1] & 0x7F + //seq + _ = binary.BigEndian.Uint16(data[2:]) + + var simNumber string + for i := 4; i < 10; i++ { + simNumber += fmt.Sprintf("%02d", data[i]) + } + + //channel + _ = data[10] + //subMark + subMark := data[11] & 0x0F + //单位ms + var ts uint64 + n := 12 + if TransmissionDataMark != packetType { + ts = binary.BigEndian.Uint64(data[n:]) + n += 8 + } + + if AudioFrameMark > packetType { + //iFrameInterval + _ = binary.BigEndian.Uint16(data[n:]) + n += 2 + //lastFrameInterval + _ = binary.BigEndian.Uint16(data[n:]) + n += 2 + } + + //size + _ = binary.BigEndian.Uint16(data[n:]) + n += 2 + + return RtpPacket{pt: pt, packetType: packetType, ts: ts, simNumber: simNumber, subMark: subMark, payload: data[n:]}, nil +} + +func (s *Session) processVideoPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { + var packet utils.AVPacket + var stream_ utils.AVStream + + if PTVideoH264 == pt { + if s.videoStream == nil { + if VideoIFrameMark != pktType { + return fmt.Errorf("skip non keyframes") + } + + videoStream, err := utils.CreateAVCStreamFromKeyFrame(data, 1) + if err != nil { + return err + } + + stream_ = videoStream + } + + packet = utils.NewVideoPacket(data, int64(ts), int64(ts), VideoIFrameMark == pktType, utils.PacketTypeAnnexB, utils.AVCodecIdH265, index, 1000) + } else if PTVideoH265 == pt { + if s.videoStream == nil { + if VideoIFrameMark != pktType { + return fmt.Errorf("skip non keyframes") + } + + videoStream, err := utils.CreateHevcStreamFromKeyFrame(data, 1) + if err != nil { + return err + } + + stream_ = videoStream + } + + packet = utils.NewVideoPacket(data, int64(ts), int64(ts), VideoIFrameMark == pktType, utils.PacketTypeAnnexB, utils.AVCodecIdH265, index, 1000) + } else { + return fmt.Errorf("the codec %d is not implemented", pt) + } + + if stream_ != nil { + s.videoStream = stream_ + s.OnDeMuxStream(stream_) + if s.videoStream != nil && s.audioStream != nil { + s.OnDeMuxStreamDone() + } + } + + s.OnDeMuxPacket(packet) + return nil +} + +func (s *Session) processAudioPacket(pt byte, pktType byte, ts uint64, data []byte, index int) error { + var packet utils.AVPacket + var stream_ utils.AVStream + + if PTAudioG711A == pt { + if s.audioStream == nil { + stream_ = utils.NewAVStream(utils.AVMediaTypeAudio, 0, utils.AVCodecIdPCMALAW, nil, nil) + } + + packet = utils.NewAudioPacket(data, int64(ts), int64(ts), utils.AVCodecIdPCMALAW, index, 1000) + } else if PTAudioG711U == pt { + if s.audioStream == nil { + stream_ = utils.NewAVStream(utils.AVMediaTypeAudio, 0, utils.AVCodecIdPCMMULAW, nil, nil) + } + + packet = utils.NewAudioPacket(data, int64(ts), int64(ts), utils.AVCodecIdPCMMULAW, index, 1000) + } else if PTAudioAAC == pt { + + } else { + return fmt.Errorf("the codec %d is not implemented", pt) + } + + if stream_ != nil { + s.audioStream = stream_ + s.OnDeMuxStream(stream_) + if s.videoStream != nil && s.audioStream != nil { + s.OnDeMuxStreamDone() + } + } + + s.OnDeMuxPacket(packet) + return nil +} +func (s *Session) OnJtPTPPacket(data []byte) { + packet, err := read1078RTPPacket(data) + if err != nil { + return + } + + //过滤空数据 + if len(packet.payload) == 0 { + return + } + + //首包处理, hook通知 + if s.rtpPacket == nil { + s.Id_ = packet.simNumber + s.rtpPacket = &RtpPacket{} + *s.rtpPacket = packet + + s.Publish(s, func() { + //response <- utils.HookStateOK + }, func(state utils.HookState) { + //response <- state + }) + } + + //完整包/最后一个分包, 创建AVPacket + //或者参考时间戳, 推流的分包标记为可能不靠谱 + if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt { + if s.rtpPacket.packetType == AudioFrameMark { + if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil { + log.Sugar.Errorf("处理音频包失败 phone:%s err:%s", s.phone, err.Error()) + s.audioBuffer.FreeTail() + } + + *s.rtpPacket = packet + s.audioBuffer.Mark() + } else { + if err := s.processVideoPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.videoBuffer.Fetch(), s.videoIndex); err != nil { + log.Sugar.Errorf("处理视频包失败 phone:%s err:%s", s.phone, err.Error()) + s.videoBuffer.FreeTail() + } + + *s.rtpPacket = packet + s.videoBuffer.Mark() + } + } + + if packet.packetType == AudioFrameMark { + if s.audioBuffer == nil { + if s.videoIndex == 0 && s.audioIndex == 0 { + s.videoIndex = 1 + } + + s.audioBuffer = s.FindOrCreatePacketBuffer(s.audioIndex, utils.AVMediaTypeAudio) + s.audioBuffer.Mark() + } + + s.audioBuffer.Write(packet.payload) + } else { + if s.videoBuffer == nil { + if s.videoIndex == 0 && s.audioIndex == 0 { + s.audioIndex = 1 + } + + s.videoBuffer = s.FindOrCreatePacketBuffer(s.videoIndex, utils.AVMediaTypeVideo) + s.videoBuffer.Mark() + } + + s.videoBuffer.Write(packet.payload) + } +} + +func (s *Session) Input(data []byte) error { + return s.decoder.Input(data) +} + +func (s *Session) Close() { + +} + +func NewSession() *Session { + session := Session{} + delimiter := [4]byte{0x30, 0x31, 0x63, 0x64} + session.decoder = transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:], session.OnJtPTPPacket) + + session.Init(session.Input) + go session.LoopEvent() + return &session +} diff --git a/jt1078/jt_test.go b/jt1078/jt_test.go new file mode 100644 index 0000000..5562444 --- /dev/null +++ b/jt1078/jt_test.go @@ -0,0 +1,39 @@ +package jt1078 + +import ( + "github.com/yangjiechina/avformat/libbufio" + "github.com/yangjiechina/avformat/transport" + "net" + "os" + "testing" + "time" +) + +func TestPublish(t *testing.T) { + path := "D:\\GOProjects\\avformat\\10352264314-2.bin" + + client := transport.TCPClient{} + addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:1078") + if err != nil { + panic(err) + } + err = client.Connect(nil, addr) + if err != nil { + panic(err) + } + + file, err := os.ReadFile(path) + if err != nil { + panic(err) + } + + index := 0 + for index < len(file) { + n := libbufio.MinInt(len(file)-index, 1500) + client.Write(file[index : index+n]) + index += n + time.Sleep(1 * time.Millisecond) + } + + println("end") +} diff --git a/main.go b/main.go index 1516746..671a112 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "github.com/yangjiechina/live-server/flv" "github.com/yangjiechina/live-server/gb28181" "github.com/yangjiechina/live-server/hls" + "github.com/yangjiechina/live-server/jt1078" "github.com/yangjiechina/live-server/log" "github.com/yangjiechina/live-server/rtc" "github.com/yangjiechina/live-server/rtsp" @@ -24,7 +25,7 @@ func NewDefaultAppConfig() stream.AppConfig_ { MergeWriteLatency: 350, Hls: stream.HlsConfig{ - Enable: true, + Enable: false, Dir: "../tmp", Duration: 2, PlaylistLength: 10, @@ -59,6 +60,11 @@ func NewDefaultAppConfig() stream.AppConfig_ { Transport: "UDP|TCP", Port: [2]uint16{20000, 30000}, }, + + JT1078: stream.JT1078Config{ + Enable: true, + Addr: "0.0.0.0:1078", + }, } } @@ -152,6 +158,21 @@ func main() { } } + if stream.AppConfig.JT1078.Enable { + jtAddr, err := net.ResolveTCPAddr("tcp", stream.AppConfig.JT1078.Addr) + if err != nil { + panic(err) + } + + server := jt1078.NewServer() + err = server.Start(jtAddr) + if err != nil { + panic(err) + } + + log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) + } + loadConfigError := http.ListenAndServe(":19999", nil) if loadConfigError != nil { panic(loadConfigError) diff --git a/stream/config.go b/stream/config.go index ceaf039..b4eee3c 100644 --- a/stream/config.go +++ b/stream/config.go @@ -49,6 +49,11 @@ type GB28181Config struct { Port [2]uint16 //单端口模式[0]=port/多端口模式[0]=start port, [0]=end port. } +type JT1078Config struct { + Enable bool + Addr string +} + func (g GB28181Config) EnableTCP() bool { return strings.Contains(g.Transport, "TCP") } @@ -146,4 +151,6 @@ type AppConfig_ struct { Http HttpConfig GB28181 GB28181Config + + JT1078 JT1078Config } diff --git a/stream/trans_stream.go b/stream/trans_stream.go index a5ba311..aa06c0c 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -26,9 +26,11 @@ func init() { int(utils.AVCodecIdVP8): 0x5, int(utils.AVCodecIdVP9): 0x6, - int(utils.AVCodecIdAAC): 101, - int(utils.AVCodecIdMP3): 102, - int(utils.AVCodecIdOPUS): 103, + int(utils.AVCodecIdAAC): 101, + int(utils.AVCodecIdMP3): 102, + int(utils.AVCodecIdOPUS): 103, + int(utils.AVCodecIdPCMALAW): 104, + int(utils.AVCodecIdPCMMULAW): 105, } transStreamFactories = make(map[Protocol]TransStreamFactory, 8)