Java 동시성 프로그래밍 — CompletableFuture 실전 가이드

CompletableFuture란?

Java 8에서 도입된 CompletableFuture는 비동기 작업을 선언적으로 조합할 수 있는 API입니다. 기존 Future는 결과를 get()으로 블로킹 대기해야 했지만, CompletableFuture는 콜백 체이닝으로 논블로킹 파이프라인을 구성합니다.

택배 추적에 비유하면 이해가 쉽습니다. 기존 Future는 택배가 올 때까지 문 앞에서 기다리는 것이고, CompletableFuture는 “택배 도착하면 → 서명하고 → 상자 열어서 → 내용물 확인”이라는 지시를 미리 등록해두는 것입니다.

기본 생성과 실행

// CompletableFutureBasic.java — 기본 생성과 실행
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureBasic {
    public static void main(String[] args) throws Exception {
        // 1. supplyAsync — 값을 반환하는 비동기 작업
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000); // 1초 지연 시뮬레이션
            return "데이터베이스 조회 결과";
        });
        // ForkJoinPool.commonPool()에서 실행

        // 2. runAsync — 반환값 없는 비동기 작업
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            sleep(500);
            System.out.println("로그 저장 완료");
        });

        // 3. 커스텀 스레드 풀 지정 (운영 환경 필수!)
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("실행 스레드: " + Thread.currentThread().getName());
            return 42;
        }, pool); // 두 번째 인자로 Executor 전달

        // 결과 확인
        System.out.println(future1.get(3, TimeUnit.SECONDS));
        // 실행 결과: 데이터베이스 조회 결과

        System.out.println(future3.get());
        // 실행 결과:
        // 실행 스레드: pool-1-thread-1
        // 42

        pool.shutdown();
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

체이닝: thenApply, thenCompose, thenAccept

// ChainingExample.java — 비동기 파이프라인 체이닝
import java.util.concurrent.CompletableFuture;

public class ChainingExample {
    public static void main(String[] args) throws Exception {

        // thenApply — 결과를 변환 (map과 유사)
        CompletableFuture<String> pipeline = CompletableFuture
            .supplyAsync(() -> "  Hello, CompletableFuture!  ")
            .thenApply(String::trim)           // 공백 제거
            .thenApply(String::toUpperCase)    // 대문자 변환
            .thenApply(s -> s + " [처리완료]"); // 접미사 추가

        System.out.println(pipeline.get());
        // 실행 결과: HELLO, COMPLETABLEFUTURE! [처리완료]

        // thenCompose — 비동기 작업 연결 (flatMap과 유사)
        CompletableFuture<String> composed = getUserId()
            .thenCompose(ChainingExample::getUserName)   // userId → userName 비동기 조회
            .thenCompose(ChainingExample::getUserEmail);  // userName → email 비동기 조회

        System.out.println(composed.get());
        // 실행 결과: user-42@example.com

        // thenAccept — 결과를 소비 (반환값 없음)
        CompletableFuture.supplyAsync(() -> "처리 데이터")
            .thenAccept(data -> System.out.println("수신: " + data))
            .thenRun(() -> System.out.println("파이프라인 종료"));
        // 실행 결과:
        // 수신: 처리 데이터
        // 파이프라인 종료

        // Async 변형 — 별도 스레드에서 콜백 실행
        CompletableFuture.supplyAsync(() -> "데이터")
            .thenApplyAsync(s -> {
                // 이 콜백은 다른 스레드에서 실행
                System.out.println("콜백 스레드: " + Thread.currentThread().getName());
                return s.length();
            });

        Thread.sleep(500); // 비동기 완료 대기
    }

    // 비동기 단계별 메서드
    static CompletableFuture<Long> getUserId() {
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    static CompletableFuture<String> getUserName(Long id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<String> getUserEmail(String name) {
        return CompletableFuture.supplyAsync(() -> name + "@example.com");
    }
}

조합: allOf, anyOf, thenCombine

// CombineExample.java — 여러 비동기 작업 조합
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.stream.Stream;

public class CombineExample {
    public static void main(String[] args) throws Exception {

        // thenCombine — 두 작업의 결과를 합침
        CompletableFuture<String> priceFuture = CompletableFuture
            .supplyAsync(() -> { sleep(800); return 15000; })    // 가격 조회
            .thenCombine(
                CompletableFuture.supplyAsync(() -> { sleep(600); return 0.1; }), // 할인율 조회
                (price, discount) -> String.format("최종 가격: %,d원", (int)(price * (1 - discount)))
            );

        System.out.println(priceFuture.get());
        // 실행 결과: 최종 가격: 13,500원
        // 두 작업이 병렬 실행 → 총 소요 ~800ms (직렬이면 1,400ms)

        // allOf — 모든 작업이 완료될 때까지 대기
        CompletableFuture<String> api1 = CompletableFuture.supplyAsync(() -> { sleep(300); return "사용자 정보"; });
        CompletableFuture<String> api2 = CompletableFuture.supplyAsync(() -> { sleep(500); return "주문 내역"; });
        CompletableFuture<String> api3 = CompletableFuture.supplyAsync(() -> { sleep(200); return "배송 상태"; });

        CompletableFuture<List<String>> allResults = CompletableFuture
            .allOf(api1, api2, api3)
            .thenApply(v -> Stream.of(api1, api2, api3)
                .map(CompletableFuture::join) // 이미 완료되었으므로 블로킹 없음
                .toList()
            );

        System.out.println(allResults.get());
        // 실행 결과: [사용자 정보, 주문 내역, 배송 상태]
        // 총 소요 ~500ms (가장 느린 작업 기준)

        // anyOf — 가장 먼저 완료되는 작업 사용
        CompletableFuture<Object> fastest = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> { sleep(1000); return "서버 A 응답"; }),
            CompletableFuture.supplyAsync(() -> { sleep(200);  return "서버 B 응답"; }),
            CompletableFuture.supplyAsync(() -> { sleep(500);  return "서버 C 응답"; })
        );

        System.out.println(fastest.get());
        // 실행 결과: 서버 B 응답 (200ms로 가장 빠름)
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

에러 처리

// ErrorHandling.java — CompletableFuture 예외 처리 패턴
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ErrorHandling {
    public static void main(String[] args) throws Exception {

        // exceptionally — 예외 발생 시 대체값 반환
        CompletableFuture<String> withFallback = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("DB 연결 실패");
                return "데이터";
            })
            .exceptionally(ex -> {
                System.err.println("에러 발생: " + ex.getMessage());
                return "캐시 데이터"; // 대체값
            });

        System.out.println(withFallback.get());
        // 실행 결과:
        // 에러 발생: java.lang.RuntimeException: DB 연결 실패
        // 캐시 데이터

        // handle — 성공/실패 모두 처리
        CompletableFuture<String> handled = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) throw new RuntimeException("랜덤 에러");
                return "정상 결과";
            })
            .handle((result, ex) -> {
                if (ex != null) {
                    return "기본값 (에러: " + ex.getMessage() + ")";
                }
                return result + " [검증 완료]";
            });

        System.out.println(handled.get());
        // 성공 시: 정상 결과 [검증 완료]
        // 실패 시: 기본값 (에러: java.lang.RuntimeException: 랜덤 에러)

        // 타임아웃 설정 (Java 9+)
        CompletableFuture<String> withTimeout = CompletableFuture
            .supplyAsync(() -> { sleep(5000); return "느린 응답"; })
            .orTimeout(2, TimeUnit.SECONDS)          // 2초 초과 시 TimeoutException
            .exceptionally(ex -> "타임아웃 기본값");

        System.out.println(withTimeout.get());
        // 실행 결과: 타임아웃 기본값

        // completeOnTimeout (Java 9+) — 타임아웃 시 기본값
        CompletableFuture<String> withDefault = CompletableFuture
            .supplyAsync(() -> { sleep(5000); return "느린 응답"; })
            .completeOnTimeout("빠른 기본값", 1, TimeUnit.SECONDS);

        System.out.println(withDefault.get());
        // 실행 결과: 빠른 기본값
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

스레드 풀 설계

// ThreadPoolConfig.java — 운영 환경 스레드 풀 설정
import java.util.concurrent.*;

public class ThreadPoolConfig {
    public static void main(String[] args) {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // CPU 바운드 작업: 코어 수 + 1
        ExecutorService cpuPool = Executors.newFixedThreadPool(cpuCores + 1);

        // I/O 바운드 작업: 코어 수 * (1 + 대기/처리 비율)
        // 대기 시간이 처리 시간의 10배라면: cores * 11
        ExecutorService ioPool = new ThreadPoolExecutor(
            cpuCores * 2,             // 코어 스레드 수
            cpuCores * 10,            // 최대 스레드 수
            60L, TimeUnit.SECONDS,    // 유휴 스레드 유지 시간
            new LinkedBlockingQueue<>(1000), // 작업 큐 (제한 필수!)
            new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
        );

        // 사용 예시
        CompletableFuture<String> dbQuery = CompletableFuture
            .supplyAsync(() -> "DB 결과", ioPool);     // I/O 작업 → ioPool

        CompletableFuture<Integer> calculation = CompletableFuture
            .supplyAsync(() -> fibonacci(40), cpuPool); // CPU 작업 → cpuPool

        System.out.println("CPU 코어: " + cpuCores);
        // 실행 결과: CPU 코어: 8 (환경에 따라 다름)

        cpuPool.shutdown();
        ioPool.shutdown();
    }

    static int fibonacci(int n) {
        if (n <= 1) return n;
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}

실전 팁

CompletableFuture를 효과적으로 사용하기 위한 핵심 원칙입니다.

  • 커스텀 스레드 풀을 반드시 사용합니다. 기본 ForkJoinPool.commonPool()은 모든 CompletableFuture가 공유하므로, 하나의 느린 작업이 전체 시스템을 블로킹할 수 있습니다
  • 체이닝에서 thenApply vs thenCompose를 구분합니다. 동기 변환은 thenApply, 비동기 작업 연결은 thenCompose입니다. 잘못 사용하면 CompletableFuture<CompletableFuture<T>> 중첩이 발생합니다
  • 타임아웃을 항상 설정합니다. Java 9+의 orTimeout() 또는 completeOnTimeout()을 사용합니다
  • join() 대신 get()에 타임아웃을 지정합니다. join()은 무한 대기 가능성이 있습니다
  • 에러 처리를 체인 끝에 배치합니다. exceptionally()를 파이프라인 마지막에 두면 모든 단계의 예외를 포착합니다
  • Java 21 이상이라면 Virtual Thread와 함께 사용하면 I/O 바운드 작업에서 더 높은 처리량을 얻을 수 있습니다

이 글이 도움이 되었나요?