[Feature] Support RDMA InfiniBand (IB) transfer (#4123)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled

* Update serving_chat.py

* Update serving_completion.py

* Update serving_completion.py

* Move connection_manager initialization

* [BugFix] fix kv cache

* fix format

---------

Co-authored-by: Yuanle Liu <yuanlehome@163.com>
This commit is contained in:
ltd0924
2025-09-19 12:54:49 +08:00
committed by GitHub
parent 4f460db556
commit bba279cf38
4 changed files with 60 additions and 16 deletions

View File

@@ -45,6 +45,7 @@ class IPCConnector:
self.local_gpu_id = int(local_gpu_id_)
tmp = paddle.ones([1, 1])
logger.info(f"init ipc rank{self.rank_id} with remote {self.remote_gpu_id} {self.local_gpu_id}")
paddle.set_device(f"gpu:{self.local_gpu_id}")
for layer_id in range(layer_num):
key_unique_name = f"key_caches_{layer_id}_rank{self.rank_id}.device{self.remote_gpu_id}"
value_unique_name = f"value_caches_{layer_id}_rank{self.rank_id}.device{self.remote_gpu_id}"

View File

@@ -73,12 +73,13 @@ struct IbDeviceInfo {
int realPort;
int maxQp;
};
/// @brief Queue Pair information for RDMA
struct QpInfo {
uint32_t lid;
uint32_t qpn;
uint32_t psn;
uint8_t sl; // Service Level for IB networks
uint8_t path_bits; // Path Bits for IB networks
union ibv_gid gid;
enum ibv_mtu mtu;
@@ -88,7 +89,10 @@ struct QpInfo {
intBuffer[0] = htonl(lid);
intBuffer[1] = htonl(qpn);
intBuffer[2] = htonl(psn);
memcpy(buffer + 12, gid.raw, sizeof(gid.raw));
// Pack SL and Path Bits into the 4th uint32_t
uint32_t sl_path = (static_cast<uint32_t>(sl) << 8) | static_cast<uint32_t>(path_bits);
intBuffer[3] = htonl(sl_path);
memcpy(buffer + 16, gid.raw, sizeof(gid.raw));
intBuffer[7] = htonl(static_cast<uint32_t>(mtu));
}
@@ -98,11 +102,14 @@ struct QpInfo {
lid = ntohl(intBuffer[0]);
qpn = ntohl(intBuffer[1]);
psn = ntohl(intBuffer[2]);
memcpy(gid.raw, buffer + 12, sizeof(gid.raw));
uint32_t sl_path = ntohl(intBuffer[3]);
sl = static_cast<uint8_t>((sl_path >> 8) & 0xFF);
path_bits = static_cast<uint8_t>(sl_path & 0xFF);
memcpy(gid.raw, buffer + 16, sizeof(gid.raw));
mtu = static_cast<ibv_mtu>(ntohl(intBuffer[7]));
}
static const size_t size = 12 + sizeof(gid.raw) + 4;
static const size_t size = 16 + sizeof(gid.raw) + 4;
};
/// @brief RDMA connection context

View File

@@ -156,6 +156,8 @@ private:
const char* error_file_path_;
bool relax_ordering_enabled_;
int ib_timeout_;
int ib_service_level_;
int ib_src_path_bits_;
const char* rdma_nics_;
// Private constructor for singleton pattern
@@ -213,6 +215,18 @@ private:
"KVCACHE_IB_TIMEOUT"
);
ib_service_level_ = parse_int_value(
std::getenv("KVCACHE_IB_SERVICE_LEVEL"),
0,
"KVCACHE_IB_SERVICE_LEVEL"
);
ib_src_path_bits_ = parse_int_value(
std::getenv("KVCACHE_IB_SRC_PATH_BITS"),
0,
"KVCACHE_IB_SRC_PATH_BITS"
);
rdma_nics_ = std::getenv("KVCACHE_RDMA_NICS");
}
@@ -255,6 +269,8 @@ public:
}
int get_ib_timeout() const { return ib_timeout_; }
int get_ib_service_level() const { return ib_service_level_; }
int get_ib_src_path_bits() const { return ib_src_path_bits_; }
// Configuration retrieval methods
int get_rdma_gid_index() const { return rdma_gid_index_; }

View File

@@ -169,8 +169,12 @@ int parse_port_ib_info() {
dev_info.maxQp = dev_attr.max_qp;
strncpy(dev_info.devName, dev_name, MAXNAMESIZE);
INFO("Adding device %s port %d (%s)", dev_name, port_num,
port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
INFO("Adding IB device %s port %d (LID:0x%x Rate:%dGbps)",
dev_name, port_num, port_attr.lid, port_attr.active_speed/10);
} else {
INFO("Adding RoCE device %s port %d", dev_name, port_num);
}
g_ib_all_devs.push_back(dev_info);
++g_kvcache_ib_dev_nums;
@@ -304,6 +308,9 @@ QpStatus modify_qp_to_rts(
attr.max_dest_rd_atomic = 1;
attr.min_rnr_timer = 12;
bool use_grh = (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET);
if (use_grh) {
attr.ah_attr.is_global = 1;
attr.ah_attr.grh.hop_limit = 255;
attr.ah_attr.grh.flow_label = 0;
@@ -311,7 +318,14 @@ QpStatus modify_qp_to_rts(
attr.ah_attr.grh.dgid.global.subnet_prefix = (dest->gid.global.subnet_prefix);
attr.ah_attr.grh.dgid.global.interface_id = (dest->gid.global.interface_id);
attr.ah_attr.grh.sgid_index = sgid_id;
} else {
// Non-GRH path: address the peer by LID instead of by GID.
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = dest->lid;
attr.ah_attr.sl = KVCacheConfig::getInstance().get_ib_service_level(); // Service level taken from configuration
if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
attr.ah_attr.src_path_bits = KVCacheConfig::getInstance().get_ib_src_path_bits(); // IB-specific source path bits
}
}
attr.ah_attr.src_path_bits = 0;
attr.ah_attr.port_num = port;
@@ -602,12 +616,18 @@ bool client_exchange_destinations(
my_dest.lid = ctx->portinfo.lid;
my_dest.mtu = ctx->portinfo.active_mtu;
my_dest.sl = KVCacheConfig::getInstance().get_ib_service_level();
my_dest.path_bits = KVCacheConfig::getInstance().get_ib_src_path_bits();
// Validate LID for InfiniBand
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !my_dest.lid) {
ERR("Invalid LID 0x%04x for non-Ethernet link layer", my_dest.lid);
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET) {
if (!my_dest.lid) {
ERR("Invalid LID 0x%04x for IB network", my_dest.lid);
return false;
}
LOGD("IB network detected - LID:0x%04x SL:%d PathBits:%d",
my_dest.lid, my_dest.sl, my_dest.path_bits);
}
// Get GID if specified
if (gidx >= 0) {