Please see my other blog for Oracle EBusiness Suite Posts - EBMentors

Search This Blog

Note: All the posts are based on practical approach avoiding lengthy theory. All have been tested on some development servers. Please don’t test any post on production servers until you are sure.

Friday, June 02, 2017

Apache PIG - a Short Tutorial


Introduction

Apache Pig is an abstraction over MapReduce developed as a research project at Yahoo in 2006 and was open sourced via Apache incubator in 2007. In 2008, the first release of Apache Pig came out. In 2010, Apache Pig graduated as an Apache top-level project. It is a tool/platform which is used to analyze larger sets of data representing them as data flows. To write data analysis programs, Pig provides a high-level language known as Pig Latin. Scripts written in Pig Latin are internally converted to Map and Reduce tasks. Apache Pig has a component known as Pig Engine that accepts the Pig Latin scripts as input and converts those scripts into MapReduce jobs. 
Pig Latin is SQL-like language and it is easy to learn Apache Pig when you are familiar with SQL. Apache Pig provides many built-in operators to support data operations like joins, filters, ordering, etc. In addition, it also provides nested data types like tuples, bags, and maps that are missing from MapReduce.


Apache Pig uses multi-query approach, thereby reducing the length of codes. For example, an operation that would require you to type 200 lines of code (LoC) in Java can be easily done by typing as less as just 10 LoC in Apache Pig. Ultimately Apache Pig reduces the development time by almost 16 times.


Apache Pig - Architecture

Internally, Apache Pig converts these scripts into a series of MapReduce jobs, and thus, it makes the programmer’s job easy. The architecture of Apache Pig is shown below.





Pig Latin: Series of operations or transformations that are applied to the input data to produce results.

Pig Latin Script: Contains pig commands in a file (.pig)

Grunt Shell: Interactive shell for running pig commands

Parser: checks the syntax of the script, does type checking, and other miscellaneous checks.

Optimizer: Carries out the logical optimizations such as projection and pushdown.

Compiler: Compiles the optimized logical plan into a series of MapReduce jobs.

Execution Engine:
Submits MapReduce jobs to Hadoop in a sorted order. Finally, these MapReduce jobs are executed on Hadoop producing the desired results.


Pig Running Modes


MapReduce Mode - Default mode which requires access to a Hadoop cluster and HDFS installation. The input and output in this mode are present on HDFS. Command for this mode is "pig"

Local Mode - With access to a single machine, all files are installed and run using a localhost file system. The input and output in this mode are present on local filesystem. Command for this mode is "pig -x local". There is no need of Hadoop or HDFS. This mode is generally used for testing purpose.



Pig Latin Data Model

The data model of Pig Latin is fully nested and it allows complex non-atomic datatypes such as map and tuple. 
Atom: Any single value in Pig Latin, irrespective of their data, stored as string and can be used as string and number. int, long, float, double, chararray, and bytearray. A piece of data or a simple atomic value is known as a field.
Tuple: Similar to a row in a table of RDBMS and represented by '()'
Example - (1,Linkin Park,7,California)

Bag: Unordered set of tuples, can have any number of fields (flexible schema) and is represented by '{}'. It is similar to a table in RDBMS, but unlike a table in RDBMS, it is not necessary that every tuple contain the same number of fields or that the fields in the same position (column) have the same type.
Example - {(1,Linkin Park,7,California), (Metalica,8),(Mega Death,5,Los Angeles)}

Map: A set of key-value pairs. The key needs to be of type chararray and should be unique. The value might be of any type. It is represented by '[]' and delimited with '#'
Example - [name#Ali, age#10]

Relation: A bag of tuples. The relations in Pig Latin are unordered (there is no guarantee that tuples are processed in any particular order).



Setup Apache Pig

Prerequisites: Hadoop and java already available on system, review post for hadoop and java setup.

Step 1: download the latest version of Apache Pig from the following website
and move the downloaded .tar file to desired location eg; /usr/hadoopsw/ and Untar the .gz file  using root user. After this set the appropriate ownership and file modes.

[root@en01 hadoopsw]# tar zxvf pig-0.16.0.tar.gz
[root@en01 hadoopsw]# chown hdpclient:hadoop_edge pig-0.16.0
[root@en01 hadoopsw]# chmod 755 pig-0.16.0


Step 2: Set the below pig related variables in .bash_profile of owner of pig

PIG_HOME folder to the Apache Pig’s installation folder,
PATH environment variable to the bin folder, and
PIG_CLASSPATH environment variable to the etc (configuration) folder of your Hadoop installations (the directory that contains the core-site.xml, hdfs-site.xml and mapred-site.xml files).


export PIG_HOME=/usr/hadoopsw/pig
export PATH=$PATH:$PIG_HOME/bin
export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop

[hdpclient@en01 hadoopsw]$ vi ~/.bash_profile
[hdpclient@en01 hadoopsw]$ cat ~/.bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
PATH=$PATH:$HOME/.local/bin:$HOME/bin
export PATH
###################Inside .bash_profile########################
## JAVA env variables
export JAVA_HOME=/usr/java/default
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
## Hadoop Variables
export HADOOP_HOME=/usr/hadoopsw/hadoop-2.7.3
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
#Hive Variables
export HIVE_HOME=/usr/hadoopsw/apache-hive-2.1.1-bin
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/*:.
export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/*:.
#Derby Variables
export DERBY_HOME=/usr/hadoopsw/db-derby-10.13.1.1-bin
export PATH=$PATH:$DERBY_HOME/bin
export CLASSPATH=$CLASSPATH:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbyclient.jar:$DERBY_HOME/lib/derbytools.jar
export DERBY_OPTS=-Dij.protocol=jdbc:derby://hadoopedge1/
# PIG Variables

export PIG_HOME=/usr/hadoopsw/pig-0.16.0
export PATH=$PATH:$PIG_HOME/bin

export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop/


Optionally, In the conf folder of Pig we have a file named pig.properties. In the pig.properties file, you can set various parameters.


Step 3: Enter the below command to take the effect of .bash_profile
[hdpclient@en01 hadoopsw]$ source ~/.bash_profile
[hdpclient@en01 hadoopsw]$ echo $PIG_HOME
/usr/hadoopsw/pig-0.16.0
Step 4: Run/test/verify pig

[hdpclient@te1-hdp-rp-en01 hadoop]$ pig -version
Apache Pig version 0.16.0 (r1746530)
compiled Jun 01 2016, 23:10:49

[hdpclient@en01 hadoopsw]$ pig
17/05/17 17:15:07 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
17/05/17 17:15:07 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
17/05/17 17:15:07 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
17/05/17 17:15:07 WARN pig.Main: Cannot write to log file: /usr/hadoopsw/pig_1495030507876.log
2017-05-17 17:15:07,878 [main] INFO  org.apache.pig.Main - Apache Pig version 0.16.0 (r1746530) compiled Jun 01 2016, 23:10:49
2017-05-17 17:15:07,894 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/hdpclient/.pigbootup not found
2017-05-17 17:15:08,366 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2017-05-17 17:15:08,366 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2017-05-17 17:15:08,366 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://te1-hdp-rp-nn01:9000/
2017-05-17 17:15:08,896 [main] INFO  org.apache.pig.PigServer - Pig Script ID for the session: PIG-default-906031ec-0df3-4ff6-a4b5-d4256efdde28
2017-05-17 17:15:08,896 [main] WARN  org.apache.pig.PigServer - ATS is disabled since yarn.timeline-service.enabled set to false
grunt>
You can exit the Grunt shell using ‘ctrl + d’.

Create Pig related folder in HDFS

[hdpsysuser@nn01 ~]$ hdfs dfs -mkdir /pig
[hdpsysuser@nn01 ~]$ hdfs dfs -chmod 777 /pig


Executing Commands 


Shell Commands
We can invoke any shell commands from the Grunt shell excluding commands that are a part of the shell environment for example − cd.


grunt> sh ls
derby.log
hive_metastore_db
nolog.conf
pig_1495083634779.log
pig_1495096023414.log


fs Commands
Using the fs command, we can invoke any FsShell commands from the Grunt shell.

grunt> fs -put /tmp/mydata/dept.csv /pig
grunt> fs -ls /pig
Found 1 items
-rw-r--r--   3 hdpclient supergroup         82 2017-05-18 12:25 /pig/dept.csv

grunt> fs -put /tmp/mydata/emp.csv /pig



Utility Commands

The Grunt shell provides a set of utility commands such as clear, help, history, quit, and set. Commands such as exec, kill, and run to control Pig from the Grunt shell.

clear command is used to clear the screen of the Grunt shell.
help command gives you a list of Pig commands or Pig properties
history command displays a list of statements executed / used so far since the Grunt sell is invoked.
set command is used to show/assign values to keys used in Pig eg; grunt> set job.name myjob

quit You can quit from the Grunt shell using this command


grunt> dept = LOAD '/pig/dept.csv' USING PigStorage(',') as (deptno:int,dname:chararray,loc:chararray);
grunt> dump dept;


... <<Suppressed output>>
... <<Suppressed output>>
2017-05-18 11:48:12,250 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2017-05-18 11:48:12,255 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2017-05-18 11:48:12,268 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2017-05-18 11:48:12,268 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(10,ACCOUNTING,NEW YORK)
(20,RESEARCH,DALLAS)
(30,SALES,CHICAGO)
(40,OPERATIONS,BOSTON)

Disable info logging of pig console

It is annoying for logging message every time when using pig console like above, especially for illustration, we can’t concentrate on the output due to the message. In order to suppress the message, set an option when launching of pig console.

[hdpclient@te1-hdp-rp-en01 ~]$ pwd
/home/hdpclient
[hdpclient@te1-hdp-rp-en01 ~]$ echo "log4j.rootLogger=fatal" > nolog.conf
[hdpclient@te1-hdp-rp-en01 ~]$ pig -4 nolog.conf
17/05/18 12:03:52 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
17/05/18 12:03:52 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
17/05/18 12:03:52 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
17/05/18 12:03:52 INFO pig.Main: Loaded log4j properties from file: nolog.conf

grunt>


grunt> dept = LOAD '/pig/dept.csv' USING PigStorage(',') as (deptno:int,dname:chararray,loc:chararray);
grunt> dump dept;

(10,ACCOUNTING,NEW YORK)
(20,RESEARCH,DALLAS)
(30,SALES,CHICAGO)
(40,OPERATIONS,BOSTON)
grunt>


exec/run commands can execute Pig scripts from the Grunt shell. When using run the statements from the script are available in the command history

Create a pig script 
[hdpclient@te1-hdp-rp-en01 ~]$ cat pig_test_script.pig
dept = LOAD '/user/hive/warehouse/scott.db/dept/dept.csv' USING PigStorage(',') as (deptno:int,dname:chararray,loc:chararray);
dump dept;

grunt> exec /home/hdpclient/pig_test_script.pig


grunt> run /home/hdpclient/pig_test_script.pig
grunt> dept = LOAD '/user/hive/warehouse/scott.db/dept/dept.csv' USING PigStorage(',') as (deptno:int,dname:chararray,loc:chararray);
grunt> dump dept;

kill  can kill a job from the Grunt shell using this command eg; kill myjob.

Pig Latin

Pig Latin is the language used to analyze data in Hadoop using Apache Pig providing statements as basic constructs which include expressions and schemas and ends with a semicolon (;). Except LOAD and STORE, while performing all other operations, Pig Latin statements take a relation as input and produce another relation as output.

As soon as you enter a Load statement in the Grunt shell, its semantic checking will be carried out. To see the contents of the schema, you need to use the Dump operator.

Pig Latin – Data types

Scaler Types: Int, long, fload, double, chararray, Bytearray, Boolean, Datetime, Biginteger, Bigdecimal
Complex Types: Tuple,Map,Bag

Null values are treated the same way as SQL
Pig Latin – Relational Operations

OperatorDescription
Loading and Storing
LOADTo Load the data from the file system (local/HDFS) into a relation.
Syntax
Relation_name = LOAD 'Input file path' USING function as schema;LOAD Functions (BinStorage, JsonLoader, PigStorage, TextLoader).
STORETo save a relation to the file system (local/HDFS).
Syntax
STORE Relation_name INTO ' required_directory_path ' [USING function];
Filtering
FILTERTo remove unwanted rows from a relation.
DISTINCTTo remove duplicate rows from a relation.
FOREACH, GENERATETo generate data transformations based on columns of data.
STREAMTo transform a relation using an external program.
Grouping and Joining
JOINTo join two or more relations.
COGROUPTo group the data in two or more relations.
GROUPTo group the data in a single relation.
Syntax
Group_data = GROUP Relation_name BY age;
CROSSTo create the cross product of two or more relations.
Sorting
ORDERTo arrange a relation in a sorted order based on one or more fields (ascending or descending).
LIMITTo get a limited number of tuples from a relation.
Combining and Splitting
UNIONTo combine two or more relations into a single relation.
SPLITTo split a single relation into two or more relations.
Diagnostic Operators
DUMPTo print the contents of a relation on the console. It is generally used for debugging Purpose.
Syntax
dump Relation_Name
DESCRIBETo describe the schema of a relation.
Syntax
describe Relation_name
EXPLAINTo view the logical, physical, or MapReduce execution plans to compute a relation.
Syntax
explain Relation_name;
ILLUSTRATETo view the step-by-step execution of a series of statements.
Syntax
illustrate Relation_name; 

Usage

LOAD
grunt> dept = LOAD '/pig/dept.csv' USING PigStorage(',') as (deptno:int,dname:chararray,loc:chararray);
grunt> dump dept;

(10,ACCOUNTING,NEW YORK)
(20,RESEARCH,DALLAS)
(30,SALES,CHICAGO)
(40,OPERATIONS,BOSTON)

STORE
grunt> store dept into '/tmp/deptnew.txt' using PigStorage(',');
Verify
grunt> fs -ls /tmp/dept*
Found 2 items
-rw-r--r--   3 hdpclient supergroup          0 2017-05-18 13:40 /tmp/deptnew.txt/_SUCCESS
-rw-r--r--   3 hdpclient supergroup         80 2017-05-18 13:40 /tmp/deptnew.txt/part-m-00000

grunt> fs -cat /tmp/deptnew.txt/part-m-00000
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

DESCRIBE
grunt> describe dept;
dept: {deptno: int,dname: chararray,loc: chararray}

EXPLAIN
grunt> explain dept;
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
dept: (Name: LOStore Schema: deptno#68:int,dname#69:chararray,loc#70:chararray)
|
|---dept: (Name: LOForEach Schema: deptno#68:int,dname#69:chararray,loc#70:chararray)
    |   |
    |   (Name: LOGenerate[false,false,false] Schema: deptno#68:int,dname#69:chararray,loc#70:chararray)ColumnPrune:OutputUids=[68, 69, 70]ColumnPrune:InputUids=[68, 69, 70]
    |   |   |
    |   |   (Name: Cast Type: int Uid: 68)
    |   |   |
    |   |   |---deptno:(Name: Project Type: bytearray Uid: 68 Input: 0 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 69)
    |   |   |
    |   |   |---dname:(Name: Project Type: bytearray Uid: 69 Input: 1 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 70)
    |   |   |
    |   |   |---loc:(Name: Project Type: bytearray Uid: 70 Input: 2 Column: (*))
    |   |
    |   |---(Name: LOInnerLoad[0] Schema: deptno#68:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[1] Schema: dname#69:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[2] Schema: loc#70:bytearray)
    |
    |---dept: (Name: LOLoad Schema: deptno#68:bytearray,dname#69:bytearray,loc#70:bytearray)RequiredFields:null
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
dept: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-43
|
|---dept: New For Each(false,false,false)[bag] - scope-42
    |   |
    |   Cast[int] - scope-34
    |   |
    |   |---Project[bytearray][0] - scope-33
    |   |
    |   Cast[chararray] - scope-37
    |   |
    |   |---Project[bytearray][1] - scope-36
    |   |
    |   Cast[chararray] - scope-40
    |   |
    |   |---Project[bytearray][2] - scope-39
    |
    |---dept: Load(/pig/dept.csv:PigStorage(',')) - scope-32

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-44
Map Plan
dept: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-43
|
|---dept: New For Each(false,false,false)[bag] - scope-42
    |   |
    |   Cast[int] - scope-34
    |   |
    |   |---Project[bytearray][0] - scope-33
    |   |
    |   Cast[chararray] - scope-37
    |   |
    |   |---Project[bytearray][1] - scope-36
    |   |
    |   Cast[chararray] - scope-40
    |   |
    |   |---Project[bytearray][2] - scope-39
    |
    |---dept: Load(/pig/dept.csv:PigStorage(',')) - scope-32--------
Global sort: false
----------------

ILLUSTRATE
grunt> illustrate dept;
(10,ACCOUNTING,NEW YORK)
--------------------------------------------------------------------
| dept     | deptno:int    | dname:chararray    | loc:chararray    |
--------------------------------------------------------------------
|          | 10            | ACCOUNTING         | NEW YORK         |
--------------------------------------------------------------------

GROUP
grunt> group_data = group dept by deptno;
grunt> dump group_data;
(10,{(10,ACCOUNTING,NEW YORK)})
(20,{(20,RESEARCH,DALLAS)})
(30,{(30,SALES,CHICAGO)})
(40,{(40,OPERATIONS,BOSTON)})

grunt> describe group_data;
group_data: {group: int,dept: {(deptno: int,dname: chararray,loc: chararray)}}

grunt> illustrate group_data;
(20,RESEARCH,DALLAS)
-----------------------------------------------------------------------
| dept     | deptno:int     | dname:chararray     | loc:chararray     |
-----------------------------------------------------------------------
|          | 20             | RESEARCH            | DALLAS            |
|          | 20             | RESEARCH            | DALLAS            |
-----------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------
| group_data     | group:int     | dept:bag{:tuple(deptno:int,dname:chararray,loc:chararray)}                     |
-------------------------------------------------------------------------------------------------------------------
|                | 20            | {(20, RESEARCH, DALLAS), (20, RESEARCH, DALLAS)}                               |
-------------------------------------------------------------------------------------------------------------------

Here you can observe that the resulting schema has two columns − One is deptno, by which we have grouped the relation. The other is a bag, which contains the group of tuples, student records with the respective deptno.

group on multiple columns
grunt> group_multiple = group dept by (deptno,dname);
grunt> dump group_multiple;
((10,ACCOUNTING),{(10,ACCOUNTING,NEW YORK)})
((20,RESEARCH),{(20,RESEARCH,DALLAS)})
((30,SALES),{(30,SALES,CHICAGO)})
((40,OPERATIONS),{(40,OPERATIONS,BOSTON)})

group on all columns

grunt> group_all = group dept all;
grunt> dump group_all;
(all,{(40,OPERATIONS,BOSTON),(30,SALES,CHICAGO),(20,RESEARCH,DALLAS),(10,ACCOUNTING,NEW YORK)})

COGROUP

grunt> emp = load '/pig/emp.csv' using PigStorage(',') as (EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int);

grunt> dump emp;
(7369,SMITH,CLERK,7902,,800,,20)
(7499,ALLEN,SALESMAN,7698,,1600,300,30)
(7521,WARD,SALESMAN,7698,,1250,500,30)
(7566,JONES,MANAGER,7839,,2975,,20)
(7654,MARTIN,SALESMAN,7698,,1250,1400,30)
(7698,BLAKE,MANAGER,7839,,2850,,30)
(7782,CLARK,MANAGER,7839,,2450,,10)
(7788,SCOTT,ANALYST,7566,,3000,,20)
(7839,KING,PRESIDENT,,,5000,,10)
(7844,TURNER,SALESMAN,7698,,1500,0,30)
(7876,ADAMS,CLERK,7788,,1100,,20)
(7900,JAMES,CLERK,7698,,950,,30)
(7902,FORD,ANALYST,7566,,3000,,20)
(7934,MILLER,CLERK,7782,,1300,,10)


grunt> describe emp;
emp: {EMPNO: int,ENAME: chararray,JOB: chararray,MGR: int,HIREDATE: datetime,SAL: int,COMM: int,DEPTNO: int}


grunt> cogroup_data = COGROUP dept by deptno, emp by deptno;
17/05/18 15:11:03 ERROR grunt.Grunt: ERROR 1025:
<line 3, column 46> Invalid field projection. Projected field [deptno] does not exist in schema: EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int.
Details at logfile: /home/hdpclient/pig_1495108584752.log
grunt> cogroup_data = COGROUP dept by deptno, emp by DEPTNO;
grunt> dump cogroup_data;

(10,{(10,ACCOUNTING,NEW YORK)},{(7934,MILLER,CLERK,7782,,1300,,10),(7839,KING,PRESIDENT,,,5000,,10),(7782,CLARK,MANAGER,7839,,2450,,10)})
(20,{(20,RESEARCH,DALLAS)},{(7876,ADAMS,CLERK,7788,,1100,,20),(7788,SCOTT,ANALYST,7566,,3000,,20),(7369,SMITH,CLERK,7902,,800,,20),(7566,JONES,MANAGER,7839,,2975,,20),(7902,FORD,ANALYST,7566,,3000,,20)})
(30,{(30,SALES,CHICAGO)},{(7844,TURNER,SALESMAN,7698,,1500,0,30),(7499,ALLEN,SALESMAN,7698,,1600,300,30),(7698,BLAKE,MANAGER,7839,,2850,,30),(7654,MARTIN,SALESMAN,7698,,1250,1400,30),(7521,WARD,SALESMAN,7698,,1250,500,30),(7900,JAMES,CLERK,7698,,950,,30)})
(40,{(40,OPERATIONS,BOSTON)},{})
grunt>

In the above output, The cogroup operator groups the tuples from each relation according to deptno where each group depicts a particular deptno value.


JOIN
sefl-join (table joins to itself so need alais)
grunt> e1 = load '/pig/emp.csv' using PigStorage(',') as (EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int);
grunt> e2 = load '/pig/emp.csv' using PigStorage(',') as (EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int);
grunt> emp_self_join = JOIN e1 BY EMPNO, e2 BY MGR;
grunt> dump emp_self_join;
(7566,JONES,MANAGER,7839,,2975,,20,7788,SCOTT,ANALYST,7566,,3000,,20)
(7566,JONES,MANAGER,7839,,2975,,20,7902,FORD,ANALYST,7566,,3000,,20)
(7698,BLAKE,MANAGER,7839,,2850,,30,7654,MARTIN,SALESMAN,7698,,1250,1400,30)
(7698,BLAKE,MANAGER,7839,,2850,,30,7900,JAMES,CLERK,7698,,950,,30)
(7698,BLAKE,MANAGER,7839,,2850,,30,7521,WARD,SALESMAN,7698,,1250,500,30)
(7698,BLAKE,MANAGER,7839,,2850,,30,7844,TURNER,SALESMAN,7698,,1500,0,30)
(7698,BLAKE,MANAGER,7839,,2850,,30,7499,ALLEN,SALESMAN,7698,,1600,300,30)
(7782,CLARK,MANAGER,7839,,2450,,10,7934,MILLER,CLERK,7782,,1300,,10)
(7788,SCOTT,ANALYST,7566,,3000,,20,7876,ADAMS,CLERK,7788,,1100,,20)
(7839,KING,PRESIDENT,,,5000,,10,7782,CLARK,MANAGER,7839,,2450,,10)
(7839,KING,PRESIDENT,,,5000,,10,7698,BLAKE,MANAGER,7839,,2850,,30)
(7839,KING,PRESIDENT,,,5000,,10,7566,JONES,MANAGER,7839,,2975,,20)
(7902,FORD,ANALYST,7566,,3000,,20,7369,SMITH,CLERK,7902,,800,,20)


Inner Join/equijoin (returns rows when there is a match in both tables)
grunt> inner_join = JOIN dept by deptno, emp by DEPTNO;
grunt> dump inner_join;
(10,ACCOUNTING,NEW YORK,7934,MILLER,CLERK,7782,,1300,,10)
(10,ACCOUNTING,NEW YORK,7839,KING,PRESIDENT,,,5000,,10)
(10,ACCOUNTING,NEW YORK,7782,CLARK,MANAGER,7839,,2450,,10)
(20,RESEARCH,DALLAS,7876,ADAMS,CLERK,7788,,1100,,20)
(20,RESEARCH,DALLAS,7788,SCOTT,ANALYST,7566,,3000,,20)
(20,RESEARCH,DALLAS,7369,SMITH,CLERK,7902,,800,,20)
(20,RESEARCH,DALLAS,7566,JONES,MANAGER,7839,,2975,,20)
(20,RESEARCH,DALLAS,7902,FORD,ANALYST,7566,,3000,,20)
(30,SALES,CHICAGO,7844,TURNER,SALESMAN,7698,,1500,0,30)
(30,SALES,CHICAGO,7499,ALLEN,SALESMAN,7698,,1600,300,30)
(30,SALES,CHICAGO,7698,BLAKE,MANAGER,7839,,2850,,30)
(30,SALES,CHICAGO,7654,MARTIN,SALESMAN,7698,,1250,1400,30)
(30,SALES,CHICAGO,7521,WARD,SALESMAN,7698,,1250,500,30)
(30,SALES,CHICAGO,7900,JAMES,CLERK,7698,,950,,30)

left outer join (returns all rows from the left table, even if there are no matches in the right relation)

grunt> dump left_outer_join;
(10,ACCOUNTING,NEW YORK,7934,MILLER,CLERK,7782,,1300,,10)
(10,ACCOUNTING,NEW YORK,7839,KING,PRESIDENT,,,5000,,10)
(10,ACCOUNTING,NEW YORK,7782,CLARK,MANAGER,7839,,2450,,10)
(20,RESEARCH,DALLAS,7876,ADAMS,CLERK,7788,,1100,,20)
(20,RESEARCH,DALLAS,7788,SCOTT,ANALYST,7566,,3000,,20)
(20,RESEARCH,DALLAS,7369,SMITH,CLERK,7902,,800,,20)
(20,RESEARCH,DALLAS,7566,JONES,MANAGER,7839,,2975,,20)
(20,RESEARCH,DALLAS,7902,FORD,ANALYST,7566,,3000,,20)
(30,SALES,CHICAGO,7844,TURNER,SALESMAN,7698,,1500,0,30)
(30,SALES,CHICAGO,7499,ALLEN,SALESMAN,7698,,1600,300,30)
(30,SALES,CHICAGO,7698,BLAKE,MANAGER,7839,,2850,,30)
(30,SALES,CHICAGO,7654,MARTIN,SALESMAN,7698,,1250,1400,30)
(30,SALES,CHICAGO,7521,WARD,SALESMAN,7698,,1250,500,30)
(30,SALES,CHICAGO,7900,JAMES,CLERK,7698,,950,,30)
(40,OPERATIONS,BOSTON,,,,,,,,)


right outer join (returns all rows from the right table, even if there are no matches in the left table)
grunt> right_outer_join = JOIN emp BY DEPTNO RIGHT, dept BY deptno;
grunt> dump right_outer_join;
(7934,MILLER,CLERK,7782,,1300,,10,10,ACCOUNTING,NEW YORK)
(7839,KING,PRESIDENT,,,5000,,10,10,ACCOUNTING,NEW YORK)
(7782,CLARK,MANAGER,7839,,2450,,10,10,ACCOUNTING,NEW YORK)
(7876,ADAMS,CLERK,7788,,1100,,20,20,RESEARCH,DALLAS)
(7788,SCOTT,ANALYST,7566,,3000,,20,20,RESEARCH,DALLAS)
(7369,SMITH,CLERK,7902,,800,,20,20,RESEARCH,DALLAS)
(7566,JONES,MANAGER,7839,,2975,,20,20,RESEARCH,DALLAS)
(7902,FORD,ANALYST,7566,,3000,,20,20,RESEARCH,DALLAS)
(7844,TURNER,SALESMAN,7698,,1500,0,30,30,SALES,CHICAGO)
(7499,ALLEN,SALESMAN,7698,,1600,300,30,30,SALES,CHICAGO)
(7698,BLAKE,MANAGER,7839,,2850,,30,30,SALES,CHICAGO)
(7654,MARTIN,SALESMAN,7698,,1250,1400,30,30,SALES,CHICAGO)
(7521,WARD,SALESMAN,7698,,1250,500,30,30,SALES,CHICAGO)
(7900,JAMES,CLERK,7698,,950,,30,30,SALES,CHICAGO)
(,,,,,,,,40,OPERATIONS,BOSTON)


full outer join (returns rows when there is a match in one of the relations)


grunt> full_outer_join = JOIN dept by deptno FULL OUTER, emp by DEPTNO;

grunt> dump full_outer_join;





CROSS
grunt> cross_product = CROSS dept, emp;
grunt> dump full_outer_join;

UNION
grunt> dept_emp_union = union dept,emp;
grunt> dump dept_emp_union;
(7369,SMITH,CLERK,7902,,800,,20)
(7499,ALLEN,SALESMAN,7698,,1600,300,30)
(7521,WARD,SALESMAN,7698,,1250,500,30)
(7566,JONES,MANAGER,7839,,2975,,20)
(7654,MARTIN,SALESMAN,7698,,1250,1400,30)
(7698,BLAKE,MANAGER,7839,,2850,,30)
(7782,CLARK,MANAGER,7839,,2450,,10)
(7788,SCOTT,ANALYST,7566,,3000,,20)
(7839,KING,PRESIDENT,,,5000,,10)
(7844,TURNER,SALESMAN,7698,,1500,0,30)
(7876,ADAMS,CLERK,7788,,1100,,20)
(7900,JAMES,CLERK,7698,,950,,30)
(7902,FORD,ANALYST,7566,,3000,,20)
(7934,MILLER,CLERK,7782,,1300,,10)
(10,ACCOUNTING,NEW YORK)
(20,RESEARCH,DALLAS)
(30,SALES,CHICAGO)
(40,OPERATIONS,BOSTON)

SPLIT
grunt> SPLIT emp into emp10 if DEPTNO == 10, emp20 if DEPTNO == 20;
grunt> dump emp10;
grunt> dump emp20;
(7782,CLARK,MANAGER,7839,,2450,,10)
(7839,KING,PRESIDENT,,,5000,,10)
(7934,MILLER,CLERK,7782,,1300,,10)

FILTER
grunt> filter_emp = FILTER emp BY JOB == 'CLERK';
grunt> dump filter_emp;
(7369,SMITH,CLERK,7902,,800,,20)
(7876,ADAMS,CLERK,7788,,1100,,20)
(7900,JAMES,CLERK,7698,,950,,30)
(7934,MILLER,CLERK,7782,,1300,,10)


DISTINCT

grunt> distinct_emp = DISTINCT emp;



FOREACH

grunt> foreach_data = FOREACH dept GENERATE dname,loc;

grunt> dump foreach_data;
(ACCOUNTING,NEW YORK)
(RESEARCH,DALLAS)
(SALES,CHICAGO)
(OPERATIONS,BOSTON)


ORDER 
grunt> order_by_data = ORDER dept BY loc ASC;
grunt> dump order_by_data

(40,OPERATIONS,BOSTON)
(30,SALES,CHICAGO)
(20,RESEARCH,DALLAS)
(10,ACCOUNTING,NEW YORK)



LIMIT
grunt> limit_data = LIMIT emp 3;
grunt> dump limit_data;
(7369,SMITH,CLERK,7902,,800,,20)
(7499,ALLEN,SALESMAN,7698,,1600,300,30)
(7521,WARD,SALESMAN,7698,,1250,500,30)


Apache Pig - Functions


Pig comes with a set of built in functions (the eval, load/store, math, string, bag and tuple functions). Two main properties differentiate built in functions from user defined functions (UDFs). First, built in functions don't need to be registered because Pig knows where they are. Second, built in functions don't need to be qualified when they are used because Pig knows where to find them.

Eval Functions

AVG() - Computes the average of the numeric values in a single-column bag.

grunt> emp = load '/pig/emp.csv' using PigStorage(',') as (EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int);


grunt> gbj = GROUP emp BY JOB;
grunt> avgByJob = FOREACH gbj GENERATE emp.JOB, AVG(emp.SAL);
grunt> dump avgByJob
({(CLERK),(CLERK),(CLERK),(CLERK)},1037.5)
({(ANALYST),(ANALYST)},3000.0)
({(MANAGER),(MANAGER),(MANAGER)},2758.3333333333335)
({(SALESMAN),(SALESMAN),(SALESMAN),(SALESMAN)},1400.0)
({(PRESIDENT)},5000.0)


You can use the other eval function (eg; max,min,count,sum) the same way as described above.

Load/Store Functions


Load/store functions determine how data goes into Pig and comes out of Pig. Support for compression is determined by the load/store function.
PigStorage() - The PigStorage() function loads and stores data as structured text files. It takes a delimiter using which each entity of a tuple is separated as a parameter. By default, it takes ‘\t’ as a parameter.
Examples
grunt> emp = load '/pig/emp.csv' using PigStorage(',') as (EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int);


grunt> STORE emp INTO '/pig/emp2.csv ' USING PigStorage (',');
TextLoader() - To load unstructured data in UTF-8 format.
grunt> dept = LOAD '/pig/dept.csv' USING TextLoader();
grunt> dump dept;



BinStorage() - used to load and store the data into Pig using machine readable format. BinStorge() in Pig is generally used to store temporary data generated between the MapReduce jobs. It supports multiple locations as input.

Example:

Let us load this data into Pig into a relation as shown below.
grunt> dept = LOAD '/pig/dept.csv' USING PigStorage(',');


Now, we can store this relation into the HDFS directory using the BinStorage() function.
grunt> STORE dept INTO '/pig/output ' USING BinStorage();


After executing the above statement, the relation is stored in the given HDFS directory. You can see it using the HDFS ls command

grunt> fs -ls /pig/output;
Found 2 items
-rw-r--r-- 1 oracle supergroup 0 2017-06-02 08:30 /pig/output/_SUCCESS
-rw-r--r-- 1 oracle supergroup 160 2017-06-02 08:30 /pig/output/part-m-00000


Now, load the data from the file part-m-00000.
grunt> result = LOAD '/pig/output/part-m-00000' USING BinStorage();

Verify the contents of the relation
grunt> dump result;
(10,ACCOUNTING,NEW YORK)
(20,RESEARCH,DALLAS)
(30,SALES,CHICAGO)
(40,OPERATIONS,BOSTON)


Bag & Tuple Functions

TOBAG() - converts one or more expressions to individual tuples. And these tuples are placed in a bag.

grunt> emp = load '/pig/emp.csv' using PigStorage(',') as (EMPNO:int,ENAME:chararray,JOB:chararray,MGR:int,HIREDATE:datetime,SAL:int,COMM:int,DEPTNO:int);

grunt> tobag = FOREACH emp GENERATE TOBAG(EMPNO,ENAME,SAL);

verify the contents of the tobag relation
grunt> dump tobag;

({(7369),(SMITH),(800)})
({(7499),(ALLEN),(1600)})
({(7521),(WARD),(1250)})
({(7566),(JONES),(2975)})
({(7654),(MARTIN),(1250)})
({(7698),(BLAKE),(2850)})
({(7782),(CLARK),(2450)})
({(7788),(SCOTT),(3000)})
({(7839),(KING),(5000)})
({(7844),(TURNER),(1500)})
({(7876),(ADAMS),(1100)})
({(7900),(JAMES),(950)})
({(7902),(FORD),(3000)})
({(7934),(MILLER),(1300)})


TOTUPLE() - used convert one or more expressions to the data type tuple.

grunt> totuple = FOREACH emp GENERATE TOTUPLE(EMPNO,ENAME,SAL);
grunt> dump totuple;
((7369,SMITH,800))
((7499,ALLEN,1600))
((7521,WARD,1250))
((7566,JONES,2975))
((7654,MARTIN,1250))


TOMAP()
- used to convert the key-value pairs into a Map.

grunt> tomap = FOREACH emp GENERATE TOMAP(EMPNO,ENAME);
grunt> dump tomap;



For other built-in function please visit the link https://pig.apache.org/docs/r0.9.1/func.html

No comments: