Flume理论+实践

Flume基础

image-20210313211305341

Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。 目前业界,大公司日常的业务数据量一般都能够达到PB级别,小公司也有TB级别。Flume是一个分布式的日志采集、数据收集框架,可以高效的从各个网站服务器中收集日志数据,并且存储到Hbase、HDFS中。

Flume可以对接的数据源有

console、PRC、Text、Tail、Syslog、Exec等;

Flume接收的数据源的输出目标

磁盘、hdfs、hbase,经过网络输出给Kafka

data->flume->kafka->spark streaming/storm/flink -> Hbase,mysql

Flume组成架构

image-20210313235158402

数据发生器(如: facebook, twitter)产生的数据被单个的运行在数据发生器所在服务器上的agent所收集,之后数据收容器从各个 agent上汇集数据并将采集到的数据存入到HDFS或者Hbase中。

agent是部署在一个服务器里的进程(JVM),负责收集该服务器产生的日志数据。

Flume架构详解

image-20210313235320160

Agent

简化的agent内部结构示意图

Flume内部有一个或者多个 Agent,每一个 Agent是一个独立的守护进程(JVM),从客户端接收,或者从其他的 Agent接收,然后迅速的将获取的数据传给下一个目的节点 Agent。Agent是Flume数据传输的基本单元。Agent主要有3个部分组成:Source->(interceptor->selector)->Channel->Sink;

Source

Source是一个 Flume源, 是负责接收数据到Flume Agent 的组件。

Source负责一个外部源(数据发生器),如一个web服务器传递给他的事件,该外部源将它的事件以 Flume可以识别的格式发送到Flume中,当一个 Flume源接收到一个事件时,其将通过一个或者多个通道存储该事件。

Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、 syslog、http、legacy。

关于Source处理的各种数据类型的补充:
Avro 类型的Source

监听Avro 端口来接收外部avro客户端的事件流。avro-source接收到的是经过avro序列化后的数据,然后 反序列化数据继续传输。所以,如果是avro-source的话,源数据必须是经过avro序列化后的数据。利用 Avro source可以实现多级流动、扇出流、扇入流等效果。接收通过flume提供的avro客户端发送的日 志信息。

Exec类型的Source

可以将命令产生的输出作为源

1
a1.sources.r1.command=ping 192.168.234.163 要执行的命令
Taildir Source

监控指定的多个文件,一旦文件内有新写入的数据,就会将其写入到指定的sink内,本来源可靠性高,不会丢失数据,建议使用;但目前不适用于Windows系统;其不会对于跟踪的文件有任何处理,不会重命名也不会删除,不会做任何修改,这点比Spooling Source有优势;目前不支持读取二进制文件,支持一行一行的读取文本文件;在实时数据处理中,可以用该方式取代Exec方式,因为本方式可靠性高。

Spooling Directory类型的 Source

将指定的文件加入到“自动搜集”目录中。flume会持续监听这个目录,把文件当做source来处理。注意:一旦文件被放到“自动收集”目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件,如果有,flume也会报错。

1
a1.sources.r1.spoolDir=/home/work/data 读取文件的路径,即"搜集目录"
NetCat Source

一个NetCat Source用来监听一个指定端口,并接收监听到的数据。

Kafka Source

支持从Kafka指定的topic中读取数据。

Sequence Generator Source —序列发生源

一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1。主要用来测试。

官方文档

Channel

Channel 是位于Source 和Sink 之间的缓冲区。简单来说,channel就是一个通道,存储池,起缓存作用。只有当数据传输完成后,该event才从通道中移除(可靠性)。(Event的最大个数可以由参数设置)

因此,Channel 允许 Source 和Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个Source 的写入操作和几个 Sink的读取操作。

Flume 自带两种 Channel:Memory Channel 和 File Channel。 Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

  • file:本地磁盘的事务实现模式,保证数据不会丟失(WAL实现) write ahead log
  • memoery:内存存储事务,吞吐率极高,但存在丟数据风险

Flume通常选择 File Channel,而不使用 Memory Channel。

补充概念:Event

Event组成结构示意图

Flume数据的传输单元,Event对象是Flume 内部数据传输的最基本单元,以事件的形式将数据从源头送至目的地。它是由一个转载数据的字节数组+ー个可选头部构成 。

Event由零个或者多个 header(可以起到过滤作用)和正文body组成。Header是key/vaue形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和HTTP头一样提供相同的功能一一通过该方法来传输正文之外的额外信息。Body是一个字节数组,包含了实际的内容。

Sink

sink是数据传输的目标,将event传输到外部介质。

Sink 不断地轮询Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用Channel 提交事务。事务一旦被提交,该Channel 从自己的内部缓冲区删除事件。

总结

三个组件将代码和数据进行解耦,channel起到缓存作用,避免sink数据读取慢而source传输块造成数据丢失。channel可以放到mysql也可以放到kafka。

Flume的拓扑结构

Agent对接Agent

Flume Agent链接

单Source对接多Channel、Sink

单Source对应多Agent

Flume负载均衡

Flume负载均衡

Flume Agent聚合

Flume Agent聚合

Flume Agent内部原理

Flume Agent内部原理示意图

补充

Flume中的source、interceptor、channel内部原理参考

https://blog.csdn.net/wx1528159409/article/details/87923754

Interceptor类型

https://blog.csdn.net/jinYwuM/article/details/82598947

Flume实践

需求1:通过netcat作为source,sink为logger方式

去修改example.conf配置文件,进行相关配置。

1
./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console

需求2:通过netcat作为source, sink为logger的方式,并配置拦截器,拦截数字

步骤示意图

1
./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console

修改配置文件,增加过滤。

需求3:通过netcat作为source, sink写到hdfs

1
./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console

配置文件

传入信息

image-20210307184339482

写入hdfs

image-20210307184424121

这里存在小文件问题,如何设置flume防止小文件?

1.在配置文件里配置roll.size,限定一个文件里存储多大的数据。

1
a1.sinks.k1.hdfs.rollSize = 200*1024*1024

2.限定文件可以存储多少个event

1
a1.sinks.k1.hdfs.rollCount = 10000

需求4:通过http作为source, sink写到logger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = master
a1.sources.r1.port = 50020
#a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
~

更改配置文件

1
./bin/flume-ng agent --conf conf --conf-file ./conf/header_test.conf -name a1 -Dflume.root.logger=INFO,console

发送json数据

1
curl -X POST -d '[{"headers" : {"timestamp" : "434324343","host" : "random_host.example.com"},"body" : "random_body"},{"headers" : {"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "badou,badou"}]' main:50020

需求5:将agent进行串联操作

1.在main上,配置slaves2作为sink,利用push.conf配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1

#Describe/configure the source
a1.sources.r1.type= netcat
a1.sources.r1.bind= localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels= c1

#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000

#Describe/configure the source
a1.sinks.k1.type= avro
a1.sinks.k1.channel= c1
a1.sinks.k1.hostname= slaves2
a1.sinks.k1.port= 44444

2.在slaves2上配置pull.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1

#Describe/configure the source
a2.sources.r1.type= avro
a2.sources.r1.channels= c1
a2.sources.r1.bind= slaves2
a2.sources.r1.port= 44444

#Describe the sink
a2.sinks.k1.type= logger
a2.sinks.k1.channel = c1

#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000

3.在slaves2运行flume

1
./bin/flume-ng agent -c conf -f conf/pull.conf -n a2 -Dflume.root.logger=INFO,console

image-20210308230615732

先启动第二个agent,因为先启动第一个agent master会报错,因为它会尝试连接第二个agent。

4.在master上启动flume

1
./bin/flume-ng agent -c conf -f conf/push.conf -n a1 -Dflume.root.logger=INFO,console

image-20210308230730586

5.再创建一个main总端,开始给main传输数据

1
(base) [root@main ~]# telnet localhost 44444

image-20210308231137714

查看slaves2:

image-20210308231351584

数据流转:

a.数据打印传输给第一个agent;—>b.第一个agent指定sink是slave2,最后数据传输到slave2的agent上。slave2获取数据时首先要连接到master。 —> c.数据从master传输到slave2上。

1
2
# 将文本中的a2全部替换(g)为a4
%s/a4/a2/g

需求6:通过flume监控日志文件的变化,然后最终sink到logger

要监控的logger文件

1
/usr/local/src/practice_code/flume/data/flume_exec_test.txt

将其置为空值

1
echo '' > /usr/local/src/practice_code/flume/data/flume_exec_test.txt

运行flume_data_write.py,查看第一行输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# -*- coding: utf-8 -*-
import random
import time
import pandas as pd
import json

writeFileName="/usr/local/src/practice_code/flume/data/flume_exec_test.txt"
cols = ["order_id","user_id","eval_set","order_number","order_dow","hour","day"]
df1 = pd.read_csv('/usr/local/src/practice_code/flume/data/orders.csv')
df1.columns = cols
df = df1.fillna(0)
with open(writeFileName,'a+')as wf:
for idx,row in df.iterrows():
d = {}
for col in cols:
d[col]=row[col]
js = json.dumps(d)
print js
break
#wf.write(js+'\n')
# rand_num = random.random()
# time.sleep(rand_num)
1
2
(py2) [root@main flume]# python flume_data_write.py 
{"user_id": 1, "hour": 8, "order_id": 2539329, "eval_set": "prior", "order_dow": 2, "order_number": 1, "day": 0.0}

main的flume配置文件flume_kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# 要观察的日志文件
a1.sources.r1.command = tail -f /usr/local/src/practice_code/flume/data/flume_exec_test.txt

# 设置终端打印
a1.sinks.k1.type = logger
## 设置kafka接收器
#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
## 设置kafka的broker地址和端口号
#a1.sinks.k1.brokerList=master:9092
## 设置Kafka的topic
#a1.sinks.k1.topic=badou_for_20
## 设置序列化的方式
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume

1
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf -name a1 -Dflume.root.logger=INFO,console

image-20210309085242692

运行

1
(py2) [root@main flume]# python flume_data_write.py 

image-20210309085351792

查看日志文件:

1
2
3
4
(py2) [root@main flume]# head -2 data/flume_exec_test.txt 

{"user_id": 1, "hour": 8, "order_id": 2539329, "eval_set": "prior", "order_dow": 2, "order_number": 1, "day": 0.0}
{"user_id": 1, "hour": 7, "order_id": 2398795, "eval_set": "prior", "order_dow": 3, "order_number": 2, "day": 15.0}

通过pthon模拟实时写日志文件完成。

需求6:flume+kafka对接

flume和kafka对接需要先启动zookeeper。

1.在三个节点上启动zookeeper

1
zkServer.sh start

2.在后台(&)启动kafka

1
(base) [root@main kafka_2.11-0.10.2.1]# ./bin/kafka-server-start.sh config/server.properties &

查看后台运行进程

1
2
(base) [root@main kafka_2.11-0.10.2.1]# jobs -l
[1]+ 75738 Running ./bin/kafka-server-start.sh config/server.properties &
1
2
# 查看后台进程
ps -ef | grep 75738

查看端口号

1
netstat -anp | grep 9092

查看kafka中的主题—topic

1
2
(base) [root@main kafka_2.11-0.10.2.1]# ./bin/kafka-topics.sh  --list --zookeeper localhost:2181
badou_for_20

创建主题

1
./bin/kafka-topics.sh  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic badou_for_test
1
2
3
4
(base) [root@main kafka_2.11-0.10.2.1]# ./bin/kafka-topics.sh  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic badou_for_test 
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "badou_for_test".
(base) [root@main kafka_2.11-0.10.2.1]#

3.启动flume

修改flume配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# 要观察的日志文件
a1.sources.r1.command = tail -f /usr/local/src/practice_code/flume/data/flume_exec_test.txt

# 设置终端打印
#a1.sinks.k1.type = logger
# 设置kafka接收器
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 设置kafka的broker地址和端口号
a1.sinks.k1.brokerList=master:9092
# 设置Kafka的topic
a1.sinks.k1.topic=badou_for_20
# 设置序列化的方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

消费badou_for_20 topic,没有消费就没有监听。

1
./bin/kafka-console-consumer.sh --zookeeper main:2181 --topic badou_for_20 --from-beginning

image-20210309091625510

启动flume

1
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf -name a1 -Dflume.root.logger=INFO,console

image-20210309092109997

请空日志文件

1
echo '' > /usr/local/src/practice_code/flume/data/flume_exec_test.txt

写数据,consumer消费:

image-20210309092243496

整体流程:

a.python 模拟java进程往日志文件写日志 —> b.flume通过exec(tail -f 日志文件)监控日志文件,一旦日志文件发生变化写入到source中—> c.sink对接到kafka—>d.kafka启动badou_for_20主题,消费agent往badou_for_20主题上发送的数据—>d.

image-20210309093110470

参考链接

https://blog.csdn.net/u014612521/article/details/103058090

https://www.cnblogs.com/qingyunzong/p/8995554.html