“Kafka 的 Java 客户端-生产者的实现”的版本间的差异

来自姬鸿昌的知识库
跳到导航 跳到搜索
 
(未显示同一用户的3个中间版本)
第99行: 第99行:
 
- ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的
 
- ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的
  
- ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。
+
- ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。[[文件:生产者中的 ack 的配置.png|无|缩略图|900x900像素]]
 +
 
 +
 
  
 
下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置<syntaxhighlight lang="java">
 
下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置<syntaxhighlight lang="java">
第111行: 第113行:
 
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
 
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
 
</syntaxhighlight>
 
</syntaxhighlight>
[[文件:生产者中的 ack 的配置.png|无|缩略图|900x900像素]]
+
 
 +
=== 5.关于消息发送的缓冲区 ===
 +
[[文件:消息发送的缓冲区.png|无|缩略图|900x900像素]]
 +
 
 +
* Kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB<syntaxhighlight lang="java">
 +
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
 +
</syntaxhighlight>
 +
* Kafka 本地线程回去缓冲区中一次拉16KB的数据,发送到 broker<syntaxhighlight lang="java">
 +
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 +
</syntaxhighlight>
 +
* 如果线程拉不到16KB的数据,间隔10ms也会将已拉到的数据发到 broker<syntaxhighlight lang="java">
 +
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
 +
</syntaxhighlight>
 +
 
 +
=== 6.其他参数配置参考 ===
 +
https://kafka.apache.org/documentation/#producerapi

2022年8月26日 (五) 11:53的最新版本

1.生产者的基本实现

  • 引入依赖
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>
  • 具体实现
package org.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MySimpleProducer {

    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //1.设置参数
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.200:9092,192.168.137.200:9093,192.168.137.200:9094");

        //把发送的 key 从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //把发送消息 value 从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2.创建生产消息的客户端,传入参数
        Producer<String, String> producer = new KafkaProducer<>(props);

        //3.创建消息
        //key:作用是决定往哪个分区上发,value:具体要发送的消息内容
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "helloKafka");

        //4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition()
                + "|offset-" + metadata.offset());
    }

}


2.生产者的同步发送消息

生产者的同步发送消息.png

如果生产者发送消息没有收到 ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。

        //4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition()
                + "|offset-" + metadata.offset());


3.生产者的异步发送消息

生产者的异步发送消息.png

异步发送,生产者发送完消息后就可以执行之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法。

        //5.异步发送消息
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    System.out.println("发送消息失败:" + e.getStackTrace());
                }
                if (metadata != null) {
                    System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                            + metadata.partition() + "|offset-" + metadata.offset());
                }
            }
        });

        Thread.sleep(1000000L);


4.生产者中的 ack 的配置

在同步发送的前提下,生产者在获得集群返回的 ack 之前会一直阻塞。那么集群什么时候返回 ack 呢?此时 ack 有3个配置:

- ack = 0 kafka-cluster 不需要任何的 broker 收到消息,就立即返回 ack 给生产者,最容易丢消息的,效率是最高的

- ack = 1(默认):多副本之间的 leader 已经收到消息,并把消息写入到本地的 log 中,才会返回 ack 给生产者,性能和安全性是最均衡的

- ack = -1/all。Broker 里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader 和一个 follower 同步完成后,才会返回 ack 给生产者(此时集群中有2个 broker 已完成数据的接收),这种方式最安全,但性能最差。

生产者中的 ack 的配置.png


下面是关于 ack 和重试(如果没有收到 ack,就开启重试)的配置

props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
 * 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接受者那边做好
 * 消息接收的幂等性处理
 */
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

5.关于消息发送的缓冲区

消息发送的缓冲区.png
  • Kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    
  • Kafka 本地线程回去缓冲区中一次拉16KB的数据,发送到 broker
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    
  • 如果线程拉不到16KB的数据,间隔10ms也会将已拉到的数据发到 broker
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    

6.其他参数配置参考

https://kafka.apache.org/documentation/#producerapi