# Spark Application 2.4.0

This page describes the Optimization Pack for Spark Application 2.4.0.

## Metrics

### Duration

| Metric                       | Unit         | Description                           |
| ---------------------------- | ------------ | ------------------------------------- |
| spark\_application\_duration | milliseconds | The duration of the Spark application |
| spark\_job\_duration         | milliseconds | The duration of the job               |
| spark\_stage\_duration       | milliseconds | The duration of the stage             |
| spark\_task\_duration        | milliseconds | The duration of the task              |

### Driver

| Metric                                           | Unit         | Description                                                                       |
| ------------------------------------------------ | ------------ | --------------------------------------------------------------------------------- |
| spark\_driver\_rdd\_blocks                       | blocks       | The total number of persisted RDD blocks for the driver                           |
| spark\_driver\_mem\_used                         | bytes        | The total amount of memory used by the driver                                     |
| spark\_driver\_disk\_used                        | bytes        | The total amount of disk used for RDDs by the driver                              |
| spark\_driver\_cores                             | cores        | The total number of concurrent tasks that can be run by the driver                |
| spark\_driver\_total\_input\_bytes               | bytes        | The total number of bytes read from RDDs or persisted data by the driver          |
| spark\_driver\_total\_tasks                      | tasks        | The total number of tasks run by the driver                                       |
| spark\_driver\_total\_duration                   | milliseconds | The total amount of time spent by the driver running tasks                        |
| spark\_driver\_max\_mem\_used                    | bytes        | The maximum amount of memory used by the driver                                   |
| spark\_driver\_total\_jvm\_gc\_duration          | milliseconds | The total amount of time spent by the driver's JVM doing garbage collection across all tasks |
| spark\_driver\_total\_shuffle\_read              | bytes        | The total number of bytes read during a shuffle by the driver                     |
| spark\_driver\_total\_shuffle\_write             | bytes        | The total number of bytes written in shuffle operations by the driver             |
| spark\_driver\_used\_on\_heap\_storage\_memory   | bytes        | The amount of on-heap memory used by the driver                                   |
| spark\_driver\_used\_off\_heap\_storage\_memory  | bytes        | The amount of off-heap memory used by the driver                                  |
| spark\_driver\_total\_on\_heap\_storage\_memory  | bytes        | The total amount of available on-heap memory for the driver                       |
| spark\_driver\_total\_off\_heap\_storage\_memory | bytes        | The total amount of available off-heap memory for the driver                      |

### Executors

| Metric                                             | Unit         | Description                                                                                     |
| -------------------------------------------------- | ------------ | ----------------------------------------------------------------------------------------------- |
| spark\_executor\_max\_count                        | executors    | The maximum number of executors used for the application                                        |
| spark\_executor\_rdd\_blocks                       | blocks       | The total number of persisted RDD blocks for each executor                                      |
| spark\_executor\_mem\_used                         | bytes        | The total amount of memory used by each executor                                                |
| spark\_executor\_disk\_used                        | bytes        | The total amount of disk used for RDDs by each executor                                         |
| spark\_executor\_cores                             | cores        | The number of cores used by each executor                                                       |
| spark\_executor\_total\_input\_bytes               | bytes        | The total number of bytes read from RDDs or persisted data by each executor                     |
| spark\_executor\_total\_tasks                      | tasks        | The total number of tasks run by each executor                                                  |
| spark\_executor\_total\_duration                   | milliseconds | The total amount of time spent by each executor running tasks                                   |
| spark\_executor\_max\_mem\_used                    | bytes        | The maximum amount of memory used by each executor                                              |
| spark\_executor\_total\_jvm\_gc\_duration          | milliseconds | The total amount of time spent by each executor's JVM doing garbage collection across all tasks |
| spark\_executor\_total\_shuffle\_read              | bytes        | The total number of bytes read during a shuffle by each executor                                |
| spark\_executor\_total\_shuffle\_write             | bytes        | The total number of bytes written in shuffle operations by each executor                        |
| spark\_executor\_used\_on\_heap\_storage\_memory   | bytes        | The amount of on-heap memory used by each executor                                              |
| spark\_executor\_used\_off\_heap\_storage\_memory  | bytes        | The amount of off-heap memory used by each executor                                             |
| spark\_executor\_total\_on\_heap\_storage\_memory  | bytes        | The total amount of available on-heap memory for each executor                                  |
| spark\_executor\_total\_off\_heap\_storage\_memory | bytes        | The total amount of available off-heap memory for each executor                                 |
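The driver and executor metrics above mirror the per-executor fields exposed by Spark's monitoring REST API (`/api/v1/applications/{app-id}/executors`). The sketch below aggregates a sample payload of that shape; the field names (`rddBlocks`, `memoryUsed`, `totalGCTime`) are real REST API fields, but the sample values and the aggregation itself are illustrative, not the pack's actual collector.

```python
import json

# Sample payload shaped like Spark's REST endpoint
# /api/v1/applications/{app-id}/executors (illustrative values; a real
# collector would fetch this over HTTP from the Spark UI or history server).
SAMPLE = json.loads("""
[
  {"id": "driver", "rddBlocks": 0, "memoryUsed": 52428800,  "totalGCTime": 120},
  {"id": "1",      "rddBlocks": 4, "memoryUsed": 104857600, "totalGCTime": 450},
  {"id": "2",      "rddBlocks": 2, "memoryUsed": 78643200,  "totalGCTime": 310}
]
""")

def split_driver_executors(executors):
    """Separate the driver entry from executor entries, matching the
    driver/executor metric groups above (the driver reports itself as
    an entry with id "driver")."""
    driver = next(e for e in executors if e["id"] == "driver")
    workers = [e for e in executors if e["id"] != "driver"]
    return driver, workers

def total(executors, field):
    """Aggregate one numeric field across a set of executors."""
    return sum(e[field] for e in executors)

driver, workers = split_driver_executors(SAMPLE)
print("driver mem_used:", driver["memoryUsed"])
print("executor jvm_gc_duration total:", total(workers, "totalGCTime"))
```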

### Stages and Tasks

| Metric                                              | Unit         | Description                                                                                                                      |
| --------------------------------------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------------- |
| spark\_stage\_shuffle\_read\_bytes                  | bytes        | The total number of bytes read in shuffle operations by each stage                                                               |
| spark\_task\_jvm\_gc\_duration                      | milliseconds | The total duration of JVM garbage collections for each task                                                                      |
| spark\_task\_peak\_execution\_memory                | bytes        | The sum of the peak sizes across internal data structures created for each task                                                  |
| spark\_task\_result\_size                           | bytes        | The size of the result of the computation of each task                                                                           |
| spark\_task\_result\_serialization\_time            | milliseconds | The time spent by each task serializing the computation result                                                                   |
| spark\_task\_shuffle\_read\_fetch\_wait\_time       | milliseconds | The time spent by each task waiting for remote shuffle blocks                                                                    |
| spark\_task\_shuffle\_read\_local\_blocks\_fetched  | blocks       | The total number of local blocks fetched in shuffle operations by each task                                                      |
| spark\_task\_shuffle\_read\_local\_bytes            | bytes        | The total number of bytes read in shuffle operations from local disk by each task                                                |
| spark\_task\_shuffle\_read\_remote\_blocks\_fetched | blocks       | The total number of remote blocks fetched in shuffle operations by each task                                                     |
| spark\_task\_shuffle\_read\_remote\_bytes           | bytes        | The total number of remote bytes read in shuffle operations by each task                                                         |
| spark\_task\_shuffle\_read\_remote\_bytes\_to\_disk | bytes        | The total number of remote bytes read to disk in shuffle operations by each task                                                 |
| spark\_task\_shuffle\_write\_time                   | nanoseconds  | The time spent by each task writing data on disk or on buffer caches during shuffle operations                                   |
| spark\_task\_executor\_deserialize\_time            | nanoseconds  | The time spent by the executor deserializing the task                                                                            |
| spark\_task\_executor\_deserialize\_cpu\_time       | nanoseconds  | The CPU time spent by the executor deserializing the task                                                                        |
| spark\_task\_stage\_shuffle\_write\_records         | records      | The total number of records written in shuffle operations broken down by task and stage                                          |
| spark\_task\_stage\_shuffle\_write\_bytes           | bytes        | The total number of bytes written in shuffle operations broken down by task and stage                                            |
| spark\_task\_stage\_shuffle\_read\_records          | records      | The total number of records read in shuffle operations broken down by task and stage                                             |
| spark\_task\_stage\_disk\_bytes\_spilled            | bytes        | The total number of bytes spilled on disk broken down by task and stage                                                          |
| spark\_task\_stage\_memory\_bytes\_spilled          | bytes        | The total number of bytes spilled on memory broken down by task and stage                                                        |
| spark\_task\_stage\_input\_bytes\_read              | bytes        | The total number of bytes read, broken down by task and stage                                                                    |
| spark\_task\_stage\_input\_records\_read            | records      | The total number of records read, broken down by task and stage                                                                  |
| spark\_task\_stage\_output\_bytes\_written          | bytes        | The total number of bytes written, broken down by task and stage                                                                 |
| spark\_task\_stage\_output\_records\_written        | records      | The total number of records written, broken down by task and stage                                                               |
| spark\_task\_stage\_executor\_run\_time             | nanoseconds  | The time spent by each executor actually running tasks (including fetching shuffle data) broken down by task, stage and executor |
| spark\_task\_stage\_executor\_cpu\_time             | nanoseconds  | The CPU time spent by each executor actually running each task (including fetching shuffle data) broken down by task and stage   |

## Parameters

### Execution

| Parameter          | Type        | Unit       | Default value                      | Domain                            | Restart | Description                                                                                                                    |
| ------------------ | ----------- | ---------- | ---------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------ |
| driverCores        | integer     | cores      | You should select your own default | You should select your own domain | yes     | The number of CPU cores assigned to the driver in cluster deploy mode.                                                         |
| numExecutors       | integer     | executors  | You should select your own default | You should select your own domain | yes     | Number of executors to use. YARN only.                                                                                         |
| totalExecutorCores | integer     | cores      | You should select your own default | You should select your own domain | yes     | Total number of cores for the application. Spark standalone and Mesos only.                                                    |
| executorCores      | integer     | cores      | You should select your own default | You should select your own domain | yes     | Number of CPU cores for an executor. Spark standalone and YARN only.                                                           |
| defaultParallelism | integer     | partitions | You should select your own default | You should select your own domain | yes     | Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user. |
| broadcastBlockSize | integer     | kilobytes  | `4096`                             | `256` → `131072`                  | yes     | Size of each piece of a block for TorrentBroadcastFactory.                                                                     |
| schedulerMode      | categorical |            | `FIFO`                             | `FIFO`, `FAIR`                    | yes     | Define the scheduling strategy across jobs.                                                                                    |
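The pack does not state which Spark properties these parameters map to, but the names suggest the standard execution properties. The sketch below renders a candidate configuration as `spark-submit --conf` flags; the mapping is an assumption inferred from the parameter names.

```python
# Assumed mapping from the pack's execution parameters to the Spark 2.4
# properties they presumably tune (not stated explicitly by the pack docs).
EXECUTION_PARAMS = {
    "driverCores":        "spark.driver.cores",
    "numExecutors":       "spark.executor.instances",
    "totalExecutorCores": "spark.cores.max",
    "executorCores":      "spark.executor.cores",
    "defaultParallelism": "spark.default.parallelism",
    "broadcastBlockSize": "spark.broadcast.blockSize",
    "schedulerMode":      "spark.scheduler.mode",
}

def to_submit_args(config):
    """Render a parameter assignment as spark-submit --conf flags."""
    return [f"--conf {EXECUTION_PARAMS[name]}={value}"
            for name, value in config.items()]

print(" ".join(to_submit_args({"executorCores": 4, "numExecutors": 8})))
```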

### CPU and Memory allocation

| Parameter                  | Type        | Unit      | Default value                      | Domain                             | Restart | Description                                                                                             |
| -------------------------- | ----------- | --------- | ---------------------------------- | ---------------------------------- | ------- | ------------------------------------------------------------------------------------------------------- |
| driverMemory               | integer     | megabytes | You should select your own default | You should select your own domain  | yes     | Amount of memory to use for the driver process.                                                         |
| yarnDriverMemoryOverhead   | integer     | megabytes | `384`                              | `384` → `65536`                    | yes     | Off-heap memory to be allocated per driver in cluster mode. Currently supported in YARN and Kubernetes. |
| executorMemory             | integer     | megabytes | You should select your own default | You should select your own domain  | yes     | Amount of memory to use per executor.                                                                   |
| executorPySparkMemory      | integer     | megabytes | You should select your own default | You should select your own domain  | yes     | The amount of memory to be allocated to PySpark in each executor.                                       |
| yarnExecutorMemoryOverhead | integer     | megabytes | `384`                              | `384` → `65536`                    | yes     | Off-heap memory to be allocated per executor. Currently supported in YARN and Kubernetes.               |
| memoryOffHeapEnabled       | categorical |           | `false`                            | `true`, `false`                    | yes     | If true, Spark will attempt to use off-heap memory for certain operations.                              |
| memoryOffHeapSize          | integer     | megabytes | `0`                                | `0` → `16384`                      | yes     | The absolute amount of memory which can be used for off-heap allocation.                                |
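Note the unit mismatch to watch for when applying `memoryOffHeapSize`: the pack expresses it in megabytes, while the underlying `spark.memory.offHeap.size` property is documented in bytes. A minimal conversion sketch (the mapping to the Spark property is an assumption about how the pack applies the value):

```python
MB = 1024 * 1024  # bytes per megabyte

def off_heap_conf(enabled: bool, size_mb: int) -> dict:
    """Build the off-heap settings; spark.memory.offHeap.size expects a
    value in bytes, while the pack parameter is in megabytes, so the
    value is converted before being applied."""
    return {
        "spark.memory.offHeap.enabled": str(enabled).lower(),
        "spark.memory.offHeap.size": str(size_mb * MB),
    }

print(off_heap_conf(True, 2048))
```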

### Shuffling

| Parameter              | Type        | Unit      | Default value | Domain          | Restart | Description                                                                                                                                 |
| ---------------------- | ----------- | --------- | ------------- | --------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| reducerMaxSizeInFlight | integer     | megabytes | `48`          | `1` → `1024`    | yes     | Maximum size of map outputs to fetch simultaneously from each reduce task.                                                                  |
| shuffleFileBuffer      | integer     | kilobytes | `32`          | `1` → `2048`    | yes     | Size of the in-memory buffer for each shuffle file output stream.                                                                           |
| shuffleCompress        | categorical |           | `true`        | `true`, `false` | yes     | Whether to compress map output files.                                                                                                       |
| shuffleServiceEnabled  | categorical |           | `true`        | `true`, `false` | yes     | Enables the external shuffle service. This service preserves the shuffle files written by executors so the executors can be safely removed. |
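The Spark properties behind these parameters (`spark.reducer.maxSizeInFlight`, `spark.shuffle.file.buffer`) take suffixed size strings such as `48m` or `32k`, so the numeric pack values need formatting before being applied. A sketch of that rendering, under the assumption that these are the properties the pack tunes:

```python
def shuffle_conf(max_in_flight_mb=48, file_buffer_kb=32,
                 compress=True, external_service=True):
    """Render the shuffling parameters as Spark properties; the size
    properties take suffixed strings ("48m", "32k"), so the numeric
    pack values are formatted with the matching unit suffix."""
    return {
        "spark.reducer.maxSizeInFlight": f"{max_in_flight_mb}m",
        "spark.shuffle.file.buffer": f"{file_buffer_kb}k",
        "spark.shuffle.compress": str(compress).lower(),
        "spark.shuffle.service.enabled": str(external_service).lower(),
    }

print(shuffle_conf())
```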

### Dynamic allocation

| Parameter                            | Type        | Unit      | Default value                      | Domain                            | Restart | Description                                                                                                                                                                                            |
| ------------------------------------ | ----------- | --------- | ---------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| dynamicAllocationEnabled             | categorical |           | `true`                             | `true`, `false`                   | yes     | Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. Requires spark.shuffle.service.enabled to be set. |
| dynamicAllocationExecutorIdleTimeout | integer     | seconds   | `60`                               | `1` → `3600`                      | yes     | If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed.                                                                              |
| dynamicAllocationInitialExecutors    | integer     | executors | You should select your own default | You should select your own domain | yes     | Initial number of executors to run if dynamic allocation is enabled.                                                                                                                                   |
| dynamicAllocationMinExecutors        | integer     | executors | You should select your own default | You should select your own domain | yes     | Lower bound for the number of executors if dynamic allocation is enabled.                                                                                                                              |
| dynamicAllocationMaxExecutors        | integer     | executors | You should select your own default | You should select your own domain | yes     | Upper bound for the number of executors if dynamic allocation is enabled.                                                                                                                              |
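Before launching a study that tunes these parameters, it is worth validating that a candidate configuration is internally consistent. The checks below are implied by the table (the shuffle-service requirement, and the usual min ≤ initial ≤ max ordering); this is a sketch, not part of the pack itself:

```python
def check_dynamic_allocation(enabled, shuffle_service_enabled,
                             min_execs, initial_execs, max_execs):
    """Return a list of problems with a dynamic-allocation configuration
    (empty list means the configuration looks consistent)."""
    if not enabled:
        return []  # nothing to validate when dynamic allocation is off
    problems = []
    # The table notes dynamic allocation requires the external shuffle
    # service (spark.shuffle.service.enabled).
    if not shuffle_service_enabled:
        problems.append("dynamic allocation requires the external shuffle service")
    if not (min_execs <= initial_execs <= max_execs):
        problems.append("expected minExecutors <= initialExecutors <= maxExecutors")
    return problems

print(check_dynamic_allocation(True, False, 2, 1, 8))
```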

### SQL

| Parameter                            | Type        | Unit    | Default value | Domain                | Restart | Description                                                                                                                                                      |
| ------------------------------------ | ----------- | ------- | ------------- | --------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| sqlInMemoryColumnarStorageCompressed | categorical |         | `true`        | `true`, `false`       | yes     | When set to true, Spark SQL will automatically select a compression codec for each column based on statistics of the data.                                       |
| sqlInMemoryColumnarStorageBatchSize  | integer     | records | `1000`        | `1` → `100000`        | yes     | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.           |
| sqlFilesMaxPartitionBytes            | integer     | bytes   | `134217728`   | `1024` → `1073741824` | yes     | The maximum number of bytes to pack into a single partition when reading files.                                                                                  |
| sqlFilesOpenCostInBytes              | integer     | bytes   | `4194304`     | `262144` → `67108864` | yes     | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. |
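These two parameters interact: Spark 2.4's file source sizes read partitions from the total input size padded with the per-file open cost, capped by the maximum partition size. The sketch below reproduces that sizing rule roughly (based on the logic in `FilePartition`/`FileSourceScanExec`; treat the exact formula as an approximation):

```python
def max_split_bytes(total_bytes, num_files, default_parallelism,
                    max_partition_bytes=134217728, open_cost=4194304):
    """Rough sketch of how Spark 2.4 sizes file read partitions:
    each file is padded with the open cost, the padded total is spread
    over the default parallelism, and the result is clamped between
    the open cost and maxPartitionBytes."""
    padded = total_bytes + num_files * open_cost
    bytes_per_core = padded / default_parallelism
    return int(min(max_partition_bytes, max(open_cost, bytes_per_core)))

# E.g. 10 GiB across 100 files on 8 cores: the split size is capped
# by maxPartitionBytes (128 MiB).
print(max_split_bytes(10 * 1024**3, 100, 8))
```

Lowering `sqlFilesMaxPartitionBytes` therefore produces more, smaller partitions, while raising `sqlFilesOpenCostInBytes` biases Spark toward packing fewer small files per partition.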

### Compression and Serialization

| Parameter               | Type        | Unit  | Default value                                | Domain                                                                                     | Restart | Description                                                                                                      |
| ----------------------- | ----------- | ----- | -------------------------------------------- | ------------------------------------------------------------------------------------------ | ------- | ---------------------------------------------------------------------------------------------------------------- |
| compressionLz4BlockSize | integer     | kilobytes | `32`                                     | `8` → `1024`                                                                               | yes     | Block size used in LZ4 compression.                                                                              |
| serializer              | categorical |       | `org.apache.spark.serializer.KryoSerializer` | `org.apache.spark.serializer.JavaSerializer`, `org.apache.spark.serializer.KryoSerializer` | yes     | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. |
| kryoserializerBuffer    | integer     | kilobytes | `64`                                     | `8` → `1024`                                                                               | yes     | Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker.         |

## Constraints

The following table lists constraints that may be required in the definition of the study, depending on the tuned parameters:

### Cluster size

The overall resources allocated to the application should be constrained by a maximum and, sometimes, a minimum value:

* the maximum value could be the sum of resources physically available in the cluster, or a lower limit to allow the concurrent execution of other applications
* an optional minimum value could be useful to avoid configurations that allocate executors that are both small and scarce

| Constraint                                                  | Description                                                              |
| ----------------------------------------------------------- | ------------------------------------------------------------------------ |
| `driverMemory + executorMemory * numExecutors < MEMORY_CAP` | The overall allocated memory should not exceed the specified limit       |
| `driverCores + executorCores * numExecutors < CPU_CAP`      | The overall allocated CPUs should not exceed the specified limit         |
| `driverMemory + executorMemory * numExecutors > MIN_MEMORY` | The overall allocated memory should not fall below the specified minimum |
| `driverCores + executorCores * numExecutors > MIN_CPUS`     | The overall allocated CPUs should not fall below the specified minimum   |
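The four constraints can be evaluated together for a candidate configuration. A minimal sketch (memory values in megabytes, matching the parameter tables above; `memory_cap`, `cpu_cap`, `min_memory`, and `min_cpus` stand in for the study's `MEMORY_CAP`, `CPU_CAP`, `MIN_MEMORY`, and `MIN_CPUS`):

```python
def check_cluster_size(driver_memory, executor_memory, num_executors,
                       driver_cores, executor_cores,
                       memory_cap, cpu_cap, min_memory=0, min_cpus=0):
    """Evaluate the cluster-size constraints for one candidate
    configuration: total memory and total CPUs must each fall strictly
    between the optional minimum and the cap."""
    total_memory = driver_memory + executor_memory * num_executors
    total_cpus = driver_cores + executor_cores * num_executors
    return (min_memory < total_memory < memory_cap
            and min_cpus < total_cpus < cpu_cap)

# 2 GiB driver + 8 executors x 4 GiB = 34816 MB; 1 + 8 * 4 = 33 cores.
print(check_cluster_size(2048, 4096, 8, 1, 4,
                         memory_cap=65536, cpu_cap=64))
```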
