mirror of
https://github.com/bolucat/Archive.git
synced 2025-09-26 20:21:35 +08:00
Update On Wed Dec 4 19:40:20 CET 2024
This commit is contained in:
@@ -710,6 +710,9 @@ asio::error_code CliConnection::OnReadHttpRequest(std::shared_ptr<IOBuf> buf) {
|
||||
}
|
||||
|
||||
http_is_keep_alive_ = false;
|
||||
DCHECK_EQ(0, http_keep_alive_remaining_bytes_);
|
||||
http_keep_alive_remaining_bytes_ = 0;
|
||||
DCHECK(!http_keep_alive_pending_buf_);
|
||||
|
||||
if (ok) {
|
||||
buf->trimStart(nparsed);
|
||||
@@ -726,7 +729,20 @@ asio::error_code CliConnection::OnReadHttpRequest(std::shared_ptr<IOBuf> buf) {
|
||||
buf->prepend(header.size());
|
||||
memcpy(buf->mutable_data(), header.c_str(), header.size());
|
||||
http_is_keep_alive_ = gurl_base::CompareCaseInsensitiveASCII(parser.connection(), "Keep-Alive"sv) == 0;
|
||||
http_keep_alive_remaining_bytes_ += parser.content_length() + header.size() - buf->length();
|
||||
http_keep_alive_remaining_bytes_ = parser.content_length() + header.size() - buf->length();
|
||||
if (http_is_keep_alive_ && http_keep_alive_remaining_bytes_ < 0) {
|
||||
VLOG(2) << "Connection (client) " << connection_id() << " http keepalive request splited";
|
||||
// split buf into two buffers
|
||||
// buf -> (request_size)
|
||||
// http_keep_alive_pending_buf_ -> (bytes after request_size)
|
||||
auto request_size = parser.content_length() + header.size();
|
||||
http_keep_alive_remaining_bytes_ = 0u;
|
||||
http_keep_alive_pending_buf_ =
|
||||
IOBuf::copyBuffer(buf->data() + request_size, buf->length() - request_size, 0, 0);
|
||||
buf->trimEnd(buf->length() - request_size);
|
||||
DCHECK_EQ(buf->length(), request_size);
|
||||
// FIXME yield to resume http handling
|
||||
}
|
||||
VLOG(3) << "Connection (client) " << connection_id() << " Host: " << http_host_ << " Port: " << http_port_
|
||||
<< " KEEPALIVE: " << std::boolalpha << http_is_keep_alive_;
|
||||
if (parser.transfer_encoding_is_chunked()) {
|
||||
@@ -749,6 +765,41 @@ asio::error_code CliConnection::OnReadHttpRequest(std::shared_ptr<IOBuf> buf) {
|
||||
return asio::error::invalid_argument;
|
||||
}
|
||||
|
||||
asio::error_code CliConnection::OnReadHttpRequestAfterReuse(std::shared_ptr<IOBuf>& buf) {
|
||||
DCHECK(http_is_keep_alive_);
|
||||
DCHECK_EQ(http_host_, request_.domain_name());
|
||||
DCHECK_EQ(http_port_, request_.port());
|
||||
auto ec = OnReadHttpRequest(buf);
|
||||
if (ec) {
|
||||
return ec;
|
||||
}
|
||||
if (http_is_connect_) {
|
||||
return asio::error::invalid_argument;
|
||||
}
|
||||
DCHECK(!buf->empty());
|
||||
// request_ isn't updated after OnReadHttpRequest(),
|
||||
// so we are safe to use it to tell whether the new destination is different
|
||||
if (request_.domain_name() != http_host_ || request_.port() != http_port_) {
|
||||
LOG(INFO) << "Connection (client) " << connection_id() << " re-used";
|
||||
upstream_readable_ = false;
|
||||
upstream_writable_ = false;
|
||||
request_ = {};
|
||||
channel_->close();
|
||||
channel_.reset();
|
||||
OnStreamRead(std::move(buf));
|
||||
DCHECK(!buf);
|
||||
ec = PerformCmdOpsHttp();
|
||||
if (ec) {
|
||||
return ec;
|
||||
}
|
||||
SetState(state_stream);
|
||||
return asio::error::try_again;
|
||||
}
|
||||
LOG(INFO) << "Connection (client) " << connection_id() << " connect (re-used) " << remote_domain();
|
||||
SetState(state_stream);
|
||||
return asio::error_code();
|
||||
}
|
||||
|
||||
void CliConnection::WaitStreamError() {
|
||||
scoped_refptr<CliConnection> self(this);
|
||||
downlink_->async_wait_error([this, self](asio::error_code ec) {
|
||||
@@ -1705,8 +1756,23 @@ std::shared_ptr<IOBuf> CliConnection::GetNextUpstreamBuf(asio::error_code& ec,
|
||||
}
|
||||
#endif
|
||||
|
||||
// if we have read previously in OnReadHttpRequestAfterReuse
|
||||
if (http_is_keep_alive_ && http_keep_alive_pending_buf_) {
|
||||
VLOG(2) << "Connection (client) " << connection_id() << " http keepalive splited request resumed";
|
||||
buf = std::move(http_keep_alive_pending_buf_);
|
||||
read = buf->length();
|
||||
DCHECK(!http_keep_alive_pending_buf_);
|
||||
goto after_read;
|
||||
}
|
||||
|
||||
do {
|
||||
buf = IOBuf::create(SOCKET_BUF_SIZE);
|
||||
// sometimes we need to read from here
|
||||
if (http_is_keep_alive_ && http_keep_alive_previous_buf_) {
|
||||
buf = std::move(http_keep_alive_previous_buf_);
|
||||
DCHECK(!http_keep_alive_previous_buf_);
|
||||
} else {
|
||||
buf = IOBuf::create(SOCKET_BUF_SIZE);
|
||||
}
|
||||
read = downlink_->socket_.read_some(tail_buffer(*buf, SOCKET_BUF_SIZE), ec);
|
||||
if (ec == asio::error::interrupted) {
|
||||
continue;
|
||||
@@ -1729,6 +1795,7 @@ std::shared_ptr<IOBuf> CliConnection::GetNextUpstreamBuf(asio::error_code& ec,
|
||||
goto out;
|
||||
}
|
||||
|
||||
after_read:
|
||||
if (!channel_ || !channel_->connected()) {
|
||||
OnStreamRead(buf);
|
||||
ec = asio::error::try_again;
|
||||
@@ -1736,49 +1803,30 @@ std::shared_ptr<IOBuf> CliConnection::GetNextUpstreamBuf(asio::error_code& ec,
|
||||
}
|
||||
|
||||
if (http_is_keep_alive_) {
|
||||
if (http_keep_alive_remaining_bytes_ < (int64_t)read) {
|
||||
DCHECK_EQ(http_host_, request_.domain_name());
|
||||
DCHECK_EQ(http_port_, request_.port());
|
||||
ec = OnReadHttpRequest(buf);
|
||||
if (ec) {
|
||||
return nullptr;
|
||||
}
|
||||
if (http_is_connect_) {
|
||||
ec = asio::error::invalid_argument;
|
||||
return nullptr;
|
||||
}
|
||||
if (buf->empty()) {
|
||||
buf.reset();
|
||||
}
|
||||
// request_ isn't updated after OnReadHttpRequest(),
|
||||
// so we are safe to use it to tell whether the new destination is different
|
||||
if (request_.domain_name() != http_host_ || request_.port() != http_port_) {
|
||||
LOG(INFO) << "Connection (client) " << connection_id() << " re-used";
|
||||
upstream_readable_ = false;
|
||||
upstream_writable_ = false;
|
||||
request_ = {};
|
||||
channel_->close();
|
||||
channel_.reset();
|
||||
if (buf) {
|
||||
OnStreamRead(buf);
|
||||
buf.reset();
|
||||
}
|
||||
ec = PerformCmdOpsHttp();
|
||||
if (ec) {
|
||||
return nullptr;
|
||||
}
|
||||
SetState(state_stream);
|
||||
if (http_keep_alive_remaining_bytes_ == 0) {
|
||||
ec = OnReadHttpRequestAfterReuse(buf);
|
||||
/* use a small number to remind of incomplete request */
|
||||
if (ec == asio::error::invalid_argument && buf->length() < 64u * 1024) {
|
||||
http_keep_alive_previous_buf_ = std::move(buf);
|
||||
DCHECK(!buf);
|
||||
ec = asio::error::try_again;
|
||||
return nullptr;
|
||||
}
|
||||
LOG(INFO) << "Connection (client) " << connection_id() << " connect (re-used) " << remote_domain();
|
||||
SetState(state_stream);
|
||||
if (!buf) {
|
||||
ec = asio::error::try_again;
|
||||
if (ec) {
|
||||
return nullptr;
|
||||
}
|
||||
} else if (http_keep_alive_remaining_bytes_ < (int64_t)read) {
|
||||
VLOG(2) << "Connection (client) " << connection_id()
|
||||
<< " http keepalive consumed: " << http_keep_alive_remaining_bytes_;
|
||||
DCHECK_GE(http_keep_alive_remaining_bytes_, 0);
|
||||
DCHECK_EQ(read, buf->length());
|
||||
DCHECK(!http_keep_alive_pending_buf_);
|
||||
http_keep_alive_pending_buf_ = IOBuf::copyBuffer(buf->data() + http_keep_alive_remaining_bytes_,
|
||||
buf->length() - http_keep_alive_remaining_bytes_);
|
||||
buf->trimEnd(buf->length() - http_keep_alive_remaining_bytes_);
|
||||
http_keep_alive_remaining_bytes_ = 0;
|
||||
} else {
|
||||
VLOG(2) << "Connection (client) " << connection_id() << " http keepalive consumed: " << read;
|
||||
http_keep_alive_remaining_bytes_ -= read;
|
||||
}
|
||||
}
|
||||
|
@@ -277,6 +277,8 @@ class CliConnection : public gurl_base::RefCountedThreadSafe<CliConnection>,
|
||||
asio::error_code OnReadSocks4Handshake(std::shared_ptr<IOBuf> buf);
|
||||
/// Start to read http handshake request
|
||||
asio::error_code OnReadHttpRequest(std::shared_ptr<IOBuf> buf);
|
||||
/// Start to read http handshake request (after reuse)
|
||||
asio::error_code OnReadHttpRequestAfterReuse(std::shared_ptr<IOBuf>& buf);
|
||||
|
||||
/// Start wait error on stream
|
||||
void WaitStreamError();
|
||||
@@ -356,6 +358,10 @@ class CliConnection : public gurl_base::RefCountedThreadSafe<CliConnection>,
|
||||
bool http_is_keep_alive_ = false;
|
||||
/// copy of remaining bytes in keep alive cycle
|
||||
int64_t http_keep_alive_remaining_bytes_ = 0;
|
||||
/// remaining buffer for unhandled http keep-alive requests
|
||||
std::shared_ptr<IOBuf> http_keep_alive_pending_buf_;
|
||||
/// previous buffer for an incomplete http keep-alive request
|
||||
std::shared_ptr<IOBuf> http_keep_alive_previous_buf_;
|
||||
|
||||
/// copy of upstream request
|
||||
ss::request request_;
|
||||
|
@@ -372,6 +372,7 @@ void HttpRequestParser::HeaderDone() {
|
||||
if (status_ == ParserStatus::Error) {
|
||||
return;
|
||||
}
|
||||
status_ = ParserStatus::Ok;
|
||||
headers_done_ = true;
|
||||
}
|
||||
|
||||
@@ -381,6 +382,7 @@ void HttpRequestParser::MessageDone() {
|
||||
if (status_ == ParserStatus::Error) {
|
||||
return;
|
||||
}
|
||||
status_ = ParserStatus::Ok;
|
||||
framer_.Reset();
|
||||
first_byte_processed_ = false;
|
||||
headers_done_ = false;
|
||||
|
@@ -112,7 +112,7 @@ class HttpRequestParser : public quiche::BalsaVisitorInterface {
|
||||
|
||||
bool first_byte_processed_ = false;
|
||||
bool headers_done_ = false;
|
||||
ParserStatus status_ = ParserStatus::Ok;
|
||||
ParserStatus status_ = ParserStatus::Paused;
|
||||
int status_code_ = 0;
|
||||
// An error message, often seemingly arbitrary to match http-parser behavior.
|
||||
std::string_view error_message_;
|
||||
|
Reference in New Issue
Block a user