大数据分布式消息缓存——Kafka
Kafka
Kafka概述
消息队列及其实现原理
点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
为什么需要消息队列
- 解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
- 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
- 灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
- 缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
什么是Kafka
在流式计算中,Kafka 一般用来缓存数据,Storm 通过消费Kafka 的数据进行计算。
Apache Kafka 是一个开源消息系统(是一个队列平台),由Scala 写成。Kafka由 Linkedin公司开发,项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。在2010年贡献给了 Apache并成为顶级开源项目。
Kafka是一个分布式(多节点搭建)、分区的(Kafka通过partition进行分区)、多副本的(存在多个副本)、多订阅者(对于kafka的一个topic可以存在多个消费者),基于 zookeeper协调的(zk可以维护kafka消息的偏移量)分布式日志系统,主要应用场景是:日志收集、消息系统、用户活动跟踪、运营指标、流式处理等。
Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为Producer,消息接受者称为 Consumer,此外kafka 集群有多个kafka 实例组成,每个实例(server)称为broker。
无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性。
Kafka特性
高吞吐量、低延迟: kafka每秒可以处理几十万条消息,它的延退最低只有几毫
可扩展性: kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数干个客户端同时读写
Kafka架构
Kafka组件
Broker
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个topic;
Producer
日志消息生产者,定位就是向 kafka broker 发消息的客户端,即写数据。生产者可以发布数据到它指定的 topic中,并可以指定在 topice里哪些消息分配到哪些分区(比如简单的通过hash轮流分发各个分区或通过指定分区语义分配key到对应分区)
- 生产者直接把消息发送给对应分区的 broker,而不需要任何路由层。
- 批处理发送,当 message积累到一定数量或等待一定时间后进行发送
Consumer
消息消费者,向 kafka broker 取消息的客户端;
一种更抽象的消费方式:消费组( consumer groupid) streaming),该方式包含了传统的 queue和发布订阅方式:
- 首先消费者标记自己一个消费组名。消息将投递到每个消费组中的某一个消费者实例上。
- 如果所有的消费者实例都有相同的消费组,这样就像传统的 queue方式。
- 如果所有的消费者实例都有不同的消费组,这样就像传统的发布订说方式。
- 消费组就好比是个逻辑的订阅者,每个订阅者由许多消费者实例构成(用于扩展或容错)。
相对于传统的消息系统, kafka拥有更强壮的顺序保证。由于 topic采用了分区,可在多 Consumer进程操作时保证顺序性和负载靴均衡。
kafka不管consumer消费到哪个partition。就比如说你看书,书只负责提供目录,topic,它不管你看到哪,你要想标记,你就自己整个书签(低阶API)。但是现在有电子书,能自动记录阅读到的页码(高阶API),这就好比zookeeper(只负责记录消息数据的偏移量)。
zk维护元数据信息,怎么体现?
需求:查看kafka offset
日志消息的消费者,定位读数据,向 kafka broker 取消息的客户端;
Topic
每条发布到 kafka集群的消息属于的类别(发布到kafka的消息都会指定topic),即 kafka是面向 topic的。可以理解为一个队列;
topic是一个虚拟的概念。
目的一:不同的consumer可以去指定的topic读;
目的二:不同的producer往不同的topic进行写;
一个 Topic是一个用于发布消息的分类或feed名, kafka集群使用分区的日志每个分区都是有顺序且不变的消息序列。
commite的log可以不断追加。消息在每个分区中都分配了一个叫 offset的id序列来唯一识别分区中的消息。
Partition
为了实现扩展性,一个非常大的 topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给consumer,不保证一个 topic 的整体(多个partition 间)的顺序。
Kafka的分配单位就是Partition;
partition是kafka的基本单元。我们一般关注到partition级别就好。partition的目的是实现负载均衡,partition的数据分布在不同的节点上。每一个partition中的数据具有顺序性。
针对单个partition来说,里面的消息是连续递增的,而多个partition无法保证全局的有序。
Replica
partition的副本,保障 partition的高可用;
Follower
replica中的一个角色,从 leader中复制( fetch)数据;
Leader
replica中的一个角色, producer和 consumer只跟 leader交互;
Segment
Partition物理上由多个Segment组成;
Offset
每个 partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition中。 partition中的每个消息都有一个连续递增的序列号叫做 offset,偏移量 offset在每个分区中是唯一的。
kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到2048.kafka 的文件即可。当然the first offset 就是00000000000.kafka。
kafka 的partition下面两个文件,log(存数据的),index(保存的消息的顺序)
Consumer Group
Consumer是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的topic;
high-level consumer API 中,每个 consumer都属于一个consumer group,每条消息和 partition只能被 consumer group中的一个 consumer 消费,但可以被多个 consumer group消费;
Kafka中的数据根据不同的业务可以分发给不同的消费者组。比如,假设kafka中的数据是用户行为数据,consumer1对接推荐业务,第一部分数据给推荐业务consumer1,消费用户的实时行为数据。第二份数据对接的是广告系统,同样需要用户的行为信息。即不同的业务同时需要消费同样的数据,不同的数据给不同的业务逻辑。此外,这两个consumer不能位于同一个group中。
Controller
kafka集群中的其中一个服务器,用来进行 leader election以及各种failover;
Zookeeper
kafka通过 zookeeper来存储集群的meta信息和偏移量( offset);
Kafka命令行操作
- 启动Kafka
1 | bin/kafka-server-start.sh config/server.properties & |
- 关闭Kafka
1 | bin/kafka-server-stop.sh stop |
- 查看服务器中所有的topic
1 | [ kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list |
- 创建topic
1 | [ kafka]$ bin/kafka-topics.sh --zookeeper master:2181 \ |
选项说明:
- topic 定义topic 名
- replication-factor 定义副本数 —partitions 定义分区数
- 删除topic
需要server.properties 中设置delete.topic.enable=true 否则只是标记删除或者直接重启。
1 | [xxx@xxx kafka]$ bin/kafka-topics.sh --zookeeper master:2181 \ |
- 发送消息
1 | [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \ |
- 消费消息
1 | [xxx@xxx kafka]$ bin/kafka-console-consumer.sh \ |
- from-beginning:会把 first 主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
- 查看某个Topic 的详情
1 | [xxx@xxx kafka]$ bin/kafka-topics.sh --zookeeper master:2181 \ |
Kafka工作流程
写入方式
producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分区(patition)中(追加是以segment形式进行追加),属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka 吞吐率)。
分区
消息发送时都被发送到一个 topic,其本质就是一个目录,而topic 是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
我们可以看到,每个Partition 中的消息都是有序的,生产的消息被不断追加到Partition log 上,其中的每一个消息都被赋予了一个唯一的 offset 值。 消息在某一个partition内部是有序的,但在整体的topic下,不是全局有序的。
分区的原因
- 方便在集群中扩展,每个Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个Partition 组成,因此整个集群就可以适应任意大小的数据了;
- 可以提高并发,因为可以以Partition 为单位读写了;
分区的原则
- 指定了patition,则直接使用;
- 未指定patition 但指定key,通过对key 的value 进行hash 出一个patition;
- patition 和 key 都未指定,使用轮询选出一个patition;
副本
同一个 partition 可能会有多个 replication (对应 server.properties 配置中的 default.replication.factor=N)。没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入 replication之后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一个leader,producer 和 consumer 只与这个leader 交互,其它 replication 作为follower 从leader 中复制数据。
写入流程
- producer 先从zookeeper 的 “/brokers/…/state”节点找到该 partition 的 leader;
- producer 将消息发送给该 leader;
- leader 将消息写入本地log;
- followers 从leader pull 消息,写入本地log 后向leader 发送ACK;
- leader 收到所有ISR 中的replication 的ACK 后,增加HW(high watermark,最后commit 的offset)并向producer 发送ACK
Kafka的ACK机制2
Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。
ack有3个可选值,分别是1,0,-1。
ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。
注意,ack的默认值就是1。这个默认值其实就是吞吐量与可靠性的一个折中方案。生产上我们可以根据实际情况进行调整,比如如果你要追求高吞吐量,那么就要放弃可靠性。
ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。
ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
Broker保存消息
存储单元-Message
kafka的基本单元是message,类似Flume中的event。
message(消息)是通信的基本单位,每个 producer可以向一个 topic(主题)发布一些消息。如果 consumer订阅了这个主题,那么新发布的消息就会广播给这些 consumer
message format
message length:4 bytes-1
magic” value:1byte( kafka服务协议版本号,做兼容)
crc32: 4 bytes
timestamp:8 bytes
payload:n bytes
存储方式
物理上把topic 分成一个或多个 patition(对应 server.properties 中的num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:
存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
持久化
Kafka存储布局简单: Topice的每个 Partition对应一个逻辑日志(一个日志为相同大小的一组分段文件)。
每次生产者发布消息到一个分区,代理就将消息追加到最后一个段文件(segment)中。当发布的消息数量达到设定值或者经过一定的时间后,一段文件真正flush磁盘中,写入完成后,消息公开给消费者。
与传统的消息系统不同, Kafka系统中存储的消息没有明确的消息Id。
消息通过日志中的逻辑偏移量来公开。
Segment
副本策略
副本管理
kafka将日志复制到指定多个服务器上。复本的单元是 partition。在正常情況下,每个分区有一个 Leader和0到多个follower.
Leader处理对应分区上所有的读写请求。分区可以多于 broker数, leader也是分布式的。follower的日志和 leader的日志是相同的, follower被动的复制 leader。如果leader挂了,其中ー个 followe会自动变成新的 leader。
topic中的第一个partition有多个副本,分布在broker1-4上,其中broker1中partition的是leader角色,负责所有消息的读写 ,也就是consumer读取消息时,从1中读取,其他副本同步1的读取状态如offset等。如果broker1节点挂掉了其他节点可能会顶替broker1成为leader,通过zk来维护实现。
和其他分布式系统一样,节点“活着”定义在于我们能否处理一些失败情况。kafka需要两个条件保证是”活着“。
- 节点在zookeeper注册的 session还在且可维护(基于 zookeeper心跳机制);
- 如果是 slave则能够紧随 leader的更新不至于落得太远;
kafka:采用 in sync来代替”活着”。
- 如果 follower挂掉或卡住或落得很远,则 leader会移除同步列表中的 In sync。至于落了多远才叫远由 replica.lag.max. messages配置,而表示复本“卡住”由 replica.lag,time.max.ms配置;
定义”活着“
- replica.lag.max.messages=1000
如果备份的数据和leader的数据消息的条数超过1000条,将你从我的同步列表中剔除出去,也就是没有任何机会称为leader
- replica.lag.time.max.ms 通过时间设置,不至于掉的太远
Isr (同步副本): kafka在zk中动态维护的机会,里面维护了所有的副本,前提这些副本都能跟上leader的节奏
所谓一条消息是“提交”的,意味着所有 in sync的复本也持久化到了他们的log中。这意味着消费者无需担心 leader挂掉导致数据丟失。另一方面,生产者可以选择是否等待消息“提交”。
kafka动态的维护了ー组in-sync(ISR)的复本,表示已追上了 leader,只有处于该状态的成员组才是能被选择为 leader。这些ISR组会在发生变化时被持久化到zookeeper中。通过ISR模型和f+1复本,可以让kafka的topic支持最多f个节点挂掉而不会导致提交的数据丟失。
Kafka的消息交付保证
Kafka默认采用 at least once的消息投递策略。即在消费者端的处理顺序是获得消息->处理消息->保存位置。这可能导致一旦客户端挂掉,新的客户端接管时处理前面客户端已处理过的消息。
三种保证策略
- At most once消息可能会丢,但绝不会重复传输(很少用)
- At least one消息绝不会丢,但可能会重复传输(常用)
- Exactly once每条消息肯定会被传输一次且仅传输一次
ZooKeeper存储结构
注意:producer不在zk中注册,消费者在zk注册。
zk除了维护offset,为了保证整个系统的可用性,zk还涉及到保存相关的元数据信息。物理上,不同的topic的消息是分开存储的。
producer和broker之间:push模式,即推数据
consumer和broker之间:pull模式
consumer和producer之间的负载均衡,通过zk实现(保证在push和pull数据的时候,有些节点不可用,可以通过zookeeper达到负载均衡)
注意:每一个partition在同一时间只能服务于一个groupid(消费者组)的consumer
Kafka消费过程分析
kafka 提供了两套consumer API:高级Consumer API 和低级Consumer API。
高级API 优点
- 高级API 写起来简单,不需要自行去管理 offset,系统通过zookeeper 自行管理。
- 不需要管理分区,副本等情况,系统自动管理。
- 消费者断线会自动根据上一次记录在 zookeeper 中的 offset 去接着获取数据(默认设置1 分钟更新一下zookeeper 中存的offset)
- 可以使用 group 来区分对同一个 topic 的不同程序访问分离开来(不同的 group 记录不同的offset,这样不同程序读取同一个 topic 才不会因为 offset 互相影响) 。
高级API 缺点
- 不能自行控制offset(对于某些特殊需求来说);
- 不能细化控制如分区、副本、zk 等 ;
低级API
优点
- 能够让开发者自己控制 offset,想从哪里读取就从哪里读取;
- 自行控制连接分区,对分区自定义进行负载均衡 ;
- 能够让开发者自己控制 offset,想从哪里读取就从哪里读取。 自行控制连接分区,对分区自定义进行负载均衡 ;
缺点
- 太过复杂,需要自行控制offset,连接哪个分区,找到分区 leader 等。
消费者组
消费者是以consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group 中的一个消费者读取,但是多个group可以同时消费这个 partition。
在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。
消费方式
consumer 采用pull(拉)模式从broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
对于Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
Kafka producer 拦截器(interceptor)
roducer 拦截器(interceptor)是在Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于producer 而言,interceptor 使得用户在消息发送前以及producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
同时,producer 允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
- configure(configs)
- 获取配置信息和初始化数据时调用。
- onSend(ProducerRecord)
- 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
- onAcknowledgement(RecordMetadata, Exception)
- 该方法会在消息被应答或消息发送失败时调用,并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率
- close
- 关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
Kafka传输效率
1.生产者提交一批消息作为一个请求。消费者虽然利用apij遍历消息是一个一个的但背后也是一次请求获取一批数据,从而减少网络请求数量。
2.Kafka层采用无缓存设计,而是依赖于底层的文件系统页缓存。这有助于避免双重缓存,及即消息只缓存了一份在页缓存中。
3.同时这在 kafka重启后保持缓存warm也有额外的优势。因 kafka根本不缓存消息在进程中,故gc开销也就很小
zero-copy: kafka为了减少字节拷贝,采用了大多数系统都会提供的 sendfile系统调用
(直接将磁盘上的数据拷贝到socket。不用通过应用程序传输。)
Kafka代理是无状态的:意味着消费者必须维护已消费的状态信息 offset。这些信息由消费者自己维护,代理完全不管。这种设计非常微妙,它本身包含了创新。
- 从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。 Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除
- 这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。
flume是有事务这种机制,而kafka无任何事务。kafka是针对线上的存储,是一种消息队列,通过offset记录。
Kafka分布式协调
kafka使用 zookeeper做以下事情:
- 探测 broker和 consumer的添加或移除
- 当第1条发生时触发每个消费者进程的重新负载。
- 维护消费关系和追踪消费者在分区消费的消息的 offset
ZK的使用
Broker Node Registry
/brokers/ids/[o…N] —> host:port(ephemeral node)
- broker启动时在/ brokers/ids下创建一个 znode,把 broker id写进去。
- 因为 broker把自己注册到 zookeeper中实用的是瞬时节点,所以这个注册是动态的,如果broker宕机或者没有响应该节点就会被删除。
Broker Topic Registry
/brokers/topics/[topic]/[O…N] —> nPartions (ephemeral node)
- 每个 broker把自己存储和维护的 partion信息洼册到该路径下。
Consumers and Consumer Groups
- consumerst也把它们自己注册到 zookeeper上,用以保持消费负载平衡和 offset记录
- group id相同的多个 consumer构成一个消费组,共同消费一个 topic,同一个组的consumer会尽量均匀的消费,其中的一个 consumer只会消费一个 partion的数据。
Consumer Id Registry
/consumers/[group id]/ids/[consumer_id] —> (“topic1”: # streamstopic: #streams, …, “topicN”: #streams} (ephemeral node)
- 每个 consumer在/ consumers/[ group_id]/ids下创建一个瞬时的唯一的 consumer_id,用来描述当前该 group下有哪些 consumer是 alive的,如果消费进程挂掉对应的 consumer_id就会从该节点删除。
Consumer Offset Tracking
/consumers/[group_id]/offsets/[topic]/[partition_id] —> offset_counter-value ((persistent node)
- consumer把每个 partition的消费 offset己录保存在该节点下。
Partition Owner registry
/consumers/[group _id]/owners/[topic]/[broker_id-partition_id] —> consumer_node_id (ephemeral node)
- 该节点维护着 partion与 consumer之间的对应关系。
Flume与Kafka区别
实践
案例
需求:描述多个topic
1、./bin/kafka-server-start.sh config/server.properties &
2、启动zk(三个节点中)
3、./bin/kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 1 —partitions 10 —topic badou_topic10
配置文件中kafkalogs的存储位置
查看logs文件
想通过命令查看topic
如果slave也启动了,则Isr会有0, 1, 2
实践1
1 | 课后实践:将三个kafka进程启动,同时创建 |
结果1
在main上输入
1 | ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic badou_topic11 |
在slaves1上输入
1 | ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic badou_topic11 |
结论:kafka集群创建topic,整个集群共享已经创建的topic信息,partition分布到集群中各个kafka节点里。
课后实践2
在slave1上执行
1 | ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic badou_topic12 |
执行
1 | ./bin/kafka-topics.sh --describe --zookeeper slaves1:2181 --topic badou_topic12 |
在main上执行
1 | ./bin/kafka-topics.sh --describe --zookeeper main:2181 --topic badou_topic12 |
leader代表当前partition的主,replicas代表当前partition的备份节点,isr表示当前主节点挂掉后,下一个主节点是谁。
课后实践3
3 kill掉slaves1上的kafka
在main上执行
1 | ./bin/kafka-topics.sh --describe --zookeeper main:2181 --topic badou_topic12 |
可以看到,slaves1挂掉后,对应的leader和replicas以及isr都发生改变。
案例
选一个topic,往里写入数据
创建生产者,往badou topic写数据
一般是消费完数据后,才会更新偏移量。还有一种情况,就是中途任务挂掉了,比如sparkstreaming的executor,这时offset不会改变,下次任务重启,重新从之前任务挂掉时对应的offset开始继续往后读。
解释不需路由
不同颜色对应不同的topic,即图中有三个topic。
蓝色的topic通过广播的形式发送给两个group组。
如果一个consumer group 消费者的个数大于topic中partition的话,consumer group会出现等待的情况(本质:一个partition在同一时间只能服务于一个consumer)
补充问题
1.为什么一个partition在同一时间只能服务于组内一个consumer?
由于 kafka中ー个 topic中的不同分区只能被消费组中的一个消费者消费,就避免了多个消费者消费相同的分区时会导致额外的开销(如要协调哪个消费者消费哪个消息,还有锁及状态的开销)。 kafka中消费进程只需要在代理和同组消费者有变化时进行一次协调(这种协调不是经常性的,故可以忽略开销。
2.为什么kafka处理数据不会出现这个”\n”问题?
实际业务问题:
hive:\n,导致数据换行,如何处理?
解决方案:在数据源头进行处理。如果通过hive解决存在得问题,很难知道数据的本身规律,比如数据长度以及数据起始位置,成本较高。
在工作中,可以通过timestamp的标识来进行处理,但是不能处理\n问题,能处理只关心用户终态的行为。
Kafka存在拦截器组件,可以从源头上对数据进行处理,解决此问题。
3.什么是事务?
事务的介绍
事务就是用户定义的一系列执行SQL语句的操作, 这些操作要么完全地执行,要么完全地都不执行, 它是一个不可分割的工作执行单元。
事务的使用场景:1
- 在日常生活中,有时我们需要进行银行转账,这个银行转账操作背后就是需要执行多个SQL语句,假
如这些SQL执行到一半突然停电了,那么就会导致这个功能只完成了一半,这种情况是不允许出现,
要想解决这个问题就需要通过事务来完成。
事务的四大特性:
原子性(Atomicity)
一致性(Consistency)
隔离性(Isolation)
持久性(Durability)
原子性:
一个事务必须被视为一个不可分割的最小工作单元,整个事务中的所有操作要么全部提交成功,要么
全部失败回滚,对于一个事务来说,不可能只执行其中的一部分操作,这就是事务的原子性
一致性:
数据库总是从一个一致性的状态转换到另一个一致性的状态。(在前面的例子中,一致性确保了,即
使在转账过程中系统崩溃,支票账户中也不会损失200美元,因为事务最终没有提交,所以事务中所做
的修改也不会保存到数据库中。)
隔离性:
通常来说,一个事务所做的修改操作在提交事务之前,对于其他事务来说是不可见的。(在前面的例
子中,当执行完第三条语句、第四条语句还未开始时,此时有另外的一个账户汇总程序开始运行,则
其看到支票帐户的余额并没有被减去200美元。)
持久性:
一旦事务提交,则其所做的修改会永久保存到数据库。
4.那不同消费组的consumer消费同一个topic下的一个partition不会产生一些问题吗?
不同的consumer组的consumer消费同一个topic下的partition产生的offset由各自进行记录,由于是属于不同消费者组,保证了offset互相隔离,所以不会产生冲突等问题。
参考链接
1. https://blog.csdn.net/lh_hebine/article/details/99051646 ↩
2. https://www.jianshu.com/p/c98b934f2c2b ↩