데이터베이스 업데이트 프로젝트를 진행하던 때에 데이터를 가공하는 작업을 수행하는데 시간이 매우 오래 걸리는 부분이 있었다.
업데이트를 위해 수집해온 데이터가 8천만개 이상인 상황에서 싱글 쓰레드로 수행하면 완료 시간이 말도 안되는 시간이 예상되어 멀티 쓰레딩을 통한 시간 단축이 필수라고 생각되었다.
구현 로직
구현은 java 8에서 비동기 요청을 처리할때 사용하는 객체인 CompletableFuture를 사용하였다.
1. 데이터 가공
2. 가공한 데이터를 저장
1번 완료 후 리턴값을 받아서 2번을 수행하는 과정을 모두 비동기 작업으로 병렬처리한다.
ThreadPoolTaskExecutor
별도의 thread pool을 생성하여 실행하기위해 Executor를 설정하여 bean으로 등록해둔다.
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100); // 실행 대기 thread 수 설정값
executor.setMaxPoolSize(200); // 동시 동작하는 최대 thread 수
executor.setQueueCapacity(100); // 대기 queue에 최대 수용 가능 한 수
executor.setThreadNamePrefix("Chemiverse-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
위 설정은 최초 100개의 thread가 요청을 수행하는데, 처리할 양이 많거나 밀릴 경우 100개까지 저장할 수 있는 대기 큐에 요청 저장되고 이 수를 넘을 경우 최대 200개까지 thread를 생성하여 수행된다.
- CallerRunsPolicy() : 예외나 누락없이 처리하기 위한 설정
- waitForTasksToCompleteOnShutdown(true) : 모든 task가 완료된 후 application이 종료되도록 하는 설정
비동기 작업 실행 메서드
supplyAsync()
반환값이 존재하는 작업을 비동기로 처리할 때 사용한다.
runAsync()
반환값이 필요없는 작업을 비동기로 처리할 때 사용한다.
None-blocking 콜백 함수
위 메서드로 비동기 실행을 하고나서 리턴 값을 활용해야할 때가 있다. 이 때 get()이나 join()을 사용하면 블로킹이 일어난다. 호출한 thread가 비동기로 실행한 thread의 동작이 끝날 때까지 다른 일을 하지 못하고 기다리는 것이다. 블로킹이 일어나면 완전한 비동기로 작업이 수행되지 못하기 때문에 논 블로킹 콜백함수를 사용한다.
thenAccept()
비동기로 실행한 반환값을 받아서 처리 후 반환하는 값이 없다.
thenApply()
반환값을 받아서 처리 후 값을 반환한다.
주요 코드
...
private final ThreadPoolTaskExecutor executor;
...
public void getMoleculeDataAsync(LinkedHashSet<Zinc> updatedZincs) throws IOException {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Zinc updatedZinc : updatedZincs) {
Path path = Paths.get(util.getAbsPath() + "/zinc/" + updatedZinc.getFileName());
String[] fileString = util.getFileString(path).split("\n");
// 비동기 실행
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(
() -> parseMolecule(updatedZinc.getName(), fileString), executor) // parsing async
.thenAcceptAsync(o -> {
moleculeService.saveOrPassMolecule(o);
moleculeService.saveOrPassMoleculeSource(o);
}, executor); // save async
futures.add(future);
}
// completableFuture blocking
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
executor는 위에서 bean으로 생성해준 ThreadPoolTaskExecutor이다.
동작 방식은
1. 데이터를 가공하는 메서드(parseMolecule)를 비동기로 실행한다. 가공된 데이터를 활용해야하기 때문에 supplyAsync()로 수행하여 반환값을 받는다.
2. 1번에서 수행시킨 각 thread가 일을 끝내면 thenAcceptAsync로 반환값을 받아 다음 작업(save)을 콜백한다.
3. 모든 비동기 작업이 완료되고 난 후 다음 작업으로 넘어가야하기 때문에 allOf()로 모든 작업을 join()하여 완료될 때까지 기다린다.
(3번에서 join()을 하지 않으면 비동기로 수행한 작업이 모두 끝나기 전에 요청이 종료되거나 다음 로직으로 넘어가버리게 된다.)
allOf()
CompletableFuture.allOf()의 반환 유형은 CompletableFuture<Void>이다. 이는 결과를 반환하지 않는다는 말이다. (한계점)
만약 위와 다르게 future에서 apply로 값을 반환한다면 수동으로 futures에서 결과를 가지고 와야한다.
List<List<String>> affinities = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(o -> futures.stream()
.map(CompletableFuture::join).collect(Collectors.toList()))
.join();
결과
multi thread로 비동기 처리 후
- 약 2백만건의 데이터 입력 16시간에서 4시간으로 4배 개선
'spring & java' 카테고리의 다른 글
[Spring] 스프링 부트 핵심 가이드 - API 작성 기초 (4) | 2023.10.29 |
---|---|
[Spring] 스프링 부트 핵심 가이드 - Spring 기초 지식 (0) | 2023.10.22 |
[Spring] Spring AOP 적용 (0) | 2023.06.19 |
[Spring] Spring Security + JWT (7) | 2023.02.06 |
[JPA] batch insert 적용 (0) | 2022.11.19 |