Overview
Spark DataFrames makes it easy to read from a variety of data formats, including JSON. In fact, it even automatically infers the JSON schema for you. Once the data is loaded, however, figuring out how to access individual fields is not so straightforward. This post will walk through reading top-level fields as well as JSON arrays and nested objects. The code provided is for Spark 1.4. Update: please see my updated post on an easier way to work with nested array of struct JSON data.
Load the JSON File
Let's begin by loading a JSON file, where each line is a JSON object:
{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}
The Scala code to read a JSON file:
>> val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame
Read a Top-Level Field
With the above command, all of the data is read into a DataFrame. In the following examples, I will show how to extract individual fields into arrays of primitive types. Let's start with the top-level "name" field:
>> val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
>> names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)
Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access a column inside each Row.
Flatten and Read a JSON Array
Update: please see my updated post on an easier way to work with nested array of struct JSON data.
Next, notice that each Person has an array of "cities". Let's flatten these arrays and read out all their elements.
Next, notice that each Person has an array of "cities". Let's flatten these arrays and read out all their elements.
>> val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame
>> val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]
>> allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)
The explode() method explodes, or flattens, the cities array into a new column named "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.
Read an Array of Nested JSON Objects, Unflattened
Finally, let's read out the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and year:
>> val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]
>> val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]
>> schoolsArr.foreach(schools => {
>> schools.map(row => print(row.getString(0), row.getLong(1)))
>> print("\n")
>> })
(stanford,2010)(berkeley,2012)
(ucsb,2011)
(berkeley,2014)
Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Now, each "schools" array is of type List[Row], so we read it out with the getSeq[Row]() method. Finally, we can read the information for each individual school, by calling getString() for the school name and getLong() for the school year. Phew!
Summary
In this blog post, we have walked through accessing top-level fields, arrays, and nested JSON objects from JSON data. The key classes involved were DataFrame, Array, Row, and List. We used the select(), collect(), and explode() DataFrame methods, and the getString(), getLong(), and get Seq[T]() Row methods to read data out into arrays of primitive types.
Nice Article. Very useful. Thanks
ReplyDeleteHow can one do this with the Java API?
ReplyDeleteYou can't use "explode" in Java
ReplyDeleteReally thanks to share valuable Spark tips, i request, please create subscribe by email button to get instant posts info.
ReplyDeleteI added the subscribe by email button. Thanks for reading.
DeleteLoved your article!
ReplyDeleteCool, thanks.
DeleteCan we use in Java API?
ReplyDeleteOddly, to make the `flattened` line work I had to change the List[String] from a List to a scala.collection.mutable.WrappedArray.
ReplyDeleteNow to figure out how to do an explode on structs like the schools instead... then generalize so the it can infer the types from the schema.
I think it may have changed to WrappedArray in Spark 1.5. My code was for 1.4.
Delete