Optimizing and Tuning Apache Spark

Spark has many configurations for tuning, and in this post we will look at some of the most important and commonly used ones. For the full list of configurations, see the Spark documentation: https://spark.apache.org/docs/latest/configuration.html

How to View and Change the Spark Configurations

There are multiple ways to edit Spark configurations. The first is to set them through the configuration files in your deployment folder.

The second option is to pass command-line options with the --conf flag when submitting your job.

spark-submit --conf "spark.sql.shuffle.partitions=10" --conf "spark.executor.memory=1g"

The third option is to set the configurations directly in the Spark application.

import findspark
findspark.init()
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder \
                    .config("spark.sql.shuffle.partitions", 10) \
                    .config("spark.executor.memory", "2g") \
                    .master("local[*]") \
                    .appName('conftestapp') \
                    .getOrCreate()
    print(spark.sparkContext.getConf().getAll())

The last option is to set the configurations via the Spark shell. After you start the shell, you can get or set application configs.

>>> spark.conf.get("spark.sql.shuffle.partitions")
'200'
>>> spark.conf.set("spark.sql.shuffle.partitions", 10)
>>> spark.conf.get("spark.sql.shuffle.partitions")
'10'

Now that we know how to edit configurations, let's look at the important ones for tuning applications with large workloads.

Resource Allocation

By default, Spark uses static resource allocation. This means that if more resources are needed later, Spark can't allocate extra resources.

With dynamic resource allocation instead, Spark can request more (or fewer) compute resources whenever necessary. This is useful when you don't know your input data load in advance, or when you have high load during peak hours.

Enabling this setting allows Spark to utilize resources more efficiently. To enable dynamic resource allocation, you can use the following settings:

spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 5
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 10
spark.dynamicAllocation.executorIdleTimeout 2m

By default, spark.dynamicAllocation.enabled is set to false. When it is enabled as above, Spark starts with 5 executors (minExecutors) and adds executors (up to the maxExecutors limit) whenever schedulerBacklogTimeout is exceeded, i.e. when there are pending tasks that have not been scheduled for over 1 minute. Conversely, if an executor has finished its tasks and stays idle for 2 minutes (executorIdleTimeout), Spark terminates it.
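The same settings can also be passed at submit time with --conf flags; a sketch, assuming your application is packaged in a (hypothetical) app.py:

```shell
spark-submit \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.dynamicAllocation.minExecutors=5" \
  --conf "spark.dynamicAllocation.maxExecutors=10" \
  --conf "spark.dynamicAllocation.schedulerBacklogTimeout=1m" \
  --conf "spark.dynamicAllocation.executorIdleTimeout=2m" \
  app.py
```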

Executor Memory and Shuffle Service

The amount of memory for each executor is configured with the spark.executor.memory setting. The total memory assigned here is divided into three sections:

  • Execution memory: Used for joins, sorts, aggregations, and shuffles.
  • Storage memory: Primarily used for caching user data structures and partitions derived from DataFrames.
  • Reserved memory: Set aside to prevent out-of-memory errors.

300 MB is reserved, and the rest is split 60% for execution and 40% for storage.
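As a back-of-the-envelope illustration (plain arithmetic, not Spark API code; the exact split is governed by spark.memory.fraction and spark.memory.storageFraction), here is how a hypothetical 4 GB executor heap would divide:

```python
# Hypothetical executor configured with spark.executor.memory = 4g
executor_memory_mb = 4 * 1024                   # 4096 MB total heap
reserved_mb = 300                               # reserved memory
usable_mb = executor_memory_mb - reserved_mb    # 3796 MB remaining
execution_mb = int(usable_mb * 0.60)            # ~2277 MB for execution
storage_mb = usable_mb - execution_mb           # ~1519 MB for storage
print(execution_mb, storage_mb)
```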

spark.driver.memory: Default is 1g (1 GB). This is the amount of memory allocated to the Spark driver, which also receives data sent back from executors.

spark.shuffle.file.buffer: Default is 32 KB. Increasing this setting allows Spark to do more buffering before writing final map results to disk.

spark.shuffle.registration.timeout: Timeout in milliseconds for registering with the external shuffle service. You can increase it to prevent timeouts.

spark.shuffle.registration.maxAttempts: Number of attempts to register with the external shuffle service. You can increase it together with the registration.timeout setting if needed.

spark.file.transferTo: Default is true. Setting it to false forces Spark to use the file buffer to transfer files before finally writing to disk, which can decrease I/O activity.

spark.shuffle.unsafe.file.output.buffer: Default is 32 KB. This controls the amount of buffering possible when merging files during shuffle operations. In general, larger values perform better for larger workloads.

spark.io.compression.lz4.blockSize: Default is 32 KB. Increasing the block size improves compression and thus decreases the size of the shuffle files.
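Put together in a configuration file, a starting point for a large, shuffle-heavy workload might look like the sketch below; the values are illustrative assumptions to tune against your own workload, not recommendations:

```
spark.shuffle.file.buffer 1m
spark.shuffle.registration.timeout 120000
spark.shuffle.registration.maxAttempts 5
spark.file.transferTo false
spark.shuffle.unsafe.file.output.buffer 1m
spark.io.compression.lz4.blockSize 512k
```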

Parallelism

Spark gets its power from running computations in parallel, so to achieve maximum performance you need to tune the parallelism to an optimal value. The rule of thumb is to have at least as many partitions as there are cores on the executors.

So by controlling the number of partitions, you can control and maximize the parallelism. In Spark you can configure the maximum size of a partition when reading files with the spark.sql.files.maxPartitionBytes setting. By default it is 128 MB.
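To build intuition for the effect, here is a rough sketch of how many input partitions a file yields; this ignores spark.sql.files.openCostInBytes and file splittability, so it is an approximation rather than Spark's exact algorithm:

```python
import math

max_partition_bytes = 128 * 1024 * 1024   # default spark.sql.files.maxPartitionBytes
file_size = 1024 * 1024 * 1024            # a hypothetical 1 GB input file

# Roughly, the file is split into ceil(size / maxPartitionBytes) partitions
num_partitions = math.ceil(file_size / max_partition_bytes)
print(num_partitions)  # → 8
```

Halving maxPartitionBytes would roughly double the partition count, and with it the potential parallelism of the initial read.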

Partitions are also created when creating DataFrames or RDDs; in this case you can explicitly set the number of partitions. Below is an example of passing the minPartitions argument, and of using the RDD repartition method.

spark.sparkContext.textFile("sample_file.txt", 20)

rdd = spark.sparkContext.textFile("sample_file.txt")
rdd = rdd.repartition(20)  # repartition returns a new RDD; it does not modify in place
print(rdd.getNumPartitions())

There are also shuffle partitions, created during the shuffle stage. By default there are 200 of them, configurable with spark.sql.shuffle.partitions. Keep in mind that for local executions and small workloads 200 is quite high, so you may want to reduce it.

flightData = spark.read.option("inferSchema", "true").option("header", "true").csv("flight-data.csv")
flightData.take(3)
# By default, a shuffle outputs 200 shuffle partitions. We set this value to 5 to reduce the number of output partitions from 200 to 5
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData.sort("count").take(2)

Shuffle partitions are created by wide transformations (https://databricks.com/glossary/what-are-transformations) such as groupBy(), and they also use I/O resources. The spark.local.dir setting specifies which disk location is used for shuffle partition writes, so pointing it at a high-performance disk will also help boost performance.
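For example, to direct shuffle writes to a fast local SSD (the mount path below is a hypothetical placeholder):

```
spark.local.dir /mnt/fast-ssd/spark-tmp
```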

These are some of the settings you can use to tune your Spark applications, especially under high load.
