Rewrite exec pipe, TCP and HTTP sources

This commit is contained in:
Alexey Khit
2023-05-04 11:52:36 +03:00
parent d44efb84a0
commit b5f4c7f75b
12 changed files with 224 additions and 110 deletions

View File

@@ -9,7 +9,7 @@ import (
"github.com/AlexxIT/go2rtc/internal/rtsp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pipe"
"github.com/AlexxIT/go2rtc/pkg/magic"
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/rs/zerolog"
@@ -38,12 +38,12 @@ func Init() {
}
})
streams.HandleFunc("exec", Handle)
streams.HandleFunc("exec", execHandle)
log = app.GetLogger("exec")
}
func Handle(url string) (core.Producer, error) {
func execHandle(url string) (core.Producer, error) {
var path string
args := shell.QuoteSplit(url[5:]) // remove `exec:`
@@ -66,9 +66,34 @@ func Handle(url string) (core.Producer, error) {
}
if path == "" {
return pipe.NewClient(cmd)
return handlePipe(url, cmd)
}
return handleRTSP(url, path, cmd)
}
func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) {
r, err := PipeCloser(cmd)
if err != nil {
return nil, err
}
if err = cmd.Start(); err != nil {
return nil, err
}
client := magic.NewClient(r)
if err = client.Probe(); err != nil {
return nil, err
}
client.Desc = "exec active producer"
client.URL = url
return client, nil
}
func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}

26
internal/exec/pipe.go Normal file
View File

@@ -0,0 +1,26 @@
package exec
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"io"
"os/exec"
)
// PipeCloser - return StdoutPipe that Kill cmd on Close call
func PipeCloser(cmd *exec.Cmd) (io.ReadCloser, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
return pipeCloser{stdout, cmd}, nil
}
type pipeCloser struct {
io.ReadCloser
cmd *exec.Cmd
}
func (p pipeCloser) Close() error {
return core.Any(p.ReadCloser.Close(), p.cmd.Process.Kill(), p.cmd.Wait())
}

View File

@@ -3,7 +3,6 @@ package ffmpeg
import (
"errors"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/exec"
"github.com/AlexxIT/go2rtc/internal/ffmpeg/device"
"github.com/AlexxIT/go2rtc/internal/ffmpeg/hardware"
"github.com/AlexxIT/go2rtc/internal/rtsp"
@@ -32,7 +31,7 @@ func Init() {
if args == nil {
return nil, errors.New("can't generate ffmpeg command")
}
return exec.Handle("exec:" + args.String())
return streams.GetProducer("exec:" + args.String())
})
device.Init(defaults["bin"])

View File

@@ -2,24 +2,28 @@ package http
import (
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"net"
"net/http"
"net/url"
"strings"
"time"
)
func Init() {
streams.HandleFunc("http", handle)
streams.HandleFunc("https", handle)
streams.HandleFunc("httpx", handle)
streams.HandleFunc("http", handleHTTP)
streams.HandleFunc("https", handleHTTP)
streams.HandleFunc("httpx", handleHTTP)
streams.HandleFunc("tcp", handleTCP)
}
func handle(url string) (core.Producer, error) {
func handleHTTP(url string) (core.Producer, error) {
// first we get the Content-Type to define supported producer
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@@ -54,13 +58,38 @@ func handle(url string) (core.Producer, error) {
}
return conn, nil
case "video/mpeg":
client := mpegts.NewClient(res)
if err = client.Handle(); err != nil {
return nil, err
}
return client, nil
default: // "video/mpeg":
}
return nil, fmt.Errorf("unsupported Content-Type: %s", ct)
client := magic.NewClient(res.Body)
if err = client.Probe(); err != nil {
return nil, err
}
client.Desc = "HTTP active producer"
client.URL = url
return client, nil
}
func handleTCP(rawURL string) (core.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", u.Host, time.Second*3)
if err != nil {
return nil, err
}
client := magic.NewClient(conn)
if err = client.Probe(); err != nil {
return nil, err
}
client.Desc = "TCP active producer"
client.URL = rawURL
return client, nil
}

View File

@@ -6,8 +6,8 @@ import (
"github.com/AlexxIT/go2rtc/internal/ffmpeg"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/pipe"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog/log"
"io"
@@ -33,7 +33,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
exit := make(chan []byte)
cons := &pipe.Keyframe{
cons := &magic.Keyframe{
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
}

View File

@@ -1,35 +0,0 @@
package tcp
import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"net"
"net/http"
"net/url"
"time"
)
func Init() {
streams.HandleFunc("tcp", handle)
}
func handle(rawURL string) (core.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", u.Host, time.Second*3)
if err != nil {
return nil, err
}
req := &http.Request{URL: u}
res := &http.Response{Body: conn, Request: req}
client := mpegts.NewClient(res)
if err := client.Handle(); err != nil {
return nil, err
}
return client, nil
}

View File

@@ -25,7 +25,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/srtp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/internal/tapo"
"github.com/AlexxIT/go2rtc/internal/tcp"
"github.com/AlexxIT/go2rtc/internal/webrtc"
"github.com/AlexxIT/go2rtc/internal/webtorrent"
"os"
@@ -52,7 +51,6 @@ func main() {
isapi.Init()
mpegts.Init()
roborock.Init()
tcp.Init()
srtp.Init()
homekit.Init()

View File

@@ -29,6 +29,15 @@ func RandString(size, base byte) string {
return string(b)
}
func Any(errs ...error) error {
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
func Between(s, sub1, sub2 string) string {
i := strings.Index(s, sub1)
if i < 0 {

54
pkg/h265/avc.go Normal file
View File

@@ -0,0 +1,54 @@
package h265
import "github.com/AlexxIT/go2rtc/pkg/h264"
const forbiddenZeroBit = 0x80
const nalUnitType = 0x3F
// DecodeStream - find and return first AU in AVC format
// useful for processing live streams with unknown separator size
func DecodeStream(annexb []byte) ([]byte, int) {
startPos := -1
i := 0
for {
// search next separator
if i = h264.IndexFrom(annexb, []byte{0, 0, 1}, i); i < 0 {
break
}
// move i to next AU
if i += 3; i >= len(annexb) {
break
}
// check if AU type valid
octet := annexb[i]
if octet&forbiddenZeroBit != 0 {
continue
}
nalType := (octet >> 1) & nalUnitType
if startPos >= 0 {
switch nalType {
case NALUTypeVPS, NALUTypePFrame:
if annexb[i-4] == 0 {
return h264.DecodeAnnexB(annexb[startPos : i-4]), i - 4
} else {
return h264.DecodeAnnexB(annexb[startPos : i-3]), i - 3
}
}
} else {
switch nalType {
case NALUTypeVPS, NALUTypePFrame:
if i >= 4 && annexb[i-4] == 0 {
startPos = i - 4
} else {
startPos = i - 3
}
}
}
}
return nil, 0
}

View File

@@ -1,4 +1,4 @@
package pipe
package magic
import (
"bytes"
@@ -6,17 +6,21 @@ import (
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/pion/rtp"
"io"
"os/exec"
)
// Client - can read unknown bytestream and autodetect format
type Client struct {
cmd *exec.Cmd
stdout io.ReadCloser
sniff []byte
handle func() error
Desc string
URL string
Handle func() error
r io.ReadCloser
sniff []byte
medias []*core.Media
receiver *core.Receiver
@@ -24,47 +28,50 @@ type Client struct {
recv int
}
func NewClient(cmd *exec.Cmd) (prod *Client, err error) {
prod = &Client{cmd: cmd}
func NewClient(r io.ReadCloser) *Client {
return &Client{r: r}
}
prod.stdout, err = cmd.StdoutPipe()
func (c *Client) Probe() (err error) {
c.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT
c.recv, err = io.ReadFull(c.r, c.sniff)
if err != nil {
return nil, err
}
if err = cmd.Start(); err != nil {
return nil, err
}
prod.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT
prod.recv, err = io.ReadFull(prod.stdout, prod.sniff)
if err != nil {
_ = prod.Stop()
return nil, err
_ = c.Close()
return
}
var codec *core.Codec
if bytes.HasPrefix(prod.sniff, []byte{0, 0, 0, 1}) {
if bytes.HasPrefix(c.sniff, []byte{0, 0, 0, 1}) {
switch {
case h264.NALUType(prod.sniff) == h264.NALUTypeSPS:
case h264.NALUType(c.sniff) == h264.NALUTypeSPS:
codec = &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
prod.handle = prod.ReadBitstreams
c.Handle = c.ReadBitstreams
case h265.NALUType(c.sniff) == h265.NALUTypeVPS:
codec = &core.Codec{
Name: core.CodecH265,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
c.Handle = c.ReadBitstreams
}
} else if bytes.HasPrefix(prod.sniff, []byte{0xFF, 0xD8}) {
} else if bytes.HasPrefix(c.sniff, []byte{0xFF, 0xD8}) {
codec = &core.Codec{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
prod.handle = prod.ReadMJPEG
} else if prod.sniff[0] == mpegts.SyncByte {
c.Handle = c.ReadMJPEG
} else if c.sniff[0] == mpegts.SyncByte {
ts := mpegts.NewReader()
ts.AppendBuffer(prod.sniff)
ts.AppendBuffer(c.sniff)
_ = ts.GetPacket()
for _, streamType := range ts.GetStreamTypes() {
switch streamType {
@@ -74,17 +81,17 @@ func NewClient(cmd *exec.Cmd) (prod *Client, err error) {
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
prod.handle = prod.ReadMPEGTS
c.Handle = c.ReadMPEGTS
}
}
}
if codec == nil {
_ = prod.Stop()
return nil, errors.New("unknown format: " + hex.EncodeToString(prod.sniff))
_ = c.Close()
return errors.New("unknown format: " + hex.EncodeToString(c.sniff[:8]))
}
prod.medias = append(prod.medias, &core.Media{
c.medias = append(c.medias, &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
@@ -97,10 +104,18 @@ func (c *Client) ReadBitstreams() error {
buf := c.sniff // total bufer
b := make([]byte, 1024*1024) // reading buffer
var decodeStream func([]byte) ([]byte, int)
switch c.receiver.Codec.Name {
case core.CodecH264:
decodeStream = h264.DecodeStream
case core.CodecH265:
decodeStream = h265.DecodeStream
}
for {
payload, n := h264.DecodeStream(buf)
payload, n := decodeStream(buf)
if payload == nil {
n, err := c.stdout.Read(b)
n, err := c.r.Read(b)
if err != nil {
return err
}
@@ -130,7 +145,7 @@ func (c *Client) ReadMJPEG() error {
// one JPEG end and next start
i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8})
if i < 0 {
n, err := c.stdout.Read(b)
n, err := c.r.Read(b)
if err != nil {
return err
}
@@ -167,7 +182,7 @@ func (c *Client) ReadMPEGTS() error {
for {
packet := ts.GetPacket()
if packet == nil {
n, err := c.stdout.Read(b)
n, err := c.r.Read(b)
if err != nil {
return err
}
@@ -186,3 +201,7 @@ func (c *Client) ReadMPEGTS() error {
c.receiver.WriteRTP(packet)
}
}
func (c *Client) Close() error {
return c.r.Close()
}

View File

@@ -1,4 +1,4 @@
package pipe
package magic
import (
"github.com/AlexxIT/go2rtc/pkg/core"

View File

@@ -1,9 +1,8 @@
package pipe
package magic
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"strings"
)
func (c *Client) GetMedias() []*core.Media {
@@ -18,29 +17,20 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver,
}
func (c *Client) Start() error {
return c.handle()
return c.Handle()
}
func (c *Client) Stop() (err error) {
if c.receiver != nil {
c.receiver.Close()
}
if err1 := c.stdout.Close(); err != nil {
err = err1
}
if err1 := c.cmd.Process.Kill(); err != nil {
err = err1
}
if err1 := c.cmd.Wait(); err != nil {
err = err1
}
return
return c.Close()
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "PIPE active producer",
URL: c.cmd.Path + " " + strings.Join(c.cmd.Args, " "),
Type: c.Desc,
URL: c.URL,
Medias: c.medias,
Recv: c.recv,
}