개인 프로젝트에서 진행했던 알람 시스템 구현에 대해서 포스팅 하려한다. 

알람 시스템을 구현하기 위해서는 서버의 알람 이벤트를 클라이언트에게 전송할 수 있어야 하는데  이 때 고려했던 방법은 SSE 연결과 웹소켓을 이용한 연결이었다. 알람 전송을 위해서는 단방향 전송이 필요했기 때문에 SSE 연결을 통해 알람 기능을 구현하고자 했다. 


알람 구독 시나리오는 다음과 같다. 클라이언트 사이드에서는 javascript 가 제공하는 eventsource 객체를 통해 서버의 /subscribe 로 연결 요청을 보내고 연결을 맺는다. 서버에서는 in-memory 방식으로 해당 클라이언트의 연결정보를 관리하면서 연결을 유지한다. 아래는 클라이언트와 서버와 Sse 연결을 맺는 기본 구현 코드이다. 

@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(
        @CurrentMember Member member,
        @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
    return ResponseEntity.ok(sseEmitterService.subscribe(member, lastEventId));
}
public SseEmitter subscribe(Member member, String lastEventId) {
    SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
    String uniqueEmitterId = generateUniqueClientId(member.getEmail(),LocalDateTime.now());
    sseEmitterRepository.save(uniqueEmitterId, emitter);

    emitter.onCompletion(() -> sseEmitterRepository.deleteById(uniqueEmitterId));
    emitter.onTimeout(() -> sseEmitterRepository.deleteById(uniqueEmitterId));
    emitter.onError((e) -> sseEmitterRepository.deleteById(uniqueEmitterId));

    sendToClient(emitter,uniqueEmitterId,"init","init");

    if(lastEventId != null){
        LocalDateTime dateTime = extractDateTime(lastEventId);
        List<Notification> notifications = notificationService.findAllAfterDateTime(dateTime, member);
        notifications.forEach(
                notification -> sendToClient(emitter,
                        uniqueEmitterId,
                        "notification",
                        NotificationResponse.from(notification))
        );
    }

    return emitter;
}
public void sendToClient(SseEmitter emitter, String id, String name, Object data) {
    try {
        emitter.send(
                SseEmitter.event()
                        .id(id)
                        .name(name)
                        .data(data)
        );
    } catch (IOException e) {
        log.error(e.getMessage(), e);
        sseEmitterRepository.deleteById(id);
        throw new RuntimeException(FAILED_SSE_NOTIFICATION_SEND.getMessage());
    }
}
@Repository
public class SseEmitterRepository {
    private Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();

    public List<SseEmitter> findAllStartWithByMemberEmail(String email) {
        return emitterMap.entrySet().stream()
                .filter(entry-> entry.getKey().startsWith(email))
                .map(Entry::getValue)
                .toList();
    }

    public SseEmitter save(String id, SseEmitter emitter) {
        return emitterMap.put(id, emitter);
    }

    public void deleteById(String uniqueEmitterId) {
        emitterMap.remove(uniqueEmitterId);
    }

    public void deleteAll(){
        emitterMap.clear();
    }
}

 

톰캣의 쓰레드풀에 있는 각 쓰레드당 하나의 연결 요청을 처리하기 때문에 공유 자원에 대한 경쟁상황이 생길 수 있으므로

ConcurrentHashMap을 사용하여 SseEmitter 객체를 관리한다. 


 

해당 프로젝트에서 알람이 발생하는 상황은 다음과 같다.

  •  A유저의 게시글에 B 유저가 댓글을 달았을 때
  •  A유저의 댓글에 B 유저가 대댓글을 달았을 떄
  • Todo 업데이트 마감 시간 공지
  • 새벽 1시 각 유저에게 성취도 피드백 메시지 공지 

단일서버에서 1번 상황이 발생했다고 가정했을 때는 모든 요청이 하나의 서버로만 전달 되기때문에 해당 서버는 반드시 알람 구독을 맺고있는 클라이언트의 정보를 갖고 있을 것이다. 그렇기 때문에 A의 게시글에 B 의 댓글이 등록될 때 댓글 등록 이벤트를 발생시키고 이벤트 리스너에서 해당 이벤트를 수신하여 A에게 알람을 전송하면 문제가 없다. 

 

하지만 본 프로젝트는 단일서버가 아닌 Scale-out 된 분산서버 환경을 고려하고 설계하였기 때문에 서버가 여러개인 상황을 가정하면, 댓글 등록 이벤트를 발생시킨 서버에서 A유저의 연결정보가 서버에 존재한다고 보장할 수 없다. 왜냐하면 DB에 따로 연결정보를 관리하지 않고 in-memory 형식으로 연결정보를 관리하고 있기 때문이다. 

그렇기 때문에 댓글 등록 이벤트가 발생했을 때 A의 연결정보를 갖고 있는 서버에서 댓글 등록 이벤트를 처리하고 알림을 전송할 수 있도록 하는 구조설계가 필요했다. 

 

위와 같은 이유로 Kafka 를 통한 Producer-Consumer 그리고 Redis Pub/Sub 두가지를 고려했다. Kafka 는 토픽을 구독하는 컨슈머 그룹내에 하나의 컨슈머가 토픽에 발행된 메시지를 처리하게 되는데 이 때 해당 컨슈머가 A의 연결정보를 가지고 있다라고 보장할 수 없는 상황이긴 마찬가지였다. 그렇기 때문에 채널을 구독하고 있는 모든 Subcriber가 메시지를 수신하고 해당 메시지를 처리할 수 있는 서버(Subscriber)에서 알람 전송을 처리할 수 있는 구조인 Redis Pub/Sub 구조를 채택하게 되었다. 

 

먼저 레디스 채널에 전송할 Event 객체를 구현했다.

public class NotificationEvent {
    String clientEmail;
    Long targetEntityId;
    Long relatedEntityId;
    String message;
    NotificationType notificationType;
}

 

각 필드에 대한 설명

1. SseEmitter 객체를 찾을때 clientEmail을 기반으로 연결을 찾도록 설계하였기 때문에 clientEmail 을 필드로 갖는다.

2. targetEntityId - 게시글에 댓글이 달렸다면 해당 게시글의 id 를 필드로 갖는다

3. relatedEntityId - 게시글에 댓글이 달렸다면 해당 댓글의 id 를 필드로 갖는다.

4. message - 이벤트 발생상황에 맞는 메시지 

@Getter
public enum NotificationMessage {
    REPLY_ADD("회원님의 게시글에 댓글이 달렸어요 :) ") ,
    NESTED_REPLY_ADD("회원님의 댓글에 댓글이 달렸어요 :)"),
    TODO_UPDATE("잊지 않으셨죠 ? 투두 리스트 제출 마감 시간은 오후 11시 입니다 :)"),
    ;
    private final String message;

    NotificationMessage(String message) {
        this.message = message;
    }
}

5. NotificationType - 각 상황에 맞는 알림 타입

@Getter
public enum NotificationType {
    REPLY_ADD("댓글 추가", "/board",true),
    NESTED_REPLY_ADD("대댓글 추가", "/board", true),
    TODO_UPDATE("TODO 업데이트", null, true),
    ACHIEVEMENT_FEEDBACK("성취도", null, true),
    ;

    private final String value;
    private final String baseTargetUrl;
    private final Boolean isNeededSave;

    NotificationType(String value, String baseTargetUrl,Boolean isNeededSave) {
        this.value = value;
        this.baseTargetUrl = baseTargetUrl;
        this.isNeededSave = isNeededSave;
    }
}

 

1. 위 dto 객체를 생성하여 스프링의 EventPublisher를 통해 각 알림 타입에 맞는 이벤트를 비동기 발행한다. 해당 이벤트는 댓글이 생성될 때 발행하게 되는데 해당 이벤트의 발행이 댓글 생성 및 저장에 영향을 주어서는 안되고 동기적으로 처리될 필요가 없다고 생각했기 때문에 비동기처리 하였다. 

2. publishNotificationEvent 메서드에서 해당 이벤트를 수신하고 레디스 토픽("NotificationTopic")에 publish 한다.

3. 레디스 토픽을 구독하고 있는 서버에서 handleNotificationEvent 함수를 통해 토픽에 발행된 메시지를 수신하고 알람을 보낸다. 

@Async
public void publishReplyAddEvent(Board board, Reply reply) {
    Member boardWriter = board.getMember();
    Member replyWriter = reply.getMember();

    if(reply.getParent() != null){
        eventPublisher.publishEvent(new NotificationEvent(
                reply.getParent().getMember().getEmail(),
                board.getId(),
                reply.getId(),
                NotificationMessage.NESTED_REPLY_ADD.getMessage(),
                NotificationType.NESTED_REPLY_ADD));
    }

    if (!boardWriter.getEmail().equals(replyWriter.getEmail())) {
        eventPublisher.publishEvent(new NotificationEvent(
                boardWriter.getEmail(),
                board.getId(),
                reply.getId(),
                NotificationMessage.REPLY_ADD.getMessage(),
                NotificationType.REPLY_ADD));
    }
@Async
@EventListener
public void publishNotificationEvent(NotificationEvent event){
    try {
        String channelEvent = objectMapper.writeValueAsString(event);
        redisOperations.convertAndSend(notificationTopic.getTopic(), channelEvent);
    } catch (JsonProcessingException e) {
        log.error(e.getMessage(), e);
        throw new InvalidRedisMessageException(INVALID_REDIS_MESSAGE);
    }
}
public void handleNotificationEvent(String message) {
    NotificationEvent event = convertToObject(message, NotificationEvent.class);

    Member member = memberService.findByEmail(event.getClientEmail());

    Notification notification = Notification.from(member, event);
    if (notification.getNotificationType().getIsNeededSave()) {
        notificationService.save(notification);
    }

    List<SseEmitter> emitters = sseEmitterService.findAllStartWithByMemberEmail(member.getEmail());
    if (!emitters.isEmpty()) {
        String response = convertToJson(NotificationResponse.from(notification));
        emitters.forEach(emitter -> sseEmitterService.sendToClient(
                emitter,
                sseEmitterService.generateUniqueClientId(member.getEmail(), LocalDateTime.now()),
                NOTIFICATION_EVENT_NAME,
                response)
        );
    }
}

 

만약 SSE 연결이 끊겨있는 상황에서는 알람을 수신하지 못하기 때문에  알람을 RDB 에 저장해놓고 유저가 다시 SSE 연결을 맺었을 때 받지 못했던 알람을 수신할 수 있게 하였다. 구조를 개선하기 전 알람 기능의 문제는 결국 stateful 하게 설계되었기 때문에 발생하는 문제였고 stateless 하게 변경하면서 문제를 개선할 수 있었다.

결론은 알람이 이벤트가 어디에서 발생이 되어도 분산되어 있는 서버 중 처리할 수 있는 서버에서 알람을 처리할 수 있게 되었다.

사전 설정

build.gradle 의존성 추가

implementation 'org.springframework.boot:spring-boot-starter-aop'
implementation 'org.springframework.boot:spring-boot-starter-actuator'

application-dev.yaml 설정 추가

server:
  tomcat:
    mbeanregistry:
      enabled: true


 management:
     endpoints:
        health:
            show-details: always
        web:
            exposure:
                include: *

 

병목 지점 디버깅용 Aspect 추가 

Controller, Service , Repository Layer 각 함수 실행시간 로깅

@Aspect
@Component
@Slf4j
public class ExecutionTimeAspect {
    @Pointcut("execution(* com.eighttoten.service..*(..)) ||"
            + "execution(* com.eighttoten.controller..*(..)) ||"
            + "execution(* com.eighttoten.repository..*(..)))")
    public void businessLogicPointcut(){}

    @Around("businessLogicPointcut()")
    public Object loggingMethodExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable {
        long start = System.currentTimeMillis();
        Object result = joinPoint.proceed();

        long executionTime = System.currentTimeMillis() - start;
        log.info("Method name : {} , executed in {} ms", joinPoint.getSignature(), executionTime);
        return result;
    }
}

 

로그인 시나리오 (로컬에서 테스트 진행) - k6 사용

 

- 10개의 동시요청을 19번 반복 (총 190번의 요청)

- 로컬에서 진행하였기 때문에 네트워크 병목은 거의 없다.

- p(95) 를 기준으로 705ms 정도 나오는것을 확인할 수 있었다. 

 

우선 로그인 시나리오는 다음과 같다.

 

POST localhost:8080/login -> EmailPasswordAuthenticationFilter -> CustomAuthenticationProvider -> MemberDetailsService.loadUserByUsername(조회 쿼리) -> AuthSuccessHandler -> authService.findByEmail(조회 쿼리) -> authRepository.save(쓰기 쿼리)

 

병목지점 찾기 

- 병목 지점을 찾기위해서 ExecutionTimeAspect 로 로깅 + Intellij Profiler 를 활용하여 실행시간 추가 로깅 

 

로깅을 해본 결과 BCryptPasswordEncoder.matches 메서드에서 전체 실행시간의 94프로를 차지하고 있는 것을 확인할 수 있었다. 

 

어플리케이션 설계상 해당 부분은 남겨두고 먼저 해결할 수 있는 부분에 집중하기로 했다.

 

해당 부분 병목에 비하면 작은 수준이지만 문제가 있는 부분은 refreshToken 저장을 위한 조회와 저장쿼리를 날린다는 것인데 즉 
유저 n명당 auth테이블에 접근하는 쿼리가 2개 발생하게 된다. 즉 n+1 문제가 발생하게 된다. 

 

 

이 부분을 해결하기 위해 고려한 방식은 위 테이블 구조처럼 물리적으로는 분리되어 있는 테이블 두개를 조인해서 Auth,Member를 필드로 갖는 Dto를 하나 만들어서 두개를 동시에 조회해서 사용하면 어떨까 라고 생각했는데 애초에 인증절차를 진행하는 부분과 , 인증 테이블에 인증객체를 넣어주는 방식이 분리되어 작동 하기 때문에 같은 트랜잭션에 소속될 수 없고 결국 조회쿼리를 한번 더 날려야하는 구조였기에 맞지 않다고 생각을 했다.

 

RefreshToken 자체가 유저가 로그인 할 때마다 매번 갱신되는 데이터이기도 하고 재인증 과정에서 자주 사용되는 데이터라는 점, saveOrUpdate시 조회,저장쿼리 모두 날리는 상황에서 하나의 쿼리로 처리할 수 있다는 부분에서 Auth 정보를 메모리에 저장하여 사용하는 방식을 택했다. 레디스 사용을 위한 구조 개선에 대한 코드는 본 포스트에서는 다루지 않겠다. 

 

Redis 적용전

 

 

Redis 적용후

Redis 적용전 (rps)

730ms (10개 동시요청 19번 반복 )

 

Redis 적용후

authService.save : 420ms (10개 동시요청 19번 반복)

 

위 케이스에서는 유저 1000명에 대해서 테스트를 진행했다. 드라마틱한 개선효과는 아니지만 아주 .. 약간의 개선은 이루어 졌지만 미미하다고 생각이 든다. 하지만 유저테이블에 인덱스가 걸려있지 않은 상황이고 유저의 수가 10만 -> 100만 혹은 더 많은 유저가 존재한다고 생각한다면 유저테이블을 풀스캔해야하고 선형적으로 조회 시간이 증가할 것이고 더 큰 개선효과가 생기지 않을까 생각해본다.

 

소규모 어플리케이션의 유저수를 기준으로 봤을 때는 성능 개선보다는 세션 저장을 위한 구조적인 이점을 가져수 있었다는 점이 더 크지 않을까 

 

실질적으로 문제를 해결하기 위해서는 bcrypt의 matches  연산이 cpu 집약적인 연산이기 때문에 cpu 코어를 늘려 가용가능한 쓰레드 수를 늘리거나 보다 실용적인 방법으로는 서버를 scale-out 하는 방법이 있을 것 같다. 

 

+ Recent posts