The Spark optimization pack supports the tuning of applications running on the Apache Spark framework. Through this optimization pack, Akamas explores the space of Spark parameters to find the configurations that best optimize the allocated resources or the execution time.
To achieve these goals, the optimization pack provides parameters that focus on the following areas:
- Driver and executors' resource allocation
- Parallelism
- Shuffling
- Spark SQL

Similarly, the bundled metrics provide visibility into the following statistics from the Spark History Server:
- Execution time
- Executors' resource usage
- Garbage collection time

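
Component Type | Description |
---|---|
Spark Application 2.2.0 | Spark application running on Apache Spark 2.2.0 |
Spark Application 2.3.0 | Spark application running on Apache Spark 2.3.0 |
Spark Application 2.4.0 | Spark application running on Apache Spark 2.4.0 |

Here’s the command to install the Spark optimization pack using the Akamas CLI: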
This page describes the Optimization Pack for Spark Application 2.2.0.
The following table lists the constraints that may be required in the study definition, depending on the tuned parameters. The overall resources allocated to the application should be constrained by a maximum and, optionally, a minimum value:
- the maximum value could be the sum of the resources physically available in the cluster, or a lower limit that leaves room for other applications running concurrently;
- an optional minimum value can help avoid configurations that allocate executors that are both small and scarce.


Metric | Unit | Description |
---|---|---|
spark_application_duration | milliseconds | The duration of the Spark application |
spark_job_duration | milliseconds | The duration of the job |
spark_stage_duration | milliseconds | The duration of the stage |
spark_task_duration | milliseconds | The duration of the task |

Metric | Unit | Description |
---|---|---|
spark_driver_rdd_blocks | blocks | The total number of persisted RDD blocks for the driver |
spark_driver_mem_used | bytes | The total amount of memory used by the driver |
spark_driver_disk_used | bytes | The total amount of disk used for RDDs by the driver |
spark_driver_cores | cores | The total number of concurrent tasks that can be run by the driver |
spark_driver_total_input_bytes | bytes | The total number of bytes read from RDDs or persisted data by the driver |
spark_driver_total_tasks | tasks | The total number of tasks run by the driver |
spark_driver_total_duration | milliseconds | The total amount of time spent by the driver running tasks |
spark_driver_max_mem_used | bytes | The maximum amount of memory used by the driver |
spark_driver_total_jvm_gc_duration | milliseconds | The total amount of time spent by the driver's JVM doing garbage collection across all tasks |
spark_driver_total_shuffle_read | bytes | The total number of bytes read in shuffle operations by the driver |
spark_driver_total_shuffle_write | bytes | The total number of bytes written in shuffle operations by the driver |
spark_driver_used_on_heap_storage_memory | bytes | The amount of on-heap memory used by the driver |
spark_driver_used_off_heap_storage_memory | bytes | The amount of off-heap memory used by the driver |
spark_driver_total_on_heap_storage_memory | bytes | The total amount of available on-heap memory for the driver |
spark_driver_total_off_heap_storage_memory | bytes | The total amount of available off-heap memory for the driver |

Metric | Unit | Description |
---|---|---|
spark_executor_max_count | executors | The maximum number of executors used for the application |
spark_executor_rdd_blocks | blocks | The total number of persisted RDD blocks for each executor |
spark_executor_mem_used | bytes | The total amount of memory used by each executor |
spark_executor_disk_used | bytes | The total amount of disk used for RDDs by each executor |
spark_executor_cores | cores | The number of cores used by each executor |
spark_executor_total_input_bytes | bytes | The total number of bytes read from RDDs or persisted data by each executor |
spark_executor_total_tasks | tasks | The total number of tasks run by each executor |
spark_executor_total_duration | milliseconds | The total amount of time spent by each executor running tasks |
spark_executor_max_mem_used | bytes | The maximum amount of memory used by each executor |
spark_executor_total_jvm_gc_duration | milliseconds | The total amount of time spent by each executor's JVM doing garbage collection across all tasks |
spark_executor_total_shuffle_read | bytes | The total number of bytes read in shuffle operations by each executor |
spark_executor_total_shuffle_write | bytes | The total number of bytes written in shuffle operations by each executor |
spark_executor_used_on_heap_storage_memory | bytes | The amount of on-heap memory used by each executor |
spark_executor_used_off_heap_storage_memory | bytes | The amount of off-heap memory used by each executor |
spark_executor_total_on_heap_storage_memory | bytes | The total amount of available on-heap memory for each executor |
spark_executor_total_off_heap_storage_memory | bytes | The total amount of available off-heap memory for each executor |
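
As an illustration of where executor-level statistics like these come from, the sketch below aggregates a few of them from a Spark History Server response. It assumes the JSON array returned by the History Server REST endpoint `/api/v1/applications/<app-id>/executors`, whose entries carry fields such as `id`, `totalTasks`, `totalDuration` (milliseconds) and `totalGCTime`; how the optimization pack actually collects its metrics is not described on this page, and `summarize_executors` and the sample payload are hypothetical.

```python
import json
from typing import Any, Dict


def summarize_executors(payload: str) -> Dict[str, Any]:
    """Aggregate executor statistics from a History Server executors listing.

    The entry whose id is "driver" describes the driver and is skipped.
    """
    executors = [e for e in json.loads(payload) if e["id"] != "driver"]
    total_duration_ms = sum(e["totalDuration"] for e in executors)
    total_gc_ms = sum(e["totalGCTime"] for e in executors)
    return {
        "executor_count": len(executors),
        "total_tasks": sum(e["totalTasks"] for e in executors),
        "total_duration_ms": total_duration_ms,
        "total_gc_ms": total_gc_ms,
        # Share of task time spent in JVM garbage collection (0.0 if no task ran).
        "gc_share": total_gc_ms / total_duration_ms if total_duration_ms else 0.0,
    }


# Hypothetical, trimmed-down payload; real responses contain many more fields.
sample = json.dumps([
    {"id": "driver", "totalTasks": 0, "totalDuration": 0, "totalGCTime": 0},
    {"id": "1", "totalTasks": 40, "totalDuration": 120000, "totalGCTime": 6000},
    {"id": "2", "totalTasks": 35, "totalDuration": 80000, "totalGCTime": 4000},
])
print(summarize_executors(sample))
```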

Metric | Unit | Description |
---|---|---|
spark_stage_shuffle_read_bytes | bytes | The total number of bytes read in shuffle operations by each stage |
spark_task_jvm_gc_duration | milliseconds | The total duration of JVM garbage collections for each task |
spark_task_peak_execution_memory | bytes | The sum of the peak sizes across internal data structures created for each task |
spark_task_result_size | bytes | The size of the result of the computation of each task |
spark_task_result_serialization_time | milliseconds | The time spent by each task serializing the computation result |
spark_task_shuffle_read_fetch_wait_time | milliseconds | The time spent by each task waiting for remote shuffle blocks |
spark_task_shuffle_read_local_blocks_fetched | blocks | The total number of local blocks fetched in shuffle operations by each task |
spark_task_shuffle_read_local_bytes | bytes | The total number of bytes read in shuffle operations from local disk by each task |
spark_task_shuffle_read_remote_blocks_fetched | blocks | The total number of remote blocks fetched in shuffle operations by each task |
spark_task_shuffle_read_remote_bytes | bytes | The total number of remote bytes read in shuffle operations by each task |
spark_task_shuffle_read_remote_bytes_to_disk | bytes | The total number of remote bytes read to disk in shuffle operations by each task |
spark_task_shuffle_write_time | nanoseconds | The time spent by each task writing data on disk or on buffer caches during shuffle operations |
spark_task_executor_deserialize_time | nanoseconds | The time spent by the executor deserializing the task |
spark_task_executor_deserialize_cpu_time | nanoseconds | The CPU time spent by the executor deserializing the task |
spark_task_stage_shuffle_write_records | records | The total number of records written in shuffle operations, broken down by task and stage |
spark_task_stage_shuffle_write_bytes | bytes | The total number of bytes written in shuffle operations, broken down by task and stage |
spark_task_stage_shuffle_read_records | records | The total number of records read in shuffle operations, broken down by task and stage |
spark_task_stage_disk_bytes_spilled | bytes | The total number of bytes spilled to disk, broken down by task and stage |
spark_task_stage_memory_bytes_spilled | bytes | The total in-memory size of the data spilled, broken down by task and stage |
spark_task_stage_input_bytes_read | bytes | The total number of bytes read, broken down by task and stage |
spark_task_stage_input_records_read | records | The total number of records read, broken down by task and stage |
spark_task_stage_output_bytes_written | bytes | The total number of bytes written, broken down by task and stage |
spark_task_stage_output_records_written | records | The total number of records written, broken down by task and stage |
spark_task_stage_executor_run_time | nanoseconds | The time spent by each executor actually running tasks (including fetching shuffle data), broken down by task, stage and executor |
spark_task_stage_executor_cpu_time | nanoseconds | The CPU time spent by each executor actually running each task (including fetching shuffle data), broken down by task and stage |
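
Task-level metrics can also be combined into derived indicators. As a minimal sketch, assuming per-task values of spark_task_shuffle_read_local_bytes and spark_task_shuffle_read_remote_bytes are available as pairs of integers, the hypothetical helper below computes the fraction of shuffle data fetched over the network:

```python
from typing import Iterable, Tuple


def remote_shuffle_fraction(tasks: Iterable[Tuple[int, int]]) -> float:
    """Fraction of shuffle bytes fetched remotely across a set of tasks.

    Each element is (local_bytes, remote_bytes), i.e. the values of
    spark_task_shuffle_read_local_bytes and spark_task_shuffle_read_remote_bytes.
    """
    local_total = 0
    remote_total = 0
    for local_bytes, remote_bytes in tasks:
        local_total += local_bytes
        remote_total += remote_bytes
    total = local_total + remote_total
    # Tasks without shuffle reads have nothing to fetch.
    return remote_total / total if total else 0.0


print(remote_shuffle_fraction([(300, 700), (500, 500)]))  # 1200 / 2000 = 0.6
```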

Parameter | Unit | Type | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
driverCores | cores | integer | You should select your own default | You should select your own domain | yes | The number of CPU cores assigned to the driver in cluster deploy mode. |
numExecutors | executors | integer | You should select your own default | You should select your own domain | yes | Number of executors to use. YARN only. |
totalExecutorCores | cores | integer | You should select your own default | You should select your own domain | yes | Total number of cores for the application. Spark standalone and Mesos only. |
executorCores | cores | integer | You should select your own default | You should select your own domain | yes | Number of CPU cores for an executor. Spark standalone and YARN only. |
defaultParallelism | partitions | integer | You should select your own default | You should select your own domain | yes | Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user. |
broadcastBlockSize | kilobytes | integer | 4096 | 256 → 131072 | yes | Size of each piece of a block for TorrentBroadcastFactory. |
schedulerMode | | categorical | FIFO | FIFO, FAIR | yes | Defines the scheduling strategy across jobs. |
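
Several of the execution parameters above correspond to standard `spark-submit` options. The sketch below renders a configuration into command-line arguments; the mapping from parameter names to options and properties (`SUBMIT_FLAGS`, `CONF_PROPERTIES`) is an assumption made for illustration, and only the options supported by the target cluster manager should be passed (for example, `--num-executors` applies to YARN and `--total-executor-cores` to Spark standalone and Mesos).

```python
from typing import Dict, List, Tuple

# Assumed mapping from pack parameter names to spark-submit options.
SUBMIT_FLAGS: Dict[str, str] = {
    "driverCores": "--driver-cores",                 # cluster deploy mode only
    "numExecutors": "--num-executors",               # YARN only
    "totalExecutorCores": "--total-executor-cores",  # Spark standalone and Mesos only
    "executorCores": "--executor-cores",             # Spark standalone and YARN only
}

# Assumed mapping from pack parameter names to (Spark property, unit suffix).
CONF_PROPERTIES: Dict[str, Tuple[str, str]] = {
    "defaultParallelism": ("spark.default.parallelism", ""),
    "broadcastBlockSize": ("spark.broadcast.blockSize", "k"),  # pack unit: kilobytes
    "schedulerMode": ("spark.scheduler.mode", ""),
}


def submit_args(config: Dict[str, object]) -> List[str]:
    """Translate execution parameters into spark-submit arguments.

    Parameters missing from both mappings are skipped.
    """
    args: List[str] = []
    for name, value in config.items():
        if name in SUBMIT_FLAGS:
            args += [SUBMIT_FLAGS[name], str(value)]
        elif name in CONF_PROPERTIES:
            prop, suffix = CONF_PROPERTIES[name]
            args += ["--conf", f"{prop}={value}{suffix}"]
    return args


print(" ".join(submit_args({"driverCores": 2, "numExecutors": 4, "executorCores": 3,
                            "broadcastBlockSize": 4096, "schedulerMode": "FAIR"})))
```

The resulting arguments would precede the application JAR or script in a `spark-submit` invocation.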

Parameter | Unit | Type | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
driverMemory | megabytes | integer | You should select your own default | You should select your own domain | yes | Amount of memory to use for the driver process. |
yarnDriverMemoryOverhead | megabytes | integer | 384 | 384 → 65536 | yes | Off-heap memory to be allocated per driver in cluster mode. Currently supported in YARN and Kubernetes. |
executorMemory | megabytes | integer | You should select your own default | You should select your own domain | yes | Amount of memory to use per executor. |
yarnExecutorMemoryOverhead | megabytes | integer | 384 | 384 → 65536 | yes | Off-heap memory to be allocated per executor. Currently supported in YARN and Kubernetes. |
memoryOffHeapEnabled | | categorical | false | true, false | yes | If true, Spark will attempt to use off-heap memory for certain operations. |
memoryOffHeapSize | megabytes | integer | 0 | 0 → 16384 | yes | The absolute amount of memory that can be used for off-heap allocation. |
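
To see how the memory parameters translate into cluster resources on YARN, the sketch below approximates the container size requested for a driver or executor in Spark 2.x: heap memory plus memory overhead, where an unset overhead defaults to the larger of 384 MB and 10% of the heap. It is a simplification under those assumptions: YARN also rounds requests up to its minimum allocation increment, and the sketch leaves memoryOffHeapSize out of the sum, so check the documentation of the Spark version in use for whether off-heap memory is counted in the container request. The helper name is hypothetical.

```python
from typing import Optional

MIN_OVERHEAD_MB = 384   # floor for the default memory overhead
OVERHEAD_FACTOR = 0.10  # default overhead as a fraction of the heap size


def yarn_container_mb(heap_mb: int, overhead_mb: Optional[int] = None) -> int:
    """Approximate YARN container request, in megabytes, for one driver or executor.

    heap_mb stands for driverMemory or executorMemory; overhead_mb for
    yarnDriverMemoryOverhead or yarnExecutorMemoryOverhead. YARN rounding to
    its minimum allocation is not modeled.
    """
    if overhead_mb is None:
        overhead_mb = max(MIN_OVERHEAD_MB, int(heap_mb * OVERHEAD_FACTOR))
    return heap_mb + overhead_mb


print(yarn_container_mb(2048))        # 2048 + 384 = 2432
print(yarn_container_mb(8192))        # 8192 + 819 = 9011
print(yarn_container_mb(8192, 2048))  # 8192 + 2048 = 10240
```

When an overhead value is passed explicitly, it replaces the 10% rule; the `overhead_mb` argument models that case.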

Parameter | Unit | Type | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
reducerMaxSizeInFlight | megabytes | integer | 48 | 1 → 1024 | yes | Maximum size of map outputs to fetch simultaneously from each reduce task. |
shuffleFileBuffer | kilobytes | integer | 32 | 1 → 2048 | yes | Size of the in-memory buffer for each shuffle file output stream. |
shuffleCompress | | categorical | true | true, false | yes | Whether to compress map output files. |
shuffleServiceEnabled | | categorical | true | true, false | yes | Enables the external shuffle service. This service preserves the shuffle files written by executors so the executors can be safely removed. |
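
The Spark documentation notes that each map output fetched by a reduce task needs its own receive buffer, so reducerMaxSizeInFlight acts as a fixed memory overhead per reduce task. A rough, hypothetical upper bound for one executor multiplies it by the number of tasks the executor runs concurrently, assuming each task uses `spark.task.cpus` cores (1 by default):

```python
def shuffle_fetch_buffer_mb(max_size_in_flight_mb: int, executor_cores: int,
                            task_cpus: int = 1) -> int:
    """Rough upper bound, in megabytes, of in-flight shuffle fetch buffers on one executor.

    max_size_in_flight_mb stands for reducerMaxSizeInFlight and executor_cores for
    executorCores; task_cpus mirrors spark.task.cpus (cores used by each task).
    """
    concurrent_tasks = executor_cores // task_cpus
    return max_size_in_flight_mb * concurrent_tasks


print(shuffle_fetch_buffer_mb(48, 4))    # default value: 4 tasks x 48 MB = 192 MB
print(shuffle_fetch_buffer_mb(1024, 8))  # top of the domain: 8 tasks x 1024 MB = 8192 MB
```

The second case shows why the Spark documentation recommends keeping this value small unless a large amount of memory is available.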

Parameter | Unit | Type | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
dynamicAllocationEnabled | | categorical | true | true, false | yes | Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. Requires spark.shuffle.service.enabled (the shuffleServiceEnabled parameter) to be set to true. |
dynamicAllocationExecutorIdleTimeout | seconds | integer | 60 | 1 → 3600 | yes | If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. |
dynamicAllocationInitialExecutors | executors | integer | You should select your own default | You should select your own domain | yes | Initial number of executors to run if dynamic allocation is enabled. |
dynamicAllocationMinExecutors | executors | integer | You should select your own default | You should select your own domain | yes | Lower bound for the number of executors if dynamic allocation is enabled. |
dynamicAllocationMaxExecutors | executors | integer | You should select your own default | You should select your own domain | yes | Upper bound for the number of executors if dynamic allocation is enabled. |
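
When the dynamic allocation parameters are tuned together, some combinations are inconsistent. The sketch below lists checks that could be turned into study constraints; the function name is hypothetical, and the ordering minExecutors <= initialExecutors <= maxExecutors is a sensible requirement for a study rather than a reproduction of Spark's own validation logic.

```python
from typing import Any, List, Mapping


def dynamic_allocation_issues(cfg: Mapping[str, Any]) -> List[str]:
    """Return the problems found in a dynamic allocation configuration."""
    issues: List[str] = []
    if not cfg.get("dynamicAllocationEnabled", False):
        return issues  # the other dynamic allocation parameters have no effect
    if not cfg.get("shuffleServiceEnabled", False):
        issues.append("dynamic allocation requires the external shuffle service")
    low = cfg["dynamicAllocationMinExecutors"]
    initial = cfg["dynamicAllocationInitialExecutors"]
    high = cfg["dynamicAllocationMaxExecutors"]
    if high <= 0:
        issues.append("maxExecutors must be positive")
    if not low <= initial <= high:
        issues.append("expected minExecutors <= initialExecutors <= maxExecutors")
    return issues


print(dynamic_allocation_issues({
    "dynamicAllocationEnabled": True,
    "shuffleServiceEnabled": False,
    "dynamicAllocationMinExecutors": 2,
    "dynamicAllocationInitialExecutors": 1,
    "dynamicAllocationMaxExecutors": 10,
}))
```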

Parameter | Unit | Type | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
sqlInMemoryColumnarStorageCompressed | | categorical | true | true, false | yes | When set to true, Spark SQL will automatically select a compression codec for each column based on statistics of the data. |
sqlInMemoryColumnarStorageBatchSize | records | integer | 1000 | 1 → 100000 | yes | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. |
sqlFilesMaxPartitionBytes | bytes | integer | 134217728 | 1024 → 1073741824 | yes | The maximum number of bytes to pack into a single partition when reading files. |
sqlFilesOpenCostInBytes | bytes | integer | 4194304 | 262144 → 67108864 | yes | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. |
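
The two file-related SQL parameters interact with defaultParallelism when Spark SQL plans a file scan. The sketch below is a simplified Python model of the packing logic in Spark 2.x's `FileSourceScanExec` for non-bucketed reads: the target split size is `min(maxPartitionBytes, max(openCostInBytes, totalBytes / defaultParallelism))`, where every file contributes its size plus the open cost to `totalBytes`, and chunks are then assigned to partitions largest first, closing a partition when the next chunk would overflow it. It assumes all files are splittable and ignores partition values; the function name is hypothetical.

```python
from typing import List


def plan_file_partitions(file_sizes: List[int], max_partition_bytes: int,
                         open_cost_bytes: int, default_parallelism: int) -> List[List[int]]:
    """Simplified model of how Spark SQL 2.x groups files into read partitions.

    Assumes every file is splittable; returns the chunk sizes in each partition.
    """
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    max_split = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))

    # Split files into chunks of at most max_split bytes, largest first.
    chunks: List[int] = []
    for size in file_sizes:
        for offset in range(0, size, max_split):
            chunks.append(min(max_split, size - offset))
    chunks.sort(reverse=True)

    # Next fit decreasing: close the partition when the next chunk does not fit.
    partitions: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for chunk in chunks:
        if current and current_size + chunk > max_split:
            partitions.append(current)
            current, current_size = [], 0
        current.append(chunk)
        current_size += chunk + open_cost_bytes  # each chunk also pays the open cost
    if current:
        partitions.append(current)
    return partitions


# Ten 1 MB files, default maxPartitionBytes (128 MB) and openCostInBytes (4 MB), parallelism 4.
print([len(p) for p in plan_file_partitions([1 << 20] * 10, 128 << 20, 4 << 20, 4)])
```

With many small files, the open cost rather than maxPartitionBytes dominates the packing, which is why sqlFilesOpenCostInBytes can change the number of read partitions.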

Parameter | Unit | Type | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
compressionLz4BlockSize | kilobytes | integer | 32 | 8 → 1024 | yes | Block size used in LZ4 compression. |
serializer | | categorical | org.apache.spark.serializer.KryoSerializer | org.apache.spark.serializer.JavaSerializer, org.apache.spark.serializer.KryoSerializer | yes | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. |
kryoserializerBuffer | kilobytes | integer | 64 | 8 → 1024 | yes | Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. |
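
The Kryo buffer parameter only matters when Kryo is the selected serializer, and the note about one buffer per core makes its memory footprint easy to estimate. The sketch below treats the value as kilobytes (Spark's `spark.kryoserializer.buffer` defaults to 64k) and ignores the growth of buffers up to `spark.kryoserializer.buffer.max`; the helper name is hypothetical.

```python
KRYO = "org.apache.spark.serializer.KryoSerializer"


def kryo_buffer_kb_per_executor(serializer: str, buffer_kb: int, executor_cores: int) -> int:
    """Initial Kryo buffer memory held by one executor, in kilobytes.

    serializer, buffer_kb and executor_cores stand for the serializer,
    kryoserializerBuffer and executorCores parameters. With one buffer per core,
    an executor holds executor_cores initial buffers; later growth is not modeled.
    """
    if serializer != KRYO:
        return 0  # the Kryo buffer setting is unused with other serializers
    return buffer_kb * executor_cores


print(kryo_buffer_kb_per_executor(KRYO, 64, 4))  # 4 buffers x 64 KB = 256 KB
print(kryo_buffer_kb_per_executor("org.apache.spark.serializer.JavaSerializer", 64, 4))  # 0
```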

Formula | Description |
---|---|
`driverMemory + executorMemory * numExecutors < MEMORY_CAP` | The overall allocated memory should not exceed the specified limit |
`driverCores + executorCores * numExecutors < CPU_CAP` | The overall allocated CPUs should not exceed the specified limit |
`driverMemory + executorMemory * numExecutors > MIN_MEMORY` | The overall allocated memory should exceed the specified minimum |
`driverCores + executorCores * numExecutors > MIN_CPUS` | The overall allocated CPUs should exceed the specified minimum |
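
The constraints above can be evaluated for a candidate configuration before running it. In the sketch below, `memory_cap`, `cpu_cap`, `min_memory` and `min_cpus` play the roles of MEMORY_CAP, CPU_CAP, MIN_MEMORY and MIN_CPUS; their values in the example (a hypothetical cluster with 65536 MB and 32 cores) are made up, memory is expressed in megabytes like driverMemory and executorMemory, and, as in the formulas, memory overhead is not counted. The function name is hypothetical.

```python
from typing import Mapping, Optional


def satisfies_resource_constraints(cfg: Mapping[str, int], memory_cap: int, cpu_cap: int,
                                   min_memory: Optional[int] = None,
                                   min_cpus: Optional[int] = None) -> bool:
    """Check the overall memory and CPU constraints for one configuration."""
    memory = cfg["driverMemory"] + cfg["executorMemory"] * cfg["numExecutors"]
    cpus = cfg["driverCores"] + cfg["executorCores"] * cfg["numExecutors"]
    if not (memory < memory_cap and cpus < cpu_cap):
        return False
    # The lower bounds are optional.
    if min_memory is not None and not memory > min_memory:
        return False
    if min_cpus is not None and not cpus > min_cpus:
        return False
    return True


config = {"driverMemory": 4096, "executorMemory": 8192, "numExecutors": 6,
          "driverCores": 2, "executorCores": 4}
print(satisfies_resource_constraints(config, memory_cap=65536, cpu_cap=32))  # 53248 MB, 26 cores: True
print(satisfies_resource_constraints({**config, "numExecutors": 8},
                                     memory_cap=65536, cpu_cap=32))  # 69632 MB: False
```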