当前位置: 首页 > news >正文

Netty启动源码NioEventLoop剖析accept剖析read剖析write剖析

学习链接

NIO&Netty - 专栏

  • Netty核心技术十–Netty 核心源码剖析
  • Netty核心技术九–TCP 粘包和拆包及解决方案
  • Netty核心技术七–Google Protobuf
  • Netty核心技术六–Netty核心模块组件
  • Netty核心技术五–Netty高性能架构设计

聊聊Netty那些事儿 - 专栏

  • 一文搞懂Netty发送数据全流程 | 你想知道的细节全在这里

netty源码解析 - 系列

  • Netty源码分析 (一)----- NioEventLoopGroup
  • Netty源码分析 (二)----- ServerBootstrap
  • Netty源码分析 (三)----- 服务端启动源码分析
  • Netty源码分析 (四)----- ChannelPipeline
  • Netty源码分析 (五)----- 数据如何在 pipeline 中流动
  • Netty源码分析 (六)----- 客户端接入accept过程
  • Netty源码分析 (七)----- read过程 源码分析
  • Netty源码分析 (八)----- write过程 源码分析
  • Netty源码分析 (九)----- 拆包器的奥秘
  • Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
  • Netty源码分析 (十一)----- 拆包器之LengthFieldBasedFrameDecoder
  • Netty源码分析 (十二)----- 心跳服务之 IdleStateHandler 源码分析

文章目录

  • 学习链接
  • 1. 源码分析
    • 1.1 启动剖析
      • AbstractBootstrap#doBind
        • AbstractBootstrap#initAndRegister
          • ServerBootstrap#init
          • AbstractUnsafe#register
          • - ChannelInitializer#initChannel
        • AbstractBootstrap#doBind0
          • AbstractUnsafe#bind
          • - NioServerSocketChannel#doBind
          • - HeadContext#channelActive
          • -- AbstractNioChannel#doBeginRead
    • 1.2 NioEventLoop 剖析
      • NioEventLoop的重要组成
      • selector何时创建
      • nio线程在何时启动
        • SingleThreadEventExecutor#execute
          • *NioEventLoop#run
          • NioEventLoop#select
          • NioEventLoop#processSelectedKeys
          • - NioEventLoop#processSelectedKey
    • 1.3 accept 剖析
      • AbstractNioMessageChannel.NioMessageUnsafe#read
      • ServerBootstrapAcceptor#channelRead
        • AbstractChannel.AbstractUnsafe#register
          • *AbstractUnsafe#register0
          • - HeadContext#channelActive
          • --AbstractNioChannel#doBeginRead
    • 1.4 read 剖析
      • AbstractNioByteChannel.NioByteUnsafe#read
        • NioSocketChannel#doReadBytes
        • MaxMessageHandle#continueReading
    • 1.5 write剖析
      • write:写队列
      • flush:刷新写队列
      • writeAndFlush: 写队列并刷新

1. 源码分析

1.1 启动剖析

我们就来看看 netty 中对下面的代码是怎样进行处理的

(先明确Java nio的基础步骤如下,而netty在启动过程中,也是需要做下面的事情的)

//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open(); //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
serverSocketChannel.configureBlocking(false);//4 启动 nio boss 线程执行接下来的操作//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
// (注意,这里将NioServerSocketChannel作为附件绑定到了selectionKey上,当此ServerSocketChannel有可连接事件时,就可以获取到此selectionKey,从而获取到对应的NioServerSocketChannel)
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);// (ServerBootstrapAcceptor是ChannelInboundHandlerAdapter入站类型的处理器)
//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);

在这里插入图片描述

源码入口,可以从下面的代码进入

(暂时先不看NioEventLoopGroup,而Selector是存在于NioEventLoop中的,所以selector.open暂时不看)

public class TestSourceServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());}}).bind(8880); // 以bind为入口}
}

AbstractBootstrap#doBind

入口 io.netty.bootstrap.ServerBootstrap#bind

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

(1、注意main线程和nio线程的切换;

​ 2、initAndRegister 对应 nio中 创建ServerSocketChannel 和 把ServerSocketChannel注册到selector上

​ 3、doBind0 对应 nio中 bind监听端口)

private ChannelFuture doBind(final SocketAddress localAddress) {// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}// 2. 因为 initAndRegister 是异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分// 2.1 如果已经完成(如果前面做的比较快,就进入这个if块)if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();// 3.1 立刻调用 doBind0doBind0(regFuture, channel, localAddress, promise);return promise;} // 2.2 还没有完成else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 3.2 回调 doBind0regFuture.addListener(new ChannelFutureListener() {// (这个operationComplete是由nio线程来调用的)@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// 处理异常...promise.setFailure(cause);} else {promise.registered();// 3. 由注册线程去执行 doBind0doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
AbstractBootstrap#initAndRegister

关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

final ChannelFuture initAndRegister() {Channel channel = null;try {//(1、这里会去调用NioServerSocketChannel的无参构造方法去得到channel// 2、在NioServerSocketChannel的无参构造方法中会去创建javanio的ServerSocketChannel,//    并且将该ServerSocketChannel维护在NioServerSocketChannel中,//    并配置为非阻塞模式,感兴趣的事件是OP_ACCEPT,但是还没注册到selector上,//    只是维护了这些基本信息到NioServerSocketChannel。//    并且在NioServerSocketChannel的构造方法中会去创建NioServerSocketChannelConfig//    维护到NioServerSocketChannel中。//    并且NioServerSocketChannel的构造方法中会去创建DefaultChannelPipeline//    维护到NioServerSocketChannel中)channel = channelFactory.newChannel();// 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializerinit(channel);} catch (Throwable t) {// 处理异常...return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上// (这里会从eventLoopGroup中挑选出1个eventLoop来注册ServerSocketChannel)ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {// 处理异常...}return regFuture;
}
ServerBootstrap#init

关键代码 io.netty.bootstrap.ServerBootstrap#init

(这里会给ServerSocketChannel的pipeline中添加1个ChannelInitializer初始化器,该初始化器只会执行1次,后续将会移除掉。)

// 这里 channel 实际上是 NioServerSocketChannel
void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// 为 NioServerSocketChannel 的pipeline 添加初始化器!!!// (1、该初始化器的initChannel方法只会执行1次,后续该初始化器将会移除掉,//      移除动作是在ChannelInitializer#initChannel中操作的。//  2、注意该初始化器的initChannel方法在此处尚未被调用。//  3、initChannel方法的调用时机是在AbstractChannel的register0方法中,//     在做完将channel注册到selector上之后的//     pipeline.invokeHandlerAddedIfNeeded()这句代码调用的)p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// (这里意味着可以通过配置给config1个handler,//  从而给serverSocketChannel的pipeline添加1个handler)ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}// 1、初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel// 2、ServerBootstrapAcceptor的作用是在selector触发可连接事件时,建立连接// 3、保证添加这个动作是在nio线程中完成的ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
AbstractUnsafe#register

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 一些检查,略...AbstractChannel.this.eventLoop = eventLoop;// (判断当前线程是不是eventLoop的线程,因为顺着刚刚的逻辑,当前还在主线程中,所以走else)if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行// 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程// 这行代码完成的事实是 main -> nio boss 线程的切换eventLoop.execute(new Runnable() {@Overridepublic void run() {// (该方法在nio线程上执行,并且注意promise传进去了,用于通知其它线程)register0(promise);}});} catch (Throwable t) {// 日志记录...closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}

#####- *AbstractUnsafe#register0

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;// 1.2.1 【将原生的 nio channel 绑定到 selector 上】,//       注意此时没有注册 selector 关注事件,附件为 NioServerSocketChanneldoRegister();neverRegistered = false;registered = true;// 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel//(1、调用到ServerBootstrap的init方法中为NioServerSocketChannel的//    pipeline添加的初始化器的initChannel方法。//  2、该initChannel方向pipeline中添加了ServerBootstrapAcceptor这个入站处理器)pipeline.invokeHandlerAddedIfNeeded();// (给promise对象1个成功的结果,这样前面的监听就能收到这个结果触发operationComplete方法,//   就会去通知前面在AbstractBootstrap#doBind方法中注册的监听去做doBind0绑定监听端口)// 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0safeSetSuccess(promise);pipeline.fireChannelRegistered();// 对应 server socket channel 还未绑定,isActive 为 falseif (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 将java的channel注册到了eventLoop的selector上// (此时,尚未注册感兴趣的事件。同时,注意当前this作为附件绑定到了selectionKey)selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {eventLoop().selectNow();selected = true;} else {throw e;}}}}
- ChannelInitializer#initChannel

关键代码 io.netty.channel.ChannelInitializer#initChannel

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.try {// 1.2.2.1 执行初始化!!!(调用前面添加的初始化器的initChannel方法)initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {// 1.2.2.2 移除初始化器!!!(调用完成后,移除初始化器)ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {pipeline.remove(this);}}return true;}return false;
}
AbstractBootstrap#doBind0

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

// 3.1 或 3.2 执行 doBind0
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 1、确保执行是在nio eventLoop线程中执行// 2、绑定会从pipe的tail开始找,最终会到headContext中调用到AbstractUnsafe的bind方法channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}
AbstractUnsafe#bind

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {// 记录日志...}boolean wasActive = isActive();try {// 3.3 【执行端口绑定】doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}// 从这里可以看出是绑定端口后,再去触发active事件的// 当前serverSocketChannel的pipeline已经添加了head-acceptor-tail处理器链, // 并且已经绑定好端口了,所以这里触发pipeline上所有handler的active事件,// 接下来,去看HeadContext#channelActive方法if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {// 3.4 【触发 active 事件】pipeline.fireChannelActive();}});}safeSetSuccess(promise);
}
- NioServerSocketChannel#doBind

3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {// 调用java原生channel的绑定端口的方法javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
- HeadContext#channelActive

3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

public void channelActive(ChannelHandlerContext ctx) {// 触发所有handler的active事件ctx.fireChannelActive();// 从这里可以看出是所有handler的active触发之后,再将channel注册感兴趣的事件的// 触发 read ,目的是为了触发channel的事件注册,注册OP_ACCEPT事件,//            见AbstractNioChannel#doBeginRead// (注意: NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)readIfIsAutoRead();
}
– AbstractNioChannel#doBeginRead

关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();// readInterestOp 取值是 16,在NioServerSocketChannel创建时初始化好,代表关注 accept 事件if ((interestOps & readInterestOp) == 0) {// 注册感兴趣的事件!!!selectionKey.interestOps(interestOps | readInterestOp);}
}

1.2 NioEventLoop 剖析

在这里插入图片描述

NioEventLoop的重要组成

1、在NioEventLoop类中有成员变量

private Selector selector; 
private Selector unwrappedSelector;

2、在NioEventLoop的父类SingleThreadEventExecutor中有成员变量:

private volatile Thread thread;// 使用的跟上面同1个thread
private final Executor executor; // 由于eventLoop是单线程,其它的任务先放在taskQueue任务队列中,然后由单线程依次执行
private final Queue<Runnable> taskQueue;

3、在NioEventLoop的父类的父类AbstractScheduledEventExecutor中有成员变量

// 用来处理定时任务
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务)

selector何时创建

在NioEventLoop的唯一构造方法中,创建了selector

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");// 在这里创建selectorfinal SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

那为什么会有2个selector呢?

因为netty要把selector里面原来的selectionKey的set实现改为用数组实现,因为数组遍历的性能比set好!

nio线程在何时启动

(当首次调用eventLoop的execute方法时,会启动线程,并且state状态位控制只会启动1次。)

public class TestEventLoop {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();group.next()// 入口.execute(() -> {System.out.println("Hello");});}}
SingleThreadEventExecutor#execute

提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute

public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判断当前线程是否是eventLoop的thread,很显然,现在eventLoop的thread是nullboolean inEventLoop = inEventLoop();// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列addTask(task);if (!inEventLoop) {// inEventLoop 如果为 false 表示由其它线程来调用 execute,// 即首次调用时,需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThreadstartThread();if (isShutdown()) {// 如果已经 shutdown,做拒绝逻辑,代码略...}}if (!addTaskWakesUp && wakesUpForTask(task)) {// 如果线程由于 IO select 阻塞了,添加任务的线程需要负责唤醒 NioEventLoop 线程wakeup(inEventLoop);}
}private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {// 启动线程doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}
}private void doStartThread() {assert thread == null;// 这个executor是在 MultithreadEventExecutorGroup的构造方法中初始化的,//               (直接new的ThreadPerTaskExecutor)executor.execute(new Runnable() {@Overridepublic void run() {// 将线程池的当前线程保存在成员变量中,以便后续使用// 将thread线程设置为执行线程// (所以eventLoopgroup中的executor属性的线程和thread属性是同一个线程)thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {// 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下// 【启动 EventLoop 主循环 】SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally { // 清理工作,代码略... }}});
}@Override
protected void wakeup(boolean inEventLoop) {// !eventLoop的理解: 只有其它非nio线程提交任务,才会有机会去唤醒selector停止阻塞//                  (因为如果是eventLoop自己提交任务给自己,在提交的时候,//                    当前eventLoop正在运行,没有阻塞,所以不需要唤醒selector)// wakenUp的理解: 当多个其它非nio线程提交任务,那么只会将selector唤醒1次if (!inEventLoop && wakenUp.compareAndSet(false, true)) {// 唤醒 select 阻塞线程// (这个wakeup调用后,如果selector正在select,那么直接唤醒,//   如果selector还没select,那么该selector去select时,就不会阻塞。//   类似于LockSupport的park和unpark。)selector.wakeup();}
}
*NioEventLoop#run

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有定时任务,有没有 IO 事件,如果有,则执行。

// 死循环执行
for (;;) {try {try {//calculateStrategy 的逻辑如下:/* [代码] hasTasks ? selectNow() : SelectStrategy.SELECT; */// 当有任务时, 会执行一次selectNow(),去获取看看是否有io事件,//           并且会清除上一次的wakeup结果, 无论有没有IO事件,都会跳过switch// (因为有任务的话,即便没有io事件,也得干活,所以没有必要阻塞了)// 当没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞// (因为没有任务的话,那就等有io事件了,再干活,所以就设置超时阻塞,//   同时还要看在阻塞期间有其它非nio线程提交任务,并唤醒selector。//   那么默认阻塞多久呢?那就需要看NioEventLoop#select(boolean oldWakenUp)方法//    默认是阻塞1s + 0.5ms,不过还得看有没有定时任务。)switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE: // -2continue;case SelectStrategy.BUSY_WAIT: // -3case SelectStrategy.SELECT: // -1// 因为IO线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,// 因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒// 进行 select 阻塞,并设置唤醒状态为 falseboolean oldWakenUp = wakenUp.getAndSet(false);// 这里select方法中会调用select(timeoutMillis)阻塞,那么什么时候唤醒呢?// 当有io事件时自动唤醒// 或者超时自动唤醒// 或者有任务提交时,手动唤醒以便及时处理io事件以外的普通任务// 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup// 下面的 select 方法不会阻塞// 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?// 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时// 才能执行,让 select 方法无谓阻塞select(oldWakenUp);if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}// 有任务 或者 正在等待io事件但io事件还没来就被唤醒了 或者 io事件来了cancelledKeys = 0;needsToSelectAgain = false;// (如果eventLoop在执行非io任务的事件过长,势必会影响到io事件的处理)// ioRatio 默认是 50final int ioRatio = this.ioRatio;// 如果ioRatio设置为 100,那么会让普通任务都运行完。// 如果ioRatio不设置为100,那么会根据io事件处理的运行时间,算出普通任务可以运行的时间,//                       算出的这个时间仅仅是用来判断要不要继续运行下1个普通任务,//                       因此,如果1个普通任务本身耗时就特别长,//                            这里是没有中断这个任务的说法的,而且还得任务响应中断才行。if (ioRatio == 100) {try {processSelectedKeys();} finally {// ioRatio 为 100 时,总是运行完所有非 IO 任务runAllTasks();}} else {                final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// 记录 io 事件处理耗时final long ioTime = System.nanoTime() - ioStartTime;// 运行非 IO 任务,一旦超时会退出 runAllTasksrunAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}
}
NioEventLoop#select

io.netty.channel.nio.NioEventLoop#select

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();// 计算等待时间// * 没有 scheduledTask定时任务,超时时间为 1s// * 有 scheduledTas定时任务k,超时时间为 `下一个定时任务执行时间 - 当前时间`long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 如果超时,退出循环if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 如果期间又有task退出循环,如果没这个判断,那么任务就会等到下次select超时时才能被执行// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeupif (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// select 有限时阻塞// 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,// 导致不断空轮询,cpu 占用 100%// (所以就用了 selectCnt ++ 来统计次数,因为如果bug发生的话,循环会很快,//   这样selectCnt就会猛增,就检测到了)int selectedKeys = selector.select(timeoutMillis);// 计数加 1selectCnt ++;// 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()|| hasScheduledTasks()) {break;}if (Thread.interrupted()) {// 线程被打断,退出循环// 记录日志selectCnt = 1;break;}long time = System.nanoTime();if (time-TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 如果超时,计数重置为 1,下次循环就会 breakselectCnt = 1;} // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512// 这是为了解决 nio 空轮询 bug// (重新创建1个selector,来替换原来的selector,来解决nio空轮询bug)else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 重建 selectorselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {// 记录日志}} catch (CancelledKeyException e) {// 记录日志}
}
NioEventLoop#processSelectedKeys

处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys

private void processSelectedKeys() {if (selectedKeys != null) {// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}private void processSelectedKeysOptimized() {// 遍历所有的selectionKeyfor (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// 获取完就置为nullselectedKeys.keys[i] = null;// 附件就是 NioServerSocketChannelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {// 处理selectionKeyprocessSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {selectedKeys.reset(i + 1);selectAgain();i = -1;}}
}
- NioEventLoop#processSelectedKey

io.netty.channel.nio.NioEventLoop#processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 当 key 取消或关闭时会导致这个 key 无效if (!k.isValid()) {// 无效时处理...return;}try {int readyOps = k.readyOps();// 连接事件// (这个是客户端需要监听处理的事件,服务端就不用管)if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// OP_ACCEPT 和 OP_CONNECT的理解:// ServerBootstrap用来处理服务器端,而Bootstrap用于客户端。当服务器绑定端口后,会注册OP_ACCEPT事件,等待客户端连接。一旦有连接进来,就会触发这个事件,然后创建子Channel来处理通信。而客户端在连接服务器时,会发起非阻塞的连接操作,这时候会注册OP_CONNECT,当连接建立完成后,触发该事件,之后就可以进行读写操作了// OP_CONNECT只是在连接过程中注册,一旦连接成功,就会触发,之后可能需要修改感兴趣的事件为OP_READ等。// 需要注意,当连接失败时,OP_CONNECT也会触发,这时候需要处理异常情况。比如,在Netty中,连接失败会触发相应的异常处理机制,比如channel的exceptionCaught方法// 总结来说,OP_ACCEPT是服务器端用于接收新连接,而OP_CONNECT是客户端用于处理连接建立完成的事件// 可读:当客户端或服务端的缓冲区有接收到数据,这时候,就会通知程序有数据可以读了,然后这里就会触发read事件,然后handler就使用channel去读取数据,假设这里在handler里面只读了1半数据,然后就不读了,就是说还有数据没有读,但是这个时候,就去处理下1个selectionKey,那么当调用下1次selector.select方法时,仍然会由于有数据要读取,而被唤醒,仍然是对应该channel的selectionKey的可读事件。// 可写:当客户端或服务端需要将数据发送出去,这时候,需要订阅可写事件,当发送缓冲区可写时,就会触发这个事件,然后使用channel将数据写出到缓冲区,假设1次没写完,那就需要继续订阅可写事件,直到全部数据写完,然后数据全部写完之后,取消订阅可写事件,然后又有数据需要发送,就再次订阅可写事件,写完之后,就取消订阅可写事件。// 可写事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}// 可读 或 可接入事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// (这个方法同时处理 可接入 和 可读 事件,因为如果是NioServerSocketChannel它感兴趣的是OP_ACCEPT事件,而如果是NioSocketChannel它感兴趣的是OP_READ事件,对应的unsafe是不一样的。)// 如果是可接入 AbstractNioMessageChannel.NioMessageUnsafe#read// 如果是可读   AbstractNioByteChannel.NioByteUnsafe#readunsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

1.3 accept 剖析

在这里插入图片描述

其中,前面3步,在NioEventLoop#processSelectedKey中已经做过分析了。

nio 中如下代码,在 netty 中的流程

//1 阻塞直到事件发生
selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {    //2 拿到一个事件SelectionKey key = iter.next();//3 如果是 accept 事件if (key.isAcceptable()) {//4 执行 acceptSocketChannel channel = serverSocketChannel.accept();channel.configureBlocking(false);//5 关注 read 事件channel.register(selector, SelectionKey.OP_READ);}// ...
}

入口代码

// 服务端
public class TestSourceServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());}}).bind(8888);}
}
// 客户端
public class TestSourceClient {public static void main(String[] args) {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());}}).connect(new InetSocketAddress("localhost", 8888));}}

AbstractNioMessageChannel.NioMessageUnsafe#read

先来看可接入事件处理(accept)

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// doReadMessages中执行了accept并创建【NioSocketChannel】作为消息放入readBuf// readBuf 是一个 ArrayList 用来缓存消息// (看下面的doReadMessages方法)int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}// localRead 为 1,就一条消息,即接收一个客户端连接allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t; // 忽略暂时的异常}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 触发 read 事件,让NioServerSocketChannel的pipelin上的 handler 处理,// 这时 肯定交给 ServerBootstrapAcceptor#channelReadpipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();// 触发读取完毕事件pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);// 触发异常事件pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}@Override
protected int doReadMessages(List<Object> buf) throws Exception {// 获取到SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// 创建NioSocketChannelbuf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;
}

ServerBootstrapAcceptor#channelRead

关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {// 这时的 msg 是 NioSocketChannelfinal Channel child = (Channel) msg;// NioSocketChannel 添加  childHandler 即初始化器// (这里添加的是初始化器)child.pipeline().addLast(childHandler);// 设置选项setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}
AbstractChannel.AbstractUnsafe#register

又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 一些检查,略...AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 这行代码完成的事实是 nio boss -> nio worker 线程的切换!!!eventLoop.execute(new Runnable() {@Overridepublic void run() {// 调用注册的方法register0(promise);}});} catch (Throwable t) {// 日志记录...closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
*AbstractUnsafe#register0

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;// 这里面在 AbstractNioChannel 的 doRegister()方法中会将channel注册到selector上doRegister();neverRegistered = false;registered = true;//【关键代码,注意初始化器执行前后。这里将会为NiosocketChannel添加自定义的handler。】// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tailpipeline.invokeHandlerAddedIfNeeded();// 执行后就是 head -> logging handler -> my handler -> tail// 上面将客户端的channel已经配置好了,所以通知promise已经成功设置了safeSetSuccess(promise);// 触发handler的 channelRegistered事件pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {// 触发 pipeline 上 active 事件// (这里就会在HeadContext#channelActive中让channel关注可读事件)pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
- HeadContext#channelActive

回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 触发read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)// (注册可读事件)readIfIsAutoRead();
}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {// 进入该调用,经过pipeline逐链调用,会来到HeadContext的read方法channel.read();}
}@Override
public void read(ChannelHandlerContext ctx) {// 接着这里调用到了AbstractNioChannel#doBeginReadunsafe.beginRead();
}
–AbstractNioChannel#doBeginRead

io.netty.channel.nio.AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;// 这时候 interestOps 是 0final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {// 关注 read 事件!!!selectionKey.interestOps(interestOps | readInterestOp);}
}

1.4 read 剖析

接着NioEventLoop#processSelectedKey的那节,当对方发送消息来时。

AbstractNioByteChannel.NioByteUnsafe#read

再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

public final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();// io.netty.allocator.type 决定 allocator 的实现final ByteBufAllocator allocator = config.getAllocator();// 用来分配 byteBuf,确定单次读取大小final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator);// 上面还是空的byteBuf// 读取(这里就会将缓冲区中的数据读取到byteBuf中,//      调用NioSocketChannel#doReadBytes)allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;// 触发read事件,让pipeline上的handler处理,这时是处理 NioSocketChannel上的 handlerpipeline.fireChannelRead(byteBuf);byteBuf = null;} // 是否要继续循环while (allocHandle.continueReading());allocHandle.readComplete();// 触发 read complete 事件pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}
NioSocketChannel#doReadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.attemptedBytesRead(byteBuf.writableBytes());// 读取数据return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
MaxMessageHandle#continueReading

io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {return // 一般为 trueconfig.isAutoRead() &&// respectMaybeMoreData 默认为 true// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&// 小于最大次数,maxMessagePerRead 默认 16totalMessages < maxMessagePerRead &&// 实际读到了数据totalBytesRead > 0;
}

1.5 write剖析

public class TestSourceServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());}}).bind(8888);}
}
public class TestSourceClient {public static void main(String[] args) {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.channel().write("halo");}});}}).connect(new InetSocketAddress("localhost", 8888));}}

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write(msg, promise);
}

write:写队列

我们来看看channel中unsafe的write方法,先来看看其中的一个属性

AbstractUnsafe

protected abstract class AbstractUnsafe implements Unsafe {private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

我们来看看 ChannelOutboundBuffer 这个类

public final class ChannelOutboundBuffer {private final Channel channel;private ChannelOutboundBuffer.Entry flushedEntry;private ChannelOutboundBuffer.Entry unflushedEntry;private ChannelOutboundBuffer.Entry tailEntry;

ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。其中的属性我们下面会详细讲

我们回到正题,接着看 unsafe.write(msg, promise);

AbstractUnsafe

@Override
public final void write(Object msg, ChannelPromise promise) {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;int size;try {msg = filterOutboundMessage(msg);size = pipeline.estimatorHandle().size(msg);if (size < 0) {size = 0;}} catch (Throwable t) {safeSetFailure(promise, t);ReferenceCountUtil.release(msg);return;}outboundBuffer.addMessage(msg, size, promise);
}

1.调用 filterOutboundMessage() 方法,将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

@Override
protected final Object filterOutboundMessage(Object msg) {if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;if (buf.isDirect()) {return msg;}return newDirectBuffer(buf);}if (msg instanceof FileRegion) {return msg;}throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

2.接下来,估算出需要写入的ByteBuf的size
3.最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情

ChannelOutboundBuffer

public void addMessage(Object msg, int size, ChannelPromise promise) {// 创建一个待写出的消息节点Entry entry = Entry.newInstance(msg, size, total(msg), promise);if (tailEntry == null) {flushedEntry = null;tailEntry = entry;} else {Entry tail = tailEntry;tail.next = entry;tailEntry = entry;}if (unflushedEntry == null) {unflushedEntry = entry;}incrementPendingOutboundBytes(size, false);
}

想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图

在这里插入图片描述
hannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise,下面分别是三个指针的作用

1.flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
2.unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区中的节点
3.tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点

初次调用 addMessage 之后,各个指针的情况为

在这里插入图片描述
fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的节点

第二次调用 addMessage之后,各个指针的情况为
在这里插入图片描述
第n次调用 addMessage之后,各个指针的情况为
在这里插入图片描述
可以看到,调用n次addMessage,flushedEntry指针一直指向NULL,表示现在还未有节点需要写出到Socket缓冲区,而unFushedEntry之后有n个节点,表示当前还有n个节点尚未写出到Socket缓冲区中去

flush:刷新写队列

不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点

HeadContext

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {unsafe.flush();
}

之后进入到AbstractUnsafe

AbstractUnsafe

public final void flush() {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {return;}outboundBuffer.addFlush();flush0();
}

flush方法中,先调用 outboundBuffer.addFlush();

ChannelOutboundBuffer

public void addFlush() {Entry entry = unflushedEntry;if (entry != null) {if (flushedEntry == null) {flushedEntry = entry;}do {flushed ++;if (!entry.promise.setUncancellable()) {int pending = entry.cancel();decrementPendingOutboundBytes(pending, false, true);}entry = entry.next;} while (entry != null);unflushedEntry = null;}
}

可以结合前面的图来看,首先拿到 unflushedEntry 指针,然后将 flushedEntry 指向unflushedEntry所指向的节点,调用完毕之后,三个指针的情况如下所示
在这里插入图片描述

相当于所有的节点都即将开始推送出去

接下来,调用 flush0();

AbstractUnsafe

protected void flush0() {doWrite(outboundBuffer);
}

发现这里的核心代码就一个 doWrite,继续跟

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception {int writeSpinCount = -1;boolean setOpWrite = false;for (;;) {// 拿到第一个需要flush的节点的数据Object msg = in.current();if (msg instanceof ByteBuf) {// 强转为ByteBuf,若发现没有数据可读,直接删除该节点ByteBuf buf = (ByteBuf) msg;boolean done = false;long flushedAmount = 0;// 拿到自旋锁迭代次数if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}// 自旋,将当前节点写出for (int i = writeSpinCount - 1; i >= 0; i --) {int localFlushedAmount = doWriteBytes(buf);if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (!buf.isReadable()) {done = true;break;}}in.progress(flushedAmount);// 写完之后,将当前节点删除if (done) {in.remove();} else {break;}} }
}

这里略微有点复杂,我们分析一下

1.第一步,调用current()先拿到第一个需要flush的节点的数据

ChannelOutBoundBuffer

public Object current() {Entry entry = flushedEntry;if (entry == null) {return null;}return entry.msg;
}

2.第二步,拿到自旋锁的迭代次数

if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();
}

3.自旋的方式将ByteBuf写出到jdk nio的Channel

for (int i = writeSpinCount - 1; i >= 0; i --) {int localFlushedAmount = doWriteBytes(buf);if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (!buf.isReadable()) {done = true;break;}
}

doWriteBytes 方法跟进去

protected int doWriteBytes(ByteBuf buf) throws Exception {final int expectedWrittenBytes = buf.readableBytes();return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

我们发现,出现了 javaChannel(),表明已经进入到了jdk nio Channel的领域,我们来看看 buf.readBytes(javaChannel(), expectedWrittenBytes);

public int readBytes(GatheringByteChannel out, int length) throws IOException {this.checkReadableBytes(length);int readBytes = this.getBytes(this.readerIndex, out, length);this.readerIndex += readBytes;return readBytes;
}

我们来看关键代码 this.getBytes(this.readerIndex, out, length)

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {this.checkIndex(index, length);if (length == 0) {return 0;} else {ByteBuffer tmpBuf;if (internal) {tmpBuf = this.internalNioBuffer();} else {tmpBuf = ((ByteBuffer)this.memory).duplicate();}index = this.idx(index);tmpBuf.clear().position(index).limit(index + length);//将tmpBuf中的数据写到out中return out.write(tmpBuf);}
}

我们来看看out.write(tmpBuf)

public int write(ByteBuffer src) throws IOException {ensureOpen();if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {int n = 0;int ti = -1;try {begin();ti = threads.add();if (!isOpen())return 0;do {n = IOUtil.write(fd, src, -1, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {threads.remove(ti);end(n > 0);assert IOStatus.check(n);}}
}

和read实现一样,SocketChannelImpl的write方法通过IOUtil的write实现:关键代码 n = IOUtil.write(fd, src, -1, nd);

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {//如果是DirectBuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送if (var1 instanceof DirectBuffer) {return writeFromNativeBuffer(var0, var1, var2, var4);} else {//非DirectBuffer//获取已经读取到的位置int var5 = var1.position();//获取可以读到的位置int var6 = var1.limit();assert var5 <= var6;//申请一个原buffer可读大小的DirectByteBufferint var7 = var5 <= var6 ? var6 - var5 : 0;ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);int var10;try {var8.put(var1);var8.flip();var1.position(var5);//通过DirectBuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送int var9 = writeFromNativeBuffer(var0, var8, var2, var4);if (var9 > 0) {var1.position(var5 + var9);}var10 = var9;} finally {//回收分配的DirectByteBufferUtil.offerFirstTemporaryDirectBuffer(var8);}return var10;}
}

代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filterOutboundMessage(),将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

说明到了这一步所有的 var1 意境是直接内存DirectBuffer,就不需要走到else,就不需要write两次了

4.删除该节点

节点的数据已经写入完毕,接下来就需要删除该节点

ChannelOutBoundBuffer

public boolean remove() {Entry e = flushedEntry;Object msg = e.msg;ChannelPromise promise = e.promise;int size = e.pendingSize;removeEntry(e);if (!e.cancelled) {ReferenceCountUtil.safeRelease(msg);safeSuccess(promise);}// recycle the entrye.recycle();return true;
}

首先拿到当前被flush掉的节点(flushedEntry所指),然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()方法移除该节点

private void removeEntry(Entry e) {if (-- flushed == 0) {flushedEntry = null;if (e == tailEntry) {tailEntry = null;unflushedEntry = null;}} else {flushedEntry = e.next;}
}

这里的remove是逻辑移除,只是将flushedEntry指针移到下个节点,调用完毕之后,节点图示如下
在这里插入图片描述

writeAndFlush: 写队列并刷新

理解了write和flush这两个过程,writeAndFlush 也就不难了

public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);
}public ChannelFuture writeAndFlush(Object msg) {return writeAndFlush(msg, newPromise());
}public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {write(msg, true, promise);return promise;
}private void write(Object msg, boolean flush, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} 
}

可以看到,最终,通过一个boolean变量,表示是调用 invokeWriteAndFlush,还是 invokeWrite,invokeWrite便是我们上文中的write过程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {invokeWrite0(msg, promise);invokeFlush0();
}

可以看到,最终调用的底层方法和单独调用 write 和 flush 是一样的

private void invokeWrite(Object msg, ChannelPromise promise) {invokeWrite0(msg, promise);
}private void invokeFlush(Object msg, ChannelPromise promise) {invokeFlush0(msg, promise);
}

由此看来,invokeWriteAndFlush基本等价于write方法之后再来一次flush

相关文章:

Netty启动源码NioEventLoop剖析accept剖析read剖析write剖析

学习链接 NIO&Netty - 专栏 Netty核心技术十–Netty 核心源码剖析Netty核心技术九–TCP 粘包和拆包及解决方案Netty核心技术七–Google ProtobufNetty核心技术六–Netty核心模块组件Netty核心技术五–Netty高性能架构设计 聊聊Netty那些事儿 - 专栏 一文搞懂Netty发送数…...

<03.13>八股文补充知识

import java.lang.reflect.*; public class Main {public static void main(String[] args) throws Exception {// 获取 Class 对象//1. 通过类字面量Class<?> clazz Person.class;//2 通过对象实例化String str "Hello";Class<?> clazz_str str.ge…...

[HUBUCTF 2022 新生赛]messy_traffic

下载附件 看到文件类型直接用wireshark打开&#xff0c;对MySQL协议进行追踪流&#xff0c;并没有什么发现&#xff0c;后面对NO.437发现有用信息&#xff0c;http追踪流 发现**system(‘cat passwd.txt’);**这里是在打开查看passwd.txt&#xff0c;密码是"SignUpForHUBU…...

条款1:理解模版性别推导

目录 问题引出 情况1&#xff1a;ParamType是个指针或引用&#xff0c;但不是个万能引用。 情况2&#xff1a;ParamType是个万能引用 情况3&#xff1a;ParamType既非指针也非引用 问题引出 函数模板大致形如&#xff1a; template<typename T> void f(ParamType p…...

kafka连问

1&#xff0c;kafka多消费者指部署多个服务消费节点吗 2&#xff0c;多个消费节点自动组成消费组吗 3&#xff0c;消费者组与多消费节点关系 4&#xff0c;一个分区&#xff0c;多个消费者&#xff0c;可以保证有序消费吗 5&#xff0c;kafka如何实现顺序消费&#xff0c;一…...

Linux中基础开发工具详细介绍

目录 软件包管理器什么是软件包Linux软件生态 yum具体操作查看软件包安装软件卸载软件注意事项 编辑器VimLinux编辑器-vim使用vim的基本概念快速编辑的指令 编译器gcc/g背景知识gcc编译选项预处理(进行宏替换)编译&#xff08;生成汇编&#xff09;汇编&#xff08;生成机器可识…...

浅谈时钟启动和Systemlnit函数

时钟是STM32的关键&#xff0c;是整个系统的心脏&#xff0c;时钟如何启动&#xff0c;时钟源如何选择&#xff0c;各个参数如何设置&#xff0c;我们从源码来简单分析一下时钟的启动函数Systemlnit&#xff08;&#xff09;。 Systemlnit函数简介 我们先来看一下源程序的注释…...

社交软件频繁更新,UI 设计在其中扮演什么角色?

在当今数字化时代&#xff0c;社交软件已成为人们日常生活中不可或缺的一部分。随着科技的飞速发展和用户需求的不断变化&#xff0c;社交软件更新频率日益加快。在这频繁更新的背后&#xff0c;UI 设计扮演着至关重要的角色&#xff0c;它如同社交软件的 “门面担当” 与 “交…...

SQLMesh 系列教程:解锁SQLMesh的宏与变量魔法

在数据库流水线开发中&#xff0c;代码复用与动态配置是提升效率的核心诉求。SQLMesh以其独特的宏系统与用户定义变量机制&#xff0c;重新定义了SQL生成的灵活性。与传统模板引擎不同&#xff0c;SQLMesh的宏并非简单的字符串替换&#xff0c;而是基于语义理解的智能代码重构—…...

React篇之three渲染

需求&#xff1a;拖拽右侧面板&#xff0c;里面的three模型能够自适应 import { useEffect, useState, useRef } from react import ./App.css import * as THREE from three; import { GLTFLoader } from three/addons/loaders/GLTFLoader.js; import { debounce } from loda…...

PHP与前端框架的无缝集成:最佳实践与案例分析

PHP与前端框架的无缝集成&#xff1a;最佳实践与案例分析 在现代Web开发中&#xff0c;PHP作为后端语言与前端框架的集成已成为一种常见的开发模式。无论是传统的MVC架构&#xff0c;还是现代的SPA&#xff08;单页应用&#xff09;&#xff0c;PHP与前端框架的无缝集成能够显…...

Redis内存淘汰策略

Redis 是一种高性能的键值存储系统&#xff0c;广泛用于缓存、消息队列等场景。由于 Redis 数据存储在内存中&#xff0c;而内存资源有限&#xff0c;因此需要内存淘汰策略来管理内存的使用。Redis 提供了多种内存淘汰策略&#xff0c;可以根据不同的应用场景选择合适的策略。 …...

Facebook 的框架及技术栈

一、前端框架与技术 React.js 及其生态系统 核心原理与特点 React.js 是 Facebook 开源的用于构建用户界面的 JavaScript 库。它的核心概念是组件化&#xff0c;将用户界面拆分成一个个独立的、可复用的组件。每个组件都有自己的状态&#xff08;state&#xff09;和属性&#…...

QT中的布局管理

在 Qt 中&#xff0c;布局管理器&#xff08;如 QHBoxLayout 和 QVBoxLayout&#xff09;的构造函数可以接受一个 QWidget* 参数&#xff0c;用于指定该布局的父控件。如果指定了父控件&#xff0c;布局会自动将其管理的控件添加到父控件中。 在你的代码中&#xff0c;QHBoxLa…...

如何学习VBA_3.2.20:DTP与Datepicker实现日期的输入

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的劳动效率&#xff0c;而且可以提高数据处理的准确度。我推出的VBA系列教程共九套和一部VBA汉英手册&#xff0c;现在已经全部完成&#xff0c;希望大家利用、学习。 如果…...

在 LaTeX 中强制表格位于页面顶部

在 LaTeX 中强制表格位于页面顶部&#xff0c;可以通过以下 多种方法结合使用&#xff0c;按优先级推荐&#xff1a; 方法 1&#xff1a;使用 [!t] 位置限定符 原理&#xff1a;通过 [!t] 强制 LaTeX 优先将表格放置在页面顶部&#xff08;Top&#xff09;&#xff0c;! 表示忽…...

dify+mysql的诗词助手

目录 数据库表结构&#xff1a; 数据库查询的http服务搭建&#xff1a; 流程引擎搭建&#xff1a; 开始&#xff0c; HTTP查询数据库&#xff0c; LLM数据分析&#xff0c; 直接回复&#xff0c; 效果测试&#xff1a; 下载链接&#xff1a; 数据库表结构&#xff1a;…...

PyTorch 入门学习

目录 PyTorch 定义 核心作用 应用场景 Pytorch 基本语法 1. 张量的创建 2. 张量的类型转换 3. 张量数值计算 4. 张量运算函数 5. 张量索引操作 6. 张量形状操作 7. 张量拼接操作 8. 自动微分模块 9. 案例-线性回归案例 PyTorch 定义 PyTorch 是一个基于 Python 深…...

【视频】SRS将RTMP转WebRTC、HLS流;获取RTSP转其它流

1、安装依赖库 sudo apt install tclsh sudo apt install cmake sudo apt install autotools-dev automake m4 perl sudo apt install libtool2、源码安装 1)下载源码 https://github.com/ossrs/srs/releases/tag/v5.0-r32)配置、编译 ./configure && make -j83、…...

linux中如何查询文件夹大小

在 Linux 中&#xff0c;可以使用 du 命令查看文件夹大小。以下是常用方法&#xff1a; 标题1. 查看文件夹大小 du -sh /path/to/directory-s&#xff1a;显示总大小。 -h&#xff1a;以易读格式&#xff08;如 KB、MB、GB&#xff09;显示大小。 标题2&#xff1a;查看文件…...

MySQL增删改查操作 -- CRUD

个人主页&#xff1a;顾漂亮 目录 1.CRUD简介 2.Create新增 使用示例&#xff1a; 注意点&#xff1a; 3.Retrieve检索 使用示例&#xff1a; 注意点&#xff1a; 4.where条件查询 前置知识&#xff1a;-- 运算符 比较运算符 使用示例&#xff1a; 注意点&#xf…...

uniapp+Vue3 组件之间的传值方法

一、父子传值&#xff08;props / $emit 、ref / $refs&#xff09; 1、props / $emit 父组件通过 props 向子组件传递数据&#xff0c;子组件通过 $emit 触发事件向父组件传递数据。 父组件&#xff1a; // 父组件中<template><view class"container">…...

TDengine SQL 函数

单行函数 数学函数 ABSACOSASINATANCEILCOSDEGREESEXPFLOORGREATESTLEASTLNLOGMODPIPOWRADIANSRANDROUNDSIGNSINSQRTTANTRUNCATE 字符串函数 ASCIICHARCHAR_LENGTHCONCATCONCAT_WSLENGTHLOWERLTRIMPOSITIONREPEATREPLACERTRIMSUBSTRING/SUBSTRSUBSTRING_INDEXTRIMUPPER 转换函数…...

智能三防手持终端破解传统仓储效率困局

在数字化浪潮的推动下&#xff0c;传统仓储管理模式正面临效率低、成本高、错误频发等瓶颈。如何实现精准、高效、智能化的仓储管理&#xff0c;上海岳冉三防智能手持终端机以RFID技术为核心&#xff0c;结合工业级三防&#xff08;防水、防摔、防尘&#xff09;设计&#xff0…...

力扣——K个一组翻转链表

题目链接&#xff1a; 链接 题目描述&#xff1a; 思路&#xff1a; 可以理解为把原链表的每一段进行反转 把链表的每一段看成新链表&#xff0c;单独进行反转&#xff0c;然后再放回原链表 关键是截取k个节点、进行反转后&#xff0c;怎么再和原链表链接起来 我们把截取的…...

5-27 临摹大师-IP-Adapter

前言&#xff1a; 前一节我们主要介绍ControlNet中如何对黑白照片进行上色 主要介绍ControlNet中的IP-Adapter。这个也是一种类似的风格借鉴&#xff0c;类似Reference的能力。 当然IP-Adapter有两点或许可以吸引我们&#xff0c;一个是国人腾讯公司制作的。另一个在速度和效…...

MinIO的预签名直传机制

我们传统使用MinIo做OSS对象存储的应用方式往往都是在后端配置与MinIO的连接和文件上传下载的相关接口&#xff0c;然后我们在前端调用这些接口完成文件的上传下载机制&#xff0c;但是&#xff0c;当并发量过大&#xff0c;频繁访问会对后端的并发往往会对服务器造成极大的压力…...

树莓科技集团董事长:第五代产业园运营模式的深度剖析与展望​

第五代产业园运营模式&#xff0c;以创新为核心驱动&#xff0c;强调数字化、网络化和资源整合。树莓科技集团在这一领域具有代表性&#xff0c;其运营模式值得深入剖析。 核心特征 数字化转型&#xff1a;第五代产业园高度重视数字化技术的应用&#xff0c;通过构建数字化平…...

项目组织管理类型-职能式组织和矩阵式组织的区别

在职能式组织和矩阵式组织中&#xff0c;任务分配和人员安排确实有显著的不同&#xff0c;让我们通过以下例子来进一步解释&#xff1a; 职能式组织在职能式组织中&#xff0c;任务通常是根据部门的职能进行下达的。 例如&#xff0c;一家制造公司的组织结构如下&#xff1a; …...

树莓科技(成都)集团:如何铸就第五代产业园标杆

树莓科技&#xff08;成都&#xff09;集团铸就第五代产业园标杆&#xff0c;主要体现在以下几个方面&#xff1a; 精准定位与前瞻布局 树莓科技并非盲目扩张&#xff0c;而是精准锚定数字经济发展方向。以成都为起点&#xff0c;迅速构建起全国性的园区版图&#xff0c;体现…...

【Quest开发】手柄交互震动

软件&#xff1a;Unity 2022.3.51f1c1、vscode、Meta XR All in One SDK V72&#xff08;要提前导入哦&#xff09; 硬件&#xff1a;Meta Quest3 参考Meta开发文档&#xff1a;https://developers.meta.com/horizon/documentation/unity/unity-haptics-sdk-integrate 这篇官…...

《Transformer如何进行图像分类:从新手到入门》

引言 如果你对人工智能&#xff08;AI&#xff09;或深度学习&#xff08;Deep Learning&#xff09;感兴趣&#xff0c;可能听说过“Transformer”这个词。它最初在自然语言处理&#xff08;NLP&#xff09;领域大放异彩&#xff0c;比如在翻译、聊天机器人和文本生成中表现出…...

数字图像处理与Python语言实现-Box模糊CUDA实现

Box模糊CUDA实现 文章目录 Box模糊CUDA实现1、Box模糊的基本原理2、算法优化:滑动窗口技术3、参数对模糊效果的影响4、Box模糊的优缺点5、与高斯模糊的对比6、实际应用场景7、算法实现7.1 PyCUDA实现7.2 CuPy实现7.3 C++与CUDA实现8、总结在图像处理领域,**Box模糊(方框模糊…...

MAVEN解决版本依赖冲突

文章目录 一、依赖冲突概念1、什么是依赖冲突2、依赖冲突的原因3、如何解决依赖冲突 二、查看依赖冲突-maven-helper1、安装2、helper使用1、conflicts的阅读顺序&#xff08;从下向上看&#xff09;2、dependencies as List的阅读顺序&#xff08;从下向上看&#xff09;3、de…...

Compose 实践与探索五 —— AnimationSpec

不论是 animateXxxAsState() 还是 Animatable 的 animateTo() 都可以传入 AnimationSpec 以配置动画的规格&#xff1a; Composable fun animateDpAsState(targetValue: Dp,animationSpec: AnimationSpec<Dp> dpDefaultSpring,label: String "DpAnimation",…...

Embedding模型到底是什么?

嵌入模型&#xff08;Embedding Model&#xff09;是一种将高维数据映射到低维空间的工具&#xff0c;广泛应用于自然语言处理&#xff08;NLP&#xff09;、推荐系统和图像识别等领域。它的核心目标是将复杂的数据&#xff08;如文本、图像或用户行为&#xff09;转换为稠密的…...

数据结构(一)——绪论

一、数据结构的研究内容 1.数据的各种逻辑结构和物理结构&#xff0c;以及他们之间的相应关系 2.存储结构的方法&#xff0c;对每种结构定义相适应的各种运算 3.设计出相应的算法 4.分析算法的效率 二、数据结构的基本概念 1.数据&#xff08;data&#xff09;&#xff1a…...

VMware虚拟机网络连接模式介绍以及nat模式访问公网实践

在 VMware 虚拟机中&#xff0c;网络配置是非常重要的一部分。VMware 提供了三种主要的网络连接模式&#xff0c;分别是桥接模式&#xff08;Bridged&#xff09;、NAT模式&#xff08;NAT&#xff09; 和仅主机模式&#xff08;Host-Only&#xff09;。每种模式都有其特定的用…...

Selenium Manager和webdriver manager的区别与联系

一、引言 1.1 自动化测试的重要性 在现代软件开发流程中&#xff0c;自动化测试已经成为保证软件质量和提高交付效率的关键实践。随着软件开发周期的缩短和软件复杂性的增加&#xff0c;手工测试已无法满足快速迭代的需求。自动化测试能够快速、准确地执行重复性测试任务&…...

八叉树地图的原理与实现

八叉树与体素图 八叉树地图 八叉树地图是可变分辨率的三维栅格地图&#xff0c;可以自由调整分辨率&#xff0c;如下所示&#xff1a; 根据点云的数量或密度决定每个叶子方块是否被占据 体素图 体素就是固定分辨率的三维栅格地图&#xff0c;如下所示&#xff1a; 根据点云…...

DeepSeek模型本地化部署方案及Python实现

DeepSeek实在是太火了&#xff0c;虽然经过扩容和调整&#xff0c;但反应依旧不稳定&#xff0c;甚至小圆圈转半天最后却提示“服务器繁忙&#xff0c;请稍后再试。” 故此&#xff0c;本文通过讲解在本地部署 DeepSeek并配合python代码实现&#xff0c;让你零成本搭建自己的AI…...

【Linux】浅谈冯诺依曼和进程

一、冯诺依曼体系结构 冯诺依曼由 输入设备、输出设备、运算器、控制器、存储器 五部分组成。 冯诺依曼的设计特点 二进制表示 所有数据&#xff08;包括程序指令&#xff09;均以二进制形式存储和运算&#xff0c;简化了硬件逻辑设计&#xff0c;提高了可靠性。 存储程序原理…...

基于深度学习的多模态人脸情绪识别研究与实现(视频+图像+语音)

这是一个结合图像和音频的情绪识别系统&#xff0c;从架构、数据准备、模型实现、训练等。包括数据收集、预处理、模型训练、融合方法、部署优化等全流程。确定完整系统的组成部分&#xff1a;数据收集与处理、模型设计与训练、多模态融合、系统集成、部署优化、用户界面等。详…...

【蓝桥杯】第15届c++B组--R格式

问题描述 小蓝最近在研究一种浮点数的表示方法&#xff1a;RR 格式。对于一个大于 0 的浮点数 dd&#xff0c;可以用 RR 格式的整数来表示。给定一个转换参数 nn&#xff0c;将浮点数转换为 RR 格式整数的做法是: 将浮点数乘以 2n2n&#xff1b; 四舍五入到最接近的整数。 …...

【初阶三】认识C语言—下

【初阶三】认识C语言—下 1.函数2.数组3.操作符3.1算数操作符3.2移位操作符和位操作符3.3赋值操作符3.4单目操作符 4.常见关键字4.1关键字typedef4.2 关键字static 5. define定义常宏6.指针6.1内存6.2取地址操作符& 7.结构体 1.函数 函数就像一个工厂&#xff0c;通过输入原…...

【C#】使用DeepSeek帮助评估数据库性能问题,C# 使用定时任务,每隔一分钟移除一次表,再重新创建表,和往新创建的表追加5万多条记录

&#x1f339;欢迎来到《小5讲堂》&#x1f339; &#x1f339;这是《C#》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解。&#x1f339; &#x1f339;温馨提示&#xff1a;博主能力有限&#xff0c;理解水平有限&#xff0c;若有不对之处望指正&#xff01;&#…...

前端学习笔记(三)——ant-design vue表单传递数据到父页面

前言 善用AI&#xff0c;快速解决定位 原理 a-form所在的SFC&#xff08;单文件&#xff09;vue中需要将表单数据传递给父页面SFC文件中&#xff0c;使用emit方法 代码 子组件&#xff08;Form.vue&#xff09; <template><a-form submit"handleSubmit&qu…...

计算机视觉算法实战——驾驶员玩手机检测(主页有源码)

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​ ​​​ 1. 领域简介&#xff1a;玩手机检测的重要性与技术挑战 驾驶员玩手机检测是智能交通安全领域的核心课题。根据NHTSA数据&#xff0…...

C语言(23)

字符串函数 11.strstr函数 1.1函数介绍&#xff1a; 头文件&#xff1a;string.h char *strstr ( const char * str1,const char *str2); 作用&#xff1a;在一个字符串&#xff08;str1&#xff09;中寻找另外一个字符串&#xff08;str2&#xff09;是否出现过 如果找到…...

Python入门教程:从零开始学习Python编程

引言 Python是一种高级编程语言&#xff0c;因其简洁的语法和强大的功能而广受欢迎。无论你是编程新手&#xff0c;还是有经验的开发者&#xff0c;Python都是一个非常好的选择。本文将带你从零开始学习Python编程&#xff0c;涵盖基础语法、常用库以及一些实用的编程技巧。 目…...