Redis를 이용한 동시성 이슈 해결하기 (2/2)

이전 글에 이어서 Redis 를 이용한 Distributed Lock 을 구현한 방법에 대해서 작성한다. Kurly Tech Blog 에 작성된 글을 바탕으로 작성하였다.

이미 원글에서도 상세하게 설명하고 있지만, 분산락을 구현하기 위한 구현체로 Redisson 을 사용했다.
Spring framework 기반에서 사용할 수 있는 대표적인 Redis Client 로 Lettuce , Redisson 가 있는데 이중에서 Lettuce 의 경우는 락을 획득하기 위해서 계속해서 재시도 요청을 보내, Redis 에 부담을 주는 Spin Lock 형태이며 공식적으로 분산락도 구현되어 있지 않다. 따라서 직접 구현해야 하는 부분이 많은 반면 Redisson 은 분산락에 관한 많은 기능을 제공하고 쉽게 이용할 수 있다는 점과 Pub – Sub 방식으로 Spin lock 보다는 Redis 에 부하를 덜 주기 때문에 보통 이를 이용해서 구현하는 예제가 많았다. ( 원글도 해당 라이브러리를 이용한다. )

필자도 마찬가지로 Redisson client를 이용하여 Lock 을 구현하였다.

// build.gradle
dependencies{
    // Redisson client
    implementation "org.redisson:redisson-spring-boot-starter:${redissonVersion}"

    //Embedded Redis
    implementation "it.ozimov:embedded-redis:0.7.3" exclude group: "org.slf4j", module: "slf4j-simple"
}

1. Embedded Redis 구성하기

Redis 를 이용하여 락을 구현하게 되면 해당 로직은 Redis 가 없으면 정상적으로 동작하지 않게 된다. 즉, Redis 에 강한 의존성을 갖게 된다. 이를 해결하기 위해서 두가지 해결방법을 생각할 수 있는데, 첫 번째로 Redis 가 없는 환경에서는 다른 방식의 락을 사용하도록 하는 것‘Redis 가 없는 환경을 제거하는 것’ 이다.

이미 Caching을 위해서 Redis를 사용하고 있어 이미 Redis 에 많은 의존성을 갖고 있는 상황이었기에, Redis 의 의존성을 제거하기 보다는 Production, Develop, Local 까지 모든 환경에 Redis 를 제공하는 방향으로 구성했다. 이를 위해 Embedded Redis 를 이용해 로컬 환경에서도 Redis 를 사용할 수 있게 하여 Redis 가 없는 환경을 제거했다.

Embedded redis 가 x86 만을 지원하기 때문에 Arm 기반에서는 시작조차 되지 않는다. 이를 해결하기 위한 방법으로 Executable File 을 이용하여 RedisServer 를 구성할 수 있는 생성자를 제공한다.

package redis.embedded;

...

public class RedisServer extends AbstractRedisInstance {
    private static final String REDIS_READY_PATTERN = ".*(R|r)eady to accept connections.*";
    private static final int DEFAULT_REDIS_PORT = 6379;

    public RedisServer() {
        this(6379);
    }

    public RedisServer(int port) {
        super(port);
        this.args = builder().port(port).build().args;
    }

    public RedisServer(File executable, int port) {
        super(port);
        this.args = Arrays.asList(executable.getAbsolutePath(), "--port", Integer.toString(port));
    }
...
}

따라서, ARM 에서 정상적으로 동작하는 Redis 바이너리를 classpath에 포함시킨 뒤에 ARM 일 경우 해당 생성자를 이용하여 구성하게끔 했다.

@Slf4j
@Profile({"local","test"})
@Configuration
public class EmbeddedRedis {

    private RedisServer redisServer;
    private final int redisPort = 6379;

    @PostConstruct
    public void init() throws IOException {
        if (isArm()) {
            this.redisServer = new RedisServer(getRedisFileForArm(), this.redisPort);
            this.redisServer.start();
        } else {
            this.redisServer = new RedisServer(this.redisPort);
            this.redisServer.start();
            log.info("In-memory redis enabled at {}", this.redisPort);
        }
    }

    private File getRedisFileForArm() {
        try {
            return new ClassPathResource("redis/redis-server-arm64").getFile();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    @PreDestroy
    public void destroy(){
        this.redisServer.stop();
    }
}
@Profile({"local","test"})
@Configuration
public class LocalRedissonConfiguration {
    private static final String REDIS_LOCAL_HOST = "redis://127.0.0.1";

    @Bean(destroyMethod = "shutdown")
    @DependsOn("embeddedRedis")
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress(REDIS_LOCAL_HOST+":6379");
        return Redisson.create(config);
    }
}

Local 환경이 아닌, 기존에 구성되어 있는 Redis 혹은 Redis Cluster 를 이용하는 경우에는 AutoConfiguration을 이용한다. appliation.properties ( application.yaml ) 에 “spring.data.redis” ( RedisProperties.class ) 항목을 구성해주면 된다.

package org.redisson.spring.starter;

@Configuration
@ConditionalOnClass({Redisson.class, RedisOperations.class})
@ConditionalOnMissingClass({"org.springframework.boot.autoconfigure.AutoConfiguration"})
@AutoConfigureBefore({RedisAutoConfiguration.class})
@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
public class RedissonAutoConfiguration {
    
    ...
    @Bean(
        destroyMethod = "shutdown"
    )
    @ConditionalOnMissingBean({RedissonClient.class})
    public RedissonClient redisson() throws IOException {
        Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
        Method usernameMethod = ReflectionUtils.findMethod(RedisProperties.class, "getUsername");
        Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
        Method connectTimeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getConnectTimeout");
        Method clientNameMethod = ReflectionUtils.findMethod(RedisProperties.class, "getClientName");
        Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, this.redisProperties);
        String prefix = this.getPrefix();
        String username = null;
        int database = this.redisProperties.getDatabase();
        String password = this.redisProperties.getPassword();
        boolean isSentinel = false;
        boolean isCluster = false;
...

2. DistributedLock 구현하기

기술블로그에서 설명한 부분을 따라서 직접 구현해본다. Distributed Lock 을 구현할 Method 에 Lock을 구현할 커스텀 어노테이션 과 AOP가 핵심이다.

/**
 * Reference from <a href="https://helloworld.kurly.com/blog/distributed-redisson-lock/">kurly tech blog</a>
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {

    // Information of the key to be used for locking.
    String key();

    // The unit of waitTime and leaseTime. The default is seconds (SECONDS).
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    // The maximum waiting time to acquire the lock. The default is 5 seconds.
    long waitTime() default 5L;

    // The maximum time to hold the acquired lock. The default is 3 seconds.
    long leaseTime() default 3L;
}
/**
 * Reference from <a href="https://helloworld.kurly.com/blog/distributed-redisson-lock/">kurly tech blog</a>
 */
@Aspect
@RequiredArgsConstructor
@Slf4j
public class DistributedLockAop{
    private static final String REDISSON_LOCK_PREFIX = "LOCK:";

    private final RedissonClient redissonClient;
    private final RedisLockTransaction redisLockTransaction;


    @Around("@annotation(dev.notypie.lock.DistributedLock)")
    public Object doLock(final ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        DistributedLock redisLock = method.getAnnotation(DistributedLock.class);

        String key = REDISSON_LOCK_PREFIX + this.getDynamicValue(signature.getParameterNames(), joinPoint.getArgs(), redisLock.key());
        //Get lock
        RLock rLock = redissonClient.getLock(key);

        try {
            boolean available = rLock.tryLock(redisLock.waitTime(), redisLock.leaseTime(), redisLock.timeUnit());
            if (!available) {
                return false;
            }
            return this.redisLockTransaction.proceed(joinPoint);
        } catch (InterruptedException e) {
            throw new InterruptedException();
        } finally {
            try {
                rLock.unlock();
            } catch (IllegalMonitorStateException e) {
                log.info("Redisson Lock Already UnLock {} {}"
                        , method.getName(), key);
            }
        }
    }


    private Object getDynamicValue(String[] parameterNames, Object[] args, String key) {
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();

        for (int i = 0; i < parameterNames.length; i++) {
            context.setVariable(parameterNames[i], args[i]);
        }

        return parser.parseExpression(key).getValue(context, Object.class);
    }
}

AOP 에서 rLock.tryLock(…) 을 통해 락 획득을 시도하고, 락을 획득했다면 DistributedLock 어노테이션이 선언된 메서드를 실행한 뒤 finally block 에서 unlock 하는 직관적인 함수이다.

/**
 * Reference from <a href="https://helloworld.kurly.com/blog/distributed-redisson-lock/">kurly tech blog</a>
 */
public class RedisLockTransaction {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Object proceed(final ProceedingJoinPoint joinPoint) throws Throwable{
        return joinPoint.proceed();
    }
}

개인적으로 가장 핵심이라고 생각되는 부분이다. DistributedLock 어노테이션이 선언된 메서드를 별도의 트랜잭션( Propagation.REQUIRES_NEW )으로 실행한다. 이렇게 구성하면 앞서 살펴본 AOP 의 finally block 에 도달하여 Lock 을 해제하는 시점보다 이곳에서 @DistributedLock 메소드에 새로 할당한 Transaction이 완료되는 시점이 더 빠르다는것이다.

이렇게 구성하는 이유는 본문에서도 자세히 설명하고 있지만, 여러 사용자가 접근해서 Lock 을 얻으려 시도할 때 Lock 이 해제되는 시점이 영속성 컨텍스트에 반영되는 시점보다 빠르다면 Database에 반영되기 이전 찰나의 시점에 Lock 을 획득하여 변경되기 이전의 값으로 Transaction 이 시작되어 동시성 이슈가 발생할 수 있기 때문에 그렇다.

3. 적용하기

그렇다면 문제가 발생했던 부분에 실제로 적용해보자.

@DistributedLock(key = "#id")
public ResponseEntity<EntityResponseDto> executeService(Long id) {
    return this.repository.findById(id).stream()
            .filter(Entity::isNotOccupied)
            .map(entity -> {
                entity.setOccupied(true);
                // entity.addRuntimeCount(); 와 같은 식으로 N번 실행도 응용 가능하다.
                return entity;
            })
            .map( entity -> this.repository.save(entity))
			.map( entity -> this.kafaService.produceMessage..) 
            ...
}

이제 DistributedLock 을 적용했으니 Entity 의 Occupied 플래그를 int형으로 변환하면 딱 한번만 실행되게 할 수도, 혹은 지정한 특정 횟수 만큼만 실행시킬 수도 있다.
테스트코드를 다시 돌려 확인해보면

이제 올바르게 작동하는것을 확인할 수 있다.

4. 마치며

원문에 너무 상세하고 자세히 설명되어 있고, 사용하기 쉽게 AOP 와 Custom Annotation, 심지어 테스트코드까지 모든 소스코드가 공개되어 있어 구현하기는 어렵지 않았고, 간단한 응용을 통해 쉽게 구현할 수 있었다.
새 트랜잭션을 구성하게 하여 Lock의 해제 시점보다 Commit 의 시점을 반드시 빠르게 만드는 방법이 쉽고 직관적으로 이해할 수 있어서 너무 편리하고 좋았던것 같다.

Leave a Comment