Add support incoming RTMP

This commit is contained in:
Alex X
2023-10-11 06:54:50 +03:00
parent b8fb3acbab
commit c02117e626
5 changed files with 81 additions and 41 deletions

View File

@@ -66,21 +66,21 @@ func Init() {
}()
}
func tcpHandle(conn net.Conn) error {
client, err := rtmp.NewServer(conn)
func tcpHandle(netConn net.Conn) error {
rtmpConn, err := rtmp.NewServer(netConn)
if err != nil {
return err
}
if err = client.ReadCommands(); err != nil {
if err = rtmpConn.ReadCommands(); err != nil {
return err
}
switch client.Intent {
switch rtmpConn.Intent {
case rtmp.CommandPlay:
stream := streams.Get(client.App)
stream := streams.Get(rtmpConn.App)
if stream == nil {
return errors.New("stream not found: " + client.App)
return errors.New("stream not found: " + rtmpConn.App)
}
cons := flv.NewConsumer()
@@ -90,16 +90,39 @@ func tcpHandle(conn net.Conn) error {
defer stream.RemoveConsumer(cons)
if err = client.WritePlayStart(); err != nil {
if err = rtmpConn.WriteStart(); err != nil {
return err
}
_, _ = cons.WriteTo(client)
_, _ = cons.WriteTo(rtmpConn)
return nil
case rtmp.CommandPublish:
stream := streams.Get(rtmpConn.App)
if stream == nil {
return errors.New("stream not found: " + rtmpConn.App)
}
if err = rtmpConn.WriteStart(); err != nil {
return err
}
prod, err := rtmpConn.Producer()
if err != nil {
return err
}
stream.AddProducer(prod)
defer stream.RemoveProducer(prod)
_ = prod.Start()
return nil
}
return nil
return errors.New("rtmp: unknown command: " + rtmpConn.Intent)
}
var log zerolog.Logger

View File

@@ -1,3 +1,17 @@
## Logs
```
request []interface {}{"connect", 1, map[string]interface {}{"app":"s", "flashVer":"FMLE/3.0 (compatible; FMSc/1.0)", "tcUrl":"rtmps://xxx.rtmp.t.me/s/xxxxx"}}
response []interface {}{"_result", 1, map[string]interface {}{"capabilities":31, "fmsVer":"FMS/3,0,1,123"}, map[string]interface {}{"code":"NetConnection.Connect.Success", "description":"Connection succeeded.", "level":"status", "objectEncoding":0}}
request []interface {}{"releaseStream", 2, interface {}(nil), "xxxxx"}
request []interface {}{"FCPublish", 3, interface {}(nil), "xxxxx"}
request []interface {}{"createStream", 4, interface {}(nil)}
response []interface {}{"_result", 2, interface {}(nil)}
response []interface {}{"_result", 4, interface {}(nil), 1}
request []interface {}{"publish", 5, interface {}(nil), "xxxxx", "live"}
response []interface {}{"onStatus", 0, interface {}(nil), map[string]interface {}{"code":"NetStream.Publish.Start", "description":"xxxxx is now published", "detail":"xxxxx", "level":"status"}}
```
## Useful links
- https://en.wikipedia.org/wiki/Flash_Video

View File

@@ -8,7 +8,6 @@ import (
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
@@ -23,16 +22,16 @@ func DialPlay(rawURL string) (core.Producer, error) {
return nil, err
}
client, err := NewClient(conn, u)
rtmpConn, err := NewClient(conn, u)
if err != nil {
return nil, err
}
if err = client.play(); err != nil {
if err = rtmpConn.play(); err != nil {
return nil, err
}
return flv.Open(client)
return rtmpConn.Producer()
}
func DialPublish(rawURL string) (io.Writer, error) {
@@ -116,13 +115,6 @@ func (c *Conn) clienHandshake() error {
}
func (c *Conn) play() error {
c.rdBuf = []byte{
'F', 'L', 'V', // signature
1, // version
0, // flags (has video/audio)
0, 0, 0, 9, // header size
}
if err := c.writeConnect(); err != nil {
return err
}

View File

@@ -1,5 +1,21 @@
package rtmp
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
)
func (c *Conn) Producer() (core.Producer, error) {
c.rdBuf = []byte{
'F', 'L', 'V', // signature
1, // version
0, // flags (has video/audio)
0, 0, 0, 9, // header size
}
return flv.Open(c)
}
// Read - convert RTMP to FLV format
func (c *Conn) Read(p []byte) (n int, err error) {
// 1. Check temporary tempbuffer

View File

@@ -81,6 +81,7 @@ func (c *Conn) ReadCommands() error {
const (
CommandConnect = "connect"
CommandReleaseStream = "releaseStream"
CommandFCPublish = "FCPublish"
CommandCreateStream = "createStream"
CommandPublish = "publish"
CommandPlay = "play"
@@ -122,12 +123,8 @@ func (c *Conn) acceptCommand(b []byte) error {
payload := amf.EncodeItems(
"_result", tID,
map[string]any{
"fmsVer": "FMS/3,0,1,123",
},
map[string]any{
"code": "NetConnection.Connect.Success",
},
map[string]any{"fmsVer": "FMS/3,0,1,123"},
map[string]any{"code": "NetConnection.Connect.Success"},
)
return c.writeMessage(3, TypeCommand, 0, payload)
@@ -139,8 +136,11 @@ func (c *Conn) acceptCommand(b []byte) error {
payload := amf.EncodeItems("_result", tID, nil, 1)
return c.writeMessage(3, TypeCommand, 0, payload)
case CommandPublish, CommandPlay:
case CommandPublish, CommandPlay: // response later
c.Intent = cmd
c.streamID = 1
case CommandFCPublish: // no response
default:
println("rtmp: unknown command: " + cmd)
@@ -149,19 +149,14 @@ func (c *Conn) acceptCommand(b []byte) error {
return nil
}
func (c *Conn) WritePlayStart() error {
payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{
"code": "NetStream.Play.Start",
})
func (c *Conn) WriteStart() error {
var code string
if c.Intent == CommandPublish {
code = "NetStream.Publish.Start"
} else {
code = "NetStream.Play.Start"
}
payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{"code": code})
return c.writeMessage(3, TypeCommand, 0, payload)
}
func (c *Conn) code() string {
switch c.Intent {
case CommandPlay:
return "NetStream.Play.Start"
case CommandPublish:
return "NetStream.Publish.Start"
}
return ""
}