Spark Application 2.4.0

This page describes the Optimization Pack for Spark Application 2.4.0.

Metrics

Duration

Metric
Unit
Description

spark_application_duration

milliseconds

The duration of the Spark application

spark_job_duration

milliseconds

The duration of the job

spark_stage_duration

milliseconds

The duration of the stage

spark_task_duration

milliseconds

The duration of the task

Driver

Metric
Unit
Description

spark_driver_rdd_blocks

blocks

The total number of persisted RDD blocks for the driver

spark_driver_mem_used

bytes

The total amount of memory used by the driver

spark_driver_disk_used

bytes

The total amount of disk used for RDDs by the driver

spark_driver_cores

cores

The total number of concurrent tasks that can be run by the driver

spark_driver_total_input_bytes

bytes

The total number of bytes read from RDDs or persisted data by the driver

spark_driver_total_tasks

tasks

The total number of tasks run by the driver

spark_driver_total_duration

milliseconds

The total amount of time spent by the driver running tasks

spark_driver_max_mem_used

bytes

The maximum amount of memory used by the driver

spark_driver_total_jvm_gc_duration

milliseconds

The total amount of time spent by the driver's JVM doing garbage collection across all tasks

spark_driver_total_shuffle_read

bytes

The total number of bytes read during a shuffle by the driver

spark_driver_total_shuffle_write

bytes

The total number of bytes written in shuffle operations by the driver

spark_driver_used_on_heap_storage_memory

bytes

The amount of on-heap memory used by the driver

spark_driver_used_off_heap_storage_memory

bytes

The amount of off-heap memory used by the driver

spark_driver_total_on_heap_storage_memory

bytes

The total amount of available on-heap memory for the driver

spark_driver_total_off_heap_storage_memory

bytes

The total amount of available off-heap memory for the driver

Executors

Metric
Unit
Description

spark_executor_max_count

executors

The maximum number of executors used for the application

spark_executor_rdd_blocks

blocks

The total number of persisted RDD blocks for each executor

spark_executor_mem_used

bytes

The total amount of memory used by each executor

spark_executor_disk_used

bytes

The total amount of disk used for RDDs by each executor

spark_executor_cores

cores

The number of cores used by each executor

spark_executor_total_input_bytes

bytes

The total number of bytes read from RDDs or persisted data by each executor

spark_executor_total_tasks

tasks

The total number of tasks run by each executor

spark_executor_total_duration

milliseconds

The total amount of time spent by each executor running tasks

spark_executor_max_mem_used

bytes

The maximum amount of memory used by each executor

spark_executor_total_jvm_gc_duration

milliseconds

The total amount of time spent by each executor's JVM doing garbage collection across all tasks

spark_executor_total_shuffle_read

bytes

The total number of bytes read during a shuffle by each executor

spark_executor_total_shuffle_write

bytes

The total number of bytes written in shuffle operations by each executor

spark_executor_used_on_heap_storage_memory

bytes

The amount of on-heap memory used by each executor

spark_executor_used_off_heap_storage_memory

bytes

The amount of off-heap memory used by each executor

spark_executor_total_on_heap_storage_memory

bytes

The total amount of available on-heap memory for each executor

spark_executor_total_off_heap_storage_memory

bytes

The total amount of available off-heap memory for each executor

Stages and Tasks

Metric
Unit
Description

spark_stage_shuffle_read_bytes

bytes

The total number of bytes read in shuffle operations by each stage

spark_task_jvm_gc_duration

milliseconds

The total duration of JVM garbage collections for each task

spark_task_peak_execution_memory

bytes

The sum of the peak sizes across internal data structures created for each task

spark_task_result_size

bytes

The size of the result of the computation of each task

spark_task_result_serialization_time

milliseconds

The time spent by each task serializing the computation result

spark_task_shuffle_read_fetch_wait_time

milliseconds

The time spent by each task waiting for remote shuffle blocks

spark_task_shuffle_read_local_blocks_fetched

blocks

The total number of local blocks fetched in shuffle operations by each task

spark_task_shuffle_read_local_bytes

bytes

The total number of bytes read in shuffle operations from local disk by each task

spark_task_shuffle_read_remote_blocks_fetched

blocks

The total number of remote blocks fetched in shuffle operations by each task

spark_task_shuffle_read_remote_bytes

bytes

The total number of remote bytes read in shuffle operations by each task

spark_task_shuffle_read_remote_bytes_to_disk

bytes

The total number of remote bytes read to disk in shuffle operations by each task

spark_task_shuffle_write_time

nanoseconds

The time spent by each task writing data on disk or on buffer caches during shuffle operations

spark_task_executor_deserialize_time

nanoseconds

The time spent by the executor deserializing the task

spark_task_executor_deserialize_cpu_time

nanoseconds

The CPU time spent by the executor deserializing the task

spark_task_stage_shuffle_write_records

records

The total number of records written in shuffle operations broken down by task and stage

spark_task_stage_shuffle_write_bytes

bytes

The total number of bytes written in shuffle operations broken down by task and stage

spark_task_stage_shuffle_read_records

records

The total number of records read in shuffle operations broken down by task and stage

spark_task_stage_disk_bytes_spilled

bytes

The total number of bytes spilled to disk, broken down by task and stage

spark_task_stage_memory_bytes_spilled

bytes

The total number of in-memory bytes spilled to disk, broken down by task and stage

spark_task_stage_input_bytes_read

bytes

The total number of bytes read, broken down by task and stage

spark_task_stage_input_records_read

records

The total number of records read, broken down by task and stage

spark_task_stage_output_bytes_written

bytes

The total number of bytes written, broken down by task and stage

spark_task_stage_output_records_written

records

The total number of records written, broken down by task and stage

spark_task_stage_executor_run_time

nanoseconds

The time spent by each executor actually running tasks (including fetching shuffle data) broken down by task, stage and executor

spark_task_stage_executor_cpu_time

nanoseconds

The CPU time spent by each executor actually running each task (including fetching shuffle data) broken down by task and stage

Parameters

Execution

Parameter
Type
Unit
Default value
Domain
Restart
Description

driverCores

integer

cores

You should select your own default

You should select your own domain

yes

The number of CPU cores assigned to the driver in cluster deploy mode.

numExecutors

integer

executors

You should select your own default

You should select your own domain

yes

Number of executors to use. YARN only.

totalExecutorCores

integer

cores

You should select your own default

You should select your own domain

yes

Total number of cores for the application. Spark standalone and Mesos only.

executorCores

integer

cores

You should select your own default

You should select your own domain

yes

Number of CPU cores for an executor. Spark standalone and YARN only.

defaultParallelism

integer

partitions

You should select your own default

You should select your own domain

yes

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

broadcastBlockSize

integer

kilobytes

4096

256 → 131072

yes

Size of each piece of a block for TorrentBroadcastFactory.

schedulerMode

categorical

FIFO

FIFO, FAIR

yes

Define the scheduling strategy across jobs.
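
The execution parameters above correspond to standard `spark-submit` options and Spark properties. The following is a minimal sketch of how a configuration could be turned into command-line arguments. The mapping from the parameter names on this page to flags and property keys is an assumption made for illustration, and `spark_submit_args` is a hypothetical helper, not part of the optimization pack.

```python
def spark_submit_args(params):
    """Translate a dict of parameters into a list of spark-submit arguments.

    The name-to-flag mapping below is an assumption for illustration only.
    """
    # Parameters that spark-submit exposes as dedicated flags.
    flags = {
        "driverCores": "--driver-cores",
        "numExecutors": "--num-executors",
        "totalExecutorCores": "--total-executor-cores",
        "executorCores": "--executor-cores",
    }
    # Parameters passed as Spark properties; the suffix carries the unit
    # ("k" marks kilobytes, matching the unit listed for broadcastBlockSize).
    confs = {
        "defaultParallelism": ("spark.default.parallelism", ""),
        "broadcastBlockSize": ("spark.broadcast.blockSize", "k"),
        "schedulerMode": ("spark.scheduler.mode", ""),
    }
    args = []
    for name, value in params.items():
        if name in flags:
            args += [flags[name], str(value)]
        elif name in confs:
            key, suffix = confs[name]
            args += ["--conf", f"{key}={value}{suffix}"]
        else:
            raise KeyError(f"unknown parameter: {name}")
    return args


example = spark_submit_args(
    {"executorCores": 4, "broadcastBlockSize": 4096, "schedulerMode": "FAIR"}
)
print(" ".join(example))
```

Raising on unknown names keeps a typo in a parameter name from being silently dropped from the submitted configuration.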

CPU and Memory allocation

Parameter
Type
Unit
Default value
Domain
Restart
Description

driverMemory

integer

megabytes

You should select your own default

You should select your own domain

yes

Amount of memory to use for the driver process.

yarnDriverMemoryOverhead

integer

megabytes

384

384 → 65536

yes

Off-heap memory to be allocated per driver in cluster mode. Currently supported in YARN and Kubernetes.

executorMemory

integer

megabytes

You should select your own default

You should select your own domain

yes

Amount of memory to use per executor.

executorPySparkMemory

integer

megabytes

You should select your own default

You should select your own domain

yes

The amount of memory to be allocated to PySpark in each executor.

yarnExecutorMemoryOverhead

integer

megabytes

384

384 → 65536

yes

Off-heap memory to be allocated per executor. Currently supported in YARN and Kubernetes.

memoryOffHeapEnabled

categorical

false

true, false

yes

If true, Spark will attempt to use off-heap memory for certain operations.

memoryOffHeapSize

integer

megabytes

0

0 → 16384

yes

The absolute amount of memory which can be used for off-heap allocation.
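
When choosing domains for the memory parameters, it can help to estimate how much memory each executor actually requests from the resource manager. The sketch below assumes the Spark 2.4 behavior on YARN, where the request is the executor heap plus the memory overhead plus any PySpark memory, and where an unset overhead defaults to 10% of the heap with a 384 MB minimum. The function names are hypothetical and all values are in megabytes.

```python
def default_memory_overhead_mb(heap_mb, factor=0.10, minimum_mb=384):
    """Overhead Spark applies when none is configured (assumed 2.4 defaults)."""
    return max(minimum_mb, int(heap_mb * factor))


def executor_container_mb(executor_memory_mb, overhead_mb=None, pyspark_memory_mb=0):
    """Estimate the memory requested per executor container, in megabytes."""
    if overhead_mb is None:
        overhead_mb = default_memory_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb + pyspark_memory_mb


# An 8 GB heap with default overhead and 1 GB reserved for PySpark.
print(executor_container_mb(8192, pyspark_memory_mb=1024))  # 10035
```

The same arithmetic applies to the driver with driverMemory and yarnDriverMemoryOverhead.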

Shuffling

Parameter
Type
Unit
Default value
Domain
Restart
Description

reducerMaxSizeInFlight

integer

megabytes

48

1 → 1024

yes

Maximum size of map outputs to fetch simultaneously from each reduce task in MB.

shuffleFileBuffer

integer

kilobytes

32

1 → 2048

yes

Size of the in-memory buffer for each shuffle file output stream in KB.

shuffleCompress

categorical

true

true, false

yes

Whether to compress map output files.

shuffleServiceEnabled

categorical

true

true, false

yes

Enables the external shuffle service. This service preserves the shuffle files written by executors so the executors can be safely removed.

Dynamic allocation

Parameter
Type
Unit
Default value
Domain
Restart
Description

dynamicAllocationEnabled

categorical

true

true, false

yes

Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. Requires spark.shuffle.service.enabled to be set.

dynamicAllocationExecutorIdleTimeout

integer

seconds

60

1 → 3600

yes

If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed.

dynamicAllocationInitialExecutors

integer

executors

You should select your own default

You should select your own domain

yes

Initial number of executors to run if dynamic allocation is enabled.

dynamicAllocationMinExecutors

integer

executors

You should select your own default

You should select your own domain

yes

Lower bound for the number of executors if dynamic allocation is enabled.

dynamicAllocationMaxExecutors

integer

executors

You should select your own default

You should select your own domain

yes

Upper bound for the number of executors if dynamic allocation is enabled.
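
The dynamic allocation parameters depend on one another: the feature requires the external shuffle service, and the initial executor count is expected to lie between the lower and upper bounds. A minimal sketch of a sanity check for a candidate configuration follows. It assumes the configuration is a plain dict keyed by the parameter names on this page, with boolean values for the true/false parameters; `check_dynamic_allocation` is a hypothetical helper.

```python
def check_dynamic_allocation(cfg):
    """Return a list of problems found in the dynamic allocation settings."""
    if not cfg.get("dynamicAllocationEnabled", False):
        return []  # nothing to check when the feature is off

    problems = []
    if not cfg.get("shuffleServiceEnabled", False):
        problems.append("shuffleServiceEnabled must be true")

    lo = cfg.get("dynamicAllocationMinExecutors")
    init = cfg.get("dynamicAllocationInitialExecutors")
    hi = cfg.get("dynamicAllocationMaxExecutors")

    if lo is not None and hi is not None and lo > hi:
        problems.append("min executors exceeds max executors")
    if init is not None:
        if lo is not None and init < lo:
            problems.append("initial executors below min executors")
        if hi is not None and init > hi:
            problems.append("initial executors above max executors")
    return problems


print(check_dynamic_allocation({
    "dynamicAllocationEnabled": True,
    "shuffleServiceEnabled": False,
    "dynamicAllocationMinExecutors": 2,
    "dynamicAllocationInitialExecutors": 1,
    "dynamicAllocationMaxExecutors": 20,
}))
```

Checks of this kind can also be expressed as study constraints, so that invalid combinations are never generated in the first place.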

SQL

Parameter
Type
Unit
Default value
Domain
Restart
Description

sqlInMemoryColumnarStorageCompressed

categorical

true

true, false

yes

When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data.

sqlInMemoryColumnarStorageBatchSize

integer

records

1000

1 → 100000

yes

Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.

sqlFilesMaxPartitionBytes

integer

bytes

134217728

1024 → 1073741824

yes

The maximum number of bytes to pack into a single partition when reading files.

sqlFilesOpenCostInBytes

integer

bytes

4194304

262144 → 67108864

yes

The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition.
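
To see how sqlFilesMaxPartitionBytes and sqlFilesOpenCostInBytes interact, the sketch below reproduces the split-size calculation that Spark 2.4 is understood to apply when planning a non-bucketed file scan: each file is padded with the open cost, the total is divided by the default parallelism, and the result is clamped between the open cost and the maximum partition size. The function name and the example file sizes are illustrative assumptions.

```python
MIB = 1024 * 1024


def max_split_bytes(file_sizes, default_parallelism,
                    max_partition_bytes=128 * MIB, open_cost_in_bytes=4 * MIB):
    """Approximate the target split size (in bytes) for a file scan."""
    # Every file is charged its length plus the estimated cost of opening it.
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    # Never go below the open cost, never above the configured maximum.
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))


# Many mid-sized files: the maximum partition size is the binding limit.
print(max_split_bytes([10 * MIB] * 100, default_parallelism=8))  # 134217728

# A few small files: splits shrink so that all cores receive work.
print(max_split_bytes([1 * MIB] * 10, default_parallelism=8))  # 6553600
```

Raising the open cost makes Spark pack fewer small files into each partition, while raising the maximum partition size only matters once the per-core share exceeds it.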

Compression and Serialization

Parameter
Type
Unit
Default value
Domain
Restart
Description

compressionLz4BlockSize

integer

bytes

32

8 → 1024

yes

Block size in bytes used in LZ4 compression.

serializer

categorical

org.apache.spark.serializer.KryoSerializer

org.apache.spark.serializer.JavaSerializer, org.apache.spark.serializer.KryoSerializer

yes

Class to use for serializing objects that will be sent over the network or need to be cached in serialized form.

kryoserializerBuffer

integer

bytes

64

8 → 1024

yes

Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker.

Constraints

The following tables show a list of constraints that may be required in the definition of the study, depending on the tuned parameters:

Cluster size

The overall resources allocated to the application should be constrained by a maximum and, sometimes, a minimum value:

  • the maximum value could be the sum of resources physically available in the cluster, or a lower limit to allow the concurrent execution of other applications

  • an optional minimum value could be useful to avoid configurations that allocate executors that are both small and scarce

driverMemory + executorMemory * numExecutors < MEMORY_CAP

The overall allocated memory should not exceed the specified limit

driverCores + executorCores * numExecutors < CPU_CAP

The overall allocated CPUs should not exceed the specified limit

driverMemory + executorMemory * numExecutors > MIN_MEMORY

The overall allocated memory should not fall below the specified minimum

driverCores + executorCores * numExecutors > MIN_CPUS

The overall allocated CPUs should not fall below the specified minimum
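
The four cluster-size constraints can be checked for a candidate configuration with a few lines of code. The following is a minimal sketch that evaluates the same formulas; the configuration is assumed to be a dict keyed by the parameter names on this page, the caps and minimums are values you choose for your cluster, and `check_cluster_size` is a hypothetical helper.

```python
def check_cluster_size(cfg, memory_cap, cpu_cap, min_memory=None, min_cpus=None):
    """Return the names of the violated cluster-size constraints."""
    total_memory = cfg["driverMemory"] + cfg["executorMemory"] * cfg["numExecutors"]
    total_cpus = cfg["driverCores"] + cfg["executorCores"] * cfg["numExecutors"]

    violations = []
    # Upper bounds: strict inequalities, as in the constraints above.
    if not total_memory < memory_cap:
        violations.append("MEMORY_CAP")
    if not total_cpus < cpu_cap:
        violations.append("CPU_CAP")
    # Lower bounds are optional.
    if min_memory is not None and not total_memory > min_memory:
        violations.append("MIN_MEMORY")
    if min_cpus is not None and not total_cpus > min_cpus:
        violations.append("MIN_CPUS")
    return violations


cfg = {
    "driverMemory": 4096, "executorMemory": 8192, "numExecutors": 10,
    "driverCores": 2, "executorCores": 4,
}
# Total memory is 86016 MB and total CPUs is 42, so only the CPU cap is hit.
print(check_cluster_size(cfg, memory_cap=131072, cpu_cap=40))  # ['CPU_CAP']
```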
