Kafka 消费者 API 是 Kafka 提供给开发者的一组接口和工具,用于构建应用程序以消费 Kafka 主题中的消息。消费者 API 具有以下主要作用:
consumer.subscribe(Arrays.asList("topic1", "topic2"));
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
ConsumerRecords
对象中,开发者可以遍历这些消息并执行相应的业务逻辑。
for (ConsumerRecord record : records) {
// 处理消息逻辑
System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
}
consumer.commitSync();
properties.put("enable.auto.commit", "true");
// 创建多个消费者实例
Consumer consumer1 = new KafkaConsumer<>(properties);
Consumer consumer2 = new KafkaConsumer<>(properties);
// 在独立的线程中运行每个消费者实例
new Thread(() -> {
while (true) {
ConsumerRecords records = consumer1.poll(Duration.ofMillis(100));
// 处理消息逻辑
}
}).start();
new Thread(() -> {
while (true) {
ConsumerRecords records = consumer2.poll(Duration.ofMillis(100));
// 处理消息逻辑
}
}).start();
总体而言,Kafka 消费者 API 提供了灵活且功能丰富的工具,使得开发者能够轻松地构建消费 Kafka 主题的应用程序,处理实时流数据。
Proudly powered by WordPress