
Saturday, May 14, 2016

Reading JSON Nested Array in Spark DataFrames

In a previous post on JSON data, I showed how to read nested JSON arrays with Spark DataFrames. Now that I am more familiar with the API, I can describe an easier way to access such data, using the explode() function. All of the example code is in Scala, on Spark 1.6.

Loading JSON data

Suppose you have a file with JSON data, with one JSON object per line:

{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}

You can read it into a DataFrame with the SQLContext read() method:

>> val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame

>> people.show()
+-------+--------------------+
|   name|             schools|
+-------+--------------------+
|Michael|[[stanford,2010],...|
|   Andy|       [[ucsb,2011]]|
+-------+--------------------+

Notice that the second column, "schools", is an array type, and each element of the array is a struct:

>> people.printSchema()
root
 |-- name: string (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sname: string (nullable = true)
 |    |    |-- year: long (nullable = true)


Nested Array of Struct

Flatten / Explode an Array

If your JSON objects contain nested arrays of structs, how do you access the elements of those arrays? One way is to flatten them. For instance, in the example above, each JSON object contains a "schools" array. We can flatten "schools" with the explode() function.

>> import org.apache.spark.sql.functions._
>> val flattened = people.select($"name", explode($"schools").as("schools_flat"))
flattened: org.apache.spark.sql.DataFrame

>> flattened.show()
+-------+---------------+
|   name|   schools_flat|
+-------+---------------+
|Michael|[stanford,2010]|
|Michael|[berkeley,2012]|
|   Andy|    [ucsb,2011]|
+-------+---------------+

Now each school is on a separate row. The new column "schools_flat" is of type Struct.
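You can double-check the new shape with printSchema(); on Spark 1.6 the output should look roughly like this (a quick check, not shown in the original run):

>> flattened.printSchema()
root
 |-- name: string (nullable = true)
 |-- schools_flat: struct (nullable = true)
 |    |-- sname: string (nullable = true)
 |    |-- year: long (nullable = true)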

Select into Struct

Now you can select, for instance, all the school names within each struct, by using the DataFrame select() method. The struct has two fields: "sname" and "year". We will select only the school name, "sname":

>> val schools = flattened.select("name", "schools_flat.sname")
schools: org.apache.spark.sql.DataFrame = [name: string, sname: string]

>> schools.show()
+-------+--------+
|   name|   sname|
+-------+--------+
|Michael|stanford|
|Michael|berkeley|
|   Andy|    ucsb|
+-------+--------+

There you have it! We have taken data that was nested as structs inside an array column and bubbled it up to a first-level column in a DataFrame. You can now manipulate that column with the standard DataFrame methods.
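For example, counting schools per person is now an ordinary aggregation (a quick sketch; row order in show() output may vary):

>> schools.groupBy("name").count().show()
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    2|
|   Andy|    1|
+-------+-----+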

References


  1. The DataFrame API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
  2. The explode() function: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$


Thursday, March 3, 2016

Spark 1.6 Datasets API: Example Usage

Overview

Spark 1.6 introduced a new Datasets API. It is an extension of the DataFrames API that supports functional processing on a collection of objects. Let's take a look at some examples of how to use Datasets. First we'll read a JSON file and a text file into Datasets, applying functional transformations to parse the data. Then we'll run relational queries against a Dataset.

Creating a Dataset from a JSON file

Suppose you have JSON-formatted data that you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON = sqlContext.read.json("students.json").as[Student]
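As a quick sanity check (not part of the original example), you can show() the parsed rows. Note that the JSON reader infers the schema with field names in alphabetical order, so "dept" comes before "name":

// Show the contents of the Dataset.
> studentsFromJSON.show()
+----+-----+
|dept| name|
+----+-----+
|Math|Alice|
|  CS|  Bob|
|Math|Carl|
+----+-----+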

Creating a Dataset from a Text file

Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Alice<tab>Math<tab>18
Bob<tab>CS<tab>19
Carl<tab>Math<tab>21

To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText = sqlContext.read.text("students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]


// We want a Dataset of type "Student".
> case class Student(name: String, dept: String, age: Int)

// Functional programming to parse the lines into a Dataset[Student].
> val students = studentsFromText.
  map(line => {
    val cols = line.split("\t") // parse each line
    Student(cols(0), cols(1), cols(2).toInt)
  })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]

// Show the contents of the Dataset.
> students.show()
+-----+----+---+
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|
+-----+----+---+

Relational queries

Datasets support relational queries, with operations such as select, filter, group by, count, avg, and join.

SELECT, FILTER

Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
> students.select($"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math").  // Filter on _2, the second selected column
    collect()
(result) Array((Alice,Math), (Carl,Math))
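Since a Dataset is strongly typed, the same query can also filter on the Student objects directly, which avoids the positional _2 accessor (an equivalent sketch, not from the original post):

// Alternative: filter on the typed objects, then map out the fields.
> students.filter(_.dept == "Math").map(s => (s.name, s.dept)).collect()
(result) Array((Alice,Math), (Carl,Math))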

GROUP BY, COUNT

Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))

GROUP BY, AVG

Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._

// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))

JOIN

Suppose we have a separate table with department information, which we would like to join into our students Dataset.

First, create the department Dataset.
// The Department type.
> case class Department(abbrevName: String, fullName: String)

// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

// Show the contents of the Dataset.
> depts.show()

+----------+----------------+
|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|
+----------+----------------+

Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")

// Show the contents of the joined Dataset.
// Note that the original objects are nested into tuples under the _1 and _2 columns.
> joined.show()

+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|
+---------------+--------------------+

Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset. 
// Notice that the original Dataset types are preserved.
> joined.map(s => (s._1.name, s._2.fullName)).show()

+-----+----------------+
|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|
+-----+----------------+

EXPLAIN

"Explain" prints the query's physical plan for debugging.

// Explain how the join is computed.
// Note that a BroadcastJoin is planned.
> joined.explain()

== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
   :- ConvertToUnsafe
   :  +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
   :     +- ConvertToSafe
   :        +- Scan TextRelation[value#168157] InputPaths: /students.tsv
   +- ConvertToUnsafe
      +- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]



Thursday, June 18, 2015

Reading JSON data in Spark DataFrames

Overview

The Spark DataFrames API makes it easy to read a variety of data formats, including JSON. In fact, it even infers the JSON schema for you automatically. Once the data is loaded, however, figuring out how to access individual fields is not so straightforward. This post will walk through reading top-level fields as well as JSON arrays and nested objects. The code provided is for Spark 1.4. Update: please see my updated post on an easier way to work with nested array of struct JSON data.

Load the JSON File

Let's begin by loading a JSON file, where each line is a JSON object:

{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}

The Scala code to read a JSON file:

>> val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame

Read a Top-Level Field

With the above command, all of the data is read into a DataFrame. In the following examples, I will show how to extract individual fields into arrays of primitive types. Let's start with the top-level "name" field:

>> val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])

>> names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)

Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access a column inside each Row.
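If you prefer not to track column positions, Row also supports name-based access with getAs (an equivalent sketch, assuming your Spark build includes the field-name overload of getAs):

>> names.map(row => row.getAs[String]("name"))
Array[String] = Array(Michael, Andy, Justin)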

Flatten and Read a JSON Array

Update: please see my updated post on an easier way to work with nested array of struct JSON data.

Next, notice that each person has an array of "cities". Let's flatten these arrays and read out all their elements.

>> val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame

>> val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]

>> allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)

The explode() method explodes, or flattens, the cities array into a new column named "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.

Read an Array of Nested JSON Objects, Unflattened

Finally, let's read out the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and year:

>> val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]

>> val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]

>> schoolsArr.foreach(schools => {
>>    schools.map(row => print(row.getString(0), row.getLong(1)))
>>    print("\n")
>> })
(stanford,2010)(berkeley,2012) 
(ucsb,2011) 
(berkeley,2014)

Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Each "schools" value is a Seq[Row], so we read it out with the getSeq[Row]() method. Finally, we read the information for each individual school by calling getString() for the school name and getLong() for the school year. Phew!
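If the goal is a single flat collection of (name, school, year) tuples, the same Row accessors compose with flatMap (a sketch, not from the original post):

>> people.select('name, 'schools).collect().flatMap { row =>
>>   row.getSeq[org.apache.spark.sql.Row](1).map(s =>
>>     (row.getString(0), s.getString(0), s.getLong(1)))
>> }
Array[(String, String, Long)] = Array((Michael,stanford,2010), (Michael,berkeley,2012), (Andy,ucsb,2011), (Justin,berkeley,2014))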

Summary

In this blog post, we have walked through accessing top-level fields, arrays, and nested objects in JSON data. The key classes involved were DataFrame, Array, Row, and Seq. We used the select(), collect(), and explode() DataFrame methods, and the getString(), getLong(), and getSeq[T]() Row methods to read the data out into arrays of primitive types.