支持保存推流数据包到文件. 打印拉流url

This commit is contained in:
yangjiechina
2024-07-04 22:35:55 +08:00
parent 4b1200d9ad
commit dfda276583
17 changed files with 280 additions and 81 deletions

1
api.go
View File

@@ -61,6 +61,7 @@ func startApiServer(addr string) {
http://host:port/xxx_0.ts
ws://host:port/xxx.flv
*/
//{source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能一层
apiServer.router.HandleFunc("/{source}.flv", withCheckParams(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}/{stream}.flv", withCheckParams(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}.m3u8", withCheckParams(apiServer.onHLS, ".m3u8"))

View File

@@ -6,6 +6,7 @@
"public_ip": "192.168.2.148",
"idle_timeout": 60,
"receive_timeout":60,
"debug": false,
"http": {
"addr": "0.0.0.0:8080"

View File

@@ -13,7 +13,7 @@ func (s *singleFilter) AddSource(ssrc uint32, source GBSource) bool {
}
func (s *singleFilter) RemoveSource(ssrc uint32) {
panic("implement me")
s.source = nil
}
func (s *singleFilter) FindSource(ssrc uint32) GBSource {

View File

@@ -45,6 +45,8 @@ type GBSource interface {
SetConn(conn net.Conn)
SetSSRC(ssrc uint32)
SSRC() uint32
}
type BaseGBSource struct {
@@ -243,6 +245,10 @@ func (source *BaseGBSource) SetSSRC(ssrc uint32) {
source.ssrc = ssrc
}
func (source *BaseGBSource) SSRC() uint32 {
return source.ssrc
}
func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ GBSource) {
source.SetConn(conn)
source.SetSSRC(ssrc)

View File

@@ -7,6 +7,7 @@ import (
"net"
)
// TCPClient GB28181TCP主动收流
type TCPClient struct {
TCPServer
}

View File

@@ -2,30 +2,48 @@ package gb28181
import (
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
// TCPServer GB28181TCP被动收流
type TCPServer struct {
stream.StreamServer[*TCPSession]
tcp *transport.TCPServer
filter Filter
}
func (T *TCPServer) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String())
func (T *TCPServer) OnNewSession(conn net.Conn) *TCPSession {
return NewTCPSession(conn, T.filter)
}
con := conn.(*transport.Conn)
session := NewTCPSession(conn, T.filter)
con.Data = session
func (T *TCPServer) OnCloseSession(session *TCPSession) {
session.Close()
if session.source != nil {
T.filter.RemoveSource(session.source.SSRC())
}
if stream.AppConfig.GB28181.IsMultiPort() {
T.tcp.Close()
T.Handler = nil
}
}
func (T *TCPServer) OnConnected(conn net.Conn) []byte {
T.StreamServer.OnConnected(conn)
//TCP使用ReceiveBuffer区别在于,多端口模式从第一包就使用ReceiveBuffer, 单端口模式先解析出ssrc, 找到source. 后续再使用ReceiveBuffer.
if session.source != nil {
return session.receiveBuffer.GetBlock()
if conn.(*transport.Conn).Data.(*TCPSession).source != nil {
return conn.(*transport.Conn).Data.(*TCPSession).receiveBuffer.GetBlock()
}
return nil
}
func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte {
T.StreamServer.OnPacket(conn, data)
session := conn.(*transport.Conn).Data.(*TCPSession)
//单端口收流
@@ -42,22 +60,16 @@ func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (T *TCPServer) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String())
con := conn.(*transport.Conn)
if con.Data != nil && con.Data.(*TCPSession).source != nil {
con.Data.(*TCPSession).source.Close()
}
con.Data = nil
}
func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) {
server := &TCPServer{
filter: filter,
}
server.StreamServer = stream.StreamServer[*TCPSession]{
SourceType: stream.SourceType28181,
Handler: server,
}
tcp := &transport.TCPServer{}
tcp.SetHandler(server)
if err := tcp.Bind(addr); err != nil {

View File

@@ -33,6 +33,19 @@ func (t *TCPSession) Init(source GBSource) {
t.receiveBuffer = stream.NewTCPReceiveBuffer()
}
func (t *TCPSession) Close() {
t.conn = nil
if t.source != nil {
t.source.Close()
t.source = nil
}
if t.decoder != nil {
t.decoder.Close()
t.decoder = nil
}
}
func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
session := &TCPSession{
conn: conn,

View File

@@ -8,16 +8,30 @@ import (
"net"
)
// UDPServer GB28181UDP收流
type UDPServer struct {
stream.StreamServer[*UDPSource]
udp *transport.UDPServer
filter Filter
}
func (U UDPServer) OnConnected(conn net.Conn) []byte {
func (U *UDPServer) OnNewSession(conn net.Conn) *UDPSource {
return nil
}
func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
func (U *UDPServer) OnCloseSession(session *UDPSource) {
U.filter.RemoveSource(session.SSRC())
session.Close()
if stream.AppConfig.GB28181.IsMultiPort() {
U.udp.Close()
U.Handler = nil
}
}
func (U *UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
U.StreamServer.OnPacket(conn, data)
packet := rtp.Packet{}
err := packet.Unmarshal(data)
if err != nil {
@@ -32,6 +46,7 @@ func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
}
if stream.SessionStateHandshakeDone == source.State() {
conn.(*transport.Conn).Data = source
source.PreparePublish(conn, packet.SSRC, source)
}
@@ -39,10 +54,6 @@ func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (U UDPServer) OnDisConnected(conn net.Conn, err error) {
}
func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) {
server := &UDPServer{
filter: filter,
@@ -54,5 +65,9 @@ func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) {
}
server.udp = udp
server.StreamServer = stream.StreamServer[*UDPSource]{
SourceType: stream.SourceType28181,
Handler: server,
}
return server, nil
}

View File

@@ -3,7 +3,7 @@ package jt1078
import (
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
@@ -14,44 +14,31 @@ type Server interface {
}
type jtServer struct {
stream.StreamServer[*Session]
tcp *transport.TCPServer
}
func NewServer() Server {
return &jtServer{}
func (s *jtServer) OnNewSession(conn net.Conn) *Session {
return NewSession(conn)
}
func (s jtServer) OnConnected(conn net.Conn) []byte {
log.Sugar.Debugf("jtserver连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
t.Data = NewSession(conn)
return t.Data.(*Session).receiveBuffer.GetBlock()
func (s *jtServer) OnCloseSession(session *Session) {
session.Close()
}
func (s jtServer) OnPacket(conn net.Conn, data []byte) []byte {
conn.(*transport.Conn).Data.(*Session).PublishSource.Input(data)
return conn.(*transport.Conn).Data.(*Session).receiveBuffer.GetBlock()
func (s *jtServer) OnPacket(conn net.Conn, data []byte) []byte {
s.StreamServer.OnPacket(conn, data)
session := conn.(*transport.Conn).Data.(*Session)
session.PublishSource.Input(data)
return session.receiveBuffer.GetBlock()
}
func (s jtServer) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("jtserver断开连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
utils.Assert(t.Data != nil)
t.Data.(*Session).Close()
t.Data = nil
}
func (s jtServer) Start(addr net.Addr) error {
func (s *jtServer) Start(addr net.Addr) error {
utils.Assert(s.tcp == nil)
server := &transport.TCPServer{}
server.SetHandler(s)
err := server.Bind(addr)
if err != nil {
if err := server.Bind(addr); err != nil {
return err
}
@@ -59,6 +46,16 @@ func (s jtServer) Start(addr net.Addr) error {
return nil
}
func (s jtServer) Close() {
func (s *jtServer) Close() {
panic("implement me")
}
func NewServer() Server {
j := &jtServer{}
j.StreamServer = stream.StreamServer[*Session]{
SourceType: stream.SourceType1078,
Handler: j,
}
return j
}

View File

@@ -292,6 +292,11 @@ func (s *Session) Close() {
s.Conn = nil
}
if s.decoder != nil {
s.decoder.Close()
s.decoder = nil
}
s.PublishSource.Close()
}

View File

@@ -28,9 +28,10 @@ func NewDefaultAppConfig() stream.AppConfig_ {
PublicIP: "192.168.2.148",
IdleTimeout: int64(60 * time.Second),
ReceiveTimeout: int64(10 * time.Second),
Debug: true,
Hls: stream.HlsConfig{
Enable: false,
Enable: true,
Dir: "../tmp",
Duration: 2,
PlaylistLength: 0xFFFF,
@@ -79,7 +80,7 @@ func NewDefaultAppConfig() stream.AppConfig_ {
},
Hook: stream.HookConfig{
Enable: false,
Enable: true,
Timeout: int64(60 * time.Second),
OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish",
OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done",

40
rtmp/publish_test.go Normal file
View File

@@ -0,0 +1,40 @@
package rtmp
import (
"encoding/binary"
"github.com/yangjiechina/avformat/transport"
"net"
"os"
"testing"
"time"
)
func TestName(t *testing.T) {
path := "../dump/rtmp-127.0.0.1.6850"
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:1935")
file, err := os.ReadFile(path)
if err != nil {
panic(err)
}
client := transport.TCPClient{}
if err := client.Connect(nil, addr); err != nil {
panic(err)
}
length := len(file)
for i := 0; i < length; {
size := int(binary.BigEndian.Uint32(file[i:]))
if length-i < size {
return
}
i += 4
i += size
client.Write(file[i-size : i])
time.Sleep(10 * time.Millisecond)
}
}

View File

@@ -2,6 +2,7 @@ package rtmp
import (
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
"github.com/yangjiechina/avformat/transport"
@@ -14,11 +15,9 @@ type Server interface {
Close()
}
func NewServer() Server {
return &server{}
}
type server struct {
stream.StreamServer[*Session]
tcp *transport.TCPServer
}
@@ -27,9 +26,7 @@ func (s *server) Start(addr net.Addr) error {
tcp := &transport.TCPServer{}
tcp.SetHandler(s)
err := tcp.Bind(addr)
if err != nil {
if err := tcp.Bind(addr); err != nil {
return err
}
@@ -41,17 +38,18 @@ func (s *server) Close() {
panic("implement me")
}
func (s *server) OnConnected(conn net.Conn) []byte {
log.Sugar.Debugf("rtmp连接 conn:%s", conn.RemoteAddr().String())
func (s *server) OnNewSession(conn net.Conn) *Session {
return NewSession(conn)
}
t := conn.(*transport.Conn)
t.Data = NewSession(conn)
return nil
func (s *server) OnCloseSession(session *Session) {
session.Close()
}
func (s *server) OnPacket(conn net.Conn, data []byte) []byte {
t := conn.(*transport.Conn)
session := t.Data.(*Session)
s.StreamServer.OnPacket(conn, data)
session := conn.(*transport.Conn).Data.(*Session)
err := session.Input(conn, data)
if err != nil {
@@ -66,12 +64,11 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (s *server) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
if t.Data != nil {
t.Data.(*Session).Close()
t.Data = nil
func NewServer() Server {
s := &server{}
s.StreamServer = stream.StreamServer[*Session]{
SourceType: stream.SourceTypeRtmp,
Handler: s,
}
return s
}

View File

@@ -17,7 +17,7 @@ func CreateTransStream(source stream.Source, protocol stream.Protocol, streams [
}
func init() {
stream.TransStreamFactory = CreateTransStream
//stream.TransStreamFactory = CreateTransStream
}
func TestServer(t *testing.T) {

View File

@@ -1,6 +1,11 @@
package stream
import (
"encoding/binary"
"fmt"
"github.com/yangjiechina/lkm/log"
"net"
"os"
"strings"
)
@@ -144,6 +149,58 @@ func (hook *HookConfig) EnableOnReceiveTimeout() bool {
return hook.Enable && hook.OnReceiveTimeoutUrl != ""
}
func GetStreamPlayUrls(sourceId string) []string {
var urls []string
if AppConfig.Rtmp.Enable {
_, port, _ := net.SplitHostPort(AppConfig.Rtmp.Addr)
urls = append(urls, fmt.Sprintf("rtmp://%s:%s/%s", AppConfig.PublicIP, port, sourceId))
}
if AppConfig.Rtsp.Enable {
_, port, _ := net.SplitHostPort(AppConfig.Rtsp.Addr)
//不拼接userinfo
urls = append(urls, fmt.Sprintf("rtsp://%s:%s/%s", AppConfig.PublicIP, port, sourceId))
}
//if AppConfig.Http.Enable {
// return
//}
_, port, _ := net.SplitHostPort(AppConfig.Http.Addr)
if AppConfig.Hls.Enable {
//不拼接userinfo
urls = append(urls, fmt.Sprintf("http://%s:%s/%s.m3u8", AppConfig.PublicIP, port, sourceId))
}
urls = append(urls, fmt.Sprintf("http://%s:%s/%s.flv", AppConfig.PublicIP, port, sourceId))
urls = append(urls, fmt.Sprintf("http://%s:%s/%s.rtc", AppConfig.PublicIP, port, sourceId))
urls = append(urls, fmt.Sprintf("ws://%s:%s/%s.flv", AppConfig.PublicIP, port, sourceId))
return urls
}
// DumpStream2File 保存推流到文件, 用4字节帧长分割
func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte) {
if err := os.MkdirAll("dump", 0666); err != nil {
log.Sugar.Errorf("创建dump文件夹失败 err:%s", err.Error())
return
}
path := fmt.Sprintf("dump/%s-%s", sourceType.ToString(), conn.RemoteAddr().String())
path = strings.ReplaceAll(path, ":", ".")
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
log.Sugar.Errorf("打开dump文件夹失败 err:%s path:%s", err.Error(), path)
return
}
defer file.Close()
bytes := make([]byte, 4)
binary.BigEndian.PutUint32(bytes, uint32(len(data)))
file.Write(bytes)
file.Write(data)
}
var AppConfig AppConfig_
func init() {
@@ -158,17 +215,19 @@ type AppConfig_ struct {
PublicIP string `json:"public_ip"`
IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
Debug bool `json:"debug"` //debug模式, 开启将保存推流
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.
MergeWriteLatency int `json:"mw_latency"`
Rtmp RtmpConfig
Rtsp RtspConfig
Hook HookConfig
Record RecordConfig
Hls HlsConfig
Log LogConfig
Http HttpConfig
GB28181 GB28181Config
JT1078 JT1078Config
Hook HookConfig
Record RecordConfig
Log LogConfig
Http HttpConfig
}

View File

@@ -1,6 +1,7 @@
package stream
import (
"encoding/json"
"fmt"
"github.com/yangjiechina/lkm/log"
"net"
@@ -173,6 +174,7 @@ type PublishSource struct {
idleTimer *time.Timer
sinkCount int
closed bool
firstPacket bool
urlValues url.Values
}
@@ -254,6 +256,13 @@ func (s *PublishSource) LoopEvent() {
break
}
if !s.firstPacket {
urls := GetStreamPlayUrls(s.Id_)
indent, _ := json.MarshalIndent(urls, "", "\t")
log.Sugar.Infof("%s 开始推流 source:%s 拉流地址:\r\n%s", s.Type_.ToString(), s.Id_, indent)
s.firstPacket = true
}
if AppConfig.ReceiveTimeout > 0 {
s.lastPacketTime = time.Now()
}

42
stream/stream_server.go Normal file
View File

@@ -0,0 +1,42 @@
package stream
import (
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/lkm/log"
"net"
)
type SessionHandler[T any] interface {
OnNewSession(conn net.Conn) T
OnCloseSession(session T)
}
type StreamServer[T any] struct {
SourceType SourceType
Handler SessionHandler[T]
}
func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte {
log.Sugar.Debugf("%s连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String())
conn.(*transport.Conn).Data = s.Handler.OnNewSession(conn)
return nil
}
func (s *StreamServer[T]) OnPacket(conn net.Conn, data []byte) []byte {
if AppConfig.Debug {
DumpStream2File(s.SourceType, conn, data)
}
return nil
}
func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("%s断开连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String())
t := conn.(*transport.Conn)
if t.Data != nil {
s.Handler.OnCloseSession(t.Data.(T))
t.Data = nil
}
}