Netty源码—10.Netty工具之时间轮二
大纲
1.什么是时间轮
2.HashedWheelTimer是什么
3.HashedWheelTimer的使用
4.HashedWheelTimer的运行流程
5.HashedWheelTimer的核心字段
6.HashedWheelTimer的构造方法
7.HashedWheelTimer添加任务和执行任务
8.HashedWheelTimer的完整源码
9.HashedWheelTimer的总结
10.HashedWheelTimer的应用
8.HashedWheelTimer的完整源码
//Netty时间轮
public class HashedWheelTimer implements Timer {static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();private static final int INSTANCE_COUNT_LIMIT = 64;private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");private final ResourceLeakTracker<HashedWheelTimer> leak;//指针转动和延时任务执行的线程private final Worker worker = new Worker();//worker任务封装的工作线程,用于指针转动和触发时间格里的延时任务的执行private final Thread workerThread;public static final int WORKER_STATE_INIT = 0;public static final int WORKER_STATE_STARTED = 1;public static final int WORKER_STATE_SHUTDOWN = 2;@SuppressWarnings({"unused", "FieldMayBeFinal"})private volatile int workerState;//0 - init, 1 - started, 2 - shut down//每个时间格的时间跨度,默认为100msprivate final long tickDuration;//时间轮(环形数组),HashedWheelBucket为每个时间格的槽private final HashedWheelBucket[] wheel;private final int mask;private final CountDownLatch startTimeInitialized = new CountDownLatch(1);//延时任务队列,队列中为等待被添加到时间轮的延时任务private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//保存已经取消的延时任务的队列private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();//记录当前的任务数private final AtomicLong pendingTimeouts = new AtomicLong(0);//最大的任务数private final long maxPendingTimeouts;//执行延时任务的线程池private final Executor taskExecutor;//工作线程启动时间private volatile long startTime;// 构造器 start //public HashedWheelTimer() {this(Executors.defaultThreadFactory());}public HashedWheelTimer(long tickDuration, TimeUnit unit) {this(Executors.defaultThreadFactory(), tickDuration, unit);}public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);}//使用默认的tickDuration(时间格跨度默认为100ms)和默认的ticksPerWheel(时间格总数默认为512)创建一个新的计时器(时间轮)public HashedWheelTimer(ThreadFactory threadFactory) {this(threadFactory, 100, TimeUnit.MILLISECONDS);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {this(threadFactory, tickDuration, unit, 512);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts, ImmediateExecutor.INSTANCE);}//Creates a new timer.//@param threadFactory 创建线程的工厂//@param tickDuration 每格的时间间隔,默认100ms,0.1秒//@param unit 时间单位,默认为毫秒//@param ticksPerWheel 时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算 //@param leakDetection 如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略//@param maxPendingTimeouts 最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1//@param taskExecutor 任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它//@throws NullPointerException if either of threadFactory and unit is null//@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");//2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算//3.初始化时间轮wheelwheel = createWheel(ticksPerWheel);//mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能mask = wheel.length - 1;//4.校验tickDuration和ticksPerWheel//Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);//防止溢出//tickDuration * ticksPerWheel必须小于Long.MAX_VALUEif (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));}//tickDuration不能小于1msif (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}//5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;//6.给时间轮中任务的最大数量maxPendingTimeouts赋值this.maxPendingTimeouts = maxPendingTimeouts;//7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}// 构造器 end //@Overrideprotected void finalize() throws Throwable {try {super.finalize();} finally {//This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. //If we have not yet shutdown then we want to make sure we decrement the active instance count.if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();}}}//初始化时间轮环形数组//@param ticksPerWheelprivate static HashedWheelBucket[] createWheel(int ticksPerWheel) {//ticksPerWheel不能大于2^30checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");//将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);//创建时间轮环形数组HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}return wheel;}//将ticksPerWheel(时间轮上的时间格数)向上取值为2的次幂private static int normalizeTicksPerWheel(int ticksPerWheel) {int normalizedTicksPerWheel = 1;while (normalizedTicksPerWheel < ticksPerWheel) {normalizedTicksPerWheel <<= 1;}return normalizedTicksPerWheel;}//显式启动后台线程//即使没有调用此方法,后台线程也会按需自动启动//Starts the background thread explicitly. //The background thread will start automatically on demand even if you did not call this method.//@throws IllegalStateException if this timer has been #stop() stopped alreadypublic void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {//启动工作线程,即启动时间轮workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}//Wait until the startTime is initialized by the worker.while (startTime == 0) {try {//阻塞时间轮的工作线程startTimeInitialized.await();} catch (InterruptedException ignore) {//Ignore - it will be ready very soon.}}}@Overridepublic Set<Timeout> stop() {if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName());}if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {//workerState can be 0 or 2 at this moment - let it always be 2.if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}return Collections.emptySet();}try {boolean interrupted = false;while (workerThread.isAlive()) {workerThread.interrupt();try {workerThread.join(100);} catch (InterruptedException ignored) {interrupted = true;}}if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}return worker.unprocessedTimeouts();}//添加延时任务//@param task 任务//@param delay 延时时间//@param unit 延时时间单位@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {checkNotNull(task, "task");checkNotNull(unit, "unit");//1.将需要执行的延时任务数pendingTimeouts + 1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); }//3.启动工作线程,即启动时间轮start();//将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)//在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket//4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}//5.创建延时任务实例HashedWheelTimeoutHashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);//6.将延时任务实例添加到延时任务队列中timeouts.add(timeout);return timeout;}//Returns the number of pending timeouts of this Timer.public long pendingTimeouts() {return pendingTimeouts.get();}private static void reportTooManyInstances() {if (logger.isErrorEnabled()) {String resourceType = simpleClassName(HashedWheelTimer.class);logger.error("You are creating too many " + resourceType + " instances. " +resourceType + " is a shared resource that must be reused across the JVM, " +"so that only a few instances are created.");}}//指针转动和延时任务执行的线程private final class Worker implements Runnable {//用于记录未执行的延时任务private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();//总的tick数(指针嘀嗒的次数)private long tick;@Overridepublic void run() {//1.记录时间轮启动的时间startTimestartTime = System.nanoTime();if (startTime == 0) {//我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0startTime = 1;}//2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕startTimeInitialized.countDown();//一直执行do while循环,直到时间轮被关闭do {//3.阻塞等待下一次指针转动的时间//这里会休眠tick的时间,模拟指针走动final long deadline = waitForNextTick();if (deadline > 0) {//4.计算当前指针指向的时间轮槽位idxint idx = (int) (tick & mask);//5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1processCancelledTasks();//6.获取当前指针指向的时间槽HashedWheelBucketHashedWheelBucket bucket = wheel[idx];//7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中transferTimeoutsToBuckets();//8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务//9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--bucket.expireTimeouts(deadline);//10.时间轮指针的总转动次数tick++tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);//Fill the unprocessedTimeouts so we can return them from stop() method.//11.清除时间轮中不需要处理的任务for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}//12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中//遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {//如果延时任务没被取消,记录到未执行的任务Set集合中unprocessedTimeouts.add(timeout);}}//13.处理被取消的任务processCancelledTasks();}//将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置//也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中private void transferTimeoutsToBuckets() {//每次转移10w个延时任务for (int i = 0; i < 100000; i++) {//从队列中出队一个延时任务HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {//all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {//Was cancelled in the meantime.continue;}//到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔 long calculated = timeout.deadline / tickDuration;//tick已经走了的时间格,到期一共还需要需要走多少圈timeout.remainingRounds = (calculated - tick) / wheel.length;//如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行final long ticks = Math.max(calculated, tick);//槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);//根据索引该任务应该放到的槽HashedWheelBucket bucket = wheel[stopIndex];//将任务添加到槽中,链表末尾bucket.addTimeout(timeout);}}//处理取消掉的延时任务//将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1private void processCancelledTasks() {for (;;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {//all processedbreak;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}}//从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来private long waitForNextTick() {//deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔long deadline = tickDuration * (tick + 1);for (;;) {//计算当前时间距离启动时间的时间间隔final long currentTime = System.nanoTime() - startTime;//距离下一次指针跳动还需休眠多长时间long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//到了指针调到下一个槽位的时间if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}try {//表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}//记录未执行的延时任务public Set<Timeout> unprocessedTimeouts() {return Collections.unmodifiableSet(unprocessedTimeouts);}}//延时任务private static final class HashedWheelTimeout implements Timeout, Runnable {private static final int ST_INIT = 0;private static final int ST_CANCELLED = 1;private static final int ST_EXPIRED = 2;private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");private final HashedWheelTimer timer;private final TimerTask task;//任务执行的截止时间 = 当前时间 + 延时任务延时时间 - 时间轮启动时间private final long deadline;@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })private volatile int state = ST_INIT;//剩下的圈(轮)数//remainingRounds将由Worker.transferTimeoutsToBuckets()在HashedWheelTimeout被添加到正确的HashedWheelBucket之前计算和设置long remainingRounds;//HashedWheelTimerBucket槽中的延时任务列表是一个双向链表//因为只有workerThread会对它进行操作,所以不需要 synchronization / volatileHashedWheelTimeout next;HashedWheelTimeout prev;//当前延时任务所插入时间轮的哪个槽HashedWheelBucket bucket;HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {this.timer = timer;this.task = task;this.deadline = deadline;}@Overridepublic Timer timer() {return timer;}@Overridepublic TimerTask task() {return task;}@Overridepublic boolean cancel() {//only update the state it will be removed from HashedWheelBucket on next tick.if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}//If a task should be canceled we put this to another queue which will be processed on each tick.//So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way//we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.timer.cancelledTimeouts.add(this);return true;}void remove() {HashedWheelBucket bucket = this.bucket;if (bucket != null) {bucket.remove(this);} else {timer.pendingTimeouts.decrementAndGet();}}public boolean compareAndSetState(int expected, int state) {return STATE_UPDATER.compareAndSet(this, expected, state);}public int state() {return state;}@Overridepublic boolean isCancelled() {return state() == ST_CANCELLED;}@Overridepublic boolean isExpired() {return state() == ST_EXPIRED;}public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);}}}@Overridepublic void run() {try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}}@Overridepublic String toString() {final long currentTime = System.nanoTime();long remaining = deadline - currentTime + timer.startTime;StringBuilder buf = new StringBuilder(192).append(simpleClassName(this)).append('(').append("deadline: ");if (remaining > 0) {buf.append(remaining).append(" ns later");} else if (remaining < 0) {buf.append(-remaining).append(" ns ago");} else {buf.append("now");}if (isCancelled()) {buf.append(", cancelled");}return buf.append(", task: ").append(task()).append(')').toString();}}//存放HashedWheelTimeouts的桶//这些数据存储在一个类似于链表的数据结构中,允许轻松删除中间的hashedwheeltimeout//HashedWheelTimeout本身作为节点,因此不需要创建额外的对象//保存头结点和尾节点,方便于任务的提取和插入private static final class HashedWheelBucket {//头结点private HashedWheelTimeout head;//尾节点private HashedWheelTimeout tail;//Add HashedWheelTimeout to this bucket.public void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}}//Expire all HashedWheelTimeouts for the given deadline.public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;//遍历当前时间槽中的所有任务while (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {//从链表中移除next = remove(timeout);if (timeout.deadline <= deadline) {//延时任务到期,执行延时任务timeout.expire();} else {//The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}//如果延时任务取消,从链表中移除} else if (timeout.isCancelled()) {next = remove(timeout);} else {//任务还没到期,剩余的轮数-1timeout.remainingRounds --;}//将指针放置到下一个延时任务上timeout = next;}}//删除槽中链表中的延时任务public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;//remove timeout that was either processed or cancelled by updating the linked-listif (timeout.prev != null) {timeout.prev.next = next;}if (timeout.next != null) {timeout.next.prev = timeout.prev;}if (timeout == head) {//if timeout is also the tail we need to adjust the entry tooif (timeout == tail) {tail = null;head = null;} else {head = next;}} else if (timeout == tail) {//if the timeout is the tail modify the tail to be the prev node.tail = timeout.prev;}//null out prev, next and bucket to allow for GC.timeout.prev = null;timeout.next = null;timeout.bucket = null;timeout.timer.pendingTimeouts.decrementAndGet();return next;}//Clear this bucket and return all not expired / cancelled Timeouts.public void clearTimeouts(Set<Timeout> set) {for (;;) {HashedWheelTimeout timeout = pollTimeout();if (timeout == null) {return;}if (timeout.isExpired() || timeout.isCancelled()) {continue;}set.add(timeout);}}//头结点移除private HashedWheelTimeout pollTimeout() {HashedWheelTimeout head = this.head;if (head == null) {return null;}HashedWheelTimeout next = head.next;if (next == null) {tail = this.head = null;} else {this.head = next;next.prev = null;}//null out prev and next to allow for GC.head.next = null;head.prev = null;head.bucket = null;return head;}}
}
9.HashedWheelTimer的总结
一.时间轮的转动是单线程
但是时间轮中每个时间槽里的延时任务则是由线程池来执行的。
二.延时任务保存到JVM中没有做宕机备份
系统重启时延时任务将会丢失,无法恢复任务进行重新调度。
三.时间轮调度器的时间精度不是很高
对于精度要求特别高的调度任务可能不太适合,因为时间轮的精度取决于时间格的跨度大小。
四.时间轮指针的转动是使用Sleep来完成等待的
10.HashedWheelTimer的应用
(1)时间轮的应用场景
一.Dubbo、Netty、Kafka、Redission等中间件都用到了时间轮机制
二.订单关闭、确认收货、批量定时数据更新等都可以采用时间轮机制
(2)心跳检测
心跳机制会每隔固定的时间发送一个心跳包来检测客户端和服务端的连接状态,客户端发送心跳包用来告诉服务器其还正常运行。
比如在Dubbo中,需要有心跳机制来维持Consumer与Provider的长连接,默认的心跳间隔是60s。当Provider在3次心跳时间内没有收到心跳响应,会关闭连接通道。当Consumer在3次心跳时间内没有收到心跳响应,会进行重连。
在Dubbo的HeaderExchangeClient类中会向时间轮中提交该心跳任务:
一.发送心跳的时间轮
private static final HashedWheelTimer IDLE_CHECK_TIMER =new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
二.向时间轮中提交心跳任务
private void startHeartBeatTask(URL url) {//Client的具体实现决定是否启动该心跳任务if (!client.canHandleIdle()) {AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);//计算心跳间隔, 最小间隔不能低于1sint heartbeat = getHeartbeat(url);long heartbeatTick = calculateLeastDuration(heartbeat);//创建心跳任务this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);//提交到IDLE_CHECK_TIMER这个时间轮中等待执行, 等时间到了时间轮就会去取出该任务进行调度执行IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);}
}
(3)超时处理
在Dubbo中发起RPC调用时,通常会配置超时时间,当消费者调用服务提供者出现超时进行一定的逻辑处理。那么怎么检测任务调用超时了呢?我们可以利用定时任务。
每次发起RPC调用时创建一个Future,记录这个Future的创建时间与超时时间,后台有一个定时任务进行检测。当Future到达超时时间并且没有被处理时,就需要对这个Future执行超时逻辑处理。
(4)Redisson分布式锁续期
Redisson看门狗机制,通过时间轮定时给分布式锁续期。在获取锁成功后,Redisson会封装一个锁续期的延时任务放入到时间轮中。默认10秒检查一下,用于对获取到的锁进行续期,延长持有锁的时间。如果业务机器宕机了,那么续期的延时任务失效,也无法续期,锁会超时释放。
一.添加续期延时任务
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//这边newTimeout点进去发现就是往时间轮中提交了一个任务Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {//续期成功后继续调度, 又往时间轮中放一个续期任务renewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
二.lua续期代码
protected RFuture<Boolean> renewExpirationAsync(long threadId) {//通过lua脚本对锁进行续期return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}
相关文章:
Netty源码—10.Netty工具之时间轮二
大纲 1.什么是时间轮 2.HashedWheelTimer是什么 3.HashedWheelTimer的使用 4.HashedWheelTimer的运行流程 5.HashedWheelTimer的核心字段 6.HashedWheelTimer的构造方法 7.HashedWheelTimer添加任务和执行任务 8.HashedWheelTimer的完整源码 9.HashedWheelTimer的总结…...
Baklib激活企业知识管理新动能
Baklib核心技术架构解析 Baklib的底层架构以模块化设计为核心,融合知识中台的核心理念,通过分布式存储引擎与智能语义分析系统构建三层技术体系。数据层采用多源异构数据接入协议,支持文档、音视频、代码片段等非结构化数据的实时解析与分类…...
CSP-J/S冲奖第21天:插入排序
一、插入排序概念 1.1 生活中的类比 • 扑克牌排序:就像整理手中的扑克牌,每次将一张牌插入到已排好序的牌中合适位置 • 动态演示: 初始序列:[5, 2, 4, 6, 1, 3] 排序过程: → [2, 5, 4, 6, 1, 3] → [2, 4, 5, 6, …...
Jest系列二之基础实践
Jest基础实践 官方文档地址:https://jest.nodejs.cn/docs 生命周期 在 Jest 中,生命周期方法大致分为两类:下面所罗列的生命周期方法,也是全局方法,不需要引入,直接就可以使用。 重复性的生命周期方法&…...
Scikit-learn全攻略:从入门到工业级应用
Scikit-learn全攻略:从入门到工业级应用 引言:Scikit-learn在机器学习生态系统中的核心地位 Scikit-learn作为Python最受欢迎的机器学习库,已成为数据科学家的标准工具集。根据2023年Kaggle调查报告,超过83%的数据专业人士在日常工作中使用Scikit-learn。本文将系统性地介…...
基于Python的图书馆信息管理系统研发
标题:基于Python的图书馆信息管理系统研发 内容:1.摘要 在数字化信息快速发展的背景下,传统图书馆管理方式效率低下,难以满足日益增长的信息管理需求。本研究旨在研发一款基于Python的图书馆信息管理系统,以提高图书馆信息管理的效率和准确性…...
Pytorch学习笔记(十七)Image and Video - Adversarial Example Generation
这篇博客瞄准的是 pytorch 官方教程中 Image and Video 章节的 Adversarial Example Generation 部分。 官网链接:https://pytorch.org/tutorials/beginner/fgsm_tutorial.html 完整网盘链接: https://pan.baidu.com/s/1L9PVZ-KRDGVER-AJnXOvlQ?pwdaa2m 提取码: …...
基于Arm GNU Toolchain编译生成的.elf转hex/bin文件格式方法
基于Arm GNU Toolchain编译生成的.elf转hex/bin文件格式方法 已经弃用的版本(Version 10.3-2021.10):gcc-arm-none-eabi:https://developer.arm.com/downloads/-/gnu-rmArm GNU Toolchain当前版本:https://developer.a…...
Ubuntu系统Docker安装失败
问题: 1. 删除错误的 Docker 源 sudo rm -rf /etc/apt/sources.list.d/docker.list sudo rm -rf /etc/apt/keyrings/docker.gpg 2. 重新添加 Docker 官方 GPG 密钥 sudo mkdir -p /etc/apt/keyrings curl -fsSL https://download.docker.com/linux/ubuntu/gpg | …...
鸿蒙学习手册(HarmonyOSNext_API16)_数据持久化②:键值型数据库
概述 键值型数据库就像一个大抽屉柜,每个抽屉都有一个唯一的标签(键),里面可以放任何东西(值)。当你需要存或取东西时,直接看标签拿对应的抽屉就行,不用管其他抽屉里有什么。这种简…...
多线程 - 线程安全 2 -- > 死锁问题
目录 小结复习: 线程安全: 如何解决线程安全问题? synchronized “死锁” 死锁的三种经典场景: 1. 一个线程,一把锁。 2.两个线程,两把锁。 3. N 个线程 M 把锁 完! 小结复习:…...
JavaScript函数详解
目录 一、函数的基础概念 1. 函数的定义方式 2. 函数的参数处理 3.匿名函数与立即执行函数 4.同名函数与函数提升 二、函数的作用域与闭包 1. 作用域(Scope) 2. 闭包(Closure) 三、高阶函数与函数式编程 1. 高阶函数 2…...
Python-八股总结
目录 1 python 垃圾处理机制2 yield3 python 多继承,两个父类有同名方法怎么办?4 python 多线程/多进程/协程4.1 多线程与GIL全局解释器锁4.2 多进程4.3 协程 5 乐观锁/悲观锁6 基本数据结构**1. 列表(List)****2. 元组࿰…...
整合分块请求大模型返回的测试用例及小工具显示bug修复
在之前的分块发送需求数据给大模型进行测试用例生成时,由于数据结构的改变,需要对分块的回复进行整合,正确的整合是保障系统稳定性和功能正确性的核心。随着测试需求的复杂化,这对测试工程师提出了更高的整合和管理要求。本文将为…...
记一道CTF题—PHP双MD5加密+”SALT“弱碰撞绕过
通过分析源代码并找到绕过限制的方法,从而获取到flag! 部分源码: <?php $name_POST[username]; $passencode(_POST[password]); $admin_user "admin"; $admin_pw get_hash("0e260265122865008095838959784793");…...
stm32F103RCT6 FLASH模拟EEPROM 读写32位数据
#include “stm32flash.h” #ifndef __STMFLASH_H__ #define __STMFLASH_H__ #include "main.h" #define</...
Spring Data审计利器:@LastModifiedDate详解!!!
🕒 Spring Data审计利器:LastModifiedDate详解🔥 🌟 简介 在数据驱动的应用中,记录数据的最后修改时间是常见需求。Spring Data的LastModifiedDate注解让这一过程自动化成为可能!本篇带你掌握它的核心用法…...
【SLURM】介绍
SLURM Slurm(Simple Linux Utility for Resource Management) 是一个用于管理和调度计算集群任务的开源作业调度系统。它主要用于高性能计算(HPC)环境,比如超算中心、大学的计算集群或企业的数据中心。 本文主要针对使…...
算法-贪心算法
圣诞老人的礼物-Santa Clau’s Gifts 现在有多箱不同的糖果,每箱糖果有自己的价值和重量,每箱糖果都可以拆分成任意散装组合带走。圣 诞老人的驯鹿雪橇最多只能装下重量W的糖果,请 问圣诞老人最多能带走多大价值的糖果。 输入 第一行由两个…...
Nginx — Nginx处理Web请求机制解析
一、Nginx请求默认页面资源 1、配置文件详解 修改端口号为8080并重启服务: 二、Nginx进程模型 1、nginx常用命令解析 master进程:主进程(只有一个) worker进程:工作进程(可以有多个,默认只有一…...
GAN随手笔记
文章目录 1. description2. code 1. description 后续整理 GAN是生成对抗网络,主要由G生成器,D判别器组成,具体形式如下 D 判别器: G生成器: 2. code 部分源码,暂定,后续修改 import nump…...
Java 8 时区与历法处理指南:跨越全球的时间管理
Java 8 的 java.time API 不仅修复了旧版日期时间 API 的设计缺陷,还提供了对时区和多历法的全面支持。无论是处理全球化应用的时区转换,还是适配不同文化的日历系统,Java 8 都能轻松应对。本文将深入解析其核心功能,并提供实用代…...
【STM32】对stm32F103VET6指南者原理图详解(超详细)
目录 一、原理图基本概念二、STM32F103VET6 的主要特性二、MCU模块三、电源模块四、时钟模块五、复位模块NRST 六、GPIO模块LED 七、调试模块JTAG 八、外设模块UARTSPII2CADC 九、其它模块BOOT 一、原理图基本概念 原理图/电路图通常由硬件工程师使用Altium Designer/ KiCad / …...
瑞芯微RKRGA(librga)Buffer API 分析
一、Buffer API 简介 在瑞芯微官方的 librga 库的手册中,有两组配置 buffer 的API: importbuffer 方式: importbuffer_virtualaddr importbuffer_physicaladdr importbuffer_fd wrapbuffer 方式: wrapbuffer_virtualaddr wrapb…...
移动端六大语言速记:第1部分 - 基础语法与控制结构
移动端六大语言速记:第1部分 - 基础语法与控制结构 本文将对比Java、Kotlin、Flutter(Dart)、Python、ArkTS和Swift这六种移动端开发语言的基础语法与控制结构,帮助开发者快速理解各语言间的差异与共性。 1. 基础语法 1.1 数据类型 各语言的基本数据…...
Java 大视界 -- Java 大数据在智能金融区块链跨境支付与结算中的应用(154)
💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…...
Python Playwright库全面详解
Playwright 是 Microsoft 开发的一个现代化的端到端测试和浏览器自动化库,支持 Chromium、WebKit 和 Firefox 浏览器。它提供了跨浏览器、跨平台的自动化能力,且具有高性能和可靠性。 一、核心特性 多浏览器支持: Chromium (Chrome, Edge)We…...
脑疾病分类的疑惑【6】:脑疾病分类比较适合使用具有哪些特点的模型?
脑疾病分类是一个复杂的任务,涉及医学影像、神经电生理信号、基因数据等多种信息类型。为了有效地进行脑疾病分类,选择合适的模型是至关重要的。以下是一些适合脑疾病分类的模型特点,您可以参考这些特点来选择合适的模型: 1. 深度…...
24_原型和原型链_this
目录 一、this关键字 修改this的指向 二、原型和原型链 三、创建对象 通过构造函数创建 (es5) 通过类创建 (es6) 四、浅拷贝和深拷贝 ctrlc 浅拷贝: 只拷贝一层 深拷贝: 可以拷贝多层 一、this关键字 每个函…...
自定义类型:结构体(1)
1.结构体回顾 结构是一些值的集合,这些值被称为成员变量。结构的每个成员可以是不同类型的变量。 1.1结构的声明 struct tag {member-list; }variable-list;例如描述一个学生: struct Stu {char name[20];int age;char sex[5]; }; 1.2结构体变量的创…...
Java进阶——Lombok的使用
Lombok可以通过注解的方式,在编译时自动生成 getter、setter、构造函数、toString 等样板代码,从而减少代码的冗余,提高开发效率。本文深入讲解Lombok在实际开发中的使用。 本文目录 1. Lombok 依赖添加2. 常用Lombok注解及使用场景2.1 Gette…...
饿了么 bx-et 分析
声明: 本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关! 逆向分析 import requests bx_et re…...
python黑科技:无痛修改第三方库源码
需求不符合 很多时候,我们下载的 第三方库 是不会有需求不满足的情况,但也有极少的情况,第三方库 没有兼顾到需求,导致开发者无法实现相关功能。 如何通过一些操作将 第三方库 源码进行修改,是我们将要遇到的一个难点…...
PGD对抗样本生成算法实现(pytorch版)
PGD对抗样本生成算法 一、理论部分1.1 PGD 原理(1) 数学形式(2) 核心改进1.2 PGD 与其他攻击对比1.3 注意事项二、代码实现2.1 导包2.2 数据加载和处理2.3 网络构建2.4 模型加载2.5 生成对抗样本2.6 对抗测试2.7 启动攻击2.8 效果展示一、理论部分 1.1 PGD 原理 PGD 是 BIM/I-…...
小智机器人相关函数解析,BackgroundTask::Schedule (***)将一个回调函数添加到后台任务队列中等待执行
以下是对 BackgroundTask::Schedule 函数代码的详细解释: void BackgroundTask::Schedule(std::function<void()> callback) {std::lock_guard<std::mutex> lock(mutex_);if (active_tasks_ > 30) {int free_sram heap_caps_get_free_size(MALLOC_…...
C++学习之路:深入理解变量
目录 编程的本质变量的本质内存模型、变量名与值以及地址的关系数据类型C数据类型数据类型别名数据类型转换 变量作用域总结 编程的本质 编程的本质是什么?虽然程序里能实现很多复杂的逻辑,但是从底层的硬件上来看,编程的本质就是数据的搬移。…...
前端基础知识汇总
目录 HTML5详解(结构层) 什么是HTML HTML基本结构 网页基本信息 图像标签 链接标签 超链接 行内元素与块元素 列表标签 表格标签 页面结构分析 iframe内联框架 表单语法 表单元素格式 表单的简单应用 表单初级验证 CSS详解(…...
2024蓝桥杯省赛C/C++大学B组 题解
文章目录 2024蓝桥杯省赛C/C大学B组A 握手问题(5分)B 小球反弹(5分)C 好数(10分)D R 格式(10分)E 宝石组合(15分)F 数字接龙(15分)G 爬…...
BIM/I-FGSM对抗样本生成算法实现(pytorch版)
BIM/I-FGSM对抗样本生成算法 一、理论部分1.1 核心思想1.2 数学形式1.3 BIM 的优缺点1.4 BIM 与 FGSM、PGD 的关系1.5 实际应用建议二、代码实现2.1 导包2.2 数据加载和处理2.3 网络构建2.4 模型加载2.5 生成对抗样本2.6 攻击测试2.7 启动攻击2.8 效果展示一、理论部分 1.1 核心…...
前沿科技:从Gen2到Gen3—Kinova轻型机械臂的技术升级路径
Kinova品牌在轻型机械臂行业中以其轻量化、灵活性和高精度的技术特点而知名。其产品线广泛适用于医疗、科研和工业等多个领域,对机器人技术的进步起到了积极的推动作用。Kinova轻型机械臂凭借其精良的设计和稳定的性能,为用户提供了高效且可靠的解决方案…...
智研咨询:2025DeepSeek技术全景解析重塑全球AI生态的中国力量|附下载方法
导 读INTRODUCTION 随着人工智能技术的飞速发展,AI大模型已成为推动行业进步的重要力量。智研咨询最新发布的《DeepSeek技术全景解析重塑全球AI生态的中国力量》报告,深入探讨了DeepSeek公司在AI领域的突破性成就及其对全球AI生态的深远影响。 如果感兴…...
超导量子计算机编程实战:IBM Qiskit 2025新API详解
一、量子计算平台演进与Qiskit 2025定位 1.1 IBM量子硬件发展路线 2025年IBM将实现三大技术突破: 量子体积:新一代"Goldeneye"处理器达到QV 8192相干时间:超导量子比特寿命突破500μs互联规模:模块化架构支持万级量子…...
斐波那契数列----C语言
关于斐波那契 已知: 问题背景:一对兔子从第3个月开始每月生一对新兔子,新兔子同样在第3个月开始繁殖。 关键观察: 第1个月:1对(初始兔子)。 第2个月:1对(未成熟&#…...
打开pycharm显示编制索引后卡死
若项目中包含过多文件,PyCharm 启动后会进行自动索引,电脑性能不高时往往会导致崩溃(主要是内存问题)。以下为解决措施。 ✅ 1. 仅索引代码,排除文件 设置PyCharm 主要索引代码文件(.py、.ipynbÿ…...
AWS云安全全面详解:从基础防护到高级威胁应对
随着企业加速向云端迁移,AWS作为全球最大的云服务提供商之一,其安全性成为用户首要关注的问题。本文将深入剖析AWS云安全架构,从基础防护到高级威胁应对,帮助您构建全方位的云安全防线。 一、AWS安全责任共担模型 在深入探讨AWS具体安全措施前,首先需要理解AWS的安全责任…...
【C++重点】虚函数与多态
在 C 中,虚函数是实现多态的基础。多态是面向对象编程的重要特性之一,允许程序在运行时决定调用哪一个函数版本。通过虚函数,我们能够实现动态绑定,使得不同类型的对象可以通过相同的接口进行操作。 1 静态绑定与动态绑定 静态绑…...
算法学习之BFS
关于BFS我的理解是根据离我们当前这个点的权重来移动,这里权重也可以理解为离这个点的距离, 从起点开始,往前走一步,记录下所有第一步能走到的点开始,然后从所有第一部能走到的点开始向前走第二步,重复下去…...
每日小积累day1
网络: g是用来检测网络联通性的的诊断工具,使用的协议是ICMP 显示数据包括 ICMP数据:序列号,存活时间(TTL) 目标主机域名IP 往返时间(RTT) 统计数据(平均RTT等等&a…...
【NLP】13. NLP推理方法详解 --- 穷举和贪心搜索
NLP推理方法详解 — 穷举和贪心搜索 在自然语言处理(NLP)任务中,推理(Inference)是指在给定模型的情况下,找到最可能的输出序列。由于模型通常是神经网络,它会为每个可能的输出分配一个概率&am…...
基于 Python 深度学习 lstm 算法的电影评论情感分析可视化系统(2.0 系统全新升级,已获高分通过)
大家好,欢迎来到我的技术专栏!今天我将和大家聊聊如何利用 Python 的深度学习技术,打造一个集电影评论情感分析与可视化展示于一体的系统。这个系统不仅能自动采集和解析海量影评,还能实时生成直观的情感趋势图表,对于…...