Synchronize access to input and client outputs

It appears there is a race condition in that we could close the input or
client's output while the packet writer goroutine is still using them.
This could lead to crashes.

This hopefully fixes #6.
This commit is contained in:
Will Storey
2018-01-23 18:28:40 -08:00
parent c2aa2d30b5
commit 4c857a40fb

View File

@@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"net/http/fcgi" "net/http/fcgi"
"os" "os"
"sync"
"unsafe" "unsafe"
) )
@@ -37,6 +38,10 @@ type HTTPHandler struct {
// Client is servicing one HTTP client. // Client is servicing one HTTP client.
type Client struct { type Client struct {
// Protect access to Output in particular. Destroying it when we clean up
// the client can race with packetWriter().
mutex *sync.RWMutex
// packetWriter goroutine writes out video packets to this pipe. HTTP // packetWriter goroutine writes out video packets to this pipe. HTTP
// goroutine reads from the read side. // goroutine reads from the read side.
OutPipe *os.File OutPipe *os.File
@@ -138,7 +143,7 @@ func getArgs() (Args, error) {
func encoder(inputFormat, inputURL string, verbose bool, func encoder(inputFormat, inputURL string, verbose bool,
clientChan <-chan *Client) { clientChan <-chan *Client) {
clients := []*Client{} clients := []*Client{}
var input *C.struct_VSInput var input *Input
for { for {
// If there are no clients, then block waiting for one. // If there are no clients, then block waiting for one.
@@ -178,10 +183,12 @@ func encoder(inputFormat, inputURL string, verbose bool,
// Read a packet. // Read a packet.
var pkt C.AVPacket var pkt C.AVPacket
readRes := C.int(0) readRes := C.int(0)
readRes = C.vs_read_packet(input, &pkt, C.bool(verbose)) // We might want to lock input here. It's probably not necessary though.
// Other goroutines should only be reading it. We're the writer.
readRes = C.vs_read_packet(input.vsInput, &pkt, C.bool(verbose))
if readRes == -1 { if readRes == -1 {
log.Printf("encoder: Failure reading packet") log.Printf("encoder: Failure reading packet")
C.vs_destroy_input(input) destroyInput(input)
cleanupClients(clients) cleanupClients(clients)
return return
} }
@@ -203,7 +210,7 @@ func encoder(inputFormat, inputURL string, verbose bool,
// If we get down to zero clients, close the input. // If we get down to zero clients, close the input.
if len(clients) == 0 { if len(clients) == 0 {
C.vs_destroy_input(input) destroyInput(input)
input = nil input = nil
log.Printf("encoder: Closed input") log.Printf("encoder: Closed input")
} }
@@ -228,6 +235,8 @@ func cleanupClients(clients []*Client) {
} }
func cleanupClient(client *Client) { func cleanupClient(client *Client) {
client.mutex.Lock()
// Closing write side will make read side receive EOF. // Closing write side will make read side receive EOF.
if client.OutPipe != nil { if client.OutPipe != nil {
_ = client.OutPipe.Close() _ = client.OutPipe.Close()
@@ -239,6 +248,8 @@ func cleanupClient(client *Client) {
client.Output = nil client.Output = nil
} }
client.mutex.Unlock()
if client.PacketChan != nil { if client.PacketChan != nil {
close(client.PacketChan) close(client.PacketChan)
@@ -258,7 +269,13 @@ func cleanupClient(client *Client) {
} }
} }
func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput { // Input represents a video input.
type Input struct {
mutex *sync.RWMutex
vsInput *C.struct_VSInput
}
func openInput(inputFormat, inputURL string, verbose bool) *Input {
inputFormatC := C.CString(inputFormat) inputFormatC := C.CString(inputFormat)
inputURLC := C.CString(inputURL) inputURLC := C.CString(inputURL)
@@ -272,12 +289,24 @@ func openInput(inputFormat, inputURL string, verbose bool) *C.struct_VSInput {
C.free(unsafe.Pointer(inputFormatC)) C.free(unsafe.Pointer(inputFormatC))
C.free(unsafe.Pointer(inputURLC)) C.free(unsafe.Pointer(inputURLC))
return input return &Input{
mutex: &sync.RWMutex{},
vsInput: input,
}
}
func destroyInput(input *Input) {
input.mutex.Lock()
defer input.mutex.Unlock()
if input.vsInput != nil {
C.vs_destroy_input(input.vsInput)
}
} }
// Try to write the packet to each client. If we fail, we clean up the client // Try to write the packet to each client. If we fail, we clean up the client
// and it will not be in the returned list of clients. // and it will not be in the returned list of clients.
func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket, func writePacketToClients(input *Input, pkt *C.AVPacket,
clients []*Client, verbose bool) []*Client { clients []*Client, verbose bool) []*Client {
// Rewrite clients slice with only those we succeeded in writing to. If we // Rewrite clients slice with only those we succeeded in writing to. If we
// failed for some reason we clean up the client and no longer send it // failed for some reason we clean up the client and no longer send it
@@ -286,6 +315,7 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
for _, client := range clients { for _, client := range clients {
// Open the client's output if it is not yet open. // Open the client's output if it is not yet open.
client.mutex.Lock()
if client.Output == nil { if client.Output == nil {
outputFormat := "mp4" outputFormat := "mp4"
outputURL := fmt.Sprintf("pipe:%d", client.OutPipe.Fd()) outputURL := fmt.Sprintf("pipe:%d", client.OutPipe.Fd())
@@ -293,6 +323,7 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
if client.Output == nil { if client.Output == nil {
log.Printf("Unable to open output for client") log.Printf("Unable to open output for client")
cleanupClient(client) cleanupClient(client)
client.mutex.Unlock()
continue continue
} }
@@ -308,6 +339,7 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
log.Printf("Opened output for client") log.Printf("Opened output for client")
} }
client.mutex.Unlock()
// Duplicate the packet. Each client's goroutine will receive a copy. // Duplicate the packet. Each client's goroutine will receive a copy.
pktCopy := C.av_packet_clone(pkt) pktCopy := C.av_packet_clone(pkt)
@@ -337,15 +369,21 @@ func writePacketToClients(input *C.struct_VSInput, pkt *C.AVPacket,
// Receive packets from the encoder, and write them out to the client's pipe. // Receive packets from the encoder, and write them out to the client's pipe.
// //
// We end when encoder closes the channel, or if we encounter a write error. // We end when encoder closes the channel, or if we encounter a write error.
func packetWriter(client *Client, input *C.struct_VSInput, verbose bool) { func packetWriter(client *Client, input *Input, verbose bool) {
for pkt := range client.PacketChan { for pkt := range client.PacketChan {
writeRes := C.int(0) writeRes := C.int(0)
writeRes = C.vs_write_packet(input, client.Output, pkt, C.bool(verbose)) client.mutex.RLock()
input.mutex.RLock()
writeRes = C.vs_write_packet(input.vsInput, client.Output, pkt,
C.bool(verbose))
input.mutex.RUnlock()
if writeRes == -1 { if writeRes == -1 {
log.Printf("Failure writing packet") log.Printf("Failure writing packet")
C.av_packet_free(&pkt) C.av_packet_free(&pkt)
client.mutex.RUnlock()
return return
} }
client.mutex.RUnlock()
C.av_packet_free(&pkt) C.av_packet_free(&pkt)
} }
} }
@@ -353,11 +391,14 @@ func packetWriter(client *Client, input *C.struct_VSInput, verbose bool) {
// Open the output file. This creates an MP4 container and writes the header to // Open the output file. This creates an MP4 container and writes the header to
// the given output URL. // the given output URL.
func openOutput(outputFormat, outputURL string, verbose bool, func openOutput(outputFormat, outputURL string, verbose bool,
input *C.struct_VSInput) *C.struct_VSOutput { input *Input) *C.struct_VSOutput {
outputFormatC := C.CString("mp4") outputFormatC := C.CString("mp4")
outputURLC := C.CString(outputURL) outputURLC := C.CString(outputURL)
output := C.vs_open_output(outputFormatC, outputURLC, input, C.bool(verbose)) input.mutex.RLock()
output := C.vs_open_output(outputFormatC, outputURLC, input.vsInput,
C.bool(verbose))
input.mutex.RUnlock()
if output == nil { if output == nil {
log.Printf("Unable to open output") log.Printf("Unable to open output")
C.free(unsafe.Pointer(outputFormatC)) C.free(unsafe.Pointer(outputFormatC))
@@ -400,6 +441,7 @@ func (h HTTPHandler) streamRequest(rw http.ResponseWriter, r *http.Request) {
} }
c := &Client{ c := &Client{
mutex: &sync.RWMutex{},
OutPipe: outPipe, OutPipe: outPipe,
} }