feat: add functional options

fix: use timeout optionaly
This commit is contained in:
danil_e71
2024-01-24 10:58:21 +03:00
parent 6393ef842f
commit 63c90b28b7
7 changed files with 94 additions and 119 deletions

View File

@@ -1,70 +0,0 @@
package main
import (
"io"
"log/slog"
"net/http"
"os"
"github.com/Danile71/go-rtsp"
"github.com/gorilla/mux"
"github.com/mattn/go-mjpeg"
)
const uri = "rtsp://admin:admin@127.0.0.1:554"
func main() {
s := mjpeg.NewStream()
stream, err := rtsp.Open(uri)
if err != nil {
slog.Error(
"open rtsp",
"error", err,
)
return
}
go func() {
for {
pkt, err := stream.ReadPacket()
if err != nil {
if err == io.EOF {
os.Exit(0)
}
slog.Error(
"read packet",
"error", err,
)
continue
}
if pkt.IsVideo() {
if err := s.Update(pkt.Data()); err != nil {
slog.Error(
"write packet",
"error", err,
)
}
}
}
}()
streamHandler := func(w http.ResponseWriter, r *http.Request) {
s.ServeHTTP(w, r)
}
router := mux.NewRouter()
router.HandleFunc("/stream", streamHandler)
http.Handle("/", router)
if err := http.ListenAndServe(":8181", nil); err != nil {
slog.Error(
"listen",
"error", err,
)
}
}

View File

@@ -11,14 +11,26 @@ import (
"github.com/mattn/go-mjpeg"
)
const uri = "rtsp://admin:admin@127.0.0.1:554"
const uri = "rtsp://192.168.139.24:8554/mystream"
func main() {
// Set ffmpeg log level
rtsp.SetLogLevel(rtsp.AV_LOG_QUIET)
// Create mjpeg instance
s := mjpeg.NewStream()
stream := rtsp.New(uri)
// Prepare stream
stream := rtsp.New(uri,
// Set transport
rtsp.WithType(rtsp.Tcp),
err := stream.Setup(rtsp.Tcp) // or rtsp.Udp or rtsp.Auto
// Set timeout
// rtsp.WithTimeout("1000"),
)
// Setup and open stream
err := stream.Setup()
if err != nil {
slog.Error(
"setup stream",

44
opts.go Normal file
View File

@@ -0,0 +1,44 @@
package rtsp
/*
#include "ffmpeg.h"
*/
import "C"
import (
"unsafe"
)
type StreamOption func(*Stream)
func WithTimeout(timeout string) StreamOption {
return func(stream *Stream) {
timeoutKey := C.CString("listen_timeout")
defer C.free(unsafe.Pointer(timeoutKey))
timeoutStr := C.CString(timeout)
defer C.free(unsafe.Pointer(timeoutStr))
C.av_dict_set(&stream.dictionary, timeoutKey, timeoutStr, 0)
}
}
func WithType(streamType Type) StreamOption {
return func(stream *Stream) {
transport := C.CString("rtsp_transport")
defer C.free(unsafe.Pointer(transport))
switch streamType {
case Tcp:
tcp := C.CString("tcp")
defer C.free(unsafe.Pointer(tcp))
C.av_dict_set(&stream.dictionary, transport, tcp, 0)
case Udp:
udp := C.CString("udp")
defer C.free(unsafe.Pointer(udp))
C.av_dict_set(&stream.dictionary, transport, udp, 0)
default:
}
}
}

View File

@@ -1,7 +1,7 @@
package rtsp
// Open rtsp stream or file
func Open(uri string) (*Stream, error) {
stream := New(uri)
return stream, stream.Setup(Auto)
func Open(uri string, opts ...StreamOption) (*Stream, error) {
stream := New(uri, opts...)
return stream, stream.Setup()
}

View File

@@ -7,20 +7,36 @@ import "C"
import (
"fmt"
"io"
"os"
"runtime"
"sync"
"unsafe"
)
type LogLevel C.int
const (
AV_LOG_QUIET = C.AV_LOG_QUIET
AV_LOG_PANIC = C.AV_LOG_PANIC
AV_LOG_FATAL = C.AV_LOG_FATAL
AV_LOG_ERROR = C.AV_LOG_ERROR
AV_LOG_WARNING = C.AV_LOG_WARNING
AV_LOG_INFO = C.AV_LOG_INFO
AV_LOG_VERBOSE = C.AV_LOG_VERBOSE
AV_LOG_DEBUG = C.AV_LOG_DEBUG
AV_LOG_TRACE = C.AV_LOG_TRACE
)
// SetLogLevel ffmpeg log level
func SetLogLevel(logLevel LogLevel) {
C.av_log_set_level(C.int(logLevel))
}
// Type rtsp transport protocol
type Type int
const (
// Auto auto change
Auto Type = iota
// Tcp use tcp transport protocol
Tcp
Tcp = iota
// Udp use udp transport protocol
Udp
)
@@ -36,12 +52,14 @@ type Stream struct {
}
// New media stream
func New(uri string) (stream *Stream) {
func New(uri string, opts ...StreamOption) (stream *Stream) {
stream = &Stream{uri: uri}
stream.decoders = make(map[int]*decoder)
stream.formatCtx = C.avformat_alloc_context()
C.av_log_set_level(C.AV_LOG_QUIET)
for _, opt := range opts {
opt(stream)
}
runtime.SetFinalizer(stream, free)
return
@@ -70,38 +88,8 @@ func (e ErrTimeout) Error() string {
return e.err.Error()
}
// Setup transport protocol (tcp, udp or auto)
func (stream *Stream) Setup(t Type) (err error) {
transport := C.CString("rtsp_transport")
defer C.free(unsafe.Pointer(transport))
tcp := C.CString("tcp")
defer C.free(unsafe.Pointer(tcp))
udp := C.CString("udp")
defer C.free(unsafe.Pointer(udp))
timeoutKey := C.CString("timeout")
defer C.free(unsafe.Pointer(timeoutKey))
goTimeout := os.Getenv("FFMPEG_TIMEOUT")
if goTimeout == "" {
goTimeout = "10000000"
}
timeout := C.CString(goTimeout)
defer C.free(unsafe.Pointer(timeout))
C.av_dict_set(&stream.dictionary, timeoutKey, timeout, 0)
switch t {
case Tcp:
C.av_dict_set(&stream.dictionary, transport, tcp, 0)
case Udp:
C.av_dict_set(&stream.dictionary, transport, udp, 0)
default:
}
// Setup stream
func (stream *Stream) Setup() (err error) {
uri := C.CString(stream.uri)
defer C.free(unsafe.Pointer(uri))

View File

@@ -13,11 +13,11 @@ import (
// CErr2Str convert C error code to Go string
func CErr2Str(code C.int) string {
buf := make([]byte, 64)
buf := make([]byte, 1024)
C.av_strerror(code, (*C.char)(unsafe.Pointer(&buf[0])), C.ulonglong(len(buf)))
return string(buf)
return string(buf[:bytes.Index(buf, []byte{0})])
}
func swrAllocSetOpts(layout uint64, sampleRate C.int, sampleFmt int32) *C.SwrContext {

View File

@@ -8,16 +8,17 @@ package rtsp
*/
import "C"
import (
"bytes"
"unsafe"
)
// CErr2Str convert C error code to Go string
func CErr2Str(code C.int) string {
buf := make([]byte, 64)
buf := make([]byte, 1024)
C.av_strerror(code, (*C.char)(unsafe.Pointer(&buf[0])), C.ulong(len(buf)))
C.av_strerror(code, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))
return string(buf)
return string(buf[:bytes.Index(buf, []byte{0})])
}
func swrAllocSetOpts(layout uint64, sampleRate C.int, sampleFmt int32) *C.SwrContext {