diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index 0da06dbd..065fcd87 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -9,6 +9,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/rtmp" + "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog/log" ) @@ -30,10 +31,39 @@ func streamsHandle(url string) (core.Producer, error) { func apiHandle(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { - http.Error(w, "", http.StatusMethodNotAllowed) + outputFLV(w, r) + } else { + inputFLV(w, r) + } +} + +func outputFLV(w http.ResponseWriter, r *http.Request) { + src := r.URL.Query().Get("src") + stream := streams.Get(src) + if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) return } + cons := flv.NewConsumer() + cons.Type = "HTTP-FLV consumer" + cons.RemoteAddr = tcp.RemoteAddr(r) + cons.UserAgent = r.UserAgent() + + if err := stream.AddConsumer(cons); err != nil { + log.Error().Err(err).Caller().Send() + return + } + + h := w.Header() + h.Set("Content-Type", "video/x-flv") + + _, _ = cons.WriteTo(w) + + stream.RemoveConsumer(cons) +} + +func inputFLV(w http.ResponseWriter, r *http.Request) { dst := r.URL.Query().Get("dst") stream := streams.Get(dst) if stream == nil { diff --git a/pkg/flv/consumer.go b/pkg/flv/consumer.go new file mode 100644 index 00000000..59e65d9c --- /dev/null +++ b/pkg/flv/consumer.go @@ -0,0 +1,93 @@ +package flv + +import ( + "io" + + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/pion/rtp" +) + +type Consumer struct { + core.SuperConsumer + wr *core.WriteBuffer + muxer *Muxer +} + +func NewConsumer() *Consumer { + c := &Consumer{ + wr: core.NewWriteBuffer(nil), + muxer: &Muxer{}, + } + c.Medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecH264}, + }, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecAAC}, + }, + }, + } + return c +} + +func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, track.Codec) + + switch track.Codec.Name { + case core.CodecH264: + payload := c.muxer.GetPayloader(track.Codec) + + sender.Handler = func(pkt *rtp.Packet) { + b := payload(pkt) + if n, err := c.wr.Write(b); err == nil { + c.Send += n + } + } + + if track.Codec.IsRTP() { + sender.Handler = h264.RTPDepay(track.Codec, sender.Handler) + } else { + sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler) + } + + case core.CodecAAC: + payload := c.muxer.GetPayloader(track.Codec) + + sender.Handler = func(pkt *rtp.Packet) { + b := payload(pkt) + if n, err := c.wr.Write(b); err == nil { + c.Send += n + } + } + + if track.Codec.IsRTP() { + sender.Handler = aac.RTPDepay(sender.Handler) + } + } + + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { + b := c.muxer.GetInit() + if _, err := wr.Write(b); err != nil { + return 0, err + } + return c.wr.WriteTo(wr) +} + +func (c *Consumer) Stop() error { + _ = c.SuperConsumer.Close() + return c.wr.Close() +} diff --git a/pkg/flv/muxer.go b/pkg/flv/muxer.go new file mode 100644 index 00000000..494773a9 --- /dev/null +++ b/pkg/flv/muxer.go @@ -0,0 +1,176 @@ +package flv + +import ( + "encoding/binary" + "encoding/hex" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv/amf" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/pion/rtp" +) + +type Muxer struct { + codecs []*core.Codec +} + +const ( + FlagsVideo = 0b001 + FlagsAudio = 0b100 +) + +func (m *Muxer) GetInit() []byte { + b := []byte{ + 'F', 'L', 'V', // signature + 1, // version + 0, // flags (has video/audio) + 0, 0, 0, 9, // header size + 0, 0, 0, 0, // tag 0 size + } + + obj := map[string]any{} + + for _, codec := range m.codecs { + switch codec.Name { + case core.CodecH264: + b[4] |= FlagsVideo + obj["videocodecid"] = CodecAVC + + case core.CodecAAC: + b[4] |= FlagsAudio + obj["audiocodecid"] = CodecAAC + obj["audiosamplerate"] = codec.ClockRate + obj["audiosamplesize"] = 16 + obj["stereo"] = codec.Channels == 2 + } + } + + wr := amf.NewWriter() + wr.WriteString("onMetaData") + wr.WriteEcmaArray(obj) + b = append(b, EncodePacket(TagData, 0, wr.Bytes())...) + + for _, codec := range m.codecs { + switch codec.Name { + case core.CodecH264: + sps, pps := h264.GetParameterSet(codec.FmtpLine) + if len(sps) == 0 { + sps = []byte{0x67, 0x42, 0x00, 0x0a, 0xf8, 0x41, 0xa2} + } + if len(pps) == 0 { + pps = []byte{0x68, 0xce, 0x38, 0x80} + } + + config := h264.EncodeConfig(sps, pps) + payload := append(encodeAVData(codec, 0), config...) + b = append(b, EncodePacket(TagVideo, 0, payload)...) + + case core.CodecAAC: + s := core.Between(codec.FmtpLine, "config=", ";") + config, _ := hex.DecodeString(s) + payload := append( + encodeAVData(codec, 0), config..., + ) + b = append(b, EncodePacket(TagAudio, 0, payload)...) + } + } + + return b +} + +func (m *Muxer) GetPayloader(codec *core.Codec) func(packet *rtp.Packet) []byte { + m.codecs = append(m.codecs, codec) + + var ts0 uint32 + var k = codec.ClockRate / 1000 + + switch codec.Name { + case core.CodecH264: + buf := encodeAVData(codec, 1) + + return func(packet *rtp.Packet) []byte { + if h264.IsKeyframe(packet.Payload) { + buf[0] = 1<<4 | 7 + } else { + buf[0] = 2<<4 | 7 + } + + buf = append(buf[:5], packet.Payload...) // reset buffer to previous place + + if ts0 == 0 { + ts0 = packet.Timestamp + } + + timeMS := (packet.Timestamp - ts0) / k + return EncodePacket(TagVideo, timeMS, buf) + } + + case core.CodecAAC: + buf := encodeAVData(codec, 1) + + return func(packet *rtp.Packet) []byte { + buf = append(buf[:2], packet.Payload...) + + if ts0 == 0 { + ts0 = packet.Timestamp + } + + timeMS := (packet.Timestamp - ts0) / k + return EncodePacket(TagAudio, timeMS, buf) + } + } + + return nil +} + +func EncodePacket(tagType byte, timeMS uint32, payload []byte) []byte { + payloadSize := uint32(len(payload)) + tagSize := payloadSize + 11 + + b := make([]byte, tagSize+4) + b[0] = tagType + b[1] = byte(payloadSize >> 16) + b[2] = byte(payloadSize >> 8) + b[3] = byte(payloadSize) + b[4] = byte(timeMS >> 16) + b[5] = byte(timeMS >> 8) + b[6] = byte(timeMS) + b[7] = byte(timeMS >> 24) + copy(b[11:], payload) + + binary.BigEndian.PutUint32(b[tagSize:], tagSize) + return b +} + +func encodeAVData(codec *core.Codec, isFrame byte) []byte { + switch codec.Name { + case core.CodecH264: + return []byte{ + 1<<4 | 7, // keyframe + AVC + isFrame, // 0 - config, 1 - frame + 0, 0, 0, // composition time = 0 + } + + case core.CodecAAC: + var b0 byte = 10 << 4 // AAC + + switch codec.ClockRate { + case 11025: + b0 |= 1 << 2 + case 22050: + b0 |= 2 << 2 + case 44100: + b0 |= 3 << 2 + } + + b0 |= 1 << 1 // 16 bits + + if codec.Channels == 2 { + b0 |= 1 + } + + return []byte{b0, isFrame} // 0 - config, 1 - frame + } + + return nil +}