Javascript 기반의 Socket Server 를 Java 기반으로 새로 만들어야 했던 일이 있었다.
기존 NestJS와 + SocketIO 기반으로 만들어진 Socket Server 가 있었는데, 어떤 어려움이 있었고 이를 어떻게 해결하였는지에 관한 기록이다.
문제 인식
기존에 JavaScript 기반으로 만들어진 서버를 언어까지 바꿔가면서 통으로 리팩토링 했던 이유는 크게 두가지였다.
- Javascript 미숙 : 현재 나는 Javascript 를 작은 토이 프로젝트에서나 사용해봤지, 실무에서 직접 사용한 경험이 없어 절대적으로 숙련도가 부족한 상황이었다. 계속해서 유지보수 해야 하는데 유지보수하는데 자신이 많이 없었다.
- Socket Server 라는 Application 의 모호성 : 개발 당시에는 Socket 통신을 할 여지가 많지 않아, 아주 큰 특정 Json return을 나눠서 쪼개 보내기 위한 용도로만 Socket Server 를 사용했다. 그런데, 추가적인 요구사항을 맞추다 보니, 다른 Application 에서도 Socket 통신을 해야할 여지가 생겨나게 되었는데 그렇게 만들고 나니 Socket Server 라는 도메인이 너무나도 모호했다.
다른 Application 에서 Socket 을 이용해서 통신하고 싶으면, Kafka를 통해 Message 를 전송하고 해당 Message 를 Socket Server 가 구독하여 클라이언트에게 전달하는 구조가 되어버렸다.
이러한 상황은 네트워크 비용을 상승시키는 구조라고 생각했고, 그 과정에서 Socket Server 가 아닌 명확한 도메인을 가진 Application Server 로 리팩토링하기로 결정했다. 그렇게 결정하고 나니, 단지 SocketIO 를 쉽고 빠르게 사용할 수 있는 예제가 많아서 유일하게 JavaScript로 개발된 Application 을 고집할 이유가 없어졌고, 이미 여러가지 모듈이 잘 만들어진 Java와 Spring 기반으로 리팩토링해서 다른 Application 에도 쉽게 적용할 수 있도록 만드는것이 나은 선택이라고 생각했다.
또한 SocketIO 의 Text body에 특정한 규격을 만들기 위해 이런 저런 정보들을 추가하면서, 데이터 파싱 및 가공하는데 너무 반복하는 일이 많다고도 느껴졌다.
이런 문제들을 겪다가, 리팩토링 하는 기간이 있을 때 Javascript 로 만들어진 Socket Server 를 완전히 Java + Spring Framework로 구현된 프로젝트로 이관하게 되었다.
STOMP
들어가기 이전에, STOMP 에 대해서 먼저 알아보자.
STOMP (Simple/Stream Text Oriented Message Protocol) 는 TCP 나 WebSocket 같은 신뢰성 있는 양방향 연결 위에서 작동하는 Text( 왔다갔다 할 수 있는 이 데이터를 Frame 이라고 한다 ) 기반( 혹은 Binary )의 Pub – Sub 방식의 메시지 프로토콜을 말한다.
즉 메세징 전송을 효율적을 하기 위해 탄생한 프로토콜이고, 기본적으로 pub/sub 구조로 되어있어 메세지를 전송하고 메세지를 받아 처리하는 부분이 확실히 정해져 있기 때문에 개발자 입장에서 명확하게 인지하고 개발할 수 있는 이점이 있다. Stomp 프로토콜은 WebSocket 위에서 동작하는 프로토콜로써 클라이언트와 서버가 전송할 메세지의 유형, 형식, 내용들을 정의하는 매커니즘이다.
또한, STOMP를 이용하면 메세지의 헤더에 값을 줄 수 있어 헤더 값을 기반으로 통신 시 인증 처리를 구현하는 것도 가능하며 프로토콜이 정의한 규칙만 잘 지키면 여러 언어 및 플랫폼 간 메세지를 상호 운영할 수 있다.
STOMP Frame 또한 Text, Binary를 변환하는 약속이기 때문에 자세한 명세가 있다.
Stomp Frame 가 어떻게 생겼는지 살펴보자.
COMMAND
header1:value1
header2:value2
Body^@
여기에서 COMMAND 는 클라이언트가 STOMP 프로토콜을 이용해 보낼 때, 나타낼 수 있는 메시지의 의도를 나타낸다.
어떤 Command 들이 있는지 링크의 글중 일부를 발췌해서 보자.
전체 14가지( WebSocket 연결 2가지, Client요청 9가지, Server의 응답 3가지 )의 Command 에 대해 어떻게 작동할 지에 대해 정의하면 된다.
클라이언트의 측에서 연결 2가지를 포함한 11가지 요청 외에 다른 요청이 들어오면, 서버는 모두 Error 로 응답하면 됨을 알 수 있다.
Spring Web Socket
Spring Websocket 은 STOMP 프로토콜을 지원하는 WebSocket 을 쉽게 컨트롤 할 수 있게 도와준다. 코드로 직접 한번 보자.
dependencies{
//org.springframework.kafka:spring-kafka 를 포함하고 있다.
implementation project(":kafka")// kafka 관련 Module Project 이다. Producer, Listener.. 등등이 포함된다.
implementation 'org.springframework.boot:spring-boot-starter-websocket'
}
@Component
public class StompHandler implements ChannelInterceptor {
...
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor headerAccessor = this.wrapToStompHeaderAccessor(message);
String authorizationHeader = String.valueOf(headerAccessor.getNativeHeader("Authorization"));
StompCommand stompCommand = this.parseStompCommand(accessor);
this.validator.validate(authorizationHeader, stompCommand);
...
return message;
}
@EventListener
public void brokerAvailabilityEvent(BrokerAvailabilityEvent event) {
...
}
@EventListener
public void sessionConnectEvent(SessionConnectEvent event){
StompHeaderAccessor stompHeaderAccessor = this.wrapToStompHeaderAccessor(event);
//SimpHeaderAccessor simpleHeaderAccessor = this.wrapToSimpleHeaderAccessor(event);
...
}
@EventListener
public void sessionConnectedEvent(SessionConnectedEvent event) {
...
}
@EventListener
public void sessionSubscribeEvent(SessionSubscribeEvent event) {
...
}
@EventListener
public void SessionUnsubscribeEvent(SessionUnsubscribeEvent event) {
...
}
@EventListener
public void sessionDisconnectEvent(SessionDisconnectEvent event) {
...
}
private StompHeaderAccessor wrapToStompHeaderAccessor(AbstractSubProtocolEvent event){
return this.wrapToStompHeaderAccessor(event.getMessage());
}
private StompHeaderAccessor wrapToStompHeaderAccessor(Message<?> message){
return StompHeaderAccessor.wrap(message);
}
private SimpMessageHeaderAccessor wrapToSimpleHeaderAccessor(AbstractSubProtocolEvent event){
return SimpMessageHeaderAccessor.wrap(event.getMessage());
}
private StompCommand parseStompCommand(StompHeaderAccessor accessor){
return accessor.getCommand();
}
}
Spring Websocket 을 이용하면 ChannelInterceptor 라는 인터페이스를 상속받아서 웹소켓 연결이 성립될 때와 끊길 때 추가적인 작업을 반복적인 코드 작성 없이 손쉽게 만들 수 있다. 또한 Spring의 ApplicationListener 를 이용하면 손쉽게 WebSocket과 관련된 이벤트를 처리할 수 있다. 어떤 이벤트를 처리할 수 있는지 살펴보자.
- BrokerAvailabilityEvent : Message Broker 의 상태( 사용 가능, 사용 불가능 )가 변경될 때 이벤트가 발생된다. 브로커가 Available -> UnAvailable 로 변환되는 경우도, UnAvailable -> Available 로 변환되는 두가지 경우 모두 해당된다.
- SessionConnectEvent : 새 STOMP CONNECT가 수신되면 게시되어 새 Client의 시작을 나타낸다.
- SessionConnectedEvent : SessionConnectEvent 직후에 브로커가 CONNECT에 응답하여 STOMP CONNECTED 프레임을 보낸 경우에 게시된다. 이 이벤트를 받은 시점에 STOMP 세션은 완전히 설정된 것으로 간주할 수 있다.
- SessionSubscribeEvent : Stomp SUBSCRIBE 이벤트가 수신되면 게시된다.
- SessionUnSubscribeEvent : Stomp UNSUBSCRIBE 이벤트가 수신되면 게시된다.
- SessionDisconnectedEvent : Stomp DISCONNECT 이벤트가 수신되면 게시된다.
우선 “BrokerAvailabilityEvent” 를 제외한 5가지 이벤트들은 AbstractSubProtocolEvent 라는 추상클래스를 상속받아 만드는 이벤트들이다.
그리고 이 AbstractSubProtocol 은 Spring에서 Event 를 다루기 위한 기본이 되는 ApplicationEvent 를 상속받아서 만들어진다.
왜 이렇게 나뉘어져 있을까?
그 이유는 Broker 가 의도치 않게 브로커가 종료되는 경우는 DISCONNECT Command 를 이용해 정상적으로 종료되지 않게 되고, 이는 비정상적 종료에 해당한다. 때문에 STOMP Message 자체가 없을 수 있다는 것이다.
//org.springframework.web.socket.messaging.AbstractSubProtocolEvent.java
public abstract class AbstractSubProtocolEvent extends ApplicationEvent {
private final Message<byte[]> message; //Message 라는 Websocket 관련 도메인 정보가 포함되어 있다.
@Nullable
private final Principal user;
protected AbstractSubProtocolEvent(Object source, Message<byte[]> message) {
this(source, message, (Principal)null);
}
protected AbstractSubProtocolEvent(Object source, Message<byte[]> message, @Nullable Principal user) {
super(source);
Assert.notNull(message, "Message must not be null");
this.message = message;
this.user = user;
}
public Message<byte[]> getMessage() {
return this.message;
}
@Nullable
public Principal getUser() {
return this.user;
}
public String toString() {
String var10000 = this.getClass().getSimpleName();
return var10000 + "[" + this.message + "]";
}
}
여기서 보면 알겠지만, BrokerAvailability 를 제외한 Spring 에서 지원하는 나머지 5가지는 모두 대응되는 Command 가 있고, 그 말은 즉 Body 와 Header 를 가지고 있는 데이터를 말한다.
그 때문에, 의도치 않게 종료되었거나, 다시 연결이 돌아왔을 때 발생되는 BrokerAvailabilityEvent 는 message 라는 STOMP, Websocket 관련 도메인 정보( Frame )가 없기 때문에 Spring Websocket 에서 ApplicationEvent 를 확장시킨 AbstractSubProtocol 이 아닌 Spring Core 의 ApplicationEvent 를 확장해 만들어진 것이다.
이제 Message의 Frame 을 파싱하는 과정에서 이 Message 가 STOMP 프로토콜을 따른다면 StompHeaderAccessor 를 이용해서 Command 와 각종 정보들을 파싱하면 되고, STOMP 가 아닌, 다른 General 한 형식을 따른다면 SimpHeaderAccessor 를 통해 헤더 정보를 파싱한다.
추가적으로 StompCommand 는 다음과 같다.
public enum StompCommand {
STOMP(SimpMessageType.CONNECT),
CONNECT(SimpMessageType.CONNECT),
DISCONNECT(SimpMessageType.DISCONNECT),
SUBSCRIBE(SimpMessageType.SUBSCRIBE, true, true, false),
UNSUBSCRIBE(SimpMessageType.UNSUBSCRIBE, false, true, false),
SEND(SimpMessageType.MESSAGE, true, false, true),
ACK(SimpMessageType.OTHER),
NACK(SimpMessageType.OTHER),
BEGIN(SimpMessageType.OTHER),
COMMIT(SimpMessageType.OTHER),
ABORT(SimpMessageType.OTHER),
CONNECTED(SimpMessageType.OTHER),
RECEIPT(SimpMessageType.OTHER),
MESSAGE(SimpMessageType.MESSAGE, true, true, true),
ERROR(SimpMessageType.OTHER, false, false, true);
private final SimpMessageType messageType;
private final boolean destination;
private final boolean subscriptionId;
private final boolean body;
...
}
public enum SimpMessageType {
CONNECT,
CONNECT_ACK,
MESSAGE,
SUBSCRIBE,
UNSUBSCRIBE,
HEARTBEAT,
DISCONNECT,
DISCONNECT_ACK,
OTHER;
private SimpMessageType() {
}
}
Stomp 는 General 한 Message 인 Simple Message를 확장시켜 만들기 때문에, StompCommand 또한 정의되어 있는 SimpMessageType 으로부터 확장시켜 정의한다. ( 추가적으로 Spring 또한 STOMP 프로토콜의 명세에 따라 총 14가지 Command 를 정의하고 있음을 알 수 있다. )
그리고 presend( message, channel ) 함수에서 메시지의 헤더에 접근하고, STOMP Frame 의 Command를 확인하여 해당 명령어에 맞는 처리를 수행하면 된다.
- 여기서 추가적으로 HandShakeInterceptor 라는 인터페이스를 구현해서 ws로 통신 프로토콜을 업그레이드 하기 이전, Http Handshake 과정이 일어날 때 토큰을 검증하는 방법으로도 구현할 수 있다. 자세한 내용은 이 블로그 에 자세하게 설명되어 있다.
Servlet 기반에서 한번 Websocket 을 구성해보자. ( Webflux 를 사용한다면 Docs 를 참고하자. )
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Value("...")
private String SOCKET_ENDPOINT;
@Value("...")
private String APPLICATION_DESTINATION_PREFIX;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint(SOCKET_ENDPOINT)
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic","/queue");
// registry.enableStompBrokerRelay("/test")
// .setRelayHost("localhost")
// .setRelayPort(61613)
// .setClientLogin("admin")
// .setClientPasscode("password");
registry.setApplicationDestinationPrefixes(APPLICATION_DESTINATION_PREFIX);
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(512 * 1024);
registry.setSendTimeLimit(10 * 1000);
registry.setSendBufferSizeLimit(512 * 1024);
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(this.stompHandler);
}
}
실제 WebSocketMessageBroker 를 설정하는 부분이다.
클라이언트에서 서버로 오는 Inbound Channel 에 아까 만든 ChannelInterceptor 를 등록하고, Prefix 등을 설정해주면 된다.
여기서 registry.enableSimpleBroker 를 활성화시키면 InMemoryBroker 를 통해 메시지가 전달되게 된다. 만약, 이부분에서 사용하려는 외부 브로커가 STOMP 를 Native로 지원한다면 ( ex, ActiveMQ ) 여기서 enableSimpleBroker 가 아닌 enableStompBrokerRelay 를 이용해주면 된다.
만약 그렇지 않다면 Simple In-memory broker 를 사용하고 @MessageMapping 을 통해 매핑된 핸들러에서 외부 브로커에 메시지를 Producing 해주는 과정이 추가적으로 포함되어야 한다.
- 일반적인 경우는 SimpleMessageBroker 하나로 처리가 가능하나, 반드시 외부 브로커가 필요한 경우가 생길 수 있다. 예를들면 동일한 채팅방 역할을 하는 Application 이 Scaling 되면서 2개로 늘어났다고 생각해보자. 그러면 In-Memory Broker 가 각각 2개가 독립적으로 생겨나게 된다. 당연하게도 이 둘은 서로 독립적인 broker 를 사용하기 때문에 같은 Topic 을 구독하고 있는 Subscriber 모두에게 전달하지 못하는 경우가 생겨난다.
따라서, 이 문제를 해결하려면 두 어플리케이션 모두에게 데이터를 공유해줄 수 있는 별도의 외부 브로커가 필요하게 된다.
이제 메시지를 읽어들여보자. 이 예제는 Docs 를 참고했다.
@Controller
@RequiredArgsConstructor
public class GreetingController {
private final SimpMessagingTemplate template;
private final KafkaProducer kafkaProducer; // 외부 브로커, Kafka 를 이용한다.
private final AwesomeService service; // 비즈니스로직 처리.
@MessageMapping("/topic")
//@SendTo("/topic/greetings")
//@SendToUser("/topic/greetings")
//public String greet(StompMessageDto stompMessage) {
public void greet(StompMessageDto stompMessage) {
this.service.handleMessage(stompMessage);
//return stompMessage.textMessage; // 1)SendTo 어노테이션과 함께 사용할 경우.
//this.template.convertAndSend("/topic/greetings", stompMessage.textMessage); // 2) SendTo 없이 직접 전달.
this.kafkaProducer.produce("/topic/greetings", stompMessage);// 3) 외부 브로커에게 전달.
}
}
@RequiredArgsConstructor
public class KafkaConsumer{
private final SimpMessagingTemplate template;
...
@Value("...")
private String topic;
@KafkaListener( topics = topic, containerFactory = "concurrentKafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, Message> record){
String message = record.value().textMessage;
this.service.handleMessage(record.value());
this.template.convertAndSend(topic, message);
}
//Reactor로 만들면..
@KafkaListener( topics = "topic", containerFactory = "concurrentKafkaListenerContainerFactory")
public Mono<Void> consume(ConsumerRecord<String, Message> record){
return Mono.just(record.value())
.mapNotNull(message -> {
this.service.handleMessage(message);
this.template.convertAndSend("topic", message.textMessage);
return null;
});
}
}
@MessageMapping 어노테이션을 통해 발행된 메시지를 읽어들이고, @Send( Topic 에 Publish, 1:N ) 와 @SendToUser ( 특정 유저에게 전달, 1:1 ) 를 이용해서 메시지를 전달할 수 있다. 혹은 단순하게 SimpMessagingTemplate 를 통해 convertAndSend 를 통해 데이터를 전송할수도 있다.
나의 경우에는, Message 를 SimpleMessageBroker 가 아닌 외부 브로커를 통해 전달하기 때문에, Kafka 를 이용해서 메시지를 전달해주었고, 해당 Topic 을 Kafka로 부터 구독하여 다시 자신의 로컬의 In-Memory Broker 를 통해 전달하여 문제를 해결했다.
마치며
Spring Framework 의 Web socket server 로 이관하면서 얻은 이점이 몇가지 있다.
- Kafka 설정 : 이미 다른 Application에서 사용중인 Kafka 관련 모듈을 그대로 import 하여 손쉽게 사용할 수 있었다.
- Interceptor 적용 : ChannelHandler를 통해 소켓이 연결될 때, 연결이 끊어질 때 Interceptor 를 통해 추가적인 작업을 반복적인 코드 작성 없이 쉽게 할 수 있었다.
그리고 STOMP 프로토콜, WebSocket, 외부 브로커의 관계에 대해 좀 더 이해하게 된 것 같다.