Give each client its own goroutine for writes

This is to avoid the encoder blocking trying to write to the write side
of the pipe when a client is not reading from it quickly enough.
This commit is contained in:
Will Storey
2017-01-01 22:36:09 -08:00
parent 99c3233f69
commit 1f0df03f36

View File

@@ -37,8 +37,17 @@ type HTTPHandler struct {
// Client is servicing one HTTP client.
type Client struct {
// packetWriter goroutine writes out video packets to this pipe. HTTP goroutine
// reads from the read side.
OutPipe *os.File
Output *C.struct_VSOutput
// Reference to a media output context. Through this, the packetWriter
// goroutine writes packets to the write side of the pipe.
Output *C.struct_VSOutput
// Encoder writes packets to this channel, then the packetWriter goroutine
// writes them to the pipe.
PacketChan chan *C.AVPacket
}
func main() {
@@ -206,12 +215,33 @@ func cleanupClients(clients []*Client) {
func cleanupClient(client *Client) {
// Closing write side will make read side receive EOF.
_ = client.OutPipe.Close()
if client.OutPipe != nil {
_ = client.OutPipe.Close()
client.OutPipe = nil
}
if client.Output != nil {
C.vs_destroy_output(client.Output)
client.Output = nil
}
if client.PacketChan != nil {
close(client.PacketChan)
// Drain it. The packetWriter should be draining it too. However it is
// possible that it ended.
//
// Note one may think that draining both here and in the packetWriter could
// lead to the unfortunate likelihood that the client will receive some
// packets but not others, leading to corruption. But since we closed the
// write side of the pipe above, this will not happen. No further packets will
// be reaching the client.
for pkt := range client.PacketChan {
C.av_packet_free(&pkt)
}
client.PacketChan = nil
}
}
func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput {
@@ -231,6 +261,8 @@ func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput {
return input
}
// Try to write the packet to each client. If we fail, we clean up the client
// and it will not be in the returned list of clients.
func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
clients []*Client, verbose bool) []*Client {
// Rewrite clients slice with only those we succeeded in writing to. If we
@@ -250,26 +282,61 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
continue
}
// We pass packets to the client via this channel. We give each client its
// own goroutine for the purposes of receiving these packets and writing them
// to the write side of the pipe. We do it this way rather than directly here
// because we do not want the encoder to block waiting on a write to the
// write side of the pipe because there is a slow HTTP client.
client.PacketChan = make(chan *C.AVPacket, 32)
go packetWriter(client, input, verbose)
log.Printf("Opened output for client")
}
// Write the packet to it.
writeRes := C.int(0)
writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose))
if writeRes == -1 {
log.Printf("Failure writing packet")
// Duplicate the packet. Each client's goroutine will receive a copy.
pktCopy := C.av_packet_clone(pkt)
if pktCopy == nil {
log.Printf("Unable to clone packet")
cleanupClient(client)
continue
}
// Keep the client around.
// Pass the client to a goroutine that writes it to this client.
select {
case client.PacketChan <- pktCopy:
default:
log.Printf("Client too slow")
C.av_packet_free(&pktCopy)
cleanupClient(client)
continue
}
// Successful so far. Keep the client around.
clients2 = append(clients2, client)
}
return clients2
}
// Receive packets from the encoder, and write them out to the client's pipe.
//
// We end when encoder closes the channel, or if we encounter a write error.
func packetWriter(client *Client, input *C.struct_VSInput, verbose bool) {
for pkt := range client.PacketChan {
writeRes := C.int(0)
writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose))
if writeRes == -1 {
log.Printf("Failure writing packet")
C.av_packet_free(&pkt)
return
}
C.av_packet_free(&pkt)
}
}
// Open the output file. This creates an MP4 container and writes the header to
// the given output URL.
func openOutput(outputFormat, outputURL string, verbose bool,
input *C.struct_VSInput) *C.struct_VSOutput {
outputFormatC := C.CString("mp4")
@@ -303,8 +370,12 @@ func (h HTTPHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write([]byte("<h1>404 Not found</h1>"))
}
// Read from a pipe where streaming media shows up. We read a chunk and write it
// immediately to the client, and repeat forever (until either the client goes
// away, or an error of some kind occurs).
func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) {
// Encoder writes to out pipe. We read from in pipe.
// The encoder writes to the out pipe (using the packetWriter goroutine). We
// read from the in pipe.
inPipe, outPipe, err := os.Pipe()
if err != nil {
log.Printf("Unable to open pipe: %s", err)