Showing posts with label dataset. Show all posts

Friday, July 29, 2016

Overview of Spark 2.0 Dataset / DataFrame API, Part 2

Introduction

In Part 1 of this series, we examined type-safe operations with Datasets. In Part 2, we will cover untyped operations with DataFrames. Being untyped, DataFrames are well-suited for data exploration, ad-hoc queries, and data munging.

DataFrame

DataFrames are still available in Spark 2.0, and remain mostly unchanged. The biggest change is that they have been merged with the new Dataset API. The DataFrame class no longer exists on its own; instead, it is defined as a specific type of Dataset: type DataFrame = Dataset[Row]. However, all of the functionality from 1.6 is still there.

Outline

  • Example Data
  • DataFrames: Untyped Language-Integrated SQL Queries
  • DataFrames: SQL Queries in SQL
  • DataFrames: Adding Columns, Data Munging

Example Data

We will continue with the example data from Part 1. We have defined a "Student" class as:

case class Student(name: String, dept: String, age: Long)

The example data has been read into a Dataset[Student]:
ds
ds: org.apache.spark.sql.Dataset[Student]
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23|  CS|  Bob|
| 25|Math| Carl|

DataFrames: Untyped Language-Integrated SQL Queries

DataFrames support language-integrated SQL queries, such as "select", "where", and "group by".

Convert to DataFrame

To convert a Dataset into a DataFrame:

val df = ds.toDF()
df: org.apache.spark.sql.DataFrame
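
Because DataFrame is only an alias for Dataset[Row], the conversion works in both directions. The following is a minimal sketch, not taken from the original post: it builds its own local SparkSession (in spark-shell, "spark" already exists), recreates the three example students in memory, and wraps everything in an object named ConvertExample, which is a name chosen here for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case class Student(name: String, dept: String, age: Long)

object ConvertExample {
  // Local session for illustration only; spark-shell already provides `spark`.
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("convert-example")
    .getOrCreate()
  import spark.implicits._

  val ds: Dataset[Student] = Seq(
    Student("Alice", "Math", 21L),
    Student("Bob", "CS", 23L),
    Student("Carl", "Math", 25L)
  ).toDS()

  // Dataset[Student] -> DataFrame: the rows become untyped Row objects.
  val df: DataFrame = ds.toDF()

  // This assignment compiles because DataFrame is an alias for Dataset[Row].
  val asRows: Dataset[Row] = df

  // Fields of a Row are looked up by name at run time, not checked at compile time.
  def firstName: String = df.head().getAs[String]("name")

  // DataFrame -> Dataset[Student]: column names and types must match the case class.
  val typedAgain: Dataset[Student] = df.as[Student]
}
```

A misspelled column name in getAs fails only when the code runs, which is the trade-off of the untyped API.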

Select, Where

To select columns and specify a "where" clause:

> val selected = df.select("name", "age")
                   .where($"age" === 21)
selected: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
| name|age|
+-----+---+
|Alice| 21|

Count

To count the number of rows:

> df.count()
res1: Long = 3

GroupBy, Aggregate

To perform a "group by" and aggregate within each group, use "groupBy" and "agg". A number of aggregation functions, such as "avg", are available in the "functions" object. To group by a column and compute the average in each group:

> import org.apache.spark.sql.functions._
> val avgAge2 = df.groupBy("dept")
                  .agg(avg($"age"))
|dept|avg(age)|
+----+--------+
|Math|    23.0|
|  CS|    23.0|
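
"agg" also accepts several aggregate columns at once, and each can be renamed with "as". The sketch below is an illustration rather than part of the original post: the object name AggExample, the output column names, and the in-memory copy of the example data are all assumptions made so the snippet stands alone.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, max, min}

object AggExample {
  // Local session for illustration only; spark-shell already provides `spark`.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("agg-example")
    .getOrCreate()
  import spark.implicits._

  // In-memory copy of the example students.
  val df = Seq(("Alice", "Math", 21L), ("Bob", "CS", 23L), ("Carl", "Math", 25L))
    .toDF("name", "dept", "age")

  // Several aggregates per group, each given a readable column name.
  val stats = df.groupBy("dept")
    .agg(
      avg($"age").as("avgAge"),
      min($"age").as("minAge"),
      max($"age").as("maxAge"))
    .orderBy("dept") // sort so the row order is deterministic
}
```

Calling AggExample.stats.show() prints one row per department with the three aggregate columns.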

Join

You can join two DataFrames with "join". To create a second DataFrame with department info:

> case class Department(deptName: String, building: Int)
> val depts = Seq(Department("Math", 125), Department("CS", 110)).toDF()
|deptName|building|
+--------+--------+
|    Math|     125|
|      CS|     110|

Then, to join the students DataFrame with the new department DataFrame:

> val joined2 = df.join(depts, df("dept") === depts("deptName"))
|age|dept| name|deptName|building|
+---+----+-----+--------+--------+
| 21|Math|Alice|    Math|     125|
| 23|  CS|  Bob|      CS|     110|
| 25|Math| Carl|    Math|     125|
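
The join above is an inner join, so a student whose department is missing from the department DataFrame would be dropped. "join" takes an optional third argument naming the join type. The sketch below assumes a hypothetical extra student, "Dana" in "Physics", who is not in the original data, to show what "left_outer" does; the object name OuterJoinExample is also illustrative.

```scala
import org.apache.spark.sql.SparkSession

object OuterJoinExample {
  // Local session for illustration only; spark-shell already provides `spark`.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("outer-join-example")
    .getOrCreate()
  import spark.implicits._

  // "Dana" / "Physics" is a made-up row with no matching department.
  val students = Seq(("Alice", "Math", 21L), ("Dana", "Physics", 22L))
    .toDF("name", "dept", "age")
  val depts = Seq(("Math", 125), ("CS", 110)).toDF("deptName", "building")

  // "left_outer" keeps every student; department columns are null when nothing matches.
  val joined = students.join(depts, students("dept") === depts("deptName"), "left_outer")

  // Students without a matching department.
  val unmatched = joined.where($"deptName".isNull).select("name")
}
```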

Explain

To examine the query plan used to compute a DataFrame:

> joined2.explain()
== Physical Plan ==
*BroadcastHashJoin [dept#134], [deptName#384], Inner, BuildRight
:- *Filter isnotnull(dept#134)
:  +- Scan ExistingRDD[age#133L,dept#134,name#135]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *Filter isnotnull(deptName#384)
      +- LocalTableScan [deptName#384, building#385]

DataFrames: SQL Queries in SQL

You can also query DataFrames with SQL. First create a temp view and then specify SQL queries against that view:

> df.createTempView("StudentTable")

> val sqlResults = spark.sql("SELECT name, dept FROM StudentTable") // "spark" is a SparkSession object
| name|dept|
+-----+----+
|Alice|Math|
|  Bob|  CS|
| Carl|Math|
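
Any SQL that Spark supports can be run against the view, including filters and aggregates. The sketch below is illustrative and not from the original post. It uses "createOrReplaceTempView", which, unlike "createTempView", does not fail when a view with the same name already exists; the object name SqlExample and the column aliases are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object SqlExample {
  // Local session for illustration only; spark-shell already provides `spark`.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("sql-example")
    .getOrCreate()
  import spark.implicits._

  // In-memory copy of the example students.
  val df = Seq(("Alice", "Math", 21L), ("Bob", "CS", 23L), ("Carl", "Math", 25L))
    .toDF("name", "dept", "age")

  // Safe to call repeatedly, e.g. when re-running a notebook cell.
  df.createOrReplaceTempView("StudentTable")

  // Filter, group, and aggregate in plain SQL.
  val perDept = spark.sql(
    """SELECT dept, COUNT(*) AS numStudents, AVG(age) AS avgAge
      |FROM StudentTable
      |WHERE age > 20
      |GROUP BY dept
      |ORDER BY dept""".stripMargin)
}
```

The result is an ordinary DataFrame, so it can be combined with the language-integrated operations shown earlier.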

DataFrames: Adding Columns, Data Munging

DataFrames support creating new columns and data munging. To add a column, use "withColumn" to specify a new column name and an expression for column values. The Column class defines column operations, such as the minus operator shown below. The "functions" object also contains convenient functions for working with columns, such as math, string, and date / time functions.

In this example, the "lit" function, defined in "functions", returns a Column populated with a literal value. The Column class minus operator performs subtraction. The "$" method returns the Column associated with the given column name:

import org.apache.spark.sql.functions._
val withCol = df.withColumn("birthYear", lit(2016) - $"age")
|age|dept| name|birthYear|
+---+----+-----+---------+
| 21|Math|Alice|     1995|
| 23|  CS|  Bob|     1993|
| 25|Math| Carl|     1991|
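
The string functions in "functions" are used the same way as "lit". The sketch below is an illustration under assumptions: the new column names "deptUpper" and "label" and the object name MungeExample are made up here, and the data is an in-memory copy of the example students.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, lit, upper}

object MungeExample {
  // Local session for illustration only; spark-shell already provides `spark`.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("munge-example")
    .getOrCreate()
  import spark.implicits._

  // In-memory copy of the example students.
  val df = Seq(("Alice", "Math", 21L), ("Bob", "CS", 23L), ("Carl", "Math", 25L))
    .toDF("name", "dept", "age")

  val munged = df
    // upper: upper-case a string column.
    .withColumn("deptUpper", upper($"dept"))
    // concat: join columns and literals into one string, e.g. "Alice (Math)".
    .withColumn("label", concat($"name", lit(" ("), $"dept", lit(")")))
}
```

Each "withColumn" call returns a new DataFrame, so calls can be chained without modifying the original.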

In the next example, the "round" function, defined in "functions", rounds values to the nearest multiple of ten; the scale argument of -1 means one digit to the left of the decimal point:

val rounded = df.withColumn("roundedAge", round($"age", -1))
|age|dept| name|roundedAge|
+---+----+-----+----------+
| 21|Math|Alice|        20|
| 23|  CS|  Bob|        20|
| 25|Math| Carl|        30|
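
Carl's age of 25 is exactly halfway between 20 and 30, and it rounds up because Spark documents "round" as using HALF_UP rounding. The plain-Scala sketch below, which does not use Spark at all, reproduces that arithmetic with BigDecimal; the helper names are hypothetical. It also shows HALF_EVEN for contrast, which is the mode Spark documents for the separate "bround" function.

```scala
import scala.math.BigDecimal.RoundingMode

object RoundingSketch {
  // Round to the nearest ten; ties go away from zero (HALF_UP).
  def roundHalfUpToTens(x: Long): Long =
    BigDecimal(x).setScale(-1, RoundingMode.HALF_UP).toLong

  // Round to the nearest ten; ties go to the even neighbor (HALF_EVEN).
  def roundHalfEvenToTens(x: Long): Long =
    BigDecimal(x).setScale(-1, RoundingMode.HALF_EVEN).toLong
}
```

With these helpers, 21 and 23 both become 20 under either mode, while 25 becomes 30 under HALF_UP and 20 under HALF_EVEN.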

Summary

In Spark 2.0, DataFrames have been merged into the Dataset API. DataFrame is a special type of Dataset that has untyped operations. DataFrames support convenient ways to query data, either through language-integrated queries or SQL. DataFrames are also useful for creating new columns and data munging.

Thursday, March 3, 2016

Spark 1.6 Datasets API: Example Usage

Overview

Spark 1.6 introduced a new Datasets API. It is an extension of DataFrames that supports functional processing on a collection of objects. Let's take a look at some examples of how to use it. First we'll read a JSON file and a text file into Datasets. We will apply functional transformations to parse the data. Then we will run relational queries against a Dataset.

Creating a Dataset from a JSON file

Suppose you have JSON formatted data which you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON = sqlContext.read.json("students.json").as[Student]

Creating a Dataset from a Text file

Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Alice<tab>Math<tab>18
Bob<tab>CS<tab>19
Carl<tab>Math<tab>21

To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText = sqlContext.read.text("students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]


// We want a Dataset of type "Student".
case class Student(name: String, dept: String, age:Int)

// Functional programming to parse the lines into a Dataset[Student].

val students = studentsFromText.
  map(line => {
    val cols = line.split("\t") // parse each line
    Student(cols(0), cols(1), cols(2).toInt)
  })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]
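
The parsing function above throws an exception if a line has fewer than three fields or a non-numeric age. A more defensive variant is sketched below in plain Scala, without Spark; the object and function names are hypothetical and not part of the original post. It returns None for malformed lines instead of failing.

```scala
import scala.util.Try

case class Student(name: String, dept: String, age: Int)

object ParseSketch {
  // Returns Some(student) for a well-formed line, or None when the line does
  // not have exactly three tab-separated fields or the age is not an integer.
  def parseStudent(line: String): Option[Student] =
    line.split("\t", -1) match {
      case Array(name, dept, age) =>
        Try(age.trim.toInt).toOption.map(a => Student(name, dept, a))
      case _ =>
        None
    }
}
```

With a Dataset[String], one way to apply it would be studentsFromText.flatMap(line => ParseSketch.parseStudent(line).toList), which silently skips bad lines; whether dropping them is acceptable depends on the data.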

// Show the contents of the Dataset.
> students.show()
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|

Relational queries

Datasets support relational queries, with operations such as: select, filter, group by, count, avg, join.

SELECT, FILTER

Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
> students.select($"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math").  // Filter on _2, the second selected column
    collect()
(result) Array((Alice,Math), (Carl,Math))

GROUP BY, COUNT

Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))

GROUP BY, AVG

Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._

// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))

JOIN

Suppose we have a separate table with department information. We would like to join the department information into our student table.

First, create the department Dataset.
// The Department type.
> case class Department(abbrevName: String, fullName: String)

// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

// Show the contents of the Dataset.
> depts.show()

|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|

Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")

// Show the contents of the joined Dataset.
// Note that the original objects are nested into tuples under the _1 and _2 columns.
> joined.show()

|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|

Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset. 
// Notice that the original Dataset types are preserved.
> joined.map(s => (s._1.name, s._2.fullName)).show()

|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|

EXPLAIN

"Explain" prints the query's physical plan for debugging.

// Explain how the join is computed.
// Note that a BroadcastHashJoin is planned.
> joined.explain()

== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
   :- ConvertToUnsafe
   :  +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
   :     +- ConvertToSafe
   :        +- Scan TextRelation[value#168157] InputPaths: /students.tsv
   +- ConvertToUnsafe
      +- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]