当前位置: 首页 > news >正文

zk源码—2.通信协议和客户端原理二

大纲

1.ZooKeeper如何进行序列化

2.深入分析Jute的底层实现原理

3.ZooKeeper的网络通信协议详解

4.客户端的核心组件和初始化过程

5.客户端核心组件HostProvider

6.客户端核心组件ClientCnxn

7.客户端工作原理之会话创建过程

6.客户端核心组件ClientCnxn

(1)客户端核心类ClientCnxn和Packet

(2)请求队列outgoingQueue与响应等待队列pendingQueue

(3)SendThread

(4)EventThread

(5)总结

(1)客户端核心类ClientCnxn和Packet

一.ClientCnxn

ClientCnxn是zk客户端的核心工作类,负责维护客户端与服务端间的网络连接并进行一系列网络通信。

二.Packet

Packet是ClientCnxn内部定义的、作为zk客户端中请求与响应的载体。也就是说Packet可以看作是一个用来进行网络通信的数据结构,Packet的主要作用是封装网络通信协议层的数据。

Packet中包含了一些请求协议的相关属性字段:请求头信息requestHeader、响应头信息replyHeader、请求体request、响应体response、节点路径clientPath以及serverPath、Watcher监控信息。

Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。该方法只会将requestHeader、request和readOnly三个属性进行序列化。Packet的其余属性保存在客户端的上下文,不进行服务端的网络传输。

public class ApiOperatorDemo implements Watcher {private final static String CONNECT_STRING = "192.168.30.10:2181";private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zookeeper;public static void main(String[] args) throws Exception {zookeeper = new ZooKeeper(CONNECT_STRING, 5000, new ApiOperatorDemo());countDownLatch.await();String result = zookeeper.setData("/node", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}@Overridepublic void process(WatchedEvent watchedEvent) {//如果当前的连接状态是连接成功的,那么通过计数器去控制if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {countDownLatch.countDown();}}
}public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {if (clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;watchManager = defaultWatchManager();watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);hostProvider = aHostProvider;//创建ClientCnxn实例cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);cnxn.start();}protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);}private ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null || clientCnxnSocketName.equals(ClientCnxnSocketNIO.class.getSimpleName())) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();} else if (clientCnxnSocketName.equals(ClientCnxnSocketNetty.class.getSimpleName())) {clientCnxnSocketName = ClientCnxnSocketNetty.class.getName();}Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());return clientCxnSocket;}...public Stat setData(final String path, byte data[], int version) {final String clientPath = path;PathUtils.validatePath(clientPath);final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.setData);SetDataRequest request = new SetDataRequest();request.setPath(serverPath);request.setData(data);request.setVersion(version);SetDataResponse response = new SetDataResponse();//提交请求ReplyHeader r = cnxn.submitRequest(h, request, response, null);...return response.getStat();}...
}public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {...sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();...}public void start() {sendThread.start();eventThread.start();}...public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) {return submitRequest(h, request, response, watchRegistration, null);}public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {ReplyHeader r = new ReplyHeader();//封装成Packet对象Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);synchronized (packet) {if (requestTimeout > 0) {waitForPacketFinish(r, packet);} else {while (!packet.finished) {packet.wait();}}}if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {sendThread.cleanAndNotifyState();}return r;}public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath,String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {Packet packet = null;packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;synchronized (state) {if (!state.isAlive() || closing) {conLossPacket(packet);} else {if (h.getType() == OpCode.closeSession) {closing = true;}//将Packet对象添加到outgoingQueue队列,后续请求的发送交给SendThread来处理outgoingQueue.add(packet);}}sendThread.getClientCnxnSocket().packetAdded();return packet;}...static class Packet {RequestHeader requestHeader;//请求头ReplyHeader replyHeader;//响应头Record request;//请求体Record response;//响应体ByteBuffer bb;String clientPath;//节点路径String serverPath;//节点路径boolean finished;AsyncCallback cb;Object ctx;WatchRegistration watchRegistration;public boolean readOnly;WatchDeregistration watchDeregistration;...public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");}if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}}...
}

(2)请求队列outgoingQueue与响应等待队列pendingQueue

ClientCnxn中有两个核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端的响应等待队列。

outgoingQueue队列是一个客户端的请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。

pendingQueue队列是一个服务端的响应等待队列,用于存储已从客户端发送到服务端,但是需要等待服务端响应的Packet集合。

当zk客户端对请求信息进行封装和序列化后,zk不会立刻就将一个请求信息通过网络直接发送给服务端,而是会先将请求信息添加到请求队列中,之后通过SendThread线程来处理相关的请求发送操作。

public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();...public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {...sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();...}public void start() {sendThread.start();eventThread.start();}...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));state = States.CONNECTING;this.clientCnxnSocket = clientCnxnSocket;setDaemon(true);}...@Overridepublic void run() {...while (state.isAlive()) {...//通过clientCnxnSocket.doTransport方法处理请求发送和响应接收clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);...}...}}
}

一.请求发送

SendThread线程在调用ClientCnxnSocket的doTransport()方法时,会从ClientCnxn的outgoingQueue队列中提取出一个可发送的Packet对象,同时生成一个客户端请求序号XID并将其设置到Packet对象的请求头中,然后再调用Packet对象的createBB方法进行序列化,最后才发送出去。

请求发送完毕后,会立即将该Packet对象保存到pendingQueue队列中,以便等待服务端的响应返回后可以进行相应的处理。

public class ClientCnxnSocketNIO extends ClientCnxnSocket {private final Selector selector = Selector.open();protected ClientCnxn.SendThread sendThread;protected LinkedBlockingDeque<Packet> outgoingQueue;...@Overridevoid doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {selector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}updateNow();for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {if (sc.finishConnect()) {updateLastSendAndHeard();updateSocketAddresses();sendThread.primeConnection();}} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//通过doIO方法处理请求发送和响应接收doIO(pendingQueue, cnxn);}}if (sendThread.getZkState().isConnected()) {if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}}selected.clear();}void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {SocketChannel sock = (SocketChannel) sockKey.channel();//处理响应接收if (sockKey.isReadable()) {...}//处理请求发送if (sockKey.isWritable()) {//从outgoingQueue队列中提取出一个可发送的Packet对象Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());if (p != null) {updateLastSend();if (p.bb == null) {if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) {//同时生成一个客户端请求序号XID并将其设置到Packet对象的请求头中p.requestHeader.setXid(cnxn.getXid());}//进行序列化p.createBB();}//发送请求给服务端sock.write(p.bb);if (!p.bb.hasRemaining()) {sentCount.getAndIncrement();outgoingQueue.removeFirstOccurrence(p);if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) {synchronized (pendingQueue) {pendingQueue.add(p);}}}}if (outgoingQueue.isEmpty()) {disableWrite();} else if (!initialized && p != null && !p.bb.hasRemaining()) {disableWrite();} else {enableWrite();}}}...
}

二.响应接收

客户端获取到来自服务端的响应后,其中的SendThread线程在调用ClientCnxnSocket的doTransport()方法时,便会调用ClientCnxnSocket的doIO()方法,根据不同的响应进行不同的处理。

情况一:如果检测到当前客户端尚未进行初始化,则客户端和服务端还在创建会话,那么此时就直接将收到的ByteBuffer序列化成ConnectResponse对象。

情况二:如果接收到的服务端响应是一个事件,那么此时就会将接收到的ByteBuffer序列化成WatcherEvent对象,并将WatchedEvent对象放入待处理队列waitingEvents中。

情况三:如果接收到的服务端响应是一个常规的请求响应,那么就从pendingQueue队列中取出一个Packet对象来进行处理;此时zk客户端会检验服务端响应中包含的XID值来确保请求处理的顺序性,然后再将接收到的ByteBuffer序列化成相应的Response对象。

最后,会在finishPacket()方法中处理Packet对象中关联的Watcher事件。

public class ClientCnxnSocketNIO extends ClientCnxnSocket {...void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {SocketChannel sock = (SocketChannel) sockKey.channel();//处理响应接收if (sockKey.isReadable()) {int rc = sock.read(incomingBuffer);...if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount.getAndIncrement();readLength();} else if (!initialized) {//如果检测到当前客户端的网络练车ClientCnxnSocket尚未进行初始化readConnectResult();enableRead();if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;} else {//处理服务端返回的响应sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}//处理请求发送if (sockKey.isWritable()) {...}}...
}abstract class ClientCnxnSocket {protected ByteBuffer incomingBuffer = lenBuffer;...void readConnectResult() throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);//将接收到的ByteBuffer序列化成ConnectResponse对象ConnectResponse conRsp = new ConnectResponse();conRsp.deserialize(bbia, "connect");// read "is read-only" flagboolean isRO = false;isRO = bbia.readBool("readOnly");this.sessionId = conRsp.getSessionId();//通过SendThread.onConnected方法建立连接sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);}...
}public class ClientCnxn {...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");...//如果服务端返回的响应是一个事件if (replyHdr.getXid() == -1) {//将接收到的ByteBuffer序列化成WatcherEvent对象WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");...WatchedEvent we = new WatchedEvent(event);...//将WatchedEvent对象放入待处理队列waitingEvents中eventThread.queueEvent( we );return;}...//如果服务端返回的响应是一个常规的请求响应Packet packet;synchronized (pendingQueue) {//从pendingQueue队列中取出一个Packetpacket = pendingQueue.remove();}try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("...");}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}//将接收到的ByteBuffer序列化成Response对象if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}} finally {finishPacket(packet);}}}...protected void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());p.replyHeader.setErr(Code.OK.intValue());}}}if (p.cb == null) {synchronized (p) {p.finished = true;//客户端封装好Packet发送请求时,会调用Packet对象的wait()方法进行阻塞,这里就进行了通知p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}...
}

(3)SendThread

SendThread是客户端ClientCnxn内部的一个IO调度线程,SendThread的作用是用于管理客户端和服务端之间的所有网络IO操作。

在zk客户端的实际运行过程中:

一.一方面SendThread会维护客户端与服务端之间的会话生命周期

通过在一定的周期频率内向服务端发送一个PING包来实现心跳检测。同时如果客户端和服务端出现TCP连接断开,就会自动完成重连操作。

二.另一方面SendThread会管理客户端所有的请求发送和响应接收操作

将上层客户端API操作转换成相应的请求协议并发送到服务端,并且完成对同步调用的返回和异步调用的回调,同时SendThread还负责将来自服务端的事件传递给EventThread去处理。

注意:为了向服务端证明自己还存活,客户端会周期性发送Ping包给服务端。服务端收到Ping包之后,会根据当前时间重置与客户端的Session时间,更新该Session的请求延迟时间,进而保持客户端与服务端的连接状态。

public class ClientCnxn {...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...//处理服务端的响应void readResponse(ByteBuffer incomingBuffer) throws IOException {...eventThread.queueEvent( we );...finishPacket(packet);...}@Overridepublic void run() {...while (state.isAlive()) {...//出现TCP连接断开,就会自动完成重连操作if (!clientCnxnSocket.isConnected()) {if (rwServerAddress != null) {serverAddress = rwServerAddress;rwServerAddress = null;} else {serverAddress = hostProvider.next(1000);}startConnect(serverAddress);clientCnxnSocket.updateLastSendAndHeard();}...if (state.isConnected()) {...//发送PING包进行心跳检测sendPing();...}}...//处理请求发送和响应接收clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);}private void sendPing() {lastPingSentNs = System.nanoTime();RequestHeader h = new RequestHeader(-2, OpCode.ping);queuePacket(h, null, null, null, null, null, null, null, null);}...}...
}

(4)EventThread

EventThread是客户端ClientCnxn内部的另一个核心线程,EventThread负责触发客户端注册的Watcher监听和异步接口注册的回调。

EventThread中有一个waitingEvents队列,临时存放要被触发的Object,这些Object包括客户端注册的Watcher监听和异步接口中注册的回调。

EventThread会不断从waitingEvents队列中取出Object,然后识别出具体类型是Watcher监听还是AsyncCallback回调,最后分别调用process()方法和processResult()方法来实现事件触发和回调。

public class ClientCnxn {...class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();EventThread() {super(makeThreadName("-EventThread"));setDaemon(true);}@Overridepublic void run() {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled) {synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}}}public void queuePacket(Packet packet) {if (wasKilled) {synchronized (waitingEvents) {if (isRunning) waitingEvents.add(packet);else processEvent(packet);}} else {waitingEvents.add(packet);}}...}...
}

(5)总结

客户端ClientCnxn的工作原理:

当客户端向服务端发送请求操作时,首先会将请求信息封装成Packet对象并加入outgoingQueue请求队列中,之后通过SendThread网络IO调度线程将请求发送给服务端。当客户端接收到服务端响应时,通过EventThread线程来处理服务端响应及触发Watcher监听和异步回调。

7.客户端工作原理之会话创建过程

(1)初始化阶段:实例化ZooKeeper对象

(2)会话创建阶段:建立连接并发送会话创建请求

(3)响应处理阶段:接收会话创建请求的响应

(1)初始化阶段:实例化ZooKeeper对象

一.初始化ZooKeeper对象

二.设置会话默认的Watcher

三.构造服务器地址管理器StaticHostProvider

四.创建并初始化客户端的网络连接器ClientCnxn

五.初始化SendThread和EventThread

一.初始化ZooKeeper对象

通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象。在初始化过程中,会创建客户端的Watcher管理器ZKWatchManager。

二.设置会话默认的Watcher

如果在ZooKeeper的构造方法中传入了一个Watcher对象,那么客户端会将该对象作为默认的Watcher,保存在客户端的Watcher管理器ZKWatchManager中。

三.构造服务器地址管理器StaticHostProvider

在ZooKeeper构造方法中传入的服务器地址字符串,客户端会将其存放在服务器地址列表管理器StaticHostProvider中。

四.创建并初始化客户端的网络连接器ClientCnxn

创建的网络连接器ClientXnxn是用来管理客户端与服务端的网络交互。ClientCnxn中有两个核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端的响应等待队列。ClientCnxn是客户端的网络连接器,ClientCnxnSocket是客户端的网络连接,ClientCnxn构造方法会传入ClientCnxnSocket。

五.初始化SendThread和EventThread

ClientCnxn的构造方法会创建两个核心线程SendThread和EventThread。SendThread用于管理客户端和服务端之间的所有网络IO,EventThread用于处理客户端的事件,比如Watcher和回调等。

初始化SendThread时,会将ClientCnxnSocket分配给SendThread作为底层网络IO处理器。初始化EventThread时,会初始化队列waitingEvents用于存放所有等待被客户端处理的事件。

public class CreateSessionDemo {private final static String CONNECTSTRING = "192.168.1.5:2181";private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws Exception {//创建zkZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {public void process(WatchedEvent watchedEvent) {//如果当前的连接状态是连接成功, 则通过计数器去控制, 否则进行阻塞, 因为连接是需要时间的//如果已经获得连接了, 那么状态会是SyncConnectedif (watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();System.out.println(watchedEvent.getState());}//如果数据发生了变化if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {System.out.println("节点发生了变化, 路径: " + watchedEvent.getPath());}}});//进行阻塞countDownLatch.await();...}
}public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;protected final ZKWatchManager watchManager;//ZKWatchManager实现了ClientWatchManager...//1.初始化ZooKeeper对象public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {...//创建客户端的Watcher管理器ZKWatchManagerwatchManager = defaultWatchManager();//2.设置会话默认的Watcher,保存在客户端的Watcher管理器ZKWatchManager中watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);//3.构造服务器地址列表管理器StaticHostProviderhostProvider = aHostProvider;//4.创建并初始化客户端的网络连接器ClientCnxn + 5.初始化SendThread和EventThreadcnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);//6.启动SendThread和EventThreadcnxn.start();}protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);}//从配置中获取客户端使用的网络连接配置:使用NIO还是Netty,然后通过反射进行实例化客户端Socketprivate ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();}Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());return clientCxnSocket;}static class ZKWatchManager implements ClientWatchManager {private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();protected volatile Watcher defaultWatcher;...}protected ZKWatchManager defaultWatchManager() {//创建客户端的Watcher管理器ZKWatchManagerreturn new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));}...
}public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();private final HostProvider hostProvider;public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {...this.hostProvider = hostProvider;//5.初始化SendThread和EventThreadsendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();...}class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));//客户端刚开始创建ZooKeeper对象时,设置其会话状态为CONNECTINGstate = States.CONNECTING;this.clientCnxnSocket = clientCnxnSocket;//设置为守护线程setDaemon(true);}...}class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();EventThread() {super(makeThreadName("-EventThread"));setDaemon(true);}}...
}

(2)会话创建阶段:建立连接并发送会话创建请求

一.启动SendThread和EventThread

二.获取一个服务端地址

三.创建TCP连接

四.构造ConnectRequest请求

五.发送ConnectRequest请求

一.启动SendThread和EventThread

即执行SendThread和EventThread的run()方法。

二.获取一个服务端地址

在开始创建TCP连接前,SendThread需要先获取一个zk服务端地址,也就是通过StaticHostProvider的next()方法获取出一个地址。

然后把该地址委托给初始化SendThread时传入的ClientCnxnSocket去创建一个TCP连接。

三.创建TCP连接

首先在SocketChannel中注册OP_CONNECT,表明发起建立TCP连接的请求。

然后执行SendThread的primeConnection()方法发起创建TCP长连接的请求。

四.构造ConnectRequest请求

SendThread的primeConnection()方法会构造出一个ConnectRequest请求,ConnectRequest请求代表着客户端向服务端发起的是一个创建会话请求。

SendThread的primeConnection()方法会将该请求包装成IO层的Packet对象,然后将该Packet对象放入outgoingQueue请求发送队列中。

五.发送ConnectRequest请求

ClientCnxnSocket会从outgoingQueue请求发送队列取出待发送的Packet,然后将其序列化成ByteBuffer后再发送给服务端。

ClientCnxnSocket是客户端的网络连接,ClientCnxn是客户端的网络连接器。

public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;...//初始化ZooKeeper对象public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {...cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);//启动SendThread和EventThreadcnxn.start();}...
}public class ClientCnxn {//1.启动SendThread和EventThreadpublic void start() {sendThread.start();eventThread.start();}class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));//客户端刚开始创建ZooKeeper对象时,设置其会话状态为CONNECTINGstate = States.CONNECTING;this.clientCnxnSocket = clientCnxnSocket;//设置为守护线程setDaemon(true);}@Overridepublic void run() {clientCnxnSocket.introduce(this, sessionId, outgoingQueue);InetSocketAddress serverAddress = null;...while (state.isAlive()) {...//2.获取其中一个zk服务端的地址serverAddress = hostProvider.next(1000);//向zk服务端发起建立连接请求startConnect(serverAddress);...//4.构造请求 + 5.发送请求 + 处理响应clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);}...}private void startConnect(InetSocketAddress addr) throws IOException {...//3.委托给初始化SendThread时传给SendThread的clientCnxnSocket去创建TCP连接//接下来以ClientCnxnSocketNetty的connect为例clientCnxnSocket.connect(addr);}void primeConnection() throws IOException {...long sessId = (seenRwServerBefore) ? sessionId : 0;//4.构造ConnectRequest请求-会话创建请求ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);...//把会话创建请求放入请求发送队列outgoingQueueoutgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));...}...}...
}public class ClientCnxnSocketNIO extends ClientCnxnSocket {...void connect(InetSocketAddress addr) throws IOException {SocketChannel sock = createSock();//3.创建TCP长连接registerAndConnect(sock, addr);initialized = false;//Reset incomingBufferlenBuffer.clear();incomingBuffer = lenBuffer;}void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {//先在SocketChannel中注册OP_CONNECT事件,表明发起建立TCP连接的请求sockKey = sock.register(selector, SelectionKey.OP_CONNECT);boolean immediateConnect = sock.connect(addr);if (immediateConnect) {sendThread.primeConnection();}}void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {selector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}...for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {//对于要发起建立TCP连接的请求,则执行sendThread.primeConnection()方法if (sc.finishConnect()) {updateLastSendAndHeard();updateSocketAddresses();//比如处理发送会话创建的请求sendThread.primeConnection();}} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//处理建立好TCP连接后的其他读写请求doIO(pendingQueue, cnxn);}}...}void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {SocketChannel sock = (SocketChannel) sockKey.channel();...//6.接收服务端对会话创建请求的响应if (sockKey.isReadable()) {...}//5.发送会话创建请求if (sockKey.isWritable()) {//从outgoingQueue中取出会话创建请求的Packet对象Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());...//进行序列化后发送到服务端p.createBB();sock.write(p.bb);outgoingQueue.removeFirstOccurrence(p);pendingQueue.add(p);...}...}...
}

(3)响应处理阶段:接收会话创建请求的响应

一.接收服务端对会话创建请求的响应

二.处理会话创建请求的响应

三.更新ClientCnxn客户端连接器

四.生成SyncConnected-None事件

五.从ZKWatchManager查询Watcher

六.EventThread线程触发处理Watcher

一.接收服务端对会话创建请求的响应

客户端的网络连接接收到服务端响应后,会先判断自己是否已被初始化。如果尚未初始化,那么就认为该响应是会话创建请求的响应,直接通过ClientCnxnSocket的readConnectResult()方法进行处理。ClientCnxnSocket是客户端的网络连接,ClientCnxn是客户端的网络连接器。

二.处理会话创建请求的响应

ClientCnxnSocket的readConnectResult()方法会对响应进行反序列化,也就是反序列化成ConnectResponse对象,然后再从该对象中获取出会话ID。

三.更新ClientCnxn客户端连接器

服务端的响应表明连接成功,那么就需要通知SendThread线程,通过SendThread线程进一步更新ClientCnxn客户端连接器的信息,包括readTimeout、connectTimeout、会话状态、HostProvider.lastIndex。

四.生成SyncConnected-None事件

为了让上层应用感知会话已成功创建,SendThread会生成一个SyncConnected-None事件代表会话创建成功,并将该事件通过EventThread的queueEvent()方法传递给EventThread线程。

五.从ZKWatchManager查询Watcher

EventThread线程通过queueEvent方法收到事件后,会从ZKWatchManager管理器查询出对应的Watcher,然后将Watcher放到EventThread的waitingEvents队列中。

客户端的Watcher管理器是ZKWatchManager。

服务端的Watcher管理器是WatchManager。

六.EventThread线程触发处理Watcher

EventThread线程会不断从waitingEvents队列取出待处理的Watcher对象,然后调用Watcher的process()方法来触发Watcher。

public class ClientCnxnSocketNIO extends ClientCnxnSocket {...void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {SocketChannel sock = (SocketChannel) sockKey.channel();...//1.接收服务端对会话创建请求的响应if (sockKey.isReadable()) {int rc = sock.read(incomingBuffer);if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount.getAndIncrement();readLength();} else if (!initialized) {//判断客户端的网络连接是否已初始化//收到服务端响应时,还没有建立连接,说明这次响应是对建立TCP连接的响应//2.处理会话创建请求的响应readConnectResult();enableRead();if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;//设置客户端的网络连接为已初始化} else {//处理服务端的非建立连接请求的响应sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}if (sockKey.isWritable()) {...}}...
}abstract class ClientCnxnSocket {...void readConnectResult() throws IOException {//对会话创建请求的响应进行反序列化ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ConnectResponse conRsp = new ConnectResponse();conRsp.deserialize(bbia, "connect");boolean isRO = false;try {isRO = bbia.readBool("readOnly");} catch (IOException e) {LOG.warn("Connected to an old server; r-o mode will be unavailable");}this.sessionId = conRsp.getSessionId();//3.更新ClientCnxn客户端连接器:包括状态、HostProvider的lastIndex游标sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);}...
}public class ClientCnxn {...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {negotiatedSessionTimeout = _negotiatedSessionTimeout;if (negotiatedSessionTimeout <= 0) {changeZkState(States.CLOSED);eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));eventThread.queueEventOfDeath();}readTimeout = negotiatedSessionTimeout * 2 / 3;connectTimeout = negotiatedSessionTimeout / hostProvider.size();hostProvider.onConnected();sessionId = _sessionId;sessionPasswd = _sessionPasswd;changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;//4.生成SyncConnected-None事件eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));}}private final ClientWatchManager watcher;class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();public void queueEvent(WatchedEvent event) {queueEvent(event, null);}private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {if (event.getType() == EventType.None && sessionState == event.getState()) {return;}sessionState = event.getState();final Set<Watcher> watchers;if (materializedWatchers == null) {watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());} else {watchers = new HashSet<Watcher>();watchers.addAll(materializedWatchers);}WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);waitingEvents.add(pair);}public void run() {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {//5.EventThread触发处理WatcherprocessEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}}}private void processEvent(Object event) {if (event instanceof WatcherSetEventPair) {WatcherSetEventPair pair = (WatcherSetEventPair) event;for (Watcher watcher : pair.watchers) {watcher.process(pair.event);}}...}...}
}

相关文章:

zk源码—2.通信协议和客户端原理二

大纲 1.ZooKeeper如何进行序列化 2.深入分析Jute的底层实现原理 3.ZooKeeper的网络通信协议详解 4.客户端的核心组件和初始化过程 5.客户端核心组件HostProvider 6.客户端核心组件ClientCnxn 7.客户端工作原理之会话创建过程 6.客户端核心组件ClientCnxn (1)客户端核心…...

Python设计模式:构建模式

1. 什么是构建模式 构建模式&#xff08;Builder Pattern&#xff09;是一种创建型设计模式&#xff0c;它允许使用多个简单的对象一步步构建一个复杂的对象。构建模式通过将构建过程与表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。换句话说&#xff0c;构建模…...

C++类间的 “接力棒“ 传递:继承(下)

文章目录 5. 继承与友元6.继承与静态成员7.菱形继承8.继承和组合希望读者们多多三连支持小编会继续更新你们的鼓励就是我前进的动力&#xff01; 本篇接着补充继承方面的内容&#xff0c;同时本篇的菱形继承尤为重要 5. 继承与友元 class Student; class Person { public:fri…...

C++11QT复习 (十六)

文章目录 Day11 移动语义回顾一、移动语义基础概念二、自定义 String 类的移动语义实现输出运算符重载&#xff1a; 三、测试函数&#xff1a;验证移动与拷贝行为四、左值与右值的补充说明右值引用作为函数返回值 五、知识总结如何区分左值与右值&#xff1f; 六、附加说明&…...

Redis客户端命令到服务器底层对象机制的完整流程?什么是Redis对象机制?为什么要有Redis对象机制?

Redis客户端命令到服务器底层对象机制的完整流程 客户端 → RESP协议封装 → TCP传输 → 服务器事件循环 → 协议解析 → 命令表查找 → 对象机制 → 动态编码 → 数据结构操作 → 响应编码 → 网络回传 Redis客户端命令到服务器底层对象机制的完整流程可分为协议封装、命令解…...

鸿蒙NEXT开发节流、防抖工具类(ArkTs)

import { CacheUtil } from ./CacheUtil; import { DateUtil } from ./DateUtil;/*** 节流、防抖工具类&#xff08;用于点击事件&#xff0c;防止按钮被重复点击&#xff09;** author 鸿蒙布道师* since 2025/04/07*/ export class ClickUtil {private static throttleTimeou…...

Qt程序 Windows打包

目的 运行Qt的程序&#xff0c;遇上如下问题&#xff1a; 显然是少很多Qt库&#xff0c;那就把Qt库放到这里&#xff0c;Qt提供这一个命令windeployqt.exe. windeployqt windeployqt是Qt框架提供的一个工具&#xff0c;主要用于自动打包Windows平台上的Qt应用程序及其依赖项…...

2025-04-07(DS复习):Databricks DLT 详解

Databricks Delta Live Tables (DLT) 详解 Delta Live Tables (DLT) 是 Databricks 提供的一个智能框架&#xff0c;用于构建可靠、可扩展的数据处理管道。它简化了ETL(提取、转换、加载)和ELT(提取、加载、转换)流程的开发和管理&#xff0c;特别适合在数据湖house架构中实现…...

音视频入门基础:RTCP专题(3)——RTCP协议简介(中)

本文接着《音视频入门基础&#xff1a;RTCP专题&#xff08;2&#xff09;——RTCP协议简介&#xff08;上&#xff09;》&#xff0c;继续对RTCP协议进行简介。本文的一级标题从“九”开始。 九、Sender and Receiver Reports 本段内容对应《RFC 3550》的第6.4节。根据《RFC …...

嵌入式工程师多线程编程(二)生产者-消费者模式

生产者-消费者模式详解&#xff1a;多线程编程的核心范式 生产者-消费者模式(Producer-Consumer Pattern)是多线程编程中最经典的设计模式之一&#xff0c;它通过解耦生产者和消费者的工作流程&#xff0c;实现了线程间的高效协作与资源管理。本文将深入剖析这一模式的原理、实…...

秒杀系统的性能优化

秒杀任务总体QPS预期是每秒几十万&#xff0c;对tomcat、redis、JVM参数进行优化。 tomcat线程数 4核8G的机器&#xff0c;一般就是开200-300个工作线程&#xff0c;这是个经验值。每秒一个线程处理3-5个请求&#xff0c;200多个线程的QPS可以达到1000左右。线程不能太多&…...

MySQL学习笔记集--索引

索引 索引是数据库中用于提高查询效率的一种数据结构。 它类似于书籍的目录&#xff0c;通过索引可以快速定位到表中的特定行&#xff0c;而无需扫描整个表。 索引的类型 主键索引&#xff08;Primary Key Index&#xff09; 自动创建&#xff0c;用于唯一标识表中的每一行。…...

深入理解重排(Reflow)与重绘(Repaint),写出高性能 CSS 动画

在前端开发中&#xff0c;CSS 动画是提升用户体验的重要手段&#xff0c;但很多开发者在使用动画时并不了解浏览器背后的渲染机制&#xff0c;导致动画卡顿甚至影响整体性能。本文将带你深入理解 CSS 中的两大核心概念 —— 重排&#xff08;Reflow&#xff09; 与 重绘&#x…...

Elasticsearch 从入门到实战:文档聚合操作及总结

四、文档操作&#xff1a;数据的增删改查 4.1 添加文档 文档&#xff08;Document&#xff09;是索引中的最小数据单元&#xff0c;使用 POST 或 PUT 添加&#xff1a; json POST /products/_doc/1 { "name": "华为Mate50 Pro", "price": 6…...

前缀和和差分笔记

前缀和和差分笔记 一维前缀和 示意图如下&#xff1a; 代码&#xff1a; **核心公式&#xff1a;sum[i]sum[i-1]a[i];&#xff08;计算前缀和的&#xff09;**#include<bits/stdc.h> using namespace std; const int N10000; #define ll long long int a[N],sum[N]; i…...

SSRF漏洞利用的小点总结和实战演练

含义理解&#xff1a; SSRF&#xff08;Server-Side Request Forgery&#xff0c;服务器请求伪造&#xff09;是一种由攻击者构造请求&#xff0c;由服务端发起请求的安全漏洞&#xff0c;一般情况下&#xff0c;SSRF攻击的目标是外网无法访问的内网系统。 攻击者通过篡改URL…...

IAR推动嵌入式开发:云就绪、可扩展的CI/CD和可持续自动化

全球领先的嵌入式系统开发软件解决方案供应商IAR正式发布全新云就绪平台&#xff0c;为嵌入式开发团队提供企业级的可扩展性、安全性和自动化能力。该平台于在德国纽伦堡举办的embedded world 2025展会上正式亮相&#xff0c;标志着将现代DevSecOps工作流集成到嵌入式软件开发中…...

瓦片数据合并方法

影像数据 假如有两份影像数据 1.全球底层影像0-5级别如下&#xff1a; 2.局部高清影像数据级别9-14如下&#xff1a; 合并方法 将9-14文件夹复制到全球底层0-5的目录下 如下&#xff1a; 然后合并xml文件 使得Tileset设置到最高级&#xff08;包含所有级别&#xff09;&…...

RISC-V AIA学习---IPI 处理器间中断

对于有多个hart的机器&#xff0c;必须为每个 hart 提供一个由具体实现定义的内存地址。向这个地址写入数据&#xff0c;就能向该 hart 发送一个机器级软件中断&#xff08;主代码为 3&#xff09;。换句话说&#xff0c;机器级的 IPI 可以通过这种方式&#xff0c;以机器级软件…...

Automattic 裁员16%,Matt Mullenweg称此举旨在提升盈利能力并增强投资实力

2025年4月3日&#xff0c;Automattic——这家以 WordPress.com、Tumblr 和 WooCommerce 等产品闻名的公司&#xff0c;宣布裁减其全球员工队伍的16%。这一决定是在周三通过公司博客文章和 Slack 内部消息向员工透露的。根据裁员前 Automattic 官网显示的员工人数&#xff08;1,…...

图解AUTOSAR_SWS_FlexRayInterface

AUTOSAR FlexRay Interface 模块分析 本文档基于AUTOSAR SWS FlexRayInterface规范,对FlexRay Interface模块进行详细分析。 1. FlexRay Interface 模块架构 1.1 模块架构概览 1.2 架构说明 FlexRay Interface模块是AUTOSAR中的ECU抽象层组件,为上层模块提供统一的抽象接…...

AI赋能ArcGIS Pro——水系网络AI智能提取 | GIS人工智能制图技术解析

我们之前做了做了几期的AIGIS的分享。我们今天要再次做一个分享。 AI赋能ArcGIS Pro——水系网络智能提取全解析 DeepSeek结合ArcGIS Pro制作一个批量建库的脚本工具&#xff08;代码一字未改&#xff0c;直接运行&#xff09; 看老外如何玩DeepSeek&#xff01;15分钟快速创…...

STM32江科大----IIC

声明&#xff1a;本人跟随b站江科大学习&#xff0c;本文章是观看完视频后的一些个人总结和经验分享&#xff0c;也同时为了方便日后的复习&#xff0c;如果有错误请各位大佬指出&#xff0c;如果对你有帮助可以点个赞小小鼓励一下&#xff0c;本文章建议配合原视频使用❤️ 如…...

RAG(检索增强生成)系统,提示词(Prompt)表现测试(数据说话)

在RAG(检索增强生成)系统中,评价提示词(Prompt)设计是否优秀,必须通过量化测试数据来验证,而非主观判断。以下是系统化的评估方法、测试指标和具体实现方案: 一、提示词优秀的核心标准 优秀的提示词应显著提升以下指标: 维度量化指标测试方法事实一致性Faithfulness …...

【leetcode hot 100 763】划分字母区间

解法一&#xff1a;用map记录<字母&#xff0c;字母出现的次数>&#xff0c;循环取出value-1&#xff0c;每次判断已经取出的字母&#xff08;Set记录&#xff09;是否还在后面存在&#xff08;value>1&#xff09;&#xff0c;若存在继续循环&#xff0c;若不存在开启…...

PCB工艺:现代电子产品的核心制造技术

引言 PCB&#xff08;Printed Circuit Board&#xff0c;印刷电路板&#xff09;是电子设备的核心组成部分&#xff0c;几乎所有现代电子产品&#xff0c;从智能手机到航天设备&#xff0c;都依赖于PCB实现电路连接。PCB制造工艺的进步直接影响电子产品的性能、可靠性和成本。…...

【UE5 C++课程系列笔记】34——结构体与Json的相互转化

目录 准备工作 一、结构体转Json 二、Json转结构体 三、复杂结构体与Json的转换 主要通过借助FJsonObjectConverter类实现结构体和 JSON 之间的相互转换。 准备工作 首先新建一个结构体如下 添加两个方法分别用于将Struct转为Json、Json转为Struct 一、结构体转Json FStri…...

2025最新系统 Git 教程(二)

第2章 Git基础 2.1 Git 基础 - 获取 Git 仓库 如果你只想通过阅读一章来学习 Git&#xff0c;那么本章将是你的不二选择。 本章涵盖了你在使用 Git 完成各种工作时将会用到的各种基本命令。 在学习完本章之后&#xff0c;你应该能够配置并初始化一个仓库&#xff08;reposito…...

力扣hot100_动态规划

动态规划 hot100_198. 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。…...

玄机-第六章-哥斯拉4.0流量分析的测试报告

目录 一、测试环境 二、测试目的 三、操作过程 Flag1 Flag2 Flag3 Flag4 Flag5 Flag6 Flag7 Flag8 Flag9 Flag10 Flag11 Flag12 Flag13 pam_unix.so关键代码 四、结论 一、测试环境 靶场介绍&#xff1a;国内厂商设置的玄机靶场&#xff0c;以应急响应题目著…...

【Hadoop入门】Hadoop生态圈概述:核心组件与应用场景概述

1 Hadoop生态圈概述 Hadoop生态圈是以 HDFS&#xff08;分布式存储&#xff09; 和 YARN&#xff08;资源调度&#xff09; 为核心&#xff0c;围绕大数据存储、计算、管理、分析等需求发展出的一系列开源工具集合。 核心特点&#xff1a; 模块化&#xff1a;各组件专注解决特定…...

深度学习实战电力设备缺陷检测

本文采用YOLOv11作为核心算法框架&#xff0c;结合PyQt5构建用户界面&#xff0c;使用Python3进行开发。YOLOv11以其高效的实时检测能力&#xff0c;在多个目标检测任务中展现出卓越性能。本研究针对电力设备缺陷数据集进行训练和优化&#xff0c;该数据集包含丰富的电力设备缺…...

随机产生4位随机码(java)

Random类&#xff1a; 用于生成随机数 import java.util.Random; 导入必要的类 generateVerificationCode()方法&#xff1a; 这是一个静态方法&#xff0c;可以直接通过类名调用 返回一个6位数字的字符串&#xff0c;首位不为0 生成首位数字&#xff1a; random.nextInt…...

音视频入门基础:RTCP专题(4)——RTCP协议简介(下)

本文接着《音视频入门基础&#xff1a;RTCP专题&#xff08;3&#xff09;——RTCP协议简介&#xff08;中&#xff09;》&#xff0c;继续对RTCP协议进行简介。本文的一级标题从“十四”开始。 十四、SDES: Source Description RTCP Packet 本段内容对应《RFC 3550》的第6.5节…...

PyCharm2024.3.5专业版解决Conda executable is not found问题

项目场景&#xff1a; pycharm使用anaconda 内的虚拟环境 pycharm 2024.3.5 专业版 C:\Users\Administrator>conda infoactive environment : transmute_recipe_generatoractive env location : D:\anaconda3\envs\transmute_recipe_generatorshell level : 1user config…...

滑动窗口思想 面试算法高频题

基本思想 滑动窗口思想其实就是快慢型的特例 计算机网络中滑动窗口协议&#xff08;Sliding Window Protocol&#xff09;&#xff0c;该协议是TCP实现流量控制等的核心策略之一。事实上在与流量控制、熔断、限流、超时等场景下都会首先从滑动窗口的角度来思考问题&#xff0…...

Linux中特殊的变量

1.$# 含义&#xff1a;表示传入脚本或函数的参数数量。 用法&#xff1a;用于检查用户是否提供了足够的参数。 示例&#xff1a; #!/bin/bash echo "参数数量: $#"2.$? 含义&#xff1a;表示上一条命令的退出状态。如果命令成功执行&#xff0c;值为 0&#xff1b;…...

Linux文件系统与日志分析

目录 一.日志 1.1日志的定义 1.2日志的功能 1.3日志的分类 1.4日志的文件格式 1.5用户日志 1.6一些常见的日志 1.7日志消息的级别 二.系统日志管理 rsyslog 2.1rsyslog的定义 2.2rsyslog 配置文件 2.3rsyslog的实际应用----单独显示某一服务的日志 1.编辑rsyslog配…...

从传统物流到智能调度的全链路升级

一、TMS系统升级的核心目标与整体框架 &#xff08;一&#xff09;为什么要升级&#xff1f;传统物流管理的三大痛点 调度效率低下&#xff1a;过去依赖人工分单、手动匹配承运商&#xff0c;订单量大时容易出错&#xff0c;比如不同区域的订单混排导致运输路线绕路&#xff…...

UE5中如何修复后处理动画蓝图带来的自然状态下的metablriger身体绑定形变(如耸肩)问题

【[metablriger] UE5中如何修复后处理动画蓝图带来的自然状态下的metablriger身体绑定形变(如耸肩)问题】 UE5中如何修复后处理动画蓝图带来的自然状态下的metablriger身体绑定形变(如耸肩)问题...

STL_vector_01_基本用法

&#x1f44b; Hi, I’m liubo&#x1f440; I’m interested in harmony&#x1f331; I’m currently learning harmony&#x1f49e;️ I’m looking to collaborate on …&#x1f4eb; How to reach me …&#x1f4c7; sssssdsdsdsdsdsdasd&#x1f383; dsdsdsdsdsddfsg…...

css2学习总结之尚品汇静态页面

css2总结之尚品汇 一、布局 在 PC 端网页中&#xff0c;一般都会有一个固定宽度且水平居中的盒子&#xff0c;来显示网页的主要内容&#xff0c;这是网页 的版心。 版心的宽度一般是 960 ~ 1200 像素之间。 版心可以是一个&#xff0c;也可以是多个。 二、布局相关名词 我…...

Lua 第5部分 表

表&#xff08; Table &#xff09;是 Lua 语言中最主要&#xff08;事实上也是唯一的&#xff09;和强大的数据结构。 使用表&#xff0c;Lua语言可以以一种简单、统一且高效的方式表示数组、集合、记录和其他很多数据结构。 Lua语言也使用表来表示包&#xff08; package &am…...

01分数规划

https://ac.nowcoder.com/acm/contest/22353/1011 并不需要高级数据结构&#xff0c;对答案二分即可。 假定当前二分的答案为 x x x&#xff0c;则 ∑ v i ∑ w i ≥ x \frac{ \sum_{v_i} }{\sum_{w_i}} ≥ x ∑wi​​∑vi​​​≥x 成立时 x x x 才可能是最后的答案。 化简式…...

无人机动力系统全维度解析:技术演进、选型策略与未来趋势

一、动力系统技术理念与设计逻辑 &#xff08;一&#xff09;核心技术指标 能量密度&#xff1a;决定续航能力的关键参数&#xff0c;单位为 Wh/kg。当前主流锂聚合物电池能量密度约 250-300Wh/kg&#xff0c;氢燃料电池可达 500-800Wh/kg&#xff0c;航空燃油则高达 12,000W…...

重新审视中国的GB标准(44495 – 44497)

此前&#xff0c;我们深入探讨了中国新推出的智能互联汽车(ICV)网络安全标准GB Standard 44495-2024。我们探讨了该标准对汽车制造商的影响、与UNECE R155和ISO/SAE 21434等全球标准的一致性&#xff0c;以及该标准对未来汽车网络安全的意义。 然而&#xff0c;GB 44495-2024并…...

Linux进程控制(五)之做一个简易的shell

文章目录 做一个简易的shell预备知识代码实现运行结果 做一个简易的shell 重谈Shell shell是操作系统的一层外壳程序&#xff0c;帮我们用户执行指令&#xff0c; 获取到指令后&#xff0c;交给操作系统&#xff0c;操作系统执行完后&#xff0c;把执行结果通过shell交给用户…...

Apache Kafka全栈技术解析

目录 第一章 Kafka概述与核心价值 1.1 消息队列的演进与Kafka的诞生 1.2 Kafka的核心应用场景 1.3 Kafka生态全景图 第二章 Kafka核心概念与架构解析 2.1 核心概念深度剖析 2.2 Kafka架构设计精要 第三章 Kafka环境搭建与配置 3.1 单机部署实战 3.2 集群部署最佳实践 …...

结合 Flink/Spark 进行 AI 大数据处理(实时数据 + AI 推理的应用场景)

随着企业对实时智能决策的需求日益增强,将 Flink / Spark 等流批计算框架 与 大模型推理能力相结合,正在成为 AI 工业化落地的重要实践路径。本篇文章将深入介绍如何将 AI 模型集成到大数据流处理系统中,实现实时感知、智能判断与自动反馈。 1. 为什么需要“实时数据 + AI 推…...

开发PDF时,如何比较 PDF 文件

在 PDF 论坛上&#xff0c;“如何比较 PDF 文件”是一个经常被提到的问题。在开始之前&#xff0c;重要的是要明确你想要比较的内容是什么。 不同的 PDF 文件可能看起来一样吗&#xff1f; 是的&#xff0c;可能。不同的 PDF 创建工具可能会生成在视觉上完全相同的页面&#x…...