SSE + Redis Pub/Sub, Scale-out을 고려한 알람 시스템 구현
개인 프로젝트에서 진행했던 알람 시스템 구현에 대해서 포스팅 하려한다.
알람 시스템을 구현하기 위해서는 서버의 알람 이벤트를 클라이언트에게 전송할 수 있어야 하는데 이 때 고려했던 방법은 SSE 연결과 웹소켓을 이용한 연결이었다. 알람 전송을 위해서는 단방향 전송이 필요했기 때문에 SSE 연결을 통해 알람 기능을 구현하고자 했다.
알람 구독 시나리오는 다음과 같다. 클라이언트 사이드에서는 javascript 가 제공하는 eventsource 객체를 통해 서버의 /subscribe 로 연결 요청을 보내고 연결을 맺는다. 서버에서는 in-memory 방식으로 해당 클라이언트의 연결정보를 관리하면서 연결을 유지한다. 아래는 클라이언트와 서버와 Sse 연결을 맺는 기본 구현 코드이다.
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(
@CurrentMember Member member,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
return ResponseEntity.ok(sseEmitterService.subscribe(member, lastEventId));
}
public SseEmitter subscribe(Member member, String lastEventId) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
String uniqueEmitterId = generateUniqueClientId(member.getEmail(),LocalDateTime.now());
sseEmitterRepository.save(uniqueEmitterId, emitter);
emitter.onCompletion(() -> sseEmitterRepository.deleteById(uniqueEmitterId));
emitter.onTimeout(() -> sseEmitterRepository.deleteById(uniqueEmitterId));
emitter.onError((e) -> sseEmitterRepository.deleteById(uniqueEmitterId));
sendToClient(emitter,uniqueEmitterId,"init","init");
if(lastEventId != null){
LocalDateTime dateTime = extractDateTime(lastEventId);
List<Notification> notifications = notificationService.findAllAfterDateTime(dateTime, member);
notifications.forEach(
notification -> sendToClient(emitter,
uniqueEmitterId,
"notification",
NotificationResponse.from(notification))
);
}
return emitter;
}
public void sendToClient(SseEmitter emitter, String id, String name, Object data) {
try {
emitter.send(
SseEmitter.event()
.id(id)
.name(name)
.data(data)
);
} catch (IOException e) {
log.error(e.getMessage(), e);
sseEmitterRepository.deleteById(id);
throw new RuntimeException(FAILED_SSE_NOTIFICATION_SEND.getMessage());
}
}
@Repository
public class SseEmitterRepository {
private Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
public List<SseEmitter> findAllStartWithByMemberEmail(String email) {
return emitterMap.entrySet().stream()
.filter(entry-> entry.getKey().startsWith(email))
.map(Entry::getValue)
.toList();
}
public SseEmitter save(String id, SseEmitter emitter) {
return emitterMap.put(id, emitter);
}
public void deleteById(String uniqueEmitterId) {
emitterMap.remove(uniqueEmitterId);
}
public void deleteAll(){
emitterMap.clear();
}
}
톰캣의 쓰레드풀에 있는 각 쓰레드당 하나의 연결 요청을 처리하기 때문에 공유 자원에 대한 경쟁상황이 생길 수 있으므로
ConcurrentHashMap을 사용하여 SseEmitter 객체를 관리한다.
해당 프로젝트에서 알람이 발생하는 상황은 다음과 같다.
- A유저의 게시글에 B 유저가 댓글을 달았을 때
- A유저의 댓글에 B 유저가 대댓글을 달았을 떄
- Todo 업데이트 마감 시간 공지
- 새벽 1시 각 유저에게 성취도 피드백 메시지 공지
단일서버에서 1번 상황이 발생했다고 가정했을 때는 모든 요청이 하나의 서버로만 전달 되기때문에 해당 서버는 반드시 알람 구독을 맺고있는 클라이언트의 정보를 갖고 있을 것이다. 그렇기 때문에 A의 게시글에 B 의 댓글이 등록될 때 댓글 등록 이벤트를 발생시키고 이벤트 리스너에서 해당 이벤트를 수신하여 A에게 알람을 전송하면 문제가 없다.
하지만 본 프로젝트는 단일서버가 아닌 Scale-out 된 분산서버 환경을 고려하고 설계하였기 때문에 서버가 여러개인 상황을 가정하면, 댓글 등록 이벤트를 발생시킨 서버에서 A유저의 연결정보가 서버에 존재한다고 보장할 수 없다. 왜냐하면 DB에 따로 연결정보를 관리하지 않고 in-memory 형식으로 연결정보를 관리하고 있기 때문이다.
그렇기 때문에 댓글 등록 이벤트가 발생했을 때 A의 연결정보를 갖고 있는 서버에서 댓글 등록 이벤트를 처리하고 알림을 전송할 수 있도록 하는 구조설계가 필요했다.
위와 같은 이유로 Kafka 를 통한 Producer-Consumer 그리고 Redis Pub/Sub 두가지를 고려했다. Kafka 는 토픽을 구독하는 컨슈머 그룹내에 하나의 컨슈머가 토픽에 발행된 메시지를 처리하게 되는데 이 때 해당 컨슈머가 A의 연결정보를 가지고 있다라고 보장할 수 없는 상황이긴 마찬가지였다. 그렇기 때문에 채널을 구독하고 있는 모든 Subcriber가 메시지를 수신하고 해당 메시지를 처리할 수 있는 서버(Subscriber)에서 알람 전송을 처리할 수 있는 구조인 Redis Pub/Sub 구조를 채택하게 되었다.
먼저 레디스 채널에 전송할 Event 객체를 구현했다.
public class NotificationEvent {
String clientEmail;
Long targetEntityId;
Long relatedEntityId;
String message;
NotificationType notificationType;
}
각 필드에 대한 설명
1. SseEmitter 객체를 찾을때 clientEmail을 기반으로 연결을 찾도록 설계하였기 때문에 clientEmail 을 필드로 갖는다.
2. targetEntityId - 게시글에 댓글이 달렸다면 해당 게시글의 id 를 필드로 갖는다
3. relatedEntityId - 게시글에 댓글이 달렸다면 해당 댓글의 id 를 필드로 갖는다.
4. message - 이벤트 발생상황에 맞는 메시지
@Getter
public enum NotificationMessage {
REPLY_ADD("회원님의 게시글에 댓글이 달렸어요 :) ") ,
NESTED_REPLY_ADD("회원님의 댓글에 댓글이 달렸어요 :)"),
TODO_UPDATE("잊지 않으셨죠 ? 투두 리스트 제출 마감 시간은 오후 11시 입니다 :)"),
;
private final String message;
NotificationMessage(String message) {
this.message = message;
}
}
5. NotificationType - 각 상황에 맞는 알림 타입
@Getter
public enum NotificationType {
REPLY_ADD("댓글 추가", "/board",true),
NESTED_REPLY_ADD("대댓글 추가", "/board", true),
TODO_UPDATE("TODO 업데이트", null, true),
ACHIEVEMENT_FEEDBACK("성취도", null, true),
;
private final String value;
private final String baseTargetUrl;
private final Boolean isNeededSave;
NotificationType(String value, String baseTargetUrl,Boolean isNeededSave) {
this.value = value;
this.baseTargetUrl = baseTargetUrl;
this.isNeededSave = isNeededSave;
}
}
1. 위 dto 객체를 생성하여 스프링의 EventPublisher를 통해 각 알림 타입에 맞는 이벤트를 비동기 발행한다. 해당 이벤트는 댓글이 생성될 때 발행하게 되는데 해당 이벤트의 발행이 댓글 생성 및 저장에 영향을 주어서는 안되고 동기적으로 처리될 필요가 없다고 생각했기 때문에 비동기처리 하였다.
2. publishNotificationEvent 메서드에서 해당 이벤트를 수신하고 레디스 토픽("NotificationTopic")에 publish 한다.
3. 레디스 토픽을 구독하고 있는 서버에서 handleNotificationEvent 함수를 통해 토픽에 발행된 메시지를 수신하고 알람을 보낸다.
@Async
public void publishReplyAddEvent(Board board, Reply reply) {
Member boardWriter = board.getMember();
Member replyWriter = reply.getMember();
if(reply.getParent() != null){
eventPublisher.publishEvent(new NotificationEvent(
reply.getParent().getMember().getEmail(),
board.getId(),
reply.getId(),
NotificationMessage.NESTED_REPLY_ADD.getMessage(),
NotificationType.NESTED_REPLY_ADD));
}
if (!boardWriter.getEmail().equals(replyWriter.getEmail())) {
eventPublisher.publishEvent(new NotificationEvent(
boardWriter.getEmail(),
board.getId(),
reply.getId(),
NotificationMessage.REPLY_ADD.getMessage(),
NotificationType.REPLY_ADD));
}
@Async
@EventListener
public void publishNotificationEvent(NotificationEvent event){
try {
String channelEvent = objectMapper.writeValueAsString(event);
redisOperations.convertAndSend(notificationTopic.getTopic(), channelEvent);
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);
throw new InvalidRedisMessageException(INVALID_REDIS_MESSAGE);
}
}
public void handleNotificationEvent(String message) {
NotificationEvent event = convertToObject(message, NotificationEvent.class);
Member member = memberService.findByEmail(event.getClientEmail());
Notification notification = Notification.from(member, event);
if (notification.getNotificationType().getIsNeededSave()) {
notificationService.save(notification);
}
List<SseEmitter> emitters = sseEmitterService.findAllStartWithByMemberEmail(member.getEmail());
if (!emitters.isEmpty()) {
String response = convertToJson(NotificationResponse.from(notification));
emitters.forEach(emitter -> sseEmitterService.sendToClient(
emitter,
sseEmitterService.generateUniqueClientId(member.getEmail(), LocalDateTime.now()),
NOTIFICATION_EVENT_NAME,
response)
);
}
}
만약 SSE 연결이 끊겨있는 상황에서는 알람을 수신하지 못하기 때문에 알람을 RDB 에 저장해놓고 유저가 다시 SSE 연결을 맺었을 때 받지 못했던 알람을 수신할 수 있게 하였다. 구조를 개선하기 전 알람 기능의 문제는 결국 stateful 하게 설계되었기 때문에 발생하는 문제였고 stateless 하게 변경하면서 문제를 개선할 수 있었다.
결론은 알람이 이벤트가 어디에서 발생이 되어도 분산되어 있는 서버 중 처리할 수 있는 서버에서 알람을 처리할 수 있게 되었다.