Springboot 에서 WebSocket 통신하기

Javascript 기반의 Socket Server 를 Java 기반으로 새로 만들어야 했던 일이 있었다.
기존 NestJS와 + SocketIO 기반으로 만들어진 Socket Server 가 있었는데, 어떤 어려움이 있었고 이를 어떻게 해결하였는지에 관한 기록이다.

문제 인식

기존에 JavaScript 기반으로 만들어진 서버를 언어까지 바꿔가면서 통으로 리팩토링 했던 이유는 크게 두가지였다.

  1. Javascript 미숙 : 현재 나는 Javascript 를 작은 토이 프로젝트에서나 사용해봤지, 실무에서 직접 사용한 경험이 없어 절대적으로 숙련도가 부족한 상황이었다. 계속해서 유지보수 해야 하는데 유지보수하는데 자신이 많이 없었다.
  2. Socket Server 라는 Application 의 모호성 : 개발 당시에는 Socket 통신을 할 여지가 많지 않아, 아주 큰 특정 Json return을 나눠서 쪼개 보내기 위한 용도로만 Socket Server 를 사용했다. 그런데, 추가적인 요구사항을 맞추다 보니, 다른 Application 에서도 Socket 통신을 해야할 여지가 생겨나게 되었는데 그렇게 만들고 나니 Socket Server 라는 도메인이 너무나도 모호했다.

다른 Application 에서 Socket 을 이용해서 통신하고 싶으면, Kafka를 통해 Message 를 전송하고 해당 Message 를 Socket Server 가 구독하여 클라이언트에게 전달하는 구조가 되어버렸다.
이러한 상황은 네트워크 비용을 상승시키는 구조라고 생각했고, 그 과정에서 Socket Server 가 아닌 명확한 도메인을 가진 Application Server 로 리팩토링하기로 결정했다. 그렇게 결정하고 나니, 단지 SocketIO 를 쉽고 빠르게 사용할 수 있는 예제가 많아서 유일하게 JavaScript로 개발된 Application 을 고집할 이유가 없어졌고, 이미 여러가지 모듈이 잘 만들어진 Java와 Spring 기반으로 리팩토링해서 다른 Application 에도 쉽게 적용할 수 있도록 만드는것이 나은 선택이라고 생각했다.

또한 SocketIO 의 Text body에 특정한 규격을 만들기 위해 이런 저런 정보들을 추가하면서, 데이터 파싱 및 가공하는데 너무 반복하는 일이 많다고도 느껴졌다.

이런 문제들을 겪다가, 리팩토링 하는 기간이 있을 때 Javascript 로 만들어진 Socket Server 를 완전히 Java + Spring Framework로 구현된 프로젝트로 이관하게 되었다.

STOMP

들어가기 이전에, STOMP 에 대해서 먼저 알아보자.
STOMP (Simple/Stream Text Oriented Message Protocol) 는 TCP 나 WebSocket 같은 신뢰성 있는 양방향 연결 위에서 작동하는 Text( 왔다갔다 할 수 있는 이 데이터를 Frame 이라고 한다 ) 기반( 혹은 Binary )의 Pub – Sub 방식의 메시지 프로토콜을 말한다.

즉 메세징 전송을 효율적을 하기 위해 탄생한 프로토콜이고, 기본적으로 pub/sub 구조로 되어있어 메세지를 전송하고 메세지를 받아 처리하는 부분이 확실히 정해져 있기 때문에 개발자 입장에서 명확하게 인지하고 개발할 수 있는 이점이 있다. Stomp 프로토콜은 WebSocket 위에서 동작하는 프로토콜로써 클라이언트와 서버가 전송할 메세지의 유형, 형식, 내용들을 정의하는 매커니즘이다.

또한, STOMP를 이용하면 메세지의 헤더에 값을 줄 수 있어 헤더 값을 기반으로 통신 시 인증 처리를 구현하는 것도 가능하며 프로토콜이 정의한 규칙만 잘 지키면 여러 언어 및 플랫폼 간 메세지를 상호 운영할 수 있다.
STOMP Frame 또한 Text, Binary를 변환하는 약속이기 때문에 자세한 명세가 있다.

Stomp Frame 가 어떻게 생겼는지 살펴보자.

COMMAND
header1:value1
header2:value2

Body^@

여기에서 COMMAND 는 클라이언트가 STOMP 프로토콜을 이용해 보낼 때, 나타낼 수 있는 메시지의 의도를 나타낸다.
어떤 Command 들이 있는지 링크의 글중 일부를 발췌해서 보자.

전체 14가지( WebSocket 연결 2가지, Client요청 9가지, Server의 응답 3가지 )의 Command 에 대해 어떻게 작동할 지에 대해 정의하면 된다.
클라이언트의 측에서 연결 2가지를 포함한 11가지 요청 외에 다른 요청이 들어오면, 서버는 모두 Error 로 응답하면 됨을 알 수 있다.

Spring Web Socket

Spring Websocket 은 STOMP 프로토콜을 지원하는 WebSocket 을 쉽게 컨트롤 할 수 있게 도와준다. 코드로 직접 한번 보자.

dependencies{
    //org.springframework.kafka:spring-kafka 를 포함하고 있다.
    implementation project(":kafka")// kafka 관련 Module Project 이다. Producer, Listener.. 등등이 포함된다.
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
}
@Component
public class StompHandler implements ChannelInterceptor {
    ...    

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor headerAccessor = this.wrapToStompHeaderAccessor(message);
        String authorizationHeader = String.valueOf(headerAccessor.getNativeHeader("Authorization"));
        StompCommand stompCommand = this.parseStompCommand(accessor);
        this.validator.validate(authorizationHeader, stompCommand);
        ...
        return message;
    }


    @EventListener
    public void brokerAvailabilityEvent(BrokerAvailabilityEvent event) {
        ...
    }
    
    @EventListener
    public void sessionConnectEvent(SessionConnectEvent event){
        StompHeaderAccessor stompHeaderAccessor = this.wrapToStompHeaderAccessor(event);
        //SimpHeaderAccessor simpleHeaderAccessor = this.wrapToSimpleHeaderAccessor(event);
        ...
    }
    
    @EventListener
    public void sessionConnectedEvent(SessionConnectedEvent event) {
        ...
    }

    @EventListener
    public void sessionSubscribeEvent(SessionSubscribeEvent event) {
        ...
    }

    @EventListener
    public void SessionUnsubscribeEvent(SessionUnsubscribeEvent event) {
        ...
    }

    @EventListener
    public void sessionDisconnectEvent(SessionDisconnectEvent event) {
        ...
    }
    
    private StompHeaderAccessor wrapToStompHeaderAccessor(AbstractSubProtocolEvent event){
        return this.wrapToStompHeaderAccessor(event.getMessage());
    }

    private StompHeaderAccessor wrapToStompHeaderAccessor(Message<?> message){
        return StompHeaderAccessor.wrap(message);
    }

    private SimpMessageHeaderAccessor wrapToSimpleHeaderAccessor(AbstractSubProtocolEvent event){
        return SimpMessageHeaderAccessor.wrap(event.getMessage());
    }

    private StompCommand parseStompCommand(StompHeaderAccessor accessor){
        return accessor.getCommand();
    }
}

Spring Websocket 을 이용하면 ChannelInterceptor 라는 인터페이스를 상속받아서 웹소켓 연결이 성립될 때와 끊길 때 추가적인 작업을 반복적인 코드 작성 없이 손쉽게 만들 수 있다. 또한 Spring의 ApplicationListener 를 이용하면 손쉽게 WebSocket과 관련된 이벤트를 처리할 수 있다. 어떤 이벤트를 처리할 수 있는지 살펴보자.

  1. BrokerAvailabilityEvent : Message Broker 의 상태( 사용 가능, 사용 불가능 )가 변경될 때 이벤트가 발생된다. 브로커가 Available -> UnAvailable 로 변환되는 경우도, UnAvailable -> Available 로 변환되는 두가지 경우 모두 해당된다.
  2. SessionConnectEvent : 새 STOMP CONNECT가 수신되면 게시되어 새 Client의 시작을 나타낸다.
  3. SessionConnectedEvent : SessionConnectEvent 직후에 브로커가 CONNECT에 응답하여 STOMP CONNECTED 프레임을 보낸 경우에 게시된다. 이 이벤트를 받은 시점에 STOMP 세션은 완전히 설정된 것으로 간주할 수 있다.
  4. SessionSubscribeEvent : Stomp SUBSCRIBE 이벤트가 수신되면 게시된다.
  5. SessionUnSubscribeEvent : Stomp UNSUBSCRIBE 이벤트가 수신되면 게시된다.
  6. SessionDisconnectedEvent : Stomp DISCONNECT 이벤트가 수신되면 게시된다.

우선 “BrokerAvailabilityEvent” 를 제외한 5가지 이벤트들은 AbstractSubProtocolEvent 라는 추상클래스를 상속받아 만드는 이벤트들이다.
그리고 이 AbstractSubProtocol 은 Spring에서 Event 를 다루기 위한 기본이 되는 ApplicationEvent 를 상속받아서 만들어진다.

왜 이렇게 나뉘어져 있을까?
그 이유는 Broker 가 의도치 않게 브로커가 종료되는 경우는 DISCONNECT Command 를 이용해 정상적으로 종료되지 않게 되고, 이는 비정상적 종료에 해당한다. 때문에 STOMP Message 자체가 없을 수 있다는 것이다.

//org.springframework.web.socket.messaging.AbstractSubProtocolEvent.java

public abstract class AbstractSubProtocolEvent extends ApplicationEvent {
    private final Message<byte[]> message; //Message 라는 Websocket 관련 도메인 정보가 포함되어 있다.
    @Nullable
    private final Principal user;

    protected AbstractSubProtocolEvent(Object source, Message<byte[]> message) {
        this(source, message, (Principal)null);
    }

    protected AbstractSubProtocolEvent(Object source, Message<byte[]> message, @Nullable Principal user) {
        super(source);
        Assert.notNull(message, "Message must not be null");
        this.message = message;
        this.user = user;
    }

    public Message<byte[]> getMessage() {
        return this.message;
    }

    @Nullable
    public Principal getUser() {
        return this.user;
    }

    public String toString() {
        String var10000 = this.getClass().getSimpleName();
        return var10000 + "[" + this.message + "]";
    }
}

여기서 보면 알겠지만, BrokerAvailability 를 제외한 Spring 에서 지원하는 나머지 5가지는 모두 대응되는 Command 가 있고, 그 말은 즉 Body 와 Header 를 가지고 있는 데이터를 말한다.
그 때문에, 의도치 않게 종료되었거나, 다시 연결이 돌아왔을 때 발생되는 BrokerAvailabilityEvent 는 message 라는 STOMP, Websocket 관련 도메인 정보( Frame )가 없기 때문에 Spring Websocket 에서 ApplicationEvent 를 확장시킨 AbstractSubProtocol 이 아닌 Spring Core 의 ApplicationEvent 를 확장해 만들어진 것이다.

이제 Message의 Frame 을 파싱하는 과정에서 이 Message 가 STOMP 프로토콜을 따른다면 StompHeaderAccessor 를 이용해서 Command 와 각종 정보들을 파싱하면 되고, STOMP 가 아닌, 다른 General 한 형식을 따른다면 SimpHeaderAccessor 를 통해 헤더 정보를 파싱한다.

추가적으로 StompCommand 는 다음과 같다.

public enum StompCommand {
    STOMP(SimpMessageType.CONNECT),
    CONNECT(SimpMessageType.CONNECT),
    DISCONNECT(SimpMessageType.DISCONNECT),
    SUBSCRIBE(SimpMessageType.SUBSCRIBE, true, true, false),
    UNSUBSCRIBE(SimpMessageType.UNSUBSCRIBE, false, true, false),
    SEND(SimpMessageType.MESSAGE, true, false, true),
    ACK(SimpMessageType.OTHER),
    NACK(SimpMessageType.OTHER),
    BEGIN(SimpMessageType.OTHER),
    COMMIT(SimpMessageType.OTHER),
    ABORT(SimpMessageType.OTHER),
    CONNECTED(SimpMessageType.OTHER),
    RECEIPT(SimpMessageType.OTHER),
    MESSAGE(SimpMessageType.MESSAGE, true, true, true),
    ERROR(SimpMessageType.OTHER, false, false, true);

    private final SimpMessageType messageType;
    private final boolean destination;
    private final boolean subscriptionId;
    private final boolean body;
...
}

public enum SimpMessageType {
    CONNECT,
    CONNECT_ACK,
    MESSAGE,
    SUBSCRIBE,
    UNSUBSCRIBE,
    HEARTBEAT,
    DISCONNECT,
    DISCONNECT_ACK,
    OTHER;

    private SimpMessageType() {
    }
}

Stomp 는 General 한 Message 인 Simple Message를 확장시켜 만들기 때문에, StompCommand 또한 정의되어 있는 SimpMessageType 으로부터 확장시켜 정의한다. ( 추가적으로 Spring 또한 STOMP 프로토콜의 명세에 따라 총 14가지 Command 를 정의하고 있음을 알 수 있다. )

그리고 presend( message, channel ) 함수에서 메시지의 헤더에 접근하고, STOMP Frame 의 Command를 확인하여 해당 명령어에 맞는 처리를 수행하면 된다.

  • 여기서 추가적으로 HandShakeInterceptor 라는 인터페이스를 구현해서 ws로 통신 프로토콜을 업그레이드 하기 이전, Http Handshake 과정이 일어날 때 토큰을 검증하는 방법으로도 구현할 수 있다. 자세한 내용은 이 블로그 에 자세하게 설명되어 있다.

Servlet 기반에서 한번 Websocket 을 구성해보자. ( Webflux 를 사용한다면 Docs 를 참고하자. )

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;
    
    @Value("...")
    private String SOCKET_ENDPOINT;
    @Value("...")
    private String APPLICATION_DESTINATION_PREFIX;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(SOCKET_ENDPOINT)
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic","/queue");
//        registry.enableStompBrokerRelay("/test")
//                .setRelayHost("localhost")
//                .setRelayPort(61613)
//                .setClientLogin("admin")
//                .setClientPasscode("password");
        registry.setApplicationDestinationPrefixes(APPLICATION_DESTINATION_PREFIX);
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(512 * 1024);
        registry.setSendTimeLimit(10 * 1000);
        registry.setSendBufferSizeLimit(512 * 1024);
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(this.stompHandler);
    }
}

실제 WebSocketMessageBroker 를 설정하는 부분이다.
클라이언트에서 서버로 오는 Inbound Channel 에 아까 만든 ChannelInterceptor 를 등록하고, Prefix 등을 설정해주면 된다.

여기서 registry.enableSimpleBroker 를 활성화시키면 InMemoryBroker 를 통해 메시지가 전달되게 된다. 만약, 이부분에서 사용하려는 외부 브로커가 STOMP 를 Native로 지원한다면 ( ex, ActiveMQ ) 여기서 enableSimpleBroker 가 아닌 enableStompBrokerRelay 를 이용해주면 된다.

만약 그렇지 않다면 Simple In-memory broker 를 사용하고 @MessageMapping 을 통해 매핑된 핸들러에서 외부 브로커에 메시지를 Producing 해주는 과정이 추가적으로 포함되어야 한다.

  • 일반적인 경우는 SimpleMessageBroker 하나로 처리가 가능하나, 반드시 외부 브로커가 필요한 경우가 생길 수 있다. 예를들면 동일한 채팅방 역할을 하는 Application 이 Scaling 되면서 2개로 늘어났다고 생각해보자. 그러면 In-Memory Broker 가 각각 2개가 독립적으로 생겨나게 된다. 당연하게도 이 둘은 서로 독립적인 broker 를 사용하기 때문에 같은 Topic 을 구독하고 있는 Subscriber 모두에게 전달하지 못하는 경우가 생겨난다.
    따라서, 이 문제를 해결하려면 두 어플리케이션 모두에게 데이터를 공유해줄 수 있는 별도의 외부 브로커가 필요하게 된다.

이제 메시지를 읽어들여보자. 이 예제는 Docs 를 참고했다.

@Controller
@RequiredArgsConstructor
public class GreetingController {
    private final SimpMessagingTemplate template;
    private final KafkaProducer kafkaProducer; // 외부 브로커, Kafka 를 이용한다.
    private final AwesomeService service; // 비즈니스로직 처리.

    @MessageMapping("/topic")
    //@SendTo("/topic/greetings")
    //@SendToUser("/topic/greetings")
    //public String greet(StompMessageDto stompMessage) {
    public void greet(StompMessageDto stompMessage) {
        this.service.handleMessage(stompMessage);
        //return stompMessage.textMessage; // 1)SendTo 어노테이션과 함께 사용할 경우.
        //this.template.convertAndSend("/topic/greetings", stompMessage.textMessage); // 2) SendTo 없이 직접 전달.
        this.kafkaProducer.produce("/topic/greetings", stompMessage);// 3) 외부 브로커에게 전달.
    }
}

@RequiredArgsConstructor
public class KafkaConsumer{
    private final SimpMessagingTemplate template;
    ...

    @Value("...")
    private String topic;

    @KafkaListener( topics = topic, containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, Message> record){
        String message = record.value().textMessage;
        this.service.handleMessage(record.value());
        this.template.convertAndSend(topic, message);
    }

    //Reactor로 만들면..
    @KafkaListener( topics = "topic", containerFactory = "concurrentKafkaListenerContainerFactory")
    public Mono<Void> consume(ConsumerRecord<String, Message> record){
        return Mono.just(record.value())
                .mapNotNull(message -> {
                    this.service.handleMessage(message);
                    this.template.convertAndSend("topic", message.textMessage);
                    return null;
                });
    }
}

@MessageMapping 어노테이션을 통해 발행된 메시지를 읽어들이고, @Send( Topic 에 Publish, 1:N ) 와 @SendToUser ( 특정 유저에게 전달, 1:1 ) 를 이용해서 메시지를 전달할 수 있다. 혹은 단순하게 SimpMessagingTemplate 를 통해 convertAndSend 를 통해 데이터를 전송할수도 있다.
나의 경우에는, Message 를 SimpleMessageBroker 가 아닌 외부 브로커를 통해 전달하기 때문에, Kafka 를 이용해서 메시지를 전달해주었고, 해당 Topic 을 Kafka로 부터 구독하여 다시 자신의 로컬의 In-Memory Broker 를 통해 전달하여 문제를 해결했다.

마치며

Spring Framework 의 Web socket server 로 이관하면서 얻은 이점이 몇가지 있다.

  1. Kafka 설정 : 이미 다른 Application에서 사용중인 Kafka 관련 모듈을 그대로 import 하여 손쉽게 사용할 수 있었다.
  2. Interceptor 적용 : ChannelHandler를 통해 소켓이 연결될 때, 연결이 끊어질 때 Interceptor 를 통해 추가적인 작업을 반복적인 코드 작성 없이 쉽게 할 수 있었다.

그리고 STOMP 프로토콜, WebSocket, 외부 브로커의 관계에 대해 좀 더 이해하게 된 것 같다.

Leave a Comment