diff --git a/internal/exec/exec.go b/internal/exec/exec.go index d2003a55..1b7842dd 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -9,7 +9,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/rtsp" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/pipe" + "github.com/AlexxIT/go2rtc/pkg/magic" pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" "github.com/rs/zerolog" @@ -38,12 +38,12 @@ func Init() { } }) - streams.HandleFunc("exec", Handle) + streams.HandleFunc("exec", execHandle) log = app.GetLogger("exec") } -func Handle(url string) (core.Producer, error) { +func execHandle(url string) (core.Producer, error) { var path string args := shell.QuoteSplit(url[5:]) // remove `exec:` @@ -66,9 +66,34 @@ func Handle(url string) (core.Producer, error) { } if path == "" { - return pipe.NewClient(cmd) + return handlePipe(url, cmd) } + return handleRTSP(url, path, cmd) +} + +func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) { + r, err := PipeCloser(cmd) + if err != nil { + return nil, err + } + + if err = cmd.Start(); err != nil { + return nil, err + } + + client := magic.NewClient(r) + if err = client.Probe(); err != nil { + return nil, err + } + + client.Desc = "exec active producer" + client.URL = url + + return client, nil +} + +func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) { if log.Trace().Enabled() { cmd.Stdout = os.Stdout } diff --git a/internal/exec/pipe.go b/internal/exec/pipe.go new file mode 100644 index 00000000..de101e04 --- /dev/null +++ b/internal/exec/pipe.go @@ -0,0 +1,26 @@ +package exec + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "io" + "os/exec" +) + +// PipeCloser - return StdoutPipe that Kill cmd on Close call +func PipeCloser(cmd *exec.Cmd) (io.ReadCloser, error) { + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + + return pipeCloser{stdout, cmd}, nil +} + +type pipeCloser struct { + io.ReadCloser + cmd *exec.Cmd +} + +func (p pipeCloser) Close() error { + return core.Any(p.ReadCloser.Close(), p.cmd.Process.Kill(), p.cmd.Wait()) +} diff --git a/internal/ffmpeg/ffmpeg.go b/internal/ffmpeg/ffmpeg.go index 51d6c74a..d24fcdb0 100644 --- a/internal/ffmpeg/ffmpeg.go +++ b/internal/ffmpeg/ffmpeg.go @@ -3,7 +3,6 @@ package ffmpeg import ( "errors" "github.com/AlexxIT/go2rtc/internal/app" - "github.com/AlexxIT/go2rtc/internal/exec" "github.com/AlexxIT/go2rtc/internal/ffmpeg/device" "github.com/AlexxIT/go2rtc/internal/ffmpeg/hardware" "github.com/AlexxIT/go2rtc/internal/rtsp" @@ -32,7 +31,7 @@ func Init() { if args == nil { return nil, errors.New("can't generate ffmpeg command") } - return exec.Handle("exec:" + args.String()) + return streams.GetProducer("exec:" + args.String()) }) device.Init(defaults["bin"]) diff --git a/internal/http/http.go b/internal/http/http.go index 1bb86a26..6c158196 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -2,24 +2,28 @@ package http import ( "errors" - "fmt" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/magic" "github.com/AlexxIT/go2rtc/pkg/mjpeg" - "github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/AlexxIT/go2rtc/pkg/rtmp" "github.com/AlexxIT/go2rtc/pkg/tcp" + "net" "net/http" + "net/url" "strings" + "time" ) func Init() { - streams.HandleFunc("http", handle) - streams.HandleFunc("https", handle) - streams.HandleFunc("httpx", handle) + streams.HandleFunc("http", handleHTTP) + streams.HandleFunc("https", handleHTTP) + streams.HandleFunc("httpx", handleHTTP) + + streams.HandleFunc("tcp", handleTCP) } -func handle(url string) (core.Producer, error) { +func handleHTTP(url string) (core.Producer, error) { // first we get the Content-Type to define supported producer req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -54,13 +58,38 @@ func handle(url string) (core.Producer, error) { } return conn, nil - case "video/mpeg": - client := mpegts.NewClient(res) - if err = client.Handle(); err != nil { - return nil, err - } - return client, nil + default: // "video/mpeg": } - return nil, fmt.Errorf("unsupported Content-Type: %s", ct) + client := magic.NewClient(res.Body) + if err = client.Probe(); err != nil { + return nil, err + } + + client.Desc = "HTTP active producer" + client.URL = url + + return client, nil +} + +func handleTCP(rawURL string) (core.Producer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + conn, err := net.DialTimeout("tcp", u.Host, time.Second*3) + if err != nil { + return nil, err + } + + client := magic.NewClient(conn) + if err = client.Probe(); err != nil { + return nil, err + } + + client.Desc = "TCP active producer" + client.URL = rawURL + + return client, nil } diff --git a/internal/mjpeg/init.go b/internal/mjpeg/init.go index c3945674..598aae62 100644 --- a/internal/mjpeg/init.go +++ b/internal/mjpeg/init.go @@ -6,8 +6,8 @@ import ( "github.com/AlexxIT/go2rtc/internal/ffmpeg" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/magic" "github.com/AlexxIT/go2rtc/pkg/mjpeg" - "github.com/AlexxIT/go2rtc/pkg/pipe" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog/log" "io" @@ -33,7 +33,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { exit := make(chan []byte) - cons := &pipe.Keyframe{ + cons := &magic.Keyframe{ RemoteAddr: tcp.RemoteAddr(r), UserAgent: r.UserAgent(), } diff --git a/internal/tcp/init.go b/internal/tcp/init.go deleted file mode 100644 index 7f712415..00000000 --- a/internal/tcp/init.go +++ /dev/null @@ -1,35 +0,0 @@ -package tcp - -import ( - "github.com/AlexxIT/go2rtc/internal/streams" - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/mpegts" - "net" - "net/http" - "net/url" - "time" -) - -func Init() { - streams.HandleFunc("tcp", handle) -} - -func handle(rawURL string) (core.Producer, error) { - u, err := url.Parse(rawURL) - if err != nil { - return nil, err - } - - conn, err := net.DialTimeout("tcp", u.Host, time.Second*3) - if err != nil { - return nil, err - } - - req := &http.Request{URL: u} - res := &http.Response{Body: conn, Request: req} - client := mpegts.NewClient(res) - if err := client.Handle(); err != nil { - return nil, err - } - return client, nil -} diff --git a/main.go b/main.go index c637fdc8..973537cd 100644 --- a/main.go +++ b/main.go @@ -25,7 +25,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/srtp" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/tapo" - "github.com/AlexxIT/go2rtc/internal/tcp" "github.com/AlexxIT/go2rtc/internal/webrtc" "github.com/AlexxIT/go2rtc/internal/webtorrent" "os" @@ -52,7 +51,6 @@ func main() { isapi.Init() mpegts.Init() roborock.Init() - tcp.Init() srtp.Init() homekit.Init() diff --git a/pkg/core/helpers.go b/pkg/core/helpers.go index 0cb78aba..c895fbbf 100644 --- a/pkg/core/helpers.go +++ b/pkg/core/helpers.go @@ -29,6 +29,15 @@ func RandString(size, base byte) string { return string(b) } +func Any(errs ...error) error { + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + func Between(s, sub1, sub2 string) string { i := strings.Index(s, sub1) if i < 0 { diff --git a/pkg/h265/avc.go b/pkg/h265/avc.go new file mode 100644 index 00000000..f6d68559 --- /dev/null +++ b/pkg/h265/avc.go @@ -0,0 +1,54 @@ +package h265 + +import "github.com/AlexxIT/go2rtc/pkg/h264" + +const forbiddenZeroBit = 0x80 +const nalUnitType = 0x3F + +// DecodeStream - find and return first AU in AVC format +// useful for processing live streams with unknown separator size +func DecodeStream(annexb []byte) ([]byte, int) { + startPos := -1 + + i := 0 + for { + // search next separator + if i = h264.IndexFrom(annexb, []byte{0, 0, 1}, i); i < 0 { + break + } + + // move i to next AU + if i += 3; i >= len(annexb) { + break + } + + // check if AU type valid + octet := annexb[i] + if octet&forbiddenZeroBit != 0 { + continue + } + + nalType := (octet >> 1) & nalUnitType + if startPos >= 0 { + switch nalType { + case NALUTypeVPS, NALUTypePFrame: + if annexb[i-4] == 0 { + return h264.DecodeAnnexB(annexb[startPos : i-4]), i - 4 + } else { + return h264.DecodeAnnexB(annexb[startPos : i-3]), i - 3 + } + } + } else { + switch nalType { + case NALUTypeVPS, NALUTypePFrame: + if i >= 4 && annexb[i-4] == 0 { + startPos = i - 4 + } else { + startPos = i - 3 + } + } + } + } + + return nil, 0 +} diff --git a/pkg/pipe/client.go b/pkg/magic/client.go similarity index 64% rename from pkg/pipe/client.go rename to pkg/magic/client.go index 91265010..cc9ac3bb 100644 --- a/pkg/pipe/client.go +++ b/pkg/magic/client.go @@ -1,4 +1,4 @@ -package pipe +package magic import ( "bytes" @@ -6,17 +6,21 @@ import ( "errors" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/pion/rtp" "io" - "os/exec" ) +// Client - can read unknown bytestream and autodetect format type Client struct { - cmd *exec.Cmd - stdout io.ReadCloser - sniff []byte - handle func() error + Desc string + URL string + + Handle func() error + + r io.ReadCloser + sniff []byte medias []*core.Media receiver *core.Receiver @@ -24,47 +28,50 @@ type Client struct { recv int } -func NewClient(cmd *exec.Cmd) (prod *Client, err error) { - prod = &Client{cmd: cmd} +func NewClient(r io.ReadCloser) *Client { + return &Client{r: r} +} - prod.stdout, err = cmd.StdoutPipe() +func (c *Client) Probe() (err error) { + c.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT + c.recv, err = io.ReadFull(c.r, c.sniff) if err != nil { - return nil, err - } - - if err = cmd.Start(); err != nil { - return nil, err - } - - prod.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT - prod.recv, err = io.ReadFull(prod.stdout, prod.sniff) - if err != nil { - _ = prod.Stop() - return nil, err + _ = c.Close() + return } var codec *core.Codec - if bytes.HasPrefix(prod.sniff, []byte{0, 0, 0, 1}) { + if bytes.HasPrefix(c.sniff, []byte{0, 0, 0, 1}) { switch { - case h264.NALUType(prod.sniff) == h264.NALUTypeSPS: + case h264.NALUType(c.sniff) == h264.NALUTypeSPS: codec = &core.Codec{ Name: core.CodecH264, ClockRate: 90000, PayloadType: core.PayloadTypeRAW, } - prod.handle = prod.ReadBitstreams + c.Handle = c.ReadBitstreams + + case h265.NALUType(c.sniff) == h265.NALUTypeVPS: + codec = &core.Codec{ + Name: core.CodecH265, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + } + c.Handle = c.ReadBitstreams } - } else if bytes.HasPrefix(prod.sniff, []byte{0xFF, 0xD8}) { + + } else if bytes.HasPrefix(c.sniff, []byte{0xFF, 0xD8}) { codec = &core.Codec{ Name: core.CodecJPEG, ClockRate: 90000, PayloadType: core.PayloadTypeRAW, } - prod.handle = prod.ReadMJPEG - } else if prod.sniff[0] == mpegts.SyncByte { + c.Handle = c.ReadMJPEG + + } else if c.sniff[0] == mpegts.SyncByte { ts := mpegts.NewReader() - ts.AppendBuffer(prod.sniff) + ts.AppendBuffer(c.sniff) _ = ts.GetPacket() for _, streamType := range ts.GetStreamTypes() { switch streamType { @@ -74,17 +81,17 @@ func NewClient(cmd *exec.Cmd) (prod *Client, err error) { ClockRate: 90000, PayloadType: core.PayloadTypeRAW, } - prod.handle = prod.ReadMPEGTS + c.Handle = c.ReadMPEGTS } } } if codec == nil { - _ = prod.Stop() - return nil, errors.New("unknown format: " + hex.EncodeToString(prod.sniff)) + _ = c.Close() + return errors.New("unknown format: " + hex.EncodeToString(c.sniff[:8])) } - prod.medias = append(prod.medias, &core.Media{ + c.medias = append(c.medias, &core.Media{ Kind: core.KindVideo, Direction: core.DirectionRecvonly, Codecs: []*core.Codec{codec}, @@ -97,10 +104,18 @@ func (c *Client) ReadBitstreams() error { buf := c.sniff // total bufer b := make([]byte, 1024*1024) // reading buffer + var decodeStream func([]byte) ([]byte, int) + switch c.receiver.Codec.Name { + case core.CodecH264: + decodeStream = h264.DecodeStream + case core.CodecH265: + decodeStream = h265.DecodeStream + } + for { - payload, n := h264.DecodeStream(buf) + payload, n := decodeStream(buf) if payload == nil { - n, err := c.stdout.Read(b) + n, err := c.r.Read(b) if err != nil { return err } @@ -130,7 +145,7 @@ func (c *Client) ReadMJPEG() error { // one JPEG end and next start i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8}) if i < 0 { - n, err := c.stdout.Read(b) + n, err := c.r.Read(b) if err != nil { return err } @@ -167,7 +182,7 @@ func (c *Client) ReadMPEGTS() error { for { packet := ts.GetPacket() if packet == nil { - n, err := c.stdout.Read(b) + n, err := c.r.Read(b) if err != nil { return err } @@ -186,3 +201,7 @@ func (c *Client) ReadMPEGTS() error { c.receiver.WriteRTP(packet) } } + +func (c *Client) Close() error { + return c.r.Close() +} diff --git a/pkg/pipe/keyframe.go b/pkg/magic/keyframe.go similarity index 99% rename from pkg/pipe/keyframe.go rename to pkg/magic/keyframe.go index 0126024e..abe83e59 100644 --- a/pkg/pipe/keyframe.go +++ b/pkg/magic/keyframe.go @@ -1,4 +1,4 @@ -package pipe +package magic import ( "github.com/AlexxIT/go2rtc/pkg/core" diff --git a/pkg/pipe/producer.go b/pkg/magic/producer.go similarity index 67% rename from pkg/pipe/producer.go rename to pkg/magic/producer.go index c2d5afdd..716a1eec 100644 --- a/pkg/pipe/producer.go +++ b/pkg/magic/producer.go @@ -1,9 +1,8 @@ -package pipe +package magic import ( "encoding/json" "github.com/AlexxIT/go2rtc/pkg/core" - "strings" ) func (c *Client) GetMedias() []*core.Media { @@ -18,29 +17,20 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) Start() error { - return c.handle() + return c.Handle() } func (c *Client) Stop() (err error) { if c.receiver != nil { c.receiver.Close() } - if err1 := c.stdout.Close(); err != nil { - err = err1 - } - if err1 := c.cmd.Process.Kill(); err != nil { - err = err1 - } - if err1 := c.cmd.Wait(); err != nil { - err = err1 - } - return + return c.Close() } func (c *Client) MarshalJSON() ([]byte, error) { info := &core.Info{ - Type: "PIPE active producer", - URL: c.cmd.Path + " " + strings.Join(c.cmd.Args, " "), + Type: c.Desc, + URL: c.URL, Medias: c.medias, Recv: c.recv, }