博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka Producer相关代码分析【转】
阅读量:4511 次
发布时间:2019-06-08

本文共 2095 字,大约阅读时间需要 6 分钟。

来源:

@jewes 2015-01-17 20:36 字数 1967 阅读 1093

Kafka Producer相关代码分析

kafka


Kafka Producer将用户的消息发送到Kafka集群(准确讲是发送到Broker)。本文将分析Producer相关的代码实现。

 

类kafka.producer.Producer

如果你自己实现Kafka客户端来发送消息的话,你就是用到这个类提供的接口来发送消息。(如果你对如何利用Producer API来发送消息还不是很熟悉的话,可以参看)。这个类提供了同步和异步两种方式来发送消息。

异步发送消息是基于同步发送消息的接口来实现的。异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker。

Producer发送同步消息是委托给EventHandler做的,EventHandler是个接口,具体实现为DefaultEventHandler。它们的简化类图如下: 

Producer类图

可以看到,Producer类中的成员producerSendThread和queue是为了发送异步消息的,eventHandler是为了发送同步消息的,当然异步消息也需要它。KeyedMessage是封装了用户发送的消息。Seq是Scala中的序列,可以看成是Java中的List。

KeyedMessage的简化类图如下: 

KeyMessage类图

 

类DefaultEventHandler

DefaultEventHandler是接口EventHandler唯一的实现。从上一节可以看到Producer发送给EventHandler的消息格式为KeyedMessage。我们来看看在将KeyedMessage发送给Broker之前需要做哪些工作。

 

序列化

KeyedMessage中的KV是由用户指定的自定义类型,而在发送给broker的时候是以二进制流来发送,因此还需要将用户自定类型的数据转换为二进制流。在初始化Producer的时候需要配置serializer.class,它就是用来处理这个事情的。此外,还要把多条Message组合成MessageSet并按照用户指定的压缩方式进行压缩。

 

找到对应的broker

KeyedMessage中指定了该Message的topic,而一个topic可以有多个partition,每个partition有多个replica,由多个Broker来管理,这些Broker中有一个leader。只有leader broker能够响应客户端的读写请求。

由此可见,在将KeyedMessage发送给broker之前,必须找到该条Message对应的leader broker,具体步骤为: 

1. 找出该topic的所有partition, 
2. 找出该KeyedMessage应该发送到的那个partition,在初始化Producer的时候配置partitioner.class就是用来对Message进行分区的 
3. 找出对应partition所在的leader broker。

最后,DefaultEventHandler将序列化后的Message封装成ProducerRequest,它自身并没有将ProducerRequest发送给broker的逻辑,而是将其交给SyncProducer来继续后面发送的流程。

 

ProducerPool, SyncProducer和BlockingChannel

它们在一起是完成最后的数据发送任务。先来看它们的类图: 

ProducerPool等类图 
ProducerPool中有一个HashMap,其key为brokerid,value为连接到这个broker的SyncProducer。因此ProducerPool的更准确名字应该为SyncProducerPool。

BlockingChannel可以看成是一个Socket客户端,它有两个成员变量分别是机器名和端口号。它的connect方法会打开到对应机器的socket。它的send方法可以发送RequestOrResponse,它是真正发送数据的地方。

SyncProducer提供了两个send方法,分别用来发送ProducerRequest和TopicMetadataRequest。它内部是调用了blockingChannel来发送数据的。

 

小结

Producer发送数据的简化序列图如下:序列图

从图中可见,各个类的职责明确,BlockingChannel负责最底层的数据发送,SyncProducer负责将Request发送到一个指定的Broker那里,DefaultEventHandler负责数据转换和选择正确的Broker,直接给客户端使用的Producer则在此基础上提供了同步和异步两种发送方式。

转载于:https://www.cnblogs.com/the-tops/p/5786726.html

你可能感兴趣的文章
常用的自动化测试框架及测试框架的发展(Alpha)
查看>>
C#调用MySQL数据库(使用MySql.Data.dll连接)mysql-connector-net-6.10.4.msi
查看>>
Python: PS 滤镜--高反差保留 (High pass)
查看>>
matlab 高阶(二) —— 数值、溢出问题的解决
查看>>
64 位系统 vs2013 配置 OpenCV-3.1.0
查看>>
古之人不余欺也
查看>>
西藏印象:夜色篇
查看>>
从二叉树到完全二叉树
查看>>
排序算法之插入
查看>>
ubuntu下安装 nginx + php + memcached + mariadb
查看>>
Flink job submit & kafka sasl
查看>>
Vue项目的性能优化之路
查看>>
php在linux后台执行
查看>>
【bzoj4176】Lucas的数论 莫比乌斯反演+杜教筛
查看>>
php 生产A-AZ,或者A-CZ等方法,此方法是用着生产excle不定列
查看>>
工作记录01/11/11
查看>>
Operating System Concepts with java 项目: Shell Unix 和历史特点
查看>>
Zabbix监控mysql
查看>>
NFS的安装
查看>>
为tomcat 安装 native 和配置apr
查看>>