数据与智能 出版了畅销书「构建企业级推荐系统:算法、工程实现与案例分析」。每周输出7篇推荐系统、数据分析、大数据、AI原创文章。「数据与智能」(同名视频号、知乎、头条、B站、快手、抖音、小红书等自媒体平台号) 社区,聚焦数据、智能领域的知识分享与传播。
作者 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究
编辑 | auroral-L
全文共4296字,预计阅读35分钟。
第二章 Kafka 元数据以及消息封装
1. 元数据
1.1 Node
1.2 TopicPartition
1.3 PartitionInfo
1.4 Cluster
1.5 MetaData
2. 消息封装(RecordAccumulator 对象)
2.1 MemoryRecords
2.2 RecordBatch
接着上一篇文章展开,在了解了kafka发送消息的整体流程之后,我们针对其中的一些细节进行展开。
首先kafka发送消息时,KafkaProducer要将此消息追加到指定Topic的某个分区的Leader副本中,首先需要知道Topic的分区数量,经过路由后确定目标分区,之后KafkaProducer需要知道目标分区的Leader副本所在服务器的地址、端口等信息,才能建立连接,将消息发送到Kafka中。因此,在KafkaProducer中维护了Kafka集群的元数据,这些元数据记录了:某个Topic中有哪几个分区,每个分区的Leader副本分布在哪个节点上,Follower副本分布在哪些节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口。
1. 元数据
关于的Kafka的元数据的信息,我们可从以下几个核心类入手:Node、TopicPartition、PartitionInfo、MetaData、Cluster。
1.1 Node
代表集群的节点,包含域名、主机ip、端口、机架这些信息。
**
* Information about a Kafka node
* 代表的就是kafka的一个节点信息
*/public class Node {
private static final Node NO_NODE = new Node(-1, "", -1);
//id 编号,这个编号是我们配置参数的时候指定的broker id。
private final int id;
private final String idString;
//主机名
private final String host;
//端口号,默认是9092
private final int port;
//机架
private final String rack;
....
}
1.2 TopicPartition
代表Kafka中某个Topic的分区映射,主要由主题(topic)名和分区编号组成。
package org.apache.kafka.common;
import java.io.Serializable;
/**
* A topic name and partition number
* topic名称和分区编号
*/public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
....
}
1.3 PartitionInfo
主要阐述了集群主题和分区的映射信息,包括分区的leader和replica分布情况等,比如某个topic在集群中的分区和副本所在的位置。
package org.apache.kafka.common;
/**
* Information about a topic-partition.
*/public class PartitionInfo {
//主题
private final String topic;
//分区编号
private final int partition;
//leader partition 在哪台服务器上面
private final Node leader;
//这个分区的所有的replica都在哪些节点上面
private final Node[] replicas;
//ISR列表。
private final Node[] inSyncReplicas;
public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
this.replicas = replicas;
this.inSyncReplicas = inSyncReplicas;
}
/**
* The topic name
*/ public String topic() {
return topic;
}
/**
* The partition id
*/ public int partition() {
return partition;
}
/**
* The node id of the node currently acting as a leader for this partition or null if there is no leader
*/ public Node leader() {
return leader;
}
/**
* The complete set of replicas for this partition regardless of whether they are alive or up-to-date
*/ public Node[] replicas() {
return replicas;
}
/**
* The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
* the leader should fail
*/ public Node[] inSyncReplicas() {
return inSyncReplicas;
}
}
1.4 Cluster
代表Kafka集群节点、主题和分区的组成信息。
/**
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/public final class Cluster {
private final boolean isBootstrapConfigured;
//kafka集群节点信息列表
private final List<Node> nodes;
//没有授权的topic
private final Set<String> unauthorizedTopics;
//内部主题,主要真的Kafka Stream应用程序执行时的内部主题
//例如状态存储的更改日志主题。这些主题由应用程序创建,仅供该流应用程序使用
private final Set<String> internalTopics;
//用于记录TopicPartition和PartitionInfo的映射关系
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
//记录topic和PartitionInfo的映射信息
private final Map<String, List<PartitionInfo>> partitionsByTopic;
//一个topic对应哪些可用partition,topic和PartitionInfo的映射关系
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
//一台服务器上面有哪些partition,Node与PartitionInfo的映射关系
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
//服务器编号和服务器对应的关系,方便按照brokerId索引
private final Map<Integer, Node> nodesById;
//kafka集群的id信息
private final ClusterResource clusterResource;
/**
* 新的构造方法增加了internalTopics,旧的构造方法已经废弃掉了
* Create a new cluster with the given id, nodes and partitions
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/ public Cluster(String clusterId,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics);
}
.......
}
1.5 MetaData
在Cluster类的基础上进一步做了封装,包含了集群信息最后的更新时间、版本号以及是否需要等待更新等信息。
/**
* A class encapsulating some of the logic around metadata.
* <p>
* This class is shared by the client thread (for partitioning) and the background sender thread.
*
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update.
* <p>
* If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval
* is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
* manage topics while producers rely on topic expiry to limit the refresh set.
*/public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
//过期时间5m
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
//过期值
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
//两个更新元数据的请求的最小的时间间隔,默认值是100ms
private final long refreshBackoffMs;
//多久自动更新一次元数据,默认值是5分钟更新一次。
private final long metadataExpireMs;
//对于producer端来讲,元数据是有版本号
//每次更新元数据,都会修改一下这个版本号。
private int version;
//上一次更新元数据的时间。
private long lastRefreshMs;
//上一次成功更新元数据的时间。
//如果正常情况下,如果每次都是更新成功的,那么lastRefreshMs和lastSuccessfulRefreshMs 应该是相同的。
private long lastSuccessfulRefreshMs;
//Kafka集群本身的元数据。
private Cluster cluster;
//是否更新元数据的标识
private boolean needUpdate;
/* Topics with expiry time */ //记录了当前已有的topics
private final Map<String, Long> topics;
//用于监听MetaData更新的监听器集合
private final List<Listener> listeners;
//集群资源监听器
private final ClusterResourceListeners clusterResourceListeners;
//是否需要更新全部topic的元数据信息,
private boolean needMetadataForAllTopics;
private final boolean topicExpiryEnabled;
/**
* Create a metadata instance with reasonable defaults
*/ public Metadata() {
this(100L, 60 * 60 * 1000L);
}
public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
}
/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.needUpdate = false;
this.topics = new HashMap<>();
this.listeners = new ArrayList<>();
this.clusterResourceListeners = clusterResourceListeners;
this.needMetadataForAllTopics = false;
}
/**
* Get the current cluster info without blocking
*/ public synchronized Cluster fetch() {
return this.cluster;
}
........
}
从上一篇文章中我们可以大概能知道,kafka在发送消息的过程中很重要的一点就是消息的封装、消息的请求和分发。
大致的流程如下:
〇 ProducerInterceptor对消息进行拦截
〇 Serializer对消息的key和value进行序列化
〇 Partitioner对消息进行分发到对应的Partition
〇 RecordAccumulator负责收集消息并封装成批次,通过批量发送消息
〇 Sender为实际发送的线程,负责从RecordAccumulator获取消息
〇 构建ClientRequest请求
〇 将ClientRequest请求发送给NetworkClient组件
〇 NetworkClient组件将ClientRequest请求放入KafkaChannel缓存区
〇 执行网络I/O,发送请求
〇 收到响应,调用ClientRequest的回调函数onComplete
2. 消息封装(RecordAccumulator对象)
首先我们先认识一下RecordAccumulator是何方神圣。通过类的注释可以知道RecordAccumulator本质就是用于收集消息的队列,底层实现的是MemoryRecords。
/**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
*/public final class RecordAccumulator {
private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
private volatile boolean closed;
//注意这里使用了原子类,所以说封装消息的过程中会涉及到线程安全
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
//消息批次大小
private final int batchSize;
//消息压缩类型,支持GZip、snappy、lz4
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
private final BufferPool free;
private final Time time;
//一个消息批次包括TopicPartition 分区 和 Deque<RecordBatch> 队列
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private int drainIndex;
.......
}
Kafka有两种发送消息的方式,分别是同步发送和异步发送,基本使用到的都是异步发送,主线程通过调用KafkaProducer.send()方法将消息缓存在RecordAccumulator,到达一定条件之后就会唤醒Sender线程发送消息。RecordAccumulator包含了这样的一种数据结构ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;以TopicPartition作为key,Deque作为value,Deque中存放了RecordBatch,正如我们上图所示。每一个RecordBatch就是一个MemoryRecords对象,可以了解一下MemoryRecords的数据结构。
2.1 MemoryRecords
public class MemoryRecords implements Records {
private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
// 压缩消息数据,并写入到buffer
private final Compressor compressor;
// 表示buffer中最多可以写入的字节数据大小
private final int writeLimit;
// buffer初始化大小
private final int initialCapacity;
// 用于保存消息数据的缓冲区
private ByteBuffer buffer;
// 用于表示MemoryRecords的读写模型,默认是只读
private boolean writable;
.......
}
MemoryRecords有几个重要的方法,有兴趣的可以下去了解一下。
其中
〇 buffer:用于保存消息数据的Java NIO ByteBuffer。
〇 writeLimit:记录buffer字段最多可以写入多少个字节的数据。
〇 compressor:压缩器,对消息数据进行压缩,将压缩后的数据输出到buffer。
〇 writable:此MemoryRecords对象是只读的模式,还是可写模式。在MemoryRecords发送前时,会将其设置成只读模式。
MemoryRecords中的Compressor的压缩类型是由“compression.type”配置参数指定的,即KafkaProducer.compressionType字段的值。目前KafkaProducer支持GZIP、SNAPPY、LZ4三种压缩方式,对应源码的位置如下。
// the following two functions also need to be public since they are used in MemoryRecords.iteration
public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
try {
switch (type) {
case NONE:
return new DataOutputStream(buffer);
case GZIP:
return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
case SNAPPY:
try {
OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
try {
OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
} catch (IOException e) {
throw new KafkaException(e);
}
}
public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
try {
switch (type) {
case NONE:
return new DataInputStream(buffer);
case GZIP:
return new DataInputStream(new GZIPInputStream(buffer));
case SNAPPY:
try {
InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
try {
InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
messageVersion == Record.MAGIC_VALUE_V0);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
} catch (IOException e) {
throw new KafkaException(e);
}
}
2.2 RecordBatch
在简单了解完MemoryRecords类结构之后,上面说到每个RecordBatch其实就是一个MemoryRecord,所以接下来我们继续了解RecordBatch。
/**
* A batch of records that is or will be sent.
* 最终被发送出去的消息对象
* 这个类不是线程安全的,在修改的时候需要使用同步锁
*/public final class RecordBatch {
private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
//用于记录保存的record数量
public int recordCount = 0;
//最大Record的字节数
public int maxRecordSize = 0;
//尝试发送RecordBatch的次数
public volatile int attempts = 0;
//创建时间
public final long createdMs;
public long drainedMs;
//最后一次尝试发送的时间
public long lastAttemptMs;
//真正存储数据的地方
public final MemoryRecords records;
//当前RecordBatch中缓存的消息都会发送给此TopicPartition
public final TopicPartition topicPartition;
//ProduceRequestResult类型,标识RecordBatch状态的Future对象
public final ProduceRequestResult produceFuture;
//最后一次向RecordBatch追加消息的时间
public long lastAppendTime;
//Thunk对象的集合
private final List<Thunk> thunks;
//用来记录消息在RecordBatch中的偏移量
private long offsetCounter = 0L;
//是否正在重试。如果RecordBatch中的数据发送失败,则会重新尝试发送
private boolean retry;
.......
}
从上面的流程图可以看到,将消息封装成一个一个的RecordBatch之后,放到Dqueue队列中,一个RecordAccumulator由一个至多个的Dqueue组成,这样可以减少通信成本,批量发送消息,从而也能提高吞吐量。
这一节内容不多,也不是很难,大家可以简单看一下,主要讲的是关于发送消息会涉及到的几个核心元数据对象以及最终消息发送的形式,接下来我们会分享如何将封装好的消息对象通过sender线程和内部网络请求发送到kafka集群。