Note: All the posts are based on practical approach avoiding lengthy theory. All have been tested on some development servers. Please don’t test any post on production servers until you are sure.

Sunday, April 30, 2017

Hive for Oracle Developers and DBAs - Part III


Today we will discuss some more topics in Hive, such as Hive queries, distributed clauses, sampling data, views, indexes, and schema design. You can review the related posts below.


Hive for Oracle Developers and DBAs - Part  I
Hive for Oracle Developers and DBAs - Part II

HiveQL : Hive Queries                                                              

Query Array column

hive (scott)> SELECT name, subordinates FROM employees;
OK
name    subordinates
Abuzar  ["Ikram","Usman",""]
Ali     ["Ikram","Usman",""]
Ashraf  ["Ikram","Usman",""]
Mubeen  ["Ikram","Usman",""]
Time taken: 0.159 seconds, Fetched: 4 row(s)

Select the first element of the subordinates array:
hive (scott)> SELECT name, subordinates[0] FROM employees;
OK
name    c1
Abuzar  Ikram
Ali     Ikram
Ashraf  Ikram
Mubeen  Ikram
Time taken: 0.169 seconds, Fetched: 4 row(s)


Query MAP column
The deductions column is a MAP. Hive prints it in the JSON representation for maps, namely a
comma-separated list of key:value pairs surrounded by {...}:

hive (scott)> SELECT name, deductions FROM employees;
OK
name    deductions
Abuzar  {"pf":3.5,"loan":9.4}
Ali     {"pf":5.5,"loan":7.4}
Ashraf  {"pf":3.5,"loan":9.4}
Mubeen  {"pf":5.5,"loan":7.4}
Time taken: 0.165 seconds, Fetched: 4 row(s)

To reference a MAP element, use the same [...] syntax as for an ARRAY, but with a key instead
of an integer index:

hive (scott)> SELECT name, deductions["loan"] FROM employees;
OK
name    c1
Abuzar  9.4
Ali     7.4
Ashraf  9.4
Mubeen  7.4
Time taken: 0.089 seconds, Fetched: 4 row(s)

Query STRUCT column
The address is a STRUCT, which is also written using the JSON map format:
hive (scott)> SELECT name, address FROM employees;
OK
name    address
Abuzar  {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Ali     {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Ashraf  {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Mubeen  {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Time taken: 0.167 seconds, Fetched: 4 row(s)

To reference an element in a STRUCT, you use “dot” notation, similar to a
table_alias.column reference:

hive (scott)> SELECT name, address.city FROM employees;
OK
name    city
Abuzar  Lahore
Ali     Lahore
Ashraf  Lahore
Mubeen  Lahore
Time taken: 0.159 seconds, Fetched: 4 row(s)
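
A few built-in functions work directly on these complex types. The following is a small sketch against the same employees table (output omitted); the column aliases are made up for illustration:

```sql
-- size() counts the elements of an ARRAY or MAP,
-- map_keys() returns the keys of a MAP as an ARRAY,
-- array_contains() tests whether an ARRAY holds a given value.
SELECT name,
       size(subordinates)                    AS num_subordinates,
       map_keys(deductions)                  AS deduction_types,
       array_contains(subordinates, 'Usman') AS manages_usman
FROM employees;
```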

Computing with Column Values
hive (scott)> SELECT upper(name), salary, deductions["loan"], round(salary * ( deductions["loan"])/100) FROM employees;
OK
c0      salary  c2      c3
ABUZAR  1000.0  9.4     94.0
ALI     2000.0  7.4     148.0
ASHRAF  4000.0  9.4     376.0
MUBEEN  5000.0  7.4     370.0
Time taken: 0.147 seconds, Fetched: 4 row(s)

Using Functions
hive (scott)> select  to_date("2017-04-30 00:00:00") from dual;
OK
c0
2017-04-30
Time taken: 0.092 seconds, Fetched: 1 row(s)

hive (scott)> select  year("2017-04-30 00:00:00") from dual;

hive (scott)> select current_timestamp from dual;
OK
c0
2017-04-30 11:22:52.547

hive (scott)> select datediff('2017-04-30', '2017-04-01') from dual;
OK
c0
29

hive (scott)> select concat('Ali', ' Haider') from dual;
OK
c0
Ali Haider

hive (scott)> select sum(salary) from employees;
OK
c0
12000.0
Time taken: 17.553 seconds, Fetched: 1 row(s)

hive (scott)> select employees.address.state,sum(salary) from employees group by employees.address.state having sum(salary) > 10000;
OK
state   _c1
Punjab  12000.0
Time taken: 21.467 seconds, Fetched: 1 row(s)

Nested SELECT Statements
hive (scott)> select * FROM (SELECT upper(name), salary, deductions["loan"] as deduction,round(salary * (deductions["loan"])/100) as amount FROM employees) e;

CASE … WHEN … THEN Statements
The CASE … WHEN … THEN clauses are like if statements for individual columns in query
results. For example:

hive (scott)> SELECT name, salary,CASE WHEN salary < 1001 THEN 'low' WHEN salary >= 1001 AND salary <= 2000 THEN 'middle' WHEN salary >= 4000 AND salary < 10000 THEN 'high' END AS bracket FROM employees;
OK
name    salary  bracket
Abuzar  1000.0  low
Ali     2000.0  middle
Ashraf  4000.0  high
Mubeen  5000.0  high
Time taken: 0.171 seconds, Fetched: 4 row(s)
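
Note that in the query above a salary between 2000 and 4000 matches no branch, so bracket would be NULL for such a row. A sketch of the same query with an ELSE branch to catch those values (the label 'other' is only an illustration):

```sql
SELECT name, salary,
       CASE
         WHEN salary < 1001 THEN 'low'
         WHEN salary >= 1001 AND salary <= 2000 THEN 'middle'
         WHEN salary >= 4000 AND salary < 10000 THEN 'high'
         ELSE 'other'   -- e.g. 3000.0, or 10000.0 and above
       END AS bracket
FROM employees;
```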

When Hive Can Avoid MapReduce
You have probably noticed that a MapReduce job is started in most cases. Hive can run some kinds of queries without MapReduce, in so-called local mode, for example:
SELECT * FROM employees;
Furthermore, Hive will attempt to run other operations in local mode if the hive.exec.mode.local.auto property is set to true:
set hive.exec.mode.local.auto=true;
Otherwise, Hive uses MapReduce to run all other queries.
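
Hive decides per query whether automatic local mode applies, based on input size thresholds. A sketch of how to inspect and tune them; the values shown are examples rather than recommendations, and property names can differ between Hive versions:

```sql
-- Print the current value of a property (no "=" means "show").
set hive.exec.mode.local.auto;

-- Run locally only when the total input is below this many bytes ...
set hive.exec.mode.local.auto.inputbytes.max=134217728;
-- ... and the number of input files is below this limit.
set hive.exec.mode.local.auto.input.files.max=4;
```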

Distributed Clauses

DISTRIBUTE BY controls how map output is divided among reducers. By default, MapReduce computes a hash on the keys output by the mappers and uses the hash values to distribute the key-value pairs evenly among the available reducers. Say we want all the rows for each value of a column to be processed together. We can use DISTRIBUTE BY to ensure that the records for each value go to the same reducer. DISTRIBUTE BY works similarly to GROUP BY in the sense that it controls how reducers receive rows for processing, while SORT BY controls the sorting of data inside each reducer. Note that Hive requires the DISTRIBUTE BY clause to come before the SORT BY clause when both appear in the same query.

hive (scott)> SELECT * from employees distribute by employees.address.city sort by employees.address.city asc;

DISTRIBUTE BY is also a good workaround for using less memory in a memory-intensive job: it forces Hadoop to use reducers instead of running a map-only job. Essentially, the mappers group the rows by the DISTRIBUTE BY columns and pass these groups on to the reducers, which reduces the work the framework has to do overall.


If the same columns are used in both clauses and all of them are sorted in ascending order (the default), the CLUSTER BY clause is a shorthand way of expressing the same query.



hive (scott)> SELECT * from employees cluster by employees.address.city;
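
CLUSTER BY only covers the case where both clauses use the same columns in ascending order. When the sort differs from the distribution, the two clauses have to be written out. A sketch, following the same pattern as the DISTRIBUTE BY query above, that sends each city to one reducer and lists the highest salaries first within each city:

```sql
SELECT *
FROM employees
DISTRIBUTE BY employees.address.city
SORT BY employees.address.city ASC, employees.salary DESC;
```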


Casting
hive (scott)> SELECT name, salary FROM employees WHERE cast(salary AS FLOAT) < 3000.0;
OK
name    salary
Abuzar  1000.0
Ali     2000.0
Time taken: 0.174 seconds, Fetched: 2 row(s)

hive (scott)> select (2.0*cast(cast(salary as string) as double)) from employees;
OK
c0
2000.0
4000.0
8000.0
10000.0
Time taken: 0.128 seconds, Fetched: 4 row(s)

Queries that Sample Data
For very large data sets, sometimes you want to work with a sample of a query result, not the whole thing. Hive supports this goal with queries that sample tables organized into buckets. We can sample using the rand() function, which returns a random number.

hive (scott)> select count(*) from (SELECT * from translog TABLESAMPLE(BUCKET 3 OUT OF 100 ON rand())) tl;
291318
Time taken: 33.819 seconds, Fetched: 1 row(s)

hive (scott)> SELECT * from employees TABLESAMPLE(BUCKET 1 OUT OF 2 ON rand()) tl;
OK
tl.name tl.salary       tl.subordinates tl.deductions   tl.address
Ashraf  4000.0  ["Ikram","Usman",""]    {"pf":3.5,"loan":9.4}   {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Time taken: 0.099 seconds, Fetched: 1 row(s)
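
rand() picks different rows on every run. To get a repeatable sample, bucket on a column instead: Hive hashes the column value and keeps the rows that fall into the requested bucket. A minimal sketch using the name column (the alias s is arbitrary):

```sql
-- Hash each row's name into 2 buckets and keep bucket 1.
-- The same rows come back on every run as long as the data is unchanged.
SELECT * FROM employees TABLESAMPLE(BUCKET 1 OUT OF 2 ON name) s;
```

If the table itself was created with CLUSTERED BY on the same column, Hive can read just the matching bucket files instead of scanning the whole table.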

Block Sampling: Hive offers another syntax for sampling a percentage of blocks of an input path as an alternative to sampling based on rows:

hive (scott)> SELECT * FROM employees TABLESAMPLE(0.1 PERCENT) e;
OK
e.name  e.salary        e.subordinates  e.deductions    e.address
Abuzar  1000.0  ["Ikram","Usman",""]    {"pf":3.5,"loan":9.4}   {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Ali     2000.0  ["Ikram","Usman",""]    {"pf":5.5,"loan":7.4}   {"street":"College Rd.","city":"Lahore","state":"Punjab","zip":41000}
Time taken: 0.088 seconds, Fetched: 2 row(s)

Block sampling does not necessarily work with all file formats. Also, the smallest unit of sampling is a single HDFS block. Hence, for tables smaller than the typical block size of 128 MB, all rows will be returned.

Virtual Columns
Hive provides two virtual columns: one for the input filename of the split and the other
for the block offset in the file. These are helpful when diagnosing queries where Hive
is producing unexpected or null results.

hive (scott)> SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE,ename from emp;
OK
input__file__name       block__offset__inside__file     ename
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    0       SMITH
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    41      ALLEN
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    89      WARD
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    136     JONES
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    180     MARTIN
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    230     BLAKE
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    274     CLARK
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    318     SCOTT
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    362     KING
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    403     TURNER
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    450     ADAMS
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    492     JAMES
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    533     FORD
hdfs://nn01:9000/user/hive/warehouse/scott.db/emp/emp.csv    576     MILLER
Time taken: 0.617 seconds, Fetched: 14 row(s)

A third virtual column, ROW__OFFSET__INSIDE__BLOCK, provides the row offset. It must be enabled explicitly before you can use it in a query:
hive (scott)> set hive.exec.rowoffset=true;
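
With that property set, the third virtual column can be selected alongside the other two. A sketch against the same emp table (output omitted):

```sql
set hive.exec.rowoffset=true;

SELECT INPUT__FILE__NAME,
       BLOCK__OFFSET__INSIDE__FILE,
       ROW__OFFSET__INSIDE__BLOCK,
       ename
FROM emp;
```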


Views: To Reduce Query Complexity                                       

hive (scott)> create view vw1 as select ename,dname,sal from emp e,dept d where e.deptno = d.deptno;
OK
ename   dname   sal
Time taken: 0.186 seconds


hive (scott)> select * from vw1;

hive (scott)> DROP VIEW IF EXISTS vw1;

You cannot use a view as a target of an INSERT or LOAD command.
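
A view is also a handy way to hide the nested query and complex-type lookups shown earlier in this post. A sketch, where the view name emp_loans and the column aliases are made up for illustration:

```sql
CREATE VIEW IF NOT EXISTS emp_loans AS
SELECT upper(name)                              AS name,
       salary,
       deductions["loan"]                       AS loan_pct,
       round(salary * deductions["loan"] / 100) AS loan_amount
FROM employees;

-- Callers now query plain columns instead of repeating the expressions.
SELECT name, loan_amount FROM emp_loans WHERE loan_amount > 100;
```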

Indexes                                                                                     

Hive has limited indexing capabilities. You can build an index on columns to speed up some operations. The index data for a table is stored in another table. Indexing is also a good alternative to partitioning when the logical partitions would be too numerous and too small to be useful. Not all queries can benefit from an index; the EXPLAIN syntax can be used to determine whether a given query is aided by an index.
Indexes in Hive, like those in relational databases, need to be evaluated carefully. Maintaining an index requires extra disk space, and building an index has a processing cost.

hive (scott)> CREATE INDEX emp_index ON TABLE emp (empno) AS 'COMPACT' WITH DEFERRED REBUILD;
OK
Time taken: 0.193 seconds

hive (scott)> SHOW INDEX ON emp;
OK
idx_name        tab_name        col_names       idx_tab_name    idx_type        comment
emp_index               emp                     empno                   scott__emp_emp_index__  compact
Time taken: 0.053 seconds, Fetched: 1 row(s)

hive (scott)> show tables;
OK
tab_name
dept
dual
emp
emp2
employees
scott__emp_emp_index__
stocks
translog
Time taken: 0.025 seconds, Fetched: 12 row(s)

hive (scott)> describe scott__emp_emp_index__;
OK
col_name        data_type       comment
empno                   string
_bucketname             string
_offsets                array<bigint>
Time taken: 0.034 seconds, Fetched: 3 row(s)


hive (scott)> select * from scott__emp_emp_index__;
OK
scott__emp_emp_index__.empno    scott__emp_emp_index__._bucketname      scott__emp_emp_index__._offsets
Time taken: 0.168 seconds


hive (scott)> select * from emp where empno=7788;
OK
emp.empno       emp.ename       emp.job emp.mgr emp.hiredate    emp.sal emp.comm        emp.deptno
7788    SCOTT   ANALYST 7566    19-APR-87       3000            20
Time taken: 0.126 seconds, Fetched: 1 row(s)
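
To check whether a query like the one above actually uses the index, look at its plan with automatic index use switched on. This is only a sketch: it assumes the index has already been populated with ALTER INDEX ... REBUILD, and for a table as small as emp Hive may still choose a plain scan.

```sql
-- Automatic use of indexes is off by default.
set hive.optimize.index.filter=true;
-- Compact indexes are skipped for inputs below this size (in bytes);
-- 0 is used here only so that a tiny test table qualifies.
set hive.optimize.index.filter.compact.minsize=0;

EXPLAIN SELECT * FROM emp WHERE empno = 7788;
```

If the index is used, the plan should refer to the index table (scott__emp_emp_index__) in addition to emp.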

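The index table above is empty because the index was created WITH DEFERRED REBUILD. It is populated only when the index is rebuilt:

hive (scott)> ALTER INDEX emp_index ON emp REBUILD;

hive (scott)> drop index emp_index on emp;
OK
Time taken: 0.182 seconds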

Schema Design                                                                        


Although Hive looks and acts like a relational database, it is implemented and used in ways that are very different from conventional relational databases. Often, users try to carry over paradigms from the relational world that are actually Hive anti-patterns. There are some Hive patterns you should use and some anti-patterns you should avoid.

Hive does not have the concept of primary keys or automatic, sequence-based key generation. Joins should be avoided in favor of denormalized data when feasible. The complex types ARRAY, MAP, and STRUCT help by allowing one-to-many data to be stored inside a single row.
The primary reason to avoid normalization is to minimize disk seeks, such as those typically required to navigate foreign key relations. Denormalized data can be scanned from or written to large, contiguous sections of disk, which optimizes I/O performance. However, denormalization has a price: data duplication and a greater risk of inconsistent data.
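
For example, the subordinates ARRAY in the employees table holds a one-to-many relation that would need a separate child table in a normalized design. When one row per subordinate is needed, the array can be unfolded at query time. A sketch (the aliases s and subordinate are arbitrary):

```sql
-- explode() turns each ARRAY element into its own row;
-- LATERAL VIEW joins those rows back to the originating employee.
SELECT name, subordinate
FROM employees
LATERAL VIEW explode(subordinates) s AS subordinate;
```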

Table-by-Day: This is a pattern where the table name (here translog) is suffixed with a date,
such as translog_20170401, translog_20170501, etc. Table-by-day is an anti-pattern in
the database world, but due to the common implementation challenges of ever-growing
data sets, it is still widely used.


With Hive, a partitioned table (already discussed in a previous post) should be used instead. Hive uses expressions in the WHERE clause to select input only from the partitions needed for the query.
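
A sketch of replacing the per-day tables with a single partitioned table. The table name translog_part and its columns are hypothetical, since the real translog layout is not shown in this post:

```sql
-- One table with one partition per day, instead of one table per day.
CREATE TABLE translog_part (
  txn_id STRING,
  amount DOUBLE
)
PARTITIONED BY (log_date STRING);

-- Copy one of the per-day tables into its partition.
INSERT OVERWRITE TABLE translog_part PARTITION (log_date = '2017-04-01')
SELECT txn_id, amount FROM translog_20170401;

-- Only the files under the log_date=2017-04-01 partition are read.
SELECT count(*) FROM translog_part WHERE log_date = '2017-04-01';
```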

Using Columnar Tables


Hive typically uses row-oriented storage; however, Hive also has a columnar SerDe that stores data in a hybrid row-column oriented form. While this format can be used for any type of data, it is optimal for some data sets, for example those with frequently repeated field values such as state and age. We will discuss it in a later post.
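
In the meantime, a minimal sketch of creating a columnar copy of an existing table. The table name emp_rc is made up; RCFILE is a storage format that relies on a columnar SerDe:

```sql
-- Create a new table in RCFile format and fill it from emp in one step.
CREATE TABLE emp_rc
STORED AS RCFILE
AS SELECT * FROM emp;
```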

Use of Compression
The only compelling reason not to use compression is when the data produced is intended for use by an external system and an uncompressed format, such as text, is the most compatible.
However, compression and decompression consume CPU resources. MapReduce jobs tend to be I/O bound, so the extra CPU overhead is usually not a problem. For CPU-intensive workflows, such as some machine-learning algorithms, compression may actually reduce performance by taking valuable CPU resources away from more essential operations. We will discuss it in a later post.
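
In the meantime, a sketch of the session settings typically involved. The codec choice (Snappy) is an assumption and must be available on the cluster; older Hadoop releases use mapred.* names for the last property:

```sql
-- Compress data passed between the stages of a multi-job query.
set hive.exec.compress.intermediate=true;
-- Compress the final query output (e.g. files written by INSERT).
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
```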

