From 59b545d8ff293b3353dac613ce58a9f710ea8848 Mon Sep 17 00:00:00 2001 From: wisdgod Date: Thu, 14 Aug 2025 10:58:13 +0800 Subject: [PATCH] perf(codec): optimize protobuf encoding with direct memory management - Replace Vec abstractions with direct memory allocation for zero-overhead operations - Use encode_raw() to eliminate redundant encoded_len calculations - Implement allocation fusion: single contiguous memory block for header and message - Defer Vec construction until final stage to avoid intermediate state overhead This low-level optimization reduces CPU cycles in the hot path by bypassing Vec's boundary checks and capacity management while maintaining safety through careful unsafe block usage. --- Cargo.toml | 2 +- src/common/utils.rs | 78 ++++++++++++++++------------- src/core/aiserver/v1/aiserver.v1.rs | 13 +++-- src/core/aiserver/v1/lite.proto | 3 ++ src/core/stream/decoder/utils.rs | 2 +- 5 files changed, 55 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b6995df..c280119 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ serde = { version = "1", default-features = false, features = ["std", "derive", # serde_json = { package = "sonic-rs", version = "0" } serde_json = "1" sha2 = { version = "0", default-features = false } -sysinfo = { version = "0.36", default-features = false, features = ["system"] } +sysinfo = { version = "0.37", default-features = false, features = ["system"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "sync", "time", "fs", "signal"] } tokio-util = { version = "0.7", features = ["io"] } # tokio-tungstenite = { version = "0.26.2", features = ["rustls-tls-webpki-roots"] } diff --git a/src/common/utils.rs b/src/common/utils.rs index e6d0d01..b61e7a6 100644 --- a/src/common/utils.rs +++ b/src/common/utils.rs @@ -531,24 +531,25 @@ pub fn tokeninfo_to_token(info: key_config::TokenInfo) -> Option { /// 压缩数据为gzip格式 #[inline] -fn compress_gzip(data: &[u8]) -> Result, std::io::Error> { +fn compress_gzip(data: &[u8]) -> Result, ::std::io::Error> { use std::io::Write as _; use flate2::{Compression, write::GzEncoder}; const LEVEL: Compression = Compression::new(6); - let mut encoder = GzEncoder::new(Vec::new(), LEVEL); + // 预分配容量,gzip压缩后通常会变小,但预留一些空间给gzip头部 + let estimated_size = data.len() / 2 + 64; + let mut encoder = GzEncoder::new(Vec::with_capacity(estimated_size), LEVEL); encoder.write_all(data)?; encoder.finish() } -#[allow(clippy::uninit_vec)] #[inline(always)] pub fn encode_message( message: &impl ::prost::Message, maybe_stream: bool, -) -> Result, Box> { +) -> Result, Box> { const COMPRESSION_THRESHOLD: usize = 1024; // 1KB const LENGTH_OVERFLOW_MSG: &str = "Message length exceeds ~4 GiB"; @@ -556,52 +557,57 @@ pub fn encode_message( if !maybe_stream { let mut encoded = Vec::with_capacity(estimated_size); - __unwrap!(message.encode(&mut encoded)); + message.encode_raw(&mut encoded); return Ok(encoded); } - // 预留头部空间 - let mut buf = Vec::with_capacity(5 + estimated_size); - unsafe { - // 跳过头部5字节 - buf.set_len(5); + use ::core::{alloc::Layout, ptr}; - // 编码消息 - __unwrap!(message.encode(&mut buf)); - let message_len = buf.len() - 5; + // 分配连续内存块:[头部 5B][消息体] + let layout = Layout::from_size_align_unchecked(5 + estimated_size, 1); + let ptr = ::std::alloc::alloc(layout); + if ptr.is_null() { + ::std::alloc::handle_alloc_error(layout); + } - // 判断是否需要压缩 - let (compression_flag, final_len) = if message_len >= COMPRESSION_THRESHOLD { - // 需要压缩 - let compressed = compress_gzip(buf.get_unchecked(5..))?; - let compressed_len = compressed.len(); + let header_ptr = ptr; + let body_ptr = ptr.add(5); - // 只在压缩后更小时才使用压缩版本 - if compressed_len < message_len { - // 直接覆盖原数据 - let dst = buf.as_mut_ptr().add(5); - ::core::ptr::copy_nonoverlapping(compressed.as_ptr(), dst, compressed_len); - // 截断到正确长度 - buf.set_len(5 + compressed_len); - (0x01, compressed_len) + // 直接编码到预分配的内存 + message.encode_raw(&mut ::core::slice::from_raw_parts_mut( + body_ptr, + estimated_size, + )); + + // 压缩处理:仅当压缩后更小时才使用压缩版本 + let (compression_flag, final_len) = if estimated_size >= COMPRESSION_THRESHOLD { + let compressed = + compress_gzip(::core::slice::from_raw_parts(body_ptr, estimated_size))?; + + if compressed.len() < estimated_size { + // 原地覆盖为压缩数据 + ptr::copy_nonoverlapping(compressed.as_ptr(), body_ptr, compressed.len()); + (0x01, compressed.len()) } else { - // 压缩后反而更大,保持原样 - (0x00, message_len) + (0x00, estimated_size) } } else { - // 不需要压缩 - (0x00, message_len) + (0x00, estimated_size) }; - // 统一写入头部 + // 构建协议头:[压缩标志 1B][长度 4B BE] let len = u32::try_from(final_len).map_err(|_| LENGTH_OVERFLOW_MSG)?; - let ptr = buf.as_mut_ptr(); - *ptr = compression_flag; - *(ptr.add(1) as *mut [u8; 4]) = len.to_be_bytes(); - } + *header_ptr = compression_flag; + ptr::copy_nonoverlapping(len.to_be_bytes().as_ptr(), header_ptr.add(1), 4); - Ok(buf) + // 转换为 Vec,保留原始容量避免重分配 + Ok(Vec::from_raw_parts( + ptr, + 5 + final_len, // 实际使用 + 5 + estimated_size, // 已分配 + )) + } } /// 生成 PKCE code_verifier 和对应的 code_challenge (S256 method). diff --git a/src/core/aiserver/v1/aiserver.v1.rs b/src/core/aiserver/v1/aiserver.v1.rs index 673f6f9..1ce3692 100644 --- a/src/core/aiserver/v1/aiserver.v1.rs +++ b/src/core/aiserver/v1/aiserver.v1.rs @@ -637,10 +637,13 @@ pub mod stream_cpp_request { where D: ::serde::Deserializer<'de>, { - as ::serde::Deserialize>::deserialize( - deserializer, - ) - .map(|opt| opt.map(|val| val as i32)) + unsafe { + ::core::intrinsics::transmute_unchecked( + as ::serde::Deserialize>::deserialize( + deserializer, + ), + ) + } } } } @@ -845,7 +848,7 @@ pub mod cpp_config_response { as ::serde::Serialize>::serialize( &value .iter() - .map(|val| super::Heuristic::try_from(*val).unwrap_or_default()) + .map(|&val| super::Heuristic::try_from(val).unwrap_or_default()) .collect(), serializer, ) diff --git a/src/core/aiserver/v1/lite.proto b/src/core/aiserver/v1/lite.proto index c5554ea..4400adb 100644 --- a/src/core/aiserver/v1/lite.proto +++ b/src/core/aiserver/v1/lite.proto @@ -1,4 +1,7 @@ syntax = "proto3"; +// @version: 1.4.4-1.4.5 +// @author: wisdgod +// @license: MIT OR Apache-2.0 package aiserver.v1; import "google/protobuf/timestamp.proto"; message CursorPosition { // .aiserver.v1.CursorPosition diff --git a/src/core/stream/decoder/utils.rs b/src/core/stream/decoder/utils.rs index fdb8624..2b0a85a 100644 --- a/src/core/stream/decoder/utils.rs +++ b/src/core/stream/decoder/utils.rs @@ -16,7 +16,7 @@ trait StringFrom: Sized { impl StringFrom for &[u8] { #[inline(always)] - fn as_bytes(&self) -> &[u8] { *self } + fn as_bytes(&self) -> &[u8] { self } #[inline(always)] fn into_vec(self) -> Vec { self.to_vec() } }