8 Spring异步通信-Kakfa
2024-04-09 21:10:36  阅读数 163

前言:为什么选择Kafka?

Kafka 相对于其它消息系统的优点

  • 支持集群模型,扩展性好。scalability
  • 将 Topic 分区到所有实例上,容错性好。resilient

Kafka 的特点

  • 使用 topic 发布/订阅 消息。
  • Topic 支持副本,分布在不同机器上。
  • 集群的节点可以是多个 topic 的分区的 Leader 节点,而不是整个 topic 的 leader 节点。


    image.png

1 环境配置

1.1 添加 Kafka 依赖

注意:Kafka 暂时没有 Starter。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

1.2 SpringBoot 对 Kakfa 的自动操作

添加 spring-kafka 会触发 SpringBoot 自动装配,在 Spring 应用上下文中注入 kafkaTemplate 实例。用户使用 kafkaTemplate 进行消息收发。

1.3 SpringBoot Kafka 服务相关配置

spring.kafka.bootstrap-servers:配置 kafka 服务
该配置接受一个列表,如:

spring:
  kafka:
    bootstrap-servers:
    - kafka.testserver.com:9092
    - kafka.testserver.com:9093
    - kafka.testserver.com:9094

2 使用 KafkaTemplate 发送消息

2.1 KafkaTemplate 发送消息的 API

  • 不需要 convertAndSend(),因为 Kafka Template 可以直接处理域对象。
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

2.2 KafkaTemplate send 方法支持的参数

  • The topic to send the message to (required for send())
  • A partition to write the topic to (optional)
  • A key to send on the record (optional)
  • A timestamp (optional; defaults to System.currentTimeMillis())
  • The payload (required)

2.3 KafkaTemplate 发送消息示例

package test.messaging.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
    private KafkaTemplate<String, Order> kafkaTemplate;
    @Autowired
    public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("orders.topic", order);
    }
}

2.4 如何配置 KafkaTemplate 的默认 topic

spring:
  kafka:
    emplate:
      default-topic: tacocloud.orders.topic

然后可以在代码中使用 kafkaTemplate.sendDefault(order);

3 使用Kafka接收消息

3.1 为什么要编写 Kafka Listener

KafkaTemplate doesn’t offer any methods for receiving messages. That means the only way to consume messages from a Kafka topic using Spring is to write a message listener.

3.2 Kafka Listener 的标识注解

org.springframework.kafka.annotation.KafkaListener;
@KafkaListener 注解到方法上。
每当 topic 有新消息时,对应方法就会被调用一次。

3.3 Kafka Listener 的示例

package tacos.kitchen.messaging.kakfa.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {
    private KitchenUI ui;
    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }
    // 定义KafkaListener
    @KafkaListener(topics="orders.topic")
    public void handle(Order order) {
        ui.displayOrder(order);
    }
}

3.4 获取 Kafka 消息的元数据信息

参数除了 Order 对象外,也可以有 ConsumerRecord、Message。
It’s worth noting that the message payload is also available via ConsumerRecord .value() or Message.getPayload().
如:

@KafkaListener(topics="orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
  log.info("Received from partition {} with timestamp {}",
      record.partition(), record.timestamp());
  ui.displayOrder(order);
}
// 或者
@KafkaListener(topics="orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
  log.info("Received from partition {} with timestamp {}",
      headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
      headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  ui.displayOrder(order);
}