1.kafka 生产者简介
1.1 生产者传输实体
kafka Producer 发送的数据对象叫做 ProducerRecord,它有四个关键参数:
- Topic – 主题
- Partition – 分区(非必填)
- Key – 键(非必填)
- Value – 值
1.2 生产者发送流程
kafka 生产者发送消息流程:
- 序列化 – 发送前,生产者要先把键和值序列化。
- 分区 – 数据被传给分区器。如果在
ProducerRecord中已经指定了分区(Partition),那么分区器什么也不会做;否则,分区器会根据ProducerRecord的键(Key)来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。 - 批次传输 – 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。
- 批次,就是一组消息,这些消息属于同一个主题和分区。
- 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。
- 响应 – 服务器收到消息会返回一个响应。
- 如果成功,则返回一个
RecordMetaData对象,它包含了主题、分区、偏移量; - 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。
- 如果成功,则返回一个

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?
- 生产者会向任意 broker 发送一个元数据请求(
MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。 - 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

2.生产者 API
2.1.创建生产者
Kafka 生产者核心配置:
bootstrap.servers– 指定了 Producer 启动时要连接的 Broker 地址。注:如果你指定了 1000 个 Broker 连接信息,那么,Producer 启动时就会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,并不建议把集群中所有的 Broker 信息都配置到bootstrap.servers中,通常你指定 3 ~ 4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为bootstrap.servers指定所有的 Broker。
2.2.异步发送
直接发送消息,不关心消息是否到达。
这种方式吞吐量最高,但有小概率会丢失消息。
2.3.同步发送
2.4.异步响应发送
2.5.关闭连接
3.生产者的连接
Apache Kafka 的所有通信都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通信都是如此。选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用请求以及同时轮询多个连接的能力。
3.1. 何时创建 TCP 连接
Kafka 生产者创建连接有三个时机:
(1)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
(2)当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二:Producer 通过
metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。
3.2.何时关闭 TCP 连接
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
主动关闭是指调用 producer.close() 方法来关闭生产者连接;甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。
如果设置 Producer 端 connections.max.idle.ms 参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms 指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为 -1,TCP 连接将成为永久长连接。
值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
4.序列化
Go 本身不内置 kakfa 的序列化器,只能通过一些第三方库,比如 IBM/sarama、kafka-go 等,它们均提供了常见类型的序列化方式。
- sarama:依赖于用户提供的序列化方法。通常,用户需要自己处理消息的序列化和反序列化。常见的做法是将消息数据编码为 JSON、Protobuf 或 Avro 格式(常用的消息格式)。在发送消息时,用户需要将消息对象转为字节流(byte slice),然后通过
sarama.ProducerMessage发送到 Kafka。 - kafka-go:
kafka-go支持StringEncoder和BytesEncoder,并且可以轻松扩展以支持其他格式(如 JSON 或 Protobuf)
5.分区
5.1.什么是分区
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。
5.2.为什么要分区
为什么 Kafka 的数据结构采用三级结构?
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。
5.3.分区策略
所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。
前文中已经提到,Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。
如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。
- 没有 key 时的分发逻辑:每隔
topic.metadata.refresh.interval.ms的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。 - 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错。
5.4.自定义分区策略
如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?在 Java 中,可以按如下方式实现:
首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partition 和 close,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的 topic、key、keyBytes、value和 valueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。
负载均衡算法常见的有:
- 随机算法
- 轮询算法
- 最小活跃数算法
- 源地址哈希算法
可以根据实际需要去实现