1. FeatureTask

FutureTask是Runnable, Future接口的实现类;
image-20250129000018423

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
FutureTask<String> futureTask = new FutureTask<String>( () -> {
System.out.println(Thread.currentThread().getName()+"\t -----come in");
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
return "task over";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(Thread.currentThread().getName()+"\t ----忙其它任务了");
//System.out.println(futureTask.get());
// System.out.println(futureTask.get(3,TimeUnit.SECONDS));
while(true) {
if(futureTask.isDone()) {
System.out.println(futureTask.get());
break;
} else {
//暂停毫秒
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
}
}
}
运行结果

main —-忙其它任务了
t1 —–come in
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
正在处理中,不要再催了,越催越慢 ,再催熄火
task over

2. FeatureTask配合线程池使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
//3个任务,目前开启多个异步任务线程来处理,请问耗时多少?
// 注意:此处使用异步线程执行任务,但是get方法会阻塞线程知道执行结束
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = System.currentTimeMillis();

FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task1 over";
});
threadPool.submit(futureTask1);

FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
return "task2 over";
});
threadPool.submit(futureTask2);
System.out.println(futureTask1.get());
System.out.println(futureTask2.get());
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
System.out.println(Thread.currentThread().getName()+"\t -----end");
threadPool.shutdown();
}
运行结果

task1 over
task2 over
—-costTime: 830 毫秒
main —–end

3. CompletableFuture

image-20250128145141778

1
2
3
4
5
6
// 使用completebleFuture开启异步任务时,使用的是默认的ForkJoinPool线程池。
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);

private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

image-20250128230501069

  1. 描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

    • thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage
    • thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage
    • thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage
    • 这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。
1
2
3
4
5
6
7
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f ->{
return f + 2;
}).thenApply(f ->{
return f + 3;
}).thenAccept(System.out::println);
  1. 描述AND汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口。这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t ---启动");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t ---启动");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});

CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("-----开始两个结果合并");
return x + y;
});

System.out.println(result.join());
  1. 描述OR汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
return "playA";
});

CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return "playB";
});

CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winer";
});

System.out.println(Thread.currentThread().getName()+"\t"+"-----: "+result.join());
  1. 异常处理,主要是exceptionally、whencomplete、whencompleteAsync、handle、handleAsync接口,类比try{}catch{}中的 catch{}、try{}finally{}中的 finally{}。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
ExecutorService threadPool = Executors.newFixedThreadPool(3);

CompletableFuture.supplyAsync(() ->{
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
return 1;
},threadPool).handle((f,e) -> {
// int i=10/0;
System.out.println("222");
return f + 2;
}).handle((f,e) -> {
System.out.println("333");
return f + 3;
}).whenComplete((v,e) -> {
if (e == null) {
System.out.println("----计算结果: "+v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});

System.out.println(Thread.currentThread().getName()+"----主线程先去忙其它任务");

threadPool.shutdown();
}

4. CompletableFuture搭配线程池使用

1
2
3
4
5
6
7
8
9
10
11
{
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return "hello supplyAsync";
},threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.