Add support HTTP JPEG and MJPEG sources

This commit is contained in:
Alexey Khit
2022-12-01 13:01:48 +03:00
parent 684878b4b1
commit a16d8acc30
18 changed files with 421 additions and 106 deletions

56
cmd/http/http.go Normal file
View File

@@ -0,0 +1,56 @@
package http
import (
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"net/http"
"strings"
)
func Init() {
streams.HandleFunc("http", handle)
streams.HandleFunc("https", handle)
}
func handle(url string) (streamer.Producer, error) {
// first we get the Content-Type to define supported producer
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
res, err := tcp.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, errors.New(res.Status)
}
ct := res.Header.Get("Content-Type")
if i := strings.IndexByte(ct, ';'); i > 0 {
ct = ct[:i]
}
switch ct {
case "image/jpeg", "multipart/x-mixed-replace":
return mjpeg.NewClient(res), nil
case "video/x-flv":
var conn *rtmp.Client
if conn, err = rtmp.Accept(res); err != nil {
return nil, err
}
if err = conn.Describe(); err != nil {
return nil, err
}
return conn, nil
}
return nil, fmt.Errorf("unsupported Content-Type: %s", ct)
}

View File

@@ -40,8 +40,12 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
w.Header().Set("Content-Type", "image/jpeg") h := w.Header()
w.Header().Set("Content-Length", strconv.Itoa(len(data))) h.Set("Content-Type", "image/jpeg")
h.Set("Content-Length", strconv.Itoa(len(data)))
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "close")
h.Set("Pragma", "no-cache")
if _, err := w.Write(data); err != nil { if _, err := w.Write(data); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
@@ -57,20 +61,21 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan struct{}) flusher := w.(http.Flusher)
cons := &mjpeg.Consumer{} cons := &mjpeg.Consumer{}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
switch msg := msg.(type) { switch msg := msg.(type) {
case []byte: case []byte:
data := []byte(header + strconv.Itoa(len(msg))) data := []byte(header + strconv.Itoa(len(msg)))
data = append(data, 0x0D, 0x0A, 0x0D, 0x0A) data = append(data, '\r', '\n', '\r', '\n')
data = append(data, msg...) data = append(data, msg...)
data = append(data, 0x0D, 0x0A) data = append(data, '\r', '\n')
if _, err := w.Write(data); err != nil { // Chrome bug: mjpeg image always shows the second to last image
exit <- struct{}{} // https://bugs.chromium.org/p/chromium/issues/detail?id=527446
} _, _ = w.Write(data)
flusher.Flush()
} }
}) })
@@ -79,9 +84,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
return return
} }
w.Header().Set("Content-Type", `multipart/x-mixed-replace; boundary=frame`) h := w.Header()
h.Set("Content-Type", "multipart/x-mixed-replace; boundary=frame")
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "close")
h.Set("Pragma", "no-cache")
<-exit <-r.Context().Done()
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)

View File

@@ -8,8 +8,6 @@ import (
func Init() { func Init() {
streams.HandleFunc("rtmp", handle) streams.HandleFunc("rtmp", handle)
streams.HandleFunc("http", handle)
streams.HandleFunc("https", handle)
} }
func handle(url string) (streamer.Producer, error) { func handle(url string) (streamer.Producer, error) {
@@ -17,5 +15,8 @@ func handle(url string) (streamer.Producer, error) {
if err := conn.Dial(); err != nil { if err := conn.Dial(); err != nil {
return nil, err return nil, err
} }
if err := conn.Describe(); err != nil {
return nil, err
}
return conn, nil return conn, nil
} }

View File

@@ -9,6 +9,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/ffmpeg" "github.com/AlexxIT/go2rtc/cmd/ffmpeg"
"github.com/AlexxIT/go2rtc/cmd/hass" "github.com/AlexxIT/go2rtc/cmd/hass"
"github.com/AlexxIT/go2rtc/cmd/homekit" "github.com/AlexxIT/go2rtc/cmd/homekit"
"github.com/AlexxIT/go2rtc/cmd/http"
"github.com/AlexxIT/go2rtc/cmd/ivideon" "github.com/AlexxIT/go2rtc/cmd/ivideon"
"github.com/AlexxIT/go2rtc/cmd/mjpeg" "github.com/AlexxIT/go2rtc/cmd/mjpeg"
"github.com/AlexxIT/go2rtc/cmd/mp4" "github.com/AlexxIT/go2rtc/cmd/mp4"
@@ -40,6 +41,7 @@ func main() {
webrtc.Init() webrtc.Init()
mp4.Init() mp4.Init()
mjpeg.Init() mjpeg.Init()
http.Init()
srtp.Init() srtp.Init()
homekit.Init() homekit.Init()

View File

@@ -22,13 +22,17 @@ func Dial(uri string) (*Conn, error) {
return nil, err return nil, err
} }
return Accept(res)
}
func Accept(res *http.Response) (*Conn, error) {
c := Conn{ c := Conn{
conn: res.Body, conn: res.Body,
reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize), reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize),
buf: make([]byte, 256), buf: make([]byte, 256),
} }
if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil { if _, err := io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil {
return nil, err return nil, err
} }
@@ -49,9 +53,9 @@ func Dial(uri string) (*Conn, error) {
} }
type Conn struct { type Conn struct {
conn io.ReadCloser conn io.ReadCloser
reader *bufio.Reader reader *bufio.Reader
buf []byte buf []byte
} }
func (c *Conn) Streams() ([]av.CodecData, error) { func (c *Conn) Streams() ([]av.CodecData, error) {

View File

@@ -165,7 +165,7 @@ func (c *Client) getTracks() error {
Name: streamer.CodecH264, Name: streamer.CodecH264,
ClockRate: 90000, ClockRate: 90000,
FmtpLine: "profile-level-id=" + msg.CodecString[i+1:], FmtpLine: "profile-level-id=" + msg.CodecString[i+1:],
PayloadType: streamer.PayloadTypeMP4, PayloadType: streamer.PayloadTypeRAW,
} }
i = bytes.Index(msg.Data, []byte("avcC")) - 4 i = bytes.Index(msg.Data, []byte("avcC")) - 4

View File

@@ -2,3 +2,4 @@
- https://www.rfc-editor.org/rfc/rfc2435 - https://www.rfc-editor.org/rfc/rfc2435
- https://github.com/GStreamer/gst-plugins-good/blob/master/gst/rtp/gstrtpjpegdepay.c - https://github.com/GStreamer/gst-plugins-good/blob/master/gst/rtp/gstrtpjpegdepay.c
- https://mjpeg.sanford.io/

147
pkg/mjpeg/client.go Normal file
View File

@@ -0,0 +1,147 @@
package mjpeg
import (
"bufio"
"errors"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
"io"
"net/http"
"net/textproto"
"strconv"
"strings"
)
type Client struct {
streamer.Element
UserAgent string
RemoteAddr string
closed bool
res *http.Response
track *streamer.Track
}
func NewClient(res *http.Response) *Client {
codec := &streamer.Codec{
Name: streamer.CodecJPEG, ClockRate: 90000, PayloadType: streamer.PayloadTypeRAW,
}
return &Client{
res: res,
track: streamer.NewTrack(codec, streamer.DirectionSendonly),
}
}
func (c *Client) GetMedias() []*streamer.Media {
return []*streamer.Media{{
Kind: streamer.KindVideo,
Direction: streamer.DirectionSendonly,
Codecs: []*streamer.Codec{c.track.Codec},
}}
}
func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
return c.track
}
func (c *Client) Start() error {
ct := c.res.Header.Get("Content-Type")
if ct == "image/jpeg" {
return c.startJPEG()
}
// added in go1.18
if _, s, ok := strings.Cut(ct, "boundary="); ok {
return c.startMJPEG(s)
}
return errors.New("wrong Content-Type: " + ct)
}
func (c *Client) Stop() error {
c.closed = true
return nil
}
func (c *Client) startJPEG() error {
buf, err := io.ReadAll(c.res.Body)
if err != nil {
return err
}
packet := &rtp.Packet{Payload: buf}
_ = c.track.WriteRTP(packet)
req := c.res.Request
for !c.closed {
res, err := tcp.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New("wrong status: " + res.Status)
}
buf, err = io.ReadAll(res.Body)
if err != nil {
return err
}
packet = &rtp.Packet{Payload: buf}
_ = c.track.WriteRTP(packet)
}
return nil
}
func (c *Client) startMJPEG(boundary string) error {
boundary = "--" + boundary
r := bufio.NewReader(c.res.Body)
tp := textproto.NewReader(r)
for !c.closed {
s, err := tp.ReadLine()
if err != nil {
return err
}
if s != boundary {
return errors.New("wrong boundary: " + s)
}
header, err := tp.ReadMIMEHeader()
if err != nil {
return err
}
s = header.Get("Content-Length")
if s == "" {
return errors.New("no content length")
}
size, err := strconv.Atoi(s)
if err != nil {
return err
}
buf := make([]byte, size)
if _, err = io.ReadFull(r, buf); err != nil {
return err
}
packet := &rtp.Packet{Payload: buf}
_ = c.track.WriteRTP(packet)
if _, err = r.Discard(2); err != nil {
return err
}
}
return nil
}

View File

@@ -26,70 +26,15 @@ func (c *Consumer) GetMedias() []*streamer.Media {
} }
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
var header, payload []byte
push := func(packet *rtp.Packet) error { push := func(packet *rtp.Packet) error {
//fmt.Printf( c.Fire(packet.Payload)
// "[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v\n",
// track.Codec.Name, len(packet.Payload), packet.Timestamp,
// packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker,
//)
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1
b := packet.Payload
// 3.1. JPEG header
t := b[4]
// 3.1.7. Restart Marker header
if 64 <= t && t <= 127 {
b = b[12:] // skip it
} else {
b = b[8:]
}
if header == nil {
var lqt, cqt []byte
// 3.1.8. Quantization Table header
q := packet.Payload[5]
if q >= 128 {
lqt = b[4:68]
cqt = b[68:132]
b = b[132:]
} else {
lqt, cqt = MakeTables(q)
}
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5
// The maximum width is 2040 pixels.
w := uint16(packet.Payload[6]) << 3
h := uint16(packet.Payload[7]) << 3
// fix 2560x1920 and 2560x1440
if w == 512 && (h == 1920 || h == 1440) {
w = 2560
}
//fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h)
header = MakeHeaders(t, w, h, lqt, cqt)
}
// 3.1.9. JPEG Payload
payload = append(payload, b...)
if packet.Marker {
b = append(header, payload...)
if end := b[len(b)-2:]; end[0] != 0xFF && end[1] != 0xD9 {
b = append(b, 0xFF, 0xD9)
}
c.Fire(b)
header = nil
payload = nil
}
return nil return nil
} }
if track.Codec.IsRTP() {
wrapper := RTPDepay(track)
push = wrapper(push)
}
return track.Bind(push) return track.Bind(push)
} }

78
pkg/mjpeg/rtp.go Normal file
View File

@@ -0,0 +1,78 @@
package mjpeg
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp"
)
func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
var header, payload []byte
return func(push streamer.WriterFunc) streamer.WriterFunc {
return func(packet *rtp.Packet) error {
//fmt.Printf(
// "[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v\n",
// track.Codec.Name, len(packet.Payload), packet.Timestamp,
// packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker,
//)
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1
b := packet.Payload
// 3.1. JPEG header
t := b[4]
// 3.1.7. Restart Marker header
if 64 <= t && t <= 127 {
b = b[12:] // skip it
} else {
b = b[8:]
}
if header == nil {
var lqt, cqt []byte
// 3.1.8. Quantization Table header
q := packet.Payload[5]
if q >= 128 {
lqt = b[4:68]
cqt = b[68:132]
b = b[132:]
} else {
lqt, cqt = MakeTables(q)
}
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5
// The maximum width is 2040 pixels.
w := uint16(packet.Payload[6]) << 3
h := uint16(packet.Payload[7]) << 3
// fix 2560x1920 and 2560x1440
if w == 512 && (h == 1920 || h == 1440) {
w = 2560
}
//fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h)
header = MakeHeaders(t, w, h, lqt, cqt)
}
// 3.1.9. JPEG Payload
payload = append(payload, b...)
if !packet.Marker {
return nil
}
b = append(header, payload...)
if end := b[len(b)-2:]; end[0] != 0xFF && end[1] != 0xD9 {
b = append(b, 0xFF, 0xD9)
}
header = nil
payload = nil
packet.Payload = b
return push(packet)
}
}
}

View File

@@ -72,10 +72,10 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
} }
var wrapper streamer.WrapperFunc var wrapper streamer.WrapperFunc
if codec.IsMP4() { if codec.IsRTP() {
wrapper = h264.RepairAVC(track)
} else {
wrapper = h264.RTPDepay(track) wrapper = h264.RTPDepay(track)
} else {
wrapper = h264.RepairAVC(track)
} }
push = wrapper(push) push = wrapper(push)
@@ -98,7 +98,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !codec.IsMP4() { if codec.IsRTP() {
wrapper := h265.RTPDepay(track) wrapper := h265.RTPDepay(track)
push = wrapper(push) push = wrapper(push)
} }
@@ -118,7 +118,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !codec.IsMP4() { if codec.IsRTP() {
wrapper := aac.RTPDepay(track) wrapper := aac.RTPDepay(track)
push = wrapper(push) push = wrapper(push)
} }

View File

@@ -52,10 +52,10 @@ func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *strea
} }
var wrapper streamer.WrapperFunc var wrapper streamer.WrapperFunc
if track.Codec.IsMP4() { if track.Codec.IsRTP() {
wrapper = h264.RepairAVC(track)
} else {
wrapper = h264.RTPDepay(track) wrapper = h264.RTPDepay(track)
} else {
wrapper = h264.RepairAVC(track)
} }
push = wrapper(push) push = wrapper(push)
@@ -73,7 +73,7 @@ func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !track.Codec.IsMP4() { if track.Codec.IsRTP() {
wrapper := h265.RTPDepay(track) wrapper := h265.RTPDepay(track)
push = wrapper(push) push = wrapper(push)
} }

View File

@@ -89,7 +89,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !codec.IsMP4() { if !codec.IsRAW() {
wrapper := h264.RTPDepay(track) wrapper := h264.RTPDepay(track)
push = wrapper(push) push = wrapper(push)
} }
@@ -146,7 +146,7 @@ func (c *Consumer) Init() ([]byte, error) {
return data, nil return data, nil
} }
func (c *Consumer) Start() { func (c *Consumer) Start() {
c.start = true c.start = true
} }

View File

@@ -11,7 +11,7 @@ import (
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtmp" "github.com/deepch/vdk/format/rtmp"
"github.com/pion/rtp" "github.com/pion/rtp"
"strings" "net/http"
"time" "time"
) )
@@ -41,16 +41,20 @@ func NewClient(uri string) *Client {
} }
func (c *Client) Dial() (err error) { func (c *Client) Dial() (err error) {
if strings.HasPrefix(c.URI, "http") { c.conn, err = rtmp.Dial(c.URI)
c.conn, err = httpflv.Dial(c.URI) return
} else { }
c.conn, err = rtmp.Dial(c.URI)
}
// Accept - convert http.Response to Client
func Accept(res *http.Response) (*Client, error) {
conn, err := httpflv.Accept(res)
if err != nil { if err != nil {
return return nil, err
} }
return &Client{URI: res.Request.URL.String(), conn: conn}, nil
}
func (c *Client) Describe() (err error) {
// important to get SPS/PPS // important to get SPS/PPS
streams, err := c.conn.Streams() streams, err := c.conn.Streams()
if err != nil { if err != nil {
@@ -73,7 +77,7 @@ func (c *Client) Dial() (err error) {
Name: streamer.CodecH264, Name: streamer.CodecH264,
ClockRate: 90000, ClockRate: 90000,
FmtpLine: fmtp, FmtpLine: fmtp,
PayloadType: streamer.PayloadTypeMP4, PayloadType: streamer.PayloadTypeRAW,
} }
media := &streamer.Media{ media := &streamer.Media{
@@ -96,7 +100,7 @@ func (c *Client) Dial() (err error) {
Channels: uint16(cd.Config.ChannelConfig), Channels: uint16(cd.Config.ChannelConfig),
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588 // a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes), FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes),
PayloadType: streamer.PayloadTypeMP4, PayloadType: streamer.PayloadTypeRAW,
} }
media := &streamer.Media{ media := &streamer.Media{

View File

@@ -794,7 +794,7 @@ func (c *Conn) bindTrack(
return nil return nil
} }
if track.Codec.IsMP4() { if !track.Codec.IsRTP() {
switch track.Codec.Name { switch track.Codec.Name {
case streamer.CodecH264: case streamer.CodecH264:
wrapper := h264.RTPPay(1500) wrapper := h264.RTPPay(1500)

View File

@@ -37,7 +37,7 @@ const (
CodecELD = "ELD" // AAC-ELD CodecELD = "ELD" // AAC-ELD
) )
const PayloadTypeMP4 byte = 255 const PayloadTypeRAW byte = 255
func GetKind(name string) string { func GetKind(name string) string {
switch name { switch name {
@@ -139,8 +139,8 @@ func (c *Codec) String() string {
return s return s
} }
func (c *Codec) IsMP4() bool { func (c *Codec) IsRTP() bool {
return c.PayloadType == PayloadTypeMP4 return c.PayloadType != PayloadTypeRAW
} }
func (c *Codec) Clone() *Codec { func (c *Codec) Clone() *Codec {

68
pkg/tcp/request.go Normal file
View File

@@ -0,0 +1,68 @@
package tcp
import (
"errors"
"fmt"
"net/http"
"strings"
"time"
)
// Do - http.Client with support Digest Authorization
func Do(req *http.Request) (*http.Response, error) {
// need to create new client each time to reset timeout
client := http.Client{Timeout: time.Second * 5000}
res, err := client.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode == http.StatusUnauthorized && req.URL.User != nil {
auth := res.Header.Get("WWW-Authenticate")
if !strings.HasPrefix(auth, "Digest") {
return nil, errors.New("unsupported auth: " + auth)
}
realm := Between(auth, `realm="`, `"`)
nonce := Between(auth, `nonce="`, `"`)
qop := Between(auth, `qop="`, `"`)
user := req.URL.User
username := user.Username()
password, _ := user.Password()
ha1 := HexMD5(username, realm, password)
uri := req.URL.RequestURI()
ha2 := HexMD5(req.Method, uri)
var header string
switch qop {
case "":
response := HexMD5(ha1, nonce, ha2)
header = fmt.Sprintf(
`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`,
user, realm, nonce, uri, response,
)
case "auth":
nc := "00000001"
cnonce := "00000001" // TODO: random...
response := HexMD5(ha1, nonce, nc, cnonce, qop, ha2)
header = fmt.Sprintf(
`Digest username="%s", realm="%s", nonce="%s", uri="%s", qop=%s, nc=%s, cnonce="%s", response="%s"`,
username, realm, nonce, uri, qop, nc, cnonce, response,
)
default:
return nil, errors.New("unsupported qop: " + auth)
}
req.Header.Set("Authorization", header)
res, err = client.Do(req)
if err != nil {
return nil, err
}
}
return res, nil
}

View File

@@ -57,10 +57,10 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.
wrapper := h264.RTPPay(1200) wrapper := h264.RTPPay(1200)
push = wrapper(push) push = wrapper(push)
if codec.IsMP4() { if codec.IsRTP() {
wrapper = h264.RepairAVC(track)
} else {
wrapper = h264.RTPDepay(track) wrapper = h264.RTPDepay(track)
} else {
wrapper = h264.RepairAVC(track)
} }
push = wrapper(push) push = wrapper(push)