SSA 서비스에서는 강의자가 강의 내용을 수정하면 수강생들에게 실시간으로 알림을 보내주는 기능이 있습니다. 이는 학생들이 변경된 내용을 빠르게 인지하고 학습 계획을 조정할 수 있도록 돕는 중요한 기능입니다.
하지만 인기 강의의 경우 수강생이 수백 명에 달해, 순차적으로 알림을 보내면 상당한 시간이 소요되는 문제가 있을 수 있습니다. 강의 내용 전체가 수정되거나 할 경우 또 다른 강의를 수강신청 하거나 아예 수강하지 않는 등 학생이 학습계획을 조정해야 할 수 있습니다. 또 이 경우 여러 명의 학생이 수강취소를 했을 때 해당 강의를 수강하기 원했던 다른 학생이 호시탐탐 강의를 노리고 있기 마련입니다.
이때 알림 처리가 늦어지면 강남에 있는 모든 학생에의 알림이 지연되는 것이죠. 이 문제를 해결하기 위해 카프카 컨슈머의 알림 처리 로직에 병렬 처리를 도입하여 알림 전송 성능을 개선해보기로 했습니다.
1. 테스트 시나리오 설계:
- 테스트에 사용할 베이스 코드입니다.
@BeforeEach
void setUp() {
// 1. 테스트 도구 초기화
stopWatch = new StopWatch();
latch = new CountDownLatch(NUM_STUDENTS);
// 2. Mock 객체 설정: courseClient는 ID 리스트를 반환
List<Long> studentIds = LongStream.range(1, NUM_STUDENTS + 1).boxed().toList();
GetEnrolledStudentInfosResponse response = new GetEnrolledStudentInfosResponse("강의 제목", "메모", studentIds);
given(courseClient.getEnrolledStudentInfosByLectureId(anyLong())).willReturn(response);
// 3. Mock 객체 설정: notificationService는 호출될 때마다 10ms 지연시키고 Latch를 감소시킴
doAnswer(invocation -> {
TimeUnit.MILLISECONDS.sleep(DELAY_PER_NOTIFICATION_MS);
latch.countDown();
return null;
}).when(notificationService).sendPushNotification(any());
}
protected void produceMessage() {
TestDomainEvent testEvent = new TestDomainEvent(1L);
KafkaDomainEventMessage message = KafkaDomainEventMessage.from(testEvent);
kafkaTemplate.send(KafkaTopics.LECTURE_UPDATED_TOPIC, message);
}
2. 성능 측정 방법:
- 테스트 대상: 500명의 수강생에게 알림을 보내는 상황을 가정했습니다.
- I/O 작업 시뮬레이션: 실제 FCM 외부 통신에는 네트워크 지연이 발생합니다. 비슷한 환경을 조성하기 위해 각 알림 전송마다 10ms의 지연 시간을 주었습니다.
- 정확한 시간 측정: 스프링의 StopWatch로 전체 작업 시간을 측정하고, @KafkaListener가 비동기로 동작하므로 CountDownLatch를 이용해 500개의 알림 처리가 모두 완료될 때까지 대기하도록 구현했습니다.
## 테스트 1: 기존 방식 - 순차 처리 (단순 반복문)
우선 강의 수정 이벤트를 받아 단순히 반복문을 돌며 알림을 전송하는 코드입니다.
@KafkaListener(
topics = KafkaTopics.LECTURE_UPDATED_TOPIC,
containerFactory = KafkaConsumerConfig.DOMAIN_EVENT_CONTAINER_FACTORY
)
public void consumeLectureUpdatedEvent(
KafkaDomainEventMessage message,
Acknowledgment ack,
@Header(KafkaHeaders.OFFSET) int offset
) {
log.info("Try to consume lecture updated event topic. id: {}, uuid: {}, offset: {}",
message.targetDomainId(), message.uuid(), offset);
GetEnrolledStudentInfosResponse studentInfos = courseClient.getEnrolledStudentInfosByLectureId(message.targetDomainId());
String title = studentInfos.lectureTitle();
String memo = studentInfos.lectureMemo();
List<Long> studentIds = studentInfos.studentIds();
if (studentIds == null || studentIds.isEmpty()) {
ack.acknowledge();
return;
}
// 1. 단순 반복문 각 학생에게 개별적으로 알림 처리
for (Long studentId : studentIds) {
NotificationMessageEvent event = new NotificationMessageEvent(
message.targetDomainId(),
studentId,
title,
"강의 내용이 수정되었습니다. 수정 내용: {%s}".formatted(memo)
);
notificationService.sendPushNotification(event);
log.info("send notification message. to student ID: {}", studentId);
}
ack.acknowledge();
log.info("Successfully consume lecture updated event topic. id: {}, uuid: {}, offset: {}",
message.targetDomainId(), message.uuid(), offset);
}
테스트 실행 결과입니다.

약 6.441 초 소요되었습니다.
[성능 측정 결과]
- 예상 소요 시간: 500명 * 10ms = 5,000ms (약 5초)
- 실제 측정 시간: 약 6,441ms (6.4초)
예상보다 시간이 더 걸린 이유는 반복문 자체의 오버헤드와 테스트 환경의 부가적인 처리 시간 때문일 것입니다.
학생 수가 더 많아진다면 이 시간은 선형적으로 증가할 것입니다.
## 테스트 2: Parallel Stream
강의 수정 이벤트를 받아 병렬적으로 알림을 전송하는 코드입니다.
@KafkaListener(
topics = KafkaTopics.LECTURE_UPDATED_TOPIC,
containerFactory = KafkaConsumerConfig.DOMAIN_EVENT_CONTAINER_FACTORY
)
public void consumeLectureUpdatedEvent(
KafkaDomainEventMessage message,
Acknowledgment ack,
@Header(KafkaHeaders.OFFSET) int offset
) {
log.info("Try to consume lecture updated event topic. id: {}, uuid: {}, offset: {}",
message.targetDomainId(), message.uuid(), offset);
GetEnrolledStudentInfosResponse studentInfos = courseClient.getEnrolledStudentInfosByLectureId(message.targetDomainId());
String title = studentInfos.lectureTitle();
String memo = studentInfos.lectureMemo();
List<Long> studentIds = studentInfos.studentIds();
if (studentIds == null || studentIds.isEmpty()) {
ack.acknowledge();
return;
}
// 2. ForkJoinPool parallelStream 사용
studentIds.stream().parallel().forEach(studentId -> {
try {
NotificationMessageEvent event = new NotificationMessageEvent(
message.targetDomainId(),
studentId,
title,
"강의 내용이 수정되었습니다. 수정 내용: {%s}".formatted(memo)
);
notificationService.sendPushNotification(event);
log.info("Sent notification message. to student ID: {}", studentId);
} catch (Exception e) {
// 개별 알림 실패 시 로그 기록. 필요 시 DLQ 전송 등의 로직 추가 가능
log.error("Failed to send notification to student ID: {}. Error: {}", studentId, e.getMessage());
}
});
ack.acknowledge();
log.info("Successfully consume lecture updated event topic. id: {}, uuid: {}, offset: {}",
message.targetDomainId(), message.uuid(), offset);
}
테스트 실행 결과입니다.

약 0.968초 소요되었습니다.
[성능 측정 결과]
- 실제 측정 시간: 약 968ms (0.97초)
stream().parallel()을 추가하는 것만으로 처리 시간이 약 6.7배 향상되었습니다.
내부적으로 JVM의 공용 ForkJoinPool을 사용하여 작업을 여러 쓰레드에 분배했기 때문입니다.
### JVM의 공용 ForkJoinPool을 사용할 경우 고민해야 할 점
parallelStream()을 호출하면 내부적으로 공용 ForkJoinPool을 사용합니다.
이 풀은 JVM 전역에서 단 하나만 존재하는 정적 자원(static resource)입니다.
- 기본 쓰레드 개수: ForkJoinPool의 기본 쓰레드 개수는 내 컴퓨터의 프로세서(코어) 수 - 1개로 설정됩니다.
예를 들어, 8코어 CPU를 가진 Mac에서는 7개의 쓰레드를 가진 ForkJoinPool이 생성됩니다.- 그 이유는parallelStream()을 호출한 메인 스레드까지 포함해서 CPU 코어 개수만큼만 스레드를 사용해,
불필요한 컨텍스트 스위칭(Context Switching) 비용을 최소화하고 CPU를 최대한 효율적으로 사용하기 위함입니다.
- 그 이유는parallelStream()을 호출한 메인 스레드까지 포함해서 CPU 코어 개수만큼만 스레드를 사용해,
주의사항:
- 이 풀은 JVM 내의 모든 parallelStream()이 공유합니다. 만약 애플리케이션의 여러 곳에서 동시에 parallelStream()을 사용하면, 모두 이 한정된 개수의 쓰레드를 놓고 경쟁하게 됩니다. 한쪽에서 시간이 오래 걸리는 작업을 하면 다른 쪽의 parallelStream() 성능이 저하될 수 있습니다.
- parallelStream()이 사용하는 공용 ForkJoinPool의 스레드는 기본적으로 데몬 쓰레드(Daemon Thread)로 동작합니다.
- 데몬 스레드란, 주인(애플리케이션의 메인 스레드)을 보조하는 하인 같은 존재입니다. 애플리케이션이 종료 데몬 쓰레드도 같이 종료됩니다.
따라서 스프링 부트 애플리케이션이 종료 신호를 받으면, ForkJoinPool에서 실행 중이던 작업들은 완료를 보장받지 못하고 그대로 중단됩니다.
해결책으로는 2가지를 알아 볼 수 있습니다.
- 별도의 ForkJoinPool Bean으로 등록하기
--> ForkJoinPool을 직접 스프링 Bean으로 등록하고, parallelStream() 대신 이 풀에 작업을 제출합니다. - @Async와 TaskExecutor 활용하기
--> 별도의 ThreadPoolTaskExecutor Bean을 설정하면 Graceful Shutdown이 적용됩니다.
## 테스트 3: (ExecutorService + CompletableFuture)
강의 수정 이벤트를 받아 Executor로 알림을 전송하는 코드입니다.
@Bean(name = NOTIFICATION_TASK_EXECUTOR)
public TaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // 기본 쓰레드 수
executor.setMaxPoolSize(20); // 최대 쓰레드 수
executor.setQueueCapacity(100_000); // 대기 큐 크기
executor.setThreadNamePrefix("Notification-");
executor.initialize();
return executor;
}
@KafkaListener(
topics = KafkaTopics.LECTURE_UPDATED_TOPIC,
containerFactory = KafkaConsumerConfig.DOMAIN_EVENT_CONTAINER_FACTORY
)
public void consumeLectureUpdatedEvent(
KafkaDomainEventMessage message,
Acknowledgment ack,
@Header(KafkaHeaders.OFFSET) int offset
) {
log.info("Try to consume lecture updated event topic. id: {}, uuid: {}, offset: {}",
message.targetDomainId(), message.uuid(), offset);
GetEnrolledStudentInfosResponse studentInfos = courseClient.getEnrolledStudentInfosByLectureId(message.targetDomainId());
String title = studentInfos.lectureTitle();
String memo = studentInfos.lectureMemo();
List<Long> studentIds = studentInfos.studentIds();
if (studentIds == null || studentIds.isEmpty()) {
ack.acknowledge();
return;
}
// 3. Executor CompletableFuture 사용
List<CompletableFuture<Void>> futures = studentIds.stream()
.map(studentId -> CompletableFuture.runAsync(() -> {
try {
NotificationMessageEvent event = new NotificationMessageEvent(
message.targetDomainId(),
studentId,
title,
"강의 내용이 수정되었습니다. 수정 내용: {%s}".formatted(memo)
);
notificationService.sendPushNotification(event);
log.info("Sent notification message. to student ID: {}", studentId);
} catch (Exception e) {
log.error("Failed to send notification to student ID: {}. Error: {}", studentId, e.getMessage());
}
}, notificationTaskExecutor)) // 쓰레드 풀 사용
.toList();
// 모든 비동기 작업이 끝날 때까지 대기
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
ack.acknowledge();
log.info("Successfully consume lecture updated event topic. id: {}, uuid: {}, offset: {}",
message.targetDomainId(), message.uuid(), offset);
}
테스트 실행 결과입니다.

약 0.541초 소요되었습니다.
[성능 측정 결과]
- corePoolSize: 20, maxPoolSize: 20 설정 시
- 실제 측정 시간: 약 541ms (0.54초)
전용 쓰레드 풀을 사용하자 약 11.9배의 성능 향상을 보였습니다. 여기서 한 가지 의문이 생겼습니다. I/O 작업이 대부분인데, 쓰레드 수를 늘리면 성능이 더 좋아지지 않을까?
## 테스트 4: 쓰레드 풀 튜닝 (I/O Bound)
알림 전송과 같은 I/O Bound 작업은 CPU는 거의 사용하지 않고 대부분의 시간을 네트워크 응답 대기에 사용합니다. 이런 경우, CPU 코어 수보다 훨씬 많은 스레드를 할당하면 대기 시간 동안 다른 쓰레드가 작업을 처리할 수 있어 전체 처리량이 향상될 수 있습니다.
스레드 수를 20개에서 200개로 늘려 테스트를 진행해 보겠습니다.
## 그전에 !
여기서 대기 쓰레드 개수는 그대로 두고 대기 큐를 늘렸을 경우에는 처리 시간에 대해 큰 차이가 없습니다.
어느 정도 예상이 가는 내용이지만 작업에 대한 처리 자체는 쓰레드가 하기 때문에 대기 큐가 늘어난 것에는 큰 의미가 없긴 합니다.
@Bean(name = NOTIFICATION_TASK_EXECUTOR)
public TaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // 기본 쓰레드 수
executor.setMaxPoolSize(20); // 최대 쓰레드 수
executor.setQueueCapacity(100_000); // 대기 큐 크기
executor.setThreadNamePrefix("Notification-");
executor.initialize();
return executor;
}

소요 시간 약 0.529초입니다.
## 핵심 !!
이 밑에가 바로 대기 큐를 그대로 두고 쓰레드 개수를 늘린 경우입니다.
톰캣의 쓰레드 개수 설정과 자바의 쓰레드 개수 설정은 서로 큰 관련이 없다고 봐도 되지만
그냥 떠오르는 숫자가 톰캣 기본 쓰레드 개수인 200이어서 쓰레드 풀 사이즈를 200개로 설정해 봤습니다.
@Bean(name = NOTIFICATION_TASK_EXECUTOR)
public TaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(200); // 기본 쓰레드 수
executor.setMaxPoolSize(200); // 최대 쓰레드 수
executor.setQueueCapacity(100_000); // 대기 큐 크기
executor.setThreadNamePrefix("Notification-");
executor.initialize();
return executor;
}

약 0.270초가 소요되었습니다.
[최종 성능 측정 결과]
- 실제 측정 시간: 약 270ms (0.27초)
스레드 수를 늘리자 처리 시간이 절반 가까이 더 줄어들어, 최초 6,441ms 대비 약 23.8배의 성능 향상을 달성했습니다.
대략 500명의 학생에게 알림을 전송하는 기준으로 6,441ms가 걸리던 것을 270ms로 개선해 볼 수 있었습니다.
여기서 쓰레드 풀 사이즈를 400으로 늘리고 성능을 측정해 봤는데 사이즈가 200인 것과 큰 차이가 없었습니다.
@Bean(name = NOTIFICATION_TASK_EXECUTOR)
public TaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(400); // 기본 쓰레드 수
executor.setMaxPoolSize(400); // 최대 쓰레드 수
executor.setQueueCapacity(100_000); // 대기 큐 크기
executor.setThreadNamePrefix("Notification-");
executor.initialize();
return executor;
}

약 0.212초가 소요되었습니다.
그 이유로는 "제한된 자원" 때문일 것 같습니다.
알림 전송은 Kafka에 메시지를 보내거나 DB에 데이터를 기록하는 작업이 포함됩니다.
이때 I/O Bound 작업이 많이 일어나게 되는데
- 쓰레드가 CPU를 사용해 연산하는 시간보다 데이터베이스와 커넥션을 맺고, 네트워크를 통해 Kafka로 데이터를 전송하는 시간이 훨씬 오래 걸립니다.
- 200개의 작업 쓰레드를 400개로 늘려도 데이터베이스나 Kafka의 자원이 충분하지 않다면 처리 속도는 최대 한계 이상으로 빨라지지 않습니다.
- I/O 작업에 소요되는 자원이나 데이터베이스 커넥션 풀 등의 자원이 전부 사용 중이라면, 쓰레드를 늘려도 대기하는 쓰레드만 많아질 뿐 전체 처리량(TPS)는 크게 개선되지 않습니다.
운영 중인 서비스라면 쓰레드 풀 사이즈를 200과 400, 아니면 그 사이의 어떤 값으로 최대한 작은 풀 사이즈를 사용하면서 성능은 그 이상의 풀 사이즈와 유사한 정도의 값을 사용하겠지만 0.0x초 정도는 큰 차이가 없다고 판단, 200으로 채택하겠습니다.
## 결론 및 고찰
- 단순 반복문은 구현이 쉽지만, 작업량이 많아지면 성능 저하가 뚜렷했습니다.
- Parallel Stream은 최소한의 코드 변경으로 꽤 높은 성능 향상을 보여주어, 복잡하지 않은 병렬화 작업에 먼저 고려할 만했습니다.
- ExecutorService는 전용 쓰레드 풀을 통해 성능을 끌어올릴 수 있었고, 쓰레드 풀 튜닝을 통해 I/O Bound 작업의 특성을 고려하여 성능을 개선해 볼 수 있었습니다.
- 카프카 컨슈머가 해당 토픽을 consume하는 것 자체는 동기적으로 작동하고 내부에서 실제 알림을 전송하는 메서드만 비동기로 처리하도록 구성해 봤습니다. 추후에 카프카도 CompletableFuture를 통해 비동기적으로 동작하도록 할 수도 있겠습니다.
물론, 알림 서버에서 요청을 아무리 빨리 처리하더라도 실제 FCM 서비스의 응답이 느리면 최종적인 알림 도달 시간은 달라질 수 있습니다. 하지만 이번 개선을 통해 시스템 내에서 발생하는 병목 현상은 성공적으로 해결해 볼 수 있었습니다.
## 참고
다만, 쓰레드 풀이 요청에 비해 충분히 넉넉하지 않을 경우 요청이 작업 큐에 들어가지 못하고 유실됩니다.
이때, 카프카 사용 시 Processing automatic preferred replica leader election를 만날 수도 있습니다.
@Bean(name = NOTIFICATION_TASK_EXECUTOR)
public TaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // 기본 쓰레드 수
executor.setMaxPoolSize(20); // 최대 쓰레드 수
executor.setQueueCapacity(100); // 대기 큐 크기가 요청에 비해 작음
executor.setThreadNamePrefix("Notification-");
executor.initialize();
return executor;
}
@Bean(name = NOTIFICATION_TASK_EXECUTOR)
public TaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Notification-");
// 큐가 가득 찼을 때, 작업을 제출한 쓰레드(여기서는 카프카 컨슈머 쓰레드)가
// 직접 작업을 실행하도록 하여 작업 유실을 방지한다.
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
이 경우 저는 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());를 활용해서 Kafka Consume이 되지 않는 문제를 해결할 수는 있었지만,
해결 결과

카프카 컨슈머로 요청을 다시 위임하는, CallerRunsPolicy()를 활용하지 않고
한 번에 알림 전송을 요청하는 경우가 최대 500명 정도라고 가정하고
그만큼의 사이즈를 고려해서 쓰레드 풀을 사용할 것 같습니다.
그 이유는 현재 해당 방식이 정확히 어떤 식으로 동작하는지 정확히 확인하지 못한 상태이고,
카프카로부터 받은 요청을 다시 카프카로 넘기는 것이 타당하지 않다고 생각했기 때문입니다.
주의할 점은 카프카 환경에서 장애가 발생했을 경우 리더 파티션의 역할이었습니다.
이 부분은 추후 학습한 뒤, 적용하도록 하겠습니다.
ps. 알림 서버에서 요청을 빠르게 처리하더라도 실제 FCM 서비스의 요청 처리 속도가 느릴 경우 알림이 늦게 전송될 수도 있습니다 !!
'SSA > Back' 카테고리의 다른 글
| [SSA] 새로운 트랜잭션을 만들면 데드락이 발생하는 것을 테스트해 보자 (0) | 2025.08.29 |
|---|---|
| [SSA] 트랜잭션을 새로 만들어서 사용하면 ? (0) | 2025.08.29 |
| [SSA] 수강 기능 및 알림 기능을 한 곳에서 처리하면 발생할 수 있는 문제 (1) | 2025.07.19 |
| [SSA] 서버 SSE 기능 구현 (0) | 2025.05.11 |
| [SSA] 서버 FCM 기능 구현 (1) | 2025.05.04 |