mirror of
				https://github.com/AlexxIT/go2rtc.git
				synced 2025-10-31 19:53:02 +08:00 
			
		
		
		
	Compare commits
	
		
			29 Commits
		
	
	
		
			v0.1-beta.
			...
			v0.1-rc.1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | d8158bc1e3 | ||
|   | f4f588d2c6 | ||
|   | e287b52808 | ||
|   | ff96257252 | ||
|   | 909f21b7e4 | ||
|   | 7d6a5b44f8 | ||
|   | 278f7696b6 | ||
|   | 3cbf2465ae | ||
|   | e9ea7a0b1f | ||
|   | 0231fc3a90 | ||
|   | 9ef2633840 | ||
|   | 5a8df3e90a | ||
|   | a31cbec3eb | ||
|   | 54f547977e | ||
|   | 65d91e02bd | ||
|   | 7fc3f0f641 | ||
|   | 7725d5ed31 | ||
|   | 6c1b9daa8b | ||
|   | 6d432574bf | ||
|   | 616f69c88b | ||
|   | f72440712b | ||
|   | ceed146fb8 | ||
|   | f17dadbbbf | ||
|   | 3d4514eab9 | ||
|   | 2629dccb81 | ||
|   | 04f1aa2900 | ||
|   | 0dacdea1c3 | ||
|   | 24082b1616 | ||
|   | 7964b1743b | 
							
								
								
									
										10
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								README.md
									
									
									
									
									
								
							| @@ -172,6 +172,8 @@ streams: | ||||
|     - rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1 | ||||
| ``` | ||||
|  | ||||
| **PS.** For disable bachannel just add `#backchannel=0` to end of RTSP link. | ||||
|  | ||||
| #### Source: RTMP | ||||
|  | ||||
| You can get stream from RTMP server, for example [Frigate](https://docs.frigate.video/configuration/rtmp). Support ONLY `H264` video codec without audio. | ||||
| @@ -385,13 +387,13 @@ ngrok: | ||||
|   command: ... | ||||
| ``` | ||||
|  | ||||
| **Own TCP-tunnel** | ||||
| **Hard tech way 1. Own TCP-tunnel** | ||||
|  | ||||
| If you have personal VPS, you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config. | ||||
| If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config. | ||||
|  | ||||
| **Using TURN-server** | ||||
| **Hard tech way 2. Using TURN-server** | ||||
|  | ||||
| TODO... | ||||
| If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can install TURN server (e.g. [coturn](https://github.com/coturn/coturn), config [example](https://github.com/AlexxIT/WebRTC/wiki/Coturn-Example)). | ||||
|  | ||||
| ```yaml | ||||
| webrtc: | ||||
|   | ||||
| @@ -3,6 +3,7 @@ package app | ||||
| import ( | ||||
| 	"flag" | ||||
| 	"github.com/rs/zerolog" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 	"gopkg.in/yaml.v3" | ||||
| 	"io" | ||||
| 	"os" | ||||
| @@ -30,10 +31,18 @@ func Init() { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"]) | ||||
|  | ||||
| 	modules = cfg.Mod | ||||
|  | ||||
| 	path, _ := os.Getwd() | ||||
| 	log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH). | ||||
| 		Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]") | ||||
| } | ||||
|  | ||||
| func NewLogger(format string, level string) zerolog.Logger { | ||||
| 	var writer io.Writer = os.Stdout | ||||
|  | ||||
| 	// styles | ||||
| 	format := cfg.Mod["format"] | ||||
| 	if format != "json" { | ||||
| 		writer = zerolog.ConsoleWriter{ | ||||
| 			Out: writer, TimeFormat: "15:04:05.000", | ||||
| @@ -43,18 +52,12 @@ func Init() { | ||||
|  | ||||
| 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs | ||||
|  | ||||
| 	lvl, err := zerolog.ParseLevel(cfg.Mod["level"]) | ||||
| 	lvl, err := zerolog.ParseLevel(level) | ||||
| 	if err != nil || lvl == zerolog.NoLevel { | ||||
| 		lvl = zerolog.InfoLevel | ||||
| 	} | ||||
|  | ||||
| 	log = zerolog.New(writer).With().Timestamp().Logger().Level(lvl) | ||||
|  | ||||
| 	modules = cfg.Mod | ||||
|  | ||||
| 	path, _ := os.Getwd() | ||||
| 	log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH). | ||||
| 		Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]") | ||||
| 	return zerolog.New(writer).With().Timestamp().Logger().Level(lvl) | ||||
| } | ||||
|  | ||||
| func LoadConfig(v interface{}) { | ||||
| @@ -68,15 +71,13 @@ func LoadConfig(v interface{}) { | ||||
| func GetLogger(module string) zerolog.Logger { | ||||
| 	if s, ok := modules[module]; ok { | ||||
| 		lvl, err := zerolog.ParseLevel(s) | ||||
| 		if err != nil { | ||||
| 			log.Warn().Err(err).Msg("[log]") | ||||
| 			return log | ||||
| 		if err == nil { | ||||
| 			return log.Level(lvl) | ||||
| 		} | ||||
|  | ||||
| 		return log.Level(lvl) | ||||
| 		log.Warn().Err(err).Caller().Send() | ||||
| 	} | ||||
|  | ||||
| 	return log | ||||
| 	return log.Logger | ||||
| } | ||||
|  | ||||
| // internal | ||||
| @@ -84,8 +85,5 @@ func GetLogger(module string) zerolog.Logger { | ||||
| // data - config content | ||||
| var data []byte | ||||
|  | ||||
| // log - main logger | ||||
| var log zerolog.Logger | ||||
|  | ||||
| // modules log levels | ||||
| var modules map[string]string | ||||
|   | ||||
| @@ -21,6 +21,7 @@ var stackSkip = [][]byte{ | ||||
| 	[]byte("created by net/http.(*Server).Serve"), // TODO: why two? | ||||
|  | ||||
| 	[]byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"), | ||||
| 	[]byte("created by github.com/AlexxIT/go2rtc/cmd/srtp.Init"), | ||||
|  | ||||
| 	// webrtc/api.go | ||||
| 	[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), | ||||
|   | ||||
| @@ -14,6 +14,7 @@ import ( | ||||
| 	"os" | ||||
| 	"os/exec" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| @@ -23,22 +24,22 @@ func Init() { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	rtsp.OnProducer = func(prod streamer.Producer) bool { | ||||
| 		if conn := prod.(*pkg.Conn); conn != nil { | ||||
| 			if waiter := waiters[conn.URL.Path]; waiter != nil { | ||||
| 				waiter <- prod | ||||
| 				return true | ||||
| 			} | ||||
| 	rtsp.HandleFunc(func(conn *pkg.Conn) bool { | ||||
| 		waitersMu.Lock() | ||||
| 		waiter := waiters[conn.URL.Path] | ||||
| 		waitersMu.Unlock() | ||||
|  | ||||
| 		if waiter == nil { | ||||
| 			return false | ||||
| 		} | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 		waiter <- conn | ||||
| 		return true | ||||
| 	}) | ||||
|  | ||||
| 	streams.HandleFunc("exec", Handle) | ||||
|  | ||||
| 	log = app.GetLogger("exec") | ||||
|  | ||||
| 	// TODO: add sync.Mutex | ||||
| 	waiters = map[string]chan streamer.Producer{} | ||||
| } | ||||
|  | ||||
| func Handle(url string) (streamer.Producer, error) { | ||||
| @@ -60,8 +61,15 @@ func Handle(url string) (streamer.Producer, error) { | ||||
|  | ||||
| 	ch := make(chan streamer.Producer) | ||||
|  | ||||
| 	waitersMu.Lock() | ||||
| 	waiters[path] = ch | ||||
| 	defer delete(waiters, path) | ||||
| 	waitersMu.Unlock() | ||||
|  | ||||
| 	defer func() { | ||||
| 		waitersMu.Lock() | ||||
| 		delete(waiters, path) | ||||
| 		waitersMu.Unlock() | ||||
| 	}() | ||||
|  | ||||
| 	log.Debug().Str("url", url).Msg("[exec] run") | ||||
|  | ||||
| @@ -86,4 +94,5 @@ func Handle(url string) (streamer.Producer, error) { | ||||
| // internal | ||||
|  | ||||
| var log zerolog.Logger | ||||
| var waiters map[string]chan streamer.Producer | ||||
| var waiters = map[string]chan streamer.Producer{} | ||||
| var waitersMu sync.Mutex | ||||
|   | ||||
| @@ -23,7 +23,7 @@ func Init() { | ||||
| 		// inputs | ||||
| 		"file": "-re -stream_loop -1 -i {input}", | ||||
| 		"http": "-fflags nobuffer -flags low_delay -i {input}", | ||||
| 		"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}", | ||||
| 		"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -timeout 5000000 -i {input}", | ||||
|  | ||||
| 		// output | ||||
| 		"output": "-rtsp_transport tcp -f rtsp {output}", | ||||
|   | ||||
| @@ -55,6 +55,10 @@ func initAPI() { | ||||
| 		// /stream/{id}/channel/0/webrtc | ||||
| 		default: | ||||
| 			i := strings.IndexByte(r.RequestURI[8:], '/') | ||||
| 			if i <= 0 { | ||||
| 				log.Warn().Msgf("wrong request: %s", r.RequestURI) | ||||
| 				return | ||||
| 			} | ||||
| 			name := r.RequestURI[8 : 8+i] | ||||
|  | ||||
| 			stream := streams.Get(name) | ||||
|   | ||||
| @@ -10,12 +10,47 @@ import ( | ||||
| ) | ||||
|  | ||||
| func Init() { | ||||
| 	api.HandleFunc("api/stream.mjpeg", handler) | ||||
| 	api.HandleFunc("api/frame.jpeg", handlerKeyframe) | ||||
| 	api.HandleFunc("api/stream.mjpeg", handlerStream) | ||||
| } | ||||
|  | ||||
| func handlerKeyframe(w http.ResponseWriter, r *http.Request) { | ||||
| 	src := r.URL.Query().Get("src") | ||||
| 	stream := streams.GetOrNew(src) | ||||
| 	if stream == nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	exit := make(chan []byte) | ||||
|  | ||||
| 	cons := &mjpeg.Consumer{} | ||||
| 	cons.Listen(func(msg interface{}) { | ||||
| 		switch msg := msg.(type) { | ||||
| 		case []byte: | ||||
| 			exit <- msg | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	if err := stream.AddConsumer(cons); err != nil { | ||||
| 		log.Error().Err(err).Caller().Send() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	data := <-exit | ||||
|  | ||||
| 	stream.RemoveConsumer(cons) | ||||
|  | ||||
| 	w.Header().Set("Content-Type", "image/jpeg") | ||||
| 	w.Header().Set("Content-Length", strconv.Itoa(len(data))) | ||||
|  | ||||
| 	if _, err := w.Write(data); err != nil { | ||||
| 		log.Error().Err(err).Caller().Send() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| const header = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: " | ||||
|  | ||||
| func handler(w http.ResponseWriter, r *http.Request) { | ||||
| func handlerStream(w http.ResponseWriter, r *http.Request) { | ||||
| 	src := r.URL.Query().Get("src") | ||||
| 	stream := streams.GetOrNew(src) | ||||
| 	if stream == nil { | ||||
|   | ||||
| @@ -8,6 +8,8 @@ import ( | ||||
|  | ||||
| func Init() { | ||||
| 	streams.HandleFunc("rtmp", handle) | ||||
| 	streams.HandleFunc("http", handle) | ||||
| 	streams.HandleFunc("https", handle) | ||||
| } | ||||
|  | ||||
| func handle(url string) (streamer.Producer, error) { | ||||
|   | ||||
							
								
								
									
										187
									
								
								cmd/rtsp/rtsp.go
									
									
									
									
									
								
							
							
						
						
									
										187
									
								
								cmd/rtsp/rtsp.go
									
									
									
									
									
								
							| @@ -32,20 +32,43 @@ func Init() { | ||||
|  | ||||
| 	// RTSP server support | ||||
| 	address := conf.Mod.Listen | ||||
| 	if address != "" { | ||||
| 		_, Port, _ = net.SplitHostPort(address) | ||||
|  | ||||
| 		go worker(address) | ||||
| 	if address == "" { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	ln, err := net.Listen("tcp", address) | ||||
| 	if err != nil { | ||||
| 		log.Error().Err(err).Msg("[rtsp] listen") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	_, Port, _ = net.SplitHostPort(address) | ||||
|  | ||||
| 	log.Info().Str("addr", address).Msg("[rtsp] listen") | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			conn, err := ln.Accept() | ||||
| 			if err != nil { | ||||
| 				return | ||||
| 			} | ||||
| 			go tcpHandler(conn) | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| type Handler func(conn *rtsp.Conn) bool | ||||
|  | ||||
| func HandleFunc(handler Handler) { | ||||
| 	handlers = append(handlers, handler) | ||||
| } | ||||
|  | ||||
| var Port string | ||||
|  | ||||
| var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite... | ||||
|  | ||||
| // internal | ||||
|  | ||||
| var log zerolog.Logger | ||||
| var handlers []Handler | ||||
|  | ||||
| func rtspHandler(url string) (streamer.Producer, error) { | ||||
| 	backchannel := true | ||||
| @@ -84,10 +107,10 @@ func rtspHandler(url string) (streamer.Producer, error) { | ||||
| 		} | ||||
|  | ||||
| 		// second try without backchannel, we need to reconnect | ||||
| 		conn.Backchannel = false | ||||
| 		if err = conn.Dial(); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		conn.Backchannel = false | ||||
| 		if err = conn.Describe(); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| @@ -96,101 +119,89 @@ func rtspHandler(url string) (streamer.Producer, error) { | ||||
| 	return conn, nil | ||||
| } | ||||
|  | ||||
| func worker(address string) { | ||||
| 	srv, err := tcp.NewServer(address) | ||||
| 	if err != nil { | ||||
| 		log.Error().Err(err).Msg("[rtsp] listen") | ||||
| 		return | ||||
| 	} | ||||
| func tcpHandler(c net.Conn) { | ||||
| 	var name string | ||||
| 	var closer func() | ||||
|  | ||||
| 	log.Info().Str("addr", address).Msg("[rtsp] listen") | ||||
| 	trace := log.Trace().Enabled() | ||||
|  | ||||
| 	srv.Listen(func(msg interface{}) { | ||||
| 		switch msg.(type) { | ||||
| 		case net.Conn: | ||||
| 			var name string | ||||
| 			var onDisconnect func() | ||||
| 	conn := rtsp.NewServer(c) | ||||
| 	conn.Listen(func(msg interface{}) { | ||||
| 		if trace { | ||||
| 			switch msg := msg.(type) { | ||||
| 			case *tcp.Request: | ||||
| 				log.Trace().Msgf("[rtsp] server request:\n%s", msg) | ||||
| 			case *tcp.Response: | ||||
| 				log.Trace().Msgf("[rtsp] server response:\n%s", msg) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 			trace := log.Trace().Enabled() | ||||
| 		switch msg { | ||||
| 		case rtsp.MethodDescribe: | ||||
| 			name = conn.URL.Path[1:] | ||||
|  | ||||
| 			conn := rtsp.NewServer(msg.(net.Conn)) | ||||
| 			conn.Listen(func(msg interface{}) { | ||||
| 				if trace { | ||||
| 					switch msg := msg.(type) { | ||||
| 					case *tcp.Request: | ||||
| 						log.Trace().Msgf("[rtsp] server request:\n%s", msg) | ||||
| 					case *tcp.Response: | ||||
| 						log.Trace().Msgf("[rtsp] server response:\n%s", msg) | ||||
| 					} | ||||
| 				} | ||||
|  | ||||
| 				switch msg { | ||||
| 				case rtsp.MethodDescribe: | ||||
| 					name = conn.URL.Path[1:] | ||||
|  | ||||
| 					log.Debug().Str("stream", name).Msg("[rtsp] new consumer") | ||||
|  | ||||
| 					stream := streams.Get(name) // TODO: rewrite | ||||
| 					if stream == nil { | ||||
| 						return | ||||
| 					} | ||||
|  | ||||
| 					initMedias(conn) | ||||
|  | ||||
| 					if err = stream.AddConsumer(conn); err != nil { | ||||
| 						log.Warn().Err(err).Str("stream", name).Msg("[rtsp]") | ||||
| 						return | ||||
| 					} | ||||
|  | ||||
| 					onDisconnect = func() { | ||||
| 						stream.RemoveConsumer(conn) | ||||
| 					} | ||||
|  | ||||
| 				case rtsp.MethodAnnounce: | ||||
| 					if OnProducer != nil { | ||||
| 						if OnProducer(conn) { | ||||
| 							return | ||||
| 						} | ||||
| 					} | ||||
|  | ||||
| 					name = conn.URL.Path[1:] | ||||
|  | ||||
| 					log.Debug().Str("stream", name).Msg("[rtsp] new producer") | ||||
|  | ||||
| 					stream := streams.Get(name) | ||||
| 					if stream == nil { | ||||
| 						return | ||||
| 					} | ||||
|  | ||||
| 					stream.AddProducer(conn) | ||||
|  | ||||
| 					onDisconnect = func() { | ||||
| 						stream.RemoveProducer(conn) | ||||
| 					} | ||||
|  | ||||
| 				case streamer.StatePlaying: | ||||
| 					log.Debug().Str("stream", name).Msg("[rtsp] start") | ||||
| 				} | ||||
| 			}) | ||||
|  | ||||
| 			if err = conn.Accept(); err != nil { | ||||
| 				log.Warn().Err(err).Msg("[rtsp] accept") | ||||
| 			stream := streams.Get(name) | ||||
| 			if stream == nil { | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			if err = conn.Handle(); err != nil { | ||||
| 				//log.Warn().Err(err).Msg("[rtsp] handle server") | ||||
| 			log.Debug().Str("stream", name).Msg("[rtsp] new consumer") | ||||
|  | ||||
| 			initMedias(conn) | ||||
|  | ||||
| 			if err := stream.AddConsumer(conn); err != nil { | ||||
| 				log.Warn().Err(err).Str("stream", name).Msg("[rtsp]") | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			if onDisconnect != nil { | ||||
| 				onDisconnect() | ||||
| 			closer = func() { | ||||
| 				stream.RemoveConsumer(conn) | ||||
| 			} | ||||
|  | ||||
| 			log.Debug().Str("stream", name).Msg("[rtsp] disconnect") | ||||
| 		case rtsp.MethodAnnounce: | ||||
| 			name = conn.URL.Path[1:] | ||||
|  | ||||
| 			stream := streams.Get(name) | ||||
| 			if stream == nil { | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			log.Debug().Str("stream", name).Msg("[rtsp] new producer") | ||||
|  | ||||
| 			stream.AddProducer(conn) | ||||
|  | ||||
| 			closer = func() { | ||||
| 				stream.RemoveProducer(conn) | ||||
| 			} | ||||
|  | ||||
| 		case streamer.StatePlaying: | ||||
| 			log.Debug().Str("stream", name).Msg("[rtsp] start") | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	srv.Serve() | ||||
| 	if err := conn.Accept(); err != nil { | ||||
| 		log.Warn().Err(err).Caller().Send() | ||||
| 		_ = conn.Close() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	for _, handler := range handlers { | ||||
| 		if handler(conn) { | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if closer != nil { | ||||
| 		if err := conn.Handle(); err != nil { | ||||
| 			log.Debug().Err(err).Caller().Send() | ||||
| 		} | ||||
|  | ||||
| 		closer() | ||||
|  | ||||
| 		log.Debug().Str("stream", name).Msg("[rtsp] disconnect") | ||||
| 	} | ||||
|  | ||||
| 	_ = conn.Close() | ||||
| } | ||||
|  | ||||
| func initMedias(conn *rtsp.Conn) { | ||||
|   | ||||
| @@ -4,30 +4,36 @@ import ( | ||||
| 	"fmt" | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/streamer" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| ) | ||||
|  | ||||
| type Handler func(url string) (streamer.Producer, error) | ||||
|  | ||||
| var handlers map[string]Handler | ||||
| var handlers = map[string]Handler{} | ||||
| var handlersMu sync.Mutex | ||||
|  | ||||
| func HandleFunc(scheme string, handler Handler) { | ||||
| 	if handlers == nil { | ||||
| 		handlers = make(map[string]Handler) | ||||
| 	} | ||||
| 	handlersMu.Lock() | ||||
| 	handlers[scheme] = handler | ||||
| 	handlersMu.Unlock() | ||||
| } | ||||
|  | ||||
| func getHandler(url string) Handler { | ||||
| 	i := strings.IndexByte(url, ':') | ||||
| 	if i <= 0 { // TODO: i < 4 ? | ||||
| 		return nil | ||||
| 	} | ||||
| 	handlersMu.Lock() | ||||
| 	defer handlersMu.Unlock() | ||||
| 	return handlers[url[:i]] | ||||
| } | ||||
|  | ||||
| func HasProducer(url string) bool { | ||||
| 	i := strings.IndexByte(url, ':') | ||||
| 	if i <= 0 { // TODO: i < 4 ? | ||||
| 		return false | ||||
| 	} | ||||
| 	return handlers[url[:i]] != nil | ||||
| 	return getHandler(url) != nil | ||||
| } | ||||
|  | ||||
| func GetProducer(url string) (streamer.Producer, error) { | ||||
| 	i := strings.IndexByte(url, ':') | ||||
| 	handler := handlers[url[:i]] | ||||
| 	handler := getHandler(url) | ||||
| 	if handler == nil { | ||||
| 		return nil, fmt.Errorf("unsupported scheme: %s", url) | ||||
| 	} | ||||
|   | ||||
| @@ -4,6 +4,7 @@ import ( | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/streamer" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type state byte | ||||
| @@ -24,8 +25,9 @@ type Producer struct { | ||||
| 	element streamer.Producer | ||||
| 	tracks  []*streamer.Track | ||||
|  | ||||
| 	state state | ||||
| 	mx    sync.Mutex | ||||
| 	state   state | ||||
| 	mu      sync.Mutex | ||||
| 	restart *time.Timer | ||||
| } | ||||
|  | ||||
| func (p *Producer) SetSource(s string) { | ||||
| @@ -36,16 +38,16 @@ func (p *Producer) SetSource(s string) { | ||||
| } | ||||
|  | ||||
| func (p *Producer) GetMedias() []*streamer.Media { | ||||
| 	p.mx.Lock() | ||||
| 	defer p.mx.Unlock() | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
|  | ||||
| 	if p.state == stateNone { | ||||
| 		log.Debug().Str("url", p.url).Msg("[streams] probe producer") | ||||
| 		log.Debug().Msgf("[streams] probe producer url=%s", p.url) | ||||
|  | ||||
| 		var err error | ||||
| 		p.element, err = GetProducer(p.url) | ||||
| 		if err != nil || p.element == nil { | ||||
| 			log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer") | ||||
| 			log.Error().Err(err).Caller().Send() | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| @@ -56,14 +58,17 @@ func (p *Producer) GetMedias() []*streamer.Media { | ||||
| } | ||||
|  | ||||
| func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { | ||||
| 	p.mx.Lock() | ||||
| 	defer p.mx.Unlock() | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
|  | ||||
| 	if p.state == stateMedias { | ||||
| 		p.state = stateTracks | ||||
| 	if p.state == stateNone { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	track := p.element.GetTrack(media, codec) | ||||
| 	if track == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	for _, t := range p.tracks { | ||||
| 		if track == t { | ||||
| @@ -71,6 +76,10 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if p.state == stateMedias { | ||||
| 		p.state = stateTracks | ||||
| 	} | ||||
|  | ||||
| 	p.tracks = append(p.tracks, track) | ||||
|  | ||||
| 	return track | ||||
| @@ -79,36 +88,89 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea | ||||
| // internals | ||||
|  | ||||
| func (p *Producer) start() { | ||||
| 	p.mx.Lock() | ||||
| 	defer p.mx.Unlock() | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
|  | ||||
| 	if p.state != stateTracks { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	log.Debug().Str("url", p.url).Msg("[streams] start producer") | ||||
| 	log.Debug().Msgf("[streams] start producer url=%s", p.url) | ||||
|  | ||||
| 	p.state = stateStart | ||||
| 	go func() { | ||||
| 		// safe read element while mu locked | ||||
| 		if err := p.element.Start(); err != nil { | ||||
| 			log.Warn().Err(err).Str("url", p.url).Msg("[streams] start") | ||||
| 			log.Warn().Err(err).Caller().Send() | ||||
| 		} | ||||
| 		p.reconnect() | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (p *Producer) reconnect() { | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
|  | ||||
| 	if p.state != stateStart { | ||||
| 		log.Trace().Msgf("[streams] stop reconnect url=%s", p.url) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	log.Debug().Msgf("[streams] reconnect to url=%s", p.url) | ||||
|  | ||||
| 	var err error | ||||
| 	p.element, err = GetProducer(p.url) | ||||
| 	if err != nil || p.element == nil { | ||||
| 		log.Debug().Err(err).Caller().Send() | ||||
| 		// TODO: dynamic timeout | ||||
| 		p.restart = time.AfterFunc(30*time.Second, p.reconnect) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	medias := p.element.GetMedias() | ||||
|  | ||||
| 	// convert all old producer tracks to new tracks | ||||
| 	for i, oldTrack := range p.tracks { | ||||
| 		// match new element medias with old track codec | ||||
| 		for _, media := range medias { | ||||
| 			codec := media.MatchCodec(oldTrack.Codec) | ||||
| 			if codec == nil { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// move sink from old track to new track | ||||
| 			newTrack := p.element.GetTrack(media, codec) | ||||
| 			newTrack.GetSink(oldTrack) | ||||
| 			p.tracks[i] = newTrack | ||||
|  | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		if err = p.element.Start(); err != nil { | ||||
| 			log.Debug().Err(err).Caller().Send() | ||||
| 		} | ||||
| 		p.reconnect() | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (p *Producer) stop() { | ||||
| 	p.mx.Lock() | ||||
| 	p.mu.Lock() | ||||
|  | ||||
| 	log.Debug().Str("url", p.url).Msg("[streams] stop producer") | ||||
| 	log.Debug().Msgf("[streams] stop producer url=%s", p.url) | ||||
|  | ||||
| 	if p.element != nil { | ||||
| 		_ = p.element.Stop() | ||||
| 		p.element = nil | ||||
| 	} else { | ||||
| 		log.Warn().Str("url", p.url).Msg("[streams] stop empty producer") | ||||
| 	} | ||||
| 	p.tracks = nil | ||||
| 	p.state = stateNone | ||||
| 	if p.restart != nil { | ||||
| 		p.restart.Stop() | ||||
| 		p.restart = nil | ||||
| 	} | ||||
|  | ||||
| 	p.mx.Unlock() | ||||
| 	p.state = stateNone | ||||
| 	p.tracks = nil | ||||
|  | ||||
| 	p.mu.Unlock() | ||||
| } | ||||
|   | ||||
| @@ -4,6 +4,7 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/streamer" | ||||
| 	"sync" | ||||
| ) | ||||
|  | ||||
| type Consumer struct { | ||||
| @@ -14,6 +15,7 @@ type Consumer struct { | ||||
| type Stream struct { | ||||
| 	producers []*Producer | ||||
| 	consumers []*Consumer | ||||
| 	mu        sync.Mutex | ||||
| } | ||||
|  | ||||
| func NewStream(source interface{}) *Stream { | ||||
| @@ -51,18 +53,19 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { | ||||
| 	ic := len(s.consumers) | ||||
|  | ||||
| 	consumer := &Consumer{element: cons} | ||||
| 	var producers []*Producer // matched producers for consumer | ||||
|  | ||||
| 	// Step 1. Get consumer medias | ||||
| 	for icc, consMedia := range cons.GetMedias() { | ||||
| 		log.Trace().Stringer("media", consMedia). | ||||
| 			Msgf("[streams] consumer:%d:%d candidate", ic, icc) | ||||
| 			Msgf("[streams] consumer=%d candidate=%d", ic, icc) | ||||
|  | ||||
| 	producers: | ||||
| 		for ip, prod := range s.producers { | ||||
| 			// Step 2. Get producer medias (not tracks yet) | ||||
| 			for ipc, prodMedia := range prod.GetMedias() { | ||||
| 				log.Trace().Stringer("media", prodMedia). | ||||
| 					Msgf("[streams] producer:%d:%d candidate", ip, ipc) | ||||
| 					Msgf("[streams] producer=%d candidate=%d", ip, ipc) | ||||
|  | ||||
| 				// Step 3. Match consumer/producer codecs list | ||||
| 				prodCodec := prodMedia.MatchMedia(consMedia) | ||||
| @@ -81,20 +84,23 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { | ||||
| 					consTrack := consumer.element.AddTrack(consMedia, prodTrack) | ||||
|  | ||||
| 					consumer.tracks = append(consumer.tracks, consTrack) | ||||
| 					producers = append(producers, prod) | ||||
| 					break producers | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// can't match tracks for consumer | ||||
| 	if len(consumer.tracks) == 0 { | ||||
| 	if len(producers) == 0 { | ||||
| 		return errors.New("couldn't find the matching tracks") | ||||
| 	} | ||||
|  | ||||
| 	s.mu.Lock() | ||||
| 	s.consumers = append(s.consumers, consumer) | ||||
| 	s.mu.Unlock() | ||||
|  | ||||
| 	for _, prod := range s.producers { | ||||
| 	// there may be duplicates, but that's not a problem | ||||
| 	for _, prod := range producers { | ||||
| 		prod.start() | ||||
| 	} | ||||
|  | ||||
| @@ -102,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { | ||||
| } | ||||
|  | ||||
| func (s *Stream) RemoveConsumer(cons streamer.Consumer) { | ||||
| 	s.mu.Lock() | ||||
| 	for i, consumer := range s.consumers { | ||||
| 		if consumer == nil { | ||||
| 			log.Warn().Msgf("empty consumer: %+v\n", s) | ||||
| @@ -127,7 +134,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) { | ||||
|  | ||||
| 		var sink bool | ||||
| 		for _, track := range producer.tracks { | ||||
| 			if len(track.Sink) > 0 { | ||||
| 			if track.HasSink() { | ||||
| 				sink = true | ||||
| 			} | ||||
| 		} | ||||
| @@ -135,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) { | ||||
| 			producer.stop() | ||||
| 		} | ||||
| 	} | ||||
| 	s.mu.Unlock() | ||||
| } | ||||
|  | ||||
| func (s *Stream) AddProducer(prod streamer.Producer) { | ||||
| 	producer := &Producer{element: prod, state: stateTracks} | ||||
| 	s.mu.Lock() | ||||
| 	s.producers = append(s.producers, producer) | ||||
| 	s.mu.Unlock() | ||||
| } | ||||
|  | ||||
| func (s *Stream) RemoveProducer(prod streamer.Producer) { | ||||
| 	s.mu.Lock() | ||||
| 	for i, producer := range s.producers { | ||||
| 		if producer.element == prod { | ||||
| 			s.removeProducer(i) | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 	s.mu.Unlock() | ||||
| } | ||||
|  | ||||
| func (s *Stream) Active() bool { | ||||
| 	if len(s.consumers) > 0 { | ||||
| 		return true | ||||
| 	} | ||||
|  | ||||
| 	for _, prod := range s.producers { | ||||
| 		if prod.element != nil { | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return false | ||||
| } | ||||
| //func (s *Stream) Active() bool { | ||||
| //	if len(s.consumers) > 0 { | ||||
| //		return true | ||||
| //	} | ||||
| // | ||||
| //	for _, prod := range s.producers { | ||||
| //		if prod.element != nil { | ||||
| //			return true | ||||
| //		} | ||||
| //	} | ||||
| // | ||||
| //	return false | ||||
| //} | ||||
|  | ||||
| func (s *Stream) MarshalJSON() ([]byte, error) { | ||||
| 	var v []interface{} | ||||
| 	s.mu.Lock() | ||||
| 	for _, prod := range s.producers { | ||||
| 		if prod.element != nil { | ||||
| 			v = append(v, prod.element) | ||||
| @@ -176,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) { | ||||
| 		// cons.element always not nil | ||||
| 		v = append(v, cons.element) | ||||
| 	} | ||||
| 	s.mu.Unlock() | ||||
| 	if len(v) == 0 { | ||||
| 		v = nil | ||||
| 	} | ||||
|   | ||||
| @@ -8,7 +8,9 @@ import ( | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/webrtc" | ||||
| 	pion "github.com/pion/webrtc/v3" | ||||
| 	"github.com/rs/zerolog" | ||||
| 	"io/ioutil" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| ) | ||||
|  | ||||
| func Init() { | ||||
| @@ -55,6 +57,8 @@ func Init() { | ||||
|  | ||||
| 	api.HandleWS(webrtc.MsgTypeOffer, offerHandler) | ||||
| 	api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler) | ||||
|  | ||||
| 	api.HandleFunc("api/webrtc", syncHandler) | ||||
| } | ||||
|  | ||||
| var Port string | ||||
| @@ -137,6 +141,32 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) { | ||||
| 	ctx.Consumer = conn | ||||
| } | ||||
|  | ||||
| func syncHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	url := r.URL.Query().Get("src") | ||||
| 	stream := streams.Get(url) | ||||
| 	if stream == nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// get offer | ||||
| 	offer, err := ioutil.ReadAll(r.Body) | ||||
| 	if err != nil { | ||||
| 		log.Error().Err(err).Caller().Send() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	answer, err := ExchangeSDP(stream, string(offer), r.UserAgent()) | ||||
| 	if err != nil { | ||||
| 		log.Error().Err(err).Caller().Send() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// send SDP to client | ||||
| 	if _, err = w.Write([]byte(answer)); err != nil { | ||||
| 		log.Error().Err(err).Caller().Send() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func ExchangeSDP( | ||||
| 	stream *streams.Stream, offer string, userAgent string, | ||||
| ) (answer string, err error) { | ||||
|   | ||||
| @@ -8,11 +8,12 @@ import ( | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	NALUTypePFrame = 1 | ||||
| 	NALUTypeIFrame = 5 | ||||
| 	NALUTypeSEI    = 6 | ||||
| 	NALUTypeSPS    = 7 | ||||
| 	NALUTypePPS    = 8 | ||||
| 	NALUTypePFrame = 1 // Coded slice of a non-IDR picture | ||||
| 	NALUTypeIFrame = 5 // Coded slice of an IDR picture | ||||
| 	NALUTypeSEI    = 6 // Supplemental enhancement information (SEI) | ||||
| 	NALUTypeSPS    = 7 // Sequence parameter set | ||||
| 	NALUTypePPS    = 8 // Picture parameter set | ||||
| 	NALUTypeAUD    = 9 // Access unit delimiter | ||||
| ) | ||||
|  | ||||
| func NALUType(b []byte) byte { | ||||
|   | ||||
| @@ -19,11 +19,7 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc { | ||||
|  | ||||
| 	return func(push streamer.WriterFunc) streamer.WriterFunc { | ||||
| 		return func(packet *rtp.Packet) error { | ||||
| 			//fmt.Printf( | ||||
| 			//	"[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, %v\n", | ||||
| 			//	track.Codec.Name, packet.Payload[0]&0x1F, len(packet.Payload), packet.Timestamp, | ||||
| 			//	packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker, | ||||
| 			//) | ||||
| 			//log.Printf("[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, %v", track.Codec.Name, packet.Payload[0]&0x1F, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker) | ||||
|  | ||||
| 			payload, err := depack.Unmarshal(packet.Payload) | ||||
| 			if len(payload) == 0 || err != nil { | ||||
| @@ -40,11 +36,13 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc { | ||||
| 			} | ||||
|  | ||||
| 			if len(buf) == 0 { | ||||
| 				// Amcrest IP4M-1051: 9, 7, 8, 6, 28... | ||||
| 				// Amcrest IP4M-1051: 9, 6, 1 | ||||
| 				switch NALUType(payload) { | ||||
| 				case NALUTypeIFrame: | ||||
| 					// fix IFrame without SPS,PPS | ||||
| 					buf = append(buf, ps...) | ||||
| 				case NALUTypeSEI: | ||||
| 				case NALUTypeSEI, NALUTypeAUD: | ||||
| 					// fix ffmpeg with transcoding first frame | ||||
| 					i := int(4 + binary.BigEndian.Uint32(payload)) | ||||
|  | ||||
| @@ -72,10 +70,7 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc { | ||||
| 				buf = buf[:0] | ||||
| 			} | ||||
|  | ||||
| 			//fmt.Printf( | ||||
| 			//	"[AVC] %v, len: %d, %v\n", Types(payload), len(payload), | ||||
| 			//	reflect.ValueOf(buf).Pointer() == reflect.ValueOf(payload).Pointer(), | ||||
| 			//) | ||||
| 			//log.Printf("[AVC] %v, len: %d", Types(payload), len(payload)) | ||||
|  | ||||
| 			clone := *packet | ||||
| 			clone.Version = RTPPacketVersionAVC | ||||
|   | ||||
							
								
								
									
										3
									
								
								pkg/httpflv/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								pkg/httpflv/README.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,3 @@ | ||||
| ## Useful links | ||||
|  | ||||
| - https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779 | ||||
							
								
								
									
										100
									
								
								pkg/httpflv/httpflv.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										100
									
								
								pkg/httpflv/httpflv.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,100 @@ | ||||
| package httpflv | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"errors" | ||||
| 	"github.com/deepch/vdk/av" | ||||
| 	"github.com/deepch/vdk/codec/h264parser" | ||||
| 	"github.com/deepch/vdk/format/flv/flvio" | ||||
| 	"github.com/deepch/vdk/utils/bits/pio" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| ) | ||||
|  | ||||
| func Dial(uri string) (*Conn, error) { | ||||
| 	req, err := http.NewRequest("GET", uri, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	res, err := http.DefaultClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	c := Conn{ | ||||
| 		conn:   res.Body, | ||||
| 		reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize), | ||||
| 		buf:    make([]byte, 256), | ||||
| 	} | ||||
|  | ||||
| 	if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	flags, n, err := flvio.ParseFileHeader(c.buf) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if flags&flvio.FILE_HAS_VIDEO == 0 { | ||||
| 		return nil, errors.New("not supported") | ||||
| 	} | ||||
|  | ||||
| 	if _, err = c.reader.Discard(n); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return &c, nil | ||||
| } | ||||
|  | ||||
| type Conn struct { | ||||
| 	conn      io.ReadCloser | ||||
| 	reader    *bufio.Reader | ||||
| 	buf       []byte | ||||
| } | ||||
|  | ||||
| func (c *Conn) Streams() ([]av.CodecData, error) { | ||||
| 	for { | ||||
| 		tag, _, err := flvio.ReadTag(c.reader, c.buf) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		return []av.CodecData{stream}, nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (c *Conn) ReadPacket() (av.Packet, error) { | ||||
| 	for { | ||||
| 		tag, ts, err := flvio.ReadTag(c.reader, c.buf) | ||||
| 		if err != nil { | ||||
| 			return av.Packet{}, err | ||||
| 		} | ||||
|  | ||||
| 		if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		return av.Packet{ | ||||
| 			Idx:             0, | ||||
| 			Data:            tag.Data, | ||||
| 			CompositionTime: flvio.TsToTime(tag.CompositionTime), | ||||
| 			IsKeyFrame:      tag.FrameType == flvio.FRAME_KEY, | ||||
| 			Time:            flvio.TsToTime(ts), | ||||
| 		}, nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (c *Conn) Close() (err error) { | ||||
| 	return c.conn.Close() | ||||
| } | ||||
| @@ -5,15 +5,24 @@ import ( | ||||
| 	"encoding/hex" | ||||
| 	"fmt" | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/h264" | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/httpflv" | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/streamer" | ||||
| 	"github.com/deepch/vdk/av" | ||||
| 	"github.com/deepch/vdk/codec/aacparser" | ||||
| 	"github.com/deepch/vdk/codec/h264parser" | ||||
| 	"github.com/deepch/vdk/format/rtmp" | ||||
| 	"github.com/pion/rtp" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| // Conn for RTMP and RTMPT (flv over HTTP) | ||||
| type Conn interface { | ||||
| 	Streams() (streams []av.CodecData, err error) | ||||
| 	ReadPacket() (pkt av.Packet, err error) | ||||
| 	Close() (err error) | ||||
| } | ||||
|  | ||||
| type Client struct { | ||||
| 	streamer.Element | ||||
|  | ||||
| @@ -22,7 +31,7 @@ type Client struct { | ||||
| 	medias []*streamer.Media | ||||
| 	tracks []*streamer.Track | ||||
|  | ||||
| 	conn   *rtmp.Conn | ||||
| 	conn   Conn | ||||
| 	closed bool | ||||
|  | ||||
| 	receive int | ||||
| @@ -33,7 +42,12 @@ func NewClient(uri string) *Client { | ||||
| } | ||||
|  | ||||
| func (c *Client) Dial() (err error) { | ||||
| 	c.conn, err = rtmp.Dial(c.URI) | ||||
| 	if strings.HasPrefix(c.URI, "http") { | ||||
| 		c.conn, err = httpflv.Dial(c.URI) | ||||
| 	} else { | ||||
| 		c.conn, err = rtmp.Dial(c.URI) | ||||
| 	} | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|   | ||||
| @@ -32,7 +32,7 @@ func (c *Client) MarshalJSON() ([]byte, error) { | ||||
| 	v := map[string]interface{}{ | ||||
| 		streamer.JSONReceive:    c.receive, | ||||
| 		streamer.JSONType:       "RTMP client producer", | ||||
| 		streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), | ||||
| 		//streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), | ||||
| 		"url":                   c.URI, | ||||
| 	} | ||||
| 	for i, media := range c.medias { | ||||
|   | ||||
							
								
								
									
										105
									
								
								pkg/rtsp/conn.go
									
									
									
									
									
								
							
							
						
						
									
										105
									
								
								pkg/rtsp/conn.go
									
									
									
									
									
								
							| @@ -43,8 +43,6 @@ const ( | ||||
| 	ModeServerConsumer | ||||
| ) | ||||
|  | ||||
| const KeepAlive = time.Second * 25 | ||||
|  | ||||
| type Conn struct { | ||||
| 	streamer.Element | ||||
|  | ||||
| @@ -60,6 +58,7 @@ type Conn struct { | ||||
| 	// internal | ||||
|  | ||||
| 	auth     *tcp.Auth | ||||
| 	closed   bool | ||||
| 	conn     net.Conn | ||||
| 	mode     Mode | ||||
| 	reader   *bufio.Reader | ||||
| @@ -115,9 +114,7 @@ func (c *Conn) Dial() (err error) { | ||||
| 		_ = c.parseURI() | ||||
| 	} | ||||
|  | ||||
| 	c.conn, err = net.DialTimeout( | ||||
| 		"tcp", c.URL.Host, 10*time.Second, | ||||
| 	) | ||||
| 	c.conn, err = net.DialTimeout("tcp", c.URL.Host, time.Second*5) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| @@ -362,21 +359,25 @@ func (c *Conn) SetupMedia( | ||||
| 	var res *tcp.Response | ||||
| 	res, err = c.Do(req) | ||||
| 	if err != nil { | ||||
| 		// Dahua VTO2111D fail on this step because of backchannel | ||||
| 		// some Dahua/Amcrest cameras fail here because two simultaneous | ||||
| 		// backchannel connections | ||||
| 		if c.Backchannel { | ||||
| 			if err = c.Dial(); err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			c.Backchannel = false | ||||
| 			if err = c.Describe(); err != nil { | ||||
| 			if err := c.Dial(); err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			res, err = c.Do(req) | ||||
| 			if err := c.Describe(); err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
|  | ||||
| 			for _, newMedia := range c.Medias { | ||||
| 				if newMedia.Control == media.Control { | ||||
| 					return c.SetupMedia(newMedia, newMedia.Codecs[0]) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if c.Session == "" { | ||||
| @@ -455,24 +456,19 @@ func (c *Conn) Teardown() (err error) { | ||||
| } | ||||
|  | ||||
| func (c *Conn) Close() error { | ||||
| 	if c.conn == nil { | ||||
| 	if c.closed { | ||||
| 		return nil | ||||
| 	} | ||||
| 	if err := c.Teardown(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	conn := c.conn | ||||
| 	c.conn = nil | ||||
| 	return conn.Close() | ||||
| 	c.closed = true | ||||
| 	return c.conn.Close() | ||||
| } | ||||
|  | ||||
| const transport = "RTP/AVP/TCP;unicast;interleaved=" | ||||
|  | ||||
| func (c *Conn) Accept() error { | ||||
| 	//if c.state != StateServerInit { | ||||
| 	//	panic("wrong state") | ||||
| 	//} | ||||
|  | ||||
| 	for { | ||||
| 		req, err := tcp.ReadRequest(c.reader) | ||||
| 		if err != nil { | ||||
| @@ -575,7 +571,7 @@ func (c *Conn) Accept() error { | ||||
| 				Request: req, | ||||
| 			} | ||||
|  | ||||
| 			if tr[:len(transport)] == transport { | ||||
| 			if strings.HasPrefix(tr, transport) { | ||||
| 				c.Session = "1" // TODO: fixme | ||||
| 				res.Header.Set("Transport", tr[:len(transport)+3]) | ||||
| 			} else { | ||||
| @@ -598,16 +594,44 @@ func (c *Conn) Accept() error { | ||||
|  | ||||
| func (c *Conn) Handle() (err error) { | ||||
| 	defer func() { | ||||
| 		if c.conn == nil { | ||||
| 		if c.closed { | ||||
| 			err = nil | ||||
| 		} else { | ||||
| 			// may have gotten here because of the deadline | ||||
| 			// so close the connection to stop keepalive | ||||
| 			_ = c.conn.Close() | ||||
| 		} | ||||
| 		//c.Fire(streamer.StateNull) | ||||
| 	}() | ||||
|  | ||||
| 	//c.Fire(streamer.StatePlaying) | ||||
| 	ts := time.Now().Add(KeepAlive) | ||||
| 	var timeout time.Duration | ||||
|  | ||||
| 	switch c.mode { | ||||
| 	case ModeClientProducer: | ||||
| 		// polling frames from remote RTSP Server (ex Camera) | ||||
| 		timeout = time.Second * 5 | ||||
| 		go c.keepalive() | ||||
|  | ||||
| 	case ModeServerProducer: | ||||
| 		// polling frames from remote RTSP Client (ex FFmpeg) | ||||
| 		timeout = time.Second * 15 | ||||
|  | ||||
| 	case ModeServerConsumer: | ||||
| 		// pushing frames to remote RTSP Client (ex VLC) | ||||
| 		timeout = time.Second * 60 | ||||
|  | ||||
| 	default: | ||||
| 		return fmt.Errorf("wrong RTSP conn mode: %d", c.mode) | ||||
| 	} | ||||
|  | ||||
| 	for { | ||||
| 		if c.closed { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// we can read: | ||||
| 		// 1. RTP interleaved: `$` + 1B channel number + 2B size | ||||
| 		// 2. RTSP response:   RTSP/1.0 200 OK | ||||
| @@ -685,16 +709,19 @@ func (c *Conn) Handle() (err error) { | ||||
|  | ||||
| 			c.Fire(msg) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| 		// keep-alive | ||||
| 		now := time.Now() | ||||
| 		if now.After(ts) { | ||||
| 			req := &tcp.Request{Method: MethodOptions, URL: c.URL} | ||||
| 			// don't need to wait respose on this request | ||||
| 			if err = c.Request(req); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 			ts = now.Add(KeepAlive) | ||||
| func (c *Conn) keepalive() { | ||||
| 	// TODO: rewrite to RTCP | ||||
| 	req := &tcp.Request{Method: MethodOptions, URL: c.URL} | ||||
| 	for { | ||||
| 		time.Sleep(time.Second * 25) | ||||
| 		if c.closed { | ||||
| 			return | ||||
| 		} | ||||
| 		if err := c.Request(req); err != nil { | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -712,20 +739,16 @@ func (c *Conn) bindTrack( | ||||
| 	track *streamer.Track, channel uint8, payloadType uint8, | ||||
| ) *streamer.Track { | ||||
| 	push := func(packet *rtp.Packet) error { | ||||
| 		if c.conn == nil { | ||||
| 		if c.closed { | ||||
| 			return nil | ||||
| 		} | ||||
| 		packet.Header.PayloadType = payloadType | ||||
| 		//packet.Header.PayloadType = 100 | ||||
| 		//packet.Header.PayloadType = 8 | ||||
| 		//packet.Header.PayloadType = 106 | ||||
|  | ||||
| 		size := packet.MarshalSize() | ||||
|  | ||||
| 		data := make([]byte, 4+size) | ||||
| 		data[0] = '$' | ||||
| 		data[1] = channel | ||||
| 		//data[1] = 10 | ||||
| 		binary.BigEndian.PutUint16(data[2:], uint16(size)) | ||||
|  | ||||
| 		if _, err := packet.MarshalTo(data[4:]); err != nil { | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package rtsp | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"github.com/AlexxIT/go2rtc/pkg/streamer" | ||||
| 	"strconv" | ||||
| ) | ||||
| @@ -27,13 +28,16 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. | ||||
| } | ||||
|  | ||||
| func (c *Conn) Start() error { | ||||
| 	if c.mode == ModeServerProducer { | ||||
| 		return nil | ||||
| 	switch c.mode { | ||||
| 	case ModeClientProducer: | ||||
| 		if err := c.Play(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	case ModeServerProducer: | ||||
| 	default: | ||||
| 		return fmt.Errorf("start wrong mode: %d", c.mode) | ||||
| 	} | ||||
|  | ||||
| 	if err := c.Play(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return c.Handle() | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -75,13 +75,13 @@ func (m *Media) AV() bool { | ||||
| 	return m.Kind == KindVideo || m.Kind == KindAudio | ||||
| } | ||||
|  | ||||
| func (m *Media) MatchCodec(codec *Codec) bool { | ||||
| func (m *Media) MatchCodec(codec *Codec) *Codec { | ||||
| 	for _, c := range m.Codecs { | ||||
| 		if c.Match(codec) { | ||||
| 			return true | ||||
| 			return c | ||||
| 		} | ||||
| 	} | ||||
| 	return false | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *Media) MatchMedia(media *Media) *Codec { | ||||
|   | ||||
| @@ -12,44 +12,54 @@ type WrapperFunc func(push WriterFunc) WriterFunc | ||||
| type Track struct { | ||||
| 	Codec     *Codec | ||||
| 	Direction string | ||||
| 	Sink      map[*Track]WriterFunc | ||||
| 	mx        sync.Mutex | ||||
| 	sink      map[*Track]WriterFunc | ||||
| 	sinkMu    sync.Mutex | ||||
| } | ||||
|  | ||||
| func (t *Track) String() string { | ||||
| 	s := t.Codec.String() | ||||
| 	s += fmt.Sprintf(", sinks=%d", len(t.Sink)) | ||||
| 	s += fmt.Sprintf(", sinks=%d", len(t.sink)) | ||||
| 	return s | ||||
| } | ||||
|  | ||||
| func (t *Track) WriteRTP(p *rtp.Packet) error { | ||||
| 	t.mx.Lock() | ||||
| 	for _, f := range t.Sink { | ||||
| 	t.sinkMu.Lock() | ||||
| 	for _, f := range t.sink { | ||||
| 		_ = f(p) | ||||
| 	} | ||||
| 	t.mx.Unlock() | ||||
| 	t.sinkMu.Unlock() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *Track) Bind(w WriterFunc) *Track { | ||||
| 	t.mx.Lock() | ||||
| 	t.sinkMu.Lock() | ||||
|  | ||||
| 	if t.Sink == nil { | ||||
| 		t.Sink = map[*Track]WriterFunc{} | ||||
| 	if t.sink == nil { | ||||
| 		t.sink = map[*Track]WriterFunc{} | ||||
| 	} | ||||
|  | ||||
| 	clone := &Track{ | ||||
| 		Codec: t.Codec, Direction: t.Direction, Sink: t.Sink, | ||||
| 		Codec: t.Codec, Direction: t.Direction, sink: t.sink, | ||||
| 	} | ||||
| 	t.Sink[clone] = w | ||||
| 	t.sink[clone] = w | ||||
|  | ||||
| 	t.mx.Unlock() | ||||
| 	t.sinkMu.Unlock() | ||||
|  | ||||
| 	return clone | ||||
| } | ||||
|  | ||||
| func (t *Track) Unbind() { | ||||
| 	t.mx.Lock() | ||||
| 	delete(t.Sink, t) | ||||
| 	t.mx.Unlock() | ||||
| 	t.sinkMu.Lock() | ||||
| 	delete(t.sink, t) | ||||
| 	t.sinkMu.Unlock() | ||||
| } | ||||
|  | ||||
| func (t *Track) GetSink(from *Track) { | ||||
| 	t.sink = from.sink | ||||
| } | ||||
|  | ||||
| func (t *Track) HasSink() bool { | ||||
| 	t.sinkMu.Lock() | ||||
| 	defer t.sinkMu.Unlock() | ||||
| 	return len(t.sink) > 0 | ||||
| } | ||||
|   | ||||
| @@ -9,6 +9,7 @@ import ( | ||||
| 	"net" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func NewCandidate(address string) (string, error) { | ||||
| @@ -38,7 +39,7 @@ func NewCandidate(address string) (string, error) { | ||||
|  | ||||
| func LookupIP(address string) (string, error) { | ||||
| 	if strings.HasPrefix(address, "stun:") { | ||||
| 		ip, err := GetPublicIP() | ||||
| 		ip, err := GetCachedPublicIP() | ||||
| 		if err != nil { | ||||
| 			return "", err | ||||
| 		} | ||||
| @@ -63,11 +64,20 @@ func LookupIP(address string) (string, error) { | ||||
|  | ||||
| // GetPublicIP example from https://github.com/pion/stun | ||||
| func GetPublicIP() (net.IP, error) { | ||||
| 	c, err := stun.Dial("udp", "stun.l.google.com:19302") | ||||
| 	conn, err := net.Dial("udp", "stun.l.google.com:19302") | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	c, err := stun.NewClient(conn) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if err = conn.SetDeadline(time.Now().Add(time.Second * 3)); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var res stun.Event | ||||
|  | ||||
| 	message := stun.MustBuild(stun.TransactionID, stun.BindingRequest) | ||||
| @@ -90,6 +100,24 @@ func GetPublicIP() (net.IP, error) { | ||||
| 	return xorAddr.IP, nil | ||||
| } | ||||
|  | ||||
| var cachedIP net.IP | ||||
| var cachedTS time.Time | ||||
|  | ||||
| func GetCachedPublicIP() (net.IP, error) { | ||||
| 	now := time.Now() | ||||
| 	if now.After(cachedTS) { | ||||
| 		newIP, err := GetPublicIP() | ||||
| 		if err == nil { | ||||
| 			cachedIP = newIP | ||||
| 			cachedTS = now.Add(time.Minute * 5) | ||||
| 		} else if cachedIP == nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return cachedIP, nil | ||||
| } | ||||
|  | ||||
| func IsIP(host string) bool { | ||||
| 	for _, i := range host { | ||||
| 		if i >= 'A' { | ||||
|   | ||||
| @@ -51,11 +51,6 @@ | ||||
|                 pc.addIceCandidate({candidate: msg.value, sdpMid: ''}); | ||||
|             } else if (msg.type === 'webrtc/answer') { | ||||
|                 pc.setRemoteDescription({type: 'answer', sdp: msg.value}); | ||||
|                 pc.getTransceivers().forEach(t => { | ||||
|                     if (t.receiver.track.kind === 'audio') { | ||||
|                         t.currentDirection | ||||
|                     } | ||||
|                 }) | ||||
|             } | ||||
|         } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user