并发编程(19)——引用计数型无锁栈
文章目录
- 十九、day19
- 1. 引用计数
- 2. 代码实现
- 2.1 单引用计数器无锁栈
- 2.2 双引用计数器无锁栈
- 3. 本节的一些理解
十九、day19
上一节我们学习通过侯删链表以及风险指针与侯删链表的组合两种方式实现了并发无锁栈,但是这两种方式有以下缺点:
第一种方式:如果 pop
操作频繁被多个线程调用,待删除的节点将不断累积,导致待删除列表无法及时回收,进而占用大量内存。此时我们需要一种机制,每次调用 pop 函数结束后都需要判断侯删链表中的节点是否可以被 delete,即风险指针。
第二种方式:
-
如何正确且高效地分配这块内存是一个难点。
-
为了删除一个节点,需要遍历风险指针数组,检查该节点是否被某个风险指针引用。同时,在处理待删除节点时,也需要从风险数组中选取合适的节点记录其地址,这进一步增加了开销。
-
而且因为该方法受专利保护,所以在一些场合可能无法使用
在这一节中,我们学习如何通过引用计数来实现无锁栈以及安全回收机制。
参考:
恋恋风辰官方博客
《c++并发编程》中无锁栈的实现为什么要用双引用计数器_无锁栈 双重引用技术-CSDN博客
C++实现“无锁”双buffer - 知乎
1. 引用计数
本节的引用计数通过两个计数器实现:一个是存储在节点内部的 内部计数器,另一个是与指针绑定在一起的 外部计数器。外部计数器和指针被封装在一个结构体 counted_node_ptr
中,因此栈顶指针的类型从原生指针(void⭐、int⭐、char⭐、node⭐…)修改为 counted_node_ptr
。
struct counted_node_ptr {int external_count; // 外部计数器node* ptr; // 原始指针,指向节点
};
具体机制如下:
- 当线程加载
head
指针时,外部计数器会加 1,表示有新的线程在引用这个节点。 - 当线程不再引用某个节点时,内部计数器会减 1。
通过将 内部计数器 和 外部计数器 相加,就可以得到该节点的实际引用计数。当引用计数归零时,说明没有线程再访问这个节点,此时可以安全地删除它。这种设计确保了节点的引用计数在并发环境下能够准确管理,同时避免了资源泄露的问题。数据结构如下图所示
当线程尝试从栈中弹出数据时,会按照以下步骤操作:
- 读取栈顶指针
线程首先读取head
指针(栈顶指针),将其值存入线程的局部变量local_head
中,同时将head
指针的外部计数器加 1。这个过程是原子的,因此能够确保线程安全。 - 同步计数器
将local_head
的外部计数器更新为与head
的外部计数器相同。随后,线程可以通过local_head
访问节点。 - 判断能否弹出节点
- 如果在
local_head
载入后,发现local_head
的指针值与当前head
不一致,说明有其他线程在此期间修改了head
。此时,当前线程无法弹出节点,因为它载入的local_head
已作废。 - 线程会将节点的内部计数器减 1(因为当前线程不再指向该节点)。如果内部计数器变为 0(说明没有其他线程再引用这个节点),当前线程负责删除节点;否则,什么也不做。
- 如果在
- 成功弹出节点
- 如果
local_head
的指针值仍与head
相同,说明没有其他线程修改head
,当前线程可以成功弹出节点。 - 线程将
head
指针更新为当前节点的下一个节点(即head.ptr->next
),这个更新操作是原子的。
- 如果
- 更新计数器
- 弹出节点后,将
local_head
的外部计数器减 1,因为head
指针已经不再指向这个节点。 - 再次将
local_head
的外部计数器减 1,因为local_head
也准备不再指向这个节点(实际上,可以一次性减 2,这里分开解释是为了说明两个来源)。 - 然后,将
local_head
的外部计数器值加到节点的内部计数器上。
- 弹出节点后,将
- 判断节点是否需要删除
- 如果内部计数器因此变为 0(即它原本是外部计数器的相反数),说明没有线程再引用该节点,当前线程负责删除节点。
- 如果内部计数器不为 0,则无需删除节点。
通过这样的流程,可以确保节点在无人引用时才被安全回收,同时减少不必要的内存操作,提升了效率和线程安全性。
这段文字光看很难看懂,我们通过代码进行解释:
2. 代码实现
2.1 单引用计数器无锁栈
在此之前,我们需要知道:该方式必须通过两个计数器才能实现,如果只用一个,会有一些问题。我们先通过只用一个计数器实现引用计数,进而说明为什么只能使用两个计数器实现。
template<typename T>
class single_ref_stack {
public:single_ref_stack():head(nullptr) {}~single_ref_stack() {//循环出栈while (pop());}void push(T const& data);std::shared_ptr<T> pop();
private:struct ref_node {//1 数据域智能指针std::shared_ptr<T> _data;//2 引用计数std::atomic<int> _ref_count;//3 下一个节点ref_node* _next;ref_node(T const& data_) : _data(std::make_shared<T>(data_)),_ref_count(1), _next(nullptr) {}};//头部节点std::atomic<ref_node*> head;
};
上段代码是一个单引用的无锁栈实现,内容很简单不多于解释,我们只需要注意一点:计数器引用计数必须要使用原子类型,为了保证多线程修改下仍是线程安全的。
接下来实现 pop() 和 push():
void push(T const& data) {auto new_node = new ref_node(data);new_node->next = head.load();while (!head.compare_exchange_weak(new_node->next, new_node));
}
push()
函数很简单,就是将数据插入至栈链表的表头,并且调用原子操作 compare_exchange_weak
将 head
修改为 new_node
,即将 head
指针指向新插入的表头节点 new_node
。
std::shared_ptr<T> pop() {ref_node* old_head = head.load();for (;;) {if (!old_head) { // 空栈检查return std::shared_ptr<T>();}//1 只要执行pop就对引用计数+1++(old_head->_ref_count);//2 比较head和old_head想等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, old_head->_next)) {auto cur_count = old_head->_ref_count.load();auto new_count;//3 循环重试保证引用计数安全更新do {//4 减去本线程增加的1次和初始的1次new_count = cur_count - 2;} while (!old_head->_ref_count.compare_exchange_weak(cur_count, new_count));//返回头部数据std::shared_ptr<T> res;//5 交换数据res.swap(old_head->_data);//6if (old_head->_ref_count == 0) {delete old_head;}return res;}else {//7 if (old_head->_ref_count.fetch_sub(1) == 1) {delete old_head;}}}
}
该函数仅有两种返回值:空栈返回一个空的智能指针;非空栈返回一个指向弹出数据的智能指针。执行流程如下:
- 进行空栈检查,如果是空栈,直接返回空的
std::shared_ptr<T>()
,否则继续执行 - 因为节点结构体的成员增加了一个原子类型的引用计数,这里增加当前线程对节点(old_head)的引用计数(因为计数器是
std::atomic<int>
类型的整型原子变量,所以++
是原子操作),避免该节点(old_head)在操作过程中被其他线程删除 - 使用 CAS(Compare-And-Swap) 操作尝试将
head
从old_head
更新为old_head->_next
。如果成功说明当前线程成功弹出头节点,否则说明其他线程在此期间修改了head
,需要重新尝试。 - 如果判断成功,则
head
被修改为old_head->_next
。按照上一节的做法,这里需要将old_head
加入至侯删链表,但是在本节中我们需要在这里更新引用计数- 首先,读取
old_head
的引用计数(我们在 pop 函数最开始将其 +1,保证其他线程不会删除它),并将其赋值给cur_count
- 然后将引用计数减去 2(pop 函数一开始加了1,每个节点初始化的时候默认构造函数将引用计数设置为 1,一共增加了两次 1)
- 使用 CAS(Weak) 重试,保证引用计数能被安全更新到
new_count
- 引用计数修改成功之后,将弹出节点 old_head 的数据存储至
std::shared_ptr<T>
类型的智能指针中,并判断old_head
的引用计数是否为 0,如果为 0 则直接将old_head
删除,否则保留 - 最后返回存有
old_head->_data
的智能指针 res
- 首先,读取
- 如果判断失败,说明其他线程在此期间修改了 head。判断失败的线程可以理解为抢占失败的线程,因为 pop 函数的一开始会将栈表头节点的引用计数 +1,所以如果抢占失败,需要在 pop 函数结束前将引用计数 -1。如果引用 -1 之后的值为 1,则表示其他对该节点 pop 的线程已经结束,当前线程是最后引用此指针的线程,我们需要在该线程中将 old_head 删除。
但是该函数存在隐含的风险:
- 风险1:假设线程 1 和线程 2 同时调用pop函数,并已经读取了head 的值,现在准备将 old_head 的引用计数 +1,但线程 2 率先完成了pop函数操作,将引用计数
_ref_count
更新为 0 并删除了old_head
。此时,线程 1 恰好继续执行第 1 步代码,却尝试访问已经被删除的old_head
,因此导致程序崩溃。 - 风险2:假设线程 1 和线程 2 同时调用 pop函数,并先后将
old_head
的引用计数 +1(此时old_head
的引用计数总计为 3),假如线程 1 率先通过 if 的判断,将head
成功更新为old_head->next
;而线程 2 会因为 CAS 会读取到head
最新修改的值,即old_head->next
,因为head
最新的值和old_head
不同,所以会将old_head
修改为old_head->next
,线程 2 走进 else 分支之后判断的old_head->ref_count
其实是old_head
修改后的引用计数,即old_head->next->ref_count
,为1(只有初始化的时候才 +1,其他线程没有对它的引用计数进行操作),从而会将其删除。其他线程如果要弹出 old_head->next 的时候会发现节点已经不存在,报错。流程如下图所示:
- 假设线程 1 和线程 2 同时调用 pop函数,并先后将
old_head
(node1
)的引用计数 +1(此时old_head
的引用计数总计为 3),如上图中的流程1 - 假如线程 1 率先通过 if 的判断,将
head
成功更新为old_head->next
,即node2
,如上图的流程2 - 线程 2 会因为 CAS 会读取到
head
最新修改的值,即old_head->next
(node2
),因为head
最新的值和old_head
不同,所以会将old_head
修改为和 head 相同的值old_head->next
,如上图的流程3 - 因为线程 2 没通过 if 的判断条件,从而走进了 else 分支,else分支中会执行
old_head->_ref_count.fetch_sub(1) == 1
操作,此时old_head
指向节点的引用计数是 1。注意,fetch_sub
将_ref_count
减 1 后返回的是减之前的值,即1,而不是减之后的值 0,所以这里虽然将 _ref_count 从 1 减为 0 ,但是 if 条件仍然会判断成功,从而将 old_head 删除 - 假如线程 3 在线程 2 执行完之后调用了 pop 函数想要将
node2
弹出,按正常流程来走的话,会读取head
指向的节点,即node2
,但因为node2
已经被线程 2删除,所以这里在读取head.load()
的时候会报错,此时head
指向空指针nullptr
,如下图所示:
解决方法就是将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合。
将原本数据结构体 ref_node
分解为 ref_node
和 node
两个结构体,前者用于存储引用计数,后者用于存储数据。
struct node {//1 数据域智能指针std::shared_ptr<T> _data;//2 下一个节点ref_node _next;node(T const& data_) : _data(std::make_shared<T>(data_)) {}
};
struct ref_node {// 引用计数std::atomic<int> _ref_count;node* _node_ptr;ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}ref_node():_node_ptr(nullptr),_ref_count(0){}
};
如果将 head
存储为指针类型而不是副本类型:
std::atomic<ref_node*> head; // 头节点
注意:这里的头节点存储的类型是
ref_node*
而不是node*
。
那么 pop()
函数需修改如下:
std::shared_ptr<T> pop() {//0 处ref_node* old_head = head.load();for (;;) {//1 只要执行pop就对引用计数+1并更新到head中ref_node* new_head;do {new_head = old_head;new_head->_ref_count += 1; // 7} while (!head.compare_exchange_weak(old_head, new_head));//4 确保 head == old_head == new_head old_head = new_head;auto* node_ptr = old_head->_node_ptr; // 获取当前节点的 node(存储数据的结构体)if (node_ptr == nullptr) { // 判断空栈return std::shared_ptr<T>();}//2 比较head和old_head想等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, node_ptr->_next)) {// 要返回的值std::shared_ptr<T> res;//5 交换智能指针res.swap(node_ptr->_data);//6 增加的数量int increase_count = old_head->_ref_count.fetch_sub(2);//3 判断仅有当前线程持有指针则删除if (increase_count == 2) {delete node_ptr;}return res;}else {if (old_head->_ref_count.fetch_sub(1) == 1) {delete node_ptr;}}}
}
- 第一个 while 是为了在多线程下安全地更新头节点的引用计数
- 因为
head
、old_head
、new_head
都是ref_node
类型的结构体,这里必须使用auto* node_ptr = old_head->node_ptr
获取和当前引用计数匹配的数据节点,使用该节点判断是否存在内容;而head
的下一个节点并不是head ->node_ptr
,而是head ->node_ptr->next
- 第二个 while 是为了更新 head 的值,将表头节点从链表中删除(从链表中删除而不是从内存中删除)。
- 如果
head
更新成功,则将old_head ->node_ptr
弹出,而不是将old_head
弹出,old_head
表示的仅仅只是引用计数而不是数据。 int increase_count = old_head->_ref_count.fetch_sub(2)
是为了获取引用计数增加的数量(ref_count.fetch_sub(2)
会将ref_count
的值减去 2 并返回_ref_count
的旧值,而不是减去 2 后的新值),如果确实只增加了 2,那么表示当前线程是弹出节点的唯一持有者,可以直接删除。
如果我们将
head
存储为指针类型而不是副本类型,那么上面的代码有以下风险:
-
风险1:和上面引用计数和数据耦合为一个结构体时的风险2相同,会造成误删:
假设线程 1 比线程 2 先执行,线程 1 在步骤 2 进行比较并交换操作(CAS)后,会将
head
更新为新的值。而此时线程 2 也尝试执行同样的 CAS 操作,但由于head
已被线程 1 修改,线程 2 的 CAS 操作会失败。此时,线程 2 会进入
else
分支进行处理,并更新old_head
为当前的head
值。然而,此时线程 2 中的old_head
的引用计数为 1,而old_head
指向的数据实际上是新的head
所指向的数据节点。如果线程 2 随后误将old_head
指向的数据节点删除,就会导致问题。因为线程 2 删除的实际上是当前head
所指向的节点,而它并未真正弹出节点或修改head
的值。这会导致其他线程在尝试弹出栈顶元素时访问到已被删除的节点,从而引发程序崩溃。 -
风险2:
假设线程 1 和线程 2 都执行完了第 0 步的代码,此时它们读取到的
old_head
值是相同的。线程 1 抢先一步执行,完成了第 5 步操作,并准备执行第 3 步判断逻辑时,线程 2 抢占了 CPU 开始执行第 7 步的代码。虽然线程 2 的
while
循环会检测到old_head
和当前head
不同,并因此进行重试,更新了old_head
,但在do
循环的第一次迭代中,线程 2 的old_head
和线程 1 的old_head
仍然指向同一个节点。此时,线程 2 修改了old_head
的引用计数。这会导致线程 1 在执行第 3 步时,引用计数不满足删除条件(因为线程 2 将old_head
的计数加一,因为head、old_head、new_head的类型都是指针类型,指向的是同一块内存(指针),所以会反映到线程1中。将此时为3,线程 1 加一,线程 2 加一,初始化加一),因此不会进入if
逻辑删除节点。与此同时,因为线程2在2处之后while会不断重试,线程2的head已经和old_head指向不同了,导致线程2也不会回收old_head内部节点指向的数据。最终,
old_head
节点中的数据既没有被线程 1 删除,也没有被线程 2 回收,导致内存泄漏。问题根源在于,多个线程对
old_head
的引用计数存在竞争,但删除逻辑对引用计数的操作未能同步,导致某些节点被遗漏回收。 -
一种和风险2相似但是正常的情况:
假设线程 1 和线程 2 都执行完了第 0 步的代码,此时两者读取的
old_head
值是相同的。接下来,线程 1 比线程 2 先获得 CPU 时间片并继续执行,而线程 2 因未抢占到 CPU 时间片停在了第 1 步。线程 1 按照顺序继续执行,最终到达第 3 步并将
node_ptr
所指向的节点删除。同时,head
已更新为新的栈顶元素,也就是old_head
的下一个节点。这时,线程 2 抢占到了 CPU 时间片,继续执行第 1 步的代码,并在第一个 while 循环中因为 CAS 失败将
old_head
更新为当前head
的值。此时,因为new_head
是指针,所以对new_head
的修改能反映到old_head
本身,old_head
的引用计数也增加了 1,变为 2,但它实际指向的是下一个节点(即新的栈顶)。在这种情况下,线程 2 仍会进入
if
条件(不是进入else
分支),对新的old_head
节点执行删除操作。由于此时的old_head
和引用计数逻辑匹配,这种情况是正常的,不会引发错误。
我们总结如下:
-
在实现引用计数内存回收机制时,应尽量避免直接存储指针,因为存储指针会带来多个线程同时操作同一块内存的风险。我们这里需要将 head 的类型从
std::atomic<ref_node*>
修改为std::atomic<ref_node>
,使用副本类型而不是指针类型。 -
引入一个专门用于记录减少引用计数的计数器。为了确保在多个线程中修改该计数器的安全性,可以将其设计为
node
类的成员变量。由于node
类的实例是通过指针存储在栈的节点中的,多个线程能够安全地访问和修改它。
通过将引用计数分为“增加”和“减少”两部分,并将减少的计数器放入 node
类中,我们能够更清晰地追踪各线程对节点的引用情况,从而避免因引用计数不准确导致的内存泄漏或竞争问题。即双引用计数器版本。
- 一个节点是否可以被回收,取决于其整体引用计数(增加的引用计数加减少的引用计数)是否为 0。只有当引用计数为 0 时,节点才可以被安全地删除。
2.2 双引用计数器无锁栈
有两种双引用计数器无锁栈的实现方式,一种是博主恋恋风辰的实现方式:将增加计数器放在 ref_node 结构体中,将减少计数器放在 node 结构体中;另一种是C++并发编程书中的实现方式,使用了内存序进行了优化,具体实现相同,只不过后者将一些功能分离成了单独的函数,并通过内存序增加了效率。后者我们在下一节学习,本节只学习第一种。
#pragma once
#include <atomic>template<typename T>
class single_ref_stack {
public:single_ref_stack(){}~single_ref_stack() {//循环出栈while (pop());}void push(T const& data) {auto new_node = ref_node(data);new_node._node_ptr->_next = head.load();while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));}std::shared_ptr<T> pop() {ref_node old_head = head.load();for (;;) {//1 只要执行pop就对引用计数+1并更新到head中ref_node new_head;do {new_head = old_head;new_head._ref_count += 1;} while (!head.compare_exchange_weak(old_head, new_head));old_head = new_head;auto* node_ptr = old_head._node_ptr;if (node_ptr == nullptr) {return std::shared_ptr<T>();}//2 比较head和old_head想等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, node_ptr->_next)) {//要返回的值std::shared_ptr<T> res;//交换智能指针res.swap(node_ptr->_data);//增加的数量int increase_count = old_head._ref_count - 2;if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {delete node_ptr;}return res;}else {if (node_ptr->_dec_count.fetch_sub(1) == 1) {delete node_ptr;}}}}private:struct ref_node;struct node {//1 数据域智能指针std::shared_ptr<T> _data;//2 下一个节点ref_node _next;node(T const& data_) : _data(std::make_shared<T>(data_)) {}// 减少引用计数器std::atomic<int> _dec_count;};struct ref_node {// 增加引用计数int _ref_count;node* _node_ptr;ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}ref_node():_node_ptr(nullptr),_ref_count(0){}};//头部节点std::atomic<ref_node> head;
};
针对单引用计数器提出的结论,我们做如下修改:
-
增加一个减少计数器,用于记录引用计数减少的次数,存储至 node 结构体中。因为 head 存储的是
std::atomic<ref_node>
类型,所以_ref_count
的类型是int
而不用是std::atomic<int>
,ref_node
的数据只需每个线程自己看到就行,不用分享给其他线程。因为
_ref_count
的类型是 int,所以其他线程没办法看到,为了让其他线程看到ref_count
的值,我们在 pop 内部将 _ref_count - 2 的值加到了_dec_count
上面,dec_count
是原子变量,其他线程可以看到。而且我们通过CAS可以修改head的值,因为head是原子类型,所以head的引用计数也会被其他线程看到。一共有两种方式。
struct ref_node;struct node {//1 数据域智能指针std::shared_ptr<T> _data;//2 下一个节点ref_node _next;node(T const& data_) : _data(std::make_shared<T>(data_)) {}//减少的数量std::atomic<int> _dec_count;};struct ref_node {// 引用计数int _ref_count;node* _node_ptr;ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}ref_node():_node_ptr(nullptr),_ref_count(0){}};
- 将
head
的类型从std::atomic<ref_node*>
修改为std::atomic<ref_node>
:
std::atomic<ref_node> head;
push()
函数的实现如下:
void push(T const& data) {auto new_node = ref_node(data);new_node._node_ptr->_next = head.load();while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
}
pop()
函数的实现如下:
std::shared_ptr<T> pop() {ref_node old_head = head.load();for (;;) {//1 只要执行pop就对引用计数+1并更新到head中ref_node new_head;//2do {new_head = old_head;new_head._ref_count += 1;} while (!head.compare_exchange_weak(old_head, new_head));old_head = new_head;//3auto* node_ptr = old_head._node_ptr;if (node_ptr == nullptr) {return std::shared_ptr<T>();}//4 比较head和old_head相等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, node_ptr->_next)) {//要返回的值std::shared_ptr<T> res;//交换智能指针res.swap(node_ptr->_data);//5 增加的数量int increase_count = old_head._ref_count - 2;//6 if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {delete node_ptr;}return res;}else {//7if (node_ptr->_dec_count.fetch_sub(1) == 1) {delete node_ptr;}}}
}
- 当多个线程并发调用
pop
时,可能会有线程在代码的第 2 处不断重试。这种重试的原因是head
和old_head
之间的某些字段(如引用计数或节点node地址)可能已经被其他线程更新。但无论如何,head
使用的是副本存储(std::atomic<ref_node>
),即使重试增加了引用计数,也不会影响到其他线程,因为每个线程操作的都是自己的old_head
副本。 - 在代码的第 3 处,我们从
old_head
中取出node_ptr
,用来保存指向节点的地址。这样,后续操作中可以安全地减少该节点的引用计数(比如 6 和 7 处的node_ptr->_dec_count.fetch_add(x)
)。因为多个线程可能同时操作同一个node_ptr
指向的节点,所以引用计数使用了原子变量,以确保多个线程对其修改时相互可见且线程安全。 - 在第 4 处,我们通过比较交换(
compare_exchange_strong
)操作来判断old_head
和head
是否一致。因为 head 、new_head、old_head 存储的是副本类型,所以多个线程看到的old_head
的值可能不一样(因为每个线程的old_head
都是独立的副本,互不干涉),但我们能保证仅有一个线程进入if逻辑,进入的线程就是old_head和head匹配的那个。 如果一致,说明当前线程成功抢占操作权,进入if
逻辑;如果不一致,说明head
已被其他线程更新,当前线程无法进入if
,需要进行后续的else
逻辑处理。值得注意的是,即使多个线程同时看到不同的old_head
,也只有一个线程能进入if
条件,其它线程都会失败。 - 在第 5 处,我们已经确定当前线程需要处理的
node_ptr
节点。此时,我们定义了res
,用来保存弹出的数据。在调整引用计数时,先减去 2,其中 1 表示当前线程自身的操作,另 1 表示 初始化的 1。接着计算其他并发线程操作该节点的数量,存储在increase_count
中。最终,是否删除节点取决于增加和减少的引用计数之和是否为 0。 - 在第 6 处,我们使用
fetch_add
操作来调整_dec_count
,并返回操作前的值。如果返回值为负的increase_count
,说明当前线程是唯一操作该节点的线程(因为没有其他线程增加引用计数),此时可以安全删除该节点。如果返回值不是负的increase_count
,则说明还有其他线程正在操作此节点,不能删除。
示例:
假设有两个线程 t1
、t2
执行 pop()
函数,假如线程 t1
先于线程 t2
执行 2 处代码:
head
的类型是std::atomic<ref_node>
,而new_head
和old_head
的类型是ref_node
,所以在线程t1
和t2
中的old_head
和new_head
互相独立互不影响。- 线程
t1
将属于自身线程的old_head
和new_head
副本的引用计数 +1,并将head
中的数据修改为和old_head
相同的数据内容,head
的增计数器_ref_count
值此时为 2。 - 当线程
t1
执行完 2 处代码后,线程t2
开始执行,此时因为 CAS 机制使得线程t2
专属的old_head
中的增引用计数器被修改为和head
相同的内容,即 2。因为head
的原子类型,所以线程t1
对head
的修改会被线程t2
看到,此时线程t2
看到的head
的增引用计数器的值为 2,在线程t2
第二次 CAS 重试之后,会将old_head
的增计数器 +1,从 2 增到 3。(所以虽然_ref_count
的类型是 int,但是通过这一种方式可以间接的让其他线程看到)。
假如线程 t1
先于线程 t2
执行 4 处代码:
-
因为
head.load() == old_head
,所以线程t1
会进入 if 分支并将head
更新到node_ptr->_next
-
increase_count
表示除本线程对目标节点的引用外,其他线程对该目标节点的引用数量,这里 -2 是将线程t1
对目标节点的引用减去,再将初始化时对引用的初始化减一,一共减去二。increase_count
此时应该为 1,表示除了线程t1
在引用目标节点外,还有线程t2
在引用目标节点(除了将head
更新这种方式,也可以通过这种方式让其他线程看到引用计数的修改,increase_count
表示其他线程对目标节点的引用数量)
线程 t1
在执行 6 处代码之前,一共有两种可能:
-
第一种可能:
线程
t2
还未执行 7 处代码,此时_dec_count
的默认初始值为 0,线程t1
将_dec_count
加increase_count
,即加 1,并返回_dec_count
原本的值 0;此时不满足 if 的判断,即0 != -1
,所以不会删除数据节点。线程
t2
在线程t1
执行完 6 处代码之后执行 7 处代码,此时_dec_count
被线程t1
增加到了 1,然后在 if 判断中对_dec_count - 1
并返回_dec_count
初值,即 1;因为1 == 1
,表示当前仅有线程t2
引用目标节点,可以直接删除。 -
第二种可能:
线程
t2
已经执行了 7 处代码,此时_dec_count
的默认初始值为 0,线程t2
将_dec_count
减 1 ,并返回 0;因为0 != 1
,所以线程t2
不会将数据内容删除。线程
t1
在线程t2
执行完 7 处代码后执行 6 处代码,此时_dec_count
的值被线程t2
修改为 -1,然后在 if 判断中将_dec_count
加increase_count
,并返回 -1;因为 -1 == -1,此时仅有线程t1
引用了目标节点,线程t1
负责将节点删除。
head
是std::atomic<ref_node>
类型,old_head
和new_head
是ref_node
类型。head
的修改能及时反映到不同线程上(在局一致性内存序下)
3. 本节的一些理解
为什么需要两个计数器,一个是否足够?
如果只用一个计数器,我们首先要考虑把它放在哪里。直觉上,可能会想到将计数器放在节点内部,但这样是行不通的。因为计数器的目的是避免空悬指针解引用(访问已删除的节点),但如果计数器在节点内部,访问计数器本身就需要先访问节点。如果节点已经被删除,这就会导致问题(所以我们将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合,将一个结构体拆解成了两个)。
举个场景:线程 A 读取了 head
指针,并存到 local_head
中。这时线程 A 还没来得及通过 local_head
访问节点内部的计数器并增加计数,而线程 B 率先操作了节点的计数器,加了 1,然后弹出了节点,并在操作完成后让计数器减 1,发现计数变成了 0,于是删除了节点。此时,线程 A 再试图通过 local_head
访问节点时,节点已经被删除了,出现了空悬指针解引用。
因此,如果只使用一个计数器,它就不能放在节点内部,而需要放在节点外部。计数器需要和 head 指针处于同一“层级”(即数据结构体node和ref_node相互关联,我们通过ref_node->node_ptr即可访问数据内容,通过node->next即可访问下一个节点的计数器),成为所有线程都能直接操作的数据。
为了实现这一点,我们可以把计数器和 head
指针合二为一。这样,读取 head
指针和增加计数器的操作可以原子地进行,避免在这两步之间出现时间差(如果计数器和 head
是两个独立变量,操作之间无法保证原子性)。理想情况下,每当有一个指针指向 head
节点,计数器加 1;当指针不再指向该节点时,计数器减 1。但这种设计在实现时会遇到复杂问题。使用一个计数器无法实现,我们需要使用两个计数器来实现。
为什么一个计数器无法实现?
设想以下过程:
- 线程 A 加载
head
到new_head
(类型为ref_node
),如果head
没有变化,就增加new_head
的计数器,并更新head
的计数器。最后将new_head
的计数器赋值给old_head
,此时,三者的计数器都变为 2。注意,这些操作发生在线程 A 访问节点之前,因此确保节点不会被删除。 - 在线程 A 还没来得及访问节点时,线程 B 也加载了
head
到它的local_head
。线程 B 增加了计数器,此时计数器值变为 3(表明两个线程正在引用该节点,在此之前线程B会经历一次CAS,因为head
已经被线程A修改,所以线程B的old_head
会读取head
最新的值,此时线程B即可读取到线程A对引用计数修改的最新值,再+1会变为3)。接着,线程 B 将head
修改为指向下一个节点,并让head
的计数器减 2,计数器变回 1。 - 线程 B 完成操作后退出,此时节点只被线程A引用,计数器保持为 1。线程 A 继续操作,但发现
old_head
(3) 和当前的head
(1) 已不一致,放弃弹出该节点,并进入 else 分支让减计数器减 1,计数器变为 0。
问题是,上述过程仅仅是在两个计数器的共同配合下才会实现的。如果仅有一个计数器,那么计数器永远不会减到 0,导致节点永远不会被删除。这是因为不同线程对计数器的操作是独立的,无法相互感知。
为什么需要两个计数器?
为了解决这个问题,计数器必须与节点绑定,且每个节点只能有一个全局计数器,就像 shared_ptr
的设计一样。然而,shared_ptr
无法实现对 head
的无锁、原子性操作,因此必须引入第二个计数器。
两个计数器的设计允许:
- 一个计数器跟随
head
,用于线程安全地读取和操作(增计数器)。 - 另一个计数器绑定到节点,用于判断节点是否可以安全删除(减计数器)。
这样的设计虽然复杂,但能确保计数的正确性和节点的安全性。
相关文章:
并发编程(19)——引用计数型无锁栈
文章目录 十九、day191. 引用计数2. 代码实现2.1 单引用计数器无锁栈2.2 双引用计数器无锁栈 3. 本节的一些理解 十九、day19 上一节我们学习通过侯删链表以及风险指针与侯删链表的组合两种方式实现了并发无锁栈,但是这两种方式有以下缺点: 第一种方式…...
Santa Claus 2 (st表的lower_bound用法)
题目链接:Santa Claus 2 #pragma GCC optimize(2) #include <bits/stdc.h> #define int long long #define fi first #define se second #define all(v) v.begin(),v.end() using namespace std; const int inf 0x3f3f3f3f3f3f3f; const int N 2e55; int …...
Reed-Muller(RM)码之编码
点个关注吧! 看了一些中文的博客,RM码没有很详细的资料,所以本文尝试给出推导原理。 推导 RM码由 ( r , m ) ( r , m ) (r,m)两个参数定义,记作 R M ( r , m ) RM(r,m) RM(r,m)。其中满足 0 ≤ r ≤ m 0 ≤ r ≤ m 0≤r≤m,含义为: 码长: n = 2 m n=2^m n=2m 维数:…...
新世纪的语言智能:GPT-5技术革新与市场前景展望
目录 引言 第一章:GPT-4的成就与局限 1.1 GPT-4的成功 1.2 GPT-4的局限性 第二章:对GPT-5技术革新的预测 2.1 增强的上下文理解能力 2.2 多模态能力的提升 2.3 创造力与多样性的增强 2.4 常识性知识与伦理性的提升 第三章:GPT-5的市…...
国高材服务 | 高分子结晶动力学表征——高低温热台偏光显微镜
众所周知,聚合物制品的实际使用性能(如光学透明性、硬度、模量等)与材料内部的结晶形态、晶粒大小及完善程度有着密切的联系,因此,对聚合物结晶形态等的研究具有重要的理论和实际意义。 随着结晶条件的不用,…...
python+PyPDF2实现PDF的文本内容读取、多文件合并、旋转、裁剪、缩放、加解密、添加水印
目录 读取内容 合并文件 旋转 缩放 裁剪 加密和解密 添加水印 安装:pip install PyPDF2 -i https://pypi.tuna.tsinghua.edu.cn/simple 读取内容 from PyPDF2 import PdfReader, PdfMerger, PdfWriterdef read_pdf(pdf_path):pdf_reader PdfReader(pdf_p…...
蓝桥杯物联网开发板硬件组成
第一节 开发板简介 物联网设计与开发竞赛实训平台由蓝桥杯大赛技术支持单位北京四梯科技有限公司设计和生产,该产品可用于参加蓝桥杯物联网设计与开发赛道的竞赛实训或院校相关课程的 实践教学环节。 开发板基于STM32WLE5无线微控制器设计,芯片提供了25…...
idea2024创建JavaWeb项目以及配置Tomcat详解
今天呢,博主的学习进度也是步入了JavaWeb,目前正在逐步杨帆旗航,迎接全新的狂潮海浪。 那么接下来就给大家出一期有关JavaWeb的配置教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正…...
【蓝桥杯每日一题】分糖果——DFS
分糖果 蓝桥杯每日一题 2024-12-24 分糖果 DFS 题目描述 两种糖果分别有 9 个和 16 个,要全部分给 7 个小朋友,每个小朋友得到的糖果总数最少为 2 个最多为 5 个,问有多少种不同的分法。糖果必须全部分完。 只要有其中一个小朋友在两种方案中…...
矩阵在资产收益(Asset Returns)中的应用:以资产回报矩阵为例(中英双语)
本文中的例子来源于: 这本书,网址为:https://web.stanford.edu/~boyd/vmls/ 矩阵在资产收益(Asset Returns)中的应用:以资产回报矩阵为例 在量化金融中,矩阵作为一种重要的数学工具,被广泛用于描述和分析…...
Jimureport h2命令执行分析记录
首先找testConnection接口,前面进行了jimureport-spring-boot-starter-1.5.8.jar反编译查找,接口找到发现请求参数是json var1是JmreportDynamicDataSourceVo类型,也就是如上图的dbSource,根据打印的结果可以知道这里是local cac…...
1114 Family Property (25)
This time, you are supposed to help us collect the data for family-owned property. Given each persons family members, and the estate(房产)info under his/her own name, we need to know the size of each family, and the average area and n…...
OpenEuler 22.03 安装 flink-1.17.2 集群
零:规划 本次计划安装三台OpenEuler 22.03 版本操作系统的服务器,用于搭建 flink 集群。这里使用flink1.17.2 的原因,是便于后续与springboot的整合 服务器名IP地址作用其他应用flink01192.168.159.133主jdk11、flink-1.17.2flink02192.168.…...
SQL—leetcode—175. 组合两个表
175. 组合两个表 表: Person -------------------- | 列名 | 类型 | -------------------- | PersonId | int | | FirstName | varchar | | LastName | varchar | -------------------- personId 是该表的主键(具有唯一值的列)。 该表包含一些人的 ID 和…...
html 中 表格和表单的关系与区别
在 HTML 中,表格 (<table>) 和表单 (<form>) 是两种常用于展示数据和收集用户输入的元素。它们具有不同的功能和结构。以下是关于这两者的详细介绍: 1. HTML 表格(<table>) 表格用于展示结构化的数据…...
Android14 OTA升级速度过慢问题解决方案
软件版本:Android14 硬件平台:QCS6115 问题:OTA整包升级接近20min,太长无法忍受。 该问题为Android高版本的虚拟AB分区压缩技术所致,其实就是时间换空间,个人推测AB分区压缩会节约硬件存储空间࿰…...
Jetson xavier 刷机安装教程
在对Jetson进行刷机过程,浏览了很多的相关教程,大部分教程并不全,而且按照步骤执行会出现许多奇奇怪怪的错误,为了避免大家踩坑,这里给出了完整的解决方法,希望能够提供帮助! 首先大家需要准备…...
Hadoop集群(HDFS集群、YARN集群、MapReduce计算框架)
一、 简介 Hadoop主要在分布式环境下集群机器,获取海量数据的处理能力,实现分布式集群下的大数据存储和计算。 其中三大核心组件: HDFS存储分布式文件存储、YARN分布式资源管理、MapReduce分布式计算。 二、工作原理 2.1 HDFS集群 Web访问地址&…...
芯科科技蓝牙、Wi-Fi、Wi-SUN产品广获业界认可,技术创新引领行业潮流
物联网领军企业领跑未来无线开发平台发展 2024年,Silicon Labs(亦称“芯科科技“,NASDAQ:SLAB)在物联网(IoT)领域持续深耕,凭借创新的企业发展理念与实践、行业领先的技术与产品&am…...
C语言——数据在内存中的存储
目录 前言 一数据类型 类型归类 二整形在内存中的存储 原反补码 大小端 相关练习题 三浮点数在内存中的储存 浮点数储存规则 前言 只有取学习数据在内存中的存储,我们在以后才能定义好(用好)各种类型的数据! 一数据类型…...
后端-redis
Redis RedisString类型String类型的常用命令 Hash类型Hash类型的常用命令 List类型List类型的常用命令 Set类型Set类型的常用命令 SortedSet类型SortedSet类型的常用命令 Redis序列化缓存更新策略缓存穿透缓存雪崩缓存击穿 Redis Redis是一个key-value的数据库,key…...
sqoop,flume草稿
连xftp传sqoop压缩包到/opt/soft 目录下 cd opt/soft/ tar -zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz mv sqoop-1.4.7.bin__hadoop-2.6.0 sqoop cd sqoop/conf/ cp sqoop-env-template.sh sqoop-env.sh vi sqoop-env-sh export HADOOP_COMMON_HOME/opt/soft/hadoop expo…...
UE5 渲染管线 学习笔记
兰伯特 SSS为散射的意思 带Bias的可以根据距离自动切换mip的卷积值 而带Level的值mipmaps的定值 #define A8_SAMPLE_MASK .a 这样应该就很好理解了 这个只采样a通道 带Level的参考上面的 朝左上和右下进行模糊 带Bias参考上面 随机数 4D 3D 2D 1D HLSL内置UV HLSL内置鼠标坐…...
线程池使用不当导致线程死锁
线程池使用不当导致线程死锁 问题代码问题分析 问题代码 在项目开发中,为了支持并发场景,减少资源开销,通常会使用公共线程池,即预先创建一个线程池,需要并发时都将任务提交该线程池中。类似如下代码 public class T…...
SpringBoot状态机
Spring Boot 状态机(State Machine)是 Spring Framework 提供的一种用于实现复杂业务逻辑的状态管理工具。它基于有限状态机(Finite State Machine, FSM)的概念,允许开发者定义一组状态、事件以及它们之间的转换规则。…...
细说STM32F407单片机轮询方式读写SPI FLASH W25Q16BV
目录 一、工程配置 1、时钟、DEBUG 2、GPIO 3、SPI2 4、USART6 5、NVIC 二、软件设计 1、FALSH (1)w25flash.h (2) w25flash.c 1)W25Q16基本操作指令 2)计算地址的辅助功能函数 3)器…...
HTMLCSS:惊!3D 折叠按钮
这段代码创建了一个具有 3D 效果和动画的按钮,按钮上有 SVG 图标和文本。按钮在鼠标悬停时会显示一个漂浮点动画,图标会消失并显示一个线条动画。这种效果适用于吸引用户注意并提供视觉反馈。按钮的折叠效果和背景渐变增加了页面的美观性。 演示效果 HT…...
如何更好的进行时间管理
先想一下我们想要做的事情,然后拿出Excel表格将这些事情记录下来,我们把它叫做任务对这些任务按照重要性,紧急程度进行排序,拿出表格中的前六个任务,就是今天要做的任务新建另一张excel表格,表格的一列为时…...
我在华为的安全日常
在华为工作了数年后,我养成了一个习惯:每次离开座位,即便是去卫生间,我也会条件反射地锁屏电脑。晚上回到家,躺在床上,脑海中偶尔会闪过一丝疑虑:办公室的门窗是否关好?虽然这种担忧…...
for媒体打破智能座舱体验同质化,斑马智行荣获“华舆奖”优秀创
打破智能座舱体验同质化,斑马智行荣获“华舆奖”优秀创新生态伙伴 12月12日,消费者洞察与市场研究机构J.D. Power|君迪与同济大学 HVR Lab(人车关系实验室)共同发布了 2024 中国智能座舱的研究洞察,并公布了华舆奖中国…...
自己搭建专属AI:Llama大模型私有化部署
前言 AI新时代,提高了生产力且能帮助用户快速解答问题,现在用的比较多的是Openai、Claude,为了保证个人隐私数据,所以尝试本地(Mac M3)搭建Llama模型进行沟通。 Gpt4all 安装比较简单,根据 G…...
芯片Tapeout power signoff 之IR Drop Redhawk Ploc文件格式及其意义
数字IC后端工程师在芯片流程最后阶段都会使用redhawk或voltus进行设计的IR Drop功耗signoff分析。必须确保静态,动态ir drop都符合signoff标准。 在做redhawk ir drop分析前,我们需要提供一个redhawk ploc供电点坐标。 数字IC设计后端实现前期预防IR D…...
[机器学习]sklearn入门指南(1)
简介 scikit-learn(简称sklearn)是一个开源的Python机器学习库,它提供了简单而高效的工具用于数据挖掘和数据分析,并且拥有一个活跃的开发社区。它建立在NumPy、SciPy和matplotlib这些科学计算库之上,旨在提供一致且可…...
GitCode 光引计划投稿 | GoIoT:开源分布式物联网开发平台
GoIoT 是基于Gin 的开源分布式物联网(IoT)开发平台,用于快速开发,部署物联设备接入项目,是一套涵盖数据生产、数据使用和数据展示的解决方案。 GoIoT 开发平台,它是一个企业级物联网平台解决方案ÿ…...
【R语言遥感技术】“R+遥感”的水环境综合评价方法
R语言在遥感领域中是一个强大的工具,它提供了一系列的功能和优势,使得遥感数据的分析和应用更加高效和灵活。以下是R语言在遥感中的具体应用: 数据处理:R语言可以处理和清洗遥感数据,包括数据转换、滤波处理、去噪和数…...
QT--信号与槽机制
什么是信号与槽? 在 Qt 中,信号与槽是一种用于对象间通信的机制。它使得一个对象可以通知其他对象某个事件的发生,而不需要直接知道这些对象的具体实现。这种机制非常适合事件驱动的编程模型,如用户界面交互。 1. 信号ÿ…...
Windbg常用命令
禁止垃圾信息 ed nt!Kd_STORMINIPORT_Mask 0 ed nt!Kd_SXS_Mask 0 ed nt!Kd_FUSION_Mask 0 命令大全: 命令 - Windows drivers | Microsoft Learn .reload /f 重新加载符号表 常用命令 继续执行: g单步过/步入: p, t退出: q查看调用堆栈: k, kb列出模块: lm, lml设置断…...
YOLO11改进-模块-引入多分支卷积InceptionDepthwiseConvolution(IDC) 解决多尺度、小目标
YOLOv11 的设计目标是通过高效的网络结构,在保证准确率的前提下,最大化推理速度。传统卷积操作虽然能够捕获局部信息,但在处理大规模场景或复杂背景时,较小的感受野可能导致细节信息不足,影响模型的检测能力。为了解决…...
国标GB28181-2022平台EasyGBS:安防监控中P2P的穿透方法
在安防监控领域,P2P技术因其去中心化的特性而受到关注,尤其是在远程视频监控和数据传输方面。P2P技术允许设备之间直接通信,无需通过中央服务器,这在提高效率和降低成本方面具有明显优势。然而,P2P技术在实际应用中也面…...
C++软件设计模式之外观(Facade)模式
C软件设计模式中的外观(Facade)模式 1. 外观模式的定义 外观模式(Facade Pattern)是一种结构型设计模式,它为一个复杂的子系统提供一个简化的接口。外观模式通过一个统一的接口来访问子系统的多个组成部分࿰…...
Spring Boot 项目创建
创建一个新项目: 打开 Spring Initializr 网址:https://start.spring.io/ ,然后创建一个新项目: springboot3.3.5_jdk17: Project(Maven)编程语言(Java 17)Spring Boo…...
SharpDX 从入门到精通:全面学习指南
摘要: 本文旨在为想要深入学习 SharpDX 的开发者提供一份全面的指南。从 SharpDX 的基础概念入手,逐步深入探讨其在不同场景下的应用,包括图形渲染、音频处理等,并结合大量详细的代码案例帮助读者更好地理解和掌握 SharpDX 的使用…...
【Web】2024“国城杯”网络安全挑战大赛决赛题解(全)
最近在忙联通的安全准入测试,很少有时间看CTF了,今晚抽点时间回顾下上周线下的题(期末还没开始复习😢) 感觉做渗透测试一半的时间在和甲方掰扯&水垃圾洞,没啥惊喜感,还是CTF有意思 目录 Mountain ez_zhuawa 图…...
操作系统(24)提高磁盘I/O速度的途径
前言 操作系统提高磁盘I/O速度的途径多种多样,这些途径旨在减少磁盘访问的延迟和开销,提高数据传输的效率。 一、磁盘高速缓存(Disk Cache) 磁盘高速缓存是一种在内存中为磁盘数据设置的缓冲区,用于存储磁盘中某些盘块…...
en3d 部署笔记
目录 依赖项: Nvdiffrast 编译代码和frpc_linux_amd64 下载地址: tiny-cuda-nn 安装 ICON算法库依赖 icon依赖 kaolin infer_normal_fixpose 解决 报错了,推荐的安装方法: kaolin测试: ICON依赖项 requirements.txt 改进 voxelize_cuda 安装ok 运行后: 修改代…...
c++类型判断和获取原始类型
std::traits学习 类型判断和退化(获取原始类型)的原理就是利用模板的特例化。根据调用模板的特例化,在特例化模板中实现判断的逻辑或者退化的逻辑。 一、类型判断 判断整型数据的模板类 #include <iostream> namespace zk {templa…...
医疗行业 UI 设计系列合集(一):精准定位
在当今数字化时代,医疗行业与信息技术的融合日益紧密,UI 设计在其中扮演着至关重要的角色。精准定位的 UI 设计能够显著提升医疗产品与服务的用户体验,进而对医疗效果和患者满意度产生积极影响。 一、医疗行业 UI 设计的重要性概述 医疗行业…...
EasyExcel停更,FastExcel接力
11月6日消息,阿里巴巴旗下的Java Excel工具库EasyExcel近日宣布,将停止更新,未来将逐步进入维护模式,将继续修复Bug,但不再主动新增功能。 EasyExcel以其快速、简洁和解决大文件内存溢出的能力而著称,官方…...
java agent的使用【通俗易懂版】
一、静态代理Agent 1.生成Agent的jar包 (1)创建Agent项目,引入javassist.jar包 (2)编写premain方法 import java.lang.instrument.Instrumentation;public class Agent1 {public static void premain(Stri…...
010 Qt_输入类控件(LineEdit、TextEdit、ComboBox、SpinBox、DateTimeEdit、Dial、Slider)
文章目录 前言一、QLineEdit1.简介2.常见属性及说明3.重要信号及说明4.示例一:用户登录界面5.示例二:验证两次输入的密码是否一致显示密码 二、TextEdit1.简介2.常见属性及说明3.重要信号及说明4.示例一:获取多行输入框的内容5.示例二&#x…...