查看“Kafka 的 Java 客户端-生产者的实现”的源代码
←
Kafka 的 Java 客户端-生产者的实现
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看和复制此页面的源代码。
=== 1.生产者的基本实现 === * 引入依赖 <syntaxhighlight lang="xml"> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency> </dependencies> </syntaxhighlight> * 具体实现 <syntaxhighlight lang="java"> package org.example.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MySimpleProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { //1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.200:9092,192.168.137.200:9093,192.168.137.200:9094"); //把发送的 key 从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息 value 从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2.创建生产消息的客户端,传入参数 Producer<String, String> producer = new KafkaProducer<>(props); //3.创建消息 //key:作用是决定往哪个分区上发,value:具体要发送的消息内容 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "helloKafka"); //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } </syntaxhighlight> === 2.生产者的同步发送消息 === [[文件:生产者的同步发送消息.png|无|缩略图|900x900像素]]如果生产者发送消息没有收到 ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。<syntaxhighlight lang="java"> //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); </syntaxhighlight> === 3.生产者的异步发送消息 === [[文件:生产者的异步发送消息.png|无|缩略图|900x900像素]] 异步发送,生产者发送完消息后就可以执行之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法。<syntaxhighlight lang="java"> //5.异步发送消息 producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.out.println("发送消息失败:" + e.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } } }); Thread.sleep(1000000L); </syntaxhighlight>
返回至
Kafka 的 Java 客户端-生产者的实现
。
导航菜单
个人工具
登录
名字空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
Spring Boot 2 零基础入门
Spring Cloud
Spring Boot
设计模式之禅
VUE
Vuex
Maven
算法
技能树
Wireshark
IntelliJ IDEA
ElasticSearch
VirtualBox
软考
正则表达式
程序员精讲
软件设计师精讲
初级程序员 历年真题
C
SQL
Java
FFmpeg
Redis
Kafka
MySQL
Spring
Docker
JMeter
Apache
Linux
Windows
Git
ZooKeeper
设计模式
Python
MyBatis
软件
数学
PHP
IntelliJ IDEA
CS基础知识
网络
项目
未分类
MediaWiki
镜像
问题
健身
国债
英语
烹饪
常见术语
MediaWiki帮助
工具
链入页面
相关更改
特殊页面
页面信息