当前位置: 首页 > news >正文

zk源码—3.单机和集群通信原理一

大纲

1.单机版的zk服务端的启动过程

(1)预启动阶段

(2)初始化阶段

2.集群版的zk服务端的启动过程

(1)预启动阶段

(2)初始化阶段

(3)Leader选举阶段

(4)Leader和Follower启动阶段

1.单机版的zk服务端的启动过程

(1)预启动阶段

(2)初始化阶段

单机版zk服务端的启动,主要分为两个阶段:预启动阶段和初始化阶段,其启动流程图如下:

图片

接下来介绍zk服务端的预启动阶段(启动管理)与初始化阶段的具体流程,也就是单机版的zk服务端是如何从初始化到对外提供服务的。

(1)预启动阶段

在zk服务端进行初始化之前,首先要对配置文件等信息进行解析和载入,而zk服务端的预启动阶段的主要工作流程如下:

一.启动QuorumPeerMain入口程序

二.解析zoo.cfg配置文件

三.创建和启动历史文件清理器

四.根据配置判断是集群模式还是单机模式

public class QuorumPeerMain {protected QuorumPeer quorumPeer;...//1.启动程序入口public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {//启动程序main.initializeAndRun(args);} catch (IllegalArgumentException e) {...}LOG.info("Exiting normally");System.exit(0);}protected void initializeAndRun(String[] args) {QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {//2.解析配置文件config.parse(args[0]);}//3.创建和启动历史文件清理器DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start();//4.根据配置判断是集群模式还是单机模式if (args.length == 1 && config.isDistributed()) {//集群模式runFromConfig(config);} else {//单机模式ZooKeeperServerMain.main(args);}}...
}public class ZooKeeperServerMain {private ServerCnxnFactory cnxnFactory;...public static void main(String[] args) {ZooKeeperServerMain main = new ZooKeeperServerMain();try {//启动程序main.initializeAndRun(args);} catch (IllegalArgumentException e) {...}LOG.info("Exiting normally");System.exit(0);}protected void initializeAndRun(String[] args) {...ServerConfig config = new ServerConfig();//2.解析配置文件if (args.length == 1) {config.parse(args[0]);} else {config.parse(args);}runFromConfig(config);}//以下是初始化阶段的内容public void runFromConfig(ServerConfig config) {...txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);...   }...
}public class DatadirCleanupManager {//配置zoo.cfg文件中的autopurge.snapRetainCount和autopurge.purgeInterval实现数据快照文件的定时清理;private final File snapDir;//数据快照地址private final File dataLogDir;//事务日志地址//配置zoo.cfg文件中的autopurge.snapRetainCount,可指定需要保留的文件数目,默认是保留3个;private final int snapRetainCount;//需要保留的文件数目//配置zoo.cfg文件中的autopurge.purgeInterval可指定清理频率,以小时为单位,默认0表示不开启自动清理功能;private final int purgeInterval;//清理频率private Timer timer;...
}

一.启动程序

QuorumPeerMain类是zk服务的启动入口,可理解为Java中的main函数。通常我们执行zkServer.sh脚本启动zk服务时,就会运行这个类。QuorumPeerMain的main()方法会调用它的initializeAndRun()方法来启动程序。

二.解析zoo.cfg配置文件

在QuorumPeerMain的main()方法中,会执行它的initializeAndRun()方法。

在QuorumPeerMain的initializeAndRun()方法中,便会解析zoo.cfg配置文件。

在ZooKeeperServerMain的initializeAndRun()方法中,也会解析zoo.cfg配置文件。

zoo.cfg配置文件配置了zk运行时的基本参数,包括tickTime、dataDir等。

三.创建和启动历史文件清理器

文件清理器在日常的使用中非常重要。面对大流量的网络访问,zk会产生海量的数据。如果磁盘数据过多或者磁盘空间不足,可能会导致zk服务端不能正常运行,所以zk采用DatadirCleanupManager类去清理历史文件。

其中DatadirCleanupManager类有5个属性,如上代码所示。DatadirCleanupManager会对事务日志和数据快照文件进行定时清理,这种自动清理历史数据文件的机制可以尽量避免zk磁盘空间的浪费。

四.判断集群模式还是单机模式

根据从zoo.cfg文件解析出来的集群服务器地址列表来判断是否是单机模式。如果是单机模式,则会调用ZooKeeperServerMain的main()方法来进行启动。如果是集群模式,则会调用QuorumPeerMain的runFromConfig()方法来进行启动。

(2)初始化阶段

初始化阶段会根据预启动解析出的配置信息,初始化服务器实例。该阶段的主要工作流程如下:

一.创建数据持久化工具实例FileTxnSnapLog

二.创建服务端统计工具实例ServerStats

三.根据两个工具实例创建单机版服务器实例

四.创建网络连接工厂实例

五.初始化网络连接工厂实例

六.启动网络连接工厂实例的线程

七.恢复单机版服务器实例的本地数据

八.创建并启动服务器实例的会话管理器

九.初始化单机版服务器实例的请求处理链

十.注册单机版服务器实例到网络连接工厂实例

单机版服务器实例:ZooKeeperServer

网络连接工厂实例:ServerCnxnFactory

会话管理器:SessionTracker

public class ZooKeeperServerMain {private ServerCnxnFactory cnxnFactory;...//以下是初始化阶段的内容public void runFromConfig(ServerConfig config) {...//1.创建zk数据管理器——持久化工具类FileTxnSnapLog的实例txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);//2.创建zk服务运行统计器——统计工具类ServerStats的实例//3.创建服务器实例ZooKeeperServerfinal ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);txnLog.setServerStats(zkServer.serverStats());  ...//4.创建服务端网络连接工厂实例ServerCnxnFactorycnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);cnxnFactory.startup(zkServer);...}...
}public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {private final ServerStats serverStats;private FileTxnSnapLog txnLogFactory = null;private ZKDatabase zkDb;protected int tickTime = DEFAULT_TICK_TIME;private final ZooKeeperServerListener listener;...public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {//2.创建服务端统计工具ServerStats实例serverStats = new ServerStats(this);this.txnLogFactory = txnLogFactory;this.txnLogFactory.setServerStats(this.serverStats);this.zkDb = zkDb;this.tickTime = tickTime;setMinSessionTimeout(minSessionTimeout);setMaxSessionTimeout(maxSessionTimeout);listener = new ZooKeeperServerListenerImpl(this);}...
}

一.创建数据持久化工具实例FileTxnSnapLog

可以通过FileTxnSnapLog对zk服务器的内存数据进行持久化,具体会将内存数据持久化到配置文件里的事务日志文件 + 快照数据文件中。

所以在执行ZooKeeperServerMain的runFromConfig()方法启动zk服务端时,首先会根据zoo.cfg配置文件中的dataDir数据快照目录和dataLogDir事务日志目录,通过"new FileTxnSnapLog()"来创建持久化工具类FileTxnSnapLog的实例。

二.创建服务端统计工具实例ServerStats

ServerStats用于统计zk服务端运行时的状态信息,主要统计的数据包括:服务端向客户端发送的响应包次数、接收客户端发送的请求包次数、服务端处理请求的延迟情况、处理客户端的请求次数。

在执行ZooKeeperServerMain.runFromConfig()方法时,执行到ZooKeeperServer的构造方法就会首先创建ServerStats实例。

三.根据两个工具实例创建单机版服务器实例

ZooKeeperServer是单机版服务端的核心实体类。在执行ZooKeeperServerMain.runFromConfig()方法时,创建完zk数据管理器——持久化工具类FileTxnSnapLog的实例后,就会通过"new ZooKeeperServer()"来创建单机版服务器实例ZooKeeperServer。

此时会传入从zoo.cfg配置文件中解析出的tickTime和会话超时时间来创建服务器实例。创建完服务器实例ZooKeeperServer后,接下来才会对该ZooKeeperServer服务器实例进行更多的初始化工作,包括网络连接器、内存数据库和请求处理器等组件的初始化。

四.创建网络连接工厂实例

zk中客户端和服务端的网络通信,本质是通过Java的IO数据流进行通信的。zk一开始就是使用自己实现的NIO进行网络通信的,但之后引入了Netty框架来满足不同使用情况下的需求。

在执行ZooKeeperServerMain的runFromConfig()方法时,创建完服务器实例ZooKeeperServer后,就会通过ServerCnxnFactory的createFactory()方法来创建服务端网络连接工厂实例ServerCnxnFactory。

ServerCnxnFactory的createFactory()方法首先会获取配置值,判断是使用NIO还是使用Netty,然后再通过反射去实例化服务端网络连接工厂。

可以通过配置zookeeper.serverCnxnFactory来指定使用:zk自己实现的NIO还是Netty框架,来构建服务端网络连接工厂ServerCnxnFactory。

public abstract class ServerCnxnFactory {...static public ServerCnxnFactory createFactory() throws IOException {String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);if (serverCnxnFactoryName == null) {serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();}ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();return serverCnxnFactory;}...
}

五.初始化网络连接工厂实例

在执行ZooKeeperServerMain的runFromConfig()方法时,创建完服务端网络连接工厂ServerCnxnFactory实例后,就会调用网络连接工厂ServerCnxnFactory的configure()方法来初始化网络连接工厂ServerCnxnFactory实例。

这里以NIOServerCnxnFactory的configure()方法为例,该方法主要会启动一个NIO服务器,以及创建三类线程:

一.处理客户端连接的AcceptThread线程

二.处理客户端请求的一批SelectorThread线程

三.处理过期连接的ConnectionExpirerThread线程

初始化完ServerCnxnFactory实例后,虽然此时NIO服务器已对外开放端口,客户端也能访问到2181端口,但此时zk服务端还不能正常处理客户端请求。

public class NIOServerCnxnFactory extends ServerCnxnFactory {//最大客户端连接数protected int maxClientCnxns = 60;//处理过期连接的线程private ConnectionExpirerThread expirerThread;//处理客户端建立连接的线程private AcceptThread acceptThread;//处理客户端请求的线程private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();//会话过期相关int sessionlessCnxnTimeout;//连接过期队列private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;//selector线程数,CPU核数的一半private int numSelectorThreads;//工作线程数private int numWorkerThreads;...public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {...maxClientCnxns = maxcc;sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);//连接过期队列cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);//创建一个自动处理过期连接的ConnectionExpirerThread线程expirerThread = new ConnectionExpirerThread();int numCores = Runtime.getRuntime().availableProcessors();numSelectorThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1));numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);...//创建一批SelectorThread线程for (int i=0; i<numSelectorThreads; ++i) {selectorThreads.add(new SelectorThread(i));}//打开ServerSocketChannelthis.ss = ServerSocketChannel.open();ss.socket().setReuseAddress(true);//绑定端口,启动NIO服务器ss.socket().bind(addr);ss.configureBlocking(false);//创建一个AcceptThread线程acceptThread = new AcceptThread(ss, addr, selectorThreads);}...
}

六.启动网络连接工厂实例的线程

在执行ZooKeeperServerMain的runFromConfig()方法时,调用完网络连接工厂ServerCnxnFactory的configure()方法初始化网络连接工厂ServerCnxnFactory实例后,便会调用ServerCnxnFactory的startup()方法去启动ServerCnxnFactory的线程。

注意:对过期连接进行处理是由一个ConnectionExpirerThread线程负责的。

public class NIOServerCnxnFactory extends ServerCnxnFactory {private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;//连接的过期队列...public void startup(ZooKeeperServer zks, boolean startServer) {//6.启动各种线程start();setZooKeeperServer(zks);if (startServer) {//7.恢复本地数据zks.startdata();//8.创建并启动会话管理器SessionTracker//9.初始化zk的请求处理链//10.注册zk服务器实例zks.startup();}}public void start() {stopped = false;if (workerPool == null) {workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);}for (SelectorThread thread : selectorThreads) {if (thread.getState() == Thread.State.NEW) {thread.start();}}if (acceptThread.getState() == Thread.State.NEW) {acceptThread.start();}if (expirerThread.getState() == Thread.State.NEW) {expirerThread.start();}}...//用来处理过期连接,会启动一个超时检查线程来检查连接是否过期private class ConnectionExpirerThread extends ZooKeeperThread {ConnectionExpirerThread() {super("ConnnectionExpirer");}@Overridepublic void run() {while (!stopped) {//使用了分桶管理策略long waitTime = cnxnExpiryQueue.getWaitTime();if (waitTime > 0) {Thread.sleep(waitTime);continue;}for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {conn.close();}}}}//用来处理要建立连接的客户端OP_ACCEPT请求private class AcceptThread extends AbstractSelectThread {private final ServerSocketChannel acceptSocket;private final SelectionKey acceptKey;private final Collection<SelectorThread> selectorThreads;private Iterator<SelectorThread> selectorIterator;...public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {super("NIOServerCxnFactory.AcceptThread:" + addr);this.acceptSocket = ss;this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));selectorIterator = this.selectorThreads.iterator();}@Overridepublic void run() {...while (!stopped && !acceptSocket.socket().isClosed()) {select();}...}private void select() {selector.select();Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();if (!key.isValid()) {continue;}if (key.isAcceptable()) {if (!doAccept()) {pauseAccept(10);}} else {LOG.warn("Unexpected ops in accept select " + key.readyOps());}}}private void pauseAccept(long millisecs) {acceptKey.interestOps(0);selector.select(millisecs);acceptKey.interestOps(SelectionKey.OP_ACCEPT);}private boolean doAccept() {boolean accepted = false;SocketChannel sc = null;sc = acceptSocket.accept();accepted = true;InetAddress ia = sc.socket().getInetAddress();int cnxncount = getClientCnxnCount(ia);...sc.configureBlocking(false);// Round-robin assign this connection to a selector threadif (!selectorIterator.hasNext()) {selectorIterator = selectorThreads.iterator();}SelectorThread selectorThread = selectorIterator.next();...acceptErrorLogger.flush();return accepted;}}//用来处理AcceptThread线程建立好的客户端连接请求class SelectorThread extends AbstractSelectThread {private final int id;private final Queue<SocketChannel> acceptedQueue;private final Queue<SelectionKey> updateQueue;public SelectorThread(int id) throws IOException {super("NIOServerCxnFactory.SelectorThread-" + id);this.id = id;acceptedQueue = new LinkedBlockingQueue<SocketChannel>();updateQueue = new LinkedBlockingQueue<SelectionKey>();}public boolean addAcceptedConnection(SocketChannel accepted) {if (stopped || !acceptedQueue.offer(accepted)) {return false;}wakeupSelector();return true;}...@Overridepublic void run() {while (!stopped) {select();processAcceptedConnections();processInterestOpsUpdateRequests();}...}private void select() {selector.select();Set<SelectionKey> selected = selector.selectedKeys();ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);Collections.shuffle(selectedList);Iterator<SelectionKey> selectedKeys = selectedList.iterator();while(!stopped && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selected.remove(key);...}}private void processAcceptedConnections() {SocketChannel accepted;while (!stopped && (accepted = acceptedQueue.poll()) != null) {SelectionKey key = null;key = accepted.register(selector, SelectionKey.OP_READ);NIOServerCnxn cnxn = createConnection(accepted, key, this);key.attach(cnxn);addCnxn(cnxn);}}...}...protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) {return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);}private void addCnxn(NIOServerCnxn cnxn) throws IOException {...//激活连接touchCnxn(cnxn);}public void touchCnxn(NIOServerCnxn cnxn) {//这个cnxnExpiryQueue与管理过期连接有关cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());}
}

七.恢复单机版服务器实例的本地数据

启动zk服务端需要从本地快照数据文件 + 事务日志文件中进行数据恢复。在执行ZooKeeperServerMain的runFromConfig()方法时,调用完ServerCnxnFactory的startup()方法启动ServerCnxnFactory的线程后,就会调用单机版服务器实例ZooKeeperServer的startdata()方法来恢复本地数据。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {private ZKDatabase zkDb;private FileTxnSnapLog txnLogFactory = null;...//7.恢复本地数据public void startdata() {if (zkDb == null) {zkDb = new ZKDatabase(this.txnLogFactory);}if (!zkDb.isInitialized()) {loadData();}}public void loadData() throws IOException, InterruptedException {if (zkDb.isInitialized()) {setZxid(zkDb.getDataTreeLastProcessedZxid());} else {setZxid(zkDb.loadDataBase());}// Clean up dead sessionsLinkedList<Long> deadSessions = new LinkedList<Long>();for (Long session : zkDb.getSessions()) {if (zkDb.getSessionWithTimeOuts().get(session) == null) {deadSessions.add(session);}}for (long session : deadSessions) {killSession(session, zkDb.getDataTreeLastProcessedZxid());}// Make a clean snapshottakeSnapshot();}public void takeSnapshot(){try {txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());} catch (IOException e) {LOG.error("Severe unrecoverable error, exiting", e);System.exit(10);}}...
}

八.创建并启动服务器实例的会话管理器

会话管理器SessionTracker主要负责zk服务端的会话管理。在执行ZooKeeperServerMain的runFromConfig()方法时,调用完单机版服务器实例ZooKeeperServer的startdata()方法完成本地数据恢复后,就会调用ZooKeeperServer的startup()方法来开始创建并启动会话管理器,也就是在startup()方法中会调用createSessionTracker()和startSessionTracker()方法。SessionTracker其实也是一个继承了ZooKeeperThread的线程。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {protected SessionTracker sessionTracker;private FileTxnSnapLog txnLogFactory = null;private ZKDatabase zkDb;...public synchronized void startup() {startupWithServerState(State.RUNNING);}private void startupWithServerState(State state) {//8.创建并启动会话管理器SessionTrackerif (sessionTracker == null) {createSessionTracker();}startSessionTracker();//9.初始化服务器实例ZooKeeperServer的请求处理链setupRequestProcessors();//注册JMX服务registerJMX();//开启监控JVM停顿的线程startJvmPauseMonitor();setState(state);notifyAll();}protected void createSessionTracker() {sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),tickTime, createSessionTrackerServerId, getZooKeeperServerListener());}protected void startSessionTracker() {((SessionTrackerImpl)sessionTracker).start();}...
}public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {private final ExpiryQueue<SessionImpl> sessionExpiryQueue;private final SessionExpirer expirer;...@Overridepublic void run() {while (running) {//使用了分桶管理策略long waitTime = sessionExpiryQueue.getWaitTime();if (waitTime > 0) {Thread.sleep(waitTime);continue;}for (SessionImpl s : sessionExpiryQueue.poll()) {setSessionClosing(s.sessionId);expirer.expire(s);}}}...
}

九.初始化单机版服务器实例的请求处理链

在执行ZooKeeperServerMain的runFromConfig()方法时,在ZooKeeperServer的startup()方法中调用方法创建并启动好会话管理器后,就会继续在ZooKeeperServer的startup()方法中调用方法初始化请求处理链,也就是在startup()方法中会调用setupRequestProcessors()方法。

zk处理请求的方式是典型的责任链模式,zk服务端会使用多个请求处理器来依次处理一个客户端请求。所以在服务端启动时,会将这些请求处理器串联起来形成一个请求处理链。

单机版服务器的请求处理链包括3个请求处理器:

第一个请求处理器是:PrepRequestProcessor

第二个请求处理器是:SyncRequestProcessor

第三个请求处理器是:FinalRequestProcessor

zk服务端会严格按照顺序分别调用这3个请求处理器处理客户端的请求,其中PrepRequestProcessor和SyncRequestProcessor其实也是一个线程。服务端收到的客户端请求会不断被添加到请求处理器的请求队列中,然后请求处理器线程启动后就会不断从请求队列中提取请求出来进行处理。

图片

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {protected RequestProcessor firstProcessor;...public synchronized void startup() {startupWithServerState(State.RUNNING);}private void startupWithServerState(State state) {//8.创建并启动会话管理器SessionTrackerif (sessionTracker == null) {createSessionTracker();}startSessionTracker();//9.初始化服务器实例ZooKeeperServer的请求处理链setupRequestProcessors();//注册JMX服务registerJMX();//开启监控JVM停顿的线程startJvmPauseMonitor();setState(state);notifyAll();}protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);((SyncRequestProcessor)syncProcessor).start();firstProcessor = new PrepRequestProcessor(this, syncProcessor);((PrepRequestProcessor)firstProcessor).start();}...
}public interface RequestProcessor {void processRequest(Request request) throws RequestProcessorException;void shutdown();
}public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();private final RequestProcessor nextProcessor;ZooKeeperServer zks;...public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener());this.nextProcessor = nextProcessor;this.zks = zks;}public void processRequest(Request request) {submittedRequests.add(request);}@Overridepublic void run() {...while (true) {Request request = submittedRequests.take();pRequest(request);}...}protected void pRequest(Request request) throws RequestProcessorException {...//事务请求处理pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);...//交给下一个处理器处理nextProcessor.processRequest(request);}...
}

十.注册单机版服务器实例到网络连接工厂实例

就是调用ServerCnxnFactory的startup()方法中的setZooKeeperServer()方法,将初始化好的单机版服务器实例ZooKeeperServer注册到网络连接工厂实例ServerCnxnFactory。同时,也会将网络连接工厂实例ServerCnxnFactory注册到单机版服务器实例ZooKeeperServer。此时,zk服务端就可以对外提供正常的服务了。

public class NIOServerCnxnFactory extends ServerCnxnFactory {...public void startup(ZooKeeperServer zks, boolean startServer) {//6.启动各种线程start();//10.注册zk服务器实例setZooKeeperServer(zks);if (startServer) {//7.恢复本地数据zks.startdata();//8.创建并启动会话管理器SessionTracker//9.初始化zk的请求处理链zks.startup();}}...
}public abstract class ServerCnxnFactory {...protected ZooKeeperServer zkServer;final public void setZooKeeperServer(ZooKeeperServer zks) {this.zkServer = zks;if (zks != null) {if (secure) {zks.setSecureServerCnxnFactory(this);} else {zks.setServerCnxnFactory(this);}}}...
}public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {...protected ServerCnxnFactory serverCnxnFactory;protected ServerCnxnFactory secureServerCnxnFactory;public void setServerCnxnFactory(ServerCnxnFactory factory) {serverCnxnFactory = factory;}public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {secureServerCnxnFactory = factory;}...
}

相关文章:

zk源码—3.单机和集群通信原理一

大纲 1.单机版的zk服务端的启动过程 (1)预启动阶段 (2)初始化阶段 2.集群版的zk服务端的启动过程 (1)预启动阶段 (2)初始化阶段 (3)Leader选举阶段 (4)Leader和Follower启动阶段 1.单机版的zk服务端的启动过程 (1)预启动阶段 (2)初始化阶段 单机版zk服务端的启动&…...

车企数字化转型:从“制造工厂”到“移动科技平台”的升维路径

一、战略重构&#xff1a;政策与产业变革的双重倒逼 中国《智能网联汽车技术路线图2.0》明确要求2030年L4级自动驾驶新车渗透率达20%&#xff0c;而麦肯锡数据显示&#xff0c;全球车企数字化投入占比已从2018年的7%跃升至2025年的18%。当前车企面临三大核心挑战&#xff1a;用…...

C++-Mongoose(2)-https-server-openssl

OpenSSL生成HTTPS自签名证书 - 简书 1.Openssl windowsubuntu下载http://www.openssl.vip/download1.VS2019编译OpenSSL 2.VS2019编译第一个OpenSSL项目 1.ubuntu编译OpenSSL 3.0 2.编写第一个OpenSSL 1.windows下编译OpenSSL 安装vs2019 perl nasm安装activePerl…...

nginx正向代理https

一、需求 公司内部服务器向外访问腾讯接口&#xff1a;https://qyapi.weixin.qq.com/cgi-bin&#xff0c;不能使用http直接访问。并且不支持域名&#xff0c;还需要设置互联网出口-出向白名单ip。 如何在尽量少改动代码的情况下实现应用的出向访问链接&#xff0c;考虑使用正向…...

Flask中的蓝图(Blueprint)浅讲

BluePrint Flask中的蓝图&#xff08;Blueprint&#xff09;​是一种强大的组织工具&#xff0c;能够将大型应用拆分为可重用的模块化组件 1. ​模块化组织 用途&#xff1a;将应用按功能拆分为独立模块&#xff0c;提升代码可维护性。​示例&#xff1a; # user/views.py fr…...

虚拟表、TDgpt、JDBC 异步写入…TDengine 3.3.6.0 版本 8 大升级亮点

近日&#xff0c;TDengine 3.3.6.0 版本正式发布。除了此前已亮相的时序数据分析 AI 智能体 TDgpt&#xff0c;本次更新还带来了多个针对性能与易用性的重要增强&#xff1a;虚拟表全面上线&#xff0c;支持更灵活的一设备一表建模&#xff1b;JDBC 写入机制全新升级&#xff0…...

大型语言模型智能应用Coze、Dify、FastGPT、MaxKB 对比,选择合适自己的LLM工具

大型语言模型智能应用Coze、Dify、FastGPT、MaxKB 对比&#xff0c;选择合适自己的LLM工具 Coze、Dify、FastGPT 和 MaxKB 都是旨在帮助用户构建基于大型语言模型 (LLM) 的智能应用的平台。它们各自拥有独特的功能和侧重点&#xff0c;以下是对它们的简要对比&#xff1a; Coz…...

WEB安全--XSS--DOM破坏

一、前言 继XSS基础篇后&#xff0c;我们知道了三种类型的XSS&#xff0c;这篇文章主要针对DOM型XSS的原理进行深入解析。 二、DOM型XSS原理 2.1、什么是DOM 以一个形象的比喻&#xff1a; 网页就像是一座房子&#xff0c;而 **DOM** 就是这座房子的“蓝图”或者“结构图”。…...

持续集成:GitLab CI/CD 与 Jenkins CI/CD 的全面剖析

一、引言 在当今快速迭代的软件开发领域,持续集成(Continuous Integration,CI)已成为保障软件质量、加速开发流程的关键实践。通过频繁地将代码集成到共享仓库,并自动进行构建和测试,持续集成能够尽早发现并解决代码冲突和缺陷。而 GitLab CI/CD 和 Jenkins CI/CD 作为两…...

Go语言sync.Mutex包源码解读

互斥锁sync.Mutex是在并发程序中对共享资源进行访问控制的主要手段&#xff0c;对此Go语言提供了非常简单易用的机制。sync.Mutex为结构体类型&#xff0c;对外暴露Lock()、Unlock()、TryLock()三种方法&#xff0c;分别用于阻塞加锁、解锁、非阻塞加锁操作&#xff08;加锁失败…...

FreeRTOS软件定时器

软件定时器就是"闹钟"&#xff0c;你可以设置闹钟&#xff0c; 用软件定时器的话USE_TIMER要设置为1 在30分钟后让你起床工作每隔1小时让你例行检查机器运行情况 软件定时器也可以完成两类事情&#xff1a; 在"未来"某个时间点&#xff0c;运行函数周期…...

Selenium三大等待

一、强制等待 1.设置完等待后不管有没有找到元素&#xff0c;都会执行等待&#xff0c;等待结束后才会执行下一步 2.实例&#xff1a; driver webdriver.Chrome()driver.get("https://www.baidu.com")time.sleep(3) # 设置强制等待driver.quit() 二、隐性等待 …...

【Ansible自动化运维】一、初步了解,开启自动化运维之旅

在当今数字化时代&#xff0c;随着企业 IT 基础设施规模的不断扩大&#xff0c;传统的手工运维方式逐渐显得力不从心。自动化运维技术应运而生&#xff0c;其中 Ansible 凭借其简洁易用、功能强大的特点&#xff0c;成为众多运维工程师和开发人员的首选工具。本篇文章将从基础概…...

雪花算法、md5加密

雪花算法生成ID是一个64位长整型&#xff08;但是也可以通过优化简短位数&#xff09; 组成部分&#xff1a; 时间戳 机器ID 序列号 用途&#xff1a; 分布式系统唯一ID生成&#xff1a;解决数据库自增ID在分布式环境下的唯一性问题、避免UUID的无序性和性能问题 有序性…...

micro介绍

micro介绍 Micro 的首要特点是易于安装&#xff08;它只是一个静态的二进制文件&#xff0c;没有任何依赖关系&#xff09;和易于使用Micro 支持完整的插件系统。插件是用 Lua 编写的&#xff0c;插件管理器可自动为你下载和安装插件。使用简单的 json 格式配置选项&#xff0…...

电视盒子 刷armbian

参考 中兴电视盒子中兴B860AV3.2-M刷Armbian新手级教程-CSDN博客 1.刷安卓9 带root版本 a. 下载安卓线刷包 链接&#xff1a;https://pan.baidu.com/s/1hz87_ld2lJea0gYjeoHQ8A?pwdd7as 提取码&#xff1a;d7as b.拆机短接 3.安装usbburning工具 使用方法 &#xff0c;…...

(七)lerobot开源项目so100新版本全流程操作(操作记录)

目录 《项目简介》 一、环境配置 1、创建虚拟环境 2、克隆项目并安装所需包 二、主从臂硬件准备 1、舵机配置 &#xff08;1&#xff09;分别查看主从臂的开发板端口号 &#xff08;2&#xff09;分别设置主从臂的舵机 2、组装主从臂 3、查看主从臂端口号和相机端…...

智慧景区能源管理解决方案,为旅游“升温”保驾护航

景区能源管理 当下痛点 1 高峰期用电负荷大 节假日和旅游旺季等高峰期用电需求增大&#xff0c;电力供应不足、电网负荷过大&#xff1b; 2 设备维护困难 景区内电力设备多且散&#xff0c;包括发电机组、变电站、配电设备等&#xff0c;维护和管理困难&#xff0c;特别是…...

LCR 056. 两数之和 IV - 输入二叉搜索树

文章目录 题意思路代码 题意 题目链接 思路 代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), …...

AI搜索+法律咨询:在「事实重构」与「程序正义」的博弈场‌

已经写了股票和医疗相关的内容&#xff0c;今天聊一下AI搜索和法律结合的应用场景。AI搜索不替用户做选择&#xff0c;却让我们握住了法律武器的说明书。 ​​​​​​​一、AI重构事实&#xff1a;技术理想与法律现实的碰撞 ‌ 1、案例切入&#xff1a;AI能否还原车祸责任比…...

多模态大模型重塑自动驾驶:技术融合与实践路径全解析

目录 1、 引言&#xff1a;AI与自动驾驶的革命性融合 2、五大领先多模态模型解析 2.1 Qwen2.5-Omni&#xff1a;全模态集大成者 2.2. LLaVA&#xff1a;视觉语言理解专家 2.3. Qwen2-VL&#xff1a;长视频理解能手 2.4. X-InstructBLIP&#xff1a;跨模态理解框架 2.5. …...

海阳科技IPO:业务独立性、业绩稳定性、财务规范性存致命缺陷

三角形是最稳定的结构&#xff0c;它既是完美的相互制衡&#xff0c;又是有力的彼此支撑。 ——佚名 引 言 IPO审议指标、要求、规定众多&#xff0c;有无一个直接简单的公式&#xff1f;该公式可以直接将造假等“低劣”IPO项目排除在外&#xff1f; 在《奕泽财经》看来…...

PyTorch 与 Python 装饰器及迭代器的比较与应用

在深度学习和 Stable Diffusion&#xff08;SD&#xff09;训练过程中&#xff0c;PyTorch 不仅依赖于 Python 的基础特性&#xff0c;而且通过扩展和封装这些特性&#xff0c;提供了更高效、便捷的训练和推理方式。本文将从装饰器和迭代器两个方面详细解释 Python 中的原生实现…...

大数据(5)(基础概念)Spark从入门到实战:核心原理与大数据处理实战案例

目录 一、背景介绍1‌. 为什么需要Spark&#xff1f;‌‌2. Spark的诞生‌&#xff1a; 二、Spark核心原理1. ‌四大核心特性‌2. ‌核心架构‌3. ‌执行流程‌ 三、Spark实战案例案例1&#xff1a;单词计数&#xff08;WordCount&#xff09;案例2&#xff1a;实时流处理&…...

Ubuntu小练习

文章目录 一、远程连接1、通过putty连接2、查看putty运行状态3、通过Puuty远程登录Ubuntu4、添加新用户查看是否添加成功 5、用新用户登录远程Ubuntu6、使用VNC远程登录树莓派 二、虚拟机上talk聊天三、Opencv1、简单安装版&#xff08;适合新手安装&#xff09;2、打开VScode特…...

运行Spark会出现恶问题

1. 依赖冲突问题&#xff1a;Spark依赖众多组件&#xff0c;如Scala、Hadoop等。不同版本的依赖之间可能存在兼容性问题&#xff0c;导致Spark无法正常运行。比如&#xff0c;特定版本的Spark可能要求与之匹配的Scala版本&#xff0c;若使用了不兼容的Scala版本&#xff0c;会在…...

uniapp大文件分包

1. 在pages.json中配置 "subPackages":[{"root":pagesUser,"pages":[{"path":mine/xxx,"style":xxx },{"path":mine/xxx,"style":xxx}]},{"root":pagesIndex,"pages":[{"p…...

Git 源码打包、迁移、恢复和备份

介绍 Git 项目打包方式&#xff0c;适用于源码交付、迁移、备份等场景。 一 Git 仓库的两种类型 在实际项目开发与交付中&#xff0c;常接触 的 两种 Git 仓库&#xff1a; 仓库类型是否包含源码适用场景普通仓库是本地开发、运行、构建裸仓库否代码托管、只读交付、备份 普…...

Linux 内核网络协议栈中的 struct packet_type:以 ip_packet_type 为例

在 Linux 内核的网络协议栈中,struct packet_type 是一个核心数据结构,用于注册特定协议类型的数据包处理逻辑。它定义了如何处理特定协议的数据包,并通过协议类型匹配机制实现协议分发。本文将通过分析 ip_packet_type 的定义和作用,深入探讨其在网络协议栈中的重要性。 …...

LeetCodeHot100-第三章:数学

面试经典 150 题 - 学习计划 - 力扣&#xff08;LeetCode&#xff09;全球极客挚爱的技术成长平台 目录 &#x1f388;1、双指针&#xff1a;9. 回文数 &#x1f388;2、逻辑题 &#xff1a;66. 加一 &#x1f388;3、逻辑题&#xff1a;172. 阶乘后的零 &#x1f388;4、…...

JavaScript 错误处理:理解和应对异常

在编程中&#xff0c;错误是不可避免的&#xff0c;特别是在进行复杂的逻辑操作、与外部系统交互或处理用户输入时。错误处理是软件开发中非常重要的一部分&#xff0c;它可以帮助开发者提高应用的稳定性和可用性。本文将深入探讨JavaScript中的错误处理机制&#xff0c;如何利…...

LangGraph异步化sqlite checkpoint

安装 pip install langgraph-checkpoint-sqlite异步checkpiont初始化&#xff1a; from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver conn aiosqlite.connect(":memory:", check_same_threadFalse) memory AsyncSqliteSaver(conn)如果使用异步流式…...

StarRocks 助力首汽约车精细化运营

作者&#xff1a;任智红&#xff0c;首汽约车大数据负责人 更多交流&#xff0c;联系我们&#xff1a;https://wx.focussend.com/weComLink/mobileQrCodeLink/334%201%202/ffbe5 导读&#xff1a; 本文整理自首汽约车大数据负责人任智红在 StarRocks 年度峰会上的演讲&#xf…...

Versatile-OCR-Program:可以从复杂的教育材料(如试卷)中提取结构化数据的开源多模态OCR工具

Versatile-OCR-Program 此 OCR 系统专门设计用于以针对机器学习 &#xff08;ML&#xff09; 训练优化的格式从复杂的教育材料&#xff08;如试卷&#xff09;中提取结构化数据。它支持多语言文本、数学公式、表格、图表和图表&#xff0c;非常适合创建高质量的训练数据集。 主…...

时序数据库 TDengine Cloud 私有连接实战指南:4步实现数据安全传输与成本优化

小T导读&#xff1a;在物联网和工业互联网场景下&#xff0c;企业对高并发、低延迟的数据处理需求愈发迫切。本文将带你深入了解 TDengineCloud 如何通过全托管服务与私有连接&#xff0c;帮助企业实现更安全、更高效、更低成本的数据采集与传输&#xff0c;从架构解析到实际配…...

vue项目本地调试使用https

由于测试环境远程接口&#xff0c;是采用https协议&#xff0c;为了能正常携带cookie访问接口&#xff0c;需要把本地项目也采用https协议访问。前提是后端的cookie设置在二级域名下&#xff0c;且允许固定其他子域名跨域访问&#xff08;需要在后端设置&#xff09; 项目框架…...

【学习笔记】文件上传漏洞--二次渲染、.htaccess、变异免杀

目录 第十二关 远程包含地址转换 第十三关 突破上传删除 条件竞争 第十四关 二次渲染 第十五关 第十六关 第十七关 .htaccess 第十八关 后门免杀 第十九关 日志包含 第十二关 远程包含地址转换 延续第十一关&#xff0c;加一个文件头&#xff0c;上传成功&#xff0c…...

探秘 MQTT 协议:物联网的 “隐形桥梁”

在当今数字化时代&#xff0c;物联网技术正以前所未有的速度改变着我们的生活。从智能家居到工业自动化&#xff0c;从远程医疗到智能交通&#xff0c;无数设备相互连接、交换信息&#xff0c;构建起一个庞大而复杂的智能世界。而在这背后&#xff0c;有一个关键的 “隐形桥梁”…...

[ctfshow web入门] web24

前置知识 isset&#xff1a;判断这个变量是否声明且不为NULL&#xff0c;否则返回False mt_srand&#xff1a;设置随机数种子&#xff0c;如果不手动设置&#xff0c;那么系统会自动进行一次随机种子的设置 mt_rand&#xff1a;生成一个随机数&#xff0c;这个随机数与种子有个…...

Unity 实现伤害跳字

核心组件&#xff1a; Dotween TextMeshPro 过程轨迹如下图&#xff1a; 代码如下&#xff1a; using System.Collections; using System.Collections.Generic; using DG.Tweening; using TMPro; using UnityEngine; using UnityEngine.Pool;public class …...

在SQLark 中快速生成测试数据

在软件开发与数据库管理过程中&#xff0c;高质量的测试数据是保障系统稳定性和性能优化的关键。然而&#xff0c;手动构造仿真数据不仅耗时耗力&#xff0c;还难以覆盖多样化的测试场景。现在&#xff0c;可以使用 SQLark 的数据生成功能&#xff0c;通过 8大类47子类的数据规…...

Postman接口测试详解

一、为何使用postman postman是一款简单高效的接口测试工具&#xff0c;能够很方便发送接口请求&#xff0c;易于保存接口请求脚本&#xff0c;postman提供接口响应数据比对功能&#xff0c;可以设置预期结果作断言&#xff0c;还能把测试用例放在一个集合中批量执行&#xff…...

[ctfshow web入门] web30

信息收集 题目将flag system php不区分大小写地过滤了 解题 前置知识 print_r&#xff1a;php中用于打印数组 scandir&#xff1a;php中用于获取指点目录下的所以文件目录名 getcwd&#xff1a;获取当前目录 目录获取 这里提供两种方法 print_r(scandir(getcwd())); pri…...

ElasticSearch迁移数据

一、查询索引 1、查询所有索引 curl --user elastic:123456 -XGET "http://localhost:19200/_cat/indices?v&sindex" 2、查询索引配置 以索引名称hello为例 curl --user elastic:123456 -XGET "http://localhost:19200/hello/_settings?pretty" 3…...

ES:账号、索引、ILM

目录 笔记1&#xff1a;账号权限查看、查看账号、创建账号等查看所有用户查看特定用户验证权限修改用户权限删除用户 笔记2&#xff1a;索引状态和内容的查看等查看所有索引查看特定索引内容查看索引映射查看索引设置查看索引统计信息查看ILM策略 笔记1&#xff1a;账号权限查看…...

Spring MVC 逻辑视图(JSP、Thymeleaf、FreeMarker)与非逻辑视图(JSON、Excel、PDF、XML)详解及示例

Spring MVC 逻辑视图与非逻辑视图详解及示例 一、逻辑视图与非逻辑视图的定义 类型定义逻辑视图通过视图解析器&#xff08;ViewResolver&#xff09;将逻辑名称&#xff08;如 success&#xff09;映射到具体视图实现。非逻辑视图直接返回具体视图对象&#xff08;如 JsonVie…...

开发体育赛事直播系统:实现聊天交友的私聊功能技术实现全方案解析

基于“东莞梦幻网络科技”体育赛事直播系统&#xff0c;展示前后端技术&#xff08;PHP ThinkPHP Vue.js Android Java iOS OC&#xff09;实现的“用户与用户之间私聊”完整方案&#xff0c;包括功能描述、界面效果、技术实现、数据结构、接口设计及关键代码示例。 一、私…...

UTF-8和GBK编码的区别和详细解释

各位看官&#xff0c;大家早安午安晚安呀~~~ 如果您觉得这篇文章对您有帮助的话 欢迎您一键三连&#xff0c;小编尽全力做到更好 欢迎您分享给更多人哦 今天我们来学习&#xff1a;一个小的知识点—UTF-8和GBK编码的解释 1.关于bite位和进制的关系 1 个比特&#xff08;bit&am…...

java导入excel更新设备经纬度度数或者度分秒

文章目录 一、背景介绍二、页面效果三、代码0.pom.xml1.ImportDevice.vue2.ImportDeviceError.vue3.system.js4.DeviceManageControl5.DeviceManageUserControl6.Repeater7.FileUtils8.ResponseModel9.EnumLongitudeLatitude10.词条 四、注意点本人其他相关文章链接 一、背景介…...

使用python访问mindie部署的vl多模态模型

说明 今天使用mindie1.0部署了qwen2_7b_vl模型&#xff0c;测试过程出现一些问题&#xff0c;这里总结下。 问题1&#xff1a;transformers版本太低 报错信息&#xff1a; [ERROR] [model_deploy_config.cpp:159] Failed to get vocab size from tokenizer wrapper with ex…...