Thursday, June 12, 2014

Storm vs. Spark Streaming: Side-by-side comparison

Overview

Both Storm and Spark Streaming are open-source frameworks for distributed stream processing. But there are important differences, as the following side-by-side comparison shows.

Processing Model, Latency

Although both frameworks provide scalability and fault tolerance, they differ fundamentally in their processing model. Whereas Storm processes incoming events one at a time, Spark Streaming batches up events that arrive within a short time window before processing them. Thus, Storm can achieve sub-second latency in processing an event, while Spark Streaming has a latency of several seconds.
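
To make the difference concrete, here is a toy Python sketch (not the API of either framework) contrasting the two models; for simplicity it batches by count, whereas Spark Streaming actually batches by time interval:

```python
# Toy sketch (plain Python, not either framework's API) of the two models.

def handle(event):
    # The per-event business logic, e.g. enrich or transform a record.
    return event.upper()

# Storm-style: process each event the moment it arrives.
def one_at_a_time(stream):
    for event in stream:
        yield handle(event)       # latency ~= processing time of one event

# Spark Streaming-style: buffer events into small batches first.
def micro_batches(stream, batch_size=2):
    batch = []
    for event in stream:
        batch.append(event)       # an event waits until its batch closes
        if len(batch) == batch_size:
            yield [handle(e) for e in batch]
            batch = []
    if batch:
        yield [handle(e) for e in batch]

events = ["click", "view", "purchase"]
assert list(one_at_a_time(events)) == ["CLICK", "VIEW", "PURCHASE"]
assert list(micro_batches(events)) == [["CLICK", "VIEW"], ["PURCHASE"]]
```

The results are the same; what differs is how long each event waits before being handled.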

Fault Tolerance, Data Guarantees

However, the tradeoff comes in the fault-tolerance guarantees on the data. Spark Streaming provides better support for fault-tolerant stateful computation. In Storm, each individual record has to be tracked as it moves through the system, so Storm only guarantees that each record will be processed at least once, and allows duplicates to appear during recovery from a fault. That means mutable state may be incorrectly updated twice.

Spark Streaming, on the other hand, need only track processing at the batch level, so it can efficiently guarantee that each mini-batch will be processed exactly once, even if a fault such as a node failure occurs. [Actually, Storm's Trident library also provides exactly once processing. But, it relies on transactions to update state, which is slower and often has to be implemented by the user.]
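
The difference in bookkeeping can be sketched in a few lines of Python (a hypothetical illustration, not either framework's actual recovery mechanism):

```python
# Hypothetical illustration of why record-level at-least-once replay can
# double-count, while batch-level tracking can skip a replayed batch.

counts = {}

def record_update(record):
    # At-least-once: a record replayed after a fault increments again.
    counts[record] = counts.get(record, 0) + 1

record_update("url_a")
record_update("url_a")        # replay of the same record after a fault
assert counts["url_a"] == 2   # mutable state incorrectly updated twice

applied_batches = set()
batch_counts = {}

def batch_update(batch_id, batch):
    # Exactly-once: track at the batch level and skip replayed batches.
    if batch_id in applied_batches:
        return
    applied_batches.add(batch_id)
    for record in batch:
        batch_counts[record] = batch_counts.get(record, 0) + 1

batch_update(1, ["url_a", "url_b"])
batch_update(1, ["url_a", "url_b"])   # replayed batch is recognized and skipped
assert batch_counts == {"url_a": 1, "url_b": 1}
```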

Storm vs. Spark Streaming comparison.

Summary

In short, Storm is a good choice if you need sub-second latency and no data loss. Spark Streaming is better if you need stateful computation, with the guarantee that each event is processed exactly once. Spark Streaming programming logic may also be easier because it is similar to batch programming, in that you are working with batches (albeit very small ones).

Implementation, Programming API

Implementation

Storm is primarily implemented in Clojure, while Spark Streaming is implemented in Scala. This is something to keep in mind if you want to look into the code to see how each system works or to make your own customizations. Storm was developed at BackType and Twitter; Spark Streaming was developed at UC Berkeley.

Programming API

Storm comes with a Java API, as well as support for other languages. Spark Streaming can be programmed in Scala as well as Java.

Batch Framework Integration

One nice feature of Spark Streaming is that it runs on Spark. Thus, you can use the same (or very similar) code that you write for batch processing and/or interactive queries in Spark, on Spark Streaming. This reduces the need to write separate code to process streaming data and historical data.

Storm vs. Spark Streaming: implementation and programming API.

Summary

Two advantages of Spark Streaming are that (1) it is not implemented in Clojure :) and (2) it is well integrated with the Spark batch computation framework.

Production, Support

Production Use

Storm has been around for several years and has run in production at Twitter since 2011, as well as at many other companies. Meanwhile, Spark Streaming is a newer project; its only production deployment (that I am aware of) has been at Sharethrough since 2013.

Hadoop Distribution, Support

Storm is the streaming solution in the Hortonworks Hadoop data platform, whereas Spark Streaming is in both MapR's distribution and Cloudera's Enterprise data platform. In addition, Databricks is a company that provides support for the Spark stack, including Spark Streaming.

Cluster Manager Integration

Although both systems can run on their own clusters, Storm also runs on Mesos, while Spark Streaming runs on both YARN and Mesos.

Storm vs. Spark Streaming: production and support.

Summary

Storm has run in production much longer than Spark Streaming. However, Spark Streaming has the advantages that (1) it has a company dedicated to supporting it (Databricks), and (2) it is compatible with YARN.


Further Reading

For an overview of Storm, see these slides.

For a good overview of Spark Streaming, see the slides to a Strata Conference talk. A more detailed description can be found in this research paper.

Update: A couple of readers have mentioned this other comparison of Storm and Spark Streaming from Hortonworks, written in defense of Storm's features and performance.

April, 2015: Closing off comments now, since I don't have time to answer questions or keep this doc up-to-date.

Friday, April 25, 2014

Bay Area Bike Share Data

Bay Area Bike Share recently made some of their bike trip data available for their open data challenge. Curious about how this new transportation option is used, I took a look at the data myself. The code behind my analysis is available on github.

Trip Length

This bike share system is designed for short trips between bike stations. Of the 144,000 trips in the data set, the median trip length was only nine minutes. And, about 96% of trips were less than one hour long. This makes sense, since a late fee is charged if a bike is not returned to a station within half an hour. Of course, one could go on longer rides without incurring a late fee by hopping between stations, as long as each hop is shorter than half an hour; however, the data set does not provide enough information to infer whether a trip was part of a multi-leg ride.

The median trip length was nine minutes.

Late Fees

About 4% of the trips (5696) were longer than one hour and, thus, incurred a late fee. In fact, the average length of these long trips was about 4.5 hours. Since a late charge of $7 applies to every half hour after one hour, this amounts to $49 in late fees, for a total of about $280k in late fees collected overall. Now that's a great revenue stream! :)
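
The back-of-the-envelope arithmetic above can be checked directly:

```python
# Reproducing the late-fee arithmetic from the text.
late_trips = 5696
avg_trip_hours = 4.5
fee_per_half_hour = 7  # dollars, per half hour after the first hour

half_hours_late = int((avg_trip_hours - 1) / 0.5)   # 7 half-hour increments
fee_per_trip = half_hours_late * fee_per_half_hour  # $49 for the average long trip
total_fees = late_trips * fee_per_trip              # about $280k overall

assert fee_per_trip == 49
assert round(total_fees / 1000) == 279  # roughly $280k
```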

Why are these bikes returned late? Did the rider not know about the late fee? Did they get lost? I do not know, but the vast majority of these trips, 94% (5379), were taken by short-term customers (those with 1- or 3-day memberships), as opposed to subscribers (who hold annual memberships), even though short-term customers account for only 21% of all trips.

Popular Stations

The top three stations at which trips started were all in downtown San Francisco:
  1. San Francisco Caltrain (Townsend at 4th)     :  9838
  2. Harry Bridges Plaza (Ferry Building)         :  7343
  3. Embarcadero at Sansome                       :  6545
However, if broken down by membership type, the most popular stations were:
  • Among short-term customers (1- or 3-day memberships): Embarcadero at Sansome
  • Among subscribers (annual membership): San Francisco Caltrain (Townsend at 4th)

Day of Week

The weekly patterns also vary by membership type. Short-term customers tend to take more trips over the weekend, while subscribers tend to ride on weekdays:

Customers (with 1- or 3-day memberships) preferred weekends.

Subscribers (with annual memberships) preferred weekdays.

Additional Data

The above analysis was only on trip data. The data set also contained weather data, bike station information, and station rebalancing data. If you are interested in taking a look, check out the data from Bay Area Bike Share and the R code used in my analysis. 

Tuesday, January 14, 2014

Scaling Realtime Analytics on Big Data

Scaling Realtime Analytics

How do you scale realtime analytics for big data? Consider the case where the big data consists of both incoming streaming data and historical data, and you are providing analytics in realtime over the entire data set. In other words, the data is big and growing quickly, while the desired latency is small. To implement this at scale, you can distribute the processing over a cluster of machines. Of course, distributed stream processing comes with some challenges: implementing incremental algorithms, ensuring data consistency, and making the system robust by recovering quickly from node failures and dealing with dropped or duplicate data.

Lambda Architecture

One solution to this problem is the Lambda architecture, as described by Nathan Marz (formerly of BackType, Twitter). This architecture splits up the analytics calculation into a batch layer and a speed (streaming) layer. In the batch layer, as much of the calculation as possible is precomputed with batch processing over historical data. As the dataset grows, the calculation is constantly recomputed. The speed layer applies stream processing on only the most recent data. In this way, most of the processing takes place in batch jobs, in a platform such as Hadoop, while the complexity of stream processing is minimized to affect only the latest data. Errors or approximations that arise during stream processing are corrected when the data is later reprocessed in the batch layer. A serving layer combines the intermediate results from the batch and speed layers to produce the final analytics.

From Nathan Marz's presentation "Runaway Complexity in Big Data"

The robustness of this approach relies on two things: (1) immutable data and (2) continuous recomputation. That is, a master dataset of all the raw data is kept around indefinitely, and, as this dataset grows, calculations are constantly recomputed over the entire data set. Any failures, bugs, approximations, or missing features in the data processing are easily corrected because you are always recomputing everything from scratch. This architecture aligns well with the fundamental assumptions of big data: horizontally scalable data storage and computation. You can read more about the motivation behind this architecture in Nathan Marz's blog post. It is also described in detail in the book Big Data.

Lambda Architecture in Practice

In practice, you could run the batch layer on Hadoop, the speed (stream processing) layer on Storm, and store the results in a scalable database such as Cassandra. These intermediate results are read and merged by the serving layer.
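
A minimal sketch of what the serving layer's merge might look like, with made-up page-view counts (the batch view covers everything up to the last recomputation; the speed view covers only events since then):

```python
# Hypothetical serving-layer merge: combine precomputed batch-layer counts
# with the speed layer's counts over the most recent data.
batch_view = {"/home": 10000, "/about": 2500}  # from Hadoop, recomputed periodically
speed_view = {"/home": 42, "/pricing": 7}      # from Storm, recent events only

def merge_views(batch, speed):
    merged = dict(batch)
    for key, count in speed.items():
        merged[key] = merged.get(key, 0) + count
    return merged

result = merge_views(batch_view, speed_view)
assert result == {"/home": 10042, "/about": 2500, "/pricing": 7}
```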

Avoiding Code Duplication

Many applications will use the same processing logic in both the batch and speed layers. For example, to count the number of page views of a URL, in the batch layer you might implement a Hadoop Map function that adds one for each new page view, and in the speed layer write a Storm bolt that does the same thing. In a real world application, this could lead to implementing the same business logic twice: once for the batch layer and again for the speed layer. To avoid duplicate code, there are a couple of options: (1) adopting the Summingbird framework or (2) running Spark and Spark Streaming.
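
The idea both options share — write the business logic once and apply it in both layers — can be sketched in plain Python (the function and data here are made up):

```python
# Sketch of factoring the page-view logic into a single function that both
# the batch layer and the speed layer can call.
def count_page_view(counts, url):
    # The single piece of business logic: add one view for a URL.
    counts[url] = counts.get(url, 0) + 1
    return counts

# Batch layer: fold the logic over historical data.
batch_counts = {}
for url in ["/home", "/home", "/about"]:
    count_page_view(batch_counts, url)

# Speed layer: apply the very same function to each incoming event.
speed_counts = {}
count_page_view(speed_counts, "/home")

assert batch_counts == {"/home": 2, "/about": 1}
assert speed_counts == {"/home": 1}
```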

Summingbird

Summingbird (a Twitter open source project) is a library that lets you write MapReduce programs in a Scala DSL, which can then be executed on different batch and stream processing platforms, such as Scalding (Hadoop) or Storm. It also provides a framework for the Lambda architecture. All you have to do is write your processing logic as a MapReduce program. Then you configure Summingbird, in its hybrid batch/realtime mode, to execute this program on Scalding, for the batch layer, and Storm, for the speed layer, and then to merge the results into the final analytics.

Spark and Spark Streaming


Alternatively, instead of Hadoop/Storm, you could run the batch layer on Spark and the streaming layer on Spark Streaming (both open source projects from UC Berkeley). Because both platforms share the same Scala API, you would be able to reuse the same code for both the batch and speed layers. Unlike with Summingbird, however, you would need to implement the code to merge the intermediate results yourself.

Saturday, July 6, 2013

What's New at Hadoop Summit 2013?

YARN

YARN, the new cluster resource manager in Hadoop 2.0, was a major theme at last week's Hadoop Summit. Although the project itself is not new (in fact, it has been in development for several years), what's new is its growing adoption by the Hadoop community. YARN (Yet Another Resource Negotiator) plays a central role in Hadoop 2.0: it is the cluster resource manager that allows you to run multiple computing frameworks, such as Storm or Spark, in addition to MapReduce, all on the same Hadoop cluster.

There was evidence of community adoption of YARN throughout Hadoop Summit: (1) a keynote by Yahoo! describing their production analytics stack built on YARN (video), (2) talks about the Stinger Initiative to speed up Hive by 100x, which relies on YARN (through the new Tez framework), and (3) the announcement of YARN's release in HDP 2.0, Hortonworks' latest distribution of Hadoop.

YARN at Yahoo!

According to the Yahoo! keynote, YARN has been undergoing some serious load testing in production Yahoo! systems for personalization and ad targeting. Their YARN clusters run Storm, Spark, and HBase, in addition to MapReduce. This includes a 320-node Storm/YARN cluster that does stream processing, and an overall total of 400k YARN jobs per day. This Strata blog post contains more details about the talk.

Tez

Tez is a new compute framework that runs on YARN. Tez improves upon MapReduce by supporting the execution of a complex DAG of tasks, beyond the simple map-reduce pattern of MapReduce. Tez is thus more suitable for expressing SQL queries, and will be leveraged to speed up Hive jobs.

Hadoop on Raspberry Pi

Hadoop Summit has traditionally been the place to brag about who has the biggest clusters; however, this LinkedIn demo goes to the opposite extreme :) 

Friday, July 5, 2013

Holding a Hackathon at Your Company

Are you interested in holding a hackathon at your company? An internal hackathon for employees can be a lot of fun. Cathy Polinsky (Salesforce.com) gave a nice introduction to the subject at last year's Grace Hopper Conference, with advice about how to run your own hackathon. Another way to get started is to hold a "ShipIt Day", modeled after Atlassian's (the software tools company) internal hackathons. This ShipIt Day FAQ provides a suggested timeline on planning and running a hackathon. For more ideas, take a look at what happens at LinkedIn Hackdays.

Sunday, May 12, 2013

Real-time Big Data Analytics with Solr

You have heard of Solr as a search engine, but did you know it can be used for real-time analytics as well? I first encountered this idea at a tutorial on Solr at Strata Conference. The ideal use case is when you want to provide real-time data ingestion and analytics on streaming data, with an all-in-one solution that is horizontally scalable and fault-tolerant. And, because of Lucene, which is built into Solr, you also get text-search based queries for free.

Example: Stock Tick

Stock tick data is an example of a fast data stream: every second stock price updates come in as different stocks are traded on an exchange. Suppose you wanted to build a dashboard that displays real-time updates of stock prices as well as other statistics, such as the moving average. You also want to support ad-hoc queries that analyze the historical performance of various stocks. Your denormalized data rows may look something like:

symbol:AAPL, time:2013-05-10 08:15:23, price:415.92, volume:900, ...
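
As an illustration of one of the dashboard statistics, a simple moving average over incoming prices might look like this (plain Python, not Solr code; the prices are made up):

```python
from collections import deque

# Toy moving average over a stock tick stream: keep only the last `window`
# prices and average them as each new tick arrives.
def moving_average(prices, window=3):
    buf = deque(maxlen=window)  # old prices fall off automatically
    averages = []
    for price in prices:
        buf.append(price)
        averages.append(round(sum(buf) / len(buf), 2))
    return averages

ticks = [415.92, 416.10, 415.80, 416.50]
assert moving_average(ticks, window=3) == [415.92, 416.01, 415.94, 416.13]
```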

Real Time

Real-time ingestion is possible because as soon as a new data row comes in, it is immediately added and indexed in Solr. Internally, Lucene handles high-volume inserts to its indexes with a log-structured merge-tree, in which new, small index segments get merged as an index grows. To further support high insert rates, a Solr index can be split into shards across multiple nodes of a cluster. Real-time queries are answered with fast lookups on the index. Sharding also helps speed up large queries by spreading the query processing across multiple nodes.
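
The core of the log-structured merge idea — small sorted segments that get merged as the index grows — can be illustrated with a toy example (the segment contents are made up; real Lucene segments are on-disk inverted-index files, not string lists):

```python
import heapq

# New writes land in small sorted segments; merging two sorted segments
# into a larger one is a linear-time streaming merge.
segment1 = ["aapl:415.92", "goog:871.20"]   # older small segment (sorted)
segment2 = ["aapl:416.10", "msft:33.50"]    # newer small segment (sorted)

merged = list(heapq.merge(segment1, segment2))
assert merged == ["aapl:415.92", "aapl:416.10", "goog:871.20", "msft:33.50"]
```

Because each merge only streams through already-sorted runs, inserts stay cheap even as the index grows.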

Analytics

Solr supports many of the same queries as SQL: aggregation, boolean filters and range queries, grouping, and sorting (or, in SQL, COUNT/SUM/AVG, WHERE, GROUP BY, and ORDER BY). As a tradeoff of horizontal scalability, there are no joins; you would need to denormalize the data to reframe those queries as aggregations. Because of its search roots, Solr also supports text search in queries (for text fields), custom scoring (ranking) of query results, and facets.
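
For example, the SQL-style operations above map onto standard Solr query parameters (q, fq, facet, facet.field, sort, and rows are real Solr parameters; the host and collection name below are placeholders):

```python
from urllib.parse import urlencode

# Building a Solr query that filters, groups, and sorts the stock tick data.
params = urlencode({
    "q": "*:*",
    "fq": "symbol:AAPL AND time:[2013-05-10T00:00:00Z TO *]",  # WHERE + range query
    "facet": "true",
    "facet.field": "symbol",   # roughly GROUP BY symbol, with counts
    "sort": "time desc",       # ORDER BY time DESC
    "rows": "10",
})
query_url = "http://localhost:8983/solr/ticks/select?" + params
assert "facet.field=symbol" in query_url
assert "rows=10" in query_url
```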

Scalable, Fault-tolerant

With SolrCloud, the support for distributed indexes in Solr, it is easy to grow a cluster horizontally. SolrCloud takes care of load balancing, replication, and automatic fail-over of shards. Internally, Zookeeper is used for distributed coordination.

Gotchas

One drawback of using an indexing system involves schema changes. Adding new fields is easy, but changing the datatype or (text) analyzer of an existing field requires a full reindex of the raw data, which can be time consuming. Another drawback is the storage of large text blocks. While text fields are well supported, Solr does not efficiently store large blocks of text, such as the full text of a news article or web page. In those cases, a separate NoSQL data store such as HBase or Cassandra may be brought in to provide that storage.

Related Systems

I only discussed Solr above, but there are a couple of other open-source, Lucene-based search engines that have similar analytics capabilities. Elasticsearch was built specifically with a focus on ease of distributed deployment, and has a JSON-based API. SenseiDB features a SQL-like query language and Hadoop integration.

More Info

Slides from Strata tutorial.



Wednesday, March 6, 2013

Strata Conference 2013 Wrap-up

Introduction

Strata has been one of the best conferences for data science, and this year's conference did not disappoint. It brought together developers, data scientists, startups, and business people who are interested in "making data work". It was divided into seven tracks, including design and "Hadoop in Practice". Spending most of my time in the "Beyond Hadoop" and "Data Science" tracks, I noticed one of the themes this year was real-time data processing.

Tutorial: Search and Real Time Analytics (slides)

This was a really good tutorial presented by Ryan Tabora (Think Big Analytics) and Jason Rutherglen (Datastax). I learned that in addition to search, Solr has support for real time analytics: the equivalents of sort and group by queries in SQL (you can't do joins, however). An example application would be ad-hoc queries on streaming stock tick data. The second half of the tutorial was an in-depth look at Lucene and some use cases (O'Reilly book coming soon). Rutherglen also talked about the DataStax Enterprise platform which integrates Solr with Cassandra for scalability: Cassandra is the NoSQL data store for the raw data, and each Cassandra row maps into a Solr document.

Tutorial: Core Data Science Skills (code & slides)

This was an interesting tutorial that introduced the basic methods and tools of supervised machine learning. It was led by William Cukierski and Ben Hamner, both from Kaggle. They talked about decision trees, random forests, and naive Bayes classifiers as the basic algorithms, and demoed analyses in R with RStudio and in Python with IPython Notebook. The coolest part was the last hour, when all attendees practiced these skills by participating in a real Kaggle competition.

Keynotes

All of the keynote speakers were really good. I'll just highlight:

  • Human Fault-tolerance (slides & video): Nathan Marz (Twitter) talked about the importance of immutability in distributed system design. I'm hoping to read more about it in his book on Big Data.
  • Hidden Biases of Big Data (video): Kate Crawford (Microsoft Research) warned us that big data often does not tell the whole story, that context and small data are also needed.

Sketching Techniques for Real-time Big Data (slides)

Bahman Bahmani (Stanford) explained that sketches of data are useful in streaming computation because they take up little memory, and allow for fast updates and queries. One example of a sketching data structure is a bloom filter. Bahmani described sketches for fast approximate counting as well as on-the-fly PageRank computation.
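
As a concrete example of a sketching data structure, here is a minimal Bloom filter in Python (illustrative only; the sizing parameters are arbitrary rather than tuned for a target false-positive rate):

```python
import hashlib

# Minimal Bloom filter: constant memory, fast inserts and membership
# queries, a small false-positive rate, and no false negatives.
class BloomFilter:
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, item):
        # Derive num_hashes bit positions from salted hashes of the item.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        # True means "probably present"; False means "definitely absent".
        return all(self.bits & (1 << pos) for pos in self._positions(item))

bf = BloomFilter()
bf.add("user_42")
assert bf.might_contain("user_42")  # an added item is always found
# A never-added item is almost certainly rejected, but a false positive
# is possible, which is the price of the constant memory footprint.
```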

Sight, a Short Film (video)

This was an amazing short film depicting a futuristic augmented reality. It envisions a future that brings together many of the ideas from the conference: mobile, connected world, recommendations, gamification, and ubiquitous Internet of things.

Real Time Systems

There were a number of talks about ingesting and processing big data in real time. I'll cover them in a future post.