An Introduction to Apache Spark


Image source: Apache Foundation, Logo is a trademark of the Apache Foundation - usage for articles is allowed, see: https://www.apache.org/foundation/marks/

Summary

TL;DR I will show you how to debug your first Spark application and go into detail on what you can monitor. Once you are able to monitor it, I will help you understand the graphs that Apache Spark produces.

  • Introduction to Apache Hadoop
  • What is Apache Spark?
  • Components
  • Continuous Integration (CI)
  • Integration Test
  • And it crashed
  • Memory Management in Spark
  • Graphite / Grafana Metrics with a lot of insights
  • Lowest Level Debugging
  • Alternatives to Spark

Apache Hadoop

Before we start talking about Apache Spark, I want to recap the history that led to it.

In 2004, Google released a white paper about MapReduce, but did not provide an implementation. The Apache Hadoop project implemented this paper and released it to the public. But what is MapReduce?

MapReduce

MapReduce defines a programming pattern where data records are first transformed into something else (map) and these results are then combined again (reduce). A typical "Hello World" application for MapReduce is a distributed word count: you have several text files and want to count (and sum) the occurrences of each word across all documents.

To count the words in a "classic", single-machine way, you might come up with an implementation like this:

def count_words(filename: str) -> dict:
    result = {}
    with open(filename) as f:
        for line in f:
            for token in line.split(" "):
                if token in result:
                    result[token] += 1
                else:
                    result[token] = 1
    return result

This code is slow because reading the file takes far longer than processing it. However, the counting can be done in parallel and the partial counts can then be summed up, reducing the time spent waiting on reads.

Let's define a map_line function that takes a line of text as input. It handles the splitting of the line and performs a word count on a per-line basis.

The next function is a reduce_lines function that takes a list of mapped lines and reduces them back into a single dict.

You can run the final code on your own if you want to play with it.

from typing import List

def map_line(line: str) -> dict:
    result = {}
    for token in line.split(" "):
        if token in result:
            result[token] += 1
        else:
            result[token] = 1

    return result

def reduce_lines(lines: List[dict]) -> dict:
    total_result = {}
    for line in lines:
        for word, count in line.items():
            if word in total_result:
                total_result[word] += count
            else:
                total_result[word] = count

    return total_result

data = ["test line we want to count", "and this line I want to count as well"]
mapped = map(lambda sentence: map_line(sentence), data)
reduced = reduce_lines(mapped)
print(reduced) # {'test': 1, 'line': 2, 'we': 1, 'want': 2, 'to': 2, 'count': 2, 'and': 1, 'this': 1, 'I': 1, 'as': 1, 'well': 1}

This separation of functions allows all map calls to run in parallel (which we don't do in this example). That is possible because map_line is a pure function: it doesn't share any state from outside the function and has no side effects. On a single machine, however, running this code in parallel mainly results in longer waits for the underlying hard drive.
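As a small illustration, the map step could be parallelized with Python's multiprocessing module - a minimal sketch that reuses the map_line and reduce_lines functions from above (on such a tiny, in-memory dataset the process overhead outweighs any gain; it only shows the pattern):

from multiprocessing import Pool

if __name__ == "__main__":
    data = ["test line we want to count", "and this line I want to count as well"]

    # run map_line on the sentences in separate worker processes
    with Pool(processes=2) as pool:
        mapped = pool.map(map_line, data)

    # the reduce step stays sequential
    print(reduce_lines(mapped))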

Instead of trying to improve the performance on one server, I will now introduce the Hadoop Distributed File System (HDFS).

HDFS

To allow parallel processing without the IO bound of a single server, the data is distributed across several servers. When you use an appropriate data format like csv, json (one object per line), parquet or avro, the data can be read in parallel at different offsets on several servers. Each server only reads a chunk of the entire file, allowing high IO performance.

The downside is that the MapReduce API is not intuitive and that you have to store the data again after each transformation. The next MapReduce job reads the data again and continues - you will "soon" be IO bound again.

Google removed MapReduce from their stack "recently" and replaced it with something else (we can guess it's something like Apache Spark). I would consider MapReduce a deprecated technology, but it is still worth knowing in order to understand Apache Spark better.

YARN

YARN stands for "Yet Another Resource Negotiator" - this part of Hadoop decides which servers a job may run on. YARN is also responsible for terminating applications that consume more memory than we allowed them to. We need to define memory limits because a cluster can be a mixed setup of servers with a lot of RAM and servers with less - based on the memory settings we provide, YARN places the application where it can run and enough memory is available.

Pick a Cluster

Do you want to set up your own Hadoop cluster? Sure, you can do that, but it's not a trivial task.

  • Buy hardware (estimate at least 3 times the size of the files you want to store, for replication and backup)
  • Set up the network
  • Generate SSH keys so that all servers can reach each other
  • Install Java
  • Install the required components of the Hadoop stack
  • Deploy all the settings to the clients

Do you really want to know every detail of your cluster and have the appropriate hardware and how-tos available? For a local installation you may use a distribution that helps with the details.

How much hardware do you need? This depends on the amount of data you are dealing with and how long your jobs are running.

As a rule of thumb, I would go for a ratio of at least 1 CPU core to 8 GB of RAM for each server in the cluster that is doing computing tasks. Most of the jobs I write on a large production cluster need at least 8 GB of RAM; some jobs even need 32 GB. When you need more, your code most likely has room for improvement in reducing memory usage.

In case you don't have a long running pipeline, you can try out managed solutions.

Managed Cluster

If you don't have a Hadoop cluster, you can put your data on common cloud storage like S3 from AWS to improve your processing speed, or look for a managed solution. You can read directly from S3 within Apache Spark; other cloud vendors should work as well, but I haven't tried them.
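As a rough sketch of what that looks like in PySpark (the bucket name and path are made up, and you need the hadoop-aws connector and credentials configured):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-example").getOrCreate()

# read line-delimited json directly from S3 via the s3a:// connector
df = spark.read.json("s3a://my-bucket/events/2021/01/")
df.printSchema()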

You still don't know what Apache Spark is and why you should use it - let's fix that.

What is Apache Spark?

At its core, Apache Spark is a computation framework, often used in combination with HDFS. Apache Spark has some key improvements over plain MapReduce that I would like to mention. They make it a lot faster than plain MapReduce.

Caching

Apache Spark can cache the data you are reading from your source, so if you do something more with the same data, you don't need to read it again. You can also use caching explicitly to help Apache Spark speed up your code even more. Say you are doing a heavy computation and then want to output the data in different data formats - instead of doing the computation twice, you can tell Apache Spark to cache the result just before you write it out.

At first the data is processed normally, but the second write operation just uses the already processed, cached data and only applies the minimal transformations required to save it in a different format. Smart caching can also reduce the memory used by your jobs, when only one copy of the data has to be kept in memory instead of several "copies" of it - for a cross join, for example.
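A minimal sketch of that pattern in PySpark (paths and column names are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/data/input")  # made-up input path

# heavy computation whose result we want to write in two formats
result = df.groupBy("user_id").count()

# cache just before writing, so the second write reuses the computed data
result.cache()
result.write.mode("overwrite").parquet("/data/output/parquet")
result.write.mode("overwrite").json("/data/output/json")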

Lazy Evaluation / DAG

Apache Spark builds a directed, acyclic graph (DAG) of the processing steps, down to any write operation. This graph is optimized by Apache Spark and only then executed. For example, when you read all files from a directory but are really only interested in data from a particular date, Apache Spark can skip all the other files. This requires some preparation on the storage side (for example, partitioning the data by date) so that Apache Spark can push such a filter down as early as possible and apply all its optimizations.
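A minimal PySpark sketch of this behaviour, assuming the data was written partitioned by a date column (paths and column names are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# nothing is read yet - read and filter only build up the DAG
events = spark.read.parquet("/data/events")  # directory partitioned by dt=YYYY-MM-DD
one_day = events.filter(events.dt == "2021-01-01")

# the action triggers execution; the optimizer pushes the filter down,
# so only the matching partition directories are actually read
print(one_day.count())

Let's now get a high-level overview of the Apache Spark components: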

Components

Apache Spark History Server

This is a web server that lets you view the steps Apache Spark is executing (or has executed) and gives you a lot of detail about what is happening.

Apache Spark Driver

Your code is first copied from your local system to YARN. YARN checks whether the resources you requested when submitting the application are available and sends your application to the servers that will run your code. The driver does all the low-level coordination of the tasks with the executors. If you are interested in more details, check out the links at the bottom of this post.

Apache Spark Executor

These servers do the heavy lifting and consume the memory of your job. In general, you have only one driver and a lot of executors. The more executors you can get running, the better - as long as you don't hit memory issues.

Continuous Integration (CI)

Do you feel ready for your first job? Great - you will find a lot of tutorials online that go into much more detail than I do here. I just want to give you some help for setting up your first application.

Add a CI Pipeline for your project. I recommend using Apache Spark with Scala as you otherwise have an additional layer of abstraction that you need to debug.

The second point is performance: when using Apache Spark with Python (supported by Apache Spark as pyspark), the Scala/Java objects get serialized back and forth between the JVM and Python, reducing the performance of the overall application. Additionally, the deployment of pyspark is a bit more involved, as you have to manage the Python dependencies on the servers (for example the Python version) and the other libraries you are using.

After you have coded your application, you should add integration tests. I found it useful to work with real data in integration tests. To create the dataset, use sampling, filtering, or only one partition of the data. A partition is a chunk of the full dataset; you can recognize a partition in HDFS by the part- file prefix.

Make sure not to overflow your version control with too much data. I try to create datasets of about 1 MB for integration tests, or I generate them inside the test and remove them afterwards. A sketch of such a test follows below.
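A minimal sketch of such an integration test with PySpark and pytest (word_count_job and the sample path are made up; the same idea works in Scala, which this post otherwise recommends):

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # local[2] runs Spark inside the test process with two threads
    return (SparkSession.builder
            .master("local[2]")
            .appName("integration-test")
            .getOrCreate())

def test_word_count_job(spark):
    # small, checked-in sample (about 1 MB) instead of the full dataset
    df = spark.read.json("tests/data/sample_part.json")

    result = word_count_job(df)  # hypothetical job under test

    assert result.count() > 0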

Deployment

Then you need to submit the application to the cluster - emetriq uses Jenkins for sequential jobs and Airflow for complex job dependencies. Why do you need Airflow? Given jobs A, B and C, where job C needs the data of the previous two jobs, but A and B can run in parallel: when you just submit them all directly, YARN may launch job C as soon as resources are available, and it will fail immediately because its input data is not there yet.
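For completeness, submitting a single job to YARN roughly looks like the following (application class, jar name and memory values are made up - pick them based on your job):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --num-executors 10 \
  --class com.example.MyEtlJob \
  my-etl-job.jar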

And it crashed

There is a high chance that your first attempt will crash the application. Even after developing a lot of ETL jobs in Apache Spark, mistakes are easy to make.

Memory Management

When not enough resources are available for an application, it has to wait (YARN puts the application in the ACCEPTED state).

Driver

When your driver runs out of memory, all executors receive a SIGTERM, triggering a shutdown. You can run out of memory quite quickly when you do a collect somewhere in your code - this moves all the data computed up to this point to the driver. Use collect only on a small dataset, for example in an integration test or for debugging.
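A small sketch of the difference (df stands for any DataFrame in your job):

# dangerous on big data: pulls the complete result into driver memory
rows = df.collect()

# safer for debugging: only a fixed number of rows reaches the driver
sample = df.take(20)
print(sample)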

Executor

Most of the time, it is the executors that crash. Here are some ideas for how to handle this:

Big Files / Repartition

A repartition shuffles the data around into the number of partitions you desire. This is an expensive operation, as potentially all data has to be moved over the network to somewhere else - use it only once and stick to that number of partitions where possible.

You may ask: why not put everything in one file? Since files are replicated to at most three nodes, you may be unlucky and read everything from those 3 nodes only. Then your application is IO bound again. When you repartition, your data is distributed to all available executors, speeding up all following steps.

Make sure the data is evenly distributed across the partitions. A quick check is the file size of the partitions. I would recommend targeting them close to 128 MB times n, as 128 MB is the default block size of HDFS. When your data passes this size, a new block is created, regardless of how much data you add. If a block is barely filled or close to empty, the HDFS overhead is huge, again resulting in lower overall performance. A sizing sketch follows below.
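A minimal sketch of how you could pick the number of partitions from a target size (the input size is made up; in practice you can get it from HDFS with hdfs dfs -du):

# rough sizing: total data size divided by the 128 MB HDFS block size
total_size_bytes = 10 * 1024 ** 3          # assume ~10 GB of input
target_partition_bytes = 128 * 1024 ** 2   # 128 MB
num_partitions = max(1, total_size_bytes // target_partition_bytes)

# shuffle the data once into evenly sized partitions, then keep that layout
df = df.repartition(num_partitions)
df.write.parquet("/data/output")

Keep in mind that compressed formats like parquet write smaller files than the raw size suggests, so treat this only as a starting point.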

Imagine a table that only holds integer numbers or only a few strings. In such a case you can also split the data by the number of records.

For small files, I suggest using the popular pandas library or following my next suggestion.

Small Files

Use a messaging service like Apache Kafka to buffer your incoming data. A second service, such as Apache Flume, can read from this message bus and write the data to HDFS in bigger chunks, which works better for further processing.

You can also read directly from Apache Kafka using Spark Streaming.
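A minimal sketch using the newer Structured Streaming API (broker, topic and paths are made up, and the spark-sql-kafka package has to be on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")
          .option("subscribe", "incoming-events")
          .load())

# write the raw events to HDFS in larger files, batch by batch
query = (stream.writeStream
         .format("parquet")
         .option("path", "/data/events")
         .option("checkpointLocation", "/data/checkpoints/events")
         .start())

query.awaitTermination()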

Caching

When you are creating features based on pairs of similar things, caching can also reduce the required memory: instead of keeping two slightly different copies of the dataset, you cache the shared base dataset only once.

Use Dataframes instead of Datasets

A Dataframe is the data structure that gives you a nice API for working with your distributed data. There is also a typed version of a Dataframe, called a Dataset.

A Dataset is created by running a map over the entire data and serializing the result into a case class (a dataclass in Python, a POJO in Java). The problems start when you only use a fraction of the attributes of the case class - all the memory is still reserved after the mapping.

The second problem is filtering: filters cannot be pushed down to the read operation, making the initial read "produce" a lot more data than your job requires. You can run out of memory here again. Don't use Datasets.

Graphite

You still have no idea why your job is crashing? Then you need more insights.

Note that the following metrics are only written when run in cluster mode / on YARN.

Emetriq uses Graphite to monitor the production jobs. The Graphite integration is built into Apache Spark. Pass the following arguments to your spark-submit command to write several (internal) Apache Spark metrics to Graphite, which you can use for further investigation:

--conf "spark.metrics.conf.*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink"
--conf "spark.metrics.conf.*.sink.graphite.host=graphiteHost"
--conf "spark.metrics.conf.*.sink.graphite.port=2003"
--conf "spark.metrics.conf.*.sink.graphite.period=10"
--conf "spark.metrics.conf.*.sink.graphite.unit=seconds"
--conf "spark.metrics.conf.*.sink.graphite.prefix=prod.<app>"
--conf "spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource"

Source: https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Dashboard/Spark_metrics_config_options.md

Application Id

Key: prod.<app>.<application_id>

Inside this namespace, several keys are created. The first level is the application id that YARN assigned. When you run the application several times, a new application_id is created for each run.

The next level is the "application type" that runs for this job. It can be either driver, a number (indicating an executor), or applicationMaster.

Shared Metrics

Let's now get into the details of the first interesting nodes. You will find this information on the driver and on each executor.

jvm

Key: prod.<app>.<application_id>.*.jvm

All following values are in bytes. To get Gigabytes (GB), divide by 1073741824 (2^30).
  • init represents the amount of memory requested at JVM startup.
  • used is the amount currently in use. A very interesting value, as it shows you the memory distribution across executors, giving you insight into how your data is distributed - maybe skewed. A repartition may help here.
  • committed is the memory that the JVM can safely request.
  • max is the maximum amount of memory we can get.

Source: https://docs.oracle.com/javase/8/docs/api/java/lang/management/MemoryUsage.html

The last three can change over time while the application is running, as, for example, other processes on the operating system may require more memory.

To get a good visual representation of these values, calculate their derivative. Check the average memory consumption as well as the min and max. When these values are spread wide apart, this can be an indicator of an unbalanced dataset. To fix it, see above. An example query follows below.
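In Graphite/Grafana, such a view could look like the following target (a sketch - the exact sub-key, for example heap.used or total.used, depends on your metric tree):

aliasByNode(nonNegativeDerivative(prod.<app>.<application_id>.*.jvm.heap.used), 3)

nonNegativeDerivative turns the absolute values into their rate of change and aliasByNode labels each series with the executor number, so skewed executors stand out quickly.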

CodeGenerator

Key: prod.<app>.<application_id>.*.CodeGenerator

This is the amount of time spent on code generation. It is again a cumulative sum over time, so use the derivative for a timed view. Heavy code generation may be a hint at a complex function that you can try to optimize by adjusting the query or refactoring your code.

HiveExternalCatalog

Key: prod.<app>.<application_id>.<number>.HiveExternalCatalog

A metastore for Hive queries. As we are not using Hive, I can't give you more details.

Driver

Key: prod.<app>.<application_id>.driver

Now we will look at metrics that are only present on the driver node.

BlockManager

Key: prod.<app>.<application_id>.driver.BlockManager

Block Manager (BlockManager) is a key-value store for blocks of data (simply blocks) in Spark.

Source: https://mallikarjuna_g.gitbooks.io/spark/content/spark-blockmanager.html

This shows you the amount (the size in MB) of data blocks (RDDs) you are using at runtime.

DAGScheduler

Key: prod.<app>.<application_id>.driver.DAGScheduler

Here you can see the number of job(s) you are running within your application and the stage(s) with their states. This is interesting if a particular step of your application fails and you want to take a closer look at it.

You can also find messageProcessingTime here - that may be useful for debugging network issues.

ExecutorAllocationManager

Key: prod.<app>.<application_id>.driver.ExecutorAllocationManager

When dynamic allocation is enabled (which you should use, imo), this shows you the creation and removal of executors. In general, executors should spawn at the beginning and then terminate together when your job is finished. If one task runs for a long time while the others are done, you have another hint of an imbalanced dataset. Your executors can also be terminated when another YARN application requests resources and you hold more than your share.
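The relevant spark-submit flags for dynamic allocation look roughly like this (the executor counts are made up; on YARN you also need the external shuffle service):

--conf "spark.dynamicAllocation.enabled=true"
--conf "spark.dynamicAllocation.minExecutors=2"
--conf "spark.dynamicAllocation.maxExecutors=50"
--conf "spark.shuffle.service.enabled=true"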

LiveListenerBus

Key: prod.<app>.<application_id>.driver.LiveListenerBus

LiveListenerBus is an event bus to dispatch Spark events to registered SparkListeners.

Source: https://books.japila.pl/apache-spark-internals/scheduler/LiveListenerBus/

This may again be interesting when you have network issues or an overloaded driver.

Executor

Let's now jump to the executors. As I mentioned earlier, every number that follows the application id is a collection of metrics for one particular executor. Depending on your cluster size, you can have a lot of them. I suggest plotting only the top and bottom x executors, depending on the sub-keys that follow.

Key: prod.<app>.<application_id>.<number>

ExternalShuffle

Key: prod.<app>.<application_id>.<number>.ExternalShuffle

In fact, external shuffle service can be summarized to a proxy that fetches and provides block files. It doesn't duplicate them. Instead it only knows where they're stored by each of node's executors.

Source: https://www.waitingforcode.com/apache-spark/external-shuffle-service-apache-spark/read

This helps scaling and fault tolerance when an executor gets removed due to dynamic allocation.

executor

Key: prod.<app>.<application_id>.<number>.executor

The executor namespace again has a long list of different metrics. I find jvmGCTime the most interesting: this is the time the JVM spends on garbage collection. A peak shows that the task may require more memory than is still available, or that a lot of objects are being created. GC optimization for jobs is a big topic on its own - you will find a lot of resources about it on the internet. A high GC time is bad (as for every JVM application).

ApplicationMaster

Application Master is a framework-specific entity charged with negotiating resources with ResourceManager(s) and working with NodeManager(s) to perform and monitor application tasks. Each application running on the cluster has its own, dedicated Application Master instance.

Source: https://luminousmen.com/post/spark-anatomy-of-spark-application

Lowest Level Debugging

That was a lot, I know. The last option you can try is debugging the code in a spark-shell environment, where you run the code interactively. It's time-consuming, but it gives great feedback on what is wrong.

Because of Apache Spark's laziness, the reported error location is often not the problematic code (for example, a writer class at the end of your job).

After each line, try to count the number of results. count is an action, triggering computation and "removing" the laziness from Apache Spark. count is also a cheap operation, as it parallelizes very well. When a particular line then crashes, you can even try fragments of that line to find out which operation is the problem.

Take a close look at the number of records you are processing / generating after each count and make sure the numbers don't explode.

When count doesn't work, you can add show statements instead. Compared to count, show actually materializes the transformed rows and gives you even better feedback when it crashes - but it is even more time-consuming. A sketch of this workflow follows below.
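In a pyspark (or spark-shell) session, this step-by-step debugging could look like the following sketch (paths, the users DataFrame and the column names are made up):

df = spark.read.parquet("/data/events")
print(df.count())                 # triggers the read - is the input size sane?

joined = df.join(users, "user_id")
print(joined.count())             # does the join multiply the records?

features = joined.groupBy("user_id").count()
print(features.count())           # does the aggregation behave as expected?

features.show(10)                 # materializes and prints a few result rows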

More Content to visit

If you are even more interested in optimizing Apache Spark or the JVM, I highly recommend:

Alternatives to Spark

I tried Dask. The plus is that it uses plain Python, whereas Apache Spark also needs a Java installation.

But the debugging was no fun (at least in IntelliJ) - all computation was triggered immediately, making interactive debugging very slow and unproductive.

Like? Share! Thanks for reading.