Spark Application 2.4.0
This page describes the Optimization Pack for Spark Application 2.4.0.
Metrics
Duration
Metric | Unit | Description |
---|---|---|
spark_application_duration | milliseconds | The duration of the Spark application |
spark_job_duration | milliseconds | The duration of the job |
spark_stage_duration | milliseconds | The duration of the stage |
spark_task_duration | milliseconds | The duration of the task |
Driver
Metric | Unit | Description |
---|---|---|
spark_driver_rdd_blocks | blocks | The total number of persisted RDD blocks for the driver |
spark_driver_mem_used | bytes | The total amount of memory used by the driver |
spark_driver_disk_used | bytes | The total amount of disk used for RDDs by the driver |
spark_driver_cores | cores | The total number of concurrent tasks that can be run by the driver |
spark_driver_total_input_bytes | bytes | The total number of bytes read from RDDs or persisted data by the driver |
spark_driver_total_tasks | tasks | The total number of tasks run by the driver |
spark_driver_total_duration | milliseconds | The total amount of time spent by the driver running tasks |
spark_driver_max_mem_used | bytes | The maximum amount of memory used by the driver |
spark_driver_total_jvm_gc_duration | milliseconds | The total amount of time spent by the driver's JVM doing garbage collection across all tasks |
spark_driver_total_shuffle_read | bytes | The total number of bytes read during a shuffle by the driver |
spark_driver_total_shuffle_write | bytes | The total number of bytes written in shuffle operations by the driver |
spark_driver_used_on_heap_storage_memory | bytes | The amount of on-heap memory used by the driver |
spark_driver_used_off_heap_storage_memory | bytes | The amount of off-heap memory used by the driver |
spark_driver_total_on_heap_storage_memory | bytes | The total amount of available on-heap memory for the driver |
spark_driver_total_off_heap_storage_memory | bytes | The total amount of available off-heap memory for the driver |
Executors
Metric | Unit | Description |
---|---|---|
spark_executor_max_count | executors | The maximum number of executors used for the application |
spark_executor_rdd_blocks | blocks | The total number of persisted RDD blocks for each executor |
spark_executor_mem_used | bytes | The total amount of memory used by each executor |
spark_executor_disk_used | bytes | The total amount of disk used for RDDs by each executor |
spark_executor_cores | cores | The number of cores used by each executor |
spark_executor_total_input_bytes | bytes | The total number of bytes read from RDDs or persisted data by each executor |
spark_executor_total_tasks | tasks | The total number of tasks run by each executor |
spark_executor_total_duration | milliseconds | The total amount of time spent by each executor running tasks |
spark_executor_max_mem_used | bytes | The maximum amount of memory used by each executor |
spark_executor_total_jvm_gc_duration | milliseconds | The total amount of time spent by each executor's JVM doing garbage collection across all tasks |
spark_executor_total_shuffle_read | bytes | The total number of bytes read during a shuffle by each executor |
spark_executor_total_shuffle_write | bytes | The total number of bytes written in shuffle operations by each executor |
spark_executor_used_on_heap_storage_memory | bytes | The amount of on-heap memory used by each executor |
spark_executor_used_off_heap_storage_memory | bytes | The amount of off-heap memory used by each executor |
spark_executor_total_on_heap_storage_memory | bytes | The total amount of available on-heap memory for each executor |
spark_executor_total_off_heap_storage_memory | bytes | The total amount of available off-heap memory for each executor |
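Most of these driver and executor metrics correspond to fields exposed by the Spark monitoring REST API, where the driver is reported as the executor with id `driver`. The sketch below shows one plausible mapping from a `/api/v1/applications/<app-id>/executors` record to the metric names above; the endpoint and field names follow the standard Spark 2.4 REST API, but treat the exact mapping as an assumption.

```python
# Sketch: map one Spark REST API /executors record to the metric names above.
# Field names follow the Spark 2.4 monitoring REST API; the correspondence to
# the Optimization Pack metric names is an assumption for illustration.

def executor_metrics(record):
    """Translate one /executors entry into {metric_name: value} pairs."""
    prefix = "spark_driver" if record["id"] == "driver" else "spark_executor"
    return {
        f"{prefix}_rdd_blocks": record["rddBlocks"],
        f"{prefix}_mem_used": record["memoryUsed"],
        f"{prefix}_disk_used": record["diskUsed"],
        f"{prefix}_cores": record["totalCores"],
        f"{prefix}_total_input_bytes": record["totalInputBytes"],
        f"{prefix}_total_tasks": record["totalTasks"],
        f"{prefix}_total_duration": record["totalDuration"],
        f"{prefix}_max_mem_used": record["maxMemory"],
        f"{prefix}_total_jvm_gc_duration": record["totalGCTime"],
        f"{prefix}_total_shuffle_read": record["totalShuffleRead"],
        f"{prefix}_total_shuffle_write": record["totalShuffleWrite"],
    }

# Hypothetical record for the driver entry of a small application.
sample = {"id": "driver", "rddBlocks": 0, "memoryUsed": 52428800,
          "diskUsed": 0, "totalCores": 1, "totalInputBytes": 0,
          "totalTasks": 0, "totalDuration": 0, "maxMemory": 434031820,
          "totalGCTime": 120, "totalShuffleRead": 0, "totalShuffleWrite": 0}
print(executor_metrics(sample)["spark_driver_mem_used"])  # 52428800
```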
Stages and Tasks
Metric | Unit | Description |
---|---|---|
spark_stage_shuffle_read_bytes | bytes | The total number of bytes read in shuffle operations by each stage |
spark_task_jvm_gc_duration | milliseconds | The total duration of JVM garbage collections for each task |
spark_task_peak_execution_memory | bytes | The sum of the peak sizes across internal data structures created for each task |
spark_task_result_size | bytes | The size of the result of the computation of each task |
spark_task_result_serialization_time | milliseconds | The time spent by each task serializing the computation result |
spark_task_shuffle_read_fetch_wait_time | milliseconds | The time spent by each task waiting for remote shuffle blocks |
spark_task_shuffle_read_local_blocks_fetched | blocks | The total number of local blocks fetched in shuffle operations by each task |
spark_task_shuffle_read_local_bytes | bytes | The total number of bytes read in shuffle operations from local disk by each task |
spark_task_shuffle_read_remote_blocks_fetched | blocks | The total number of remote blocks fetched in shuffle operations by each task |
spark_task_shuffle_read_remote_bytes | bytes | The total number of remote bytes read in shuffle operations by each task |
spark_task_shuffle_read_remote_bytes_to_disk | bytes | The total number of remote bytes read to disk in shuffle operations by each task |
spark_task_shuffle_write_time | nanoseconds | The time spent by each task writing data on disk or on buffer caches during shuffle operations |
spark_task_executor_deserialize_time | nanoseconds | The time spent by the executor deserializing the task |
spark_task_executor_deserialize_cpu_time | nanoseconds | The CPU time spent by the executor deserializing the task |
spark_task_stage_shuffle_write_records | records | The total number of records written in shuffle operations broken down by task and stage |
spark_task_stage_shuffle_write_bytes | bytes | The total number of bytes written in shuffle operations broken down by task and stage |
spark_task_stage_shuffle_read_records | records | The total number of records read in shuffle operations broken down by task and stage |
spark_task_stage_disk_bytes_spilled | bytes | The total number of bytes spilled on disk broken down by task and stage |
spark_task_stage_memory_bytes_spilled | bytes | The total number of bytes spilled on memory broken down by task and stage |
spark_task_stage_input_bytes_read | bytes | The total number of bytes read, broken down by task and stage |
spark_task_stage_input_records_read | records | The total number of records read, broken down by task and stage |
spark_task_stage_output_bytes_written | bytes | The total number of bytes written, broken down by task and stage |
spark_task_stage_output_records_written | records | The total number of records written, broken down by task and stage |
spark_task_stage_executor_run_time | nanoseconds | The time spent by each executor actually running tasks (including fetching shuffle data) broken down by task, stage and executor |
spark_task_stage_executor_cpu_time | nanoseconds | The CPU time spent by each executor actually running each task (including fetching shuffle data) broken down by task and stage |
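Task-level metrics like these are often combined into derived signals when diagnosing an application; a common one is the fraction of a task's duration spent in JVM garbage collection. A minimal illustrative helper (the 30% example below is not part of the pack):

```python
# Sketch: compute GC overhead from spark_task_jvm_gc_duration and
# spark_task_duration, both reported in milliseconds in the tables above.

def gc_overhead_ratio(gc_duration_ms, task_duration_ms):
    """Fraction of a task's wall-clock time spent in garbage collection."""
    if task_duration_ms == 0:
        return 0.0
    return gc_duration_ms / task_duration_ms

# A task that spent 900 ms of a 3000 ms run in GC has a 30% GC overhead.
print(round(gc_overhead_ratio(900, 3000), 2))  # 0.3
```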
Parameters
Execution
Parameter | Type | Unit | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
driverCores | integer | cores | You should select your own default | You should select your own domain | yes | The number of CPU cores assigned to the driver in cluster deploy mode. |
numExecutors | integer | executors | You should select your own default | You should select your own domain | yes | Number of executors to use. YARN only. |
totalExecutorCores | integer | cores | You should select your own default | You should select your own domain | yes | Total number of cores for the application. Spark standalone and Mesos only. |
executorCores | integer | cores | You should select your own default | You should select your own domain | yes | Number of CPU cores for an executor. Spark standalone and YARN only. |
defaultParallelism | integer | partitions | You should select your own default | You should select your own domain | yes | Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user. |
broadcastBlockSize | integer | kilobytes | | | yes | Size of each piece of a block for TorrentBroadcastFactory. |
schedulerMode | categorical | | | | yes | Define the scheduling strategy across jobs. |
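The execution parameters above appear to wrap the standard Spark 2.4 configuration keys, which would be passed to `spark-submit` as `--conf` flags. The property mapping below is an assumption based on each parameter's description, not something the pack states explicitly:

```python
# Sketch: render the execution parameters above as spark-submit --conf flags.
# The parameter -> Spark property mapping is an assumption inferred from the
# descriptions; verify it against your pack's definition before relying on it.

EXECUTION_PROPERTIES = {
    "driverCores": "spark.driver.cores",
    "numExecutors": "spark.executor.instances",
    "totalExecutorCores": "spark.cores.max",
    "executorCores": "spark.executor.cores",
    "defaultParallelism": "spark.default.parallelism",
    "broadcastBlockSize": "spark.broadcast.blockSize",
    "schedulerMode": "spark.scheduler.mode",
}

def to_submit_flags(params):
    """Turn {parameter: value} into a list of spark-submit --conf arguments."""
    return [f"--conf {EXECUTION_PROPERTIES[name]}={value}"
            for name, value in params.items()]

print(" ".join(to_submit_flags({"executorCores": 4, "schedulerMode": "FAIR"})))
# --conf spark.executor.cores=4 --conf spark.scheduler.mode=FAIR
```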
CPU and Memory allocation
Parameter | Type | Unit | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
driverMemory | integer | megabytes | You should select your own default | You should select your own domain | yes | Amount of memory to use for the driver process. |
yarnDriverMemoryOverhead | integer | megabytes | | | yes | Off-heap memory to be allocated per driver in cluster mode. Currently supported in YARN and Kubernetes. |
executorMemory | integer | megabytes | You should select your own default | You should select your own domain | yes | Amount of memory to use per executor. |
executorPySparkMemory | integer | megabytes | You should select your own default | You should select your own domain | yes | The amount of memory to be allocated to PySpark in each executor. |
yarnExecutorMemoryOverhead | integer | megabytes | | | yes | Off-heap memory to be allocated per executor. Currently supported in YARN and Kubernetes. |
memoryOffHeapEnabled | categorical | | | | yes | If true, Spark will attempt to use off-heap memory for certain operations. |
memoryOffHeapSize | integer | megabytes | | | yes | The absolute amount of memory which can be used for off-heap allocation. |
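When sizing `executorMemory` and `yarnExecutorMemoryOverhead` together, it helps to know the memory YARN must actually grant per container: the executor heap plus the overhead, which Spark 2.4 defaults to the larger of 10% of the executor memory and 384 MB when the overhead is not set explicitly. A sketch of that arithmetic:

```python
# Sketch of the per-executor container size YARN must grant.
# The max(10% of heap, 384 MB) default matches Spark's documented behaviour
# when the memory overhead parameter is left unset.

MIN_OVERHEAD_MB = 384

def container_memory_mb(executor_memory_mb, overhead_mb=None):
    """Executor heap plus off-heap overhead, in megabytes."""
    if overhead_mb is None:
        overhead_mb = max(int(executor_memory_mb * 0.10), MIN_OVERHEAD_MB)
    return executor_memory_mb + overhead_mb

print(container_memory_mb(4096))  # 4505  (4096 + max(409, 384))
print(container_memory_mb(2048))  # 2432  (2048 + 384)
```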
Shuffling
Parameter | Type | Unit | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
reducerMaxSizeInFlight | integer | megabytes | | | yes | Maximum size of map outputs to fetch simultaneously from each reduce task. |
shuffleFileBuffer | integer | kilobytes | | | yes | Size of the in-memory buffer for each shuffle file output stream. |
shuffleCompress | categorical | | | | yes | Whether to compress map output files. |
shuffleServiceEnabled | categorical | | | | yes | Enables the external shuffle service. This service preserves the shuffle files written by executors so the executors can be safely removed. |
Dynamic allocation
Parameter | Type | Unit | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
dynamicAllocationEnabled | categorical | | | | yes | Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. Requires spark.shuffle.service.enabled to be set. |
dynamicAllocationExecutorIdleTimeout | integer | | | | yes | If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. |
dynamicAllocationInitialExecutors | integer | executors | You should select your own default | You should select your own domain | yes | Initial number of executors to run if dynamic allocation is enabled. |
dynamicAllocationMinExecutors | integer | executors | You should select your own default | You should select your own domain | yes | Lower bound for the number of executors if dynamic allocation is enabled. |
dynamicAllocationMaxExecutors | integer | executors | You should select your own default | You should select your own domain | yes | Upper bound for the number of executors if dynamic allocation is enabled. |
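When tuning the three executor-count bounds together, their domains should respect the ordering Spark expects: the minimum must not exceed the initial value, which must not exceed the maximum. A small illustrative check (the function name is ours, not part of the pack):

```python
# Sketch: validate the ordering between dynamicAllocationMinExecutors,
# dynamicAllocationInitialExecutors, and dynamicAllocationMaxExecutors.

def valid_dynamic_allocation(min_executors, initial_executors, max_executors):
    """Dynamic allocation expects min <= initial <= max executors."""
    return min_executors <= initial_executors <= max_executors

print(valid_dynamic_allocation(2, 4, 10))  # True
print(valid_dynamic_allocation(4, 2, 10))  # False
```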
SQL
Parameter | Type | Unit | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
sqlInMemoryColumnarStorageCompressed | categorical | | | | yes | When set to true, Spark SQL will automatically select a compression codec for each column based on statistics of the data. |
sqlInMemoryColumnarStorageBatchSize | integer | records | | | yes | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. |
sqlFilesMaxPartitionBytes | integer | bytes | | | yes | The maximum number of bytes to pack into a single partition when reading files. |
sqlFilesOpenCostInBytes | integer | bytes | | | yes | The estimated cost to open a file, measured as the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. |
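These two settings interact when Spark 2.4 sizes file-read partitions: each file is padded by the open cost, the total is spread over the default parallelism, and the resulting split size is capped by `sqlFilesMaxPartitionBytes` and floored at `sqlFilesOpenCostInBytes`. A sketch of that calculation, modeled on Spark's file-partitioning logic:

```python
# Sketch of how Spark 2.4 derives the target file-partition size from
# sqlFilesMaxPartitionBytes and sqlFilesOpenCostInBytes. Modeled on Spark's
# internal logic; treat it as an approximation, not the exact implementation.

def max_split_bytes(file_sizes, max_partition_bytes, open_cost_in_bytes,
                    default_parallelism):
    """Target size in bytes of each file partition."""
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

# Two 64 MB files, a 128 MB partition cap, a 4 MB open cost, parallelism 8:
# each core would get ~17 MB, so splits land at ~17 MB rather than the cap.
print(max_split_bytes([64 * 2**20] * 2, 128 * 2**20, 4 * 2**20, 8))  # 17825792
```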
Compression and Serialization
Parameter | Type | Unit | Default value | Domain | Restart | Description |
---|---|---|---|---|---|---|
compressionLz4BlockSize | integer | bytes | | | yes | Block size in bytes used in LZ4 compression. |
serializer | categorical | | | | yes | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. |
kryoserializerBuffer | integer | bytes | | | yes | Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. |
Constraints
The following tables list constraints that may be required when defining the study, depending on the tuned parameters:
Cluster size
The overall resources allocated to the application should be constrained by a maximum and, sometimes, a minimum value:
- the maximum value could be the sum of resources physically available in the cluster, or a lower limit to allow the concurrent execution of other applications
- an optional minimum value could be useful to avoid configurations that allocate executors that are both small and scarce
Constraint | Description |
---|---|
 | The overall allocated memory should not exceed the specified limit |
 | The overall allocated CPUs should not exceed the specified limit |
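A cluster-size constraint typically multiplies the per-executor parameters by the executor count and compares the result against the cluster limits. A minimal sketch, where `cluster_memory_mb` and `cluster_cores` are hypothetical limits you would derive from your own cluster:

```python
# Sketch: check that the overall allocated memory and CPUs stay within the
# cluster limits. The limit values below are hypothetical examples.

def within_cluster_limits(num_executors, executor_memory_mb, executor_cores,
                          cluster_memory_mb, cluster_cores):
    """True when the executors' total memory and CPUs fit the cluster."""
    total_memory = num_executors * executor_memory_mb
    total_cores = num_executors * executor_cores
    return total_memory <= cluster_memory_mb and total_cores <= cluster_cores

print(within_cluster_limits(10, 4096, 4, 65536, 48))  # True
print(within_cluster_limits(20, 4096, 4, 65536, 48))  # False (memory exceeded)
```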