CompletableFuture

为什么要使用CompletableFuture

示例

CompletableFuture<ProductInfo> productFuture = CompletableFuture.supplyAsync(() -> getProductInfo());
 
CompletableFuture<Stock> stockFuture = CompletableFuture.supplyAsync(() -> getStock());
 
CompletableFuture<Promotion> promotionFuture = CompletableFuture.supplyAsync(() -> getPromotion());
 
CompletableFuture<Comments> commentsFuture = CompletableFuture.supplyAsync(() -> getComments());
 
CompletableFuture.allOf(productFuture, stockFuture, promotionFuture, commentsFuture)
    .thenAccept(v -> {
        // 所有数据都准备好了,开始组装页面
        buildPage(productFuture.join(), stockFuture.join(), 
                 promotionFuture.join(), commentsFuture.join());
    });
 

2. 创建异步任务的花式方法

// supplyAsync:我做事靠谱,一定给你返回点什么
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "我是有结果的异步任务";
});
 
// runAsync:我比较佛系,不想给你返回任何东西
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("我只是默默地执行,不给你返回值");
});
 

自定义线程池的正确姿势

// 错误示范:这是一匹脱缰的野马!
ExecutorService wrongPool = Executors.newFixedThreadPool(10);
 
// 正确示范:这才是精心调教过的千里马
ThreadPoolExecutor rightPool = new ThreadPoolExecutor(
    5,                      // 核心线程数(正式员工)
    10,                     // 最大线程数(含临时工)
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new LinkedBlockingQueue<>(100),  // 工作队列(候客区)
    new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build(),  // 线程工厂(员工登记处)
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略(客满时的处理方案)
);
 
// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "我是通过专属线程池执行的任务";
}, rightPool);
 

异步任务的取消和超时处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时操作
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // 被中断时的处理
        return "我被中断了!";
    }
    return "正常完成";
});
 
// 设置超时
try {
    String result = future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true);  // 超时就取消任务
    System.out.println("等太久了,不等了!");
}
 
// 更优雅的超时处理
future.completeOnTimeout("默认值", 3, TimeUnit.SECONDS)
      .thenAccept(result -> System.out.println("最终结果:" + result));
 
// 或者配合orTimeout使用
future.orTimeout(3, TimeUnit.SECONDS)  // 超时就抛异常
      .exceptionally(ex -> "超时默认值")
      .thenAccept(result -> System.out.println("最终结果:" + result));
 

3. 链式调用的艺术

thenApply、thenAccept、thenRun的区别

CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> {
        // 我是加工工人,负责把材料加工后返回新成品
        return s + " World";
    })
    .thenAccept(result -> {
        // 我是检验工人,只负责验收,不返回东西
        System.out.println("收到结果: " + result);
    })
    .thenRun(() -> {
        // 我是打扫工人,不关心之前的结果,只负责收尾工作
        System.out.println("生产线工作完成,开始打扫");
    });
 

异步转换:thenApplyAsync的使用场景

CompletableFuture.supplyAsync(() -> {
    // 模拟获取用户信息
    return "用户基础信息";
}).thenApplyAsync(info -> {
    // 耗时的处理操作,在新的线程中执行
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return info + " + 附加信息";
}, customExecutor);  // 可以指定自己的线程池
 

组合多个异步操作:thenCompose vs thenCombine

CompletableFuture<String> getUserEmail(String userId) {
    return CompletableFuture.supplyAsync(() -> "user@example.com");
}
 
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "userId")
    .thenCompose(userId -> getUserEmail(userId));  // 基于第一个结果去获取邮箱
 

4. 异常处理的技巧

应急的exceptionally

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("服务暂时不可用");
        }
        return "正常返回的数据";
    })
    .exceptionally(throwable -> {
        // 记录异常日志
        log.error("操作失败", throwable);
        // 返回默认值
        return "服务异常,返回默认数据";
    });
 

两全其美的handle

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟服务异常");
        }
        return "原始数据";
    })
    .handle((result, throwable) -> {
        if (throwable != null) {
            log.error("处理异常", throwable);
            return "发生异常,返回备用数据";
        }
        return result + " - 正常处理完成";
    });
 

whenComplete

// whenComplete:只是旁观者,不能修改结果
CompletableFuture<String> future1 = CompletableFuture
    .supplyAsync(() -> "原始数据")
    .whenComplete((result, throwable) -> {
        // 只能查看结果,无法修改
        if (throwable != null) {
            log.error("发生异常", throwable);
        } else {
            log.info("处理完成: {}", result);
        }
    });
 
// handle:既是参与者又是修改者
CompletableFuture<String> future2 = CompletableFuture
    .supplyAsync(() -> "原始数据")
    .handle((result, throwable) -> {
        // 可以根据结果或异常,返回新的值
        if (throwable != null) {
            return "异常情况下的替代数据";
        }
        return result + " - 已处理";
    });