欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume 1.2.0 用户向导

程序员文章站 2023-12-21 18:51:46
...

概览
     Apache的Flume是一个分布式的,质量可靠,可有效地收集,汇总和来自许多不同来源的大量日志数据到集中的数据存储系统。

    Apache的Flume是在Apache软件基金会的*项目。目前有两个版本的代码行,0.9.x版本和1.x的版本本文件适用于1.x的代码行。请点击这里Flume 0.9.x版本的用户指南。

系统环境要求

    JDK1.6以上版本

数据流

Flume事件被定义为一个单位的数据流量有一个字节的有效载荷和一个可选字符串属性。Flume代理(JVM)的过程中,承载组件,通过这些事件流从外部源的下一个目的地(跳)。

Flume 1.2.0 用户向导

一个web服务器的产生的事件由 Flume源消耗。外部源发送事件发送到Flume中,会带着一个识别的格式。例如: 例如:一个Avro Flume源可以用来接收从Avro clients 其他flume代理Avro link发送事件。当一个Flume 源接收一个事件,他会存储到一个活多个channels中,这些channel会一直保存着event,直到被Flume sink消费处理掉,例如JDBC Channel作为一个例子-它使用一个文件系统支持嵌入式数据库,sink从channel中移除事件,同时放入到一个外部的仓库,比如HDFS,或者流转到下一个Flume source 源,source和sink在agent中是以异步运行方式运行事件。

复杂数据流

Flume到达最终目的地之前,允许用户建立多跳流活动,通过多个代理。它还允许fan-infan-out flows,内容路由和备份路由失败(故障转移)。

配置一个代理(agent)

Flume代理配置存储在本地配置文件。这是一个文本文件格式如下Java属性文件格式。在相同的配置文件,可以指定一个或多个代理的配置。配置文件包括每个源,接收器和代理渠道的性质和它们连接在一起,形成数据流。

配置单个组件

流中每个组件(源,接收器或通道)的名称,类型,和一组特定的类型和实例的属性。例如 Avro源需要一个主机名(或IP地址)和接收数据的端口号。一个内存通道可以有最大队列大小(“能力”),HDFS的散热器需要知道文件系统的URI,路径创建文件,文件的旋转频率(“hdfs.rollInterval”)等,所有这些组件的属性需要设置在托管 Flume 代理的属性文件

 组合组件(Wiring the pieces together)

代理需要知道什么加载各个组件以及它们是如何连接,以构成的流动。这是通过列出的源,汇和代理渠道的名称,然后指定每个接收器和源的连接通道。例如,代理到HDFS flume HDFS cluster1中通过JDBC JDBC通道通道流动称为avroWeb Avro 源的事件。该配置文件将包含这些组件和JDBC通道为avroWeb源和HDFS cluster1中汇作为共享信道的名称。

启动代理(starting an agent)

代理人是开始使用shell脚本称为flume-NG是位于flume分布在bin目录。你需要在命令行上指定的代理的名称,config目录,配置文件:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

现在,代理将开始运行的源和汇的配置在给定的属性文件。

A simple example

在这里,我们举一个例子,配置文件,描述一个单节点的Flume部署。这种配置可以让用户生成的事件和随后输出到控制台。

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444

# Describe sink1
agent1.sinks.sink1.type = logger

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapactiy = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

这个配置定义了一个单一的代理,称为agent1。 agent1监听44444端口,通道缓存在内存中事件数据,事件数据记录到控制台和一个接收器上的数据源。配置文件名的各个组成部分,然后介绍了他们的类型和配置参数。一个给定的配置文件可能会定义多个命名的代理人;一个给定的Flume进程启动时传递一个标志,告诉它的具名代理体现。

结合此配置文件,我们启动Flume按如下参数:

$ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.root.logger=INFO,console

请注意,在完整部署,我们通常会包括一个选项: - CONF=<conf-dir>。 <conf-dir>目录将包括一个shell脚本flume-env.sh和内置的Log4j属性文件。在这个例子中,我们使用一个Java选项强制flume登录到控制台

我们可以从一个单独的终端,然后telnet端口44444和发送flume事件:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

他原来的flume终端输出日志信息的事件。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

恭喜你 - 你已经成功地配置和部署了一个flume代理!随后的章节涵盖更详细的代理配置。

数据获取

flume支持从从外部数据源获取数据的机制。

 RPC

在flume中 ,Avro客户端使用AVRO RPC机制可以发送一个给定的文件 Avro 源:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

上面的命令将发送的/ usr/logs/log.10的内容到  flume源监听端

Executing commands

还有一个exec执行一个给定的命令获得输出的源。一个单一的输出,即“line”。回车('\ R')或换行符('\ N'),或两者一起的文本。

注:Flume不支持tail做为一个源,不过可以通过exec tail。

Network streams

Flume支持以下的机制,从流行的日志流类型读取数据

  1. Avro
  2. Syslog
  3. Netcat

Flume部署种类

    设置多代理流程

Flume 1.2.0 用户向导

    合并

在日志收集的一个非常普遍的情况是大量生产客户日志的数据发送到一些消费者代理连接到存储子系统。举例来说,从数以百计的Web服务器收集的日志发送到十几代理写入HDFS集群

Flume 1.2.0 用户向导

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent. This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

    多路复用流

Flume 1.2.0 用户向导

  定义流

   在一个单一的代理定义的流,你需要通过一个通道的来源和接收器链接。你需要列出源,接收器和通道,为给定的代理,然后指向源和接收器及通道。一个源的实例可以指定多个通道,但只能指定一个接收器实例通道。格式如下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

例如一个代理名为weblog-agent,外部通过avro客户端,并且发送数据通过内存通道给hdfs。在配置文件weblog.config的可能看起来像这样:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

这将使事件流从avro-AppSrv-source到hdfs-Cluster1-sink通过内存通道mem-channel-1。当代理开始weblog.config作为其配置文件,它会实例化流。

 

  配置单个组件

    定义流之后,你需要设置每个源,接收器和通道的属性。可以分别设定组件的属性值。

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

“type”属性必须为每个组件设置,以了解它需要什么样的对象。每个源,接收器和通道类型有其自己的一套,它所需的性能,以实现预期的功能。所有这些,必须根据需要设置。在前面的例子中,我们拿到从hdfs-Cluster1-sink中的流到HDFS,通过内存通道mem-channel-1的avro-AppSrv-source源。下面是一个例子,显示了这些组件的配置。

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

在一个代理中添加多个流

单个Flume代理可以包含几个独立的流。你可以在一个配置文件中列出多个源,接收器和通道。这些组件可以连接形成多个流。

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

那么你就可以连接源和接收器到其相应的通道,设置两个不同的流。例如,如果您需要设置一个weblog代理两个流,一个从外部Avro客户端到HDFS,另外一个是tail的输出到Avro接收器,然后在这里是做一个配置:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 jdbc-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = jdbc-channel-2
agent_foo.sinks.avro-forward-sink2.channel = jdbc-channel-2

配置多代理流程

    设置一个多层的流,你需要有一个指向下一跳avro源的第一跳的avro 接收器。这将导致第一Flume代理转发事件到下一个Flume代理。例如,如果您定期发送的文件,每个事件(1文件)AVRO客户端使用本地Flume代理,那么这个当地的代理可以转发到另一个有存储的代理。

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = jdbc-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = jdbc-channel
agent_foo.sinks.avro-forward-sink.channel = jdbc-channel

# avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000

# configure other pieces

HDFS agent config:

list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

这里我们连接从weblog-agent的avro-forward-sink 到hdfs-agent的avro-collection-source收集源。最终结果从外部源的appserver最终存储在HDFS的事件。

 

 

Fan out flow

Flume支持Fan out流从一个源到多个通道。有两种模式的Fan out,分别是复制和复用。在复制的情况下,流的事件被发送到所有的配置通道。在复用的情况下,事件被发送到可用的渠道中的一个子集。Fan out流需要指定源和Fan out通道的规则。这是通过添加一个通道“选择”,可以复制或复。再进一步指定选择的规则,如果它是一个多路。如果你不指定一个选择,则默认情况下它复制

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

复用的选择集的属性进一步分叉。这需要指定一个事件属性映射到一组通道。选择配置属性中的每个事件头检查。如果指定的值相匹配,那么该事件被发送到所有的通道映射到该值。如果没有匹配,那么该事件被发送到设置为默认配置的通道。

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

映射允许每个值通道可以重叠。默认值可以包含任意数量的通道。下面的示例中有一个单一的流复用两条路径。代理有一个单一的avro源和连接道两个接收器的两个通道。

 

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 jdbc-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 jdbc-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = jdbc-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

“State”作为Header的选择检查。如果值是“CA”,然后将其发送到mem-channel-1,如果它的“AZ”的,那么jdbc-channel-2,如果它的“NY”那么发到这两个。如果“State”头未设置或不匹配的任何三个,然后去默认的mem-channel-1通道。

Flume Sources

Avro Source

Avro端口监听并接收来自外部的Avro客户流的事件。当内置AvroSink另一个(前跳)Flume代理,它可以创建分层集合配对拓扑。

Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
interceptors Space separated list of interceptors
interceptors.*    

Example for agent named agent_foo

agent_foo.sources = avrosource-1
agent_foo.channels = memoryChannel-1
agent_foo.sources.avrosource-1.type = avro
agent_foo.sources.avrosource-1.channels = memoryChannel-1
agent_foo.sources.avrosource-1.bind = 0.0.0.0
agent_foo.sources.avrosource-1.port = 4141

Exec Source

此源启动运行一个给定的Unix命令,预计这一过程中不断产生标准输出(stderr被简单地丢弃,除非logStdErr= TRUE)上的数据。如果因任何原因的进程退出时,源也退出,并不会产生任何进一步的数据。

Property Name Default Description
channels  
type The component type name, needs to be exec
command The command to execute
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space separated list of interceptors
interceptors.*    

Warning

 

The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem; what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent, for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. You have been warned.

备注: 在ExecSource不能保证,如果有一个失败的放入到通道的事件,客户也知道。在这种情况下,数据将丢失。

Example for agent named agent_foo:

agent_foo.sources = tailsource-1
agent_foo.channels = memoryChannel-1
agent_foo.sources.tailsource-1.type = exec
agent_foo.sources.tailsource-1.command = tail -F /var/log/secure
agent_foo.sources.tailsource-1.channels = memoryChannel-1


转载于:https://my.oschina.net/88sys/blog/71340

上一篇:

下一篇: