Spark has many configurations for tuning and in this post we will be looking at some the important and commonly used configurations. For a full list of configurations you can check the Spark documentation from the following link https://spark.apache.org/docs/latest/configuration.html
How to View and Change the Spark Configurations
There are multiple ways to edit Spark configurations. The first one is, you can set by using configuration files in your deployment folder.
The second option is to use command line options while submitting your job with –conf flag.
spark-submit –conf “spark.sql.shuffle.partitions=10” –conf “spark.executor.memory=1g”
Third option is to set configs directly in Spark application.
import findspark findspark.init() from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .config("spark.sql.shuffle.partitions", 10) \ .config("spark.executor.memory", "2g") \ .master("local[*]") \ .appName('conftestapp') \ .getOrCreate() print(spark.sparkContext.getConf().getAll())
The last option is to set the configurations via spark shell. After you start your spark shell you can set or get application configs.
>>> spark.conf.set("spark.sql.shuffle.partitions") '200' >>> spark.conf.set("spark.sql.shuffle.partitions", 10) >>> spark.conf.get("spark.sql.shuffle.partitions") '10'
Now that we know how to edit configurations, lets look at the important configurations to tune the applications with large workloads.
By default Spark is using static resource allocation. This means that if more resources are needed later, Spark can’t allocate extra resources.
Instead of static resource allocation, if dynamic resource allocation is used Spark can request more (or less) compute resources whenever necessary. This is useful, when you don’t know what is your input data load and you have some high load during peak hours.
Enabling this setting will allow Spark to better utilize the resources. To enable dynamic resource allocation you can use the following settings
By default spark.dynamicAllocation.enabled is set to false. When it is enabled like above, Spark will start with 5 executors to start and increase the executors (up to maxExecutors limit) when schedulerBackLogTimeout is exceeded which means if there are pending tasks that have not scheduled for over 1 minute. On the other hand, if executor is finished its task and idle for 2 minutes (executorIdleTimeout) then Spark will terminate the executor.
Executor Memory and Shuffle Service
The amount of memory for each executor is configured with spark.executor.memory setting. Total memory assigned here, is divided in three sections;
- Execution memory: It is used for joins, sort, aggregations and shuffles.
- Storage memory: It is primarily used for caching user data structures and partitions derived from DataFrames.
- Reserved memory: Reserved memory to prevent out of memory errors.
300 MB is reserved and the rest is split for %60 for execution and %40 for storage.
spark.driver.memory: Default is 1g (1 GB). This is the amount of memory allocated to the Spark driver to receive data from executors
spark.shuffle.file.buffer: Default is 32 KB. Increasing this setting will allow Spark to do more buffering before writing final map results to disk
spark.shuffle.registration.timeout: Timeout in milliseconds for registration to the external shuffle service. You can increase it to prevent timeouts
spark.shuffle.registration.maxAttempts: You can increase together with registration.timeout setting if needed
spark.file.transferTo: Default is true. Setting it to false will force Spark to use the file buffer to transfer files before finally writing to disk; this will decrease the I/O activity
spark.shuffle.unsafe.file.output.buffer: Default is 32 KB. This controls the amount of buffering possible when merging files during shuffle operations. In general, large values perform better for larger workloads
spark.io.compression.lz4.blockSize: You can decrease the size of the shuffle file by increasing the compressed size of the block
Spark gets its power by running the computations in parallel, in order to achieve maximum performance you need to tune the parallelism with an optimal value. To maximize parallelism optimal way is to have at least as many partitions as there are cores on the executor.
So by controlling the number of partitions, you can control and maximize the parallelism. In spark you can configure the bytes of a partition by using spark.sql.files.maxPartitionBytes setting. By default it is 128 MB.
Partitions are also created when creating a DataFrames or RDDs, in this case you can explicitly set the number of partitions. Below is an example of setting the minPartitions setting or by using RDD repartition method
spark.sparkContext.textFile("sample_file.txt", 20) rdd = spark.sparkContext.textFile("sample_file.txt") rdd.repartition(20) print(rdd.getNumPartitions())
And there is shuffle partitions created during shuffle stage. By default it is 200 and can be configured using spark.sql.shuffle.partitions. Keep in mind that for local executions and small workloads, 200 is really high so you may want to reduce it.
flightData = spark.read.option("inferSchema", "true").option("header", "true").csv("flight-data.csv") flightData.Take(3) # By default, when we perform a shuffle Spark will output two hundred shuffle partitions. We will set this value to five in order to reduce the number of the output partitions from the shuffle from 200 to 5 spark.conf.set("spark.sql.shuffle.partitions", "5") flightData2015.sort("count").take(2)
Shuffle partitions which are known as wide transformations (https://databricks.com/glossary/what-are-transformations) are created during e.g. groupBy() operations and they also use I/O resources. spark.local.directory setting specifies which disk location will be used for the shuffle partition writes so having a disk with a high performance will also help boost the performance.
These are some of the settings which you may use to tune your spark applications especially when you have a high load.