Please see my other blog for Oracle EBusiness Suite Posts - EBMentors

Search This Blog

Note: All the posts are based on practical approach avoiding lengthy theory. All have been tested on some development servers. Please don’t test any post on production servers until you are sure.

Friday, June 23, 2017

Streaming Twitter Data by Flume using Cloudera Twitter Source

In my previous post Streaming Twitter Data using Apache Flume which fetches tweets using Flume and twitter streaming for data analysis.Twitter streaming converts tweets to Avro format and send Avro events to downsteam HDFS sinks, when Hive table backed by Avro load the data, I got the error message said "Avro block size is invalid or too large". In order to overcome this issue, I used Cloudera TwitterSource rather than apache TwitterSource.


Download Cloudera twitter source 

Download Cloudera twitter source from below location
https://github.com/cloudera/cdh-twitter-example 


click clone and download button, it will download a zip file (cdh-twitter-example-master.zip)
unzip this file , it will have different folder including flume-sources and hive-serdes.


After download and unzip, compile flume-sources.1.0-SNAPSHOT.jar. This jar contains the implementation of Cloudera TwitterSource. You should compile it using maven

After unzip file , it has a folder flume-sources. Copy it to your node where Maven is installted. I placed this folder in /usr/hadoopsw/


The flume-sources directory contains a Maven project with a custom Flume source designed to connect to the Twitter Streaming API and ingest tweets in a raw JSON format into HDFS.


Download Maven
If you don't have maven already then download Maven (apache-maven-3.5.0-bin.tar.gz) from below location


https://maven.apache.org/download.cgi
[hdpsysuser@vbgeneric ~]$ tar -xvf apache-maven-3.5.0-bin.tar.gz
[hdpsysuser@vbgeneric ~]$ cd flume-sources/
[hdpsysuser@vbgeneric flume-sources]$ pwd
/usr/hadoopsw/flume-sources

[hdpsysuser@vbgeneric flume-sources]$ ll
total 12
-rw-r--r-- 1 root root 2224 Jul 14  2016 flume.conf
-rw-r--r-- 1 root root 4108 Jul 14  2016 pom.xml
drwxr-xr-x 3 root root   17 Jun 11 17:49 src

[hdpsysuser@vbgeneric flume-sources]$ /usr/hadoopsw/apache-maven-3.5.0/bin/mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building flume-sources 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flume-sources ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/flume-sources/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ flume-sources ---
[INFO] Compiling 2 source files to /usr/hadoopsw/flume-sources/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ flume-sources ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/flume-sources/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ flume-sources ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ flume-sources ---
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven-plugin-api/2.0.9/maven-plugin-api-2.0.9.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven-plugin-api/2.0.9/maven-plugin-api-2.0.9.pom (1.5 kB at 559 B/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven/2.0.9/maven-2.0.9.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven/2.0.9/maven-2.0.9.pom (19 kB at 43 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven-parent/8/maven-parent-8.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven-parent/8/maven-parent-8.pom (24 kB at 67 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom (3.0 kB at 9.7 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom (2.5 kB at 8.0 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.pom
.....
....

Downloaded: https://repo.maven.apache.org/maven2/asm/asm-util/3.2/asm-util-3.2.jar (37 kB at 32 kB/s)
[INFO] Including org.slf4j:slf4j-api:jar:1.6.1 in the shaded jar.
[INFO] Including org.twitter4j:twitter4j-stream:jar:3.0.5 in the shaded jar.
[INFO] Including org.twitter4j:twitter4j-core:jar:3.0.5 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT.jar with /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 58.692 s
[INFO] Finished at: 2017-06-11T18:04:30-04:00
[INFO] Final Memory: 27M/283M
[INFO] ------------------------------------------------------------------------

This will generate a file called flume-sources-1.0-SNAPSHOT.jar in the target directory.
[hdpsysuser@vbgeneric flume-sources]$ ll
total 24
-rw-rw-r-- 1 hdpsysuser hdpsysuser 10633 Jun 11 18:04 dependency-reduced-pom.xml
-rw-r--r-- 1 hdpsysuser hadoop_grp  2224 Jul 14  2016 flume.conf
-rw-r--r-- 1 hdpsysuser hadoop_grp  4108 Jul 14  2016 pom.xml
drwxr-xr-x 3 hdpsysuser hadoop_grp    17 Jun 11 17:49 src
drwxrwxr-x 5 hdpsysuser hdpsysuser   148 Jun 11 18:04 target

[hdpsysuser@vbgeneric flume-sources]$ cd target/
[hdpsysuser@vbgeneric target]$ ll
total 388
drwxrwxr-x 3 hdpsysuser hdpsysuser     16 Jun 11 18:03 classes
-rw-rw-r-- 1 hdpsysuser hdpsysuser 388883 Jun 11 18:04 flume-sources-1.0-SNAPSHOT.jar
drwxrwxr-x 3 hdpsysuser hdpsysuser     24 Jun 11 18:03 generated-sources
drwxrwxr-x 2 hdpsysuser hdpsysuser     27 Jun 11 18:04 maven-archiver
-rw-rw-r-- 1 hdpsysuser hdpsysuser   7000 Jun 11 18:04 original-flume-sources-1.0-SNAPSHOT.jar

Now copy flume-sources-1.0-SNAPSHOT.jar to $FLUME_HOME/lib

Modify twitter.conf file
Now Change the type in twitter.conf file as explained in Streaming Twitter Data using Apache Flume I used apache TwitterSource ie: TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource

Please change it to cloudera TwitterSource: ie;
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource

# Describing/Configuring the source 
#TwitterAgent1.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource

Remove the files in HDFS and run the agent again to stream twitter data
[hdpsysuser@vbgeneric ~]$ hdfs dfs -rm /flume/twitter/*

flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent1

Now tweets streamed successfully, so cat and view the contents

[hdpsysuser@vbgeneric target]$ hdfs dfs -cat /flume/twitter/FlumeData.1497219143017

we will get the below error if we query the hive table based on Avro (in previous post)  as file generated by cloudera jar is text JSON

hive> select * from twitterdata limit 2;
OK
Failed with exception java.io.IOException:java.io.IOException: Not a data file.
Time taken: 0.591 seconds
hive>

Setting up Hive

Build or Download the JSON SerDeCopy hive-serdes folder to your desired location eg; /usr/hadoopsw/

The hive-serdes directory contains a Maven project with a JSON SerDe which enables Hive to query raw JSON data.

[hdpsysuser@vbgeneric ~]$ cd /usr/hadoopsw/hive-serdes/
[hdpsysuser@vbgeneric hive-serdes]$ ll
total 4
-rw-r--r-- 1 root root 3840 Jul 14 2016 pom.xml
drwxr-xr-x 3 root root 17 Jun 11 18:39 src

Build the hive-serdes JAR, from the root of the git repository:
[hdpsysuser@vbgeneric hive-serdes]$ /usr/hadoopsw/apache-maven-3.5.0/bin/mvn package

[INFO] Scanning for projects...

[INFO]

[INFO] ------------------------------------------------------------------------
[INFO] Building hive-serdes 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading: https://repository.cloudera.com/artifactory/cloudera-repos/org/codehaus/jackson/jackson-core-asl/1.9.8/jackson-core-asl-1.9.8.pom
Downloading: https://repo.maven.apache.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.8/jackson-core-asl-1.9.8.pom
...
...
Downloaded: https://repo.maven.apache.org/maven2/org/apache/avro/avro-ipc/1.7.3/avro-ipc-1.7.3-tests.jar (264 kB at 74 kB/s)
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ hive-serdes ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/hive-serdes/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ hive-serdes ---
[INFO] Compiling 1 source file to /usr/hadoopsw/hive-serdes/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ hive-serdes ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/hive-serdes/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ hive-serdes ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ hive-serdes ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ hive-serdes ---
[INFO] Building jar: /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-shade-plugin:1.7.1:shade (default) @ hive-serdes ---
[INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.8 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar with /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:09 min
[INFO] Finished at: 2017-06-11T18:45:07-04:00
[INFO] Final Memory: 25M/211M
[INFO] ------------------------------------------------------------------------

This will generate a file called hive-serdes-1.0-SNAPSHOT.jar in the target directory.
Copy hive-serdes-1.0-SNAPSHOT.jar to $HIVE_HOME/lib


Run hive, and create table using the following commands: CREATE EXTERNAL TABLE tweets_data (
   id BIGINT,

   created_at STRING,

   source STRING,

   favorited BOOLEAN,
   retweeted_status STRUCT<
     text:STRING,
     twitter_user:STRUCT<screen_name:STRING,name:STRING>,
     retweet_count:INT>,
   entities STRUCT<
     urls:ARRAY<STRUCT<expanded_url:STRING>>,
     user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
     hashtags:ARRAY<STRUCT<text:STRING>>>,
   text STRING,
   twitter_user STRUCT<
     screen_name:STRING,
     name:STRING,
     friends_count:INT,
     followers_count:INT,
     statuses_count:INT,
     verified:BOOLEAN,
     utc_offset:INT,
     time_zone:STRING>,
   in_reply_to_screen_name STRING
 ) 
 PARTITIONED BY (datehour INT)
 ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
 LOCATION '/flume/twitter';

If you don't want to use JSONSerde , you could create an external table as below and then use hive function to parse the tweets.

hive> CREATE EXTERNAL TABLE RAW_TWEETS(json_response STRING)  STORED AS TEXTFILE  LOCATION '/flume/twitter';

hive> select * from RAW_TWEETS limit 10;


Parse tweets

CREATE VIEW vw_parsed_tweets as
SELECT

CAST(get_json_object(json_response, '$.id') as BIGINT) as ID,  get_json_object(json_response, '$.created_at') as CREATED_AT,get_json_object(json_response, '$.text') as TEXT,  get_json_object(json_response, '$.retweeted') as RETWEETED,

get_json_object(json_response, '$.coordinates') COORDINATES,  get_json_object(json_response, '$.source') SOURCE,  CAST (get_json_object(json_response, '$.retweet_count') as INT) RETWEET_COUNT,

get_json_object(json_response, '$.entities.urls[0].display_url') DISPLAY_URL,get_json_object(json_response, '$.user.screen_name') USER_SCREEN_NAME,  get_json_object(json_response, '$.user.name') USER_NAME,
CAST (get_json_object(json_response, '$.user.followers_count') as INT) FOLLOWER_COUNT,  CAST (get_json_object(json_response, '$.user.listed_count') as INT) LISTED_COUNT,  CAST (get_json_object(json_response, '$.user.friends_count') as INT) FRIENDS_COUNT,
get_json_object(json_response, '$.user.lang') USER_LANG,  get_json_object(json_response, '$.user.location') USER_LOCATION,  get_json_object(json_response, '$.user.time_zone') USER_TZ, 
get_json_object(json_response, '$.user.profile_image_url') PROFILE_IMAGE_URL
from raw_tweets;


Optional: Create view in PrestoDB

create or replace view hive.flume.vw_tweets as
SELECT
cast(json_extract_scalar(json_response, '$.created_at') as varchar) created_at,
json_extract_scalar(json_response, '$.source') source,
cast(json_extract_scalar(json_response, '$.retweet_count') as bigint) retweet_count,
json_extract_scalar(json_response, '$.retweeted') retweeted,
json_extract_scalar(json_response, '$.id') id,
json_extract_scalar(json_response, '$.text') text,
json_extract_scalar(json_response, '$.lang') lang,
json_extract_scalar(json_response, '$.favorited') favorited,
json_extract_scalar(json_response, '$.possibly_sensitive') possibly_sensitive,
json_extract_scalar(json_response, '$.coordinates') coordinates,
json_extract_scalar(json_response, '$.truncated') truncated,
json_extract_scalar(json_response, '$.timestamp_ms') timestamp_ms,
json_extract_scalar(json_response, '$.entities.urls[0].display_url') display_ur,
json_extract_scalar(json_response, '$.entities.urls[0].expanded_url') expanded_url,
json_extract_scalar(json_response, '$.entities.urls[0].url') url,
cast(json_extract_scalar(json_response, '$.user.friends_count') as bigint) user_friends_count,
json_extract_scalar(json_response, '$.user.profile_image_url_https') user_profile_image_url_https,
cast(json_extract_scalar(json_response, '$.user.listed_count') as bigint) user_listed_count,
json_extract_scalar(json_response, '$.user.profile_background_image_url') user_profile_background_image_url,
cast(json_extract_scalar(json_response, '$.user.favourites_count') as bigint) user_favourites_count,
json_extract_scalar(json_response, '$.user.description') user_description,
json_extract_scalar(json_response, '$.user.created_at') user_created_at,
json_extract_scalar(json_response, '$.user.profile_background_image_url_https') user_profile_background_image_url_https,
json_extract_scalar(json_response, '$.user.protected') user_protected,
json_extract_scalar(json_response, '$.user.id') user_id,
json_extract_scalar(json_response, '$.user.geo_enabled') user_geo_enabled,
json_extract_scalar(json_response, '$.user.lang') user_lang,
json_extract_scalar(json_response, '$.user.verified') user_verified,
json_extract_scalar(json_response, '$.user.time_zone') user_time_zone,
json_extract_scalar(json_response, '$.user.url') user_url,
json_extract_scalar(json_response, '$.user.contributors_enabled') user_contributors_enabled,
cast(json_extract_scalar(json_response, '$.user.statuses_count') as bigint) user_statuses_count,
cast(json_extract_scalar(json_response, '$.user.followers_count') as bigint) user_followers_count,
json_extract_scalar(json_response, '$.user.name') user_name,
json_extract_scalar(json_response, '$.user.location') user_location
FROM
hive.flume.raw_tweets


Streaming to HBase
You can stream the tweets to HBase also using AsyncHBaseSink. Example configuration is below.

1- Create a table in HBase
hbase(main):040:0* create 'json_data','cf'
0 row(s) in 2.3020 seconds

=> Hbase::Table - json_data


export HIVE_HOME=/usr/hdp/current/hive-server2
export HCAT_HOME=/usr/hdp/current/hive-webhcat
export FLUME_HOME=/usr/hdp/2.6.1.0-129/flume

[flume@dn04 ~]$ export HIVE_HOME=/usr/hdp/current/hive-server2
[flume@dn04 ~]$ export HCAT_HOME=/usr/hdp/current/hive-webhcat

[flume@dn04 ~]$ export FLUME_HOME=/usr/hdp/2.6.1.0-129/flume


2- Agent configuration File

hbase_agent.conf

# Naming the components on the current agent. 
hbase_agent1.sources = Twitter
hbase_agent1.channels = MemChannelHBase
hbase_agent1.sinks = HBASESINK

# Describing/Configuring the source 
hbase_agent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
hbase_agent1.sources.Twitter.consumerKey = NZTCCsoEEZEpnvzcguFUYvvlmy
hbase_agent1.sources.Twitter.consumerSecret = odBgVnMd8xDoFzNh56CyKH6l4Q9uyJE24yGKfRZyy3uNGlWyqqI
hbase_agent1.sources.Twitter.accessToken = 522008992-jcg9PYx74FVw7fkdPk8jLYgwpIJ69Kedft1VdTsR
hbase_agent1.sources.Twitter.accessTokenSecret = KFfGh1Uu1jSvXkZTy6FJcnJIxdfuBOGTLUHlyyuLwACqEI
hbase_agent1.sources.Twitter.keywords = pakistan


# Describing/Configuring the sink 
#Use the AsyncHBaseSink
hbase_agent1.sinks.HBASESINK.type = org.apache.flume.sink.hbase.AsyncHBaseSink
hbase_agent1.sinks.HBASESINK.channel = MemChannelHBase
hbase_agent1.sinks.HBASESINK.table = json_data
hbase_agent1.sinks.HBASESINK.columnFamily = cf
hbase_agent1.sinks.HBASESINK.column = charges
hbase_agent1.sinks.HBASESINK.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
hbase_agent1.sinks.HBASESINK.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
hbase_agent1.sinks.HBASESINK.serializer.incrementColumn = icol


# Describing/Configuring the channel 
hbase_agent1.channels.MemChannelHBase.type=memory
hbase_agent1.channels.MemChannelHBase.capacity=10000
hbase_agent1.channels.MemChannelHBase.transactionCapacity=1000

# Binding the source and sink to the channel 
hbase_agent1.sources.Twitter.channels = MemChannelHBase
hbase_agent1.sinks.HBASESINK.channel = MemChannelHBase

3-  Run Agent

[flume@dn04 ~]$ flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/hbase_agent.conf -Dflume.root.logger=DEBUG,console -n hbase_agent1

4- Test from HBase Shell

hbase(main):025:0> count 'json_data'
Current count: 1000, row: default4f0868fe-0860-4cca-b2ea-8d64b386c658
Current count: 2000, row: defaulta2d5f305-3949-462b-9094-944fa517629e
Current count: 3000, row: defaultf0ae4d91-5325-4eb1-81b6-bce5c47b1755
3177 row(s) in 0.4750 seconds

=> 3177

No comments: