Wednesday, January 25, 2017

Resilient Distributed Dataset (RDD)

In this post, I am going to provide a brief introduction to Spark RDDs, the different ways to create them, and the operations they support.

What is an RDD?
Resilient
i.e. fault-tolerant. If the data in memory (or on a node) is lost, it can be recreated with the help of the lineage graph.
Distributed
Data is chunked into partitions and stored in memory across the cluster.
Dataset
The initial data can come from a file or be created programmatically.

The Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. An RDD is an immutable (it does not change once created), partitioned collection of objects. Each RDD is split into multiple partitions (like input splits in MapReduce), which lets Spark perform in-memory computations on large clusters in a fault-tolerant manner. All operations in Spark's core API are performed on RDDs.

Additional traits:
  • An RDD does not actually contain data; it only describes the pipeline that produces it.
  • Lazily evaluated - the data inside an RDD is not computed or transformed until an action triggers execution.
  • Cacheable - RDDs can be cached across parallel operations.
  • In-memory computing - with RDDs, iterative algorithms (machine learning, graph computations) and ad-hoc queries on the same dataset run efficiently, because intermediate results are kept in memory and reused across data-intensive workloads with no need to copy large amounts of data over the network.
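
The traits above (a recorded pipeline, lazy evaluation, caching) can be illustrated with a small pure-Python toy. This is only a sketch: ToyRDD and its methods are hypothetical names invented for this illustration, they run on a single machine, and they are not the Spark API.

```python
class ToyRDD:
    """A toy, single-machine stand-in for an RDD (hypothetical, not Spark)."""

    def __init__(self, compute, lineage):
        self._compute = compute    # zero-argument function producing the rows
        self.lineage = lineage     # names of the steps needed to rebuild the data
        self.runs = 0              # how many times this pipeline was executed
        self._use_cache = False
        self._cached = None

    @classmethod
    def parallelize(cls, data):
        items = list(data)
        return cls(lambda: iter(items), ["parallelize"])

    def map(self, fn):
        # Transformation: only records the step, nothing is computed yet.
        return ToyRDD(lambda: (fn(x) for x in self._rows()), self.lineage + ["map"])

    def filter(self, keep):
        # Transformation: only records the step, nothing is computed yet.
        return ToyRDD(lambda: (x for x in self._rows() if keep(x)), self.lineage + ["filter"])

    def cache(self):
        # Ask for the result to be kept after the first computation.
        self._use_cache = True
        return self

    def _rows(self):
        if self._cached is not None:
            return iter(self._cached)
        self.runs += 1
        rows = list(self._compute())
        if self._use_cache:
            self._cached = rows
        return iter(rows)

    def collect(self):
        # Action: this is what actually triggers the computation.
        return list(self._rows())


squares = ToyRDD.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * x)
even_squares = squares.filter(lambda x: x % 2 == 0).cache()

print(even_squares.runs)       # 0, nothing has been computed yet
print(even_squares.collect())  # [4, 16]
print(even_squares.collect())  # [4, 16], served from the cache
print(even_squares.runs)       # 1
print(even_squares.lineage)    # ['parallelize', 'map', 'filter']
```

If the cached rows were lost, the recorded lineage is all that is needed to recompute them, which is the idea behind the "resilient" part of the name.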
Creating RDDs
RDDs can be created in two different ways:
  • Hadoop datasets:
    • By referencing, in the driver program, an external dataset in any storage system supported by Hadoop (e.g. local file system, Amazon S3, HBase, Cassandra or any data source offering a Hadoop InputFormat).
  • Parallelized collections:
    • By parallelizing a collection of Scala objects (like a List).
Before moving further, let's open the Spark shell by switching to the Spark home directory and typing the following command. It starts a Scala shell and also loads the SparkContext as sc.

$ ./bin/spark-shell

Now you can start Spark programming in Scala.

Creating an RDD from a Collection Object
You can create an RDD from an existing Scala collection object, such as an Array or List (the elements themselves may be tuples), by calling SparkContext's parallelize method on the collection in the driver program.

scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

In the above program, I first assigned an array of 5 elements to the variable 'data', and then created an RDD named 'distData' by calling SparkContext's 'parallelize' method on that array. Each RDD is uniquely identified by an id (0 here, as shown in ParallelCollectionRDD[0]).
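
To get a feel for what "parallelizing" means, here is a rough pure-Python sketch of cutting a collection into contiguous, near-equal partitions. The function name slice_collection is hypothetical, and the exact slicing Spark performs internally may differ; in Spark itself the number of partitions can be passed as the optional second argument of parallelize, for example sc.parallelize(data, 2).

```python
def slice_collection(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    length = len(data)
    return [
        # Chunk i covers positions [i*length/num_slices, (i+1)*length/num_slices)
        data[(i * length) // num_slices:((i + 1) * length) // num_slices]
        for i in range(num_slices)
    ]


print(slice_collection([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4, 5]]
print(slice_collection([1, 2, 3, 4, 5], 3))  # [[1], [2, 3], [4, 5]]
```

Each chunk plays the role of one partition that a separate task could process.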

To view the contents of an RDD, use the 'collect' method. Let's see the contents of distData by typing the command below:

scala> distData.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5)

To view the id that uniquely identifies an RDD inside a SparkContext:
scala> distData.id
res1: Int = 0

An RDD can optionally have a friendly name, accessible via name. Let's see how to set one:
scala> distData.name = "Sample Data"
distData.name: String = Sample Data

scala> distData.name
res3: String = Sample Data


Creating an RDD from External Sources
You can create an RDD from an external dataset in any storage system supported by Hadoop (e.g. local file system, Amazon S3, HBase, Cassandra or any data source offering a Hadoop InputFormat) by referencing it in the driver program.

Let's create an RDD by loading a file:
scala> val lines = sc.textFile("sample.txt")

In the above program, I created an RDD by using SparkContext's textFile method. This method takes the URI of the file, which can be a local path (file://), an HDFS path (hdfs://) or an S3 path (s3://).

Note: RDDs reside in a SparkContext, and a SparkContext creates a logical boundary; RDDs can't be shared between SparkContexts.

Operations on RDD
Spark supports two kinds of operations on RDDs:

  • Transformations - which create a new dataset from an existing one
  • Actions - which return a value to the driver program after performing the computation on the dataset.

In the next post, we will play with RDDs by applying different operations. If you have any questions or doubts, feel free to post them in the comments section.

Happy Learning :)

Monday, January 16, 2017

Loading HBase Table Data into Spark Dataframe

In this blog, I am going to show how HBase tables in Hadoop can be loaded as a DataFrame.

Here, we will create a Hive table that maps to the HBase table, and then create a DataFrame using HiveContext (Spark 1.6) or SparkSession (Spark 2.0) to load the Hive table.

Let us create a table in HBase shell.

Create a table using the following command:
hbase(main):002:0> create 'custumer_info', 'customer', 'purchases'

A table has been created with name 'custumer_info' and column families 'customer' and 'purchases'.

The schema of this table can be checked using the following command:
hbase(main):003:0> describe 'custumer_info'

Let us insert some sample data into the table by using the following commands:
hbase(main):004:0> put 'custumer_info', '101', 'customer:name', 'Satish'
hbase(main):005:0> put 'custumer_info', '101', 'customer:city', 'Bangalore'
hbase(main):006:0> put 'custumer_info', '101', 'purchases:product', 'Mobile'
hbase(main):007:0> put 'custumer_info', '101', 'purchases:price', '9000'

hbase(main):008:0> put 'custumer_info', '102', 'customer:name', 'Ramya'
hbase(main):009:0> put 'custumer_info', '102', 'customer:city', 'Bangalore'
hbase(main):010:0> put 'custumer_info', '102', 'purchases:product', 'Shoes'
hbase(main):011:0> put 'custumer_info', '102', 'purchases:price', '3500'

hbase(main):012:0> put 'custumer_info', '103', 'customer:name', 'Teja'
hbase(main):013:0> put 'custumer_info', '103', 'customer:city', 'Bangalore'
hbase(main):014:0> put 'custumer_info', '103', 'purchases:product', 'Laptop'
hbase(main):015:0> put 'custumer_info', '103', 'purchases:price', '35000'

We can check the number of records inserted by running the below command:
hbase(main):016:0> count 'custumer_info'

We have successfully created a table in HBase. Now let us check out this data in Spark.

Create a Hive table that maps to the HBase table by using the following command (the entries of hbase.columns.mapping are written without spaces, since whitespace can be interpreted as part of the column name):
hiveContext.sql("CREATE EXTERNAL TABLE custumer_info (key INT, customer_name STRING, customer_city STRING, product_name STRING, amount FLOAT) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,customer:name,customer:city,purchases:product,purchases:price') TBLPROPERTIES('hbase.table.name' = 'custumer_info')")
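
The Hive columns and the entries of hbase.columns.mapping are matched by position, so the two lists must stay in the same order. As a small illustration, the statement can be generated from a single list of columns so they cannot drift apart. This is only a sketch in plain Python; the helper name hive_hbase_ddl is hypothetical and is not part of Hive, HBase or Spark.

```python
def hive_hbase_ddl(hive_table, hbase_table, columns):
    """Build a CREATE EXTERNAL TABLE statement for an HBase-backed Hive table.

    columns is a list of (hive_column, hive_type, hbase_column) tuples, where
    hbase_column is ':key' for the row key or 'family:qualifier' otherwise.
    """
    if not any(hbase == ":key" for _, _, hbase in columns):
        raise ValueError("one column must map to the HBase row key ':key'")
    hive_columns = ", ".join(
        "{} {}".format(name, hive_type) for name, hive_type, _ in columns
    )
    # No spaces between mapping entries, to keep them out of the column names.
    mapping = ",".join(hbase for _, _, hbase in columns)
    return (
        "CREATE EXTERNAL TABLE {table} ({cols}) "
        "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' "
        "WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{mapping}') "
        "TBLPROPERTIES ('hbase.table.name' = '{hbase}')"
    ).format(table=hive_table, cols=hive_columns, mapping=mapping, hbase=hbase_table)


ddl = hive_hbase_ddl(
    "custumer_info",
    "custumer_info",
    [
        ("key", "INT", ":key"),
        ("customer_name", "STRING", "customer:name"),
        ("customer_city", "STRING", "customer:city"),
        ("product_name", "STRING", "purchases:product"),
        ("amount", "FLOAT", "purchases:price"),
    ],
)
print(ddl)
```

The resulting string is what would be passed to the sql method of the Hive-enabled context.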

We can check whether the table was created by running the following command:
hiveContext.sql("show tables").show()

Let us create a DataFrame by running the command below:
val cust_df = hiveContext.sql("select * from custumer_info")

We have now loaded the DataFrame cust_df with the data from the Hive table custumer_info, which is backed by the HBase table.

You can see the schema of cust_df and the contents of the custumer_info table by using the following commands:

cust_df.printSchema()

cust_df.show()


The full output is shown in the screenshot below, where you can see that the data in the HBase table has been loaded into the Spark DataFrame successfully. Any kind of DataFrame operation can now be performed on this data.





I hope this blog helped you in understanding the concept in-depth.

Enjoy Spark!


Saturday, January 14, 2017

Creating a SparkContext

In this post I am going to explain what a SparkContext is and how to create one in Scala, Python and Java.

SparkContext is the entry point to Spark for a Spark application and establishes a connection to a cluster. It provides many functions, such as getting and setting the configuration used to run or deploy the application, creating objects, scheduling jobs, cancelling jobs and many more.

Now we will see how to create a new SparkContext in Scala, Python and Java.

Scala
import org.apache.spark.{SparkConf, SparkContext}

// 1. Create Spark configuration
val conf = new SparkConf()
  .setAppName("Your Spark Application Name")
  .setMaster("local[*]")  // local mode

// 2. Create Spark context
val sc = new SparkContext(conf)

Python
from pyspark import SparkConf, SparkContext

# 1. Create Spark configuration
# (the parentheses let the chained calls span several lines)
conf = (SparkConf()
        .setAppName("Your Spark Application Name")
        .setMaster("local[*]"))  # local mode

# 2. Create Spark context
sc = SparkContext(conf=conf)

Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// 1. Create Spark configuration
SparkConf conf = new SparkConf()
  .setAppName("Your Spark Application Name")
  .setMaster("local[*]");  // local mode

// 2. Create Spark context
JavaSparkContext sc = new JavaSparkContext(conf);

Once a SparkContext instance is created you can use it to create RDDs, Accumulators and Broadcast variables, access Spark services and run jobs (until SparkContext is stopped).

Enjoy Spark!

Setup Environment for Spark Development on Windows

In this post, I am going to show you how to set up Spark without Hadoop, in standalone mode, on Windows.

Step 1: Install JDK (Java Development Kit)
Download JDK 7 or later from http://www.oracle.com/technetwork/java/javase/downloads/index.html and note the path where you installed it.

Step 2: Download Apache Spark
Download a pre-built Apache Spark archive from https://spark.apache.org/downloads.html. Extract the downloaded archive and note the path where you extracted it (for example C:\dev_tools\spark).

Step 3: Download winutils.exe for Hadoop
Although we are not using Hadoop, Spark on Windows throws the error 'Failed to locate the winutils binary in the hadoop binary path'. So download winutils.exe and place it in a folder (for example C:\dev_tools\winutils\bin\winutils.exe).

Note: the winutils.exe binary may vary with the OS. If the one you downloaded does not work on your OS, find a build that supports it and use that instead.

Step 4: Create Environment Variables
Open Control Panel -> System and Security -> Click on 'Advanced System Settings' -> Click on 'Environment Variables' button.
Add the following new USER variables:
JAVA_HOME <JAVA_INSTALLED_PATH> (C:\Program Files\Java\jdk1.8.0_101)
SPARK_HOME <SPARK_EXTRACTED_PATH> (C:\dev_tools\spark)
HADOOP_HOME <WINUTILS_PATH> (C:\dev_tools\winutils)

Step 5: Set PATH
Add the following paths to your PATH user variable:
%SPARK_HOME%\bin
%JAVA_HOME%\bin
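
Before testing, it can help to verify that the variables from Steps 4 and 5 are actually visible to new processes. The following is a minimal sketch in plain Python, assuming Python is installed on the machine; the function name check_spark_env is hypothetical, and the winutils location checked is the <HADOOP_HOME>\bin\winutils.exe layout used in Step 3.

```python
import os
from pathlib import PureWindowsPath

REQUIRED_VARS = ("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME")


def check_spark_env(env, path_exists=os.path.exists):
    """Return a list of human-readable problems found in the environment."""
    problems = []
    for name in REQUIRED_VARS:
        if not env.get(name):
            problems.append("{} is not set".format(name))
    hadoop_home = env.get("HADOOP_HOME")
    if hadoop_home:
        # winutils.exe is expected under the bin folder of HADOOP_HOME
        winutils = str(PureWindowsPath(hadoop_home) / "bin" / "winutils.exe")
        if not path_exists(winutils):
            problems.append("winutils.exe not found at {}".format(winutils))
    return problems


if __name__ == "__main__":
    for line in check_spark_env(os.environ) or ["All required variables look fine"]:
        print(line)
```

Run it from a newly opened command prompt, since prompts that were already open do not pick up changed environment variables.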

Step 6: Now Test it out!
1. Open a command prompt in administrator mode.
2. Move to the path where you set up Spark (i.e. C:\dev_tools\spark).
3. Check for a text file to play with, like README.md.
4. Type spark-shell to enter the Spark shell.
5. Execute the following statements:
val rdd = sc.textFile("README.md")
rdd.count()
You should get the number of lines in that file.

Congratulations, your setup is done and you have successfully run your first Spark program :)

Enjoy Spark!

Wednesday, January 4, 2017

Create DataFrame from list of tuples using Pyspark

In this post I am going to explain how to create a DataFrame from a list of tuples in PySpark. I am using Python 2 for scripting and Spark 2.0.1.

Create a list of tuples
listOfTuples = [(101, "Satish", 2012, "Bangalore"),
(102, "Ramya", 2013, "Bangalore"),
(103, "Teja", 2014, "Bangalore"),
(104, "Kumar", 2012, "Hyderabad")]

Create Dataframe out of listOfTuples
df = spark.createDataFrame(listOfTuples , ["id", "name", "year", "city"])

Check the schema
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- year: long (nullable = true)
 |-- city: string (nullable = true)

Print data
df.show()
+---+------+----+---------+
| id|  name|year|     city|
+---+------+----+---------+
|101|Satish|2012|Bangalore|
|102| Ramya|2013|Bangalore|
|103|  Teja|2014|Bangalore|
|104| Kumar|2012|Hyderabad|
+---+------+----+---------+
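
Notice that id and year were inferred as long and name and city as string, even though no types were given. The following plain-Python toy illustrates the idea of inferring a schema from the first tuple. It is a deliberately simplified sketch, not PySpark's actual inference code: the names TYPE_NAMES and infer_schema are hypothetical and only a handful of types are covered.

```python
# Simplified subset of how Python values map to Spark SQL type names
TYPE_NAMES = {bool: "boolean", int: "long", float: "double", str: "string"}


def infer_schema(rows, column_names):
    """Pair each column name with a type name guessed from the first row."""
    first = rows[0]
    if len(first) != len(column_names):
        raise ValueError("row length does not match the number of column names")
    return [(name, TYPE_NAMES[type(value)]) for name, value in zip(column_names, first)]


listOfTuples = [(101, "Satish", 2012, "Bangalore"),
                (102, "Ramya", 2013, "Bangalore")]

print(infer_schema(listOfTuples, ["id", "name", "year", "city"]))
# [('id', 'long'), ('name', 'string'), ('year', 'long'), ('city', 'string')]
```

When the inferred types are not what you want, createDataFrame also accepts an explicit schema in place of the list of column names.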

Enjoy Spark!

Tuesday, January 3, 2017

Integrate third party package to Spark application

In this post, I'll show you how to integrate third-party packages (like spark-avro, spark-csv, spark-redshift, spark-cassandra-connector, hbase) into your Spark application.

Let's take spark-avro as an example; it allows you to read/write data in the Avro format using Spark.

Different ways to integrate a third-party package with a Spark application

Include the package in the Spark shell/application using --jars
Download the jar file (spark-avro_2.11-3.1.0.jar) from the URL below:
https://spark-packages.org/package/databricks/spark-avro

Launch the spark shell with the jar file:
$SPARK_HOME/bin/spark-shell --jars <DOWNLOAD_PATH>/spark-avro_2.11-3.1.0.jar

Run the Spark application with the jar file:
spark-submit --jars <DOWNLOAD_PATH>/spark-avro_2.11-3.1.0.jar <SPARK_SCRIPT>.jar

Include the package in the Spark shell/application using --packages
Pass the package's Maven coordinates (groupId:artifactId:version) as the argument to --packages; Spark then downloads the package and makes it available to your application. To pass multiple packages, list their coordinates separated by commas.

Launch the spark shell with --packages:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-avro_2.11:3.1.0

Run the Spark application with --packages:
spark-submit --packages com.databricks:spark-avro_2.11:3.1.0 <SPARK_SCRIPT>.jar
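
As a small illustration of the coordinate format and the comma-separated list, here is a plain-Python sketch that validates coordinates and assembles the spark-submit arguments. The helper names parse_coordinate and spark_submit_command and the file name my_app.jar are hypothetical, used only for this example.

```python
def parse_coordinate(coordinate):
    """Split 'groupId:artifactId:version' into its three parts."""
    parts = coordinate.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError("expected groupId:artifactId:version, got " + repr(coordinate))
    return {"group_id": parts[0], "artifact_id": parts[1], "version": parts[2]}


def spark_submit_command(app_jar, packages):
    """Build the argument list for spark-submit with one or more packages."""
    for coordinate in packages:
        parse_coordinate(coordinate)  # fail early on malformed coordinates
    # Multiple packages are passed as one comma-separated argument
    return ["spark-submit", "--packages", ",".join(packages), app_jar]


command = spark_submit_command(
    "my_app.jar",
    ["com.databricks:spark-avro_2.11:3.1.0", "com.databricks:spark-csv_2.11:1.5.0"],
)
print(" ".join(command))
```

The printed line is the command that would be typed in a terminal; the list form could also be handed to Python's subprocess module.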

Try the following script to test the package:
// import packages 
import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Read Avro data 
val df = spark.read.format("com.databricks.spark.avro").load("<INPUT_DIR>")

// Write Avro
df.write.format("com.databricks.spark.avro").save("<OUT_DIR>")

Replace <INPUT_DIR> and <OUT_DIR> with the locations of the Avro data to read and write.

Enjoy Spark!

Friday, November 4, 2016

UBER mode in Hadoop2 and its configuration

By default, the ResourceManager creates separate containers for the mappers and reducers. Uber mode allows the mappers and reducers to run in the same process as the ApplicationMaster.

Jobs running in uber mode are called uber jobs. Uber jobs are executed within the ApplicationMaster: rather than communicating with the ResourceManager to create mapper and reducer containers, the ApplicationMaster runs the map and reduce tasks within its own process and avoids the overhead of launching and communicating with remote containers.

Why go for UBER mode?
If you have a small dataset, or you want to run MapReduce on a small amount of data, uber mode helps by cutting the extra time that MapReduce normally spends launching containers for the map and reduce phases.

Uber mode supports only map-only jobs and jobs with one reducer.

Configurations to enable jobs to run in UBER Mode
There are four core settings for configuring uber jobs in mapred-site.xml.

Configuration options for Uber Jobs:

mapreduce.job.ubertask.enable (Default = false)
Whether to enable the small-jobs "ubertask" optimization, which runs "sufficiently small" jobs sequentially within a single JVM. 

mapreduce.job.ubertask.maxmaps (Default = 9)
Threshold value for the number of maps beyond which a job is considered too large for the ubertasking optimization. Users can override this value, but only downward.

mapreduce.job.ubertask.maxreduces (Default = 1)
Threshold value for the number of reduces beyond which a job is considered too large for the ubertasking optimization. 
Note: Currently the code can't support more than one Reducer and will ignore larger values.

mapreduce.job.ubertask.maxbytes (Default = HDFS Block Size)
Threshold value for the number of input bytes beyond which a job is considered too large for the ubertasking optimization.
If no value is specified, dfs.block.size is used as the default. Be sure to specify a default value in mapred-site.xml if the underlying file system is not HDFS.
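
Putting the four settings together, the decision can be sketched in plain Python as below. This is a simplified illustration that considers only the thresholds described above; Hadoop's real check also looks at other factors (such as the resources available to the ApplicationMaster), and the names UberConfig and can_run_uber are hypothetical. The 128 MB value for max_bytes is an assumed HDFS block size.

```python
from dataclasses import dataclass


@dataclass
class UberConfig:
    enable: bool = False                 # mapreduce.job.ubertask.enable
    max_maps: int = 9                    # mapreduce.job.ubertask.maxmaps
    max_reduces: int = 1                 # mapreduce.job.ubertask.maxreduces
    max_bytes: int = 128 * 1024 * 1024   # mapreduce.job.ubertask.maxbytes (assumed block size)


def can_run_uber(config, num_maps, num_reduces, input_bytes):
    """Return True if a job is small enough for the ubertask optimization."""
    # More than one reducer is not supported, so larger settings are capped at 1
    reduce_limit = min(config.max_reduces, 1)
    return (
        config.enable
        and num_maps <= config.max_maps
        and num_reduces <= reduce_limit
        and input_bytes <= config.max_bytes
    )


config = UberConfig(enable=True)
print(can_run_uber(config, num_maps=3, num_reduces=1, input_bytes=10 * 1024 * 1024))   # True
print(can_run_uber(config, num_maps=20, num_reduces=1, input_bytes=10 * 1024 * 1024))  # False
print(can_run_uber(UberConfig(), num_maps=1, num_reduces=0, input_bytes=1024))         # False
```

The last call returns False because the optimization is disabled by default, no matter how small the job is.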