这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!
基于Java的API
首先, 在了解生产者发送消息的原理之前,我们应该先学会如何去发送消息。
Kafka为我们提供了很多项目可以操作的API客户端,包括:
C/C++
GO
Python
...
通过官网查看API菜单,官方文档上也是Java的版本。我们根据提示一步步操作即可~
先新建maven项目,并且引入对应的****kafka-clients依赖
建议:Kafka-clients依赖版本,最好和安装的kafka版本一致
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
同步发送
Kafka生产者主要靠KafkaProducer来进行操作。点击到对应的文档页面,我们可以看到关于KafkaProducer<K,V> 的详细信息。
一个好的组件是非常贴心的, 甚至我们都不用去网上搜任何相关的资料,只需要通过查看对应的注释就可以知道这个东西该怎么用。
Properties config = new Properties();
// --bootstrap-server
config.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"master:9092,node01:9092,node02:9092"
);
// key 序列化器
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化器
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(config)) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"newTopic001",
"key01",
"data from " + KafkaQuickProducer.class.getName()
);
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(
MessageFormat.format("{0} {1} {2} {3}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
recordMetadata.timestamp()
)
);
} catch (Exception e) {
e.printStackTrace();
}
以上代码就是同步发送的过程,这已经是在开发过程中需要配置的最小单元,而其他关于生产者的配置,我们可以通过ProducerConfig来进行查看
** 与命令行上的参数,基本上是一模一样的**
而关于序列化器的问题,我们在下面原理的部分说明
异步发送
我们在调用同步send的时候,发现有两个参数的方法, 而这个方法实现的就是****异步发送
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
异步发送会将发送结果以事件驱动的形式传递,那么这里,我们就需要注意一点:
程序调用完成之后,不能让他立即执行,否则我们无法查看到具体的发送结果
接下来我们看具体的程序实现。理论上:我们只需要改最后发送的部分
Properties config = new Properties();
// --bootstrap-server
config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092");
// key 序列化器
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化器
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(config)) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"newTopic001",
"key01",
"data from " + KafkaQuickProducer.class.getName()
);
async(producer, record);
} catch (Exception e) {
e.printStackTrace();
}
// 异步发送
private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) {
producer.send(record, (recordMetadata, exception) -> {
if (null != exception) {
exception.printStackTrace();
return;
}
System.out.println(
MessageFormat.format("{0} {1} {2} {3}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
recordMetadata.timestamp()
)
);
});
try {
// 将程序进行阻塞,防止由于消息发送成功之后进程停止而无法接收到事件反馈
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
这属于整个生产者发送消息方式的最小单元,本文属于Producer入门阶段。
在ProducerConfig中还包含了非常多的配置项,更多的配置信息我们会在优化章节中说明。
原理
在第一部分,我们已经了解到,关于生产者最基本的使用方式,到这里,其实我想跟大家聊一聊:
生产者在发送消息的时候中间到底经历了什么?
大家应该已经看到上面的那张原理图,我们可以从中找出答案!
主线程
**这里我们分为两个线程块来说明, 第一部分是Main主线程, 也就是生产者在调用****send()**方法时所在的线程
在这里,我们可以看到:
外部数据首先被封装为ProducerRecord**,然后调用**send()**方法。
在send()过程中,经过拦截器、序列化器、分区器等处理之后进入到RecordAccumulator中。
接下来我们仔细聊一聊拦截器、序列化器、分区器的作用
拦截器
拦截器很类似于我们在SpringMVC中Interceptor的功能,而且在Producer中我们是可以自定义拦截器的。
我们可以在发送之前对数据进行拦截处理,比如说:统计生产者发送数据的总量等等。
当然目前来讲,我们如果不开发Kafka监控平台的话,这里拦截器的用处并不大。我们忽略不计即可
后续如果有机会的话,我们可以专门写篇文章,用来介绍如何开发一个拦截器
序列化器
而序列化器,主要对两个部分的数据进行处理:
Key
Value
byte[] serializedKey
= serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue
= valueSerializer.serialize(record.topic(), record.headers(), record.value());
从本质上来讲,外部数据属于属于对象,而对象不能直接通过网络进行传输。 所以我们就需要一个序列化器,将它转换成字节数组,进而进行传输
Kafka本身为我们提供了很多可用的序列化器,不过我们能用到最多的还是StringSerializer。
在生产端将消息进行序列话,那么在消费端必然会进行反序列化操作
分区器
我们知道Kafka是以Topic为消息发送的主体,不过由于Topic是一个虚拟的概念, 所以我们没有办法在实际中查看到关于Topic的相关信息。 但是前面我们也说过, 当前Topic下的消息数据都是通过Partition进行存储的。
发送出去的消息需要存储在哪个分区中就是通过分区器来进行指定的,在我们没有指定分区策略的情况下,生产者会通过默认的分区策略指定当前消息应该存储在哪个分区下
分区的内容还是比较多的,我们会在下一节做详细的说明
RecordAccumulator
此时,在主线程的区域中,当消息进入到默认大小为32m的记录缓冲区时, 本区的工作就到此结束。
缓冲区中有多个双端队列,分别对应Topic不同的分区。每一个分区就会创建一个双端队列。
此时的消息将会被按照批次的方式存放在队列中, 默认一批为16k大小。当缓冲区达到指定条件之后,****sender线程将会被唤醒,Sender程序将会冲队列中不断拉出消息进行下一步的发送
Sender线程
影响Sender线程唤醒的条件
想要唤醒Sender线程有两个因素,但不是说这两个条件都必须满足,他们是或的关系。
batch.size
是一个条件,这也是后期针对生产者优化的主要参数之一。当发送消息之后,生产者会将消息进行整合。将其按照一批一批的方式发送给Broker,从而减少网络间的传输请求次数。默认情况下为16k。
而如果一批数据的大小累计达到了设置的
batch.size
之后,sender才会做发送数据的操作这是第一个限制
下面再来介绍一个非常强势的参数:
liner.ms
。生产者优化的主要参数之二。这么说吧,如果你设置的liner.ms=0,表示不延迟直接发送。那么batch.size就不会生效了
而liner.ms=0属于默认配置
如果数据一直没有达到设置的
batch.size
大小,数据也不能不发对吧。所以Kafka也就为我们提供了这样的参数:当sender等待liner.ms设置的时间之后【单位ms】,不管数据如何都会将消息进行发送
如未设置当前参数,表示没有延迟,直接发送
下面举个小例子
config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");
开始发送
将
RecordAccumulator
内存储的数据拉取出来之后,开始将其创建为一个个的Request请求。这里需要注意的是:NetworkClient并非一股脑的将全部可发送数据进行传输请求
正相反,为了能够保证不同分区所对应DQueue的数据进入到对应的Broker所在的分区内,Kafka将按照<BrokerId, Request>的形式对请求进行传输。如果传输到达Broker之后没有acks应答,那么当前节点下最多能够保存5个未响应的请求。
ACKS
这里简单聊一下它的应答方式。在
ProducerConfig.ACKS_DOC
下我们也可以看到相关的说明:acks=0: 生产者不会等待Broker的应答,直接表示消息已经发送成功。而消息有没有真正达到Broker,不关心。
当然了,这种方式在性能上来讲是最好的,适合一些数据不重要的场景
acks=1: 生产者将消息发送到Broker之后,由Leader在本地将消息进行存储之后,返回发送成功的应答。
如果Follower还没有同步到消息,Leader就已经挂了。那么此时就会出现消息丢失的情况
acks=all:生产者将消息发送到Broker之后,由Leader在本地将消息进行存储,并且Follower同步完消息之后才会返回发送成功的应答。
这种方式是最能保证数据安全的情况,但是性能也是最低的~
最后:
当Broker返回成功应答之后,RecordAccumulator中的数据将会被清理
如果失败,可以尝试重试等操作