

Note: All the posts are based on a practical approach, avoiding lengthy theory. All have been tested on development servers. Please don’t test any post on production servers until you are sure.

Friday, June 23, 2017

Streaming Twitter Data using Apache Flume


Introduction                                                                           

Flume is a distributed service for efficiently collecting, aggregating, and moving large amounts of streaming event data. It is a highly reliable, distributed, and configurable tool. It is principally designed to copy streaming data (event/log data) from various web servers and services like Facebook and Twitter to HDFS.


When the rate of incoming data exceeds the rate at which data can be written to the destination, Flume acts as a mediator between data producers and the centralized stores and provides a steady flow of data between them. The transactions in Flume are channel-based where two transactions (one sender and one receiver) are maintained for each message. It guarantees reliable message delivery.


Architecture

Events: Flume represents data as events. Events are very simple data structures, with a body and a set of headers. The body of the event is a byte array that usually is the payload that Flume is transporting. The headers are represented as a map with string keys and string values. Headers are not meant to transfer data, but for routing purposes and to keep track of priority, severity of events being sent, etc. The headers can be used to add event IDs or UUIDs to events as well.


Flume Agent: The simplest unit of deployment of Flume is called a Flume agent. An agent is a Java application that receives or generates data and buffers it until it is eventually written to the next agent or to a storage or indexing system. Each Flume agent consists of three major components: sources, channels, and sinks. A Flume deployment may have more than one agent.



Sources are active components that receive data from some other application that is producing the data. Each source must be connected to at least one channel. A source can write to several channels, replicating the events to all or some of the channels based on some criteria. Examples include the Avro source, Thrift source, Twitter 1% firehose source, etc.
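
As a concrete illustration, here is a minimal sketch of a source definition in the properties file, using the built-in NetCat source (the agent and channel names agent1 and ch1 are assumptions for illustration):

# NetCat source: listens on a TCP port and turns each received line of text into an event
agent1.sources = netcatSrc
agent1.sources.netcatSrc.type = netcat
agent1.sources.netcatSrc.bind = localhost
agent1.sources.netcatSrc.port = 44444
agent1.sources.netcatSrc.channels = ch1

Once the agent is running, you can test such a source simply by typing lines into nc localhost 44444.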

Channels are, in general, passive components that buffer data that has been received by the agent but not yet written out to another agent or to a storage system. Channels behave like queues, with sources writing to them and sinks reading from them. An agent can have many channels. Examples include the JDBC channel, File channel, Memory channel, etc.
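
For example, a File channel that keeps its buffered events on local disk (so they survive an agent restart) could be sketched as below; the directories and capacity are assumptions and should point to local disks with enough free space:

# File channel: buffers events on local disk so they survive an agent restart
agent1.channels = fileCh
agent1.channels.fileCh.type = file
agent1.channels.fileCh.checkpointDir = /var/flume/checkpoint
agent1.channels.fileCh.dataDirs = /var/flume/data
agent1.channels.fileCh.capacity = 1000000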


Sinks poll their respective channels continuously to read and remove events. The sinks push events to the next hop or to the final destination. Once the data is safely at the next hop or at its destination, the sinks inform the channels, via transaction commits, that those events can now be deleted from the channels. Each sink, though, can read from exactly one channel. If multiple sinks read from the same channel, it is guaranteed that exactly one sink will read a specific event from the channel. An example is the HDFS sink.

Each event must essentially be an independent record, rather than a part of a record. This also imposes the requirement that each event be able to fit in the memory of the Flume agent JVM. If a File Channel is being used, then there should be enough disk space to accommodate this. If data cannot be represented as multiple individual records, Flume might not be a good fit for the use case.



Interceptors: An interceptor is a point in your data flow where you can inspect and alter Flume events. You can chain zero or more interceptors after a source creates an event.
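
A minimal sketch, assuming the TwitterAgent1/Twitter names used later in this post: the bundled timestamp and host interceptors can be chained so that every event carries the ingest time and the agent's hostname in its headers:

# Two built-in interceptors chained on the source; they run in the listed order
TwitterAgent1.sources.Twitter.interceptors = i1 i2
TwitterAgent1.sources.Twitter.interceptors.i1.type = timestamp
TwitterAgent1.sources.Twitter.interceptors.i2.type = host
TwitterAgent1.sources.Twitter.interceptors.i2.hostHeader = hostname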




Channel selectors: These are responsible for how data moves from a source to one or more channels. Flume comes packaged with two channel selectors that cover most use cases you might have. A replicating channel selector (the default) simply puts a copy of the event into each channel, assuming you have configured more than one. A multiplexing channel selector can write to different channels depending on some header information. Combined with some interceptor logic, this duo forms the foundation for routing input to different channels.
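
For example, a multiplexing selector that routes events by a header named topic (the header value, channel names, and source name here are assumptions for illustration) would look like this:

# Route events to different channels based on the value of the "topic" header
agent1.sources.src1.selector.type = multiplexing
agent1.sources.src1.selector.header = topic
agent1.sources.src1.selector.mapping.sports = ch1
agent1.sources.src1.selector.mapping.news = ch2
agent1.sources.src1.selector.default = ch3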

Sink processor: It is the mechanism by which you can create failover paths for your sinks or load balance events across multiple sinks from a channel.
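
A minimal failover sketch, assuming two sinks k1 and k2 are already defined on agent1; events go to the higher-priority sink until it fails, then fall back to the other:

# Failover sink processor: k1 is preferred, k2 takes over if k1 fails
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 5
agent1.sinkgroups.g1.processor.maxpenalty = 10000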

Collector: The data from agents is collected by an intermediate node known as a collector. Just like agents, there can be multiple collectors in Flume. Finally, the data from all these collectors is aggregated and pushed to a centralized store such as HDFS.






Multi-hop Flow: An event may travel through more than one agent. This is known as multi-hop flow.

Fan-out Flow: The dataflow from one source to multiple channels is known as fan-out flow.

Fan-in Flow: The data flow in which the data will be transferred from many sources to one channel is known as fan-in flow.

Failure Handling: The sender sends events to the receiver. Soon after receiving the data, the receiver commits its own transaction and sends a “received” signal to the sender. After receiving the signal, the sender commits its transaction; the sender will not commit its transaction until it receives a signal from the receiver.

When to use Flume

Flume is primarily meant to push data from a large number of production servers to HDFS, HBase, etc. In cases where Flume is not a good fit, there is often an easier method, like Web HDFS or the HBase HTTP API, that can be used to write data. If there are only a handful of production servers producing data and the data does not need to be written out in real time, then it might also make sense to just move the data to HDFS via Web HDFS or NFS, especially if the amount of data being written out is relatively small—a few files of a few GB every few hours will not hurt HDFS. In this case, planning, configuring, and deploying Flume may not be worth it. Flume is really meant to push events in real time where the stream of data is continuous and its volume reasonably large.
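
As a rough sketch of the simpler alternative, a file can be pushed over WebHDFS with two curl calls (the host, port, path, and user below are placeholders, not values from this environment):

# Step 1: ask the NameNode where to write; the reply contains a Location header
curl -i -X PUT "http://namenode-host:50070/webhdfs/v1/data/events.log?op=CREATE&user.name=hdpsysuser"
# Step 2: PUT the local file to the DataNode URL returned in that Location header
curl -i -X PUT -T events.log "<Location URL from step 1>"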

Installing/Configuring Flume                                                 

1- Download the latest version of Apache Flume from the link below and extract it to the desired location, e.g., /usr/hadoopsw/apache-flume-1.7.0-bin.

https://flume.apache.org/download.html

[root@te1-hdp-rp-en01 ~]# chown -R hdpclient:hadoop_edge /usr/hadoopsw/apache-flume-1.7.0-bin
[root@te1-hdp-rp-en01 ~]# chmod -R 755  /usr/hadoopsw/apache-flume-1.7.0-bin

2- To configure Flume, you need to modify three files: flume-env.sh, flume-conf.properties, and .bash_profile.
Add the following variables to .bash_profile:

vi .bash_profile
### Flume Variables
export FLUME_HOME=/usr/hadoopsw/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin
export CLASSPATH=$CLASSPATH:$FLUME_HOME/lib/*

Run the command below for the changes to take effect:
[hdpclient@te1-hdp-rp-en01 ~]$ source ~/.bash_profile

Go to the conf folder; you will see four files there.

[hdpclient@te1-hdp-rp-en01 /]$ cd $FLUME_HOME/conf
[hdpclient@te1-hdp-rp-en01 conf]$ ll

total 16
-rwxr-xr-x. 1 hdpclient hadoop_edge 1661 Sep 26 2016 flume-conf.properties.template
-rwxr-xr-x. 1 hdpclient hadoop_edge 1455 Sep 26 2016 flume-env.ps1.template
-rwxr-xr-x. 1 hdpclient hadoop_edge 1565 Sep 26 2016 flume-env.sh.template
-rwxr-xr-x. 1 hdpclient hadoop_edge 3107 Sep 26 2016 log4j.properties


Copy flume-conf.properties.template to flume-conf.properties, and flume-env.sh.template to flume-env.sh.
[hdpclient@te1-hdp-rp-en01 conf]$ cp flume-conf.properties.template flume-conf.properties
[hdpclient@te1-hdp-rp-en01 conf]$ cp flume-env.sh.template flume-env.sh

In flume-env.sh, set JAVA_HOME to the folder where Java is installed on your system.

vi flume-env.sh
export JAVA_HOME=/usr/java/default/
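
Optionally, flume-env.sh is also the place to size the agent's JVM heap; the values below are just an assumption and should be adjusted to your channel capacity:

# Give the Flume agent JVM more heap (tune to the memory channel capacity)
export JAVA_OPTS="-Xms512m -Xmx1024m"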

3- Verify the installation of Apache Flume; you will get the usage/help text upon successful installation.

[hdpclient@te1-hdp-rp-en01 conf]$ flume-ng
Error: Unknown or unspecified command ''
Usage: /usr/hadoopsw/apache-flume-1.7.0-bin/bin/flume-ng <command> [options]...
commands:
help display this help text
agent run a Flume agent
avro-client run an avro Flume client
version show Flume version info
.....
....


4- Configure Flume using a configuration file (e.g., twitter.conf), which is a Java properties file containing key-value pairs. The configuration file needs to define the sources, the channels, and the sinks. Sources, channels, and sinks are defined per agent, whose name you choose.

Flume supports various sources, sinks, and channels as listed below.

Sources:
  • Avro Source
  • Thrift Source
  • Exec Source
  • JMS Source
  • Spooling Directory Source
  • Twitter 1% firehose Source
  • Kafka Source
  • NetCat Source
  • Sequence Generator Source
  • Syslog Sources
  • Syslog TCP Source
  • Multiport Syslog TCP Source
  • Syslog UDP Source
  • HTTP Source
  • Stress Source
  • Legacy Sources
  • Thrift Legacy Source
  • Custom Source
  • Scribe Source

Channels:
  • Memory Channel
  • JDBC Channel
  • Kafka Channel
  • File Channel
  • Spillable Memory Channel
  • Pseudo Transaction Channel

Sinks:
  • HDFS Sink
  • Hive Sink
  • Logger Sink
  • Avro Sink
  • Thrift Sink
  • IRC Sink
  • File Roll Sink
  • Null Sink
  • HBaseSink
  • AsyncHBaseSink
  • MorphlineSolrSink
  • ElasticSearchSink
  • Kite Dataset Sink
  • Kafka Sink
You can use any of them. For example, if you are transferring Twitter data using the Twitter source through a memory channel to an HDFS sink, and the agent name is TwitterAgent1, then:

TwitterAgent1.sources = Twitter
TwitterAgent1.channels = MemChannel
TwitterAgent1.sinks = HDFS

After this, the type is defined for each of the sources. Along with the property “type”, you also need to provide values for all the required properties of a particular source to configure it. For example, for the Twitter source, the following are the properties to which we must provide values.

TwitterAgent1.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource (type name)
TwitterAgent1.sources.Twitter.consumerKey =
TwitterAgent1.sources.Twitter.consumerSecret =
TwitterAgent1.sources.Twitter.accessToken =
TwitterAgent1.sources.Twitter.accessTokenSecret = 


Now you need to define the channel to transfer data between sources and sinks. For example, if we consider the memory channel, the following are the properties to which we must provide values to configure it.

TwitterAgent1.channels.MemChannel.type = memory (type name)


After this, you need to define the sink along with its properties. For example, if we consider the HDFS sink, the following are the properties to which we must provide values to configure it.

TwitterAgent1.sinks.HDFS.type = hdfs (type name) 
TwitterAgent1.sinks.HDFS.hdfs.path = HDFS directory’s Path to store the data

Now bind the source and the sink to the channel. For example, to bind the sources and the sinks to a channel, we consider the Twitter source, memory channel, and HDFS sink.

TwitterAgent1.sources.Twitter.channels = MemChannel 
TwitterAgent1.sinks.HDFS.channel = MemChannel


vi $FLUME_HOME/conf/twitter.conf
## Define Sources, channels and sinks
TwitterAgent1.sources = Twitter 
TwitterAgent1.channels = MemChannel 
TwitterAgent1.sinks = HDFS 

## for each one of the sources, define type with all required properties
TwitterAgent1.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent1.sources.Twitter.consumerKey = 
TwitterAgent1.sources.Twitter.consumerSecret = 
TwitterAgent1.sources.Twitter.accessToken = 
TwitterAgent1.sources.Twitter.accessTokenSecret = 

## define channel to transfer data between sources and sinks
TwitterAgent1.channels.MemChannel.type = memory

##  define sink along with its properties
TwitterAgent1.sinks.HDFS.type = hdfs
TwitterAgent1.sinks.HDFS.hdfs.path = /flume/twitter

## bind the Source and the Sink to the Channel.
TwitterAgent1.sources.Twitter.channels = MemChannel 
TwitterAgent1.sinks.HDFS.channel = MemChannel

5- After configuration, start the Flume agent.

flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent1 

Streaming Twitter Data                                                          

We will create a Twitter application and get tweets from it using the experimental Twitter source provided by Apache Flume. We will use the memory channel to buffer these tweets and the HDFS sink to push them into HDFS.

To fetch Twitter data, we will have to follow the steps given below:
  • Create a Twitter application
  • Install / Start HDFS (already done)
  • Configure Flume 

1- Go to https://apps.twitter.com/ and sign in to your Twitter account. Click on the Create New App button. You will be redirected to a window with an application form in which you have to fill in your details in order to create the app. After entering the details, click the Create your Twitter application button at the bottom of the page.




2- Click on the Keys and Access Tokens tab; at the bottom of the page, click the Create my access token button to generate the access token.




3- Now click on the Keys and Access Tokens tab to get the Consumer key, Consumer secret, Access token, and Access token secret. These are needed to configure the agent in Flume.

4- Create a directory in HDFS to store the tweets.
[hdpsysuser@te1-hdp-rp-nn01 ~]$ hdfs dfs -mkdir -p /flume/twitter

5- Configure Flume. We will use the experimental source provided by Apache Flume named Twitter 1% Firehose, along with a memory channel and an HDFS sink. The Twitter 1% Firehose source connects to the 1% sample Twitter Firehose using the streaming API, continuously downloads tweets, converts them to Avro format, and sends the Avro events to a downstream Flume sink. Avro stores the data definition in JSON format, making it easy to read and interpret, while the data itself is stored in binary format, making it compact and efficient. This source is available by default with the installation of Flume; the jar files corresponding to this source can be located in the $FLUME_HOME/lib folder.

Example Configuration File

vi twitter.conf

# Naming the components on the current agent. 
TwitterAgent1.sources = Twitter 
TwitterAgent1.channels = MemChannel 
TwitterAgent1.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent1.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent1.sources.Twitter.consumerKey =<<Provide Key>>
TwitterAgent1.sources.Twitter.consumerSecret = <<Provide Key>> 
TwitterAgent1.sources.Twitter.accessToken = <<Provide Token>> 
TwitterAgent1.sources.Twitter.accessTokenSecret = <<Provide Token Secret>> 
TwitterAgent1.sources.Twitter.keywords = saudi,qatar,bigdata
  
# Describing/Configuring the sink 

TwitterAgent1.sinks.HDFS.type = hdfs 

TwitterAgent1.sinks.HDFS.hdfs.path = /flume/twitter/
TwitterAgent1.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent1.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent1.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent1.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent1.sinks.HDFS.hdfs.rollCount = 10000 

# Describing/Configuring the channel 
TwitterAgent1.channels.MemChannel.type = memory 
TwitterAgent1.channels.MemChannel.capacity = 10000 
TwitterAgent1.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent1.sources.Twitter.channels = MemChannel
TwitterAgent1.sinks.HDFS.channel = MemChannel 
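
As an optional variation (not required for this example), the HDFS sink understands escape sequences in its path, so the tweets can be written into date-partitioned directories; useLocalTimeStamp supplies the timestamp the escapes need:

# Optional: write into date-partitioned directories instead of a flat folder
TwitterAgent1.sinks.HDFS.hdfs.path = /flume/twitter/%Y-%m-%d
TwitterAgent1.sinks.HDFS.hdfs.useLocalTimeStamp = true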


6- Run Flume Agent

flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent1



7- Verify HDFS

Browse the HDFS folder /flume/twitter created earlier.



Dump the output of any HDFS file to a temp file:

hdfs dfs -cat /flume/twitter/FlumeData.1497207521006 > /tmp/flumetmp.txt 

Open the file in a text editor to view its contents and observe the Avro format and schema information. You can also copy the file to the local filesystem:


[hdpsysuser@vbgeneric ~]$ hdfs dfs -copyToLocal /flume/twitter/FlumeData.1497207521006 /tmp


Avro is a data serialization format. Avro stores the data definition in JSON format, making it easy to read and interpret, while the data itself is stored in binary format, making it compact and efficient. Avro files include markers that can be used to split large data sets into subsets suitable for MapReduce processing.

8- As you cannot read the Avro file in a text editor, you need Avro Tools.

Download Avro Tools from the direct links below:

http://www.apache.org/dyn/closer.cgi/avro/
http://www-eu.apache.org/dist/avro/stable/java/
http://www-eu.apache.org/dist/avro/stable/java/avro-tools-1.8.2.jar

Copy avro-tools-1.8.2.jar to $FLUME_HOME/lib
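
For example, assuming the mirror URL above is still live, the jar can be fetched straight into Flume's lib folder:

# Download the Avro Tools jar into Flume's lib directory
wget http://www-eu.apache.org/dist/avro/stable/java/avro-tools-1.8.2.jar -P $FLUME_HOME/lib/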



Tools included in Avro Tools
Just run Avro Tools without any parameters to see what’s included:
[hdpsysuser@vbgeneric ~]$ java -jar $FLUME_HOME/lib/avro-tools-1.8.2.jar
Version 1.8.2 of Apache Avro
Copyright 2010-2015 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
----------------
Available tools:
          cat  extracts samples from files
      compile  Generates Java code for the given schema.
       concat  Concatenates avro files without re-compressing.
   fragtojson  Renders a binary-encoded Avro datum as JSON.
     fromjson  Reads JSON records and writes an Avro data file.
     fromtext  Imports a text file into an avro data file.
      getmeta  Prints out the metadata of an Avro data file.
    getschema  Prints out schema of an Avro data file.
          idl  Generates a JSON schema from an Avro IDL file
 idl2schemata  Extract JSON schemata of the types from an Avro IDL file
       induce  Induce schema/protocol from Java class/interface via reflection.
   jsontofrag  Renders a JSON-encoded Avro datum as binary.
       random  Creates a file with randomly generated instances of a schema.
      recodec  Alters the codec of a data file.
       repair  Recovers data from a corrupt Avro Data file
  rpcprotocol  Output the protocol of a RPC service
   rpcreceive  Opens an RPC Server and listens for one message.
      rpcsend  Sends a single RPC message.
       tether  Run a tethered mapreduce job.
       tojson  Dumps an Avro data file as JSON, record per line or pretty.
       totext  Converts an Avro data file to a text file.
     totrevni  Converts an Avro data file to a Trevni file.
  trevni_meta  Dumps a Trevni file's metadata as JSON.
trevni_random  Create a Trevni file filled with random instances of a schema.
trevni_tojson  Dumps a Trevni file as JSON.
[hdpsysuser@vbgeneric ~]$

Likewise run any particular tool without parameters to see its usage/help output.

[hdpsysuser@vbgeneric ~]$ java -jar $FLUME_HOME/lib/avro-tools-1.8.2.jar concat

concat [input-file...] output-file
Concatenates one or more input files into a new output file
by appending the input blocks without decoding them. The input
files must have the same schema, metadata and codec. If they
do not the tool will return the following error codes:

1 if the schemas don't match
2 if the metadata doesn't match
3 if the codecs don't match

If no input files are given stdin will be used. The tool
0 on success. A dash ('-') can be given as an input file
to use stdin, and as an output file to use stdout.


Derive the schema from the Avro data file (note: in some cases I have seen the derived schema contain only two columns, header and body):

[hdpsysuser@vbgeneric ~]$ java -jar $FLUME_HOME/lib/avro-tools-1.8.2.jar getschema /tmp/FlumeData.1497207521006
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{
  "type" : "record",
  "name" : "Doc",
  "doc" : "adoc",
  "fields" : [ {
    "name" : "id",
    "type" : "string"
  }, {
    "name" : "user_friends_count",
    "type" : [ "int", "null" ]
  }, {
    "name" : "user_location",
    "type" : [ "string", "null" ]
  }, {
    "name" : "user_description",
    "type" : [ "string", "null" ]
  }, {
    "name" : "user_statuses_count",
    "type" : [ "int", "null" ]
  }, {
    "name" : "user_followers_count",
    "type" : [ "int", "null" ]
  }, {
    "name" : "user_name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "user_screen_name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "created_at",
    "type" : [ "string", "null" ]
  }, {
    "name" : "text",
    "type" : [ "string", "null" ]
  }, {
    "name" : "retweet_count",
    "type" : [ "long", "null" ]
  }, {
    "name" : "retweeted",
    "type" : [ "boolean", "null" ]
  }, {
    "name" : "in_reply_to_user_id",
    "type" : [ "long", "null" ]
  }, {
    "name" : "source",
    "type" : [ "string", "null" ]
  }, {
    "name" : "in_reply_to_status_id",
    "type" : [ "long", "null" ]
  }, {
    "name" : "media_url_https",
    "type" : [ "string", "null" ]
  }, {
    "name" : "expanded_url",
    "type" : [ "string", "null" ]
  } ]
}
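
With the schema known, you can also dump the records themselves as JSON to sanity-check the data; the output file name below is arbitrary:

# Convert the Avro data file to one JSON record per line
java -jar $FLUME_HOME/lib/avro-tools-1.8.2.jar tojson /tmp/FlumeData.1497207521006 > /tmp/flumedata.json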

Run the above agent (step 6) to get the data into HDFS, find out the schema of the Avro data, and create a Hive table as follows:

CREATE EXTERNAL TABLE TwitterData
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='
{
  "type" : "record",
  "name" : "Doc",
  "doc" : "adoc",
  "fields" : [ {
    "name" : "id",
    "type" : "string"
  }, {
    "name" : "user_friends_count",
    "type" : [ "int", "null" ]
  }, {
    "name" : "user_location",
    "type" : [ "string", "null" ]
  }, {
    "name" : "user_description",
    "type" : [ "string", "null" ]
  }, {
    "name" : "user_statuses_count",
    "type" : [ "int", "null" ]
  }, {
    "name" : "user_followers_count",
    "type" : [ "int", "null" ]
  }, {
    "name" : "user_name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "user_screen_name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "created_at",
    "type" : [ "string", "null" ]
  }, {
    "name" : "text",
    "type" : [ "string", "null" ]
  }, {
    "name" : "retweet_count",
    "type" : [ "long", "null" ]
  }, {
    "name" : "retweeted",
    "type" : [ "boolean", "null" ]
  }, {
    "name" : "in_reply_to_user_id",
    "type" : [ "long", "null" ]
  }, {
    "name" : "source",
    "type" : [ "string", "null" ]
  }, {
    "name" : "in_reply_to_status_id",
    "type" : [ "long", "null" ]
  }, {
    "name" : "media_url_https",
    "type" : [ "string", "null" ]
  }, {
    "name" : "expanded_url",
    "type" : [ "string", "null" ]
  } ]
}

')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/flume/twitter'
;

Describe Hive Table:

hive> describe  twitterdata;
OK
id                      string
user_friends_count      int
user_location           string
user_description        string
user_statuses_count     int
user_followers_count    int
user_name               string
user_screen_name        string
created_at              string
text                    string
retweet_count           bigint
retweeted               boolean
in_reply_to_user_id     bigint
source                  string
in_reply_to_status_id   bigint
media_url_https         string
expanded_url            string
Time taken: 2.203 seconds, Fetched: 17 row(s)


Query the table:

hive> select * from twitterdata limit 1;
OK
873977622857998336      203     NULL    NULL    4008    29      Joumana rockmine56      2017-06-11T14:57:57Z    رضيت بالله رباً وبالإسلام ديناً وبمحمد صلى الله عليه وسلم نبياً https://t.co/Ax99sfOWoS      0       false   -1      <a href="http://du3a.org" rel="nofollow">تطبـيـق دعـــــاء </a>   -1      NULL    NULL
Time taken: 7.709 seconds, Fetched: 1 row(s)

hive> select * from twitterdata limit 2;
OK
Failed with exception java.io.IOException:org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
Time taken: 2.143 seconds


You may observe the above error when you query more than one row from the Avro-based Hive table. This is caused by the Twitter 1% firehose Flume source, which is highly experimental. To get rid of this issue and make the example fully working, I'll write another post using the Cloudera Twitter source.

Note
You may get the error below while streaming tweets if your VM's time is not in sync with the host machine. You can go to the VM settings' System option and select the 'Hardware Clock in UTC Time' checkbox. You also need to install the VirtualBox Guest Additions.

2018-03-16 00:13:19,705 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] 401:Authentication credentials (https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync.
<html>\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>\n<title>Error 401 Unauthorized</title>
</head>
<body>
<h2>HTTP ERROR: 401</h2>
<p>Problem accessing '/1.1/statuses/filter.json'. Reason:
<pre>    Unauthorized</pre>
</body>

</html>
