Be more careful about cleaning up clients

Do it if encoder aborts for some reason. Also comment about behaviour of
pipe sides when closed.
This commit is contained in:
Will Storey
2017-01-01 16:34:48 -08:00
parent 99d23dc933
commit 99c3233f69

View File

@@ -131,8 +131,6 @@ func encoder(inputFormat, inputURL string, verbose bool,
clients := []*Client{}
var input *C.struct_VSInput
// TODO: We need to clean up all clients if we return
for {
// If there are no clients, then block waiting for one.
if len(clients) == 0 {
@@ -152,9 +150,13 @@ func encoder(inputFormat, inputURL string, verbose bool,
input = openInput(inputFormat, inputURL, verbose)
if input == nil {
log.Printf("encoder: Unable to open input")
cleanupClients(clients)
return
}
log.Printf("encoder: Opened input")
if verbose {
log.Printf("encoder: Opened input")
}
}
// Read a packet.
@@ -164,6 +166,7 @@ func encoder(inputFormat, inputURL string, verbose bool,
if readRes == -1 {
log.Printf("encoder: Failure reading packet")
C.vs_destroy_input(input)
cleanupClients(clients)
return
}
@@ -171,10 +174,6 @@ func encoder(inputFormat, inputURL string, verbose bool,
continue
}
if verbose {
log.Printf("encoder: read packet")
}
// Write the packet to all clients.
clients = writePacketToClients(input, &pkt, clients, verbose)
@@ -199,6 +198,22 @@ func acceptClients(clientChan <-chan *Client, clients []*Client) []*Client {
}
}
func cleanupClients(clients []*Client) {
for _, client := range clients {
cleanupClient(client)
}
}
func cleanupClient(client *Client) {
// Closing write side will make read side receive EOF.
_ = client.OutPipe.Close()
if client.Output != nil {
C.vs_destroy_output(client.Output)
client.Output = nil
}
}
func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput {
inputFormatC := C.CString(inputFormat)
inputURLC := C.CString(inputURL)
@@ -231,10 +246,10 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
client.Output = openOutput(outputFormat, outputURL, verbose, input)
if client.Output == nil {
log.Printf("Unable to open output for client")
// TODO: Will client reader realize it is closed?
_ = client.OutPipe.Close()
cleanupClient(client)
continue
}
log.Printf("Opened output for client")
}
@@ -244,18 +259,11 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose))
if writeRes == -1 {
log.Printf("Failure writing packet")
_ = client.OutPipe.Close()
C.vs_destroy_output(client.Output)
client.Output = nil
cleanupClient(client)
continue
}
if verbose {
log.Printf("encoder: wrote packet to client")
}
// Keep the client around.
clients2 = append(clients2, client)
}
@@ -325,6 +333,7 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) {
break
}
// We get EOF if write side of pipe closed.
if readSize == 0 {
log.Printf("%s: EOF", r.RemoteAddr)
break
@@ -352,10 +361,8 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) {
}
}
// Writes to write side will raise error when read side is closed.
_ = inPipe.Close()
_ = outPipe.Close()
// TODO: Do we need to drain pipe?
log.Printf("%s: Client cleaned up", r.RemoteAddr)
}