数据与智能 出版了畅销书「构建企业级推荐系统:算法、工程实现与案例分析」。每周输出7篇推荐系统、数据分析、大数据、AI原创文章。「数据与智能」(同名视频号、知乎、头条、B站、快手、抖音、小红书等自媒体平台号) 社区,聚焦数据、智能领域的知识分享与传播。
作者 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究
编辑 | auroral-L
第二章 Kafka 元数据以及消息封装
1. 元数据
1.1 Node
1.2 TopicPartition
1.3 PartitionInfo
1.4 Cluster
1.5 MetaData
2. 消息封装(RecordAccumulator 对象)
2.1 MemoryRecords
2.2 RecordBatch
1. 元数据
1.1 Node
* Information about a Kafka node
* 代表的就是kafka的一个节点信息
*/public class Node {
private static final Node NO_NODE = new Node(-1, "", -1);
//id 编号,这个编号是我们配置参数的时候指定的broker id。
private final int id;
private final String idString;
private final String host;
private final int port;
private final String rack;
1.2 TopicPartition
package org.apache.kafka.common;
import java.io.Serializable;
* A topic name and partition number
* topic名称和分区编号
*/public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
1.3 PartitionInfo
package org.apache.kafka.common;
* Information about a topic-partition.
*/public class PartitionInfo {
private final String topic;
private final int partition;
//leader partition 在哪台服务器上面
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
this.replicas = replicas;
this.inSyncReplicas = inSyncReplicas;
* The topic name
*/ public String topic() {
return topic;
* The partition id
*/ public int partition() {
return partition;
* The node id of the node currently acting as a leader for this partition or null if there is no leader
*/ public Node leader() {
return leader;
* The complete set of replicas for this partition regardless of whether they are alive or up-to-date
*/ public Node[] replicas() {
return replicas;
* The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
* the leader should fail
*/ public Node[] inSyncReplicas() {
return inSyncReplicas;
1.4 Cluster
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
//内部主题,主要真的Kafka Stream应用程序执行时的内部主题
private final Set<String> internalTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
* 新的构造方法增加了internalTopics,旧的构造方法已经废弃掉了
* Create a new cluster with the given id, nodes and partitions
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/ public Cluster(String clusterId,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics);
1.5 MetaData
* A class encapsulating some of the logic around metadata.
* <p>
* This class is shared by the client thread (for partitioning) and the background sender thread.
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update.
* <p>
* If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval
* is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
* manage topics while producers rely on topic expiry to limit the refresh set.
*/public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
private final long refreshBackoffMs;
private final long metadataExpireMs;
private int version;
private long lastRefreshMs;
//如果正常情况下,如果每次都是更新成功的,那么lastRefreshMs和lastSuccessfulRefreshMs 应该是相同的。
private long lastSuccessfulRefreshMs;
private Cluster cluster;
private boolean needUpdate;
/* Topics with expiry time */ //记录了当前已有的topics
private final Map<String, Long> topics;
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean topicExpiryEnabled;
* Create a metadata instance with reasonable defaults
*/ public Metadata() {
this(100L, 60 * 60 * 1000L);
public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.needUpdate = false;
this.topics = new HashMap<>();
this.listeners = new ArrayList<>();
this.clusterResourceListeners = clusterResourceListeners;
this.needMetadataForAllTopics = false;
* Get the current cluster info without blocking
*/ public synchronized Cluster fetch() {
return this.cluster;
KafkaProducer发送一条消息的大致流程如下:
〇 ProducerInterceptor对消息进行拦截
〇 Serializer对消息的key和value进行序列化
〇 Partitioner为消息选择目标分区(Partition)
〇 RecordAccumulator负责收集消息并封装成批次,以便批量发送
〇 Sender为实际执行发送的线程,负责从RecordAccumulator获取消息,并依次完成以下步骤:
〇 构建ClientRequest请求
〇 将ClientRequest请求发送给NetworkClient组件
〇 NetworkClient组件将ClientRequest请求放入KafkaChannel的缓冲区
〇 执行网络I/O,发送请求
〇 收到响应,调用ClientRequest的回调函数onComplete
2. 消息封装(RecordAccumulator对象)
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
*/public final class RecordAccumulator {
private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
private final BufferPool free;
private final Time time;
//一个消息批次包括TopicPartition 分区 和 Deque<RecordBatch> 队列
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private int drainIndex;
Kafka有两种发送消息的方式,分别是同步发送和异步发送,实际使用中大多采用异步发送。主线程调用KafkaProducer.send()方法将消息缓存到RecordAccumulator中,达到一定条件(例如某个批次已经写满)之后就会唤醒Sender线程发送消息。RecordAccumulator内部维护了这样一个数据结构:ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches,以TopicPartition作为key,Deque作为value,Deque中存放的是该分区的RecordBatch。每一个RecordBatch内部都持有一个MemoryRecords对象(即RecordBatch的records字段),消息实际被写入其中。下面先来了解一下MemoryRecords的数据结构。
2.1 MemoryRecords
public class MemoryRecords implements Records {
private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
// 压缩消息数据,并写入到buffer
private final Compressor compressor;
// 表示buffer中最多可以写入的字节数据大小
private final int writeLimit;
// buffer初始化大小
private final int initialCapacity;
// 用于保存消息数据的缓冲区
private ByteBuffer buffer;
// 用于表示MemoryRecords的读写模型,默认是只读
private boolean writable;
〇 buffer:用于保存消息数据的Java NIO ByteBuffer。
〇 writeLimit:记录buffer字段最多可以写入多少个字节的数据。
〇 compressor:压缩器,对消息数据进行压缩,将压缩后的数据输出到buffer。
〇 writable:标识此MemoryRecords对象处于只读模式还是可写模式。在MemoryRecords发送之前,会将其设置为只读模式。
// the following two functions also need to be public since they are used in MemoryRecords.iteration
public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
try {
switch (type) {
case NONE:
return new DataOutputStream(buffer);
case GZIP:
return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
case SNAPPY:
try {
OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
case LZ4:
try {
OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
throw new IllegalArgumentException("Unknown compression type: " + type);
} catch (IOException e) {
throw new KafkaException(e);
public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
try {
switch (type) {
case NONE:
return new DataInputStream(buffer);
case GZIP:
return new DataInputStream(new GZIPInputStream(buffer));
case SNAPPY:
try {
InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
case LZ4:
try {
InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
messageVersion == Record.MAGIC_VALUE_V0);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
throw new IllegalArgumentException("Unknown compression type: " + type);
} catch (IOException e) {
throw new KafkaException(e);
2.2 RecordBatch
* A batch of records that is or will be sent.
* 最终被发送出去的消息对象
* 这个类不是线程安全的,在修改的时候需要使用同步锁
*/public final class RecordBatch {
private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
public int recordCount = 0;
public int maxRecordSize = 0;
public volatile int attempts = 0;
public final long createdMs;
public long drainedMs;
public long lastAttemptMs;
public final MemoryRecords records;
public final TopicPartition topicPartition;
public final ProduceRequestResult produceFuture;
public long lastAppendTime;
private final List<Thunk> thunks;
private long offsetCounter = 0L;
private boolean retry;