본문 바로가기
spring & java

[Spring] SSE + Redis pub/sub

by do5do 2024. 1. 18.

이번에 개인프로젝트를 진행하면서 실시간 알림을 구현했는데, 해당 내용에 대해 정리해보고자 한다.

 

먼저 SSE에 대해 이해를 해야한다.

 

SSE

전통적인 방식

웹 브라우저에서 전통적으로 변경된 데이터를 얻기 위해서는 클라이언트가 서버에 요청을 해야한다. 대표적인 방식이 Polling이다. Polling은 일정한 주기를 가지고 서버에 요청/응답을 하는 방식으로 서버에 변경 사항이 있는지는 클라이언트의 요청을 통해서 알 수 있다. 이는 클라이언트가 요청을 보냈을 때 변경 사항이 없다면 리소스 낭비가 되고, 짧은 주기로 지속적으로 요청하는 것은 서버에 부담이 될 수 있다. 

 

Polling을 개선한 방식이 Long-polling인데, 이는 클라이언트가 요청을 보냈을 때 서버측에서 연결을 열어두는 시간을 길게 가진다. 그 사이에 이벤트가 발생하면 클라이언트에게 응답을 보내고, 곧바로 클라이언트는 다시 요청을 보내고 기다린다.

 

Long-polling은 polling에 비해 서버의 부담이 덜어지긴 하지만, 이벤트 발생의 주기가 짧다면 Polling과 별차이가 없다. 또한 다수의 클라이언트가 요청을 하고 이벤트가 많다면 서버의 부담이 급격하게 증가하게 된다.

 

Streaming

클라이언트와 서버간에 연결된 통로로 데이터를 보내는 방식이다. 서버에서 클라이언트로 이벤트를 전송할 때 연결을 끊지 않고 메세지만 보내는 것을 반복한다. SSE(Server-Sent Events)에서 사용되는 기술이 해당 방식이다.

 

요약하자면, SSE는 클라이언트가 서버와 한번 연결을 맺고나면 일정 시간 동안 서버에 변경이 발생할 때마다 데이터를 전송 받는 방식이다. 응답마다 다시 요청해야하는 Long-polling 방식보다 효율적이다. 다만, 클라이언트는 서버에서 오는 데이터를 받기만 하는 단방향 연결이다. (소켓과의 차이)

 

소켓은 HTTP가 아닌 양방향 통신을 위한 프로토콜을 사용해야하지만, SSE는 소켓과 유사한 역할을 하면서 HTTP 통신이기에 동작 방식을 이해한다면 소켓보다 가볍고 비교적 적용하기 쉽다.

 

그럼 구현 전에 흐름부터 파악하자.

 

Flow

목표는 연결된 클라이언트에게 이벤트가 발생하면 실시간으로 데이터를 전송하는 것이다.

연결된 클라이언트라는 것은 현재 사이트에서 활동 중인 유저를 말한다. 이는 다시 말하면 로그인 중인 유저이다.

 

1. 로그인 직후 클라이언트는 서버에게 연결 요청을 한다.

2. 서버는 해당 클라이언트의 연결 정보를 식별할 수 있도록 해야한다. (유저에 따라 알림을 보내기 위함)

3. 해당 유저에게 이벤트 발생 시 이벤트 데이터를 전송한다.

 

이제 이 흐름대로 구현해주면 된다. 

 

구독 요청 

클라이언트는 서버에게 연결 요청을 하려면 SSE를 구독하는 API가 필요하다.

 

NotificationController

@RequestMapping("/notification")
@RequiredArgsConstructor
@RestController
public class NotificationController {

    private final NotificationService notificationService;
    
    // 응답 시 MIME 타입을 text/event-stream으로 보내야한다.
    @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> subscribe(
            @AuthenticationPrincipal UserDetails userDetails) {
        if (ObjectUtils.isEmpty(userDetails)) { // 이유가 있어서 예외처리 하지만 여기선 설명하지 않는다.
            throw new NotificationException(INVALID_REQUEST, "연결 시 인증이 필요합니다.");
        }
        return ResponseEntity.ok(notificationService.subscribe(userDetails.getUsername()));
    }
    ...
}

 

구독 요청에 대한 응답 시 MIME 타입을 text/event-stream으로 보내서 streaming이라는 것을 알려줌으로써 클라이언트는 서버와의 연결을 끊지 않는다. 

 

*참고로 produces는 반환하는 데이터 타입을 정의하고, consumes는 들어오는 데이터 타입을 정의한다. (클라이언트에서 구독 요청을 할 때의 mediaType도 text/event-stream이다.)

 

NotificationService

@Slf4j
@RequiredArgsConstructor
@Service
public class NotificationService {

    private final SseEmitterRepository sseEmitterRepository;

    @Value("${sse.timeout}")
    private Long timeout; // emitter 만료 시간 -> 1분으로 설정

    public SseEmitter subscribe(String memberKey) {
        SseEmitter sseEmitter = new SseEmitter(timeout); // emitter 생성
        sseEmitterRepository.save(memberKey, sseEmitter);
		
        // emitter handling
        sseEmitter.onTimeout(sseEmitter::complete);
        sseEmitter.onError((e) -> sseEmitter.complete());
        sseEmitter.onCompletion(() -> sseEmitterRepository.delete(memberKey));
		
        // dummy data 전송
        send(MsgFormat.SUBSCRIBE, memberKey, sseEmitter);
        return sseEmitter;
    }
    
    private void send(Object data, String emitterKey, SseEmitter sseEmitter) {
        try {
            log.info("send to client {}:[{}]", emitterKey, data);
            // 이벤트 데이터 전송
            sseEmitter.send(SseEmitter.event()
                    .id(emitterKey)
                    .data(data, MediaType.APPLICATION_JSON)); // data가 메시지만 포함된다면 타입을 지정해줄 필요는 없다.
        } catch (IOException | IllegalStateException e) {
            log.error("IOException | IllegalStateException is occurred. ", e);
            sseEmitterRepository.deleteById(emitterKey);
        }
    }
    
    ...
}

 

Spring에서는 SSE를 구현한 SseEmitter 객체를 제공해준다.

덕분에 간편하게 SSE 이벤트를 전송할 수 있다.

 

1. SseEmitter 생성

emitter를 생성할 때는 타임아웃을 설정해줄 수 있는데, 기본 값은 30초인 것으로 알고 있다.

 

2. Emitter handling

emitter가 timeout이 되었을 때, 에러가 났을 때 모두 complete 메서드를 실행한다. (SseEmitter의 내부 구현을 보면 콜백 메서드를 지정할 수 있다.)

 

onCompletion은 완료된 emitter에 대한 처리인데, emitter 생성 후 저장해줬던 값을 삭제한다. 

SseEmitter가 만료되면 클라이언트가 재연결 요청을 보내는데, 그때마다 새로운 emitter를 생성하게 되니 만료된 값은 삭제 처리를 하는 것이다. (emitter를 저장할 때 Map을 사용하고, 키가 동일하다면 굳이 삭제해줄 필요는 없겠지만 명시적으로는 괜찮다고 본다.)

 

3. dummy data 전송

SseEmitter 생성 이후 어떠한 데이터도 전송하지 않으면 타임아웃 되면서 503에러를 뱉는다고 한다.

구현 전 SSE에 대해 찾아보는 과정에서 주의 사항에 대해 미리 확인했었기 때문에 이 부분을 직접 겪진 않았다.

 

send() 메서드에서 예외처리하는 IllegalStateException은 만료된 emitter로 send() 메서드를 호출하면 발생하는 에러이다. (내부 코드에서 확인 가능) 에러 발생 시 해당 emitter를 지워준다.

 

(+) 추가로 이벤트를 전송할 때 .name("event-name")으로 이벤트 이름을 지정할 수 있고, 클라이언트에서는 지정된 이름의 알림을 받을 수 있다.

 

SseEmitterRepository

@Repository
public class SseEmitterRepository {
	
    // thread-safe한 자료구조를 사용한다.
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    public SseEmitter save(String eventId, SseEmitter sseEmitter) {
        emitters.put(eventId, sseEmitter);
        return sseEmitter;
    }

    public Optional<SseEmitter> findById(String memberId) {
        return Optional.ofNullable(emitters.get(memberId));
    }

    public void deleteById(String eventId) {
        emitters.remove(eventId);
    }
}

 

SseEmitter는 메모리에 저장해둔다. 

메모리에 저장해두는 이유는 빈번하게 추가/삭제가 일어나기도 하고, 영속성이 필요한 데이터가 아니기때문에 빠르고 쉽게 처리하기 위함이라고 생각했다.

 

참고)

처음 구현할 때, 메모리에 저장하는 방법은 다른 인스턴스에서는 접근하지 못하니 안 좋은 방법일 것 같아서 Redis(별도 서버로 뒀다고 가정)에 저장하려고 했었다. 하지만 SseEmitter는 serializer가 구현되어 있지 않아서(직렬화 x) 애초에 redis에 저장할 수 없었다.

-> 이 고민에 대해서는 Redis pub/sub으로 해결하였다. (밑에서 설명)

 

Thread-safe한 자료구조

여기서 짚고 가야할 부분은 ConcurrentHashMap을 사용했다는 점이다.

여러 스레드가 동시에 접근해도 안전한 자료구조인데, 이를 사용한 이유는 emitter를 핸들링하는 객체는 SseEmitter 자신이 아니라 그 위 부모인 ResponseBodyEmitter가 내부 필드로 가지고 있는 Handler이기 때문이다. 이 handler는 별개의 스레드로 동작하기 때문에 스레드에 안전한 자료구조를 사용해야 한다.

 

여기까지하면 구독 요청에 대한 부분은 끝났다.

이제 event가 발생했을 때 emitter로 알림을 전송해주면 된다.

 

알림 이벤트 전송

프로젝트에서 SSE 이벤트가 발생하는 시점은 펀딩이 종료되었을 때 성공/실패 여부를 펀딩한 유저들에게 알림으로 전송해줘야 했다. 

 

이벤트 처리는 Spring의 Event를 사용했다.

펀딩이 종료 되는 시점에 알림 이벤트를 발행하고, 해당 이벤트 리스너가 이벤트가 발행됐을 때 정의된 메서드를 실행한다.

 

NotificationEventPublisher

...
import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
public class NotificationEventPublisher {

    private final ApplicationEventPublisher eventPublisher; // 이벤트 발행을 처리하는 인터페이스

    public void publishEvent(NotificationEvent event) {
        eventPublisher.publishEvent(event); // publishEvent()로 이벤트를 발행한다.
    }
}

 

이벤트를 발행하는 객체이다.

 

NotificationEventHandler

...
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
public class NotificationEventHandler {

    private final NotificationService notificationService;

    @Async
    @EventListener // 이벤트 구독
    public void handleEvent(NotificationEvent event) {
        notificationService.sendNotification(event); // 알림 발송 메서드
    }
}

 

@EventListener로 이벤트를 구독할 수 있으며,

발행한 이벤트 타입(NotificationEvent)을 인자로 받는 리스너가 이벤트를 처리한다.

 

여기서는 알림을 받아야하는 펀딩한 유저가 다수이기 때문에 빠르게 발송하기 위해 비동기처리 하였다.  (Thread pool 설정은 해뒀었다.)

 

NotificationService

...

@Transactional
public void sendNotification(NotificationEvent event) {
    String key = event.memberKey();
    Member member = authenticationService.getMemberOrThrow(key);
    Notification notification =
            Notification.of(event.message(), event.notificationType(), event.relatedUri());
    notification.addMember(member);
    notificationRepository.save(notification);

    // 유저와 연결된 emitter를 찾는다.
    sseEmitterRepository.findById(key)
                .ifPresent(emitter -> 
                        send(NotificationDto.fromEntity(notification), key, emitter)); // 데이터 전송
}

 

유저에게 emitter로 알림을 전송하는 메서드다. 

유저와 연결된 emitter를 찾고, 해당 emitter로 클라이언트에게 데이터를 전송한다. (send() 메서드는 위에 구독 요청 부분에서 확인 가능)

 

이렇게 sse 알림 구현은 끝났다. (1차 완료)

 

알림 전송 테스트

브라우저에서 직접 테스트를 해보았다. 프론트를 만들기엔 여력도 능력도 부족해서 브라우저의 콘솔창을 이용해보았다.

SSE 이벤트 전송 테스트

테스트에서 사용한 구독 요청 api는 위에서 보여준 api 명세와는 다를 것이다. 기존의 api는 토큰을 받아야하는데, EventSource()는 헤더를 지원하지 않기때문에 테스트를 위해 임시로 명세를 바꿨다. (실제 구현 시 EventSource에서 헤더를 전달할 수 있도록 라이브러리를 사용한다고 한다.)

 

첫번째 로그는 SSE 구독 시 전송되는 더미 데이터이고, 두번째 로그인 테스트 메시지는 이벤트 발행 후 전송된 메세지이다.

잘 된다!

 

브라우저를 끄거나 sse.close();를 할 때까지 연결을 끊지 않고 계속 유지하는 것을 확인할 수 있었다.

 

추가 구현

지금까지 구현한 SSE 알림 발송 기능은 단일 서버에서는 문제가 없다. 하지만 다중 서버 환경에서는 아니다.

SSE 연결 정보를 각각의 인스턴스 메모리에 저장하기 때문에 서로 공유할 수 없어서 이벤트 발행 서버와 유저가 구독한 서버가 서로 다르다면 알림을 전송하지 못한다.

 

이러한 문제가 있기 때문에 다중 서버 환경에서는 보통 Redis의 pub/sub 기능으로 해결한다고 한다.

 

Redis pub/sub

먼저 Redis에서 제공하는 pub/sub 기능은 위에서 살펴봤던 이벤트 발행/구독과 개념은 비슷하다.

이벤트를 발행하는 Publisher가 특정 채널로 메시지를 발행하면, redis가 해당 채널을 구독하고 있는 Subscriber에게 메시지를 전송한다. 이때 메시지는 보관되지 않고 전송되면 끝이라는 특징이 있다. (받든 말든 상관없이 보내면 끝)

 

pub/sub에 대해 간단하게 알아봤고, 

그럼 이제 다중 서버 환경에서 SSE에 Redis의 pub/sub을 적용하면 어떤식으로 흘러가는지 보자.

 

다이어그램

SSE + Redis pub/sub diagram

 

위 다이어그램은 Redis pub/sub을 적용하면서 개인적인 이해를 바탕으로 그려본 것이다.

그러니 잘못된 부분이 있을 수도 있다.

 

참고) 여기서 Batch Operation은 이벤트 발생을 의미한다. 펀딩이 종료된 시점에 성공/실패 여부를 펀딩한 유저들에게 알림으로 전송하는데, 펀딩 종료 시점을 알기위해 배치를 사용했기 때문에 해당 워딩으로 하였다.

 

동작 설명

1. 유저가 로그인을 한다.

-> load balancer에 의해 1번 서버로 요청이 간다. (가정)

 

2. 유저(1234)는 1번 서버와 SSE 지속 연결을 한다.

-> SSE 연결 시 Subscriber가 유저 id를 채널명(channel:1234)으로 하여 구독한다. 

 

3. 3번 서버에서 지정된 시간에 배치가 동작한다. (가정)

-> 완료된 펀딩을 한 유저들에게 알림 이벤트가 발행된다. (유저(1234)가 알림을 받을 유저에 포함되어 있다고 가정)

-> 해당 유저들의 채널로 Publisher가 메시지를 발행한다.

-> Redis가 해당 채널을 구독한 Subscriber에게 메시지를 전달한다.

-> 구독한 Subscriber가 있는 서버에서 유저의 emitter를 찾아 클라이언트에게 데이터를 전송한다.

 

이와 같은 과정으로 위에서 구현한 sse 알림 처리 로직을 리팩터링 해보자.

먼저 각각의 역할을 구분해준다.

 

SseEmitterService

@Slf4j
@RequiredArgsConstructor
@Service
public class SseEmitterService {

    private final SseEmitterRepository sseEmitterRepository;

    @Value("${sse.timeout}")
    private Long timeout;

    public SseEmitter createEmitter(String emitterKey) {
        return sseEmitterRepository.save(emitterKey, new SseEmitter(timeout));
    }

    public void deleteEmitter(String emitterKey) {
        sseEmitterRepository.deleteById(emitterKey);
    }

    public void sendNotificationToClient(String emitterKey, NotificationDto notificationDto) {
        sseEmitterRepository.findById(emitterKey)
                .ifPresent(emitter -> send(notificationDto, emitterKey, emitter));
    }

    public void send(Object data, String emitterKey, SseEmitter sseEmitter) {
        try {
            log.info("send to client {}:[{}]", emitterKey, data);
            sseEmitter.send(SseEmitter.event()
                    .id(emitterKey)
                    .data(data, MediaType.APPLICATION_JSON));
        } catch (IOException | IllegalStateException e) {
            log.error("IOException | IllegalStateException is occurred. ", e);
            sseEmitterRepository.deleteById(emitterKey);
        }
    }
}

 

NotificationService에 있던 SseEmitter 관련 로직들을 따로 분리하여 SseEmitterService를 만들었다.

 

RedisMessageService

...
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;

@RequiredArgsConstructor
@Service
public class RedisMessageService {

    private final RedisMessageListenerContainer container;
    private final RedisSubscriber subscriber; // 따로 구현한 Subscriber
    private final RedisTemplate<String, Object> redisTemplate;

    // 채널 구독
    public void subscribe(String channel) {
        container.addMessageListener(subscriber, ChannelTopic.of(getChannelName(channel)));
    }

    // 이벤트 발행
    public void publish(String channel, NotificationDto notificationDto) {
        redisTemplate.convertAndSend(getChannelName(channel), notificationDto);
    }

    // 구독 삭제
    public void removeSubscribe(String channel) {
        container.removeMessageListener(subscriber, ChannelTopic.of(getChannelName(channel)));
    }

    private String getChannelName(String id) {
        return CHANNEL_PREFIX + id;
    }
}

 

Redis의 채널을 구독하고 이벤트를 발행하는 기능을 하는 RedisMessageService 객체를 따로 생성했다.

(메시지를 발행하는 Publisher는 로직이 단순해서 따로 분리하지 않고 RedisMessageService에 포함시켰다.)

 

RedisSubscriber

@Slf4j
@RequiredArgsConstructor
@Component
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final SseEmitterService sseEmitterService;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String channel = new String(message.getChannel())
                    .substring(CHANNEL_PREFIX.length());

            NotificationDto notificationDto = objectMapper.readValue(message.getBody(),
                    NotificationDto.class);

            // 클라이언트에게 event 데이터 전송
            sseEmitterService.sendNotificationToClient(channel, notificationDto);
        } catch (IOException e) {
            log.error("IOException is occurred. ", e);
        }
    }
}

 

채널을 구독하는 Subscriber 객체이다.

채널로 메시지가 발행되면 onMessage() 메서드가 실행된다. 여기서 해당 유저에게 SSE 이벤트를 전송하면 된다.

 

NotificationService

@Slf4j
@RequiredArgsConstructor
@Service
public class NotificationService {

    private final NotificationRepository notificationRepository;
    private final AuthenticationService authenticationService;
    private final SseEmitterService sseEmitterService;
    private final RedisMessageService redisMessageService;

    public SseEmitter subscribe(String memberKey) {
        SseEmitter sseEmitter = sseEmitterService.createEmitter(memberKey);
        sseEmitterService.send(MsgFormat.SUBSCRIBE, memberKey, sseEmitter); // send dummy

        redisMessageService.subscribe(memberKey); // redis 구독

        sseEmitter.onTimeout(sseEmitter::complete);
        sseEmitter.onError((e) -> sseEmitter.complete());
        sseEmitter.onCompletion(() -> {
            sseEmitterService.deleteEmitter(memberKey);
            redisMessageService.removeSubscribe(memberKey); // 구독한 채널 삭제
        });
        return sseEmitter;
    }

    @Transactional
    public void sendNotification(NotificationEvent event) {
        Member member = authenticationService.getMemberOrThrow(event.memberKey());
        Notification notification =
                Notification.of(event.message(), event.notificationType(), event.relatedUri());
        notification.addMember(member);
        notificationRepository.save(notification);

        // redis 이벤트 발행
        redisMessageService.publish(event.memberKey(), NotificationDto.fromEntity(notification));
    }

    ...
}

 

마지막으로 수정된 NotificationService이다. 

redis의 채널 구독 및 삭제는 emitter의 생명 주기와 같게 처리한다. 여기서 redis는 emitter의 이벤트 발송을 도와주는 역할이기 때문에 emitter가 기준이어야 한다고 생각했다.

 

여기까지 하면 리팩터링도 끝난다.

NotificationController의 SSE 구독 메서드는 이미 리팩터링 된 코드였다. 전,후가 크게 차이가 없기 때문이다.

 

정리 및 회고

긴 정리였다. 구현하기 전에는 엄청 헤맸다. 사실 알림처리 쉽게 보고 이틀이면 끝날줄 알았다.. 그런데... 추가 구현까지 총 5일은 걸린 것 같다. 막상하니 생각해야 할 부분도 많았고, 처음에 동작 방식에 대해 이해한다고 시간이 걸렸던 것 같다. 이벤트 구독, 발행에 대한 개념을 알고 나니 Redis pub/sub을 이해하는데 도움이 많이 됐던 것 같다.