mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
c++ code format (#4527)
This commit is contained in:
@@ -22,32 +22,25 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <rdma/rdma_cma.h>
|
||||
#include <rdma/rdma_verbs.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <netinet/tcp.h>
|
||||
#include <netinet/in.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sstream>
|
||||
#include <netdb.h>
|
||||
#include <sstream>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <sys/socket.h>
|
||||
#include <cstring>
|
||||
#include <netdb.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <fcntl.h>
|
||||
#include <net/if.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <netdb.h>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <rdma/rdma_cma.h>
|
||||
#include <rdma/rdma_verbs.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
#include <atomic>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "kvcache_rdma.h"
|
||||
#include "util.h"
|
||||
@@ -60,115 +53,115 @@
|
||||
|
||||
/// @brief IB device information structure
|
||||
struct IbDeviceInfo {
|
||||
int device;
|
||||
uint64_t guid;
|
||||
enum ibv_mtu mtu;
|
||||
uint64_t busid;
|
||||
uint8_t port;
|
||||
uint8_t link;
|
||||
uint8_t active_mtu;
|
||||
int speed;
|
||||
ibv_context* context;
|
||||
char devName[64];
|
||||
int realPort;
|
||||
int maxQp;
|
||||
int device;
|
||||
uint64_t guid;
|
||||
enum ibv_mtu mtu;
|
||||
uint64_t busid;
|
||||
uint8_t port;
|
||||
uint8_t link;
|
||||
uint8_t active_mtu;
|
||||
int speed;
|
||||
ibv_context* context;
|
||||
char devName[64];
|
||||
int realPort;
|
||||
int maxQp;
|
||||
};
|
||||
|
||||
/// @brief Queue Pair information for RDMA
|
||||
struct QpInfo {
|
||||
uint32_t lid;
|
||||
uint32_t qpn;
|
||||
uint32_t psn;
|
||||
union ibv_gid gid;
|
||||
enum ibv_mtu mtu;
|
||||
uint32_t lid;
|
||||
uint32_t qpn;
|
||||
uint32_t psn;
|
||||
union ibv_gid gid;
|
||||
enum ibv_mtu mtu;
|
||||
|
||||
/// @brief Serialize QP info to buffer
|
||||
void serialize(char* buffer) const {
|
||||
uint32_t* intBuffer = reinterpret_cast<uint32_t*>(buffer);
|
||||
intBuffer[0] = htonl(lid);
|
||||
intBuffer[1] = htonl(qpn);
|
||||
intBuffer[2] = htonl(psn);
|
||||
memcpy(buffer + 12, gid.raw, sizeof(gid.raw));
|
||||
intBuffer[7] = htonl(static_cast<uint32_t>(mtu));
|
||||
}
|
||||
/// @brief Serialize QP info to buffer
|
||||
void serialize(char* buffer) const {
|
||||
uint32_t* intBuffer = reinterpret_cast<uint32_t*>(buffer);
|
||||
intBuffer[0] = htonl(lid);
|
||||
intBuffer[1] = htonl(qpn);
|
||||
intBuffer[2] = htonl(psn);
|
||||
memcpy(buffer + 12, gid.raw, sizeof(gid.raw));
|
||||
intBuffer[7] = htonl(static_cast<uint32_t>(mtu));
|
||||
}
|
||||
|
||||
/// @brief Deserialize QP info from buffer
|
||||
void deserialize(const char* buffer) {
|
||||
const uint32_t* intBuffer = reinterpret_cast<const uint32_t*>(buffer);
|
||||
lid = ntohl(intBuffer[0]);
|
||||
qpn = ntohl(intBuffer[1]);
|
||||
psn = ntohl(intBuffer[2]);
|
||||
memcpy(gid.raw, buffer + 12, sizeof(gid.raw));
|
||||
mtu = static_cast<ibv_mtu>(ntohl(intBuffer[7]));
|
||||
}
|
||||
/// @brief Deserialize QP info from buffer
|
||||
void deserialize(const char* buffer) {
|
||||
const uint32_t* intBuffer = reinterpret_cast<const uint32_t*>(buffer);
|
||||
lid = ntohl(intBuffer[0]);
|
||||
qpn = ntohl(intBuffer[1]);
|
||||
psn = ntohl(intBuffer[2]);
|
||||
memcpy(gid.raw, buffer + 12, sizeof(gid.raw));
|
||||
mtu = static_cast<ibv_mtu>(ntohl(intBuffer[7]));
|
||||
}
|
||||
|
||||
static const size_t size = 12 + sizeof(gid.raw) + 4;
|
||||
static const size_t size = 12 + sizeof(gid.raw) + 4;
|
||||
};
|
||||
|
||||
/// @brief RDMA connection context
|
||||
struct Connection {
|
||||
std::atomic<int> connected;
|
||||
std::atomic<int> connected;
|
||||
|
||||
// Memory regions
|
||||
struct ibv_mr *recv_mr;
|
||||
struct ibv_mr *send_mr;
|
||||
// Memory regions
|
||||
struct ibv_mr* recv_mr;
|
||||
struct ibv_mr* send_mr;
|
||||
|
||||
// Cache pointers
|
||||
std::vector<std::vector<void*>> local_cache_key_ptr_per_layer;
|
||||
std::vector<std::vector<void*>> local_cache_value_ptr_per_layer;
|
||||
// Cache pointers
|
||||
std::vector<std::vector<void*>> local_cache_key_ptr_per_layer;
|
||||
std::vector<std::vector<void*>> local_cache_value_ptr_per_layer;
|
||||
|
||||
// Memory region lists
|
||||
std::vector<ibv_mr*> write_cache_key_server_mr_list;
|
||||
std::vector<ibv_mr*> write_cache_value_server_mr_list;
|
||||
std::vector<std::vector<ibv_mr*>> write_mr_key_list;
|
||||
std::vector<std::vector<ibv_mr*>> write_mr_value_list;
|
||||
// Memory region lists
|
||||
std::vector<ibv_mr*> write_cache_key_server_mr_list;
|
||||
std::vector<ibv_mr*> write_cache_value_server_mr_list;
|
||||
std::vector<std::vector<ibv_mr*>> write_mr_key_list;
|
||||
std::vector<std::vector<ibv_mr*>> write_mr_value_list;
|
||||
|
||||
// Remote access information
|
||||
std::vector<void*> write_cache_key_remote_ptr_list;
|
||||
std::vector<uint32_t> write_cache_key_remote_rkey_list;
|
||||
std::vector<void*> write_cache_value_remote_ptr_list;
|
||||
std::vector<uint32_t> write_cache_value_remote_rkey_list;
|
||||
// Remote access information
|
||||
std::vector<void*> write_cache_key_remote_ptr_list;
|
||||
std::vector<uint32_t> write_cache_key_remote_rkey_list;
|
||||
std::vector<void*> write_cache_value_remote_ptr_list;
|
||||
std::vector<uint32_t> write_cache_value_remote_rkey_list;
|
||||
|
||||
// Received remote memory information
|
||||
std::vector<void*> receive_write_cache_key_remote_ptr_list;
|
||||
std::vector<uint32_t> receive_write_cache_key_remote_rkey_list;
|
||||
std::vector<void*> receive_write_cache_value_remote_ptr_list;
|
||||
std::vector<uint32_t> receive_write_cache_value_remote_rkey_list;
|
||||
// Received remote memory information
|
||||
std::vector<void*> receive_write_cache_key_remote_ptr_list;
|
||||
std::vector<uint32_t> receive_write_cache_key_remote_rkey_list;
|
||||
std::vector<void*> receive_write_cache_value_remote_ptr_list;
|
||||
std::vector<uint32_t> receive_write_cache_value_remote_rkey_list;
|
||||
|
||||
std::vector<void *> send_write_cache_key_remote_ptr_list;
|
||||
std::vector<uint32_t> send_write_cache_key_remote_rkey_list;
|
||||
std::vector<void *> send_write_cache_value_remote_ptr_list;
|
||||
std::vector<uint32_t> send_write_cache_value_remote_rkey_list;
|
||||
std::vector<void*> send_write_cache_key_remote_ptr_list;
|
||||
std::vector<uint32_t> send_write_cache_key_remote_rkey_list;
|
||||
std::vector<void*> send_write_cache_value_remote_ptr_list;
|
||||
std::vector<uint32_t> send_write_cache_value_remote_rkey_list;
|
||||
|
||||
// For rdma read operations
|
||||
std::vector<void*> read_bufs;
|
||||
std::vector<ibv_mr*> read_mrs;
|
||||
// For rdma read operations
|
||||
std::vector<void*> read_bufs;
|
||||
std::vector<ibv_mr*> read_mrs;
|
||||
|
||||
// Work completion tracking
|
||||
int wc_count;
|
||||
int wc_target_count;
|
||||
// Work completion tracking
|
||||
int wc_count;
|
||||
int wc_target_count;
|
||||
|
||||
// Configuration
|
||||
int layer_number;
|
||||
int block_number;
|
||||
int block_byte_size;
|
||||
std::string url;
|
||||
// Configuration
|
||||
int layer_number;
|
||||
int block_number;
|
||||
int block_byte_size;
|
||||
std::string url;
|
||||
|
||||
Connection() = default;
|
||||
~Connection();
|
||||
Connection() = default;
|
||||
~Connection();
|
||||
};
|
||||
|
||||
/// @brief RDMA context structure
|
||||
struct RdmaContext {
|
||||
int sock_fd;
|
||||
struct ibv_context* context;
|
||||
struct ibv_comp_channel* channel;
|
||||
struct ibv_pd* pd;
|
||||
struct ibv_mr* mr;
|
||||
struct ibv_cq* cq;
|
||||
struct ibv_qp* qp;
|
||||
struct ibv_port_attr portinfo;
|
||||
struct Connection conn;
|
||||
int sock_fd;
|
||||
struct ibv_context* context;
|
||||
struct ibv_comp_channel* channel;
|
||||
struct ibv_pd* pd;
|
||||
struct ibv_mr* mr;
|
||||
struct ibv_cq* cq;
|
||||
struct ibv_qp* qp;
|
||||
struct ibv_port_attr portinfo;
|
||||
struct Connection conn;
|
||||
};
|
||||
|
||||
// Global variables
|
||||
@@ -176,36 +169,46 @@ extern std::vector<IbDeviceInfo> g_ib_all_devs;
|
||||
static int g_kvcache_ib_dev_nums = -1;
|
||||
|
||||
// Connection management functions
|
||||
bool client_exchange_destinations(
|
||||
struct RdmaContext* ctx,
|
||||
int ib_port,
|
||||
unsigned int port,
|
||||
int gidx,
|
||||
const std::string& dst_ip);
|
||||
bool client_exchange_destinations(struct RdmaContext* ctx,
|
||||
int ib_port,
|
||||
unsigned int port,
|
||||
int gidx,
|
||||
const std::string& dst_ip);
|
||||
|
||||
int server_exchange_qp_info(int connfd, QpInfo* local_dest, QpInfo* rem_dest);
|
||||
struct RdmaContext* create_qp(struct IbDeviceInfo* ib_dev, struct ibv_pd** g_pd);
|
||||
struct RdmaContext* create_qp(struct IbDeviceInfo* ib_dev,
|
||||
struct ibv_pd** g_pd);
|
||||
bool clear_qp_info(struct RdmaContext* ctx);
|
||||
|
||||
// QP modification functions
|
||||
QpStatus modify_qp_to_rts(struct RdmaContext* ctx, int port, int my_psn,
|
||||
struct QpInfo* dest, int sgid_id);
|
||||
bool poll_cq_with_timeout(struct RdmaContext* ctx, int timeout_seconds, int cqe_count);
|
||||
QpStatus modify_qp_to_rts(struct RdmaContext* ctx,
|
||||
int port,
|
||||
int my_psn,
|
||||
struct QpInfo* dest,
|
||||
int sgid_id);
|
||||
bool poll_cq_with_timeout(struct RdmaContext* ctx,
|
||||
int timeout_seconds,
|
||||
int cqe_count);
|
||||
|
||||
// Utility functions
|
||||
int get_port_info(struct ibv_context* Context, int port,
|
||||
struct ibv_port_attr* attr);
|
||||
int get_port_info(struct ibv_context* Context,
|
||||
int port,
|
||||
struct ibv_port_attr* attr);
|
||||
int parse_port_ib_info();
|
||||
|
||||
// Memory region exchange
|
||||
bool client_exchange_mr(struct RdmaContext* ctx);
|
||||
bool server_exchange_mr(struct RdmaContext* ctx);
|
||||
bool server_send_memory_region(struct RdmaContext *ctx, void *local_mr, int byte_num);
|
||||
bool client_receive_memory_region(struct RdmaContext *ctx, void *remote_mr, int byte_num);
|
||||
bool server_send_memory_region(struct RdmaContext* ctx,
|
||||
void* local_mr,
|
||||
int byte_num);
|
||||
bool client_receive_memory_region(struct RdmaContext* ctx,
|
||||
void* remote_mr,
|
||||
int byte_num);
|
||||
|
||||
// Network setup
|
||||
int setup_listening_socket(int port);
|
||||
int configure_epoll(int sockfd);
|
||||
std::vector<std::string> get_net_ifname();
|
||||
|
||||
#endif // FASTDEPLOY_KVCACHE_CONNECTION_H
|
||||
#endif // FASTDEPLOY_KVCACHE_CONNECTION_H
|
||||
|
||||
@@ -4,77 +4,88 @@
|
||||
#pragma once
|
||||
|
||||
#include <rdma/rdma_cma.h>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include "util.h" // Contains constant definitions
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "kvcache_connection.h"
|
||||
#include "log.h"
|
||||
|
||||
#include "util.h" // Contains constant definitions
|
||||
|
||||
/**
|
||||
* @brief RDMA communication handler for key-value cache
|
||||
*/
|
||||
class RDMACommunicator {
|
||||
public:
|
||||
// Construction/Destruction
|
||||
RDMACommunicator(std::string &role, int gpu_idx, std::string &port,
|
||||
std::vector<int64_t> local_key_cache,
|
||||
std::vector<int64_t> local_value_cache,
|
||||
int block_number, int block_bytes);
|
||||
~RDMACommunicator();
|
||||
public:
|
||||
// Construction/Destruction
|
||||
RDMACommunicator(std::string& role,
|
||||
int gpu_idx,
|
||||
std::string& port,
|
||||
std::vector<int64_t> local_key_cache,
|
||||
std::vector<int64_t> local_value_cache,
|
||||
int block_number,
|
||||
int block_bytes);
|
||||
~RDMACommunicator();
|
||||
|
||||
// Connection management
|
||||
int connect(const std::string &dst_ip, const std::string &dst_port);
|
||||
bool is_connected(const std::string &dst_ip, const std::string &dst_port);
|
||||
// Connection management
|
||||
int connect(const std::string& dst_ip, const std::string& dst_port);
|
||||
bool is_connected(const std::string& dst_ip, const std::string& dst_port);
|
||||
|
||||
// Core functionality
|
||||
int write_cache(const std::string &ip, const std::string &port,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
const std::vector<int64_t>& remote_block_ids,
|
||||
int32_t layer_idx);
|
||||
// Core functionality
|
||||
int write_cache(const std::string& ip,
|
||||
const std::string& port,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
const std::vector<int64_t>& remote_block_ids,
|
||||
int32_t layer_idx);
|
||||
|
||||
// Server Init
|
||||
int init_server();
|
||||
// Server Init
|
||||
int init_server();
|
||||
|
||||
// get socket nic ip
|
||||
std::string fetch_local_ip();
|
||||
// get socket nic ip
|
||||
std::string fetch_local_ip();
|
||||
|
||||
private:
|
||||
// Server Core functions
|
||||
int start_server(int sport, int sgid_idx, int gpu_index);
|
||||
private:
|
||||
// Server Core functions
|
||||
int start_server(int sport, int sgid_idx, int gpu_index);
|
||||
|
||||
// Internal implementation methods
|
||||
void resize_vectors();
|
||||
void assign_pointers();
|
||||
void validate_addr();
|
||||
bool client_mr_register_per_layer(struct RdmaContext *ctx);
|
||||
bool server_mr_register_per_layer(struct RdmaContext *ctx);
|
||||
struct ibv_mr* register_memory_region(ibv_pd* pd, void* addr, size_t size,
|
||||
const std::string& desc, uint32_t access_flags);
|
||||
bool deregister_memory_regions(struct RdmaContext* ctx);
|
||||
// Internal implementation methods
|
||||
void resize_vectors();
|
||||
void assign_pointers();
|
||||
void validate_addr();
|
||||
bool client_mr_register_per_layer(struct RdmaContext* ctx);
|
||||
bool server_mr_register_per_layer(struct RdmaContext* ctx);
|
||||
struct ibv_mr* register_memory_region(ibv_pd* pd,
|
||||
void* addr,
|
||||
size_t size,
|
||||
const std::string& desc,
|
||||
uint32_t access_flags);
|
||||
bool deregister_memory_regions(struct RdmaContext* ctx);
|
||||
|
||||
bool post_block_send(struct RdmaContext* ctx, int layer_idx,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
bool is_key, std::vector<uint64_t>& remote_addr,
|
||||
uint32_t rkey, const std::string &ip,
|
||||
const std::string &port);
|
||||
bool post_block_send(struct RdmaContext* ctx,
|
||||
int layer_idx,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
bool is_key,
|
||||
std::vector<uint64_t>& remote_addr,
|
||||
uint32_t rkey,
|
||||
const std::string& ip,
|
||||
const std::string& port);
|
||||
|
||||
bool execute_rdma_writes(struct RdmaContext* ctx, int layer_idx,
|
||||
bool execute_rdma_writes(struct RdmaContext* ctx,
|
||||
int layer_idx,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
bool is_key, std::vector<uint64_t>& remote_addr,
|
||||
bool is_key,
|
||||
std::vector<uint64_t>& remote_addr,
|
||||
uint32_t rkey);
|
||||
|
||||
void prepare_write_requests(struct ibv_sge* sge_list,
|
||||
struct ibv_send_wr* send_wr_list,
|
||||
int layer_idx,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
bool is_key,
|
||||
std::vector<uint64_t>& remote_addr,
|
||||
uint32_t rkey);
|
||||
void prepare_write_requests(struct ibv_sge* sge_list,
|
||||
struct ibv_send_wr* send_wr_list,
|
||||
int layer_idx,
|
||||
const std::vector<int64_t>& local_block_ids,
|
||||
bool is_key,
|
||||
std::vector<uint64_t>& remote_addr,
|
||||
uint32_t rkey);
|
||||
|
||||
bool execute_read_verification(struct RdmaContext* ctx,
|
||||
bool execute_read_verification(struct RdmaContext* ctx,
|
||||
size_t block_idx,
|
||||
uint64_t remote_addr,
|
||||
uint32_t rkey,
|
||||
@@ -82,46 +93,56 @@ private:
|
||||
const std::string& ip,
|
||||
const std::string& port);
|
||||
|
||||
bool post_send_with_retry(struct RdmaContext* ctx,
|
||||
bool post_send_with_retry(struct RdmaContext* ctx,
|
||||
struct ibv_send_wr* wr_list,
|
||||
size_t inflight_wr,
|
||||
bool need_poll);
|
||||
|
||||
// Connection management
|
||||
int client_listener();
|
||||
void close_server_connection(int fd, struct RdmaContext* ctx, int epollfd,
|
||||
std::map<int, struct RdmaContext*>& connectionContexts);
|
||||
void close_client_connection(int fd, struct RdmaContext* ctx, int epollfd);
|
||||
// Connection management
|
||||
int client_listener();
|
||||
void close_server_connection(
|
||||
int fd,
|
||||
struct RdmaContext* ctx,
|
||||
int epollfd,
|
||||
std::map<int, struct RdmaContext*>& connectionContexts);
|
||||
void close_client_connection(int fd, struct RdmaContext* ctx, int epollfd);
|
||||
|
||||
void remove_conn(const std::string& url);
|
||||
struct RdmaContext *get_conn(const std::string &ip,
|
||||
const std::string &port);
|
||||
void remove_conn(const std::string& url);
|
||||
struct RdmaContext* get_conn(const std::string& ip, const std::string& port);
|
||||
|
||||
// Member variables
|
||||
std::string splitwise_role; // Role in distributed system ("decode" or other)
|
||||
int gpu_idx; // GPU device index
|
||||
std::string port; // Communication port
|
||||
std::vector<int64_t> local_cache_key_ptr_layer_head_; // Key cache pointers
|
||||
std::vector<int64_t> local_cache_value_ptr_layer_head_; // Value cache pointers
|
||||
int block_number; // Number of blocks
|
||||
int block_size_byte; // Size of each block in bytes
|
||||
int layer_number; // Number of layers
|
||||
// Member variables
|
||||
std::string splitwise_role; // Role in distributed system ("decode" or other)
|
||||
int gpu_idx; // GPU device index
|
||||
std::string port; // Communication port
|
||||
std::vector<int64_t> local_cache_key_ptr_layer_head_; // Key cache pointers
|
||||
std::vector<int64_t>
|
||||
local_cache_value_ptr_layer_head_; // Value cache pointers
|
||||
int block_number; // Number of blocks
|
||||
int block_size_byte; // Size of each block in bytes
|
||||
int layer_number; // Number of layers
|
||||
|
||||
std::vector<std::vector<void*>> local_cache_key_ptr_per_layer; // Per-layer key pointers
|
||||
std::vector<std::vector<void*>> local_cache_value_ptr_per_layer; // Per-layer value pointers
|
||||
std::vector<std::vector<void*>>
|
||||
local_cache_key_ptr_per_layer; // Per-layer key pointers
|
||||
std::vector<std::vector<void*>>
|
||||
local_cache_value_ptr_per_layer; // Per-layer value pointers
|
||||
|
||||
std::vector<struct ibv_mr*> write_mr_key_list; // Memory regions for key writes
|
||||
std::vector<struct ibv_mr*> write_mr_value_list; // Memory regions for value writes
|
||||
std::vector<struct ibv_mr*> write_cache_key_server_mr_list; // Server-side key memory regions
|
||||
std::vector<struct ibv_mr*> write_cache_value_server_mr_list; // Server-side value memory regions
|
||||
std::vector<struct ibv_mr*>
|
||||
write_mr_key_list; // Memory regions for key writes
|
||||
std::vector<struct ibv_mr*>
|
||||
write_mr_value_list; // Memory regions for value writes
|
||||
std::vector<struct ibv_mr*>
|
||||
write_cache_key_server_mr_list; // Server-side key memory regions
|
||||
std::vector<struct ibv_mr*>
|
||||
write_cache_value_server_mr_list; // Server-side value memory regions
|
||||
|
||||
std::vector<std::string> main_ip_list; // List of local IP addresses
|
||||
std::map<std::string, struct RdmaContext*> conn_map; // Active connections map
|
||||
std::mutex mutex_; // Thread synchronization mutex
|
||||
int rdma_event_channel_epoll_fd; // Epoll file descriptor
|
||||
struct ibv_pd *g_pd = NULL; // fd
|
||||
int RDMACommunicator_status; // Communicator status flag
|
||||
bool start_client_listener = false; // Client listener flag
|
||||
std::vector<std::string> main_ip_list; // List of local IP addresses
|
||||
std::map<std::string, struct RdmaContext*>
|
||||
conn_map; // Active connections map
|
||||
std::mutex mutex_; // Thread synchronization mutex
|
||||
int rdma_event_channel_epoll_fd; // Epoll file descriptor
|
||||
struct ibv_pd* g_pd = NULL; // fd
|
||||
int RDMACommunicator_status; // Communicator status flag
|
||||
bool start_client_listener = false; // Client listener flag
|
||||
};
|
||||
|
||||
#endif // KVCACHE_RDMA_H
|
||||
#endif // KVCACHE_RDMA_H
|
||||
|
||||
@@ -19,99 +19,130 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <pthread.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h> //for gethostname
|
||||
#include <sys/syscall.h>
|
||||
#include <pthread.h>
|
||||
#include <string>
|
||||
#include <ctime>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h> //for gethostname
|
||||
#include <chrono>
|
||||
#include <ctime>
|
||||
#include <string>
|
||||
|
||||
#define KV_IS_DEBUG_ENABLED (std::getenv("KVCACHE_DEBUG"))
|
||||
#define FILE_NAME(x) (strrchr(x,'/') ? strrchr(x,'/')+1 : x)
|
||||
#define FILE_NAME(x) (strrchr(x, '/') ? strrchr(x, '/') + 1 : x)
|
||||
|
||||
static thread_local char __attribute__((__unused__)) str[64];
|
||||
|
||||
// for log levels (C++ enum class style in C)
|
||||
typedef enum {
|
||||
KV_LOG_LEVEL_INFO = 0,
|
||||
KV_LOG_LEVEL_DEBUG = 1,
|
||||
KV_LOG_LEVEL_WARN = 2,
|
||||
KV_LOG_LEVEL_ERROR = 3
|
||||
KV_LOG_LEVEL_INFO = 0,
|
||||
KV_LOG_LEVEL_DEBUG = 1,
|
||||
KV_LOG_LEVEL_WARN = 2,
|
||||
KV_LOG_LEVEL_ERROR = 3
|
||||
} KVLogLevel;
|
||||
|
||||
void debug_log(KVLogLevel level, bool enable_to_terminal, const char *filefunc,
|
||||
int line, const char *fmt, ...) __attribute__ ((format (printf, 5, 6)));
|
||||
void debug_log(KVLogLevel level,
|
||||
bool enable_to_terminal,
|
||||
const char *filefunc,
|
||||
int line,
|
||||
const char *fmt,
|
||||
...) __attribute__((format(printf, 5, 6)));
|
||||
|
||||
/**
|
||||
* @brief Unified logging macro to reduce duplication and improve maintainability.
|
||||
* @brief Unified logging macro to reduce duplication and improve
|
||||
* maintainability.
|
||||
*
|
||||
* @param level Log level (e.g., INFO, DEBUG, WARN, ERR).
|
||||
* @param to_terminal If true, the log will be printed to terminal.
|
||||
* @param ... Format string and arguments (like printf).
|
||||
*/
|
||||
#define KV_LOG(level, to_terminal, ...) \
|
||||
debug_log(level, to_terminal, FILE_NAME(__FILE__), __LINE__, __VA_ARGS__)
|
||||
debug_log(level, to_terminal, FILE_NAME(__FILE__), __LINE__, __VA_ARGS__)
|
||||
|
||||
// Public logging macros with terminal output
|
||||
#define WARN(...) KV_LOG(KV_LOG_LEVEL_WARN, true, __VA_ARGS__)
|
||||
#define ERR(...) KV_LOG(KV_LOG_LEVEL_ERROR, true, __VA_ARGS__)
|
||||
#define DEBUG(...) KV_LOG(KV_LOG_LEVEL_DEBUG, true, __VA_ARGS__)
|
||||
#define INFO(...) KV_LOG(KV_LOG_LEVEL_INFO, true, __VA_ARGS__)
|
||||
#define WARN(...) KV_LOG(KV_LOG_LEVEL_WARN, true, __VA_ARGS__)
|
||||
#define ERR(...) KV_LOG(KV_LOG_LEVEL_ERROR, true, __VA_ARGS__)
|
||||
#define DEBUG(...) KV_LOG(KV_LOG_LEVEL_DEBUG, true, __VA_ARGS__)
|
||||
#define INFO(...) KV_LOG(KV_LOG_LEVEL_INFO, true, __VA_ARGS__)
|
||||
|
||||
#define gettid() ((pid_t)syscall(SYS_gettid))
|
||||
#define GET_CURRENT_TIME() do { \
|
||||
time_t timer = time(0); \
|
||||
struct tm* t = localtime(&timer); \
|
||||
char hostname[32]; \
|
||||
gethostname(hostname, 32); \
|
||||
sprintf(str, "%02d:%02d:%02d][%.32s][%d", \
|
||||
t->tm_hour, t->tm_min, t->tm_sec, hostname, gettid()); \
|
||||
} while (0)
|
||||
#define GET_CURRENT_TIME() \
|
||||
do { \
|
||||
time_t timer = time(0); \
|
||||
struct tm *t = localtime(&timer); \
|
||||
char hostname[32]; \
|
||||
gethostname(hostname, 32); \
|
||||
sprintf(str, \
|
||||
"%02d:%02d:%02d][%.32s][%d", \
|
||||
t->tm_hour, \
|
||||
t->tm_min, \
|
||||
t->tm_sec, \
|
||||
hostname, \
|
||||
gettid()); \
|
||||
} while (0)
|
||||
|
||||
#define LOGE(fmt, arg...) do { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stderr, "[%s][ERR][KV_CACHE][%s:%d] " \
|
||||
fmt "\n",str, \
|
||||
FILE_NAME(__FILE__), __LINE__, ## arg); \
|
||||
} while (0)
|
||||
#define LOGE(fmt, arg...) \
|
||||
do { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stderr, \
|
||||
"[%s][ERR][KV_CACHE][%s:%d] " fmt "\n", \
|
||||
str, \
|
||||
FILE_NAME(__FILE__), \
|
||||
__LINE__, \
|
||||
##arg); \
|
||||
} while (0)
|
||||
|
||||
#define LOGW(fmt, arg...) do { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stderr, "[%s][WARN][KV_CACHE][%s:%d] " \
|
||||
fmt "\n",str, \
|
||||
FILE_NAME(__FILE__), __LINE__, ## arg); \
|
||||
} while (0)
|
||||
#define LOGW(fmt, arg...) \
|
||||
do { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stderr, \
|
||||
"[%s][WARN][KV_CACHE][%s:%d] " fmt "\n", \
|
||||
str, \
|
||||
FILE_NAME(__FILE__), \
|
||||
__LINE__, \
|
||||
##arg); \
|
||||
} while (0)
|
||||
|
||||
#define LOGI(fmt, arg...) do { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stdout, "[%s][INFO][KV_CACHE][%s:%d] " \
|
||||
fmt "\n",str, \
|
||||
FILE_NAME(__FILE__), __LINE__, ## arg); \
|
||||
} while (0)
|
||||
#define LOGI(fmt, arg...) \
|
||||
do { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stdout, \
|
||||
"[%s][INFO][KV_CACHE][%s:%d] " fmt "\n", \
|
||||
str, \
|
||||
FILE_NAME(__FILE__), \
|
||||
__LINE__, \
|
||||
##arg); \
|
||||
} while (0)
|
||||
|
||||
#define LOGD(fmt, arg...) do { \
|
||||
if (KV_IS_DEBUG_ENABLED) { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stdout, "[%s][DBG][KV_CACHE][%s:%d] " \
|
||||
fmt "\n", str, \
|
||||
FILE_NAME(__FILE__), __LINE__, ## arg); \
|
||||
} \
|
||||
} while (0)
|
||||
#define LOGD(fmt, arg...) \
|
||||
do { \
|
||||
if (KV_IS_DEBUG_ENABLED) { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stdout, \
|
||||
"[%s][DBG][KV_CACHE][%s:%d] " fmt "\n", \
|
||||
str, \
|
||||
FILE_NAME(__FILE__), \
|
||||
__LINE__, \
|
||||
##arg); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOGD_IF(cond, fmt, ...) do { \
|
||||
if ((cond)) \
|
||||
LOGD(fmt, __VA_ARGS__); \
|
||||
} while (0)
|
||||
#define LOGD_IF(cond, fmt, ...) \
|
||||
do { \
|
||||
if ((cond)) LOGD(fmt, __VA_ARGS__); \
|
||||
} while (0)
|
||||
|
||||
#define LOGD_RAW(fmt, arg...) do { \
|
||||
if (ENV_ENABLE_RAW("KV_IS_DEBUG_ENABLED")) { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stdout, "[%s][DBG][KV_CACHE][%s:%d] " \
|
||||
fmt "\n", str, \
|
||||
FILE_NAME(__FILE__), __LINE__, ## arg); \
|
||||
} \
|
||||
} while (0)
|
||||
#define LOGD_RAW(fmt, arg...) \
|
||||
do { \
|
||||
if (ENV_ENABLE_RAW("KV_IS_DEBUG_ENABLED")) { \
|
||||
GET_CURRENT_TIME(); \
|
||||
fprintf(stdout, \
|
||||
"[%s][DBG][KV_CACHE][%s:%d] " fmt "\n", \
|
||||
str, \
|
||||
FILE_NAME(__FILE__), \
|
||||
__LINE__, \
|
||||
##arg); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
#ifndef KVCACHE_UTILS_H
|
||||
#define KVCACHE_UTILS_H
|
||||
|
||||
#include <ctime>
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <cstdlib>
|
||||
#include <algorithm>
|
||||
#include <cctype>
|
||||
#include <stdexcept>
|
||||
#include <cstdio>
|
||||
#include <arpa/inet.h>
|
||||
#include <ifaddrs.h>
|
||||
#include <net/if.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <cctype>
|
||||
#include <chrono>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <ctime>
|
||||
#include <iostream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "log.h"
|
||||
|
||||
#define PATH_MAX 4096 /* # chars in a path name including nul */
|
||||
@@ -28,22 +28,22 @@
|
||||
|
||||
/// @brief Connection status enumeration
|
||||
enum class ConnStatus {
|
||||
kConnected, // Connection is active
|
||||
kDisconnected, // Connection is not active
|
||||
kError, // Connection error occurred
|
||||
kTimeout, // Connection timed out
|
||||
kInvalidParameters // Invalid connection parameters
|
||||
kConnected, // Connection is active
|
||||
kDisconnected, // Connection is not active
|
||||
kError, // Connection error occurred
|
||||
kTimeout, // Connection timed out
|
||||
kInvalidParameters // Invalid connection parameters
|
||||
};
|
||||
|
||||
/// @brief Queue Pair (QP) setup result status
|
||||
enum class QpStatus {
|
||||
kSuccess, // Successfully transitioned QP to RTS
|
||||
kInvalidParameters, // ctx or dest is null
|
||||
kDeviceQueryFailed, // ibv_query_device failed
|
||||
kPortQueryFailed, // ibv_query_port failed
|
||||
kMtuMismatch, // Requested MTU exceeds active MTU
|
||||
kModifyToRTRFailed, // Failed to modify QP to RTR
|
||||
kModifyToRTSFailed // Failed to modify QP to RTS
|
||||
kSuccess, // Successfully transitioned QP to RTS
|
||||
kInvalidParameters, // ctx or dest is null
|
||||
kDeviceQueryFailed, // ibv_query_device failed
|
||||
kPortQueryFailed, // ibv_query_port failed
|
||||
kMtuMismatch, // Requested MTU exceeds active MTU
|
||||
kModifyToRTRFailed, // Failed to modify QP to RTR
|
||||
kModifyToRTSFailed // Failed to modify QP to RTS
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -51,265 +51,281 @@ enum class QpStatus {
|
||||
* @param busId PCI bus ID string (e.g. "0000:3b:00.0")
|
||||
* @param[out] id Converted numeric ID
|
||||
*/
|
||||
inline void busid_to_int64(const char *busId, int64_t *id) {
|
||||
char hexStr[17] = {0};
|
||||
int hexOffset = 0;
|
||||
inline void busid_to_int64(const char* busId, int64_t* id) {
|
||||
char hexStr[17] = {0};
|
||||
int hexOffset = 0;
|
||||
|
||||
// Filter valid hex characters
|
||||
for (int i = 0; hexOffset < sizeof(hexStr) - 1 && busId[i] != '\0'; i++) {
|
||||
char c = busId[i];
|
||||
if (c == '.' || c == ':') continue;
|
||||
// Filter valid hex characters
|
||||
for (int i = 0; hexOffset < sizeof(hexStr) - 1 && busId[i] != '\0'; i++) {
|
||||
char c = busId[i];
|
||||
if (c == '.' || c == ':') continue;
|
||||
|
||||
if ((c >= '0' && c <= '9') ||
|
||||
(c >= 'A' && c <= 'F') ||
|
||||
(c >= 'a' && c <= 'f')) {
|
||||
hexStr[hexOffset++] = c;
|
||||
}
|
||||
if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') ||
|
||||
(c >= 'a' && c <= 'f')) {
|
||||
hexStr[hexOffset++] = c;
|
||||
}
|
||||
}
|
||||
|
||||
*id = strtol(hexStr, NULL, 16);
|
||||
*id = strtol(hexStr, NULL, 16);
|
||||
}
|
||||
|
||||
class NetworkInterfaceManager {
|
||||
public:
|
||||
struct InterfaceInfo {
|
||||
std::string name;
|
||||
std::string ip;
|
||||
bool is_up;
|
||||
bool is_running;
|
||||
bool is_loopback;
|
||||
public:
|
||||
struct InterfaceInfo {
|
||||
std::string name;
|
||||
std::string ip;
|
||||
bool is_up;
|
||||
bool is_running;
|
||||
bool is_loopback;
|
||||
|
||||
bool isUsable() const {
|
||||
return is_up && is_running && !is_loopback;
|
||||
}
|
||||
};
|
||||
bool isUsable() const { return is_up && is_running && !is_loopback; }
|
||||
};
|
||||
|
||||
static std::vector<InterfaceInfo> getAllInterfaces() {
|
||||
std::vector<InterfaceInfo> interfaces;
|
||||
struct ifaddrs *ifaddrs_ptr = nullptr;
|
||||
static std::vector<InterfaceInfo> getAllInterfaces() {
|
||||
std::vector<InterfaceInfo> interfaces;
|
||||
struct ifaddrs* ifaddrs_ptr = nullptr;
|
||||
|
||||
if (getifaddrs(&ifaddrs_ptr) == -1) {
|
||||
return interfaces;
|
||||
}
|
||||
|
||||
for (struct ifaddrs *ifa = ifaddrs_ptr; ifa != nullptr; ifa = ifa->ifa_next) {
|
||||
if (ifa->ifa_addr == nullptr) continue;
|
||||
if (ifa->ifa_addr->sa_family != AF_INET) continue;
|
||||
|
||||
InterfaceInfo info;
|
||||
info.name = ifa->ifa_name;
|
||||
info.is_up = (ifa->ifa_flags & IFF_UP) != 0;
|
||||
info.is_running = (ifa->ifa_flags & IFF_RUNNING) != 0;
|
||||
info.is_loopback = (ifa->ifa_flags & IFF_LOOPBACK) != 0;
|
||||
|
||||
struct sockaddr_in* sa = (struct sockaddr_in*)ifa->ifa_addr;
|
||||
char ip_str[INET_ADDRSTRLEN];
|
||||
inet_ntop(AF_INET, &sa->sin_addr, ip_str, INET_ADDRSTRLEN);
|
||||
info.ip = ip_str;
|
||||
|
||||
interfaces.push_back(info);
|
||||
}
|
||||
|
||||
freeifaddrs(ifaddrs_ptr);
|
||||
return interfaces;
|
||||
if (getifaddrs(&ifaddrs_ptr) == -1) {
|
||||
return interfaces;
|
||||
}
|
||||
|
||||
static std::string getFirstUsableInterface() {
|
||||
auto interfaces = getAllInterfaces();
|
||||
for (struct ifaddrs* ifa = ifaddrs_ptr; ifa != nullptr;
|
||||
ifa = ifa->ifa_next) {
|
||||
if (ifa->ifa_addr == nullptr) continue;
|
||||
if (ifa->ifa_addr->sa_family != AF_INET) continue;
|
||||
|
||||
for (const auto& iface : interfaces) {
|
||||
if (iface.isUsable()) {
|
||||
return iface.name;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
InterfaceInfo info;
|
||||
info.name = ifa->ifa_name;
|
||||
info.is_up = (ifa->ifa_flags & IFF_UP) != 0;
|
||||
info.is_running = (ifa->ifa_flags & IFF_RUNNING) != 0;
|
||||
info.is_loopback = (ifa->ifa_flags & IFF_LOOPBACK) != 0;
|
||||
|
||||
struct sockaddr_in* sa = (struct sockaddr_in*)ifa->ifa_addr;
|
||||
char ip_str[INET_ADDRSTRLEN];
|
||||
inet_ntop(AF_INET, &sa->sin_addr, ip_str, INET_ADDRSTRLEN);
|
||||
info.ip = ip_str;
|
||||
|
||||
interfaces.push_back(info);
|
||||
}
|
||||
|
||||
static void displayAllInterfaces() {
|
||||
auto interfaces = getAllInterfaces();
|
||||
freeifaddrs(ifaddrs_ptr);
|
||||
return interfaces;
|
||||
}
|
||||
|
||||
printf("Available network interfaces:\n");
|
||||
for (const auto& iface : interfaces) {
|
||||
printf(" %s: %s [%s%s%s]\n",
|
||||
iface.name.c_str(),
|
||||
iface.ip.c_str(),
|
||||
iface.is_up ? "UP" : "DOWN",
|
||||
iface.is_running ? ",RUNNING" : "",
|
||||
iface.is_loopback ? ",LOOPBACK" : "");
|
||||
}
|
||||
static std::string getFirstUsableInterface() {
|
||||
auto interfaces = getAllInterfaces();
|
||||
|
||||
for (const auto& iface : interfaces) {
|
||||
if (iface.isUsable()) {
|
||||
return iface.name;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
static void displayAllInterfaces() {
|
||||
auto interfaces = getAllInterfaces();
|
||||
|
||||
printf("Available network interfaces:\n");
|
||||
for (const auto& iface : interfaces) {
|
||||
printf(" %s: %s [%s%s%s]\n",
|
||||
iface.name.c_str(),
|
||||
iface.ip.c_str(),
|
||||
iface.is_up ? "UP" : "DOWN",
|
||||
iface.is_running ? ",RUNNING" : "",
|
||||
iface.is_loopback ? ",LOOPBACK" : "");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class KVCacheConfig {
|
||||
private:
|
||||
// Configuration values
|
||||
int rdma_gid_index_;
|
||||
bool has_rdma_dest_port_override_; // 替代 std::optional
|
||||
int rdma_dest_port_override_;
|
||||
const char* socket_interface_;
|
||||
char* socket_interface_buffer_;
|
||||
bool gdrcopy_flush_enabled_;
|
||||
bool verify_read_enabled_;
|
||||
bool debug_mode_enabled_;
|
||||
bool debug_output_enabled_;
|
||||
const char* debug_file_path_;
|
||||
const char* error_file_path_;
|
||||
bool relax_ordering_enabled_;
|
||||
int ib_timeout_;
|
||||
const char* rdma_nics_;
|
||||
private:
|
||||
// Configuration values
|
||||
int rdma_gid_index_;
|
||||
bool has_rdma_dest_port_override_; // 替代 std::optional
|
||||
int rdma_dest_port_override_;
|
||||
const char* socket_interface_;
|
||||
char* socket_interface_buffer_;
|
||||
bool gdrcopy_flush_enabled_;
|
||||
bool verify_read_enabled_;
|
||||
bool debug_mode_enabled_;
|
||||
bool debug_output_enabled_;
|
||||
const char* debug_file_path_;
|
||||
const char* error_file_path_;
|
||||
bool relax_ordering_enabled_;
|
||||
int ib_timeout_;
|
||||
const char* rdma_nics_;
|
||||
|
||||
// Private constructor for singleton pattern
|
||||
KVCacheConfig() {
|
||||
// Initialize configuration from environment variables
|
||||
rdma_gid_index_ = parse_int_value(
|
||||
std::getenv("KVCACHE_RDMA_GID_INDEX"), 3, "KVCACHE_RDMA_GID_INDEX");
|
||||
// Private constructor for singleton pattern
|
||||
KVCacheConfig() {
|
||||
// Initialize configuration from environment variables
|
||||
rdma_gid_index_ = parse_int_value(
|
||||
std::getenv("KVCACHE_RDMA_GID_INDEX"), 3, "KVCACHE_RDMA_GID_INDEX");
|
||||
|
||||
// Parse optional RDMA port override
|
||||
const char* port_value = std::getenv("SET_RDMA_DEST_PORT");
|
||||
has_rdma_dest_port_override_ = false; // 默认为false
|
||||
if (port_value) {
|
||||
try {
|
||||
rdma_dest_port_override_ = std::stoi(std::string(port_value));
|
||||
has_rdma_dest_port_override_ = true;
|
||||
} catch (const std::exception& e) {
|
||||
fprintf(stderr, "Invalid SET_RDMA_DEST_PORT value: '%s', ignoring\n", port_value);
|
||||
}
|
||||
}
|
||||
|
||||
const char* env_interface = std::getenv("KVCACHE_SOCKET_IFNAME");
|
||||
|
||||
if (env_interface && env_interface[0] != '\0') {
|
||||
socket_interface_ = env_interface;
|
||||
printf("Using specified interface: %s\n", socket_interface_);
|
||||
} else {
|
||||
std::string iface = NetworkInterfaceManager::getFirstUsableInterface();
|
||||
if (!iface.empty()) {
|
||||
socket_interface_buffer_ = new char[iface.size() + 1];
|
||||
std::strcpy(socket_interface_buffer_, iface.c_str());
|
||||
socket_interface_ = socket_interface_buffer_;
|
||||
printf("Auto-detected interface: %s\n", socket_interface_);
|
||||
} else {
|
||||
fprintf(stderr, "Warning: No usable network interface found\n");
|
||||
socket_interface_ = "";
|
||||
}
|
||||
NetworkInterfaceManager::displayAllInterfaces();
|
||||
}
|
||||
|
||||
socket_interface_ = std::getenv("KVCACHE_SOCKET_IFNAME");
|
||||
debug_file_path_ = std::getenv("KVCACHE_DEBUG_FILE");
|
||||
error_file_path_ = std::getenv("KVCACHE_ERROR_FILE");
|
||||
|
||||
gdrcopy_flush_enabled_ = parse_bool_value(std::getenv("KVCACHE_GDRCOPY_FLUSH_ENABLE"));
|
||||
verify_read_enabled_ = parse_bool_value(std::getenv("KVCACHE_VERIFY_READ"));
|
||||
debug_mode_enabled_ = parse_bool_value(std::getenv("KVCACHE_DEBUG")) ||
|
||||
parse_bool_value(std::getenv("KV_IS_DEBUG_ENABLED"));
|
||||
debug_output_enabled_ = parse_bool_value(std::getenv("KVCACHE_DEBUG_OUTPUT"));
|
||||
|
||||
relax_ordering_enabled_ = parse_bool_value(std::getenv("KVCACHE_RELAX_ORDERING"));
|
||||
|
||||
ib_timeout_ = parse_int_value(
|
||||
std::getenv("KVCACHE_IB_TIMEOUT"),
|
||||
18,
|
||||
"KVCACHE_IB_TIMEOUT"
|
||||
);
|
||||
|
||||
rdma_nics_ = std::getenv("KVCACHE_RDMA_NICS");
|
||||
// Parse optional RDMA port override
|
||||
const char* port_value = std::getenv("SET_RDMA_DEST_PORT");
|
||||
has_rdma_dest_port_override_ = false; // 默认为false
|
||||
if (port_value) {
|
||||
try {
|
||||
rdma_dest_port_override_ = std::stoi(std::string(port_value));
|
||||
has_rdma_dest_port_override_ = true;
|
||||
} catch (const std::exception& e) {
|
||||
fprintf(stderr,
|
||||
"Invalid SET_RDMA_DEST_PORT value: '%s', ignoring\n",
|
||||
port_value);
|
||||
}
|
||||
}
|
||||
|
||||
// Helper methods
|
||||
bool parse_bool_value(const char* value) {
|
||||
if (!value) return false;
|
||||
const char* env_interface = std::getenv("KVCACHE_SOCKET_IFNAME");
|
||||
|
||||
std::string str_value(value);
|
||||
std::transform(str_value.begin(), str_value.end(), str_value.begin(), ::tolower);
|
||||
|
||||
return (str_value == "1" || str_value == "true" ||
|
||||
str_value == "on" || str_value == "yes");
|
||||
if (env_interface && env_interface[0] != '\0') {
|
||||
socket_interface_ = env_interface;
|
||||
printf("Using specified interface: %s\n", socket_interface_);
|
||||
} else {
|
||||
std::string iface = NetworkInterfaceManager::getFirstUsableInterface();
|
||||
if (!iface.empty()) {
|
||||
socket_interface_buffer_ = new char[iface.size() + 1];
|
||||
std::strcpy(socket_interface_buffer_, iface.c_str());
|
||||
socket_interface_ = socket_interface_buffer_;
|
||||
printf("Auto-detected interface: %s\n", socket_interface_);
|
||||
} else {
|
||||
fprintf(stderr, "Warning: No usable network interface found\n");
|
||||
socket_interface_ = "";
|
||||
}
|
||||
NetworkInterfaceManager::displayAllInterfaces();
|
||||
}
|
||||
|
||||
int parse_int_value(const char* value, int default_value, const char* env_name) {
|
||||
if (!value) return default_value;
|
||||
socket_interface_ = std::getenv("KVCACHE_SOCKET_IFNAME");
|
||||
debug_file_path_ = std::getenv("KVCACHE_DEBUG_FILE");
|
||||
error_file_path_ = std::getenv("KVCACHE_ERROR_FILE");
|
||||
|
||||
try {
|
||||
return std::stoi(std::string(value));
|
||||
} catch (const std::invalid_argument& e) {
|
||||
fprintf(stderr, "Invalid value for %s: '%s', using default: %d\n",
|
||||
env_name, value, default_value);
|
||||
return default_value;
|
||||
} catch (const std::out_of_range& e) {
|
||||
fprintf(stderr, "%s value out of range: '%s', using default: %d\n",
|
||||
env_name, value, default_value);
|
||||
return default_value;
|
||||
}
|
||||
gdrcopy_flush_enabled_ =
|
||||
parse_bool_value(std::getenv("KVCACHE_GDRCOPY_FLUSH_ENABLE"));
|
||||
verify_read_enabled_ = parse_bool_value(std::getenv("KVCACHE_VERIFY_READ"));
|
||||
debug_mode_enabled_ = parse_bool_value(std::getenv("KVCACHE_DEBUG")) ||
|
||||
parse_bool_value(std::getenv("KV_IS_DEBUG_ENABLED"));
|
||||
debug_output_enabled_ =
|
||||
parse_bool_value(std::getenv("KVCACHE_DEBUG_OUTPUT"));
|
||||
|
||||
relax_ordering_enabled_ =
|
||||
parse_bool_value(std::getenv("KVCACHE_RELAX_ORDERING"));
|
||||
|
||||
ib_timeout_ = parse_int_value(
|
||||
std::getenv("KVCACHE_IB_TIMEOUT"), 18, "KVCACHE_IB_TIMEOUT");
|
||||
|
||||
rdma_nics_ = std::getenv("KVCACHE_RDMA_NICS");
|
||||
}
|
||||
|
||||
// Helper methods
|
||||
bool parse_bool_value(const char* value) {
|
||||
if (!value) return false;
|
||||
|
||||
std::string str_value(value);
|
||||
std::transform(
|
||||
str_value.begin(), str_value.end(), str_value.begin(), ::tolower);
|
||||
|
||||
return (str_value == "1" || str_value == "true" || str_value == "on" ||
|
||||
str_value == "yes");
|
||||
}
|
||||
|
||||
int parse_int_value(const char* value,
|
||||
int default_value,
|
||||
const char* env_name) {
|
||||
if (!value) return default_value;
|
||||
|
||||
try {
|
||||
return std::stoi(std::string(value));
|
||||
} catch (const std::invalid_argument& e) {
|
||||
fprintf(stderr,
|
||||
"Invalid value for %s: '%s', using default: %d\n",
|
||||
env_name,
|
||||
value,
|
||||
default_value);
|
||||
return default_value;
|
||||
} catch (const std::out_of_range& e) {
|
||||
fprintf(stderr,
|
||||
"%s value out of range: '%s', using default: %d\n",
|
||||
env_name,
|
||||
value,
|
||||
default_value);
|
||||
return default_value;
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
// Prevent copying and assignment
|
||||
KVCacheConfig(const KVCacheConfig&) = delete;
|
||||
KVCacheConfig& operator=(const KVCacheConfig&) = delete;
|
||||
|
||||
// Get singleton instance
|
||||
static KVCacheConfig& getInstance() {
|
||||
static KVCacheConfig instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
int get_ib_timeout() const { return ib_timeout_; }
|
||||
|
||||
// Configuration retrieval methods
|
||||
int get_rdma_gid_index() const { return rdma_gid_index_; }
|
||||
|
||||
int resolve_rdma_dest_port(int default_port) const {
|
||||
return has_rdma_dest_port_override_ ? rdma_dest_port_override_
|
||||
: default_port;
|
||||
}
|
||||
|
||||
int resolve_rdma_dest_port(const std::string& default_port) const {
|
||||
try {
|
||||
return resolve_rdma_dest_port(std::stoi(default_port));
|
||||
} catch (const std::exception& e) {
|
||||
fprintf(
|
||||
stderr, "Invalid default port string: %s\n", default_port.c_str());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
const char* get_socket_interface() const { return socket_interface_; }
|
||||
const char* get_debug_file_path() const { return debug_file_path_; }
|
||||
const char* get_error_file_path() const { return error_file_path_; }
|
||||
const char* get_rdma_nics() const { return rdma_nics_; }
|
||||
|
||||
// Feature check methods
|
||||
bool is_gdrcopy_flush_enabled() const { return gdrcopy_flush_enabled_; }
|
||||
bool is_verify_read_enabled() const { return verify_read_enabled_; }
|
||||
bool is_debug_mode_enabled() const { return debug_mode_enabled_; }
|
||||
bool is_debug_output_enabled() const { return debug_output_enabled_; }
|
||||
bool is_relax_ordering_enabled() const { return relax_ordering_enabled_; }
|
||||
|
||||
// Display configuration
|
||||
void displayConfiguration() const {
|
||||
INFO("KVCache Configuration:\n");
|
||||
INFO("Init KVCacheConfig RDMA GID Index: %d\n", rdma_gid_index_);
|
||||
|
||||
if (has_rdma_dest_port_override_) {
|
||||
INFO("Init KVCacheConfig RDMA Destination Port Override: %d\n",
|
||||
rdma_dest_port_override_);
|
||||
}
|
||||
|
||||
public:
|
||||
// Prevent copying and assignment
|
||||
KVCacheConfig(const KVCacheConfig&) = delete;
|
||||
KVCacheConfig& operator=(const KVCacheConfig&) = delete;
|
||||
|
||||
// Get singleton instance
|
||||
static KVCacheConfig& getInstance() {
|
||||
static KVCacheConfig instance;
|
||||
return instance;
|
||||
if (socket_interface_) {
|
||||
INFO("Init KVCacheConfig Socket Interface: %s\n", socket_interface_);
|
||||
}
|
||||
|
||||
int get_ib_timeout() const { return ib_timeout_; }
|
||||
INFO("Init KVCacheConfig GDRCopy Flush: %s\n",
|
||||
gdrcopy_flush_enabled_ ? "enabled" : "disabled");
|
||||
INFO("Init KVCacheConfig Verify Read: %s\n",
|
||||
verify_read_enabled_ ? "enabled" : "disabled");
|
||||
INFO("Init KVCacheConfig Debug Mode: %s\n",
|
||||
debug_mode_enabled_ ? "enabled" : "disabled");
|
||||
INFO("Init KVCacheConfig Debug Output: %s\n",
|
||||
debug_output_enabled_ ? "enabled" : "disabled");
|
||||
|
||||
// Configuration retrieval methods
|
||||
int get_rdma_gid_index() const { return rdma_gid_index_; }
|
||||
|
||||
int resolve_rdma_dest_port(int default_port) const {
|
||||
return has_rdma_dest_port_override_ ? rdma_dest_port_override_ : default_port;
|
||||
if (debug_file_path_) {
|
||||
INFO("Init KVCacheConfig Debug File: %s\n", debug_file_path_);
|
||||
}
|
||||
|
||||
int resolve_rdma_dest_port(const std::string& default_port) const {
|
||||
try {
|
||||
return resolve_rdma_dest_port(std::stoi(default_port));
|
||||
} catch (const std::exception& e) {
|
||||
fprintf(stderr, "Invalid default port string: %s\n", default_port.c_str());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
const char* get_socket_interface() const { return socket_interface_; }
|
||||
const char* get_debug_file_path() const { return debug_file_path_; }
|
||||
const char* get_error_file_path() const { return error_file_path_; }
|
||||
const char* get_rdma_nics() const { return rdma_nics_; }
|
||||
|
||||
// Feature check methods
|
||||
bool is_gdrcopy_flush_enabled() const { return gdrcopy_flush_enabled_; }
|
||||
bool is_verify_read_enabled() const { return verify_read_enabled_; }
|
||||
bool is_debug_mode_enabled() const { return debug_mode_enabled_; }
|
||||
bool is_debug_output_enabled() const { return debug_output_enabled_; }
|
||||
bool is_relax_ordering_enabled() const { return relax_ordering_enabled_; }
|
||||
|
||||
// Display configuration
|
||||
void displayConfiguration() const {
|
||||
INFO("KVCache Configuration:\n");
|
||||
INFO("Init KVCacheConfig RDMA GID Index: %d\n", rdma_gid_index_);
|
||||
|
||||
if (has_rdma_dest_port_override_) {
|
||||
INFO("Init KVCacheConfig RDMA Destination Port Override: %d\n", rdma_dest_port_override_);
|
||||
}
|
||||
|
||||
if (socket_interface_) {
|
||||
INFO("Init KVCacheConfig Socket Interface: %s\n", socket_interface_);
|
||||
}
|
||||
|
||||
INFO("Init KVCacheConfig GDRCopy Flush: %s\n", gdrcopy_flush_enabled_ ? "enabled" : "disabled");
|
||||
INFO("Init KVCacheConfig Verify Read: %s\n", verify_read_enabled_ ? "enabled" : "disabled");
|
||||
INFO("Init KVCacheConfig Debug Mode: %s\n", debug_mode_enabled_ ? "enabled" : "disabled");
|
||||
INFO("Init KVCacheConfig Debug Output: %s\n", debug_output_enabled_ ? "enabled" : "disabled");
|
||||
|
||||
if (debug_file_path_) {
|
||||
INFO("Init KVCacheConfig Debug File: %s\n", debug_file_path_);
|
||||
}
|
||||
|
||||
if (error_file_path_) {
|
||||
INFO("Init KVCacheConfig Error File: %s\n", error_file_path_);
|
||||
}
|
||||
if (error_file_path_) {
|
||||
INFO("Init KVCacheConfig Error File: %s\n", error_file_path_);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -17,14 +17,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <sys/syscall.h>
|
||||
#include <sys/stat.h>
|
||||
#include <libgen.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include "log.h"
|
||||
#include <errno.h>
|
||||
#include <libgen.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/syscall.h>
|
||||
#include "util.h"
|
||||
|
||||
static int pid = -1;
|
||||
@@ -33,180 +33,237 @@ static char hostname[64];
|
||||
char global_log_last_error[1024] = "";
|
||||
FILE *global_debug_file = stdout;
|
||||
FILE *global_error_file = stdout;
|
||||
static char global_debug_file_name[PATH_MAX+1] = "";
|
||||
static char global_err_file_name[PATH_MAX+1] = "";
|
||||
static char global_debug_file_name[PATH_MAX + 1] = "";
|
||||
static char global_err_file_name[PATH_MAX + 1] = "";
|
||||
int global_debug_level = -1;
|
||||
pthread_mutex_t global_debug_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_mutex_t global_log_file_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
void log_file_init(FILE **kv_cache_log_file, const char *kv_cache_log_file_env, char *logFileName) {
|
||||
int c = 0;
|
||||
char *dfn = logFileName;
|
||||
while (c < PATH_MAX && kv_cache_log_file_env[c] != '\0') {
|
||||
if (kv_cache_log_file_env[c++] != '%') {
|
||||
*dfn++ = kv_cache_log_file_env[c - 1];
|
||||
continue;
|
||||
}
|
||||
switch (kv_cache_log_file_env[c++]) {
|
||||
case '%': // Double %
|
||||
*dfn++ = '%';
|
||||
break;
|
||||
case 'h': // %h = hostname
|
||||
dfn += snprintf(dfn, PATH_MAX, "%s", hostname);
|
||||
break;
|
||||
case 'p': // %p = pid
|
||||
dfn += snprintf(dfn, PATH_MAX, "%d", pid);
|
||||
break;
|
||||
default: // Echo everything we don't understand
|
||||
*dfn++ = '%';
|
||||
*dfn++ = kv_cache_log_file_env[c - 1];
|
||||
break;
|
||||
}
|
||||
void log_file_init(FILE **kv_cache_log_file,
|
||||
const char *kv_cache_log_file_env,
|
||||
char *logFileName) {
|
||||
int c = 0;
|
||||
char *dfn = logFileName;
|
||||
while (c < PATH_MAX && kv_cache_log_file_env[c] != '\0') {
|
||||
if (kv_cache_log_file_env[c++] != '%') {
|
||||
*dfn++ = kv_cache_log_file_env[c - 1];
|
||||
continue;
|
||||
}
|
||||
*dfn = '\0';
|
||||
if (logFileName[0] != '\0') {
|
||||
FILE *file = fopen(logFileName, "w");
|
||||
if (file != nullptr) {
|
||||
setbuf(file, nullptr); // disable buffering
|
||||
*kv_cache_log_file = file;
|
||||
}
|
||||
switch (kv_cache_log_file_env[c++]) {
|
||||
case '%': // Double %
|
||||
*dfn++ = '%';
|
||||
break;
|
||||
case 'h': // %h = hostname
|
||||
dfn += snprintf(dfn, PATH_MAX, "%s", hostname);
|
||||
break;
|
||||
case 'p': // %p = pid
|
||||
dfn += snprintf(dfn, PATH_MAX, "%d", pid);
|
||||
break;
|
||||
default: // Echo everything we don't understand
|
||||
*dfn++ = '%';
|
||||
*dfn++ = kv_cache_log_file_env[c - 1];
|
||||
break;
|
||||
}
|
||||
}
|
||||
*dfn = '\0';
|
||||
if (logFileName[0] != '\0') {
|
||||
FILE *file = fopen(logFileName, "w");
|
||||
if (file != nullptr) {
|
||||
setbuf(file, nullptr); // disable buffering
|
||||
*kv_cache_log_file = file;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void recreate_log_file(FILE **kv_cache_log_file, char *logFileName) {
|
||||
if (logFileName[0] != '\0') {
|
||||
pthread_mutex_lock(&global_log_file_lock);
|
||||
FILE *file = fopen(logFileName, "a"); // Use "a" mode to append if file exists, otherwise create it
|
||||
// close the previous log file if it exists
|
||||
if (*kv_cache_log_file != NULL && *kv_cache_log_file != file) {
|
||||
fclose(*kv_cache_log_file);
|
||||
*kv_cache_log_file = NULL;
|
||||
}
|
||||
if (file != NULL) {
|
||||
setbuf(file, NULL); // disable buffering
|
||||
*kv_cache_log_file = file;
|
||||
}
|
||||
pthread_mutex_unlock(&global_log_file_lock);
|
||||
if (logFileName[0] != '\0') {
|
||||
pthread_mutex_lock(&global_log_file_lock);
|
||||
FILE *file = fopen(
|
||||
logFileName,
|
||||
"a"); // Use "a" mode to append if file exists, otherwise create it
|
||||
// close the previous log file if it exists
|
||||
if (*kv_cache_log_file != NULL && *kv_cache_log_file != file) {
|
||||
fclose(*kv_cache_log_file);
|
||||
*kv_cache_log_file = NULL;
|
||||
}
|
||||
if (file != NULL) {
|
||||
setbuf(file, NULL); // disable buffering
|
||||
*kv_cache_log_file = file;
|
||||
}
|
||||
pthread_mutex_unlock(&global_log_file_lock);
|
||||
}
|
||||
}
|
||||
|
||||
void debug_init() {
|
||||
pthread_mutex_lock(&global_debug_lock);
|
||||
if (global_debug_level != -1) {
|
||||
pthread_mutex_unlock(&global_debug_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
const char* kv_cache_debug = std::getenv("KV_IS_DEBUG_ENABLED");
|
||||
int tempg_kv_cache_debug_level = -1;
|
||||
|
||||
if (kv_cache_debug == NULL) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_INFO;
|
||||
} else if (strcasecmp(kv_cache_debug, "0") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_INFO;
|
||||
} else if (strcasecmp(kv_cache_debug, "1") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_DEBUG;
|
||||
} else if (strcasecmp(kv_cache_debug, "2") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_WARN;
|
||||
} else if (strcasecmp(kv_cache_debug, "3") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_ERROR;
|
||||
} else {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_INFO;
|
||||
}
|
||||
|
||||
gethostname(hostname, 64);
|
||||
pid = getpid();
|
||||
|
||||
const char* g_kv_cache_debug_fileEnv = KVCacheConfig::getInstance().get_debug_file_path();
|
||||
if (tempg_kv_cache_debug_level >= KV_LOG_LEVEL_INFO && g_kv_cache_debug_fileEnv != NULL) {
|
||||
log_file_init(&global_debug_file, g_kv_cache_debug_fileEnv, global_debug_file_name);
|
||||
}
|
||||
|
||||
const char* g_kv_cache_error_fileEnv = KVCacheConfig::getInstance().get_error_file_path();
|
||||
if (tempg_kv_cache_debug_level >= KV_LOG_LEVEL_INFO && g_kv_cache_error_fileEnv != NULL) {
|
||||
log_file_init(&global_error_file, g_kv_cache_error_fileEnv, global_err_file_name);
|
||||
char buffer[1024];
|
||||
size_t len = 0;
|
||||
char timeBuffer[80]; // Buffer to hold the formatted time
|
||||
std::time_t absoluteTime = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
std::strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", std::localtime(&absoluteTime));
|
||||
len = snprintf(buffer, sizeof(buffer), "%s KV_CACHE START ", timeBuffer);
|
||||
buffer[len++] = '\n';
|
||||
if (global_error_file != NULL) {
|
||||
fwrite(buffer, 1, len, global_error_file);
|
||||
}
|
||||
}
|
||||
__atomic_store_n(&global_debug_level, tempg_kv_cache_debug_level, __ATOMIC_RELEASE);
|
||||
pthread_mutex_lock(&global_debug_lock);
|
||||
if (global_debug_level != -1) {
|
||||
pthread_mutex_unlock(&global_debug_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
const char *kv_cache_debug = std::getenv("KV_IS_DEBUG_ENABLED");
|
||||
int tempg_kv_cache_debug_level = -1;
|
||||
|
||||
if (kv_cache_debug == NULL) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_INFO;
|
||||
} else if (strcasecmp(kv_cache_debug, "0") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_INFO;
|
||||
} else if (strcasecmp(kv_cache_debug, "1") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_DEBUG;
|
||||
} else if (strcasecmp(kv_cache_debug, "2") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_WARN;
|
||||
} else if (strcasecmp(kv_cache_debug, "3") == 0) {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_ERROR;
|
||||
} else {
|
||||
tempg_kv_cache_debug_level = KV_LOG_LEVEL_INFO;
|
||||
}
|
||||
|
||||
gethostname(hostname, 64);
|
||||
pid = getpid();
|
||||
|
||||
const char *g_kv_cache_debug_fileEnv =
|
||||
KVCacheConfig::getInstance().get_debug_file_path();
|
||||
if (tempg_kv_cache_debug_level >= KV_LOG_LEVEL_INFO &&
|
||||
g_kv_cache_debug_fileEnv != NULL) {
|
||||
log_file_init(
|
||||
&global_debug_file, g_kv_cache_debug_fileEnv, global_debug_file_name);
|
||||
}
|
||||
|
||||
const char *g_kv_cache_error_fileEnv =
|
||||
KVCacheConfig::getInstance().get_error_file_path();
|
||||
if (tempg_kv_cache_debug_level >= KV_LOG_LEVEL_INFO &&
|
||||
g_kv_cache_error_fileEnv != NULL) {
|
||||
log_file_init(
|
||||
&global_error_file, g_kv_cache_error_fileEnv, global_err_file_name);
|
||||
char buffer[1024];
|
||||
size_t len = 0;
|
||||
char timeBuffer[80]; // Buffer to hold the formatted time
|
||||
std::time_t absoluteTime =
|
||||
std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
std::strftime(timeBuffer,
|
||||
sizeof(timeBuffer),
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
std::localtime(&absoluteTime));
|
||||
len = snprintf(buffer, sizeof(buffer), "%s KV_CACHE START ", timeBuffer);
|
||||
buffer[len++] = '\n';
|
||||
if (global_error_file != NULL) {
|
||||
fwrite(buffer, 1, len, global_error_file);
|
||||
}
|
||||
}
|
||||
__atomic_store_n(
|
||||
&global_debug_level, tempg_kv_cache_debug_level, __ATOMIC_RELEASE);
|
||||
pthread_mutex_unlock(&global_debug_lock);
|
||||
}
|
||||
|
||||
/* Common logging function used by the INFO, DEBUG and WARN macros
|
||||
* Also exported to the dynamically loadable Net transport modules so
|
||||
* they can share the debugging mechanisms and output files
|
||||
*/
|
||||
void debug_log(KVLogLevel level, bool enable_to_terminal, const char *filefunc, int line, const char *fmt, ...) {
|
||||
if (__atomic_load_n(&global_debug_level, __ATOMIC_ACQUIRE) == -1) {
|
||||
debug_init();
|
||||
}
|
||||
void debug_log(KVLogLevel level,
|
||||
bool enable_to_terminal,
|
||||
const char *filefunc,
|
||||
int line,
|
||||
const char *fmt,
|
||||
...) {
|
||||
if (__atomic_load_n(&global_debug_level, __ATOMIC_ACQUIRE) == -1) {
|
||||
debug_init();
|
||||
}
|
||||
|
||||
// Save the last error (WARN) as a human readable string
|
||||
if (level == KV_LOG_LEVEL_WARN) {
|
||||
pthread_mutex_lock(&global_debug_lock);
|
||||
va_list vargs;
|
||||
va_start(vargs, fmt);
|
||||
(void) vsnprintf(global_log_last_error, sizeof(global_log_last_error), fmt, vargs);
|
||||
va_end(vargs);
|
||||
pthread_mutex_unlock(&global_debug_lock);
|
||||
}
|
||||
// Save the last error (WARN) as a human readable string
|
||||
if (level == KV_LOG_LEVEL_WARN) {
|
||||
pthread_mutex_lock(&global_debug_lock);
|
||||
va_list vargs;
|
||||
va_start(vargs, fmt);
|
||||
(void)vsnprintf(
|
||||
global_log_last_error, sizeof(global_log_last_error), fmt, vargs);
|
||||
va_end(vargs);
|
||||
pthread_mutex_unlock(&global_debug_lock);
|
||||
}
|
||||
|
||||
if (tid == -1) {
|
||||
tid = syscall(SYS_gettid);
|
||||
}
|
||||
if (tid == -1) {
|
||||
tid = syscall(SYS_gettid);
|
||||
}
|
||||
|
||||
char buffer[1024];
|
||||
size_t len = 0;
|
||||
// Convert timestamp to absolute time and directly use it in the snprintf function
|
||||
std::time_t absoluteTime = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
char timeBuffer[80]; // Buffer to hold the formatted time
|
||||
std::strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", std::localtime(&absoluteTime));
|
||||
char buffer[1024];
|
||||
size_t len = 0;
|
||||
// Convert timestamp to absolute time and directly use it in the snprintf
|
||||
// function
|
||||
std::time_t absoluteTime =
|
||||
std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
char timeBuffer[80]; // Buffer to hold the formatted time
|
||||
std::strftime(timeBuffer,
|
||||
sizeof(timeBuffer),
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
std::localtime(&absoluteTime));
|
||||
|
||||
if (level == KV_LOG_LEVEL_WARN) {
|
||||
len = snprintf(buffer, sizeof(buffer), "\n%s %s:%d:%d %s:%d KV_CACHE WARN ",
|
||||
timeBuffer, hostname, pid, tid, filefunc, line);
|
||||
} else if (level == KV_LOG_LEVEL_INFO) {
|
||||
len = snprintf(buffer, sizeof(buffer), "%s %s:%d:%d KV_CACHE INFO ", timeBuffer, hostname, pid, tid);
|
||||
} else if (level == KV_LOG_LEVEL_DEBUG) {
|
||||
len = snprintf(buffer, sizeof(buffer), "%s %s:%d:%d KV_CACHE DEBUG ", timeBuffer, hostname, pid, tid);
|
||||
} else if (level == KV_LOG_LEVEL_ERROR) {
|
||||
len = snprintf(buffer, sizeof(buffer), "%s %s:%d:%d KV_CACHE ERROR ", timeBuffer, hostname, pid, tid);
|
||||
} else {
|
||||
len = snprintf(buffer, sizeof(buffer), "%s %s:%d:%d KV_CACHE ", timeBuffer, hostname, pid, tid);
|
||||
}
|
||||
if (level == KV_LOG_LEVEL_WARN) {
|
||||
len = snprintf(buffer,
|
||||
sizeof(buffer),
|
||||
"\n%s %s:%d:%d %s:%d KV_CACHE WARN ",
|
||||
timeBuffer,
|
||||
hostname,
|
||||
pid,
|
||||
tid,
|
||||
filefunc,
|
||||
line);
|
||||
} else if (level == KV_LOG_LEVEL_INFO) {
|
||||
len = snprintf(buffer,
|
||||
sizeof(buffer),
|
||||
"%s %s:%d:%d KV_CACHE INFO ",
|
||||
timeBuffer,
|
||||
hostname,
|
||||
pid,
|
||||
tid);
|
||||
} else if (level == KV_LOG_LEVEL_DEBUG) {
|
||||
len = snprintf(buffer,
|
||||
sizeof(buffer),
|
||||
"%s %s:%d:%d KV_CACHE DEBUG ",
|
||||
timeBuffer,
|
||||
hostname,
|
||||
pid,
|
||||
tid);
|
||||
} else if (level == KV_LOG_LEVEL_ERROR) {
|
||||
len = snprintf(buffer,
|
||||
sizeof(buffer),
|
||||
"%s %s:%d:%d KV_CACHE ERROR ",
|
||||
timeBuffer,
|
||||
hostname,
|
||||
pid,
|
||||
tid);
|
||||
} else {
|
||||
len = snprintf(buffer,
|
||||
sizeof(buffer),
|
||||
"%s %s:%d:%d KV_CACHE ",
|
||||
timeBuffer,
|
||||
hostname,
|
||||
pid,
|
||||
tid);
|
||||
}
|
||||
|
||||
if (len) {
|
||||
va_list vargs;
|
||||
va_start(vargs, fmt);
|
||||
len += vsnprintf(buffer + len, sizeof(buffer) - len, fmt, vargs);
|
||||
va_end(vargs);
|
||||
// vsnprintf may return len > sizeof(buffer) in the case of a truncated output.
|
||||
// Rewind len so that we can replace the final \0 by \n
|
||||
if (len > sizeof(buffer)) {
|
||||
len = sizeof(buffer) - 1;
|
||||
}
|
||||
buffer[len++] = '\n';
|
||||
if (access(global_debug_file_name, F_OK) != 0) {
|
||||
recreate_log_file(&global_debug_file, global_debug_file_name);
|
||||
}
|
||||
if (enable_to_terminal) {
|
||||
fwrite(buffer, 1, len, global_debug_file);
|
||||
}
|
||||
if (level == KV_LOG_LEVEL_WARN && global_error_file != stdout) {
|
||||
if (access(global_err_file_name, F_OK) != 0) {
|
||||
recreate_log_file(&global_error_file, global_err_file_name);
|
||||
}
|
||||
if (global_error_file != NULL) {
|
||||
fwrite(buffer, 1, len, global_error_file);
|
||||
}
|
||||
}
|
||||
if (len) {
|
||||
va_list vargs;
|
||||
va_start(vargs, fmt);
|
||||
len += vsnprintf(buffer + len, sizeof(buffer) - len, fmt, vargs);
|
||||
va_end(vargs);
|
||||
// vsnprintf may return len > sizeof(buffer) in the case of a truncated
|
||||
// output. Rewind len so that we can replace the final \0 by \n
|
||||
if (len > sizeof(buffer)) {
|
||||
len = sizeof(buffer) - 1;
|
||||
}
|
||||
buffer[len++] = '\n';
|
||||
if (access(global_debug_file_name, F_OK) != 0) {
|
||||
recreate_log_file(&global_debug_file, global_debug_file_name);
|
||||
}
|
||||
if (enable_to_terminal) {
|
||||
fwrite(buffer, 1, len, global_debug_file);
|
||||
}
|
||||
if (level == KV_LOG_LEVEL_WARN && global_error_file != stdout) {
|
||||
if (access(global_err_file_name, F_OK) != 0) {
|
||||
recreate_log_file(&global_error_file, global_err_file_name);
|
||||
}
|
||||
if (global_error_file != NULL) {
|
||||
fwrite(buffer, 1, len, global_error_file);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,17 +6,22 @@
|
||||
namespace py = pybind11;
|
||||
|
||||
PYBIND11_MODULE(rdma_comm, m) {
|
||||
m.doc() = R"pbdoc(kv cache messager)pbdoc";
|
||||
py::class_<RDMACommunicator>(m, "RDMACommunicator")
|
||||
.def(py::init<std::string &, int, std::string &, std::vector<int64_t>,
|
||||
std::vector<int64_t>, int, int>())
|
||||
.def("connect", &RDMACommunicator::connect)
|
||||
.def("is_connected", &RDMACommunicator::is_connected)
|
||||
.def("write_cache", &RDMACommunicator::write_cache);
|
||||
m.doc() = R"pbdoc(kv cache messager)pbdoc";
|
||||
py::class_<RDMACommunicator>(m, "RDMACommunicator")
|
||||
.def(py::init<std::string &,
|
||||
int,
|
||||
std::string &,
|
||||
std::vector<int64_t>,
|
||||
std::vector<int64_t>,
|
||||
int,
|
||||
int>())
|
||||
.def("connect", &RDMACommunicator::connect)
|
||||
.def("is_connected", &RDMACommunicator::is_connected)
|
||||
.def("write_cache", &RDMACommunicator::write_cache);
|
||||
|
||||
#ifdef VERSION_INFO
|
||||
m.attr("__version__") = VERSION_INFO;
|
||||
m.attr("__version__") = VERSION_INFO;
|
||||
#else
|
||||
m.attr("__version__") = "dev";
|
||||
m.attr("__version__") = "dev";
|
||||
#endif
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user