In the first article of this series we discussed the history and background of the Big Data movement. We tentatively divided the Big Data techniques into two categories, Big Data Processing and Big Data Persistence. This article introduces you to the Big Data processing techniques that address, but are not limited to, various BI (business intelligence) requirements, such as reporting, batch analytics, online analytical processing (OLAP), data mining, text mining, complex event processing (CEP), and predictive analytics.
The source data is always read-only from the perspective of a data processing/analytics system. That brings you one important benefit: fault tolerance. With appropriate security settings, data can’t be accidentally tampered with or deleted by program errors, or even by human mistakes. Often the data to be processed is time series facts (termed events in BI). Time series facts, which reflect the history of data changes, are more reliable and valuable than the current state alone. For instance, a social networking website keeps the history of adding/removing friend events for all the users. This raw data carries more information than the current friend list of any user. In fact, the current state (friend list) can be derived by aggregating all these events (adding/removing friends). You may argue that executing the aggregate function to derive a friend list is slow and costly. An option is to create a snapshot of the friend list at certain time intervals and save it as a materialized view, which can be merged with the new events created after the snapshot to generate the up-to-date friend list.
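The friend-list idea can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `FriendEvent` record, its field names, and the integer timestamps are hypothetical and not taken from any particular system. The sketch derives the current friend list by folding all events, and then shows that a snapshot merged with the later events gives the same result.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FriendEvent:
    """Hypothetical immutable fact: one add/remove friend event."""
    timestamp: int  # logical time of the event (assumed unique per user)
    user: str
    friend: str
    action: str     # "add" or "remove"


def apply_events(friends, events):
    """Fold events, in timestamp order, into a copy of the friend set."""
    result = set(friends)
    for event in sorted(events, key=lambda e: e.timestamp):
        if event.action == "add":
            result.add(event.friend)
        elif event.action == "remove":
            result.discard(event.friend)
    return result


def current_friends(user, events, snapshot=None, snapshot_time=0):
    """Derive a user's friend list from a snapshot plus the events after it.

    Without a snapshot, every event for the user is aggregated.
    """
    base = snapshot if snapshot is not None else set()
    newer = [e for e in events
             if e.user == user and e.timestamp > snapshot_time]
    return apply_events(base, newer)


events = [
    FriendEvent(1, "alice", "bob", "add"),
    FriendEvent(2, "alice", "carol", "add"),
    FriendEvent(3, "alice", "bob", "remove"),
    FriendEvent(4, "alice", "dave", "add"),
]

# Full aggregation over the complete event history.
full = current_friends("alice", events)

# Snapshot (materialized view) taken after timestamp 2 ...
snapshot_at_2 = current_friends(
    "alice", [e for e in events if e.timestamp <= 2])

# ... merged with only the events created after the snapshot.
merged = current_friends(
    "alice", events, snapshot=snapshot_at_2, snapshot_time=2)
```

Both `full` and `merged` end up as the same set, while the merged variant only has to read the events that arrived after the snapshot was taken.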
Big Data processing techniques analyze big data sets at terabyte or even petabyte scale. Offline batch data processing is typically full power and full scale, tackling arbitrary BI use cases, while real-time stream processing is performed on the most current slice of data for tasks such as data profiling to pick outliers, fraudulent transaction detection, and security monitoring. The toughest task, however, is to do fast (low latency) or real-time ad-hoc analytics on a complete big data set. In practice, this means you need to scan terabytes (or even more) of data within seconds, which is only possible when data is processed with high parallelism.
Batch Processing of Big Data
Apache Hadoop is a distributed computing framework modeled after Google MapReduce to process large amounts of data in parallel. When speaking about distributed computing, the first thing that sometimes comes to my mind is EJB. EJB is in effect a component model with remoting capability, but it lacks the critical features of a distributed computing framework, which include computational parallelization, work distribution, and tolerance of unreliable hardware and software. Hadoop, on the other hand, has these merits built in. ZooKeeper, modeled on Google Chubby, is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and group services for the Hadoop cluster. Hadoop Distributed File System (HDFS), modeled on Google GFS, is the underlying file system of a Hadoop cluster. HDFS works more efficiently with a few large data files than with numerous small files. A real-world Hadoop job typically takes minutes to hours to complete; therefore Hadoop is not for real-time analytics, but rather for offline, batch data processing. Recently, Hadoop has undergone a complete overhaul for improved maintainability and manageability. YARN (Yet Another Resource Negotiator) is at the center of this change. One major objective of Hadoop YARN is to decouple Hadoop from the MapReduce paradigm to accommodate other parallel computing models, such as MPI (Message Passing Interface) and Spark.
Move-Code-To-Data Philosophy
In general, data flows from component to component in an enterprise application. This is the case for application frameworks (EJB and Spring framework), integration engines (Camel and Spring Integration), as well as ESB (Enterprise Service Bus) products. Nevertheless, for the data-intensive processes Hadoop deals with, it makes better sense to load a big data set once and perform various analysis jobs locally to minimize IO and network cost, the so-called “Move-Code-To-Data” philosophy. When you load a big data file into HDFS, the file is split into chunks (or file blocks) under the coordination of a centralized Name Node (master node), and the chunks reside on individual Data Nodes (slave nodes) in the Hadoop cluster for parallel processing.
Map and Reduce
A centralized JobTracker process in the Hadoop cluster moves your code to the data. The code here includes a Map and a Reduce class. Put simply, a Map class does the heavy lifting of data filtering, transformation, and splitting. For better IO and network efficiency, a Mapper instance only processes the data chunks co-located on the same data node, a concept termed data locality (or data proximity). Mappers can run in parallel on all the available data nodes in the cluster. The outputs of the Mappers from different nodes are shuffled through a particular algorithm to the appropriate Reduce nodes. A Reduce class by nature is an aggregator. The number of Reducer instances is configurable by developers.
Word Count Example
A rudimentary “word count” example shows how Hadoop runs. The objective is to count the number of times each word appears in a set of text documents. Because Hadoop and HDFS work better with a few large files, the first step is to merge all the small text files through the HDFS API into a single big file, which is then broken down evenly into chunks (file blocks) by the Name Node. These file blocks are distributed across the data nodes in the cluster. Each Map instance takes a line from its local file blocks as input and splits it into words. It then emits a key-value pair of the word and the count 1 for each word. The outputs with the same key (word) are shuffled to the same Reduce node. A Reduce node sums the counts for every word received and emits a single key-value pair with the word and the total count. While the Map phase runs with high parallelism, the Reduce phase reconciles the outputs from the Mappers to yield the final results.
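The flow described above can be simulated in a single Python process. This is a minimal sketch of the map, shuffle, and reduce steps only; it does not use the Hadoop API, and the function names and the in-memory `blocks` input are assumptions made for illustration.

```python
from collections import defaultdict


def map_line(line):
    """Map step: split one line into words and emit (word, 1) pairs."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle(pairs):
    """Shuffle step: group all values that share the same key (word)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_counts(word, counts):
    """Reduce step: sum the counts received for one word."""
    return word, sum(counts)


def word_count(blocks):
    """Run map over every block, then shuffle and reduce the results."""
    mapped = []
    for block in blocks:      # in Hadoop, each block is mapped on its own node
        for line in block:
            mapped.extend(map_line(line))
    return dict(reduce_counts(word, counts)
                for word, counts in shuffle(mapped).items())


# Two hypothetical file blocks, each a list of text lines.
blocks = [
    ["the quick brown fox", "the lazy dog"],
    ["the fox jumps"],
]
counts = word_count(blocks)
```

In a real cluster the loop over `blocks` runs in parallel on different data nodes, and the shuffle moves data over the network so that all pairs for one word reach the same Reduce node.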
Hadoop Ecosystem
The Hadoop API is often considered low level and is not easy to program with. The quickly growing Hadoop ecosystem offers a list of abstraction techniques, which encapsulate and hide the programming complexity of Hadoop. Pig, Hive, Cascading, Crunch, Scrunch, Scalding, Scoobi, and Cascalog all aim to provide a low-cost entry to Hadoop programming.
Pig, Crunch (Scrunch), and Cascading are data-pipe based techniques. A data pipe is a multi-step process in which transformation, splitting, merging, and joining may be conducted individually at each step. It is similar to a workflow in a general-purpose workflow engine. Hive, on the other hand, works like a data warehouse by offering a SQL-like interactive shell. Programs or shell scripts developed on top of these techniques are compiled to native Hadoop Map and Reduce classes behind the scenes to run in the cluster. Given the simplified programming interfaces in conjunction with libraries of reusable functions, development productivity is greatly improved.
Hadoop plays the heavy-lifting role in an enterprise big data solution. Anything collaborating with Hadoop has to be on the same scale. Consequently, new tools in the conventional enterprise computing domain have been developed with enhanced throughput and capacity. For instance, new data serialization and RPC techniques for passing data to Hadoop were invented, such as Google Protocol Buffers, Apache Avro, Apache Thrift, and MessagePack. They work like SOAP but are more compact in terms of data model and transportation. Bear in mind that you are working with Big Data: any overhead is magnified linearly as the size of the data grows. The open source File Slurper project can copy data files of any format in and out of HDFS. Apache Sqoop is for relational database connectivity, where a database can be either the data source (input) or the data sink (output) of Hadoop. Apache Kafka, Apache Flume, Facebook Scribe, and Apache Chukwa work effectively as distributed messaging systems, which collect and aggregate data from any server cluster and feed it to Hadoop for analysis. Oozie is a Hadoop workflow engine that manages multi-step data processing activities. Its DSL in XML format may remind you of EAI technologies like Spring Batch, Spring Integration, and Camel. Apache Mahout is a scalable machine learning and data mining library for predictive analysis. Machine learning requires a training step with some training data sets (for instance, a list of spam emails). After being trained, Mahout can make predictions on unseen documents in Hadoop (for instance, screening spam emails). It is also worth mentioning Cloudera Hue, a Web GUI tool for interacting with Hadoop and its ecosystem, including Pig, Hive, Oozie, and Impala.
Big data processing in Hadoop is fully featured, but with significant latency. In many circumstances, however, such as credit-card fraud detection, algorithmic stock trading, spam email screening, and business activity monitoring, data (time series facts) must be processed in real time. These activities are termed complex event processing/event stream processing (CEP/ESP) in BI.
Big Data Stream Processing
Twitter Storm is an open source, big-data processing system intended for distributed, real-time streaming processing. Storm implements a data flow model in which data (time series facts) flows continuously through a topology (a network of transformation entities). The slice of data being analyzed at any moment in an aggregate function is specified by a sliding window, a concept in CEP/ESP. A sliding window may be like “last hour”, or “last 24 hours”, which is constantly shifting over time. Data can be fed to Storm through distributed messaging queues like Kafka, Kestrel, and even regular JMS. Trident is an abstraction API of Storm that makes it easier to use. Like Twitter Storm, Apache S4 is a product for distributed, scalable, continuous, stream data processing.
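To make the sliding window concept concrete, here is a minimal single-process sketch in Python. It is not Storm or Trident code; the `SlidingWindowSum` class, its method names, and the use of plain seconds as timestamps are assumptions made for illustration. It keeps a running sum over the events of the last hour and evicts events as they fall out of the window.

```python
from collections import deque


class SlidingWindowSum:
    """Hypothetical aggregate: sum of values seen in the last N seconds."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()  # (timestamp, value) pairs, oldest first
        self.total = 0

    def add(self, timestamp, value):
        """Record an event and return the sum over the current window.

        Timestamps are assumed to arrive in non-decreasing order.
        """
        self.events.append((timestamp, value))
        self.total += value
        self._evict(timestamp)
        return self.total

    def _evict(self, now):
        """Drop events that are window_seconds old or older."""
        cutoff = now - self.window_seconds
        while self.events and self.events[0][0] <= cutoff:
            _, old_value = self.events.popleft()
            self.total -= old_value


window = SlidingWindowSum(window_seconds=3600)  # "last hour"
results = [
    window.add(0, 10),
    window.add(1800, 5),
    window.add(3600, 7),   # the event at t=0 has left the window
    window.add(5500, 1),   # the event at t=1800 has left the window
]
```

The memory used is bounded by the number of events inside the window, which is why the window size matters for a stream processor.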
Note that the size of a sliding window cannot grow infinitely. Stream data processing is not intended to analyze a full big data set, nor is it capable of storing that amount of data (the Storm-on-YARN project is an exception). If you are asked to build a real-time ad-hoc analytics system that operates on a complete big data set, you need more powerful tools.
Fast/Real-Time Big Data Processing
Big data OLAP (OnLine Analytical Processing) is extremely data and CPU intensive in that terabytes (or even more) of data are scanned to compute arbitrary data aggregates within seconds. Note that indexing does not help in a full “table” scan; in addition, building an index on a big data set is costly and slow.
Google Dremel (BigQuery), Cloudera Impala, Apache Drill
Cloudera Impala and Apache Drill are modeled after Google Dremel. These tools are fast because coordination, query planning, optimization, scheduling, and execution are all distributed across the nodes in a cluster to maximize parallelization. All three favor a query-efficient columnar storage format. Before saving data directly in a columnar format in a data store (NoSQL, NewSQL, relational, and more), you may need to transform the existing row-based data through a MapReduce job. Full “table” scan-based ad-hoc queries are offered but with certain limitations. Therefore, these tools are not a replacement for Hadoop. Supporting in situ (in position) data sources like GFS, BigTable, HDFS, and HBase makes data access much faster because of data locality (proximity). With the data source being an OLTP database (BigTable, HBase), a write made by an end user is reflected almost instantaneously in an analysis report. Such an architecture brings you a Big Data OLAP system with typical latency in the range of seconds. To save resources, it is recommended to build a materialized view (cached result) on an analysis job, and return that view if no changes are expected in the result. Impala and Drill integrate well with commercial BI/analysis tools like Tableau, MicroStrategy, Excel, SAP, and more.
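The benefit of a columnar layout for scan-heavy queries can be illustrated with a toy Python example. This is only a conceptual sketch: the sample records, the column names, and the `to_columns` and `sum_where` helpers are hypothetical, and real columnar formats add encoding, compression, and nested-data handling that are not shown here.

```python
# Row-based layout: each record keeps all of its fields together.
rows = [
    {"user": "alice", "country": "US", "amount": 30.0},
    {"user": "bob", "country": "DE", "amount": 12.5},
    {"user": "carol", "country": "US", "amount": 7.5},
]


def to_columns(rows):
    """Pivot row-based records into one list of values per column.

    Assumes every row has the same set of fields.
    """
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


def sum_where(columns, value_col, filter_col, filter_value):
    """Aggregate one column, filtered by another.

    Only the two columns involved are read; the rest are never touched.
    """
    return sum(value
               for value, key in zip(columns[value_col], columns[filter_col])
               if key == filter_value)


columns = to_columns(rows)
us_total = sum_where(columns, "amount", "country", "US")
```

An aggregate such as `us_total` touches the `amount` and `country` columns only, whereas a row-based scan would have to read every field of every record.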
Exploring the Lambda Architecture
The Lambda Architecture proposed by Nathan Marz takes a different approach from the three tools above. It solves the problem of computing arbitrary functions on a big data set in real time by decomposing the problem into three layers: the batch layer, the serving layer, and the speed layer.
The batch layer is implemented as a Hadoop cluster with JCascalog as the abstraction API. Data in the Hadoop cluster is periodically updated by merging incremental changes. Queries are re-computed from scratch after each update. The results are called batch views.
The serving layer saves the batch views in Splout SQL (or ElephantDB). Batch views are indexed for super-fast access. Evidently, batch views are not real time.
Speed layer is implemented by Storm (Trident), which computes ad-hoc functions on a data stream (time series facts) in real-time. The result from the incremental changes confined to a sliding window is then merged with the materialized batch view from the serving layer to generate up-to-date analysis results.
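How the three layers cooperate can be sketched in plain Python. This is a simplified illustration, not JCascalog, Splout SQL, or Storm code; the page-view events and the function names are assumptions. The batch view is recomputed from the full data set, the real-time view covers only the events that arrived since the last batch run, and a query merges the two.

```python
from collections import Counter


def build_batch_view(master_events):
    """Batch layer: recompute page-view counts from scratch."""
    return Counter(event["page"] for event in master_events)


def build_realtime_view(recent_events):
    """Speed layer: count only the events not yet absorbed by a batch run."""
    return Counter(event["page"] for event in recent_events)


def query(page, batch_view, realtime_view):
    """Serving side: merge the batch view with the real-time increment."""
    return batch_view[page] + realtime_view[page]


# Hypothetical events: the master data set as of the last batch run ...
master = [{"page": "/home"}, {"page": "/home"}, {"page": "/about"}]
# ... and the events that have streamed in since then.
recent = [{"page": "/home"}, {"page": "/pricing"}]

batch_view = build_batch_view(master)
realtime_view = build_realtime_view(recent)

home_views = query("/home", batch_view, realtime_view)
pricing_views = query("/pricing", batch_view, realtime_view)
```

When the next batch run absorbs the recent events, the real-time view for that period can be discarded, which keeps the speed layer small.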
Resource-wise, this design appears to be very cost-efficient. As a Big Data OLAP solution, the Lambda Architecture works on real-time data streams (time series facts), rather than in situ OLTP databases. It is worth noting that there is a one-time Hadoop latency when a new ad-hoc query is launched for the first time. Subsequent executions of the same query are real-time.
Supports from the Spring Ecosystem
Spring Hadoop simplifies creating Hadoop-based applications in Java with Spring XML/Java configurations. As a Spring user, you also benefit from SpEL (Spring Expression Language) and support for other JVM languages like JRuby, Jython, Groovy, Scala, and Clojure. You may run parameterized HiveQL in Java rather than from the command line, and launch a Hadoop job through the web. Values of the parameters can be assigned at runtime. As in Oozie, big data pipelines (workflows) may be defined in XML syntax with Spring Batch and Spring Integration. The Spring Data library helps in terms of modularity, productivity, portability, and testability. The Spring Social library enables integration with popular SaaS providers like Facebook, Twitter, and LinkedIn.
Administrative Tools
In general, big data techniques come with some sort of administrative interface, which allows developers to monitor the real-time status of the distributed system and troubleshoot various issues. Linux/Unix command line tools, such as top, iostat, and netstat, are also handy in identifying the root cause of an issue. Monitoring tools like Nagios, Ganglia, Epic, and DynaTrace provide visual, comprehensive, and scalable support for distributed system monitoring, performance profiling, and troubleshooting.
Running Big Data in Cloud
The elastic nature of the cloud makes it a very cost-efficient computing environment for big data. A rule of thumb is to test your code thoroughly before deploying it to a cloud environment such as Amazon AWS or Rackspace. The Apache Whirr project provides a cloud-neutral way to run services through a common service API. It hides the idiosyncrasies of each cloud service provider. The smart defaults for different cloud services help launch a properly configured environment quickly.
Summary
In this article, the author groups big data processing/analytics technologies into three categories (batch, stream, and real time), which address different BI/analysis use cases. With a complete technology landscape in mind, you will be able to pick the appropriate tool as part of your enterprise data solutions.
About the Author
Xinyu Liu, a Sun Microsystems certified enterprise architect, has extensive software design and development experience with cutting-edge server-side technologies. He received his graduate degree from George Washington University and currently serves as the application architect for Virginia Workers’ Compensation Commission. Dr. Liu has written for Java.net, JavaWorld.com, and IBM developerWorks on topics such as JSF, Spring Security, Hibernate Search, Spring Web Flow, the Servlet 3.0 specification, and Drools Rule Engine. He has also worked for Packt Publishing reviewing the books Spring Web Flow 2 Web Development, Grails 1.1 Web Application Development, and Application Development for IBM WebSphere Process Server 7 and Enterprise Service Bus 7.
Resources
- Clustering for High Availability (HA) with JBoss AS7: Online presentation from JBoss.
- Hibernate Shards: Simplifying Hibernate programming with sharded databases.
- Google MapReduce: Simplified data processing on large clusters.
- Google BigTable: A distributed storage system for managing structured data.
- Apache Hadoop: A reliable, scalable, distributed computing framework.
- Apache HBase: A distributed, scalable, big data store.
- Apache Cassandra: A distributed, scalable, big data store.
- MongoDB: A JSON document database.
- Neo4j: A graph database.
- Twitter Storm: A free and open source distributed realtime computation system.
- Apache S4: A general-purpose, distributed, and scalable data stream processing system.
- Google Dremel: Interactive analysis of web-scale datasets.
- Cloudera Impala: Real-time queries in Apache Hadoop.
- Apache Drill: Provide low latency ad-hoc queries to many different data sources.
- Big Data: Introduce the Lambda Architecture.
- Kundera: JPA 1.0 ORM library for the Cassandra/HBase/MongoDB databases.
- Apache Gora: An in-memory data model and persistence for big data.
- Spring Data: Modern data access for enterprise Java.
- Spring XD: A unified, distributed, and extensible system for data ingestion, real time analytics, batch processing, and data export.
- Spring Social: Connect your applications with SaaS providers such as Facebook and Twitter.
- Hadoop in Action: A book on Hadoop.
- Hadoop in Practice: A book on Hadoop ecosystem.
- HBase in Action: A book on HBase.
- MongoDB in Action: A book on MongoDB.
- NoSQL Distilled: A brief guide to the emerging world of polyglot persistence.