返回 导航

大数据

hangge.com

Flume日志采集工具使用详解9(Sink处理器:负载均衡、故障转移)

作者:hangge | 2024-03-19 08:55
Sink 处理器类型包括如下三种:
  • Default Sink Processor 是默认的,不用配置 sinkgroup,就是我们之前一直使用的最普通的形式,一个 channel 后面接一个 sink 的形式
  • Load balancing Sink Processor 是负载均衡处理器,一个 channel 后面可以接多个 sink,这多个 sink 属于一个 sink group,根据指定的算法进行轮询或者随机发送,减轻单个 sink 的压力
  • Failover Sink Processor 是故障转移处理器,一个 channel 后面可以接多个 sink,这多个 sink 属于一个 sink group,按照 sink 的优先级,默认先让优先级高的 sink 来处理数据,如果这个 sink 出现了故障,则用优先级低一点的 sink 处理数据,可以保证数据不丢失。
我在之前的文章中都是使用默认的 Default Sink Processor,本文接着演示另外两个 Sink Processor 的使用。

九、Sink 处理器1:Load balancing Sink Processor(负载均衡)

1,整体架构

(1)该案例中一个 channel 后面接了两个 sink,这两个 sink 属于一个 sink group,通过轮询的方式处理数据,从而达到负载均衡的效果。
(2)这个负载均衡案例可以解决之前单节点输出能力有限的问题,可以通过多个 sink 后面连接多个 Agent 实现负载均衡,如果后面的 Agent 挂掉 1 个,也不会影响整体流程,只是处理效率又恢复到了之前的状态。

2,配置 Agent

(1)首先配置 bigdata04 节点上的这个 Agent
参数说明:
  • processor.type:针对负载均衡的 sink 处理器,这里需要指定 load_balance
  • processor.selector:此参数的值内置支持两个,round_robinrandomround_robin 表示轮询,按照 sink 的顺序,轮流处理数据,random 表示随机。
  • processor.backoff:默认为 false,设置为 true 后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长;一直到达到最大的时间。如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率。
  • processor.selector.maxTimeOut:最大的黑名单时间,默认是 30
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# 配置source组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件,[为了方便演示效果,把batch-size设置为1]
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.60.101
a1.sinks.k1.port=41414
a1.sinks.k1.batch-size = 1

a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.60.102
a1.sinks.k2.port=41414
a1.sinks.k2.batch-size = 1

# 配置sink策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(2)接着配置 bigdata02 上的 Agent
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值]
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k1.hdfs.filePrefix = data101
a1.sinks.k1.hdfs.fileSuffix = .log

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)接着配置 bigdata03 上的 Agent
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值]
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3,测试 Agent

(1)首先我们依次启动 bigdata02bigdata03bigdata04 上的 Agent
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &

(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666

(3)连接上后发送两条数据:

(4)到 hdfs 上验证数据,可以看到由于轮询这两条数据分别存放到不同的 path 上:
[root@bigdata01 soft]# hdfs dfs -ls /load_balance
Found 2 items
-rw-r--r--   2 root supergroup          6 2024-02-03 12:44 /load_balance/data101.1588481094383.log.tmp
-rw-r--r--   2 root supergroup          6 2024-02-03 12:44 /load_balance/data102.1588481087463.log.tmp
[root@bigdata01 soft]# hdfs dfs -cat /load_balance/data101.1588481094383.log.tmp
hangge
[root@bigdata01 soft]# hdfs dfs -cat /load_balance/data102.1588481087463.log.tmp
baidu

十、Sink 处理器2:Failover Sink Processor(故障转移)

1,整体架构

(1)该案例也是一个 channel 后面接了两个 sink,但是这里和负载均衡架构不一样的是,这两个 sink 正常情况下只有一个干活,另一个是不干活的
(2)如果某一个 sink 输出功能失效,另一个还可以顶上来,同时只会存在一个真正输出数据的 sink。通过故障转移从而解决 sink 组件单点故障的问题。

2,配置 Agent

(1)首先配置 bigdata04 节点上的这个 Agent
参数说明:
  • sinks:指定这个 sink groups 中有哪些 sink,指定 sink 的名称,多个的话中间使用空格隔开即可
  • processor.type:针对故障转移的 sink 处理器,使用 failover
  • processor.priority:指定 sink group 中每一个 sink 组件的优先级,默认情况下 channel 中的数据会被优先级比较高的 sink 取走
  • processor.maxpenaltysink 发生故障之后,最大等待时间
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# 配置source组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件,[为了方便演示效果,把batch-size设置为1]
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.60.101
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.60.102
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1

# 配置sink策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(2)接着配置 bigdata02 上的 Agent
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值]
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k1.hdfs.filePrefix = data101
a1.sinks.k1.hdfs.fileSuffix = .log

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)接着配置 bigdata03 上的 Agent
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值]
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3,测试 Agent

(1)首先我们依次启动 bigdata02bigdata03bigdata04 上的 Agent
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &

(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666

(3)连接上后发送两条数据:

(4)然后到 hdfs 上验证数据,发现两条数据是通过 bigdata02 这台机器写出去的,因为对应 bigdata02 这台机器的 sink 组件的优先级比较高:
[root@bigdata01 ~]# hdfs dfs -ls /failover
Found 1 items
-rw-r--r--   2 root supergroup         12 2024-02-03 15:17 /failover/data101.1588490221243.log.tmp
[root@bigdata01 ~]# hdfs dfs -cat /failover/data101.1588490221243.log.tmp
hangge
baidu

(5)接下来我们将 bigdata02agent 停止模拟这台机器上的的 Agent 挂掉,也就意味着 k1 这个 sink 写不出去数据了,此时,我们再通过 socket 发送一条数据:
jojo

(6)此时再到 hdfs 上查看一下,会发现新采集的数据会通过 bigdata03 上的 Agent 写出去:
[root@bigdata01 ~]# hdfs dfs -ls /failover
Found 2 items
-rw-r--r--   2 root supergroup          6 2023-02-03 15:17 /failover/data102.1588490267828.log.tmp
-rw-r--r--   2 root supergroup         12 2023-02-03 15:17 /failover/data101.1588490221243.log
[root@bigdata01 ~]# hdfs dfs -cat /failover/data102.1588490267828.log.tmp
jojo

(7)此时如果把 bigdata02 上的 Agent 再启动的话,会发现新采集的数据会通过 bigdata02 上的 Agent 写出去,这是因为它的优先级比较高。这就是 Sink 故障转移的应用。
评论

全部评论(0)

回到顶部