From 99c3233f690b0d399a8e540d38f6d3f2f2842d55 Mon Sep 17 00:00:00 2001 From: Will Storey Date: Sun, 1 Jan 2017 16:34:48 -0800 Subject: [PATCH] Be more careful about cleaning up clients Do it if encoder aborts for some reason. Also comment about behaviour of pipe sides when closed. --- videostreamer.go | 47 +++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/videostreamer.go b/videostreamer.go index f57fa48..0c6a432 100644 --- a/videostreamer.go +++ b/videostreamer.go @@ -131,8 +131,6 @@ func encoder(inputFormat, inputURL string, verbose bool, clients := []*Client{} var input *C.struct_VSInput - // TODO: We need to clean up all clients if we return - for { // If there are no clients, then block waiting for one. if len(clients) == 0 { @@ -152,9 +150,13 @@ func encoder(inputFormat, inputURL string, verbose bool, input = openInput(inputFormat, inputURL, verbose) if input == nil { log.Printf("encoder: Unable to open input") + cleanupClients(clients) return } - log.Printf("encoder: Opened input") + + if verbose { + log.Printf("encoder: Opened input") + } } // Read a packet. @@ -164,6 +166,7 @@ func encoder(inputFormat, inputURL string, verbose bool, if readRes == -1 { log.Printf("encoder: Failure reading packet") C.vs_destroy_input(input) + cleanupClients(clients) return } @@ -171,10 +174,6 @@ func encoder(inputFormat, inputURL string, verbose bool, continue } - if verbose { - log.Printf("encoder: read packet") - } - // Write the packet to all clients. clients = writePacketToClients(input, &pkt, clients, verbose) @@ -199,6 +198,22 @@ func acceptClients(clientChan <-chan *Client, clients []*Client) []*Client { } } +func cleanupClients(clients []*Client) { + for _, client := range clients { + cleanupClient(client) + } +} + +func cleanupClient(client *Client) { + // Closing write side will make read side receive EOF. + _ = client.OutPipe.Close() + + if client.Output != nil { + C.vs_destroy_output(client.Output) + client.Output = nil + } +} + func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput { inputFormatC := C.CString(inputFormat) inputURLC := C.CString(inputURL) @@ -231,10 +246,10 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket, client.Output = openOutput(outputFormat, outputURL, verbose, input) if client.Output == nil { log.Printf("Unable to open output for client") - // TODO: Will client reader realize it is closed? - _ = client.OutPipe.Close() + cleanupClient(client) continue } + log.Printf("Opened output for client") } @@ -244,18 +259,11 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket, writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose)) if writeRes == -1 { log.Printf("Failure writing packet") - _ = client.OutPipe.Close() - C.vs_destroy_output(client.Output) - client.Output = nil + cleanupClient(client) continue } - if verbose { - log.Printf("encoder: wrote packet to client") - } - // Keep the client around. - clients2 = append(clients2, client) } @@ -325,6 +333,7 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { break } + // We get EOF if write side of pipe closed. if readSize == 0 { log.Printf("%s: EOF", r.RemoteAddr) break @@ -352,10 +361,8 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) { } } + // Writes to write side will raise error when read side is closed. _ = inPipe.Close() - _ = outPipe.Close() - - // TODO: Do we need to drain pipe? log.Printf("%s: Client cleaned up", r.RemoteAddr) }