From 99d23dc9333158e2ac8643887a7605fa7db8145a Mon Sep 17 00:00:00 2001 From: Will Storey Date: Sun, 1 Jan 2017 16:21:52 -0800 Subject: [PATCH] HTTP stream now works. 1 input, multi clients. This refactors to have one output created per client. --- videostreamer.go | 430 ++++++++++++++++------------------------------- 1 file changed, 144 insertions(+), 286 deletions(-) diff --git a/videostreamer.go b/videostreamer.go index 8253f5e..f57fa48 100644 --- a/videostreamer.go +++ b/videostreamer.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "flag" "fmt" "log" @@ -32,15 +31,14 @@ type Args struct { // HTTPHandler allows us to pass information to our request handlers. type HTTPHandler struct { - Verbose bool - ClientChangeChan chan<- int - ClientChan chan<- Client + Verbose bool + ClientChan chan<- *Client } -// A Client is servicing one HTTP client. It receives frames from the reader. +// Client is servicing one HTTP client. type Client struct { - Data chan []byte - Done chan struct{} + OutPipe *os.File + Output *C.struct_VSOutput } func main() { @@ -51,43 +49,18 @@ func main() { C.vs_setup() - // Encoder writes to out pipe. Reader reads from in pipe. - in, out, err := os.Pipe() - if err != nil { - log.Fatalf("pipe: %s", err) - } + // Clients provide encoder info about themselves when they start up. + clientChan := make(chan *Client) - // Changes in clients announce on this channel. +1 for new client, -1 for - // losing a client. - clientChangeChan := make(chan int) - - // Clients provide reader a channel to receive on. - // - // The reader acts as a publisher and clients act as subscribers. One - // publisher, potentially many subscribers. - clientChan := make(chan Client) - - // When encoder writes a frame, we know how large it is by a message on this - // channel. The encoder sends messages on this channel to the reader to - // inform it of this. The reader then knows how much to read and allows it to - // always read a single frame at a time, which is valid for a client to - // receive. Otherwise if it reads without knowing frame boundaries, it is - // difficult for it to know when it is valid to start sending data to a - // client that enters mid-encoding. - frameChan := make(chan int, 128) - - go encoderSupervisor(out, args.InputFormat, args.InputURL, args.Verbose, - clientChangeChan, frameChan) - go reader(args.Verbose, in, clientChan, frameChan) + go encoder(args.InputFormat, args.InputURL, args.Verbose, clientChan) // Start serving either with HTTP or FastCGI. hostPort := fmt.Sprintf("%s:%d", args.ListenHost, args.ListenPort) handler := HTTPHandler{ - Verbose: args.Verbose, - ClientChangeChan: clientChangeChan, - ClientChan: clientChan, + Verbose: args.Verbose, + ClientChan: clientChan, } if args.FCGI { @@ -153,148 +126,44 @@ func getArgs() (Args, error) { }, nil } -// The encoder supervisor deals with stopping and starting the encoder. -// -// We want there to be at most a single encoder goroutine active at any one -// time no matter how many clients there are. If there are zero clients, there -// should not be any encoding going on. -func encoderSupervisor(outPipe *os.File, inputFormat, inputURL string, - verbose bool, clientChangeChan <-chan int, frameChan chan<- int) { - // A count of how many clients are actively subscribed listening for - // audio/video. We start the encoder when this goes above zero, and stop it - // if it goes to zero. - clients := 0 +func encoder(inputFormat, inputURL string, verbose bool, + clientChan <-chan *Client) { + clients := []*Client{} + var input *C.struct_VSInput - // We close this channel to tell the encoder to stop. It receives no values. - var encoderStopChan chan struct{} - - // The encoder tells us when it stops by sending a message on this channel. - // Note I use sending a message as if this channel closes then the below loop - // will be busy until it is re-opened. - encoderDoneChan := make(chan struct{}) + // TODO: We need to clean up all clients if we return for { - select { - // A change in the number of clients. - case change := <-clientChangeChan: - // Gaining a client. - if change == 1 { - clients++ - - if verbose { - log.Printf("encoder supervisor: new client. %d clients connected", - clients) - } - - if clients == 1 { - if verbose { - log.Printf("encoder supervisor: starting encoder") - } - - encoderStopChan = make(chan struct{}) - - go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan, - encoderDoneChan, frameChan) - } - - continue - } - - // Losing a client. - clients-- - - if verbose { - log.Printf("encoder supervisor: lost client. %d clients connected", - clients) - } - - if clients == 0 { - // Tell encoder to stop. - close(encoderStopChan) - if verbose { - log.Printf("encoder supervisor: stopping encoder") - } - continue - } - - // Encoder stopped for some reason. Restart it if appropriate. - case <-encoderDoneChan: - if verbose { - log.Printf("encoder supervisor: encoder stopped") - } - - if clients == 0 { - // No clients. We don't need to restart it. - continue - } - - if verbose { - log.Printf("encoder supervisor: starting encoder") - } - - encoderStopChan = make(chan struct{}) - - go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan, - encoderDoneChan, frameChan) - } - } -} - -// videoEncoder opens a video input and begins decoding. -// -// It remuxes the video to an mp4 container. -// -// As it writes out frames, it informs the reader about a frame to read, along -// with that frame's size. -func videoEncoder(outPipe *os.File, inputFormat, inputURL string, - stopChan <-chan struct{}, doneChan chan<- struct{}, frameChan chan<- int) { - inputFormatC := C.CString(inputFormat) - inputURLC := C.CString(inputURL) - verbose := C.bool(true) - - input := C.vs_open_input(inputFormatC, inputURLC, verbose) - if input == nil { - log.Printf("Unable to open input") - C.free(unsafe.Pointer(inputFormatC)) - C.free(unsafe.Pointer(inputURLC)) - doneChan <- struct{}{} - return - } - C.free(unsafe.Pointer(inputFormatC)) - C.free(unsafe.Pointer(inputURLC)) - defer C.vs_destroy_input(input) - - outputFormatC := C.CString("mp4") - outputURLC := C.CString(fmt.Sprintf("pipe:%d", outPipe.Fd())) - - output := C.vs_open_output(outputFormatC, outputURLC, input, verbose) - if output == nil { - log.Printf("Unable to open output") - C.free(unsafe.Pointer(outputFormatC)) - C.free(unsafe.Pointer(outputURLC)) - doneChan <- struct{}{} - return - } - C.free(unsafe.Pointer(outputFormatC)) - C.free(unsafe.Pointer(outputURLC)) - defer C.vs_destroy_output(output) - - for { - select { - // If stop channel is closed then we stop what we're doing. - case <-stopChan: - log.Printf("Stopping encoder") - doneChan <- struct{}{} - return - default: + // If there are no clients, then block waiting for one. + if len(clients) == 0 { + client := <-clientChan + log.Printf("encoder: New client") + clients = append(clients, client) + continue } + // There is at least one client. + + // Get any new clients, but don't block. + clients = acceptClients(clientChan, clients) + + // Open the input if it is not open yet. + if input == nil { + input = openInput(inputFormat, inputURL, verbose) + if input == nil { + log.Printf("encoder: Unable to open input") + return + } + log.Printf("encoder: Opened input") + } + + // Read a packet. var pkt C.AVPacket readRes := C.int(0) - readRes = C.vs_read_packet(input, &pkt, verbose) + readRes = C.vs_read_packet(input, &pkt, C.bool(verbose)) if readRes == -1 { - log.Printf("Failure reading") - doneChan <- struct{}{} + log.Printf("encoder: Failure reading packet") + C.vs_destroy_input(input) return } @@ -302,124 +171,113 @@ func videoEncoder(outPipe *os.File, inputFormat, inputURL string, continue } - writeRes := C.int(0) - writeRes = C.vs_write_packet(input, output, &pkt, verbose) - if writeRes == -1 { - log.Printf("Failure writing") - C.av_packet_unref(&pkt) - doneChan <- struct{}{} - return + if verbose { + log.Printf("encoder: read packet") } - pktSize := int(pkt.size) + // Write the packet to all clients. + clients = writePacketToClients(input, &pkt, clients, verbose) C.av_packet_unref(&pkt) - frameChan <- int(pktSize) + // If we get down to zero clients, close the input. + if len(clients) == 0 { + C.vs_destroy_input(input) + input = nil + } } } -// reader reads the pipe containing the re-encoded audio/video. -// -// We send the re-encoded data to each client. -// -// We hear about new clients on the clients channel. -// -// We expect the pipe to never close. The encoder may stop sending for a while -// but when a new client appears, it starts again. -func reader(verbose bool, inPipe *os.File, clientChan <-chan Client, - frameChan <-chan int) { - reader := bufio.NewReader(inPipe) - clients := []Client{} - +func acceptClients(clientChan <-chan *Client, clients []*Client) []*Client { for { select { case client := <-clientChan: - if verbose { - log.Printf("reader: accepted new client") - } - clients = append(clients, client) - - case frameSize := <-frameChan: - if verbose { - log.Printf("reader: reading new frame (%d bytes)", frameSize) - } - - frame, err := readFrame(reader, frameSize) - if err != nil { - log.Printf("reader: %s", err) - return - } - - if verbose { - log.Printf("reader: read frame (%d bytes)", frameSize) - } - - clients = sendFrameToClients(clients, frame) + default: + return clients } } } -// Read a frame. -func readFrame(reader *bufio.Reader, size int) ([]byte, error) { - buf := []byte{} - bytesNeeded := size +func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput { + inputFormatC := C.CString(inputFormat) + inputURLC := C.CString(inputURL) - for { - if bytesNeeded == 0 { - return buf, nil - } - - readBuf := make([]byte, bytesNeeded) - - n, err := reader.Read(readBuf) - if err != nil { - return nil, fmt.Errorf("read: %s", err) - } - - log.Printf("read %d bytes", n) - - buf = append(buf, readBuf[0:n]...) - - bytesNeeded -= n + input := C.vs_open_input(inputFormatC, inputURLC, C.bool(verbose)) + if input == nil { + log.Printf("Unable to open input") + C.free(unsafe.Pointer(inputFormatC)) + C.free(unsafe.Pointer(inputURLC)) + return nil } + C.free(unsafe.Pointer(inputFormatC)) + C.free(unsafe.Pointer(inputURLC)) + + return input } -// Try to send the given frame to each client. -// -// If sending would block, cut the client off. -func sendFrameToClients(clients []Client, frame []byte) []Client { - clients2 := []Client{} +func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket, + clients []*Client, verbose bool) []*Client { + // Rewrite clients slice with only those we succeeded in writing to. If we + // failed for some reason we clean up the client and no longer send it anything + // further. + clients2 := []*Client{} for _, client := range clients { - err := sendFrameToClient(client, frame) - if err != nil { + // Open the client's output if it is not yet open. + if client.Output == nil { + outputFormat := "mp4" + outputURL := fmt.Sprintf("pipe:%d", client.OutPipe.Fd()) + client.Output = openOutput(outputFormat, outputURL, verbose, input) + if client.Output == nil { + log.Printf("Unable to open output for client") + // TODO: Will client reader realize it is closed? + _ = client.OutPipe.Close() + continue + } + log.Printf("Opened output for client") + } + + // Write the packet to it. + + writeRes := C.int(0) + writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose)) + if writeRes == -1 { + log.Printf("Failure writing packet") + _ = client.OutPipe.Close() + C.vs_destroy_output(client.Output) + client.Output = nil continue } - // Keep it around. + if verbose { + log.Printf("encoder: wrote packet to client") + } + + // Keep the client around. + clients2 = append(clients2, client) } return clients2 } -// Send a frame to a client. Don't block. If we would block, return an error. -// This lets us know if we should consider the client dead. -// -// Close the client's frame channel if we fail to send so it knows to go away. -func sendFrameToClient(client Client, frame []byte) error { - select { - case client.Data <- frame: +func openOutput(outputFormat, outputURL string, verbose bool, + input *C.struct_VSInput) *C.struct_VSOutput { + outputFormatC := C.CString("mp4") + outputURLC := C.CString(outputURL) + + output := C.vs_open_output(outputFormatC, outputURLC, input, C.bool(verbose)) + if output == nil { + log.Printf("Unable to open output") + C.free(unsafe.Pointer(outputFormatC)) + C.free(unsafe.Pointer(outputURLC)) return nil - case <-client.Done: - close(client.Data) - return fmt.Errorf("client went away") - default: - close(client.Data) - return fmt.Errorf("client is too slow") } + C.free(unsafe.Pointer(outputFormatC)) + C.free(unsafe.Pointer(outputURLC)) + + return output } // ServeHTTP handles an HTTP request. @@ -438,45 +296,48 @@ func (h HTTPHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { - c := Client{ - // We receive audio/video data on this channel from the reader. - Data: make(chan []byte, 1024), - - // We close this channel to indicate to reader we're done. This is necessary - // if we terminate, otherwise the reader can't know to stop sending us - // frames. - Done: make(chan struct{}), + // Encoder writes to out pipe. We read from in pipe. + inPipe, outPipe, err := os.Pipe() + if err != nil { + log.Printf("Unable to open pipe: %s", err) + rw.WriteHeader(http.StatusInternalServerError) + _, _ = rw.Write([]byte("

500 Internal server error

")) + return } - // Tell the reader we're here. - h.ClientChan <- c + c := &Client{ + OutPipe: outPipe, + } // Tell the encoder we're here. - h.ClientChangeChan <- 1 + h.ClientChan <- c rw.Header().Set("Content-Type", "video/mp4") - rw.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") // We send chunked by default for { - frame, ok := <-c.Data - - // Reader may have cut us off. - if !ok { - log.Printf("reader closed audio/video channel") - break - } - - n, err := rw.Write(frame) + buf := make([]byte, 1024) + readSize, err := inPipe.Read(buf) if err != nil { - log.Printf("write: %s", err) + log.Printf("%s: Read error: %s", r.RemoteAddr, err) break } - if n != len(frame) { - log.Printf("short write") + if readSize == 0 { + log.Printf("%s: EOF", r.RemoteAddr) + break + } + + writeSize, err := rw.Write(buf[:readSize]) + if err != nil { + log.Printf("%s: Write error: %s", r.RemoteAddr, err) + break + } + + if writeSize != readSize { + log.Printf("%s: Short write", r.RemoteAddr) break } @@ -491,13 +352,10 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { } } - h.ClientChangeChan <- -1 + _ = inPipe.Close() + _ = outPipe.Close() - close(c.Done) - - // Drain frames channel. - for range c.Data { - } + // TODO: Do we need to drain pipe? log.Printf("%s: Client cleaned up", r.RemoteAddr) }