From 63c90b28b72fb9e569f7d80217c031c3dc8a9ddc Mon Sep 17 00:00:00 2001 From: danil_e71 Date: Wed, 24 Jan 2024 10:58:21 +0300 Subject: [PATCH] feat: add functional options fix: use timeout optionaly --- example/auto/main.go | 70 --------------------------------- example/{custom => url}/main.go | 18 +++++++-- opts.go | 44 +++++++++++++++++++++ rtsp.go | 6 +-- stream.go | 64 ++++++++++++------------------ types_darwin.go | 4 +- types_linux.go | 7 ++-- 7 files changed, 94 insertions(+), 119 deletions(-) delete mode 100644 example/auto/main.go rename example/{custom => url}/main.go (73%) create mode 100644 opts.go diff --git a/example/auto/main.go b/example/auto/main.go deleted file mode 100644 index 405bf6a..0000000 --- a/example/auto/main.go +++ /dev/null @@ -1,70 +0,0 @@ -package main - -import ( - "io" - "log/slog" - "net/http" - "os" - - "github.com/Danile71/go-rtsp" - "github.com/gorilla/mux" - "github.com/mattn/go-mjpeg" -) - -const uri = "rtsp://admin:admin@127.0.0.1:554" - -func main() { - s := mjpeg.NewStream() - - stream, err := rtsp.Open(uri) - if err != nil { - slog.Error( - "open rtsp", - - "error", err, - ) - return - } - - go func() { - for { - pkt, err := stream.ReadPacket() - if err != nil { - if err == io.EOF { - os.Exit(0) - } - slog.Error( - "read packet", - - "error", err, - ) - continue - } - - if pkt.IsVideo() { - if err := s.Update(pkt.Data()); err != nil { - slog.Error( - "write packet", - - "error", err, - ) - } - } - } - }() - - streamHandler := func(w http.ResponseWriter, r *http.Request) { - s.ServeHTTP(w, r) - } - - router := mux.NewRouter() - router.HandleFunc("/stream", streamHandler) - http.Handle("/", router) - if err := http.ListenAndServe(":8181", nil); err != nil { - slog.Error( - "listen", - - "error", err, - ) - } -} diff --git a/example/custom/main.go b/example/url/main.go similarity index 73% rename from example/custom/main.go rename to example/url/main.go index 4df29da..0e1cdc8 100644 --- a/example/custom/main.go +++ b/example/url/main.go @@ -11,14 +11,26 @@ import ( "github.com/mattn/go-mjpeg" ) -const uri = "rtsp://admin:admin@127.0.0.1:554" +const uri = "rtsp://192.168.139.24:8554/mystream" func main() { + // Set ffmpeg log level + rtsp.SetLogLevel(rtsp.AV_LOG_QUIET) + + // Create mjpeg instance s := mjpeg.NewStream() - stream := rtsp.New(uri) + // Prepare stream + stream := rtsp.New(uri, + // Set transport + rtsp.WithType(rtsp.Tcp), - err := stream.Setup(rtsp.Tcp) // or rtsp.Udp or rtsp.Auto + // Set timeout + // rtsp.WithTimeout("1000"), + ) + + // Setup and open stream + err := stream.Setup() if err != nil { slog.Error( "setup stream", diff --git a/opts.go b/opts.go new file mode 100644 index 0000000..7df4732 --- /dev/null +++ b/opts.go @@ -0,0 +1,44 @@ +package rtsp + +/* +#include "ffmpeg.h" +*/ +import "C" +import ( + "unsafe" +) + +type StreamOption func(*Stream) + +func WithTimeout(timeout string) StreamOption { + return func(stream *Stream) { + timeoutKey := C.CString("listen_timeout") + defer C.free(unsafe.Pointer(timeoutKey)) + + timeoutStr := C.CString(timeout) + defer C.free(unsafe.Pointer(timeoutStr)) + + C.av_dict_set(&stream.dictionary, timeoutKey, timeoutStr, 0) + } +} + +func WithType(streamType Type) StreamOption { + return func(stream *Stream) { + transport := C.CString("rtsp_transport") + defer C.free(unsafe.Pointer(transport)) + + switch streamType { + case Tcp: + tcp := C.CString("tcp") + defer C.free(unsafe.Pointer(tcp)) + + C.av_dict_set(&stream.dictionary, transport, tcp, 0) + case Udp: + udp := C.CString("udp") + defer C.free(unsafe.Pointer(udp)) + + C.av_dict_set(&stream.dictionary, transport, udp, 0) + default: + } + } +} diff --git a/rtsp.go b/rtsp.go index a7d0e6d..125d10f 100644 --- a/rtsp.go +++ b/rtsp.go @@ -1,7 +1,7 @@ package rtsp // Open rtsp stream or file -func Open(uri string) (*Stream, error) { - stream := New(uri) - return stream, stream.Setup(Auto) +func Open(uri string, opts ...StreamOption) (*Stream, error) { + stream := New(uri, opts...) + return stream, stream.Setup() } diff --git a/stream.go b/stream.go index 4286177..fa66946 100644 --- a/stream.go +++ b/stream.go @@ -7,20 +7,36 @@ import "C" import ( "fmt" "io" - "os" "runtime" "sync" "unsafe" ) +type LogLevel C.int + +const ( + AV_LOG_QUIET = C.AV_LOG_QUIET + AV_LOG_PANIC = C.AV_LOG_PANIC + AV_LOG_FATAL = C.AV_LOG_FATAL + AV_LOG_ERROR = C.AV_LOG_ERROR + AV_LOG_WARNING = C.AV_LOG_WARNING + AV_LOG_INFO = C.AV_LOG_INFO + AV_LOG_VERBOSE = C.AV_LOG_VERBOSE + AV_LOG_DEBUG = C.AV_LOG_DEBUG + AV_LOG_TRACE = C.AV_LOG_TRACE +) + +// SetLogLevel ffmpeg log level +func SetLogLevel(logLevel LogLevel) { + C.av_log_set_level(C.int(logLevel)) +} + // Type rtsp transport protocol type Type int const ( - // Auto auto change - Auto Type = iota // Tcp use tcp transport protocol - Tcp + Tcp = iota // Udp use udp transport protocol Udp ) @@ -36,12 +52,14 @@ type Stream struct { } // New media stream -func New(uri string) (stream *Stream) { +func New(uri string, opts ...StreamOption) (stream *Stream) { stream = &Stream{uri: uri} stream.decoders = make(map[int]*decoder) stream.formatCtx = C.avformat_alloc_context() - C.av_log_set_level(C.AV_LOG_QUIET) + for _, opt := range opts { + opt(stream) + } runtime.SetFinalizer(stream, free) return @@ -70,38 +88,8 @@ func (e ErrTimeout) Error() string { return e.err.Error() } -// Setup transport protocol (tcp, udp or auto) -func (stream *Stream) Setup(t Type) (err error) { - transport := C.CString("rtsp_transport") - defer C.free(unsafe.Pointer(transport)) - - tcp := C.CString("tcp") - defer C.free(unsafe.Pointer(tcp)) - - udp := C.CString("udp") - defer C.free(unsafe.Pointer(udp)) - - timeoutKey := C.CString("timeout") - defer C.free(unsafe.Pointer(timeoutKey)) - - goTimeout := os.Getenv("FFMPEG_TIMEOUT") - if goTimeout == "" { - goTimeout = "10000000" - } - - timeout := C.CString(goTimeout) - defer C.free(unsafe.Pointer(timeout)) - - C.av_dict_set(&stream.dictionary, timeoutKey, timeout, 0) - - switch t { - case Tcp: - C.av_dict_set(&stream.dictionary, transport, tcp, 0) - case Udp: - C.av_dict_set(&stream.dictionary, transport, udp, 0) - default: - } - +// Setup stream +func (stream *Stream) Setup() (err error) { uri := C.CString(stream.uri) defer C.free(unsafe.Pointer(uri)) diff --git a/types_darwin.go b/types_darwin.go index d5f6978..a426ed1 100644 --- a/types_darwin.go +++ b/types_darwin.go @@ -13,11 +13,11 @@ import ( // CErr2Str convert C error code to Go string func CErr2Str(code C.int) string { - buf := make([]byte, 64) + buf := make([]byte, 1024) C.av_strerror(code, (*C.char)(unsafe.Pointer(&buf[0])), C.ulonglong(len(buf))) - return string(buf) + return string(buf[:bytes.Index(buf, []byte{0})]) } func swrAllocSetOpts(layout uint64, sampleRate C.int, sampleFmt int32) *C.SwrContext { diff --git a/types_linux.go b/types_linux.go index 4d01365..d49b07b 100644 --- a/types_linux.go +++ b/types_linux.go @@ -8,16 +8,17 @@ package rtsp */ import "C" import ( + "bytes" "unsafe" ) // CErr2Str convert C error code to Go string func CErr2Str(code C.int) string { - buf := make([]byte, 64) + buf := make([]byte, 1024) - C.av_strerror(code, (*C.char)(unsafe.Pointer(&buf[0])), C.ulong(len(buf))) + C.av_strerror(code, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))) - return string(buf) + return string(buf[:bytes.Index(buf, []byte{0})]) } func swrAllocSetOpts(layout uint64, sampleRate C.int, sampleFmt int32) *C.SwrContext {