fix: 重复处理首个 chunk

This commit is contained in:
wisdgod
2025-01-28 06:27:14 +08:00
parent 00a6980da9
commit 0d8035fd02
3 changed files with 25 additions and 23 deletions

View File

@@ -125,6 +125,7 @@ impl ErrorResponse {
pub enum StreamError {
ChatError(ChatError),
DataLengthLessThan5,
EmptyStream,
}
impl std::fmt::Display for StreamError {
@@ -132,6 +133,7 @@ impl std::fmt::Display for StreamError {
match self {
StreamError::ChatError(error) => write!(f, "{}", error.error.code),
StreamError::DataLengthLessThan5 => write!(f, "data length less than 5"),
StreamError::EmptyStream => write!(f, "empty stream"),
}
}
}

View File

@@ -575,7 +575,6 @@ pub async fn handle_chat(
}
}
// 处理后续的stream
stream.then({
let decoder = decoder.clone();
let response_id = response_id.clone();
@@ -605,7 +604,6 @@ pub async fn handle_chat(
current_id,
};
// 使用decoder处理chunk
let messages = match decoder.lock().await.decode(&chunk, convert_web_ref) {
Ok(msgs) => msgs,
Err(e) => {
@@ -614,19 +612,19 @@ pub async fn handle_chat(
}
};
let mut response_data = String::new();
// let mut response_data = String::new();
if let Some(first_msg) = decoder.lock().await.take_first_result() {
let first_response = process_messages(first_msg, &ctx).await;
if !first_response.is_empty() {
response_data.push_str(&first_response);
}
}
// if let Some(first_msg) = decoder.lock().await.take_first_result() {
// let first_response = process_messages(first_msg, &ctx).await;
// if !first_response.is_empty() {
// response_data.push_str(&first_response);
// }
// }
let current_response = process_messages(messages, &ctx).await;
if !current_response.is_empty() {
response_data.push_str(&current_response);
}
let response_data = process_messages(messages, &ctx).await;
// if !current_response.is_empty() {
// response_data.push_str(&current_response);
// }
Ok(Bytes::from(response_data))
}

View File

@@ -77,16 +77,15 @@ impl StreamDecoder {
}
}
// 获取第一个结果的引用
pub fn take_first_result(&mut self) -> Option<Vec<StreamMessage>> {
if !self.buffer.is_empty() {
return None;
}
if self.first_result.is_some() {
self.first_result_taken = true;
}
self.first_result.take()
}
// pub fn take_first_result(&mut self) -> Option<Vec<StreamMessage>> {
// if !self.buffer.is_empty() {
// return None;
// }
// if self.first_result.is_some() {
// self.first_result_taken = true;
// }
// self.first_result.take()
// }
#[cfg(test)]
fn is_incomplete(&self) -> bool {
@@ -101,6 +100,9 @@ impl StreamDecoder {
self.buffer.extend_from_slice(data);
if self.buffer.len() < 5 {
if self.buffer.len() == 0 {
return Err(StreamError::EmptyStream);
}
crate::debug_println!("数据长度小于5字节当前数据: {}", hex::encode(&self.buffer));
return Err(StreamError::DataLengthLessThan5);
}