查看“Spring Boot 中使用 Kafka”的源代码
←
Spring Boot 中使用 Kafka
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看和复制此页面的源代码。
https://www.bilibili.com/video/BV1Xy4y1G7zA?p=23 === 1.引入依赖 === <syntaxhighlight lang="xml"> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </syntaxhighlight> === 2.编写配置文件 === <syntaxhighlight lang="yaml"> server: port: 8080 spring: kafka: bootstrap-servers: - 192.168.137.200:9092 - 192.168.137.200:9093 - 192.168.137.200:9094 producer: # 生产者 retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息 key 和 消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每一批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每一批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于 TIME 时提交 # TIME # 当每一批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,被处理 record 数量大于等于 COUNT 时提交 # COUNT # TIME | COUNT 有一个条件满足时提交 # COUNT_TIME # 当每一批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,手动调用 Acknowledgment.acknowledge() 后提交 # MANUAL # 手动调用 Acknowledgment.acknowledge() 后立即提交,一般使用这种 ack-mode: MANUAL_IMMEDIATE redis: host: 127.0.0.1 </syntaxhighlight> === 3.编写消息生产者 === <syntaxhighlight lang="java"> package io.github.jihch.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/msg") public class MyKafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public String sendMessage() { kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message"); return "send success!"; } } </syntaxhighlight> === 4.编写消费者 === <syntaxhighlight lang="java"> package io.github.jihch.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class MyConsumer { @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交 offset ack.acknowledge(); } } </syntaxhighlight> === 5.消费者中配置消费主题、分区和偏移量 === https://www.bilibili.com/video/BV1Xy4y1G7zA?p=24<syntaxhighlight lang="java"> @KafkaListener(groupId = "testGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }, concurrency = "3") //concurrency 就是同组下的消费者个数,就是并发消费数,建议小于等于分区数 public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交 offset ack.acknowledge(); } </syntaxhighlight>
返回至
Spring Boot 中使用 Kafka
。
导航菜单
个人工具
登录
名字空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
Spring Boot 2 零基础入门
Spring Cloud
Spring Boot
设计模式之禅
VUE
Vuex
Maven
算法
技能树
Wireshark
IntelliJ IDEA
ElasticSearch
VirtualBox
软考
正则表达式
程序员精讲
软件设计师精讲
初级程序员 历年真题
C
SQL
Java
FFmpeg
Redis
Kafka
MySQL
Spring
Docker
JMeter
Apache
Linux
Windows
Git
ZooKeeper
设计模式
Python
MyBatis
软件
数学
PHP
IntelliJ IDEA
CS基础知识
网络
项目
未分类
MediaWiki
镜像
问题
健身
国债
英语
烹饪
常见术语
MediaWiki帮助
工具
链入页面
相关更改
特殊页面
页面信息