openharmony5.0.0中C++公共基础类测试-线程相关(一)
C++公共基础类测试及源码剖析
延续传统,show me the code,除了给出应用示例还重点分析了下openharmony中的实现。
简介
openharmony中提供了C++公共基础类库,为标准系统提供了一些常用的C++开发工具类,本文分析其实现,并给出使用示例,库主要涉及内容如下:
- 文件、路径、字符串相关操作的能力增强接口
- 读写锁、信号量、定时器、线程增强及线程池等接口
- 安全数据容器、数据序列化等接口
- 各子系统的错误码相关定义
环境
系统:openharmony5.0.0
部署:参照源码中的说明文档(quickstart-pkg-3568-helloworld.md)
本节重点说明关于线程相关的内容。
源码目录
commonlibrary/c_utils
├─ base
│ ├── include # 对各子系统开放的接口头文件
│ ├── src # 源文件
│ └── test # 测试代码
├─ Docs├── en # 英文文档└── zh-cn # 中文文档
线程相关
概述
包含强化线程能力、线程池、线程安全Map、线程安全栈与队列、线程安全阻塞队列
- 强化线程能力:提供例如启动线程、同步通知、异步通知等功能的接口
- 线程池:提供线程安全的线程池功能。线程安全是对于线程池本身而非池内线程而言的。 维护一个任务队列,一个线程组。使用者向任务队列中注册需要进行的任务,线程组执行任务队列中的任务。
- 线程安全Map:提供了一个线程安全的map实现。SafeMap在STL map基础上封装互斥锁,以确保对map的操作安全。
- 线程安全栈与队列:线程安全队列,是在dequeue的基础上封装std::lock_guard,以此实现线程的相关操作。根据继承SafeQueueInner抽象类,并对dequeue的pop方法的重写,可以实现SafeStack和SafeQueue的相关方法。
- 线程安全阻塞队列:线程安全阻塞队列SafeBlockQueue类,提供阻塞和非阻塞版的入队入队和出队接口,并提供可最追踪任务完成状态的的SafeBlockQueueTracking类。
强化线程能力
接口说明
OHOS::Thread
返回值类型 | 名称 |
---|---|
Thread() 构造函数, 构造一个Thread对象,但并不会启动线程。 | |
virtual ~Thread() 析构函数 | |
ThreadStatus | Start(const std::string& name, int32_t priority = THREAD_PROI_NORMAL, size_t stack = 0); 创建并启动一个子线程,循环执行Run(),当Run()返回false或通知退出时停止。 |
ThreadStatus | NotifyExitSync() 同步通知线程退出,即阻塞式停止子线程。 当前线程被阻塞,等待子线程结束。 |
void | virtual NotifyExitAsync() 异步通知线程退出,即子线程退出与否不阻塞当前线程。 通知子线程停止,当前线程继续运行。 |
bool | virtual ReadyToWork() 判断线程是否已经准备就绪,始终返回true。 |
bool | IsExitPending() const 获取线程退出待定标志位。 |
bool | IsRunning() const 判断线程是否在运行 |
pthread_t | GetThread() const 获取线程ID |
类图关系
源码剖析
从接口和类图中我们可以看到强化线程能力这一节,主要是提供例如启动线程、同步通知、异步通知等功能的接口,此库主要是通过thread类进行接口的封装,客户需要实现一个run的接口函数,此函数由库内部调用。异步通知是使用c++的一个线程同步的工具类
condition_variable
来实现的,其他的接口由对应的标志位来实现的(不再详细说)。
对condition_variable不熟悉的码友可以看下这篇介绍
启动函数调用流程如下:
创建并启动子线程的函数(start),主要通过linux系统函数pthread_create创建线程,将入口函数设置为ThreadParam类中的代理函数(Proxy),在Proxy函数中设置线程名、优先级等,并通过循环函数(ThreadStart)循环调用用户端的run函数,并实时检测更新线程状态。具体流程如下:
ThreadStatus Thread::Start(const std::string& name, int32_t priority, size_t stack)|-->status_ = ThreadStatus::OK //一些变量的初始化|-->ThreadParam para; //定义线程参数|-->para.startRoutine = ThreadStart;//初始化参数的值,由线程启动时调用|-->para.args = this;//将本线程(thread)传到参数中,作为ThreadStart的传参|-->para.name = name;//线程名称|-->para.priority = priority//线程优先级|-->bool res = CreatePThread(para, stack, &thread_)//创建线程|-->auto t = new ThreadParam;//新创建线程参数对象|-->t->startRoutine = para.startRoutine//将相关参数赋值到t对象中|-->para.args = t//将参数对象t作为ThreadParam::Proxy传参|-->para.startRoutine = reinterpret_cast<ThreadFunc>(&ThreadParam::Proxy)//强制进行类型转换|-->int result = pthread_create(&thread, &attr, reinterpret_cast<PThreadRoutine>(para.startRoutine), para.args)//核心函数将ThreadParam::Proxy作为线程的入口函数
线程创建后会触发代理函数(Proxy)如下:
static int Proxy(const ThreadParam* t)|-->(void)setpriority(PRIO_PROCESS, 0, prio);//系统函数,设置当前用户的优先级|-->prctl(PR_SET_NAME, threadName.substr(0, MAX_THREAD_NAME_LEN).c_str(), 0, 0, 0)//系统函数,设置进程名|-->t->startRoutine(t->args)//回调ThreadStart(由start函数设置)
t->startRoutine为start函数中设置的循环函数(ThreadStart),执行过程如下:
int Thread::ThreadStart(void* args)|-->循环执行以下操作|-->result = self->Run()//回调客户端重写的run函数|-->std::unique_lock<std::mutex> lk(self->lock_);|-->if ((!result) || self->exitPending_)//当退出标志为true时|-->self->cvThreadExited_.notify_all();//唤醒所有等待的线程|-->break;//线程退出循环
在同步通知线程退出时,采用了线程间同步的一种高级工具(std::condition_variable),允许线程在某些条件不满足时挂起,直到其他线程通知它们条件已经满足。
ThreadStatus Thread::NotifyExitSync()|-->std::unique_lock<std::mutex> lk(lock_);//C++标准库中的一种 RAll风格的互斥锁管理器,,它可以方便地管理互斥锁的锁定和解锁操作。在进入代码块时自动锁定 self->lock_,在离开代码块时(无论是通寸正常执行结束还是通过异常退出),自动解锁 self->lock从而确保互斥锁的正确使用和避免死锁情况的发生|-->exitPending_ = true;//退出标志|-->while (running_) {cvThreadExited_.wait(lk);}//会释放互斥锁,然后在等待期间重新获取它,|-->exitPending_ = false;//退出标志|-->return status_;
由ThreadStart函数中已确认线程退出标志(exitPending_)后通过cvThreadExited_.notify_all()唤醒,此函数再进行返回。
应用示例
本案例完成如下工作:
- 主线程每1秒打印子进程的相关信息。主线程在第5秒时,关闭子线程运行。
- 创建1个子线程,每隔1秒打印当前运行次数。
// 时间标记量
static const int FORMAX = 5;// 自定义类,继承OHOS::Thread,Run()重新写
class ThreadSample : public OHOS::Thread {
public:ThreadSample() : OHOS::Thread::Thread(){}~ThreadSample(){}
protected:bool Run() override;
};// 程序运行代码,每隔1秒打印相关信息
bool ThreadSample::Run(){static int current = 0;current++;cout << "Run(): current = " << current << endl;sleep(1);return true;
}int main(int argc, char **argv){ThreadSample thread;// 启动线程thread.Start("thread sample", OHOS::THREAD_PROI_NORMAL, 0);// 打印线程相关信息for (int i = 0; i < (2 * FORMAX); i++) {cout << "main: i = " << i << endl;cout << " ThreadId = " << thread.GetThread() << endl;cout << " ReadyToWork = " << thread.ReadyToWork() << endl;cout << " IsExitPending = " << thread.IsExitPending() << endl;cout << " IsRunning = " << thread.IsRunning() << endl;if (i == (1 * FORMAX)) {// 异步停止线程,不用等待,直接返回cout << "main: NotifyExitAsync" << endl;thread.NotifyExitAsync();}sleep(1);}thread.NotifyExitSync();// 等待退出return 0;
}
- 运行结果
# ./utils_thread
main: i = 0ThreadId = 4152769824ReadyToWork = 1IsExitPending = 0
Run(): current = 1 IsRunning = 1Run(): current = main: i = 21ThreadId = 4152769824ReadyToWork = 1IsExitPending = 0IsRunning = 1
Run(): current = 3
main: i = 2ThreadId = 4152769824ReadyToWork = 1IsExitPending = 0IsRunning = 1
Run(): current = 4
main: i = 3ThreadId = 4152769824ReadyToWork = 1IsExitPending = 0IsRunning = 1
Run(): current = 5
main: i = 4ThreadId = 4152769824ReadyToWork = 1IsExitPending = 0IsRunning = 1
Run(): current = 6
main: i = 5ThreadId = 4152769824ReadyToWork = 1IsExitPending = 0IsRunning = 1
main: NotifyExitAsync
main: i = 6ThreadId = 4294967295ReadyToWork = 1IsExitPending = 1IsRunning = 0
main: i = 7ThreadId = 4294967295ReadyToWork = 1IsExitPending = 1IsRunning = 0
main: i = 8ThreadId = 4294967295ReadyToWork = 1IsExitPending = 1IsRunning = 0
main: i = 9ThreadId = 4294967295ReadyToWork = 1IsExitPending = 1IsRunning = 0
注意在第32行Run(): current = 6
之后,便不再打印current,说明此线程已经退出,所以此后打印的信息即为线程退出后的状态。通过多次启动此进程(utils_thread),可见ThreadId在IsRunning为true的状态时呈现的是动态的,在为false时是固定的,也进一步说明threadid退出后固定为INVALID_PTHREAD_T。
线程池
接口说明
- OHOS::ThreadPool
Name | |
---|---|
ThreadPool(const std::string& name = std::string()) 构造ThreadPool。为线程池内线程命名。 | |
~ThreadPool() override | |
void | AddTask(const Task& f) 向任务队列中添加一个Task。若未调用Start()则直接执行Task且不会向任务队列添加该Task. |
size_t | GetCurTaskNum() 获取当前任务数。 |
size_t | GetMaxTaskNum() const 获取最大任务数。 |
std::string | GetName() const 获取线程池命名。 |
size_t | GetThreadsNum() const 获取线程池内线程数。 |
void | SetMaxTaskNum(size_t maxSize) 设置任务队列中最大任务数。 |
uint32_t | Start(int threadsNum) 启动给定数量threadsNum的线程,执行任务队列中的任务。 |
void | Stop() 停止线程池,等待所有线程结束。 |
类关系图
ThreadPool
源码剖析
从以上类图可知线程池类(ThreadPool)继承了NoCopyable类,禁止拷贝操作,通过成员变量tasks维护一个任务队列,通过成员变量(threads)维护一个线程组。使用者向任务队列中注册需要进行的任务,线程组执行任务队列中的任务实现对任务的管理。以上主要操作涉及接口主要为Start(线程组执行任务队列中的任务)和AddTask(向任务队列中注册需要进行的任务)。
禁止拷贝操作的实现:
讨论禁止拷贝操作的实现首先要了解c++11引入的一种语法,用于显式地禁用某个函数,包括构造函数、析构函数、拷贝构造函数、赋值运算符等。通过将函数声明为 =delete
,可以显式地阻止该函数被调用,即使在类内部或者友元函数中也无法使用。
class NoCopyable {
protected: //允许子类继承,但防止直接实例化!NoCopyable() {}//构造函数virtual ~NoCopyable() {}//析构函数
private://设置为private可以确保NoCopyable类的复制构造函数和移动赋值操作符完全不可访问,包括在其子类中。这样可以更好地隐藏类的实现细节,防止意外的复制和移动操作。如果将DISALLOW_COPY_AND_MOVE设置为protected,子类可能会尝试调用这些被删除的函数,导致编译错误。将它们设置为private可以确保编译器阻止任何对这些函数的调用,包括在子类中。DISALLOW_COPY_AND_MOVE(NoCopyable);//DISALLOW_COPY_AND_MOVE为定义的宏,可展开为如下:
DISALLOW_COPY_AND_MOVE(NoCopyable)展开后如下:
NoCopyable(const NoCopyable&) = delete;NoCopyable& operator= (const NoCopyable&) = delete//=delete;用于禁用复制构造函数,这意味着任何尝试复制该对象的代码都会导致编译错误
NoCopyable(NoCopyable&&) = delete;NoCopyable& operator= (NoCopyable&&) = delete //&& 表示右值引用,意味着该构造函数可以接受一个临时对象(即即将被销毁的对象)作为参数//=delete;用于禁用移动构造函数。任何尝试通过移动构造函数来复制该对象的代码都会导致编译错误
线程组执行任务队列中的任务
线程组的执行主要通过start函数执行,传入的参数为启动的线程数量,主要执行过程如下:
uint32_t ThreadPool::Start(int numThreads)
|-->threads_.reserve(numThreads)//预留对应启动线程的空间
|-->for (int i = 0; i < numThreads; ++i) {
|-->std::thread t([this] { this->WorkInThread(); });//这个函数会一直循环,直到线程池停止|-->while (running_) {//只要是启动状态便一直循环|-->Task task = ScheduleTask();//任务调度,会将最新的任务取出来|--> while (tasks_.empty() && running_) {hasTaskToDo_.wait(lock);}//如果任务队列为空,且在运行状态时,便开启等待(hasTaskToDo_:线程间同步的一种高级工具(std::condition_variable))|-->task = tasks_.front();//取出最新的任务|-->if (maxTaskNum_ > 0)acceptNewTask_.notify_one();//通知给新任务(添加新任务时当超过最大任务数时便会wait)|-->task();//执行取出来的任务
|-->int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str());//设置线程名称
|-->threads_.push_back(std::move(t));//将新创建的线程对象t移动到threads容器中。使用std::move可以避免不必要的复制操作,提高性能。
向任务队列中注册需要进行的任务
通过AddTask接口向任务队列(tasks_)中注册任务,主要执行过程如下:
void ThreadPool::AddTask(const Task &f)
|-->if (threads_.empty())f();//如果线程组为空则直接执行任务函数
|--> while (Overloaded()) acceptNewTask_.wait(lock)//如果过载时则一直等待|-->return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_)//判读是否过载
|-->tasks_.push_back(f)//将任务函数放入任务队列中
|-->hasTaskToDo_.notify_one()//通知线程组有新的任务来了
其中hasTaskToDo_和acceptNewTask_皆为std::condition_variable类型,此类型的详细说明可参考这篇文章
应用示例
本案例完成如下工作:
- 创建1个线程池,设置该线程池内部有1024个线程空间。
- 启动5个线程。每个线程每秒打印1段字符串,10秒后停止。
// 线程执行函数
void func(const std::string &name)
{for (int i = 0; i < 10; i++) {cout << "func: " << name << " and i = " << i << endl;sleep(1);}
}int main(int argc, char **argv)
{OHOS::ThreadPool thread_poll("thread_poll_name");int max_task_num = 1024;int start_task_num = 5;string str_name;cout << "get max task num(default): " << thread_poll.GetMaxTaskNum() << endl;// 查看默认的线程池最大任务数cout << "set max task num: " << max_task_num << endl;thread_poll.SetMaxTaskNum(max_task_num);// 设置线程池的最大任务数cout << "get max task num(set): " << thread_poll.GetMaxTaskNum() << endl;// 再查看线程池最大任务数cout << "start thread: " << start_task_num << endl; // 开启启动线程thread_poll.Start(start_task_num);for (int i = 0; i < start_task_num; i++) {cout << "add task: i = " << i << endl;str_name = "thread_pool_" + to_string(i);auto task = std::bind(func, str_name);用于绑定函数和参数,不用立即执行// 添加任务到线程池中,并启动运行thread_poll.AddTask(task);sleep(1);}cout << "stop thread: start" << endl; // 等待关闭所有的线程,会等待线程池程序全部结束才返回thread_poll.Stop();cout << "stop thread: end" << endl;return 0;
}
- 执行过程
由于是多个线程同时执行所以会导致串口打印的时候反应不过来,有一些字段会稍微乱一些。
# ./utils_thread_poll
get max task num(default): 0
set max task num: 1024
get max task num(set): 1024
start thread: 5
add task: i = 0
func: thread_pool_0 and i = 0
add task: i = 1
func: thread_pool_0 and i = 1
func: thread_pool_1 and i = 0
func: thread_pool_0 and i = add task: i = 22func: thread_pool_2 and i = 0
func: thread_pool_1 and i = 1
add task: i = func: thread_pool_0 and i = 33func: thread_pool_1 and i = 2
func: thread_pool_2 and i = 1
func: thread_pool_3 and i = 0
func: thread_pool_0 and i = 4
add task: i = 4
func: thread_pool_2 and i = 2
func: thread_pool_3 and i = 1
func: thread_pool_4 and i = func: 0
thread_pool_1 and i = 3
func: thread_pool_0 and i = func: thread_pool_2 and i = 5
3
func: thread_pool_1 and i = 4
func: thread_pool_3 and i = 2
func: thread_pool_4 and i = 1
stop thread: start
func: thread_pool_0 and i = 6
func: thread_pool_1 and i = 5
func: thread_pool_3 and i = 3
func: thread_pool_4 and i = 2
func: thread_pool_2 and i = 4
func: thread_pool_0 and i = 7
func: thread_pool_4 and i = 3
func: thread_pool_3 and i = func: thread_pool_2func: thread_pool_1 and i = 6and i = 5
4
func: thread_pool_0 and i = 8
func: thread_pool_2 and i = func: 6thread_pool_1 and i =
func: thread_pool_3 and i = 5
7
func: thread_pool_4 and i = 4
func: thread_pool_0 and i = 9
func: thread_pool_3 and i = func: 6
func: thread_pool_4 and i = func: thread_pool_2 and i = 57thread_pool_1 and i = 8
func: thread_pool_3 and i = 7
func: thread_pool_4 and i = 6func:
thread_pool_2 and i = 8
func: thread_pool_1 and i = 9
func: thread_pool_3 and i = 8
func: thread_pool_4 and i = 7
func: thread_pool_2 and i = 9
func: thread_pool_3 and i = 9
func: thread_pool_4 and i = 8
func: thread_pool_4 and i = 9
stop thread: end
- 疑问答疑
问题一:WorkInThread函数中使用while(running_)死循环,发现只有当调用stop时,才会将running_置为false,当不调用stop时便会一直死循环吧?这样明显不合理吧?
答疑:这个循环不会导致死循环,因为它依赖于 ScheduleTask()
函数来获取任务,并且只有在有任务的情况下才会执行 task()
。如果 ScheduleTask()
返回一个空的任务(即 task
为 false
),线程会在下一次循环中再次尝试获取任务,而不会无限执行同一个任务。在某些情况下确实可能导致线程一直循环而不做任何工作,根据代码片段,ScheduleTask()
函数内部调用了 acceptNewTask_.notify_one();
,这表明线程池使用了条件变量来管理任务的调度,从而避免了不必要的CPU消耗。
线程安全Map
接口说明
返回类型 | 名称 |
---|---|
SafeMap() | |
SafeMap(const SafeMap& rhs) | |
~SafeMap() | |
void | Clear() 删除map中存储的所有键值对。 |
void | EnsureInsert(const K& key, const V& value) 在map中插入元素。 |
void | Erase(const K& key) 删除map中键为key的键值对。 |
bool | Find(const K& key, V& value) 在map中查找元素。 |
bool | FindOldAndSetNew(const K& key, V& oldValue, const V& newValue) 在map中查找元素并将key对应的oldValue 替换为newValue 。 |
bool | Insert(const K& key, const V& value) 在map中插入新元素。 |
bool | IsEmpty() 判断map是否为空。 |
void | Iterate(const SafeMapCallBack& callback) 遍历map中的元素。 |
SafeMap& | operator=(const SafeMap& rhs) |
V | ReadVal(const K& key) 线程安全地读map内元素 |
void | ChangeValueByLambda(const K& key, LambdaCallback callback) 线程安全地操作safemap内元素,操作行为需要自定义 |
int | Size() 获取map的size大小。 |
类关系图
源码剖析
源码是在safe_map.h文件中直接实现的。
主要是对std::map<K, V> map_的二次封装,和c++标准map的接口类似,就不再详细说明了。只看个例子
bool FindOldAndSetNew(const K& key, V& oldValue, const V& newValue)//查找并替换键值对
|-->auto iter = map_.find(key);//查找键为key的键值对
|-->if (iter != map_.end()) {//判断是否找到了键值对
|-->oldValue = iter->second;//将旧值赋给oldValue
|-->map_.erase(iter);//删除原有键值对
|-->map_.insert(std::pair<K, V>(key, newValue));//插入新的键值对
应用示例
本案例主要完成如下工作:
- 创建1个子线程,负责每秒调用EnsureInsert()插入元素;
- 创建1个子线程,负责每秒调用Insert()插入元素;
- 创建1个子线程,负责每秒调用Erase()删除元素;
- 创建1个子线程,负责每秒调用FindOldAndSetNew()替换元素的值;
- 主线程等待上述线程结束,Iterate()和Find()查看所有元素;
- 主线程等待上述线程结束,清空SafeMap,并调用IsEmpty()查看是否确实是空。
通过创建线程池,设置最大任务数,并启动线程,通过AddTask的方式添加不同的任务,例如EnsureInsert、Insert等。
OHOS::ThreadPool threads("name_rwlock_threads")
threads.SetMaxTaskNum(128);
threads.Start(4);
str_name = "Thread_xxx";//Thread_EnsureInsert
auto task_ensure_insert = std::bind(<任务函数名称>, str_name);
threads.AddTask(task_ensure_insert);//添加任务
测试在map中插入元素(EnsureInsert)
static struct MapInfo m_map1_insert[] = {{ 1, "aaa" },{ 2, "bbb" },{ 3, "ccc" },{ 4, "ddd" },{ 5, "eee" },{ 6, "fff" },{ 7, "ggg" },{ 8, "hhh" },{ 9, "iii" },{ 10, "jjj" }
};
// 使用EnsureInsert()插入元素
void map_ensure_insert(const string& name)
{int key = 0;string value = "";for (int i = 0; i < (sizeof(m_map1_insert) / sizeof(struct MapInfo)); i++) {key = m_map1_insert[i].key;value = m_map1_insert[i].str;m_safemap.EnsureInsert(key, value);cout << name << ": insert successful and key = " << key << " and value = " << value << endl;sleep(1);}
}
测试在map中插入元素(Insert)
static struct MapInfo m_map2_insert[] = {{ 101, "111" },{ 102, "222" },{ 103, "333" },{ 104, "444" },{ 105, "555" },{ 106, "666" },{ 107, "777" },{ 108, "888" },{ 109, "999" },{ 110, "000" }
};
void map_insert(const string& name)
{int key = 0;string value = "";for (int i = 0; i < (sizeof(m_map2_insert) / sizeof(struct MapInfo)); i++) {key = m_map2_insert[i].key;value = m_map2_insert[i].str;if (m_safemap.Insert(key, value) == false) {cout << name << ": insert failed and key = " << to_string(key) << " and value = " << value << endl;} else {cout << name << ": insert successful and key = " << to_string(key) << " and value = " << value << endl;}sleep(1);}
}
测试在map中使用erase删除元素
void map_erase(const string& name)
{int key = 0;string value = "";for (int i = 0; i < (sizeof(m_map2_insert) / sizeof(struct MapInfo)); i++) {key = m_map2_insert[i].key;m_safemap.Erase(key);cout << name << ": Erase successful and key = " << to_string(key) << endl;sleep(1);}
}
测试使用FindOldAndSetNew替换元素的值
void map_findold_and_setnew(const string& name)
{int key = 0;string old_value = "";string new_value = "";for (int i = 0; i < (sizeof(m_map1_insert) / sizeof(struct MapInfo)); i++) {key = m_map1_reset[i].key;old_value = "";new_value = m_map1_reset[i].str;if (m_safemap.FindOldAndSetNew(key, old_value, new_value) == false) {cout << name << ": FindOldAndSetNew failed and key = " << to_string(key) << " and old_value = " << old_value << endl;} else {cout << name << ": FindOldAndSetNew successful and key = " << to_string(key)<< " and old_value = " << old_value << " and new_value = " << new_value << endl;}sleep(1);}
}
最后设置结束,并等待结束,然后打印SafeMap所有元素,最后清空并确认是否清空完成。
// 设置结束,并等待结束threads.Stop();cout << "Threads Stop" << endl;// 打印SafeMap所有元素cout << "SafeMap Iterate: " << endl;m_safemap.Iterate(map_iterate_print);cout << "SafeMap Find: " << endl;map_find_print();// 清空SafeMapcout << "SafeMap Clear" << endl;m_safemap.Clear();// 查看是否是空的cout << "SafeMap IsEmpty: " << m_safemap.IsEmpty() << endl;
- 执行结果
# ./utils_thread_map
Thread_EnsureInsert: insert successful and key = 1Thread_Insert and value = aaa: insert successful and key =
101 and value = 111
Thread_Erase: Erase successful and key = 101
Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 1 and old_value = aaa and new_value = abc
Thread_EnsureInsert: insert successful and key = 2 and value = bbb
Thread_Erase: Erase successful and key = 102
Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 2 and old_value = bbb and new_value = bcd
Thread_Insert: insert successful and key = 102 and value = 222
Thread_Erase: Erase successful and key = 103
Thread_EnsureInsert: insert successful and key = Thread_InsertThread_FindOldAndSetNew: FindOldAndSetNew failed and key = 3 and old_value =
3 and value = ccc
: insert successful and key = 103 and value = 333
Thread_Erase: Erase successful and key = 104
Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 4 and old_value = ddd and new_value = def
Thread_EnsureInsert: insert successful and key = Thread_Insert: insert successful and key = 104 and value = 444
4 and value = ddd
Thread_Erase: Erase successful and key = 105
Thread_FindOldAndSetNew: FindOldAndSetNew failed and key = 5 and old_value =
Thread_Insert: insert successful and key = 105 and value = 555
Thread_EnsureInsert: insert successful and key = 5 and value = eee
Thread_Erase: Erase successful and key = 106
Thread_Insert: insert successful and key = Thread_FindOldAndSetNew: FindOldAndSetNew failed and key = 1066 and value = 666 and old_value =
Thread_EnsureInsert: insert successful and key = 6 and value = fffThread_Erase: Erase successful and key = 107
Thread_Insert: insert successful and key = 107 and value = 777
Thread_EnsureInsert: insert successful and key = Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 7 and old_value = ggg and new_value = ghi
7 and value = ggg
Thread_Erase: Erase successful and key = 108
Thread_EnsureInsert: insert successful and key = 8 and value = hhh
Thread_Insert: insert successful and key = 108Thread_FindOldAndSetNew and value = : FindOldAndSetNew failed and key = 8 and old_value =
888
Thread_Erase: Erase successful and key = Thread_FindOldAndSetNew109: FindOldAndSetNew failed and key = 9 and old_value = Thread_Insert: insert successful and key = 109 and value = 999
Thread_EnsureInsert: insert successful and key = 9 and value = iii
Thread_Erase: Erase successful and key = 110
Thread_Insert: insert successful and key = 110 and value = 000
Thread_FindOldAndSetNew: FindOldAndSetNew failed and key = 10 and old_value =
Thread_EnsureInsert: insert successful and key = 10 and value = jjj
Threads Stop
SafeMap Iterate:
key = 1, value = abc
key = 2, value = bcd
key = 3, value = ccc
key = 4, value = def
key = 5, value = eee
key = 6, value = fff
key = 7, value = ghi
key = 8, value = hhh
key = 9, value = iii
key = 10, value = jjj
key = 103, value = 333
key = 104, value = 444
key = 105, value = 555
key = 106, value = 666
key = 107, value = 777
key = 108, value = 888
key = 109, value = 999
key = 110, value = 000
SafeMap Find:
key = 1, value = abc
key = 2, value = bcd
key = 3, value = ccc
key = 4, value = def
key = 5, value = eee
key = 6, value = fff
key = 7, value = ghi
key = 8, value = hhh
key = 9, value = iii
key = 10, value = jjj
key = 103, value = 333
key = 104, value = 444
key = 105, value = 555
key = 106, value = 666
key = 107, value = 777
key = 108, value = 888
key = 109, value = 999
key = 110, value = 000
SafeMap Clear
SafeMap IsEmpty: 1
线程安全阻塞队列
这些类模板使用 C++ 标准库中的互斥锁(std::mutex
)、条件变量(std::condition_variable
)和队列(std::queue
)来实现线程安全的操作。
接口说明
#include <safe_block_queue.h>
OHOS::SafeBlockQueue
返回值 | 名称 |
---|---|
SafeBlockQueue(int capacity) 构造函数,整数参数 capacity ,用于设置队列的最大容量 | |
virtual ~SafeBlockQueue() 析构函数 | |
void | virtual Push(T const& elem) 入队操作(阻塞版),将一个元素插入队列的末尾,如果队列已满,则使用条件变量阻塞当前线程,直到队列中有空位。插入完成后,通知等待队列不为空的线程。 |
bool | virtual PushNoWait(T const& elem) 入队操作(非阻塞版),如果队列已满,直接返回 false;否则插入元素并返回 true。 |
T | Pop() 出队操作(阻塞版),从队列的开头移除一个元素,如果队列为空,则使用条件变量阻塞当前线程,直到队列中有元素。移除完成后,通知等待队列不满的线程。 |
bool | PopNotWait(T& outtask) 出队操作(非阻塞版),如果队列为空,直接返回 false;否则移除元素并返回 true |
unsigned int | Size() 获取队列容量 |
bool | IsEmpty() 队列判空 |
bool | IsFull() 队列判满 |
OHOS::SafeBlockQueueTracking,该类继承自 SafeBlockQueue
。SafeBlockQueueTracking
旨在提供线程安全的阻塞队列功能,并在此基础上增加对任务完成情况的跟踪能力。
class SafeBlockQueueTracking : public SafeBlockQueue
返回值 | 名称 |
---|---|
explicit | SafeBlockQueueTracking(int capacity) 构造函数,接受一个 int 类型的参数 capacity ,用于设置队列的最大容量,并初始化 |
virtual ~SafeBlockQueueTracking() 析构函数 | |
void | virtual Push(T const& elem) 入队操作(阻塞版),首先增加 unfinishedTaskCount_ 计数。如果队列已满,则使用条件变量 cvNotFull_ 阻塞当前线程,直到队列有空位。插入完成后,通知等待队列不为空的线程。 |
bool | virtual PushNoWait(T const& elem) 入队操作(非阻塞版)先检查队列是否已满。如果已满,则直接返回 false ;否则插入元素,并增加 unfinishedTaskCount_ 计数,通知等待队列不为空的线程,返回 true |
bool | OneTaskDone() 一个任务完成时的响应函数,它会减少 unfinishedTaskCount_ 计数。如果计数减少到 0,则通知所有等待任务完成的线程 |
void | Join() 等待未完成队列,会阻塞当前线程,直到队列中的所有任务都已完成(即 unfinishedTaskCount_ 计数变为 0) |
int | GetUnfinishTaskNum() 获取未完成任务数 |
类关系图
源码剖析
源码是在safe_block_queue.h文件中直接实现的。
主要是对std::queue queueT_的二次封装,和c++标准map的接口类似,就不再详细说明了,直接看应用示例吧。
应用示例
(1)使用SafeBlockQueue接口的案例
- 判断命令行是否使用阻塞,还是非阻塞;
- 创建子线程生产者,使用阻塞/非阻塞方式,入队操作;
- 创建子线程消费者,使用阻塞/非阻塞方式,出队操作;
- 主线程等待所有子线程结束
// 定义常量
const char STRING_WAIT[] = "wait";
const char STRING_NOWAIT[] = "nowait";// 定义常量
const int SIZE = 5;
// 定义SafeBlockQueue变量
OHOS::SafeBlockQueue<int> m_safeBlockQueue(SIZE);// 返回时间字符串
static string get_curtime()
{string str = "";time_t time_now = time(nullptr);struct tm tm_now;localtime_r(&time_now, &tm_now);str += to_string(tm_now.tm_year + 1900) + "-" + to_string(tm_now.tm_mon + 1) + "-" + to_string(tm_now.tm_mday);str += " ";str += to_string(tm_now.tm_hour) + ":" + to_string(tm_now.tm_min) + ":" + to_string(tm_now.tm_sec);return str;
}static void product_wait(const string &name)
{for (int i = 0; i < (2 * SIZE); i++) {cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;// 使用阻塞方式的SafeBlockQueuem_safeBlockQueue.Push(i);cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl;// 等待1秒cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));}
}static void consume_wait(const string &name)
{for (int i = 0; i < (2 * SIZE); i++) {cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl;// 使用阻塞方式的SafeBlockQueueint value = m_safeBlockQueue.Pop();cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl;// 等待0.5秒cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl;std::this_thread::sleep_for(std::chrono::milliseconds(500));}
}static void product_nowait(const string &name)
{bool ret;for (int i = 0; i < (2 * SIZE); i++) {cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;// 使用非阻塞方式的SafeBlockQueueret = m_safeBlockQueue.PushNoWait(i);cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl;// 等待1秒cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));}
}static void consume_nowait(const string &name)
{for (int i = 0; i < (SIZE * 2); i++) {// 等待有新数据int value = 0;// 使用非阻塞方式的SafeBlockQueuebool ret = m_safeBlockQueue.PopNotWait(value);cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl;// 等待500毫秒std::this_thread::sleep_for(std::chrono::milliseconds(500));}
}int main(int argc, char **argv)
{bool enable_wait = true;OHOS::ThreadPool threads("threads");string str_name = "";// 获取命令行参数if (argc != 2) {cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;return -1;}if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) {enable_wait = true;} else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) {enable_wait = false;} else {cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;return -1;}threads.SetMaxTaskNum(4);threads.Start(2);// 创建生产者线程cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));threads.AddTask(task_product);// 等待SIZE秒,将SafeBlockQueue容器填满cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000 * SIZE));// 创建消费者线程cout << get_curtime() << ", " << __func__ << ": consume start" << endl; auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));threads.AddTask(task_consumer);threads.Stop();cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl; return 0;
}
- 执行结果
# ./utils_thread_queue wait
2017-8-5 17:2:16, main: task_product start
2017-8-5 17:2:16, main: sleep 5 sec
2017-8-5 17:2:16, product_wait: Push Start, i = 0
2017-8-5 17:2:16, product_wait: Push Success, i = 0
2017-8-5 17:2:16, product_wait: Sleep 1 sec
2017-8-5 17:2:17, product_wait: Push Start, i = 1
2017-8-5 17:2:17, product_wait: Push Success, i = 1
2017-8-5 17:2:17, product_wait: Sleep 1 sec
2017-8-5 17:2:18, product_wait: Push Start, i = 2
2017-8-5 17:2:18, product_wait: Push Success, i = 2
2017-8-5 17:2:18, product_wait: Sleep 1 sec
2017-8-5 17:2:19, product_wait: Push Start, i = 3
2017-8-5 17:2:19, product_wait: Push Success, i = 3
2017-8-5 17:2:19, product_wait: Sleep 1 sec
2017-8-5 17:2:20, product_wait: Push Start, i = 4
2017-8-5 17:2:20, product_wait: Push Success, i = 4
2017-8-5 17:2:20, product_wait: Sleep 1 sec
2017-8-5 17:2:21, main: consume start
2017-8-5 17:2:21, consume_wait: Pop Start, i = 0
2017-8-5 17:2:21, consume_wait: Pop Success, i = 0, value = 0
2017-8-5 17:2:21, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:21, product_wait: Push Start, i = 5
2017-8-5 17:2:21, product_wait: Push Success, i = 5
2017-8-5 17:2:21, product_wait: Sleep 1 sec
2017-8-5 17:2:21, consume_wait: Pop Start, i = 1
2017-8-5 17:2:21, consume_wait: Pop Success, i = 1, value = 1
2017-8-5 17:2:21, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:22, consume_wait: Pop Start, i = 2
2017-8-5 17:2:22, consume_wait: Pop Success, i = 2, value = 2
2017-8-5 17:2:22, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:22, product_wait: Push Start, i = 6
2017-8-5 17:2:22, product_wait: Push Success, i = 6
2017-8-5 17:2:22, product_wait: Sleep 1 sec
2017-8-5 17:2:22, consume_wait: Pop Start, i = 3
2017-8-5 17:2:22, consume_wait: Pop Success, i = 3, value = 3
2017-8-5 17:2:22, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:23, consume_wait: Pop Start, i = 4
2017-8-5 17:2:23, consume_wait: Pop Success, i = 4, value = 4
2017-8-5 17:2:23, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:23, product_wait: Push Start, i = 7
2017-8-5 17:2:23, product_wait: Push Success, i = 7
2017-8-5 17:2:23, product_wait: Sleep 1 sec
2017-8-5 17:2:23, consume_wait: Pop Start, i = 5
2017-8-5 17:2:23, consume_wait: Pop Success, i = 5, value = 5
2017-8-5 17:2:23, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:24, product_wait: Push Start, i = 8
2017-8-5 17:2:24, product_wait: Push Success, i = 8
2017-8-5 17:2:24, product_wait: Sleep 1 sec
2017-8-5 17:2:24, consume_wait: Pop Start, i = 6
2017-8-5 17:2:24, consume_wait: Pop Success, i = 6, value = 6
2017-8-5 17:2:24, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:24, consume_wait: Pop Start, i = 7
2017-8-5 17:2:24, consume_wait: Pop Success, i = 7, value = 7
2017-8-5 17:2:24, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:25, product_wait: Push Start, i = 9
2017-8-5 17:2:25, product_wait: Push Success, i = 9
2017-8-5 17:2:25, product_wait: Sleep 1 sec
2017-8-5 17:2:25, consume_wait: Pop Start, i = 8
2017-8-5 17:2:25, consume_wait: Pop Success, i = 8, value = 8
2017-8-5 17:2:25, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:25, consume_wait: Pop Start, i = 9
2017-8-5 17:2:25, consume_wait: Pop Success, i = 9, value = 9
2017-8-5 17:2:25, consume_wait: Sleep 0.5 sec
2017-8-5 17:2:26, main: Queue Wait End
(2)使用SafeBlockQueueTracking接口的案例
- 判断命令行是否使用阻塞,还是非阻塞;
- 创建子线程生产者,使用阻塞/非阻塞方式,入队操作;
- 创建子线程消费者,使用阻塞/非阻塞方式,出队操作;
- 主线程等待所有子线程结束
// 定义常量
const char STRING_WAIT[] = "wait";
const char STRING_NOWAIT[] = "nowait";// 定义常量
const int SIZE = 5;
// 定义SafeBlockQueue变量
OHOS::SafeBlockQueueTracking<int> m_safeBlockQueueTracking(SIZE);// 返回时间字符串
static string get_curtime()
{string str = "";time_t time_now = time(nullptr);struct tm tm_now;localtime_r(&time_now, &tm_now);str += to_string(tm_now.tm_year + 1900) + "-" + to_string(tm_now.tm_mon + 1) + "-" + to_string(tm_now.tm_mday);str += " ";str += to_string(tm_now.tm_hour) + ":" + to_string(tm_now.tm_min) + ":" + to_string(tm_now.tm_sec);return str;
}static void product_wait(const string &name)
{for (int i = 0; i < (2 * SIZE); i++) {cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;// 使用阻塞方式的SafeBlockQueueTrackingm_safeBlockQueueTracking.Push(i);cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl;// 等待1秒cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));}
}static void consume_wait(const string &name)
{for (int i = 0; i < (2 * SIZE); i++) {cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl;// 使用阻塞方式的SafeBlockQueueTrackingint value = m_safeBlockQueueTracking.Pop();cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl;m_safeBlockQueueTracking.OneTaskDone();cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl;// 等待0.5秒cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl;std::this_thread::sleep_for(std::chrono::milliseconds(500));}
}static void product_nowait(const string &name)
{bool ret;for (int i = 0; i < (2 * SIZE); i++) {cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;// 使用非阻塞方式的SafeBlockQueueTrackingret = m_safeBlockQueueTracking.PushNoWait(i);cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl;if (ret == true) {m_safeBlockQueueTracking.OneTaskDone();cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl;}// 等待1秒cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));}
}static void consume_nowait(const string &name)
{for (int i = 0; i < (SIZE * 2); i++) {// 等待有新数据int value = 0;// 使用非阻塞方式的SafeBlockQueueTrackingbool ret = m_safeBlockQueueTracking.PopNotWait(value);cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl;// 等待500毫秒std::this_thread::sleep_for(std::chrono::milliseconds(500));}
}int main(int argc, char **argv)
{bool enable_wait = true;OHOS::ThreadPool threads("threads");string str_name = "";// 获取命令行参数if (argc != 2) {cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;return -1;}if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) {enable_wait = true;} else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) {enable_wait = false;} else {cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;return -1;}threads.SetMaxTaskNum(4);threads.Start(2);// 创建生产者线程cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));threads.AddTask(task_product);// 等待SIZE秒,将SafeBlockQueue容器填满cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000 * SIZE));// 创建消费者线程cout << get_curtime() << ", " << __func__ << ": consume start" << endl; auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));threads.AddTask(task_consumer);threads.Stop();cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl; return 0;
}
- 执行结果
# ./utils_thread_queue_track nowait
2017-8-5 17:37:0, main: task_product start
2017-8-5 17:37:0, main: sleep 2017-8-5 17:37:0, product_nowait: Push Start, i = 5 sec
0
2017-8-5 17:37:0, product_nowait: Push ret = 1, i = 0
2017-8-5 17:37:0, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:0, product_nowait: Sleep 1 sec
2017-8-5 17:37:1, product_nowait: Push Start, i = 1
2017-8-5 17:37:1, product_nowait: Push ret = 1, i = 1
2017-8-5 17:37:1, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:1, product_nowait: Sleep 1 sec
2017-8-5 17:37:2, product_nowait: Push Start, i = 2
2017-8-5 17:37:2, product_nowait: Push ret = 1, i = 2
2017-8-5 17:37:2, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:2, product_nowait: Sleep 1 sec
2017-8-5 17:37:3, product_nowait: Push Start, i = 3
2017-8-5 17:37:3, product_nowait: Push ret = 1, i = 3
2017-8-5 17:37:3, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:3, product_nowait: Sleep 1 sec
2017-8-5 17:37:4, product_nowait: Push Start, i = 4
2017-8-5 17:37:4, product_nowait: Push ret = 1, i = 4
2017-8-5 17:37:4, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:4, product_nowait: Sleep 1 sec
2017-8-5 17:37:5, main: consume start
2017-8-5 17:37:5, consume_nowait: PopNotWait ret = 1, value = 0
2017-8-5 17:37:5, product_nowait: Push Start, i = 5
2017-8-5 17:37:5, product_nowait: Push ret = 1, i = 5
2017-8-5 17:37:5, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:5, product_nowait: Sleep 1 sec
2017-8-5 17:37:5, consume_nowait: PopNotWait ret = 1, value = 1
2017-8-5 17:37:6, consume_nowait: PopNotWait ret = 1, value = 2
2017-8-5 17:37:6, product_nowait: Push Start, i = 6
2017-8-5 17:37:6, product_nowait: Push ret = 1, i = 6
2017-8-5 17:37:6, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:6, product_nowait: Sleep 1 sec
2017-8-5 17:37:6, consume_nowait: PopNotWait ret = 1, value = 3
2017-8-5 17:37:7, consume_nowait: PopNotWait ret = 1, value = 4
2017-8-5 17:37:7, product_nowait: Push Start, i = 7
2017-8-5 17:37:7, product_nowait: Push ret = 1, i = 7
2017-8-5 17:37:7, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:7, product_nowait: Sleep 1 sec
2017-8-5 17:37:7, consume_nowait: PopNotWait ret = 1, value = 5
2017-8-5 17:37:8, consume_nowait: PopNotWait ret = 1, value = 6
2017-8-5 17:37:8, product_nowait: Push Start, i = 8
2017-8-5 17:37:8, product_nowait: Push ret = 1, i = 8
2017-8-5 17:37:8, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:8, product_nowait: Sleep 1 sec
2017-8-5 17:37:8, consume_nowait: PopNotWait ret = 1, value = 7
2017-8-5 17:37:9, product_nowait: Push Start, i = 2017-8-5 17:37:9, consume_nowait: PopNotWait ret = 19, value = 82017-8-5 17:37:9, product_nowait: Push ret = 1, i = 9
2017-8-5 17:37:9, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:9, product_nowait: Sleep 1 sec
2017-8-5 17:37:9, consume_nowait: PopNotWait ret = 1, value = 9
2017-8-5 17:37:10, main: Queue Wait End
线程安全栈与队列
线程安全队列,是在dequeue的基础上封装std::lock_guard,以此实现线程的相关操作。根据继承SafeQueueInner抽象类,并对dequeue的pop方法的重写,可以实现SafeStack和SafeQueue的相关方法。
对std::lock_guard不熟悉的码友可以看这篇文章
接口说明
- OHOS::SafeQueueInner
返回值 | 名称 |
---|---|
SafeQueueInner() 构造函数 | |
virtual ~SafeQueueInner() 析构函数 | |
void | Erase(T& object) 移除某个元素 |
bool | Empty() 队列判空 |
void | Clear() 清空队列元素 |
int | Size() 获取队列的容量 |
void | Push(const T& pt) 入队操作 |
void | virtual void DoPush(const T& pt) = 0 Push底层调用DoPush,需要重写 |
bool | Pop(T& pt) 出队操作 |
bool | virtual DoPop(T& pt) = 0 Push底层调用DoPop,需要重写 |
- OHOS::SafeQueue
class SafeQueue : public SafeQueueInner
返回值 | 名称 |
---|---|
void | DoPush(const T& pt) 入队操作 |
bool | DoPop(T& pt) 出队操作 |
- OHOS::SafeStack
class SafeStack : public SafeQueueInner
返回值 | 名称 |
---|---|
void | DoPush(const T& pt) 入栈操作 |
bool | DoPop(T& pt) 出栈操作 |
类关系图
源码剖析
源码是在safe_queue.h文件中直接实现的,主要包含两个类SafeStack、SafeQueue。
SafeStack模板类,可以存储任何类型的对象。它继承自 SafeQueueInner<T>
,这意味着 SafeStack将继承 SafeQueueInner
的所有成员变量和成员函数,除了那些被重写的虚函数。SafeQueue模板类,可以存储任何类型的对象。它继承自 SafeQueueInner<T>
,这意味着 SafeQueue
将继承 SafeQueueInner
的所有成员变量和成员函数,除了那些被重写的虚函数。两个类只有重写的push和pop函数有区别,主要在于实现队列为先进先出,栈为后进先出的特点。
应用示例
本案例主要完成如下工作:
- 创建2个子线程,1个线程负责入队操作,1个线程负责出队操作
- 子线程入队操作,每1秒做1次入队操作,循环5次
- 子线程入队操作,每2秒做1次出队操作,循环5次
// 定义栈变量
static OHOS::SafeStack<int> m_safeStack;static void funcSafeStackPush(const string &name)
{for (int i = 0; i < 5; i++) {// 入队操作cout << name << ", Push Start and i = " << i << endl;m_safeStack.Push(i);cout << name << ", Push Successful and i = " << i << " and value = " << i << endl;// 睡眠1秒cout << name << ", Sleep 1 sec" << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));}
}static void funcSafeStackPop(const string &name)
{bool ret;int value;for (int i = 0; i < 5; i++) {// 出队操作cout << name << ", Pop Start and i = " << i << endl;ret = m_safeStack.Pop(value);if (ret) {cout << name << ", Pop Successful and i = " << i << " and ret = " << ret << " and value = " << value << endl;} else {cout << name << ", Pop Failed and i = " << i << endl;}// 睡眠2秒cout << name << ", Sleep 2 sec" << endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 2));}
}int main(int argc, char **argv)
{OHOS::ThreadPool threads("threads");string str_name;// 清空SafeStackm_safeStack.Clear();threads.SetMaxTaskNum(128);threads.Start(2);// 开启子线程,使用Pushstr_name = "Thread_SafeStack_Push";auto task_push = std::bind(funcSafeStackPush, str_name);threads.AddTask(task_push);// 开启子线程,使用Popstr_name = "Thread_SafeStack_Pop";auto task_pop = std::bind(funcSafeStackPop, str_name);threads.AddTask(task_pop);// 设置结束,并等待结束threads.Stop();cout << "Threads Stop" << endl;return 0;
}
- 执行结果
# ./utils_thread_safestack
Thread_SafeStack_Push, Push Start and i = 0
Thread_SafeStack_Push, Push Successful and i = 0 and value = 0
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 0
Thread_SafeStack_Pop, Pop Successful and i = 0 and ret = 1 and value = 0
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Push, Push Start and i = 1
Thread_SafeStack_Push, Push Successful and i = 1 and value = 1
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 1
Thread_SafeStack_Pop, Pop Successful and i = 1 and ret = 1 and value = 1
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Push, Push Start and i = 2
Thread_SafeStack_Push, Push Successful and i = 2 and value = 2
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Push, Push Start and i = 3
Thread_SafeStack_Push, Push Successful and i = 3 and value = 3
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 2
Thread_SafeStack_Pop, Pop Successful and i = 2 and ret = 1 and value = 3
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Push, Push Start and i = 4
Thread_SafeStack_Push, Push Successful and i = 4 and value = 4
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 3
Thread_SafeStack_Pop, Pop Successful and i = 3 and ret = 1 and value = 4
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Pop, Pop Start and i = 4
Thread_SafeStack_Pop, Pop Successful and i = 4 and ret = 1 and value = 2
Thread_SafeStack_Pop, Sleep 2 sec
Threads Stop
参考文档
C++公共基础库:包含简要说明、编译构建及各个功能节点的使用说明
凌智电子开发板
相关文章:
openharmony5.0.0中C++公共基础类测试-线程相关(一)
C公共基础类测试及源码剖析 延续传统,show me the code,除了给出应用示例还重点分析了下openharmony中的实现。 简介 openharmony中提供了C公共基础类库,为标准系统提供了一些常用的C开发工具类,本文分析其实现,并给…...
前缀和相似题共赏
P3131 [USACO16JAN] Subsequences Summing to Sevens S P3131 [USACO16JAN] Subsequences Summing to Sevens S 思路: 一看到区间和我们应该就能马上想到把这个区间拆分成两个前缀相减的形式 式子为:(Pre[r] - Pre[l-1]) % 7 0 Pre[r] % 7 Pre[l-1] % 7 Pre[r] Pre[l-1] 所…...
一文读懂https
http和https的关系 http,应用层协议,由于采用明文传输,不安全,还有很多其他安全问题,为此就衍生出了同为应用层协议的https。https在http的基础上引入了SSL(Secure Socket Layer 安全套接层)和…...
为什么 requests 不是 python 标准库?
为什么 requests 不是 python 标准库? requests开发者Kenneth之前还严肃地征求过这个意见,感兴趣的可以看看 https://github.com/psf/requests/issues/2424 我大致瞅了下,基本都不赞成requests加入python标准库,主要有以下两个原…...
[STM32] 4-1 UART与串口通信
文章目录 前言4-1 UART与串口通信串口简介串口接线 数据帧串口的数据帧格式空闲位起始位数据位校验位(位于数据位内部)奇偶校验 停止位 异步通信和波特率同步通信异步通信波特率 流控的概念串口流控的工作原理 随堂测试问题1:说出Tx、Rx、CTS、RTS、VCC、…...
7-1 三种语言的单词转换
编写程序实现:首先从键盘输入若干个中文与英文单词的偶对,以空行作结束标记;再输入若干个英文与丹麦文单词的偶对,以空行作结束标记。然后输入一个中文单词,输出对应的丹麦文单词;若不存在该单词࿰…...
高防IP是什么
"高防IP"是指"高防护IP",是一种防御DDoS(分布式拒绝服务攻击)的网络安全服务。在分布式拒绝服务攻击中,攻击者会利用许多不同的计算机或者其他设备,通过向目标发送大量的网络请求来尝试使目标服务…...
基于javaweb的SSM宠物商城设计与实现(源码+文档+部署讲解)
技术范围:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…...
《TCP/IP详解 卷1:协议》之第六章:ICMP:Internet控制报文协议
目录 一、ICMP协议 二、ICMP 报文格式 三、ICMP询问报文 四、ICMP 差错报告报文 五、ICMP端口不可达差错 一、ICMP协议 ICMP(Internet Control Message Protocol,互联网控制消息协议)是网络层的一个核心协议,用于在IP主机、…...
SpringBoot项目,密码加密之“BCrypt加密”
前言 这种方法,是当前推荐的密码加密方式。(现在不推荐使用MD5加密了)。 如何在springboot项目中,使用bcrypt加密?请分步骤详细介绍一下 一.在Spring Boot项目中使用BCrypt加密的详细步骤 BCrypt是当前推荐用于密码存…...
外贸获客新革命:基于AI的搜索引擎排名攻防战——48小时抢占谷歌TOP3的技术逻辑与实战路径
一、传统SEO的三大死亡陷阱(为什么你的客户正在被AI截流?) 关键词荒漠化 人工筛选关键词效率不足1%,95%的B2B采购商使用长尾词搜索(如"IP68 waterproof LED strip for outdoor projects")而非通…...
0101基础知识-区块链-web3
文章目录 1 web3学习路线2 区块链简史2.1 区块链2.2 公共账本2.3 区块链的设计哲学2.3.1 去中心化2.3.2 共识2.3.2.1 上链2.3.2.2 共识算法 3 web3面向资产的互联网3.1 安全性和去中心化的权衡 4 智能合约4.1 以太坊智能合约4.2 去中心化应用 5 小结结语 1 web3学习路线 参考下…...
SpringMVC从入门到上手-全面讲解SpringMVC的使用.
一、springmvc介绍 MVC全称Model View Controller,是一种设计创建Web应用程序的模式。这三个单词分别代表Web应用程序的三个部分: Model(模型):指数据模型。用于存储数据以及处理用户请求的业务逻辑。在Web应用中&…...
解锁现代生活健康密码,开启养生新方式
在科技飞速发展的当下,我们享受着便捷生活,却也面临诸多健康隐患。想要维持良好状态,不妨从这些细节入手,解锁科学养生之道。 肠道是人体重要的消化器官,也是最大的免疫器官,养护肠道至关重要。日常可多…...
绿色森林人文生活纪实摄影Lr调色教程,手机滤镜PS+Lightroom预设下载!
调色介绍 绿色森林人文生活纪实摄影 Lr 调色,是借助 Lightroom 软件,对以绿色森林为背景,记录人文生活场景的纪实摄影作品进行后期调色处理。通过调整画面的色彩、光影、对比度等参数,让画面融入绿色森林的独特氛围,真…...
【项目篇】仿照RabbitMQ模拟实现消息队列
大家好呀 我是浪前 项目篇:仿照RabbitMQ模拟实现消息队列 今天是项目的第一篇,我们先来创建出最核心的几个类。 仿照RabbitMQ模拟实现消息队列 创建Exchange类MessageQueue类Binding类Message类1:BasicProperties类2:正文部分3&a…...
JAVA程序获取SVN提交记录
1.获取文件提交记录 private String userName "userName "; //svn账号 private String password "password "; //svn密码 private String urlString "urlString "; //svnurl 换成自己对应的svn信息 package com.tengzhi.common.dao;import…...
从检索到生成:RAG 如何重构大模型的知识边界?
目录 一、技术演进图谱说明 二、RAG 技术概述 (一)核心思想说明 (二)RAG 发展路径与研究范式 三、Naive RAG:最基础的检索增强生成范式 (一)Naive RAG 的标准流程 1. 索引(In…...
rabbitmq-spring-boot-start版本优化升级
文章目录 1.前言2.优化升级内容3.依赖4.使用4.1发送消息代码示例4.2消费监听代码示例4.3 brock中的消息 5.RabbmitMq的MessageConverter消息转换器5.1默认行为5.2JDK 序列化的缺点5.3使用 JSON 进行序列化 6.总结 1.前言 由于之前手写了一个好用的rabbitmq-spring-boot-start启…...
SVN仓库突然没有权限访问
如果svn仓库突然出现无法访问的情况,提示没有权限,所有账号都是如此,新创建的账号也不行。 并且会突然提示要输入账号密码。 出现这个情况时,大概率库里面的文件有http或者https的字样,因为单独给该文件添加权限导致…...
vue实现静默打印pdf
浏览器中想要打印文件,不依靠浏览器自带的打印窗口,想要实现静默打印(也就是不弹出打印对话框),同时控制打印份数的功能,一种方式是使用vue-plugin-hiprint和本地安装客户端electron-hiprint 本来是浏览器去…...
如何开启远程桌面连接外网访问?异地远程控制内网主机
实现远程桌面连接外网访问,能够突破地域限制,随时随地访问远程计算机,满足远程办公、技术支持等多种需求。下面为你详细介绍开启方法。 一、联网条件 确保本地计算机和远程计算机都有稳定的网络连接,有联网能上网。 二、开启远程…...
数据结构与算法学习笔记(Acwing提高课)----动态规划·数字三角形
数据结构与算法学习笔记----动态规划数字三角形 author: 明月清了个风 first publish time: 2025.4.23 ps⭐️终于开始提高课的题啦,借的人家的号看,以后给y总补票叭,提高课的题比之前的多很多啊哈哈哈哈,基本上每种题型都对应了…...
RK3568平台开发系列讲解(调试篇)debugfs文件系统及常见调试节点介绍
更多内容可以加入Linux系统知识库套餐(教程+视频+答疑) 🚀返回专栏总目录 文章目录 一、什么是debugfs二、/proc/filesystems三、debugfs的挂载3.1、fstab 的文件结构3.2、手动挂载与卸载四、debugfs 常见目录有哪些4.1、/sys/kernel/debug/gpio4.2、/sys/kernel/debug/…...
数字化转型避坑指南:中钧科技如何用“四个锚点”破解转型深水区
数字化转型浪潮下,企业常陷入四大典型陷阱:跟风式投入、数据沼泽化、流程伪在线、安全裸奔化。中钧科技旗下产品以“经营帮”平台为核心,通过针对性方案帮助企业绕开深坑。 陷阱一:盲目跟风,为数字化而数字化 许…...
数字化转型下的批发订货系统:降本增效的关键路径
随着数字化转型的不断深入,越来越多的企业开始拥抱现代化的技术和工具,以提升业务效率、降低运营成本。批发行业,作为一个高度依赖库存和订单管理的行业,数字化转型尤为关键。传统的批发订货系统存在信息不对称、操作复杂、效率低…...
一 、环境的安装 Anaconda + Pycharm + PaddlePaddle
《从零到一实践:系统性学习生成式 AI(NLP)》 一 、环境的安装 Anaconda Pycharm PaddlePaddle 1. Anaconda 软件安装 Anaconda 软件安装有大量的教程,此处不在说明,安装完成之后界面如下: 2. 创建 Anaconda 虚拟环境 Paddl…...
Dhtmlx Gantt教程
想实现的效果 插件安装: npm i dhtmlx-gantt使用该插件的时候,直接导入包和对应的样式即可: import { Gantt} from "dhtmlx-gantt"; import "dhtmlx-gantt/codebase/dhtmlxgantt.css";也可以安装试用版本,…...
大模型框架技术全景与下一代架构演进
一、大模型框架概述 大模型框架是支撑千亿级参数模型训练、推理及产业落地的技术底座,涵盖分布式计算、高效内存管理、多模态融合等核心模块。从GPT-3到Gemini Ultra,大模型框架的迭代推动AI从“作坊式实验”迈向“工业化生产”。据Gartner预测&a…...
官方不存在tomcat10-maven-plugin插件
Maven 中央仓库中没有官方的tomcat10-maven-plugin。Apache Tomcat Maven 插件项目目前仅对以下插件提供官方支持: tomcat6-maven-plugin tomcat7-maven-plugin tomcat8-maven-plugin tomcat9-maven-plugin 如果你想使用 cargo 命令来跑支持 Jakarta EE 的 Tomcat 1…...
vue3 el-table 右击
在 Vue 3 中使用 Element Plus 的 <el-table> 组件时,如果你想实现右击(右键点击)事件的处理,你可以通过监听 contextmenu 事件来实现。contextmenu 事件在用户尝试打开上下文菜单(通常是右键点击)时…...
第一节:核心概念高频题-Vue3响应式原理与Vue2的区别
Vue2:基于Object.defineProperty监听对象属性,需手动处理数组方法重写 Vue3:采用Proxy代理实现全量响应式,支持动态新增属性和深层嵌套对象监听 一、实现机制对比 1. Vue2:基于 Object.defineProperty • 原理&#…...
【锂电池剩余寿命预测】CNN卷积神经网络锂电池剩余寿命预测(Pytorch完整源码和数据)
目录 效果一览程序获取程序内容代码分享效果一览 程序获取 获取方式一:文章顶部资源处直接下载:...
web刷题笔记
2024isctf ezrce 禁用了一些关键字符,查询函数,系统执行函数,执行函数都有,空格也和斜杆也禁用了,但是其他一些很大一部分字符都没有禁用,属于关键词禁用的类型,正常的步骤是去查一下列表&#…...
基于FPGA 和DSP 的高性能6U VPX 采集处理板
基于FPGA 和DSP 的高性能6U VPX 采集处理板,是一款处理架构采用FPGADSP 的高性能的6U VPX 采集处理板。板载4 片高速ADC 共8 个采集通道,可支持8 路采样率最高2.6Gsps/14Bit 的模拟信号通道。 板卡FPGA 采用Xilinx 公司KU 系列的XCKU115-2FLVF1924I&…...
uniapp中使用<cover-view>标签
文章背景: uniapp中遇到了原生组件(canvas)优先级过高覆盖vant组件 解决办法: 使用<cover-view>标签 踩坑: 我想实现的是一个vant组件库中动作面板的效果,能够从底部弹出框,让用户进行选择,我直…...
【JavaScript】详讲运算符--算术运算符
1、运算符简介 运算符也叫操作符,通过运算符可以对一个或多个值进行运算,比如:typeof就是运算符,可以来获得一个值的类型,它会将该值的类型以字符串的形式返回,即:typeof 变量名的结果为字符串类型。 <…...
.NET 6 + Dapper + User-Defined Table Type
大家都知道,对于SQL Server IN是有限制条件的,如果IN里面的内容过多,在执行的时候会被自动截断,因而导致查询到的结果不是实际需要的结果。 select * from Payments where Id in (1,2,3,4,...) 为了解决上面的限制,可以…...
使用 Conda 创建新环境
使用 Conda 创建新环境 在使用 Conda 进行包管理和环境隔离时,创建新环境是一个非常常见的操作。通过创建独立的环境,可以避免不同项目之间的依赖冲突,并且能够灵活地管理各个项目的运行环境。 以下是使用 Conda 创建和管理新环境的详细步骤…...
数据为基:机器学习中数值与分类数据的处理艺术及泛化实践
数据为基:机器学习中数值与分类数据的处理艺术及泛化实践 摘要 在机器学习实践中,数据质量对模型效果的影响往往超过算法选择。本文通过详实的案例解析,系统阐述数值型数据与分类数据的特征工程处理方法,揭示数据预处理对模型泛…...
Docker镜像与容器概念解析
Docker镜像与容器概念解析 -更适合大学生宝宝体制的docker学习指南 一、Docker镜像:应用程序的基因库 (1)本质特征:镜像是一个只读的二进制文件包,相当于应用程序的”基因图谱”。就像生物体的DNA决定了生物特征&a…...
基于GMM的语音识别
语音识别是近年来发展非常迅速的一项计算机智能技术,广泛应用在语音控制、身份识别等多个领域。本次项目主要研究语音识别的预处理过程和特征参数的提取环节。通过对原始语音信号进行预加重和分帧、加窗,滤除低频干扰,提升对语音识别有用的部…...
K8S安全认证
一。用户认证的基本框架 在K8S集群中,客户端通常有两类: 1.User Account:一般独立于K8S之外的其他服务管理的用过户账号 2.Service Account:K8S管理的账号,用于为Pod中的服务进程在访问K8S提供身份标识 ApiServer是…...
咖啡机语音芯片方案-WTN6040FP-14S直接驱动4欧/3W喇叭-大功率输出
一、开发背景 随着智能家居市场的快速发展和消费者对家电产品交互体验要求的不断提高,语音提示功能已成为现代咖啡机产品的重要卖点之一。传统咖啡机仅依靠指示灯和简单蜂鸣器提示,无法满足用户对操作引导、状态反馈和个性化体验的需求。 WTN6040FP-14大…...
Vue3集成百度实时语音识别
示例 SpeechRecognitionModal.vue 组件 <template><transition name"modal-fade"><div v-if"isOpen" class"modal-overlay" click.self"handleOverlayClick"><div class"modal-container"><div…...
C# 设计原则总结
跟着视频学习的,记录一下最后的总结。 接口隔离: 单一职责: 里氏替换: 依赖倒置; 迪米特法则; 开闭原则:...
zkPass案例实战之合约篇
目录 一、contracts/contracts/ProofVerifier.sol 1. License 和 Solidity 版本 2. 导入依赖 3. 合约声明和默认分配器地址 4. 验证证明 5. 验证分配器签名 6. 验证验证者签名 7. 签名前缀处理 8. 签名恢复 总结 二、contracts/contracts/SampleAttestation.sol 1. …...
docker学习笔记5-docker中启动Mysql的最佳实践
一、查找目录文件位置 1、mysql的配置文件路径 /etc/mysql/conf.d 2、mysql的数据目录 /var/lib/mysql 3、环境变量 4、端口 mysql的默认端口3306。 二、启动命令 1、启动命令说明 docker run -d -p 3306:3306 -v /app/myconf:/etc/mysql/conf.d # 挂载配置目录 -v…...
彻底禁用windows的语音识别快捷键win+ctrl+s
工作中经常使用ctrls保存,但是经常误触win,结果弹出如下对话框,甚是闹心: 搜索网络,问AI,竟然没有一个好用的不依赖常驻内存软件的办法,最终经过探索与验证,总算是彻底解决了此问题&…...
大数据学习(112)-Analytic函数集
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言📝支持一…...