查看“Java 客户端消费者的实现细节”的源代码
←
Java 客户端消费者的实现细节
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看和复制此页面的源代码。
https://www.bilibili.com/video/BV1Xy4y1G7zA?p=18 === 1.消费者的基本实现 === <syntaxhighlight lang="java"> package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MySimpleConsumer { private final static String TOPIC_NAME = "my-replicated-topic"; private final static String CONSUMER_GROUP_NAME = "testGroup"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.200:9092,192.168.137.200:9093,192.168.137" + ".200:9094"); // 消费分组名 props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //创建一个消费者的客户端 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //消费者订阅主题列表 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { /* * poll() API 是拉取消息的长轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record:records) { System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } }// end while }// end main }//end class </syntaxhighlight> === 2.关于消费者自动提交和手动提交 offset === https://www.bilibili.com/video/BV1Xy4y1G7zA?p=19 ==== 1)提交的内容 ==== 消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的 _consumer_offsets 主题里面。 ==== 2)自动提交 ==== 消费者 poll 消息下来以后就会自动提交 offset<syntaxhighlight lang="java"> //是否自动提交 offset,默认就是 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //自动提交 offset 的时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); </syntaxhighlight>注意:自动提交会丢消息。因为消费者在消费前提交 offset,有可能提交完成后还没消费时消费者挂了。 ==== 3)手动提交 ==== 需要把自动提交的配置改成 false<syntaxhighlight lang="java"> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); </syntaxhighlight>手动提交又分成了两种: * 手动同步提交 在消费完消息后调用同步提交的方法,当集群返回 ack 前一直阻塞,返回 ack 后表示提交成功,执行之后的逻辑<syntaxhighlight lang="java"> while (true) { /* poll() API 是拉取消息的长轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } if (records.count() > 0) { // 手动同步提交 offset,当前线程会阻塞直到 offset 提交成功 // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了 consumer.commitSync(); //==========阻塞==== 提交成功 } }// end while </syntaxhighlight> * 手动异步提交 在消息消费完后提交,不需要等到集群 ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用<syntaxhighlight lang="java"> while (true) { /* poll() API 是拉取消息的长轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } if (records.count() > 0) { // 手动异步提交 offset,当前线程提交 offset 不会阻塞,可以继续处理后面的程序逻辑 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if (e != null) { System.err.println("Commit failed for " + map); System.err.println("Commit failed exception:" + e.getStackTrace()); } } }); } }// end while </syntaxhighlight> 3.长轮询 poll 消息 * 默认情况下 ,消费者一次会 poll 500条消息。 <syntaxhighlight lang="java"> //一次 poll 最大拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); </syntaxhighlight> * 代码中设置了长轮询的时间是 1000 毫秒<syntaxhighlight lang="java"> while (true) { /* poll() API 是拉取消息的长轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } </syntaxhighlight>意味着: * 如果一次 poll 到500条,就直接执行 for 循环 * 如果这一次没有 poll 到500条。且时间在1秒内,那么长轮询继续 poll,要么到500条,要么到1s * 如果多次 poll 都没达到500条,且1秒时间到了,那么直接执行 for 循环
返回至
Java 客户端消费者的实现细节
。
导航菜单
个人工具
登录
名字空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
Spring Boot 2 零基础入门
Spring Cloud
Spring Boot
设计模式之禅
VUE
Vuex
Maven
算法
技能树
Wireshark
IntelliJ IDEA
ElasticSearch
VirtualBox
软考
正则表达式
程序员精讲
软件设计师精讲
初级程序员 历年真题
C
SQL
Java
FFmpeg
Redis
Kafka
MySQL
Spring
Docker
JMeter
Apache
Linux
Windows
Git
ZooKeeper
设计模式
Python
MyBatis
软件
数学
PHP
IntelliJ IDEA
CS基础知识
网络
项目
未分类
MediaWiki
镜像
问题
健身
国债
英语
烹饪
常见术语
MediaWiki帮助
工具
链入页面
相关更改
特殊页面
页面信息