주문, 결제, 재고의 상태를 맞춰보자!
예시) 결제를 완료하면, 주문의 상태를 결제대기 -> 배송준비, 재고를 한개 감소 하는 로직이 있다고 가정해본다.
1. 만약 주문과 결제의 DB 를 같은 서버의 인스턴스에서 접근가능하다면, 하나의 트랜잭션에 묶여 아래처럼 구현될 것 같다.
@Transactional
public void finishPayment() {
paymentRepository.save(payment);
orderRepository.findById(payment.getOrderId());
order.paymentFinished();
prodRepository.findById(payment.getProductId());
prod.minusStock();
}
public class Order {
public void paymentFinished() {
this.status = OrderStatus.PREPARE_SHIPPING
}
}
public class Prod {
public void minusStock() {
this.stock = stock - 1;
}
}
2. 만약 "주문 API, 결제 API, 재고 API 가 따로있는 상태" 라면 어떻게 해야할까?
@Transactional
public void finishPayment() {
paymentRepository.save(payment);
// 외부호출을 통해 상태를 변경
orderClient.paymentFinished(payment.getOrderId());
productClient.paymentFinished(payment.getProductId());
}
하나의 트랜잭션으로 묶여있는 1번 케이스는 atomic 하게 상태를 변경할 수 있다.
- 상품재고가 감소되는것이 실패하거나, 주문의 상태변경이 실패되면 롤백 가능하다.
하지만, API 호출을 통해 상태를 변경하는 경우, 아래와 같은 문제점이 발생할 수 있다.
- orderClient 는 성공했지만, pruductClient 가 실패한다면..
- 결제는 롤백될 수 있으나 주문의 상태가 배송준비인 상태가 되는 문제가 생긴다.
이러한 상황을 Message Queue 를 이용하여 해결할 수 있고, 이번에 Message Queue 중 하나인 Kafka 를 공부하여, 해당 목표를 달성하는 기능을 구현해본다.
Eventually Consistency 란?
결제의 입장에서는 Message Queue 에 전송이 성공하면, 결제가 완료됐다고 생각하고 저장한다.
- 결제의 입장에서는 Message Queue 에 적재가 성공했으니, "주문과 상품재고는 언젠가 갱신이 되겠지" 라고 가정해본다.
주문과 상품재고의 입장에서는 Message Queue 에 있는 Record 를 Consume 하여, 자신들의 상태를 변경한다.
- 이 과정에서 주문과 상품재고의 상태가 변경되는 순간이 다를 수 있고,
- 즉시 반영되지 않을 수 있으나
- Message Queue 에 데이터가 항상 저장되어있음이 보장된다면 결국에는 결제, 주문, 상품재고의 상태는 우리의 목표대로 상태가 저장될 것이다.
Eventually Consistency (궁극적 일관성) : 결국 일관성(consistency) 이 유지된다.
Message Queue 가 가져야 할 기능, Solved By Kafka
필요한 기능은 크게 아래의 3가지로 생각된다.
- Message Queue 에 저장된 데이터는 절대 없어져서는 안된다.
- Payment 의 네트워크 상황이 열악하더라도 Payment 가 저장됐다면 Message Queue 에 한번의 결제완료 상태만 저장되어야 한다.
- 아니라면 ..? 상품재고가 2회 이상 차감될 가능성이 있다.
- 주문과 상품재고는 Message Queue 내 결제에 대해서 처리(Consume)를 완료했다면 이를 처리했다는 것을 어딘가에 저장 [(ex) flag 처리] 한 뒤, 다음에 다시 처리하지 않아야한다.
1. Message Queue 에 저장된 데이터는 절대 없어져서는 안된다.
Kafka 는
- 적재된 데이터를 디스크에 저장한다.
- 메모리를 사용하여 캐싱하기에 느리지않아요~!
- HA (High Availavility) a.k.a 클러스터링 기능을 제공하며, replicas (복제본) 를 사용해 저장된 데이터를 여러곳에 저장할 수 있기에 안전하게 데이터를 저장할 수 있다.
- 데이터를 저장하는 Producer (위의 예제에서 Payment) 는 acks 옵션과 min.insync.replicas 옵션을 사용하여 복제본 까지 데이터가 안전하게 저장됐음을 확인할 수 있다.
Keyword : Clustering, acks(by producer), min.insync.replicas(by topic)
2. Payment 의 네트워크 상황이 열악하더라도, 한번의 결제완료만 저장돼야한다.
Kafka 는
- Producer 의 enable.idempotence 옵션을 통해, Kafka 에 단 한번만 저장(멱등성 프로듀서) 되도록 설정할 수 있다.
Keyword : enable.idempotence, 멱등성 프로듀서
3. 주문과 상품재고는 Consume 을 완료했다면, 이를 처리했다는 것을 어딘가에 저장해야 한다.
Kafka 는
- 컨슈머 그룹 내에서 커밋을 통해, 해당 데이터가 처리됐음을 Kafka 내부에 저장해 놓고 해당 데이터의 처리여부를 확인할 수 있다.
Keyword : 컨슈머그룹, commit
카프카는 우리가 목표로하는 3가지를 제공하기에, Eventually Consistency 를 달성할 수 있다.
나의 토이프로젝트
카프카에 대한 공부를 어느정도 완료한 뒤, 토이프로젝트를 구현해봤다. (https://github.com/robin00q/kafka_toy_project)
시나리오 : 결제가 완료되면, 주문의 상태를 배송준비중으로 변경 (Using Kafka) 한다.
결제 (Payment) 모듈
- DB : MySQL (결제완료의 상태를 저장)
- Kafka Producer
- payment 토픽 발행
- acks = all 옵션을 사용하여 토픽의 min.insync.replicas 만큼의 팔로워 파티션에 데이터가 저장됐음을 보장한다.
주문 (Order) 모듈
- DB : MySQL (주문의 상태를 저장)
- Kafka Consumer
- payment 토픽 구독
- 수동커밋 모드를 사용하여, 주문의 상태가 변경됐다면 컨슈머그룹에 커밋한다.
핵심코드 1 (결제모듈 - 결제 완료)
- 결제 완료 메소드 내에서, 결제완료 상태를 저장하고, payment.finishPayment() 를 호출한다.
- Payment 도메인 내에서, ApplicationEvent 로 PaymentFinishedEvent 를 발행한다.
- 이벤트핸들러는 Kafka 에 메세지를 프로듀싱한다.
// 결제 완료
public Long startPayment(StartPaymentRequest request) {
Payment payment = request.toPayment();
// 저장을 통해 결제가 됐다고 가정한다.
paymentRepository.save(payment);
payment.finishPayment();
return payment.getId();
}
// 결제 완료 이벤트 발행
public class Payment {
public void finishPayment() {
Events.raise(new PaymentFinishedEvent(orderId, amount));
}
}
// 이벤트를 수신한 이벤트핸들러는 Kafka 에 메세지를 프로듀싱한다
public class PaymentFinishedEventHandler {
private static final String PAYMENT_FINISHED_TOPIC = "payments";
private final KafkaTemplate<String, String> kafkaTemplate;
@EventListener(PaymentFinishedEvent.class)
@Async
public void handle(PaymentFinishedEvent event) {
PaymentFinishedRecord record = PaymentFinishedRecord.from(event);
}
}
핵심코드 2 (주문모듈 - 주문 상태 변경)
- Kafka 메세지 프로듀서는 결제완료 레코드를 컨슈밍한다.
- Order 를 조회한 뒤, finishPayment() 메소드를 이용해서 결제가 완료됐음을 주문도메인에 알린다.
- Order 도메인 내에서 주문의 상태를 변경한다.
- 이후 주문상태 저장, 수동커밋된다.
// Kafka 에서 메세지를 컨슘한다.
@KafkaListener(topics = "payments",
groupId = PaymentFinishedConsumerConfig.PAYMENTS_CONSUMER_GROUP)
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
log.info("consume Record: {}", data.value());
try {
PaymentFinishedRecord record =
objectMapper.readValue(data.value(), PaymentFinishedRecord.class);
orderPaymentFinishedService.paymentFinished(
record.getOrderId(), record.getAmount());
// 예외가 발생하지 않는다면, 수동커밋을 진행하여 읽은 메세지는 처리됐음을 보장한다.!
acknowledgment.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
// 주문을 조회하고, finishPayment() 메소드를 호출한다.
@Transactional
public void paymentFinished(long orderId, int amount) {
Order order = orderRepository.findById(orderId);
order.finishPayment();
}
// 주문의 상태를 변경한다.
public class Order {
public void finishPayment() {
this.status = OrderStatus.PREPARE_SHIPPING;
}
}
가장 힘들었던 트러블슈팅..?
Docker 를 이용해서 한 개의 zookeeper, 두 개의 kafka broker 를 구성하는 과정에서 애를 많이 먹었다.
- 참고 : https://www.confluent.io/ko-kr/blog/kafka-listeners-explained/
- 링크 내에서, HOW TO: Connecting to Kafka On Docker 를 참고!
나의 카프카 설정에서는 2개의 주소가 필요했다.
- 카프카 브로커 끼리의 통신을 하기위한 주소
- 외부 클라이언트 [(ex) Producer, Consumer App] 가 접근하여 사용할 수 있는 주소
위 설정은 advertised.listener 를 통해 설정할 수 있다.
- advertised.listener 에 2개의 주소를 등록한다.
- INTERNAL : 카프카 브로커 끼리 통신을 하기 위한 주소
- EXTERNAL : 외부 클라이언트가 접근하여 사용할 수 있는 주소
- inter.broker.listener.name 에 INTERNAL 을 입력하여, 내부 브로커의 통신에서는 INTERNAL 주소를 사용하도록 명시한다.
아래의 주석을 읽는다면 더 쉽게 이해할 수 있을 것이다.
version: "3"
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:7.1.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
networks:
- kafka_net
kafka-1:
image: confluentinc/cp-kafka:7.1.1
depends_on:
- zookeeper-1
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
# 브로커 간 통신에서 사용하는 리스너의 이름을 정의한다.
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# 외부에서 접근하는 클라이언트가 사용할 리스너가 "listeners 설정" 과 다른 경우, 사용하는 리스너이다.
# 외부 클라이언트가 사용할 수 있도록 zookeeper 에 게시한다.
### INTERNAL 로 정의한 부분은 브로커 간의 통신에서 사용할 것이고, (for Docker Internal Network)
### EXTERNAL 로 정의한 부분은 외부에서 프로듀서, 컨슈머의 요청의 통신에서 사용한다. (for External Clients)
### 참고 : https://www.confluent.io/ko-kr/blog/kafka-listeners-explained/
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:9092,EXTERNAL://localhost:29092
# 리스너 이름과 보안프로토콜 간의 매핑
# PLAINTEXT : 암호화 되지않은 인증이없는 프로토콜
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka_net
kafka-2:
image: confluentinc/cp-kafka:7.1.1
depends_on:
- zookeeper-1
ports:
- 39092:39092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:9092,EXTERNAL://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka_net
networks:
kafka_net:
driver: bridge
다음으로..
위의 예시 시나리오에는 문제가 있다.
- 결제 전, API 호출을 통해 Product 에 재고가 있었음을 파악한 뒤, 결제를 완료했지만, 순간 다른 클라이언트가 먼저 결제를 완료하여 재고를 차감했다면?
- 재고 차감에 실패했기에, 결제와 주문쪽의 상태를 롤백하는 일종의 보상 트랜잭션이 필요하다.
다음에는 Redis 를 공부할 생각이다.
- 상품쪽 모듈에 대해서는 Redis 로 재고관리하는 부분을 구현하고,
- 그 이후 보상트랜잭션까지 구현한 뒤, 2편을 작성해보려고 한다.!
참고 및 공부자료
- (DDD Start!!!, 현재는 리뉴얼된 신판이 나왔다고 한다.) http://www.yes24.com/Product/Goods/27750871
- (인프런 카프카 강의) https://www.inflearn.com/course/%EC%95%84%ED%8C%8C%EC%B9%98-%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D
- https://www.popit.kr/rest-%EA%B8%B0%EB%B0%98%EC%9D%98-%EA%B0%84%EB%8B%A8%ED%95%9C-%EB%B6%84%EC%82%B0-%ED%8A%B8%EB%9E%9C%EC%9E%AD%EC%85%98-%EA%B5%AC%ED%98%84-1%ED%8E%B8/
- (아파치 카프카 한글 번역) https://godekdls.github.io/Apache%20Kafka/contents/
... 저 사실 못해서 틀린내용이 있을지도.?.. 지적.. 해주시면.. 감사... 반영... 하겠... 습니다...
'토이프로젝트' 카테고리의 다른 글
Redis 를 이용해서 재고를 관리해보자! (2) | 2022.08.03 |
---|