Log collection with Flume
ben 2020-03-08
flume
# Download Flume
wget http://mirrors.hust.edu.cn/apache/flume/stable/apache-flume-1.9.0-bin.tar.gz
# Extract to /data
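tar -zxf apache-flume-1.9.0-bin.tar.gz -C /data   # assuming the tarball was downloaded to the current directory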
# Configure environment variables
vim /etc/profile
export FLUME_HOME=/data/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
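Optionally verify the installation:
flume-ng version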
# Use cases
# Read a file and output to Kafka
vim flume-kafka.conf:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source: tail the nginx access log from the start of the file (-c +0),
# following it across rotation (-F)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /var/log/nginx/access.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.135:9092
a1.sinks.k1.kafka.topic = testTopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start:
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/flume-kafka.conf &
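To confirm events are reaching Kafka, create the topic up front and attach a console consumer. A sketch assuming the standard Kafka CLI scripts are on the PATH of 192.168.1.135 (Kafka 2.2+; for older versions, use --zookeeper instead of --bootstrap-server):
kafka-topics.sh --create --if-not-exists --bootstrap-server 192.168.1.135:9092 --topic testTopic --partitions 1 --replication-factor 1
kafka-console-consumer.sh --bootstrap-server 192.168.1.135:9092 --topic testTopic --from-beginning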
# Use avro-client directly to read a file on the target machine and output to Kafka
vim avro.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source: bind the channel, host, and port
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
#Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.135:9092
a1.sinks.k1.kafka.topic = testTopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the receiving agent:
/data/apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c /data/apache-flume-1.9.0-bin/conf/ --conf-file /data/apache-flume-1.9.0-bin/conf/avro.conf -Dflume.root.logger=INFO,console
Start avro-client on the machine whose logs are being collected:
/data/apache-flume-1.9.0-bin/bin/flume-ng avro-client -c /data/apache-flume-1.9.0-bin/conf/ -H 192.168.1.135 -p 4141 -F /var/log/nginx/access.log
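Note that with -F, avro-client streams the file once and exits; it does not tail. For continuous collection, pipe a tail into it (avro-client reads stdin by default when -F is omitted):
tail -F /var/log/nginx/access.log | /data/apache-flume-1.9.0-bin/bin/flume-ng avro-client -c /data/apache-flume-1.9.0-bin/conf/ -H 192.168.1.135 -p 4141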
# Read a file and output to RocketMQ
RocketMQ requires an additional jar, available here:
https://gitee.com/ahxiao/rocketmq-externals
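The sink has to be built and dropped into Flume's classpath. A rough sketch, assuming Maven is installed; the module layout inside that repo may differ between revisions, so adjust paths as needed:
git clone https://gitee.com/ahxiao/rocketmq-externals
cd rocketmq-externals/rocketmq-flume
mvn clean install -DskipTests
# copy the built sink jar plus its rocketmq-client dependencies into Flume's lib/
cp */target/*.jar $FLUME_HOME/lib/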
vim flume-rocketmq.conf:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /var/log/nginx/access.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type=org.apache.rocketmq.flume.ng.sink.RocketMQSink
a1.sinks.k1.nameserver=192.168.1.129:9876
a1.sinks.k1.topic=BenTopic
a1.sinks.k1.producerGroup=MyProducerGroup_1
a1.sinks.k1.tag=Tag1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start:
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/flume-rocketmq.conf &
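To check that messages are arriving, the mqadmin tool shipped with RocketMQ can show the topic's offsets (run from the RocketMQ bin directory; offsets should grow as log lines come in):
sh mqadmin topicStatus -n 192.168.1.129:9876 -t BenTopic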
# Receive Avro events and output to RocketMQ
vim avro-rocketmq.conf:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# sink
a1.sinks.k1.type=org.apache.rocketmq.flume.ng.sink.RocketMQSink
a1.sinks.k1.nameserver=192.168.1.129:9876
a1.sinks.k1.topic=BenTopic
a1.sinks.k1.producerGroup=MyProducerGroup_1
a1.sinks.k1.tag=Tag1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start:
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/avro-rocketmq.conf &
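A quick end-to-end test from any machine: with no -F argument avro-client reads stdin, so a single line can be pushed through the Avro source into RocketMQ (replace <agent-host> with the machine running this agent):
echo 'hello rocketmq' | /data/apache-flume-1.9.0-bin/bin/flume-ng avro-client -c /data/apache-flume-1.9.0-bin/conf/ -H <agent-host> -p 4141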
# Load balancing
node1 192.168.1.132
node2 192.168.1.134
node3 192.168.1.135
Approach: the Flume agent on node1 watches a file directory and forwards the collected data to node2 and node3, spreading load across them via the load-balancing sink processor.
# node1 configuration
vim agent.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /data/apache-flume-1.9.0-bin/dataDir
a1.sources.r1.fileHeader = true
#define sinkgroups
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=round_robin
#define the sink 1
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.1.134
a1.sinks.k1.port=4141
#define the sink 2
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.1.135
a1.sinks.k2.port=4141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
The dataDir must be created before starting:
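mkdir -p /data/apache-flume-1.9.0-bin/dataDir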
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/agent.conf &
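With the agent running, dropping a file into the spool directory is a quick smoke test; the spooling directory source renames each fully ingested file with a .COMPLETED suffix:
cp /var/log/nginx/access.log /data/apache-flume-1.9.0-bin/dataDir/test-$(date +%s).log
ls /data/apache-flume-1.9.0-bin/dataDir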
# node2 / node3 configuration
vim flume-kafka.conf:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
# exec source: read command output (disabled here)
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -F -c +0 /var/log/nginx/*.log
#a1.sources.r1.shell = /bin/bash -c
# avro rpc
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# interceptor
a1.sources.r1.interceptors=i4
a1.sources.r1.interceptors.i4.type=REGEX_FILTER
# keep only records whose body contains the string hadoop or spark
a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark)
a1.sources.r1.interceptors.i4.excludeEvents=false
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.135:9092
a1.sinks.k1.kafka.topic = testTopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Adjust the configuration on node2 and node3 respectively, then start each:
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/flume-kafka.conf &
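Because excludeEvents=false, the REGEX_FILTER interceptor passes only events whose body matches (hadoop)|(spark) and drops everything else. A quick check, pushing two lines at node2's Avro source; only the first should show up in Kafka:
printf 'hadoop job finished\nplain nginx line\n' | /data/apache-flume-1.9.0-bin/bin/flume-ng avro-client -c /data/apache-flume-1.9.0-bin/conf/ -H 192.168.1.134 -p 4141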
# Failover
# Three machines
node1 192.168.1.132
node2 192.168.1.134
node3 192.168.1.135
Approach: the Flume agent on node1 watches a file directory and sends the data to node2, with node3 as a standby; when node2 goes down, node3 starts receiving the data until node2 recovers.
# node1 configuration
vim agent.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /data/apache-flume-1.9.0-bin/dataDir
a1.sources.r1.fileHeader = true
#define sinkgroups
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000
#define the sink 1
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.1.134
a1.sinks.k1.port=4141
#define the sink 2
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.1.135
a1.sinks.k2.port=4141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
The dataDir must be created before starting:
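mkdir -p /data/apache-flume-1.9.0-bin/dataDir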
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/agent.conf &
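To exercise the failover, stop node2's agent and confirm events keep flowing through node3 (k2 has the lower priority, so it only takes over while k1 is down). A rough sketch, assuming node2's agent was started with the command below:
# on node2: find and kill the agent's java process by its --conf-file argument
kill $(pgrep -f flume-kafka.conf)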
# node2 / node3 configuration
vim flume-kafka.conf:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
# exec source: read command output (disabled here)
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -F -c +0 /var/log/nginx/*.log
#a1.sources.r1.shell = /bin/bash -c
# avro rpc
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# interceptor
a1.sources.r1.interceptors=i4
a1.sources.r1.interceptors.i4.type=REGEX_FILTER
# keep only records whose body contains the string hadoop or spark
a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark)
a1.sources.r1.interceptors.i4.excludeEvents=false
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.135:9092
a1.sinks.k1.kafka.topic = testTopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start:
nohup /data/apache-flume-1.9.0-bin/bin/flume-ng agent --conf /data/apache-flume-1.9.0-bin/conf/ --name a1 --conf-file /data/apache-flume-1.9.0-bin/conf/flume-kafka.conf &