Spark Interview Guide : Miscellaneous Important Concepts

1. Internal working of Spark

Spark has a relatively small code base, but the system is divided into layers, each with a clear responsibility and largely independent of the others.
The Spark driver contains several components that translate a user program into the actual jobs executed on the cluster.

Spark Context: Represents the connection to a Spark cluster; it is used to create RDDs, accumulators and broadcast variables on that cluster, as shown in the sketch below.
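
A minimal sketch of these roles (assuming a local session; the object name and numbers are illustrative):

import org.apache.spark.sql.SparkSession

object SparkContextDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; in a real job the master is usually supplied by spark-submit
    val spark = SparkSession.builder().appName("SparkContextDemo").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // RDD created from a local collection, split into 4 partitions
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // Accumulator: a write-only shared variable updated by tasks and read on the driver
    val evenCount = sc.longAccumulator("evenCount")

    // Broadcast variable: a read-only value shipped once to each executor
    val threshold = sc.broadcast(50)

    rdd.foreach { n => if (n % 2 == 0 && n > threshold.value) evenCount.add(1) }
    println(s"even numbers above ${threshold.value}: ${evenCount.value}")

    spark.stop()
  }
}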

DAGScheduler: Computes a DAG of stages for each job and submits them to the TaskScheduler; it also determines the preferred locations for tasks and finds a minimal schedule to run the job.

TaskScheduler: Responsible for sending tasks to the cluster, running them, and retrying them if they fail.

Scheduler Backend: A backend interface for scheduling systems that allows plugging in different cluster managers (e.g. standalone, YARN, Mesos).

2. What are Shuffle Partitions in Spark?

Shuffle partitions are the partitions of a Spark DataFrame that are created by a grouping or join operation. The number of partitions in the resulting DataFrame differs from that of the original DataFrame.

For example, the below code
val df = sparkSession.read.csv("src/main/resources/sales.csv")
println(df.rdd.partitions.length)

will print 2 for a small sales file, indicating that the DataFrame has two partitions.
Now, when we run a groupBy operation on the same DataFrame, the number of partitions changes:

println(df.groupBy("_c0").count().rdd.partitions.length)

The above code prints 200: the 2 partitions have increased to 200.

This is because the parameter spark.sql.shuffle.partitions, which controls the number of shuffle partitions, is set to 200 by default.

Challenges with Default Shuffle Partitions

The number of shuffle partitions in Spark is static; it does not change with the size of the data. This leads to the following issues:

  • For smaller data, 200 partitions are overkill and often slow processing down because of scheduling overhead.
  • For large data, 200 partitions are too few and do not use all the resources in the cluster effectively.

To overcome these issues, we need to control the number of shuffle partitions dynamically.

Dynamically Setting the Shuffle Partitions

Spark allows changing Spark SQL configuration at runtime using the conf method on the SparkSession; a wide variety of settings can be adjusted this way.
So if we need to reduce the number of shuffle partitions for a given dataset, we can do so with the code below:

sparkSession.conf.set("spark.sql.shuffle.partitions",100)
println(df.groupBy("_c0").count().rdd.partitions.length)

The above code will print 100, showing how to set the number of partitions dynamically.
The exact logic for choosing the number of shuffle partitions depends on your own analysis; a common starting point is 1.5 to 2 times the number of initial partitions.

  • Operations that involve Shuffle

repartition, coalesce, the *ByKey operations (except countByKey), join, cogroup

  • One of the best ways to avoid a static number of partitions (200 by default) is to enable the Spark 3.0 feature Adaptive Query Execution (AQE).
    This feature enables Spark to dynamically coalesce shuffle partitions even when the static parameter that defines the default number of shuffle partitions (spark.sql.shuffle.partitions, 200 by default) is set to an inappropriate value. A minimal sketch follows this list.
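
A minimal sketch of enabling AQE at runtime (sparkSession and df are the variables from the earlier examples; the keys below are the standard Spark 3.x adaptive-execution settings):

// Let Spark coalesce shuffle partitions at runtime based on actual data sizes
sparkSession.conf.set("spark.sql.adaptive.enabled", "true")
sparkSession.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// With AQE on, spark.sql.shuffle.partitions acts as the initial number of
// shuffle partitions; small partitions may be merged after each shuffle.
// For a small file the resulting shuffle typically ends up with far fewer than 200 partitions.
println(df.groupBy("_c0").count().rdd.partitions.length)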

For more details on Spark shuffle Architecture : https://0x0fff.com/spark-architecture-shuffle/

3. How to monitor Spark applications

There are three ways to monitor Spark applications. The first is the Web UI, served on port 4040 by default.

  1. The second way is event logging. The information on the Web UI is only available for the lifetime of the application; to review it later, set spark.eventLog.enabled to true before starting the application, and the information will then be persisted to storage as well (and can be replayed, for example, by the History Server). A sketch follows this list.
  2. Finally, you can also use external instrumentation to monitor Spark. Ganglia can be used to view overall cluster utilization and resource bottlenecks, and various OS profiling tools and JVM utilities can also be used to observe Spark.
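
A minimal sketch of turning on event logging when building the session (the log directory is illustrative and must exist):

import org.apache.spark.sql.SparkSession

// Persist UI/event data so it can be replayed later, e.g. by the History Server
val spark = SparkSession.builder()
  .appName("MonitoredApp")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "hdfs:///spark-logs")  // local, HDFS or S3 path
  .getOrCreate()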


5. What is static allocation and dynamic allocation in Spark?

  • Static Allocation: the executor resources are fixed and given as part of spark-submit.
    Dynamic Allocation: the resources are acquired based on the requirements (size of data, amount of computation needed) and released after use, so they can be reused by other applications.
  • Dynamic resource allocation
    Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.
    This feature is disabled by default and available on all coarse-grained cluster managers, i.e. standalone mode, YARN mode, and Mesos coarse-grained mode.

To enable it, the Spark property spark.dynamicAllocation.enabled should be set to 'true'. When dynamic allocation is enabled, there are companion properties that also need to be set:

  • spark.dynamicAllocation.initialExecutors: the initial number of executors when the Spark session is created; defaults to minExecutors if not set.
  • spark.dynamicAllocation.minExecutors: the minimum number of executors the Spark session should maintain.
  • spark.dynamicAllocation.maxExecutors: the maximum number of executors the Spark session may scale up to.

You should decide whether your Spark job can benefit from dynamic allocation, and possibly switch it on or off based on how the actual code executes (by looking at the Spark UI).
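
A minimal sketch of enabling dynamic allocation when building the session (the numbers are illustrative; depending on the cluster manager and Spark version, an external shuffle service via spark.shuffle.service.enabled or shuffle tracking may also need to be enabled):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DynamicAllocationDemo")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.initialExecutors", "4")   // defaults to minExecutors if unset
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .getOrCreate()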

6. Spark Submit and its Options

What is Spark Submit?
Spark Submit is a command-line tool that allows you to submit a Spark application to a cluster.
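
A typical invocation looks roughly like the sketch below (class name, paths and resource values are illustrative):

spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.example.MyApp \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 8g \
--driver-memory 4g \
--conf "spark.sql.shuffle.partitions=400" \
path/to/my-app.jar arg1 arg2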

Reference : https://sparkbyexamples.com/spark/spark-submit-command/

7. Speculative Execution in Spark

A speculative task in Apache Spark is a duplicate of a task that runs slower than the rest of the tasks in the job. Spark periodically checks for tasks that run slower than the median of the successfully completed tasks in the task set; such tasks are marked as speculatable and a new copy is submitted to another worker, running in parallel rather than shutting down the slow task.

In cluster deploy mode, this check runs as a thread inside TaskSchedulerImpl when spark.speculation is enabled. It executes periodically, every spark.speculation.interval, after the initial spark.speculation.interval has passed.

spark.speculation defaults to false. If you set it to "true", Spark performs speculative execution of tasks: if one or more tasks are running slowly in a stage, they will be re-launched.

http://spark.apache.org/docs/latest/configuration.html

You can add these flags to your spark-submit, passing them with --conf, e.g.:

spark-submit \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=5" \
--conf "spark.speculation.quantile=0.90" \
--class "org.asyncified.myClass" "path/to/test.jar"

Note: the Spark driver can spend a lot of time on speculation checks when managing a large number of tasks, so enable it only if needed.

8. Optimal file size for HDFS & S3

  • HDFS
    In the case of HDFS, the ideal file size is as close to the configured block size (dfs.blocksize) as possible, which is commonly 128 MB by default.
    Avoid files that are smaller than the configured block size. An average file size below the block size adds more burden to the NameNode, can cause heap/GC issues, and makes storage and processing inefficient.
    Files only slightly larger than the block size are also potentially wasteful, e.g. a 130 MB file extends over 2 blocks, which carries additional I/O time.
  • S3
    For S3, there is a configuration parameter we can refer to, fs.s3a.block.size, but that is not the full story. File listing on S3 is slow, so a common recommendation is to optimise for larger files.
    1 GB is a widely used target, although you can feasibly go larger before splitting the data into more files.
    The penalty for handling larger files is that processes such as Spark partition their input by file: if you have more cores available than partitions, the extra cores sit idle. Two 1 GB files can only be operated on by 2 cores simultaneously, whereas 16 files of 128 MB could be processed by 16 cores in parallel. A sketch of steering output file sizes follows this list.
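
One common way to steer output file size is to estimate the data volume and repartition before writing, so each task writes roughly one file of the target size. A rough sketch (the size estimate and paths are illustrative):

// Aim output files at roughly the HDFS block size (e.g. 128 MB)
val targetFileBytes = 128L * 1024 * 1024
val estimatedTotalBytes = 64L * 1024 * 1024 * 1024        // assume ~64 GB of output
val numFiles = math.max(1, (estimatedTotalBytes / targetFileBytes).toInt)

df.repartition(numFiles)
  .write
  .mode("overwrite")
  .parquet("hdfs:///warehouse/sales_optimized")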

9. Difference between spark.sql.shuffle.partitions vs spark.default.parallelism?

Spark provides the spark.sql.shuffle.partitions and spark.default.parallelism configurations to control parallelism and partitioning.

RDD: spark.default.parallelism was introduced with RDDs, hence this property is only applicable to RDDs. Its default value is the total number of cores on all executor nodes in the cluster; in local mode, it is the number of cores on your machine.

spark.conf.set("spark.default.parallelism",100)

DataFrame: spark.sql.shuffle.partitions, on the other hand, was introduced with DataFrames and only works with DataFrames; its default value is 200.

spark.conf.set("spark.sql.shuffle.partitions",100)

10. Metastore in Apache Spark

Metastore

Metastore (aka metastore_db) is a relational database that is used by Hive, Presto, Spark, etc. to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions) for fast access.
Additionally, a spark-warehouse is the directory where Spark SQL persists tables.

By default, Spark SQL uses an in-memory catalog. When Hive support is enabled, it uses a metastore deployed with an embedded Apache Derby database (the metastore_db directory). A minimal sketch follows.
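
A minimal sketch of switching to a Hive-backed metastore (requires Hive support on the classpath; the warehouse path and table are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MetastoreDemo")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")  // where Spark SQL persists tables
  .enableHiveSupport()                                        // uses metastore_db (Derby) unless configured otherwise
  .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS sales_persisted (id INT, amount DOUBLE) USING parquet")
spark.sql("SHOW TABLES").show()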

11. Predicate Push Down (PPD)

PPD, in simple terms, is the process of selecting only the data that is actually required when querying a huge table. For example, if a table has 100 columns and you query only 10 of them, only those 10 columns are read for further processing (strictly speaking, selecting a subset of columns is projection/column pruning, a close companion of PPD). Likewise, if a query contains a filter (e.g. a where clause), the filter is applied as close to the data source as possible to reduce the number of records picked up for processing. This significantly improves performance by reducing the number of records read and written, and therefore the amount of input/output.

Columnar file formats are a great way to exploit PPD because they support it natively. Examples of columnar file formats are Parquet, RCFile (Record Columnar File) and ORC (Optimized Row Columnar).

Optimization Trick: There are two important notes to make here.

  • Use the Parquet format wherever feasible for reading and writing files to HDFS or S3, as Parquet performs very well with Spark. This applies especially to intermediate steps where you write data to HDFS to break the lineage (as mentioned under the optimization trick in Lazy Evaluation).
  • Always try to identify the filters and move them as early as possible in your data processing pipeline (see the sketch after this list).
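
A minimal sketch of both ideas against Parquet (the path and column names are illustrative); with Parquet sources the physical plan typically shows a PushedFilters entry on the scan node:

val sales = sparkSession.read.parquet("hdfs:///warehouse/sales_parquet")
import sparkSession.implicits._

val filtered = sales
  .select("order_id", "region", "amount")             // column pruning: only these columns are read
  .filter($"region" === "EU" && $"amount" > 100.0)    // predicate pushed down to the Parquet scan

filtered.explain(true)   // inspect the plan for PushedFilters / selected columns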

12. Is it a good idea to change spark.default.parallelism to a big number so that lots of threads can work on a single partition concurrently?

A partition is the smallest unit of concurrency in Spark, meaning a single thread per partition. You can of course use parallel processing inside mapPartitions, but it is not part of standard Spark logic.

Higher parallelism means more partitions when the number of partitions is not specified otherwise. Usually that is a desired outcome, but it comes with a price: growing bookkeeping cost, less efficient aggregations, and generally less data that can be processed locally without serialization/deserialization and network traffic. It can become a serious problem when the number of partitions is very high compared to the amount of data and the number of available cores (see "Spark iteration time increasing exponentially when using join").

When it makes sense to increase parallelism:

  • you have a large amount of data and a lot of spare resources (a common recommendation is a number of partitions around twice the number of available cores).
  • you want to reduce amount of memory required to process a single partition.
  • you perform computationally intensive tasks.

When it doesn’t make sense to increase parallelism:

  • parallelism >> number of available cores.
  • parallelism is high compared to the amount of data and you want to process more than one record at a time (groupBy, reduce, agg).

Generally, spark.default.parallelism is not a very useful tool on its own, and it makes more sense to adjust parallelism on a case-by-case basis. If parallelism is too high, it can result in empty partitions during data loading and simple transformations, and in reduced performance / suboptimal resource usage. If it is too low, it can lead to problems when you perform transformations that require a large number of partitions (joins, unions).
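
Before changing parallelism it often helps to check how records are actually spread across partitions. A quick sketch (df is the DataFrame from the earlier examples):

// Count records per partition to spot skew or many near-empty partitions
val perPartition = df.rdd
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()

perPartition.foreach { case (idx, n) => println(s"partition $idx -> $n records") }
println(s"total partitions: ${df.rdd.getNumPartitions}")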

13. DAG

A DAG (Directed Acyclic Graph) in Apache Spark is a set of vertices and edges, where the vertices represent RDDs and the edges represent the operations applied to them. In a Spark DAG, every edge points from an earlier step to a later one in the sequence. When an action is called, the DAG is submitted to the DAG Scheduler, which further splits the graph into stages of tasks.

There are multiple advantages of Spark DAG, let’s discuss them one by one:

  • A lost RDD can be recovered using the Directed Acyclic Graph.
  • MapReduce has just two steps, map and reduce, whereas a DAG has multiple levels, so executing SQL-like queries with a DAG is more flexible.
  • The DAG helps achieve fault tolerance, so lost data can be recovered.
  • It can perform better global optimization than a system like Hadoop MapReduce.

How DAG works in Spark?

  • The interpreter is the first layer: using a Scala interpreter, Spark interprets the code with some modifications.
  • Spark creates an operator graph as you enter your code in the Spark console.
  • When an action is called on a Spark RDD, Spark submits the operator graph to the DAG Scheduler.
  • The DAG Scheduler divides the operators into stages of tasks. A stage contains tasks based on the partitions of the input data. The DAG Scheduler also pipelines operators together; for example, several map operators can be scheduled into a single stage.
  • The stages are passed on to the Task Scheduler, which launches the tasks through the cluster manager. The Task Scheduler is not aware of the dependencies between stages.
  • The workers execute the tasks on their executors.


How to Achieve Fault Tolerance through DAG?

An RDD is split into partitions, and each node operates on a partition at any point in time. A series of Scala functions executes on a partition of the RDD; these operations compose together, and the Spark execution engine views them as a DAG (Directed Acyclic Graph).

Suppose a node crashes in the middle of an operation, say O3, which depends on operation O2, which in turn depends on O1. The cluster manager detects that the node is dead and assigns another node to continue processing. This node operates on the same partition of the RDD and re-executes the series of operations it has to perform (O1 -> O2 -> O3), so there is no data loss.

14. RDD lineage

When we create a new RDD from an existing Spark RDD, the new RDD carries a pointer to its parent RDD. In other words, all the dependencies between RDDs are recorded in a graph, rather than the actual data. This is what we call the lineage graph.

RDD Lineage (aka RDD operator graph or RDD dependency graph) is a graph of all the parent RDDs of a RDD. It is built as a result of applying transformations to the RDD and creates a logical execution plan.

Advantage of Lineage

  • Allows RDDs to be reconstructed when nodes crash.
  • We start from the source file, apply all the stored transformations, and recreate the RDD.
  • Allows RDDs to be lazily instantiated (materialized) when their results are accessed.

toDebugString Method to Get the RDD Lineage Graph in Spark

Although there are several ways to get the RDD lineage graph in Spark, one of them is the toDebugString method, whose signature is:
toDebugString: String

We can inspect a Spark RDD's lineage graph with the help of this method.

scala> val wordCount1 = sc.textFile("README.md").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
scala> wordCount1.toDebugString

Result:

res13: String =
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
+-(2) MapPartitionsRDD[20] at map at <console>:24 []
| MapPartitionsRDD[19] at flatMap at <console>:24 []
| README.md MapPartitionsRDD[18] at textFile at <console>:24 []
| README.md HadoopRDD[17] at textFile at <console>:24 []
The toDebugString method uses indentation to indicate shuffle boundaries.
The number in round brackets shows the level of parallelism at each stage, for example the (2) in the output above:

scala> wordCount1.getNumPartitions
res14: Int = 2

The lineage is also printed when an action is executed if the spark.logLineage property is enabled:

$ ./bin/spark-shell --conf spark.logLineage=true
scala> sc.textFile("README.md", 4).count

15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD’s recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
| README.md HadoopRDD[0] at textFile at <console>:25 []

15. What according to you is a common mistake apache-spark developers make when using spark?

16. Why should one not use a UDF?

UDFs cannot be optimized by the Catalyst Optimizer. To use UDFs, the functions must be serialized and sent to the executors, and for Python there is the additional overhead of spinning up a Python interpreter on an executor to run the UDF.

In PySpark, sampleUDF = udf(sample_function) serializes the function and sends the UDF to the executors so that we can use it on a DataFrame.
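
A short Scala sketch of the trade-off (the DataFrame and column _c0 are the ones from the earlier examples; the UDF itself is illustrative). The built-in function stays visible to Catalyst, while the UDF is a black box:

import org.apache.spark.sql.functions.{udf, upper, col}

// UDF: Catalyst cannot look inside the lambda, so optimizations stop at this expression
val toUpperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)
val viaUdf = df.withColumn("c0_upper", toUpperUdf(col("_c0")))

// Preferred: the equivalent built-in function, which Catalyst can analyze and optimize
val viaBuiltin = df.withColumn("c0_upper", upper(col("_c0")))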

17. What is Serialization in Spark ?

  • What is serialization?
  • The concept of serialization in Spark
  • Java serialization
  • Kryo serialization
  • Why is Kryo serialization best for Spark?

Serialization refers to converting objects into a stream of bytes, and de-serialization to the reverse, in an optimal way so that objects can be transferred over the network between nodes or stored in a file or memory buffer.

Data Serialization:
Serialization converts an object to a byte stream; de-serialization is the reverse.

We can configure the serializer type using property => spark.serializer

Advantages of Kryo serialization -

a. Faster than the Java serialization mechanism.
b. Serialized data is smaller than with Java serialization, so memory consumption is lower.
c. Kryo serialization during shuffle can improve the performance of network transmission.

(Within Kryo itself, the global default serializer is FieldSerializer, which works without any registration or setup.)

There are two types of serializers provided in Spark.

  1. Java serialization (default)

Java serialization is the default serializer used by Spark when we spin up the driver. It provides a lot of flexibility, but it is quite slow and produces large serialized formats for many classes.

  2. Kryo serialization (recommended by Spark)

Kryo is a Java serialization framework that focuses on speed, efficiency, and a user-friendly API.
Kryo has a smaller memory footprint, which becomes important when you shuffle and cache large amounts of data. However, it is not natively supported for serializing to disk: both saveAsObjectFile on RDD and objectFile on SparkContext support Java serialization only.
Kryo is not the default because of its custom registration and manual configuration requirements.


Default Serializer
When Kryo serializes an object, it creates an instance of a previously registered Serializer class to do the conversion to bytes. Default serializers can be used without any setup on our part.

Let's see how we can set up Kryo in our application:

val spark = SparkSession.builder().appName("KryoSerializerExample")
  .config(someConfig)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer", "1024k")
  .config("spark.kryoserializer.buffer.max", "1024m")
  .config("spark.kryo.registrationRequired", "true")
  .getOrCreate()
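
Because spark.kryo.registrationRequired is set to true above, every class that gets serialized must be registered up front. A sketch of doing so (the domain class is illustrative); registration can also be done with the spark.kryo.classesToRegister property:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class SalesRecord(id: Long, amount: Double)   // illustrative domain class

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[SalesRecord]))

val spark = SparkSession.builder().appName("KryoRegistration").config(conf).getOrCreate()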

18. What will spark do if I don’t have enough memory?

There can be either of these situations:

What happens if my dataset does not fit in memory? Often each partition of data is small and does fit in memory, and these partitions are processed a few at a time. For very large partitions that do not fit in memory, Spark’s built-in operators perform external operations on datasets.

What happens when a cached dataset does not fit in memory? Spark can either spill it to disk or recompute the partitions that don't fit in RAM each time they are requested. By default it uses recomputation, but you can set the dataset's storage level to MEMORY_AND_DISK to avoid this, as sketched below.
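
A minimal sketch of caching with a storage level that spills to disk instead of recomputing (df is the DataFrame from the earlier examples):

import org.apache.spark.storage.StorageLevel

// Partitions that do not fit in memory are spilled to disk rather than
// recomputed from the lineage on every access
val cached = df.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()       // materializes the cache
// ... use cached ...
cached.unpersist()   // release the storage when finished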

19. Fault Tolerance
TBD
