SSE를 사용할 때 단일서버일 경우에는 문제가 없지만 서버를 Scale-out 할 때 문제가 발생한다.
그 이유는 위의 그림과 같이 사용자 A와 B의 접속 정보 즉 SseEmitter가 서버 메모리에 저장되어 있기 때문이다.
만약 사용자 A는 was1에 접속했고 사용자 B는 was2에 접속했다면 사용자 A가 B에게 메시지를 작성했다면 실시간 알림을 보낼 수 없다.
그렇다면 이 문제를 어찌 해결해야 할까?
고민을 하다가 찾은 방법은 Redis pub/sub이다.
레디스는 위 그림과 같이 Pub/Sub 기능을 제공한다. 특정 채널을 구독하는 subscriber 들에게 메시지를 브로드캐스트 하는 방법이다.
이를 이용해 어떤 애플리케이션 서버에서 새로운 알림을 발송해야 한다면 그 알림을 다른 모든 애플리케이션 서버에 브로드캐스트 하는 방식으로 알림을 발송한다면 어떨까?
이 구조에서는 scale-out된 모든 애플리케이션 서버가 Publisher이자 Consumer가 된다.
그리고 한 서버의 알림이 발생하면 그 사실을 다른 모든 애플리케이션 서버에 브로드캐스트 하는 방식이다. 브로드캐스트할 때 새로운 알림을 메시지로 전달한다.
내편 서비스에서는 중복로그인이 3개까지 허용가능했다. 핸드폰, 노트북, 데스크탑 등등 동시에 로그인한 유저에게 동일한 실시간 알림을 발송해야 했기때문에 Redis Pub/Sub이 딱 알맞은 대안이라고 생각했다.
그렇다면 메시지 큐 서비스 같은것은 사용 못하는 건가?
Pub/Sub과 메시지 큐의 Producer/Consumer의 차이가 존재한다.Pub/Sub 모델은 특정 채널에 publisher가 메시지를 발행(publish)하면 그 채널을 구독하고 있는 모든 subscriber에게 메시지를 브로드캐스트한다.
다시 말하자면 모든 구독자가 메시지를 수신한다는 의미이다.
반면, Producer/Consumer 모델의 경우 producer가 한번 발행한 메시지(이벤트)를 가장 먼저 consume한 consumer만 해당 메시지를 읽을 수 있다.
즉, 작업이 한번만 실행되도록 하고싶을 때 사용한다는 의미이다.
이제 코드를 본격적으로 살펴보자!
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
/** RedisConnectionFactory를 통해 내장 혹은 외부의 Redis를 연결한다.
* Lettuce를 사용했다. 가장 많이 사용되는 라이브러로 Spring Data Redis에
* 내장되어있다.
*/
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
/**
* RedisOperations은 RedisTemplate의 인터페이스이다. 인터페이스로 정의한 이유는
* 테스트 코드에서 테스트하기 편하게 하기 위해서이다.
* redisOperations를 통해 RedisConnection에서 넘겨준 byte 값을 객체 직렬화한다.
* Redis와 통신할때 사용
*/
@Bean
public RedisOperations<String, NotificationResponseDto> eventRedisOperations(
RedisConnectionFactory redisConnectionFactory, ObjectMapper objectMapper) {
final Jackson2JsonRedisSerializer<NotificationResponseDto> jsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
NotificationResponseDto.class);
jsonRedisSerializer.setObjectMapper(objectMapper);
final RedisTemplate<String, NotificationResponseDto> eventRedisTemplate = new RedisTemplate<>();
eventRedisTemplate.setConnectionFactory(redisConnectionFactory);
eventRedisTemplate.setKeySerializer(RedisSerializer.string());
eventRedisTemplate.setValueSerializer(jsonRedisSerializer);
eventRedisTemplate.setHashKeySerializer(RedisSerializer.string());
eventRedisTemplate.setHashValueSerializer(jsonRedisSerializer);
return eventRedisTemplate;
}
/**
* RedisMessageListenerContainer는 Spring Data Redis에서 제공하는 클래스이다.
* 컨테이너는 메시지가 도착하면 등록된 MessageListener를 호출하여 메시지를 처리한다.
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
final RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
}
위의 코드는 주석으로 조금씩 설명을 달아 놓았지만 간단히 말하자면 Redis를 사용하기 위해 설정들을 작업한 것이라고 보면된다.
@Service
@Slf4j
@RequiredArgsConstructor
@Transactional
public class NotificationService {
private final RedisOperations<String, NotificationResponseDto> eventRedisOperations;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final ObjectMapper objectMapper;
private static final long DEFAULT_TIMEOUT = 10L * 1000 * 60;
private static final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
//실질적으로 알림을 저장하고 redis 채널에 메시지를 publish하는 역할
public void send(final CreateMessageEvent createMessageEvent) {
//...
notificationRepository.save(notification);
final String id = String.valueOf(notification.getMemberId());
eventRedisOperations.convertAndSend(getChannelName(id), notification);
}
public SseEmitter subscribe(final Long memberId) throws IOException {
final String id = String.valueOf(memberId);
final SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
//초반 연결용 메시지!!
emitter.send(SseEmitter.event()
.id(id)
.name("sse"));
emitters.add(emitter);
// MessageListener 익명함수 사용해서 onMessage 구현, Redis에서 새로운 알림이 발생하면 자동적으로 onMessage가 호출
// 즉 알림을 serialize하고 해당 Client에게 알림을 전송한다.
final MessageListener messageListener = (message, pattern) -> {
final NotificationResponseDto notificationResponse = serialize(message);
sendToClient(emitter, id, notificationResponse);
};
// redisMeesageListenerContainer에 새로운 MessageListener를 추가함
redisMessageListenerContainer.addMessageListener(messageListener, ChannelTopic.of(getChannelName(id)));
// emitter의 상태를 체크함, 완료되었는지 타임아웃이 났는지
checkEmitterStatus(emitter, messageListener);
return emitter;
}
private NotificationResponseDto serialize(final Message message) {
try {
final Notification notification = this.objectMapper.readValue(message.getBody(), Notification.class);
return NotificationResponseDto.from(notification);
} catch (IOException e) {
throw new InvalidRedisMessageException(message);
}
}
// 클라이언트에게 메시지를 전달하는 부분
private void sendToClient(final SseEmitter emitter, final String id, final Object data) {
try {
emitter.send(SseEmitter.event()
.id(id)
.name("sse")
.data(data));
} catch (IOException e) {
emitters.remove(emitter);
log.error("SSE 연결이 올바르지 않습니다. 해당 memberId={}", id);
}
}
private void checkEmitterStatus(final SseEmitter emitter, final MessageListener messageListener) {
emitter.onCompletion(() -> {
emitters.remove(emitter);
redisMessageListenerContainer.removeMessageListener(messageListener);
});
emitter.onTimeout(() -> {
emitters.remove(emitter);
redisMessageListenerContainer.removeMessageListener(messageListener);
});
}
private String getChannelName(final String memberId) {
return "topics:" + memberId;
}
}
먼저 subscribe한 부분을 살펴보면 subscribe대상들의 MessageListenere들을 redisMessageListenerContainer에 넣고 관리한다.
redis에 새로운 알림이 발생하면 구독한 모든 대상에게 onMessage가 호출되는거다. 따라서 아래의 그림에 나타난 3,4번 과정이 실행되는 것이다.
또 위의 코드에서 살펴보면 send메서드에서 @Transactional(propagation = Propagation.REQUIRES_NEW)를 사용한걸 알 수 있다.
따로 트랜잭션 전파레벨을 설정한 이유는 알림이 전송이 실패했다고 기존에 작성한 메시지나 롤링페이퍼가 롤백되면 안되기 때문이다.
실시간 알림 발송은 실패해도 비즈니스 로직에는 크게 상관이 없다. 이건 어디까지나 사용자의 재미와 편의를 위한 기능일 뿐 더욱더 중요한 기능은 롤링페이퍼 작성과 메시지 작성이다.
그래서 실시간 알림 발송이 실패하더라도 기존 롤링페이퍼나 메시지 작성이 롤백되지않게 전파레벨을 REQUIRES_NEW로 설정했다.
REQUIRES_NEW는 진행중인 트랜잭션이 있다면 해당 트랜잭션을 보류하고 새로운 트랜잭션을 생성하는 것이다. 따라서 진행중인 트랜잭션에 영향이 가지 않는다.
또한, @TransactionalEventListener를 사용했는데 이것을 알기 전에 Event를 알아야 한다.
Spring Event는 Event Publisher와 Event Subscriber로 나뉜다. Event Publisher는 말 그대로 특정 이벤트를 발생시키는 역할을 수행하며, Event Subscriber는 이벤트를 처리하는 역할을 수행한다. TransactionalEventListener는 해당 트랜잭션이 Commit된 이후에 리스너가 동작하게 된다.
그렇다면 Event를 실행해서 얻는 이점은 무엇일까?
그것은 시스템의 강결합을 분리해준다.
즉 느슨한 결합상태를 만들어준다. 아래의 코드를 보면서 이야기하자.
@Service
@RequiredArgsConstructor
@Transactional
public class MessageService {
private final NotificationService notificationService;
public Long saveMessage(final MessageRequestDto messageRequestDto, final Long rollingpaperId, final Long authorId) {
//1.메시지 작성
Message message = new Message();
//2. 메시지 저장
messageRepository.save(message);
//3. 알림전송
notificationService.send();
}
}
기존의 메시지를 작성하는 부분은 메시지를 작성하고 메시지를 저장한 후 알림을 전송하는 방식이였다.
MessageService가 NotificationService를 참조하는 꼴이다. 두 서비스 사이의 강한 의존성이 생기는거다. 하지만 Spring Event를 사용한다면 두 클래스간 강결합을 끊어내고 느슨한 결합으로 만들 수 있다.
event publisher는 ApplicationEventPublisher 빈을 주입하여 publishEvent() 메서드를 통해 생성된 이벤트 객체를 넣어주면 된다.
그러면 Listener가 달린 메소드 중에 해당 이벤트 객체를 사용하는 것과 매칭이 된다. 아래의 코드를 보면 이해가 쉬울 것이다.
@Service
@RequiredArgsConstructor
@Transactional
public class MessageService {
private final ApplicationEventPublisher applicationEventPublisher;
public Long saveMessage(final MessageRequestDto messageRequestDto, final Long rollingpaperId, final Long authorId) {
//1.메시지 작성
Message message = new Message();
//2. 메시지 저장
messageRepository.save(message);
//3. 이벤트 발생
applicationEventPublisher.publishEvent(new CreateMessageEvent());
}
}
코드도 많고 설명할 부분이 많아서 설명이 길어진 것 같다.
SSE사용할 때 scale-out시 문제가 되는 부분을 해결하기 위해 Redis Pub/Sub에 대해 설명했다.
해당 코드는 여기에서 찾아 볼 수 있다.
마지막으로 아래는 실제 실시간 알림이 오는 화면이다.
테스트코드 성능 개선하기! (0) | 2023.03.20 |
---|---|
Certbot과 nginx로 https 적용하기! (0) | 2023.02.03 |