Netty源码—8.编解码原理二
大纲
1.读数据入口
2.拆包原理
3.ByteToMessageDecoder解码步骤
4.解码器抽象的解码过程总结
5.Netty里常见的开箱即用的解码器
6.writeAndFlush()方法的大体步骤
7.MessageToByteEncoder的编码步骤
8.unsafe.write()写队列
9.unsafe.flush()刷新写队列
10.如何把对象变成字节流写到unsafe底层
6.writeAndFlush()方法的大体步骤
(1)writeAndFlush()方法的调用入口
(2)writeAndFlush()方法的执行流程
(1)writeAndFlush()方法的调用入口
入口通常是:ctx.channel().writeAndFlush()。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {//网络连接tcp三次握手后,就会建立和封装一个Channel(网络连接的通信管道)//此时这个Channel就可以实现一个激活@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("Channel Active......");ctx.channel().writeAndFlush("test5");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Channel Read: " + (String)msg);String response = "Hello World......";ByteBuf responseByteBuf = Unpooled.buffer();responseByteBuf.writeBytes(response.getBytes());ctx.channel().writeAndFlush(responseByteBuf);System.out.println("Channel Write: " + response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("Channel Read Complete......");ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
(2)writeAndFlush()方法的执行流程
首先从tail结点开始往前传播。然后逐个调用ChannelHandler的write()方法,直到某个ChannelHandler不再往前传播write事件。接着逐个调用ChannelHandler的flush()方法,直到某个ChannelHandler不再往前传播flush事件。
一般而言,只要每个ChannelHandler都往下传播write事件和flush事件,那么最后都会传播到HeadContext结点的write()方法和flush()方法,然后分别执行unsafe.write()和unsafe.flush()将数据通过底层的unsafe写到JDK底层的Channel。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final DefaultChannelPipeline pipeline;...@Overridepublic ChannelFuture writeAndFlush(Object msg) {return pipeline.writeAndFlush(msg);}...
}public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;private final Channel channel;protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelFuture writeAndFlush(Object msg) {//从TailContext开始传播//但TailContext没有重写writeAndFlush()方法//所以会调用AbstractChannelHandlerContext的writeAndFlush()方法return tail.writeAndFlush(msg);}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {volatile AbstractChannelHandlerContext next;volatile AbstractChannelHandlerContext prev;...@Overridepublic ChannelFuture writeAndFlush(Object msg) {return writeAndFlush(msg, newPromise());}@Overridepublic ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {if (msg == null) throw new NullPointerException("msg");if (!validatePromise(promise, true)) {ReferenceCountUtil.release(msg);return promise;}write(msg, true, promise);return promise;}private void write(Object msg, boolean flush, ChannelPromise promise) {//反向遍历链表进行查找AbstractChannelHandlerContext next = findContextOutbound();final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();//最终都会由Reactor线程处理Channel的数据读写if (executor.inEventLoop()) {if (flush) {//调用结点的invokeWriteAndFlush()方法next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);} else {task = WriteTask.newInstance(next, m, promise);}safeExecute(executor, task, promise, m);}}private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {//逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传//即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播invokeWrite0(msg, promise);//逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传//即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播invokeFlush0();} else {writeAndFlush(msg, promise);}}private void invokeWrite0(Object msg, ChannelPromise promise) {try {//逐个调用,最终回到HeadContext的write()方法((ChannelOutboundHandler) handler()).write(this, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}private void invokeFlush0() {try {//逐个调用,最终回到HeadContext的flush()方法((ChannelOutboundHandler) handler()).flush(this);} catch (Throwable t) {notifyHandlerException(t);}}...
}public class DefaultChannelPipeline implements ChannelPipeline {...final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}...@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write(msg, promise);}@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {unsafe.flush();}}...
}//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {//Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);}//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Overridepublic void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.disconnect(promise);}//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.close(promise);}//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);}//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void read(ChannelHandlerContext ctx) throws Exception {ctx.read();}//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();}
}
7.MessageToByteEncoder的编码步骤
(1)编码的具体步骤
(2)编码步骤的总结
(3)子类实现编码的例子
(1)编码的具体步骤
步骤一:判断对象
判断当前ChannelHandler结点能否处理写入传入的Java对象。如果能处理,则往下执行,否则直接传递给下一个ChannelHandler结点进行处理。
步骤二:分配内存
给新创建的ByteBuf对象分配一块内存空间,这块内存空间将会存放由Java对象转换来的字节数据。
步骤三:调用encode
子类会实现MessageToByteEncoder的抽象方法encode()来定义自己的编码协议,子类的encode()方法会将Java对象转换来的字节数据写入ByteBuf。
步骤四:释放对象
由于传入的Java对象已经转换成ByteBuf字节流了,所以传入的Java对象已不再使用可进行释放。
步骤五:传播数据
当子类的encode()方法将数据写入了ByteBuf对象以及释放完对象之后,则会往前一个ChannelHandler结点传播该ByteBuf对象,否则往前一个ChannelHandler结点传播空对象。
步骤六:释放内存
如果出现异常或者ByteBuf没有写入数据或者ByteBuf在pipeline中已处理完,则释放分配给ByteBuf对象的内存。
//ChannelOutboundHandlerAdapter which encodes message in a stream-like fashion from one message to an ByteBuf.
//Example implementation which encodes Integers to a ByteBuf.
//public class IntegerEncoder extends MessageToByteEncoder<Integer> {
// @code @Override
// public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
// out.writeInt(msg);
// }
//}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {private final TypeParameterMatcher matcher;private final boolean preferDirect;protected MessageToByteEncoder() {this(true);}protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {this(outboundMessageType, true);}//Create a new instance which will try to detect the types to match out of the type parameter of the class.//@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages. //If false is used it will allocate a heap ByteBuf, which is backed by an byte array.protected MessageToByteEncoder(boolean preferDirect) {matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");this.preferDirect = preferDirect;}//Create a new instance//@param outboundMessageType,The tpye of messages to match//@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages. //If false is used it will allocate a heap ByteBuf, which is backed by an byte array.protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {matcher = TypeParameterMatcher.get(outboundMessageType);this.preferDirect = preferDirect;}//Returns true if the given message should be handled. //If false it will be passed to the next ChannelOutboundHandler in the ChannelPipeline.public boolean acceptOutboundMessage(Object msg) throws Exception {return matcher.match(msg);}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ByteBuf buf = null;try {//步骤一:判断当前ChannelHandler能否处理写入的消息if (acceptOutboundMessage(msg)) {@SuppressWarnings("unchecked")I cast = (I) msg;//强制转换//步骤二:给ByteBuf对象分配内存buf = allocateBuffer(ctx, cast, preferDirect);try {//步骤三:调用子类实现的encode()方法encode(ctx, cast, buf);} finally {//步骤四:释放对象//既然自定义的Java对象msg已经转换为ByteBuf对象了,那么该对象已经没有用,需要释放掉了//注意:当传入的msg的类型是ByteBuf类型时,则不需要释放ReferenceCountUtil.release(cast);}//步骤五:如果buf中写入了数据,就把buf传到下一个ChannelHandler结点if (buf.isReadable()) {ctx.write(buf, promise);} else {//步骤六:如果buf中没有写入数据,则释放buf,并将一个空数据传到下一个ChannelHandler结点buf.release();ctx.write(Unpooled.EMPTY_BUFFER, promise);}buf = null;} else {ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable e) {throw new EncoderException(e);} finally {if (buf != null) {buf.release();//当buf在pipeline中处理完了,需要进行释放}}}//Allocate a ByteBuf which will be used as argument of #encode(ChannelHandlerContext, I, ByteBuf).//Sub-classes may override this method to returna ByteBuf with a perfect matching initialCapacity.protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) {return ctx.alloc().ioBuffer();} else {return ctx.alloc().heapBuffer();}}//Encode a message into a ByteBuf. //This method will be called for each written message that can be handled by this encoder.//@param ctx,the ChannelHandlerContext which this MessageToByteEncoder belongs to//@param msg,the message to encode//@param out,the ByteBuf into which the encoded message will be written//@throws Exception,is thrown if an error accourprotected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}
(2)编码步骤的总结
在MessageToByteEncoder的编码过程中,首先会判断当前ChannelHandler能否处理传入的Java对象,如果能处理就对新创建的ByteBuf对象分配一块内存空间。然后由子类的encode()方法实现具体的编码协议,并且把编码后的数据存放到分配给ByteBuf对象的内存空间中。最后把ByteBuf对象往前一个ChannelHandler结点进行传播。
如果在编码的过程中出现异常,那么就把已申请出来的、分配给ByteBuf对象的内存空间进行释放。
如果传入的Java对象就是一个ByteBuf对象,那么Netty在自定义编码结束后,会自动帮忙释放该对象,不需要在子类中对该对象进行释放。
(3)子类实现编码的例子
下面的Encoder便实现了将自定义的Response对象转换为字节流并写到Socket底层的效果。
public class Encoder extends MessageToByteEncoder<Response> {protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {out.writeByte(response.getVersion());out.writeInt(4+ response.getData().length);out.writeBytes(response.getData()); }
}
8.unsafe.write()将数据添加到写缓冲区
(1)unsafe.write()的入口
(2)unsafe.write()的主要逻辑
(3)写缓冲区(写队列)的数据结构
(1)unsafe.write()的入口
不管是ctx.channel().write()还是ctx.write(),最终都会来到pipeline中的head结点。
public class DefaultChannelPipeline implements ChannelPipeline {...final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}...@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write(msg, promise);}@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {unsafe.flush();}}...
}
(2)unsafe.write()的主要逻辑
unsafe.write()方法将数据添加到写缓冲区(写队列)的主要逻辑如下。
一.Direct化ByteBuf对象
如果传进来的ByteBuf对象不是堆外内存,那么就直接转换成堆外内存,并且估算出其大小。
二.添加到写缓冲区
转换成堆外内存的ByteBuf对象首先会被封装成一个Entry对象,然后再将该Entry对象添加到写缓冲区,其中会通过几个指针来标识写缓冲区的状态。
三.设置写状态
如果内存不足,那么是不可以一直往写缓冲区里添加ByteBuf对象的。如果写缓冲区已经大于默认的64KB的大小,则会通过自旋 + CAS设置当前Channel为不可写状态。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final DefaultChannelPipeline pipeline;...protected abstract class AbstractUnsafe implements Unsafe {//写缓冲区(写队列)private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);...@Overridepublic final void write(Object msg, ChannelPromise promise) {//确保该方法的调用是在Reactor线程中assertEventLoop();//写缓冲区ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;...int size;try {//转换成堆外内存msg = filterOutboundMessage(msg);//估算出需要写入的ByteBuf的sizesize = pipeline.estimatorHandle().size(msg);if (size < 0) {size = 0;}} catch (Throwable t) {safeSetFailure(promise, t);ReferenceCountUtil.release(msg);return;}//将转换成堆外内存的msg添加到写缓冲区outboundBufferoutboundBuffer.addMessage(msg, size, promise);}...} ...
}public abstract class AbstractNioByteChannel extends AbstractNioChannel {...@Overrideprotected final Object filterOutboundMessage(Object msg) {if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;if (buf.isDirect()) {return msg;}return newDirectBuffer(buf);}if (msg instanceof FileRegion) {return msg;}throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }...
}
(3)写缓冲区(写队列)的数据结构
ChannelOutboundBuffer里的数据结构是一个单向链表,单向链表的每个结点都是一个Entry对象。在一个Entry对象中会包含着待写出的ByteBuf对象及消息回调promise。flushedEntry指针表示第一个被写入Socket缓冲区的结点,unflushedEntry指针表示第一个未被写入Socket缓冲区的结点,tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个结点。
初次调用ChannelOutboundBuffer的addMessage()方法后,flushedEntry指针指向NULL,unflushedEntry指针和tailEntry指针都指向新添加的结点。调用多次ChannelOutboundBuffer的addMessage()方法后,如果flushedEntry指针一直指向NULL,则表示现在还没有结点的ByteBuf对象写出到Socket缓冲区。如果unflushedEntry指针之后有n个结点,则表示当前还有n个结点的ByteBuf对象还没写出到Socket缓冲区。
public final class ChannelOutboundBuffer {private final Channel channel;//Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)//The Entry that is the first in the linked-list structure that was flushedprivate Entry flushedEntry;//The Entry which is the first unflushed in the linked-list structureprivate Entry unflushedEntry;//The Entry which represents the tail of the bufferprivate Entry tailEntry;...ChannelOutboundBuffer(AbstractChannel channel) {this.channel = channel;}//Add given message to this ChannelOutboundBuffer. //The given {@link ChannelPromise} will be notified once the message was written.public void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry = Entry.newInstance(msg, size, total(msg), promise);if (tailEntry == null) {flushedEntry = null;tailEntry = entry;} else {Entry tail = tailEntry;tail.next = entry;tailEntry = entry;}if (unflushedEntry == null) {unflushedEntry = entry;}//increment pending bytes after adding message to the unflushed arrays.incrementPendingOutboundBytes(size, false);}static final class Entry {private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {@Overrideprotected Entry newObject(Handle handle) {return new Entry(handle);}};private final Handle<Entry> handle;Entry next;Object msg;ByteBuffer[] bufs;ByteBuffer buf;ChannelPromise promise;long progress;long total;int pendingSize;int count = -1;boolean cancelled;private Entry(Handle<Entry> handle) {this.handle = handle;}static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {Entry entry = RECYCLER.get();entry.msg = msg;entry.pendingSize = size;entry.total = total;entry.promise = promise;return entry;}...}
}
9.unsafe.flush()刷新写缓冲区的数据
(1)unsafe.flush()的入口
(2)unsafe.flush()的主要逻辑
(1)unsafe.flush()的入口
不管是ctx.channel().flush()还是ctx.flush(),最终都会来到pipeline中的head结点。
public class DefaultChannelPipeline implements ChannelPipeline {...final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}...@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write(msg, promise);}@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {unsafe.flush();}}...
}
(2)unsafe.flush()的主要逻辑
步骤一:设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点,并统计需要刷新的Entry结点的数量。
步骤二:遍历写缓冲区的Entry结点把对应的ByteBuf对象写到Socket,然后移除Entry结点。如果写缓冲区大小已经小于32KB,则通过自旋 + CAS设置Channel为可写状态。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final DefaultChannelPipeline pipeline;...protected abstract class AbstractUnsafe implements Unsafe {private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); ...@Overridepublic final void flush() {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {return;}//步骤一outboundBuffer.addFlush();//步骤二flush0();}protected void flush0() {final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;...doWrite(outboundBuffer);...}}//Flush the content of the given buffer to the remote peer.protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
}public abstract class AbstractNioByteChannel extends AbstractNioChannel {...@Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {//默认自旋16次,以提高内存使用率和写的吞吐量int writeSpinCount = config().getWriteSpinCount();do {Object msg = in.current();if (msg == null) {//重新注册,不关注OP_WRITE事件clearOpWrite();return;}writeSpinCount -= doWriteInternal(in, msg);} while(writeSpinCount > 0);incompleteWrite(setOpWrite);}private int doWriteInternal(ChannelOutboundBuffer in, Object msg) {...ByteBuf buf = (ByteBuf) msg;if (!buf.isReadable()) {//从写缓冲区(写队列)中移除结点in.remove();return 0;}//把ByteBuf对象写到Socket里final int localFlushedAmount = doWriteBytes(buf);if (localFlushedAmount > 0) {in.progress(localFlushedAmount);if (!buf.isReadable()) {//从写缓冲区(写队列)中移除结点in.remove();}return 1;}...}protected final void clearOpWrite() {final SelectionKey key = selectionKey();//Check first if the key is still valid as it may be canceled as part of the deregistration from the EventLoop. if (!key.isValid()) {return;}final int interestOps = key.interestOps();if ((interestOps & SelectionKey.OP_WRITE) != 0) {key.interestOps(interestOps & ~SelectionKey.OP_WRITE);}}@Overrideprotected int doWriteBytes(ByteBuf buf) throws Exception {final int expectedWrittenBytes = buf.readableBytes();return buf.readBytes(javaChannel(), expectedWrittenBytes);}...
}public final class ChannelOutboundBuffer {private final Channel channel;//Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)//The Entry that is the first in the linked-list structure that was flushedprivate Entry flushedEntry;//The Entry which is the first unflushed in the linked-list structureprivate Entry unflushedEntry;//The Entry which represents the tail of the bufferprivate Entry tailEntry;private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER;@SuppressWarnings("UnusedDeclaration")private volatile int unwritable;//The number of flushed entries that are not written yetprivate int flushed;...//设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点,//并统计需要刷新的Entry结点的数量public void addFlush() {Entry entry = unflushedEntry;if (entry != null) {if (flushedEntry == null) {flushedEntry = entry;}do {flushed ++;//所要flush的结点数entry = entry.next;} while (entry != null);unflushedEntry = null;}}public boolean remove() {//获取当前正在被flush的结点Entry e = flushedEntry;Object msg = e.msg;//获取该结点的回调对象ChannelPromise promise = e.promise;int size = e.pendingSize;//从写缓冲队列中移除结点removeEntry(e);if (!e.cancelled) {ReferenceCountUtil.safeRelease(msg);safeSuccess(promise);//如果写缓冲区大小小于32KB,就通过自旋+CAS设置Channel状态为可写decrementPendingOutboundBytes(size, false, true);}//回收实体e.recycle();return true;}private void removeEntry(Entry e) {if (-- flushed == 0) {flushedEntry = null;if (e == tailEntry) {tailEntry = null;unflushedEntry = null;}} else {flushedEntry = e.next;}}//Return the current message to write or null if nothing was flushed before and so is ready to be written.public Object current() {Entry entry = flushedEntry;if (entry == null) {return null;}return entry.msg;}//Notify the ChannelPromise of the current message about writing progress.public void progress(long amount) {Entry e = flushedEntry;assert e != null;ChannelPromise p = e.promise;if (p instanceof ChannelProgressivePromise) {long progress = e.progress + amount;e.progress = progress;((ChannelProgressivePromise) p).tryProgress(progress, e.total);}}private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {if (size == 0) {return;}long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {setWritable(invokeLater);}}private void setWritable(boolean invokeLater) {for (;;) {final int oldValue = unwritable;final int newValue = oldValue & ~1;if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {if (oldValue != 0 && newValue == 0) {fireChannelWritabilityChanged(invokeLater);}break;}}}...
}
10.如何把对象变成字节流写到unsafe底层
当调用ctx.channel().writeAndFlush(user)将自定义的User对象沿着整个Pipeline进行传播时:
首先会调用tail结点的write()方法开始往前传播,传播到一个继承自MessageToByteEncoder的结点。该结点会实现MessageToByteEncoder的encode()方法来把自定义的User对象转换成一个ByteBuf对象。转换的过程首先会由MessageToByteEncoder分配一个ByteBuf对象,然后再调用其子类实现的抽象方法encode()将User对象填充到ByteBuf对象中。填充完之后继续调用write()方法把该ByteBuf对象往前进行传播,默认下最终会传播到head结点。
其中head结点的write()方法会通过底层的unsafe进行如下处理:把当前的ByteBuf对象添加到unsafe维护的一个写缓冲区里,同时计算写缓冲区大小是否超过64KB。如果写缓冲区大小超过了64KB,则设置当前Channel不可写。完成write()方法的传播后,head结点的unsafe对象维护的写缓冲区便对应着一个ByteBuf队列,它是一个单向链表。
然后会调用tail结点的flush()方法开始往前传播,默认下最终会传播到head结点。head结点在接收到flush事件时会通过底层的unsafe进行如下处理:首先进行指针调整,然后通过循环遍历从写缓冲区里把ByteBuf对象取出来。每拿出一个ByteBuf对象都会把它转化为JDK底层可以接受的ByteBuffer对象,最终通过JDK的Channel把该ByteBuffer对象写出去。每写完一个ByteBuffer对象都会把写缓冲区里的当前ByteBuf所在的Entry结点进行删除,并且判断如果当前写缓冲区里的大小已经小于32KB就通过自旋 + CAS重新设置Channel为可写。
相关文章:
Netty源码—8.编解码原理二
大纲 1.读数据入口 2.拆包原理 3.ByteToMessageDecoder解码步骤 4.解码器抽象的解码过程总结 5.Netty里常见的开箱即用的解码器 6.writeAndFlush()方法的大体步骤 7.MessageToByteEncoder的编码步骤 8.unsafe.write()写队列 9.unsafe.flush()刷新写队列 10.如何把对象…...
【踩坑系列】使用httpclient调用第三方接口返回javax.net.ssl.SSLHandshakeException异常
1. 踩坑经历 最近做了个需求,需要调用第三方接口获取数据,在联调时一直失败,代码抛出javax.net.ssl.SSLHandshakeException异常, 具体错误信息如下所示: javax.net.ssl.SSLHandshakeException: sun.security.validat…...
双目云台摄像头全方位监控方案
双目云台摄像头是一种具有两个镜头的摄像头设备,通常配备云台功能,能够实现水平和垂直方向的旋转,从而提供全方位的监控视角: 一、工作原理与特点 工作原理 :双目云台摄像头利用仿生学原理,通过两个标定后的…...
测谎仪策略思路
来源:【东吴金工 金工专题】“高频价量相关性拥抱CTA”系列研究(四):CPV因子期货版3.0—CPV测谎机 原创 高子剑 量化邻距离 2024年09月20日 14:37 该报告主要介绍了“高频价量相关性拥抱CTA”系列研究中CPV因子期货版的相关内容,…...
2025年移动端开发性能优化实践与趋势分析
启动速度优化 本质:缩短首次可见帧渲染时间。 方法: iOS:利用Core ML本地模型轻量化部署,减少云端等待。Android:强制启用SplashScreen API,通过setKeepOnScreenCondition控制动画时长。冷启动需将耗时操…...
VScode-i18n-ally-Vue
参考这篇文章,做Vue项目的国际化配置,本篇文章主要解释,下载了i18n之后,该如何对Vscode进行配置 https://juejin.cn/post/7271964525998309428 i18n Ally全局配置项 Vscode中安装i18n Ally插件,并设置其配置项&#…...
vue vue3 走马灯Carousel
背景: 在项目中需要展示多张图片,但在页面上只有一张图片的有限位置,此时考虑使用轮播图实现多张图片的展示。element组件官网有走马灯Carousel的组件详细介绍。 实现效果: 官网链接:点击跳转 核心代码: …...
Android设计模式之Builder模式
一、定义:将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示。 二、核心思想: 分离构造与表示:将对象的构建过程(如参数组合、校验逻辑)与对象本身分离。 链式调用:通…...
【时时三省】(C语言基础)关系运算符和关系表达式
山不在高,有仙则名。水不在深,有龙则灵。 ----CSDN 时时三省 在if语句中对关系表达式disc > 0进行判断。其中的“>”是一个比较符,用来对两个数值进行比较。在C语言中,比较符(或称比较运算符)称为关…...
运算放大器(二)运算放大器的选型与应用
1.运算放大器的工艺决定Vos和Ib 2.TI放大器的命名规律 3.TI精密放大器家族 4.精密运放的选型指南 5.高共模抑制比放大器 6.TI其他的精密放大器 7.选型时需考虑的问题 8.TI精密运放选型实例 先确定供电电压 9.确定放大器的步骤 参考: 注:本文出自对b…...
vulhub靶场jangow-01-1.0.1
启动靶机时点shift停在这个界面 点e进入编辑页面,把ro改成rw signie init/bin/bash Ctrlx保存,ip a查看网卡信息 vim /etc/network/interfaces 把enp0s17改为ens33,保存退出 重启靶机,nmap扫ip ip为192.168.93.179 nmap扫端口 扫…...
android 一步完成 aab 安装到手机
家人们谁懂!在 Android 系统安装 aab 应用超麻烦。满心期待快速体验,却发现 aab 无法直装,得先转为 apks 格式,这过程复杂易错。好不容易转好,还得安装 apks,一番折腾,时间与耐心全耗尽。别愁&a…...
mysqlworkbench导入.sql文件
1、MySQL Workbench 新建数据库 或者 在左侧导航栏的 Schemas 区域右键选择 Create Schema...输入数据库名称(例如 mydatabase),点击 Apply确认创建,点击 Finish 2、选择目标数据库 在左侧导航栏的 Schemas 列表中&a…...
pyqt 信号与槽
PySide6 信号与槽机制详解 引言 PySide6 是 Qt for Python 的官方绑定库,为 Python 提供了强大的 GUI 开发能力。其中,信号与槽(Signals and Slots) 机制是 Qt 事件处理系统的核心,它允许对象之间进行松耦合的通信&a…...
深入探索C++:从基础到实践
目录 引言 一、C 基础语法与特性 (一)命名空间(Namespace) 单独使用 嵌套使用 调用形式 (二)输入输出流(I/O Streams) (三)变量作用域 二、C 的…...
从零开始完成冒泡排序(0基础)——C语言版
文章目录 前言一、冒泡排序的基本思想二、冒泡排序的执行过程(一)第一轮排序(二)第二轮排序(三)第三轮排序(四)第四轮排序 三、冒泡排序的代码实现(C语言)&am…...
Echars插入的柱状图条形图,鼠标放在图上显示坐标值
只需要将axiosPointer改为cross axisPointer.type支持类型及作用: line:默认直线型指向线shadow:显示坐标轴方向的阴影区域cross:交叉线(横向纵向双线)none:不显示指向器inside:结合…...
机械臂如何稳稳上桌?Mujoco场景修改实操
视频讲解: 机械臂如何稳稳上桌?Mujoco场景修改实操 前面《常见机械臂模型不用找!Mujoco这儿都有!》中介绍的mujoco-menagerie中机械臂大多都是base_link放在地上的,这些场景往往和真实的场景对应不上,比如机…...
金融级密码管理器——抗内存扫描的密钥保险箱
目录 金融级密码管理器 —— 抗内存扫描的密钥保险箱一、模块概述与设计背景二、技术原理与设计目标2.1 关键安全原理2.2 设计目标三、系统架构设计3.1 系统架构图(Mermaid示意图)四、关键技术与安全策略4.1 密钥分割与加密存储4.2 动态内存随机化技术4.3 内存扫描检测与自动…...
如何查看 SQL Server 的兼容性级别
在 SQL Server 中,兼容性级别是一个非常重要的设置,它决定了数据库在特定版本的 SQL Server 中运行时所使用的行为和功能。不同版本的 SQL Server 可能会在 SQL 查询优化、索引、语法、错误处理等方面有差异,因此,设置正确的兼容性…...
AI for CFD入门指南(传承版)
AI for CFD入门指南 前言适用对象核心目标基础准备传承机制 AI for CFDLibtorch的介绍与使用方法PytorchAutogluon MakefileVscodeOpenFOAMParaviewGambit 前言 适用对象 新加入课题组的硕士/博士研究生对AICFD交叉领域感兴趣的本科生实习生需要快速上手组内研究工具的合作研…...
人工智能与网络安全
目录 1、人工智能的安全和安全的人工智能各有什么含义,如何解决 2、当人工智能技术应用于某一安全领域,会对该领域的攻守双方带来哪些机遇与挑战 3、ChatGPT原理 、ChatGPT的缺陷 ChatGPT的缺陷 4、人工智能与算力,风险挑战 应对 5、人…...
GPIO输出实验,控制LED灯
1.实验工具:FSMP1A开发板 核心板: 拓展板: 2.实验要求:编写汇编程序,实现三盏灯流水 程序代码: .text .global _start _start: 将RCC_MP_AHB4ENSET寄存器第4位设置为1,使能GPIO外设时钟 …...
小区团购管理设计与实现(代码+数据库+LW)
摘 要 传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此,在计算机上安装小区团购管理软件来发挥其高效地信息处理的作用࿰…...
How to use pgbench to test performance for PostgreSQL?
pgbench 是一个用于测试 PostgreSQL 数据库性能的基准测试工具。通过模拟多个客户端并发执行 SQL 查询,它可以帮助你评估数据库的性能。以下是使用 pgbench 的基本步骤: 安装 pgbench pgbench 是 PostgreSQL 的一部分,因此在安装 PostgreSQ…...
dbeaver连接mongodb 插入日期变成了字符串
dbeaver插入mongodb数据 日期默认使用ISODate处理,但是插入数据以后实际上是ISODate(2025-03-03T03:25:19.640Z)字符串 INSERT INTO xxx.aaa (_id, chatId, buddyId, pId, lastChatId, inspiration, createTime, modelType, version, selectedInspiration, _class)…...
wgcloud怎么实现服务器或者主机的远程关机、重启操作吗
可以,WGCLOUD的指令下发模块可以实现远程关机和重启 使用指令下发模块,重启主机,远程关机,重启agent程序- WGCLOUD...
PrimeTime生成.lib竟暗藏PG添加Bug
在primeTime里生成lib,如何能带上相关的pg信息? 这是一位群友的发问,就这个问题总结了下可能的原因和解决步骤: 概念 PrimeTime是Synopsys的静态时序分析工具,通常用于在设计的各个阶段进行时序验证。 1)…...
电话号码的字母组合组合总和II 回溯注意事项(Java)
电话号码的字母组合 思路:多个循环可以考虑回溯。 首先明确: 循环的宽度是多少,即从哪些区间取数(本题目中每个数字都是3个字母,都是从三个字母中取一个数,所以可以确定循环宽度就是每个数字对应的字符串…...
【软件工程】填空题
真题 2024-10 16.数据字典是用来定义_____中各个成分的具体含义的。 17.模块设计的基本原则是_____。 18.接口是操作的一个集合,其中每个操作描述了类、构件或子系统的一个_____。 19.耦合是指不同模块之间_____的度量。 20.RUP的突出特点是,它是一种以用况为驱动的、…...
回归——数学公式推导全过程
文章目录 一、案例引入 二、如何求出正确参数 1. 最速下降法 1)多项式回归 2)多重回归 2. 随机梯度下降法 一、案例引入 以Web广告和点击量的关系为例来学习回归,假设投入的广告费和点击量呈现下图对应关系。 思考:如果花了…...
线程池详解:在SpringBoot中的最佳实践
线程池详解:在SpringBoot中的最佳实践 引言 在Java并发编程中,线程池是一种非常重要的资源管理工具,它允许我们在应用程序中有效地管理和重用线程,从而提高性能并降低资源消耗。特别是在SpringBoot等企业级应用中,正…...
.NET开源的智能体相关项目推荐
一、AntSK 由AIDotNet团队开发的人工智能知识库与智能体框架,支持多模型集成和离线部署能力。 核心能力: • 支持OpenAI、Azure OpenAI、星火、阿里灵积等主流大模型,以及20余种国产数据库(如达梦) • 内置语义内核&a…...
spring-security原理与应用系列:ignoredRequests
目录 WebSecurityConfig 何时调用 configure(WebSecurity) AbstractConfiguredSecurityBuilder 如何赋值ignoredRequests 紧接上一篇文章,这一篇我们来看看核心过滤器FilterChainProxy的构造参数对象ignoredRequests是如何被赋值的? 点击WebSecurity…...
(windows)conda虚拟环境下open-webui安装与启动
一、创建conda环境 重点强调下,如果用python pip安装,一定要选择python3.11系列版本,我选的3.11.9。 如果你的版本不是这个系列,将会出现一些未知的问题。 conda create -n open-webui python3.11 -y如下就创建好了 二、安装o…...
CentOS系统下安装tesseract-ocr5.x版本
CentOS系统下安装tesseract-ocr5.x版本 安装依赖包: yum update -y yum install autoconf automake libtool libjpeg-devel libpng-devel libtiff-devel zlib-devel yum install automake libtool bzip2 -y手动编译安装GCC(因系统默认安装的GCC版本比较…...
第五周日志-伪协议(3)
常见读取源码的file,php://filter和各种编码 还有执行php的 php://input和各种编码,data 在进行文件包含之前,先定位一下 Flag 文件的位置(这里可以使用工具扫) or直接访问 /flag.php 文件,结果返回为空&…...
飞牛NAS本地部署小雅Alist结合内网穿透实现跨地域远程在线访问观影
文章目录 前言1. VMware安装飞牛云(fnOS)1.1 打开VMware创建虚拟机1.3 初始化系统 2. 飞牛云搭建小雅Alist3. 公网远程访问小雅Alist3.1 安装Cpolar内网穿透3.2 创建远程连接公网地址 4. 固定Alist小雅公网地址 前言 嘿,小伙伴们,…...
十七天-Numpy 学习笔记
Numpy 学习笔记 Numpy 作为 Python 中用于进行科学计算的核心库,提供了高性能的多维数组对象,以及大量用于数组操作的工具。下面围绕 “常量”“数据类型”“时间日期和时间增量” 三个方面,梳理 Numpy 中基本的数据概念和数组创建相关知识。…...
浅谈WebSocket-FLV
FLV是一种视频数据封装格式,这种封装被标准通信协议HTTP-FLV和RTMP协议应用。 而WebSocket-FLV是一种非标的FLV封装数据从后端发送到前端的一种方式。 在WebSocket的url请求中,包含了需要请求设备的视频相关信息,在视频数据到达时,…...
milvus-use教程 python
简介 项目地址:milvus-use: milvus-use教程 python 需求描述 参考vanna项目,获取数据库元数据和问题sql对,存入Milvus向量数据库,之后进行检索,返回相似的数据库表和问题对。本项目采用的嵌入模型为m3e-large。该该…...
Python列表生成式
Python 的 列表生成式(List Comprehension) 是一种简洁高效的创建列表的方式,可以用一行代码替代多行循环逻辑。 传统的循环的写法 # 循环遍历列表中的每个元素,并将其平方后添加到新的列表中 original [0, 1, 2, 3, 4] squares…...
MATLAB绘图配色包说明
本栏目将分享MATLAB数据分析图表,该贴讲述配色包的使用 将配色包colormap_nclCM文件夹添加到路径close all(尽量不要删),使用map colormap(nclCM(309))时会多出来一张空白图片。配色资源来自slandarer;找不到合适颜色…...
Cursor异常问题全解析-无限使用
title: Cursor异常问题全解析无限使用 tags: cursor categories:aiai编程 mathjax: true description: Cursor异常问题全解析与解决方案大全 abbrlink: 64908bd0 date: 2025-03-19 14:48:32 🤖 Assistant 🚨 Cursor异常问题全解析与解决方案大全 &…...
Ubuntu系统保姆级Paperless-ngx部署指南:零基础实现文档云端化管理
文章目录 前言1.关于Paperless-ngx2.Docker部署3.简单使用paperless4.安装cpolar内网穿透5. 配置公网地址6. 配置固定公网地址总结 前言 在当今快节奏的办公环境中,文档管理成为了一个不可忽视的问题。想象一下这样的场景:你需要一份重要的合同…...
资本运营:基于Python实现的资本运作模拟
基于Python实现的一个简单的资本运营框架; 企业生命周期演示:观察初创→成长→上市→并购全流程 行业对比分析:不同行业的财务特征和估值差异 资本运作策略:体验IPO定价、投资决策、并购整合等操作 市场动态观察ÿ…...
每日总结3.27
蓝桥刷题 1. 团建 (树dfs) #include <bits/stdc.h> using namespace std; const int N200005; int a[N],b[N]; int ans; map<int,vector<int>>m1,m2; void dfs(int x,int y,int count) { if(a[x]!b[y]) {return;} ansmax(ans,c…...
3-2RYU控制器应用程序开发(一)
图1 SDN框架 通过利用Ryu框架的丰富功能以及RYU应用程序开发中的事件驱动(装饰器)的编程模型,我们能够开发符合需求的SDN控制器应用程序。通过开发的RYU控制器可以实现各种网络策略控制和管理功能。在SDN软件定义网络编程中,RYU应…...
算法250327题目
1114: 4006 AB问题 题目描述 给定两个整数A和B,其表示形式是:从个位开始,每三位数用逗号,隔开。 现在请计算AB的结果,并以正常形式输出。 输入 输入包含多组数据,每组数据占一行,由两个整数A和B组成&am…...
数据结构:汉诺塔问题的递归求解和分析
递归方法求解该类问题,是一种简单的思维方法,通常比使用迭代方法更简单。但是,递归方法也有劣势。此处以典型的汉诺塔问题(Tower of Hanoi)为例给予说明。 汉诺塔是根据一个传说形成的数学问题,最早是由法国…...