当前位置: 首页 > news >正文

Java线程池

专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162

本文目标:

  1. 理解线程池运行原理
    • 线程的各种属性参数
    • 关闭问题
    • 异常处理
    • 拒绝策略
    • 常见的线程池
  2. 可以分析下自身工作中用的各种线程池和参数设定

工作中用到的有

  • 普通的 ThreadPoolExecutor 自己设置参数 和 名称,配合FutureTask相关完成业务逻辑
  • ScheduleThreadPool,主要是定期拉取某些资源
  • SingleThreadScheduledExecutor,单线程定期运行某项任务

目录

  • 线程池
    • 常见接口和类
      • interface Executor
      • interface ExecutorService extends Executor
      • abstract class AbstractExecutorService implements ExecutorService
      • class ThreadPoolExecutor extends AbstractExecutorService
      • ThreadFactory
    • ThreadPoolExecutor 构造函数和成员含义
      • 线程池参数设置
      • 线程池的处理流程
      • ThreadPoolExecutor 例子代码
    • 线程池使用后关闭问题?
      • 线程池`shutdown`,`shutdownNow`
      • 线程池的自动关闭
    • 拒绝策略
      • 线程添加到线程池中被拒绝的原因
      • 常见的几种拒绝策略
    • 线程异常处理
    • 常见线程池
      • newFixedThreadPool
      • CachedThreadPool
      • newSingleThreadExecutor 单一无界顺序线程
      • newScheduleThreadPool 延时/定期调度线程池
      • newSingleThreadScheduledExecutor
      • ForkJoinPool(java.util.concurrent;)
        • new ForkJoinPool()
        • ForkJoinTask类介绍
        • ForkJoinTask例子代码
      • FutureTask 与 线程池使用例子代码

线程池

  • 为什么需要线程池 ?

    1. 线程的创建销毁是个消耗系统的操作,实现线程资源的充分复用
    2. 方便对线程的统一管理、分配、调优和监控
  • 线程池应该具备哪些功能 ?

  • 线程池的实现需要注意哪些细节 ?

常见接口和类

interface Executor

public interface Executor {/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}

interface ExecutorService extends Executor

  • execute() 执行任务
  • shutdown() 调用后不再接收新任务,如果里面有任务,就执行完
  • shutdownNow() 调用后不再接受新任务,如果有等待任务,移出队列;有正在执行的,尝试停止之
  • isShutdown() 判断线程池是否关闭
  • isTerminated() 判断线程池中任务是否执行完成
  • submit() 提交任务
  • invokeAll() 执行一组任务

abstract class AbstractExecutorService implements ExecutorService

class ThreadPoolExecutor extends AbstractExecutorService

ThreadFactory

  • 工厂模式
  • ThreadFactory
/*** The default thread factory*/static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}

ThreadPoolExecutor 构造函数和成员含义

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
  • corePoolSize: 指定了线程池中的基本线程数量,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程;在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTimeallowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize

  • maximumPoolSize: 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量

  • keepAliveTime: 线程存活时间(在corePoolSize<*<maximumPoolSize情况下有用);超过corePoolSize的多余的线程会在多长时间内被销毁;

  • unit: keepAliveTime的单位

  • workQueue: 阻塞任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为:1.直接提交队列、2.有界任务队列、3.无界任务队列、4.优先任务队列(特殊的无界队列)几种;

  • threadFactory: 线程工厂,用于创建线程,一般用默认即可;

  • handler: 拒绝策略;当任务太多来不及处理时,如何拒绝任务;

  • workerCount: 当前活跃的线程数(也即线程池中的线程数量)

线程池参数设置

CPU密集型:不宜过多设置多线程,因为线程切换会损耗性能;一般是n+1(n为CPU核心数)

IO密集型:一般业务开发主要是数据库、缓存等,涉及到磁盘和网络IO,要考虑多设置线程池,增加IO并发度。一般是n*2(n为CPU核心数)

配合jstack命令:如果线程池中的大部分线程都处于运行状态,则可以提高线程池数量;


队列一般选择有界队列,因为任务很多都塞到工作队列,会占用内存,可能有OOM风险,且此时最大线程数已经失去意义了。

线程池的处理流程

1、如果当前线程池的线程数还没有达到核心大小(poolSize < corePoolSize),无论是否有空闲的线程都创建一个新的线程来处理新提交的任务;2、如果当前线程池的线程数等于核心大小了(poolSize === corePoolSize),即核心工作线程已经满了,但任务队列未满时,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command)3、如果任务队列满(先是队列满,然后是判断最大线程数有没有达到)3.1、当前poolSize<maximumPoolSize,那么就新增线程来处理任务;(这些新的线程在没有工作的时候,最长存活的最长时间为设置的keepAliveTime)3.2、当前poolSize=maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的拒绝策略RejectedExecutionHandler

ThreadPoolExecutor 例子代码

package com.threadpool;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author mubi* @Date 2019/4/8 10:52 PM*/
public class ThreadPoolUse {static class MyThreadFactory implements ThreadFactory {private AtomicInteger count = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);String threadName = "MyThread" + count.addAndGet(1);System.out.println(threadName);t.setName(threadName);return t;}}static class MyTask implements Runnable{int id;MyTask(int id){this.id = id;}@Overridepublic void run() {try{System.out.println("myTask id:" + id);TimeUnit.SECONDS.sleep(10);}catch (Exception e){e.printStackTrace();}}}public static void main(String[] args){int corePoolSize = 2;int maximumPoolSize = 4;int keepAliveTime = 2;TimeUnit timeUnit = TimeUnit.SECONDS;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1);RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();ThreadFactory threadFactory = new MyThreadFactory();ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handler);for(int i=0;i<6;i++) {MyTask task = new MyTask(i);executor.execute(task);}executor.shutdown();}}

线程池使用后关闭问题?

是否需要关闭?如何关闭?

A pool that is no longer referenced in a program and has no remaining threads will be shutdown automatically.

如果程序中不再持有线程池的引用,并且线程池中没有线程时,线程池将会自动关闭。

注:线程池中没有线程是指线程池中的所有线程都已运行完自动消亡,然而我们常用的FixedThreadPool的核心线程没有超时策略,所以并不会自动关闭。

线程池shutdownshutdownNow

  1. shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
  2. shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为stop。
package com.threadpool;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author mubi* @Date 2019/4/8 10:52 PM*/
public class ThreadPoolUse {static class MyThreadFactory implements ThreadFactory {private AtomicInteger count = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);String threadName = "MyThread" + count.addAndGet(1);System.out.println(threadName);t.setName(threadName);return t;}}static class MyTask implements Runnable{int id;MyTask(int id){this.id = id;}@Overridepublic void run() {try{System.out.println("myTask id:" + id);TimeUnit.SECONDS.sleep(10);}catch (Exception e){e.printStackTrace();}}}static void testFixPool(){while(true) {ExecutorService executorService = Executors.newFixedThreadPool(8);executorService.execute(() -> System.out.println("running"));executorService = null;}}static void testCachedThreadPool(){while(true) {// 默认keepAliveTime为 60sExecutorService executorService = Executors.newCachedThreadPool();ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;// 为了更好的模拟,动态修改为1纳秒threadPoolExecutor.setKeepAliveTime(1, TimeUnit.NANOSECONDS);threadPoolExecutor.execute(() -> System.out.println("running"));executorService = null;}}static class MyRejectPolicy implements RejectedExecutionHandler{@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (r instanceof MyTask) {MyTask r1 = (MyTask) r;//直接打印System.out.println("Reject Thread:" + r1.id);}}}static void test(){int corePoolSize = 2;int maximumPoolSize = 5;int keepAliveTime = 60 * 1;TimeUnit timeUnit = TimeUnit.SECONDS;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
//        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();RejectedExecutionHandler handler = new MyRejectPolicy();ThreadFactory threadFactory = new MyThreadFactory();ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handler);MyTask task = new MyTask(1);executor.execute(task);executor.shutdown();// 已经关闭的线程池,引用还在,再有新任务,会执行拒绝策略MyTask task2 = new MyTask(2);executor.execute(task2);}public static void main(String[] args){test();System.out.println("main end");}}

线程池的自动关闭

  1. 自动关闭线程池:核心线程数为 0 并指定线程存活时间

  2. 通过 allowCoreThreadTimeOut 控制核心线程存活时间

/*** Sets the policy governing whether core threads may time out and* terminate if no tasks arrive within the keep-alive time, being* replaced if needed when new tasks arrive. When false, core* threads are never terminated due to lack of incoming* tasks. When true, the same keep-alive policy applying to* non-core threads applies also to core threads. To avoid* continual thread replacement, the keep-alive time must be* greater than zero when setting {@code true}. This method* should in general be called before the pool is actively used.** @param value {@code true} if should time out, else {@code false}* @throws IllegalArgumentException if value is {@code true}*         and the current keep-alive time is not greater than zero** @since 1.6*/
public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)interruptIdleWorkers();}
}
  1. 线程池中的线程设置为守护线程

拒绝策略

线程池的拒绝策略是指当任务添加到线程池中被拒绝而采取的处理措施

线程添加到线程池中被拒绝的原因

当任务添加到线程池中之所以被拒绝,可能是由于:第一线程池异常关闭;第二,任务数量超过线程池的最大限制,并设置有界的workeQueue

import java.text.SimpleDateFormat;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author mubi* @Date 2019/9/2 21:38*/
public class ThreadPoolTest {static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");static RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("MyTest Task " + r.toString() +" rejected from " +e.toString());}};static AtomicInteger atomicInteger = new AtomicInteger(0);ThreadFactory threadFactory = new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("mythread-" + atomicInteger.get());atomicInteger.incrementAndGet();return thread;}};void test1(){ThreadPoolExecutor EXECUTOR =new ThreadPoolExecutor(5,10,3000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(2),threadFactory,defaultHandler);int n = 2;for (int i = 0; i < n; i++) {// 使用前可以判断
//            if(!EXECUTOR.isShutdown()) {Thread t = threadFactory.newThread(new Runnable() {@Overridepublic void run() {System.out.println("Hello World");try{TimeUnit.SECONDS.sleep(5);}catch (Exception e){}System.out.println("Hello World2");}});EXECUTOR.execute(t);
//            }else {
//                System.out.println("executor already shutdown");
//            }EXECUTOR.shutdown();}}void test2(){ThreadPoolExecutor EXECUTOR =new ThreadPoolExecutor(5,10,3000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(2),threadFactory,defaultHandler);int n = 15;for (int i = 0; i < n; i++) {final int tmpint = i;EXECUTOR.execute(new Runnable() {@Overridepublic void run() {try {System.out.println(tmpint+"Hello World");TimeUnit.MILLISECONDS.sleep(1000L);} catch (InterruptedException e) {}}});}EXECUTOR.shutdown();}public static void main(final String[] args) throws Exception {ThreadPoolTest threadPoolTest = new ThreadPoolTest();threadPoolTest.test2();System.out.println("end main()");}}

常见的几种拒绝策略

  1. ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。(默认
  2. ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)自己处理该任务

线程异常处理

参考阅读博文:线程池中某个线程执行有异常,该如何处理?

实际工作中一般是自身线程内部try catch处理好各种异常,log打好,监控加好;也可以抛出异常,但要注意拒绝策略,可以加上重试

常见线程池

newFixedThreadPool

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程

// 核心和最大线程数都是固定数,使用`LinkedBlockingQueue`队列
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
static void testFixPool(){while(true) {ExecutorService executorService = Executors.newFixedThreadPool(8);executorService.execute(() -> System.out.println("running"));executorService = null;}
}public static void main(String[] args){testFixPool();System.out.println("main end");
}
  • 因为固定线程池不会自己销毁,最终会耗尽内存(需要在合适的时候shutdown
running
running
running
running
running
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native threadat java.lang.Thread.start0(Native Method)at java.lang.Thread.start(Thread.java:717)at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)at com.threadpool.ThreadPoolUse.testFixPool(ThreadPoolUse.java:71)at com.threadpool.ThreadPoolUse.main(ThreadPoolUse.java:77)

CachedThreadPool

static void testCachedThreadPool(){while(true) {// 默认keepAliveTime为 60sExecutorService executorService = Executors.newCachedThreadPool();ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;// 为了更好的模拟,动态修改为1纳秒threadPoolExecutor.setKeepAliveTime(1, TimeUnit.NANOSECONDS);threadPoolExecutor.execute(() -> System.out.println("running"));executorService = null;}
}public static void main(String[] args){testCachedThreadPool();System.out.println("main end");
}
  • 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程
  • 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
  • CachedThreadPool 的线程 keepAliveTime 默认为 60s,核心线程数量为 0 ,所以不会有核心线程存活阻止线程池自动关闭。详见 线程池之ThreadPoolExecutor构造;为了更快的模拟,构造后将 keepAliveTime 修改为1纳秒,相当于线程执行完马上会消亡,所以线程池可以被回收。实际开发中,如果CachedThreadPool确实忘记关闭,在一定时间后是可以被回收的,但仍然建议显示关闭。
// 核心线程数为0,每次都必须新建线程,无法线程复用(空闲后会在60s后销毁)
// SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

newSingleThreadExecutor 单一无界顺序线程

作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

// 核心,最大线程数都是1,使用`LinkedBlockingQueue`阻塞无界(这里指队列大小是Integer.MAX_VALUE)队列
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}

newScheduleThreadPool 延时/定期调度线程池

作用:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

特征:
* 线程池中具有指定数量的线程,即便是空线程也将保留
* 可定时或者延迟执行线程活动

创建方式:
* Executors.newScheduledThreadPool(int corePoolSize);// corePoolSize线程的个数
* newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);// corePoolSize线程的个数,threadFactory创建线程的工厂

newSingleThreadScheduledExecutor

作用:创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

特征:
* 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行
* 可定时或者延迟执行线程活动

创建方式:
* Executors.newSingleThreadScheduledExecutor() ;
* Executors.newSingleThreadScheduledExecutor(ThreadFactory threadFactory) ;//threadFactory创建线程的工厂

代码例子eg:

public class HookTest {static volatile boolean flag = true;static AtomicInteger i = new AtomicInteger(0);static ScheduledExecutorService service;public static void main(String[] args) throws Exception{Runnable runnable = new Runnable() {@Overridepublic void run() {try {int curVal = i.get();System.out.println(curVal + ":" + flag);if(curVal >= 10){flag = false;}else {i.addAndGet(1);}} catch (Exception e) {System.out.println("发生异常");}}};service = Executors.newSingleThreadScheduledExecutor();service.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);new Thread(new Runnable() {@Overridepublic void run() {while (flag) {}service.shutdown();System.out.println("end:" + flag);}}).start();}}
/*
0:true
1:true
2:true
3:true
4:true
5:true
6:true
7:true
8:true
9:true
10:true
end:false
*/

ForkJoinPool(java.util.concurrent;)

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService

ForkJoin思想:把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可

  • fork

n. 餐叉; 叉(挖掘用的园艺工具); (道路、河流等的) 分岔处,分流处,岔口,岔路;
v. 分岔; 岔开两条分支; 走岔路中的一条; 叉运; 叉掘;

  • join

v. 连接; 接合; 联结; 结合; 联合; 汇合; 成为…的一员; 参加; 加入;
n. 结合; 连接; 接合点;

new ForkJoinPool()
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);}
/*** Creates a {@code ForkJoinPool} with the given parameters, without* any security checks or parameter validation.  Invoked directly by* makeCommonPool.*/
private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

默认线程数量是:Runtime.getRuntime().availableProcessors()(1个cpu,2核,超线程数2,返回4)

ForkJoinTask类介绍
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

Abstract base class for tasks that run within a ForkJoinPool. A ForkJoinTask is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations.

ForkJoinTask例子代码
public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// invokeAll(leftTask, rightTask);// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}static void testForkJoinPool() throws Exception{ForkJoinPool forkjoinPool = new ForkJoinPool();int sta = 1;int end = 100;//生成一个计算任务,计算连续区间范围的和ForkJoinTaskExample task = new ForkJoinTaskExample(sta, end);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);System.out.println("result:" + result.get());}public static void main(String[] args) throws Exception{testForkJoinPool();TimeUnit.SECONDS.sleep(1);}
}

FutureTask 与 线程池使用例子代码

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;class Task implements Callable<Integer>{String name;public Task(String name) {this.name = name;}@Overridepublic Integer call() throws Exception {Integer res = new Random().nextInt(100);Thread.sleep(1000);System.out.println("任务执行:获取到结果 :"+res);return  res;}public String getName() {return name;}
}public class Solution {public void testFutureAndThreadPool(){// 线程池ExecutorService executorService = Executors.newFixedThreadPool(10);//进行异步任务列表List<FutureTask<Integer>> futureTasks = new ArrayList<>();long start = System.currentTimeMillis();int n = 10;for(int i=0;i<n;i++){Task task = new Task("name_" + i);//创建一个异步任务FutureTask<Integer> futureTask = new FutureTask<>(task);futureTasks.add(futureTask);//提交异步任务到线程池,让线程池管理任务。//由于是异步并行任务,所以这里并不会阻塞executorService.submit(futureTask);}int count = 0;for (FutureTask<Integer> futureTask : futureTasks) {// get()// get(long timeout, TimeUnit unit) 第一个参数为最大等待时间,第二个为时间的单位try{count += futureTask.get();}catch (Exception e){e.printStackTrace();}}//清理线程池executorService.shutdown();long end = System.currentTimeMillis();System.out.println("线程池的任务全部完成:结果为:"+count+",main线程关闭,进行线程的清理");System.out.println("使用时间:"+(end-start)+"ms");}public void testLine(){long start = System.currentTimeMillis();int n = 10;int count = 0;for(int i=0;i<n;i++){Task task = new Task("name_" + i);try{count += task.call();}catch (Exception e){e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println("线程池的任务全部完成:结果为:"+count+",main线程关闭,进行线程的清理");System.out.println("使用时间:"+(end-start)+"ms");//清理线程池}public static void main(String[] args) {Solution solution = new Solution();solution.testFutureAndThreadPool();solution.testLine();System.out.println("the end");}}

相关文章:

Java线程池

专栏系列文章地址&#xff1a;https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标&#xff1a; 理解线程池运行原理 线程的各种属性参数关闭问题异常处理拒绝策略常见的线程池 可以分析下自身工作中用的各种线程池和参数设定 工作中用到的有 普通的 Th…...

2025年01月27日Github流行趋势

项目名称&#xff1a;onlook项目地址url&#xff1a;https://github.com/onlook-dev/onlook项目语言&#xff1a;TypeScript历史star数&#xff1a;5340今日star数&#xff1a;211项目维护者&#xff1a;Kitenite, drfarrell, iNerdStack, abhiroopc84, apps/dependabot项目简介…...

C# 数组和列表的基本知识及 LINQ 查询

数组和列表的基本知识及 LINQ 查询 一、基本知识二、引用命名空间声明三、数组3.1、一维数组3.2、二维数组3.3、不规则数组 Jagged Array 四、列表 List4.1、一维列表4.2、二维列表 五、数组和列表使用 LINQ的操作和运算5.1、一维 LIST 删除所有含 double.NaN 的行5.2、一维 LI…...

Deepseek本地部署(ollama+open-webui)

ollama 首先是安装ollama&#xff0c;这个非常简单 https://ollama.com/ 下载安装即可 open-webui 这个是为了提供一个ui&#xff0c;毕竟我们也不想在cmd和模型交互&#xff0c;很不方便。 第一&#xff0c;需要安装python3.11&#xff0c;必须是3.11&#xff08;其他版…...

(七)Spring Cloud Alibaba 2023.x:RocketMQ 消息队列配置与实现

目录 前言 准备 安装RocketMq服务 下载rocketmq服务 下载rocketmq 控制台 项目集成 引入依赖 生产者服务配置 消费者服务配置 发送队列消息 前言 在微服务架构中&#xff0c;异步消息通信是实现系统解耦、提高性能和增强系统可靠性的重要手段。在 Spring Cloud Alib…...

2848、与车相交的点

2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i&#xff0c;nums[i] [starti, endi] &#xff0c;其中 starti 是第 i 辆车的起点&#xff0c;endi 是第 i 辆车的终点。 返回数轴上被车 任意…...

51单片机开发:温度传感器

温度传感器DS18B20&#xff1a; 初始化时序图如下图所示&#xff1a; u8 ds18b20_init(void){ds18b20_reset();return ds18b20_check(); }void ds18b20_reset(void){DS18B20_PORT 0;delay_10us(75);DS18B20_PORT 1;delay_10us(2); }u8 ds18b20_check(void){u8 time_temp0;wh…...

三甲医院大型生信服务器多配置方案剖析与应用(2024版)

一、引言 1.1 研究背景与意义 在当今数智化时代&#xff0c;生物信息学作为一门融合生物学、计算机科学和信息技术的交叉学科&#xff0c;在三甲医院的科研和临床应用中占据着举足轻重的地位。随着高通量测序技术、医学影像技术等的飞速发展&#xff0c;生物医学数据呈爆发式…...

【机器学习】自定义数据集 ,使用朴素贝叶斯对其进行分类

一、贝叶斯原理 贝叶斯算法是基于贝叶斯公式的&#xff0c;其公式为&#xff1a; 其中叫做先验概率&#xff0c;叫做条件概率&#xff0c;叫做观察概率&#xff0c;叫做后验概率&#xff0c;也是我们求解的结果&#xff0c;通过比较后验概率的大小&#xff0c;将后验概率最大的…...

ASP.NET Core 启动并提供静态文件

ASP.NET Core 启动并提供静态文件 即是单个可执行文件&#xff0c;它既运行 API 项目&#xff0c;也托管 前端项目&#xff08;通常是前端的发布文件&#xff09;。 这种方式一般是通过将 前端项目 的发布文件&#xff08;例如 HTML、CSS、JavaScript&#xff09;放入 Web AP…...

MySQL 导入数据

MySQL 导入数据 引言 MySQL 是一款广泛使用的开源关系型数据库管理系统,它以其稳定性和高效性被广泛应用于各种规模的应用程序中。在数据库管理过程中,数据的导入是至关重要的一个环节。本文将详细介绍如何在 MySQL 中导入数据,包括导入数据的准备、操作步骤以及注意事项。…...

MINIRAG: TOWARDS EXTREMELY SIMPLE RETRIEVAL-AUGMENTED GENERATION论文翻译

感谢阅读 注意不含评估以后的翻译原论文地址标题以及摘要介绍部分MiniRAG 框架2.1 HETEROGENEOUS GRAPH INDEXING WITH SMALL LANGUAGE MODELS2.2 LIGHTWEIGHT GRAPH-BASED KNOWLEDGE RETRIEVAL2.2.1 QUERY SEMANTIC MAPPING2.2.2 TOPOLOGY-ENHANCED GRAPH RETRIEVAL 注意不含评…...

将 OneLake 数据索引到 Elasticsearch - 第二部分

作者&#xff1a;来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo 本文分为两部分&#xff0c;第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。 在本文中&#xff0c;我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器…...

数据密码解锁之DeepSeek 和其他 AI 大模型对比的神秘面纱

本篇将揭露DeepSeek 和其他 AI 大模型差异所在。 目录 ​编辑 一本篇背景&#xff1a; 二性能对比&#xff1a; 2.1训练效率&#xff1a; 2.2推理速度&#xff1a; 三语言理解与生成能力对比&#xff1a; 3.1语言理解&#xff1a; 3.2语言生成&#xff1a; 四本篇小结…...

安心即美的生活方式

如果你的心是安定的&#xff0c;那么&#xff0c;外界也就安静了。就像陶渊明说的&#xff1a;心远地自偏。不是走到偏远无人的边荒才能得到片刻清净&#xff0c;不需要使用洪荒之力去挣脱生活的枷锁&#xff0c;这是陶渊明式的中国知识分子的雅量。如果你自己是好的男人或女人…...

基于深度学习的输电线路缺陷检测算法研究(论文+源码)

输电线路关键部件的缺陷检测对于电网安全运行至关重要&#xff0c;传统方法存在效率低、准确性不高等问题。本研究探讨了利用深度学习技术进行输电线路关键组件的缺陷检测&#xff0c;目的是提升检测的效率与准确度。选用了YOLOv8模型作为基础&#xff0c;并通过加入CA注意力机…...

手写防抖函数、手写节流函数

文章目录 1 手写防抖函数2 手写节流函数 1 手写防抖函数 函数防抖是指在事件被触发n秒后再执行回调&#xff0c;如果在这n秒内事件又被触发&#xff0c;则重新计时。这可以使用在一些点击请求的事件上&#xff0c;避免因为用户的多次点击向后端发送多次请求。 function debou…...

UE 导入sbsar插件

Substance 3D 插件支持直接在 Unreal Engine 5 和 Unreal Engine 4 中使用 Substance 材质。无论您是在处理游戏、可视化&#xff0c;还是在移动设备、桌面或 XR 上进行部署&#xff0c;Substance 都能提供独特的体验&#xff0c;并优化功能以提高生产力。 Substance 资源平台…...

pytorch实现简单的情感分析算法

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 在PyTorch中实现中文情感分析算法通常涉及以下几个步骤&#xff1a;数据预处理、模型定义、训练和评估。下面是一个简单的实现示例&#xff0c;使用LSTM模型进行中文情感分析。 1. 数据预处理 首先&#xff0c;我…...

Baklib揭示内容中台实施最佳实践的策略与实战经验

内容概要 在当前数字化转型的浪潮中&#xff0c;内容中台的概念日益受到关注。它不再仅仅是一个内容管理系统&#xff0c;而是企业提升运营效率与灵活应对市场变化的重要支撑平台。内容中台的实施离不开最佳实践的指导&#xff0c;这些实践为企业在建设高效内容中台时提供了宝…...

11.[前端开发]Day11-HTML+CSS阶段练习(仿小米和考拉页面)

一、小米穿戴设备&#xff08;浮动&#xff09; 完整代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"vie…...

设计模式学习(二)

结构型 适配器模式 定义 它允许将一个类的接口转换成客户端期望的另一个接口。适配器模式通常用于使不兼容的接口能够一起工作。 适配器模式的角色 目标接口&#xff08;Target&#xff09;&#xff1a;客户端期望的接口。适配者&#xff08;Adaptee&#xff09;&#xff…...

【Docker】快速部署 Nacos 注册中心

【Docker】快速部署 Nacos 注册中心 引言 Nacos 注册中心是一个用于服务发现和配置管理的开源项目。提供了动态服务发现、服务健康检查、动态配置管理和服务管理等功能&#xff0c;帮助开发者更轻松地构建微服务架构。 仓库地址 https://github.com/alibaba/nacos 步骤 拉取…...

大白话讲清楚embedding原理

Embedding&#xff08;嵌入&#xff09;是一种将高维数据&#xff08;如单词、句子、图像等&#xff09;映射到低维连续向量的技术&#xff0c;其核心目的是通过向量表示捕捉数据之间的语义或特征关系。以下从原理、方法和应用三个方面详细解释Embedding的工作原理。 一、Embe…...

pandas中的apply方法使用

apply 用于对 DataFrame 或 Series 中的数据进行逐行或逐列的操作。它可以接受一个函数&#xff08;通常是 lambda 函数或自定义函数&#xff09;&#xff0c;并将该函数应用到每一行或每一列上。apply语法&#xff1a; DataFrame.apply(func, axis0, rawFalse, result_typeNo…...

简单易懂的倒排索引详解

文章目录 简单易懂的倒排索引详解一、引言 简单易懂的倒排索引详解二、倒排索引的基本结构三、倒排索引的构建过程四、使用示例1、Mapper函数2、Reducer函数 五、总结 简单易懂的倒排索引详解 一、引言 倒排索引是一种广泛应用于搜索引擎和大数据处理中的数据结构&#xff0c;…...

Nginx知识

nginx 精简的配置文件 worker_processes 1; # 可以理解为一个内核一个worker # 开多了可能性能不好events {worker_connections 1024; } # 一个 worker 可以创建的连接数 # 1024 代表默认一般不用改http {include mime.types;# 代表引入的配置文件# mime.types 在 ngi…...

CNN的各种知识点(三):有关于VGG16 的结构展开的问题(1)

有关于VGG16 的结构展开的问题&#xff08;1&#xff09; 1. VGG16 的原生结构2. model.avgpool 的作用原生 VGG16 中没有 avgpool 层&#xff1f;代码中的 model.avgpool 是什么&#xff1f; 3. model.classifier 的作用原生 VGG16 的 classifier用户代码中的 classifier 4. 为…...

vue3中el-input无法获得焦点的问题

文章目录 现象两次nextTick()加setTimeout()解决结论 现象 el-input被外层div包裹了&#xff0c;设置autofocus不起作用&#xff1a; <el-dialog v-model"visible" :title"title" :append-to-bodytrue width"50%"><el-form v-model&q…...

sqli-labs靶场通关

sqli-las通关 mysql数据库5.0以上版本有一个自带的数据库叫做information_schema,该数据库下面有两个表一个是tables和columns。tables这个表的table_name字段下面是所有数据库存在的表名。table_schema字段下是所有表名对应的数据库名。columns这个表的colum_name字段下是所有…...

深度学习深度解析:从基础到前沿

引言 深度学习作为人工智能的一个重要分支&#xff0c;通过模拟人脑的神经网络结构来进行数据分析和模式识别。它在图像识别、自然语言处理、语音识别等领域取得了显著成果。本文将深入探讨深度学习的基础知识、主要模型架构以及当前的研究热点和发展趋势。 基础概念与数学原理…...

sobel边缘检测算法

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 Sobel边缘检测算法是一种用于图像处理中的边缘检测方法&#xff0c;它能够突出图像中灰度变化剧烈的地方&#xff0c;也就是边缘。该算法通过计算图像在水平方向和垂直方向上的梯度来检测边缘&#xff0c;梯度值越大…...

LeetCode 349: 两个数组的交集

LeetCode 349: 两个数组的交集 - C语言 问题描述 给定两个数组 ransomNote 和 magazine&#xff0c;你需要判断 ransomNote 是否可以由 magazine 里的字符构成。每个字符可以使用一次。 解题思路 通过统计 magazine 中每个字符的频次&#xff0c;并与 ransomNote 中字符的需…...

MATLAB的数据类型和各类数据类型转化示例

一、MATLAB的数据类型 在MATLAB中 &#xff0c;数据类型是非常重要的概念&#xff0c;因为它们决定了如何存储和操作数据。MATLAB支持数值型、字符型、字符串型、逻辑型、结构体、单元数组、数组和矩阵等多种数据类型。MATLAB 是一种动态类型语言&#xff0c;这意味着变量的数…...

c++ list的front和pop_front的概念和使用案例—第2版

在 C 标准库中&#xff0c;std::list 的 front() 和 pop_front() 是与链表头部元素密切相关的两个成员函数。以下是它们的核心概念和具体使用案例&#xff1a; 1. front() 方法 概念&#xff1a; 功能&#xff1a;返回链表中第一个元素的引用&#xff08;直接访问头部元素&am…...

如何使用SliverList组件

文章目录 1 概念介绍2 使用方法3 示例代码 我们在上一章回中介绍了沉浸式状态栏相关的内容&#xff0c;本章回中将介绍SliverList组件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1 概念介绍 我们在这里介绍的SliverList组件是一种列表类组件&#xff0c;类似我们之前介…...

DIFY源码解析

偶然发现Github上某位大佬开源的DIFY源码注释和解析&#xff0c;目前还处于陆续不断更新地更新过程中&#xff0c;为大佬的专业和开源贡献精神点赞。先收藏链接&#xff0c;后续慢慢学习。 相关链接如下&#xff1a; DIFY源码解析...

搜索引擎友好:设计快速收录的网站架构

本文来自&#xff1a;百万收录网 原文链接&#xff1a;https://www.baiwanshoulu.com/14.html 为了设计一个搜索引擎友好的网站架构&#xff0c;以实现快速收录&#xff0c;可以从以下几个方面入手&#xff1a; 一、清晰的目录结构与层级 合理划分内容&#xff1a;目录结构应…...

2007-2019年各省科学技术支出数据

2007-2019年各省科学技术支出数据 1、时间&#xff1a;2007-2019年 2、来源&#xff1a;国家统计局、统计年鉴 3、指标&#xff1a;行政区划代码、地区名称、年份、科学技术支出 4、范围&#xff1a;31省 5、指标解释&#xff1a;科学技术支出是指为促进科学研究、技术开发…...

【数据分析】案例03:当当网近30日热销图书的数据采集与可视化分析(scrapy+openpyxl+matplotlib)

当当网近30日热销图书的数据采集与可视化分析(scrapy+openpyxl+matplotlib) 当当网近30日热销书籍官网写在前面 实验目的:实现当当网近30日热销图书的数据采集与可视化分析。 电脑系统:Windows 使用软件:Visual Studio Code Python版本:python 3.12.4 技术需求:scrapy、…...

DRM系列二:DRM总体介绍

一、简介 DRM&#xff0c;全称Direct Rending Manger。是目前Linux主流的图形显示框架。相比较传统的Framebuffer&#xff08;FB原生不支持多层合成&#xff0c;不支持VSYNC&#xff0c;不支持DMA-BUF&#xff0c;不支持异步更新&#xff0c;不支持fence机制等等&#xff09;&…...

步进电机的型号和分类

步进电机的型号和分类通常根据其尺寸、结构、相数、步距角等参数来区分。以下是一些常见的步进电机型号、分类方法以及如何识别它们的指南&#xff1a; 一、常见步进电机型号 步进电机的型号通常由厂家命名&#xff0c;但也有一些通用的命名规则。以下是一些常见的型号系列&am…...

【力扣】15.三数之和

AC截图 题目 思路 这道题如果简单的用暴力三重遍历去做&#xff0c;会超时。所以我们思考假如有三个下标&#xff0c;i&#xff0c;l&#xff0c;r 其中i0&#xff08;初始&#xff09;&#xff0c;li1 rnums.size()-1 我们固定nums[i]的值&#xff0c;那么就转换为两数之和…...

Redis 基础命令

1. redis 命令官网 https://redis.io/docs/latest/commands/ 2. 在 redis-cli 中使用 help 命令 # 查看 help string 基础命令 keys * # * 代表通配符set key value # 设置键值对del key # 删除键expire key 时间 # 给键设置时间 # -2 代表时间到期了&#xff0c; -1 代表…...

CSES Missing Coin Sum

思路是对数组排序 设 S [ i ] S[i] S[i] 是数组的前缀和 R [ i ] R[i] R[i] 是递增排序后的数组 遍历数组&#xff0c;如果出现 S [ i − 1 ] 1 < R [ i ] S[i - 1] 1 < R[i] S[i−1]1<R[i]&#xff0c;就代表S[i - 1] 1是不能被合成出来的数字 因为&#xff1a…...

Python中的数据类(dataclass):简化类的定义与数据管理

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 随着Python语言的发展,代码的简洁性与可维护性变得愈发重要。Python 3.7引入的dataclass模块为数据类的定义提供了一种简便而高效的方式,…...

Java线程认识和Object的一些方法ObjectMonitor

专栏系列文章地址&#xff1a;https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标&#xff1a; 要对Java线程有整体了解&#xff0c;深入认识到里面的一些方法和Object对象方法的区别。认识到Java对象的ObjectMonitor&#xff0c;这有助于后面的Synchron…...

使用真实 Elasticsearch 进行高级集成测试

作者&#xff1a;来自 Elastic Piotr Przybyl 掌握高级 Elasticsearch 集成测试&#xff1a;更快、更智能、更优化。 在上一篇关于集成测试的文章中&#xff0c;我们介绍了如何通过改变数据初始化策略来缩短依赖于真实 Elasticsearch 的集成测试的执行时间。在本期中&#xff0…...

统计学中的样本概率论中的样本

不知道当初谁想的把概率论和数理统计合并&#xff0c;作为一门课。这本身是可以合并&#xff0c;完整的一条线&#xff0c;看这里。但是&#xff0c;作为任课老师应该从整体上交代清楚&#xff0c;毕竟是两个学科&#xff0c;不同的学科合并必然会有各种不协调的问题。 举个最…...

SQL 总结

SQL 总结 引言 SQL(Structured Query Language)是一种用于管理关系数据库的计算机语言。自从1970年代被发明以来,SQL已经成为了数据库管理的基础。本文将对SQL的基本概念、常用命令、高级特性以及SQL在数据库管理中的应用进行总结。 SQL基本概念 数据库 数据库是存储数…...