在 Kafka 中,消费者通过以下步骤来消费数据:
subscribe
方法,消费者可以指定它希望从哪些主题中接收消息。
consumer.subscribe(Arrays.asList("topic1", "topic2"));
或者使用正则表达式来订阅匹配的主题:
consumer.subscribe(Pattern.compile("topic.*"));
poll
方法,该方法会从订阅的主题中拉取一批消息。消费者可以设置一个超时时间,指定在没有消息可用时等待的最大时间。
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
ConsumerRecords
对象中,其中包含了一组记录。消费者遍历这些记录,然后处理每条消息。
for (ConsumerRecord record : records) {
// 处理消息逻辑
System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
}
在处理消息时,可以进行一系列的业务逻辑,例如数据转换、存储、分析等操作。
commitSync
或 commitAsync
方法手动提交偏移。
consumer.commitSync();
enable.auto.commit
配置为 true
,由 Kafka 自动周期性地提交偏移。
properties.put("enable.auto.commit", "true");
消费者通过这些步骤来获取和处理 Kafka 主题中的消息。注意,消费者在处理消息时要考虑异常处理和保证消息处理的幂等性,以确保系统的可靠性。此外,消费者还可以通过配置项来控制各种行为,例如偏移的提交方式、消息拉取的频率等。
Proudly powered by WordPress