HTTP stream now works. 1 input, multi clients.

This refactors to have one output created per client.
This commit is contained in:
Will Storey
2017-01-01 16:21:52 -08:00
parent 37325f57b0
commit 99d23dc933

View File

@@ -1,7 +1,6 @@
package main
import (
"bufio"
"flag"
"fmt"
"log"
@@ -33,14 +32,13 @@ type Args struct {
// HTTPHandler allows us to pass information to our request handlers.
type HTTPHandler struct {
Verbose bool
ClientChangeChan chan<- int
ClientChan chan<- Client
ClientChan chan<- *Client
}
// A Client is servicing one HTTP client. It receives frames from the reader.
// Client is servicing one HTTP client.
type Client struct {
Data chan []byte
Done chan struct{}
OutPipe *os.File
Output *C.struct_VSOutput
}
func main() {
@@ -51,34 +49,10 @@ func main() {
C.vs_setup()
// Encoder writes to out pipe. Reader reads from in pipe.
in, out, err := os.Pipe()
if err != nil {
log.Fatalf("pipe: %s", err)
}
// Clients provide encoder info about themselves when they start up.
clientChan := make(chan *Client)
// Changes in clients announce on this channel. +1 for new client, -1 for
// losing a client.
clientChangeChan := make(chan int)
// Clients provide reader a channel to receive on.
//
// The reader acts as a publisher and clients act as subscribers. One
// publisher, potentially many subscribers.
clientChan := make(chan Client)
// When encoder writes a frame, we know how large it is by a message on this
// channel. The encoder sends messages on this channel to the reader to
// inform it of this. The reader then knows how much to read and allows it to
// always read a single frame at a time, which is valid for a client to
// receive. Otherwise if it reads without knowing frame boundaries, it is
// difficult for it to know when it is valid to start sending data to a
// client that enters mid-encoding.
frameChan := make(chan int, 128)
go encoderSupervisor(out, args.InputFormat, args.InputURL, args.Verbose,
clientChangeChan, frameChan)
go reader(args.Verbose, in, clientChan, frameChan)
go encoder(args.InputFormat, args.InputURL, args.Verbose, clientChan)
// Start serving either with HTTP or FastCGI.
@@ -86,7 +60,6 @@ func main() {
handler := HTTPHandler{
Verbose: args.Verbose,
ClientChangeChan: clientChangeChan,
ClientChan: clientChan,
}
@@ -153,148 +126,44 @@ func getArgs() (Args, error) {
}, nil
}
// The encoder supervisor deals with stopping and starting the encoder.
//
// We want there to be at most a single encoder goroutine active at any one
// time no matter how many clients there are. If there are zero clients, there
// should not be any encoding going on.
func encoderSupervisor(outPipe *os.File, inputFormat, inputURL string,
verbose bool, clientChangeChan <-chan int, frameChan chan<- int) {
// A count of how many clients are actively subscribed listening for
// audio/video. We start the encoder when this goes above zero, and stop it
// if it goes to zero.
clients := 0
func encoder(inputFormat, inputURL string, verbose bool,
clientChan <-chan *Client) {
clients := []*Client{}
var input *C.struct_VSInput
// We close this channel to tell the encoder to stop. It receives no values.
var encoderStopChan chan struct{}
// The encoder tells us when it stops by sending a message on this channel.
// Note I use sending a message as if this channel closes then the below loop
// will be busy until it is re-opened.
encoderDoneChan := make(chan struct{})
// TODO: We need to clean up all clients if we return
for {
select {
// A change in the number of clients.
case change := <-clientChangeChan:
// Gaining a client.
if change == 1 {
clients++
if verbose {
log.Printf("encoder supervisor: new client. %d clients connected",
clients)
}
if clients == 1 {
if verbose {
log.Printf("encoder supervisor: starting encoder")
}
encoderStopChan = make(chan struct{})
go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan,
encoderDoneChan, frameChan)
}
// If there are no clients, then block waiting for one.
if len(clients) == 0 {
client := <-clientChan
log.Printf("encoder: New client")
clients = append(clients, client)
continue
}
// Losing a client.
clients--
// There is at least one client.
if verbose {
log.Printf("encoder supervisor: lost client. %d clients connected",
clients)
}
// Get any new clients, but don't block.
clients = acceptClients(clientChan, clients)
if clients == 0 {
// Tell encoder to stop.
close(encoderStopChan)
if verbose {
log.Printf("encoder supervisor: stopping encoder")
}
continue
}
// Encoder stopped for some reason. Restart it if appropriate.
case <-encoderDoneChan:
if verbose {
log.Printf("encoder supervisor: encoder stopped")
}
if clients == 0 {
// No clients. We don't need to restart it.
continue
}
if verbose {
log.Printf("encoder supervisor: starting encoder")
}
encoderStopChan = make(chan struct{})
go videoEncoder(outPipe, inputFormat, inputURL, encoderStopChan,
encoderDoneChan, frameChan)
}
}
}
// videoEncoder opens a video input and begins decoding.
//
// It remuxes the video to an mp4 container.
//
// As it writes out frames, it informs the reader about a frame to read, along
// with that frame's size.
func videoEncoder(outPipe *os.File, inputFormat, inputURL string,
stopChan <-chan struct{}, doneChan chan<- struct{}, frameChan chan<- int) {
inputFormatC := C.CString(inputFormat)
inputURLC := C.CString(inputURL)
verbose := C.bool(true)
input := C.vs_open_input(inputFormatC, inputURLC, verbose)
// Open the input if it is not open yet.
if input == nil {
log.Printf("Unable to open input")
C.free(unsafe.Pointer(inputFormatC))
C.free(unsafe.Pointer(inputURLC))
doneChan <- struct{}{}
input = openInput(inputFormat, inputURL, verbose)
if input == nil {
log.Printf("encoder: Unable to open input")
return
}
C.free(unsafe.Pointer(inputFormatC))
C.free(unsafe.Pointer(inputURLC))
defer C.vs_destroy_input(input)
outputFormatC := C.CString("mp4")
outputURLC := C.CString(fmt.Sprintf("pipe:%d", outPipe.Fd()))
output := C.vs_open_output(outputFormatC, outputURLC, input, verbose)
if output == nil {
log.Printf("Unable to open output")
C.free(unsafe.Pointer(outputFormatC))
C.free(unsafe.Pointer(outputURLC))
doneChan <- struct{}{}
return
}
C.free(unsafe.Pointer(outputFormatC))
C.free(unsafe.Pointer(outputURLC))
defer C.vs_destroy_output(output)
for {
select {
// If stop channel is closed then we stop what we're doing.
case <-stopChan:
log.Printf("Stopping encoder")
doneChan <- struct{}{}
return
default:
log.Printf("encoder: Opened input")
}
// Read a packet.
var pkt C.AVPacket
readRes := C.int(0)
readRes = C.vs_read_packet(input, &pkt, verbose)
readRes = C.vs_read_packet(input, &pkt, C.bool(verbose))
if readRes == -1 {
log.Printf("Failure reading")
doneChan <- struct{}{}
log.Printf("encoder: Failure reading packet")
C.vs_destroy_input(input)
return
}
@@ -302,124 +171,113 @@ func videoEncoder(outPipe *os.File, inputFormat, inputURL string,
continue
}
writeRes := C.int(0)
writeRes = C.vs_write_packet(input, output, &pkt, verbose)
if writeRes == -1 {
log.Printf("Failure writing")
C.av_packet_unref(&pkt)
doneChan <- struct{}{}
return
if verbose {
log.Printf("encoder: read packet")
}
pktSize := int(pkt.size)
// Write the packet to all clients.
clients = writePacketToClients(input, &pkt, clients, verbose)
C.av_packet_unref(&pkt)
frameChan <- int(pktSize)
// If we get down to zero clients, close the input.
if len(clients) == 0 {
C.vs_destroy_input(input)
input = nil
}
}
}
// reader reads the pipe containing the re-encoded audio/video.
//
// We send the re-encoded data to each client.
//
// We hear about new clients on the clients channel.
//
// We expect the pipe to never close. The encoder may stop sending for a while
// but when a new client appears, it starts again.
func reader(verbose bool, inPipe *os.File, clientChan <-chan Client,
frameChan <-chan int) {
reader := bufio.NewReader(inPipe)
clients := []Client{}
func acceptClients(clientChan <-chan *Client, clients []*Client) []*Client {
for {
select {
case client := <-clientChan:
if verbose {
log.Printf("reader: accepted new client")
}
clients = append(clients, client)
case frameSize := <-frameChan:
if verbose {
log.Printf("reader: reading new frame (%d bytes)", frameSize)
}
frame, err := readFrame(reader, frameSize)
if err != nil {
log.Printf("reader: %s", err)
return
}
if verbose {
log.Printf("reader: read frame (%d bytes)", frameSize)
}
clients = sendFrameToClients(clients, frame)
default:
return clients
}
}
}
// Read a frame.
func readFrame(reader *bufio.Reader, size int) ([]byte, error) {
buf := []byte{}
bytesNeeded := size
func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput {
inputFormatC := C.CString(inputFormat)
inputURLC := C.CString(inputURL)
for {
if bytesNeeded == 0 {
return buf, nil
input := C.vs_open_input(inputFormatC, inputURLC, C.bool(verbose))
if input == nil {
log.Printf("Unable to open input")
C.free(unsafe.Pointer(inputFormatC))
C.free(unsafe.Pointer(inputURLC))
return nil
}
C.free(unsafe.Pointer(inputFormatC))
C.free(unsafe.Pointer(inputURLC))
return input
}
readBuf := make([]byte, bytesNeeded)
n, err := reader.Read(readBuf)
if err != nil {
return nil, fmt.Errorf("read: %s", err)
}
log.Printf("read %d bytes", n)
buf = append(buf, readBuf[0:n]...)
bytesNeeded -= n
}
}
// Try to send the given frame to each client.
//
// If sending would block, cut the client off.
func sendFrameToClients(clients []Client, frame []byte) []Client {
clients2 := []Client{}
func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
clients []*Client, verbose bool) []*Client {
// Rewrite clients slice with only those we succeeded in writing to. If we
// failed for some reason we clean up the client and no longer send it anything
// further.
clients2 := []*Client{}
for _, client := range clients {
err := sendFrameToClient(client, frame)
if err != nil {
// Open the client's output if it is not yet open.
if client.Output == nil {
outputFormat := "mp4"
outputURL := fmt.Sprintf("pipe:%d", client.OutPipe.Fd())
client.Output = openOutput(outputFormat, outputURL, verbose, input)
if client.Output == nil {
log.Printf("Unable to open output for client")
// TODO: Will client reader realize it is closed?
_ = client.OutPipe.Close()
continue
}
log.Printf("Opened output for client")
}
// Write the packet to it.
writeRes := C.int(0)
writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose))
if writeRes == -1 {
log.Printf("Failure writing packet")
_ = client.OutPipe.Close()
C.vs_destroy_output(client.Output)
client.Output = nil
continue
}
// Keep it around.
if verbose {
log.Printf("encoder: wrote packet to client")
}
// Keep the client around.
clients2 = append(clients2, client)
}
return clients2
}
// Send a frame to a client. Don't block. If we would block, return an error.
// This lets us know if we should consider the client dead.
//
// Close the client's frame channel if we fail to send so it knows to go away.
func sendFrameToClient(client Client, frame []byte) error {
select {
case client.Data <- frame:
func openOutput(outputFormat, outputURL string, verbose bool,
input *C.struct_VSInput) *C.struct_VSOutput {
outputFormatC := C.CString("mp4")
outputURLC := C.CString(outputURL)
output := C.vs_open_output(outputFormatC, outputURLC, input, C.bool(verbose))
if output == nil {
log.Printf("Unable to open output")
C.free(unsafe.Pointer(outputFormatC))
C.free(unsafe.Pointer(outputURLC))
return nil
case <-client.Done:
close(client.Data)
return fmt.Errorf("client went away")
default:
close(client.Data)
return fmt.Errorf("client is too slow")
}
C.free(unsafe.Pointer(outputFormatC))
C.free(unsafe.Pointer(outputURLC))
return output
}
// ServeHTTP handles an HTTP request.
@@ -438,45 +296,48 @@ func (h HTTPHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) {
c := Client{
// We receive audio/video data on this channel from the reader.
Data: make(chan []byte, 1024),
// We close this channel to indicate to reader we're done. This is necessary
// if we terminate, otherwise the reader can't know to stop sending us
// frames.
Done: make(chan struct{}),
// Encoder writes to out pipe. We read from in pipe.
inPipe, outPipe, err := os.Pipe()
if err != nil {
log.Printf("Unable to open pipe: %s", err)
rw.WriteHeader(http.StatusInternalServerError)
_, _ = rw.Write([]byte("<h1>500 Internal server error</h1>"))
return
}
// Tell the reader we're here.
h.ClientChan <- c
c := &Client{
OutPipe: outPipe,
}
// Tell the encoder we're here.
h.ClientChangeChan <- 1
h.ClientChan <- c
rw.Header().Set("Content-Type", "video/mp4")
rw.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
// We send chunked by default
for {
frame, ok := <-c.Data
// Reader may have cut us off.
if !ok {
log.Printf("reader closed audio/video channel")
break
}
n, err := rw.Write(frame)
buf := make([]byte, 1024)
readSize, err := inPipe.Read(buf)
if err != nil {
log.Printf("write: %s", err)
log.Printf("%s: Read error: %s", r.RemoteAddr, err)
break
}
if n != len(frame) {
log.Printf("short write")
if readSize == 0 {
log.Printf("%s: EOF", r.RemoteAddr)
break
}
writeSize, err := rw.Write(buf[:readSize])
if err != nil {
log.Printf("%s: Write error: %s", r.RemoteAddr, err)
break
}
if writeSize != readSize {
log.Printf("%s: Short write", r.RemoteAddr)
break
}
@@ -491,13 +352,10 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) {
}
}
h.ClientChangeChan <- -1
_ = inPipe.Close()
_ = outPipe.Close()
close(c.Done)
// Drain frames channel.
for range c.Data {
}
// TODO: Do we need to drain pipe?
log.Printf("%s: Client cleaned up", r.RemoteAddr)
}