diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index c31b6f4c..07aa5f71 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -66,21 +66,21 @@ func Init() { }() } -func tcpHandle(conn net.Conn) error { - client, err := rtmp.NewServer(conn) +func tcpHandle(netConn net.Conn) error { + rtmpConn, err := rtmp.NewServer(netConn) if err != nil { return err } - if err = client.ReadCommands(); err != nil { + if err = rtmpConn.ReadCommands(); err != nil { return err } - switch client.Intent { + switch rtmpConn.Intent { case rtmp.CommandPlay: - stream := streams.Get(client.App) + stream := streams.Get(rtmpConn.App) if stream == nil { - return errors.New("stream not found: " + client.App) + return errors.New("stream not found: " + rtmpConn.App) } cons := flv.NewConsumer() @@ -90,16 +90,39 @@ func tcpHandle(conn net.Conn) error { defer stream.RemoveConsumer(cons) - if err = client.WritePlayStart(); err != nil { + if err = rtmpConn.WriteStart(); err != nil { return err } - _, _ = cons.WriteTo(client) + _, _ = cons.WriteTo(rtmpConn) + + return nil case rtmp.CommandPublish: + stream := streams.Get(rtmpConn.App) + if stream == nil { + return errors.New("stream not found: " + rtmpConn.App) + } + + if err = rtmpConn.WriteStart(); err != nil { + return err + } + + prod, err := rtmpConn.Producer() + if err != nil { + return err + } + + stream.AddProducer(prod) + + defer stream.RemoveProducer(prod) + + _ = prod.Start() + + return nil } - return nil + return errors.New("rtmp: unknown command: " + rtmpConn.Intent) } var log zerolog.Logger diff --git a/pkg/rtmp/README.md b/pkg/rtmp/README.md index 746111ab..11382210 100644 --- a/pkg/rtmp/README.md +++ b/pkg/rtmp/README.md @@ -1,3 +1,17 @@ +## Logs + +``` +request []interface {}{"connect", 1, map[string]interface {}{"app":"s", "flashVer":"FMLE/3.0 (compatible; FMSc/1.0)", "tcUrl":"rtmps://xxx.rtmp.t.me/s/xxxxx"}} +response []interface {}{"_result", 1, map[string]interface {}{"capabilities":31, "fmsVer":"FMS/3,0,1,123"}, map[string]interface {}{"code":"NetConnection.Connect.Success", "description":"Connection succeeded.", "level":"status", "objectEncoding":0}} +request []interface {}{"releaseStream", 2, interface {}(nil), "xxxxx"} +request []interface {}{"FCPublish", 3, interface {}(nil), "xxxxx"} +request []interface {}{"createStream", 4, interface {}(nil)} +response []interface {}{"_result", 2, interface {}(nil)} +response []interface {}{"_result", 4, interface {}(nil), 1} +request []interface {}{"publish", 5, interface {}(nil), "xxxxx", "live"} +response []interface {}{"onStatus", 0, interface {}(nil), map[string]interface {}{"code":"NetStream.Publish.Start", "description":"xxxxx is now published", "detail":"xxxxx", "level":"status"}} +``` + ## Useful links - https://en.wikipedia.org/wiki/Flash_Video diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index d97068f2..00544d5b 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/tcp" ) @@ -23,16 +22,16 @@ func DialPlay(rawURL string) (core.Producer, error) { return nil, err } - client, err := NewClient(conn, u) + rtmpConn, err := NewClient(conn, u) if err != nil { return nil, err } - if err = client.play(); err != nil { + if err = rtmpConn.play(); err != nil { return nil, err } - return flv.Open(client) + return rtmpConn.Producer() } func DialPublish(rawURL string) (io.Writer, error) { @@ -116,13 +115,6 @@ func (c *Conn) clienHandshake() error { } func (c *Conn) play() error { - c.rdBuf = []byte{ - 'F', 'L', 'V', // signature - 1, // version - 0, // flags (has video/audio) - 0, 0, 0, 9, // header size - } - if err := c.writeConnect(); err != nil { return err } diff --git a/pkg/rtmp/flv.go b/pkg/rtmp/flv.go index 7e608283..87bef0a8 100644 --- a/pkg/rtmp/flv.go +++ b/pkg/rtmp/flv.go @@ -1,5 +1,21 @@ package rtmp +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv" +) + +func (c *Conn) Producer() (core.Producer, error) { + c.rdBuf = []byte{ + 'F', 'L', 'V', // signature + 1, // version + 0, // flags (has video/audio) + 0, 0, 0, 9, // header size + } + + return flv.Open(c) +} + // Read - convert RTMP to FLV format func (c *Conn) Read(p []byte) (n int, err error) { // 1. Check temporary tempbuffer diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index 218582d2..f5fc96f8 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -81,6 +81,7 @@ func (c *Conn) ReadCommands() error { const ( CommandConnect = "connect" CommandReleaseStream = "releaseStream" + CommandFCPublish = "FCPublish" CommandCreateStream = "createStream" CommandPublish = "publish" CommandPlay = "play" @@ -122,12 +123,8 @@ func (c *Conn) acceptCommand(b []byte) error { payload := amf.EncodeItems( "_result", tID, - map[string]any{ - "fmsVer": "FMS/3,0,1,123", - }, - map[string]any{ - "code": "NetConnection.Connect.Success", - }, + map[string]any{"fmsVer": "FMS/3,0,1,123"}, + map[string]any{"code": "NetConnection.Connect.Success"}, ) return c.writeMessage(3, TypeCommand, 0, payload) @@ -139,8 +136,11 @@ func (c *Conn) acceptCommand(b []byte) error { payload := amf.EncodeItems("_result", tID, nil, 1) return c.writeMessage(3, TypeCommand, 0, payload) - case CommandPublish, CommandPlay: + case CommandPublish, CommandPlay: // response later c.Intent = cmd + c.streamID = 1 + + case CommandFCPublish: // no response default: println("rtmp: unknown command: " + cmd) @@ -149,19 +149,14 @@ func (c *Conn) acceptCommand(b []byte) error { return nil } -func (c *Conn) WritePlayStart() error { - payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{ - "code": "NetStream.Play.Start", - }) +func (c *Conn) WriteStart() error { + var code string + if c.Intent == CommandPublish { + code = "NetStream.Publish.Start" + } else { + code = "NetStream.Play.Start" + } + + payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{"code": code}) return c.writeMessage(3, TypeCommand, 0, payload) } - -func (c *Conn) code() string { - switch c.Intent { - case CommandPlay: - return "NetStream.Play.Start" - case CommandPublish: - return "NetStream.Publish.Start" - } - return "" -}