client: add DialReadContext, DialPublishContext

This commit is contained in:
aler9
2021-05-10 18:11:01 +02:00
parent c7bdded8f0
commit e75b14c608
2 changed files with 42 additions and 0 deletions

View File

@@ -115,6 +115,11 @@ func (c *Client) Dial(scheme string, host string) (*ClientConn, error) {
// DialRead connects to the address and starts reading all tracks. // DialRead connects to the address and starts reading all tracks.
func (c *Client) DialRead(address string) (*ClientConn, error) { func (c *Client) DialRead(address string) (*ClientConn, error) {
return c.DialReadContext(context.Background(), address)
}
// DialReadContext connects to the address with the given context and starts reading all tracks.
func (c *Client) DialReadContext(ctx context.Context, address string) (*ClientConn, error) {
u, err := base.ParseURL(address) u, err := base.ParseURL(address)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -125,6 +130,21 @@ func (c *Client) DialRead(address string) (*ClientConn, error) {
return nil, err return nil, err
} }
ctxHandlerDone := make(chan struct{})
defer func() { <-ctxHandlerDone }()
ctxHandlerTerminate := make(chan struct{})
defer close(ctxHandlerTerminate)
go func() {
defer close(ctxHandlerDone)
select {
case <-ctx.Done():
conn.Close()
case <-ctxHandlerTerminate:
}
}()
_, err = conn.Options(u) _, err = conn.Options(u)
if err != nil { if err != nil {
conn.Close() conn.Close()
@@ -156,6 +176,11 @@ func (c *Client) DialRead(address string) (*ClientConn, error) {
// DialPublish connects to the address and starts publishing the tracks. // DialPublish connects to the address and starts publishing the tracks.
func (c *Client) DialPublish(address string, tracks Tracks) (*ClientConn, error) { func (c *Client) DialPublish(address string, tracks Tracks) (*ClientConn, error) {
return c.DialPublishContext(context.Background(), address, tracks)
}
// DialPublishContext connects to the address with the given context and starts publishing the tracks.
func (c *Client) DialPublishContext(ctx context.Context, address string, tracks Tracks) (*ClientConn, error) {
u, err := base.ParseURL(address) u, err := base.ParseURL(address)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -166,6 +191,21 @@ func (c *Client) DialPublish(address string, tracks Tracks) (*ClientConn, error)
return nil, err return nil, err
} }
ctxHandlerDone := make(chan struct{})
defer func() { <-ctxHandlerDone }()
ctxHandlerTerminate := make(chan struct{})
defer close(ctxHandlerTerminate)
go func() {
defer close(ctxHandlerDone)
select {
case <-ctx.Done():
conn.Close()
case <-ctxHandlerTerminate:
}
}()
_, err = conn.Options(u) _, err = conn.Options(u)
if err != nil { if err != nil {
conn.Close() conn.Close()

View File

@@ -832,6 +832,8 @@ func (cc *ClientConn) do(req *base.Request, skipResponse bool) (*base.Response,
// it's better not to stop the request, but wait until teardown // it's better not to stop the request, but wait until teardown
if !skipResponse { if !skipResponse {
ctxHandlerDone := make(chan struct{}) ctxHandlerDone := make(chan struct{})
defer func() { <-ctxHandlerDone }()
ctxHandlerTerminate := make(chan struct{}) ctxHandlerTerminate := make(chan struct{})
defer close(ctxHandlerTerminate) defer close(ctxHandlerTerminate)