当前位置: 首页 > news >正文

Nacos源码—4.Nacos集群高可用分析三

大纲

6.CAP原则与Raft协议

7.Nacos实现的Raft协议是如何写入数据的

8.Nacos实现的Raft协议是如何选举Leader节点的

9.Nacos实现的Raft协议是如何同步数据的

10.Nacos如何实现Raft协议的简版总结

6.CAP原则与Raft协议

(1)CAP分别指的是什么

(2)什么是分区以及容错

(3)为什么不能同时满足CAP原则

(4)Raft协议定义节点的三种状态

(5)Raft协议的数据同步流程

(6)Raft协议的Leader选举流程

(7)Raft协议如何解决脑裂问题

(8)总结

(1)CAP分别指的是什么

一.C指的是一致性Consistency

各个集群节点之间的数据,必须要保证一致。

二.A指的是可用性Availability

在分布式架构中,每个请求都能在合理的时间内获得符合预期的响应。

三.P指的是分区容错性Partition Tolerance

当集群节点间出现网络问题,整个系统依然能正常提供服务。

在CAP原则中,我们首先要保证P即分区容错性。

(2)什么是分区以及容错

分区指的是网络分区。如果在分布式架构中,出现了网络通信问题。比如节点A可以和节点B相互通信,但是不能和节点C、D进行通信。但是节点C、D之间是可以通信的,这种情况下就是出现了网络分区。

容错是指在分布式架构中,集群节点出现分区情况时,整个系统仍然要保持对外提供服务的能力,不能因为网络分区而导致整个系统不能对外提供服务。

在CAP原则下:由于P是首要保证的,所以C、A就不能兼得,必须要舍弃其一。因此需要根据业务来权衡,是更注重可用性、还是更加注重一致性。

(3)为什么不能同时满足CAP原则

首先前提条件是,需要满足P。

情况一:假设在分布式集群中选择使用CP架构,更加注重数据的一致性。这时出现了网络分区,节点A、B与节点C之间网络不互通。如果此时向集群写入一个数据,由于节点A、B能够网络互通,所以节点A、B写入的数据可以相互同步,但是节点C没办法做数据同步。那么在这种情况下,如何才能保证数据的一致性呢?

此时只能将节点C暂时看作不可用的状态,等网络恢复和数据同步好了,节点C才能正常地提供服务。否则下一次用户向集群请求获取数据时,请求到了节点C。但由于网络分区导致节点C并未同步数据,那么本次查询就查不到数据,这样就达不到CP架构的一致性要求了。所以肯定需要舍弃节点C的可用性。

情况二:假设在分布式集群中选择使用AP架构,更加注重数据的可用性。这时出现了网络分区,节点A、B与节点C之间网络不互通。虽然节点C暂时由于网络不通的原因,无法进行数据同步。但是由于集群更加注重服务的可用性,所以节点C还是可以正常提供服务。只是节点C和节点A、B之间的数据略有差异,但不影响节点的正常使用。所以就需要舍弃节点C的数据一致性。

在AP架构中,集群节点间的数据也需要同步。集群节点数据的同步一般都是通过一些异步任务来保证数据的最终一致性,只是同步时效没有那么及时。

在CP架构中,可以通过Raft协议实现数据一致性。Raft协议就是在分布式架构下,多节点保证数据一致性的协议。

(4)Raft协议定义节点的三种状态

Raft协议对集群节点定义了三种状态:

一.Follower追随者

这是默认的状态,所有的集群节点一开始都是Follower状态。

二.Candidate候选者

当某集群节点开始发起投票选举Leader时,首先会投给自己一票,这时就会从Follower状态变成Candidate状态。

三.Leader领导者

当某集群节点获得了大多数集群节点的投票,那么就会变成Leader状态。

(5)Raft协议的数据同步流程

一.Raft协议是如何处理数据写入请求

在Raft协议中,只有Leader节点才会处理客户端数据的写入请求。如果非Leader节点收到了写入请求,会转发到Leader节点上进行处理。

数据的写入一共有两个状态:uncommit和commit。这个两个状态对应于两阶段提交,可以保证数据正确写入成功。

当Leader节点接收到一个数据写入请求时:首先会在自身的节点进行数据处理,然后马上同步给集群的其他节点,此时Leader节点的这个数据的状态是uncommit状态。只有当半数以上的其他节点写入成功,Leader节点才会把数据写入成功。当Leader节点最终把数据写入成功后,会通知其他节点进行commit,此时Leader节点的这个数据的状态是commit状态。

二.非Leader节点写入失败如何处理

由于Leader节点只需要有半数以上的节点写入成功即可,所以如果有部分非Leader节点没有写入或写入失败,该如何处理?

Raft协议中的Leader节点和Follower节点会有心跳机制。在心跳传输过程中,Leader节点会把最新的数据传给其他Follower节点,以保证Follower节点中的数据和Leader节点的数据是一致的。

需要注意的是:当Follower节点没有在指定时间内接收到Leader节点发送过来的心跳包,Follower节点就会认为Leader节点挂掉了,此时Follower节点会把自身状态修改为Candidate并且重新发起投票。

https://thesecretlivesofdata.com/raft/#home
Let's say we have a single node system.
For this example, you can think of our node as a database server that stores a single value.
We also have a client that can send a value to the server.
Coming to agreement, or consensus, on that value is easy with one node.
But how do we come to consensus if we have multiple nodes?
That's the problem of distributed consensus.
Raft is a protocol for implementing distributed consensus.
Let's look at a high level overview of how it works.A node can be in 1 of 3 states: the Follower state, the Candidate state, or the Leader state.
All our nodes start in the follower state.
If followers don't hear from a leader then they can become a candidate.
The candidate then requests votes from other nodes.
Nodes will reply with their vote.
The candidate becomes the leader if it gets votes from a majority of nodes.
This process is called Leader Election.All changes to the system now go through the leader.
Each change is added as an entry in the node's log.
This log entry is currently uncommitted so it won't update the node's value.
To commit the entry the node first replicates it to the follower nodes...
then the leader waits until a majority of nodes have written the entry.
The entry is now committed on the leader node and the node state is "5".
The leader then notifies the followers that the entry is committed.
The cluster has now come to consensus about the system state.
This process is called Log Replication.

(6)Raft协议的Leader选举流程

Leader是如何选举出来的?

一.选举超时时间和选举步骤

假设使用了Raft协议的集群有3个节点:那么一开始,三个节点都会在倒计时中进行等待,此时会有一个称为Election Timeout的随机休眠时间或选举超时时间,该选举超时时间会被随机分配到150ms到300ms之间。

等待超过选举超时时间过后,节点会马上进行投票,投票分为如下几个步骤:

步骤一:先投给自己一票,并且把自己节点状态修改为Candidate

步骤二:向其他集群节点进行投票

步骤三:获取投票结果,如果过半节点投自己,则把状态修改为Leader

一旦Leader节点选举出来,其他节点的数据都要以Leader节点的为准。因此Leader节点会马上通过心跳机制,同步数据给其他节点。

https://thesecretlivesofdata.com/raft/#election
In Raft there are two timeout settings which control elections.First is the election timeout.
The election timeout is the amount of time a follower waits until becoming a candidate.
The election timeout is randomized to be between 150ms and 300ms.
After the election timeout the follower becomes a candidate and starts a new election term...
...votes for itself...
...and sends out Request Vote messages to other nodes.
If the receiving node hasn't voted yet in this term then it votes for the candidate...
...and the node resets its election timeout.
Once a candidate has a majority of votes it becomes leader.
The leader begins sending out Append Entries messages to its followers.Second is the heartbeat timeout.
These messages are sent in intervals specified by the heartbeat timeout.
Followers then respond to each Append Entries message.This election term will continue until a follower stops receiving heartbeats and becomes a candidate.
Let's stop the leader and watch a re-election happen.
Node B is now leader of term 2.
Requiring a majority of votes guarantees that only one leader can be elected per term.If two nodes become candidates at the same time then a split vote can occur.
Let's take a look at a split vote example...
Two nodes both start an election for the same term...
...and each reaches a single follower node before the other.
Now each candidate has 2 votes and can receive no more for this term.
The nodes will wait for a new election and try again.
Node A received a majority of votes in term 5 so it becomes leader.

二.不同的选举情况分析

如果集群启动时,节点C率先等待超过了选举超时时间。那么节点C会马上发起投票,并改变它自己的状态变为Candidate。等节点C获取超过半数以上的投票,那么它就会成为Leader节点。

如果在集群运行中,Leader节点突然下线。那么这时候其他的Follower节点会重新进行Leader选举。假设原本的Leader节点是B,但由于B突然下线,节点A、C会重新发起投票,最终节点C成为新的Leader节点。并且重新选举Leader后,Trem(任期)会进行递增。Term可理解为Leader的选举次数,次数越大说明数据肯定是最全的。

如果有四个节点,其中有两个Candidate节点都有2票,没有过半。在这种情况下,则会让全部节点重新进行随机睡眠,重新进行Leader选举。

(7)Raft协议如何解决脑裂问题

在Raft协议的一些情况下,可能会产生多个Leader节点。那么多个Leader节点是如何产生的?多个Leader会不会有冲突?

如果在一个集群下,出现了两个Leader节点,那么这就是脑裂问题。假设集群节点有5个,节点B是Leader,但由于发生了网络分区问题。节点A、B可以相互通信,可是节点C、D、E不能和Leader进行通信。那么节点C、D、E将会重新进行Leader选举,最终节点C也成为了Leader。此时,在原本一个集群下,就会产生两个Leader节点。

此时,如果有客户端来进行写数据:

第一个客户端请求到了节点B,由于节点B所在分区网络只有一个Follower节点,达不到半数以上要求,所以节点B的数据一直处于uncommit状态,数据也不会写入成功。

第二个客户端请求到了节点C,由于节点C所在分区网络有两个Follower节点,有半数以上支持,所以节点C的数据是能够写入成功的。

假如网络突然恢复,5个节点都可以相互通信,那么怎么处理两个Leader。这时两个Leader会相互发送心跳。节点B会发现节点C的Term比自己大,所以会认节点C为Leader并自动转换为Follower节点。

https://thesecretlivesofdata.com/raft/#replication
Once we have a leader elected we need to replicate all changes to our system to all nodes.
This is done by using the same Append Entries message that was used for heartbeats.
Let's walk through the process.First a client sends a change to the leader. Set value by "5".
The change is appended to the leader's log...
...then the change is sent to the followers on the next heartbeat.
An entry is committed once a majority of followers acknowledge it...
...and a response is sent to the client.Now let's send a command to increment the value by "2".
Our system value is now updated to "7".
Raft can even stay consistent in the face of network partitions.Let's add a partition to separate A & B from C, D & E.
Because of our partition we now have two leaders in different terms.
Let's add another client and try to update both leaders.
One client will try to set the value of node B to "3".
Node B cannot replicate to a majority so its log entry stays uncommitted.
The other client will try to set the value of node C to "8".
This will succeed because it can replicate to a majority.Now let's heal the network partition.
Node B will see the higher election term and step down.
Both nodes A & B will roll back their uncommitted entries and match the new leader's log.
Our log is now consistent across our cluster.

(8)总结

Raft协议相关论文:

https://raft.github.io/raft.pdf

Raft协议详细流程演示:

https://thesecretlivesofdata.com/raft/

Nacos既支持AP架构,也支持CP架构。前面介绍的集群源码,是属于AP架构的。在源码中可以看到很多异步任务,说明是比较看重可用性。由于是使用定时任务,那么数据会在某些特定时间出现不一致的情况,但最终还是会保证一致性。

7.Nacos实现的Raft协议是如何写入数据的

(1)Nacos 1.4.1版本实现Raft协议说明

(2)Nacos实现的Raft协议是如何写入数据的

(3)RaftCore的signalPublish()方法总结

(1)Nacos 1.4.1版本实现Raft协议说明

Nacos 1.4.1版本并没有完全按照标准的Raft协议所定义的流程来实现,所以该版本的实现中会存在一些问题。并且Nacos 1.4.1版本,已标注后期会删除这套实现。

Nacos 2.x版本会采用JRaft来实现Raft协议,JRaft就是完全按照Raft协议定义的流程来实现的。所以早期版本实现的Raft协议,没必要仔细研究,大概知道流程即可。

(2)Nacos实现的Raft协议是如何写入数据的

在Raft协议里只有Leader节点才会操作数据,并且会有两阶段提交的动作,所以可以通过服务实例注册的处理为入口进行分析。

在进行服务实例注册时:会通过一个key来选择调用不同ConsistencyService实现类的put()方法。而这个key中会包含一个很关键的属性叫做ephemeral,ephemeral默认是true,所以最终会执行AP架构下的服务注册。我们可以在yml配置文件中,把ephemeral属性设置为false,那么在服务实例注册时,就会执行CP架构下的服务注册。不过,注册中心一般很少使用CP架构。

如果执行的是CP架构下的服务注册,那么最终会调用RaftConsistencyServiceImpl的put()方法,从而触发调用Raft协议的核心方法:RaftCore的signalPublish()方法。

@Component
public class ServiceManager implements RecordListener<Service> {...//Add instance to service. 添加服务实例public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {//构建要注册的服务实例对应的服务的keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//根据命名空间以及服务名获取要注册的服务实例对应的服务Service service = getService(namespaceId, serviceName);//使用synchronized锁住要注册的服务实例对应的服务synchronized (service) {//由于一个服务可能存在多个服务实例,所以需要根据当前注册请求的服务实例ips,获取对应服务的最新服务实例列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);//Instances实现了用于在Nacos集群进行网络传输的Record接口Instances instances = new Instances();instances.setInstanceList(instanceList);//执行DelegateConsistencyServiceImpl的put()方法consistencyService.put(key, instances);}}...
}@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;private final EphemeralConsistencyService ephemeralConsistencyService;public DelegateConsistencyServiceImpl(PersistentConsistencyServiceDelegateImpl persistentConsistencyService, EphemeralConsistencyService ephemeralConsistencyService) {this.persistentConsistencyService = persistentConsistencyService;this.ephemeralConsistencyService = ephemeralConsistencyService;}@Overridepublic void put(String key, Record value) throws NacosException {//如果是临时实例,则调用DistroConsistencyServiceImpl.put()方法//如果是持久化实例,则调用PersistentConsistencyServiceDelegateImpl.put()方法mapConsistencyService(key).put(key, value);}...private ConsistencyService mapConsistencyService(String key) {//根据不同的key选择不同的ConsistencyServicereturn KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;}
}@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {private final RaftConsistencyServiceImpl oldPersistentConsistencyService;private final BasePersistentServiceProcessor newPersistentConsistencyService;private volatile boolean switchNewPersistentService = false;...@Overridepublic void put(String key, Record value) throws NacosException {switchOne().put(key, value);}private PersistentConsistencyService switchOne() {return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;}...
}//Use simplified Raft protocol to maintain the consistency status of Nacos cluster.
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {private final RaftCore raftCore;...@Overridepublic void put(String key, Record value) throws NacosException {checkIsStopWork();try {//Raft协议的核心实现raftCore.signalPublish(key, value);} catch (Exception e) {Loggers.RAFT.error("Raft put failed.", e);throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);}}...
}

RaftCore的signalPublish()方法中的逻辑大概分成三部分:

第一部分:方法一开始就会判断自身节点是不是Leader节点,如果不是则会通过HTTP方式转发给Leader节点进行处理。

第二部分:RaftCore的signalPublish()方法中有一行核心代码onPublish(),即如果是Leader节点则会执行RaftCore的onPublish()方法来处理数据。该方法会先把数据写入到本地文件,然后马上同步给内存注册表。

RaftStore的write(datum)方法会把服务实例信息持久化到本地文件,即把Instance服务实例信息以JSON格式持久化到Nacos服务端目录下,并且存储的文件是以命名空间##分组@@服务名来命名的。而持久化的服务实例信息,在下一次服务端重启时会重新加载到内存注册表中。

服务实例信息持久化后,会通过NotifyCenter发布ValueChangeEvent事件更新注册表。RaftCore的init()方法会向NotifyCenter注册一个订阅者PersistentNotifier。所以NotifyCenter发布ValueChangeEvent事件时,就会被PersistentNotifier的onEvent()方法监听到,然后执行PersistentNotifier的notify()方法,最后会执行Service的onChange()方法来更新内存注册表。

第三部分:主要就是遍历集群节点,向每个节点发起通知请求来进行数据同步,这里会使用CountDownLatch闭锁来实现控制集群半数节点同步成功。

在创建CountDownLatch闭锁时,会获取集群半数的数量来创建闭锁。每当有一个集群节点同步成功,就对CountDownLatch闭锁进行减1。最后使用闭锁的await()方法进行等待,直到闭锁减完或超时才继续执行。这样通过CountDownLatch并发工具类就能实现需要过半节点成功的功能。

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {private final RaftProxy raftProxy;private final RaftStore raftStore;public final PersistentNotifier notifier;...@PostConstructpublic void init() throws Exception {...//注册订阅者NotifyCenter.registerSubscriber(notifier);...}...//Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.public void signalPublish(String key, Record value) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}//第一部分:判断自己是不是Leader节点if (!isLeader()) {ObjectNode params = JacksonUtils.createEmptyJsonNode();params.put("key", key);params.replace("value", JacksonUtils.transferToJsonNode(value));Map<String, String> parameters = new HashMap<>(1);parameters.put("key", key);//获取Leader节点final RaftPeer leader = getLeader();//将写请求转发给Leader节点raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);return;}OPERATE_LOCK.lock();try {final long start = System.currentTimeMillis();final Datum datum = new Datum();datum.key = key;datum.value = value;if (getDatum(key) == null) {datum.timestamp.set(1L);} else {datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());}ObjectNode json = JacksonUtils.createEmptyJsonNode();json.replace("datum", JacksonUtils.transferToJsonNode(datum));json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));//第二部分:Leader节点会执行到这里进行数据处理,把服务实例信息写入磁盘以及内存onPublish(datum, peers.local());//第三部分:final String content = json.toString();//通过闭锁来控制半数以上节点,peers.majorityCount()就是获取集群半数以上的节点数量final CountDownLatch latch = new CountDownLatch(peers.majorityCount());//同步给其他节点for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}final String url = buildUrl(server, API_ON_PUB);//通过HTTP方式通知其他节点HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, result.getCode());return;}//某个节点成功,闭锁-1latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}//通过闭锁的await()方法来等待半数集群节点同步成功if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {//only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}long end = System.currentTimeMillis();Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);} finally {OPERATE_LOCK.unlock();}}...//Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.public void onPublish(Datum datum, RaftPeer source) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}RaftPeer local = peers.local();if (datum.value == null) {Loggers.RAFT.warn("received empty datum");throw new IllegalStateException("received empty datum");}if (!peers.isLeader(source.ip)) {Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(getLeader()));throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");}if (source.term.get() < local.term.get()) {Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(local));throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());}local.resetLeaderDue();//if data should be persisted, usually this is true:if (KeyBuilder.matchPersistentKey(datum.key)) {//先把数据写到本地文件中raftStore.write(datum);}//同步缓存datums.put(datum.key, datum);if (isLeader()) {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);} else {if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {//set leader term:getLeader().term.set(source.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);}}raftStore.updateTerm(local.term.get());//通过发布ValueChangeEvent事件来同步内存注册表NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);}
}@Deprecated
@Component
public class RaftStore implements Closeable {...//Write datum to cache file.public synchronized void write(final Datum datum) throws Exception {String namespaceId = KeyBuilder.getNamespace(datum.key);File cacheFile = new File(cacheFileName(namespaceId, datum.key));if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {MetricsMonitor.getDiskException().increment();throw new IllegalStateException("can not make cache file: " + cacheFile.getName());}ByteBuffer data;data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {fc.write(data, data.position());fc.force(true);} catch (Exception e) {MetricsMonitor.getDiskException().increment();throw e;}//remove old format file:if (StringUtils.isNoneBlank(namespaceId)) {if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {String oldDatumKey = datum.key.replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));if (cacheFile.exists() && !cacheFile.delete()) {Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key, datum.value);throw new IllegalStateException("failed to delete old format datum: " + datum.key);}}}}...
}//事件发布中心:事件发布机制的实现
public class NotifyCenter {...//注册订阅者public static <T> void registerSubscriber(final Subscriber consumer) {...addSubscriber(consumer, subscribeType);}private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);//执行DefaultPublisher.addSubscriber()方法publisher.addSubscriber(consumer);}//发布事件public static boolean publishEvent(final Event event) {return publishEvent(event.getClass(), event);}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {//执行DefaultPublisher.publish()方法return publisher.publish(event);}...}...
}public class DefaultPublisher extends Thread implements EventPublisher {protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();...@Overridepublic void addSubscriber(Subscriber subscriber) {subscribers.add(subscriber);}@Overridepublic boolean publish(Event event) {...  receiveEvent(event);...}void receiveEvent(Event event) {...for (Subscriber subscriber : subscribers) {...notifySubscriber(subscriber, event);}}@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {final Runnable job = new Runnable() {@Overridepublic void run() {subscriber.onEvent(event);}};final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception : {}", e);}}}...
}

(3)RaftCore的signalPublish()方法总结

首先会判断自身节点是不是Leader,如果不是,则会转发给Leader处理。如果是Leader,则会对数据进行处理,先是写入到本地文件,然后同步到内存注册表,最后会通知其他Follower节点进行数据同步。

可见Nacos 1.4.1版本在数据的写入实现上,并没有两阶段提交的处理。而是Leader自身处理数据完成后,直接就去同步给其他集群节点。哪怕集群节点同步失败或没有过半节点成功,Leader的数据也不会回滚而只抛出异常。所以,Nacos 1.4.1版本只是实现了Raft的简化版,后续也会被废弃掉的。

相关文章:

Nacos源码—4.Nacos集群高可用分析三

大纲 6.CAP原则与Raft协议 7.Nacos实现的Raft协议是如何写入数据的 8.Nacos实现的Raft协议是如何选举Leader节点的 9.Nacos实现的Raft协议是如何同步数据的 10.Nacos如何实现Raft协议的简版总结 6.CAP原则与Raft协议 (1)CAP分别指的是什么 (2)什么是分区以及容错 (3)为…...

AWS WebRTC如何实现拉流?内部是这样实现的

当我们通过手机上的app选择某一个Iot设备,例如,摄像头,想看实时视频的时候,aws都做了什么?最近在搞自研Iot项目,借机整理一下相关流程。 App通过 AWS SDK 发起拉流请求的内部机制是AWS Kinesis Video Streams (KVS) WebRTC 模式中一个非常关键的问题。 一、KVS WebRTC …...

NGINX `ngx_http_browser_module` 深度解析与实战

1. 模块定位 ngx_http_browser_module 在 HTTP 头 User-Agent 解析的基础上&#xff0c;给出三个内置变量&#xff1a; 变量作用典型值$modern_browser当 UA 被判定为 现代浏览器 时取 modern_browser_value 指定的值&#xff1b;否则为空modern. / 1$ancient_browser当 UA 被…...

Elasticsearch知识汇总之 ElasticSearch高可用方案

六 ElasticSearch高可用方案 6.1 高可用架构 请求协调节点根据负载均衡&#xff0c;转发给主分片节点&#xff0c;主分片同步复制给从节点&#xff0c;主从节点都写入完成返回客户端请求成功。对于读请求&#xff0c;协调负载到任意节点数据节点&#xff0c;数据节点把各自符合…...

多线程2-多线程编程

引入 当我们想要代码能够实现并发执行时&#xff0c;我们可以使用多进程进行并发编程&#xff08;在Java中并不推荐这种方式&#xff0c;许多API在Java标准库中都没有提供&#xff09;&#xff0c;也可以使用多线程进行并发编程&#xff08;系统提供了相关的API&#xff0c;Ja…...

电商系统中单商户和多商户的区别

在电商的商业版图上&#xff0c;单商户与多商户模式如同两条并行的发展脉络&#xff0c;各自构建起独特的商业生态。它们在运营逻辑、商业模式等多方面存在显著差异&#xff0c;这些差异不仅塑造了不同的平台特性&#xff0c;也深刻影响着企业的发展路径。接下来&#xff0c;我…...

【东枫科技】代理英伟达产品:智能网卡的连接线

文章目录 总览详细&#xff1a;NVIDIA 400Gb/s QSFP-DD 线缆详细&#xff1a;NVIDIA 400Gb/s OSFP 线缆详细&#xff1a;NVIDIA 200Gb/s QSFP56 线缆详细&#xff1a;NVIDIA 100Gb/s QSFP28 线缆 总览 详细&#xff1a;NVIDIA 400Gb/s QSFP-DD 线缆 详细&#xff1a;NVIDIA 400…...

使用ip池后,爬虫还被封,是什么原因呢?

嘿&#xff0c;亲爱的小伙伴们&#xff01;今天我们聊一个让很多爬虫工程师抓狂的问题&#xff1a;明明用上了IP池&#xff0c;结果爬虫还是被封了&#xff01;怎么回事呢&#xff1f;如果你也曾在爬虫与反爬的“猫鼠游戏”里痛苦“翻车”&#xff0c;别着急&#xff0c;这篇文…...

C++23 新利器:深入解析栈踪迹库 (P0881R7)

文章目录 为何需要标准化的栈踪迹&#xff1f;P0881R7 的核心组件与使用基本用法示例与异常处理的集成优势与价值潜在的考量总结 对于 C 开发者而言&#xff0c;调试和错误诊断一直是开发周期中不可或缺但又充满挑战的一环。当程序崩溃或发生未预期行为时&#xff0c;获取清晰、…...

2025-05-06 事业-独立开发项目-记录

摘要: 2025-05-06 事业-独立开发项目-记录 独立开发项目记录 Product Hunt | InDev 独立开发者导航站https://www.producthunt.com/ Nomads.com - Best Places to Live for Digital Nomads (formerly Nomad List)https://nomads.com/ InDev 独立开发者导航站https://indev.bei…...

【Linux系统】探索进程等待与程序替换的奥秘

文章目录 前言一、重谈进程创建1.1 fork 函数1.2 写时拷贝1.3 fork 的常规用法1.4 fork 调用失败的原因1.5 创建一批进程 二、进程终止2.1 进程退出场景2.2 strerror 函数的作用2.3 errno 全局变量2.4 程序异常机制2.5 进程退出方式 三、进程等待3.1 进程等待必要性3.2 进程等待…...

Github 2025-05-06Python开源项目日报 Top10

根据Github Trendings的统计,今日(2025-05-06统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目10C++项目2TypeScript项目1系统设计指南 创建周期:2507 天开发语言:Python协议类型:OtherStar数量:241693 个Fork数量:42010 次…...

【愚公系列】《Manus极简入门》021-音乐创作助手:“音符魔术师”

&#x1f31f;【技术大咖愚公搬代码&#xff1a;全栈专家的成长之路&#xff0c;你关注的宝藏博主在这里&#xff01;】&#x1f31f; &#x1f4e3;开发者圈持续输出高质量干货的"愚公精神"践行者——全网百万开发者都在追更的顶级技术博主&#xff01; &#x1f…...

【Azure Redis】Redis导入备份文件(RDB)失败的原因

问题描述 在测试Azure Redis的导入/导出备份文件的功能中&#xff0c;突然发现在Redis 4.0上导入的时候&#xff0c;一直报错。 image.png 问题解答 因为门户上只是显示导入失败&#xff0c;没有任何错误消息说明。根据常理推断&#xff0c;Redis 的RDB文件格式都具有一致性。居…...

git “分离头指针”(detached HEAD) 状态。

在 Git 中&#xff0c;当你运行 git branch 命令时&#xff0c;看到如下输出&#xff1a; * (detached from 5b596b5)master 其中的&#xff1a; * (detached from 5b596b5) 表示你当前处于 “分离头指针”&#xff08;detached HEAD&#xff09; 状态。 &#x1f9e0; 什…...

Gitee的介绍

目录 1.Gitee介绍&#xff1a; 1.1 代码托管 1.2 本土化优势 1.3 企业级服务 1.4 开源生态 1.5 多形态适配 定位&#xff1a;国内开发者首选的高效代码协作平台&#xff0c;兼顾个人开源与企业级私有开发需求。 2.Gitee和GitHub区别 3.Gitee使用教程 4.Gitee相关…...

NoUniqueKey问题和Regular join介绍

问题背景 在flink任务中&#xff0c;遇到了 NoUniqueKey Join的情况&#xff0c;导致了数据膨胀&#xff0c;和下游结果与数据库数据不一致问题 那NoUniqueKey Join为什么会导致问题呢&#xff0c;下面是其中一种场景示例&#xff1a; 为什么会出现 NoUniqueKey &#xff1a;…...

TC8:SOMEIP_ETS_027-028

SOMEIP_ETS_027: echoUINT8 目的 检查method方法echoUINT8的参数及其顺序能够被顺利地发送和接收 说白了就是检查UINT8数据类型参数在SOME/IP协议层的序列化与反序列化是否正常。 UINT8相比于测试用例SOMEIP_ETS_021: echoINT8中的SINT8数据类型来说,属于无符号整数,也就是…...

小微企业SaaS ERP管理系统,SpringBoot+Vue+ElementUI+UniAPP

小微企业的SaaS ERP管理系统&#xff0c;ERP系统源码&#xff0c;ERP管理系统源代码 一款适用于小微企业的SaaS ERP管理系统, 采用SpringBootVueElementUIUniAPP技术栈开发&#xff0c;让企业简单上云。 专注于小微企业的应用需求&#xff0c;如企业基本的进销存、询价&#…...

css filter 常用方法函数和应用实例

1. blur() 模糊 filter: blur(半径);参数&#xff1a;模糊半径&#xff08;像素&#xff09;&#xff0c;值越大越模糊 示例&#xff1a;filter: blur(5px);2. brightness() 亮度 filter: brightness(百分比); 参数&#xff1a;1原始对比度&#xff0c;0全灰&#xff0c;>…...

chrome inspect 调试遇到的问题

1、oppp 手机打开webview 的时候&#xff0c; 报错这个并没有页面 Offline #V8FIG6SGLN75M7FY Pending authentication: please accept debugging session on the device. 解决方法&#xff0c;保持chrome 浏览器在显示的状态 去设置里开启usb 调试再关闭&#xff0c;反复重…...

Kotlin 中 List 和 MutableList 的区别

在 Kotlin 中&#xff0c;List 和 MutableList 是两种不同的集合接口&#xff0c;核心区别在于可变性。 Kotlin 集合框架的重要设计原则&#xff1a;通过接口分离只读&#xff08;read - only&#xff09;和可变&#xff08;mutable&#xff09;操作&#xff0c;以提高代码的安…...

openssl 生成自签名证书实现接口支持https

1.下载安装openssl Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions 2.配置环境变量 将 openssl 的目录&#xff08;D:\tools\openssl\bin&#xff09;添加到 path 中 3.生成自签名证书 找一个存证书的目录打开powershell 3.1 生成私钥 openssl gen…...

React 中集成 Ant Design 组件库:提升开发效率与用户体验

React 中集成 Ant Design 组件库:提升开发效率与用户体验 一、为什么选择 Ant Design 组件库?二、基础引入方式三、按需引入(优化性能)四、Ant Design Charts无缝接入图标前面提到了利用Redux提供全局维护,但如果在开发时再自己手动封装组件,不仅效率不高,可能开发的组件…...

神经网络:节点、隐藏层与非线性学习

神经网络&#xff1a;节点、隐藏层与非线性学习 摘要&#xff1a; 神经网络是机器学习领域中一种强大的工具&#xff0c;能够通过复杂的结构学习数据中的非线性关系。本文从基础的线性模型出发&#xff0c;逐步深入探讨神经网络中节点和隐藏层的作用&#xff0c;以及它们如何…...

vue+tsc+noEmit导致打包报TS类型错误问题及解决方法

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 当我们新建vue3项目,package.json文件会自动给我添加一些配置选项,这写选项基本没有问题,但是在实际操作过程中,当项目越来越复杂就会出现问题,本文给大家分享vuetscnoEmit导致打包报TS类型错误问题及…...

Ragflow服务器上部署教程

参考官方文档进行整理 克隆相应代码 git clone https://github.com/infiniflow/ragflow.git修改vm.max_map_count sudo sysctl -w vm.max_map_count262144修改 daemon.json文件 {"registry-mirrors": ["https://docker.m.daocloud.io","https://0…...

Ubuntu 系统中解决 Firefox 中文显示乱码的完整指南

Firefox 是一款流行的网络浏览器,但在 Ubuntu 系统中有时会遇到中文显示乱码的问题。本文将为您提供一个全面的解决方案,帮助您轻松解决这个烦人的问题。 问题概述 在 Ubuntu 系统中使用 Firefox 浏览器时,有时会发现中文字符显示为乱码或方块。这通常是由于缺少合适的中文…...

JVM——垃圾回收

垃圾回收 在Java虚拟机&#xff08;JVM&#xff09;的自动内存管理中&#xff0c;垃圾回收&#xff08;Garbage Collection, GC&#xff09;是其核心组件之一。它负责回收堆内存中不再使用的对象所占用的内存空间&#xff0c;以供新对象的分配使用。下面我们将深入探讨JVM中的…...

【AI News | 20250506】每日AI进展

AI Repos 1、gitsummarize GitSummarize是一个在线工具&#xff0c;用户只需将GitHub URL中的“hub”替换为“summarize”&#xff0c;即可为任何公开或私有代码库生成交互式文档。该工具利用Gemini分析代码结构&#xff0c;自动生成系统级架构概述、目录和文件摘要、自然语言…...

LabVIEW高冲击加速度校准系统

在国防科技领域&#xff0c;高 g 值加速度传感器广泛应用于先进兵器研制&#xff0c;如深侵彻系统、精确打击弹药及钻地弹药等。其性能指标直接影响研究结果的准确性与可靠性&#xff0c;因此对该传感器进行定期校准意义重大。高冲击加速度校准系统具备多方面功能&#xff0c;适…...

优化算法 - intro

优化问题 一般形式 minimize f ( x ) f(\mathbf{x}) f(x) subject to x ∈ C \mathbf{x} \in C x∈C 目标函数 f : R n → R f: \mathbb{R}^n \rightarrow \mathbb{R} f:Rn→R限制集合例子 C { x ∣ h 1 ( x ) 0 , . . . , h m ( x ) 0 , g 1 ( x ) ≤ 0 , . . . , g r …...

从PotPlayer到专业播放器—基于 RTSP|RTMP播放器功能、架构、工程能力的全面对比分析

从PotPlayer到专业播放器SDK&#xff1a;工程项目怎么选择合适的播放方案&#xff1f; ——基于 RTSP、RTMP 播放器功能、架构、工程能力的全面对比分析 在许多音视频项目早期&#xff0c;我们都听过这句话&#xff1a; “本地测试就用 PotPlayer 播吧&#xff0c;能播就行了…...

EasyRTC嵌入式音视频通信SDK技术,助力工业制造多场景实时监控与音视频通信

一、背景 在数字化时代&#xff0c;实时监控广泛应用于安防、工业、交通等领域。但传统监控系统实时性、交互性欠佳&#xff0c;难以满足需求。EasyRTC作为先进实时通信技术&#xff0c;具有低延迟、高可靠、跨平台特性&#xff0c;能有效升级监控系统。融入EasyRTC后&#xf…...

MPay码支付系统第四方聚合收款码多款支付插件个人免签支付源码TP8框架全开源

一、源码描述 这是一套码支付源码&#xff08;MPay&#xff09;&#xff0c;基于TP8框架&#xff0c;前端layui2.9后端PearAdmin&#xff0c;专注于个人免签收款&#xff0c;通过个人的普通收款码&#xff0c;即可实现收款通知自动回调&#xff0c;支持绝大多数商城系统&#…...

wrod生成pdf。[特殊字符]改背景

import subprocess import os,time from rembg import remove, new_session from PIL import Image import io from docxtpl import DocxTemplate, InlineImage from docx.shared import Inches input_folder ‘tupian’ # 输入文件夹 kouchu_folder ‘kouchu’ # 去背景图像…...

动手学深度学习12.1. 编译器和解释器-笔记练习(PyTorch)

以下内容为结合李沐老师的课程和教材补充的学习笔记&#xff0c;以及对课后练习的一些思考&#xff0c;自留回顾&#xff0c;也供同学之人交流参考。 本节课程地址&#xff1a;无 本节教材地址&#xff1a;12.1. 编译器和解释器 — 动手学深度学习 2.0.0 documentation 本节…...

数字文明时代开源技术驱动的商业范式重构:基于开源AI大模型、AI智能名片与S2B2C商城小程序源码的协同创新研究

摘要&#xff1a;数字文明时代&#xff0c;数字技术正以指数级速度重构全球经济与社会结构。本文聚焦开源AI大模型、AI智能名片与S2B2C商城小程序源码的协同创新机制&#xff0c;从技术架构、商业逻辑、实践案例三个维度展开系统研究。基于多行业实证数据&#xff0c;揭示开源技…...

【Bootstrap V4系列】学习入门教程之 组件-轮播(Carousel)

Bootstrap V4系列 学习入门教程之 组件-轮播&#xff08;Carousel&#xff09; 轮播&#xff08;Carousel&#xff09;一、How it works二、Example2.1 Slides only 仅幻灯片2.2 With controls 带控制装置2.3 With indicators 带指示器2.4 With captions 带字幕 轮播&#xff0…...

嵌入式openharmony标准鸿蒙系统驱动开发基本原理与流程

第一:鸿蒙概述 OpenHarmony采用多内核(Linux内核或者LiteOS)设计,支持系统在不同资源容量的设备部署。当相同的硬件部署不同内核时,如何能够让设备驱动程序在不同内核间平滑迁移,消除驱动代码移植适配和维护的负担,是OpenHarmony驱动子系统需要解决的重要问题。 …...

Leetcode 刷题记录 08 —— 链表第二弹

本系列为笔者的 Leetcode 刷题记录&#xff0c;顺序为 Hot 100 题官方顺序&#xff0c;根据标签命名&#xff0c;记录笔者总结的做题思路&#xff0c;附部分代码解释和疑问解答&#xff0c;01~07为C语言&#xff0c;08及以后为Java语言。 01 合并两个有序链表 /*** Definition…...

PaddlePaddle 和PyTorch选择与对比互斥

你遇到的错误信息如下&#xff1a; RuntimeError: (PreconditionNotMet) Tensors dimension is out of bound.Tensors dimension must be equal or less than the size of its memory.But received Tensors dimension is 8, memorys size is 0.[Hint: Expected numel() * Size…...

极新月报·2025.4人工智能投融资观察

“ AI投资从‘量’向‘质’过渡 ” 4月重点关注&#xff1a; 1、四月人工智能领域投融资事件105起&#xff0c;披露金额78.63亿人民币。 2、亿级人民币以上金额的投资事件共20起 。 3、四月人工智能领域出现1起IPO事件。 4、在所有融资事件里&#xff0c;除去股权投资&…...

C++ vector 介绍与使用

目录 1.vector是什么&#xff1f; 2.vector的使用 2.1vector的构造函数 2.2vector iterator 的使用 2.3vector 空间增长问题 2.4vector的增删查改 1.vector是什么&#xff1f; 1. vector是表示可变大小数组的序列容器。 2. 就像数组一样&#xff0c;vector也 采用连续的存储…...

可以下载blender/fbx格式模型网站

glbxz.com glbxz.com可以下载blender/fbx格式模型。当然里面有免费的...

Vi/Vim 编辑器详细指南

Vi/Vim 编辑器详细指南 简介一、模式详解1. 命令模式(Normal Mode)2. 插入模式(Insert Mode)3. 可视模式(Visual Mode)4. 命令行模式(Ex Mode)二、核心操作1. 保存与退出2. 导航与移动3. 编辑与文本操作4. 搜索与替换三、高级技巧1. 多文件与窗口操作2. 宏录制3. 寄存器…...

LeetCode 热题 100 22. 括号生成

LeetCode 热题 100 | 22. 括号生成 大家好&#xff0c;今天我们来解决一道经典的算法题——括号生成。这道题在 LeetCode 上被标记为中等难度&#xff0c;要求生成所有可能的并且有效的括号组合。这是一道非常经典的回溯法题目&#xff0c;非常适合用来练习递归和回溯的技巧。…...

UE5 MetaHuman眼睛变黑

第5个材质MI_EyeOcclusion_Inst修改成透明即可...

【C语言】--指针超详解(一)

目录 一.内存和地址 1.1--内存 1.2--如何理解编址 二.指针变量和地址 2.1--取地址操作符(&) 2.2--指针变量和解引用操作符(*) 2.2.1--指针变量 2.2.2--如何理解指针类型 2.2.3--解引用操作符 2.3--指针变量的大小 三.指针变量类型的意义 3.1--从指针的解引用方…...

高频工业RFID读写器-三格电子

高频工业RFID读写器 型号&#xff1a;SG-HF40-485、SG-HF40-TCP 产品功能 高频工业读写器&#xff08;RFID&#xff09;产品用在自动化生产线,自动化分拣系统,零部件组装产线等情境下&#xff0c;在自动化节点的工位上部署RFID读写设备&#xff0c;通过与制品的交互&#xf…...