diff --git a/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py b/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py index 61a4fa10b..d97ea88fb 100644 --- a/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py +++ b/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py @@ -45,6 +45,7 @@ class IPCConnector: self.local_gpu_id = int(local_gpu_id_) tmp = paddle.ones([1, 1]) logger.info(f"init ipc rank{self.rank_id} with remote {self.remote_gpu_id} {self.local_gpu_id}") + paddle.set_device(f"gpu:{self.local_gpu_id}") for layer_id in range(layer_num): key_unique_name = f"key_caches_{layer_id}_rank{self.rank_id}.device{self.remote_gpu_id}" value_unique_name = f"value_caches_{layer_id}_rank{self.rank_id}.device{self.remote_gpu_id}" diff --git a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/kvcache_connection.h b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/kvcache_connection.h index 596e3b2e6..a63eb0c59 100644 --- a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/kvcache_connection.h +++ b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/kvcache_connection.h @@ -73,12 +73,13 @@ struct IbDeviceInfo { int realPort; int maxQp; }; - /// @brief Queue Pair information for RDMA struct QpInfo { uint32_t lid; uint32_t qpn; uint32_t psn; + uint8_t sl; // Service Level for IB networks + uint8_t path_bits; // Path Bits for IB networks union ibv_gid gid; enum ibv_mtu mtu; @@ -88,7 +89,10 @@ struct QpInfo { intBuffer[0] = htonl(lid); intBuffer[1] = htonl(qpn); intBuffer[2] = htonl(psn); - memcpy(buffer + 12, gid.raw, sizeof(gid.raw)); + // Pack SL and Path Bits into the 4th uint32_t + uint32_t sl_path = (static_cast(sl) << 8) | static_cast(path_bits); + intBuffer[3] = htonl(sl_path); + memcpy(buffer + 16, gid.raw, sizeof(gid.raw)); intBuffer[7] = htonl(static_cast(mtu)); } @@ -98,11 +102,14 @@ struct QpInfo { lid = ntohl(intBuffer[0]); qpn = ntohl(intBuffer[1]); psn = ntohl(intBuffer[2]); - memcpy(gid.raw, buffer + 12, sizeof(gid.raw)); + uint32_t sl_path = ntohl(intBuffer[3]); + sl = static_cast((sl_path >> 8) & 0xFF); + path_bits = static_cast(sl_path & 0xFF); + memcpy(gid.raw, buffer + 16, sizeof(gid.raw)); mtu = static_cast(ntohl(intBuffer[7])); } - static const size_t size = 12 + sizeof(gid.raw) + 4; + static const size_t size = 16 + sizeof(gid.raw) + 4; }; /// @brief RDMA connection context diff --git a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/util.h b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/util.h index c040b2a62..2663c2bec 100644 --- a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/util.h +++ b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/util.h @@ -156,6 +156,8 @@ private: const char* error_file_path_; bool relax_ordering_enabled_; int ib_timeout_; + int ib_service_level_; + int ib_src_path_bits_; const char* rdma_nics_; // Private constructor for singleton pattern @@ -213,6 +215,18 @@ private: "KVCACHE_IB_TIMEOUT" ); + ib_service_level_ = parse_int_value( + std::getenv("KVCACHE_IB_SERVICE_LEVEL"), + 0, + "KVCACHE_IB_SERVICE_LEVEL" + ); + + ib_src_path_bits_ = parse_int_value( + std::getenv("KVCACHE_IB_SRC_PATH_BITS"), + 0, + "KVCACHE_IB_SRC_PATH_BITS" + ); + rdma_nics_ = std::getenv("KVCACHE_RDMA_NICS"); } @@ -255,6 +269,8 @@ public: } int get_ib_timeout() const { return ib_timeout_; } + int get_ib_service_level() const { return ib_service_level_; } + int get_ib_src_path_bits() const { return ib_src_path_bits_; } // Configuration retrieval methods int get_rdma_gid_index() const { return rdma_gid_index_; } diff --git a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/kvcache_connection.cpp b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/kvcache_connection.cpp index 6bb4e43a9..b7dc73b98 100644 --- a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/kvcache_connection.cpp +++ b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/kvcache_connection.cpp @@ -169,8 +169,12 @@ int parse_port_ib_info() { dev_info.maxQp = dev_attr.max_qp; strncpy(dev_info.devName, dev_name, MAXNAMESIZE); - INFO("Adding device %s port %d (%s)", dev_name, port_num, - port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE"); + if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { + INFO("Adding IB device %s port %d (LID:0x%x Rate:%dGbps)", + dev_name, port_num, port_attr.lid, port_attr.active_speed/10); + } else { + INFO("Adding RoCE device %s port %d", dev_name, port_num); + } g_ib_all_devs.push_back(dev_info); ++g_kvcache_ib_dev_nums; @@ -304,14 +308,24 @@ QpStatus modify_qp_to_rts( attr.max_dest_rd_atomic = 1; attr.min_rnr_timer = 12; - attr.ah_attr.is_global = 1; - attr.ah_attr.grh.hop_limit = 255; - attr.ah_attr.grh.flow_label = 0; - attr.ah_attr.grh.traffic_class = 0; - attr.ah_attr.grh.dgid.global.subnet_prefix = (dest->gid.global.subnet_prefix); - attr.ah_attr.grh.dgid.global.interface_id = (dest->gid.global.interface_id); - attr.ah_attr.grh.sgid_index = sgid_id; +bool use_grh = (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET); + if (use_grh) { + attr.ah_attr.is_global = 1; + attr.ah_attr.grh.hop_limit = 255; + attr.ah_attr.grh.flow_label = 0; + attr.ah_attr.grh.traffic_class = 0; + attr.ah_attr.grh.dgid.global.subnet_prefix = (dest->gid.global.subnet_prefix); + attr.ah_attr.grh.dgid.global.interface_id = (dest->gid.global.interface_id); + attr.ah_attr.grh.sgid_index = sgid_id; + } else { + attr.ah_attr.is_global = 0; + attr.ah_attr.dlid = dest->lid; + attr.ah_attr.sl = KVCacheConfig::getInstance().get_ib_service_level(); // 从配置获取服务级别 + if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { + attr.ah_attr.src_path_bits = KVCacheConfig::getInstance().get_ib_src_path_bits(); // IB特定路径位 + } + } attr.ah_attr.src_path_bits = 0; attr.ah_attr.port_num = port; @@ -602,11 +616,17 @@ bool client_exchange_destinations( my_dest.lid = ctx->portinfo.lid; my_dest.mtu = ctx->portinfo.active_mtu; + my_dest.sl = KVCacheConfig::getInstance().get_ib_service_level(); + my_dest.path_bits = KVCacheConfig::getInstance().get_ib_src_path_bits(); // Validate LID for InfiniBand - if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !my_dest.lid) { - ERR("Invalid LID 0x%04x for non-Ethernet link layer", my_dest.lid); - return false; + if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET) { + if (!my_dest.lid) { + ERR("Invalid LID 0x%04x for IB network", my_dest.lid); + return false; + } + LOGD("IB network detected - LID:0x%04x SL:%d PathBits:%d", + my_dest.lid, my_dest.sl, my_dest.path_bits); } // Get GID if specified