当前位置: 首页 > news >正文

zk基础—5.Curator的使用与剖析二

大纲

1.基于Curator进行基本的zk数据操作

2.基于Curator实现集群元数据管理

3.基于Curator实现HA主备自动切换

4.基于Curator实现Leader选举

5.基于Curator实现分布式Barrier

6.基于Curator实现分布式计数器

7.基于Curator实现zk的节点和子节点监听机制

8.基于Curator创建客户端实例的源码分析

9.Curator在启动时是如何跟zk建立连接的

10.基于Curator进行增删改查节点的源码分析

11.基于Curator的节点监听回调机制的实现源码

12.基于Curator的Leader选举机制的实现源码

11.Curator节点监听回调机制的实现源码

(1)PathCache子节点监听机制的实现源码

(2)NodeCache节点监听机制的实现源码

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

(4)PathCache实现自动重复注册监听器的效果

(5)NodeCache实现节点变化事件监听的效果

(1)PathCache子节点监听机制的实现源码

PathChildrenCache会调用原生zk客户端对象的getChildren()方法,并往该方法传入一个监听器childrenWatcher。当子节点发生事件,就会通知childrenWatcher这个原生的Watcher,然后该Watcher便会调用注册到PathChildrenCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//PathCache,监听/cluster下的子节点变化PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {...}});pathChildrenCache.start();}
}public class PathChildrenCache implements Closeable {private final WatcherRemoveCuratorFramework client;private final String path;private final boolean cacheData;private final boolean dataIsCompressed;private final CloseableExecutorService executorService;private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();...//初始化public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {this.client = client.newWatcherRemoveCuratorFramework();this.path = PathUtils.validatePath(path);this.cacheData = cacheData;this.dataIsCompressed = dataIsCompressed;this.executorService = executorService;ensureContainers = new EnsureContainers(client, path);}//获取用来存放Listener的容器listenerspublic ListenerContainer<PathChildrenCacheListener> getListenable() {return listeners;}//启动对子节点的监听public void start() throws Exception {start(StartMode.NORMAL);}private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {//处理连接状态的变化handleStateChange(newState);}};public void start(StartMode mode) throws Exception {...//对建立的zk连接添加Listenerclient.getConnectionStateListenable().addListener(connectionStateListener);...//把PathChildrenCache自己传入RefreshOperation中//下面的代码其实就是调用PathChildrenCache的refresh()方法offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));...}//提交一个任务到线程池进行处理void offerOperation(final Operation operation) {if (operationsQuantizer.add(operation)) {submitToExecutor(new Runnable() {@Overridepublic void run() {...operationsQuantizer.remove(operation);//其实就是调用PathChildrenCache的refresh()方法operation.invoke();...}});}}private synchronized void submitToExecutor(final Runnable command) {if (state.get() == State.STARTED) {//提交一个任务到线程池进行处理executorService.submit(command);}}...
}class RefreshOperation implements Operation {private final PathChildrenCache cache;private final PathChildrenCache.RefreshMode mode;RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {this.cache = cache;this.mode = mode;}@Overridepublic void invoke() throws Exception {//调用PathChildrenCache的refresh方法,也就是发起对子节点的监听cache.refresh(mode);}...
}public class PathChildrenCache implements Closeable {...private volatile Watcher childrenWatcher = new Watcher() {//重复注册监听器//当子节点发生变化事件时,该方法就会被触发调用@Overridepublic void process(WatchedEvent event) {//下面的代码其实依然是调用PathChildrenCache的refresh()方法offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));}};void refresh(final RefreshMode mode) throws Exception {ensurePath();//创建一个回调,在下面执行client.getChildren()成功时会触发执行该回调final BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {if (reRemoveWatchersOnBackgroundClosed()) {return;}if (event.getResultCode() == KeeperException.Code.OK.intValue()) {//处理子节点数据processChildren(event.getChildren(), mode);} else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {if (mode == RefreshMode.NO_NODE_EXCEPTION) {log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);ensureContainers.reset();} else {log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);ensureContainers.reset();offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));}}}};//下面的代码最后会调用到原生zk客户端的getChildren方法发起对子节点的监听//并且添加一个叫childrenWatcher的监听,一个叫callback的后台异步回调client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);}...
}//子节点发生变化事件时,最后都会触发执行EventOperation的invoke()方法
class EventOperation implements Operation {private final PathChildrenCache cache;private final PathChildrenCacheEvent event;EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {this.cache = cache;this.event = event;}@Overridepublic void invoke() {//调用PathChildrenCache的Listenercache.callListeners(event);}...
}

(2)NodeCache节点监听机制的实现源码

NodeCache会调用原生zk客户端对象的exists()方法,并往该方法传入一个监听器watcher。当子节点发生事件,就会通知watcher这个原生的Watcher,然后该Watcher便会调用注册到NodeCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//NodeCachefinal NodeCache nodeCache = new NodeCache(client, "/cluster");nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {Stat stat = client.checkExists().forPath("/cluster");if (stat == null) {} else {nodeCache.getCurrentData();}}});nodeCache.start();}
}public class NodeCache implements Closeable {private final WatcherRemoveCuratorFramework client;private final String path;private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();...private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {if (isConnected.compareAndSet(false, true)) {reset();}} else {isConnected.set(false);}}};//初始化一个Watcher,作为监听器添加到下面reset()方法执行的client.checkExists()方法中private Watcher watcher = new Watcher() {//重复注册监听器@Overridepublic void process(WatchedEvent event) {reset();}};//初始化一个回调,在下面reset()方法执行client.checkExists()成功时会触发执行该回调private final BackgroundCallback backgroundCallback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {processBackgroundResult(event);}};//初始化NodeCachepublic NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {this.client = client.newWatcherRemoveCuratorFramework();this.path = PathUtils.validatePath(path);this.dataIsCompressed = dataIsCompressed;}//获取存放Listener的容器ListenerContainerpublic ListenerContainer<NodeCacheListener> getListenable() {Preconditions.checkState(state.get() != State.CLOSED, "Closed");return listeners;}//启动对节点的监听public void start() throws Exception {start(false);}public void start(boolean buildInitial) throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");//对建立的zk连接添加Listenerclient.getConnectionStateListenable().addListener(connectionStateListener);if (buildInitial) {//调用原生的zk客户端的exists()方法,对节点进行监听client.checkExists().creatingParentContainersIfNeeded().forPath(path);internalRebuild();}reset();}private void reset() throws Exception {if ((state.get() == State.STARTED) && isConnected.get()) {//下面的代码最后会调用原生的zk客户端的exists()方法,对节点进行监听//并且添加一个叫watcher的监听,一个叫backgroundCallback的后台异步回调client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);}}private void processBackgroundResult(CuratorEvent event) throws Exception {switch (event.getType()) {case GET_DATA: {if (event.getResultCode() == KeeperException.Code.OK.intValue()) {ChildData childData = new ChildData(path, event.getStat(), event.getData());setNewData(childData);}break;}case EXISTS: {if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {setNewData(null);} else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {if (dataIsCompressed) {client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);} else {client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);}}break;}}}...
}

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

getChildren()方法注册的Watcher只有一次性,其注册的回调是一个异步回调。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());System.out.println("创建节点'/test");client.getChildren().usingWatcher(new CuratorWatcher() {public void process(WatchedEvent event) throws Exception {//只要通知过一次zk节点的变化,这里就不会再被通知了//也就是第一次的通知才有效,这里被执行过一次后,就不会再被执行System.out.println("收到一个zk的通知: " + event);}}).inBackground(new BackgroundCallback() {//后台回调通知,表示会让zk.getChildren()在后台异步执行//后台异步执行client.getChildren()方法完毕,便会回调这个方法进行通知public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("收到一个后台回调通知: " + event);}}).forPath("/test");}
}

(4)PathCache实现自动重复注册监听器的效果

每当节点发生变化时,就会触发childEvent()方法的调用。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {//只要子节点发生变化,无论变化多少次,每次变化都会触发这里childEvent的调用public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {System.out.println("监听的子节点发生变化,收到了事件通知:" + pathChildrenCacheEvent);}});pathChildrenCache.start();System.out.println("完成子节点的监听和启动");}
}

(5)NodeCache实现节点变化事件监听的效果

每当节点发生变化时,就会触发nodeChanged()方法的调用。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");final NodeCache nodeCache = new NodeCache(client, "/test/child/id");nodeCache.getListenable().addListener(new NodeCacheListener() {//只要节点发生变化,无论变化多少次,每次变化都会触发这里nodeChanged的调用public void nodeChanged() throws Exception {Stat stat = client.checkExists().forPath("/test/child/id");if (stat != null) {byte[] dataBytes = client.getData().forPath("/test/child/id");System.out.println("节点数据发生了变化:" + new String(dataBytes));} else {System.out.println("节点被删除");}}});nodeCache.start();}
}

12.基于Curator的Leader选举机制的实现源码

(1)第一种Leader选举机制LeaderLatch的源码

(2)第二种Leader选举机制LeaderSelector的源码

利用Curator的CRUD+ 监听回调机制,就能满足大部分系统使用zk的场景了。需要注意的是:如果使用原生的zk去注册监听器来监听节点或者子节点,当节点或子节点发生了对应的事件,会通知客户端一次,但是下一次再有对应的事件就不会通知了。使用zk原生的API时,客户端需要每次收到事件通知后,重新注册监听器。然而Curator的PathCache + NodeCache,会自动重新注册监听器。

(1)第一种Leader选举机制LeaderLatch的源码

Curator客户端会通过创建临时顺序节点的方式来竞争成为Leader的,LeaderLatch这种Leader选举的实现方式与分布式锁的实现几乎一样。

每个Curator客户端创建完临时顺序节点后,就会对/leader/latch目录调用getChildren()方法来获取里面所有的子节点,调用getChildren()方法的结果会通过backgroundCallback回调进行通知,接着客户端便对获取到的子节点进行排序来判断自己是否是第一个子节点。

如果客户端发现自己是第一个子节点,那么就是Leader。如果客户端发现自己不是第一个子节点,就对上一个节点添加一个监听器。在添加监听器时,会使用getData()方法获取自己的上一个节点,getData()方法执行成功后会调用backgrondCallback回调。

当上一个节点对应的客户端释放了Leader角色,上一个节点就会消失,此时就会通知第二个节点对应的客户端,执行getData()方法添加的监听器。

所以如果getData()方法的监听器被触发了,即发现上一个节点不存在了,客户端会调用getChildren()方法重新获取子节点列表,判断是否是Leader。

注意:使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.getConnectionStateListenable().addListener(new ConnectionStateListener() {public void stateChanged(CuratorFramework client, ConnectionState newState) {switch (newState) {case LOST://当Leader与zk断开时,需要暂停当前Leader的工作}}});client.start();System.out.println("已经启动Curator客户端,完成zk的连接");LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");leaderLatch.start();leaderLatch.await();//阻塞等待直到当前客户端成为LeaderBoolean hasLeaderShip = leaderLatch.hasLeadership();System.out.println("是否成为Leader: " + hasLeaderShip);}
}public class LeaderLatch implements Closeable {private final WatcherRemoveCuratorFramework client;private final ConnectionStateListener listener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {handleStateChange(newState);}};...//Add this instance to the leadership election and attempt to acquire leadership.public void start() throws Exception {...//对建立的zk连接添加Listenerclient.getConnectionStateListenable().addListener(listener);reset();...}@VisibleForTestingvoid reset() throws Exception {setLeadership(false);setNode(null);//callback作为成功创建临时顺序节点后的回调BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {...if (event.getResultCode() == KeeperException.Code.OK.intValue()) {setNode(event.getName());if (state.get() == State.CLOSED) {setNode(null);} else {//成功创建临时顺序节点,需要通过getChildren()再去获取子节点列表getChildren();}} else {log.error("getChildren() failed. rc = " + event.getResultCode());}}};//创建临时顺序节点client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));}//获取子节点列表private void getChildren() throws Exception {//callback作为成功获取子节点列表后的回调BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {if (event.getResultCode() == KeeperException.Code.OK.intValue()) {checkLeadership(event.getChildren());}}};client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));}//检查自己是否是第一个节点private void checkLeadership(List<String> children) throws Exception {if (debugCheckLeaderShipLatch != null) {debugCheckLeaderShipLatch.await();}final String localOurPath = ourPath.get();//对获取到的节点进行排序List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;if (ourIndex < 0) {log.error("Can't find our node. Resetting. Index: " + ourIndex);reset();} else if (ourIndex == 0) {//如果自己是第一个节点,则标记自己为LeadersetLeadership(true);} else {//如果自己不是第一个节点,则对前一个节点添加监听String watchPath = sortedChildren.get(ourIndex - 1);Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {//重新获取子节点列表getChildren();}}};BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {reset();}}};//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));}}...//阻塞等待直到成为Leaderpublic void await() throws InterruptedException, EOFException {synchronized(this) {while ((state.get() == State.STARTED) && !hasLeadership.get()) {wait();//Objetc对象的wait()方法,阻塞等待}}if (state.get() != State.STARTED) {throw new EOFException();}}//设置当前客户端成为Leader,并进行notifyAll()通知之前阻塞的线程private synchronized void setLeadership(boolean newValue) {boolean oldValue = hasLeadership.getAndSet(newValue);if (oldValue && !newValue) { // Lost leadership, was true, now falselisteners.forEach(new Function<LeaderLatchListener, Void>() {@Overridepublic Void apply(LeaderLatchListener listener) {listener.notLeader();return null;}});} else if (!oldValue && newValue) { // Gained leadership, was false, now truelisteners.forEach(new Function<LeaderLatchListener, Void>() {@Overridepublic Void apply(LeaderLatchListener input) {input.isLeader();return null;}});}notifyAll();//唤醒之前执行了wait()方法的线程}
}

(2)第二种Leader选举机制LeaderSelector的源码

通过判断是否成功获取到分布式锁,来判断是否竞争成为Leader。正因为是通过持有分布式锁来成为Leader,所以LeaderSelector.takeLeadership()方法不能退出,否则就会释放锁。而一旦释放了锁,其他客户端就会竞争锁成功而成为新的Leader。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");LeaderSelector leaderSelector = new LeaderSelector(client,"/leader/election",new LeaderSelectorListener() {public void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("你已经成为了Leader......");//在这里干Leader所有的事情,此时方法不能退出Thread.sleep(Integer.MAX_VALUE);}public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("连接状态的变化,已经不是Leader......");if (connectionState.equals(ConnectionState.LOST)) {throw new CancelLeadershipException();}}});leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为LeaderThread.sleep(Integer.MAX_VALUE);}
}public class LeaderSelector implements Closeable {private final CuratorFramework client;private final LeaderSelectorListener listener;private final CloseableExecutorService executorService;private final InterProcessMutex mutex;...public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {Preconditions.checkNotNull(client, "client cannot be null");PathUtils.validatePath(leaderPath);Preconditions.checkNotNull(listener, "listener cannot be null");this.client = client;this.listener = new WrappedListener(this, listener);hasLeadership = false;this.executorService = executorService;//初始化一个分布式锁mutex = new InterProcessMutex(client, leaderPath) {@Overrideprotected byte[] getLockNodeBytes() {return (id.length() > 0) ? getIdBytes(id) : null;}};}public void start() {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");Preconditions.checkState(!executorService.isShutdown(), "Already started");Preconditions.checkState(!hasLeadership, "Already has leadership");client.getConnectionStateListenable().addListener(listener);requeue();}public boolean requeue() {Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");return internalRequeue();}private synchronized boolean internalRequeue() {if (!isQueued && (state.get() == State.STARTED)) {isQueued = true;//将选举的工作作为一个任务交给线程池执行Future<Void> task = executorService.submit(new Callable<Void>() {@Overridepublic Void call() throws Exception {...doWorkLoop();...return null;}});ourTask.set(task);return true;}return false;}private void doWorkLoop() throws Exception {...doWork();...}@VisibleForTestingvoid doWork() throws Exception {hasLeadership = false;try {//尝试获取一把分布式锁,获取失败会进行阻塞mutex.acquire();//执行到这一行代码,说明获取分布式锁成功hasLeadership = true;try {if (debugLeadershipLatch != null) {debugLeadershipLatch.countDown();}if (debugLeadershipWaitLatch != null) {debugLeadershipWaitLatch.await();}//回调用户重写的takeLeadership()方法listener.takeLeadership(client);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw e;} catch (Throwable e) {ThreadUtils.checkInterrupted(e);} finally {clearIsQueued();}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw e;} finally {if (hasLeadership) {hasLeadership = false;boolean wasInterrupted = Thread.interrupted();  // clear any interrupted tatus so that mutex.release() works immediatelytry {//释放锁mutex.release();} catch (Exception e) {if (failedMutexReleaseCount != null) {failedMutexReleaseCount.incrementAndGet();}ThreadUtils.checkInterrupted(e);log.error("The leader threw an exception", e);} finally {if (wasInterrupted) {Thread.currentThread().interrupt();}}}}}...
}

相关文章:

zk基础—5.Curator的使用与剖析二

大纲 1.基于Curator进行基本的zk数据操作 2.基于Curator实现集群元数据管理 3.基于Curator实现HA主备自动切换 4.基于Curator实现Leader选举 5.基于Curator实现分布式Barrier 6.基于Curator实现分布式计数器 7.基于Curator实现zk的节点和子节点监听机制 8.基于Curator创…...

bge-m3+deepseek-v2-16b+离线语音能力实现离线文档向量化问答语音版

ollama run deepseek-v2:16b ollama pull bge-m3 1、离线听写效果的大幅度提升。50M 1.3G&#xff08;每次初始化都会很慢&#xff09;---优化到首次初始化使用0延迟响应。 2、文档问答历史问题处理与优化&#xff0c;文档问答离线策略讨论与参数暴露。 3、离线大模型答复中断…...

[leetcode]3123. 最短路径中的边(Dijkstra+反向搜索找边)

题目链接 题意 给定n个点的无向图 给定一个edges{u,v,w}数组 表示u到v有一条边权为w的无向边 返回一个bool数组ans&#xff0c;ans[i]1表示edges[i]在任意一条0到n-1的最短路中 思路 先Dijkstra找出最短路再从n-1出发 反向搜索 当前点i&#xff0c;邻接点j&#xff0c;边权…...

构建macOS命令速查手册:基于Flask的轻量级Web应用实践

构建macOS命令速查手册&#xff1a;基于Flask的轻量级Web应用实践 一、项目概述 本文介绍一个基于Flask框架开发的macOS命令速查Web应用。该应用通过结构化的命令数据存储和响应式前端设计&#xff0c;为用户提供便捷的命令查询体验&#xff0c;具备以下特点&#xff1a; 六…...

中国移动启动数字乡村“五新升级”:年底前,行政村5G覆盖达95%

大湾区经济网品牌观察报道&#xff0c;近日&#xff0c;在国家全面推进乡村振兴的战略背景下&#xff0c;中国移动近日发布数字乡村升级行动计划&#xff0c;以“AI大模型数智化平台”为核心引擎&#xff0c;围绕“五新升级”构建“两个新型”信息服务体系。 一、数字基建筑基&…...

借助mcpo在open-webui中使用mcp

open-webui前几天发布了0.6版本&#xff0c;我立即进行了升级。新版本中一个重要功能是通过mcpo方式支持了mcp server。本文将介绍mcpo是什么&#xff0c;以及如何在open-webui中使用它。同时&#xff0c;我也会分享几个在接入过程中遇到的问题及解决方案。 首先来介绍mcpo&…...

Mysql的备份还原

MySQL日志 日志类型 MySQL有几个不同的日志文件&#xff0c;可以帮助你找出mysqld内部发生的事情&#xff1a; 日志文件记入文件中的信息类型错误日志记录启动、运行或停止时出现的问题。查询日志记录建立的客户端连接和执行的语句。二进制日志记录所有更改数据的语句。主要用…...

测试:正交法设计测试用例

目录 一、什么是正交法 二、利用正交表设计测试用例 正交法设计测试用例的步骤 一、什么是正交法 正交法的目的是为了减少测试用例的数量&#xff0c;让尽可能少的用例覆盖两两组合。认识正交表。 最简单的正交表是L4(2^3)&#xff0c;含意如下&#xff1a; “L”代表正…...

zk基础—5.Curator的使用与剖析一

大纲 1.基于Curator进行基本的zk数据操作 2.基于Curator实现集群元数据管理 3.基于Curator实现HA主备自动切换 4.基于Curator实现Leader选举 5.基于Curator实现分布式Barrier 6.基于Curator实现分布式计数器 7.基于Curator实现zk的节点和子节点监听机制 8.基于Curator创…...

VSCode中结合DeepSeek使用Cline插件的感受

前言 听网上有传言说AI智能插件Cline非常的好用&#xff0c;而且相对Cursor而言还是免费的&#xff0c;捆绑的大模型选择也比较的广泛。所以&#xff0c;特意安装试用了一下。 我的采用IDE是VSCode&#xff0c;捆绑的大模型是最近比较火的DeepSeek。总体使用下来感觉非常的棒。…...

安卓开发工程师-Java 常用数据结构

1. Java 中的数组和集合有什么区别&#xff1f; 数组&#xff1a; 长度固定&#xff1a;一旦声明&#xff0c;长度不能改变。类型单一&#xff1a;只能存储相同类型的元素。存储效率高&#xff1a;底层是连续的内存空间&#xff0c;访问速度快。示例代码&#xff1a; int[] …...

thinkphp8.0上传图片到阿里云对象存储(oss)

1、开通oss,并获取accessKeyId、accessKeySecret <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><tit…...

Angular 2 模板语法详解

Angular 2 模板语法详解 引言 Angular 2 作为一款强大的前端框架,以其组件化的开发模式和高效的性能被众多开发者所青睐。模板语法是Angular 2中用于定义组件UI的关键部分。本文将详细介绍Angular 2的模板语法,帮助开发者更好地理解和运用这一功能。 模板语法概述 Angula…...

进行性核上性麻痹护理攻略:多维度守护健康

日常起居护理 保证患者居住环境安全&#xff0c;清除地面障碍物&#xff0c;避免患者跌倒。家具摆放固定且合理&#xff0c;方便患者活动。为患者准备宽松、舒适、易于穿脱的衣物&#xff0c;减轻穿衣时的困难。在饮食上&#xff0c;提供富含营养、易于吞咽的食物&#xff0c;…...

MessageQueue --- RabbitMQ WorkQueue

MessageQueue --- RabbitMQ WorkQueue 什么是WorkQueue如何分发RoundRobinFair dispatch (Prefetch) --- 能者多劳 什么是WorkQueue Work queues&#xff0c;任务模型。简单来说就是让多个消费者绑定到一个队列&#xff0c;共同消费队列中的消息。当消息处理比较耗时的时候&…...

Redis内存碎片详解!

目录 一、 什么是内存碎片&#xff1f;&#x1f914;二、 为什么 Redis 会有内存碎片呢&#xff1f;&#x1f937;‍♀️三、 如何查看 Redis 内存碎片的信息&#xff1f;&#x1f50d;四、 如何清理 Redis 内存碎片&#xff1f;&#x1f9f9;五、总结&#x1f4dd; &#x1f3…...

如何使用 Nginx 代理 Easysearch 服务

Nginx 是一个高性能的 HTTP 服务器和反向代理服务器&#xff0c;广泛用于负载均衡、缓存、SSL 终端和服务代理等场景。本篇将尝试使用 Nginx 代理 Easysearch 服务&#xff0c;方法同样适用于 Elasticsearch 和 Opensearch。 测试环境 Easysearch 集群版本为 1.10.0&#xff…...

用python输出OLED字模库的符号

提示&#xff1a;博主是小白&#xff0c;如有不足&#xff0c;望海涵和指出 在单片机上练习使用OLED显示屏时&#xff0c;可以看到有个OLED字模库 本文用python将这些字符打印出来&#xff0c;代码如下&#xff08;本文只适用与128*64的OLED&#xff0c;如果是其它OLED&#xf…...

【java】Class.newInstance()

在 Java 中&#xff0c;Class.newInstance()是一个用于创建类的新实例的方法。它调用类的无参构造函数来创建对象。然而&#xff0c;从 Java 9 开始&#xff0c;Class.newInstance()方法已经被标记为废弃&#xff0c;推荐使用其他替代方法。 Class.newInstance()的使用 Class.…...

Apache Arrow 使用

下述操作参考 Building Arrow C — Apache Arrow v20.0.0.dev267 安装依赖组件 sudo apt-get install \build-essential \ninja-build \cmake 下载源码 git clone --recursive --shallow-submodules gitgithub.com:apache/arrow.git 配置 创建build目录并且进入 mkdir a…...

第二届图像处理与人工智能国际学术会议(ICIPAI2025)

重要信息 时间&#xff1a;2025年4月18日-20日 地点&#xff1a;吉林-长春&#xff08;线上线下结合&#xff09; 官网&#xff1a;www.icipai.org 简介&#xff08;部分&#xff09; 主题 其他 图像处理与人工智能&#xff08;Image Processing & Artificial Intell…...

Kafka 消息堆积的原因有哪些?

Kafka 产生消息堆积的本质原因是&#xff1a; ⚠️ “消费速度 < 生产速度”&#xff0c;也就是&#xff1a;写入太快&#xff0c;处理太慢。 下面我从实际场景出发&#xff0c;帮你梳理出常见的几种堆积情况&#xff0c;结合原因和例子&#xff0c;便于你对号入座排查问题 …...

解决cline等免费使用deepseek模型的问题

OpenAI、OpenRouter、Claude等都无法在国内免费正常使用&#xff0c;cline作为在vscode中应对cursor比较好的替代方案&#xff0c;怎么使用免费Deepseek&#xff0c;最核心的是在点击模型名称打开配置以下几项&#xff1a; 1、打开VSCode左侧的Cline\Roo Cline插件面板 2、点…...

ROS多设备交互

ROS多设备连接同一个Master&#xff1a;ROS Master多设备连接-CSDN博客 在多个PC端连接同一个ROS Master后&#xff0c;接下来就可以实现不同设备之间的话题交流&#xff0c;Master主机端启动不同PC端的功能包等功能了 尽管多个PC端拥有不同的ROS工作空间&#xff0c;但是只要…...

浅谈 MVVM 模式

MVVM&#xff08;Model-View-ViewModel&#xff09; 是一种软件架构设计模式&#xff0c;旨在将用户界面&#xff08;UI&#xff09;与业务逻辑分离&#xff0c;从而提高代码的可维护性和可测试性。它在现代前端开发和桌面应用开发中得到了广泛应用&#xff0c;尤其是在构建复杂…...

flutter点击事件教程

在 Flutter 中&#xff0c;处理点击事件是非常常见的操作。Flutter 提供了多种方式来实现用户交互&#xff0c;比如按钮点击、手势检测等。下面是一个详细的教程&#xff0c;帮助你理解如何在 Flutter 中实现点击事件。 一、使用 onPressed 实现按钮点击事件 Flutter 提供了 E…...

[SAP SD] 常用事务码

在SAP系统中&#xff0c;事务码(Transaction Code)是一个具有特定功能的代码标识符&#xff0c;用于快速调用和执行SAP系统内的各种业务模块的功能 /NT-code: 关闭当前业务窗口&#xff0c;退回到SAP初始界面&#xff0c;进入对应的T-Code窗口 /OT-code: 新建SAP GUI窗口&…...

【 <二> 丹方改良:Spring 时代的 JavaWeb】之 Spring Boot 的未来:从微服务到云原生的演进

<前文回顾> 点击此处查看 合集 https://blog.csdn.net/foyodesigner/category_12907601.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12907601&sharereferPC&sharesourceFoyoDesigner&sharefromfrom_link <今日更新> 一、引子&…...

保留格式地一键翻译英文ppt

我手头上有一个贝叶斯推断的英文ppt&#xff0c;假如我想翻译成中文&#xff0c;整合起来进行pre&#xff0c;你会怎么做&#xff1f; 1&#xff0c;复制粘贴型&#xff1a; 在翻译软件与源文件ppt之间不断流转&#xff0c;效率太低 2&#xff0c;office ppt自带翻译插入整合…...

晶晨S905L3S/S905L3SB_安卓9.0_10秒开机_通刷-线刷固件包

晶晨S905L3S&#xff0f;S905L3SB_安卓9.0_10秒开机_通刷-线刷固件包 线刷方法&#xff1a;&#xff08;新手参考借鉴一下&#xff09; 使用晶晨刷机工具USB_Burning_Tool进行刷机&#xff1b;请使用Amlogic USB Burning Tool v2.2.5或v2.2.7&#xff08;晶晨线刷烧录工具v2.2…...

Android Transition转场动效使用全解析

Transition的使用和原理 项目效果 1. 简述 Android 4.4.2 中引入了 Transition 过渡动画&#xff0c;不过功能比较简单。在 Android 5.0 的 Material Design 中引入更完整和强大的 Transition 框架。通过Transition可以实现&#xff1a; 同一个页面中的场景过渡动画Activit…...

第九章Python语言高阶加强-面向对象篇

目录 一.初始对象 二.成员方法 1.成员变量和成员方法 三.类和对象 四.构造方法 五.其他内置方法&#xff08;魔术方法&#xff09; 1.__str__字符串方法 2.__lt__小于符号比较方法 3.__le__小于等于比较符号方法 4.__eq__比较运算符实现方法 六.封装 七.继承 1.继承…...

AI重构SEO关键词智能布局

内容概要 随着人工智能技术在搜索引擎优化领域的深入发展&#xff0c;AI驱动的关键词智能布局正在重塑传统SEO策略的核心逻辑。通过整合自然语言处理、深度学习与语义分析技术&#xff0c;现代SEO系统已形成包含智能分词、意图解码、动态优化的三维技术框架&#xff0c;使关键…...

言同数字:法新社AFP海外新闻媒体发稿成功案例——出海品牌背书必备

作者&#xff1a;言同数字全球传播团队 一、品牌困境&#xff1a;当中国技术遇上海外认知壁垒 案例背景&#xff1a; 某中国光伏储能企业&#xff08;应保密要求匿名&#xff0c;代号"GreenTech"&#xff09;&#xff0c;其家用储能系统在欧洲市场遭遇&#xff1…...

第三章 react redux的学习之redux和react-redux,@reduxjs/toolkit依赖结合使用

redux系列文章目录 第一章 简单学习redux,单个reducer 第二章 简单学习redux,多个reducer 第四章 react-redux&#xff0c;reduxjs/toolkit依赖&#xff0c;学习 第五章 两张图告诉你redux常使用的api有哪些 前言 前面两章&#xff0c;我们是只使用的redux的依赖。 本章…...

【HTML】纯前端网页小游戏-戳破彩泡

分享一个简单有趣的网页小游戏 - 彩色泡泡爆破。玩家需要点击屏幕上随机出现的彩色泡泡来得分。 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-wi…...

【Python使用】嘿马云课堂web完整实战项目第3篇:增加数据,修改数据【附代码文档】

教程总体简介&#xff1a;项目概述 项目背景 项目的功能构架 项目的技术架构 CMS 什么是CMS CMS需求分析与工程搭建 静态门户工程搭建 SSI服务端包含技术 页面预览开发 4 添加“页面预览”链接 页面发布 需求分析 技术方案 测试 环境搭建 数据字典 服务端 前端 数据模型 页面原…...

数据结构【栈和队列附顺序表应用算法】

栈和队列和顺序表应用算法练习 1.栈1.1概念与结构1.2栈的实现 2.队列2.1概念与结构2.2队列的实现 3.附&#xff08;顺序表应用算法&#xff09;3.1移除元素3.2删除有序数组中的重复项3.3合并两个有序数组 1.栈 1.1概念与结构 栈&#xff1a;⼀种特殊的线性表&#xff0c;其只…...

Redis数据结构之String

目录 1.概述2.常见操作2.1 SET/GET2.2 MSET/MGET/MSETNX2.3 GETRANGE/SETRANGE2.4 INCR(BY)/DECR(BY)2.5 STRLEN2.6 APPEND2.7 GETSET 3.小结 1.概述 String是最常用的数据类型&#xff0c;一个key对应一个value。String是二进制安全的&#xff0c;可以包含任何数据&#xff0…...

Maven 远程仓库推送方法

步骤 1&#xff1a;配置 pom.xml 中的远程仓库地址 在项目的 pom.xml 文件中添加 distributionManagement 配置&#xff0c;指定远程仓库的 URL。 xml 复制 <project>...<distributionManagement><!-- 快照版本仓库 --><snapshotRepository><id…...

uname

在 C 语言中&#xff0c;uname 函数用于获取当前操作系统的相关信息。 它是 POSIX 标准的一部分&#xff0c;定义在 <sys/utsname.h> 头文件中。 通过调用 uname 函数&#xff0c;可以获取系统名称、节点名称&#xff08;主机名&#xff09;、操作系统版本、机器硬件架构…...

【无标题】object,wait,notifyAll

在 Java 中&#xff0c;Object类提供了wait()方法&#xff0c;用于线程间的协作和同步。wait()方法使得当前线程暂停执行&#xff0c;并释放当前对象的锁&#xff0c;直到其他线程调用该对象的notify()或notifyAll()方法将其唤醒。这是实现线程间通信和同步的重要机制之一。 w…...

【Vue】 核心特性实战解析:computed、watch、条件渲染与列表渲染

目录 一、计算属性&#xff08;computed&#xff09; ✅ 示例&#xff1a; 计算属性-methods实现&#xff1a;在插值模块里&#xff0c;实现函数的调用功能 计算属性-computed的实现&#xff1a; 计算属性-简写&#xff1a; ✅ 特点&#xff1a; ⚠️ 与 methods 的区别…...

精品可编辑PPT | 基于湖仓一体构建数据中台架构大数据湖数据仓库一体化中台解决方案

本文介绍了基于湖仓一体构建数据中台架构的技术创新与实践。它详细阐述了数据湖、数据仓库和数据中台的概念&#xff0c;分析了三者的区别与协作关系&#xff0c;指出数据湖可存储大规模结构化和非结构化数据&#xff0c;数据仓库用于高效存储和快速查询以支持决策&#xff0c;…...

基于Python网络爬虫的智能音乐可视化系统(源码+lw+部署文档+讲解),源码可白嫖!

摘要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;智能音乐可视化系统当然不能排除在外。我本次开发的基于网络爬虫的智能音乐可视化系统是在实际应用和软件工程的开发原理之上&#xff0c;…...

基于STM32与应变片的协作机械臂力反馈控制系统设计与实现----2.2 机械臂控制系统硬件架构设计

2.2 机械臂控制系统硬件架构设计 一、总体架构拓扑 1.1 典型三级硬件架构 #mermaid-svg-MWmxD3zX6bu4iFCv {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-MWmxD3zX6bu4iFCv .error-icon{fill:#552222;}#mermaid-s…...

在线记事本——支持Markdown

项目地址 https://github.com/Anyuersuper/CloudNotebook 百度网盘 通过网盘分享的文件&#xff1a;CloudNotebook-master.zip 链接: https://pan.baidu.com/s/1kd2qNvm0eXc6_7oYDR769A?pwdyuer 提取码: yuer &#x1f4dd; 云笔记 (Cloud Notebook) 云笔记是一个简洁、安全…...

DDPM 做了什么

本博客主要侧重点在于HOW也就是DDPM怎么做的而不是WHY为什么要这样做 那么第一个问题DDPM做了一件什么事&#xff1a;这个算法通过逐渐向原图像添加噪声来破坏图像&#xff0c;然后再学习如何从噪声成恢复图像。 第二件事如何做到的&#xff1a;通过训练一个网络&#xff0c;…...

Redis数据结构之List

目录 1.概述2.常见操作2.1 LPUSH/RPUSH/LRANGE2.2 LPOP/RPOP2.3 LINDEX2.4 LLEN2.5 LREM2.6 LTRIM2.7 RPOPLPUSH2.8 LSET2.9 LINSERT 1.概述 List是简单的字符串列表&#xff0c;单key多个value&#xff0c;按照插入顺序排序。 支持添加一个元素到列表的头部(左边)或者尾部(右…...

L2-023 图着色问题 #DFS C++邻接矩阵存图

文章目录 题目解读输入格式输出格式 思路Ac CODE 参考 题目解读 给定一个无向图V&#xff0c;询问是否可以用K种颜色为V中每一个顶点分配一种颜色&#xff0c;使得不会有两个相邻顶点具有同一种颜色 输入格式 第一行给出V,E,K&#xff0c; 分别代表无向图的顶点&#xff0c;…...