Saturday, July 8, 2017

Hive UDFs (User Defined Functions)

The name says User Defined Functions; I would rather call it User Controlled Functionality (UCF).

If the user is defining and controlling it, the user alone decides what the function does. Hive is written in Java, so the user has to control Hive through Java as well.

So if you want to write a UDF, you have to write Java code, and it is damn simple.

Before we start writing UDFs, a question must come to mind: why do we need UDFs at all?

Answer is simple:

Arranged Marriage vs Love Marriage

Arranged Marriage:
1. Pre-defined
2. and controlled by parents
3. which means you do not have to worry about it, you just have to do it.

Love Marriage: First think of your requirements.
1. The kind of girl/boy you want.
2. Then, after finding her/him, think of ways to woo her/him.
3. Once wooed, both of you start talking to your parents for permission,
4. etc. etc. etc.

The first one is quite easy, isn't it? That is the parent defined marriage (PDM). Here the parent is Hive, and anything built into (by) Hive can be used directly. All good, work done and no pain.

But we are adventurous. We want to define and control each and every thing about our partner. How can parents impose anyone on us? Or maybe the parents (Hive) can't understand my requirement. Go ahead man, take the pain and write all your requirement-specific logic. All pain and lots of gain.

So I can say UDFs are required when:
A user requirement can't be served, or is only partially served, by the existing Hive functions. To achieve that functionality we write our own function, and that is called a UDF.

Let's start writing UDFs. I am using the Eclipse IDE to write them.

My requirement: wherever a student got 31 or 32 marks, make it 33 (give 1-2 grace marks so the student passes).

1. Data Setup:

Create a marks.txt file on your local machine with the data below:

sushil 31 Maths
Sushil 50 Phy
Sushil 10 Che
Kumar 31 Maths
KUmar 100 Phy
Kumar 30 Che
Madwani 32 Maths
Madwani 40 Phy
Madwani 50 Che
Naga 20 Maths
Naga 40 Phy
Naga 33 Che
Malli 10 Maths
Malli 50 Phy
Malli 60 Che
Rajesh 70 Maths
Rajesh 90 Phy
Rajesh 32 Che
Kumar 50 Maths
Kumar 60 Phy
Kumar 70 Che
Subhijit 100 Maths
Subhijit 90 Phy
Subhijit 20 Che
Asha 31 Maths
Asha 32 Phy
Asha 33 Che

2. Table Creation and data load:

CREATE TABLE udf_test ( name string, marks int, subject string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS TEXTFILE;


LOAD DATA LOCAL INPATH '/home/sushil/bigadata/hivedata/marks.txt' OVERWRITE INTO TABLE udf_test;

3. Write the UDF in Java.

1. Create a simple Java project.

2. Add the Hive jars from the lib folder of the downloaded Hive distribution to the classpath.

3. Create a class, let's name it Give1OR2GraceMarks, which extends the Hive UDF class.

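package com.sushil;

import org.apache.hadoop.hive.ql.exec.UDF;

// Simple Hive UDF: lifts marks of 31 or 32 up to the pass mark of 33.
public class Give1OR2GraceMarks extends UDF {

    // Hive calls evaluate() once per row with the value of the column passed to the function.
    public int evaluate(int value) {
        if (value > 30 && value < 33) {
            value = 33;
        }
        return value;
    }
}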

4. Right-click the class and export it as a jar. I have given the jar the same name as the class: Give1OR2GraceMarks.jar
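Before packaging the jar, the grace-marks rule can be checked in plain Java without any Hive jars on the classpath. The sketch below is not the UDF itself: the class name GraceRule is made up for illustration, and the boxed Integer parameter (so that a missing marks value passes through unchanged) is an assumption about how you may want NULLs handled.

```java
// Hypothetical helper: the same grace-marks rule as the UDF, with no Hive dependency.
final class GraceRule {
    private static final int PASS_MARK = 33;

    // Returns 33 for marks of 31 or 32; every other value, including null, is unchanged.
    static Integer apply(Integer marks) {
        if (marks == null) {
            return null; // assumption: a missing mark stays missing
        }
        if (marks > 30 && marks < PASS_MARK) {
            return PASS_MARK;
        }
        return marks;
    }

    public static void main(String[] args) {
        int[] samples = {30, 31, 32, 33, 50};
        for (int m : samples) {
            System.out.println(m + " -> " + apply(m));
        }
    }
}
```

Keeping the rule in a small pure method like this makes it easy to try the boundary values (30, 31, 32, 33) before Hive ever sees the jar.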


Now the girl/boy is wooed (the UDF logic is written); next we have to convince the parents. Here our parent is Hive, and we have to convince Hive to let us associate the UDF with it. For that we have to do the following:

1. Add the logic (jar) to Hive.

     hive> add jar /home/sushil/bigadata/hivedata/Give1OR2GraceMarks.jar;

The above command adds the jar to the classpath of the current Hive session.

2. Create a temporary Hive function for the above functionality.

    hive> CREATE TEMPORARY FUNCTION GRACE AS 'com.sushil.Give1OR2GraceMarks';

3. Run the select command on the table data.

    hive> select name, GRACE(marks), subject from udf_test;

4. If all is good, you will see in the results that every student whose marks were 31 or 32 got 1 or 2 grace marks.

sushil 33 Maths
Sushil 50 Phy
Sushil 10 Che
Kumar 33 Maths
KUmar 100 Phy
Kumar 30 Che
Madwani 33 Maths
Madwani 40 Phy
Madwani 50 Che
Naga 20 Maths
Naga 40 Phy
Naga 33 Che
Malli 10 Maths
Malli 50 Phy
Malli 60 Che
Rajesh 70 Maths
Rajesh 90 Phy
Rajesh 33 Che
Kumar 50 Maths
Kumar 60 Phy
Kumar 70 Che
Subhijit 100 Maths
Subhijit 90 Phy
Subhijit 20 Che
Asha 33 Maths
Asha 33 Phy
Asha 33 Che


Do remember, the function changes the values only in the query output. The stored data is untouched: if you look at the file in the warehouse after querying, you will see no changes.

hadoop fs -cat /user/hive/warehouse/hadoop.db/udf_test/marks.txt

The result should be:

sushil 31 Maths
Sushil 50 Phy
Sushil 10 Che
Kumar 31 Maths
KUmar 100 Phy
Kumar 30 Che
Madwani 32 Maths
Madwani 40 Phy
Madwani 50 Che
Naga 20 Maths
Naga 40 Phy
Naga 33 Che
Malli 10 Maths
Malli 50 Phy
Malli 60 Che
Rajesh 70 Maths
Rajesh 90 Phy
Rajesh 32 Che
Kumar 50 Maths
Kumar 60 Phy
Kumar 70 Che
Subhijit 100 Maths
Subhijit 90 Phy
Subhijit 20 Che
Asha 31 Maths
Asha 32 Phy
Asha 33 Che

Tuesday, July 4, 2017

Hive Bucketing with Example

Before starting with bucketing, it is better to have an idea of partitioning: Hive Partitioning

Hive partitioning gives you data segregation, which can speed up data analysis.

In Hive partitioning, when we talked about creating partitions around states, we segregated the data into 29 groups.

After analysing the data, the Indian govt is now interested in how the individual districts of each state have performed.

Quite an easy task: create a partition inside a partition, i.e. divide each state into districts.

 CREATE EXTERNAL TABLE sp_MMR_data (
     area STRING,
     kidId BIGINT,
     vaccinated BOOLEAN
 ) PARTITIONED BY (state STRING, district STRING)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ','
 STORED AS TEXTFILE;

Each state folder will contain a folder for each district. So far so good, everyone is happy, and the Indian govt published the report in the papers with the data for each state and each district.

One state's CM read the paper and, miffed with the low scores for his state, summoned his secretary and asked for the data at the level of the areas in each city.

To meet this requirement we could partition the data further by area code, but that can cause the two issues below:

1. One district can have many areas with very few kids' details each, resulting in very small files being stored in HDFS, which hurts performance.

2. Within a district the data may be unevenly distributed among areas, which again hurts MR/Spark/Tez job performance.

So what can be done here?
The answer: why not divide the data into groups of areas, i.e. group 5-10 areas of each district together?

This grouping of data is termed BUCKETING in the Hive world.

Practice:

 CREATE EXTERNAL TABLE dp_buck_MMR_data (
     area STRING,
     kidId BIGINT,
     vaccinated STRING
 ) PARTITIONED BY (state STRING, city STRING)
 CLUSTERED BY (area) INTO 5 BUCKETS
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ','
 STORED AS TEXTFILE;

All good, the table is created with partitions and buckets (the city column of the data file plays the role of the district). Let's load the data.

Create a data file locally with the data below:

vim MMRData.txt

Bellandur,1,Y,Bangalore,KA
Yelhanka,2,Y,Bangalore,KA
Yelhanka,3,Y,Bangalore,KA
Bellandur,4,Y,Bangalore,KA
Kudulu,5,Y,Bangalore,KA
HSR,6,Y,Mangalore,KA
HSR,7,Y,Mangalore,KA
Kudulu,8,Y,Bangalore,KA
Yelhanka,9,Y,Bangalore,KA
Yelhanka,10,Y,Bangalore,KA
KRPuram,11,Y,Bangalore,KA
KRPuram,12,Y,Bangalore,KA
KRPuram,13,Y,Bangalore,KA
HSR,14,Y,Mangalore,KA
Kudulu,15,Y,Bangalore,KA
Bellandur,16,Y,Bangalore,KA
Bellandur,17,Y,Bangalore,KA
Bellandur,18,Y,Bangalore,KA
Harlur,19,Y,Bangalore,KA
Harlur,20,Y,Bangalore,KA
Harlur,21,Y,Bangalore,KA
Harlur,22,Y,Bangalore,KA
Yelhanka,23,Y,Bangalore,KA
Yelhanka,24,Y,Bangalore,KA
HSR,25,Y,Mangalore,KA
KRPuram,26,Y,Bangalore,KA
HSR,27,Y,Mangalore,KA
Bellandur,28,Y,Bangalore,KA
Bellandur,29,Y,Bangalore,KA
Bellandur,30,Y,Bangalore,KA
Yelhanka,32,Y,Bangalore,KA

Create a temp table to load the data (think about why we load into a temp table first):

CREATE EXTERNAL TABLE temp_MMR_data (
                        area STRING,
                        kidId int,
                        vaccinated String,
                        city string,
                        state string )
                        ROW FORMAT DELIMITED
                        FIELDS TERMINATED BY ','
                        STORED AS TEXTFILE;

Load the file data into the temp table:

LOAD DATA LOCAL INPATH '/home/sushil/MMRData.txt' INTO TABLE temp_MMR_data;

Load the data into the PARTITIONED AND BUCKETED table. Both partition columns are filled dynamically, so switch to nonstrict mode first; on Hive versions before 2.0, also enable bucketing enforcement:

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.enforce.bucketing=true;
INSERT OVERWRITE TABLE dp_buck_MMR_data PARTITION (state, city)
SELECT area, kidId, vaccinated, state, city FROM temp_MMR_data;

Verify that the load is successful by querying the table:
select * from dp_buck_MMR_data;

Verify the file system:
You should see a folder structure like the one below in the HDFS path:

/user/hive/warehouse/hadoop.db/dp_buck_mmr_data/state=KA/city=Bangalore/
state=KA – first level of partition
city=Bangalore/Mangalore – second level of partition

Within each city folder you should see five files, which correspond to the five buckets we created.


How does Hive decide which area goes to which bucket?
The simple algorithm is:
(hash code of the area) % (number of buckets the user requested) = bucket id

All areas for which hash code % number of buckets gives the same value go to the same bucket.
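The formula above can be tried out in a few lines of Java. This is only a sketch of the idea: it uses String.hashCode() as a stand-in hash, whereas the hash Hive really applies depends on the column type and the Hive version, so the bucket ids printed here need not match the files Hive writes. The class and method names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BucketSketch {
    // Stand-in hash: String.hashCode(). Hive's own hash may differ by type and version.
    static int bucketFor(String area, int numBuckets) {
        int hash = area.hashCode();
        // Clear the sign bit so the remainder is never negative.
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
        String[] areas = {"Bellandur", "Yelhanka", "Kudulu", "KRPuram", "Harlur", "HSR"};
        Map<String, Integer> assignment = new LinkedHashMap<>();
        for (String area : areas) {
            assignment.put(area, bucketFor(area, 5));
        }
        assignment.forEach((area, bucket) -> System.out.println(area + " -> bucket " + bucket));
    }
}
```

The same area always lands in the same bucket, and two different areas share a bucket whenever their remainders coincide, which is exactly the grouping of areas we were after.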



Sunday, July 2, 2017

Hive Partitioning with Example


Let's start with understanding partitioning using a simple example.

The data of the recently concluded MMR vaccination drive in India (April 2017) is fed to the central system. Now the task is to get the state-wise vaccination percentage.

There are two possible ways to do this:

1. Scan through the complete data set every time, with a where clause on the state name, and get the percentage.

How would the performance be? The task could take hours, maybe days, as we have 29 states and around 45 crore (450 million) rows. It requires scanning 45 crore rows 29 times to get the result.

2. While feeding the data, create 29 folders, one for each state, and ask each state to feed its data into its own folder only. Sounds smart.

Now the task is reduced to scanning just one folder at a time to get the respective state's data, and the folders can be scanned in parallel as each data set is small and segregated.

This intelligent way of grouping data during data load is termed PARTITIONING in Hive.
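The folder-per-state idea can be modelled in plain Java to see why it saves work. This is a toy model, not how Hive is implemented: a map from state name to a list of rows stands in for the folders, and the sample rows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionScanSketch {
    // One "folder" per state; each folder holds only that state's rows.
    private final Map<String, List<String>> folders = new HashMap<>();

    void load(String state, String row) {
        folders.computeIfAbsent(state, s -> new ArrayList<>()).add(row);
    }

    // Reads a single folder instead of scanning every row of every state.
    List<String> scan(String state) {
        return folders.getOrDefault(state, Collections.emptyList());
    }

    public static void main(String[] args) {
        PartitionScanSketch table = new PartitionScanSketch();
        // Made-up sample rows, for illustration only.
        table.load("KARNATAKA", "Bellandur,1,Y");
        table.load("KARNATAKA", "Yelhanka,2,Y");
        table.load("KERALA", "Kochi,3,N");
        System.out.println(table.scan("KARNATAKA").size() + " rows read for KARNATAKA");
    }
}
```

A query for one state touches only that state's list, no matter how many rows the other states hold.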

Now a simple question: if you are scanning through the KARNATAKA folder, will you get data for any state other than KARNATAKA?

If your answer is no, then you have grasped the basic fact about partitioned tables.
While creating a partitioned table there is no need to include the partition column in the TABLE DEFINITION, as it is taken care of by the PARTITIONED BY clause.

In our example above there is no need to add the partition column 'state' to the table definition. In other words Hive says: please do not add the partition column to the table, as I can intelligently read its value from the folder name.

If you add the partition column as a field in the create table statement as well, it is overkill; Hive will not allow it and will throw an exception.
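To see how a column value can live in a folder name instead of in the file, here is a small Java sketch that recovers key=value pairs from a partition directory path. The path used is hypothetical, and the sketch ignores the escaping Hive applies to special characters in partition values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionPathSketch {
    // Extracts the key=value segments from a partition directory path.
    static Map<String, String> partitionValues(String path) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Hypothetical warehouse path, used only for illustration.
        System.out.println(partitionValues("/warehouse/mmr_data/state=KARNATAKA/part-0"));
    }
}
```

Since the value of state is fully determined by the folder, storing it again inside every row of the file would add nothing.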


Partitioning is of two types:

1. STATIC PARTITION: The user specifies the segregation unit himself, either by using the values of one of the columns of the file to be loaded or by creating a virtual column (not part of the file) with values.

2. DYNAMIC PARTITION: The user just names the column on which partitioning is required. The REST is taken care of by Hive itself: it creates the segregation units based on the distinct values of that column.


Points to Ponder:

1. STATIC PARTITIONING means each and every thing is controlled by the user, from naming the PARTITION column to loading data into that partition folder.

2. If static partitioning is done on the STATE column of the MMR data and by mistake the data for CHHATTISGARH is placed inside the KARNATAKA partition while loading, we will get 0 results for CHHATTISGARH. The reason is that a select on a STATIC partition just looks at the partition name, not inside the file data.

3. DYNAMIC PARTITIONING means Hive will intelligently get the distinct values of the partition column and segregate the data. As Hive is doing it, there are a few things to take care of:

A. By default dynamic partitioning is enabled in Hive.

B. By default the mode is strict, which means you need at least one level of STATIC partitioning before Hive can do DYNAMIC partitioning inside those STATIC segregation units.

C. To enable fully dynamic partitioning, we have to set the property below to nonstrict in Hive.
     
              hive> set hive.exec.dynamic.partition.mode=nonstrict;

D. By default the number of dynamic partitions allowed is 100 per node and 1000 in total. So if you are running in pseudo-distributed mode with a single node and the cardinality of your partition column is more than 100, then set the Hive property below as well.
    
           hive> set hive.exec.max.dynamic.partitions.pernode=1000;

E. With a STATIC partitioned table you can load files directly into the partition, as loading depends on the user. A DYNAMIC partitioned table, however, is handled by Hive, and Hive doesn't know anything about the file, so we have to first load the file into some temp table and then use that temp table to populate the dynamic partitioned table.
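The difference between the two modes, and the mistake described in point 2, can be modelled in a few lines of Java. This is a toy model under stated assumptions, not Hive's implementation: rows are string arrays whose last element is the state, and the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StaticVsDynamicSketch {
    // Static: the user names the partition; row contents are not inspected.
    static Map<String, List<String[]>> loadStatic(String partitionValue, List<String[]> rows) {
        Map<String, List<String[]>> partitions = new TreeMap<>();
        partitions.put(partitionValue, new ArrayList<>(rows));
        return partitions;
    }

    // Dynamic: the partition is derived from the last column of each row.
    static Map<String, List<String[]>> loadDynamic(List<String[]> rows) {
        Map<String, List<String[]>> partitions = new TreeMap<>();
        for (String[] row : rows) {
            String key = row[row.length - 1];
            partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"kid-1", "KARNATAKA"});
        rows.add(new String[] {"kid-2", "CHHATTISGARH"});
        System.out.println("static partitions:  " + loadStatic("KARNATAKA", rows).keySet());
        System.out.println("dynamic partitions: " + loadDynamic(rows).keySet());
    }
}
```

In the static case the CHHATTISGARH row ends up filed under KARNATAKA because nothing looks inside the row; in the dynamic case each row is routed by its own value.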


Practice:

Use this link to do the data setup: data setup

STATIC PARTITIONING:

1. Create Table:

                         CREATE EXTERNAL TABLE sp_ratings ( 
                                           userid INT, 
                                           movieid INT, 
                                           tstamp BIGINT 
                          ) PARTITIONED BY (rating INT)
                            ROW FORMAT DELIMITED 
                            FIELDS TERMINATED BY '#'
                            STORED AS TEXTFILE; 

2. Load partitioned data:
         
           LOAD DATA INPATH '/hive/data/rating' INTO TABLE sp_ratings PARTITION (rating=1);

3. Verify data load:
       
        SELECT * from sp_ratings where rating = 1;

Did you notice? It prints all the data, i.e. the rows with rating 2, 3, 4 and 5 as well.

Is partitioning not working? Think it over and put your answer in the comments.

4. Verify file location:

               hadoop fs -ls /user/hive/warehouse/movielens.db/sp_ratings/

It should contain a folder named rating=1.


DYNAMIC PARTITIONING:

1. Create Table:

                         CREATE EXTERNAL TABLE dp_ratings ( 
                                           userid INT, 
                                           movieid INT, 
                                           tstamp BIGINT 
                          ) PARTITIONED BY (rating INT)
                            ROW FORMAT DELIMITED 
                            FIELDS TERMINATED BY '#'
                            STORED AS TEXTFILE; 

2. Create TEMP table:
           
                        CREATE EXTERNAL TABLE temp_ratings ( 
                                           userid INT, 
                                           movieid INT,
                                           rating INT, 
                                           tstamp BIGINT 
                          ) 
                            ROW FORMAT DELIMITED 
                            FIELDS TERMINATED BY '#'
                            STORED AS TEXTFILE; 

3. Load data in temp table from file:
         
           LOAD DATA LOCAL INPATH '/home/sushil/bigadata/hivedata/ratings' INTO TABLE temp_ratings;

4. Load data in dynamic partitioned table:
         
           INSERT INTO TABLE dp_ratings PARTITION (rating) SELECT userid, movieid, tstamp,rating FROM temp_ratings;

NOTE: In the select, the data columns must keep the table's column order, and the partition columns must be selected last; if there are several partition columns, they must appear in the order in which they were declared.

5. Verify data load:
       
        SELECT * from dp_ratings where rating = 1;
        SELECT * from dp_ratings where rating = 2;
        SELECT * from dp_ratings where rating = 3;
        SELECT * from dp_ratings where rating = 4;

6. Verify file location:

               hadoop fs -ls /user/hive/warehouse/movielens.db/dp_ratings/

It should contain folders named rating=1, rating=2 etc...



If time permits, I will add a combination of STATIC & DYNAMIC PARTITIONING.

Hive Tables ( EXTERNAL, INTERNAL and TEMPORARY)


External VS Internal Tables:

Let's take the analogy of cloud computing vs traditional computing.

In the traditional days everything was set up in house: servers, DBs, web servers etc. You alone are responsible for these resources and you manage them. If your contract with the client expires, these resources are of no use and become null and void; in other words, the resources are removed along with the contract.

In the cloud computing world everything is shared. You do not own anything, so there is no setup at your location; you only have the links to reach the resources. You can use the resources however you want. If you stop using them, they do not become null and void, because they are shared: the same resources can and will be used by someone else. Cloud allows the same resource to be used by many.

Traditional computing is INTERNAL tables and cloud computing is EXTERNAL tables.

INTERNAL TABLES:

1. An INTERNAL table is also called a MANAGED table.
2. If a MANAGED table is dropped, its data is also removed from HDFS.
3. Do NOT put an INTERNAL table schema on top of SHARED data.
4. Not recommended for PRODUCTION.
5. By default, a table is created as an INTERNAL table.


EXTERNAL TABLES:

1. These are equivalent to the cloud in our analogy: do everything but hold nothing.
2. If you select data from an EXTERNAL table, you get results just as with any other table.
3. If you DROP an EXTERNAL table, only its schema is removed from the METASTORE, and a SELECT on it no longer works.
4. However, if you go to the HDFS location the table pointed to, you will still find the files there, as an EXTERNAL table is only a virtual schema on top of the file data.
5. It is just a schema: the METASTORE takes care of the table definition, and Hive does not own the underlying files. Note that row-level DELETE is not available on EXTERNAL tables.
6. Because Hive leaves the REAL files alone when the table is dropped, it is TRUE virtualisation and many clients can use the same data for analysis.
7. As WRITE ONCE and READ many times is the MANTRA of HADOOP, EXTERNAL tables are the right tables to create.
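What dropping a table does to the data in each case can be modelled with two small collections in Java. This is a toy model, not Hive code: a map stands in for the METASTORE, a set of directory names stands in for HDFS, and the paths are simplified for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DropTableSketch {
    private final Map<String, String> locations = new HashMap<>(); // table -> data directory
    private final Set<String> externalTables = new HashSet<>();
    private final Set<String> directories = new HashSet<>();       // stand-in for HDFS

    void createTable(String name, String location, boolean external) {
        locations.put(name, location);
        directories.add(location);
        if (external) {
            externalTables.add(name);
        }
    }

    // The schema entry always goes away; the data goes away only for managed tables.
    void dropTable(String name) {
        String location = locations.remove(name);
        boolean wasExternal = externalTables.remove(name);
        if (location != null && !wasExternal) {
            directories.remove(location);
        }
    }

    boolean dataExists(String location) {
        return directories.contains(location);
    }

    public static void main(String[] args) {
        DropTableSketch hive = new DropTableSketch();
        hive.createTable("geolocation", "/warehouse/geolocation", false);
        hive.createTable("geolocation_ext", "/hive/data", true);
        hive.dropTable("geolocation");
        hive.dropTable("geolocation_ext");
        System.out.println("managed data kept:  " + hive.dataExists("/warehouse/geolocation"));
        System.out.println("external data kept: " + hive.dataExists("/hive/data"));
    }
}
```

This is why shared data should sit under EXTERNAL tables: dropping one client's table definition cannot take the files away from everyone else.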


Practice:

Before we start creating tables, let's ponder some points:

1. A table can be created in two ways:

     A. Create the table schema and load data into it, in other words move the data from its location to the warehouse directory.

          Hive provides the LOAD DATA [LOCAL] INPATH command for this purpose.
   
     B. Create the table schema on top of existing data, so effectively you are moving the table creation logic to the place where the data resides; you are not moving the data.

        The LOCATION clause should be used for this during table creation.

2. Loading from a LOCAL file copies the file from the local file system to HDFS, whereas loading from an HDFS file only moves the data within HDFS, i.e. a mv is performed.

3. DATA setup:

        Download the data for this experiment from this location: Download.zip
     
        Remove the header line (the first line of the file), keep the file somewhere on your local machine, and also put a copy of it in HDFS using the put command.
   
        On my system I have the file LOCALLY at '/home/sushil/bigadata/hivedata/geolocation.csv' and in HDFS at /hive/data/


INTERNAL tables:

1. Create the table:

CREATE TABLE geolocation
( truckid STRING, driverid STRING, event STRING, latitude DOUBLE,
longitude DOUBLE, city STRING, state STRING, velocity INT, event_ind INT, idling_ind INT )
COMMENT 'An example for internal table creation'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;


2. Load data from the file:


LOAD DATA LOCAL INPATH '/home/sushil/bigadata/hivedata/geolocation.csv' OVERWRITE INTO TABLE geolocation;


3. View data :

SELECT * FROM geolocation limit 100;

4. Check where the file is in HDFS:

Go to the /user/hive/warehouse/movielens.db/geolocation/ location.

Here movielens is my database in Hive.

You should see that a file named geolocation.csv has been created.

So effectively a hadoop fs -put is performed when you LOAD from local.
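The copy-versus-move behaviour of the two LOAD variants can be imitated on the local file system with java.nio. This is only an analogy under stated assumptions: temporary local directories stand in for both the local disk and HDFS, and the class, method and file names are made up for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class LoadSketch {
    // Like LOAD DATA LOCAL INPATH: the source file is copied, so it still exists afterwards.
    static Path loadLocal(Path source, Path tableDir) throws IOException {
        return Files.copy(source, tableDir.resolve(source.getFileName()));
    }

    // Like LOAD DATA INPATH on a file already in HDFS: the file is moved, so the source disappears.
    static Path loadFromHdfs(Path source, Path tableDir) throws IOException {
        return Files.move(source, tableDir.resolve(source.getFileName()));
    }

    public static void main(String[] args) throws IOException {
        Path work = Files.createTempDirectory("load-sketch");
        Path tableDir = Files.createDirectory(work.resolve("table"));
        Path local = Files.write(work.resolve("local.csv"), "a,b\n".getBytes());
        Path staged = Files.write(work.resolve("staged.csv"), "c,d\n".getBytes());

        loadLocal(local, tableDir);
        loadFromHdfs(staged, tableDir);

        System.out.println("local source still exists:  " + Files.exists(local));
        System.out.println("staged source still exists: " + Files.exists(staged));
    }
}
```

The practical consequence: after a LOAD from an HDFS path, do not expect the file to still be at the path you loaded it from.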




EXTERNAL table:

1. Create the table:

CREATE EXTERNAL TABLE geolocation_ext
( truckid STRING, driverid STRING, event STRING, latitude DOUBLE,
longitude DOUBLE, city STRING, state STRING, velocity INT, event_ind INT, idling_ind INT )
COMMENT 'An example for external table creation'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive/data/';

Note that LOCATION has to point to a directory, not to a file; Hive reads every file in that directory as table data.

2. Load data from the file:

The DATA is already in place, as we provided the LOCATION during table creation. So instead of moving the data, the SCHEMA is moved, i.e. the schema is created on TOP of the data.

3. View data :

SELECT * FROM geolocation_ext limit 100;

4. Check where the file is in HDFS:

Go to the /user/hive/warehouse/movielens.db/ location.

Here movielens is my database in Hive.

You should not see any folder named geolocation_ext, as the file is not moved.

The file is still available at its original location: /hive/data/geolocation.csv

We have one more kind of table, the TEMPORARY table; I request you to practise it as well.