Hadoop业务的整体开发流程: 

 

    从hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。 

(一)Flume架构介绍 

1、Flume的概念 


    flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。 

    Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

2、flume架构介绍 

    flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个Java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。 

    agent里面包含3个核心的组件: source—->channel—–>sink ,类似生产者、仓库、消费者的架构。 

        source :source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。 

        channel :source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。 

        sink :sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。 

3、flume的运行机制 

    flume的核心就是一个 agent ,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。 

    Event的概念 

        flume的核心是把数据从数据源( source )收集过来,在将收集到的数据送到指定的目的地( sink )。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据( channel ),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。 

        在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?

         —–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 

        为了方便大家理解,给出一张event的数据流向图: 


        一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以: 


        其中event信息就是flume收集到的日记记录。 

4、flume的广义用法 

    flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是 source可以接受多个输入,所谓扇出就是sink可以将 数据输出多个目的地 destination中。 


(二)flume安装

1、上传

2、解压

3、修改conf/flume-env.sh  文件中的JDK目录

     注意:JAVA_OPTS 配置  如果我们传输文件过大 报内存溢出时 需要修改这个配置项

4、验证安装是否成功  ./flume-ng version

5、配置环境变量

export FLUME_HOME=/home/apache-flume-1.6.0-bin

(三)flume应用—日志采集 

    对于flume的原理其实很容易理解,我们更应该掌握flume的具体使用方法,flume提供了大量内置的Source、Channel和Sink类型。而且不同类型的Source、Channel和Sink可以自由组合—–组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, hbase,甚至是另外一个Source等等。下面我将用具体的案例详述flume的具体用法。 

    其实flume的用法很简单—-书写一个配置文件,在配置文件当中描述source、channel与sink的具体实现,而后运行一个agent实例,在运行agent实例的过程中会读取配置文件的内容,这样flume就会采集到数据。 

配置文件的编写原则: 

1> 从整体上描述代理agent中sources、sinks、channels所涉及到的组件

  # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

2> 详细描述agent中每一个source、sink与channel的具体实现:

即在描述source的时候,需要 指定source到底是什么类型的,即这个source是接受文件的、还是接受http的、还是接受thrift 的;

对于sink也是同理,需要指定结果是输出到HDFS中,还是Hbase中啊等等;

对于channel 需要指定是内存啊,还是数据库啊,还是文件啊等等。

# Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444


    # Describe the sink

    a1.sinks.k1.type = logger


    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

3> 通过channel将source与sink连接起来

 # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

启动agent的shell操作:

    flume-ng  agent -n a1  -c  ../conf   -f  ../conf/example.file  -Dflume.root.logger=DEBUG,console 

参数说明:

    -n 指定agent名称(与配置文件中代理的名字相同) 

    -c 指定flume中配置文件的目录 

    -f 指定配置文件 

    -Dflume.root.logger=DEBUG,console 设置日志等级

Source、Channel、Sink有哪些类型

-- Flume Source

Source类型
说明
Avro Source
支持Avro协议(实际上是Avro RPC),内置支持
Thrift Source 
支持Thrift协议,内置支持
Exec Source
基于Unix的command在标准输出上生产数据
JMS Source
从JMS系统(消息、主题)中读取数据
Spooling Directory Source
监控指定目录内数据变更
Twitter 1% firehose Source
通过API持续下载Twitter数据,试验性质
Netcat Source

监控某个端口,将流经端口的每一个文本行数据作为Event输入

Sequence Generator Source

序列生成器数据源,生产序列数据

Syslog Sources

 读取syslog数据,产生Event,支持UDP和TCP两种协议

HTTP Source

基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式

Legacy Sources 

兼容老的Flume OG中Source(0.9.x版本)

-- Flume Channel

Channel 类型
说明
Memory Channel 
Event数据存储在内存中
JDBC Channel   
Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel 
Event数据存储在磁盘文件中
Spillable Memory Channel
Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件
Pseudo Transaction Channel
测试用途
Custom Channel
自定义Channel实现

 -- Flume Sink

Source类型
说明
HDFS Sink 
数据写入HDFS
Thrift Source 
数据写入日志文件
Avro Sink
数据被转换成Avro Event,然后发送到配置的RPC端口上
Thrift Sink 
数据被转换成Thrift Event,然后发送到配置的RPC端口上
IRC Sink
数据在IRC上进行回放
File Roll Sink
存储数据到本地文件系统
Null Sink

丢弃到所有数据

HBase Sink 

数据写入HBase数据库

Morphline Solr Sink

数据发送到Solr搜索服务器(集群)

ElasticSearch Sink

数据发送到Elastic Search搜索服务器(集群)

Kite Dataset Sink 

写数据到Kite Dataset,试验性质的

Custom Sink

自定义Sink实现

(三)案例

官方文档:http://flume.apache.org/FlumeUserGuide.html

案例1、 A simple example

    配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    启动flume

flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

    安装telnet

        yum install telnet

        退出 ctrl+]  quit

    Memory Chanel 配置

          capacity:默认该通道中最大的可以存储的event数量是100,

          trasactionCapacity:每次最大可以source中拿到或者送到sink中的event数量也是100

          keep-alive:event添加到通道中或者移出的允许时间

          byte**:即event的字节量的限制,只包括eventbody


案例2、两个flume做集群

    node01服务器中,配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 60000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    node02服务器中,安装Flume(步骤略)

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 60000

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    先启动node02的Flume

    再启动node01的Flume

    打开telnet 测试  node02控制台输出结果


案例3、Exec Source

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    启动Flume

    创建空文件演示

touch flume.exec.log

    循环添加数据

for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done


案例4、Spooling Directory Source

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    启动Flume

    拷贝文件演示

mkdir logs
cp flume.exec.log logs/


案例5、hdfs sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M

##每隔60s或者文件大小超过10M的时候产生新文件
# hdfs有多少条消息时新建文件,0不基于消息个数
a1.sinks.k1.hdfs.rollCount=0
# hdfs创建多长时间新建文件,0不基于时间
a1.sinks.k1.hdfs.rollInterval=60
# hdfs多大时新建文件,0不基于文件大小
a1.sinks.k1.hdfs.rollSize=10240
# 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
a1.sinks.k1.hdfs.idleTimeout=3

a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true

## 每五分钟生成一个目录:
# 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式
a1.sinks.k1.hdfs.round=true
# 时间上进行“舍弃”的值;
a1.sinks.k1.hdfs.roundValue=5
# 时间上进行”舍弃”的单位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    创建HDFS目录

hadoop fs -mkdir /flume

    启动Flume

    写入数据,查看hdfs文件

添加新评论