【项目篇之统一内存操作】仿照RabbitMQ模拟实现消息队列
我们的操作分为两种,一种是在内存上进行统一的操作,一种是在硬盘上面操作,今天我写的文章是编写了一个MemoryDataCenter类来实现了 在内存上面的统一操作:
实现统一内存操作
- 如何使用内存来组织数据
- 创建一个类来统一管理内存上的所有数据
- 针对交换机的操作
- 针对队列进行操作
- 针对绑定进行操作
- 新增绑定
- 获取绑定
- 删除绑定
- 针对消息进行操作
- 添加消息
- 查询信息
- 删除消息
- 发送消息到指定队列
- 从队列中取出到消息
- 获取指定队列中消息的个数
- 针对未确认消息进行操作
- 添加未确认消息
- 删除未确认的消息
- 获取指定的未确认消息
- 恢复数据到内存中去
- 总代码如下所示:
- 内存管理数据的总结
对于MQ来说,是以在内存上存储数据为主的,
在硬盘上存储数据为辅的
在硬盘上存储数据主要是为了进行持久化保存,重启之后,数据不会丢失
但是我们在真正去进行消息转发的的过程中,各种核心的逻辑都还是以内存上存储的数据为主的
因为访问内存比访问硬盘要快得多,所以我们还是要使用内存来组织数据
如何使用内存来组织数据
在交换机上使用一个数据结构:HashMap来组织数据:key是交换机的name,value是交换机对象
队列: 直接使用HashMap来组织数据: key是队列的name,value是队列对象
绑定: 使用一个嵌套的HashMap来组织数据:key是exchangeName,value是一个HashMap2,这个HashMap2中也有一个key1:key1是队列的name,HashMap2中的value2是绑定对象。
去查询这个绑定的时候,先根据交换机名字去查询,查询到的结果也还是一个HashMap2,表示的是该交换机都绑定了哪些队列,然后再进一步根据队列名字去HashMap2中查询绑定对象
消息: 也使用一个HashMap去进行管理数据:key是messageId,value是Message对象
消息是存放在队列中的,消息和队列之间还有一个归属关系,
通过MessageId可以查询到是哪一个Message对象,同时我们还得知道当前这个消息对象是放到哪一个队列中的
映射:
所以再做出一个映射:表示队列和消息之间的关联:
表示出每个队列中都有哪些消息:
如何表示:
使用嵌套的HashMap来组织,key是queueName,value是一个LinkedList,这个LinkedList里面的每一个元素都是一个Message对象
表示“未被确认”的消息:
首先,为什么会有这个未被确认的消息,是和我们的MQ的特性有关:
表示“未被确认”的消息:这里面就存储了当前队列中哪些消息被消费者取走了,但是还没有应答
我们使用嵌套的HashMap来组织这个"未被确认的消息"数据:key是queueName,value是一个HashMap
这个value的HashMap中的key是messageId,value是Message对象
后续实现消息确认的逻辑是
需要根据ACK相应的内容,这里会提供一个确认的messageId,根据这个messageId来把上述结构中的Message对象找到并进行移除
所以这里使用HashMap去查找更好一些
创建一个类来统一管理内存上的所有数据
创建一个类来统一管理内存上的所有数据
/*
使用这个类来统一管理内存上的所有数据 */
public class MemoryDataCenter { //key是exchangeName,value是exchange对象 private HashMap<String, Exchange> exchangeMap = new HashMap<>(); }
注意:这个类之后会提供一些增删改查的操作,让上一层代码去进行调用,也就是我们的brokerServer要处理很多的请求,此时有些请求涉及到去创建交换机,创建绑定,创建队列,此时服务器处理多个请求,
这个类提供的一些方法,会在多线程环境下使用,所以就需要去处理线程不安全的问题,
此时就会涉及到多线程的问题,也就会涉及到线程安全的问题,但是HashMap是一个线程不安全的数据结构,所以就需要换一个数据结构来操作,使用另一个线程安全的数据结构:ConcurrentHashMap
所以代码修改如下:
//key是exchangeName,value是exchange对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
//存储交换机
//key是exchangeName,value是exchange对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>(); //存储队列
//key是queueName,value是MSGQueue对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>(); //存储绑定
//第一个key是exchangeName,第二个key是queueName
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>(); //存储消息
//key是messageId,value是Message对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>(); //存储: 消息和队列之间的从属关系
//key是队列的名字queueName,value是一个Message的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>(); //待确认的消息
//第一个key是queueName,第二个key是messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();
针对交换机的操作
//插入交换机
public void insertExchange(Exchange exchange){ //把交换机插入到HashMap表中即可 exchangeMap.put(exchange.getName(),exchange); System.out.println("[MemoryDatCenter] 交换机创建成功了 exchangeName = "+exchange.getName());
} //查找交换机: 根据交换机名字查找交换机
public Exchange getExchange(String exchangeName){ //在HashMap表中查找交换机 return exchangeMap.get(exchangeName);
} //删除交换机:
public void deleteExchange(String exchangeName){ exchangeMap.remove(exchangeName); System.out.println("[MemoryDatCenter] 交换机被删除成功了 exchangeName = "+exchangeName);
}
针对队列进行操作
//插入队列
public void insertQueue(MSGQueue queue){ queueMap.put(queue.getName(), queue); System.out.println("[MemoryDatCenter] 队列被成功了 queueName= "+queue.getName());
} //查找队列
public MSGQueue getQueue(String queueName){ return queueMap.get(queueName);
} //删除队列
public void deleteQueue(String queueName){ queueMap.remove(queueName); System.out.println("[MemoryDatCenter] 队列被删除成功了 queueName= "+queueName);
}
针对绑定进行操作
新增绑定
- 查询:根据exchangeName查询哈希表是否存在
- 查询:根据queueName查询绑定是否存在
- 插入:在内置的哈希表中插入queueName和绑定
多个线程去进行插入绑定的方法时候,要保证是线程安全的
第一步的查询操作中是从哈希表中查询出数据,本身是线程安全的
但是第二步和第三步的操作有线程安全的风险:
会出现下面的情况:
虽然哈希表本身是线程安全的,同时哈希表的查询和插入单独拎出来一个也是线程安全的,但是这两步查询操作和插入操作放在一起进行联动,那就需要将这两步合在一起,成为一个原子性的操作,进行加锁
//新增绑定 public void insertBinding(Binding binding) throws MqException { //1.先使用exchangeName查询一下,看看对应的哈希表是否存在,不存在就创建一个 ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //不存在就创建出来 bindingMap =new ConcurrentHashMap<>(); //创建完毕之后就放入bindingsMap中去 bindingsMap.put(binding.getExchangeName(), bindingMap); } //上面这段代码有点多,我们也可以使用下面这段代码来代替 //ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), // k -> new ConcurrentHashMap<>()); synchronized (bindingMap){ //2.接着根据queueName查询一下绑定是否存在,若存在,抛异常,不存在才可以插入绑定 if(bindingMap.get(binding.getQueueName()) != null){ throw new MqException("[MemoryDataCenter] 绑定已经存在,exchangeName="+binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } //3.查询到绑定不存在,就可以插入绑定了 bindingMap.put(binding.getQueueName(), binding); } System.out.println("[MemoryDatCenter] 绑定添加成功了 exchangeName = " + binding.getExchangeName() +", queueName = " +binding.getQueueName()); }
获取绑定
//获取绑定:写两个版本
// 1. 根据exchangeName和queueName确定唯一一个Binding
// 2. 根据exchangeName获取到所有的Binding // 1. 根据exchangeName和queueName确定唯一一个Binding
public Binding getBinding(String exchangeName, String queueName){ ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName); if(bindingMap == null){ return null; } return bindingMap.get(queueName);
} // 2. 根据exchangeName获取到所有的Binding
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){ return bindingsMap.get(exchangeName);
}
删除绑定
//删除绑定:
public void deleteBinding(Binding binding) throws MqException { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //该交换机没有绑定任何队列,抛出异常 throw new MqException("[MemoryDataCenter] 绑定不存在,exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } bindingMap.remove(binding.getQueueName()); System.out.println("[MemoryDataCenter] 绑定删除成功了 exchangeName = " + binding.getExchangeName() + ", queueName = " + binding.getQueueName());
}
针对消息进行操作
添加消息
//添加消息
public void addMessage(Message message){ messageMap.put(message.getMessageId(), message); System.out.println("[MemoryDataCenter] 新消息添加成功!messageId = " + message.getMessageId());
}
查询信息
我们是根据MessagId来查询消息的:
public Message getMessage(String messageId){ return messageMap.get(messageId);
}
删除消息
我们是根据MessageId来删除消息的
public void removeMessage(String messageId){ messageMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息被删除成功了 messageId= "+messageId);
}
发送消息到指定队列
//发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message){ //把消息放到对应的数据结构中 //先根据队列的名字找到该队列对应的消息链表,然后把消息放到消息链表中 LinkedList<Message> messages =queueMessageMap.get(queue.getName()); if(messages == null){ //如果没有这个消息链表,就创建出这个链表 messages = new LinkedList<>(); //创建出链表之后,就把这个链表放到队列中去 queueMessageMap.put(queue.getName(), messages); } //再把消息数据加到messages这个链表里面 //由于链表本身是线程不安全的,所以要加锁 synchronized (messages){ //把消息数据加到messages这个链表里面 messages.add(message); } //接着把该消息也插入到总的哈希表中,也就是消息数据中心中插入 //如果这个消息已经在消息中心中存在了,这里再次插入消息也没事, // 主要是相同messageId对应的message内容是一样的 addMessage(message); System.out.println("[MemeoryDataCenter] 消息被投递到队列中了! messageId = " + message.getMessageId()); }
从队列中取出到消息
//从队列中获取到消息
public Message pollMessage(String queueName){ //先根据队列名去查找一下。对应的队列的消息链表 //如果没找到,说明队列中没有消息 LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ return null; } synchronized (messages){ if(messages.size() == 0){ return null; } //如果链表中有元素,那就进行头删操作//消息取出来之后,就要在messages中删除,因为消息已经被取走了 Message currentMessage = messages.remove(0); System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId= " +currentMessage.getMessageId()); return currentMessage; }
}
获取指定队列中消息的个数
//获取指定队列中消息的个数
public int getMessageCount(String queueName){ LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ //队列中没有消息 return 0; } synchronized (messages){ return messages.size(); }
}
针对未确认消息进行操作
添加未确认消息
public void addMessageWaitAck(String queueName, Message message){ ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>()); messageHashMap.put(message.getMessageId(),message); System.out.println("[MemoryDataCenter] 消息进入了待确认队列! messageId = " +message.getMessageId()); }
删除未确认的消息
//删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitMap.get(queueName); if(messageHashMap == null){ return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId); }
获取指定的未确认消息
//删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitMap.get(queueName); if(messageHashMap == null){ return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId); }
恢复数据到内存中去
//这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中
//通过DiskDataCenter来获取到硬盘上面的数据
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException { //0. 在恢复之间,先把里面的所有数据全部清空 exchangeMap.clear(); queueMap.clear(); bindingsMap.clear(); messageMap.clear(); queueMessageMap.clear(); //1.恢复所有的交换机数据 List<Exchange> exchanges = diskDataCenter.selectAllExchanges(); for(Exchange exchange : exchanges){ exchangeMap.put(exchange.getName(), exchange); } //2.恢复所有的队列数据 // 2. 恢复所有的队列数据 List<MSGQueue> queues = diskDataCenter.selectAllQueues(); for (MSGQueue queue : queues) { queueMap.put(queue.getName(), queue); } //3.恢复所有的绑定数据 List<Binding> bindings = diskDataCenter.selectAllBindings(); for(Binding binding : bindings){ ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>()); } //4.恢复所有的消息数据 //遍历所有的队列,然后根据每个队列的名字获取到所有的消息 for(MSGQueue queue : queues){ LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName()); queueMessageMap.put(queue.getName(),messages); for(Message message : messages){ messageMap.put(message.getMessageId(), message); } } //注意,针对“未确认消息”这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的时候,也没有设定这一块 //一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”就恢复成“未被取走的消息” //这个消息在硬盘上存储的时候,就是当做了“未被取走” }
总代码如下所示:
package org.example.mqtexxt.mqserver.datacenter; import org.example.mqtexxt.common.MqException;
import org.example.mqtexxt.mqserver.core.Binding;
import org.example.mqtexxt.mqserver.core.Exchange;
import org.example.mqtexxt.mqserver.core.MSGQueue;
import org.example.mqtexxt.mqserver.core.Message; import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap; /*
使用这个类来统一管理内存上的所有数据 */public class MemoryDataCenter { //存储交换机 //key是exchangeName,value是exchange对象 private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>(); //存储队列 //key是queueName,value是MSGQueue对象 private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>(); //存储绑定 //第一个key是exchangeName,第二个key是queueName private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>(); //存储消息 //key是messageId,value是Message对象 private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>(); //存储: 消息和队列之间的从属关系 //key是队列的名字queueName,value是一个Message的链表 private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>(); //待确认的消息 //第一个key是queueName,第二个key是messageId private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>(); //针对交换机进行操作: //插入交换机 public void insertExchange(Exchange exchange){ //把交换机插入到HashMap表中即可 exchangeMap.put(exchange.getName(),exchange); System.out.println("[MemoryDatCenter] 交换机创建成功了 exchangeName = "+exchange.getName()); } //查找交换机: 根据交换机名字查找交换机 public Exchange getExchange(String exchangeName){ //在HashMap表中查找交换机 return exchangeMap.get(exchangeName); } //删除交换机: public void deleteExchange(String exchangeName){ exchangeMap.remove(exchangeName); System.out.println("[MemoryDatCenter] 交换机被删除成功了 exchangeName = "+exchangeName); } //针对队列进行操作 //插入队列 public void insertQueue(MSGQueue queue){ queueMap.put(queue.getName(), queue); System.out.println("[MemoryDatCenter] 队列被成功了 queueName= "+queue.getName()); } //查找队列 public MSGQueue getQueue(String queueName){ return queueMap.get(queueName); } //删除队列 public void deleteQueue(String queueName){ queueMap.remove(queueName); System.out.println("[MemoryDatCenter] 队列被删除成功了 queueName= "+queueName); } //针对绑定的操作 //新增绑定 public void insertBinding(Binding binding) throws MqException { //1.先使用exchangeName查询一下,看看对应的哈希表是否存在,不存在就创建一个 ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //不存在就创建出来 bindingMap =new ConcurrentHashMap<>(); //创建完毕之后就放入bindingsMap中去 bindingsMap.put(binding.getExchangeName(), bindingMap); } //上面这段代码有点多,我们也可以使用下面这段代码来代替 //ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), // k -> new ConcurrentHashMap<>()); synchronized (bindingMap){ //2.接着根据queueName查询一下绑定是否存在,若存在,抛异常,不存在才可以插入绑定 if(bindingMap.get(binding.getQueueName()) != null){ throw new MqException("[MemoryDataCenter] 绑定已经存在,exchangeName="+binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } //3.查询到绑定不存在,就可以插入绑定了 bindingMap.put(binding.getQueueName(), binding); } System.out.println("[MemoryDatCenter] 绑定添加成功了 exchangeName = " + binding.getExchangeName() +", queueName = " +binding.getQueueName()); } //获取绑定:写两个版本 // 1. 根据exchangeName和queueName确定唯一一个Binding // 2. 根据exchangeName获取到所有的Binding // 1. 根据exchangeName和queueName确定唯一一个Binding public Binding getBinding(String exchangeName, String queueName){ ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName); if(bindingMap == null){ return null; } return bindingMap.get(queueName); } // 2. 根据exchangeName获取到所有的Binding public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){ return bindingsMap.get(exchangeName); } //删除绑定 public void deleteBinding(Binding binding) throws MqException { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //该交换机没有绑定任何队列,抛出异常 throw new MqException("[MemoryDataCenter] 绑定不存在,exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } bindingMap.remove(binding.getQueueName()); System.out.println("[MemoryDataCenter] 绑定删除成功了 exchangeName = " + binding.getExchangeName() + ", queueName = " + binding.getQueueName()); } //添加消息 public void addMessage(Message message){ messageMap.put(message.getMessageId(), message); System.out.println("[MemoryDataCenter] 新消息添加成功!messageId = " + message.getMessageId()); } //根据MessageId查询信息 public Message getMessage(String messageId){ return messageMap.get(messageId); } //根据MessageId删除消息 public void removeMessage(String messageId){ messageMap.remove(messageId); System.out.println("[MemoryDatCenter] 消息被删除成功了 messageId= "+messageId); } //发送消息到指定队列 public void sendMessage(MSGQueue queue, Message message){ //把消息放到对应的数据结构中 //先根据队列的名字找到该队列对应的消息链表,然后把消息放到消息链表中 LinkedList<Message> messages =queueMessageMap.get(queue.getName()); if(messages == null){ //如果没有这个消息链表,就创建出这个链表 messages = new LinkedList<>(); //创建出链表之后,就把这个链表放到队列中去 queueMessageMap.put(queue.getName(), messages); } //再把消息数据加到messages这个链表里面 //由于链表本身是线程不安全的,所以要加锁 synchronized (messages){ //把消息数据加到messages这个链表里面 messages.add(message); } //接着把该消息也插入到总的哈希表中,也就是消息数据中心中插入 //如果这个消息已经在消息中心中存在了在,这里再次插入消息也没事, // 主要是相同messageId对应的message内容是一样的 addMessage(message); System.out.println("[MemeoryDataCenter] 消息被投递到队列中了! messageId = " + message.getMessageId()); } //从队列中获取到消息 public Message pollMessage(String queueName){ //先根据队列名去查找一下。对应的队列的消息链表 //如果没找到,说明队列中没有消息 LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ return null; } synchronized (messages){ if(messages.size() == 0){ return null; } //如果链表中有元素,那就进行头删操作 Message currentMessage = messages.remove(0); System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId= " +currentMessage.getMessageId()); return currentMessage; } } //获取指定队列中消息的个数 public int getMessageCount(String queueName){ LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ //队列中没有消息 return 0; } synchronized (messages){ return messages.size(); } } //添加未确认消息 public void addMessageWaitAck(String queueName, Message message){ ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>()); messageHashMap.put(message.getMessageId(),message); System.out.println("[MemoryDataCenter] 消息进入了待确认队列! messageId = " +message.getMessageId()); } //删除未确认的消息(消息已经确认了) public void removeMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitMap.get(queueName); if(messageHashMap == null){ return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId); } //获取指定的未确认消息 public Message getMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String,Message> messageConcurrentHashMap = queueMessageWaitMap.get(queueName); if(messageId == null){ return null; } return messageConcurrentHashMap.get(messageId); } //这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中 //通过DiskDataCenter来获取到硬盘上面的数据 public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException { //0. 在恢复之间,先把里面的所有数据全部清空 exchangeMap.clear(); queueMap.clear(); bindingsMap.clear(); messageMap.clear(); queueMessageMap.clear(); //1.恢复所有的交换机数据 List<Exchange> exchanges = diskDataCenter.selectAllExchanges(); for(Exchange exchange : exchanges){ exchangeMap.put(exchange.getName(), exchange); } //2.恢复所有的队列数据 // 2. 恢复所有的队列数据 List<MSGQueue> queues = diskDataCenter.selectAllQueues(); for (MSGQueue queue : queues) { queueMap.put(queue.getName(), queue); } //3.恢复所有的绑定数据 List<Binding> bindings = diskDataCenter.selectAllBindings(); for(Binding binding : bindings){ ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>()); } //4.恢复所有的消息数据 //遍历所有的队列,然后根据每个队列的名字获取到所有的消息 for(MSGQueue queue : queues){ LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName()); queueMessageMap.put(queue.getName(),messages); for(Message message : messages){ messageMap.put(message.getMessageId(), message); } } //注意,针对“未确认消息”这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的时候,也没有设定这一块 //一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”就恢复成“未被取走的消息” //这个消息在硬盘上存储的时候,就是当做了“未被取走” } }
内存管理数据的逻辑已经编写得差不多了,下面进行简单的总结
内存管理数据的总结
内存管理这一块主要是使用到了一系列的数据结构,保存和管理,交换机,队列,绑定,消息
我们广泛地使用到了哈希表,链表,嵌套的结构
线程安全问题
涉及到加锁操作
要不要加锁?
锁加到哪里?
使用哪个对象作为锁对象?
是没有统一的加锁规则的,只能要具体问题具体分析
总的加锁原则是:
如果代码不加锁,会造成什么样的后果和问题
后果是否很严重?
你能不能接受后果?
相关文章:
【项目篇之统一内存操作】仿照RabbitMQ模拟实现消息队列
我们的操作分为两种,一种是在内存上进行统一的操作,一种是在硬盘上面操作,今天我写的文章是编写了一个MemoryDataCenter类来实现了 在内存上面的统一操作: 实现统一内存操作 如何使用内存来组织数据 创建一个类来统一管理内存上的…...
强化学习机器人模拟器——GridWorld:一个用于强化学习的 Python 环境
GridWorld 是一个为强化学习(Reinforcement Learning, RL)实验设计的多功能 Python 环境。它提供了一个可定制的二维网格,智能体(agent)需要从起始位置导航到目标位置,避开障碍物、穿越泥泞单元格并收集奖励。本篇博客将详细介绍 grid_world.py 代码中实现的 GridWorld 环…...
DeepSeek Copilot idea插件推荐
🌌 DeepSeek Copilot for IntelliJ IDEA 让 AI 成为你的编程副驾驶,极速生成单元测试 & 代码注释驱动开发! 🚀 简介 DeepSeek Copilot 是一款为 IntelliJ IDEA 打造的 AI 编程助手插件,它能够智能分析你的代码逻辑…...
vue-cropper实现图片裁剪
一、什么是vue-cropper? Vue-Cropper 是一个基于 Vue.js 的图片裁剪组件库,专为 Web 应用设计。当你在网上搜索的时候发现还有一个叫cropper的库,下面是他们的区别: 特性cropper.jsvue-cropper框架依赖纯 JavaScript&am…...
MPI,Pthreads和OpenMP等并行实验环境配置
(假设你已按照文档前面的步骤正确安装了 VMware 和 Ubuntu 20.04) 第一部分:安装 C/OpenMP/Pthreads 环境(修正后) 打开终端: 在 Ubuntu 中启动终端应用程序。 更新软件包列表: sudo ap…...
Spring AI Advisors API:AI交互的灵活增强利器
Spring AI Advisors API:AI交互的灵活增强利器 前言 在当今的软件开发领域,随着人工智能技术的飞速发展,将AI融入应用程序变得越来越普遍。Spring AI作为一个强大的框架,为开发者提供了便捷的方式来实现这一目标。其中的Advisor…...
排序功法入门指南【江湖算法笔记】
话说江湖风云变幻,各路英雄好汉行走江湖,总得有个名号排行。若问“东邪西毒南帝北丐”谁强谁弱,总得排个座次不是?这排序之道,恰似武功秘籍,练好了能号令群雄,练岔了怕是要被笑掉大牙࿰…...
Free Draft Model!Lookahead Decoding加速大语言模型解码新路径
Free Draft Model!Lookahead Decoding加速大语言模型解码新路径 大语言模型(LLMs)在当今AI领域大放异彩,但其自回归解码方式锁死了生成效率。本文将为你解读一种全新的解码算法——Lookahead Decoding,它无需Draft Mo…...
Spring AI 实战:第八章、Spring AI Tool Calling之与时俱进
引言:AI的"知识截止日期"尴尬 如果你想问大模型"明天是星期几?",猜猜TA会怎么答复你~ @GetMapping("/tools/simple/test") public String simpleTest() {return chatClient.prompt...
PyTorch数据集与数据集加载
PyTorch中的Dataset与DataLoader详解 1. Dataset基础 Dataset是PyTorch中表示数据集的抽象类,我们需要继承它并实现两个关键方法: from torch.utils.data import Datasetclass CustomDataset(Dataset):def __init__(self, data, labels):""…...
探秘 Git 底层原理:理解版本控制的基石
Git 是一款开源的分布式版本控制系统,在软件开发领域广泛应用,能有效管理项目的版本变更,Git 已经成为了版本控制的代名词。日常使用中,我们通过git commit提交代码,用git push推送变更,这些便捷操作背后&a…...
chili3d调试10 网页元素css node deepwiki 生成圆柱体 生成零件图片
.input是input的外框,.input input是input的内框 沙雕 全部input都换成textarea了 自己的方法用接口定义,把自己的方法pub出去,定义在内部拉出去只是取个值 这其实是mainwindow端pub回来的 窗口pub端把数据pub回 mainwindow端让mainwindow端…...
【计网】互联网的组成
回顾: 互联网(Internet):它是一个专有名词,是一个特定的互连网,它是指当下全球最大的、最开放的、由众多网络相互连接而形成的特定的的互连网,采用TCP/IP协议族作为通信规则。 一、互联网的组成部分 从互联网的工作方…...
Go语言接口实现面对对象的三大特征
一.知识回顾 在 Go 语言中,接口是一种强大的抽象机制,它允许我们定义一组方法签名,任何类型只要实现了这些方法,就被视为实现了该接口。接口的实现是隐式的,这意味着类型不需要显式声明它实现了某个接口,只…...
TS 字面量类型
str是string类型l str2是常量,类型是字面量类型 用途:配合联合类型确定更严谨精确的可选值利恩...
langchain中 callbacks constructor实现
目录 代码代码解释代码结构代码功能 类似例子 代码 from typing import Any, Dict, Listfrom langchain_openai import ChatOpenAI from langchain_core.callbacks import BaseCallbackHandler from langchain_core.messages import BaseMessage from langchain_core.outputs …...
小土堆pytorch--tensorboard的使用
小土堆pytorch--tensorboard的使用 小土堆pytorch--tensorboard的使用0.介绍1.使用tensorboard绘制 y x 等简单函数1.1 相应的代码1.2 对上述代码的解释1.3 可能遇到的问题1.3.1 问题1.3.2 解决方法 2.使用tensorboard加载数据集中的图片2.1 相应代码2.2 对上述代码的解释2.2.…...
从 0 到 1:使用 Jetpack Compose 和智能自动化实现高效 Android UI 开发
现代 Android UI 开发正逐步从命令式 XML 向声明式 Compose 转变。Compose 凭借其简洁、高效、易测试的特点,能够让开发者更专注于界面和业务逻辑,而不必陷入大量模板化的代码。手把手带你构建一个完整的 Todo List 应用,并演示如何借助自动化…...
学习黑客 week1周测 复盘
Day 7 – 周测 & 复盘 今天任务: 完成 10 道快测题,涵盖 Week 1 的核心知识点:《CIA 三要素》、OWASP Top 10、MITRE ATT&CK、NIST RMF、Linux 权限、TCP/IP、网络安全法、“黑客五阶段” 与风险管理。撰写 300 字周总结…...
【五一培训】Day 3
Topic 1:元学习 一、概念:learn to learn 区分少样本学习与元学习 少样本学习(Few-shot learning)是元学习的一个重要应用,它指的是机器能够在仅有少量样本的情况下,成功地学习和泛化到新任务上。在许多现…...
C++继承详讲
1.继承的概念 继承是实现代码复用的手段,它允许程序员在保持基类特性的基础上进行扩展,增加功能,这样产生新的类,称派生类。 2.继承和组合 1.继承体系下,子类对象包含父类的成员。组合体系下,子类对象包含…...
第四节:OpenCV 基础入门-第一个 OpenCV 程序:图像读取与显示
一、引言:为什么选择 OpenCV? 在计算机视觉领域,OpenCV(Open Source Computer Vision Library)是一个开源的、跨平台的计算机视觉库,广泛应用于图像处理、模式识别、机器学习等领域。它支持多种编程语言&a…...
基于PHP实现的easy管理系统
easy管理系统 2.0.1 easy管理系统 是一个多功能的 Web 管理平台,旨在简化项目管理、文件共享和协作流程。它集成了大创项目管理、在线文档生成、代码托管等多种功能,并提供了用户管理、系统设置、日志查看等后台管理能力。 ✨ 功能特性 统一管理平台:…...
ios systeam introduction
Here is an in-depth look at Apple’s iOS, from its inception to its latest major release, covering architecture, core components, security, app lifecycle, development tools, and the headline features of iOS 18. iOS began life as “iPhone OS,” unveiled alo…...
【论文阅读】LLMOPT:一种提升优化泛化能力的统一学习框架
文章目录 第一遍一、摘要二、关键词三、预知识1. 什么是优化泛化问题2. 什么是消融研究3. model alignment(模型对齐) 第二遍:了解论文论点一、研究背景与目的二、相关工作三、LLMOPT框架四、METHODOLOGY(方法论)1. 数据处理2. 学习过程3. 自…...
Prompt多版本测试指南:如何科学评估不同提示词的效果
对于现代AI开发来说,同一个需求,不同的提示表达方式往往会产生截然不同的结果。因此,如何设计、测试和优化提示词成为了一项关键技能。 本文将深入探讨Prompt多版本测试的技术方法,帮助你系统性地评估不同提示词的效果࿰…...
每日c/c++题 备战蓝桥杯(洛谷P1015 [NOIP 1999 普及组] 回文数)
洛谷P1015 [NOIP 1999 普及组] 回文数 题解 题目描述 P1015 回文数 是NOIP 1999普及组的经典模拟题。题目要求如下: 给定一个数N(十进制)和进制K(2≤K≤16),将N转换为K进制表示后,通过以下操…...
最小单调子序列的长度+联通最小乘积
因为题目ICPC是英文版,基于大家都不怎么看的懂的情况下直接给大家进行题目讲解 题目1: 题目分析: 构造一个长度为n的排列 p(里面的数是1-n),不能重复得 max(lis(p),lds(p)) 最小。 其中,lis(p)是 p 的最长递增子序…...
OpenHarmony平台驱动开发(一),ADC
OpenHarmony平台驱动开发(一) ADC 概述 功能简介 ADC(Analog to Digital Converter),即模拟-数字转换器,可将模拟信号转换成对应的数字信号,便于存储与计算等操作。除电源线和地线之外&#…...
数据结构与算法:回溯
回溯 先给出一些leetcode算法题,以后遇见了相关题目再往上增加 主要参考代码随想录 2.1、组合问题 关于去重:两种写法的性能分析 需要注意的是:使用set去重的版本相对于used数组的版本效率都要低很多,大家在leetcode上提交&#x…...
KaiwuDB X 遨博智能 | 构建智能产线监测管理新系统
01 项目背景 遨博智能作为国内协作机器人行业领军企业,深度布局制造、农业、医疗、教育、民生等场景,出货量连续四年蝉联国内第一、世界第二。随着工业自动化的蓬勃发展,遨博智能生产规模不断扩大,先后在常州、淄博等地建设完成…...
高等数学第三章---微分中值定理与导数的应用(§3.6 函数图像的描绘§3.7 曲率)
3.6 函数图像的描绘 一、曲线的渐近线 对于某些函数,其图形向无穷远处延伸时,会越来越趋近于某一条直线,这条直线被称为曲线的渐近线 (Asymptote)。 1. 定义 若曲线 y f ( x ) yf(x) yf(x) 上一点 P ( x , y ) P(x, y) P(x,y) 沿曲线趋…...
【PostgreSQL数据分析实战:从数据清洗到可视化全流程】4.2 数据类型转换(CAST函数/自定义函数)
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 PostgreSQL数据分析实战:数据清洗之数据类型转换(CAST函数/自定义函数)4.2 数据类型转换:让数据「格式正确,类型对…...
docker:制作镜像+上传镜像+拉取镜像
1.dockerfile制作镜像 示例内容: 1.创建一个index.js的文件 console.log("hello world")2.在相同目录下创建名为dockerfile的文件 FROM node:alpine COPY index.js /index.js CMD node /index.js3.构建镜像 docker build -t minterra/hello-docker . …...
信息系统监理师第二版教材模拟题第三组(含解析)
信息系统监理师模拟题第三组(30题) 监理基础理论 信息系统工程监理的性质是( ) A. 服务性、独立性、公正性、科学性 B. 强制性、营利性、行政性、技术性 C. 临时性、从属性、随意性、主观性 D. 单一性、封闭性、被动性、保守性答案:A 解析:监理具有服务性、独立性、公正…...
潮乎盲盒商城系统全开源多级分销推广海报奖品兑换试玩概率OSS云存储多端源码
一、源码描述 这是一套潮乎盲盒商城源码,仿小叮当盲盒商城,后端Laravel框架前端uniappvue,前后端数据库分离,支持四端同步数据(H5小程序等),测试环境: php7.4,mysql5.6,…...
文章记单词 | 第64篇(六级)
一,单词释义 residence [ˈrezɪdəns] n. 住宅;居住;住所;居住期fling [flɪŋ] v. (用力地)扔,掷,抛;猛动(身体或身体部位);急冲&a…...
数据同步实战篇
文章目录 数据同步实战篇1. mysql数据同步1.1 mysql集群部署1.2 数据同步1.2.1 同步复制1.2.2 异步复制1.2.3 半同步复制 2. redis数据同步2.1 redis集群部署2.2 数据同步 3. mq数据同步3.1 mq集群部署3.2 数据同步 4. es数据同步4.1 es集群部署4.2 数据同步 数据同步实战篇 数…...
具身系列——Double DQN算法实现CartPole游戏(强化学习)
完整代码参考: rl/ddqn_cartpole.py 陈先生/ailib - Gitee.com 部分训练得分: Model saved to ./output/best_model.pth New best model saved with average reward: 9.6 Episode: 0 | Train Reward: 25.0 | Epsilon: 0.995 | Best Eval Avg: 9.6…...
以下是在 Ubuntu 上的几款PDF 阅读器,涵盖轻量级、功能丰富和特色工具:
默认工具:Evince(GNOME 文档查看器) 特点:Ubuntu 预装,轻量快速,支持基本标注和书签。 安装:已预装,或手动安装: sudo apt install evince功能全面:Okular&…...
有关水下图像增强的论文
4.21 TEBCF:Real-World Underwater Image Texture Enhancement Model Based on Blurriness and Color Fusion 基于模糊和颜色融合的现实水下图像纹理增强模型 2022年的一篇文章,基于传统方法,基于不同的色彩方法构建了两个新的融合输入。一…...
Raycaster光线投射
Raycaster光线投射 3D虚拟工厂在线体验 描述 光线投射Raycaster,用于进行raycasting(光线投射)。 光线投射用于进行鼠标拾取(在三维空间中计算出鼠标移过了什么物体)。 构造器 Raycaster( origin : Vector3, dire…...
javaEE——单例模式
目录 前言1.概念2. 实现3. 比较和改进总结 前言 本篇文章来介绍单例模式,并讲述在保证线程安全的前提下,单例模式的写法。 1.概念 单例模式是一种设计模式,可以说是写代码的一种模板,如果在一些固定的场景下按照设计模式进行写…...
WSL在D盘安装Ubuntu
目录 前提条件步骤一:查看可用的Linux发行版步骤二:安装Ubuntu 22.04步骤三:导出已安装的Ubuntu到D盘步骤四:注销当前Ubuntu安装步骤五:在D盘导入Ubuntu启动Ubuntu 前提条件 Windows 10或Windows 11系统已启用WSL功能…...
Java并发编程-多线程基础(三)
文章目录 线程间通信线程间通信的核心问题volatile 关键字1. 核心特性2. 使用限制3. 示例 synchronized 关键字1. 核心特性2. 示例 volatile 与 synchronized 的对比Volatile 和 Synchronized 最佳实践 线程间通信 线程间通信的核心问题 多个线程通过共享内存实现信息交换&am…...
React--》掌握react构建拖拽交互的技巧
在这篇文章中将深入探讨如何使用react-dnd,从基础的拖拽操作到更复杂的自定义功能带你一步步走向实现流畅、可控且用户友好的拖拽体验,无论你是刚接触拖拽功能的初学者还是想要精细化拖拽交互的经验开发者,都能从中找到适合自己的灵感和解决方案。 目录 …...
【Qt】常用的类与数据类型
目录 一、Qt常见基本数据类型 二、Qt 字符串类应用 2.1 操作字符串 2.2 查询字符串 三、QMap 类&QHash 类&QVector 类 3.1 QMap 类 3.2 QHash 类 3.3 QVector 类 四、QList 类&QLinkedList 类 4.1 QList 类 4.2 QLinkedList 类 4.3 STL 风格迭代器遍历…...
React实现B站评论Demo
该Demo涉及的技术点 useState函数(数据驱动视图)子组件的封装条件判断回调函数的封装 1、评论数据 {"list": [{"rpid": 3,"user": {"uid": "13258165","avatar": "http://toutiao.…...
从实列中学习linux shell12 通过Shell脚本来优化MySQL数据库性能,特别是慢SQL跟踪和索引优化
在Shell脚本中优化MySQL数据库性能,特别是慢SQL跟踪和索引优化 可以通过以下步骤实现。以下是一个结构化的解决方案,包含示例代码和详细说明: 1. 启用慢查询日志 目标:动态启用慢查询日志并配置参数,收集慢SQL数据。…...
ES6入门---第三单元 模块一:类、继承
补充: prototype 属性使您有能力向对象添加属性和方法。 object.prototype.namevalue <script>function Person(name, age){this.name name;this.age age;}/* Person.prototype.showName function(){return 名字为: ${this.name};};Person.prototype.showA…...