[MSA] (2) MSA 구성요소 – Saga 구현

MSA 구조는 장점만 있는 완벽한 구조가 아니다. 실제로 잘 갖추어진 환경을 만들려면 많은 것들을 준비해야 한다. 이번 글은 그 구성요소들과 시스템에 대한 고민을 정리한 글이다.

MSA 를 구현하려면

완벽한 시스템, 서비스는 없다. 도메인을 작은 단위로 분해하게 되면 각 서비스를 개발하거나 유지보수할 때, 확장성에서는 분명한 장점을 갖는다. 그러나 서비스를 나누었기 때문에 발생하는 문제점들도 분명히 있다. 예를 들면 Transaction 이나 다른 서비스를 호출할 때 해당 서비스가 너무 바쁘거나, 혹은 장애가 발생하였을 때든 어떤 이유로 Error 를 받게 되었을 경우 연쇄적으로 장애를 일으킬수도 있다. 또한 많은 서비스가 관여하는 비즈니스 로직은 추적하기가 어렵기도 하다. 물론 Transaction 은 2PC 부터 현재 많이 사용되는 Saga를 이용하여 핸들링하고, 장애 대응은 Circuit breaker Pattern(회로 차단 패턴) 을 이용한 장애 대응이 도움이 될 수 있다. 이런 대응책들이 갖춰지지 않으면 서비스에 부하가 걸리거나, 의도치 않는 상황에 마주쳤을 때 쉽게 문제가 생기게 된다.

이 글은 Saga Pattern 에 대해서 고민했던 내용과 그 고민이다.

Saga Pattern

MSA 구조로 개발하다 보면, 여러 Database 들을 사용하게 되는 경우가 많다. ( 혹은 하나의 DB에 많은 서비스들이 접근할 수도 있다. ) 비즈니스 로직을 도메인에 따라 많은 서비스에 분리했기 때문에, 가장 먼저 고민했던 것은 Saga 의 구현이다. 실제 적용했던 프로젝트는 NoSQL과 SQL을 모두 사용했기 때문에, 특정 Database 자체의 기능 만으로는 Saga를 구현할 수 없었다.
이 문제를 해결하기 위해 실제 Project 에서는 Choreography Saga 를 구현했다. 다른 Framework 나 서비스를 사용하지 않고, Kafka 를 이용해서 각 서비스들마다 Transaction 을 실행하게끔 했다. 이렇게 작성했더니, 시간이 지날수록 서비스들 사이의 관계가 너무 복잡해지는 문제가 발생했다. 또한 트랜잭션이 많은 서비스에 걸쳐있는 경우에 순서상으로 마지막에 있는 서비스에 과도한 트랜잭션 처리 로직이 포함된다는 단점도 있었다.
이 때문에 차후에 적용되는 프로젝트나 리팩토링을 통해 Orchestration Saga 방식으로 변경하고자 했고, 그 과정에서 많이 쓰이는 Framework 2가지 Axon 과 Eventuate Tram 에 대해서 알게 되었다. 이번 글에서는 Eventuate Tram 을 이용하여 Saga를 구현해보자.

Eventuate Tram 사용하기.

Eventuate Tram 과 Axon 둘다 공통적인 부분이 있는데, 별도의 다른 서비스가 필요하다는 것이다. Eventuate Tram 의 경우에는 Database 에 작성되는 Message를 관찰하며, Event가 발생했을 때 Kafka 에 Produce 해주는 CDC(Change Data Capture) Service 가 필요하다. 이 서비스가 Kafka 에 메시지를 발행하게 되고, Saga Participant 들이 Kafka 메시지를 구독하여 이벤트를 처리하게 된다. 따라서, Message Queue 와 Eventuate CDC Service, Database 와 Message Table 이 꼭 필요하다. 필자는 Kubernetes 환경에서 테스트했고, Message Queue 로는 Kafka, Database 는 MySQL을 이용했다.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: eventuate-tram-cdc
  namespace: dev-platform
spec:
  selector:
    matchLabels:
      app: eventuate-tram-cdc
  template:
    metadata:
      labels:
        app: eventuate-tram-cdc
        namespace: dev-platform
    spec:
      containers:
        - name: eventuate-tram-cdc
          image: eventuateio/eventuate-cdc-service:0.16.0.RELEASE
          ports:
            - containerPort: 8099
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: eventuate-tram-environment-cm
---
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: dev-platform
  name: eventuate-tram-environment-cm
data:
  EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092" #Kubernetes svc
  EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: "kafka-zookeeper:2181" # Kubernetes svc

  EVENTUATE_CDC_READER_READER1_TYPE: "mysql-binlog"
  EVENTUATE_CDC_READER_READER1_DATASOURCEURL: "jdbc:mysql://mysql:3306/saga_service" # Kubernetes svc
  EVENTUATE_CDC_READER_READER1_MONITORINGSCHEMA: "saga_service"
  EVENTUATE_CDC_READER_READER1_DATASOURCEUSERNAME: "{ DATABASE_USER }"
  EVENTUATE_CDC_READER_READER1_DATASOURCEPASSWORD: "{ DATABASE_PASSWORD }"
  EVENTUATE_CDC_READER_READER1_DATASOURCEDRIVERCLASSNAME: "com.mysql.cj.jdbc.Driver"
  EVENTUATE_CDC_READER_READER1_LEADERSHIPLOCKPATH: "/eventuate/cdc/leader/saga_service"
  EVENTUATE_CDC_READER_READER1_CDCDBUSERNAME: "{ DATABASE_USER }"
  EVENTUATE_CDC_READER_READER1_CDCDBPASSWORD: "{ DATABASE_PASSWORD }"
  EVENTUATE_CDC_READER_READER1_READOLDDEBEZIUMDBOFFSETSTORAGETOPIC: "false"
  EVENTUATE_CDC_READER_READER1_MYSQLBINLOGCLIENTUNIQUEID: "1"
  EVENTUATE_CDC_READER_READER1_OFFSETSTOREKEY: "MySqlBinlogCustomerService"
  EVENTUATE_CDC_READER_READER1_OFFSETSTORAGETOPICNAME: "db.history.common"
  EVENTUATE_CDC_READER_READER1_OUTBOXID: "1"

  EVENTUATE_CDC_PIPELINE_PIPELINE1_TYPE: "eventuate-tram"
  EVENTUATE_CDC_PIPELINE_PIPELINE1_READER: "reader1"
  EVENTUATE_CDC_PIPELINE_PIPELINE1_EVENTUATEDATABASESCHEMA: "saga_service"

Kubernetes 환경이 아니라면 Docker-compose 를 사용할 수도 있고, 혹은 직접 프로세스를 띄워도 상관 없다. ConfigMap 은 Example 을 참고해서 만들었다.
Database는 Eventuate 에서 제공하는 eventuateio/eventuate-mysql 를 사용하는것이 가장 좋으나, 필자는 이미 MySQL 을 사용하고 있었기에 또 새로운 Deployment 를 구성하지 않고, 기존에 사용하는 MySQL 에 DB와 Table 을 추가했다.

DROP table IF EXISTS events;
DROP table IF EXISTS entities;
DROP table IF EXISTS snapshots;
DROP table IF EXISTS message;
DROP table IF EXISTS received_messages;
DROP table IF EXISTS cdc_monitoring;

create table events (
event_id VARCHAR(300) PRIMARY KEY,
event_type VARCHAR(300),
event_data VARCHAR(300) NOT NULL,
entity_type VARCHAR(300) NOT NULL,
entity_id VARCHAR(300) NOT NULL,
triggering_event VARCHAR(300),
metadata VARCHAR(300),
published TINYINT DEFAULT 0
);

CREATE INDEX events_idx ON events(entity_type, entity_id, event_id);
CREATE INDEX events_published_idx ON events(published, event_id);
create table entities (
entity_type VARCHAR(300),
entity_id VARCHAR(300),
entity_version VARCHAR(300) NOT NULL,
PRIMARY KEY(entity_type, entity_id)
);

create table snapshots (
entity_type VARCHAR(300),
entity_id VARCHAR(300),
entity_version VARCHAR(300),
snapshot_type VARCHAR(300) NOT NULL,
snapshot_json VARCHAR(300) NOT NULL,
triggering_events VARCHAR(300),
PRIMARY KEY(entity_type, entity_id, entity_version)
);

CREATE TABLE message (
id VARCHAR(300) NOT NULL,
destination VARCHAR(300) NOT NULL,
headers VARCHAR(1000) NOT NULL, //2023.8.17 1000으로 변경.
message_partition bigint(20) DEFAULT NULL, //2023.8.17 추가.
payload VARCHAR(300) NOT NULL,
published smallint(6) DEFAULT '0',
creation_time bigint(20) DEFAULT NULL,
PRIMARY KEY (id),
KEY message_published_idx (published,id)
);

create table received_messages (
consumer_id VARCHAR(300) NOT NULL,
message_id VARCHAR(300) NOT NULL,
creation_time bigint(20) DEFAULT NULL,
PRIMARY KEY (consumer_id, message_id)
);

create table cdc_monitoring (
reader_id VARCHAR(1000) PRIMARY KEY,
last_time BIGINT
);

CREATE TABLE offset_store(client_name VARCHAR(255) NOT NULL PRIMARY KEY, serialized_offset VARCHAR(255));
ALTER TABLE received_messages MODIFY creation_time BIGINT;
DROP Table IF Exists saga_instance_participants;
DROP Table IF Exists saga_instance;
DROP Table IF Exists saga_lock_table;
DROP Table IF Exists saga_stash_table;

CREATE TABLE saga_instance_participants (
  saga_type VARCHAR(255) NOT NULL,
  saga_id VARCHAR(100) NOT NULL,
  destination VARCHAR(100) NOT NULL,
  resource VARCHAR(100) NOT NULL,
  PRIMARY KEY(saga_type, saga_id, destination, resource)
);


CREATE TABLE saga_instance(
  saga_type VARCHAR(255) NOT NULL,
  saga_id VARCHAR(100) NOT NULL,
  state_name VARCHAR(100) NOT NULL,
  last_request_id VARCHAR(100),
  end_state INT(1),
  compensating INT(1),
  failed INT(1),
  saga_data_type VARCHAR(1000) NOT NULL,
  saga_data_json VARCHAR(1000) NOT NULL,
  PRIMARY KEY(saga_type, saga_id)
);

create table saga_lock_table(
  target VARCHAR(100) PRIMARY KEY,
  saga_type VARCHAR(255) NOT NULL,
  saga_Id VARCHAR(100) NOT NULL
);

create table saga_stash_table(
  message_id VARCHAR(100) PRIMARY KEY,
  target VARCHAR(100) NOT NULL,
  saga_type VARCHAR(255) NOT NULL,
  saga_id VARCHAR(100) NOT NULL,
  message_headers VARCHAR(1000) NOT NULL,
  message_payload VARCHAR(1000) NOT NULL
  );

위 쿼리를 실행시켜 Table 을 적절하게 만들어주고, CDC 서비스와 Kafka, Database 를 연결시켜 설정을 마무리한다.

다음과 같이 Kafka. MySQL, CDC Service 를 구성하였다.

Kafka Topic 도 잘 작성되는걸 확인할 수 있다.

Saga Orchetrator 를 Spring framework 로 구현하기 위한 의존성을 추가하고, 핵심이 되는 Saga를 작성한다.

//Eventuate-tram Saga orchestrator
implementation 'io.eventuate.tram.sagas:eventuate-tram-sagas-spring-orchestration-simple-dsl-starter'
implementation "io.eventuate.tram.core:eventuate-tram-spring-jdbc-kafka"
implementation "io.eventuate.tram.core:eventuate-tram-spring-commands"
implementation "io.eventuate.tram.core:eventuate-tram-spring-jdbc-kafka"
@Slf4j
@Component
@RequiredArgsConstructor
public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

    ...

    private final SagaDefinition<CreateOrderSagaData> sagaDefinition =
            step()
                    .invokeLocal(this::create)
                    .withCompensation(this::reject)
                    .step()
                    .invokeParticipant(this::reserveCredit)
                    .onReply(CustomerNotFound.class, this::handleCustomerNotFound)
                    .onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded)
                    .step()
                    .invokeLocal(this::approve)
                    .build();
   ...

Eventuate Tram 을 통해 Saga Orchestrator 를 구현할 때 Saga Definition이 직관적으로 한눈에 파악되는게 정말 큰 장점이라 생각한다. 예시로 작성한 Saga Definition 을 보면 알겠지만, withCompensation 을 통한 보상 트랜잭션 등록, invokeParticipant 를 통해 Saga 참여자에게 알리거나 , onReply 를 통해 Error Handling 을 하는 등 실제로 비즈니스 로직을 작성할 때 굉장히 직관적으로 보였다.
예제를 통해 사용해본 결과, 기존 Choreography Saga 에서 참여자들 사이의 Transaction 흐름을 쉽게 볼 수 없었다는 문제 해결을 위해 좋은 선택지로 보인다. 다음 글에서는 Java 측에서 많이 사용되는 MSA 프레임워크인 Axon 을 이용해서 구현해보겠다.

Leave a Comment