Kafka 是一个高吞吐、低延迟分布式的消息队列系统。kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。

官网:https://kafka.apache.org

Kafka 简介

Kafka 架构

  • kafka 集群有多个 Broker 服务器组成,每个类型的消息被定义为 topic
  • 同一 topic 内部的消息按照一定的 key 和算法被分区(partition)存储在不同的 Broker 上。
  • 消息生产者 producer 和消费者 consumer 可以在多个 Broker 上生产/消费 topic。

Broker:服务器节点

消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群;

Topic:消息主题(类型)

主题是对一组消息的抽象分类,比如例如page view日志、click日志等都可以以topic的形式进行抽象划分类别。在物理上,不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可使得数据的生产者或消费者不必关心数据存于何处;

Topic 即为每条发布到 Kafka 集群的消息的类别,topic 在 Kafka 中可以由多个消费者订阅、消费。

一个 topic 可以有多个 partition,分布在不同的 broker 服务器上。

Partiton:分区

每个主题又被分成一个或者若干个分区(Partition)。每个分区在本地磁盘上对应一个文件夹,分区命名规则为主题名称后接 “ - ” 连接符,之后再接分区编号,分区编号从0开始至分区总数减-1。

强一致性:Kafka 只保证一个分区内的消息有序,不能保证一个主题的不同分区之间的消息有序。如果你想要保证所有的消息都绝对有序可以只为一个主题分配一个分区。

Producers :生产者

消息生产者,负责发布消息到 Kafka broker。

Consumers:消费者

消息消费者,向 Kafka broker 读取消息的客户端。

Kafka 中其他关键词

LogSegment:日志分段

在Kafka中,每个分区又被划分为多个日志分段(LogSegment)组成,日志分段是Kafka日志对象分片的最小单位 LogSegment 算是一个逻辑概念,对应一个具体的日志文件(“.log”的数据文件)和两个索引文件(“.index”和“.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。

Offset:消息偏移量

每个 partition 中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到partition中。每个消息都有一个连续的序列号称之为 offset 消息偏移量,用于在 partition 内唯一标识消息(并不表示消息在磁盘上的物理位置)。

分区会给每个消息记录分配一个顺序 ID 号(偏移量offset), 能够唯一地标识该分区中的每个记录。Kafka 集群保留所有发布的记录,不管这个记录有没有被消费过,Kafka 提供相应策略通过配置从而对旧数据处理。

实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即、消费者可以通过修改偏移量读取任何位置的数据。

Messeage:消息

消息是Kafka中存储的最小最基本的单位,即为一个commit log,由一个固定长度的消息头和一个可变长度的消息体组成。

Kafka 的使用场景

日志收集:

一个公司可以用Kafka可以收集各种服务的log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr 等。

消息系统:

解耦和生产者和消费者、缓存消息等。

用户活动跟踪:

Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。

运营指标:

Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

流式处理:

比如 spark streaming 和 storm。

Kafka 集群部署

集群规划:

Zookeeper 集群共三台服务器,分别为:sean01、sean02、sean03。

Kafka 集群共三台服务器,分别为:sean01、sean02、sean03。

1、Zookeeper 集群准备

kafka 是一个分布式消息队列,需要依赖 ZooKeeper,请先安装好 ZooKeeper 集群。

2、安装 Kafka

(1)下载压缩包(官网地址:http://kafka.apache.org/downloads.html)

(2)解压:

tar -zxvf kafka_2.10-0.9.0.1.tgz -C ../	 # "-C"的作用是解压到指定路径

(3)修改配置文件:config/server.properties

核心配置参数说明:

**broker.id:**broker 集群中唯一标识 id,0、1、2、3 依次增长(broker 即 Kafka 集群中的一台服务器)。

注:当前 Kafka 集群共三台节点,分别为:sean01、sean02、sean03。对应的 broker.id 分别为 0、1、2。

**zookeeper.connect:**zookeeper 集群地址列表。

(4)最后将当前服务器上的 Kafka 目录同步到其他服务器节点上。

3、启动 Kafka 集群

  • 启动 Zookeeper 集群。
  • 启动 Kafka 集群。

分别在三台服务器上执行以下命令启动:

bin/kafka-server-start.sh config/server.properties

4、测试

(1)创建 topic:

bin/kafka-topics.sh --zookeeper sean01:2181,sean02:2181,sean03:2181 --create --replication-factor 2 --partitions 3 --topic test

参数说明:
replication-factor:指定每个分区的复制因子个数,默认 1 个
partitions:指定当前创建的 kafka 分区数量,默认为 1 个
topic:指定新建 topic 的名称

(2)查看 topic 列表:

bin/kafka-topics.sh --zookeeper sean01:2181,sean02:2181,sean03:2181 --list

(3)查看 “test” topic 描述:

bin/kafka-topics.sh --zookeeper sean01:2181,sean02:2181,sean03:2181 --describe --topic test

(4)创建生产者:sean03

bin/kafka-console-producer.sh --broker-list sean01:9092,sean02:9092,sean03:9092 --topic test

(5)创建消费者:sean02

bin/kafka-console-consumer.sh --zookeeper sean01:2181,sean02:2181,sean03:2181 --from-beginning --topic test

注:
查看帮助手册:

bin/kafka-console-consumer.sh help

(6)查看结果:

先在生产者节点sean03中输入几句话

然后到消费者节点sean02中查看

Flume与Kafka 整合

1、Flume 安装

Flume 详细的安装流程详见:[Flume框架][https://www.seanxia.cn/大数据/93326c6d.html]

2、Flume + Kafka

  • 启动 Kafka 集群。
bin/kafka-server-start.sh config/server.properties
  • 配置 Flume 集群,并启动 Flume 集群。
bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

其中,Flume 配置文件 fk.conf 内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = sean03
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = sean01:9092,sean02:9092,sean03:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、测试

  • 分别启动 Zookeeper、Kafka、Flume 集群。

  • 创建 topic:

# 使用JavaAPI插入数据时可以不创建,系统会自动创建一个分区和副本都为1的指定Topic
bin/kafka-topics.sh --zookeeper sean01:9092,sean02:9092,sean03:9092 --create --replication-factor 2 --partitions 3 --topic testflume
  • 启动消费者:
# 这里我们也可以使用JavaAPI操作,不用shell操作
bin/kafka-console-consumer.sh --zookeeper sean01:9092,sean02:9092,sean03:9092 --from-beginning --topic testflume
  • 运行 “RpcClientDemo” 代码,通过 RPC 请求发送数据到 Flume 集群。

Flume 中 source 类型为 AVRO 类型,此时通过 Java 发送 rpc 请求,测试数据是否传入 Kafka。

相关Demo可以参考 Flume 官方文档:http://flume.apache.org/FlumeDeveloperGuide.html

先定义生产者:RpcClientDemo类

public class RpcClientDemo {

public static void main(String[] args) throws InterruptedException {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent's host and port
// 使用远程Flume代理主机和端口初始化客户端
client.init("sean03", 41414);

// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i =0; i < 300; i++) {
int number = new Random().nextInt(3);
String sampleData ;
if(number == 0){
sampleData = "Hello Flume! ERROR " + i;
}else if(number==1){
sampleData = "Hello Flume! INFO " + i;
}else {
sampleData = "Hello Flume! WARNING " + i;
}
client.sendDataToFlume(sampleData);
Thread.sleep(500);
}
client.cleanUp();
}
}

class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the
// above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of
// the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}

public void cleanUp() {
// Close the RPC connection
client.close();
}
}

定义消费者:MyConsumer

public class MyConsumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;

public MyConsumer(String topic) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}

private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "sean01:2181,sean02:2181,sean03:2181");
props.put("group.id", "xss01");
props.put("zookeeper.session.timeout.ms", "400");
props.put("auto.commit.interval.ms", "100");
props.put("auto.offset.reset","smallest");
// props.put("auto.commit.enable","false"); // 关闭自动提交,开启手动提交

return new ConsumerConfig(props);
}

// push消费方式,服务端推送过来。主动方式是pull
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//mytopic2
topicCountMap.put(topic, 1); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
// 每个线程对应于一个KafkaStream
List<KafkaStream<byte[], byte[]>> list = consumerMap.get(topic);
KafkaStream stream = list.get(0);

ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("xixii................");
while (it.hasNext()){
String data = new String(it.next().message());
System.out.println("开始处理数据 ...:"+ data);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println("数据处理中..." + data);
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("处理完数据..." + data);
// consumer.commitOffsets();
}

}

public static void main(String[] args) {
MyConsumer consumerThread = new MyConsumer("testflume");
consumerThread.start();
}
}

先启动 RpcClientDemo

再启动 MyConsumer

说明数据已经传入Kafka中,Flume和Kafka整合成功。

Flume & Kafka & Storm(Spark)

利用Flume和Kafka,我们还可以跟流式计算框架Storm或Spark进行整合,来处理工作中的业务需求。

大概流程如下:

Kafka 对比 HDFS

1、Kafka 分区里的数据是有序的,读写更快。

2、Kafka 写数据先写入内存中,来保证高吞吐量。

吞吐量:单位时间内读写的数据量大小。

3、因为Kafka的数据先写入内存,一旦在写入过程中服务器宕机,会有丢失数据的风险。

  • 可以通过增加备份来降低数据丢失的风险(副本机制)。

  • 设置 acks 的值。

    acks = 0:tomcat客户端只需发送给leader服务器,无需返回的响应信息。速度最快

    acks = 1:tomcat客户端需要收到leader服务器返回的响应信息。默认此配置

    acks = -1 或 all:tomcat客户端需要收到所有副本返回的响应信息。速度最慢

ISR(in syncronized replication): leader候选人机制

所有的副本都是在 ISR 列表中的节点上,来保证数据的完整性。

当 leader 服务器宕机时,Zookeeper 会从 ISR 列表中去选举新的 leader,每个副本节点在同步数据时,由于每个节点负载消耗不同,如果某个节点负载过高,数据无法在短时间内同步,就会从 isr 列表中暂时移除,等它同步正常之后在添加会 isr 列表。

Kafka的数据丢失和重复消费

发生原因

原因都起始于偏移量 offset 的提交周期问题。系统默认1分钟。

数据丢失

生产者 producer

当默认状态下(acks=1),客户端提交数据到leader服务器,收到leader的响应信息后结束发送。然后在 leader 进行同步备份节点数据时,leader 服务器宕机,而数据还未备份成功,数据就会丢失。下次客户端来访问就访问不到。

消费者 consumer

客户端提交的间隔比较频繁,数据未处理成功叫提交偏移量,此时服务器宕机,数据就会丢失。

试验一下:API中把自动提交的间隔调小

然后把API中的提交数据过程打开

启动消费端,查看控制台:在xss5未处理完成的时候断开

再次重启,查看:发现从 xss6 开始,xss5被忽略了,但xss5上次的偏移量并未提交完成,数据丢失!

重复消费

重复消费存在于消费者中。

客户端自动提交的时间间隔太久,在还未提交前服务器宕机,下次重启后只会从之前记录的偏移量开始消费,就会造成重复消费。

试验一下:API中把自动提交的间隔调的大一些

看消费端,当小费到 xss15 的时候关掉服务

然后重新启动消费端,发现还是从 xss1 开始,由于未到一分钟还未提交偏移量,形成重复消费。

解决办法

1、调整生产者的 acks 状态

改变acks的值,设置为 -1 或 all

acks = -1 或 all:tomcat客户端需要收到所以副本返回的响应信息。速度最慢

2、关闭自动提交,使用手动提交方式。

auto.commit.enable","false"

默认配置为打开:true

其他详细配置可以参照官网:http://kafka.apache.org/090/documentation.html#highlevelconsumerapi

注:手动提交虽然能解决 Kafka 的数据丢失和重复消费问题,但是效率速度上相对自动提交降低很多,所以具体问题还需具体分析,应该是在实际业务中去取舍。(比如人口统计重复消费问题可以忽略少量,但是效率第一;再如银行业务是绝对不允许重复消费和数据丢失的,对精确度要求更高)

Kafka 副本机制

Kafka的副本机制指的是多个服务端节点对其他节点的 topic 分区的日志进行复制。

当集群中的某个节点出现故障,访问故障节点的请求就会被转移到其他正常节点(这个过程叫做 Reblance)。

Kafka每个主题的每个分区都有一个主副本以及0或多个从副本,从副本保持与主副本的数据同步,当主副本出故障时就会被从副本替代。

在Kafka中,并不是所有的副本都能拿来替代主副本,所以在Kafka的Leader节点中维护着一个ISR列表,候选人机制。
当Leader服务器宕机时,Zookeeper会从ISR列表中去选举新的Leader。每个副本节点在同步数据时,由于负载消耗不同,如果某个节点负载较高,数据无法在短时间内同步,就会从ISR列表中暂时移除,等它正常同步之后,再添加到ISR列表中。

Kafka 数据存储

Kafka中的消息是以主题(Topic)为基本单位进行组织的,各个主题之间相互独立。在这里主题只是一个逻辑上的抽象概念,而在实际数据文件的存储中,Kafka中的消息存储在物理上是以一个或多个分区(Partition)构成,每个分区对应本地磁盘上的一个文件夹,每个文件夹内包含了日志索引文件(“.index”和“.timeindex”)和日志数据文件(“.log”)两部分。分区数量可以在创建主题时指定,也可以在创建Topic后进行修改。

同时,Kafka为了实现集群的高可用性,在每个Partition中可以设置有一个或者多个副本(Replica),分区的副本分布在不同的Broker节点上。同时,从副本中会选出一个副本作为Leader,Leader副本负责与客户端进行读写操作。而其他副本作为Follower会从Leader副本上进行数据同步。

Kafka中分区/副本的日志文件存储分析

每个分区又有1至多个副本,分区的副本分布在集群的不同代理上,以提高可用性。从存储的角度上来说,分区的每个副本在逻辑上可以抽象为一个日志(Log)对象,即分区副本与日志对象是相对应的。下图是在三个Kafka Broker节点所组成的集群中分区的主/备份副本的物理分布情况图:

Kafka中日志索引和数据文件的存储结构

在Kafka中,每个 Log 对象又可以划分为多个 LogSegment 文件,每个 LogSegment 文件包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。

其中,每个 LogSegment 中的日志数据文件大小均相等(该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的**“log.segment.bytes”**进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日志数据和索引文件)。

Kafka将日志文件封装成一个FileMessageSet对象,将偏移量索引文件和消息时间戳索引文件分别封装成OffsetIndex 和 TimerIndex 对象。Log和LogSegment均为逻辑概念,Log是对副本在Broker上存储文件的抽象,而 LogSegment 是对副本存储下每个日志分段的抽象,日志与索引文件才与磁盘上的物理存储相对应;下图为Kafka日志存储结构中的对象之间的对应关系图:

Kafka中Message的存储和查找过程

Message是按照topic来组织,每个topic可以分成多个的partition。比如:有5个partition的名为为page_visits的topic的目录结构为:

partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件。下图是某个partition目录下的文件:

可以看到,这个partition有4个LogSegment。

查找Message原理图:

比如,要查找绝对offset为7的Message:

  1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
  2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
  3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。

思考拓展

Kafka使用磁盘也可以高效读写的原因?

1、顺序写入

因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。

上图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分)。

这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示 读取到了第几条数据

上图中有两个消费者,Consumer1有两个offset分别对应Partition0、Partition1(假设每一个Topic一个Partition);Consumer2有一个offset对应Partition2。这个offset是由客户端SDK负责保存的,Kafka的Broker完全无视这个东西的存在;一般情况下SDK会把它保存到zookeeper里面。(所以需要给Consumer提供zookeeper的地址)。

如果不删除硬盘肯定会被撑满,所以Kakfa提供了两种策略来删除数据。一是基于时间,二是基于partition文件大小。具体配置可以参看它的配置文档。

2、Memory Mapped Files(内存映射文件)

即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。

Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件 ,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

使用这种方式可以获取很大的I/O提升, 省去了用户空间到内核空间 复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠, 写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。 Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);写入mmap之后立即返回Producer不调用flush叫 异步 (async)。

3、Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易根据偏移量查找消息、定期清除和删除已经消费完成的数据文件,减少磁盘容量的占用;
  • 采用稀疏索引存储的方式构建日志的偏移量索引文件,并将其映射至内存中,提高查找消息的效率,同时减少磁盘IO操作;并大幅降低index文件元数据占用空间大小。
  • Kafka将消息追加的操作逻辑变成为日志数据文件的顺序写入,极大的提高了磁盘IO的性能;

关于Kafka的经典面试知识点

请参考:消息中间件如何实现每秒几十万的高并发写入?

1、页缓存技术 + 磁盘顺序写

2、零拷贝技术