当前位置: 首页 > news >正文

Java自定义IO密集型和CPU密集型线程池

文章目录

  • 前言
  • 线程池各类场景描述
  • 常见场景案例设计思路
    • 公共类
      • 自定义工厂类-MyThreadFactory
      • 自定义拒绝策略-RejectedExecutionHandlerFactory
      • 自定义阻塞队列-TaskQueue(实现 核心线程->最大线程数->队列)
    • 场景1:CPU密集型场景
      • 思路&计算公式
      • 实现代码
    • 场景2:IO密集型场景
      • 思路&计算公式
      • 实现代码
  • 其他部分组成
    • 拒绝策略兜底方案
      • 思路设计及思考
      • 设计1:数据库持久化方案
      • 设计2:Netty两种拒绝策略实现(根据场景来进行是否重试入队 + 失败抛异常)
      • 设计3:ActiveMQ(有效时间内尝试入队+入队失败抛出异常)
      • 设计4:dubbo设计思路(dump文件+抛出异常)
      • 设计5: 自定义设计-阻塞入队
  • 参考文章

稿定智能设计202502031721

前言

本章节配套源码:

  • gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo10

线程池各类场景描述

**类型场景:**不同的场景设置参数也各不相同

  • 第一种:CPU密集型:最大线程数应该等于CPU核数+1,这样最大限度提高效率。
// 通过该代码获取当前运行环境的cpu核数
Runtime.getRuntime().availableProcessors();
  • **第二种:**IO密集型:主要是进行IO操作,执行IO操作的时间较长,这时cpu出于空闲状态,导致cpu的利用率不高。线程数为2倍CPU核数。当其中的线程在IO操作的时候,其他线程可以继续用cpu,提高了cpu的利用率。

  • 第三种:混合型:如果CPU密集型和IO密集型执行时间相差不大那么可以拆分;如果两种执行时间相差很大,就没必要拆分了。

  • **第四种(了解):**在IO优化中,线程等待时间所占比越高,需要线程数越多;线程cpu时间占比越高,需要越少线程数。

线程池初始化所有参数:

corePoolSize : 核心线程数,当线程池中的线程数量为 corePoolSize 时,即使这些线程处于空闲状态,也不会销毁(除非设置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大线程数,线程池中允许的线程数量的最大值。
keepAliveTime : 线程空闲时间,当线程池中的线程数大于 corePoolSize 时,多余的空闲线程将在销毁之前等待新任务的最长时间。
workQueue : 任务队列
unit : 线程空闲时间的单位。
threadFactory : 线程工厂,线程池创建线程时使用的工厂。
handler : 拒绝策略,因达到线程边界和任务队列满时,针对新任务的处理方法。CallerRunsPolicy:由提交任务的线程直接执行任务,避免任务丢失。适合任务量波动较大的场景。AbortPolicy:直接抛出 RejectedExecutionException 异常。适合任务量可控的场景。DiscardPolicy:静默丢弃任务,不抛出异常。适合对任务丢失不敏感的场景。DiscardOldestPolicy:丢弃队列中最旧的任务,然后重新尝试提交当前任务。适合对任务时效性要求较高的场景。

核心线程池execute逻辑代码:

public void execute(Runnable command) {//任务判空if (command == null)throw new NullPointerException();//查看当前运行的线程数量int c = ctl.get();//若小于核心线程则直接添加一个工作线程并执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果线程数等于核心线程数则尝试将任务入队if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//入队失败,调用addWorker参数为false,尝试创建应急线程处理突发任务else if (!addWorker(command, false))//如果创建应急线程失败,说明当前线程数已经大于最大线程数,这个任务只能拒绝了reject(command);}

image-20250201004416120


常见场景案例设计思路

公共类

image-20250202175504266

自定义工厂类-MyThreadFactory

MyThreadFactory.java:自定义了线程池工厂类,可以自行进行命名

package demo10;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程池工厂类*/
public class MyThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String factoryName) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = factoryName + "-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}

自定义拒绝策略-RejectedExecutionHandlerFactory

RejectedExecutionHandlerFactory.java:包含有多种拒绝策略,其中包含本次需要使用的阻塞入队拒绝策略

package demo10;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;/*** 拒绝策略工厂类**/
@Slf4j
public class RejectedExecutionHandlerFactory {private static final AtomicLong COUNTER = new AtomicLong();/*** 拒绝执行,抛出 RejectedExecutionException* @param source name for log* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newAbort(String source) {return (r, e) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + source);};}/*** 直接丢弃该任务* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newDiscard(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);};}/*** 调用线程运行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newCallerRun(String source) {System.out.println("thread =>" + Thread.currentThread().getName() + "触发阻塞中...");return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {r.run();}};}/*** 新线程运行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newThreadRun(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {String threadName = source + "-T-" + COUNTER.getAndIncrement();log.info("[{}] create new thread[{}] to run job", source, threadName);new Thread(r, threadName).start();}};}/*** 依据阻塞队列put 阻塞添加到队列中* @return 拒绝策略执行器*/public static RejectedExecutionHandler blockCallerPolicy(String source) {return new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, e, r);if (!e.isShutdown()) {try {// 阻塞入队操作,阻塞方为调用方执行submitjob的线程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}};}}

自定义阻塞队列-TaskQueue(实现 核心线程->最大线程数->队列)

TaskQueue.java:线程池中实现先使用核心线程数

package demo10;import java.util.concurrent.*;public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private transient ThreadPoolExecutor parent;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(ThreadPoolExecutor parent) {this.parent = parent;}/*** 核心线程 -> 最大核心线程数 -> 队列* @param runnable the element to add* @return*/@Overridepublic boolean offer(Runnable runnable) {// 如果没有线程池父类,则直接尝试入队if (parent == null) return super.offer(runnable);// 若是工作线程数 < 最大线程数,则优先创建线程跑任务if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 工作线程数 >= 最大线程数,入队return super.offer(runnable);}
}

场景1:CPU密集型场景

思路&计算公式

**场景:**具体是指那种包含大量运算、在持有的 CPU 分配的时间片上一直在执行任务、几乎不需要依赖或等待其他任何东西。处理起来其实没有多少优化空间,因为处理时几乎没有等待时间,所以一直占有 CPU 进行执行,才是最好的方式。

**可优化的点:**就是当单个线程累计较多任务时,其他线程能进行分担,类似fork/join框架的概念。

设置参数:设置线程数时,针对单台机器,最好就是有几个 CPU ,就创建几个线程,然后每个线程都在执行这种任务,永不停歇。

Nthreads=Ncpu+1 
w/c =0 
理解也是正确的,+1 主要是防止因为系统上下文切换,让系统资源跑满!

实现代码

image-20250202184351200

这里核心+最大线程数使用的是CPU核心数+1:

package demo10.cpu;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** cpu密集型场景任务提交*  自定义队列:核心线程 -> 最大线程 -> 队列*  自定义拒绝策略:自定义采用执行阻塞队列的put操作来实现任务阻塞入队,而非直接使用调用者线程来直接跑任务*  非影响主线程执行流程:批次1000个任务统一在一个线程中去进行处理,与主流程main线程隔离**/
@Slf4j
public class CPUThreadPoolExample {public static void main(String[] args) {// 获取 CPU 核心数int cpuCores = Runtime.getRuntime().availableProcessors();// 自定义线程池参数int corePoolSize = cpuCores + 1; // 核心线程数 cpu核心数+1int maximumPoolSize = corePoolSize; // 最大线程数 cpu核心数+1long keepAliveTime = 60L; // 空闲线程存活时间TimeUnit unit = TimeUnit.SECONDS; // 时间单位// 自定义任务队列 核心线程 -> 最大核心线程数 -> 队列TaskQueue<Runnable> taskQueue = new TaskQueue<>(500); // 队列容量为核心线程数的 2 倍// 创建自定义线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默认线程工厂 Executors.defaultThreadFactory() | 自定义工厂支持自定义线程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 将线程池对象设置到任务队列中taskQueue.setExecutor(executor);// 统计任务的执行数量int jobNums = 1000000;final AtomicInteger count = new AtomicInteger(0);// 记录任务开始时间long startTime = System.currentTimeMillis();// 单独开一个线程(后续可改为线程池 核心、最大就1个场景)去完成整个任务提交处理// 如果submitjob阻塞,仅仅只会影响该thread线程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模拟1000个任务 (可改造为queue队列形式去在这个线程中去消费)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {// CPU计算int sum = 0;for (int j = 0; j < 100000; j++) {sum += j;}System.out.println(Thread.currentThread().getName() + " 任务 " + taskId + " 完成!sum = " + sum);count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();});}System.out.println("所有任务提交完成!");// 关闭线程池,等待任务全部执行完毕try {latch.await();System.out.println("所有任务执行结束!");// 记录任务结束时间long endTime = System.currentTimeMillis();// 计算任务执行时间long duration = endTime - startTime;System.out.println("任务执行总耗时: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任务完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 强制关闭}System.out.println("执行完任务数统计:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}

效果:

image-20250202184444336


场景2:IO密集型场景

思路&计算公式

**场景:**其消耗的主要资源就是 IO 了,所接触到的 IO ,大致可以分成两种:磁盘 IO网络 IO。IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。

  • 磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
  • 网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

设置参数:

# 如果存在IO,那么肯定w/c>1(阻塞耗时一般都是计算耗时的很多倍),但是需要考虑系统内存有限(每开启一个线程都需要内存空间),这里需要上服务器测试具体多少个线程数适合(CPU占比、线程数、总耗时、内存消耗)。如果不想去测试,保守点取1即,Nthreads=Ncpu*(1+1)=2Ncpu。这样设置一般都OK
# 通用就是2倍的CPU核心数(如果要效率最大化,需要测算当前系统环境每个线程任务的阻塞等待时间与实际计算时间)
Nthreads=Ncpu*(1+w/c)
公式中 W/C 为系统 阻塞率  w:等待时间 c:计算时间

实现代码

image-20250202183217126

IOIntensiveThreadPoolExample2.java:这里最终实现的Example2类来进行测试

package demo10.io;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** io密集型场景任务提交* demo3:基于demo2自定义拒绝策略*  自定义队列:核心线程 -> 最大线程 -> 队列*  自定义拒绝策略:自定义采用执行阻塞队列的put操作来实现任务阻塞入队,而非直接使用调用者线程来直接跑任务*  非影响主线程执行流程:批次1000个任务统一在一个线程中去进行处理,与主流程main线程隔离**/
@Slf4j
public class IOIntensiveThreadPoolExample2 {public static void main(String[] args) {// 获取 CPU 核心数int cpuCores = Runtime.getRuntime().availableProcessors();// 自定义线程池参数int corePoolSize = cpuCores * 2; // 核心线程数(IO 密集型任务可以设置较大)int maximumPoolSize = cpuCores * 4; // 最大线程数long keepAliveTime = 60L; // 空闲线程存活时间TimeUnit unit = TimeUnit.SECONDS; // 时间单位// 自定义任务队列 核心线程 -> 最大核心线程数 -> 队列TaskQueue<Runnable> taskQueue = new TaskQueue<>(corePoolSize * 2); // 队列容量为核心线程数的 2 倍// 创建自定义线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默认线程工厂 Executors.defaultThreadFactory() | 自定义工厂支持自定义线程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 将线程池对象设置到任务队列中taskQueue.setExecutor(executor);// 统计任务的执行数量int jobNums = 1000;final AtomicInteger count = new AtomicInteger(0);// 记录任务开始时间long startTime = System.currentTimeMillis();// 单独开一个线程(后续可改为线程池 核心、最大就1个场景)去完成整个任务提交处理// 如果submitjob阻塞,仅仅只会影响该thread线程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模拟1000个任务 (可改造为queue队列形式去在这个线程中去消费)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId + "...");try {Thread.sleep(500); // 模拟 IO 操作(如网络请求或文件读写)10s// xxxio类耗时操作} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {System.out.println(Thread.currentThread().getName() + " 任务 " + taskId + " 完成!");count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();}});}System.out.println("所有任务提交完成!");// 关闭线程池,等待任务全部执行完毕try {latch.await();System.out.println("所有任务执行结束!");// 记录任务结束时间long endTime = System.currentTimeMillis();// 计算任务执行时间long duration = endTime - startTime;System.out.println("任务执行总耗时: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任务完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 强制关闭}System.out.println("执行完任务数统计:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}

一个任务耗时0.5s,1000个任务执行如下:

image-20250202183457306

说明:经过测试验证,如果IO阻塞时间特别长,调大最大核心线程数效果更好。


其他部分组成

拒绝策略兜底方案

思路设计及思考

如果核心线程、最大线程、队列都满了的情况下该如何处理?如果本身就是单台机器资源打满,就需要在设计策略上改变线程池的调度方案,如果我的目的是任何一个任务都不丢弃,同时在服务器上有余力及时处理?

方案1:持久化数据库设计

  • 如:设计一张任务表间任务存储到 MySQL 数据库中;redis缓存;任务提交到中间件来缓冲。

设计思路可以如下:参考https://zhuanlan.zhihu.com/p/700719289

image-20250201084054264

方案2:Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控。

  • 后续通过翻阅源码发现一种在拒绝策略场景带退避的重试策略

方案3:ActiveMQ 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付

**方案4:**dubbo设计思路(dump文件+抛出异常)

方案5:线程阻塞队列

思路:队列采用阻塞队列,在拒绝策略方法中使用put方法实现阻塞效果。

可能情况:阻塞主线程任务执行。


设计1:数据库持久化方案

设计思路:自定义拒绝策略,在拒绝策略情况下进行数据库持久化;自定义实现队列,在poll的时候优先从db获取任务,接着再从队列中获取。

**详细具体实现可见:**某大厂线程池拒绝策略连环问 https://blog.csdn.net/shark_chili3007/article/details/137042400


设计2:Netty两种拒绝策略实现(根据场景来进行是否重试入队 + 失败抛异常)

实现思路1:创建新线程执行任务

说明:为了保证任务的实时处理,这种做法需要良好的硬件设备且临时创建的线程无法做到准确的监控

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {NewThreadRunsPolicy() {super();}public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {//创建一个临时线程处理任务final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}
}

弊端:如果任务数特别多无上限场景,就会出现oom情况,导致服务挂掉。

实现思路2:拒绝策略场景带退避的重试策略

源码地址:https://github.dev/netty/netty

  • 具体代码文件:RejectedExecutionHandlers
/*** Tries to backoff when the task can not be added due restrictions for an configured amount of time. This* is only done if the task was added from outside of the event loop which means* {@link EventExecutor#inEventLoop()} returns {@code false}.*/
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {// 检查 retries 参数是否为正数,如果不是则抛出异常ObjectUtil.checkPositive(retries, "retries");// 将退避时间转换为纳秒final long backOffNanos = unit.toNanos(backoffAmount);// 返回一个实现了 RejectedExecutionHandler 接口的匿名类return new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {// 检查当前线程是否不是事件循环线程if (!executor.inEventLoop()) {// 进行最多 retries 次重试for (int i = 0; i < retries; i++) {// 尝试唤醒事件循环线程,以便它能够处理任务队列中的任务executor.wakeup(false);// 当前线程休眠指定的退避时间LockSupport.parkNanos(backOffNanos);// 尝试将任务重新加入任务队列if (executor.offerTask(task)) {// 如果任务成功加入队列,则直接返回return;}}}// 如果当前线程是事件循环线程,或者重试次数用尽后仍然无法加入任务队列,// 则抛出 RejectedExecutionException 异常throw new RejectedExecutionException();}};
}

设计3:ActiveMQ(有效时间内尝试入队+入队失败抛出异常)

说明:尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付,超过时间内则返回false。

github地址:https://github.dev/apache/activemq

  • 对应代码:BrokerService#getExecutor
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {try {// 在60s内进行尝试入队,如果入队失败,则抛出异常if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");}} catch (InterruptedException e) {throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");}}}

设计4:dubbo设计思路(dump文件+抛出异常)

github地址:

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!"+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d),"+ " Task: %d (completed: %d),"+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating(),url.getProtocol(),url.getIp(),url.getPort());// 0-1 - Thread pool is EXHAUSTED!logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, Boolean.TRUE.toString()))) {// 进行dump文件dumpJStack();}// 指派发送消息给listener监听器dispatchThreadPoolExhaustedEvent(msg);throw new RejectedExecutionException(msg);
}

dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因

1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。

3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性


设计5: 自定义设计-阻塞入队

在线程池初始化的时候自定义拒绝策略:阻塞入队操作,阻塞方为调用方执行submitjob的线程

new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", "IOIntensiveThreadPool", e, r);if (!e.isShutdown()) {try {// 阻塞入队操作,阻塞方为调用方执行submitjob的线程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}
}

如果要执行的任务数量过多,核心线程数、最大核心线程数占满、任务队列占满,此时让任务进行入队阻塞,等待队列中任务有空余位置。


参考文章

[1]. Java 线程池讲解——针对 IO 密集型任务:https://www.jianshu.com/p/66b6dfcf3173(提出dubbo 或者 tomcat 的线程池中自定义Queue的实现,核心线程数 -> 最大线程数 -> 队列中)

[2]. 某大厂线程池拒绝策略连环问 https://blog.csdn.net/shark_chili3007/article/details/137042400

[3]. 线程池拒绝策略:https://blog.csdn.net/qq_40428665/article/details/121680262

[4]. Java线程池如何合理配置核心线程数:https://www.cnblogs.com/Vincent-yuan/p/16022613.html

[5]. 线程池参数配置:https://blog.csdn.net/whp404/article/details/131960756(计算公式)

相关文章:

Java自定义IO密集型和CPU密集型线程池

文章目录 前言线程池各类场景描述常见场景案例设计思路公共类自定义工厂类-MyThreadFactory自定义拒绝策略-RejectedExecutionHandlerFactory自定义阻塞队列-TaskQueue&#xff08;实现 核心线程->最大线程数->队列&#xff09; 场景1&#xff1a;CPU密集型场景思路&…...

Shell $0

个人博客地址&#xff1a;Shell $0 | 一张假钞的真实世界 我们已经知道在Shell中$0表示Shell脚本的文件名&#xff0c;但在有脚本调用的情形中&#xff0c;子脚本中的$0会是什么值呢&#xff1f;我们通过下面的实例来看。 已测试系统列表&#xff1a; Mac OS X EI Capitan 1…...

C++ Primer 标准库类型string

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…...

51c嵌入式~电路~合集25

我自己的原文哦~ https://blog.51cto.com/whaosoft/13241709 一、“开关电源”和“普通电源”的区别 什么叫开关电源 随着电力电子技术的发展和创新&#xff0c;使得开关电源技术也在不断地创新。目前&#xff0c;开关电源以小型、轻量和高效率的特点被广泛应用几乎所有的电…...

Vue3学习笔记-Vue开发前准备-1

一、安装15.0或更高版本的Node.js node -v npm -v 二、创建Vue项目 npm init vuelatest 三、Vue项目结构 node_modules: Vue项目运行的依赖文件public&#xff1a;资源文件夹package.json&#xff1a;信息描述文件...

架构技能(四):需求分析

需求分析&#xff0c;即分析需求&#xff0c;分析软件用户需要解决的问题。 需求分析的下一环节是软件的整体架构设计&#xff0c;需求是输入&#xff0c;架构是输出&#xff0c;需求决定了架构。 决定架构的是软件的所有需求吗&#xff1f;肯定不是&#xff0c;真正决定架构…...

51单片机 05 矩阵键盘

嘻嘻&#xff0c;LCD在RC板子上可以勉强装上&#xff0c;会有一点歪。 一、矩阵键盘 在键盘中按键数量较多时&#xff0c;为了减少I/O口的占用&#xff0c;通常将按键排列成矩阵形式&#xff1b;采用逐行或逐列的“扫描”&#xff0c;就可以读出任何位置按键的状态。&#xf…...

服务SDK三方新版中央仓库和私服发布详解

预备信息Github仓库发布Gradle版本匹配Gradle项目构建全局变量定义Gradle项目Nexus仓库配置与发布过程Gradle项目发布至Sonatype中央仓库配置过程总结当我们在实现一个项目技术总结、工具类封装或SDK封装,通常是为了方便开发者使用特定服务或平台而提供的一组工具和API。您可能…...

jmeter响应数据编码设置

jmeter响应数据编码设置 如果查看结果树的响应数据存在中文乱码&#xff0c;可以尝试以下方法&#xff1a; 1、找到jmeter的bin目录下的 jmeter.properties 文件&#xff0c;修改如下配置&#xff1a; 去掉sampleresult.default.encodingUTF-8前面的#号 2、保存文件后&#x…...

FPGA学习篇——开篇之作

今天正式开始学FPGA啦&#xff0c;接下来将会编写FPGA学习篇来记录自己学习FPGA 的过程&#xff01; 今天是大年初六&#xff0c;简单学一下FPGA的相关概念叭叭叭&#xff01; 一&#xff1a;数字系统设计流程 一个数字系统的设计分为前端设计和后端设计。在我看来&#xff0…...

Leetcode:680

1&#xff0c;题目 2&#xff0c;思路 首先就是判断它不发生改变会不会是回文如果不是回文&#xff0c;那么俩个指针从前往后与从后往前做对比如果俩字符不同&#xff0c;那就俩种选择&#xff0c;一种是保留前面的字符去掉后面字符&#xff0c;另一种是其反然后俩种选择只要满…...

Python教学:文档处理及箱线图等

代码1&#xff1a; import os import pandas as pd import numpy as py import os.path from os import listdir import openpyxl from openpyxl import Workbook import re import matplotlib.pyplot as plt # 导入matplotlib的绘图模块&#xff0c;用于可视化 cwdos.getcwd…...

SQLGlot:用SQLGlot解析SQL

几十年来&#xff0c;结构化查询语言&#xff08;SQL&#xff09;一直是与数据库交互的实际语言。在一段时间内&#xff0c;不同的数据库在支持通用SQL语法的同时演变出了不同的SQL风格&#xff0c;也就是方言。这可能是SQL被广泛采用和流行的原因之一。 SQL解析是解构SQL查询…...

C++STL(一)——string类

目录 一、string的定义方式二、 string类对象的容量操作三、string类对象的访问及遍历操作四、string类对象的修改操作五、string类非成员函数 一、string的定义方式 string是个管理字符数组的类&#xff0c;其实就是字符数组的顺序表。 它的接口也是非常多的。本章介绍一些常…...

C++ Primer 迭代器

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…...

排序算法--归并排序

归并排序是分治法的经典实现&#xff0c;适合大规模数据排序&#xff0c;尤其适合需要稳定排序的场景&#xff08;如数据库排序&#xff09; #include <stdlib.h> // 用于动态内存分配 // 合并两个已排序的子数组 void merge(int arr[], int left, int mid, int right) …...

深入探讨DICOM医学影像中的WADO服务及其具体实现

1. 引言 随着数字化医学影像技术的普及&#xff0c;如何高效、安全地存储、管理和共享医学影像数据成为医疗行业亟待解决的关键问题。DICOM&#xff08;Digital Imaging and Communications in Medicine&#xff09;作为国际公认的医学影像标准&#xff0c;在全球范围内广泛应…...

自定义数据集 使用paddlepaddle框架实现逻辑回归

导入必要的库 import numpy as np import paddle import paddle.nn as nn 数据准备&#xff1a; seed1 paddle.seed(seed)# 1.散点输入 定义输入数据 data [[-0.5, 7.7], [1.8, 98.5], [0.9, 57.8], [0.4, 39.2], [-1.4, -15.7], [-1.4, -37.3], [-1.8, -49.1], [1.5, 75.6…...

信息学奥赛一本通 2112:【24CSPJ普及组】地图探险(explore) | 洛谷 P11228 [CSP-J 2024] 地图探险

【题目链接】 ybt 2112&#xff1a;【24CSPJ普及组】地图探险&#xff08;explore&#xff09; 洛谷 P11228 [CSP-J 2024] 地图探险 【题目考点】 1. 模拟 2. 二维数组 3. 方向数组 在一个矩阵中&#xff0c;当前位置为(sx, sy)&#xff0c;将下一个位置与当前位置横纵坐…...

xxl-job 在 Java 项目的使用 以一个代驾项目中的订单模块举例

能搜到这里的最起码一定知道 xxl-job 是用来干什么的&#xff0c;我就不多啰嗦怎么下载以及它的历史了 首先我们要知道 xxl-job 这个框架的结构&#xff0c;如下图&#xff1a; xxl-job-master&#xff1a;xxl-job-admin&#xff1a;调度中心xxl-job-core&#xff1a;公共依赖…...

javaEE-8.JVM(八股文系列)

目录 一.简介 二.JVM中的内存划分 JVM的内存划分图: 堆区:​编辑 栈区:​编辑 程序计数器&#xff1a;​编辑 元数据区&#xff1a;​编辑 经典笔试题&#xff1a; 三,JVM的类加载机制 1.加载: 2.验证: 3.准备: 4.解析: 5.初始化: 双亲委派模型 概念: JVM的类加…...

模型/O功能之提示词模板

文章目录 模型/O功能之提示词模板什么是提示词模板提示词模板的输入和输出 使用提示词模板构造提示词 模型/O功能之提示词模板 在LangChain框架中&#xff0c;提示词不是简单的字符串&#xff0c;而是一个更复杂的结构&#xff0c;是一个“提示词工程”。这个结构中包含一个或多…...

[Proteus仿真]基于51单片机的智能温控系统

[Proteus仿真]基于51单片机的智能温控系统 基于51单片机的智能温控系统&#xff1a;DS18B20精准测温LCD1602双屏显示三键设置上下限声光报警&#xff0c;支持温度校准、抗干扰设计、阈值记忆。 一.仿真原理图 ​​ 二.模块介绍 温度采集模块&#xff08;DS18B20&#xff0…...

掌握 HTML5 多媒体标签:如何在所有浏览器中顺利嵌入视频与音频

系列文章目录 01-从零开始学 HTML&#xff1a;构建网页的基本框架与技巧 02-HTML常见文本标签解析&#xff1a;从基础到进阶的全面指南 03-HTML从入门到精通&#xff1a;链接与图像标签全解析 04-HTML 列表标签全解析&#xff1a;无序与有序列表的深度应用 05-HTML表格标签全面…...

ChatGPT与GPT的区别与联系

ChatGPT 和 GPT 都是基于 Transformer 架构的语言模型&#xff0c;但它们有不同的侧重点和应用。下面我们来探讨一下它们的区别与联系。 1. GPT&#xff08;Generative Pre-trained Transformer&#xff09; GPT 是一类由 OpenAI 开发的语言模型&#xff0c;基于 Transformer…...

浅谈线段树

文章同步发布于洛谷&#xff0c;建议前往洛谷查看。 前言 蒟蒻终于学会线段树&#xff08;指【模板】线段树 1 1 1&#xff09;啦&#xff01; 线段树思想 我们先来考虑 P3372&#xff08;基础线段树模板题&#xff09;给的操作&#xff1a; 区间修改&#xff08;增加&am…...

深度解读 Docker Swarm

一、引言 随着业务规模的不断扩大和应用复杂度的增加,容器集群管理的需求应运而生。如何有效地管理和调度大量的容器,确保应用的高可用性、弹性伸缩和资源的合理分配,成为了亟待解决的问题。Docker Swarm 作为 Docker 官方推出的容器集群管理工具,正是在这样的背景下崭露头…...

在线知识库的构建策略提升组织信息管理效率与决策能力

内容概要 在线知识库作为现代企业信息管理的重要组成部分&#xff0c;具有显著的定义与重要性。它不仅为组织提供了一个集中存储与管理知识的平台&#xff0c;还能够有效提升信息检索的效率&#xff0c;促进知识的创新和利用。通过这样的知识库&#xff0c;企业可以更好地应对…...

网件r7000刷回原厂固件合集测评

《网件R7000路由器刷回原厂固件详解》 网件R7000是一款备受赞誉的高性能无线路由器&#xff0c;其强大的性能和可定制性吸引了许多高级用户。然而&#xff0c;有时候用户可能会尝试第三方固件以提升功能或优化网络性能&#xff0c;但这也可能导致一些问题&#xff0c;如系统不…...

为什么命令“echo -e “\033[9;0]“ > /dev/tty0“能控制开发板上的LCD不熄屏?

为什么命令"echo -e “\033[9;0]” > /dev/tty0"能控制开发板上的LCD不熄屏&#xff1f; 在回答这个问题前请先阅读我之前写的与tty和终端有关的博文 https://blog.csdn.net/wenhao_ir/article/details/145431655 然后再来看这条命令的解释就要容易些了。 这条…...

vscode软件操作界面UI布局@各个功能区域划分及其名称称呼

文章目录 abstract检查用户界面的主要区域官方文档关于UI的介绍 abstract 检查 Visual Studio Code 用户界面 - Training | Microsoft Learn 本质上&#xff0c;Visual Studio Code 是一个代码编辑器&#xff0c;其用户界面和布局与许多其他代码编辑器相似。 界面左侧是用于访…...

【Java基础-42.3】Java 基本数据类型与字符串之间的转换:深入理解数据类型的转换方法

在 Java 开发中&#xff0c;基本数据类型与字符串之间的转换是非常常见的操作。无论是从用户输入中读取数据&#xff0c;还是将数据输出到日志或界面&#xff0c;都需要进行数据类型与字符串之间的转换。本文将深入探讨 Java 中基本数据类型与字符串之间的转换方法&#xff0c;…...

【ActiveMq RocketMq RabbitMq Kafka对比】

以下是 ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 的对比表格&#xff0c;从复杂性、功能、性能和适用场景等方面进行整理&#xff1a; 特性ActiveMQRocketMQRabbitMQKafka开发语言JavaJavaErlangScala/Java协议支持AMQP、STOMP、MQTT、OpenWire 等自定义协议AMQP、STOMP、MQTT …...

csapp笔记3.6节——控制(1)

本节解决了x86-64如何实现条件语句、循环语句和分支语句的问题 条件码 除了整数寄存器外&#xff0c;cpu还维护着一组单个位的条件码寄存器&#xff0c;用来描述最近的算数和逻辑运算的某些属性。可检测这些寄存器来执行条件分支指令。 CF&#xff08;Carry Flag&#xff09…...

网站快速收录:如何优化网站音频内容?

本文转自&#xff1a;百万收录网 原文链接&#xff1a;https://www.baiwanshoulu.com/60.html 为了优化网站音频内容以实现快速收录&#xff0c;以下是一些关键的策略和步骤&#xff1a; 一、高质量音频内容创作 原创性&#xff1a; 确保音频内容是原创的&#xff0c;避免使…...

音视频入门基础:RTP专题(8)——使用Wireshark分析RTP

一、引言 通过Wireshark可以抓取RTP数据包&#xff0c;该软件可以从Wireshark Go Deep 下载。 二、通过Wireshark抓取RTP数据包 首先通过FFmpeg将一个媒体文件转推RTP&#xff0c;生成RTP流&#xff1a; ffmpeg -re -stream_loop -1 -i input.mp4 -vcodec copy -an -f rtp …...

4-图像梯度计算

文章目录 4.图像梯度计算(1)Sobel算子(2)梯度计算方法(3)Scharr与Laplacian算子4.图像梯度计算 (1)Sobel算子 图像梯度-Sobel算子 Sobel算子是一种经典的图像边缘检测算子,广泛应用于图像处理和计算机视觉领域。以下是关于Sobel算子的详细介绍: 基本原理 Sobel算子…...

深入解析 Redis AOF 机制:持久化原理、重写优化与 COW 影响

深入解析 Redis AOF 机制&#xff1a;持久化原理、重写优化与 COW 影响 1. 引言2. AOF 机制详解2.1 AOF 解决了什么问题&#xff1f;2.2 AOF 写入机制2.2.1 AOF 的基本原理2.2.2 AOF 运行流程2.2.3 AOF 文件刷盘策略 3. AOF 重写机制3.1 AOF 文件为什么会变大&#xff1f;3.2 解…...

机器学习day8

自定义数据集 &#xff0c;使用朴素贝叶斯对其进行分类 代码 import numpy as np import matplotlib.pyplot as pltclass1_points np.array([[2.1, 2.2], [2.4, 2.5], [2.2, 2.0], [2.0, 2.1], [2.3, 2.3], [2.6, 2.4], [2.5, 2.1]]) class2_points np.array([[4.0, 3.5], …...

【前端】ES6模块化

文章目录 1. 模块化概述1.1 什么是模块化?1.2 为什么需要模块化? 2. 有哪些模块化规范3. CommonJs3.1 导出数据3.2 导入数据3.3 扩展理解3.4 在浏览器端运行 4.ES6模块化 参考视频地址 1. 模块化概述 1.1 什么是模块化? 将程序文件依据一定规则拆分成多个文件,这种编码方式…...

【leetcode练习·二叉树拓展】快速排序详解及应用

本文参考labuladong算法笔记[拓展&#xff1a;快速排序详解及应用 | labuladong 的算法笔记] 1、算法思路 首先我们看一下快速排序的代码框架&#xff1a; def sort(nums: List[int], lo: int, hi: int):if lo > hi:return# 对 nums[lo..hi] 进行切分# 使得 nums[lo..p-1]…...

Gurobi基础语法之 addConstr, addConstrs, addQConstr, addMQConstr

在新版本的 Gurobi 中&#xff0c;向 addConstr 这个方法中传入一个 TempConstr 对象&#xff0c;在模型中就会根据这个对象生成一个约束。更重要的是&#xff1a;TempConstr 对象可以传给所有addConstr系列方法&#xff0c;所以下面先介绍 TempConstr 对象 TempConstr TempC…...

游戏引擎 Unity - Unity 设置为简体中文、Unity 创建项目

Unity Unity 首次发布于 2005 年&#xff0c;属于 Unity Technologies Unity 使用的开发技术有&#xff1a;C# Unity 的适用平台&#xff1a;PC、主机、移动设备、VR / AR、Web 等 Unity 的适用领域&#xff1a;开发中等画质中小型项目 Unity 适合初学者或需要快速上手的开…...

Kamailio、MySQL、Redis、Gin后端、Vue.js前端等基于容器化部署

基于容器化的部署方案&#xff0c;通常会将每个核心服务&#xff08;如Kamailio、MySQL、Redis、Gin后端、Vue.js前端等&#xff09;独立运行在不同的容器中&#xff0c;通过Docker或Kubernetes统一管理。以下是具体实现方式和关键原因&#xff1a; 1. 容器化部署的核心思路 每…...

从1号点到n号点最多经过k条边的最短距离

目录 解析方法思路代码解释代码逐行注释1. 头文件和常量定义&#xff1a;2.边的结构体&#xff1a;3.全局变量&#xff1a;4.Bellman-Ford算法实现&#xff1a;5.主函数&#xff1a; 注意事项代码含义为什么需要 backup[a]&#xff1f;举例说明关键点 总结 解析 要实现从1号点…...

模拟实战-用CompletableFuture优化远程RPC调用

实战场景 这是广州某500-900人互联网厂的面试原题 手写并发优化解决思路 我们要调用对方的RPC接口&#xff0c;我们的RPC接口每调用一次对方都会阻塞50ms 但是我们的业务要批量调用RPC&#xff0c;例如我们要批量调用1k次&#xff0c;我们不可能在for循环里面写1k次远程调用…...

【pinia状态管理配置】

pinia状态管理配置 安装main.ts引入自定义user仓库使用自定义仓库 安装 pnpm add piniamain.ts引入 // createPinia() 函数调用创建了一个新的 Pinia 实例。 // 这个实例是状态管理的核心&#xff0c;它将管理应用中所有的 store。 import { createPinia } from pinia app.us…...

SpringBoot 引⼊MybatisGenerator

SpringBoot 引⼊MybatisGenerator 1. 引入插件2. 添加generator.xml并修改3. 生成文件 1. 引入插件 <plugin><groupId>org.mybatis.generator</groupId><artifactId>mybatis-generator-maven-plugin</artifactId><version>1.3.5</vers…...

在线销售数据集分析:基于Python的RFM数据分析方法实操训练

一、前言 个人练习&#xff0c;文章用于记录自己的学习练习过程&#xff0c;分享出来和大家一起学习。 数据集&#xff1a;在线销售数据集 分析方法&#xff1a;RFM分析方法 二、过程 1.1 库的导入与一些必要的初始设置 import pandas as pd import datetime import matplo…...

LeetCode - #197 Swift 实现找出温度更高的日期

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…...