“Kafka 的 Java 客户端-生产者的实现”的版本间的差异
跳到导航
跳到搜索
Jihongchang(讨论 | 贡献) |
Jihongchang(讨论 | 贡献) |
||
(未显示同一用户的3个中间版本) | |||
第99行: | 第99行: | ||
- ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的 | - ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的 | ||
− | - ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。 | + | - ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。[[文件:生产者中的 ack 的配置.png|无|缩略图|900x900像素]] |
+ | |||
+ | |||
下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置<syntaxhighlight lang="java"> | 下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置<syntaxhighlight lang="java"> | ||
第111行: | 第113行: | ||
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); | props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); | ||
</syntaxhighlight> | </syntaxhighlight> | ||
− | [[文件: | + | |
+ | === 5.关于消息发送的缓冲区 === | ||
+ | [[文件:消息发送的缓冲区.png|无|缩略图|900x900像素]] | ||
+ | |||
+ | * Kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB<syntaxhighlight lang="java"> | ||
+ | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); | ||
+ | </syntaxhighlight> | ||
+ | * Kafka 本地线程回去缓冲区中一次拉16KB的数据,发送到 broker<syntaxhighlight lang="java"> | ||
+ | props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); | ||
+ | </syntaxhighlight> | ||
+ | * 如果线程拉不到16KB的数据,间隔10ms也会将已拉到的数据发到 broker<syntaxhighlight lang="java"> | ||
+ | props.put(ProducerConfig.LINGER_MS_CONFIG, 10); | ||
+ | </syntaxhighlight> | ||
+ | |||
+ | === 6.其他参数配置参考 === | ||
+ | https://kafka.apache.org/documentation/#producerapi |
2022年8月26日 (五) 11:53的最新版本
1.生产者的基本实现
- 引入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
- 具体实现
package org.example.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.200:9092,192.168.137.200:9093,192.168.137.200:9094");
//把发送的 key 从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息 value 从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//2.创建生产消息的客户端,传入参数
Producer<String, String> producer = new KafkaProducer<>(props);
//3.创建消息
//key:作用是决定往哪个分区上发,value:具体要发送的消息内容
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "helloKafka");
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition()
+ "|offset-" + metadata.offset());
}
}
2.生产者的同步发送消息
如果生产者发送消息没有收到 ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition()
+ "|offset-" + metadata.offset());
3.生产者的异步发送消息
异步发送,生产者发送完消息后就可以执行之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法。
//5.异步发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
System.out.println("发送消息失败:" + e.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
Thread.sleep(1000000L);
4.生产者中的 ack 的配置
在同步发送的前提下,生产者在获得集群返回的 ack 之前会一直阻塞。那么集群什么时候返回 ack 呢?此时 ack 有3个配置:
- ack = 0 kafka-cluster 不需要任何的 broker 收到消息,就立即返回 ack 给生产者,最容易丢消息的,效率是最高的
- ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的
- ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。
下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接受者那边做好
* 消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
5.关于消息发送的缓冲区
- Kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- Kafka 本地线程回去缓冲区中一次拉16KB的数据,发送到 broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果线程拉不到16KB的数据,间隔10ms也会将已拉到的数据发到 broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);