diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index 065fcd87..200d6937 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -1,34 +1,130 @@ package rtmp import ( + "errors" "io" + "net" "net/http" "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/rtmp" "github.com/AlexxIT/go2rtc/pkg/tcp" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) func Init() { + var conf struct { + Mod struct { + Listen string `yaml:"listen" json:"listen"` + } `yaml:"rtmp"` + } + + app.LoadConfig(&conf) + + log = app.GetLogger("rtsp") + streams.HandleFunc("rtmp", streamsHandle) streams.HandleFunc("rtmps", streamsHandle) streams.HandleFunc("rtmpx", streamsHandle) api.HandleFunc("api/stream.flv", apiHandle) + + streams.HandleConsumerFunc("rtmp", streamsConsumerHandle) + streams.HandleConsumerFunc("rtmps", streamsConsumerHandle) + streams.HandleConsumerFunc("rtmpx", streamsConsumerHandle) + + address := conf.Mod.Listen + if address == "" { + return + } + + ln, err := net.Listen("tcp", address) + if err != nil { + log.Error().Err(err).Caller().Send() + return + } + + log.Info().Str("addr", address).Msg("[rtmp] listen") + + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + + go func() { + if err = tcpHandle(conn); err != nil { + log.Error().Err(err).Caller().Send() + } + }() + } + }() } +func tcpHandle(conn net.Conn) error { + client, err := rtmp.NewServer(conn) + if err != nil { + return err + } + + if err = client.ReadCommands(); err != nil { + return err + } + + switch client.Intent { + case rtmp.CommandPlay: + stream := streams.Get(client.App) + if stream == nil { + return errors.New("stream not found: " + client.App) + } + + cons := flv.NewConsumer() + if err = stream.AddConsumer(cons); err != nil { + return err + } + + defer stream.RemoveConsumer(cons) + + if err = client.WritePlayStart(); err != nil { + return err + } + + _, _ = cons.WriteTo(client) + + case rtmp.CommandPublish: + } + + return nil +} + +var log zerolog.Logger + func streamsHandle(url string) (core.Producer, error) { - client, err := rtmp.Dial(url) + client, err := rtmp.DialPlay(url) if err != nil { return nil, err } return client, nil } +func streamsConsumerHandle(url string) (core.Consumer, func(), error) { + cons := flv.NewConsumer() + run := func() { + wr, err := rtmp.DialPublish(url) + if err != nil { + return + } + _, err = cons.WriteTo(wr) + } + + return cons, run, nil +} + func apiHandle(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { outputFLV(w, r) diff --git a/pkg/rtmp/README.md b/pkg/rtmp/README.md new file mode 100644 index 00000000..746111ab --- /dev/null +++ b/pkg/rtmp/README.md @@ -0,0 +1,5 @@ +## Useful links + +- https://en.wikipedia.org/wiki/Flash_Video +- https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol +- https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf \ No newline at end of file diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go new file mode 100644 index 00000000..d97068f2 --- /dev/null +++ b/pkg/rtmp/client.go @@ -0,0 +1,163 @@ +package rtmp + +import ( + "bufio" + "io" + "net" + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv" + "github.com/AlexxIT/go2rtc/pkg/tcp" +) + +func DialPlay(rawURL string) (core.Producer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + conn, err := tcp.Dial(u, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + client, err := NewClient(conn, u) + if err != nil { + return nil, err + } + + if err = client.play(); err != nil { + return nil, err + } + + return flv.Open(client) +} + +func DialPublish(rawURL string) (io.Writer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + conn, err := tcp.Dial(u, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + client, err := NewClient(conn, u) + if err != nil { + return nil, err + } + + if err = client.publish(); err != nil { + return nil, err + } + + return client, nil +} + +func NewClient(conn net.Conn, u *url.URL) (*Conn, error) { + c := &Conn{ + url: u.String(), + + conn: conn, + rd: bufio.NewReaderSize(conn, core.BufferSize), + wr: conn, + + chunks: map[uint8]*header{}, + + rdPacketSize: 128, + wrPacketSize: 4096, // OBS - 4096, Reolink - 4096 + } + + if args := strings.Split(u.Path, "/"); len(args) >= 2 { + c.App = args[1] + if len(args) >= 3 { + c.Stream = args[2] + if u.RawQuery != "" { + c.Stream += "?" + u.RawQuery + } + } + } + + if err := c.clienHandshake(); err != nil { + return nil, err + } + if err := c.writePacketSize(); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Conn) clienHandshake() error { + // simple handshake without real random and check response + b := make([]byte, 1+1536) + b[0] = 0x03 + // write C0+C1 + if _, err := c.conn.Write(b); err != nil { + return err + } + // read S0+S1 + if _, err := io.ReadFull(c.rd, b); err != nil { + return err + } + // write S1 + if _, err := c.conn.Write(b[1:]); err != nil { + return err + } + // read C1, skip check + if _, err := io.ReadFull(c.rd, b[1:]); err != nil { + return err + } + return nil +} + +func (c *Conn) play() error { + c.rdBuf = []byte{ + 'F', 'L', 'V', // signature + 1, // version + 0, // flags (has video/audio) + 0, 0, 0, 9, // header size + } + + if err := c.writeConnect(); err != nil { + return err + } + if err := c.writeCreateStream(); err != nil { + return err + } + if err := c.writePlay(); err != nil { + return err + } + return nil +} + +func (c *Conn) publish() error { + if err := c.writeConnect(); err != nil { + return err + } + if err := c.writeReleaseStream(); err != nil { + return err + } + if err := c.writeCreateStream(); err != nil { + return err + } + if err := c.writePublish(); err != nil { + return err + } + + go func() { + for { + _, _, _, err := c.readMessage() + //log.Printf("!!! %d %d %.30x", msgType, timeMS, b) + if err != nil { + return + } + } + }() + + return nil +} diff --git a/pkg/rtmp/conn.go b/pkg/rtmp/conn.go new file mode 100644 index 00000000..f81b1dec --- /dev/null +++ b/pkg/rtmp/conn.go @@ -0,0 +1,353 @@ +package rtmp + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "strings" + "sync" + + "github.com/AlexxIT/go2rtc/pkg/flv/amf" +) + +const ( + TypeSetPacketSize = 1 + TypeServerBandwidth = 5 + TypeClientBandwidth = 6 + TypeAudio = 8 + TypeVideo = 9 + TypeData = 18 + TypeCommand = 20 +) + +type Conn struct { + App string + Stream string + Intent string + + rdPacketSize uint32 + wrPacketSize uint32 + + chunks map[byte]*header + streamID byte + url string + + conn net.Conn + rd io.Reader + wr io.Writer + + rdBuf []byte + wrBuf []byte + mu sync.Mutex +} + +func (c *Conn) Close() error { + return c.conn.Close() +} + +func (c *Conn) readResponse(transID float64) ([]any, error) { + for { + msgType, _, b, err := c.readMessage() + if err != nil { + return nil, err + } + + switch msgType { + case TypeSetPacketSize: + c.rdPacketSize = binary.BigEndian.Uint32(b) + case TypeCommand: + items, _ := amf.NewReader(b).ReadItems() + if len(items) >= 3 && items[1] == transID { + return items, nil + } + } + } +} + +type header struct { + timeMS uint32 + dataSize uint32 + tagType byte + streamID uint32 +} + +//var ErrNotImplemented = errors.New("rtmp: not implemented") + +func (c *Conn) readMessage() (byte, uint32, []byte, error) { + b, err := c.readSize(1) // doesn't support big chunkID!!! + if err != nil { + return 0, 0, nil, err + } + + hdrType := b[0] >> 6 + chunkID := b[0] & 0b111111 + + // storing header information for support header type 3 + hdr, ok := c.chunks[chunkID] + if !ok { + hdr = &header{} + c.chunks[chunkID] = hdr + } + + switch hdrType { + case 0: // 12 byte header (full header) + if b, err = c.readSize(11); err != nil { + return 0, 0, nil, err + } + _ = b[7] + hdr.timeMS = Uint24(b) + hdr.dataSize = Uint24(b[3:]) + hdr.tagType = b[6] + hdr.streamID = binary.LittleEndian.Uint32(b[7:]) + + case 1: // 8 bytes - like type b00, not including message ID (4 last bytes) + if b, err = c.readSize(7); err != nil { + return 0, 0, nil, err + } + _ = b[6] + hdr.timeMS = Uint24(b) // timestamp + hdr.dataSize = Uint24(b[3:]) // msgdatalen + hdr.tagType = b[6] // msgtypeid + + case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included + if b, err = c.readSize(3); err != nil { + return 0, 0, nil, err + } + hdr.timeMS = Uint24(b) // timestamp + + case 3: // 1 byte - only the Basic Header is included + // use here hdr from previous msg with same session ID (sid) + } + + timeMS := hdr.timeMS + if timeMS == 0xFFFFFF { + if b, err = c.readSize(4); err != nil { + return 0, 0, nil, err + } + timeMS = binary.BigEndian.Uint32(b) + } + + //log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, hdr.timeMS, hdr.dataSize, hdr.tagType, hdr.streamID) + + // 1. Response zero size + if hdr.dataSize == 0 { + return hdr.tagType, timeMS, nil, nil + } + + b = make([]byte, hdr.dataSize) + + // 2. Response small packet + if hdr.dataSize <= c.rdPacketSize { + if _, err = io.ReadFull(c.rd, b); err != nil { + return 0, 0, nil, err + } + return hdr.tagType, timeMS, b, nil + } + + // 3. Response big packet + var i0 uint32 + for i1 := c.rdPacketSize; i1 < hdr.dataSize; i1 += c.rdPacketSize { + if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil { + return 0, 0, nil, err + } + + if _, err = c.readSize(1); err != nil { + return 0, 0, nil, err + } + + if hdr.timeMS == 0xFFFFFF { + if _, err = c.readSize(4); err != nil { + return 0, 0, nil, err + } + } + + i0 = i1 + } + + if _, err = io.ReadFull(c.rd, b[i0:]); err != nil { + return 0, 0, nil, err + } + + return hdr.tagType, timeMS, b, nil +} +func (c *Conn) writeMessage(chunkID, tagType byte, timeMS uint32, payload []byte) error { + c.mu.Lock() + c.resetBuffer() + + b := payload + size := uint32(len(b)) + + if size > c.wrPacketSize { + c.appendType0(chunkID, tagType, timeMS, size, b[:c.wrPacketSize]) + + for { + b = b[c.wrPacketSize:] + if uint32(len(b)) > c.wrPacketSize { + c.appendType3(chunkID, b[:c.wrPacketSize]) + } else { + c.appendType3(chunkID, b) + break + } + } + } else { + c.appendType0(chunkID, tagType, timeMS, size, b) + } + + //log.Printf("%d %2d %5d %6d %.32x", chunkID, tagType, timeMS, size, payload) + + _, err := c.wr.Write(c.wrBuf) + c.mu.Unlock() + return err +} + +func (c *Conn) resetBuffer() { + c.wrBuf = c.wrBuf[:0] +} + +func (c *Conn) appendType0(chunkID, tagType byte, timeMS, size uint32, payload []byte) { + // TODO: timeMS more than 24 bit + c.wrBuf = append(c.wrBuf, + chunkID, + byte(timeMS>>16), byte(timeMS>>8), byte(timeMS), + byte(size>>16), byte(size>>8), byte(size), + tagType, + c.streamID, 0, 0, 0, // little endian streamID + ) + c.wrBuf = append(c.wrBuf, payload...) +} + +func (c *Conn) appendType3(chunkID byte, payload []byte) { + c.wrBuf = append(c.wrBuf, 3<<6|chunkID) + c.wrBuf = append(c.wrBuf, payload...) +} + +func (c *Conn) writePacketSize() error { + b := binary.BigEndian.AppendUint32(nil, c.wrPacketSize) + return c.writeMessage(2, TypeSetPacketSize, 0, b) +} + +func (c *Conn) writeConnect() error { + b := amf.EncodeItems("connect", 1, map[string]any{ + "app": c.App, + "flashVer": "FMLE/3.0 (compatible; FMSc/1.0)", + "tcUrl": c.url, + }) + if err := c.writeMessage(3, TypeCommand, 0, b); err != nil { + return err + } + + v, err := c.readResponse(1) + if err != nil { + return err + } + + code := getString(v, 3, "code") + if code != "NetConnection.Connect.Success" { + return fmt.Errorf("rtmp: wrong response %#v", v) + } + + return nil +} + +func (c *Conn) writeReleaseStream() error { + b := amf.EncodeItems("releaseStream", 2, nil, c.Stream) + if err := c.writeMessage(3, TypeCommand, 0, b); err != nil { + return err + } + b = amf.EncodeItems("FCPublish", 3, nil, c.Stream) + if err := c.writeMessage(3, TypeCommand, 0, b); err != nil { + return err + } + return nil +} +func (c *Conn) writeCreateStream() error { + b := amf.EncodeItems("createStream", 4, nil) + if err := c.writeMessage(3, TypeCommand, 0, b); err != nil { + return err + } + + v, err := c.readResponse(4) + if err != nil { + return err + } + + if len(v) == 4 { + if f, ok := v[3].(float64); ok { + c.streamID = byte(f) + return nil + } + } + + return fmt.Errorf("rtmp: wrong response %#v", v) +} + +func (c *Conn) writePublish() error { + b := amf.EncodeItems("publish", 5, nil, c.Stream, "live") + if err := c.writeMessage(3, TypeCommand, 0, b); err != nil { + return err + } + + v, err := c.readResponse(0) + if err != nil { + return nil + } + + code := getString(v, 3, "code") + if code != "NetStream.Publish.Start" { + return fmt.Errorf("rtmp: wrong response %#v", v) + } + + return nil +} + +func (c *Conn) writePlay() error { + b := amf.EncodeItems("play", 5, nil, c.Stream) + if err := c.writeMessage(3, TypeCommand, 0, b); err != nil { + return err + } + + v, err := c.readResponse(0) + if err != nil { + return nil + } + + code := getString(v, 3, "code") + if !strings.HasPrefix(code, "NetStream.Play.") { + return fmt.Errorf("rtmp: wrong response %#v", v) + } + + return nil +} + +func (c *Conn) readSize(n uint32) ([]byte, error) { + b := make([]byte, n) + if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil { + return nil, err + } + return b, nil +} + +func PutUint24(b []byte, v uint32) { + _ = b[2] + b[0] = byte(v >> 16) + b[1] = byte(v >> 8) + b[2] = byte(v) +} + +func Uint24(b []byte) uint32 { + _ = b[2] + return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]) +} + +func getString(v []any, i int, key string) string { + if len(v) <= i { + return "" + } + if v, ok := v[i].(map[string]any); ok { + if s, ok := v[key].(string); ok { + return s + } + } + return "" +} diff --git a/pkg/rtmp/flv.go b/pkg/rtmp/flv.go new file mode 100644 index 00000000..7e608283 --- /dev/null +++ b/pkg/rtmp/flv.go @@ -0,0 +1,71 @@ +package rtmp + +// Read - convert RTMP to FLV format +func (c *Conn) Read(p []byte) (n int, err error) { + // 1. Check temporary tempbuffer + if len(c.rdBuf) == 0 { + msgType, timeMS, payload, err2 := c.readMessage() + if err2 != nil { + return 0, err2 + } + + // previous tag size (4 byte) + header (11 byte) + payload + n = 4 + 11 + len(payload) + + // 2. Check if the message fits in the buffer + if n <= len(p) { + encodeFLV(p, msgType, timeMS, payload) + return + } + + // 3. Put the message into a temporary buffer + c.rdBuf = make([]byte, n) + encodeFLV(c.rdBuf, msgType, timeMS, payload) + } + + // 4. Send temporary buffer + n = copy(p, c.rdBuf) + c.rdBuf = c.rdBuf[n:] + return +} + +func encodeFLV(b []byte, msgType byte, time uint32, payload []byte) { + _ = b[4+11] + + b[0] = 0 + b[1] = 0 + b[2] = 0 + b[3] = 0 + b[4+0] = msgType + PutUint24(b[4+1:], uint32(len(payload))) + PutUint24(b[4+4:], time) + b[4+7] = byte(time >> 24) + + copy(b[4+11:], payload) +} + +// Write - convert FLV format to RTMP format +func (c *Conn) Write(p []byte) (n int, err error) { + n = len(p) + + if p[0] == 'F' { + p = p[9+4:] // skip first msg with FLV header + + for len(p) > 0 { + size := 11 + uint16(p[2])<<8 + uint16(p[3]) + 4 + if _, err = c.Write(p[:size]); err != nil { + return 0, err + } + p = p[size:] + } + return + } + + // decode FLV: 11 bytes header + payload + 4 byte size + tagType := p[0] + timeMS := uint32(p[4])<<16 | uint32(p[5])<<8 | uint32(p[6]) | uint32(p[7])<<24 + payload := p[11 : len(p)-4] + + err = c.writeMessage(4, tagType, timeMS, payload) + return +} diff --git a/pkg/rtmp/producer.go b/pkg/rtmp/producer.go deleted file mode 100644 index bbded280..00000000 --- a/pkg/rtmp/producer.go +++ /dev/null @@ -1,28 +0,0 @@ -package rtmp - -import ( - "net/url" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/flv" - "github.com/AlexxIT/go2rtc/pkg/tcp" -) - -func Dial(rawURL string) (core.Producer, error) { - u, err := url.Parse(rawURL) - if err != nil { - return nil, err - } - - conn, err := tcp.Dial(u, core.ConnDialTimeout) - if err != nil { - return nil, err - } - - rd, err := NewReader(u, conn) - if err != nil { - return nil, err - } - - return flv.Open(rd) -} diff --git a/pkg/rtmp/reader.go b/pkg/rtmp/reader.go deleted file mode 100644 index 6674a9ee..00000000 --- a/pkg/rtmp/reader.go +++ /dev/null @@ -1,488 +0,0 @@ -package rtmp - -import ( - "bufio" - "encoding/binary" - "errors" - "io" - "net" - "net/url" - "strings" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/flv/amf" -) - -const ( - MsgSetPacketSize = 1 - MsgServerBandwidth = 5 - MsgClientBandwidth = 6 - MsgCommand = 20 - - //MsgAck = 3 - //MsgControl = 4 - //MsgAudioPacket = 8 - //MsgVideoPacket = 9 - //MsgDataExt = 15 - //MsgCommandExt = 17 - //MsgData = 18 -) - -var ErrResponse = errors.New("rtmp: wrong response") - -type Reader struct { - url string - app string - stream string - pktSize uint32 - - headers map[uint32]*header - - conn net.Conn - rd io.Reader - - buf []byte -} - -func NewReader(u *url.URL, conn net.Conn) (*Reader, error) { - rd := &Reader{ - url: u.String(), - headers: map[uint32]*header{}, - conn: conn, - rd: bufio.NewReaderSize(conn, core.BufferSize), - } - - if args := strings.Split(u.Path, "/"); len(args) >= 2 { - rd.app = args[1] - if len(args) >= 3 { - rd.stream = args[2] - if u.RawQuery != "" { - rd.stream += "?" + u.RawQuery - } - } - } - - if err := rd.handshake(); err != nil { - return nil, err - } - if err := rd.sendConfig(); err != nil { - return nil, err - } - if err := rd.sendConnect(); err != nil { - return nil, err - } - if err := rd.sendPlay(); err != nil { - return nil, err - } - - rd.buf = []byte{ - 'F', 'L', 'V', // signature - 1, // version - 0, // flags (has video/audio) - 0, 0, 0, 9, // header size - } - - return rd, nil -} - -func (c *Reader) Read(p []byte) (n int, err error) { - // 1. Check temporary tempbuffer - if len(c.buf) == 0 { - msgType, timeMS, payload, err2 := c.readMessage() - if err2 != nil { - return 0, err2 - } - - payloadSize := len(payload) - - // previous tag size (4 byte) + header (11 byte) + payload - n = 4 + 11 + payloadSize - - // 2. Check if the message fits in the buffer - if n <= len(p) { - encodeFLV(p, msgType, timeMS, uint32(payloadSize), payload) - return - } - - // 3. Put the message into a temporary buffer - c.buf = make([]byte, n) - encodeFLV(c.buf, msgType, timeMS, uint32(payloadSize), payload) - } - - // 4. Send temporary buffer - n = copy(p, c.buf) - c.buf = c.buf[n:] - return -} - -func (c *Reader) Close() error { - return c.conn.Close() -} - -func encodeFLV(b []byte, msgType byte, time, size uint32, payload []byte) { - b[0] = 0 - b[1] = 0 - b[2] = 0 - b[3] = 0 - b[4+0] = msgType - PutUint24(b[4+1:], size) - PutUint24(b[4+4:], time) - b[4+7] = byte(time >> 24) - - copy(b[4+11:], payload) -} - -type header struct { - msgTime uint32 - msgSize uint32 - msgType byte -} - -func (c *Reader) readMessage() (byte, uint32, []byte, error) { - hdrType, sid, err := c.readHeader() - if err != nil { - return 0, 0, nil, err - } - - // storing header information for support header type 3 - hdr, ok := c.headers[sid] - if !ok { - hdr = &header{} - c.headers[sid] = hdr - } - - var b []byte - - // https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol#Packet_structure - switch hdrType { - case 0: // 12 byte header (full header) - if b, err = c.readSize(11); err != nil { - return 0, 0, nil, err - } - _ = b[7] - hdr.msgTime = Uint24(b) // timestamp - hdr.msgSize = Uint24(b[3:]) // msgdatalen - hdr.msgType = b[6] // msgtypeid - _ = binary.BigEndian.Uint32(b[7:]) // msgsid - - case 1: // 8 bytes - like type b00, not including message ID (4 last bytes) - if b, err = c.readSize(7); err != nil { - return 0, 0, nil, err - } - _ = b[6] - hdr.msgTime = Uint24(b) // timestamp - hdr.msgSize = Uint24(b[3:]) // msgdatalen - hdr.msgType = b[6] // msgtypeid - - case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included - if b, err = c.readSize(3); err != nil { - return 0, 0, nil, err - } - hdr.msgTime = Uint24(b) // timestamp - - case 3: // 1 byte - only the Basic Header is included - // use here hdr from previous msg with same session ID (sid) - } - - timeMS := hdr.msgTime - if timeMS == 0xFFFFFF { - if b, err = c.readSize(4); err != nil { - return 0, 0, nil, err - } - timeMS = binary.BigEndian.Uint32(b) - } - - //log.Printf("[Reader] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType) - - // 1. Response zero size - if hdr.msgSize == 0 { - return hdr.msgType, timeMS, nil, nil - } - - b = make([]byte, hdr.msgSize) - - // 2. Response small packet - if c.pktSize == 0 || hdr.msgSize < c.pktSize { - if _, err = io.ReadFull(c.rd, b); err != nil { - return 0, 0, nil, err - } - return hdr.msgType, timeMS, b, nil - } - - // 3. Response big packet - var i0 uint32 - for i1 := c.pktSize; i1 < hdr.msgSize; i1 += c.pktSize { - if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil { - return 0, 0, nil, err - } - - if _, _, err = c.readHeader(); err != nil { - return 0, 0, nil, err - } - - if hdr.msgTime == 0xFFFFFF { - if _, err = c.readSize(4); err != nil { - return 0, 0, nil, err - } - } - - i0 = i1 - } - - if _, err = io.ReadFull(c.rd, b[i0:]); err != nil { - return 0, 0, nil, err - } - - return hdr.msgType, timeMS, b, nil -} - -func (c *Reader) handshake() error { - // simple handshake without real random and check response - const randomSize = 4 + 4 + 1528 - - b := make([]byte, 1+randomSize) - b[0] = 0x03 - if _, err := c.conn.Write(b); err != nil { - return err - } - - if _, err := io.ReadFull(c.rd, b); err != nil { - return err - } - - if b[0] != 3 { - return errors.New("Reader: wrong handshake") - } - - if _, err := c.conn.Write(b[1:]); err != nil { - return err - } - - if _, err := io.ReadFull(c.rd, b[1:]); err != nil { - return err - } - - return nil -} - -func (c *Reader) sendConfig() error { - b := make([]byte, 5) - binary.BigEndian.PutUint32(b, 65536) - if err := c.sendRequest(MsgSetPacketSize, 0, b[:4]); err != nil { - return err - } - - binary.BigEndian.PutUint32(b, 2500000) - if err := c.sendRequest(MsgServerBandwidth, 0, b[:4]); err != nil { - return err - } - - binary.BigEndian.PutUint32(b, 10000000) // ack size - b[4] = 2 // limit type - if err := c.sendRequest(MsgClientBandwidth, 0, b); err != nil { - return err - } - - return nil -} - -func (c *Reader) sendConnect() error { - msg := amf.AMF{} - msg.WriteString("connect") - msg.WriteNumber(1) - msg.WriteObject(map[string]any{ - "app": c.app, - "flashVer": "MAC 32,0,0,0", - "tcUrl": c.url, - "fpad": false, // ? - "capabilities": 15, // ? - "audioCodecs": 4071, // ? - "videoCodecs": 252, // ? - "videoFunction": 1, // ? - }) - - if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil { - return err - } - - s, err := c.waitCode("_result", float64(1)) // result with same ID - if err != nil { - return err - } - - if s != "NetConnection.Connect.Success" { - return errors.New("Reader: wrong code: " + s) - } - - return nil -} - -func (c *Reader) sendPlay() error { - msg := amf.NewWriter() - msg.WriteString("createStream") - msg.WriteNumber(2) - msg.WriteNull() - - if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil { - return err - } - - args, err := c.waitResponse("_result", float64(2)) // result with same ID - if err != nil { - return err - } - - if len(args) < 4 { - return ErrResponse - } - - sid, _ := args[3].(float64) - - msg = amf.NewWriter() - msg.WriteString("play") - msg.WriteNumber(0) - msg.WriteNull() - msg.WriteString(c.stream) - - if err = c.sendRequest(MsgCommand, uint32(sid), msg.Bytes()); err != nil { - return err - } - - s, err := c.waitCode("onStatus", float64(0)) // events has zero transaction ID - if err != nil { - return err - } - - switch s { - case "NetStream.Play.Start", "NetStream.Play.Reset": - return nil - } - - return errors.New("Reader: wrong code: " + s) -} - -func (c *Reader) sendRequest(msgType byte, streamID uint32, payload []byte) error { - n := len(payload) - b := make([]byte, 12+n) - _ = b[12] - - switch msgType { - case MsgSetPacketSize, MsgServerBandwidth, MsgClientBandwidth: - b[0] = 0x02 // chunk ID - case MsgCommand: - if streamID == 0 { - b[0] = 0x03 // chunk ID - } else { - b[0] = 0x08 // chunk ID - } - } - - //PutUint24(b[1:], 0) // timestamp - PutUint24(b[4:], uint32(n)) // payload size - b[7] = msgType // message type - binary.BigEndian.PutUint32(b[8:], streamID) // message stream ID - copy(b[12:], payload) - - if _, err := c.conn.Write(b); err != nil { - return err - } - - return nil -} - -func (c *Reader) readHeader() (byte, uint32, error) { - b, err := c.readSize(1) - if err != nil { - return 0, 0, err - } - - hdrType := b[0] >> 6 - sid := uint32(b[0] & 0b111111) - - switch sid { - case 0: - if b, err = c.readSize(1); err != nil { - return 0, 0, err - } - sid = 64 + uint32(b[0]) - case 1: - if b, err = c.readSize(2); err != nil { - return 0, 0, err - } - sid = 64 + uint32(binary.BigEndian.Uint16(b)) - } - - return hdrType, sid, nil -} - -func (c *Reader) readSize(n uint32) ([]byte, error) { - b := make([]byte, n) - if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil { - return nil, err - } - return b, nil -} - -func (c *Reader) waitResponse(cmd any, tid any) ([]any, error) { - for { - msgType, _, b, err := c.readMessage() - if err != nil { - return nil, err - } - - switch msgType { - case MsgSetPacketSize: - c.pktSize = binary.BigEndian.Uint32(b) - - case MsgCommand: - var v []any - if v, err = amf.NewReader(b).ReadItems(); err != nil { - return nil, err - } - - if len(v) < 4 { - return nil, ErrResponse - } - - if v[0] == cmd && v[1] == tid { - return v, nil - } - } - } -} - -func (c *Reader) waitCode(cmd any, tid any) (string, error) { - args, err := c.waitResponse(cmd, tid) - if err != nil { - return "", err - } - - if len(args) < 4 { - return "", ErrResponse - } - - m, _ := args[3].(map[string]any) - if m == nil { - return "", ErrResponse - } - - v, _ := m["code"] - if v == nil { - return "", ErrResponse - } - - s, _ := v.(string) - return s, nil -} - -func PutUint24(b []byte, v uint32) { - _ = b[2] - b[0] = byte(v >> 16) - b[1] = byte(v >> 8) - b[2] = byte(v) -} - -func Uint24(b []byte) uint32 { - _ = b[2] - return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]) -} diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go new file mode 100644 index 00000000..218582d2 --- /dev/null +++ b/pkg/rtmp/server.go @@ -0,0 +1,167 @@ +package rtmp + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv/amf" +) + +func NewServer(conn net.Conn) (*Conn, error) { + c := &Conn{ + conn: conn, + rd: bufio.NewReaderSize(conn, core.BufferSize), + wr: conn, + + chunks: map[uint8]*header{}, + + rdPacketSize: 128, + wrPacketSize: 4096, + } + + if err := c.serverHandshake(); err != nil { + return nil, err + } + if err := c.writePacketSize(); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Conn) serverHandshake() error { + b := make([]byte, 1+1536) + // read C0+C1 + if _, err := io.ReadFull(c.rd, b); err != nil { + return err + } + // write S0+S1, skip random + if _, err := c.conn.Write(b); err != nil { + return err + } + // read S1, skip check + if _, err := io.ReadFull(c.rd, make([]byte, 1536)); err != nil { + return err + } + // write C1 + if _, err := c.conn.Write(b[1:]); err != nil { + return err + } + return nil +} + +func (c *Conn) ReadCommands() error { + for { + msgType, _, b, err := c.readMessage() + if err != nil { + return err + } + + //log.Printf("%d %.256x", msgType, b) + + switch msgType { + case TypeSetPacketSize: + c.rdPacketSize = binary.BigEndian.Uint32(b) + case TypeCommand: + if err = c.acceptCommand(b); err != nil { + return err + } + + if c.Intent != "" { + return nil + } + } + } +} + +const ( + CommandConnect = "connect" + CommandReleaseStream = "releaseStream" + CommandCreateStream = "createStream" + CommandPublish = "publish" + CommandPlay = "play" +) + +func (c *Conn) acceptCommand(b []byte) error { + items, err := amf.NewReader(b).ReadItems() + if err != nil { + return nil + } + + //log.Printf("%#v", items) + + if len(items) < 2 { + return fmt.Errorf("rtmp: read command %x", b) + } + + cmd, ok := items[0].(string) + if !ok { + return fmt.Errorf("rtmp: read command %x", b) + } + + tID, ok := items[1].(float64) // transaction ID + if !ok { + return fmt.Errorf("rtmp: read command %x", b) + } + + switch cmd { + case CommandConnect: + if len(items) == 3 { + if v, ok := items[2].(map[string]any); ok { + c.App, _ = v["app"].(string) + } + } + + if c.App == "" { + return fmt.Errorf("rtmp: read command %x", b) + } + + payload := amf.EncodeItems( + "_result", tID, + map[string]any{ + "fmsVer": "FMS/3,0,1,123", + }, + map[string]any{ + "code": "NetConnection.Connect.Success", + }, + ) + return c.writeMessage(3, TypeCommand, 0, payload) + + case CommandReleaseStream: + payload := amf.EncodeItems("_result", tID, nil) + return c.writeMessage(3, TypeCommand, 0, payload) + + case CommandCreateStream: + payload := amf.EncodeItems("_result", tID, nil, 1) + return c.writeMessage(3, TypeCommand, 0, payload) + + case CommandPublish, CommandPlay: + c.Intent = cmd + + default: + println("rtmp: unknown command: " + cmd) + } + + return nil +} + +func (c *Conn) WritePlayStart() error { + payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{ + "code": "NetStream.Play.Start", + }) + return c.writeMessage(3, TypeCommand, 0, payload) +} + +func (c *Conn) code() string { + switch c.Intent { + case CommandPlay: + return "NetStream.Play.Start" + case CommandPublish: + return "NetStream.Publish.Start" + } + return "" +}