From 1ae87a3c72766baadc161e8663c8d21a74fcf38e Mon Sep 17 00:00:00 2001 From: Will Storey Date: Sat, 31 Dec 2016 13:31:16 -0800 Subject: [PATCH] Remove audio encoding Also begin refactor of removing 'frames'. This is now semi functional again. Still only the first client will work. --- videostreamer.go | 191 +++++++---------------------------------------- 1 file changed, 29 insertions(+), 162 deletions(-) diff --git a/videostreamer.go b/videostreamer.go index cd29568..bbb54d2 100644 --- a/videostreamer.go +++ b/videostreamer.go @@ -15,6 +15,8 @@ import ( // #include "videostreamer.h" // #include // #cgo LDFLAGS: -lavformat -lavdevice -lavcodec -lavutil +// #cgo CFLAGS: -std=c11 +// #cgo pkg-config: libavcodec import "C" // Args holds command line arguments. @@ -25,37 +27,20 @@ type Args struct { InputURL string Verbose bool // Serve with FCGI protocol (true) or HTTP (false). - FCGI bool - Encoder EncoderType + FCGI bool } -// EncoderType defines the type of streaming we're doing. -type EncoderType int - -const ( - // AudioEncoder means to encode audio. - AudioEncoder EncoderType = iota - // VideoEncoder means to encode video. - VideoEncoder -) - // HTTPHandler allows us to pass information to our request handlers. type HTTPHandler struct { Verbose bool ClientChangeChan chan<- int ClientChan chan<- Client - Encoder EncoderType } // A Client is servicing one HTTP client. It receives frames from the reader. type Client struct { - Frames chan Frame - Done chan struct{} -} - -// Frame is an audio/video frame (compressed and encoded). -type Frame struct { - Buffer []byte + Data chan []byte + Done chan struct{} } func main() { @@ -64,12 +49,7 @@ func main() { log.Fatalf("Invalid argument: %s", err) } - if args.Encoder == AudioEncoder { - C.as_setup() - } - if args.Encoder == VideoEncoder { - C.vs_setup() - } + C.vs_setup() // Encoder writes to out pipe. Reader reads from in pipe. in, out, err := os.Pipe() @@ -94,11 +74,10 @@ func main() { // receive. Otherwise if it reads without knowing frame boundaries, it is // difficult for it to know when it is valid to start sending data to a // client that enters mid-encoding. - frameChan := make(chan int) - //frameChan := make(chan int, 128) + frameChan := make(chan int, 128) go encoderSupervisor(out, args.InputFormat, args.InputURL, args.Verbose, - clientChangeChan, frameChan, args.Encoder) + clientChangeChan, frameChan) go reader(args.Verbose, in, clientChan, frameChan) // Start serving either with HTTP or FastCGI. @@ -109,7 +88,6 @@ func main() { Verbose: args.Verbose, ClientChangeChan: clientChangeChan, ClientChan: clientChan, - Encoder: args.Encoder, } if args.FCGI { @@ -143,11 +121,10 @@ func main() { func getArgs() (Args, error) { listenHost := flag.String("host", "0.0.0.0", "Host to listen on.") listenPort := flag.Int("port", 8080, "Port to listen on.") - format := flag.String("format", "pulse", "Input format. Examples: pulse for PulseAudio, mp3 for MP3, or rtsp for RTSP.") - input := flag.String("input", "", "Input URL valid for the given format. For MP3 you can give this as a path to a file. For PulseAudio you can give a value such as alsa_output.pci-0000_00_1f.3.analog-stereo.monitor to take input from a monitor. Use 'pactl list sources' to show the available PulseAudio sources. For RTSP you can provide a rtsp:// URL.") + format := flag.String("format", "pulse", "Input format. Example: rtsp for RTSP.") + input := flag.String("input", "", "Input URL valid for the given format. For RTSP you can provide a rtsp:// URL.") verbose := flag.Bool("verbose", false, "Enable verbose logging output.") fcgi := flag.Bool("fcgi", true, "Serve using FastCGI (true) or as a regular HTTP server.") - encoder := flag.String("encoder", "audio", "Stream audio or video. Each uses different encoding methods.") flag.Parse() @@ -166,16 +143,6 @@ func getArgs() (Args, error) { return Args{}, fmt.Errorf("you must provide an input URL") } - encoderType := AudioEncoder - if *encoder == "audio" { - encoderType = AudioEncoder - } else if *encoder == "video" { - encoderType = VideoEncoder - } else { - flag.PrintDefaults() - return Args{}, fmt.Errorf("you must specify encoder as audio or video") - } - return Args{ ListenHost: *listenHost, ListenPort: *listenPort, @@ -183,7 +150,6 @@ func getArgs() (Args, error) { InputURL: *input, Verbose: *verbose, FCGI: *fcgi, - Encoder: encoderType, }, nil } @@ -193,8 +159,7 @@ func getArgs() (Args, error) { // time no matter how many clients there are. If there are zero clients, there // should not be any encoding going on. func encoderSupervisor(outPipe *os.File, inputFormat, inputURL string, - verbose bool, clientChangeChan <-chan int, frameChan chan<- int, - encoder EncoderType) { + verbose bool, clientChangeChan <-chan int, frameChan chan<- int) { // A count of how many clients are actively subscribed listening for // audio/video. We start the encoder when this goes above zero, and stop it // if it goes to zero. @@ -228,14 +193,8 @@ func encoderSupervisor(outPipe *os.File, inputFormat, inputURL string, encoderStopChan = make(chan struct{}) - if encoder == AudioEncoder { - go audioEncoder(outPipe, inputFormat, inputURL, encoderStopChan, - encoderDoneChan, frameChan) - } - if encoder == VideoEncoder { - go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan, - encoderDoneChan, frameChan) - } + go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan, + encoderDoneChan, frameChan) } continue @@ -275,95 +234,8 @@ func encoderSupervisor(outPipe *os.File, inputFormat, inputURL string, encoderStopChan = make(chan struct{}) - if encoder == AudioEncoder { - go audioEncoder(outPipe, inputFormat, inputURL, encoderStopChan, - encoderDoneChan, frameChan) - } - if encoder == VideoEncoder { - go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan, - encoderDoneChan, frameChan) - } - } - } -} - -// audioEncoder opens an audio input and begins decoding. It re-encodes the -// audio out and writes it to a pipe. It informs the reader goroutine how large -// each audio frame it writes is. -func audioEncoder(outPipe *os.File, inputFormat, inputURL string, - stopChan <-chan struct{}, doneChan chan<- struct{}, frameChan chan<- int) { - inputFormatC := C.CString(inputFormat) - inputURLC := C.CString(inputURL) - verbose := C.bool(false) - - input := C.as_open_input(inputFormatC, inputURLC, verbose) - if input == nil { - log.Printf("Unable to open input") - C.free(unsafe.Pointer(inputFormatC)) - C.free(unsafe.Pointer(inputURLC)) - doneChan <- struct{}{} - return - } - C.free(unsafe.Pointer(inputFormatC)) - C.free(unsafe.Pointer(inputURLC)) - - outputFormat := C.CString("mp3") - outputURL := C.CString(fmt.Sprintf("pipe:%d", outPipe.Fd())) - outputEncoder := C.CString("libmp3lame") - - output := C.as_open_output(input, outputFormat, outputURL, outputEncoder) - if output == nil { - log.Printf("Unable to open output") - C.as_destroy_input(input) - C.free(unsafe.Pointer(outputFormat)) - C.free(unsafe.Pointer(outputURL)) - C.free(unsafe.Pointer(outputEncoder)) - doneChan <- struct{}{} - return - } - C.free(unsafe.Pointer(outputFormat)) - C.free(unsafe.Pointer(outputURL)) - C.free(unsafe.Pointer(outputEncoder)) - - audiostreamer := C.as_init_audiostreamer(input, output) - if audiostreamer == nil { - log.Printf("Unable to initialize audiostreamer") - C.as_destroy_output(output) - C.as_destroy_input(input) - doneChan <- struct{}{} - return - } - defer C.as_destroy_audiostreamer(audiostreamer) - - for { - select { - // If stop channel is closed then we stop what we're doing. - case <-stopChan: - log.Printf("Stopping encoder") - doneChan <- struct{}{} - return - default: - } - - frameSize := C.int(0) - res := C.as_read_write(audiostreamer, &frameSize) - if res == -1 { - log.Printf("Failure decoding/encoding") - doneChan <- struct{}{} - return - } - - // EOF. Typical usage will never have EOF. However if we run with a file as - // input then this may happen. - if res == 0 { - doneChan <- struct{}{} - return - } - - // We did some work. - - if frameSize > 0 { - frameChan <- int(frameSize) + go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan, + encoderDoneChan, frameChan) } } } @@ -471,20 +343,20 @@ func reader(verbose bool, inPipe *os.File, clientChan <-chan Client, } // Read a frame. -func readFrame(reader *bufio.Reader, size int) (Frame, error) { +func readFrame(reader *bufio.Reader, size int) ([]byte, error) { buf := []byte{} bytesNeeded := size for { if bytesNeeded == 0 { - return Frame{Buffer: buf}, nil + return buf, nil } readBuf := make([]byte, bytesNeeded) n, err := reader.Read(readBuf) if err != nil { - return Frame{}, fmt.Errorf("read: %s", err) + return nil, fmt.Errorf("read: %s", err) } log.Printf("read %d bytes", n) @@ -498,7 +370,7 @@ func readFrame(reader *bufio.Reader, size int) (Frame, error) { // Try to send the given frame to each client. // // If sending would block, cut the client off. -func sendFrameToClients(clients []Client, frame Frame) []Client { +func sendFrameToClients(clients []Client, frame []byte) []Client { clients2 := []Client{} for _, client := range clients { @@ -518,15 +390,15 @@ func sendFrameToClients(clients []Client, frame Frame) []Client { // This lets us know if we should consider the client dead. // // Close the client's frame channel if we fail to send so it knows to go away. -func sendFrameToClient(client Client, frame Frame) error { +func sendFrameToClient(client Client, frame []byte) error { select { - case client.Frames <- frame: + case client.Data <- frame: return nil case <-client.Done: - close(client.Frames) + close(client.Data) return fmt.Errorf("client went away") default: - close(client.Frames) + close(client.Data) return fmt.Errorf("client is too slow") } } @@ -549,7 +421,7 @@ func (h HTTPHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { c := Client{ // We receive audio/video data on this channel from the reader. - Frames: make(chan Frame, 1024), + Data: make(chan []byte, 1024), // We close this channel to indicate to reader we're done. This is necessary // if we terminate, otherwise the reader can't know to stop sending us @@ -563,19 +435,14 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { // Tell the encoder we're here. h.ClientChangeChan <- 1 - if h.Encoder == AudioEncoder { - rw.Header().Set("Content-Type", "audio/mpeg") - } - if h.Encoder == VideoEncoder { - rw.Header().Set("Content-Type", "video/mp4") - } + rw.Header().Set("Content-Type", "video/mp4") rw.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") // We send chunked by default for { - frame, ok := <-c.Frames + frame, ok := <-c.Data // Reader may have cut us off. if !ok { @@ -583,13 +450,13 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { break } - n, err := rw.Write(frame.Buffer) + n, err := rw.Write(frame) if err != nil { log.Printf("write: %s", err) break } - if n != len(frame.Buffer) { + if n != len(frame) { log.Printf("short write") break } @@ -610,7 +477,7 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { close(c.Done) // Drain frames channel. - for range c.Frames { + for range c.Data { } log.Printf("%s: Client cleaned up", r.RemoteAddr)