redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接
Lettuce是一个高性能的Java Redis客户端,支持同步、异步和反应式编程模式
Lettuce的核心功能包括:
- 高性能:通过使用Netty作为底层网络通信框架,实现了非阻塞IO,提高了性能。
- 丰富的API:提供了丰富的Redis命令API,支持多种Redis数据类型和操作。
- 高级特性:支持命令批处理、事务、发布订阅等功能,并且可以适应不同的Redis数据类型和应用场景。
- 灵活性:支持多种Redis序列化器和编解码器,方便在不同场景下使用。
Lettuce的这些特性使得它成为了一个受欢迎的Redis客户端,广泛应用于各种需要高性能Redis交互的场景中。
Lettuce使用了Connection Watchdog(连接看门狗),用于管理和监控与远程服务器的连接。在网络通信中,Channel
代表了与远程服务的连接,当连接丢失或关闭时,Connection Watchdog 会自动尝试重新连接。它并不直接依赖于 channelActive()
来实现自动重连,而是负责在连接丢失时主动检测并安排重新连接的任务。
主要作用
-
监控连接状态:
ConnectionWatchdog
继承自ChannelInboundHandlerAdapter
,实现了 Netty 的ChannelHandler
接口。它在channelActive()
和channelInactive()
事件中插入了额外的逻辑。- 当连接激活时(
channelActive()
),它会初始化连接,清除之前的重连状态。 - 当连接关闭时(
channelInactive()
),它会检测连接是否已经关闭,并尝试重新连接。
-
自动重连:
- 自动调度重连:如果连接丢失,
ConnectionWatchdog
会在适当的时间间隔后安排一个新的重连尝试。 - 重连机制:它会基于预定义的重连延迟、尝试次数和其他条件,调度新的重连任务。重连操作是异步执行的,使用
reconnectWorkers
线程池来处理。 - 延迟处理:重连延迟使用
Delay
和StatefulDelay
管理,以确保每次重连尝试之间有适当的间隔,防止过于频繁的重连尝试。
- 自动调度重连:如果连接丢失,
-
连接恢复:
- 如果
channelInactive()
事件触发(即连接丢失),ConnectionWatchdog
会在重连条件满足时重新启动连接。它通过reconnectionHandler.reconnect()
来尝试重新建立连接。 - 重连失败事件:如果重连尝试失败,它会触发
ReconnectFailedEvent
事件,将失败信息发布到事件总线eventBus
,供其他组件处理。
- 如果
-
支持可配置的重连逻辑:
ConnectionWatchdog
提供了多种配置项来控制重连行为,如:- 重连延迟:使用
Delay
来管理每次重连之间的延迟。 - 重连调度:在连接丢失时,自动触发重连,且支持延迟、间隔等参数。
- 重连暂停:通过
setReconnectSuspended(true)
方法可以暂停重连尝试,避免在某些情况下自动重连。
- 重连延迟:使用
关键方法
-
channelActive(ChannelHandlerContext ctx)
:- 这个方法在
Channel
激活时调用。当连接建立成功时,会被触发。它会初始化一些内部状态,并清除之前的重连调度。
- 这个方法在
-
channelInactive(ChannelHandlerContext ctx)
:- 当
Channel
关闭时调用。如果连接丢失,ConnectionWatchdog
会根据当前配置来判断是否需要进行重连。 - 如果启用了重连监听(
listenOnChannelInactive
为true
),并且重连没有被暂停,它会调用scheduleReconnect()
来触发重连。
- 当
-
scheduleReconnect()
:- 用于调度下一次的重连尝试。它会检查当前连接是否有效,如果没有有效的连接(即连接丢失),则会安排在适当的延迟后尝试重新连接。
- 这个方法会使用
reconnectDelay
来计算每次重连之间的延迟时间。 - 重连尝试会通过
reconnectionHandler.reconnect()
来实际执行重连逻辑。
-
run(int attempt)
:- 这是执行实际重连的代码。如果
scheduleReconnect()
被调用,run()
会尝试重新建立连接。 - 如果重连成功,它会停止重连操作;如果失败,则发布
ReconnectFailedEvent
事件,并根据情况决定是否继续重连。
- 这是执行实际重连的代码。如果
为什么 channelActive()
不会自动重连?
在 Netty
中,channelActive()
只是一个通道激活事件。当一个连接成功建立时,channelActive()
会被触发,通常表示连接已经准备好进行数据传输。然而,channelActive()
事件本身并不处理连接丢失后的自动重连。
ConnectionWatchdog
的作用就是在连接丢失或关闭时自动安排重连任务。- 自动重连的原因:因为一旦连接丢失,
channelInactive()
会被触发。ConnectionWatchdog
会在channelInactive()
中判断是否启用重连逻辑,然后调度一个新的重连任务,确保在连接失败后能够尝试重新连接。 channelActive()
只关心通道的初始化,不能保证在通道关闭或掉线后自动恢复连接。ConnectionWatchdog
负责在通道断开后,通过一定的重连策略来确保连接恢复。
总结
ConnectionWatchdog
的作用是:
- 监控连接的生命周期,当连接丢失时触发重连。
- 管理重连过程,通过延迟和重连尝试次数来合理安排重连。
- 确保连接恢复,并在重连失败时通过事件总线通知其他组件。
因此,channelActive()
只是连接建立时的一个简单事件,而 ConnectionWatchdog
是负责监控连接丢失后自动重连的核心组件。
##看门狗源码
/** Copyright 2011-2019 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package io.lettuce.core.protocol;import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;/*** A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.** @author Will Glozer* @author Mark Paluch* @author Koji Lin*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);private final Delay reconnectDelay;private final Bootstrap bootstrap;private final EventExecutorGroup reconnectWorkers;private final ReconnectionHandler reconnectionHandler;private final ReconnectionListener reconnectionListener;private final Timer timer;private final EventBus eventBus;private Channel channel;private SocketAddress remoteAddress;private long lastReconnectionLogging = -1;private String logPrefix;private final AtomicBoolean reconnectSchedulerSync;private volatile int attempts;private volatile boolean armed;private volatile boolean listenOnChannelInactive;private volatile Timeout reconnectScheduleTimeout;/*** Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new* {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address.** @param reconnectDelay reconnect delay, must not be {@literal null}* @param clientOptions client options for the current connection, must not be {@literal null}* @param bootstrap Configuration for new channels, must not be {@literal null}* @param timer Timer used for delayed reconnect, must not be {@literal null}* @param reconnectWorkers executor group for reconnect tasks, must not be {@literal null}* @param socketAddressSupplier the socket address supplier to obtain an address for reconnection, may be {@literal null}* @param reconnectionListener the reconnection listener, must not be {@literal null}* @param connectionFacade the connection facade, must not be {@literal null}* @param eventBus Event bus to emit reconnect events.*/public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier,ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus) {LettuceAssert.notNull(reconnectDelay, "Delay must not be null");LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");LettuceAssert.notNull(bootstrap, "Bootstrap must not be null");LettuceAssert.notNull(timer, "Timer must not be null");LettuceAssert.notNull(reconnectWorkers, "ReconnectWorkers must not be null");LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");LettuceAssert.notNull(reconnectionListener, "ReconnectionListener must not be null");LettuceAssert.notNull(connectionFacade, "ConnectionFacade must not be null");LettuceAssert.notNull(eventBus, "EventBus must not be null");this.reconnectDelay = reconnectDelay;this.bootstrap = bootstrap;this.timer = timer;this.reconnectWorkers = reconnectWorkers;this.reconnectionListener = reconnectionListener;this.reconnectSchedulerSync = new AtomicBoolean(false);this.eventBus = eventBus;Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr).onErrorResume(t -> {if (logger.isDebugEnabled()) {logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()+ ", reusing cached address " + remoteAddress, t);} else {logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()+ ", reusing cached address " + remoteAddress);}return Mono.just(remoteAddress);});this.reconnectionHandler = new ReconnectionHandler(clientOptions, bootstrap, wrappedSocketAddressSupplier, timer,reconnectWorkers, connectionFacade);resetReconnectDelay();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {logger.debug("{} userEventTriggered(ctx, {})", logPrefix(), evt);if (evt instanceof ConnectionEvents.Activated) {attempts = 0;resetReconnectDelay();}super.userEventTriggered(ctx, evt);}void prepareClose() {setListenOnChannelInactive(false);setReconnectSuspended(true);Timeout reconnectScheduleTimeout = this.reconnectScheduleTimeout;if (reconnectScheduleTimeout != null && !reconnectScheduleTimeout.isCancelled()) {reconnectScheduleTimeout.cancel();}reconnectionHandler.prepareClose();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {reconnectSchedulerSync.set(false);channel = ctx.channel();reconnectScheduleTimeout = null;logPrefix = null;remoteAddress = channel.remoteAddress();logPrefix = null;logger.debug("{} channelActive()", logPrefix());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.debug("{} channelInactive()", logPrefix());if (!armed) {logger.debug("{} ConnectionWatchdog not armed", logPrefix());return;}channel = null;if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {scheduleReconnect();} else {logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);}super.channelInactive(ctx);}/*** Enable {@link ConnectionWatchdog} to listen for disconnected events.*/void arm() {this.armed = true;setListenOnChannelInactive(true);}/*** Schedule reconnect if channel is not available/not active.*/public void scheduleReconnect() {logger.debug("{} scheduleReconnect()", logPrefix());if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {attempts++;final int attempt = attempts;int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);this.reconnectScheduleTimeout = timer.newTimeout(it -> {reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");return;}reconnectWorkers.submit(() -> {ConnectionWatchdog.this.run(attempt);return null;});}, timeout, TimeUnit.MILLISECONDS);// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.if (!reconnectSchedulerSync.get()) {reconnectScheduleTimeout = null;}} else {logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());}}/*** Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with* the same handler instances contained in the old channel's pipeline.** @param attempt attempt counter** @throws Exception when reconnection fails.*/public void run(int attempt) throws Exception {reconnectSchedulerSync.set(false);reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if (isReconnectSuspended()) {logger.debug("Skip reconnect scheduling, reconnect is suspended");return;}boolean shouldLog = shouldLog();InternalLogLevel infoLevel = InternalLogLevel.INFO;InternalLogLevel warnLevel = InternalLogLevel.WARN;if (shouldLog) {lastReconnectionLogging = System.currentTimeMillis();} else {warnLevel = InternalLogLevel.DEBUG;infoLevel = InternalLogLevel.DEBUG;}InternalLogLevel warnLevelToUse = warnLevel;try {reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();CompletableFuture<Channel> future = tuple.getT1();future.whenComplete((c, t) -> {if (c != null && t == null) {return;}CompletableFuture<SocketAddress> remoteAddressFuture = tuple.getT2();SocketAddress remote = remoteAddress;if (remoteAddressFuture.isDone() && !remoteAddressFuture.isCompletedExceptionally()&& !remoteAddressFuture.isCancelled()) {remote = remoteAddressFuture.join();}String message = String.format("Cannot reconnect to [%s]: %s", remote,t.getMessage() != null ? t.getMessage() : t.toString());if (ReconnectionHandler.isExecutionException(t)) {if (logger.isDebugEnabled()) {logger.debug(message, t);} else {logger.log(warnLevelToUse, message);}} else {logger.log(warnLevelToUse, message, t);}eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remote, t, attempt));if (!isReconnectSuspended()) {scheduleReconnect();}});} catch (Exception e) {logger.log(warnLevel, "Cannot reconnect: {}", e.toString());eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remoteAddress, e, attempt));}}private boolean isEventLoopGroupActive() {if (!isEventLoopGroupActive(bootstrap.group()) || !isEventLoopGroupActive(reconnectWorkers)) {return false;}return true;}private static boolean isEventLoopGroupActive(EventExecutorGroup executorService) {return !(executorService.isShuttingDown());}private boolean shouldLog() {long quietUntil = lastReconnectionLogging + LOGGING_QUIET_TIME_MS;return quietUntil <= System.currentTimeMillis();}/*** Enable event listener for disconnected events.** @param listenOnChannelInactive {@literal true} to listen for disconnected events.*/public void setListenOnChannelInactive(boolean listenOnChannelInactive) {this.listenOnChannelInactive = listenOnChannelInactive;}public boolean isListenOnChannelInactive() {return listenOnChannelInactive;}/*** Suspend reconnection temporarily. Reconnect suspension will interrupt reconnection attempts.** @param reconnectSuspended {@literal true} to suspend reconnection*/public void setReconnectSuspended(boolean reconnectSuspended) {reconnectionHandler.setReconnectSuspended(reconnectSuspended);}public boolean isReconnectSuspended() {return reconnectionHandler.isReconnectSuspended();}ReconnectionHandler getReconnectionHandler() {return reconnectionHandler;}private void resetReconnectDelay() {if (reconnectDelay instanceof StatefulDelay) {((StatefulDelay) reconnectDelay).reset();}}private String logPrefix() {if (logPrefix != null) {return logPrefix;}String buffer = "[" + ChannelLogDescriptor.logDescriptor(channel) + ", last known addr=" + remoteAddress + ']';return logPrefix = buffer;}
}
##redis lettuce重新连接代码
-
重连处理 (
reconnect
和reconnect0
):reconnect()
方法会尝试重新连接 Redis 服务器。当远程地址发生变化时,它会尝试通过socketAddressSupplier
获取新的地址并发起连接。reconnect0()
方法执行实际的重连逻辑。它通过 Netty 的bootstrap.connect(remoteAddress)
发起连接,并通过ChannelFuture
来管理连接的异步状态。- 如果连接失败,会通过
ChannelFutureListener
监听连接结果,执行相关的失败处理(如关闭通道、记录异常等)。
-
连接逻辑(Netty 的
bootstrap.connect()
):- 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是
ChannelFuture
和ChannelPromise
的使用,这些都是 Netty 中用于异步连接、处理连接结果的关键工具。 bootstrap.connect(remoteAddress)
是用来发起连接的核心方法。它返回一个ChannelFuture
,通过这个ChannelFuture
可以监听连接成功与否的结果。
- 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是
-
连接超时处理:
- 在重连的过程中,代码实现了一个超时机制(
TimeoutException
)。如果重连操作超时,它会取消连接操作,并触发异常。 - 通过
eventLoop().schedule()
来设定连接超时。
- 在重连的过程中,代码实现了一个超时机制(
-
Channel
的初始化与配置:- 在连接成功之后,会通过
RedisChannelInitializer
初始化通道的处理流水线(ChannelPipeline
)。如果初始化失败,会进行相应的失败处理,包括重置连接、关闭连接等。 - 如果连接成功,则会执行一些调试输出和状态更新。
- 在连接成功之后,会通过
-
错误处理与异常捕获:
- 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在
reconnect0
中,如果连接失败或初始化失败,都会通过completeExceptionally()
完成CompletableFuture
,确保连接错误能够被外部捕获。
- 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在
-
使用
CompletableFuture
:- 重连操作通过
CompletableFuture
来管理异步结果。CompletableFuture<Channel>
用来表示连接是否成功,CompletableFuture<SocketAddress>
用来表示地址解析的结果。
- 重连操作通过
关键部分的 Netty 连接代码:
-
连接过程中的异步操作:
bootstrap.connect(remoteAddress)
返回一个ChannelFuture
,表示异步连接操作,addListener()
用来监听连接的结果。
-
异常处理与重试机制:
- 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的
ChannelPipeline
,并进行后续的操作。
- 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的
-
超时处理:
TimeoutException
用来在连接超时后进行错误处理。
##网络连接源码如下
Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();
protected Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> reconnect() {CompletableFuture<Channel> future = new CompletableFuture<>();CompletableFuture<SocketAddress> address = new CompletableFuture<>();socketAddressSupplier.subscribe(remoteAddress -> {address.complete(remoteAddress);if (future.isCancelled()) {return;}reconnect0(future, remoteAddress);}, ex -> {if (!address.isDone()) {address.completeExceptionally(ex);}future.completeExceptionally(ex);});this.currentFuture = future;return Tuples.of(future, address);}
##reconnect0重连
private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {ChannelFuture connectFuture = bootstrap.connect(remoteAddress);ChannelPromise initFuture = connectFuture.channel().newPromise();logger.debug("Reconnecting to Redis at {}", remoteAddress);result.whenComplete((c, t) -> {if (t instanceof CancellationException) {connectFuture.cancel(true);initFuture.cancel(true);}});initFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {connectFuture.cancel(true);close(it.channel());result.completeExceptionally(it.cause());} else {result.complete(connectFuture.channel());}});connectFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {initFuture.tryFailure(it.cause());return;}ChannelPipeline pipeline = it.channel().pipeline();RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);if (channelInitializer == null) {initFuture.tryFailure(new IllegalStateException("Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));return;}channelInitializer.channelInitialized().whenComplete((state, throwable) -> {if (throwable != null) {if (isExecutionException(throwable)) {initFuture.tryFailure(throwable);return;}if (clientOptions.isCancelCommandsOnReconnectFailure()) {connectionFacade.reset();}if (clientOptions.isSuspendReconnectOnProtocolFailure()) {logger.error("Disabling autoReconnect due to initialization failure", throwable);setReconnectSuspended(true);}initFuture.tryFailure(throwable);return;}if (logger.isDebugEnabled()) {logger.info("Reconnected to {}, Channel {}", remoteAddress,ChannelLogDescriptor.logDescriptor(it.channel()));} else {logger.info("Reconnected to {}", remoteAddress);}initFuture.trySuccess();});});Runnable timeoutAction = () -> {initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",timeout, timeoutUnit)));};Timeout timeoutHandle = timer.newTimeout(it -> {if (connectFuture.isDone() && initFuture.isDone()) {return;}if (reconnectWorkers.isShutdown()) {timeoutAction.run();return;}reconnectWorkers.submit(timeoutAction);}, this.timeout, timeoutUnit);initFuture.addListener(it -> timeoutHandle.cancel());}
##netty Bootstrap网络连接
public ChannelFuture connect(SocketAddress remoteAddress) {ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");validate();return doResolveAndConnect(remoteAddress, config.localAddress());}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {if (!regFuture.isSuccess()) {return regFuture;}return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// Directly obtain the cause and do a null check so we only need one volatile read in case of a// failure.Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}
##
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,final SocketAddress localAddress, final ChannelPromise promise) {try {final EventLoop eventLoop = channel.eventLoop();final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {// Resolver has no idea about what to do with the specified remote address or it's resolved already.doConnect(remoteAddress, localAddress, promise);return promise;}final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);if (resolveFuture.isDone()) {final Throwable resolveFailureCause = resolveFuture.cause();if (resolveFailureCause != null) {// Failed to resolve immediatelychannel.close();promise.setFailure(resolveFailureCause);} else {// Succeeded to resolve immediately; cached? (or did a blocking lookup)doConnect(resolveFuture.getNow(), localAddress, promise);}return promise;}// Wait until the name resolution is finished.resolveFuture.addListener(new FutureListener<SocketAddress>() {@Overridepublic void operationComplete(Future<SocketAddress> future) throws Exception {if (future.cause() != null) {channel.close();promise.setFailure(future.cause());} else {doConnect(future.getNow(), localAddress, promise);}}});} catch (Throwable cause) {promise.tryFailure(cause);}return promise;}
##
private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.final Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}
##
@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {return tail.connect(remoteAddress, promise);}
##
@Overridepublic ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (remoteAddress == null) {throw new NullPointerException("remoteAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, null);}return promise;}
##
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {connect(remoteAddress, localAddress, promise);}}
##
@Overridepublic void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) {unsafe.connect(remoteAddress, localAddress, promise);}
##
@Overridepublic final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}try {if (connectPromise != null) {// Already a connect in process.throw new ConnectionPendingException();}boolean wasActive = isActive();if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);} else {connectPromise = promise;requestedRemoteAddress = remoteAddress;// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}promise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);}connectPromise = null;close(voidPromise());}}});}} catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));closeIfClosed();}}
##socket连接工具
@Overrideprotected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {if (localAddress != null) {doBind0(localAddress);}boolean success = false;try {boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);}success = true;return connected;} finally {if (!success) {doClose();}}}
##socketChannel连接远程地址 socketChannel.connect(remoteAddress)
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)throws IOException {try {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws IOException {return socketChannel.connect(remoteAddress);}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}}
相关文章:
redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接
Lettuce是一个高性能的Java Redis客户端,支持同步、异步和反应式编程模式 Lettuce的核心功能包括: 高性能:通过使用Netty作为底层网络通信框架,实现了非阻塞IO,提高了性能。丰富的API:提供了丰富…...
【PyTorch】实现在训练过程中自定义动态调整学习率
问题描述: 在使用 PyTorch 训练自定义神经网络时,我们希望能够动态地调整学习率,以便在训练过程中逐渐优化模型,避免过拟合并加速收敛。那么,如何在 PyTorch 中实现这一功能呢? 解决方案: 在训…...
【Flink-scala】DataStream编程模型总结
系列文章目录 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器 4.【Flink-scala】DataStream编程模型之水位线 5.【…...
语音芯片赋能可穿戴设备:开启个性化音频新体验
在科技日新月异的今天,语音芯片与可穿戴设备的携手合作,正引领我们步入一个前所未有的个性化音频时代。这一创新融合,用户可以享受到更加个性化、沉浸式的音频体验。下面将详细介绍语音芯片与可穿戴设备合作的优点和具体应用。 1. 定制化音效…...
JavaFX使用jfoenix的UI控件
jfoenix还是一个不错的样式,推荐使用,而且也可以支持scene builder中的拖拖拽拽 需要注意的是过高的javafx版本可能会使得某些样式或控件无法使用 比如alert控件,亲测javaFX 19版本可以正常使用 1.在pom.xml中引入依赖 GitHub地址https://gi…...
SpringBoot集成ENC对配置文件进行加密
在线MD5生成工具 配置文件加密,集成ENC 引入POM依赖 <!-- ENC配置文件加密 --><dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifactId><version>2.1.2</ver…...
基于AI对话生成剧情AVG游戏
游戏开发这个领域,一直有较高的学习门槛。作为一个非专业的游戏爱好者,如果想要开发游戏,往往受制于游戏引擎的专业程度,难以完成复杂的游戏项目。 AI IDE的诞生,提供了另外的一种思路,即通过AI 生成项目及…...
MAVEN--Maven的生命周期,pom.xml详解,Maven的高级特性(模块化、聚合、依赖管理)
目录 (一)Maven的生命周期 1.Maven的三套生命周期 2.Maven常用命令 (二)pom.xml详解 (三)Maven的高级特性(模块化、聚合、依赖管理) 1.Maven的依赖范围 2.版本维护 3.依赖传…...
SpringBoot的事务钩子函数
如果需要在A方法执行完成之后做一个不影响主方法运行的动作B,我们需要判断这个A方法是否存在事务,并且使用异步执行动作B; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transa…...
uniapp滚动消息列表
两个相同的循环列表,循环滚动 <view class"winners_list uni-flex uni-column" :animation"animationData"><view v-for"(item, index) in winnersList" :key"index" class"li uni-flex uni-column"&g…...
基于python对pdf文件进行加密等操作
利用python对pdf文件进行操作 读取pdf-源码 import PyPDF2 # 读取pdf格式的文件 reader PyPDF2.PdfFileReader(示例文件/aaa.pdf) print(reader)# 读取指定页面的文件 page reader.getPage(0) # 输出当前页面的文本数据 print(page.extractText())读取pdf-源码解析 这段代…...
Three.js材质纹理扩散过渡
Three.js材质纹理扩散过渡 import * as THREE from "three"; import { ThreeHelper } from "/src/ThreeHelper"; import { LoadGLTF, MethodBaseSceneSet } from "/src/ThreeHelper/decorators"; import { MainScreen } from "/src/compone…...
【Leetcode 每日一题 - 扩展】45. 跳跃游戏 II
问题背景 给定一个长度为 n n n 的 0 0 0 索引 整数数组 n u m s nums nums。初始位置为 n u m s [ 0 ] nums[0] nums[0]。 每个元素 n u m s [ i ] nums[i] nums[i] 表示从索引 i i i 向前跳转的最大长度。换句话说,如果你在 n u m s [ i ] nums[i] nums[i…...
被裁20240927 --- YOLO 算法
背景 在云端部署ViSP,ViSP实现视觉伺服、yolo实现视觉跟踪。 开源的2d视觉跟踪算法有哪些? 开源的2D视觉跟踪算法有很多呢,这里给你推荐一些比较知名和常用的吧。 ByteTrackV2:这是一个通用2D跟踪算法,提出了分层的…...
AI技术架构:从基础设施到应用
人工智能(AI)的发展,正以前所未有的速度重塑我们的世界。了解AI技术架构,不仅能帮助我们看懂 AI 的底层逻辑,还能掌握其对各行业变革的潜力与方向。 一、基础设施层:AI 技术的坚实地基 基础设施层是 AI 技…...
植物大战僵尸辅助【控制台版本】
前面介绍了使用CE和OD的简单使用:CE和OD介绍和使用CE查找阳光的教学:阳光基地址和偏移地址,下面先使用最简单的控制台程序来实现修改阳光的功能。 项目地址 1.分析程序 我们的控制台程序想要修改植物大战僵尸游戏内的数据,它们…...
css中样式前加 css样式前面加个圆点
创建CSS样式,样式名称的前面需要加什么 1、我们只知道符号代表的意思是at,其翻译是 在... 例如media就是 在媒介上。没人规定本身具有什么意义,或者说就算规定了我们也改变不了,只需要知道其规定属性的用法即可。 2、px;}然后根据你自己索要…...
算法刷题Day18: BM41 输出二叉树的右视图
题目链接 描述 思路: 递归构造二叉树在Day15有讲到。复习一下,就是使用递归构建左右子树。将中序和前序一分为二。 接下来是找出每一层的最右边的节点,可以利用队列层次遍历。 利用队列长度记录当前层有多少个节点,每次从队列里…...
如何实现规范化LabVIEW编程
规范编写LabVIEW程序的目的是提高代码的可读性、可维护性、可扩展性,并确保团队成员能够高效地理解和修改代码。以下是一些关键建议,帮助您编写更专业的LabVIEW代码,并确保它易于后续的升级和维护: 1. 合理的项目结构 目录结构…...
TQ15EG开发板教程:使用SSH登录petalinux
本例程在上一章“创建运行petalinux2019.1”基础上进行,本例程将实现使用SSH登录petalinux。 将上一章生成的BOOT.BIN与imag.ub文件放入到SD卡中启动。给开发板插入电源与串口,注意串口插入后会识别出两个串口号,都需要打开,查看串…...
Springboot+vue实现大文件上传
背景:为了实现大文件上传的功能 1新建数据表sql file_chunk CREATE TABLE file_chunk (id bigint UNSIGNED NOT NULL AUTO_INCREMENT,file_name varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci NULL DEFAULT NULL COMMENT 文件名,chunk_nu…...
Linux笔记
常用的基本命令 查询某个安装包有没有安装某个软件 使用的命令是rpm -qa |grep 软件名字 卸载软件 rpm -e --nodeps 软件名称 查看已经启动的服务 netstat -tunlp 一般我们在Linux系统中上传文件一般上传到 /usr/local/src的目录下 查看防火墙的命令 firewall-cmd --sta…...
相机(Camera)成像原理详解
简介:个人学习分享,如有错误,欢迎批评指正。 成像流程 1、光学相机的定义 顾名思义,光学相机就是利用光学原理进行成像的相机,而且市面上的相机几乎都是光学相机,只不过随着时代的发展,胶卷式…...
计算机网络知识点全梳理(一.TCP/IP网络模型)
目录 TCP/IP网络模型概述 应用层 什么是应用层 应用层功能 应用层协议 传输层 什么是传输层 传输层功能 传输层协议 网络层 什么是网络层 网络层功能 网络层协议 数据链路层 什么是数据链路层 数据链路层功能 物理层 物理层的概念和功能 写在前面 本系列文…...
后端接受前端传递数组进行批量删除
问题描述:当我们需要做批量删除功能的时候,我们循环单次删除的接口也能进行批量删除,但要删除100条数据就要调用100次接口,或者执行100次sql,这样系统开销是比较大的,那么我们直接采用接收的数组格式数据sq…...
理解数据结构 hashtable的简易理解思路
结构图 为了方便演示,下图中分区算法为下标取模 private int hashFun(int id) {//使用 hash并取模return id % size;}Hashtable的结构如图所示:是一个数组(元素为各个链表的表头) 多个链表组成,也就是说 hashtable 结…...
大数据面试题--企业面试真题
大数据面试题--企业面试真题 PlanHub 点击访问获取: 大数据面试体系专栏_酷兜科技www.kudoumh.top/hlwai/85.html 点击访问获取: 大数据面试体系专栏_酷兜科技www.kudoumh.top/hlwai/85.html 大数据面试题汇总 HDFS 1、 HDFS 读写流程。 2、HDF…...
数据结构(C语言版)-6.查找
1. 查找的基本概念 2. 静态查找 2.1 顺序查找 typedef int KeyType; typedef int InfoType; typedef struct {KeyType key;InfoType otherdata; }SeqList; // 顺序表类型 // 顺序查找int SeqSearch(SeqList R[], int n, int k) {int i n;R[0].key k; // R[0].key为查找不成…...
RabbitMQ消息队列的笔记
Rabbit与Java相结合 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> 在配置文件中编写关于rabbitmq的配置 rabbitmq:host: 192.168.190.132 /…...
linux不同发行版中的主要差异
一、初始化系统 Linux不同发行版中的系统初始化系统(如 System V init、Upstart 或 systemd) System V init: 历史:System V init 是最传统的 Linux 系统初始化系统,起源于 Unix System V 操作系统。运行级别ÿ…...
Elasticsearch+Kibana分布式存储引擎
1.ElaticSearch介绍 ElaticSearch ,简称为 ES , ES 是一个开源的高扩展的分布式全文检索引擎,它可以近乎实时的存储、检 索数据;本身扩展性很好,可以扩展到上百台服务器,处理 PB 级别的数据。 ES 也使用 …...
spark 分布式 原理
Apache Spark 是一个快速且通用的大数据处理引擎,它支持分布式计算。Spark 的设计旨在通过高效的内存内计算和对多种数据源的支持来简化大规模数据集的处理。以下是关于 Spark 分布式原理的详细介绍: 1. 架构概述 Driver Program(驱动程序&…...
Hadoop学习笔记(包括hadoop3.4.0集群安装)(黑马)
Hadoop学习笔记 0-前置章节-环境准备 0.1 环境介绍 配置环境:hadoop-3.4.0,jdk-8u171-linux-x64 0.2 VMware准备Linux虚拟机 0.2.1主机名、IP、SSH免密登录 1.配置固定IP地址(root权限) 开启master,修改主机名为…...
thinkphp:try-catch捕获异常
使用简单的例子,实现了一个简单的try-catch捕获异常的实例 //开始事务Db::startTrans(); try{ //有异常抛出异常 if(存在错误){ throw new \Exception("异常信息"); } // 提交事务 Db::commit(); // 返回成功信息 ... } catch (\…...
如何使用 uni-app 构建直播应用程序?
使用uni-app构建直播应用程序涉及前端和后端的开发,以及音视频处理技术的选择。下面我将概述一个典型的直播应用架构,并详细说明如何在uni-app中实现关键功能。 直播应用架构 前端(uni-app):负责用户界面展示、互动逻…...
正则表达式入门教程
正则表达式入门教程 1. 引言 正则表达式(Regular Expression,简称Regex)是一种用于处理字符串的强大工具,它允许用户通过特定的模式(pattern)来搜索、匹配、查找和替换文本中的数据。正则表达式在文本处理、数据验证、数据提取等领域有着广泛的应用。本教程将带你了解正…...
uniapp 常用的指令语句
uniapp 是一个使用 Vue.js 开发的跨平台应用框架,因此,它继承了 Vue.js 的大部分指令。以下是一些在 uniapp 中常用的 Vue 指令语句及其用途: v-if / v-else-if / v-else 条件渲染。v-if 有条件地渲染元素,v-else-if 和 v-else 用…...
【Figma_01】Figma软件初始与使用
Figma初识与学习准备 背景介绍软件使用1.1 切换主题1.2 官方社区 设计界面2.1 创建一个项目2.2 修改文件名2.3 四种模式2.4 新增界面2.5 图层2.6 工具栏2.7 属性栏section透明度和圆角改变多边形的边数渐变效果描边设置阴影等特效拖拽相同的图形 背景介绍 Ul设计:User Interfa…...
AI工具如何深刻改变我们的工作与生活
在当今这个科技日新月异的时代,人工智能(AI)已经从科幻小说中的概念变成了我们日常生活中不可或缺的一部分。从智能家居到自动驾驶汽车,从医疗诊断到金融服务,AI正以惊人的速度重塑着我们的世界。 一、工作方式的革新…...
信息安全实训室网络攻防靶场实战核心平台解决方案
一、引言 网络安全靶场,作为一种融合了虚拟与现实环境的综合性平台,专为基础设施、应用程序及物理系统等目标设计,旨在向系统用户提供全方位的安全服务,涵盖教学、研究、训练及测试等多个维度。随着网络空间对抗态势的日益复杂化…...
平方根无迹卡尔曼滤波(SR-UKF)的MATLAB例程,使用三维非线性的系统
本MATLAB 代码实现了平方根无迹卡尔曼滤波(SR-UKF)算法,用于处理三维非线性状态估计问题 文章目录 运行结果代码概述代码 运行结果 三轴状态曲线对比: 三轴误差曲线对比: 误差统计特性输出(命令行截图&…...
【从零开始入门unity游戏开发之——C#篇04】栈(Stack)和堆(Heap),值类型和引用类型,以及特殊的引用类型string,垃圾回收( GC)
文章目录 知识回顾一、栈(Stack)和堆(Heap)1、什么是栈和堆2、为什么要分栈和堆3、栈和堆的区别栈堆 4、总结 二、值类型和引用类型1、那么值类型和引用类型到底有什么区别呢?值类型引用类型 2、总结 三、特殊的引用类…...
人员离岗监测摄像机智能人员睡岗、逃岗监测 Python 语言结合 OpenCV
在安全生产领域,人员的在岗状态直接关系到生产流程的顺利进行和工作环境的安全稳定。人员离岗监测摄像机的出现,为智能人员睡岗、逃岗监测提供了高效精准的解决方案,而其中的核心技术如AI识别睡岗脱岗以及相关的算法盒子和常见的安全生产AI算…...
Linux-ubuntu点LED灯C语言版
一,C语言点灯 1.寄存器配置 设置为SVC模式,复用寄存器设置GPIO1-IO003,设置电气属性,设置为输出模式。 2.软件 汇编语言对模式设置,并且将堆栈指针指向主程序: .global _start_start: /*设置为svr模式 */mrs …...
第P3周:Pytorch实现天气识别
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 目标 读取天气图片,按文件夹分类搭建CNN网络,保存网络模型并加载模型使用保存的模型预测真实天气 具体实现 (一…...
代理IP与生成式AI:携手共创未来
目录 代理IP:网络世界的“隐形斗篷” 1. 隐藏真实IP,保护隐私 2. 突破网络限制,访问更多资源 生成式AI:创意与效率的“超级大脑” 1. 提高创作效率 2. 个性化定制 代理IP与生成式AI的协同作用 1. 网络安全 2. 内容创作与…...
函数式编程
Lambda表达式 1、什么时候可以使用Lambda表达式呢? 一般都是在简化匿名内部类,当这个函数是一个接口,并且接口中只要一个方法时,就可以使用Lambda表达式 2、格式 (参数列表)->{方法体} 其中形参也不需要传,只需要传实参 只关注参数列表和方法体不关注方法啥的东西…...
抖音SEO短视频矩阵源码系统开发分享
在数字营销的前沿阵地,抖音短视频平台凭借其独特的魅力和庞大的用户基础,已成为社交媒体领域一股不可小觑的力量。随着平台影响力的持续扩大,如何有效提升视频内容的可见度与流量成为了内容创作者关注的焦点。在此背景下,一套专为…...
常见的锁与线程安全
目录 一、STL,智能指针和线程安全 STL中的容器是否是线程安全的? 智能指针是否是线程安全的? 二、其他常见的各种锁 三、自旋锁 四、读者写者问题 读写锁接口 读者优先伪代码 一、STL,智能指针和线程安全 STL中的容器是否是线程安全的? 不是 . 原因是 , STL 的设…...
java中的List、数组和set
在Java中,List、数组(Array)和Set 是三种常用的数据结构,它们各自有不同的特性、用途和实现方式。下面我们将深入探讨这三者的特点、区别以及它们在 Java 中的常见使用场景。 1. 数组(Array) 特性&#x…...