查看“Kafka 的 Java 客户端-生产者的实现”的源代码
←
Kafka 的 Java 客户端-生产者的实现
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看和复制此页面的源代码。
=== 1.生产者的基本实现 === * 引入依赖 <syntaxhighlight lang="xml"> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency> </dependencies> </syntaxhighlight> * 具体实现 <syntaxhighlight lang="java"> package org.example.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MySimpleProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { //1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.200:9092,192.168.137.200:9093,192.168.137.200:9094"); //把发送的 key 从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息 value 从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2.创建生产消息的客户端,传入参数 Producer<String, String> producer = new KafkaProducer<>(props); //3.创建消息 //key:作用是决定往哪个分区上发,value:具体要发送的消息内容 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "helloKafka"); //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } </syntaxhighlight> === 2.生产者的同步发送消息 === [[文件:生产者的同步发送消息.png|无|缩略图|900x900像素]]如果生产者发送消息没有收到 ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。<syntaxhighlight lang="java"> //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); </syntaxhighlight> === 3.生产者的异步发送消息 === [[文件:生产者的异步发送消息.png|无|缩略图|900x900像素]] 异步发送,生产者发送完消息后就可以执行之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法。<syntaxhighlight lang="java"> //5.异步发送消息 producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.out.println("发送消息失败:" + e.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } }); Thread.sleep(1000000L); </syntaxhighlight> === 4.生产者中的 ack 的配置 === 在同步发送的前提下,生产者在获得集群返回的 ack 之前会一直阻塞。那么集群什么时候返回 ack 呢?此时 ack 有3个配置: - ack = 0 kafka-cluster 不需要任何的 broker 收到消息,就立即返回 ack 给生产者,最容易丢消息的,效率是最高的 - ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的 - ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。[[文件:生产者中的 ack 的配置.png|无|缩略图|900x900像素]] 下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置<syntaxhighlight lang="java"> props.put(ProducerConfig.ACKS_CONFIG, "1"); /* * 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接受者那边做好 * 消息接收的幂等性处理 */ props.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); </syntaxhighlight>
返回至
Kafka 的 Java 客户端-生产者的实现
。
导航菜单
个人工具
登录
名字空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
Spring Boot 2 零基础入门
Spring Cloud
Spring Boot
设计模式之禅
VUE
Vuex
Maven
算法
技能树
Wireshark
IntelliJ IDEA
ElasticSearch
VirtualBox
软考
正则表达式
程序员精讲
软件设计师精讲
初级程序员 历年真题
C
SQL
Java
FFmpeg
Redis
Kafka
MySQL
Spring
Docker
JMeter
Apache
Linux
Windows
Git
ZooKeeper
设计模式
Python
MyBatis
软件
数学
PHP
IntelliJ IDEA
CS基础知识
网络
项目
未分类
MediaWiki
镜像
问题
健身
国债
英语
烹饪
常见术语
MediaWiki帮助
工具
链入页面
相关更改
特殊页面
页面信息