diff --git a/stream/sink.go b/stream/sink.go index d50e240..e535af5 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -206,6 +206,8 @@ func (s *BaseSink) doAsyncWrite() { duration := time.Now().UnixMilli() - l if err != nil { log.Sugar.Errorf(err.Error()) + <-s.cancelCtx.Done() + return } data.Release()