select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。 select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省不少的开销。
select
:select
使用一个包含所有文件描述符的位图数据结构(通常是fd_set
)。这个位图的大小通常由FD_SETSIZE
宏定义决定,这个宏的默认值是 1024。当调用 select
时,内核会遍历位图中的每个文件描述符,检查是否有可读、可写、或异常事件。这个遍历是线性的,因此性能受到文件描述符数量的限制。select
是水平触发,它会一直通知应用程序,直到所有就绪事件被处理。poll
:poll
使用一个存放在用户态的数组来注册文件描述符。数组中的每个元素都包含了文件描述符和等待的事件。当调用 poll
时,内核会遍历这个数组,检查每个文件描述符是否有就绪事件。和select
一样,这个遍历也是线性的,因此性能受到文件描述符数量的限制。epoll
:epoll
使用一个事件表(event table)来注册文件描述符,事件表是内核空间的数据结构。当调用 epoll_create
时,内核会创建一个事件表,并返回一个文件描述符,应用程序可以使用这个文件描述符来操作事件表。epoll
使用事件通知机制。当文件描述符就绪时,内核会将这个事件添加到事件表中,然后将一个就绪队列中的事件通知应用程序。应用程序可以调用 epoll_wait
来等待事件就绪,并获得就绪的事件列表,这个操作是阻塞的,直到有事件就绪。epoll
可以使用水平触发或边缘触发模式。在边缘触发模式下,只有在状态发生变化时才会通知应用程序。
map是有序的,通常是按照键(key)的顺序来排序的,这种有序性是通过红黑树(Red-Black Tree)来实现的。
每个节点要么是红色,要么是黑色。 根节点是黑色的。 每个叶子节点(NIL 节点)都是黑色的。 如果一个节点是红色的,那么它的两个子节点必须是黑色的(不能出现两个相连的红节点)。 从任意节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点。
std::map或
std::set通常使用红黑树来实现。因此,当你迭代遍历一个
map时,它的键会按照升序排列,因为红黑树是有序的数据结构。
std::unordered_map和
std::unordered_set,它们使用哈希表来实现,不具备有序性。
std::map
:插入(Insertion):平均时间复杂度 O(log n)。由于它是基于红黑树实现的,插入操作会保持树的平衡,因此是对数时间复杂度。 搜索(Search):平均时间复杂度 O(log n)。根据红黑树的特性,搜索操作也是对数时间复杂度。 std::unordered_map
:插入(Insertion):平均时间复杂度 O(1)。哈希表实现的 unordered_map
通常可以在常数时间内完成插入操作,但在最坏情况下,插入的时间复杂度为 O(n),其中 n 是桶的数量。搜索(Search):平均时间复杂度 O(1)。类似于插入操作,平均情况下,哈希表的搜索操作是常数时间,但在最坏情况下,搜索的时间复杂度也是 O(n)。
unordered_map的性能取决于哈希函数的好坏以及哈希表的装载因子(load factor)。如果哈希函数分布不均匀或者装载因子过高,性能可能下降,导致搜索和插入操作的时间复杂度接近线性。所以,在使用
unordered_map的时候,选择合适的哈希函数和合理的装载因子是很重要的。
std::map对象,可能会导致竞态条件和数据不一致性。
使用互斥锁:你可以在访问和修改 std::map
的操作前使用互斥锁(Mutex)来锁定,确保在任何时刻只有一个线程可以访问或修改该std::map
。但可能会降低多线程程序的性能。
#include <iostream>#include <map>#include <mutex>std::map<int, int> myMap;std::mutex mtx;void addToMap(int key, int value) {std::lock_guard<std::mutex> lock(mtx); // 锁定互斥锁myMap[key] = value; // 修改 std::map}int main() {// 启动多个线程调用 addToMap 函数// ...return 0;}
使用线程安全容器:C++11 引入了一些线程安全的容器,如 std::mutex
,std::shared_mutex
和std::shared_timed_mutex
。可以使用这些容器来实现线程安全的操作,例如std::shared_mutex
用于读写锁定。
#include <iostream>#include <map>#include <shared_mutex>std::map<int, int> myMap;std::shared_mutex mtx;void addToMap(int key, int value) {std::unique_lock<std::shared_mutex> lock(mtx); // 锁定写锁myMap[key] = value; // 修改 std::map}int getFromMap(int key) {std::shared_lock<std::shared_mutex> lock(mtx); // 锁定读锁return myMap[key]; // 读取 std::map}int main() {// 启动多个线程调用 addToMap 和 getFromMap 函数// ...return 0;}
使用并发数据结构库:一些第三方库和框架提供了并发数据结构,如 Intel TBB 的 concurrent_hash_map
。这些数据结构专门设计用于多线程环境,可以提供更高的性能和并发性。避免共享状态:最安全的方法是避免共享可变状态。如果可能的话,设计你的多线程应用程序,使不同线程之间的数据共享最小化,从而减少竞态条件的机会。
创建一个哈希表,将键值对映射到哈希桶(bucket)中。 使用哈希函数来确定每个键应该映射到哪个桶中。 对于每个桶,使用无锁链表(例如,带有头节点的单向链表)来存储键值对。 对于插入操作,首先计算键的哈希值,然后找到相应的桶。接下来,使用CAS操作将新的键值对插入链表的开头。 对于查找操作,计算键的哈希值,找到相应的桶,然后在链表中查找键。 对于删除操作,计算键的哈希值,找到相应的桶,然后在链表中查找并删除键。 为了处理并发冲突,你需要使用CAS操作,如果CAS失败,表示有其他线程正在修改链表,你需要重试或采取其他策略。
#include <iostream>#include <vector>#include <atomic>template <typename K, typename V>class LockFreeMap {private:struct Node {K key;V value;std::atomic<Node*> next;Node(const K& k, const V& v) : key(k), value(v), next(nullptr) {}};std::vector<std::atomic<Node*>> buckets;static const size_t num_buckets = 100; // 根据需要设置桶的数量size_t hash(const K& key) const {// 简单哈希函数,根据需要使用更复杂的哈希函数return std::hash<K>{}(key) % num_buckets;}public:LockFreeMap() : buckets(num_buckets) {}void insert(const K& key, const V& value) {size_t bucket_index = hash(key);Node* newNode = new Node(key, value);while (true) {Node* head = buckets[bucket_index].load();newNode->next = head;if (buckets[bucket_index].compare_exchange_strong(head, newNode)) {return; // 插入成功}}}bool find(const K& key, V& value) const {size_t bucket_index = hash(key);Node* current = buckets[bucket_index].load();while (current) {if (current->key == key) {value = current->value;return true; // 找到了}current = current->next;}return false; // 没找到}bool remove(const K& key) {size_t bucket_index = hash(key);Node* current = buckets[bucket_index].load();Node* prev = nullptr;while (current) {if (current->key == key) {if (prev) {prev->next.store(current->next.load());} else {buckets[bucket_index].store(current->next.load());}delete current;return true; // 移除成功}prev = current;current = current->next.load();}return false; // 没找到要移除的节点}};int main() {LockFreeMap<int, std::string> map;map.insert(1, "One");map.insert(2, "Two");map.insert(3, "Three");std::string value;if (map.find(2, value)) {std::cout << "Found: " << value << std::endl;} else {std::cout << "Not Found" << std::endl;}if (map.remove(3)) {std::cout << "Removed: 3" << std::endl;} else {std::cout << "Not Found" << std::endl;}return 0;}
插入操作: 插入一个元素时,首先需要在底层链表中找到正确的插入位置,这需要 O(log n) 的时间,其中 n 是跳表中的元素数量。 然后,根据一定的概率,将该元素添加到不同的层级中。通常,元素被添加到更高层级的概率较低,因此平均复杂度仍然是 O(log n)。 删除操作: 删除一个元素时,同样需要在底层链表中找到该元素,这需要 O(log n) 的时间。 接下来,需要从各个层级中删除该元素。如果每个层级都需要删除,那么删除的总时间也是 O(log n)。
简单性和可读性:跳表的实现相对简单,易于理解和调试。跳表是一种平衡树结构,而 B+ 树更加复杂,涉及到更多的平衡操作和节点分裂合并,这使得 B+ 树的实现更复杂。 高效的插入和删除操作:Redis 的主要应用场景是作为内存数据库和缓存,其中读写速度非常重要。跳表在插入和删除操作上表现出色,这对于高并发的缓存系统至关重要。B+ 树的插入和删除操作相对复杂,需要维护平衡性,可能导致性能下降。 较低的内存开销:B+ 树通常需要更多的指针和额外的开销,以维护其平衡性,而跳表通常需要更少的指针,因此在某些情况下,跳表可能更节省内存。对于内存数据库和缓存,节省内存开销非常重要。 查询性能足够高:跳表的平均查询时间复杂度是 O(log n),这对于大多数应用来说已经足够高效。B+ 树在某些情况下可能更快,但差距并不明显,而跳表的实现更加简单。 灵活性:Redis 的设计注重简洁和灵活性,跳表更适合这种设计哲学。Redis 的跳表实现还可以轻松支持范围查询,这在一些场景下非常有用。
#include <iostream>#include <ctime>#include <hiredis/hiredis.h>int main() {// 连接到 Redis 服务器redisContext *redis = redisConnect("127.0.0.1", 6379);if (redis == NULL || redis->err) {if (redis) {std::cerr << "Redis connection error: " << redis->errstr << std::endl;redisFree(redis);} else {std::cerr << "Unable to allocate Redis context." << std::endl;}return 1;}// 限流配置const char *key = "my_rate_limiter";int rate = 5; // 每秒产生的令牌数int capacity = 5; // 桶的容量// 获取当前时间std::time_t current_time = std::time(nullptr);// 删除过期的令牌redisReply *reply = (redisReply *)redisCommand(redis, "ZREMRANGEBYSCORE %s -inf %d", key, current_time - rate);if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {std::cerr << "Redis ZREMRANGEBYSCORE error: " << redis->errstr << std::endl;freeReplyObject(reply);redisFree(redis);return 1;}freeReplyObject(reply);// 计算当前令牌数量reply = (redisReply *)redisCommand(redis, "ZCARD %s", key);if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {std::cerr << "Redis ZCARD error: " << redis->errstr << std::endl;freeReplyObject(reply);redisFree(redis);return 1;}int current_tokens = reply->integer;freeReplyObject(reply);// 添加令牌if (current_tokens < capacity) {reply = (redisReply *)redisCommand(redis, "ZADD %s %d %d", key, current_time, current_time);if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {std::cerr << "Redis ZADD error: " << redis->errstr << std::endl;freeReplyObject(reply);redisFree(redis);return 1;}freeReplyObject(reply);std::cout << "Connection allowed." << std::endl;} else {std::cout << "Connection denied." << std::endl;}// 断开 Redis 连接redisFree(redis);return 0;}
SETNX(SET if Not eXists)命令来实现。该命令会在指定的键不存在时设置该键的值,并返回 1,表示成功获得锁;如果该键已存在,表示锁已被其他客户端持有,返回 0,表示获取锁失败。
客户端尝试使用 SETNX
命令在 Redis 中创建一个唯一的锁键,并设置一个超时时间,以防止锁被无限期占用。如果 SETNX
命令返回 1,表示客户端成功获取了锁,可以执行临界区代码。在临界区代码执行完毕后,客户端可以使用 DEL
命令来释放锁,或者让锁自动超时。
set key_test "hello"expire key_test 500 # 500秒后过期
expireat key_test 1746131400
ttl key_test342pttl key_melon # 以毫秒为单位342432
persist key_test
#include <iostream>#include <cstring>#include <unistd.h>#include <hiredis/hiredis.h>// Redis服务器地址和端口const char* redis_host = "localhost";const int redis_port = 6379;// 锁的键名const char* lock_key = "my_lock";// 获取锁的函数bool acquire_lock(redisContext* redis_conn, const char* key, int timeout) {redisReply* reply = (redisReply*)redisCommand(redis_conn, "SET %s 1 NX EX %d", key, timeout);if (reply && reply->str && strcmp(reply->str, "OK") == 0) {freeReplyObject(reply);return true; // 获取锁成功}freeReplyObject(reply);return false; // 获取锁失败}// 释放锁的函数void release_lock(redisContext* redis_conn, const char* key) {redisReply* reply = (redisReply*)redisCommand(redis_conn, "DEL %s", key);freeReplyObject(reply);}int main() {// 连接到Redis服务器redisContext* redis_conn = redisConnect(redis_host, redis_port);if (redis_conn == nullptr || redis_conn->err) {std::cerr << "Error connecting to Redis: " << redis_conn->errstr << std::endl;return 1;}// 获取锁if (acquire_lock(redis_conn, lock_key, 600)) { // 锁的超时时间为600秒std::cout << "Lock acquired. Doing some work..." << std::endl;sleep(5); // 模拟需要加锁的操作std::cout << "Work complete." << std::endl;// 释放锁release_lock(redis_conn, lock_key);} else {std::cout << "Failed to acquire lock." << std::endl;}// 断开与Redis的连接redisFree(redis_conn);return 0;}
SET key 1 NX EX timeout命令,其中
key是锁的键名,
timeout是锁的超时时间(以秒为单位)。如果获取锁成功,它会执行一些模拟的工作,然后释放锁。如果获取锁失败,它会提示“Failed to acquire lock”。
#include <iostream>#include <hiredis/hiredis.h>int main() {// Redis服务器地址和端口const char* redis_host = "localhost";const int redis_port = 6379;// 锁的键名和锁的超时时间const char* lock_key = "my_lock";int lock_timeout = 600; // 锁的超时时间,单位为秒// Redis连接对象redisContext* redis_conn = nullptr;// 连接到Redis服务器redis_conn = redisConnect(redis_host, redis_port);if (redis_conn == nullptr || redis_conn->err) {std::cerr << "Error connecting to Redis: " << redis_conn->errstr << std::endl;return 1;}// 尝试获取锁redisReply* reply = (redisReply*)redisCommand(redis_conn, "SETNX %s 1", lock_key);if (reply && reply->integer == 1) {std::cout << "Lock acquired. Doing some work..." << std::endl;// 在这里执行需要加锁的操作// 释放锁redisReply* release_reply = (redisReply*)redisCommand(redis_conn, "DEL %s", lock_key);freeReplyObject(release_reply);std::cout << "Lock released." << std::endl;} else {std::cout << "Failed to acquire lock." << std::endl;}freeReplyObject(reply);// 关闭Redis连接redisFree(redis_conn);return 0;}
SETNX命令尝试获取锁。如果
SETNX返回 1,表示锁获取成功,然后可以执行需要加锁的操作。最后,在完成工作后,使用
DEL命令释放锁。
SETNX命令只有在锁不存在时才会设置成功,因此只有一个客户端能够成功地获取锁。其他客户端会不断尝试获取锁,直到成功为止,或者等待一段时间后放弃。
#include <iostream>#include <string>#include <sstream>// 反转单个单词的函数std::string reverseWord(const std::string& word) {std::string reversedWord = word;int left = 0;int right = reversedWord.length() - 1;while (left < right) {std::swap(reversedWord[left], reversedWord[right]);left++;right--;}return reversedWord;}std::string reverseString(const std::string& input) {std::istringstream iss(input);std::string word;std::string reversedString;while (iss >> word) {// 反转当前单词并添加到结果字符串std::string reversedWord = reverseWord(word);reversedString += reversedWord + " ";}// 去除末尾多余的空格if (!reversedString.empty()) {reversedString.pop_back();}return reversedString;}int main() {std::string input = "hello world";std::string reversed = reverseString(input);std::cout << "Original: " << input << std::endl;std::cout << "Reversed: " << reversed << std::endl;return 0;}

文章转载自阿Q正砖,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




