From 0d8035fd02e74350b55619845a76adc510c42b51 Mon Sep 17 00:00:00 2001 From: wisdgod Date: Tue, 28 Jan 2025 06:27:14 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=87=8D=E5=A4=8D=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=A6=96=E4=B8=AA=20chunk?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/error.rs | 2 ++ src/chat/service.rs | 24 +++++++++++------------- src/chat/stream/decoder.rs | 22 ++++++++++++---------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/chat/error.rs b/src/chat/error.rs index 5fcc1e0..c159ba1 100644 --- a/src/chat/error.rs +++ b/src/chat/error.rs @@ -125,6 +125,7 @@ impl ErrorResponse { pub enum StreamError { ChatError(ChatError), DataLengthLessThan5, + EmptyStream, } impl std::fmt::Display for StreamError { @@ -132,6 +133,7 @@ impl std::fmt::Display for StreamError { match self { StreamError::ChatError(error) => write!(f, "{}", error.error.code), StreamError::DataLengthLessThan5 => write!(f, "data length less than 5"), + StreamError::EmptyStream => write!(f, "empty stream"), } } } diff --git a/src/chat/service.rs b/src/chat/service.rs index ada0b49..165ebd0 100644 --- a/src/chat/service.rs +++ b/src/chat/service.rs @@ -575,7 +575,6 @@ pub async fn handle_chat( } } - // 处理后续的stream stream.then({ let decoder = decoder.clone(); let response_id = response_id.clone(); @@ -605,7 +604,6 @@ pub async fn handle_chat( current_id, }; - // 使用decoder处理chunk let messages = match decoder.lock().await.decode(&chunk, convert_web_ref) { Ok(msgs) => msgs, Err(e) => { @@ -614,19 +612,19 @@ pub async fn handle_chat( } }; - let mut response_data = String::new(); + // let mut response_data = String::new(); - if let Some(first_msg) = decoder.lock().await.take_first_result() { - let first_response = process_messages(first_msg, &ctx).await; - if !first_response.is_empty() { - response_data.push_str(&first_response); - } - } + // if let Some(first_msg) = decoder.lock().await.take_first_result() { + // let first_response = process_messages(first_msg, &ctx).await; + // if !first_response.is_empty() { + // response_data.push_str(&first_response); + // } + // } - let current_response = process_messages(messages, &ctx).await; - if !current_response.is_empty() { - response_data.push_str(¤t_response); - } + let response_data = process_messages(messages, &ctx).await; + // if !current_response.is_empty() { + // response_data.push_str(¤t_response); + // } Ok(Bytes::from(response_data)) } diff --git a/src/chat/stream/decoder.rs b/src/chat/stream/decoder.rs index 76e23d1..9e80e20 100644 --- a/src/chat/stream/decoder.rs +++ b/src/chat/stream/decoder.rs @@ -77,16 +77,15 @@ impl StreamDecoder { } } - // 获取第一个结果的引用 - pub fn take_first_result(&mut self) -> Option> { - if !self.buffer.is_empty() { - return None; - } - if self.first_result.is_some() { - self.first_result_taken = true; - } - self.first_result.take() - } + // pub fn take_first_result(&mut self) -> Option> { + // if !self.buffer.is_empty() { + // return None; + // } + // if self.first_result.is_some() { + // self.first_result_taken = true; + // } + // self.first_result.take() + // } #[cfg(test)] fn is_incomplete(&self) -> bool { @@ -101,6 +100,9 @@ impl StreamDecoder { self.buffer.extend_from_slice(data); if self.buffer.len() < 5 { + if self.buffer.len() == 0 { + return Err(StreamError::EmptyStream); + } crate::debug_println!("数据长度小于5字节,当前数据: {}", hex::encode(&self.buffer)); return Err(StreamError::DataLengthLessThan5); }