From dfda2765836350f23f5cc9d0a8cddac496952ca5 Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Thu, 4 Jul 2024 22:35:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BF=9D=E5=AD=98=E6=8E=A8?= =?UTF-8?q?=E6=B5=81=E6=95=B0=E6=8D=AE=E5=8C=85=E5=88=B0=E6=96=87=E4=BB=B6?= =?UTF-8?q?.=20=E6=89=93=E5=8D=B0=E6=8B=89=E6=B5=81url?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 1 + config.json | 1 + gb28181/filter_single.go | 2 +- gb28181/source.go | 6 ++++ gb28181/tcp_client.go | 1 + gb28181/tcp_server.go | 50 ++++++++++++++++++------------ gb28181/tcp_session.go | 13 ++++++++ gb28181/udp_server.go | 27 ++++++++++++---- jt1078/jt_server.go | 51 ++++++++++++++---------------- jt1078/jt_session.go | 5 +++ main.go | 5 +-- rtmp/publish_test.go | 40 ++++++++++++++++++++++++ rtmp/rtmp_server.go | 39 +++++++++++------------ rtmp/rtmp_server_test.go | 2 +- stream/config.go | 67 +++++++++++++++++++++++++++++++++++++--- stream/source.go | 9 ++++++ stream/stream_server.go | 42 +++++++++++++++++++++++++ 17 files changed, 280 insertions(+), 81 deletions(-) create mode 100644 rtmp/publish_test.go create mode 100644 stream/stream_server.go diff --git a/api.go b/api.go index 9ba9d28..2c4d485 100644 --- a/api.go +++ b/api.go @@ -61,6 +61,7 @@ func startApiServer(addr string) { http://host:port/xxx_0.ts ws://host:port/xxx.flv */ + //{source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能一层 apiServer.router.HandleFunc("/{source}.flv", withCheckParams(apiServer.onFlv, ".flv")) apiServer.router.HandleFunc("/{source}/{stream}.flv", withCheckParams(apiServer.onFlv, ".flv")) apiServer.router.HandleFunc("/{source}.m3u8", withCheckParams(apiServer.onHLS, ".m3u8")) diff --git a/config.json b/config.json index 42a28e6..006d586 100644 --- a/config.json +++ b/config.json @@ -6,6 +6,7 @@ "public_ip": "192.168.2.148", "idle_timeout": 60, "receive_timeout":60, + "debug": false, "http": { "addr": "0.0.0.0:8080" diff --git a/gb28181/filter_single.go b/gb28181/filter_single.go index dd3f0f1..fbf5db1 100644 --- a/gb28181/filter_single.go +++ b/gb28181/filter_single.go @@ -13,7 +13,7 @@ func (s *singleFilter) AddSource(ssrc uint32, source GBSource) bool { } func (s *singleFilter) RemoveSource(ssrc uint32) { - panic("implement me") + s.source = nil } func (s *singleFilter) FindSource(ssrc uint32) GBSource { diff --git a/gb28181/source.go b/gb28181/source.go index 1792434..cf7467b 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -45,6 +45,8 @@ type GBSource interface { SetConn(conn net.Conn) SetSSRC(ssrc uint32) + + SSRC() uint32 } type BaseGBSource struct { @@ -243,6 +245,10 @@ func (source *BaseGBSource) SetSSRC(ssrc uint32) { source.ssrc = ssrc } +func (source *BaseGBSource) SSRC() uint32 { + return source.ssrc +} + func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ GBSource) { source.SetConn(conn) source.SetSSRC(ssrc) diff --git a/gb28181/tcp_client.go b/gb28181/tcp_client.go index 05abebe..4a48b5c 100644 --- a/gb28181/tcp_client.go +++ b/gb28181/tcp_client.go @@ -7,6 +7,7 @@ import ( "net" ) +// TCPClient GB28181TCP主动收流 type TCPClient struct { TCPServer } diff --git a/gb28181/tcp_server.go b/gb28181/tcp_server.go index eeb9402..5907fc5 100644 --- a/gb28181/tcp_server.go +++ b/gb28181/tcp_server.go @@ -2,30 +2,48 @@ package gb28181 import ( "github.com/yangjiechina/avformat/transport" - "github.com/yangjiechina/lkm/log" + "github.com/yangjiechina/lkm/stream" "net" ) +// TCPServer GB28181TCP被动收流 type TCPServer struct { + stream.StreamServer[*TCPSession] + tcp *transport.TCPServer filter Filter } -func (T *TCPServer) OnConnected(conn net.Conn) []byte { - log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String()) +func (T *TCPServer) OnNewSession(conn net.Conn) *TCPSession { + return NewTCPSession(conn, T.filter) +} - con := conn.(*transport.Conn) - session := NewTCPSession(conn, T.filter) - con.Data = session +func (T *TCPServer) OnCloseSession(session *TCPSession) { + session.Close() + + if session.source != nil { + T.filter.RemoveSource(session.source.SSRC()) + } + + if stream.AppConfig.GB28181.IsMultiPort() { + T.tcp.Close() + T.Handler = nil + } +} + +func (T *TCPServer) OnConnected(conn net.Conn) []byte { + T.StreamServer.OnConnected(conn) //TCP使用ReceiveBuffer区别在于,多端口模式从第一包就使用ReceiveBuffer, 单端口模式先解析出ssrc, 找到source. 后续再使用ReceiveBuffer. - if session.source != nil { - return session.receiveBuffer.GetBlock() + if conn.(*transport.Conn).Data.(*TCPSession).source != nil { + return conn.(*transport.Conn).Data.(*TCPSession).receiveBuffer.GetBlock() } + return nil } func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte { + T.StreamServer.OnPacket(conn, data) session := conn.(*transport.Conn).Data.(*TCPSession) //单端口收流 @@ -42,22 +60,16 @@ func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte { return nil } -func (T *TCPServer) OnDisConnected(conn net.Conn, err error) { - log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String()) - - con := conn.(*transport.Conn) - if con.Data != nil && con.Data.(*TCPSession).source != nil { - con.Data.(*TCPSession).source.Close() - } - - con.Data = nil -} - func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) { server := &TCPServer{ filter: filter, } + server.StreamServer = stream.StreamServer[*TCPSession]{ + SourceType: stream.SourceType28181, + Handler: server, + } + tcp := &transport.TCPServer{} tcp.SetHandler(server) if err := tcp.Bind(addr); err != nil { diff --git a/gb28181/tcp_session.go b/gb28181/tcp_session.go index 9169880..cbd5843 100644 --- a/gb28181/tcp_session.go +++ b/gb28181/tcp_session.go @@ -33,6 +33,19 @@ func (t *TCPSession) Init(source GBSource) { t.receiveBuffer = stream.NewTCPReceiveBuffer() } +func (t *TCPSession) Close() { + t.conn = nil + if t.source != nil { + t.source.Close() + t.source = nil + } + + if t.decoder != nil { + t.decoder.Close() + t.decoder = nil + } +} + func NewTCPSession(conn net.Conn, filter Filter) *TCPSession { session := &TCPSession{ conn: conn, diff --git a/gb28181/udp_server.go b/gb28181/udp_server.go index 4bf8cc9..28b4b4e 100644 --- a/gb28181/udp_server.go +++ b/gb28181/udp_server.go @@ -8,16 +8,30 @@ import ( "net" ) +// UDPServer GB28181UDP收流 type UDPServer struct { + stream.StreamServer[*UDPSource] udp *transport.UDPServer filter Filter } -func (U UDPServer) OnConnected(conn net.Conn) []byte { +func (U *UDPServer) OnNewSession(conn net.Conn) *UDPSource { return nil } -func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte { +func (U *UDPServer) OnCloseSession(session *UDPSource) { + U.filter.RemoveSource(session.SSRC()) + session.Close() + + if stream.AppConfig.GB28181.IsMultiPort() { + U.udp.Close() + U.Handler = nil + } +} + +func (U *UDPServer) OnPacket(conn net.Conn, data []byte) []byte { + U.StreamServer.OnPacket(conn, data) + packet := rtp.Packet{} err := packet.Unmarshal(data) if err != nil { @@ -32,6 +46,7 @@ func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte { } if stream.SessionStateHandshakeDone == source.State() { + conn.(*transport.Conn).Data = source source.PreparePublish(conn, packet.SSRC, source) } @@ -39,10 +54,6 @@ func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte { return nil } -func (U UDPServer) OnDisConnected(conn net.Conn, err error) { - -} - func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) { server := &UDPServer{ filter: filter, @@ -54,5 +65,9 @@ func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) { } server.udp = udp + server.StreamServer = stream.StreamServer[*UDPSource]{ + SourceType: stream.SourceType28181, + Handler: server, + } return server, nil } diff --git a/jt1078/jt_server.go b/jt1078/jt_server.go index ad137c9..8a5b75a 100644 --- a/jt1078/jt_server.go +++ b/jt1078/jt_server.go @@ -3,7 +3,7 @@ package jt1078 import ( "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/utils" - "github.com/yangjiechina/lkm/log" + "github.com/yangjiechina/lkm/stream" "net" ) @@ -14,44 +14,31 @@ type Server interface { } type jtServer struct { + stream.StreamServer[*Session] tcp *transport.TCPServer } -func NewServer() Server { - return &jtServer{} +func (s *jtServer) OnNewSession(conn net.Conn) *Session { + return NewSession(conn) } -func (s jtServer) OnConnected(conn net.Conn) []byte { - log.Sugar.Debugf("jtserver连接 conn:%s", conn.RemoteAddr().String()) - - t := conn.(*transport.Conn) - t.Data = NewSession(conn) - - return t.Data.(*Session).receiveBuffer.GetBlock() +func (s *jtServer) OnCloseSession(session *Session) { + session.Close() } -func (s jtServer) OnPacket(conn net.Conn, data []byte) []byte { - conn.(*transport.Conn).Data.(*Session).PublishSource.Input(data) - return conn.(*transport.Conn).Data.(*Session).receiveBuffer.GetBlock() +func (s *jtServer) OnPacket(conn net.Conn, data []byte) []byte { + s.StreamServer.OnPacket(conn, data) + session := conn.(*transport.Conn).Data.(*Session) + session.PublishSource.Input(data) + return session.receiveBuffer.GetBlock() } -func (s jtServer) OnDisConnected(conn net.Conn, err error) { - log.Sugar.Debugf("jtserver断开连接 conn:%s", conn.RemoteAddr().String()) - - t := conn.(*transport.Conn) - utils.Assert(t.Data != nil) - t.Data.(*Session).Close() - t.Data = nil -} - -func (s jtServer) Start(addr net.Addr) error { +func (s *jtServer) Start(addr net.Addr) error { utils.Assert(s.tcp == nil) server := &transport.TCPServer{} server.SetHandler(s) - err := server.Bind(addr) - - if err != nil { + if err := server.Bind(addr); err != nil { return err } @@ -59,6 +46,16 @@ func (s jtServer) Start(addr net.Addr) error { return nil } -func (s jtServer) Close() { +func (s *jtServer) Close() { panic("implement me") } + +func NewServer() Server { + j := &jtServer{} + j.StreamServer = stream.StreamServer[*Session]{ + SourceType: stream.SourceType1078, + Handler: j, + } + + return j +} diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index 5d31524..ae32fc1 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -292,6 +292,11 @@ func (s *Session) Close() { s.Conn = nil } + if s.decoder != nil { + s.decoder.Close() + s.decoder = nil + } + s.PublishSource.Close() } diff --git a/main.go b/main.go index ec3205e..a7683ee 100644 --- a/main.go +++ b/main.go @@ -28,9 +28,10 @@ func NewDefaultAppConfig() stream.AppConfig_ { PublicIP: "192.168.2.148", IdleTimeout: int64(60 * time.Second), ReceiveTimeout: int64(10 * time.Second), + Debug: true, Hls: stream.HlsConfig{ - Enable: false, + Enable: true, Dir: "../tmp", Duration: 2, PlaylistLength: 0xFFFF, @@ -79,7 +80,7 @@ func NewDefaultAppConfig() stream.AppConfig_ { }, Hook: stream.HookConfig{ - Enable: false, + Enable: true, Timeout: int64(60 * time.Second), OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish", OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done", diff --git a/rtmp/publish_test.go b/rtmp/publish_test.go new file mode 100644 index 0000000..edb5316 --- /dev/null +++ b/rtmp/publish_test.go @@ -0,0 +1,40 @@ +package rtmp + +import ( + "encoding/binary" + "github.com/yangjiechina/avformat/transport" + "net" + "os" + "testing" + "time" +) + +func TestName(t *testing.T) { + path := "../dump/rtmp-127.0.0.1.6850" + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:1935") + + file, err := os.ReadFile(path) + if err != nil { + panic(err) + } + + client := transport.TCPClient{} + if err := client.Connect(nil, addr); err != nil { + panic(err) + } + + length := len(file) + for i := 0; i < length; { + size := int(binary.BigEndian.Uint32(file[i:])) + if length-i < size { + return + } + + i += 4 + i += size + client.Write(file[i-size : i]) + + time.Sleep(10 * time.Millisecond) + } + +} diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index 183780f..b3366f6 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -2,6 +2,7 @@ package rtmp import ( "github.com/yangjiechina/lkm/log" + "github.com/yangjiechina/lkm/stream" "net" "github.com/yangjiechina/avformat/transport" @@ -14,11 +15,9 @@ type Server interface { Close() } -func NewServer() Server { - return &server{} -} - type server struct { + stream.StreamServer[*Session] + tcp *transport.TCPServer } @@ -27,9 +26,7 @@ func (s *server) Start(addr net.Addr) error { tcp := &transport.TCPServer{} tcp.SetHandler(s) - err := tcp.Bind(addr) - - if err != nil { + if err := tcp.Bind(addr); err != nil { return err } @@ -41,17 +38,18 @@ func (s *server) Close() { panic("implement me") } -func (s *server) OnConnected(conn net.Conn) []byte { - log.Sugar.Debugf("rtmp连接 conn:%s", conn.RemoteAddr().String()) +func (s *server) OnNewSession(conn net.Conn) *Session { + return NewSession(conn) +} - t := conn.(*transport.Conn) - t.Data = NewSession(conn) - return nil +func (s *server) OnCloseSession(session *Session) { + session.Close() } func (s *server) OnPacket(conn net.Conn, data []byte) []byte { - t := conn.(*transport.Conn) - session := t.Data.(*Session) + s.StreamServer.OnPacket(conn, data) + + session := conn.(*transport.Conn).Data.(*Session) err := session.Input(conn, data) if err != nil { @@ -66,12 +64,11 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte { return nil } -func (s *server) OnDisConnected(conn net.Conn, err error) { - log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String()) - - t := conn.(*transport.Conn) - if t.Data != nil { - t.Data.(*Session).Close() - t.Data = nil +func NewServer() Server { + s := &server{} + s.StreamServer = stream.StreamServer[*Session]{ + SourceType: stream.SourceTypeRtmp, + Handler: s, } + return s } diff --git a/rtmp/rtmp_server_test.go b/rtmp/rtmp_server_test.go index 63e22dd..4a3ca97 100644 --- a/rtmp/rtmp_server_test.go +++ b/rtmp/rtmp_server_test.go @@ -17,7 +17,7 @@ func CreateTransStream(source stream.Source, protocol stream.Protocol, streams [ } func init() { - stream.TransStreamFactory = CreateTransStream + //stream.TransStreamFactory = CreateTransStream } func TestServer(t *testing.T) { diff --git a/stream/config.go b/stream/config.go index 342b3f8..e07cfda 100644 --- a/stream/config.go +++ b/stream/config.go @@ -1,6 +1,11 @@ package stream import ( + "encoding/binary" + "fmt" + "github.com/yangjiechina/lkm/log" + "net" + "os" "strings" ) @@ -144,6 +149,58 @@ func (hook *HookConfig) EnableOnReceiveTimeout() bool { return hook.Enable && hook.OnReceiveTimeoutUrl != "" } +func GetStreamPlayUrls(sourceId string) []string { + var urls []string + if AppConfig.Rtmp.Enable { + _, port, _ := net.SplitHostPort(AppConfig.Rtmp.Addr) + urls = append(urls, fmt.Sprintf("rtmp://%s:%s/%s", AppConfig.PublicIP, port, sourceId)) + } + + if AppConfig.Rtsp.Enable { + _, port, _ := net.SplitHostPort(AppConfig.Rtsp.Addr) + //不拼接userinfo + urls = append(urls, fmt.Sprintf("rtsp://%s:%s/%s", AppConfig.PublicIP, port, sourceId)) + } + + //if AppConfig.Http.Enable { + // return + //} + + _, port, _ := net.SplitHostPort(AppConfig.Http.Addr) + if AppConfig.Hls.Enable { + //不拼接userinfo + urls = append(urls, fmt.Sprintf("http://%s:%s/%s.m3u8", AppConfig.PublicIP, port, sourceId)) + } + + urls = append(urls, fmt.Sprintf("http://%s:%s/%s.flv", AppConfig.PublicIP, port, sourceId)) + urls = append(urls, fmt.Sprintf("http://%s:%s/%s.rtc", AppConfig.PublicIP, port, sourceId)) + urls = append(urls, fmt.Sprintf("ws://%s:%s/%s.flv", AppConfig.PublicIP, port, sourceId)) + return urls +} + +// DumpStream2File 保存推流到文件, 用4字节帧长分割 +func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte) { + if err := os.MkdirAll("dump", 0666); err != nil { + log.Sugar.Errorf("创建dump文件夹失败 err:%s", err.Error()) + return + } + + path := fmt.Sprintf("dump/%s-%s", sourceType.ToString(), conn.RemoteAddr().String()) + path = strings.ReplaceAll(path, ":", ".") + + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + log.Sugar.Errorf("打开dump文件夹失败 err:%s path:%s", err.Error(), path) + return + } + + defer file.Close() + bytes := make([]byte, 4) + binary.BigEndian.PutUint32(bytes, uint32(len(data))) + file.Write(bytes) + file.Write(data) +} + var AppConfig AppConfig_ func init() { @@ -158,17 +215,19 @@ type AppConfig_ struct { PublicIP string `json:"public_ip"` IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. + Debug bool `json:"debug"` //debug模式, 开启将保存推流 //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. MergeWriteLatency int `json:"mw_latency"` Rtmp RtmpConfig Rtsp RtspConfig - Hook HookConfig - Record RecordConfig Hls HlsConfig - Log LogConfig - Http HttpConfig GB28181 GB28181Config JT1078 JT1078Config + + Hook HookConfig + Record RecordConfig + Log LogConfig + Http HttpConfig } diff --git a/stream/source.go b/stream/source.go index 49b2cd0..b3eaeea 100644 --- a/stream/source.go +++ b/stream/source.go @@ -1,6 +1,7 @@ package stream import ( + "encoding/json" "fmt" "github.com/yangjiechina/lkm/log" "net" @@ -173,6 +174,7 @@ type PublishSource struct { idleTimer *time.Timer sinkCount int closed bool + firstPacket bool urlValues url.Values } @@ -254,6 +256,13 @@ func (s *PublishSource) LoopEvent() { break } + if !s.firstPacket { + urls := GetStreamPlayUrls(s.Id_) + indent, _ := json.MarshalIndent(urls, "", "\t") + log.Sugar.Infof("%s 开始推流 source:%s 拉流地址:\r\n%s", s.Type_.ToString(), s.Id_, indent) + s.firstPacket = true + } + if AppConfig.ReceiveTimeout > 0 { s.lastPacketTime = time.Now() } diff --git a/stream/stream_server.go b/stream/stream_server.go new file mode 100644 index 0000000..728af6a --- /dev/null +++ b/stream/stream_server.go @@ -0,0 +1,42 @@ +package stream + +import ( + "github.com/yangjiechina/avformat/transport" + "github.com/yangjiechina/lkm/log" + "net" +) + +type SessionHandler[T any] interface { + OnNewSession(conn net.Conn) T + + OnCloseSession(session T) +} + +type StreamServer[T any] struct { + SourceType SourceType + Handler SessionHandler[T] +} + +func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte { + log.Sugar.Debugf("%s连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String()) + conn.(*transport.Conn).Data = s.Handler.OnNewSession(conn) + return nil +} + +func (s *StreamServer[T]) OnPacket(conn net.Conn, data []byte) []byte { + if AppConfig.Debug { + DumpStream2File(s.SourceType, conn, data) + } + + return nil +} + +func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error) { + log.Sugar.Debugf("%s断开连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String()) + + t := conn.(*transport.Conn) + if t.Data != nil { + s.Handler.OnCloseSession(t.Data.(T)) + t.Data = nil + } +}