CompletableFuture란?
Java 8에서 도입된 CompletableFuture는 비동기 작업을 선언적으로 조합할 수 있는 API입니다. 기존 Future는 결과를 get()으로 블로킹 대기해야 했지만, CompletableFuture는 콜백 체이닝으로 논블로킹 파이프라인을 구성합니다.
택배 추적에 비유하면 이해가 쉽습니다. 기존 Future는 택배가 올 때까지 문 앞에서 기다리는 것이고, CompletableFuture는 “택배 도착하면 → 서명하고 → 상자 열어서 → 내용물 확인”이라는 지시를 미리 등록해두는 것입니다.
기본 생성과 실행
// CompletableFutureBasic.java — 기본 생성과 실행
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureBasic {
public static void main(String[] args) throws Exception {
// 1. supplyAsync — 값을 반환하는 비동기 작업
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000); // 1초 지연 시뮬레이션
return "데이터베이스 조회 결과";
});
// ForkJoinPool.commonPool()에서 실행
// 2. runAsync — 반환값 없는 비동기 작업
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
sleep(500);
System.out.println("로그 저장 완료");
});
// 3. 커스텀 스레드 풀 지정 (운영 환경 필수!)
ExecutorService pool = Executors.newFixedThreadPool(4);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("실행 스레드: " + Thread.currentThread().getName());
return 42;
}, pool); // 두 번째 인자로 Executor 전달
// 결과 확인
System.out.println(future1.get(3, TimeUnit.SECONDS));
// 실행 결과: 데이터베이스 조회 결과
System.out.println(future3.get());
// 실행 결과:
// 실행 스레드: pool-1-thread-1
// 42
pool.shutdown();
}
static void sleep(long millis) {
try { Thread.sleep(millis); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
체이닝: thenApply, thenCompose, thenAccept
// ChainingExample.java — 비동기 파이프라인 체이닝
import java.util.concurrent.CompletableFuture;
public class ChainingExample {
public static void main(String[] args) throws Exception {
// thenApply — 결과를 변환 (map과 유사)
CompletableFuture<String> pipeline = CompletableFuture
.supplyAsync(() -> " Hello, CompletableFuture! ")
.thenApply(String::trim) // 공백 제거
.thenApply(String::toUpperCase) // 대문자 변환
.thenApply(s -> s + " [처리완료]"); // 접미사 추가
System.out.println(pipeline.get());
// 실행 결과: HELLO, COMPLETABLEFUTURE! [처리완료]
// thenCompose — 비동기 작업 연결 (flatMap과 유사)
CompletableFuture<String> composed = getUserId()
.thenCompose(ChainingExample::getUserName) // userId → userName 비동기 조회
.thenCompose(ChainingExample::getUserEmail); // userName → email 비동기 조회
System.out.println(composed.get());
// 실행 결과: user-42@example.com
// thenAccept — 결과를 소비 (반환값 없음)
CompletableFuture.supplyAsync(() -> "처리 데이터")
.thenAccept(data -> System.out.println("수신: " + data))
.thenRun(() -> System.out.println("파이프라인 종료"));
// 실행 결과:
// 수신: 처리 데이터
// 파이프라인 종료
// Async 변형 — 별도 스레드에서 콜백 실행
CompletableFuture.supplyAsync(() -> "데이터")
.thenApplyAsync(s -> {
// 이 콜백은 다른 스레드에서 실행
System.out.println("콜백 스레드: " + Thread.currentThread().getName());
return s.length();
});
Thread.sleep(500); // 비동기 완료 대기
}
// 비동기 단계별 메서드
static CompletableFuture<Long> getUserId() {
return CompletableFuture.supplyAsync(() -> 42L);
}
static CompletableFuture<String> getUserName(Long id) {
return CompletableFuture.supplyAsync(() -> "user-" + id);
}
static CompletableFuture<String> getUserEmail(String name) {
return CompletableFuture.supplyAsync(() -> name + "@example.com");
}
}
조합: allOf, anyOf, thenCombine
// CombineExample.java — 여러 비동기 작업 조합
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.stream.Stream;
public class CombineExample {
public static void main(String[] args) throws Exception {
// thenCombine — 두 작업의 결과를 합침
CompletableFuture<String> priceFuture = CompletableFuture
.supplyAsync(() -> { sleep(800); return 15000; }) // 가격 조회
.thenCombine(
CompletableFuture.supplyAsync(() -> { sleep(600); return 0.1; }), // 할인율 조회
(price, discount) -> String.format("최종 가격: %,d원", (int)(price * (1 - discount)))
);
System.out.println(priceFuture.get());
// 실행 결과: 최종 가격: 13,500원
// 두 작업이 병렬 실행 → 총 소요 ~800ms (직렬이면 1,400ms)
// allOf — 모든 작업이 완료될 때까지 대기
CompletableFuture<String> api1 = CompletableFuture.supplyAsync(() -> { sleep(300); return "사용자 정보"; });
CompletableFuture<String> api2 = CompletableFuture.supplyAsync(() -> { sleep(500); return "주문 내역"; });
CompletableFuture<String> api3 = CompletableFuture.supplyAsync(() -> { sleep(200); return "배송 상태"; });
CompletableFuture<List<String>> allResults = CompletableFuture
.allOf(api1, api2, api3)
.thenApply(v -> Stream.of(api1, api2, api3)
.map(CompletableFuture::join) // 이미 완료되었으므로 블로킹 없음
.toList()
);
System.out.println(allResults.get());
// 실행 결과: [사용자 정보, 주문 내역, 배송 상태]
// 총 소요 ~500ms (가장 느린 작업 기준)
// anyOf — 가장 먼저 완료되는 작업 사용
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> { sleep(1000); return "서버 A 응답"; }),
CompletableFuture.supplyAsync(() -> { sleep(200); return "서버 B 응답"; }),
CompletableFuture.supplyAsync(() -> { sleep(500); return "서버 C 응답"; })
);
System.out.println(fastest.get());
// 실행 결과: 서버 B 응답 (200ms로 가장 빠름)
}
static void sleep(long millis) {
try { Thread.sleep(millis); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
에러 처리
// ErrorHandling.java — CompletableFuture 예외 처리 패턴
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class ErrorHandling {
public static void main(String[] args) throws Exception {
// exceptionally — 예외 발생 시 대체값 반환
CompletableFuture<String> withFallback = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("DB 연결 실패");
return "데이터";
})
.exceptionally(ex -> {
System.err.println("에러 발생: " + ex.getMessage());
return "캐시 데이터"; // 대체값
});
System.out.println(withFallback.get());
// 실행 결과:
// 에러 발생: java.lang.RuntimeException: DB 연결 실패
// 캐시 데이터
// handle — 성공/실패 모두 처리
CompletableFuture<String> handled = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("랜덤 에러");
return "정상 결과";
})
.handle((result, ex) -> {
if (ex != null) {
return "기본값 (에러: " + ex.getMessage() + ")";
}
return result + " [검증 완료]";
});
System.out.println(handled.get());
// 성공 시: 정상 결과 [검증 완료]
// 실패 시: 기본값 (에러: java.lang.RuntimeException: 랜덤 에러)
// 타임아웃 설정 (Java 9+)
CompletableFuture<String> withTimeout = CompletableFuture
.supplyAsync(() -> { sleep(5000); return "느린 응답"; })
.orTimeout(2, TimeUnit.SECONDS) // 2초 초과 시 TimeoutException
.exceptionally(ex -> "타임아웃 기본값");
System.out.println(withTimeout.get());
// 실행 결과: 타임아웃 기본값
// completeOnTimeout (Java 9+) — 타임아웃 시 기본값
CompletableFuture<String> withDefault = CompletableFuture
.supplyAsync(() -> { sleep(5000); return "느린 응답"; })
.completeOnTimeout("빠른 기본값", 1, TimeUnit.SECONDS);
System.out.println(withDefault.get());
// 실행 결과: 빠른 기본값
}
static void sleep(long millis) {
try { Thread.sleep(millis); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
스레드 풀 설계
// ThreadPoolConfig.java — 운영 환경 스레드 풀 설정
import java.util.concurrent.*;
public class ThreadPoolConfig {
public static void main(String[] args) {
int cpuCores = Runtime.getRuntime().availableProcessors();
// CPU 바운드 작업: 코어 수 + 1
ExecutorService cpuPool = Executors.newFixedThreadPool(cpuCores + 1);
// I/O 바운드 작업: 코어 수 * (1 + 대기/처리 비율)
// 대기 시간이 처리 시간의 10배라면: cores * 11
ExecutorService ioPool = new ThreadPoolExecutor(
cpuCores * 2, // 코어 스레드 수
cpuCores * 10, // 최대 스레드 수
60L, TimeUnit.SECONDS, // 유휴 스레드 유지 시간
new LinkedBlockingQueue<>(1000), // 작업 큐 (제한 필수!)
new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
);
// 사용 예시
CompletableFuture<String> dbQuery = CompletableFuture
.supplyAsync(() -> "DB 결과", ioPool); // I/O 작업 → ioPool
CompletableFuture<Integer> calculation = CompletableFuture
.supplyAsync(() -> fibonacci(40), cpuPool); // CPU 작업 → cpuPool
System.out.println("CPU 코어: " + cpuCores);
// 실행 결과: CPU 코어: 8 (환경에 따라 다름)
cpuPool.shutdown();
ioPool.shutdown();
}
static int fibonacci(int n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
}
실전 팁
CompletableFuture를 효과적으로 사용하기 위한 핵심 원칙입니다.
- 커스텀 스레드 풀을 반드시 사용합니다. 기본
ForkJoinPool.commonPool()은 모든 CompletableFuture가 공유하므로, 하나의 느린 작업이 전체 시스템을 블로킹할 수 있습니다 - 체이닝에서
thenApplyvsthenCompose를 구분합니다. 동기 변환은thenApply, 비동기 작업 연결은thenCompose입니다. 잘못 사용하면CompletableFuture<CompletableFuture<T>>중첩이 발생합니다 - 타임아웃을 항상 설정합니다. Java 9+의
orTimeout()또는completeOnTimeout()을 사용합니다 join()대신get()에 타임아웃을 지정합니다.join()은 무한 대기 가능성이 있습니다- 에러 처리를 체인 끝에 배치합니다.
exceptionally()를 파이프라인 마지막에 두면 모든 단계의 예외를 포착합니다 - Java 21 이상이라면 Virtual Thread와 함께 사용하면 I/O 바운드 작업에서 더 높은 처리량을 얻을 수 있습니다