Featured image of post Giới Thiệu Về Apache Kafka và Event Driven Architecture Phần II

Giới Thiệu Về Apache Kafka và Event Driven Architecture Phần II

Ở bài trước, chúng ta đã cùng nhau tìm hiểu các khái niệm cơ bản về Event Driven Architecture và các thành phần chính của Apache Kafka. Trong phần này, hãy cùng áp dụng vào thực tế cách sử dụng Kafka trong Java Spring Boot qua các ví dụ cụ thể. Chúng ta sẽ học cách cài đặt Kafka, triển khai producer để gửi thông tin đơn hàng và consumer để xử lý dữ liệu. Cùng bắt đầu ngay thôi!

Cài đặt kafka

Để sử dụng bất cứ dịch vụ bên ngoài nào thì việc đầu tiên chúng ta cần cài đặt nó trên môi trường phát triển mà chúng ta đang làm việc, bạn có thể cài đặt Apache kafka trực tiếp vào máy local của mình hoặc sử dụng docker compose với container Zookeeper để quản lý Kafka cluster và kafka service sử dụng cho việc producer/consume message.

Về việc cài đặt thì trên mạng cũng có khá nhiều bài hướng dẫn rồi, mình sẽ dẫn link tại đây cho mọi người dễ dàng cài đặt. Chỉ cần làm theo hướng dẫn là được :D

https://kafka.apache.org/quickstart

https://topdev.vn/blog/cai-dat-apache-kafka-su-dung-docker-compose/

https://www.conduktor.io/kafka/how-to-install-apache-kafka-on-mac/

Sử Dụng Kafka với Spring Boot

Bây giờ, giả sử bạn đã cài đặt xong zookeeper và kafka service, hãy chạy đoạn command dưới đây để khởi tạo một topic để chúng ta có thể sử dụng các service như là producer để produce event cho các consumer.

1
bin/kafka-topics.sh --create --topic order_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Bây giờ, sau khi đã tạo topic xong chúng ta cần thêm config cho kafka cluster và đăng ký schema trong service Java producer/consumer của chúng ta.
application.yml:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific-avro-reader: true

Sau khi đã thêm config xong, chúng ta cần sử dụng Avro để định nghĩa schema và tạo các class Java từ schema, định nghĩa cho data mà một event sẽ contain. Chúng ta chỉ đơn giản tạo 1 file

Order.avsc:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "product", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "price", "type": "double"}
  ]
}

Sau đó cần sử dụng câu lệnh này để generate Java Class từ schema

1
java -jar avro-tools-1.10.2.jar compile schema Order.avsc .

Bây giờ giả sử chúng ta có Service A là 1 producer có nhiệm vụ produce 1 event tới các consumer để thông báo mỗi khi có User đặt hàng thành công, chúng ta cần tạo một file

OrderProducer.java:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    private static final String TOPIC = "order_topic";
    
    public void sendOrder(Order order) {
        kafkaTemplate.send(TOPIC, order);
    }
}

Và một service B là 1 consumer lắng nghe topic “order_topic” để consume tất cả các event mới. Chúng ta cần tạo 1 file

OrderConsumer.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {
    
    @KafkaListener(topics = "order_topic", groupId = "order_group")
    public void consumeOrder(Order order) {
        System.out.println("Order received: " + order);
        // Xử lý đơn hàng: giảm số lượng hàng tồn kho
        reduceInventory(order);
        // Thông báo cho các dịch vụ khác của nhà sản xuất
        notifyManufacturerServices(order);
    }
    
    private void reduceInventory(Order order) {
        // Giảm số lượng hàng tồn kho
        System.out.println("Reducing inventory for product: " + order.getProduct());
    }
    
    private void notifyManufacturerServices(Order order) {
        // Thông báo cho các dịch vụ khác
        System.out.println("Notifying manufacturer services about the order: " + order);
    }
}

Bây giờ, mỗi khi có một User đặt hàng thành công thì service A - producer sẽ produce một event vào topic “order-topic” để cho tất cả các consumer đang listen topic này có thể consume event một cách nhanh chóng và hiệu quả nhất!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/orders")
public class OrderController {
    
    @Autowired
    private OrderProducer orderProducer;
    
    @PostMapping
    public String createOrder(@RequestBody Order order) {
        orderProducer.sendOrder(order);
        return "Order placed successfully!";
    }
}

Tổng Kết

Trong bài viết này, chúng ta đã tìm hiểu cách cài đặt Kafka và Schema Registry, sử dụng Avro để định nghĩa schema và tạo các class Java từ schema, và cách sử dụng Kafka với Spring Boot để mô phỏng quy trình đặt hàng online. Chúng ta đã tạo Kafka producer để gửi thông tin đơn hàng và Kafka consumer để nhận và xử lý đơn hàng, bao gồm việc giảm số lượng hàng tồn kho và thông báo cho các dịch vụ khác của nhà sản xuất.

Lưu ý: ĐÂY CHỈ LÀ VÍ DỤ MINH HOẠ CƠ BẢN CÁCH CÀI ĐẶT VÀ SỬ DỤNG KAFKA TRONG SPRING BOOT. TRONG THỰC TẾ CÁCH CONFIG KAFKA CLUSTER, CŨNG NHƯ VIỆC REGISTER SCHEMA, HANDLE ERROR VÀ XỬ LÝ EVENT CÓ THỂ PHỨC TẠP HƠN RẤT NHIỀU!

Hy vọng bài viết này đã giúp bạn hiểu rõ hơn các khái niệm và cách sử dụng cơ bản của Kafka với Spring Boot. Nếu bạn có bất kỳ câu hỏi hoặc gợi ý nào, xin hãy để lại bình luận bên dưới.

HAPPY CODING!

comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy