Tuesday, January 14, 2014

Scaling Realtime Analytics on Big Data

Scaling Realtime Analytics

How do you scale realtime analytics for big data? Consider the case where big data consists of both incoming streaming data, as well as historical data, and you are providing analytics in realtime over the entire data set. In other words, the data is big, and growing quickly, while the desired latency is small. To implement this at scale you can distribute the processing over a cluster of machines. Of course, distributed stream processing comes with some challenges: implementing incremental algorithms, ensuring data consistency, and making the system robust by recovering quickly from node failures and dealing with dropped or duplicate data.

Lambda Architecture

One solution to this problem is the Lambda architecture, as described by Nathan Marz (formerly of BackType, Twitter). This architecture splits up the analytics calculation into a batch layer and a speed (streaming) layer. In the batch layer, as much of the calculation as possible is precomputed with batch processing over historical data. As the dataset grows, the calculation is constantly recomputed. The speed layer applies stream processing on only the most recent data. In this way, most of the processing takes place in batch jobs, in a platform such as Hadoop, while the complexity of stream processing is minimized to affect only the latest data. Errors or approximations that arise during stream processing are corrected when the data is later reprocessed in the batch layer. A serving layer combines the intermediate results from the batch and speed layers to produce the final analytics.

From Nathan Marz's presentation "Runaway Complexity in Big Data"

The robustness of this approach relies on two things: (1) immutable data and (2) continuous recomputation. That is, a master dataset of all the raw data is kept around indefinitely, and, as this dataset grows, calculations are constantly recomputed over the entire data set. Any failures, bugs, approximations, or missing features in the data processing are easily corrected because you are always recomputing everything from scratch. This architecture aligns well with the fundamental assumptions of big data: horizontally scalable data storage and computation. You can read more about the motivation behind this architecture in Nathan Marz's blog post. It is also described in detail in the book Big Data.

Lambda Architecture in Practice

In practice, you could run the batch layer on Hadoop, the speed (stream processing) layer on Storm, and store the results in a scalable database such as Cassandra. These intermediate results are read and merged by the serving layer.

Avoiding Code Duplication

Many applications will use the same processing logic in both the batch and speed layers. For example, to count the number of page views of a URL, in the batch layer you might implement a Hadoop Map function that adds one for each new page view, and in the speed layer write a Storm bolt that does the same thing. In a real world application, this could lead to implementing the same business logic twice: once for the batch layer and again for the speed layer. To avoid duplicate code, there are a couple of options: (1) adopting the Summingbird framework or (2) running Spark and Spark Streaming.

Summingbird

Summingbird (a Twitter open source project) is a library that lets you write MapReduce programs in a Scala DSL, which can then be executed on different batch and stream processing platforms, such as Scalding (Hadoop) or Storm. It also provides a framework for the Lambda architecture. All you have to do is write your processing logic as a MapReduce program. Then you configure Summingbird, in its hybrid batch/realtime mode, to execute this program on Scalding, for the batch layer, and Storm, for the speed layer, and then to merge the results into a final analytic.

Spark and Spark Streaming


Alternatively, instead of Hadoop/Storm, you could run the batch layer on Spark and the streaming layer on Spark Streaming (both open source projects from UC Berkeley). Because both platforms share the same Scala API, you would be able to reuse the same code for both the batch and speed layers. Unlike with Summingbird, however, you would need to implement the code to merge the intermediate results yourself.

Additional Info


Saturday, July 6, 2013

What's New at Hadoop Summit 2013?

YARN

YARN, the new cluster resource manager in Hadoop 2.0, was a major theme at last week's Hadoop Summit. Although the project itself is not new (in fact, it has been in development for several years), what's new is its growing adoption by the Hadoop community. YARN (Yet Another Resource Negotiator) plays a central role in Hadoop 2.0: it is the cluster resource manager that allows you to run multiple computing frameworks, such as Storm or Spark, in addition to MapReduce, all on the same Hadoop cluster.

There was evidence of community adoption of YARN throughout Hadoop Summit: (1) a keynote by Yahoo! describing their production analytics stack built on YARN (video), (2) talks about the Stinger Initiative to speed up Hive by 100x, which relies on YARN (through the new Tez framework), and (3) the announcement of YARN's release in HDP 2.0, Hortonworks' latest distribution of Hadoop.

YARN at Yahoo!

According to the Yahoo! keynote, YARN has been undergoing some serious load testing in production Yahoo! systems for personalization and ad targeting. Their YARN clusters run Storm, Spark, and HBase, in addition to MapReduce. This includes a 320-node Storm/YARN cluster that does stream processing, and an overall total of 400k YARN jobs per day. This Strata blogpost contains more details about the talk.

Tez

Tez is a new compute framework that runs on YARN. Tez improves upon MapReduce by supporting the execution of a complex DAG of tasks, beyond the simple map-reduce pattern of MapReduce. Tez is thus more suitable for expressing SQL queries, and will be leveraged to speed up Hive jobs.

Hadoop on Rasberry Pi

Hadoop Summit has traditionally been the place to brag about who has the biggest clusters; however, this LinkedIn demo goes to the opposite extreme :) 

Friday, July 5, 2013

Holding a Hackathon at Your Company

Are you interested in holding a hackathon at your company? An internal hackathon for employees can be a lot of fun. Cathy Polinsky (Salesforce.com) gave a nice introduction to the subject at last year's Grace Hopper Conference, with advice about how to run your own hackathon. Another way to get started is to hold a "ShipIt Day", modeled after Atlassian's (the software tools company) internal hackathons. This ShipIt Day FAQ provides a suggested timeline on planning and running a hackathon. For more ideas, take a look at what happens at LinkedIn Hackdays.

Sunday, May 12, 2013

Real-time Big Data Analytics with Solr

You have heard of Solr as a search engine, but did you know it can be used for real-time analytics as well? I first encountered this idea at a tutorial on Solr at Strata Conference. The ideal use case is when you want to provide real-time data ingestion and analytics on streaming data, with an all-in-one solution that is horizontally scalable and fault-tolerant. And, because of Lucene, which is built into Solr, you also get text-search based queries for free.

Example: Stock Tick

Stock tick data is an example of a fast data stream: every second stock price updates come in as different stocks are traded on an exchange. Suppose you wanted to build a dashboard that displays real-time updates of stock prices as well as other statistics, such as the moving average. You also want to support ad-hoc queries that analyze the historical performance of various stocks. Your denormalized data rows may look something like:

symbol:AAPL, time:2013-05-10 08:15:23, price:415.92, volume:900, ...

Real Time

Real-time ingestion is possible because as soon as a new data row comes in, it is immediately added and indexed in Solr. Internally, Lucene handles high-volume inserts to its indexes with a log-structured merge-tree, in which new, small index segments get merged as an index grows. To further support high insert rates, a Solr index can be split into shards across multiple nodes of a cluster. Real-time queries are answered with fast lookups on the index. Sharding also helps speed up large queries by spreading the query processing across multiple nodes.

Analytics

Solr supports many of the same queries as SQL: aggregation, boolean filters and range queries, grouping, and sorting (or, in SQL, COUNT/SUM/AVG, WHERE, GROUP BY, and ORDER BY). As a tradeoff of horizontal scalability, there are no Joins; you would need to denormalize the data to reframe those queries as aggregations. Because of its search roots, Solr also supports text search in queries (for text fields), custom scoring (ranking) of query results, and facets.

Scalable, Fault-tolerant

With SolrCloud, the support for distributed indexes in Solr, it is easy to grow a cluster horizontally. SolrCloud takes care of load balancing, replication, and automatic fail-over of shards. Internally, Zookeeper is used for distributed coordination.

Gotchas

One of the drawbacks of using an indexing system is with schema changes. Adding new fields is easy, but changing the datatype or (text) analyzer of an existing field requires a full reindex of the raw data, which can be time consuming. Another drawback is with storage of large text blocks. While text fields are well supported, Solr does not efficiently store large blocks of text, such as the full text of a news article or web page. In these cases, a separate NoSQL data store such as HBase/Cassandra may be brought in to provide that storage.

Related Systems

I only discussed Solr above, but there are a couple of other open-source, Lucene-based search engines that have similar analytics capabilities. Elasticsearch was built specifically with a focus on ease of distributed deployment, and has a JSON-based API. SenseiDB features a SQL-like query language and Hadoop integration.

More Info

Slides from Strata tutorial.



Wednesday, March 6, 2013

Strata Conference 2013 Wrap-up

Introduction

Strata has been one of the best conferences for data science, and this year's conference did not disappoint. It brought together developers, data scientists, startups, and business people who are interested in "making data work". It was divided into seven tracks, including design and "Hadoop in Practice". Spending most of my time in the "Beyond Hadoop" and "Data Science" tracks, I noticed one of the themes this year was real-time data processing.

Tutorial: Search and Real Time Analytics (slides)

This was a really good tutorial presented by Ryan Tabora (Think Big Analytics) and Jason Rutherglen (Datastax). I learned that in addition to search, Solr has support for real time analytics: the equivalents of sort and group by queries in SQL (you can't do joins, however). An example application would be ad-hoc queries on streaming stock tick data. The second half of the tutorial was an in-depth look at Lucene and some use cases (O'Reilly book coming soon). Rutherglen also talked about the DataStax Enterprise platform which integrates Solr with Cassandra for scalability: Cassandra is the NoSQL data store for the raw data, and each Cassandra row maps into a Solr document.

Tutorial: Core Data Science Skills (code & slides)

This was an interesting tutorial that introduced the basic methods and tools of supervised machine learning. It was led by William Cukierski and Ben Hamner, both from Kaggle. They talked about decision trees, random forests, and naive Bayes classifiers as the basic algorithms. And, they demo'ed  analyses in R with R-Studio, and Python with IPython Notebook. The coolest part was the last hour, when all attendees practiced these skills by participating in a real Kaggle competition.

Keynotes

All of the keynote speakers were really good. I'll just highlight:

  • Human Fault-tolerance (slides & video): Nathan Marz (Twitter) talked about the importance of immutability in distributed system design. I'm hoping to read more about it in his book on Big Data.
  • Hidden Biases of Big Data (video): Kate Crawford (Microsoft Research) warned us that big data often does not tell the whole story, that context and small data are also needed.

Sketching Techniques for Real-time Big Data (slides)

Bahman Bahmani (Stanford) explained that sketches of data are useful in streaming computation because they take up little memory, and allow for fast updates and queries. One example of a sketching data structure is a bloom filter. Bahmani described sketches for fast approximate counting as well as on-the-fly PageRank computation.

Sight, a Short Film (video)

This was an amazing short film depicting a futuristic augmented reality. It envisions a future that brings together many of the ideas from the conference: mobile, connected world, recommendations, gamification, and ubiquitous Internet of things.

Real Time Systems

There were a number of talks about ingesting and processing big data in real time. I'll cover them in a future post.

Monday, December 10, 2012

Scalable Graph Processing: Comparing Giraph, Titan, Faunus

A couple of open-source, scalable graph processing frameworks were released this year: Giraph, Titan, and Faunus. All of them support distributed processing of graphs across a cluster of multiple machines. What then is the difference between them? Actually, there are huge differences related to the types of queries they support, as well as how graphs are stored and their APIs.

Titan, for concurrent transaction processing (OLTP) 

Titan is optimized for handling thousands of concurrent transactions against massive, dynamic, complex graphs. As such, it really fills a niche in the distributed graph landscape. Titan works best with multiple short, local queries, that start at one vertex and query the local neighborhood around it, as opposed to global queries against an entire graph. An example use case would be a graph representation of a social network like Twitter, where a node represented either a user or a tweet, and edges linked a user to the users and tweets that they followed. In this case, the concurrent transactions would include reads, such as a user reading their tweet stream, as well as writes: any new tweet or user, or a new "follow" edge. In fact, this experiment by Aurelius shows how Titan achieves up to ten thousand transactions per second on a six-node EC2 cluster.

Titan's modular design: Cassandra/HBase and Gremlin

One big plus for Titan is instead of reinventing the wheel, it builds upon existing open-source projects. It cleanly abstracts out the distributed data store layer and thus can leverage either Cassandra or HBase for the underlying storage backend. On the application side, Titan implements the existing Tinkerpop Blueprints graph API, so that any existing libraries built to Blueprints will run on Titan. In particular, this means that one can quickly get started programming Titan through Gremlin, a popular graph DSL built on Groovy.



Giraph, for batch processing (OLAP)

Giraph, on the other hand, is optimized for batch processing of massive graphs. It was designed to speed up global graph analyses, especially iterative algorithms - like PageRank - that start from a set of vertices and take a series of adjacent hops out from those starting points, potentially touching all vertices in the graph. An example use case would be a graph representation of a social network, where an analysis might involve ranking all the people in the network based on their popularity with a link-based algorithm like PageRank.

Giraph keeps the entire graph in memory; Java API

One of Giraph's main selling points is that it loads an entire graph into a cluster's memory, that is, it splits up a graph into partitions and spreads those partitions across the memory of nodes in a cluster, and keeps them there throughout the duration of an analysis job. Therefore, iterative algorithms that constantly reference graph data run faster by not having to access them from disk. Of course, the tradeoff is that the size of the graph that can be processed by Giraph is limited by the total memory in a cluster. Giraph applications are programmed through a Java API. My previous blog post covers more details about Giraph.

Faunus, for batch processing (OLAP)

Faunus, a companion project to Titan, is more comparable to Giraph, in that it was also designed for batch queries. What Faunus does best is batch analytics on massive graphs. One example would be calculating the vertex degree distribution of a graph, which would involve scanning through all its vertices to get their degree and then counting up the degree distribution.

Faunus runs Hadoop MapReduce; Interactive Gremlin shell

Batch analytics in Faunus leverages the parallelism of Hadoop MapReduce. Like Titan, Faunus can be programmed through the interactive Gremlin shell, but, unlike Titan, Faunus converts each Gremlin query into a set of MapReduce jobs. The graph data is stored on disk, distributed across a cluster on HDFS, the Hadoop distributed file system, in a compressed, binary format called SequenceFile format, allowing support for massive graphs. With the MapReduce conversion and interactive shell, Faunus is a lot like Pig, except that it is programmed in Gremlin, a graph-specific DSL.

Saturday, November 10, 2012

Mervin Kelly on Innovation


Long before Jack Dorsey and Steve Jobs reflected on the keys to innovation, Mervin Kelly, one of the early architects of Bell Labs, weighed in on the subject. I learned this from Jon Gertner's "The Idea Factory", a fascinating account of Bell Labs' age of innovation. In particular, Chapter 9, "The Formula" describes a speech that Kelly gave to the Royal Society in London in 1950, shortly after the invention of the transistor at Bell Labs, which highlighted his thoughts on innovation. At this time Bell Labs was widely considered to be the most innovative industrial research lab in the world. After years as a physicist and then director of research, Kelly was by then an executive vice president at Bell Labs.

Interestingly, the formula for innovation does not seem to have changed much over the years. Kelly considered these three ingredients essential to innovation:

Bring together a multi-disciplinary team of smart people. Kelly encouraged the constant exchange of ideas between researchers, engineers, and technicians. These exchanges could be formal or informal; they could take place in the hallway, over lunch, or within assigned multi-disciplinary project teams. And, ideally, these exchanges were in person; the Murray Hill building was even designed with this in mind, featuring long hallways that increased the chance of serendipitous encounters.

Give them real-world, challenging problems to solve. At Bell Labs, there was no shortage of problems related to supporting AT&T's communications business: from inventing ways to save money - by replacing fragile vacuum tubes with transistors, for example, to coming up with better products and services. In addition, during WWII, Bell Labs applied its expertise in technologies, such as radar, to military applications.

Give them the best technological tools and a stable source of funding. Last but not least, innovation at Bell Labs relied on a stable source of funding coming from the phone companies that it supported.

Further reading. Jon Gertner's New York Times' op-ed provides more details from his book. The Wikipedia entry for Bell Labs lists its long history of innovation.