zk源码—1.数据节点与Watcher机制及权限二
大纲
1.ZooKeeper的数据模型、节点类型与应用
(1)数据模型之树形结构
(2)节点类型与特性(持久 + 临时 + 顺序 )
(3)节点的状态结构(各种zxid + 各种version)
(4)节点的版本(version + cversion + aversion)
(5)使用ZooKeeper实现锁(悲观锁 + 乐观锁)
2.发布订阅模式:用Watcher机制实现分布式通知
(1)Watcher机制是如何实现的
(2)Watcher机制的底层原理
(3)客户端Watcher注册实现过程
(4)服务端处理Watcher过程
(5)服务端Watch事件的触发过程
(6)客户端回调Watcher的处理过程
(7)利用Watcher实现发布订阅
(8)Watcher具有的特性
3.ACL权限控制:避免未经授权的访问
(1)ACL的使用(scheme:id:permission)
(2)实现自己的权限控制
(3)ACL内部实现原理之客户端处理过程
(4)ACL内部实现原理之服务端实现过程
(5)ACL权限总结
2.发布订阅模式:用Watcher机制实现分布式通知
(4)服务端处理Watcher过程
zk服务端处理Watcher事件基本有两个过程:
一.判断收到的请求是否需要注册Watcher事件
二.将对应的Watcher事件存储到WatchManager
以下是zk服务端处理Watcher的序列图:
zk服务端接收到客户端请求后的具体处理:
一.当服务端收到客户端标记了Watcher事件的getData请求时,会调用到FinalRequestProcessor的processRequest()方法,判断当前客户端请求是否需要注册Watcher事件。
二.当getDataRequest.getWatch()的值为true时,则表明当前客户端请求需要进行Watcher注册。
三.然后将当前的ServerCnxn对象(即Watcher事件)和数据节点路径,传入到zks.getZKDatabase()的getData()方法中来实现Watcher事件的注册,也就是实现存储Watcher事件到WatchManager中。具体就是:调用DataTree.dataWatches这个WatchManager的addWatch()方法,将该客户端请求的Watcher事件(也就是ServerCnxn对象)存储到DataTree.dataWatches这个WatchManager的两个HashMap(watchTable和watch2Paths)中。
补充说明:
首先,ServerCnxn对象代表了一个客户端和服务端的连接。ServerCnxn接口的默认实现是NIOServerCnxn,也可以选NettyServerCnxn。由于NIOServerCnxn和NettyServerCnxn都实现了Watcher的process接口,所以可以把ServerCnxn对象看作是一个Watcher对象。
然后,zk服务端的数据库DataTree中会有两个WatchManager,分别是dataWatches和childWatches,分别对应节点和子节点数据变更。
接着,WatchManager中有两个HashMap:watch2Paths和watchTable。当前的ServerCnxn对象和数据节点路径最终会被存储在这两HashMap中。watchTable可以根据数据节点路径来查找对应的Watcher,watch2Paths可以根据Watcher来查找对应的数据节点路径。
同时,WatchManager除了负责添加Watcher事件,还负责触发Watcher事件,以及移除那些已经被触发的Watcher事件。
public class FinalRequestProcessor implements RequestProcessor {ZooKeeperServer zks;...public void processRequest(Request request) {...ServerCnxn cnxn = request.cnxn;...switch (request.type) {...case OpCode.getData: {...byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);rsp = new GetDataResponse(b, stat);...}case OpCode.getChildren: {...List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);...}}...}
}public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {private ZKDatabase zkDb;private FileTxnSnapLog txnLogFactory = null;...public ZKDatabase getZKDatabase() {return this.zkDb;}...
}public class ZKDatabase {protected DataTree dataTree;protected FileTxnSnapLog snapLog;...public byte[] getData(String path, Stat stat, Watcher watcher) {return dataTree.getData(path, stat, watcher);}public List<String> getChildren(String path, Stat stat, Watcher watcher) {return dataTree.getChildren(path, stat, watcher);}...
}public class DataTree {private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();private final WatchManager dataWatches = new WatchManager();private final WatchManager childWatches = new WatchManager();...public byte[] getData(String path, Stat stat, Watcher watcher) {DataNode n = nodes.get(path);synchronized (n) {n.copyStat(stat);if (watcher != null) {dataWatches.addWatch(path, watcher);}return n.data;}}public List<String> getChildren(String path, Stat stat, Watcher watcher) {DataNode n = nodes.get(path);synchronized (n) {n.copyStat(stat);if (watcher != null) {childWatches.addWatch(path, watcher);}return new ArrayList<String>(n.getChildren());}}...
}class WatchManager {private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();...synchronized void addWatch(String path, Watcher watcher) {HashSet<Watcher> list = watchTable.get(path);if (list == null) {list = new HashSet<Watcher>(4);watchTable.put(path, list);}list.add(watcher);HashSet<String> paths = watch2Paths.get(watcher);if (paths == null) {paths = new HashSet<String>();watch2Paths.put(watcher, paths);}paths.add(path);}...
}
(5)服务端Watch事件的触发过程
对于标记了Watcher注册的请求,zk会将其对应的ServerCnxn对象(Watcher事件)存储到DataTree里的WatchManager的HashMap(watchTable和watch2Paths)中。之后,当服务端对指定节点进行数据更新后,会通过调用DataTree里的WatchManager的triggerWatch()方法来触发Watcher。
无论是触发DataTree的dataWatches,还是触发DataTree的childWatches,Watcher的触发逻辑都是一样的。
具体的Watcher触发逻辑如下:
步骤一:首先封装一个具有这三个属性的WatchedEvent对象:通知状态(KeeperState)、事件类型(EventType)、数据节点路径(path)。
步骤二:然后根据数据节点路径从DateTree的WatchManager中取出Watcher。如果为空,则说明没有任何客户端在该数据节点上注册过Watcher。如果存在,则将Watcher事件添加到自定义的Wathcers集合中,并且从DataTree的WatchManager的watchTable和watch2Paths中移除。最后调用Watcher的process()方法向客户端发送通知。
public class DataTree {private final WatchManager dataWatches = new WatchManager();private final WatchManager childWatches = new WatchManager();...public Stat setData(String path, byte data[], int version, long zxid, long time) {Stat s = new Stat();DataNode n = nodes.get(path);byte lastdata[] = null;synchronized (n) {lastdata = n.data;n.data = data;n.stat.setMtime(time);n.stat.setMzxid(zxid);n.stat.setVersion(version);n.copyStat(s);}...dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}...
}class WatchManager {...Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);HashSet<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}...
}
具体的Watcher的process()方法,会由NIOServerCnxn来实现。Watcher的process()方法的具体逻辑如下:
步骤一:标记响应头ReplyHeader的xid为-1,表示当前响应是一个通知。
步骤二:将触发WatchManager.triggerWatch()方法时封装的WatchedEvent,包装成WatcherEvent,以便进行网络传输序列化。
步骤三:向客户端发送响应。
public interface Watcher {abstract public void process(WatchedEvent event);...
}public abstract class ServerCnxn implements Stats, Watcher {...public abstract void process(WatchedEvent event);
}public class NIOServerCnxn extends ServerCnxn {...@Overridepublic void process(WatchedEvent event) {ReplyHeader h = new ReplyHeader(-1, -1L, 0);// Convert WatchedEvent to a type that can be sent over the wireWatcherEvent e = event.getWrapper();sendResponse(h, e, "notification");}
}
(6)客户端回调Watcher的处理过程
对于来自服务端的响应:客户端会使用SendThread的readResponse()方法来进行统一处理。如果反序列化后得到的响应头replyHdr的xid为-1,则表明这是一个通知类型的响应。
SendThread接收事件通知的处理步骤如下:
步骤一:反序列化成WatcherEvent对象
zk客户端接收到请求后,首先将字节流反序列化成WatcherEvent对象。
步骤二:处理chrootPath
如果客户端设置了chrootPath为/app,而服务端响应的节点路径为/app/a,那么经过chrootPath处理后,就会统一变成一个相对路径:/a。
步骤三:还原成WatchedEvent对象
将WatcherEvent对象转换成WatchedEvent对象。
步骤四:回调Watcher
通过调用EventThread的queueEvent()方法,将WatchedEvent对象交给EventThread线程来回调Watcher。所以服务端的Watcher事件通知,最终会交给EventThread线程来处理。
public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;protected final ZKWatchManager watchManager;private final ZKClientConfig clientConfig;...
}public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;...class SendThread extends ZooKeeperThread {...@Overridepublic void run() {clientCnxnSocket.introduce(this, sessionId, outgoingQueue);...while (state.isAlive()) {...//将outgoingQueue里的请求发送出去 + 处理接收到的响应clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);...}}void readResponse(ByteBuffer incomingBuffer) {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");...//处理事务回调if (replyHdr.getXid() == -1) {WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");if (chrootPath != null) {String serverPath = event.getPath();if (serverPath.compareTo(chrootPath) == 0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else...}WatchedEvent we = new WatchedEvent(event);eventThread.queueEvent( we );return;}...finishPacket(packet);}...}
}public class ClientCnxnSocketNIO extends ClientCnxnSocket {...@Overridevoid doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {...doIO(pendingQueue, cnxn);...}void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {SocketChannel sock = (SocketChannel) sockKey.channel();...//处理接收响应if (sockKey.isReadable()) {...sendThread.readResponse(incomingBuffer);...}//处理发送请求if (sockKey.isWritable()) {Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());...p.createBB();sock.write(p.bb);outgoingQueue.removeFirstOccurrence(p);pendingQueue.add(p);...}...}
}
EventThread线程是zk客户端专门用来处理服务端事件通知的线程。
EventThread处理事件通知的步骤如下:
步骤一:EventThread的queueEvent()方法首先会根据WatchedEvent对象,从ZKWatchManager中取出所有注册过的客户端Watcher。
步骤二:然后从ZKWatchManager的管理中删除这些Watcher。这也说明客户端的Watcher机制是一次性的,触发后就会失效。
步骤三:接着将所有获取到的Watcher放入waitingEvents队列中。
步骤四:最后EventThread线程的run()方法,通过循环的方式,每次都会从waitingEvents队列中取出一个Watcher进行串行同步处理。也就是调用EventThread线程的processEvent()方法来最终执行实现了Watcher接口的process()方法,从而实现回调处理。
public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;private final ClientWatchManager watcher;...class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();//通过以下这两个变量来实现waitingEvents为空时,加入的Watcher要马上执行,而不用等待run()方法private volatile boolean wasKilled = false;private volatile boolean isRunning = false;... public void queueEvent(WatchedEvent event) {queueEvent(event, null);}private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {if (event.getType() == EventType.None && sessionState == event.getState()) {return;}sessionState = event.getState();final Set<Watcher> watchers;if (materializedWatchers == null) {//对WatchedEvent对象进行处理,从ZKWatchManager的管理中删除这些Watcherwatchers = watcher.materialize(event.getState(), event.getType(), event.getPath());} else {watchers = new HashSet<Watcher>();watchers.addAll(materializedWatchers);}WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);//将获取到的所有Watcher放入waitingEvents队列waitingEvents.add(pair);}...public void run() {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } }}}private void processEvent(Object event) {if (event instanceof WatcherSetEventPair) {WatcherSetEventPair pair = (WatcherSetEventPair) event;for (Watcher watcher : pair.watchers) {watcher.process(pair.event);}}...}...}
}public class ZooKeeper implements AutoCloseable {...static class ZKWatchManager implements ClientWatchManager {private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();...public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {Set<Watcher> result = new HashSet<Watcher>();switch (type) {...case NodeDataChanged:case NodeCreated:synchronized (dataWatches) {addTo(dataWatches.remove(clientPath), result);}synchronized (existWatches) {addTo(existWatches.remove(clientPath), result);}break;...}return result;}final private void addTo(Set<Watcher> from, Set<Watcher> to) {if (from != null) {to.addAll(from);}}...}...
}
总结zk的Watcher机制处理过程:
一.zk是通过在客户端和服务端创建观察者信息列表来实现Watcher机制的。
二.客户端调用getData()、getChildren()、exist()等方法时,会将Watcher事件放到本地的ZKWatchManager中进行管理。
三.服务端在接收到客户端的请求后首先判断是否需要注册Watcher,若是则将ServerCnxn对象当成Watcher事件放入DataTree的WatchManager中。
四.服务端触发Watcher事件时,会根据节点路径从WatchManager中取出对应的Watcher,然后发送通知类型的响应给客户端。
五.客户端在接收到通知类型的响应后,首先通过SendThread线程提取出WatchedEvent对象。然后将WatchedEvent对象交给EventThread线程来回调Watcher。也就是查询本地的ZKWatchManager获得对应的Watcher事件,删除ZKWatchManager的Watcher并将Watcher放入waitingEvents队列。后续EventThread线程便会在其run()方法中串行出队waitingEvents,执行Watcher的process()回调。
客户端的Watcher管理器是ZKWatchManager。
服务端的Watcher管理器是WatchManager。
以上的处理设计实现了一个分布式环境下的观察者模式,通过将客户端和服务端处理Watcher事件时所需要的信息分别保存在两端,减少了彼此通信的内容,大大提升了服务的处理性能。
(7)利用Watcher实现发布订阅
一.发布订阅系统一般有推模式和拉模式
推模式是指服务端主动将数据更新发送给所有订阅的客户端,拉模式是指客户端主动发起请求来获取最新数据(定时轮询拉取)。
二.zk采用了推拉相结合来实现发布和订阅功能
首先客户端需要向服务端注册自己关注的节点(添加Watcher事件)。一旦该节点发生变更,服务端就会向客户端发送Watcher事件通知。客户端接收到消息通知后,需要主动到服务端获取最新的数据。
如果将配置信息放到zk上进行集中管理,那么应用启动时需要主动到zk服务端获取配置信息,然后在指定节点上注册一个Watcher监听。接着只要配置信息发生变更,zk服务端就会实时通知所有订阅的应用。从而让应用能实时获取到所订阅的配置信息节点已发生变更了的消息。
注意:原生zk客户端可以通过getData()、exists()、getChildren()三个方法,向zk服务端注册Watcher监听。而且注册到Watcher监听具有一次性,所以zk客户端获得服务端的节点变更通知后需要再次注册Watcher。
三.使用zk来实现发布订阅功能总结
步骤一:将配置信息存储到zk的节点上。
步骤二:应用启动时先从zk节点上获取配置信息,然后再向该zk节点注册一个数据变更的Watcher监听。一旦该zk节点数据发生变更,所有订阅的客户端就能收到数据变更通知。
步骤三:应用收到zk服务端发过来的数据变更通知后重新获取最新数据。
(8)Watcher具有的特性
一.一次性
无论是客户端还是服务端,一旦Watcher被触发或者回调,zk都会将其移除,所以使用zk的Watcher时需要反复注册。这样的设计能够有效减轻服务端的压力。否则,如果一个Watcher注册后一直有效,那么频繁更新的节点就会频繁发送通知给客户端,这样就会影响网络性能和服务端性能。
二.客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序。注意不要因一个Watcher的处理逻辑而影响整个客户端的Watcher回调。
三.轻量
WatchedEvent是Watcher机制的最小通知单元,WatchedEvent只包含三部分内容:通知状态、事件类型和节点路径。所以Watcher通知非常简单,只告诉客户端发生的事件,不包含具体内容。所以原始数据和变更后的数据无法从WatchedEvent中获取,需要客户端主动重新去获取数据。
客户端向服务端注册Watcher时:不会把客户端真实的Watcher对象传递到服务端,只会在客户端请求中使用boolean属性来标记Watcher对象,服务端也只会保存当前连接的ServerCnxn对象。
这种轻量的Watcher设计机制,在网络开销和服务端内存开销上都是很低的。
(9)总结
有一个问题:当服务端某一节点发生数据变更操作时,所有曾经设置了该节点监控事件的客户端都会收到服务器的通知吗?
答案是否定的,通过对zk内部实现机制的解析可以知道:Watcher事件的触发机制取决于会话的连接状态和客户端注册事件的类型,当客户端会话状态或数据节点发生改变时,都会触发对应的Watcher事件。Watcher具有一次性,曾经的监控要重新监控。
3.ACL权限控制:避免未经授权的访问
(1)ACL的使用(scheme:id:permission)
(2)实现自己的权限控制
(3)ACL内部实现原理之客户端处理过程
(4)ACL内部实现原理之服务端实现过程
(5)ACL权限总结
前面介绍完了数据模型、Watcher监控机制,并实现了在分布式环境中经常用到的分布式锁、配置管理等功能,这些功能的本质都在于操作数据节点。如果作为分布式锁或配置项的数据节点被错误删除或修改,那么对整个分布式系统有很大的影响,甚至会造成严重的生产事故。所以zk提供了一个很好的解决方案,那就是ACL权限控制。
(1)ACL的使用(scheme:id:permission)
如何使用zk的ACL机制来实现客户端对数据节点的访问控制。一个ACL权限设置通常可以分为3部分,分别是:权限模式(Scheme)、授权对象(ID)、权限信息(Permission),最终组成的一条ACL请求信息格式为"scheme:id:permission"。
一.权限模式:Scheme
权限模式就是用来设置zk服务器进行权限验证的方式。zk的权限验证方式大体分为两种类型:一种是范围验证,另外一种是口令验证。但具体来分,则有4种权限模式:
模式一:所谓的范围验证就是zk可针对一个IP或一段IP授予某种权限
比如通过"ip:192.168.0.11"让某机器对服务器的一数据节点具有写入权限,或者也可以通过"ip:192.168.0.11/22"给一段IP地址的机器赋权。
模式二:另一种权限模式就是口令验证,也可以理解为用户名密码的方式
在zk中这种验证方式是Digest认证。即在向客户端传送"username:password"这种形式的权限表示符时,服务端会对密码部分使用SHA-1和BASE64算法进行加密以保证安全。
模式三:还有一种权限模式Super可以认为是一种特殊的Digest认证
具有Super权限的客户端可以对zk上的任意数据节点进行任意操作。
模式四:最后一种授权模式是world模式
其实这种授权模式对应于系统中的所有用户,本质上起不到任何作用,设置了world权限模式系统中的所有用户操作都可以不进行权限验证。
下面代码给出了Digest模式下客户端的调用方式:
create /digest_node1
setAcl /digest_node1 digest:用户名:base64格式密码:rwadc
getAcl /digest_node1
addauth digest user:passwd
二.授权对象:ID
所谓的授权对象就是要把权限赋予谁,对应于4种不同的权限模式来说:
如果使用IP方式,那么授权对象可以是一个IP地址或IP地址段
如果使用Digest或Super方式,那么授权对象对应于一个用户名
如果使用World模式,那么就是授权系统中所有的用户
三.权限信息:Permission
权限就是指可以在数据节点上执行的操作种类,zk定义好的权限有5种:
数据节点(Create)创建权限,可以在该数据节点下创建子节点
数据节点(Wirte)更新权限,可以更新该数据节点
数据节点(Read)读取权限,可以读取该数据节点内容以及子节点信息
数据节点(Delete)删除权限,可以删除该数据节点的子节点
数据节点(Admin)管理者权限,可以对该数据节点体进行ACL权限设置
需要注意的是:每个节点都有维护自身的ACL权限数据,即使是该节点的子节点也有自己的ACL权限而不是直接继承其父节点权限。所以如果客户端只配置"/Config"节点的读取权限,该客户端是没有其子节点的"/Config/dataBase"的读取权限的。
(2)实现自己的权限控制
虽然zk自身的权限控制机制已经做得很细,但是zk还提供了一种权限扩展机制来让用户实现自己的权限控制方式。官方文档对这种机制的定义是"Pluggable ZooKeeper Authenication",意思是可插拔的授权机制,从名称上可看出它的灵活性。
那么这种机制是如何实现的呢?首先,要想实现自定义的权限控制机制,最核心的一点是实现zk提供的权限控制器接口AuthenticationProvider。然后,实现了自定义权限后,如何才能让服务端使用自定义的权限验证方式呢?接下来就需要将自定义的权限控制注册到服务端,而注册的方式有两种:第一种是通过设置系统属性来注册自定义的权限控制器,第二种是在配置文件zoo.cfg中进行配置。
//第一种注册方式
-Dzookeeper.authProvider.x=CustomAuthenticationProvider//第二种方式
authProvider.x=CustomAuthenticationProvider
(3)ACL内部实现原理之客户端处理过程
下面深入到底层介绍zk是如何实现ACL权限控制机制的,先看一下客户端是如何操作的,以节点授权addAuth接口为例。
步骤一:客户端会通过ClientCnxn的addAuthInfo()方法,向服务端发送请求。
步骤二:addAuthInfo()方法会将scheme和auth封装成AuthPacket对象,并封装一个表示权限操作请求的RequestHeader对象。
步骤三:接着AuthPacket对象和RequestHeader对象会被封装到Packet对象中,最后会将该Packet对象添加到outgoingQueue队列,发送给服务端。
public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;protected final HostProvider hostProvider;protected final ZKWatchManager watchManager;private final ZKClientConfig clientConfig;...public void addAuthInfo(String scheme, byte auth[]) {cnxn.addAuthInfo(scheme, auth);}...
}public class ClientCnxn {private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();volatile States state = States.NOT_CONNECTED;...public void addAuthInfo(String scheme, byte auth[]) {if (!state.isAlive()) {return;}authInfo.add(new AuthData(scheme, auth));queuePacket(new RequestHeader(-4, OpCode.auth), null,new AuthPacket(0, scheme, auth), null, null, null, null, null, null);}public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {Packet packet = null;packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;synchronized (state) {if (!state.isAlive() || closing) {conLossPacket(packet);} else {if (h.getType() == OpCode.closeSession) {closing = true;}outgoingQueue.add(packet);}}sendThread.getClientCnxnSocket().packetAdded();return packet;}...
}
ACL权限控制机制的客户端实现相对简单,只是封装请求类型为权限请求,方便服务器识别处理,而发送到服务器的信息包括前面提到的权限校验信息。
(4)ACL内部实现原理之服务端实现过程
相比于客户端的处理过程,服务器端对ACL内部实现就比较复杂。
一.当客户端发出的节点ACL授权请求到达服务端后
步骤一:首先调用NIOServerCnxn.readRequest()方法作为服务端处理的入口,而readRequest()方法其内部只是调用processPacket()方法。
public class NIOServerCnxn extends ServerCnxn {private final ZooKeeperServer zkServer;private ByteBuffer incomingBuffer = lenBuffer;...private void readRequest() throws IOException {zkServer.processPacket(this, incomingBuffer);}...
}
步骤二:然后在ZooKeeperServer的processPacket()方法的内部,首先反序列化客户端的请求信息并封装到AuthPacket对象中,之后通过ProviderRegistry的getProvider()方法根据scheme判断具体实现类。
以Digest模式为例,该实现类是DigestAuthenticationProvider,此时就会调用handleAuthentication方法进行权限验证。如果返回KeeperException.Code.OK则表示该请求已经通过了权限验证,如果返回的状态是其他或者抛出异常则表示权限验证失败。
所以权限认证的最终实现方法是handleAuthentication()方法,该方法的工作是解析客户端传递的权限验证类型,并通过addAuthInfo()方法将权限信息添加到authInfo集合中。
其中addAuthInfo()方法的作用是将解析到的权限信息存储到zk服务端的内存中,这些权限信息在整个会话存活期间会一直保存在zk服务端。如果会话关闭,那么权限信息就会被删除,这个特性类似于数据节点中的临时节点。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {...public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {InputStream bais = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);RequestHeader h = new RequestHeader();h.deserialize(bia, "header");incomingBuffer = incomingBuffer.slice();if (h.getType() == OpCode.auth) {LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());AuthPacket authPacket = new AuthPacket();ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);String scheme = authPacket.getScheme();AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);Code authReturn = KeeperException.Code.AUTHFAILED;if (ap != null) {try {authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());} catch(RuntimeException e) {authReturn = KeeperException.Code.AUTHFAILED;}}if (authReturn == KeeperException.Code.OK) {LOG.info("auth success " + cnxn.getRemoteSocketAddress());ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());cnxn.sendResponse(rh, null, null);} else {...}return;} else {...}cnxn.incrOutstandingRequests(h);}...
}public class DigestAuthenticationProvider implements AuthenticationProvider {...public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {String id = new String(authData);try {String digest = generateDigest(id);if (digest.equals(superDigest)) {cnxn.addAuthInfo(new Id("super", ""));}cnxn.addAuthInfo(new Id(getScheme(), digest));return KeeperException.Code.OK;} catch (NoSuchAlgorithmException e) {LOG.error("Missing algorithm",e);}return KeeperException.Code.AUTHFAILED;}...
}public abstract class ServerCnxn implements Stats, Watcher {protected ArrayList<Id> authInfo = new ArrayList<Id>();...public void addAuthInfo(Id id) {//将权限信息添加到authInfo集合if (authInfo.contains(id) == false) {authInfo.add(id);}}
}
二.当服务端已将客户端ACL授权请求解析并将对应的会话权限信息存储好后
服务端处理一次请求时,是如何进行权限验证的?
首先通过PrepRequestProcessor中的checkAcl()方法检查对应的请求权限。如果该节点没有任何权限设置则直接返回,如果该节点有权限设置则循环遍历节点的权限信息进行检查,如果具有相应的权限则直接返回表明权限认证成功,否则直接抛出NoAuthException异常表明权限认证失败。
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {...protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {...case OpCode.delete:...checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);...case OpCode.setData:...checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);...... }static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm, List<Id> ids) {...for (ACL a : acl) {if (authId.getScheme().equals(id.getScheme()) && ap.matches(authId.getId(), id.getId())) {return;}}throw new KeeperException.NoAuthException();}...
}
(5)ACL权限总结
客户端发送ACL权限请求时的处理:首先会封装请求类型,然后将权限信息封装到request中,最后发送request给服务端。
服务器对ACL权限请求的授权处理:首先分析请求类型是否是权限相关操作,然后根据不同的权限模式调用不同的实现类验证权限,最后存储权限信息。
注意:会话的授权信息存储在服务端内存。如果客户端会话关闭,授权信息会被删除。下次连接服务器后,需要重新调用授权接口进行授权;
zk作为分布式系统协调框架,往往在一个分布式系统下起到关键的作用。尤其是在分布式锁、配置管理等应用场景中。如果因错误操作对重要数据节点进行变更或删除,对整个系统影响很大,甚至可能会导致整个分布式服务不可用,所以设计使用zk时一定要考虑对关键节点添加权限控制。
问题:如果一个客户端对服务器上的一个节点设置了只有它自己才能操作的权限,那么等该客户端下线后,对其创建的节点要想进行修改应该怎么做?
可以通过"super模式"删除该节点或变更该节点的权限验证方式,正因为"super模式"有如此大的权限,在平时使用时应更加谨慎。
相关文章:
zk源码—1.数据节点与Watcher机制及权限二
大纲 1.ZooKeeper的数据模型、节点类型与应用 (1)数据模型之树形结构 (2)节点类型与特性(持久 临时 顺序 ) (3)节点的状态结构(各种zxid 各种version) (4)节点的版本(version cversion aversion) (5)使用ZooKeeper实现锁(悲观锁 乐观锁) 2.发布订阅模式࿱…...
交换机和集线器的区别
集线器(Hub)—— 大喇叭广播站 工作原理: 集线器像村里的“大喇叭”,收到任何消息都会广播给所有人。 比如A对B说“你好”,全村人(C、D、E)都能听到,但只有B会回…...
微服务系统记录
记录下曾经工作涉及到微服务的相关知识。 1. 架构设计与服务划分 关键内容 领域驱动设计(DDD): 利用领域模型和限界上下文(Bounded Context)拆分业务,明确服务边界。通过事件风暴(Event Storm…...
同花顺客户端公司财报抓取分析
目标客户端下载地址:https://ft.51ifind.com/index.php?c=index&a=download PC版本 主要难点在登陆,获取token中的 jgbsessid (每次重新登录这个字段都会立即失效,且有效期应该是15天的) 抓取jgbsessid 主要通过安装mitmproxy 使用 mitmdump + 下边的脚本实现监听接口…...
二叉树与红黑树核心知识点及面试重点
二叉树与红黑树核心知识点及面试重点 一、二叉树 (Binary Tree) 1. 基础概念 定义:每个节点最多有两个子节点(左子节点和右子节点) 术语: 根节点:最顶层的节点 叶子节点:没有子节点的节点 深度…...
Rocket-JWT鉴权
目录 一、概述 二、相关依赖 三、环境准备 3.1 创建项目 3.2 读取私钥信息 3.3 token数据负载 3.4 生成token 四、Web鉴权 4.1 验证载体 4.2 接收请求 五、总结 Welcome to Code Blocks blog 本篇文章主要介绍了 [Rocket-JWT鉴权] ❤博主广交技术好友,喜…...
2025 年网络安全终极指南
我们生活在一个科技已成为日常生活不可分割的一部分的时代。对数字世界的依赖性日益增强的也带来了更大的网络风险。 网络安全并不是IT专家的专属特权,而是所有用户的共同责任。通过简单的行动,我们可以保护我们的数据、隐私和财务,降低成为…...
横扫SQL面试——PV、UV问题
📊 横扫SQL面试:UV/PV问题 🌟 什么是UV/PV? 在数据领域,UV(Unique Visitor,独立访客) 和 PV(Page View,页面访问量) 是最基础也最重要的指标&…...
ctf-show-杂项签到题
下载文件,解压需要密码,用010打开没看出什么 然后用Advanced Archive Password Recovery暴力破解,发现没用 怀疑是伪解密,解压出来发现加密受损用随波逐流修复加密文件 打开修复的加密文件直接得flag flag:flag{79d…...
对解释器模式的理解
对解释器模式的理解 一、场景1、题目【[来源](https://kamacoder.com/problempage.php?pid1096)】1.1 题目描述1.2 输入描述1.3 输出描述1.4 输入示例1.5 输出示例 二、不采用解释器模式1、代码2、“缺点” 三、采用解释器模式1、代码2、“优点” 四、思考1、解释器模式的意义…...
高频面试题(含笔试高频算法整理)基本总结回顾64
干货分享,感谢您的阅读! (暂存篇---后续会删除,完整版和持续更新见高频面试题基本总结回顾(含笔试高频算法整理)) 备注:引用请标注出处,同时存在的问题请在相关博客留言…...
python入门之从安装python及vscode开始
本篇将解决三个问题: 1. 如何下载及安装官方python? 2. 如何下载及安装vscode? 3. 如何配置vscode的python环境? 一、python下载及安装 1.搜索python,找到官网并打开 2.找到download,按需选择版本下载 …...
OpenGL学习笔记(模型材质、光照贴图)
目录 光照与材质光照贴图漫反射贴图采样镜面光贴图 GitHub主页:https://github.com/sdpyy OpenGL学习仓库:https://github.com/sdpyy1/CppLearn/tree/main/OpenGLtree/main/OpenGL):https://github.com/sdpyy1/CppLearn/tree/main/OpenGL 光照与材质 在现实世界里&…...
视觉_transform
visual_transform 图像分块 (Patch Embedding) 假设输入图像为 x ∈ R ∗ H ∗ ∗ W ∗ ∗ C ∗ x∈R^{*H**W**C*} x∈R∗H∗∗W∗∗C∗ C 是图像的通道数(例如,RGB图像的 C3) 将图像分割成N个大小为P*CP的patch,每个patch的大…...
Redis的安装及通用命令
二. Redis 的安装及通用命令 1. Ubuntu 安装 Redis (1) 切换到 root 用户: su root(2) 搜索 Redis 软件包 apt search redis(3) 安装 Redis apt install redis(4) 查看 Redis netstat -anp | grep redis(5) 切换到 Redis 目录下 cd /etc/redis/(6) 修改 Redis 配置文件:…...
Python 实现的运筹优化系统代码详解(0-1规划背包问题)
一、引言 在数学建模与实际决策场景的交织领域中,诸多复杂问题亟待高效且精准的解决方案。0-1 规划作为一种特殊且极为重要的优化方法,宛如一把万能钥匙,能够巧妙开启众多棘手问题的解决之门。它专注于处理决策变量仅能取 0 或 1 这两种极端状…...
护网蓝初面试题
《网安面试指南》https://mp.weixin.qq.com/s/RIVYDmxI9g_TgGrpbdDKtA?token1860256701&langzh_CN 5000篇网安资料库https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247486065&idx2&snb30ade8200e842743339d428f414475e&chksmc0e4732df793fa3bf39…...
音视频学习(三十二):VP8和VP9
VP8 简介 全称:Video Processing 8发布者:原 On2 Technologies(2010 被 Google 收购)定位:开源视频压缩标准,主要竞争对手是 H.264应用: WebRTC 视频通信HTML5 <video> 标签(…...
美国mlb与韩国mlb的关系·棒球9号位
MLB(Major League Baseball,美国职业棒球大联盟)作为全球最高水平的职业棒球联赛,与韩国市场流行的“MLB”时尚品牌之间存在着授权合作关系,但两者在业务范畴和品牌定位上存在显著差异。 一、品牌授权背景:…...
免费在线PUA测试工具:识别情感操控,守护情感健康
免费在线PUA测试工具:识别情感操控,守护情感健康 你是否曾经在感情中感到困惑、不安,甚至怀疑自己?今天为大家推荐一个专业的PUA测试工具,帮助你识别是否正在经历情感操控。 测试工具链接:PUA测试工具 什么…...
nginx中的try_files指令
try_files 是 Nginx 中一个非常有用的指令,用于按顺序检查文件是否存在,并返回第一个找到的文件。如果所有指定的文件都不存在,则执行回退逻辑,如重定向到一个指定的 URI 或返回一个错误代码。 作用 文件查找:按顺序检…...
[特殊字符] 驱动开发硬核特训 · Day 4
主题:从硬件总线到驱动控制 —— I2C 协议与传感器驱动开发全解析 I2C(Inter-Integrated Circuit)总线是一种广泛用于嵌入式设备的串行通信协议,因其低成本、简单布线和多从设备支持,成为连接各种传感器(温…...
Python 实现玻璃期货数据处理、入库与分析:从代码到应用
Python 实现期货数据处理与分析:从代码到应用 引言 在金融市场中,期货数据的处理和分析对于投资者和分析师来说至关重要。Python 凭借其丰富的库和简洁的语法,成为了处理和分析期货数据的强大工具。本文将详细解读一段用于处理期货持仓和行…...
神经网络之损失函数
引言:损失函数 (Loss Function)是机器学习和深度学习中非常重要的一个概念。用于衡量模型的预测值与真实值之间的差异,从而指导模型优化其参数以最小化这种差异。 一、损失函数作用 量化误差:损失函数是将预测值和真实…...
在Ubuntu内网环境中为Gogs配置HTTPS访问(通过Apache反向代理使用IP地址)
一、准备工作 确保已安装Gogs并运行在HTTP模式(默认端口3000) 确认服务器内网IP地址(如192.168.1.100) 二、安装Apache和必要模块 sudo apt update sudo apt install apache2 -y sudo a2enmod ssl proxy proxy_http rewrite headers 三、创建SSL证书 1. 创建证书存储目录…...
printf
printf() 是 C 和 C 标准库中的一个输出函数,位于 <cstdio> 头文件中。下面为你详细介绍它的相关知识点。 1. 基本使用 printf() 函数的作用是按照指定格式将数据输出到标准输出设备(通常是控制台)。其基本语法如下: cpp …...
Leetcode 311 Sparse Matrix Multiplication 稀疏矩阵相乘
Problem Given two sparse matrices A and B, return the result of AB. You may assume that A’s column number is equal to B’s row number. Example: A [[ 1, 0, 0],[-1, 0, 3] ]B [[ 7, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 1 ] ]| 1 0 0 | | 7 0 0 | | 7 0 0 | AB …...
mysql和sqlite关于data数据的识别问题
<input type"date" name"birthday" value""> # 表单传入的日期 birthday request.form.get(birthday) # 获取日期 birthday Column(birthday, Date, comment出生日期, nullableTrue) # 数据库的数据字段模型 birthday_str request…...
2024 天梯赛——工业园区建设题解
思路 将点 i i i 视为固定点, 点 j j j 视为灵活点,其中 s i 1 s_{i} 1 si1, s j 0 s_{j} 0 sj0。维护四个队列,其中 q 0 q_{0} q0 和 q 1 q_{1} q1 分别维护还没有被选用的固定点 和 灵活点, Q 0 Q…...
亚马逊AI新功能上线:5大亮点解锁精准消费预测
在人工智能技术不断重塑跨境电商生态之际,全球电商巨头亚马逊(Amazon)再次迈出关键一步。近日,亚马逊正式对其卖家中心推出一系列基于AI的新功能,聚焦于消费数据预测、用户行为洞察、库存智能管理与个性化营销服务等方…...
opus+ffmpeg+c++实现录音
说明: opusffmpegc实现录音 效果图: step1:C:\Users\wangrusheng\source\repos\WindowsProject1\WindowsProject1\WindowsProject1.cpp // WindowsProject1.cpp : 定义应用程序的入口点。 //#include "framework.h" #include "Windows…...
ComfyUI的本地私有化部署使用Stable Diffusion文生图
什么是ComfyUI ? ComfyUI是一个基于节点流程的Stable Diffusion操作界面。以下是关于它的详细介绍: 特点与优势 高度可定制:提供丰富的节点类型,涵盖文本处理、图像处理、模型推理等功能。用户可根据需求自由组合节点࿰…...
【学习笔记17】Windows环境下安装RabbitMQ
一. 下载RabbitMQ( 需要按照 Erlang/OTP 环境的版本依赖来下载) (1) 先去 RabbitMQ 官网,查看 RabbitMQ 需要的 Erlang 支持:https://www.rabbitmq.com/ 进入官网,在 Docs -> Install and Upgrade -> Erlang V…...
【LeetCode 热题100】55:跳跃游戏(详细解析)(Go语言版)
🚀 LeetCode 热题 55:跳跃游戏(Jump Game)完整解析 📌 题目描述 给定一个非负整数数组 nums,你最初位于数组的第一个下标。 数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一…...
OpenCV轮廓检测全面解析:从基础到高级应用
一、概述 轮廓检测是计算机视觉中的基础技术,用于识别和提取图像中物体的边界。与边缘检测不同,轮廓检测更关注将边缘像素连接成有意义的整体,形成封闭的边界。 轮廓检测的核心价值 - 物体识别:通过轮廓可以识别图像中的独立物体…...
微服务入门:Spring Boot 初学者指南
大家好,这里是架构资源栈!点击上方关注,添加“星标”,一起学习大厂前沿架构! 微服务因其灵活性、可扩展性和易于维护性而成为现代软件架构的重要组成部分。在本博客中,我们将探讨如何使用 Spring Boot 构建…...
Windows环境下开发pyspark程序
Windows环境下开发pyspark程序 一、环境准备 1.1. Anaconda/Miniconda(Python环境) 如果不怕包的版本管理混乱,可以直接使用已有的Python环境。 需要安装anaconda/miniconda(python3.8版本以上):Anaconda…...
嵌入式学习笔记——大小端及跳转到绝对地址
大小端以及跳转到绝对地址 0x100000 嵌入式编程中的大小端详解一、大端模式与小端模式二、判断当前系统是大端还是小端方法一:指针强制类型转换方法二:使用联合体(union) 三、结构体位段和大小端的影响四、大小端影响内存的 memc…...
eprime相嵌模式实验设计
一、含义与模型结构 该模式的实验设计至少 由两个存储不同实验材料及 属性的List和一个核心实验 过程CEP组成。子list1和 list2相嵌在父List中,CEP 可以调用List中的材料,也 可以调用list1和list2中的材 料。 二、相嵌模式的应用 应用于解决“多重随…...
编译uboot的Makefile编写
make ARCHarm CROSS_COMPILEarm-linux-gnueabihf- distcleanmake ARCHarm CROSS_COMPILEarm-linux-gnueabihf- mx6ull_14x14_ddr512_emmc_defconfigmake V1 ARCHarm CROSS_COMPILEarm-linux-gnueabihf- -j12 这三条命令中 ARCHarm 设置目标为 arm 架构, CROSS_COMP…...
Go语言常用算法实现
以下是Go语言中常用的算法实现,涵盖排序、搜索、数据结构操作等核心算法。 一、排序算法 1. 快速排序 func QuickSort(arr []int) []int {if len(arr) < 1 {return arr}pivot : arr[0]var left, right []intfor i : 1; i < len(arr); i {if arr[i] < pi…...
Windows上使用NSSM注册定时服务
适用和不适用场景 适用场景 持续运行 的脚本或程序(如 Laravel 的 schedule:run 每分钟检查任务)后台常驻 的任务或服务(如监听服务、实时同步) 不适用场景 低频次任务(如每日/每周备份) NSSM 常驻内存…...
【Gorm】模型定义
intro package mainimport ("gorm.io/gorm""gorm.io/driver/sqlite" // GORM 使用该驱动来连接和操作 SQLite 数据库。 )type Product struct {gorm.Model // 嵌入GORM 内置的模型结构,包含 ID、CreatedAt、UpdatedAt、DeletedAt 四个字段Cod…...
程序化广告行业(65/89):AdX/SSP系统深度剖析与实战要点
程序化广告行业(65/89):AdX/SSP系统深度剖析与实战要点 大家好!一直以来,我都对程序化广告领域充满热情,这个领域发展迅速且不断涌现新的技术和模式。之前我们探讨了程序化广告的一些基础内容,…...
算法刷题记录——LeetCode篇(2.7) [第161~170题](持续更新)
更新时间:2025-04-06 算法题解目录汇总:算法刷题记录——题解目录汇总技术博客总目录:计算机技术系列博客——目录页 优先整理热门100及面试150,不定期持续更新,欢迎关注! 169. 多数元素 给定一个大小为…...
conda安装指定版本python环境
1. 创建指定 Python 版本的环境 使用以下命令创建环境,并将 <env_name> 替换为你的环境名称,<python_version> 替换为具体的 Python 版本(如 3.8, 3.9 等) conda create -n <env_name> python<python_vers…...
PH热榜 | 2025-04-05
1. Comp AI 标语:开源的 Vanta 和 Drata 替代方案 介绍:这款开源的 Drata 和 Vanta 替代方案,能够帮助你在几周内,轻松满足 SOC 2、ISO 27001 和 GDPR 等合规框架的要求,而不是像往常那样拖延数月。 产品网站&#…...
C++之红黑树
目录 一、红黑树的概念 1.1、红黑树的规则 1.2、红黑树如何确保最长路径不超过最短路径的二倍 1.3、红黑树的效率 二、红黑树的实现 2.1、红黑树的结构 2.2、红黑树的插入 2.2.1、红黑树插入一个值的大概过程 2.2.2、情况一:变色 2.2.3、情…...
各个语言对不同数据结构的叫法
一、基础数据结构对比 数组(Array) C/C:固定大小数组(int arr),动态数组通过vector(C)实现 Java:固定数组(int[]),动态数组…...
蓝桥杯 web 水果拼盘 (css3)
做题步骤: 看结构:html 、css 、f12 分析: f12 查看元素,你会发现水果的高度刚好和拼盘的高度一样,每一种水果的盘子刚好把页面填满了,所以咱们就只要让元素竖着排列,加上是竖着,排不下的换行…...