集群的节点可以是多个 topic 的分区的 Leader 节点,而不是整个 topic 的 leader 节点。
注意:Kafka 暂时没有 Starter。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
添加 spring-kafka 会触发 SpringBoot 自动装配,在 Spring 应用上下文中注入 kafkaTemplate 实例。用户使用 kafkaTemplate 进行消息收发。
spring.kafka.bootstrap-servers:配置 kafka 服务
该配置接受一个列表,如:
spring:
kafka:
bootstrap-servers:
- kafka.testserver.com:9092
- kafka.testserver.com:9093
- kafka.testserver.com:9094
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
package test.messaging.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("orders.topic", order);
}
}
spring:
kafka:
emplate:
default-topic: tacocloud.orders.topic
然后可以在代码中使用 kafkaTemplate.sendDefault(order);
KafkaTemplate doesn’t offer any methods for receiving messages. That means the only way to consume messages from a Kafka topic using Spring is to write a message listener.
org.springframework.kafka.annotation.KafkaListener;
@KafkaListener 注解到方法上。
每当 topic 有新消息时,对应方法就会被调用一次。
package tacos.kitchen.messaging.kakfa.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
// 定义KafkaListener
@KafkaListener(topics="orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}
参数除了 Order 对象外,也可以有 ConsumerRecord、Message。
It’s worth noting that the message payload is also available via ConsumerRecord .value() or Message.getPayload().
如:
@KafkaListener(topics="orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
// 或者
@KafkaListener(topics="orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}