
Flume sources are the input points into a Flume agent. The purpose of a Source is to receive data from an external client (e.g. application logs or syslog) and write it into a Flume Channel. A Source gets an instance of its own ChannelProcessor to process an Event. The ChannelProcessor in turn has its own ChannelSelector (Replicating or Multiplexing), which is used to look up the Channels associated with the Source, as configured in the Flume properties file. A Transaction can then be retrieved from each associated Channel, so that the Source can place Events into the Channel reliably, within a Transaction.

Several sources come with the Flume distribution, and more are available as open source. However, you can also create your own source by extending

org.apache.flume.source.AbstractSource
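
To make the moving parts described above concrete, here is a minimal, illustrative sketch of a custom source written in Java (Flume's implementation language). The class name HelloWorldSource and its message property are invented for this example only; the sketch assumes the Flume 1.x SDK, in which PollableSource declares just the process() method (newer releases also require two back-off getter methods).

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.Charset;

// Hypothetical custom source: emits a configurable message as one event
// each time the framework polls it.
public class HelloWorldSource extends AbstractSource
        implements Configurable, PollableSource {

    private String message;

    @Override
    public void configure(Context context) {
        // read a custom property from the agent's properties file
        message = context.getString("message", "Hello from HelloWorldSource");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            Event event = EventBuilder.withBody(message, Charset.forName("UTF-8"));
            // The ChannelProcessor writes the event to the configured channel(s)
            // inside a transaction, as described above.
            getChannelProcessor().processEvent(event);
            return Status.READY;
        } catch (ChannelException e) {
            // channel full or unavailable; back off and retry later
            return Status.BACKOFF;
        }
    }
}

Such a source would be wired into an agent by its fully qualified class name, e.g. agent.sources.sourceid.type=com.example.HelloWorldSource (the package name here is hypothetical).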

Understanding the different types of Sources

1. TailSource
2. Exec Source
3. Spooling Directory
4. Syslog
   a. Syslog UDP Source
   b. Syslog TCP Source
   c. Multiport Syslog TCP Source

1. Introduction to TailSource and why it was discontinued

TailSource is no longer part of Flume. Using TailSource, you could tail any file on the system and create a Flume event for each line.

With channels and sinks, events are added to and removed from the channel as part of a transaction. However, when you tail a file, there is no way for that read to be part of a transaction.

Suppose the channel fails for any reason; there is then no way to roll back the tailed data and put it back where it came from.

 

Let's take an example. Suppose you are tailing the file

/user/hadoopexam/access.log

 

and log4j is configured to rotate the file when it reaches 1 MB in size, renaming it as below.

/user/hadoopexam/access.log1

 

Assume Flume was reading access.log when it was renamed to access.log1. Because Flume still holds the open file handle, it can keep reading the renamed file. But while it is doing so, assume the new log file fills up and is in turn renamed as below.

/user/hadoopexam/access.log2

 

Now, when Flume is done with access.log1, it goes back to reading access.log. It is unaware that another file, access.log2, was created in the meantime, so that log is missed by Apache Flume entirely.

 

So, as you may have noticed, with TailSource there is a chance that data can be lost. TailSource was discontinued after the Flume 0.9 release for two reasons:

1. Tailing a file cannot be part of a transaction.
2. There is a possibility of data loss, as in the example above.

2. Apache Flume with the Exec Source

 

The exec source can be used to run a command outside of Flume. The output of that command is then ingested into Flume as events.

 

How to use the exec source?

Ans: Set the agent's source type property to exec, as below.

agent.sources.sourceid.type=exec

 

Define the channels as below, to feed all the events to a particular channel.

agent.sources.sourceid.channels=channel1

 

You can also configure more than one channel, separated by spaces. Now you have to specify one of the mandatory parameters: the command to be passed to the operating system, as below.

 

agent.sources.sourceid.command=tail -F /user/hadoopexam/access.log

 

Summary of the above configuration:

1. There is a single source configured, named sourceid.
2. The agent name is agent.
3. It is an exec source, which will tail the access.log file.
4. All the events will be written to the channel1 channel.
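
Putting those pieces together, a complete agent definition might look like the sketch below. The memory channel and the logger sink (channel1 and sink1) are assumptions added here only so the file stands on its own; they are not part of the exec source itself.

agent.sources=sourceid
agent.channels=channel1
agent.sinks=sink1

agent.sources.sourceid.type=exec
agent.sources.sourceid.command=tail -F /user/hadoopexam/access.log
agent.sources.sourceid.channels=channel1

agent.channels.channel1.type=memory

agent.sinks.sink1.type=logger
agent.sinks.sink1.channel=channel1

Saved as, say, exec-agent.properties, such a file could be started with flume-ng agent --conf conf --conf-file exec-agent.properties --name agent.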

 

Important: When you use the tail command with the exec source type, Flume forks a child process, which sometimes does not shut down when the Flume agent shuts down and restarts.

This leaves an orphan tail -F process behind; even if you delete the file, this tail process keeps the file handle open indefinitely. Hence, you have to kill the process manually to reclaim the file's space.

Properties for the exec source:

Key               Required   Type                            Default
type              Yes        String                          exec
channels          Yes        String (space-separated list)   -
command           Yes        String                          -
restart           No         boolean                         false
restartThrottle   No         long (milliseconds)             10000
logStdErr         No         boolean                         false
batchSize         No         int                             20

 

Another command example:

agent.sources.sourceid.command=uptime

The uptime command on a Unix box prints how long the box has been running and then exits immediately.

Hence, with the configuration below, this command will be executed periodically, once every minute (restartThrottle is in milliseconds).

agent.sources.sourceid.restart=true

agent.sources.sourceid.restartThrottle=60000

 

Now, if you want to capture stderr as well, then use the following property.

agent.sources.sourceid.logStdErr=true

 

How to improve performance?

Ans: By batching the events. You can specify the number of events to be written per transaction by changing the batch size, which has a default value of 20.

agent.sources.sourceid.batchSize=2000

 

Why provide a higher value for batchSize?

Ans: When your input data is large and you find that you cannot write to your channel fast enough, a bigger batch size reduces the overall average transaction overhead per event. However, you should test before deciding on a batch size.

 

3. Spooling Directory (a directory as the source of data)

 

The spooling directory source is designed to keep track of which files have already been processed into Flume events and which still need to be processed. It also assumes that any file posted into the directory is complete.

 

Some assumptions:

1. Only completed files will be posted to the spooling directory.
2. A file name should never change before Flume processes it; otherwise, on restart, the source would lose track of which files have been sent and which have not.

You should have a separate process to clear out old files from the spool directory after they have been marked as sent by Flume.

agent.sources=sourceid
agent.sources.sourceid.channels=channel1
agent.sources.sourceid.type=spooldir
agent.sources.sourceid.spoolDir=/hadoopexam/log

Once a file has been fully processed (transmitted), it will be renamed using the following configuration.

agent.sources.sourceid.fileSuffix=.DONE

The default value is .COMPLETED.

Attaching the file path to each event:

agent.sources.sourceid.fileHeader=true
agent.sources.sourceid.fileHeaderKey=sourceFile

The above configuration will add a header such as {sourceFile=/hadoopexam/log/access.log}.
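
Pulling the spooling directory properties above together, the source portion of an agent file might look like the sketch below (channel1 is assumed to be defined elsewhere, as in the exec example).

agent.sources=sourceid
agent.sources.sourceid.type=spooldir
agent.sources.sourceid.spoolDir=/hadoopexam/log
agent.sources.sourceid.channels=channel1
agent.sources.sourceid.fileSuffix=.DONE
agent.sources.sourceid.fileHeader=true
agent.sources.sourceid.fileHeaderKey=sourceFile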

Buffer Length

The bufferMaxLines property is used to set the size of the memory buffer used when reading files: the buffer size is bufferMaxLines multiplied by maxBufferLineLength.
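
For example, with the purely illustrative values below, the read buffer would be roughly 100 x 5,000 bytes, i.e. about 500 KB.

agent.sources.sourceid.bufferMaxLines=100
agent.sources.sourceid.maxBufferLineLength=5000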

Unique Filenames

Please make sure you have a proper mechanism that puts files into the spool directory with unique names; a simple way is to attach a timestamp to each file name. That way there are no name collisions if Apache Flume restarts.

 

What is the problem with spooldir?

Ans: Whenever Flume restarts, due to an error or any other reason, it will create duplicate events for any files in the spooling directory that are re-transmitted because they were not yet marked as finished.

 
