Apache Spark is an open-source unified analytics engine that reduces the time between data acquisition and business insights delivery. Technical professionals can create batch and streaming pipelines, data transformation, machine learning and analytical reporting using common APIs.
- Apache Spark can accelerate ingestion of semistructured, unstructured and structured data from multiple sources, enabling real-time analysis and decision making.
- Ever-increasing data volumes put traditional batch data transformation processing jobs at risk of not completing in their allotted time frames. Spark’s ability to do in-memory transformation reduces batch-processing times, supports high-velocity ingestion and enables additional tasks like data enrichment and reporting.
- Spark effectively supports the intensive processing that artificial intelligence (AI) and machine learning (ML) workloads demand on huge datasets.
- Spark automates data pipeline processes and moves data seamlessly between disparate data stores and other applications. Its unified architecture, availability of connectors to a large number of systems and simplified APIs make this possible.
Data and analytics technical professionals looking to deliver high-impact big data initiatives should:
- Deliver near-real-time data ingestion and analytics capabilities by adopting Spark as a scalable stream processing engine.
- Perform rapid batch processing by deploying Spark to execute extraction, transformation and loading tasks, data curation and data enrichment functions either directly or through tools with Spark embedded.
- Implement support for Internet of Things (IoT) applications and ML by leveraging Spark’s in-memory distributed processing capability and intuitive APIs.
- Optimize data pipelines by using Spark as the glue to tie together best-in-class technologies such as Apache Kafka, Apache Flume, object stores, relational and nonrelational data stores, and analytical tools.
Apache Hadoop was introduced in 2006 as the solution to big data problems. It introduced a highly distributed shared-nothing architecture as a cost-effective platform. This allowed large amounts of data to be distributed across many nodes within a cluster, with each node performing MapReduce functions on its respective chunk of data. As more nodes were added to the cluster, Hadoop provided almost linear scalability.
So, what went wrong? Hadoop was designed to handle batch processing, and although it excelled at what it was designed for, the world moved on quickly. As the appetite for data increased, Hadoop’s high latency and low-level APIs became its Achilles heel.
Enter Spark. Spark provides a much more efficient in-memory and pipelined distributed architecture that allows big data jobs to execute much faster than the ones based on MapReduce. It should be noted upfront that Spark does not have a persistence layer. Consequently, it must rely on either Hadoop Distributed File System (HDFS) or some other persistence layer to read data from or write output to.
This research focuses on the architecture and use cases of Apache Spark. It does not delve into the programming and administration of Spark. It is not intended to be a pedagogical discourse on Spark. Instead, it provides technical professionals with a clear understanding of Spark’s capabilities and its pragmatic uses.
2018 brings changes to the Spark ecosystem that further enhance its capabilities. In addition to fixing about 1,400 Jira tickets, Spark 2.3 has introduced new features that are listed throughout this research. This research was originally published in October 2017, when the community had released version 2.2. Spark 2.2 was a groundbreaking version that introduced the underpinning of new technologies such as Structured Streaming. Spark 2.3 takes those changes further by introducing Continuous Processing. This is just one example of Spark’s continuous innovations. As described in this document, the latest release introduces many experimental features that will be generally available (GA) in future releases. However, the general theme of Spark’s progression is to make the unified engine more stable, reliable, faster and, most of all, easier to use.
What Is Spark?
Spark is an in-memory clustered compute engine that is a replacement for MapReduce. If that is all it is, then why so much hoopla around Spark? Why has Spark become one of the fastest-growing projects in Apache Software Foundation (ASF)?
The main reason is that Spark is a unified engine with easy-to-use APIs that serve streaming, analytical, ML and data transformation use cases in a single architecture. In addition, it is integrated with many big data technologies such as the Hadoop ecosystem, various streaming and messaging engines such as Kafka, and ML platforms. Finally, Spark can run on top of the most common databases and data stores. In 2018, Spark 2.3 now has the deep learning frameworks and libraries that can train on the same cluster as the rest of Spark — using the same APIs.
Spark originated in 2009 in the University of California, Berkeley’s AMPLab as a tool to demonstrate cluster management software, Apache Mesos. It was created as an in-memory processing engine to overcome the limited performance of disk-based options. In other words, Spark was developed as adjunct software. In 2010, its developers open-sourced it under a Berkeley Software Distribution (BSD) license. The “AMP” in AMPLab stands for algorithms, machines and people.
As Spark found adoption as a stand-alone product, it was donated to the ASF in 2013 and made available under the Apache 2.0 license. The ASF is made up of all-volunteer developers and stewards, and it manages more than 350 open-source projects and initiatives. Spark became a “top-level project” in 2014.
In 2013, some of the original developers of Spark founded Databricks to continue developing the platform while offering a commercial version of the software. In 2018, with the launch of Spark 2.3, Databricks launched its own platform called Databricks Runtime 4.0.
When the AMPLab developers first started developing Spark, they chose a concise programming language called Scala for coding. Today, Spark provides APIs in Java, Python and R, in addition to Scala.
As mentioned before, the current release of Spark is version 2.3, which was released in February 2018.
Please note that UC Berkeley’s AMPLab fulfilled its mission in 2016 and was ramped down. RISELab has been established in its place. The “RISE” in RISELab stands for real-time intelligent secure execution.
Is Spark Going to Displace Hadoop?
Let’s get this point out of the way quickly so we can start focusing on the more collaborative aspects of both Hadoop and Spark.
First, a bit of history helps in understanding the context. Hadoop rose out of a paper written by Google employees on MapReduce in 2004. It was quickly adopted by Yahoo!, and it became part of the ASF. It was first commercialized by Cloudera, which was quickly followed by a number of distributions from companies such as Hortonworks and Amazon Web Services’ (AWS’) Elastic MapReduce (EMR). MapR Technologies also launched highly distributed software that provided Hadoop compatibility.
Before we answer the question posed in this section, it is critical to understand that Apache Hadoop can be defined in two ways:
- Core Hadoop consists of the data store called Hadoop Distributed File System, or HDFS, and the execution engine called MapReduce. In a subsequent version of Hadoop, a resource manager called Yet Another Resource Negotiator (YARN) was added.
- The Hadoop ecosystem consists of HDFS, MapReduce, YARN and a panoply of technologies required to handle the entire data pipeline for leveraging massive scales of data toward analytics.
If we compare Spark to the Hadoop core, especially to MapReduce, then Spark is indeed a better candidate for the majority of distributed processing jobs. But if we consider the Hadoop ecosystem, Spark fits well into it and is deeply integrated to deliver an end-to-end solution. Spark can leverage the management and security capabilities of the Hadoop ecosystem. This research demonstrates how Spark is integrated with Apache Hadoop ecosystem offerings such as HDFS for storage, Apache Kafka for streaming, YARN for resource management and Hive for querying. The newest versions of Spark have also expanded support for container management software such as Kubernetes.
Spark complements, rather than replaces, Hadoop. However, Spark isn’t dependent on HDFS for storage or YARN for resource management. It can be run on top of a plethora of relational databases, nonrelational databases and block stores using different types of resource managers and in containers.
Big data architectures consist of three major subsystems:
- Data ingestion: This subsystem is responsible for consuming data from multiple sources in various formats in a reliable, secure and performant manner. Depending on the complexity of data ingestion, various technologies are deployed. Prior to big data architectures, most data ingested emanated from the operational systems that housed the systems of record. A system of record is the single source of truth that is persisted in a database. That record is usually governed by a master data management (MDM) product and is guarded as a crown jewel of an organization. Spark has the potential to challenge the way technical professionals think of systems of record. In the new paradigm of schema-free data streaming from multiple sources into a big data environment, a system of record may be either persisted in a database or acted on in memory.
- Data processing: The system of record is then manipulated by extract, transform and load (ETL) tools to transform the data or enrich it to meet business user needs. Once again, Spark provides the engine to perform the various transformations. In fact, big data architectures already changed ETL to ELT, or extract-load-transform. Now, with Spark’s help, ELT is transforming into DELDT, or discover-extract-load-discover-transform. So, Spark is being used to discover data sources at scale at their original locations and once again when multiple data sources are persisted in big data infrastructures. Discover is the ability to understand the characteristics of data values in order to decipher their classifications, such as sensitive data or data belonging to business domains. This discovery becomes part of the metadata that is then used to enable data governance capabilities such as data security and privacy, and lineage and provenance.
- Data access: This is the final piece in the architecture where the end users analyze the data to meet their various business needs. Typically, the end users are business analysts who use business intelligence (BI) tools to run reports and visualizations. Spark not only provides the underpinnings to prepare the data to meet these needs, but it also expands the types of data access for data scientists who want to run ML algorithms on top of data.
Spark is the unified engine to ingest data from many different sources using different techniques; process data into multiple data formats like Parquet, ORC, Avro or the Kafka message bus; and store it in one of many different targets (see Figure 1). Throughout the architecture, Spark can be used to provide analytics at any stage.
Figure 1. Spark Is the Glue
ADLS = Azure Data Lake Store; C* = Apache Cassandra; CDC = change data capture; GCP = Google Cloud Platform; GCS = Google Cloud Storage; JDBC = Java Database Connectivity; MQTT = Message Queue Telemetry Transport; S3 = Simple Storage Service
Source: Gartner (August 2018)
Big data architectures shine when they are used to ingest data from multiple sources in structured and unstructured formats for advanced analytical purposes. Big data technologies such as Hadoop are being deployed to consolidate data from existing relational databases and other sources into a data lake. The input sources range from legacy mainframe systems to flat files on Network File System (NFS). Figure 1 depicts some of the more common sources that most Gartner clients have, which may include:
- Relational database management systems (RDBMSs), operational and transactional databases, and data warehouses
- Clickstream data and web logs (time series data)
- Market or social media data
- Geospatial data (latitude and longitude)
- Sensor data (IoT endpoints)
Relational Database Management Systems (RDBMSs), Operational and Transactional Databases, and Data Warehouses. These have been organizations’ bread and butter systems for taking care of operational and analytical needs. The data from applications based on Oracle, SQL Server, Db2 and other relational systems can be made available in batch mode or in incremental changes to the data. The latter is known as change data capture (CDC). Batch relational data can be ingested using Apache Sqoop or even as a JDBC API call. This data can also be consumed as a flat file using FTP. Spark is now used to enhance Apache Sqoop’s performance for batch ingestion. However, Sqoop requires data to be persisted in HDFS. Alternatively, Spark can be used to read data directly from an RDBMS into its memory as DataFrames, which are covered later in this document. This is possible by including the RDBMS’ JDBC driver jar file in the Spark code. This is important as Spark is now being used to process high-velocity IoT data from devices and sensors, but the metadata needed to perform useful analytics usually comes from the RDBMSs. Some examples of metadata in the IoT use cases include asset and device characteristics, location information, warranty data, and supply chain information.
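The direct JDBC read described above can be sketched in PySpark roughly as follows. This is a minimal illustration, not a reference implementation: the function name, URL, table name and credentials are hypothetical placeholders, and it assumes the database’s JDBC driver jar has already been made available to Spark (for example, through the --jars option of spark-submit).

```python
def read_rdbms_table(spark, jdbc_url, table, user, password):
    """Load one RDBMS table into a Spark DataFrame over JDBC.

    `spark` is an existing SparkSession. Every other argument is a
    hypothetical placeholder supplied by the caller, for example
    jdbc_url="jdbc:postgresql://dbhost:5432/assets".
    """
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)      # JDBC connection string
        .option("dbtable", table)     # table to read, e.g. device metadata
        .option("user", user)
        .option("password", password)
        .load()                       # returns a DataFrame
    )
```

A DataFrame obtained this way (for instance, device or warranty metadata) could then be joined with a DataFrame of sensor readings to add context to the IoT data.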
CRM systems from vendors such as Salesforce and NetSuite are examples of applications that use relational database technology.
Another common operational application is in call centers, which create structured data and free-form text data in the form of problem resolution. The problem resolution data is unstructured. It contains valuable corporate information, but it is usually locked away and hard to search unless its text is properly categorized or classified, indexed and mined. Medical records and newspaper articles also fall into this category of unstructured data that lacks the proper context needed to parse. Spark is frequently used to perform text mining on unstructured data.
Clickstream Data and Web Logs. As users click through applications in a web browser or on a mobile app, organizations collect the inputs in order to analyze the stream of data for patterns to improve customer experience or fix anomalies. This data may arrive in the form of key value pairs and provide organizations with a treasure trove of intelligence. Development teams often use this data to perform A/B testing.
Spark is a very popular choice to perform clickstream and log analysis. As this data is usually voluminous and arrives at a very high velocity, Spark is used to process the data in memory, aggregate and perform analytics. Clickstreams and logs are analyzed to understand and predict customer conversion, churn or other behaviors. This may require this data to be joined with related data in ERP and CRM systems.
Market or Social Media Data. Organizations have learned to monitor social sites such as Twitter, Instagram, Snap, Pinterest, Facebook and LinkedIn to extract data pertaining to their customers or products. Most sources of social sites make data available via API calls. A common use case for this data is for “sentiment analysis” initiatives in order to decipher whether the social media comments are favorable or not. These initiatives were done in Hadoop using MapReduce, but can be performed much more efficiently in Spark.
Geospatial Data. Geospatial data has location information and is generated by a large number of devices such as vehicles, satellites, mobile phones and IoT sensors. Its use cases are varied and span commercial organizations, local governments and defense establishments. For instance, package delivery companies ingest streaming geospatial data to optimize operations for their fleets of trucks. Disaster relief organizations use geospatial data to track natural disasters such as hurricanes, and phone companies produce call data records, which are used for billing and by law enforcement agencies to track emergencies.
An example of using Spark for this use case comprises taking public data on taxi or ride share rides from the relevant organizations and joining it with the cities’ geographical information to predict congestion and bottlenecks.
Sensor Data. IoT data from endpoints including sensors and devices is increasingly leveraged to improve efficiencies within organizations. IoT data is time series data that may be in various formats such as binary, key value or even JSON, and it is transmitted over protocols such as MQTT, AMQP, CoAP or HTTP. Unlike other categories, some use cases for IoT require processing to take place on the endpoint, on an intermediate server called an IoT edge gateway, or in the cloud. IoT data needs to be streamed and persisted to do historical analysis or advanced analytics. For further information on IoT, please refer to “Design Your IoT Data Architecture for Streaming Edge Analytics and Platform Advanced Analytics.”
Spark has connectors available for the source formats mentioned above. In addition, data may be ingested into a distributed messaging framework such as Kafka, which has a tight integration with Spark.
Processed data can be persisted in the organization’s data stores, which can range from relational database systems or nonrelational/NoSQL data stores to cloud object stores. Spark may also write the output to a message bus such as Apache Kafka, which has configurable persistence. In addition, Spark can write the data in multiple file formats, such as Avro, Parquet and ORC.
Spark is a comprehensive framework that has been designed as a unified platform to meet various use cases. Figure 2 shows Spark’s components and its role within big data architecture.
Figure 2. Spark Components
BI/ML/AI = business intelligence/machine learning/artificial intelligence; EC2 = Elastic Compute Cloud; GPU = graphics processing unit; K8S = Kubernetes
Source: Gartner (August 2018)
Spark Core is the part of the framework that is responsible for scheduling and running Spark jobs. It executes within a Java Virtual Machine (JVM) on distributed nodes within a cluster. However, a nonclustered version of Spark is also available, and it runs on all common single-node setups, such as on a Mac or Windows laptop for development. Access to Spark Core is through APIs, which come in three different flavors:
- Resilient distributed datasets (RDDs)
- DataFrames
- Datasets
Resilient Distributed Datasets (RDDs). The RDD was introduced in the early versions of Spark and, at that time, was the only option to manipulate data. It is a low-level API that today would be akin to programming in assembly language. RDDs are distributed and immutable collections of data partitioned across the nodes in a cluster so they can be operated on in parallel.
Like assembly language, RDDs may still be used to perform very low-level tasks, but developers who use RDDs will lose out on all the performance optimizations that are available in the other two types of APIs, DataFrames and Datasets. However, it must be noted that even though other APIs were introduced to simplify programming Spark tasks, RDDs are the fundamental abstraction of Spark and are how tasks are executed no matter which API type is used. Spark maps the operations on an RDD’s logical partitions into directed acyclic graphs (DAGs) of tasks that execute across the nodes.
To understand RDDs, let’s take an example of big data’s equivalent of the Hello World program: word count. You may not have paid attention to your high school English literature class on Shakespeare, but now you have been tasked with counting all the words in one of the Bard’s classic tomes. Fear not, as Spark comes to your rescue. For this example, we shall use Macbeth:
- Step 1: Read Macbeth into an RDD called text:
- text = sc.textFile("Macbeth.txt")
- “sc” stands for SparkContext. The SparkContext is the master of the Spark application; it is used to execute the Spark program by creating an RDD and accessing a resource manager such as YARN. In this example, the file is on a local disk. The source may instead exist on a file system such as HDFS, in an object store such as Amazon Web Services (AWS) S3, or in any relational or nonrelational database.
- Step 2: Perform the required action(s) on the RDD created in Step 1. The action, in this example, is a word count. In other applications, an action may consist of tasks such as data transformation. Complex actions may result in creation of multiple RDDs. One unique feature of Spark is called “lazy evaluation.” Spark will not compute anything, and hence consume compute resources, until an action is requested.
- Step 3: Query the RDD. At this stage, Spark creates the DAG. The query of RDD may involve tasks such as filter, join, map or reduce. One thing to note is that the entire copy of Macbeth is loaded into the RDD. This makes RDD coarse-grained, which means that Spark will operate on the entire dataset and is not able to transform individual elements within RDD. This makes the RDD design very simple, but also limiting.
Spark code is very compact compared to MapReduce. For example, the word count example code using RDD takes up only three lines compared to over fifty lines in Java MapReduce code.
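The steps above can be sketched in PySpark roughly as follows. This is an illustrative sketch rather than the exact code behind the line counts quoted above; the function name is hypothetical, and it assumes that words are separated by whitespace, with no normalization of case or punctuation.

```python
def word_count(sc, path):
    """Return an RDD of (word, count) pairs for the text file at `path`.

    `sc` is an existing SparkContext. Nothing is computed here: each call
    after textFile only describes a transformation, and Spark defers the
    work until an action such as collect() or take() is requested.
    """
    return (
        sc.textFile(path)                     # one record per line of text
        .flatMap(lambda line: line.split())   # split every line into words
        .map(lambda word: (word, 1))          # pair each word with a 1
        .reduceByKey(lambda a, b: a + b)      # add up the 1s for each word
    )
```

Calling word_count(sc, "Macbeth.txt").take(10) would request an action and therefore trigger the actual computation.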
The “R” in RDD stands for resilient. An RDD builds a lineage by logging its transformations, so if a partition is lost to a failure, Spark can recreate it from that lineage metadata. It does not need to replicate the data to another node.
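The following plain-Python toy illustrates the idea of lazy evaluation and lineage-based recovery. It is not Spark code and does not reflect Spark’s internal implementation; the class and method names are hypothetical and were chosen only to mirror the RDD vocabulary.

```python
class LineageDataset:
    """Toy stand-in for an RDD: a source plus a log of transformations."""

    def __init__(self, source, lineage=()):
        self._source = source            # callable that re-reads the input
        self._lineage = tuple(lineage)   # ordered log of transformations
        self._cache = None               # materialized result; may be lost

    def map(self, fn):
        # Lazy: only the lineage grows, no data is touched.
        return LineageDataset(self._source, self._lineage + (("map", fn),))

    def filter(self, pred):
        return LineageDataset(self._source, self._lineage + (("filter", pred),))

    def collect(self):
        # Action: replay the lineage over the source if nothing is cached.
        if self._cache is None:
            data = list(self._source())
            for kind, fn in self._lineage:
                if kind == "map":
                    data = [fn(x) for x in data]
                else:
                    data = [x for x in data if fn(x)]
            self._cache = data
        return self._cache

    def lose_data(self):
        # Simulate a failed node dropping the materialized data.
        self._cache = None
```

After lose_data() is called, the next collect() rebuilds the same result from the source and the logged transformations, without any replica of the data having been kept.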
DataFrames and Datasets. As Spark matured, Databricks introduced an abstraction over RDDs through the DataFrames API in Spark 1.3 so that users could work in more familiar structures like tables with rows and columns. In Spark 1.6, Databricks introduced the Datasets API to further simplify development. It is important to note that these two new APIs are built on top of RDDs.
A DataFrame is also an immutable and distributed collection of data, except that the data is organized into named columns, similar to a table in a relational database. This allows developers to program more easily and even use SQL statements to handle data. DataFrames can process structured, semistructured and unstructured data.
One drawback of DataFrames is that, when it handles input data of unknown schema, it does not enforce compile time safety (i.e., it does not check for the datatypes). This was added in the Datasets API to make it more developer-friendly. Datasets unifies the best of RDDs and DataFrames. In addition, Datasets APIs use encoders to serialize and deserialize JVM objects, which results in compact bytecode.
DataFrames and Datasets are able to infer the schema of data ingested, unlike RDDs where the user is expected to specify the schema. During the discussion on RDD, we introduced the concept of SparkContext. With the advent of DataFrames and Datasets, this has been replaced by SparkSession. SparkContext was built for creating RDDs, but its main drawback was that separate contexts were needed to use APIs for Hive, SQL and streaming. In Spark 2.0, SparkSession became the unified entry point for all Spark APIs.
Gartner clients often ask when to use DataFrames versus Datasets. Starting with Spark 2.0, DataFrames have been merged into Datasets. Datasets are now of two types — untyped APIs and typed APIs. The former (untyped) is a DataFrame. DataFrames APIs are available for coding to maintain backward compatibility. Table 1 provides guidance on when to use the DataFrames API versus the Datasets API.
Table 1: When to Use DataFrames Versus Datasets
|DataFrames API|Datasets API|
|Tighter control over functionality|Runtime safety|
|For interactive analysis|For engineering tasks|
|Scala, Java, Python and R|Scala and Java only|
|More compact code| |
Source: Gartner (August 2018)
DataFrames and Datasets have benefited from two initiatives — Catalyst Optimizer and Tungsten. These initiatives are covered in The Details section of this document.
Let’s look at some examples of DataFrames. In the example below, a DataFrame is created by reading a stream of data from Apache Kafka by subscribing to one of its topics, called T1. The “readStream” method was introduced in Spark 2.0 to handle streaming use cases. By simply replacing it with “read,” Spark is able to handle batch use cases.
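A minimal PySpark sketch of such a read might look like the following. The function name and the broker address are hypothetical, and the sketch assumes that Spark’s Kafka connector package for Structured Streaming is available to the application.

```python
def read_kafka_topic(spark, bootstrap_servers, topic="T1", streaming=True):
    """Create a DataFrame over a Kafka topic.

    `spark` is an existing SparkSession. `bootstrap_servers` is a
    hypothetical broker address such as "broker1:9092". With
    streaming=False the same options drive a one-off batch read.
    """
    reader = spark.readStream if streaming else spark.read
    return (
        reader.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)   # topic to subscribe to
        .load()
    )
```

Switching between the streaming and batch variants changes only the reader; the rest of the pipeline can stay the same.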
Apache Kafka can be replaced by a number of formats and data sources supported by Spark, such as AWS Kinesis. For example, spark.readStream.json("s3://weblogs") can be used to read log data continuously from an AWS S3 bucket in JSON format.
Let’s move on to the next step, which may be to perform aggregation on the streaming data. As an example, one use case may require us to analyze the input stream to detect which device it originated from. For example, Netflix may want to get real-time information on the number of concurrent users across different devices such as TVs, smartphones and PCs. A second use case may require analysis based on time stamp or even product ID. This can be achieved in the code below by specifying the appropriate “type.”
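One way to express this aggregation in PySpark is sketched below. The function name is hypothetical, and the sketch assumes the input DataFrame already exposes the grouping attribute as a column named “type” (in practice it would first have to be parsed out of the Kafka message value).

```python
def count_by(events, column="type"):
    """Count records per distinct value of `column`.

    `events` is a Spark DataFrame (batch or streaming) that already has a
    column with that name; "type" is a placeholder such as device type.
    """
    # The dictionary maps a column to an aggregate function name;
    # {"*": "count"} counts the rows in each group.
    return events.groupBy(column).agg({"*": "count"})
```

Passing a different column name, such as a time stamp or product ID column, changes the dimension of the analysis without changing the rest of the code.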
In the example above, records are being counted after doing a groupBy. The aggregate passed to agg could be replaced by min, max, sum or even a user-defined function (UDF) developed using an ML algorithm.
Because Spark doesn’t have a persistence layer, the following example shows the output being written to another Kafka topic called T2. Kafka, by default, provides persistence on the disk.
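A hedged PySpark sketch of such a write follows. The function name, broker address and checkpoint directory are hypothetical, and the sketch assumes the result DataFrame has already been shaped into “key” and “value” columns, which is the layout the Kafka sink expects.

```python
def write_to_kafka(result, bootstrap_servers, checkpoint_dir, topic="T2"):
    """Start a streaming query that publishes `result` to a Kafka topic.

    `result` is a streaming DataFrame assumed to expose `key` and `value`
    columns. `bootstrap_servers` and `checkpoint_dir` are hypothetical
    values supplied by the caller.
    """
    return (
        result.selectExpr("CAST(key AS STRING) AS key",
                          "CAST(value AS STRING) AS value")
        .writeStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)                        # target topic
        .option("checkpointLocation", checkpoint_dir)  # progress tracking
        .outputMode("update")                          # emit changed rows only
        .start()
    )
```

The checkpoint location lets the query resume from where it left off after a restart.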
The output could also be to other file formats such as Parquet or JSON or to a persistent data store such as Apache Cassandra. For example, kafka in the example above can be replaced by jdbc followed by the location of the JDBC-supported database such as MySQL or MariaDB. While writing to a target, partitionBy can be used to separate writes by options such as time stamp or device name or product. In addition, Spark allows output to be appended to a target or to update an existing record in the target. If the latter option is chosen, then Spark provides prefix integrity to ensure transactional consistency.
Spark’s popularity stems from its performance, ease of use and versatility. It has the potential to unify infrastructure pertaining to performing disparate key functions within data pipelines by using its libraries as described in this section.
Streaming can be thought of as an unbounded table into which new rows are continuously streamed. Spark first introduced the concept of streaming using DStreams, which were based on microbatches and which used RDDs. “DStreams” stands for discretized streams. In the Spark 2.0 release, DataFrames-based Structured Streaming was introduced, which still used microbatches. The Structured Streaming API provides fault-tolerant data ingestion using receivers for common sources such as Apache Kafka and Apache Flume. It processes the data using exactly-once semantics, which guarantees message delivery and prevents data duplication. Structured Streaming simplified the APIs by removing the need to define microbatches, but the underlying architecture still used them. This provided end-to-end latency as low as 100 ms. However, with Spark 2.3, Continuous Processing was introduced, which launches long-running tasks that continuously read and process streaming data. This has reduced end-to-end latency to as low as 1 ms. However, this feature is still experimental in Spark 2.3 and is not yet GA. Another important point to consider is that, while the microbatch option provides exactly-once semantics, Continuous Processing provides at-least-once semantics. So, the choice of which mode to use is based on your use case requirements. Spark 2.3 provides both options, which can be chosen by setting a configuration parameter.
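In the PySpark API, the choice between the two modes is expressed through the trigger set on the stream writer. The sketch below is a minimal illustration; the function name and the mode labels are hypothetical, while the processingTime and continuous keyword arguments are those of the trigger method in Spark 2.3.

```python
def choose_trigger(writer, mode, interval="1 second"):
    """Apply a microbatch or continuous trigger to a stream writer.

    `writer` is the object returned by `df.writeStream`. In continuous
    mode the interval is a checkpoint interval, not a batch interval.
    """
    if mode == "microbatch":
        # Microbatch execution: exactly-once semantics.
        return writer.trigger(processingTime=interval)
    if mode == "continuous":
        # Continuous Processing: at-least-once, experimental in Spark 2.3.
        return writer.trigger(continuous=interval)
    raise ValueError("mode must be 'microbatch' or 'continuous'")
```

Keeping the choice in one place makes it easy to move a pipeline from one mode to the other as latency and delivery requirements change.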
Spark supports ANSI SQL 2003 standards. A litmus test for a product’s ability to support the SQL standard is its ability to run, without modifications, the 99 SQL queries specified in TPC-DS, the Transaction Processing Performance Council’s industry-standard benchmark for decision support solutions. Spark runs all of the TPC-DS benchmark queries without modifications and at comparable or higher performance than other big data SQL engines. In addition, Spark has optimized the execution of SQL statements using the well-defined cost-based optimization techniques used frequently in relational databases. This enhancement has enabled faster SQL query execution and is called Catalyst Optimization. Further explanation of Catalyst Optimization is covered later in the document. Key features of Spark SQL are:
- It can natively read and write structured formats such as Hive and Parquet, and semistructured formats such as JSON.
- Spark SQL has a Data Sources API for integration with third-party data stores, including NoSQL databases (MongoDB, Cassandra, etc.) and databases such as Amazon Redshift, whose connectors are listed in an index of Spark packages.
- SQL statements can be embedded in Python, Java or Scala programs to run queries, or any BI tool, such as Tableau, can connect to Spark SQL using standard ODBC/JDBC connectors.
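Embedding a SQL statement in a Python program can be sketched as follows. The function, view and column names are hypothetical; the sketch assumes a DataFrame of flight records that has an “origin” column.

```python
def busiest_origins(spark, flights, limit=5):
    """Run a SQL query against a DataFrame from inside a Python program.

    `spark` is an existing SparkSession and `flights` a DataFrame assumed
    to have an `origin` column. The view name "flights" is a placeholder.
    """
    # Expose the DataFrame to the SQL engine under a temporary name.
    flights.createOrReplaceTempView("flights")
    return spark.sql(
        "SELECT origin, COUNT(*) AS departures "
        "FROM flights GROUP BY origin "
        "ORDER BY departures DESC LIMIT {}".format(int(limit))
    )
```

The result is itself a DataFrame, so SQL and DataFrame operations can be mixed freely within the same program.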
Spark 2.3 introduces a new, but experimental, version of the Data Sources API called V2, which resolves many limitations of the original API, such as:
- When Spark was first developed, it was meant to speed up the slow processing of data via MapReduce jobs. So, Spark supported only the HDFS format, which is row-based, and for that it created RDD and SparkContext. However, today, much faster processing is done using columnar formats. Data Sources API V2 can now perform columnar scans in addition to row scans using DataFrames/Datasets.
- As can be expected, the original API was meant for batch processing, while the new version provides batch and streaming support.
- The original Data Sources API could not communicate partition locality information to the Spark engine, which led to slower performance of random reads. This limitation is resolved in the V2 API.
- As Spark was developed for use with HDFS, it did not need transaction support. If a write operation fails in HDFS, it is tracked in _successful. The V2 API supports transactional writes.
- The original Data Sources API had very little ability to be extended and customized for the underlying data sources. For example, it was unable to leverage faster processing of data by the data sources, but V2 can determine when to do “SQL Pushdown” to take advantage of the data sources’ faster sorting or aggregations.
- The original Data Sources API was written in Scala. To increase the usability of V2, it is written in Java, but it also supports Scala. However, existing user code will not need to change in order to use the V2 API.
Spark provides a general-purpose ML library called MLlib, which contains the most commonly used algorithms. The advantages of using MLlib are its ease of use and performance. It allows a faster ramp-up for organizations into the data science arena and accelerates delivery by providing algorithms. However, it is powerful enough for experienced data scientists to train, evaluate and run complex models involving well-known ML algorithms. Programs written using MLlib can be called as user-defined functions (UDFs) in Spark SQL queries. Embedding ML code via UDFs in an SQL statement allows developers to avoid creating separate data pipelines for the ML use cases. This makes Spark a very powerful engine. UDFs can be written in Java, Scala, R or Python. However, Python UDFs have historically been much slower due to serialization needs. In Spark 2.3, PySpark serialization and execution performance has improved significantly due to the use of vectorized formats (which are part of the Apache Arrow project).
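The mechanics of calling scoring code from SQL can be sketched as follows. This is an illustration under stated assumptions: the hand-written logistic function is a hypothetical stand-in for a trained model (it is not MLlib code), its coefficients are invented for the example, and the sketch assumes Spark 2.3 or later, where the return type of a registered UDF may be given as a type string.

```python
import math


def churn_score(visits, complaints):
    """Hypothetical stand-in for a trained model: a fixed logistic score."""
    z = 0.8 * complaints - 0.1 * visits   # made-up coefficients
    return 1.0 / (1.0 + math.exp(-z))


def register_churn_udf(spark):
    """Expose churn_score to Spark SQL under the name `churn_score`.

    `spark` is an existing SparkSession.
    """
    spark.udf.register("churn_score", churn_score, "double")
```

Once registered, the function could be used in a statement such as SELECT customer_id, churn_score(visits, complaints) FROM customers, where the table and column names are again hypothetical.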
Spark allows data to be viewed in terms of vertices and edges in order to perform graph operations. To understand this functionality, consider the airline data, available easily from the Department of Transportation. Spark SQL can be used to find the busiest airports or the airports with the most delays. But if a user wants to see the number of airport connections between any two airports, GraphX is used to determine the degree of connections between airports (vertices) using available flights (edges).
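The vertex-and-edge view can be illustrated with a small plain-Python sketch. It is not GraphX code and does not run on Spark; it only shows, with a breadth-first search over hypothetical flight records, what computing the degree of connection between two airports means.

```python
from collections import deque


def degrees_of_connection(flights, origin, destination):
    """Return the fewest flights needed to get from origin to destination.

    `flights` is an iterable of (from_airport, to_airport) edges. Returns
    None when the destination cannot be reached.
    """
    # Build an adjacency map: airport (vertex) -> directly reachable airports.
    routes = {}
    for src, dst in flights:
        routes.setdefault(src, set()).add(dst)

    # Breadth-first search visits airports in order of hop count.
    seen = {origin}
    queue = deque([(origin, 0)])
    while queue:
        airport, hops = queue.popleft()
        if airport == destination:
            return hops
        for nxt in routes.get(airport, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, hops + 1))
    return None
```

A graph library distributes this kind of traversal across a cluster, which is what makes it practical on the full airline dataset.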
Please note that future releases of Spark will introduce GraphFrames.
While Spark itself is written in Scala, the developer doesn’t need to know Scala to leverage Spark. Spark development can be performed, in addition to Scala, in Java, Python or R.
Python has quickly risen to become one of the most popular languages used in the Spark ecosystem — mainly due to its ease of learning, simplicity and readability. In addition, Python takes a modular approach that allows data scientists and other developers to quickly import modules that provide rich functionality instead of having to develop them. Some of the more popular modules are:
- Pandas is an open-source library that provides high-performance data structures and data analysis tools that perform functions similar to Spark DataFrames. In Spark 2.3, there is now support for creating Spark UDFs using Pandas, and for faster conversion between Pandas and Spark DataFrames through the toPandas() and createDataFrame(pandas_df) functions.
- NumPy provides a rich set of mathematical functions and multidimensional arrays.
- SciPy contains statistical algorithms that can help in areas such as digital signal processing.
- Matplotlib is a 2D and 3D plotting package used to deliver data visualization.
Management and Spark Deployment
Spark resources need to be managed along with the resources utilized by other components of an organization’s big data environment. This is achieved through:
- Spark’s own resource manager, Standalone
- Apache YARN in the Hadoop ecosystem
- Open-source options such as Apache Mesos. Kubernetes (K8s) has also been available as an experimental option since the Spark 2.3 release.
- Third-party resource management solutions such as IBM Spectrum Conductor with Spark are also available to manage resources across multiple Spark deployments. The IBM product also provides cluster management for multiple Apache clusters.
The resource manager is responsible for allocating CPU and memory across the various concurrently running applications. Proper configuration of the resource manager is required to ensure high performance of all the components of the big data architecture, especially in a multiuser and multitenant environment. The resource manager is important because Spark applications can consist of mixed workloads, including short-running synchronous interactive queries along with long-running batch jobs and ML applications. Each of these workloads has very different CPU, memory and storage needs.
Spark can be deployed in the cloud or on-premises, as depicted in Figure 3.
Figure 3. Spark Deployment Models
Source: Gartner (August 2018)
The open-source Spark download and related documentation are available directly from Apache’s website.
Hadoop vendors such as Cloudera and Hortonworks and Hadoop-compatible vendor MapR provide support for Spark in their offerings. In June 2017, IBM dropped support of its own Hadoop distribution, BigInsights, in favor of Hortonworks Data Platform (HDP). IBM’s Data Science Experience and its query platform, Big SQL, leverage HDP. IBM Big SQL supports Spark in addition to Apache Hive and Apache HBase, and it is compatible with Oracle. Cloudera offers Data Science Workbench to simplify the development and execution of ML code using Spark.
Databricks provides the latest version of its Spark release for download on-premises.
Databricks has also launched its own cloud-based offering, called Databricks Unified Analytics Platform, which runs in the AWS cloud and as a first-party service on Microsoft Azure. Databricks has developed:
- Databricks Runtime 4.0 to improve the performance of Spark by optimizing input/output (I/O) operations (Databricks I/O) and also simplify big data operations by offering Spark as a serverless engine (Databricks Serverless).
- Databricks File System (DBFS), which abstracts the underlying storage of AWS S3. S3 is eventually consistent, but DBFS, like AWS’ EMR File System (EMRFS), provides strong consistency.
- Databricks Collaborative Workspace to help data scientists collaborate with data engineers to train, validate and deploy ML models into production.
- Integration with all AWS offerings such as Kinesis and Redshift.
- A limited but free Community Edition and a full-featured commercial platform.
Public cloud services such as AWS Elastic MapReduce (EMR), Microsoft Azure HDInsight and Google Cloud Dataproc provide Spark and Hadoop capabilities. In addition to the public cloud providers’ platform as a service (PaaS) Spark deployments, organizations can deploy the Spark offerings mentioned in the On-Premises section in the cloud in an infrastructure as a service (IaaS) setup.
A new breed of big data as a service (BDaaS) vendors such as Qubole, BlueData Software and Cazena now provide Spark deployment in fully managed offerings. These offerings can shorten the time to onboard Spark workloads by reducing the time needed to set up the infrastructure and to train the staff.
As a summary, Table 2 shows examples of running Spark across various cloud options.
Table 2: Spark Runtime Options in the Cloud
|PaaS||AWS EMR, Azure HDInsight, Google Cloud Dataproc, Databricks|
|SaaS||Qubole, Cazena, BlueData Software|
Source: Gartner (August 2018)
In 2018, Microsoft and Databricks also introduced Azure Databricks to provide a collaborative data science and advanced analytics platform that is optimized for Azure cloud’s various components. Key features of Azure Databricks are as follows:
- A collaborative workspace with Databricks Notebooks, which developers and data scientists can use to develop and share code.
- Integration with the Azure UI, which makes it possible to create fully managed clusters very quickly in the cloud by answering a few prompts. If needed, multiple clusters for other workloads can be created simply by duplicating the setup. Clusters can automatically scale up to the limit specified when they were first created and scale down when capacity is not needed. This makes Azure Databricks an excellent fit for ephemeral workloads that need compute for a short time only. Azure Databricks also provides the Serverless option to further abstract the infrastructure complexity.
- Integration with Azure security such as Active Directory.
- Integration with Azure storage options such as ADLS, Azure Blob Storage, Azure SQL Data Warehouse and CosmosDB through optimized APIs.
- Integration with Azure data integration components such as Azure Data Factory (ADF), Azure Event Hub and Azure IoT Hub.
- Integration with BI tools such as Power BI and any JDBC/ODBC supported tools.
Spark’s Runtime Architecture
Figure 4 depicts a Spark runtime architecture consisting of a master node and one or more worker nodes. Each worker node runs Spark executors inside JVMs.
Figure 4. Spark Runtime Architecture
Source: Gartner (August 2018)
Spark applications acquire executor processes across multiple worker nodes and communicate with each other. The executor processes run tasks in multiple threads. Each worker node may have multiple executors belonging to different Spark applications, but they do not share any data in memory. However, they can share data by writing to a persistent store.
The master node runs the driver, which holds the Spark Context.
Figure 4 shows clients that interact with Spark such as Spark shell. Spark shell is an interactive environment written in Scala, which allows users to run ad hoc queries. The Spark shell for Python is called PySpark, and for R is called SparkR. All these interactive environments can be started by executing the appropriate shell script in Spark distributions’ bin directory. SparkR can also be run inside the RStudio IDE.
In addition, Apache Livy is an open-source Apache incubator project that allows clients such as web and mobile apps to leverage Spark without any Spark client code. It allows clients to submit Spark jobs and retrieve results using REST APIs in a fault-tolerant setup. The jobs can be written in Scala, Python or Java. Livy allows multiple long-running Spark Contexts on the cluster (YARN or Mesos) to be shared by multiple Spark jobs and multiple clients, which provides multitenancy and high concurrency.
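To make the REST interaction concrete, here is a minimal sketch that only builds the HTTP requests a client would send to Livy; nothing is transmitted. It assumes Livy's documented `POST /batches` and `POST /sessions/{id}/statements` endpoints. The host name, JAR path, class name and helper function names are hypothetical.

```python
import json
from urllib.request import Request

# Hypothetical host; 8998 is Livy's default port.
LIVY_URL = "http://livy.example.com:8998"


def _post(path, body):
    """Build (but do not send) a JSON POST request to the Livy server."""
    return Request(
        url=f"{LIVY_URL}{path}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def batch_request(jar_path, class_name, args=()):
    """Request that submits a packaged Spark job as a Livy batch."""
    body = {"file": jar_path, "className": class_name, "args": list(args)}
    return _post("/batches", body)


def statement_request(session_id, code):
    """Request that runs a code snippet in an existing interactive session."""
    return _post(f"/sessions/{session_id}/statements", {"code": code})


req = batch_request("hdfs:///jobs/etl.jar", "com.example.Etl", ["2018-08-01"])
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

Against a live Livy server, passing either request to `urllib.request.urlopen` would submit it; the JSON response carries an identifier that the client can poll for state and results.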
Container technologies using products such as Docker have recently gained much momentum. Each container holds the entire stack, including operating system, drivers, Spark libraries, applications, ML models and even Notebooks. Containers allow Spark to be deployed quickly and to run efficiently. They enable multitenancy, typically in a microservices architecture (MSA), for greater flexibility and agility when deploying to public or private cloud, on-premises and in hybrid environments. However, containers need to be carefully managed for network, security and user access control. This management is achieved using container management products such as Docker Swarm, Mesos, Kubernetes and AWS ECS.
Spark can be deployed within Docker containers via two image files containing:
- Master container to run Spark Master and Notebooks such as Apache Zeppelin and Project Jupyter
- Worker containers to run Spark Workers
Containers are stateless. A Spark container will contain only ephemeral storage, which is needed to perform the compute tasks. Persistent storage is outside the container.
As of Spark 2.3, Spark workloads can be launched natively in a Kubernetes cluster. Kubernetes has grown very quickly as a cluster manager, and Spark can now leverage its capabilities to provide multitenancy. However, the current release has limitations; for example, only Java and Scala applications are supported. Additional features will be added in upcoming releases.
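As an illustration of a native Kubernetes launch, the sketch below assembles a `spark-submit` command line of the general shape described in the Spark 2.3 documentation (a `k8s://` master URL, cluster deploy mode and a container image setting). It only builds the argument list. The API server address, image name, class name and JAR path are hypothetical placeholders.

```python
def k8s_submit_command(api_server, image, main_class, app_jar, executors=2):
    """Assemble spark-submit arguments for a Kubernetes cluster-mode launch."""
    return [
        "bin/spark-submit",
        "--master", f"k8s://{api_server}",
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", f"spark.executor.instances={executors}",
        "--conf", f"spark.kubernetes.container.image={image}",
        app_jar,
    ]


command = k8s_submit_command(
    api_server="https://k8s.example.com:6443",        # hypothetical API server
    image="registry.example.com/spark:2.3.0",         # hypothetical image
    main_class="com.example.Etl",                     # hypothetical Scala/Java class
    app_jar="local:///opt/app/etl.jar",               # path inside the image
    executors=4,
)
print(" ".join(command))
```

The resulting list can be handed to `subprocess.run` on a machine with a Spark distribution and cluster credentials.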
Spark does not have its own storage system and hence it relies on external data stores. Almost all enterprise-ready databases — relational or nonrelational — and object stores ship today with support for Spark. Some of the common ones are listed below, but this list is only illustrative:
- Hadoop ecosystems’ HDFS, HBase and Hive
- Traditional relational databases such as IBM Db2, Oracle and SQL Server
- Scale-out relational databases such as MemSQL, Google Spanner and NuoDB
- Nonrelational or NoSQL databases such as Cassandra, MongoDB, Redis, MarkLogic, Couchbase, Aerospike and Neo4j
- Cloud object stores such as Amazon S3, Azure WASB/ADLS and Google Cloud Storage
- In-memory file systems such as Alluxio, Apache Ignite and Hazelcast
Gartner is often asked which data store should be picked for use with Spark. Gartner clients should select the optimal data store based on their strategic needs and by referring to “Identifying and Selecting the Optimal Persistent Data Store for Big Data Initiatives.”
In 2018, Databricks made Databricks Delta available in private preview. Delta adds structured storage to existing data lakes and brings transactions (for data integrity) and performance improvements (for analytics). It stores data as Parquet files in DBFS and maintains a transaction log to keep track of changes to the data.
Efficient use of underlying hardware is critical to Spark’s performance. In Spark 1.3, Databricks launched Project Tungsten to make more efficient use of CPU by getting closer to the maximum performance provided by the bare-metal hardware. As seen in the Spark Runtime Architecture, Spark executors run inside JVMs, but JVMs have been beset with long pauses during their garbage collection process. As part of Project Tungsten, Databricks introduced the option to use off-heap memory for storing the data. This option is controlled in the Spark configuration by setting the parameter spark.memory.offHeap.enabled to true and spark.memory.offHeap.size to the desired size in bytes.
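The two off-heap settings can be sketched as follows. This is a small helper, with a hypothetical name, that produces the two property values for a given size and prints them in the `key value` form used by `spark-defaults.conf`; the 2 GiB figure is only an example.

```python
def off_heap_conf(size_gib):
    """Return Spark properties that enable off-heap storage of the given size."""
    if size_gib <= 0:
        # Spark requires a positive size whenever off-heap memory is enabled.
        raise ValueError("off-heap size must be positive")
    return {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": str(size_gib * 1024 ** 3),  # bytes
    }


conf = off_heap_conf(2)
for key, value in conf.items():
    print(f"{key} {value}")
```

The same pairs can be passed on the command line as `--conf key=value` arguments to `spark-submit`.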
Project Tungsten makes use of CPU registers and L1 and L2 CPU caches to provide better performance.
Spark is being used to run compute- and memory-intensive tasks on large amounts of data. Some ML-based algorithms (e.g., image recognition) have requirements that CPUs sometimes cannot meet, so some Gartner clients have started exploring accelerators such as NVIDIA’s Tesla GPUs and Google’s Tensor Processing Units (TPUs). GPUs are known for executing tasks in parallel on very large numbers of cores, which makes them very effective for certain use cases, but not for all. Databricks has tuned Spark to harness the power of GPUs for these compute-intensive needs.
In addition to CPUs and GPUs, Spark also runs on FPGAs and proprietary ASIC chipsets. The collaboration of IBM’s Big SQL and HDP supports IBM’s Power8 server running “little-endian” Linux, IBM AIX and IBM i operating systems.
Because of its execution performance, Spark is quickly proving to be the distributed engine of choice for many commercial as well as corporate in-house applications. It is used across a wide range of third-party applications and products, including:
- ETL tools such as Informatica, Ab Initio, IBM InfoSphere DataStage, Talend and Pentaho.
- Data ingestion products such as StreamSets and Apache NiFi.
- Data governance tools including data profiling, data catalog, metadata management, self-service data preparation and data wrangling that need to curate large amounts of data. Spark has been used extensively to identify data quality issues across disparate datasets ingested into data lakes.
- Notebooks, such as Apache Zeppelin and Jupyter, that execute interactive queries and build applications leveraging Spark.
Table 3 summarizes key aspects of Apache Hadoop and Apache Spark.
Table 3: Hadoop and Spark Differences
|Aspect||Apache Hadoop||Apache Spark|
|Storage||HDFS||Leverages external stores (cloud and on-premises)|
|Processing||Batch via MapReduce on disk||Streaming, in-memory, and on-disk batch|
|Programming API||Java||Scala, Java, Python and R|
|Developer Tools||APIs, no shell||APIs, Shell, Java REPL and Notebooks|
|Clustering||YARN||Standalone, YARN, Mesos, Kubernetes and Docker|
|Data Access||Pig, Hive||Spark SQL and HiveQL|
|Security||Ranger, Sentry and Kerberos||Kerberos, keys|
|Machine Learning||Mahout||MLlib and Deep Learning Pipelines|
|Use Case||Batch processing||Batch, interactive, iterative and near real-time|
Source: Gartner (August 2018)
The starkest difference between Apache Hadoop’s MapReduce and Spark stems from the fact that the former is heavily disk-based and hence orders of magnitude slower in processing tasks than Spark. For example, a job that involves three MapReduce operations will incur at least 12 disk I/Os compared to Spark’s two disk I/Os.
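One way to arrive at these figures is sketched below. It rests on an assumption that the text does not spell out: each MapReduce operation reads its input from disk, writes map output to disk, reads that output back for the reduce phase and writes its result to disk, whereas Spark reads the input once and writes the final result once while keeping intermediate data in memory. The function names are hypothetical.

```python
# Assumed per-job disk touches for MapReduce:
# read input, write map output, read map output, write result.
IO_PER_MAPREDUCE_JOB = 4


def mapreduce_disk_ios(num_operations):
    """Disk I/Os for a chain of MapReduce jobs under the assumption above."""
    return num_operations * IO_PER_MAPREDUCE_JOB


def spark_disk_ios(num_operations):
    """Disk I/Os for the same chain in Spark: one initial read, one final write."""
    return 2 if num_operations > 0 else 0


print(mapreduce_disk_ios(3), spark_disk_ios(3))  # 12 2
```

Under this model the MapReduce cost grows with every operation added to the chain, while the Spark cost stays constant as long as intermediate results fit in memory.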
Although both MapReduce and Spark executors run inside JVMs, Spark keeps its JVMs resident in memory, unlike MapReduce.
Spark code is also much more compact and is, on average, seven times smaller than comparable MapReduce code. Throughout this research, various optimizations performed by the Spark engine are mentioned. These fall under two initiatives: Project Tungsten and Catalyst. These enhancements further the advantages of Spark over Apache Hadoop.
Spark Use Cases
Spark is a versatile technology that is being used to deliver end-to-end data pipeline tasks. It is quickly becoming the “Swiss knife of big data” by supporting a plethora of uses (see Figure 5).
Figure 5. Spark Is the Swiss Knife of Big Data
Source: Gartner (August 2018)
This section examines the use of Spark to meet the following four broad use cases (see Figure 6):
- Data ingest
- Data transform
- Machine learning
- Advanced analytics
Figure 6. Spark Provides a Unified Platform
Source: Gartner (August 2018)
Depending on the selected use cases, consumers of Spark may vary, as shown in Table 4.
Table 4: Spark Use Cases
|Data Ingest||Acquire structured and unstructured data; handle discrete time series data (e.g., IoT)||Batch and streaming ingestion; integration with sources, message bus and sinks|
|Data Transform||Process data (e.g., transform and cleanse)||Data engineers/data scientists|
|Machine Learning||Create training and evaluation sets; engineer, test and deploy models|
|Analytics||Operationalize and deploy analytical solutions; incorporate ML UDFs in SQL queries/analytics; SQL reports and dashboards|
Source: Gartner (August 2018)
An obvious starting point of any big data pipeline is to ingest data originating from various internal and external sources. In traditional applications, data ingestion was limited to a small number of operational systems, which usually contained structured data. This data was processed by an ETL tool such as Informatica, IBM InfoSphere DataStage, Talend or Oracle Data Integrator, and was then sent to a data warehouse or a data mart for analytical reporting. Organizations used the reports to analyze key performance indicators (KPIs) and drive business decisions pertaining to the health of the business.
In modern architectures, data-driven organizations are keen to ingest and connect internal data with related data from multiple other sources to deliver more advanced analytics in near real time. In this new world order, disparate sources generate data with varying volume, velocity and variety. Architectures should be able either to pull new data from the sources (e.g., using a message bus based on Apache Kafka) or to accept new content that a source continuously pushes (e.g., the Twitter API emitting tweets).
Can an architecture based on Spark provide the agility to handle this complex scenario?
Organizations that have successfully collected data from different sources into a common place such as a data lake or a landing zone are able to provide their BI users or data scientists with the ability to deploy advanced analytical options. ML algorithms are more successful when they have access to vast amounts of learning data. A word of caution is warranted here: Just collecting data does not ensure success. Many data lakes have turned into data swamps simply because the data was not curated and governed adequately. The data transform use case touches on Spark’s role in helping curate the data to ensure the success of a data lake initiative.
There are many options for ingesting data. In the Hadoop ecosystem, data is usually extracted from an RDBMS in batch mode using Apache Sqoop. Organizations like Uber are heavy users of Apache Sqoop, but they have enhanced this product to use Spark and provide much higher throughput.
Change data capture (CDC) is used to extract incremental new data from a source. Some of the tools that offer this capability include Oracle GoldenGate, Attunity Replicate, Striim, IBM InfoSphere, Debezium and Tungsten Replicator. A number of commercial solutions with sophisticated workflows and user interfaces are also available to ingest data from multiple sources. These include StreamSets and Apache NiFi. The above-mentioned products have Spark connectors to leverage Spark’s distributed in-memory execution engine.
The products mentioned thus far incorporate the Spark engine to meet low-latency and scalability needs. However, Spark code can also be written to directly perform data ingestion. The code can support traditional batch as well as real-time streaming ingestion. The advantage of writing Spark code for ingestion is that organizations can customize it to meet their use cases. For example, streaming ingestion code can be used to perform fraud detection routines or data quality checks.
Spark introduced a streaming option called Structured Streaming. Prior to its introduction, Spark was not considered a true streaming platform because it made use of microbatches. The older streaming engine used discretized streams based on RDDs. However, with DataFrames-based Structured Streaming, the underlying architecture is now streaming with support for late-arriving data.
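The idea of tolerating late-arriving data can be sketched in plain Python. This is a simplified model, not Spark code: events are grouped into tumbling windows by their event time, and a watermark (the largest event time seen so far minus an allowed lateness) decides whether a late event is still accepted. The window length, lateness bound and sample events are assumptions chosen for illustration.

```python
from collections import defaultdict


def windowed_counts(events, window_s=10, lateness_s=5):
    """Count events per tumbling event-time window, tolerating bounded lateness.

    events: iterable of (event_time_seconds, payload) in ARRIVAL order.
    Returns (counts keyed by window start, list of dropped events).
    """
    counts = defaultdict(int)
    dropped = []
    max_event_time = float("-inf")
    for event_time, payload in events:
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - lateness_s
        if event_time < watermark:
            # Simplification: anything older than the watermark is discarded.
            dropped.append((event_time, payload))
            continue
        window_start = (event_time // window_s) * window_s
        counts[window_start] += 1
    return dict(counts), dropped


# Arrival order differs from event-time order: 8 arrives after 12, 3 after 21.
arrivals = [(1, "a"), (12, "b"), (8, "c"), (21, "d"), (3, "e")]
counts, dropped = windowed_counts(arrivals)
print(counts)   # {0: 2, 10: 1, 20: 1}
print(dropped)  # [(3, 'e')]
```

The event at time 8 arrives late but within the allowed lateness, so it is still counted in its original window; the event at time 3 arrives too late and is dropped. In Structured Streaming, the corresponding building blocks are `withWatermark` and `window` applied to a streaming DataFrame.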
Data Ingestion Challenges/Spark Solutions
- Complex ingestion: On one hand, organizations have increased the number of data ingestion sources. On the other hand, some organizations deploy multiple data stores in a polyglot persistence architecture. This leads to complex data ingestion challenges and, if not managed carefully, can lead to a spaghetti-like ingestion architecture. Spark-based enterprise data ingestion technologies are recommended instead of hand-coding procedures. This will help to avoid monitoring, governance and auditing nightmares.
- Diverse data formats: Source and target data formats vary, so the ingestion tool may have to perform some transformation, such as serializing data read from a relational table into an array. Similarly, the array may have to be deserialized before it is written to the target. Spark provides connectors for the most commonly used formats, as well as a platform for easily developing new custom connectors. Connectors should be developed to be fault-tolerant and to guarantee reliability. MQTT, one of the most popular standards for IoT use cases, was developed in 1999. It has many advantages, including a small two-byte overhead. Although MQTT details are beyond the scope of this research, Spark provides support for MQTT via a Python API.
- Integration with other ingestion tools: Some organizations use Spark in conjunction with other ingestion technologies. Two well-respected products are the open-source Apache Kafka and its commercial offering from Confluent. Kafka and Spark are tightly integrated and many organizations treat both as first-class citizens. Many Gartner clients have standardized on an ingestion tool and now are introducing Spark to enhance their data pipelines. For example, HDP users often use Spark capabilities that are built into Hortonworks Data Flow (HDF), which also uses Apache Kafka and Apache NiFi. Similarly, MapR provides MapR Streams as its Kafka-based scalable ingestion system. A more comprehensive list of ingestion options is provided in The Details section of this document.
- Stream data integrity: Data may be duplicated. Spark Streaming ensures the uniqueness of the ingested data. Streaming tools deal with three delivery semantics:
  - At most once: The message is passed from source to target only once. There is no guarantee that the message will be received. If, for some reason, a message is lost, it is not retransmitted.
  - At least once: The message from the source must be acknowledged by the target. This ensures that messages were indeed received, but does not guarantee the absence of duplicate messages.
  - Exactly once: The message from the source is acknowledged on receipt, and it is also guaranteed that there are no duplicates. This ensures data integrity, but it is the hardest semantic to implement. Spark provides exactly-once semantics. Kafka introduced this semantic in June 2017.
- Out of order: Streaming data may arrive out of order. Spark handles this by looking at the event time stamp, not just at the time of arrival of the stream.
- Differing data rates between source and sink: A producer may well generate data faster than the target, such as HDFS or HBase, can consume it. In the meantime, the Spark engine has to hold on to the data temporarily, which can impact job performance and cause congestion within Spark’s memory. Spark initially addressed this by allowing developers to set a parameter that limits the rate. However, a single static rate may not suit the characteristics of different producers and consumers. Hence, in later releases, Spark introduced back pressure, which dynamically adjusts the rate of processing by polling the receivers and then pushes back on the producer to limit its rate.
- Scalable: The ingestion tool should be able to scale appropriately when new data sources are added. As the technology-centric world becomes more business-friendly, self-service ingestion tools give business users the ability to add new data sources into the mix without IT’s active involvement. Spark has proven in benchmarks that a cluster can scale to thousands of nodes to handle very high ingestion workloads.
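The difference between the delivery semantics listed above can be shown with a small plain-Python model. It is a sketch of one common technique, not a description of how Spark implements its guarantees: the source delivers at least once (it resends a message whenever an acknowledgment is lost), and an idempotent sink that remembers message IDs turns the duplicates into an exactly-once result. All class, function and message names are hypothetical.

```python
class IdempotentSink:
    """Sink that ignores message IDs it has already applied."""

    def __init__(self):
        self.applied_ids = set()
        self.total = 0

    def write(self, msg_id, amount):
        if msg_id in self.applied_ids:
            return False  # duplicate delivery: no effect on the total
        self.applied_ids.add(msg_id)
        self.total += amount
        return True


def deliver_at_least_once(messages, sink, lost_acks):
    """Send every message and resend it once if its acknowledgment is lost.

    lost_acks: IDs whose first acknowledgment is (hypothetically) lost.
    Returns the number of sends, which can exceed the number of messages.
    """
    sends = 0
    for msg_id, amount in messages:
        sink.write(msg_id, amount)
        sends += 1
        if msg_id in lost_acks:
            sink.write(msg_id, amount)  # retry: the sink sees a duplicate
            sends += 1
    return sends


messages = [("m1", 10), ("m2", 20), ("m3", 30)]
sink = IdempotentSink()
sends = deliver_at_least_once(messages, sink, lost_acks={"m2"})
print(sends, sink.total)  # 4 60
```

Four sends occur for three messages, yet the total reflects each message exactly once. Without the ID check in the sink, the duplicate would inflate the total to 80.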
Within a data pipeline, once the data is ingested, it needs to be processed and persisted. Many Gartner clients are witnessing a tremendous increase in the amount of data that needs to be processed in the nightly batch window. This steady increase in data volumes and transformation complexity threatens to overrun the time allocated to complete these jobs. In some cases, businesses are forced to open their doors the following day while the nightly batch jobs are still running. This can have serious impacts on business operations.
Spark can take batch or streaming data and process it in memory at much faster speeds than many older systems were able to achieve. Spark clusters also have the ability to scale as needed to handle spiky data transformation workloads.
One of the most common use cases of Spark is to do the classic ETL job. In the traditional ETL scenario, the tool may:
- Extract data from an online transaction processing (OLTP) database.
- Analyze the data, for example by profiling, classifying or even enriching it.
- Run a transformation job. Examples include masking any personally identifiable information (PII) or aggregating the data into an online analytical processing (OLAP) cube, a data warehouse or a data mart for end users to query using their BI tools.
Spark provides the ability to collapse the separate tasks of extract, transform and load into a single code base, and perform needed tasks in-memory in a more efficient manner. The sample code below shows how Spark is able to read a comma-separated values (CSV) file from AWS S3, perform group by and aggregation transformations and then store the resulting data in Parquet format on AWS S3.
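A minimal sketch of such a job, written against the PySpark DataFrame API, is given here. It is an illustration under stated assumptions rather than the original sample: the bucket paths and the column names `region` and `amount` are hypothetical, and the function expects an existing `SparkSession` to be passed in.

```python
def csv_to_parquet(spark, source_path, target_path):
    """Read a CSV file, aggregate it and write the result as Parquet.

    spark: an existing pyspark.sql.SparkSession.
    source_path / target_path: for example s3a:// URIs (hypothetical here).
    """
    # Extract: read the CSV with a header row and let Spark infer column types.
    orders = spark.read.csv(source_path, header=True, inferSchema=True)

    # Transform: total amount per region. The aggregate column is renamed
    # because Parquet does not accept parentheses in column names.
    summary = (
        orders.groupBy("region")
        .agg({"amount": "sum"})
        .withColumnRenamed("sum(amount)", "total_amount")
    )

    # Load: write the aggregated result in Parquet format.
    summary.write.mode("overwrite").parquet(target_path)
    return summary
```

With a running session, a call might look like `csv_to_parquet(spark, "s3a://example-bucket/in/orders.csv", "s3a://example-bucket/out/summary")`. Extract, transform and load happen in one code base, and the intermediate DataFrame never has to be written to disk between steps.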
Traditional data transformations are heavily disk-oriented. With the advent of Spark and other in-memory processing engines, why even have the notion of a batch job? In fact, a batch job can be construed as a stream that is bounded within a window of time. Spark is able to improve performance by eliminating frequent disk reads and writes and using memory more efficiently.
Another important aspect is that, because big data applications process large quantities of data, issues stemming from poor data quality get exacerbated. If the quality of data is suspect, then it doesn’t matter how it is analyzed because the results will be inaccurate. Spark is often used directly or through data quality tools to analyze the level of quality and to take corrective action before the data is made available for ML or analytics. This is one of the ways to ensure that the data lake does not turn into a data swamp. This is also a requirement for data governance and regulatory compliance initiatives. Please refer to the following Gartner research for in-depth analysis on data governance topics.
Data Transformation Use Cases
Spark is being used widely to enable many types of data transformations. This section lists a few types:
- File formats: Spark can be used to transform input file formats into more compact and efficient formats, such as Parquet or ORC, that provide superior analytical performance. In addition, this reduces storage costs and the network bandwidth needed to transfer data between systems.
- Data curation and governance: Data curation and governance tools attempt to infer schema in a stream of ingested data in order to allow data scientists and business analysts to extract insights. Self-service data preparation (SSDP) tools generally operate on large amounts of data and rely on Spark to perform tasks such as classifying or tagging sensitive data, profiling data, inferring and suggesting schema, and identifying data quality issues. These tasks are also referred to as data wrangling. The metadata associated with large datasets is collected into a data catalog so that business policies can be applied and enforced. Many tools use Spark to achieve the needed scale. Expect to see more of this use case as regulations such as the EU General Data Protection Regulation (GDPR), whose enforcement began on 25 May 2018, take effect.
- ETL tool: Many ETL vendors have adopted Spark as one of their distributed execution engines due to its price-performance advantage. ETL products extract input data and perform tasks such as mapping and joining source data into other formats, while enriching the output with new values. These tools may perform data validation and filter or aggregate the data for reporting. One example of this use case is when organizations ingest very large amounts of log data from applications, hardware, websites, containers, etc., and then use Spark to enrich the log data with, for example, network performance data before applying ML models.
In the previous two use cases, we explored the use of Spark as a platform to stream and process the data. This section explores how Spark is increasingly being used to deliver ML use cases. ML initiatives have a massive appetite for data because more data available to teach the models and evaluate the algorithms leads to better predictive analytics output. Gartner clients have been aggressively pursuing this area to deliver many use cases, such as:
- Ranking and user profiling
- Fraud and anomaly detection
- Recommendations such as collaborative filtering
- Predictive maintenance
Because this area is so much in demand and there are so many products, why should technical professionals consider Spark’s MLlib API and other new ML/AI enhancements?
The ML capabilities of Spark are part of the product’s goal of being an integrated and unified analytics framework. Hence, it complements other use cases and provides the following benefits:
- Simplifies development of ML models and lowers the barrier to entry by allowing citizen data scientists to develop models with little overhead and by using Python and R
- Supports a wide range of algorithms and different types of ML, which allows experienced data scientists to build, test and deploy advanced models, including those involving deep-learning pipelines
- Executes models within Spark SQL as UDFs and integrates with Structured Streaming to apply models to real-time streaming data
- Integrates with most commonly used external ML platforms and libraries such as:
  - Google’s TensorFlow, a neural network library commonly used from Python
  - Scikit-learn, which leverages NumPy and SciPy to deliver a low-level library
  - Theano, which also leverages NumPy and makes extensive use of GPUs
  - Keras, a high-level neural network API that can run on top of TensorFlow or Theano
  - Other ML frameworks such as xgboost, Intel’s BigDL, H2O and PyTorch
  - Monitoring and management options such as MLeap, TF Serving, Azure ML and TensorBoard
ML is one of the disciplines of AI. ML itself is composed of different types, as depicted in Figure 7.
Figure 7. Spark MLlib Covers the Gamut of Machine Learning
Source: Gartner (August 2018)
This document is not meant to be a primer on ML algorithms. Please refer to “Driving an Effective AI Strategy” and “Preparing and Architecting for Machine Learning.”
Figure 7 shows different types of algorithms across:
- Supervised learning, which relies on labelled data and is used to do classification via algorithms such as K-Nearest Neighbors, Naive Bayes, Support Vector Machine or Random Forest, or regression via algorithms such as linear regression or decision trees. A common use of supervised learning is to detect spam emails or credit card fraud.
- Unsupervised learning, which doesn’t have the benefit of labelled data, so it uses the corpus of data itself for clustering with K-Means or for dimensionality reduction using principal component analysis (PCA) or singular value decomposition (SVD) algorithms. Spark supports all these algorithms, and data scientists use them to build recommendation engines or do market segmentation.
- Deep learning, which is based on neural networks and made its debut in the Spark 2.2 release. This technique is used by data scientists to work on highly compute-intensive tasks such as real-time facial recognition on streaming video.
- Natural-language processing (NLP), which is technically a branch of AI. Spark MLlib has strong support for developing structured and unstructured text mining and analysis applications. In this case, raw data is first sent to a tokenizer. After that, term frequency-inverse document frequency (TF-IDF) hashing determines and ranks the importance of each word within the text. The final step is to apply a logistic regression model and perform predictive analysis.
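The tokenizer and TF-IDF steps of the text-mining flow described above can be sketched in plain Python. This is an illustration of the technique, not MLlib code: the hash function (CRC32) stands in for the hash MLlib uses, the feature space is deliberately tiny, and the sample documents are invented. The inverse document frequency uses the smoothed form log((N + 1) / (df + 1)).

```python
import math
import re
import zlib

NUM_FEATURES = 32  # hypothetical, deliberately small feature space


def tokenize(text):
    """Lowercase the text and split it into word tokens."""
    return re.findall(r"[a-z0-9']+", text.lower())


def bucket_of(token, num_features=NUM_FEATURES):
    """Hash a token to a feature index (CRC32 is a stand-in hash)."""
    return zlib.crc32(token.encode("utf-8")) % num_features


def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Map tokens to a sparse {bucket: term count} vector (hashing trick)."""
    vec = {}
    for tok in tokens:
        b = bucket_of(tok, num_features)
        vec[b] = vec.get(b, 0) + 1
    return vec


def idf_weights(tf_vectors):
    """Smoothed IDF per bucket: log((N + 1) / (df + 1))."""
    n_docs = len(tf_vectors)
    doc_freq = {}
    for vec in tf_vectors:
        for b in vec:
            doc_freq[b] = doc_freq.get(b, 0) + 1
    return {b: math.log((n_docs + 1) / (df + 1)) for b, df in doc_freq.items()}


def tf_idf(tf_vec, idf):
    """Weight each term count by the IDF of its bucket."""
    return {b: count * idf[b] for b, count in tf_vec.items()}


docs = ["Win cash now!", "Cash prize now", "Meeting at noon now"]
tf_vectors = [hashing_tf(tokenize(d)) for d in docs]
idf = idf_weights(tf_vectors)
features = [tf_idf(vec, idf) for vec in tf_vectors]
print(idf[bucket_of("now")])  # 0.0: a word in every document carries no weight
```

The resulting sparse vectors are what a classifier such as logistic regression would consume in the final step of the flow.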
Let’s look at a flow (see Figure 8) of how MLlib-based ML can be built and run to do classification. In this reference architecture, the starting point is historical data. The fact that we already have some data to work with in this example is a clear giveaway that this depicts supervised learning. Depending on the use case, the historical data can contain information such as:
- A collection of spam mail. In that case, the goal of the architecture below is to train itself to identify whether or not email is spam and then classify it accordingly when the actual email arrives.
- A collection of fraudulent transactions. This corpus of data is used to train the model to identify potential fraudulent transactions.
- Categorization of news feeds. Many data aggregators want to make news items available to their customers as quickly as possible. Speed is of the essence because the first one to make the news item publicly available has the opportunity to capture revenue and/or mind share. For example, a material change in a company’s key financial position may engender losses. However, the news has to be categorized first. What better way to do so than with a well-trained ML model?
Figure 8. Ease of Deployment of Models in Spark
Source: Gartner (August 2018)
In this architecture, Spark is being used for:
- Feature extraction
- Model fitting
- Model validation
The algorithm used to build the model may vary between use cases. For example, spam detection is best predicted using the Naive Bayes algorithm. Once a model has been built and evaluated, it is then deployed in production. The bottom half of Figure 8 shows that, as the real-time live data stream (JSON in this case) is ingested, the model will make the classification (e.g., spam email, stolen credit card or news feed that is in the sports category). Finally, the figure shows a dashboard built using open-source libraries such as Node.js or Vert.x to display the output from the model.
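To make the train-then-classify flow concrete, here is a minimal sketch of a multinomial Naive Bayes spam classifier in plain Python. It is not the MLlib implementation; the function names and the four-message training set are hypothetical, and a production model would be trained on a far larger historical corpus.

```python
import math
from collections import Counter


def train_naive_bayes(labelled_docs):
    """Count documents and words per class from (text, label) pairs."""
    class_counts = Counter()
    word_counts = {}
    vocabulary = set()
    for text, label in labelled_docs:
        words = text.lower().split()
        class_counts[label] += 1
        word_counts.setdefault(label, Counter()).update(words)
        vocabulary.update(words)
    return class_counts, word_counts, vocabulary


def classify(text, model):
    """Return the label with the highest log posterior, using Laplace smoothing."""
    class_counts, word_counts, vocabulary = model
    total_docs = sum(class_counts.values())
    best_label, best_score = None, float("-inf")
    for label, n_docs in class_counts.items():
        total_words = sum(word_counts[label].values())
        score = math.log(n_docs / total_docs)  # class prior
        for word in text.lower().split():
            likelihood = (word_counts[label][word] + 1) / (total_words + len(vocabulary))
            score += math.log(likelihood)
        if score > best_score:
            best_label, best_score = label, score
    return best_label


# Hypothetical historical data: the labelled corpus used for training.
history = [
    ("win a free prize now", "spam"),
    ("claim your free money", "spam"),
    ("project meeting on monday", "ham"),
    ("lunch with the team on friday", "ham"),
]
model = train_naive_bayes(history)

# Classifying newly arriving messages with the trained model.
print(classify("free prize money", model))     # spam
print(classify("team meeting friday", model))  # ham
```

The split between `train_naive_bayes` and `classify` mirrors the two halves of the architecture: the model is fitted once on historical data and then applied to each record as it arrives.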
Data scientists have realized that developing the ML code is not the hardest part; deploying it in production is often the bigger challenge. Organizations are loath to put new models and applications in production without performing extensive testing and handling all the legal and security aspects. Some clients have reported the situation is so bad that, once a data scientist has developed a model using a specialized language, software engineers recode it in Java before deployment because the specialized language is not supported in production.
Databricks has launched new Spark-based offerings to solve the challenges associated with operationalizing ML models through the following three products:
- Databricks Delta. As mentioned above, Spark relies on external storage options such as Amazon S3 or HDFS. One of the biggest problems with putting an ML model in production is poor data quality. Databricks now offers a storage layer, appropriately called Delta, that sits on top of the data lake and enforces schema as well as ACID transaction compliance. It allows organizations to use Spark to compute outputs in reports, dashboards, alerts, ML or reinforcement learning or deep learning models. Delta allows creation of indexes to speed up performance. It runs in AWS and Azure clouds.
- Databricks Runtime ML. The external ML platforms and libraries mentioned above are available in an integrated and preconfigured environment so that data scientists can accelerate development of ML models.
- MLflow. This was introduced in June 2018 to help with the last piece of the puzzle: operationalizing. It provides the ability to develop end-to-end workflows that include SSDP, building models, and then deploying various versions of the model. MLflow is open-source and supports multiple cloud providers and hardware options, such as CPUs and GPUs.
Refer to “Making Machine Learning a Scalable Enterprise Reality — From Development to Production” for more information on using Spark for ML use cases.
This section considers use cases pertaining to delivering real-time BI. This use case is a culmination of all the previous use cases — data ingest, data transform and ML.
Imagine a scenario where you have a business performance dashboard that is constantly updated with the latest data. Let’s say, due to a material change in an organization’s financial reporting, the company issues restatements. This causes existing reports to be updated with the new information, and the dashboard therefore shows the latest financial positions. Spark SQL is increasingly used by data engineers and data scientists to perform near-real-time analytics.
Another interesting application of Spark SQL is to resolve concurrency issues. A Gartner retail client replaced its data warehouse with a solution based on Amazon S3 and used Spark SQL as the query engine because the former solution was unable to handle the concurrency needs. This customer got around AWS S3’s eventual consistency by appending data to S3 and storing batch ID and timestamps in a Hive metastore.
Spark SQL has incorporated SQL optimization techniques from relational database systems, which have matured cost-based optimization over a number of decades. When a Spark query is written in either Spark SQL or using the DataFrame or Dataset APIs, it is first converted into a logical plan. The logical plan is then optimized to find the optimal query option. For instance, if an SQL statement involves joins and filters, Spark may first perform the filter and then do the join on the smaller set of rows that is returned. Spark looks at attributes such as cardinality of data and size of data before translating the logical plan into a physical plan. This plan is further optimized and converted into a series of steps called the DAG. The DAG is then sent to the Spark worker nodes for execution inside each node’s executor. The code is compiled using “whole-stage code generation,” which minimizes the number of function calls. This is another one of the Catalyst optimization enhancements explored earlier in this research.
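The benefit of filtering before joining can be shown with a small pure-Python sketch. This is an illustration of the idea only, not how Catalyst is implemented; the `hash_join` helper and the `orders` and `customers` tables are hypothetical.

```python
def hash_join(left_rows, right_rows, key):
    """Join two lists of dicts on `key` and report how many left rows were probed."""
    index = {}
    for row in right_rows:
        index.setdefault(row[key], []).append(row)
    joined, probed = [], 0
    for row in left_rows:
        probed += 1
        for match in index.get(row[key], []):
            joined.append({**row, **match})
    return joined, probed


# Hypothetical tables: 10,000 orders referencing 100 customers.
orders = [{"customer_id": i % 100, "amount": i} for i in range(10_000)]
customers = [{"customer_id": i, "country": "US" if i < 10 else "DE"} for i in range(100)]

# Plan as written: join everything, then filter.
joined, probed_naive = hash_join(orders, customers, "customer_id")
naive = [r for r in joined if r["amount"] > 9_900]

# Optimized plan: apply the filter first, then join the smaller input.
filtered = [r for r in orders if r["amount"] > 9_900]
optimized, probed_optimized = hash_join(filtered, customers, "customer_id")

print(naive == optimized)              # True: both plans return the same rows
print(probed_naive, probed_optimized)  # 10000 99
```

Both plans produce identical results, but the optimized plan sends only 99 rows into the join instead of 10,000. An optimizer can make this rewrite automatically because the filter refers only to columns of one input.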
Spark’s Cost-Based Optimizer (CBO) was introduced in Spark 2.2 (see Figure 9). Prior to that, Spark used a set of rules to execute queries. CBO collects table-level and per-column statistics, such as table size, number of rows, NULL values, min/max values, distinct values, cardinality, average length and maximum length. For CBO to work, the “ANALYZE TABLE COMPUTE STATISTICS” SQL command must be run to generate the statistics. Spark’s CBO is especially critical for queries that involve joins. However, in Spark 2.2, it assumed that the data was distributed uniformly. In Spark 2.3, CBO handles skewed data distribution because, in reality, data frequency is rarely uniform. Spark 2.3 stores these statistics in histograms.
As CBO is a new feature, it is turned off by default. If you are tuning your Spark application and you want to use CBO, set the spark.sql.cbo.enabled configuration parameter to true.
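To see why distribution-aware statistics matter, consider the following pure-Python sketch. It is a simplified illustration rather than Spark’s optimizer: the function names and the sample column are assumptions, and an exact per-value frequency table stands in for a histogram.

```python
from collections import Counter


def column_statistics(values):
    """Collect the kind of per-column statistics a cost-based optimizer relies on."""
    non_null = [v for v in values if v is not None]
    return {
        "row_count": len(values),
        "null_count": len(values) - len(non_null),
        "distinct_count": len(set(non_null)),
        "min": min(non_null),
        "max": max(non_null),
    }


def estimate_uniform(stats):
    """Estimate rows matching `col = x`, assuming every distinct value is equally frequent."""
    return (stats["row_count"] - stats["null_count"]) / stats["distinct_count"]


def estimate_from_frequencies(values, target):
    """Estimate rows matching `col = target` from recorded value frequencies."""
    return Counter(values)[target]


# Hypothetical skewed column: one value dominates.
country = ["US"] * 900 + ["DE"] * 50 + ["FR"] * 50
stats = column_statistics(country)
uniform_guess = estimate_uniform(stats)
frequency_guess = estimate_from_frequencies(country, "US")

print(stats)
print(round(uniform_guess), frequency_guess)  # 333 900
```

With only a distinct-value count, the estimate for `country = 'US'` is about 333 rows, while the actual figure is 900. An error of that size can lead an optimizer to pick the wrong join order or join strategy, which is the problem that distribution statistics are meant to reduce.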
Figure 9. Spark SQL Data Flow
Source: Gartner (August 2018)
The SQL Pushdown feature was added to Spark in version 2.1. SQL Pushdown allows Spark to push the execution of an SQL statement to the underlying data store if the optimizer determines that doing so will make the query perform faster.
There are situations where Spark may have to process a complex query or may need to read or write large amounts of data between the database cluster and the Spark cluster. In those cases, when Spark builds the physical query plans, it can generate a query plan for the underlying storage. If the cost of executing the query closer to the data is the lowest, then Spark uses the SQL engine of the data store, rather than its own engine, to execute the SQL.
Snowflake is a popular cloud-based column-oriented data warehouse. In order to enable SQL Pushdown, the following method call in Snowflake’s Spark connector is used:
Once this call is made, the Spark driver will send the SQL query, using the JDBC connector, to Snowflake if needed. Snowflake produces the results and persists them to its underlying store, which happens to be AWS S3. The connector then retrieves the data from S3 and populates it into Spark’s DataFrames for further processing.
Spark SQL also supports reading and writing data stored in Apache Hive. In addition, Hive queries written in HiveQL can be executed using Spark. This requires setting the Hive configuration parameter, hive.execution.engine=spark.
Spark SQL can also read a JSON dataset and even infer its schema and load it into memory as a DataFrame. Similarly, Spark can read and write Parquet files while preserving the schema of the original files.
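Schema inference over JSON amounts to scanning the records and reconciling the types seen for each field. The sketch below shows the idea in plain Python; the type names and the merge rules are simplifying assumptions for illustration and do not reproduce Spark’s actual inference rules.

```python
import json


def infer_type(value):
    """Map a parsed JSON value to a simple type name."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # checked before int because bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    return "struct"


def merge_types(a, b):
    """Reconcile two observed types for the same field."""
    if a == b:
        return a
    if "null" in (a, b):
        return b if a == "null" else a
    if {a, b} == {"long", "double"}:
        return "double"
    return "string"  # fall back to the most general type


def infer_schema(json_lines):
    """Scan JSON records (one per line) and return a field-to-type mapping."""
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            observed = infer_type(value)
            schema[field] = merge_types(schema.get(field, observed), observed)
    return schema


# Hypothetical input records.
lines = [
    '{"id": 1, "price": 10, "tags": ["a"]}',
    '{"id": 2, "price": 12.5, "name": "widget"}',
    '{"id": 3, "price": null, "name": null}',
]
schema = infer_schema(lines)
print(schema)
```

Because inference requires a pass over the data, supplying an explicit schema is usually the cheaper option for large or frequently read datasets.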
Spark SQL supports Open Database Connectivity (ODBC) and JDBC connectivity, which allow any BI dashboard and data visualization tool that supports these standards to connect to Spark. Many of the UI tools already have connectors available. In addition, front-end dashboards created in popular frameworks such as Node.js and Angular can execute Spark SQL.
Spark’s benefits stem from being a general-purpose compute engine that runs in a distributed environment. This research has already captured its strengths in ingestion, batch and interactive processing, and analytical and ML use cases over the same set of data in a unified architecture. This allows operational and analytical workloads to run in a single cluster.
In summary, Spark’s strengths are:
- Unified architecture: Spark supports efficient and low-latency stream ingestion and processing through libraries such as Spark SQL, MLlib and GraphX. In addition, Databricks announced Project Hydrogen, which unites Spark’s native ML framework, MLlib, with external deep learning frameworks that organizations may already be using on-premises or in multiple cloud providers.
- Support for standards: In addition to SQL, Spark allows code to be written in Scala, Java, Python and R. Further, GraphX supports Gremlin Query Language. Any BI tool that supports ODBC and JDBC can work with Spark.
- Ease of use: Spark’s APIs are developer-friendly. Spark also has a command line interface (CLI), which can be used to not only start Spark, but also to execute one command at a time. This feature is used by data scientists who like to develop programs in stages and debug as they go.
- Deployment model: Code can be developed on Spark running on a Windows or a Mac laptop and deployed in a highly distributed Linux cluster. It can also be deployed in a Docker container.
- Wide support and large community: Spark has a large community of committers. In addition, all Hadoop vendors and almost all relational and nonrelational database vendors support Spark. Spark is also used by modern data wrangling, data profiling and data preparation, ML, and BI tools.
- Performance: Spark’s in-memory computation and optimized manner of using the persistent store lead to higher performance compared to MapReduce. With Spark Structured Streaming, it is also able to ingest and process data in streams without the need for microbatches.
- Scalability: Spark’s proven ability to handle large numbers of inputs, processing tasks and concurrent user queries meets some of the most demanding use cases at large organizations.
- Resiliency: Spark provides a high level of fault tolerance and high availability. Resiliency is built into its fundamental design.
- Integration: Spark integrates with ingestion products and standards, Hive, Kafka, HDFS, and S3. It can also leverage the YARN resource manager.
Spark is a rapidly maturing platform, which has led to growing pains. Some of its weaknesses are:
- Constantly evolving APIs: Spark versions have introduced many new APIs. These releases and the rapid pace of change have forced application developers to do constant rework. The pace of change has now moderated because Spark has reached a higher level of maturity.
- Code portability: Although Python is now one of the most popular languages for working on Spark, it lacks the kind of code portability that Java enjoys. PySpark deployments on Hadoop require appropriate libraries to be available. This weakness is expected to be alleviated with the recent launch of Hadoop 3 through proposed deployment of PySpark in a Docker container under YARN.
- High latency: Prior to the Spark 2.2 release, Spark relied on microbatches, unlike other ingestion options, such as Flink, which use a continuous-flow streaming model. Before Spark 2.2, Spark used a time-based window model, unlike Flink, which has a records-based window model and allows users to define custom window criteria. Proponents of Flink and Kafka Connect claimed that they were true streaming engines, unlike Spark, but this weakness applies only to users of Spark releases prior to 2.2.
- Memory management: Spark does manual partitioning and caching. Some older versions were not able to gracefully handle situations when their memory was exhausted. Since Spark 1.5, all the core operators in Spark can spill to disk when data is larger than memory. Spark’s architecture allows it to be resilient and not lose any data in case of memory overflow.
- Stateless: By design, Spark doesn’t have a persistence layer. However, thanks to persistence layers such as relational databases, nonrelational data stores, object stores, HDFS or Kafka, it is able to perform stateful computations. As an example, an input Kafka topic can be transformed into a different output Kafka topic with the ability to handle out-of-order data. Databricks Delta is a move toward addressing the lack of a persistent store.
- Data flow: Spark’s procedural programming model prohibits it from providing intermediate results while data is being processed. Compare this to Flink, which, thanks to its distributed data flow, is able to broadcast precalculated results as needed to all the worker nodes.
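The distinction between time-based and records-based windows mentioned in the latency discussion can be sketched in a few lines of plain Python. This is a conceptual illustration only; the function names and the sample events are hypothetical, and neither Spark’s nor Flink’s API is used.

```python
from collections import defaultdict


def time_windows(events, width_seconds):
    """Group (timestamp, value) events into tumbling windows of fixed duration."""
    windows = defaultdict(list)
    for timestamp, value in events:
        window_start = (timestamp // width_seconds) * width_seconds
        windows[window_start].append(value)
    return dict(sorted(windows.items()))


def count_windows(events, size):
    """Group events into windows that each hold a fixed number of records."""
    values = [value for _, value in events]
    return [values[i:i + size] for i in range(0, len(values), size)]


# Hypothetical events as (timestamp in seconds, payload); arrival is bursty.
events = [(0, "a"), (1, "b"), (2, "c"), (11, "d"), (12, "e"), (25, "f")]

by_time = time_windows(events, 10)
by_count = count_windows(events, 2)

print(by_time)   # {0: ['a', 'b', 'c'], 10: ['d', 'e'], 20: ['f']}
print(by_count)  # [['a', 'b'], ['c', 'd'], ['e', 'f']]
```

With bursty input, time-based windows vary in size (three, two and one record here), while records-based windows are always full but span unpredictable lengths of time. Which behavior is preferable depends on whether the downstream consumer cares more about freshness or about fixed batch sizes.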
Spark has evolved into a viable production platform to meet enterprise needs. It is easy for developers to learn and use to develop solutions. Spark has also cultivated a vibrant community of committers and solutions. Spark’s architecture and its applicability to ingest, process and analyze both operational and analytical workloads allows it to reduce the time between obtaining data and delivering insights.
Gartner clients often ask for guidance on the following questions:
- How do I introduce Spark into my organization, and what development and management tools are available?
- Which persistent data store should I use with Spark?
- How can I tune Spark?
- Which resource manager should I use with Spark?
- Where should I host Spark?
- Can I ingest data directly from source systems, or should I use Spark in conjunction with Kafka?
- How does Spark guarantee security?
- What is the SMACK stack?
- Can I run Spark in my virtualization software, or should I use the container technology?
- What tools and options are available to debug Spark code?
This section provides guidance on these most frequently asked questions.
- How do I introduce Spark into my organization, and what development and management tools are available?
Spark is an easy platform to master. Its developers have gone to great lengths to make the platform easy to develop on, manage and deploy. If you are new to Spark, an easy ramp-up option is to go to a cloud provider such as Databricks, AWS, Microsoft Azure or Google Cloud and request a basic cluster with Spark and Hadoop. Databricks provides a cloud platform that integrates a notebook environment with serverless Spark to reduce complexity and simplify adoption. It also provides a number of prebuilt notebooks and datasets to enable you to learn Spark faster. These providers also include a notebook, such as Zeppelin or Jupyter, to write, test and visualize SQL queries and results.
Develop programming skills in Python to leverage Spark. The PySpark utility allows data engineers and data scientists to execute Python code against Spark. Scala is a natural choice to exploit the platform, but these skills are not as easily available as Python. Use Java and R for complex data science needs. In September 2017, Java 9 was launched. It introduced an interactive shell based on a read-evaluate-print loop (REPL). It is surprising that it took Java so long to get a shell, but now that it exists, it can be harnessed to code and debug easily in Java, including for Spark applications.
In addition, use Spark in “local mode” to prototype solutions on local machines in an iterative fashion. Once the code is tested adequately, it can be moved to a clustered infrastructure. This can reduce development costs while accelerating interactive development of Spark applications.
- Which persistent data store should I use with Spark?
One of the most critical questions clients ask Gartner is for a recommendation on whether to use Hadoop, a different data store or a cloud object store for Spark’s persistence layer. The answer to this is, as you may have guessed, that it depends on the use case. Gartner advises clients to carefully consider the following dimensions to pick the optimal persistent data store:
- Data integrity and consistency requirements for reads and writes. For instance, for strong ACID compliance and transaction support, use relational databases, although some nonrelational databases have started providing strong consistency.
- Performance for read, write and execution throughput: Some persistent stores such as MemSQL support SQL Pushdown, which allows the Spark SQL query to be translated into the underlying database’s SQL and to be executed by the database.
- Cost: Some users of Spark have migrated to lower-cost storage based on object stores such as AWS S3.
- Data governance and security: Please see the section below on security.
- Elasticity needs: Cloud or on-premises selection criteria depend on the elasticity needs. In addition, Spark is a rapidly evolving platform, and some of the newest features are available only in the latest versions. Databricks and Apache deliver the latest versions, but other vendors may lag behind because they first need to integrate newer versions of Spark with other components of the stack before shipping the entire stack.
- How can I tune Spark?
Spark jobs can be constrained by CPU, memory and network resources. As Spark is an in-memory compute engine, its memory usage is a critical tuning factor. To conserve memory, it is best to store RDDs in memory in serialized form. Spark provides two serialization options:
- Java serialization: This is the default, but it can be slow and inefficient in size.
- Kryo serialization: Kryo serialization, version 2, is both faster and more compact, but it may not support all Serializable classes and requires preregistration of classes. Details for tuning can be found at Tuning Spark.
One of the most important performance improvement options is data locality. When the data is located in the same JVM as the Spark code, it will perform the best. When that is not the case, one or the other has to move. If the Spark code is serialized, it is typically smaller than the data and hence moves to the node that has the data (e.g., the HDFS data node).
- Which cluster resource manager should I use with Spark?
Spark currently supports three options for cluster resource management: Hadoop ecosystem’s YARN, Spark’s own Standalone and Apache Mesos. Databricks and the community have added support for Kubernetes in version 2.3, but it is experimental. The selection for the appropriate choice is as follows:
- YARN should be the choice if using the Hadoop ecosystem. That way, Spark jobs’ resources can be allocated in conjunction with resources needed for other components of the ecosystem such as Hive. YARN is mature and well-tested. The YARN node manager is responsible for starting executor processes. The YARN Application Master requests resources.
- Standalone is the easiest option to stand up. It is the default option for non-Hadoop deployments, but it is limited in functionality compared to YARN. Standalone is best-suited for clusters where the primary function is to run Spark-only workloads. The Spark slave is responsible for starting executors. The client must request resources.
- Apache Mesos: Unlike YARN, where one requests resources, Mesos “offers” resources that the user can accept or reject based on the organization’s scheduling policy. This makes Apache Mesos more flexible. On the other hand, it requires an additional skill set on the team.
- Kubernetes support was added in Spark 2.3, which provides a powerful and popular option.
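The difference between requesting resources and being offered them can be illustrated with a short pure-Python sketch. It is a deliberately simplified model, not the YARN or Mesos API; the function names, node names and the acceptance policy are hypothetical.

```python
def request_model(free_cpus, requested_cpus):
    """Request-based scheduling: the application asks, the manager grants or denies."""
    return requested_cpus if requested_cpus <= free_cpus else 0


def offer_model(offers, accept):
    """Offer-based scheduling: the manager offers resources, the framework's policy decides."""
    return [offer for offer in offers if accept(offer)]


# Request style: ask for a fixed amount and receive all of it or nothing.
granted = request_model(free_cpus=8, requested_cpus=4)
denied = request_model(free_cpus=8, requested_cpus=16)

# Offer style: inspect each offer and keep only those that fit the policy.
offers = [
    {"node": "n1", "cpus": 2},
    {"node": "n2", "cpus": 8},
    {"node": "n3", "cpus": 4},
]
accepted = offer_model(offers, accept=lambda offer: offer["cpus"] >= 4)

print(granted, denied)                 # 4 0
print([o["node"] for o in accepted])   # ['n2', 'n3']
```

In the offer style, the scheduling policy lives in the `accept` function supplied by the framework, which is where the added flexibility (and the added skill requirement) comes from.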
- Where should I host Spark?
The decision on whether to host Spark jobs in-house or in the cloud should be made in tandem with the overall strategy for the organization’s big data needs. If an organization wants to explore the latest version of Spark, then it must go either to the Apache website or to Databricks. Both these places allow users to download Spark for on-premises use. However, Databricks now offers its own cloud, which can be a starting point in your Spark journey.
Public cloud providers test new Spark versions internally before making them available to their customers. This introduces some delay, but thanks to the cloud providers’ regression tests, it reduces the overall risk of adopting bleeding edge releases. In addition, public cloud providers also offer IaaS services, such as AWS EC2, Google Compute Engine and Azure Virtual Machines, which give users a chance to run any Spark versions in the cloud.
- Can I ingest data directly from source systems, or should I use Spark in conjunction with Kafka?
With the structured streaming enhancements, Spark is an excellent platform for ingesting data directly from source systems. However, consider using Apache Kafka as your message hub if you have a large number of source systems that are sending data to Spark in high volumes. Apache Kafka also has persistent storage, which can be configured to manage the time interval for how long the data is retained. It also separates your producers from your consumers. If, for some reason, you need to rerun your analysis in Spark, you can call the Kafka API as long as the data is still persisted.
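The retention and replay behavior described above can be modeled with a small append-only log. The sketch below is plain Python and does not use the Kafka API; the `RetainedLog` class, its method names and the timestamps are assumptions made for illustration.

```python
class RetainedLog:
    """Append-only log that keeps records for a configurable retention period."""

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self.records = []  # list of (offset, timestamp, payload)
        self.next_offset = 0

    def append(self, timestamp, payload):
        """Producer side: add a record and assign it the next offset."""
        self.records.append((self.next_offset, timestamp, payload))
        self.next_offset += 1

    def expire(self, now):
        """Drop records older than the retention period."""
        cutoff = now - self.retention_seconds
        self.records = [r for r in self.records if r[1] >= cutoff]

    def read_from(self, offset):
        """Consumer side: read every retained record at or after `offset`."""
        return [(o, p) for o, _, p in self.records if o >= offset]


log = RetainedLog(retention_seconds=60)
log.append(0, "a")
log.append(30, "b")
log.append(50, "c")

first_run = log.read_from(0)  # initial analysis
rerun = log.read_from(0)      # rerun: same data, because it is still retained

log.expire(now=100)           # records older than 60 seconds are removed
after_expiry = log.read_from(0)

print(first_run)     # [(0, 'a'), (1, 'b'), (2, 'c')]
print(after_expiry)  # [(2, 'c')]
```

Reading does not remove records, so producers and consumers stay decoupled and a consumer can rerun its analysis from any retained offset. Once the retention period has passed, the older records are no longer available for replay.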
- How does Spark guarantee security?
- YARN authentication is via Kerberos, similar to HDFS. It has service-level authorization.
- With Standalone, each node has to be configured independently with the secret key.
- Mesos comes with a default authentication module called Cyrus Simple Authentication and Security Layer (SASL), but any custom module can be used.
- What is the SMACK stack?
An efficient platform decouples the producers of the data from the consumers in a reliable and lean manner. This removes the dependencies between the two, but what happens in the middle is crucial. The processing in the middle has its job cut out because it must reliably transform the raw data into formats that can be consumed by the users or downstream applications. Spark is being used extensively to provide the needed heavy lifting.
Spark is frequently the underpinning of individual technologies or a stack of technologies. One such stack is known as SMACK, which consists of:
- Spark: for distributed processing as well as analytics
- Mesos: an Apache standard for distributed resource management
- Akka: a framework and toolkit to build and run real-time reactive apps
- Cassandra: a distributed database to act as the persistent store for the application
- Kafka: the ingestion and messaging layer for real-time data feeds
Another stack that has been popular in building web-based applications is the LAMP stack, consisting of the Linux operating system, Apache web server, MySQL and PHP. PHP as the development language has been replaced by Perl and more recently by Python, while keeping the LAMP acronym intact. In the same vein, in AWS environments, Kafka has been replaced by AWS Kinesis for ingestion, hence maintaining the SMACK acronym.
- Can I run Spark in my virtualization software, or should I use the container technology?
Many customers have invested in virtualization technologies such as VMware vSphere, Citrix and Oracle VirtualBox. The short answer is that Spark has proven to run in various virtualization offerings. In extensive tests on VMware vSphere 6, Spark tasks ran 5% to 10% faster in the virtualized environment compared to physical servers. However, these results were achieved by carefully configuring vSphere 6. For example, the memory of the virtual server must fit exactly into the server’s nonuniform memory access (NUMA) memory. Running Spark in a virtual environment has many advantages, such as:
- Separation of different versions of Spark: If Spark versions are not fully backward compatible, virtualization can allow organizations to deploy different versions of the platform in different virtual machines in order to test applications against multiple versions.
- Separation of workload: Organizations can separate development, testing, staging and production clusters and provision them independently.
- Utilization: Virtualization enables more-efficient utilization of physical resources such as compute and memory.
Container technology has recently become popular, which leads people to ask why they need a virtual machine if they can use a container format, such as Docker. However, these are not overlapping technologies. Virtualization is hardware-level abstraction, while a container is an OS-level process isolation technology. A container is better suited to packaging a solution so that it can be easily deployed in various places. Spark works well inside a Docker container, as discussed earlier in this research.
- What tools and options are available to debug Spark code?
A typical big data application consists of multiple technologies, and when an error happens, it is very difficult to pinpoint the source. In addition, there may be issues with poor performance that require a root cause analysis to determine the issue. Often, the performance and reliability of Spark apps may be affected by more than code issues, such as data layout, container sizing, job configuration settings and resource allocation. For instance, IoT endpoint data may be streamed over Kafka topics to HDFS, and the pipeline may be choking on the Spark streaming queue, yet finding this requires a manual diagnosis of logs across multiple applications and screens. This has led to “big data APM” (application performance management) products. Apache Ambari and Cloudera Manager are able to monitor Hadoop, while traditional APM vendors such as AppDynamics and New Relic have provided APM capabilities for web applications. Newer options such as Unravel Data and Pepperdata are now providing in-depth views into the big data platforms’ performance. Open-source options include Dr. Elephant, which was originally developed at LinkedIn. Finally, the cloud providers provide Hadoop and Spark debugging capabilities natively and support third-party vendors such as Datadog.
Streaming ingestion is a crucial component of the big data pipeline, so it is important to understand the different options in addition to Spark. Spark has connectors for all the options mentioned below. In addition, Spark provides Java, Scala, Python and R APIs to integrate with these streaming ingestion products:
- Apache Flume: This is an open-source streaming agent (JVM process) for Hadoop-based architectures. It is best-suited for reading log data and writing to HDFS and Hive. In addition to collecting and transporting large amounts of streaming data, it can also perform simple transformations such as aggregation of data and search-and-replace operations. It also integrates with Kafka. The data formats are row-based, such as Avro or Flume events.
- Kafka Streams: Kafka was developed at LinkedIn for distributed messaging. It is available as an open-source version and is commercially supported by Confluent. Kafka has since grown from its messaging roots and has added a lightweight streaming library called Kafka Streams. It can handle streams of data that arrive out of order instead of sequentially. For more details on Kafka, please refer to “Enabling Streaming Architectures for Continuous Data and Events With Kafka.”
- Apache Flink: Flink is an open-source, highly distributed and native streaming framework that is commercially supported by data Artisans. It provides high throughput and very low latency. Due to its ability to do stateful computation, it provides fault tolerance. It can also be used for batch processing by ingesting data from relational data sources or files. It can perform complex event processing such as indexing, joins, unions and query optimization.
- Apache Storm: Developed at Twitter, Storm was one of the original stream processing frameworks. It is still very commonly used, even though Twitter developed Heron because Storm wasn’t meeting its scalability needs.
- Apache Apex: Apex is another Hadoop-based streaming and batch-processing engine. It utilizes the Hadoop ecosystem’s YARN to provide a scalable, low-latency, fault-tolerant and stateful platform. It ships with a library of operators called Malhar. In May 2018, DataTorrent, the company behind Apache Apex, abruptly shut down.
- Apache Beam: Beam is more than a streaming platform. It is more of a programming model with its own software development kit (SDK) to implement batch and streaming data processing jobs. It leverages some of the above-mentioned technologies such as Spark, Flink, Apex and Google Cloud Dataflow to execute pipelines. It is open source. Apache Beam can also be used for ETL and data integration tasks.
Cloud service providers (CSPs) have also developed streaming platforms to meet their needs. The notable ones are:
- Amazon Kinesis: This is the AWS platform for collecting streaming data (using Amazon Kinesis Firehose), processing it (using Amazon Kinesis Data Streams) and analyzing it (using Amazon Kinesis Data Analytics). It is tightly integrated with AWS S3, Redshift, AWS Lambda and EMR.
- Google Cloud Dataflow: Google’s cloud-based offering supports both batch and streaming data. It can take streaming data from Google’s publish-and-subscribe middleware or batch data from databases and file systems. It collects data in a format called PCollections (for parallel collections). In addition, it includes a library of transformations called PTransforms. Finally, like Spark, it supports SQL, but it uses Google BigQuery.
- Azure Stream Analytics: Stream Analytics is a collection of components that provide real-time streaming ingestion and analytics capabilities.