👩🏻‍💻 Programming/Java

ThreadPoolExecutor vs ThreadPoolTaskExecutor

한국의 메타몽 2024. 10. 29. 00:00

목차

  1. ThreadPoolExecutor (from Java)
    (1) Executors.newFixedThreadPool
    (2) Executors.newCachedFixedThreadPool
    (3) Custom ThreadPoolExecutor
  2. ThreadPoolTaskExecutor (from Spring)
  3. 정리



Java의 ThreadPoolExecutor에 대한 전반적인 내용은 pompitzz님의 Java의 ThreadPoolExecutor, Spring의 ThreadPoolTaskExecutor를 참고했다.




1. ThreadPoolExecutor (from Java)

 

  • ThreadPoolExecutor는 자바 표준 라이브러리(java.util.concurrent)에서 제공하는 기본적인 thread-pool 구현체
  • Java 1.5 이후 ExecutorService의 구현체인 ThreadPoolExecutor를 통해 스레드 풀을 직접 설정하여 사용할 수 있음

 

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
}

/**
- corePoolSize : 기본 풀 size를 의미. 최초 스레드는 corePoolSize만큼 생성
- maximumPoolSize : workQueue가 가득 찼을 때 추가적인 theead-pool을 생성하는데, 이때 허용 가능한 thread의 수
- keepAliveTime : corePoolSize보다 스레드가 많아졌을 경우, maximumPoolSize까지 스레드가 생성되는데 keepAliveTime까지 유지했다가 다시 corePoolSize로 유지되는 시간
- unit : keepAliveTime의 시간 단위 (ex : SECONDS)
- workQueue : corePoolSize보다 스레드가 많아졌을 경우, 남는 스레드가 없을 경우 해당 큐에 저장
- threadFactory(생략 가능) : 스레드 풀에서 스레드를 생성할 때 사용할 threadFactory
- handler (생략 가능) : 스레드 풀이 가득차 더 이상 작업을 수용할 수 없을때 사용되는 핸들러를 정의
*/

 

스레드 풀의 스레드 개수의 증가 절차는

corePoolSize 증가 -> workQueue가 가득찰 때까지 queue에 추가 -> maximumPoolSize 증가


순서를 거친다.



Executors 팩토리를 활용하여 ThreadPoolExecutor 생성하기

  • Java에서 제공하는 Executors 팩토리에는 ThreadPoolExecutor를 이용한 팩토리 메서드들을 제공

 

(1) Executors.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • newFixedThreadPool은 메서드 명과 동일하게 고정된 크기의 스레드 풀을 생성
  • 해당 메서드는 작업 큐의 크기가 제한되지 않은 LinkedBlockingQueue로 지정했기 때문에 사실상 maximumPoolSize는 사용되지 않음
  • 즉, 작업이 계속해서 들어오더라도 지정된 스레드 개수만큼 스레드가 증가하고 그 이후에 작업 큐에 계속해서 작업이 쌓이게 됨

 

예시 코드

// 100ms 만큼 block되는 작업
private Runnable getTask() {
    return () -> {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    };
}

// threadPool을 종료하고 모든 작업이 끝날 때까지 기다린다.
private void shutDownAndWaitUntilTerminated(ExecutorService executorService) {
    try {
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

@Test
void testFixedThreadPool() throws Exception {
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

    // 100개의 작업을 수행
    IntStream.range(0, 100).forEach(i -> threadPoolExecutor.execute(getTask()));

    int poolSize = threadPoolExecutor.getPoolSize();
    int queueSize = threadPoolExecutor.getQueue().size();

    assertThat(poolSize).isEqualTo(5);
    assertThat(queueSize).isEqualTo(95);

    String message = String.format("CurrentPoolSize: %s, WorkQueueSize: %s", poolSize, queueSize);
    System.out.println(message);

    shutDownAndWaitUntilTerminated(threadPoolExecutor);
}

// 촐력결과 -> CurrentPoolSize: 5, WorkQueueSize: 95
  • 5개의 고정된 스레드 풀을 할당하였으므로 100개의 작업을 동시에 실행하더라도 나머지 95개의 작업은 작업 큐에 쌓이는 것을 확인할 수 있음

 

newFixedThreadPoolworkQueue 사이즈가 제한되지 않기 때문에 작업이 계속해서 들어온다면 메모리 초과가 발생할 수 있으므로 상용에서 사용하기엔 무리가 있음



(2) Executors.newCachedFixedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • newCachedThreadPoolcorePoolSize를 0개로 지정하고, maximumPoolSizeInteger.MAX_VALUE로 지정
  • 즉 작업이 계속 추가되면 Integer.MAX_VALUE까지 스레드는 계속해서 늘어날 수 있음
    • 늘어난 스레드는 keepAliveTime에 지정된 만큼 60초동안 유지된 후 제거됨
  • 그리고 workQueueSynchronousQueue로 지정된 것을 알 수 있는데 SynchronousQueue는 이름을 Queue이지만 기존 Queue와는 다르게 들어오는 작업을 즉시 작업 스레드에게 넘겨줌
  • 그러므로 작업이 계속해서 추가되면 SynchronousQueue는 스레드에게 작업을 즉시 넘겨주게 되므로 스레드 풀의 스레드는 계속해서 늘어나게 됨

 

예시 코드

@Test
void testCachedThreadPool() throws Exception {
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();

    IntStream.range(0, 1000).forEach(i -> threadPoolExecutor.execute(getTask()));

    int poolSize = threadPoolExecutor.getPoolSize();
    int queueSize = threadPoolExecutor.getQueue().size();

    assertThat(poolSize).isEqualTo(1000);
    assertThat(queueSize).isEqualTo(0);

    TimeUnit.SECONDS.sleep(65);

    // keepAlive 시간이후엔 스레드들이 제거된다.
    assertThat(threadPoolExecutor.getPoolSize()).isEqualTo(0);

    shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
  • newCachedThreadPool를 활용하여 1000개의 작업을 동시에 수행시키면 즉시 poolSize는 1000개까지 증가하여 작업을 수행
  • 그리고 keepAliveTime이 60초이므로 그 이후엔 추가기된 스레드는 제거됨

 

newCachedThreadPool은 작업이 계속해서 들어온다면 스레드 풀의 스레드 개수를 거의 무제한으로 증가시키기 때문에 상용에서 사용하기엔 무리가 있음



(3) Custom ThreadPoolExecutor

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,5,3, SECONDS, new LinkedBlockingQueue<>(7));
/*
corePoolSize : 1 = 최초 스레드의 생성 사이즈 
maximumPoolSize : 5 = workQueue가 가득 찼을때 추가적으로 허용하는 thread 수
keepAliveTime : 3 = maximumPoolSize에서 coolPoolSize가 될때까지 유지되는 시간
unit : SECONDS = keepAliveTime의 시간 단위
workQueue : 7 = corePoolSize보다 스레드가 많아질 경우, 남는 스레드가 없을 때 해당 큐에 저장
*/
  • 위와 같이 직접 ThreadPoolExecutor를 생성할 수 있음



예시 코드

class ThreadPoolExecutorTest {

    private ThreadPoolExecutor threadPoolExecutor;

    @BeforeEach
    void setUp() {
        threadPoolExecutor = new ThreadPoolExecutor(1, 5, 3, SECONDS, new LinkedBlockingQueue<>(7));
    }

    @AfterEach
    void tearDown() {
        threadPoolExecutor.shutdown();
    }

    @Test
    void testThreadPoolExecutorQueueCapacity() throws InterruptedException {
        // 태스크 12개 제출 (최대 5개의 스레드 + 7개의 큐 용량)
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            int taskId = i;
            futures.add(threadPoolExecutor.submit(() -> {
                Thread.sleep(1000);
                return "Task " + taskId + " completed";
            }));
        }

        // 완료된 태스크 확인
        int completedTasks = 0;
        for (Future<String> future : futures) {
            try {
                assertEquals("Task " + completedTasks + " completed", future.get());
                completedTasks++;
            } catch (ExecutionException e) {
                System.out.println("Task failed: " + e.getMessage());
            }
        }
        assertEquals(12, completedTasks);
    }

    @Test
    void testThreadPoolExecutorRejectPolicy() {
        // 큐와 최대 스레드 용량을 초과한 경우 예외 발생 확인
        for (int i = 0; i < 12; i++) {
            threadPoolExecutor.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 이미 12개가 꽉찼는데 한 개를 더 넣어버려서 예외 발생
        assertThrows(RejectedExecutionException.class, () -> threadPoolExecutor.submit(() -> {}));
    }
}



2. ThreadPoolTaskExecutor (from Spring)

  • 보다시피 패키지 경로는 springframework
  • ThreadPoolTaskExecutor는 Spring 프레임워크에서 제공하는 스레드 풀 구현체로, 내부적으로는 ThreadPoolExecutor를 래핑하여 사용
  • 기본적으로 ThreadPoolTaskExecutorcorePoolSize maxPoolSize queueCapacityThreadPoolExecutor와 유사한 설정을 제공하지만, 여기에 Spring의 라이프사이클과 설정 편의성을 더함
    • 쉽게 말해 Spring에서 빈 관리 및 설정이 용이함

 

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * 비동기 ThreadPool 설정 Bean
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("defaultTaskExecutor")
    public ThreadPoolTaskExecutor defaultTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("Default-");
        executor.initialize();	 // ThreadPoolExecutor 초기화
        return executor;
    }
}

 

// 비동기 로직이 필요한 곳에서 @Async 어노테이션을 붙이고 로직 작성

@Async
public void callAsyncMethod(...) {
	// 비동기 로직 작성
}
  • 예시로, 위 코드에서 initialize() 메서드를 통해 ThreadPoolExecutor를 초기화하고 Spring이 빈을 자동으로 관리해주는 등의 이점을 제공
  • Spring의 비동기 작업(ex : @Async)이나 특정 배치 작업과 같은 Spring 내장 기능에서 쉽게 사용할 수 있도록 최적화 되어 있음
  • 따라서 Spring의 어플리케이션 컨텍스트에서 관리되기 때문에, Spring의 설정 파일이나 Bean 설정에서 사용하기 용이하며, Spring 기반 비동기 및 배치 작업에 적합함



3. 정리

 

특성 ThreadPoolExecutor ThreadPoolTaskExecutor
제공 위치 Java 표준 라이브러리 Spring Framework
Spring과의 연계 Spring 비의존 Spring과 긴밀히 연동 가능
Spring Lifecycle 지원 X O
설정 용이성 직접 인스턴스화 Spring에서 빈 관리 및 설정 용이
주 사용처 Spring 외부의 멀티스레딩 Spring 비동기 작업(@Async) 및 배치 작업