👩🏻💻 Programming/Java
ThreadPoolExecutor vs ThreadPoolTaskExecutor
한국의 메타몽
2024. 10. 29. 00:00
목차
- ThreadPoolExecutor (from Java)
(1) Executors.newFixedThreadPool
(2) Executors.newCachedFixedThreadPool
(3) Custom ThreadPoolExecutor - ThreadPoolTaskExecutor (from Spring)
- 정리
Java의 ThreadPoolExecutor에 대한 전반적인 내용은 pompitzz님의 Java의 ThreadPoolExecutor, Spring의 ThreadPoolTaskExecutor를 참고했다.
1. ThreadPoolExecutor (from Java)
ThreadPoolExecutor
는 자바 표준 라이브러리(java.util.concurrent)에서 제공하는 기본적인 thread-pool 구현체- Java 1.5 이후
ExecutorService
의 구현체인ThreadPoolExecutor
를 통해 스레드 풀을 직접 설정하여 사용할 수 있음
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
/**
- corePoolSize : 기본 풀 size를 의미. 최초 스레드는 corePoolSize만큼 생성
- maximumPoolSize : workQueue가 가득 찼을 때 추가적인 theead-pool을 생성하는데, 이때 허용 가능한 thread의 수
- keepAliveTime : corePoolSize보다 스레드가 많아졌을 경우, maximumPoolSize까지 스레드가 생성되는데 keepAliveTime까지 유지했다가 다시 corePoolSize로 유지되는 시간
- unit : keepAliveTime의 시간 단위 (ex : SECONDS)
- workQueue : corePoolSize보다 스레드가 많아졌을 경우, 남는 스레드가 없을 경우 해당 큐에 저장
- threadFactory(생략 가능) : 스레드 풀에서 스레드를 생성할 때 사용할 threadFactory
- handler (생략 가능) : 스레드 풀이 가득차 더 이상 작업을 수용할 수 없을때 사용되는 핸들러를 정의
*/
스레드 풀의 스레드 개수의 증가 절차는
corePoolSize 증가 -> workQueue가 가득찰 때까지 queue에 추가 -> maximumPoolSize 증가
순서를 거친다.
Executors 팩토리를 활용하여 ThreadPoolExecutor 생성하기
- Java에서 제공하는
Executors
팩토리에는ThreadPoolExecutor
를 이용한 팩토리 메서드들을 제공
(1) Executors.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool
은 메서드 명과 동일하게 고정된 크기의 스레드 풀을 생성- 해당 메서드는 작업 큐의 크기가 제한되지 않은
LinkedBlockingQueue
로 지정했기 때문에 사실상maximumPoolSize
는 사용되지 않음 - 즉, 작업이 계속해서 들어오더라도 지정된 스레드 개수만큼 스레드가 증가하고 그 이후에 작업 큐에 계속해서 작업이 쌓이게 됨
예시 코드
// 100ms 만큼 block되는 작업
private Runnable getTask() {
return () -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
}
// threadPool을 종료하고 모든 작업이 끝날 때까지 기다린다.
private void shutDownAndWaitUntilTerminated(ExecutorService executorService) {
try {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Test
void testFixedThreadPool() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// 100개의 작업을 수행
IntStream.range(0, 100).forEach(i -> threadPoolExecutor.execute(getTask()));
int poolSize = threadPoolExecutor.getPoolSize();
int queueSize = threadPoolExecutor.getQueue().size();
assertThat(poolSize).isEqualTo(5);
assertThat(queueSize).isEqualTo(95);
String message = String.format("CurrentPoolSize: %s, WorkQueueSize: %s", poolSize, queueSize);
System.out.println(message);
shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
// 촐력결과 -> CurrentPoolSize: 5, WorkQueueSize: 95
- 5개의 고정된 스레드 풀을 할당하였으므로 100개의 작업을 동시에 실행하더라도 나머지 95개의 작업은 작업 큐에 쌓이는 것을 확인할 수 있음
newFixedThreadPool
는 workQueue
사이즈가 제한되지 않기 때문에 작업이 계속해서 들어온다면 메모리 초과가 발생할 수 있으므로 상용에서 사용하기엔 무리가 있음
(2) Executors.newCachedFixedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool
은corePoolSize
를 0개로 지정하고,maximumPoolSize
를Integer.MAX_VALUE
로 지정- 즉 작업이 계속 추가되면
Integer.MAX_VALUE
까지 스레드는 계속해서 늘어날 수 있음- 늘어난 스레드는 keepAliveTime에 지정된 만큼 60초동안 유지된 후 제거됨
- 그리고
workQueue
는SynchronousQueue
로 지정된 것을 알 수 있는데SynchronousQueue
는 이름을 Queue이지만 기존 Queue와는 다르게 들어오는 작업을 즉시 작업 스레드에게 넘겨줌 - 그러므로 작업이 계속해서 추가되면
SynchronousQueue
는 스레드에게 작업을 즉시 넘겨주게 되므로 스레드 풀의 스레드는 계속해서 늘어나게 됨
예시 코드
@Test
void testCachedThreadPool() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
IntStream.range(0, 1000).forEach(i -> threadPoolExecutor.execute(getTask()));
int poolSize = threadPoolExecutor.getPoolSize();
int queueSize = threadPoolExecutor.getQueue().size();
assertThat(poolSize).isEqualTo(1000);
assertThat(queueSize).isEqualTo(0);
TimeUnit.SECONDS.sleep(65);
// keepAlive 시간이후엔 스레드들이 제거된다.
assertThat(threadPoolExecutor.getPoolSize()).isEqualTo(0);
shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
newCachedThreadPool
를 활용하여 1000개의 작업을 동시에 수행시키면 즉시 poolSize는 1000개까지 증가하여 작업을 수행- 그리고
keepAliveTime
이 60초이므로 그 이후엔 추가기된 스레드는 제거됨
newCachedThreadPool
은 작업이 계속해서 들어온다면 스레드 풀의 스레드 개수를 거의 무제한으로 증가시키기 때문에 상용에서 사용하기엔 무리가 있음
(3) Custom ThreadPoolExecutor
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,5,3, SECONDS, new LinkedBlockingQueue<>(7));
/*
corePoolSize : 1 = 최초 스레드의 생성 사이즈
maximumPoolSize : 5 = workQueue가 가득 찼을때 추가적으로 허용하는 thread 수
keepAliveTime : 3 = maximumPoolSize에서 coolPoolSize가 될때까지 유지되는 시간
unit : SECONDS = keepAliveTime의 시간 단위
workQueue : 7 = corePoolSize보다 스레드가 많아질 경우, 남는 스레드가 없을 때 해당 큐에 저장
*/
- 위와 같이 직접
ThreadPoolExecutor
를 생성할 수 있음
예시 코드
class ThreadPoolExecutorTest {
private ThreadPoolExecutor threadPoolExecutor;
@BeforeEach
void setUp() {
threadPoolExecutor = new ThreadPoolExecutor(1, 5, 3, SECONDS, new LinkedBlockingQueue<>(7));
}
@AfterEach
void tearDown() {
threadPoolExecutor.shutdown();
}
@Test
void testThreadPoolExecutorQueueCapacity() throws InterruptedException {
// 태스크 12개 제출 (최대 5개의 스레드 + 7개의 큐 용량)
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 12; i++) {
int taskId = i;
futures.add(threadPoolExecutor.submit(() -> {
Thread.sleep(1000);
return "Task " + taskId + " completed";
}));
}
// 완료된 태스크 확인
int completedTasks = 0;
for (Future<String> future : futures) {
try {
assertEquals("Task " + completedTasks + " completed", future.get());
completedTasks++;
} catch (ExecutionException e) {
System.out.println("Task failed: " + e.getMessage());
}
}
assertEquals(12, completedTasks);
}
@Test
void testThreadPoolExecutorRejectPolicy() {
// 큐와 최대 스레드 용량을 초과한 경우 예외 발생 확인
for (int i = 0; i < 12; i++) {
threadPoolExecutor.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 이미 12개가 꽉찼는데 한 개를 더 넣어버려서 예외 발생
assertThrows(RejectedExecutionException.class, () -> threadPoolExecutor.submit(() -> {}));
}
}
2. ThreadPoolTaskExecutor (from Spring)
- 보다시피 패키지 경로는
springframework
ThreadPoolTaskExecutor
는 Spring 프레임워크에서 제공하는 스레드 풀 구현체로, 내부적으로는ThreadPoolExecutor
를 래핑하여 사용- 기본적으로
ThreadPoolTaskExecutor
는corePoolSize
maxPoolSize
queueCapacity
등ThreadPoolExecutor
와 유사한 설정을 제공하지만, 여기에 Spring의 라이프사이클과 설정 편의성을 더함- 쉽게 말해 Spring에서 빈 관리 및 설정이 용이함
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 비동기 ThreadPool 설정 Bean
*/
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("defaultTaskExecutor")
public ThreadPoolTaskExecutor defaultTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("Default-");
executor.initialize(); // ThreadPoolExecutor 초기화
return executor;
}
}
// 비동기 로직이 필요한 곳에서 @Async 어노테이션을 붙이고 로직 작성
@Async
public void callAsyncMethod(...) {
// 비동기 로직 작성
}
- 예시로, 위 코드에서
initialize()
메서드를 통해ThreadPoolExecutor
를 초기화하고 Spring이 빈을 자동으로 관리해주는 등의 이점을 제공 - Spring의 비동기 작업(ex :
@Async
)이나 특정 배치 작업과 같은 Spring 내장 기능에서 쉽게 사용할 수 있도록 최적화 되어 있음 - 따라서 Spring의 어플리케이션 컨텍스트에서 관리되기 때문에, Spring의 설정 파일이나 Bean 설정에서 사용하기 용이하며, Spring 기반 비동기 및 배치 작업에 적합함
3. 정리
특성 | ThreadPoolExecutor | ThreadPoolTaskExecutor |
---|---|---|
제공 위치 | Java 표준 라이브러리 | Spring Framework |
Spring과의 연계 | Spring 비의존 | Spring과 긴밀히 연동 가능 |
Spring Lifecycle 지원 | X | O |
설정 용이성 | 직접 인스턴스화 | Spring에서 빈 관리 및 설정 용이 |
주 사용처 | Spring 외부의 멀티스레딩 | Spring 비동기 작업(@Async) 및 배치 작업 |