본문 바로가기
토이프로젝트

Kafka 를 이용해서 Eventually Consistency 를 구현해보자!

by 이석준석이 2022. 6. 27.

주문, 결제, 재고의 상태를 맞춰보자!


예시) 결제를 완료하면, 주문의 상태를 결제대기 -> 배송준비, 재고를 한개 감소 하는 로직이 있다고 가정해본다.

 

1. 만약 주문과 결제의 DB 를 같은 서버의 인스턴스에서 접근가능하다면, 하나의 트랜잭션에 묶여 아래처럼 구현될 것 같다.

@Transactional
public void finishPayment() {
    paymentRepository.save(payment);
    
    orderRepository.findById(payment.getOrderId());
    order.paymentFinished();
    
    prodRepository.findById(payment.getProductId());
    prod.minusStock();
}

public class Order {
    public void paymentFinished() {
        this.status = OrderStatus.PREPARE_SHIPPING
    }
}

public class Prod {
	public void minusStock() {
    	this.stock = stock - 1;
    }
}

 

2. 만약 "주문 API, 결제 API, 재고 API 가 따로있는 상태" 라면 어떻게 해야할까?

@Transactional
public void finishPayment() {
    paymentRepository.save(payment);
    
    // 외부호출을 통해 상태를 변경
    orderClient.paymentFinished(payment.getOrderId());
    productClient.paymentFinished(payment.getProductId());
}

 

하나의 트랜잭션으로 묶여있는 1번 케이스atomic 하게 상태를 변경할 수 있다.

  • 상품재고가 감소되는것이 실패하거나, 주문의 상태변경이 실패되면 롤백 가능하다.

 

하지만, API 호출을 통해 상태를 변경하는 경우, 아래와 같은 문제점이 발생할 수 있다.

  • orderClient 는 성공했지만, pruductClient 가 실패한다면..
  • 결제는 롤백될 수 있으나 주문의 상태가 배송준비인 상태가 되는 문제가 생긴다.

 

이러한 상황을 Message Queue 를 이용하여 해결할 수 있고, 이번에 Message Queue 중 하나인 Kafka 를 공부하여, 해당 목표를 달성하는 기능을 구현해본다.

 

Eventually Consistency 란?


결제의 입장에서는 Message Queue 에 전송이 성공하면, 결제가 완료됐다고 생각하고 저장한다.

  • 결제의 입장에서는 Message Queue 에 적재가 성공했으니, "주문과 상품재고는 언젠가 갱신이 되겠지" 라고 가정해본다.

주문과 상품재고의 입장에서는 Message Queue 에 있는 Record 를 Consume 하여, 자신들의 상태를 변경한다.

  • 이 과정에서 주문과 상품재고의 상태가 변경되는 순간이 다를 수 있고,
  • 즉시 반영되지 않을 수 있으나
  • Message Queue 에 데이터가 항상 저장되어있음이 보장된다면 결국에는 결제, 주문, 상품재고의 상태는 우리의 목표대로 상태가 저장될 것이다.

Eventually Consistency (궁극적 일관성) : 결국 일관성(consistency) 이 유지된다.

 

Message Queue 가 가져야 할 기능, Solved By Kafka


필요한 기능은 크게 아래의 3가지로 생각된다.

  • Message Queue 에 저장된 데이터는 절대 없어져서는 안된다.
  • Payment 의 네트워크 상황이 열악하더라도 Payment 가 저장됐다면 Message Queue 에 한번의 결제완료 상태만 저장되어야 한다.
    • 아니라면 ..? 상품재고가 2회 이상 차감될 가능성이 있다.
  • 주문과 상품재고는 Message Queue 내 결제에 대해서 처리(Consume)를 완료했다면 이를 처리했다는 것을 어딘가에 저장 [(ex) flag 처리] 한 뒤,  다음에 다시 처리하지 않아야한다.

1. Message Queue 에 저장된 데이터는 절대 없어져서는 안된다.


Kafka 는

  • 적재된 데이터를 디스크에 저장한다.
    • 메모리를 사용하여 캐싱하기에 느리지않아요~!
  • HA (High Availavility) a.k.a 클러스터링 기능을 제공하며, replicas (복제본) 를 사용해 저장된 데이터를 여러곳에 저장할 수 있기에 안전하게 데이터를 저장할 수 있다.
  • 데이터를 저장하는 Producer (위의 예제에서 Payment)acks 옵션 min.insync.replicas 옵션을 사용하여 복제본 까지 데이터가 안전하게 저장됐음을 확인할 수 있다.

Keyword : Clustering, acks(by producer), min.insync.replicas(by topic)

 

2. Payment 의 네트워크 상황이 열악하더라도, 한번의 결제완료만 저장돼야한다.


Kafka 는

  • Producer 의 enable.idempotence 옵션을 통해, Kafka 에 단 한번만 저장(멱등성 프로듀서) 되도록 설정할 수 있다.

Keyword : enable.idempotence, 멱등성 프로듀서

 

3. 주문과 상품재고는 Consume 을 완료했다면, 이를 처리했다는 것을 어딘가에 저장해야 한다.


Kafka 는

  • 컨슈머 그룹 내에서 커밋을 통해, 해당 데이터가 처리됐음을 Kafka 내부에 저장해 놓고 해당 데이터의 처리여부를 확인할 수 있다.

Keyword : 컨슈머그룹, commit

 

카프카는 우리가 목표로하는 3가지를 제공하기에, Eventually Consistency 를 달성할 수 있다.

 

나의 토이프로젝트


카프카에 대한 공부를 어느정도 완료한 뒤, 토이프로젝트를 구현해봤다. (https://github.com/robin00q/kafka_toy_project)

 

시나리오 : 결제가 완료되면, 주문의 상태를 배송준비중으로 변경 (Using Kafka) 한다.

 

결제 (Payment) 모듈

  • DB : MySQL (결제완료의 상태를 저장)
  • Kafka Producer
    • payment 토픽 발행
    • acks = all 옵션을 사용하여 토픽의 min.insync.replicas 만큼의 팔로워 파티션에 데이터가 저장됐음을 보장한다.

주문 (Order) 모듈

  • DB : MySQL (주문의 상태를 저장)
  • Kafka Consumer
    • payment 토픽 구독
    • 수동커밋 모드를 사용하여, 주문의 상태가 변경됐다면 컨슈머그룹에 커밋한다.

핵심코드 1 (결제모듈 - 결제 완료)


  1. 결제 완료 메소드 내에서, 결제완료 상태를 저장하고, payment.finishPayment() 를 호출한다.
  2. Payment 도메인 내에서, ApplicationEvent 로 PaymentFinishedEvent 를 발행한다.
  3. 이벤트핸들러는 Kafka 에 메세지를 프로듀싱한다.
// 결제 완료
public Long startPayment(StartPaymentRequest request) {
    Payment payment = request.toPayment();
    // 저장을 통해 결제가 됐다고 가정한다.
    paymentRepository.save(payment);
    payment.finishPayment();

    return payment.getId();
}


// 결제 완료 이벤트 발행
public class Payment {
    public void finishPayment() {
        Events.raise(new PaymentFinishedEvent(orderId, amount));
    }
}

// 이벤트를 수신한 이벤트핸들러는 Kafka 에 메세지를 프로듀싱한다
public class PaymentFinishedEventHandler {

    private static final String PAYMENT_FINISHED_TOPIC = "payments";

    private final KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(PaymentFinishedEvent.class)
    @Async
    public void handle(PaymentFinishedEvent event) {
        PaymentFinishedRecord record = PaymentFinishedRecord.from(event);
    }
}

핵심코드 2 (주문모듈 - 주문 상태 변경)


  1. Kafka 메세지 프로듀서는 결제완료 레코드를 컨슈밍한다.
  2. Order 를 조회한 뒤, finishPayment() 메소드를 이용해서 결제가 완료됐음을 주문도메인에 알린다.
  3. Order 도메인 내에서 주문의 상태를 변경한다.
  4. 이후 주문상태 저장, 수동커밋된다.
// Kafka 에서 메세지를 컨슘한다.
@KafkaListener(topics = "payments",
        groupId = PaymentFinishedConsumerConfig.PAYMENTS_CONSUMER_GROUP)
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
    log.info("consume Record: {}", data.value());

    try {
        PaymentFinishedRecord record =
                objectMapper.readValue(data.value(), PaymentFinishedRecord.class);
        orderPaymentFinishedService.paymentFinished(
                record.getOrderId(), record.getAmount());

        // 예외가 발생하지 않는다면, 수동커밋을 진행하여 읽은 메세지는 처리됐음을 보장한다.!
        acknowledgment.acknowledge();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

// 주문을 조회하고, finishPayment() 메소드를 호출한다.
@Transactional
public void paymentFinished(long orderId, int amount) {
    Order order = orderRepository.findById(orderId);

    order.finishPayment();
}

// 주문의 상태를 변경한다.
public class Order {
    public void finishPayment() {
        this.status = OrderStatus.PREPARE_SHIPPING;
    }
}

 

가장 힘들었던 트러블슈팅..?


Docker 를 이용해서 한 개의 zookeeper, 두 개의 kafka broker 를 구성하는 과정에서 애를 많이 먹었다.

나의 카프카 설정에서는 2개의 주소가 필요했다.

  1. 카프카 브로커 끼리의 통신을 하기위한 주소
  2. 외부 클라이언트 [(ex) Producer, Consumer App] 가 접근하여 사용할 수 있는 주소

 

위 설정은 advertised.listener 를 통해 설정할 수 있다.

  • advertised.listener 에 2개의 주소를 등록한다.
    1. INTERNAL : 카프카 브로커 끼리 통신을 하기 위한 주소
    2. EXTERNAL : 외부 클라이언트가 접근하여 사용할 수 있는 주소
  • inter.broker.listener.name 에 INTERNAL 을 입력하여, 내부 브로커의 통신에서는 INTERNAL 주소를 사용하도록 명시한다.

아래의 주석을 읽는다면 더 쉽게 이해할 수 있을 것이다.

version: "3"
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.1.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    networks:
      - kafka_net


  kafka-1:
    image: confluentinc/cp-kafka:7.1.1
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      # 브로커 간 통신에서 사용하는 리스너의 이름을 정의한다.
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # 외부에서 접근하는 클라이언트가 사용할 리스너가 "listeners 설정" 과 다른 경우, 사용하는 리스너이다.
      # 외부 클라이언트가 사용할 수 있도록 zookeeper 에 게시한다.
      ### INTERNAL 로 정의한 부분은 브로커 간의 통신에서 사용할 것이고, (for Docker Internal Network)
      ### EXTERNAL 로 정의한 부분은 외부에서 프로듀서, 컨슈머의 요청의 통신에서 사용한다. (for External Clients)
      ### 참고 : https://www.confluent.io/ko-kr/blog/kafka-listeners-explained/
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:9092,EXTERNAL://localhost:29092
      # 리스너 이름과 보안프로토콜 간의 매핑
      # PLAINTEXT : 암호화 되지않은 인증이없는 프로토콜
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka_net

  kafka-2:
    image: confluentinc/cp-kafka:7.1.1
    depends_on:
      - zookeeper-1
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:9092,EXTERNAL://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka_net

networks:
  kafka_net:
    driver: bridge

 

다음으로..


위의 예시 시나리오에는 문제가 있다.

  • 결제 전, API 호출을 통해 Product 에 재고가 있었음을 파악한 뒤, 결제를 완료했지만, 순간 다른 클라이언트가 먼저 결제를 완료하여 재고를 차감했다면?
    • 재고 차감에 실패했기에, 결제와 주문쪽의 상태를 롤백하는 일종의 보상 트랜잭션이 필요하다.

 

다음에는 Redis 를 공부할 생각이다.

  • 상품쪽 모듈에 대해서는 Redis 로 재고관리하는 부분을 구현하고,
  • 그 이후 보상트랜잭션까지 구현한 뒤, 2편을 작성해보려고 한다.!

 

참고 및 공부자료



... 저 사실 못해서 틀린내용이 있을지도.?.. 지적.. 해주시면.. 감사... 반영... 하겠... 습니다...

'토이프로젝트' 카테고리의 다른 글

Redis 를 이용해서 재고를 관리해보자!  (2) 2022.08.03