Spark Interview Guide: Part 3: Spark SQL & DataFrame

1. What is Spark SQL?

Spark SQL integrates relational processing with Spark’s functional programming. It provides support for various data sources and makes it possible to weave SQL queries with code transformations, resulting in a very powerful tool.

2. Why is Spark SQL used?

Spark SQL originated as an effort to run Apache Hive on top of Spark and is now integrated with the Spark stack. Apache Hive had certain limitations, as mentioned below. Spark SQL was built to overcome these drawbacks and replace Apache Hive.

3. Is Spark SQL faster than Hive?

Spark SQL is faster than Hive when it comes to processing speed. Below are a few limitations of Hive compared to Spark SQL.

Limitations With Hive:

  • Hive launches MapReduce jobs internally for executing ad-hoc queries. MapReduce lags in performance when it comes to the analysis of medium-sized datasets (10 to 200 GB).
  • Hive has no resume capability. This means that if the processing dies in the middle of a workflow, you cannot resume from where it got stuck.
  • Hive cannot drop encrypted databases in cascade when trash is enabled, which leads to an execution error. To overcome this, users have to use the PURGE option to skip trash instead of a plain drop.

These drawbacks gave way to the birth of Spark SQL. But one question still lingers in many of our minds:

4. Is Spark SQL a database?

Spark SQL is not a database but a module that is used for structured data processing. It works mainly on DataFrames, which are its programming abstraction, and it acts as a distributed SQL query engine.

5. How does Spark SQL work?

Let us explore what Spark SQL has to offer. Spark SQL blurs the line between RDD and relational table. It offers much tighter integration between relational and procedural processing, through declarative DataFrame APIs that integrate with Spark code. It also enables additional optimizations. The DataFrame API and the Dataset API are the two ways to interact with Spark SQL.

With Spark SQL, Apache Spark is accessible to more users and improves optimization for the current ones. Spark SQL provides DataFrame APIs which perform relational operations on both external data sources and Spark’s built-in distributed collections. It introduces an extensible optimizer called Catalyst, which helps it support a wide range of data sources and algorithms in big data.

Spark runs on both Windows and UNIX-like systems (e.g. Linux, macOS). It is easy to run locally on one machine; all you need is Java installed on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation.

6. Features of Spark SQL:

  1. Cost-based optimizer.
  2. Mid-query fault tolerance: the Spark engine scales to thousands of nodes and multi-hour queries and can recover from failures in the middle of a query.
  3. Full compatibility with existing Hive data, queries, and UDFs.
  4. DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC.
  5. Lets you work with structured data inside Spark programs, using either SQL or the familiar DataFrame API (see the sketch below).
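A minimal Scala sketch of that last point, mixing the DataFrame API and SQL on the same data (the file path and the name/age fields are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-sql-demo")
  .master("local[*]") // run locally for a quick test
  .getOrCreate()

// Hypothetical JSON file with fields: name, age
val people = spark.read.json("/tmp/people.json")

// The same question asked through the DataFrame API ...
people.filter(people("age") > 30).select("name").show()

// ... and through SQL against a temporary view
people.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 30").show()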

7. What is a DataFrame?

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

— Databricks

8. Key features of DataFrames

  • Automatic optimization of code thanks to the Catalyst optimizer.
  • We can run SQL queries on top of a DataFrame using Spark SQL.
  • Language support is available for Python, Java, Scala, and R.
  • A data source API is provided to read and write DataFrames in multiple formats.

9. How to create a DataFrame

I. Using toDF() function

val dfFromRDD1 = rdd.toDF()

II. Using createDataFrame() with the Row type

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row

val schema = StructType(Array(
  StructField("language", StringType, true),
  StructField("users_count", StringType, true)
))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD, schema)

III. Using toDF() on List or Seq collection

val dfFromData1 = data.toDF()

IV. Create Spark DataFrame from CSV

val df2 = spark.read.csv("/src/resources/file.csv")

V. Creating from JSON file

val df2 = spark.read.json("/src/resources/file.json")

VI. Creating from Hive

val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql("select * from emp")

(In Spark 2.x and later, a SparkSession built with enableHiveSupport() replaces HiveContext, and you can call spark.sql directly.)

VII. Creating a Spark DataFrame from an RDBMS (MySQL table)

val df_mysql = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:port/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()

10. What is Spark Dataset?

Dataset is a data structure in Spark SQL which is strongly typed and maps to a relational schema. It represents structured queries with encoders. It is an extension of the DataFrame API. A Spark Dataset provides both type safety and an object-oriented programming interface. The Dataset API was released in Spark 1.6.

Dataset clubs the features of RDD and DataFrame. It provides:

  • The convenience of RDD.
  • Performance optimization of DataFrame.
  • Static type-safety of Scala.

Thus, Datasets provide a more functional programming interface to work with structured data.
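A minimal Scala sketch (the Employee case class and the sample values are made up for illustration) showing how a typed Dataset is built from a DataFrame:

import org.apache.spark.sql.Dataset
import spark.implicits._

case class Employee(name: String, salary: Long)

// A DataFrame is just Dataset[Row]; .as[Employee] turns it into a typed Dataset
val df = Seq(("Ravi", 30000L), ("Asha", 45000L)).toDF("name", "salary")
val ds: Dataset[Employee] = df.as[Employee]

// Fields are accessed as plain Scala members, checked at compile time
ds.filter(_.salary > 35000L).map(_.name).show()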

11. What is type safety?

With type safety, a programming language prevents type errors. In other words, the compiler validates types at compile time and throws an error when we try to assign a wrong type to a variable.
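For example, in Scala the compiler rejects an ill-typed assignment before the program ever runs (the failing line is shown commented out):

val count: Int = 10
// val wrongCount: Int = "ten"  // does not compile: type mismatch, found String, required Int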

12. Why are DataFrames not type safe, whereas Datasets are type safe?

In Apache Spark 2.0 these two APIs were unified, and a DataFrame can be considered an alias for a collection of generic objects, Dataset[Row], where a Row is a generic untyped JVM object. A Dataset, by contrast, is a collection of strongly typed JVM objects.

Spark checks whether a DataFrame’s types align with the given schema at run time, not at compile time. This is because the elements of a DataFrame are of type Row, and Row cannot be parameterized with a type by the compiler, so the compiler cannot check its column types. That is why a DataFrame is untyped and not type safe.

Datasets, on the other hand, check whether types conform to the specification at compile time. That is why Datasets are type safe.
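A short Scala sketch contrasting the two (it reuses the hypothetical Employee case class from the Dataset example above; the failing lines are shown commented out):

import spark.implicits._

// DataFrame: column names are plain strings, validated only at run time
val df = Seq(("Ravi", 30000L)).toDF("name", "salary")
// df.select("salry")   // compiles, but fails at run time with an AnalysisException

// Dataset: fields are checked by the compiler
val ds = df.as[Employee]
// ds.map(_.salry)      // does not compile: value salry is not a member of Employee
ds.map(_.salary + 1000L).show()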

13. Spark SQL execution plan:

Whenever we create a DataFrame, a Spark SQL query, or a Hive query, Spark will:

i. Generate an Unresolved Logical Plan.

ii. Then it will apply analysis rules and the schema catalog to convert it into a Resolved Logical Plan.

iii. The Optimization rules will finally create an Optimized Logical Plan.

The above mentioned 3 steps fall under the Catalyst Optimizer, which is responsible for optimizing the logical plan.

Here, the logical plan is nothing but the lineage. Whenever an action is called, this lineage is converted into a DAG, which is basically the physical plan.

iv. The Optimized Logical Plan is transformed into multiple Physical Plans by applying a set of planning strategies.

v. Finally, the Cost Model chooses the optimal Physical Plan, which is converted into an RDD.

The main reason Spark converts an Unresolved Logical Plan into an Optimized Physical Plan is to minimize the response time of the query execution.

*A logical plan describes the computation on datasets without defining how to conduct the computation.

**A physical plan describes the computation on datasets with specific definitions of how to conduct it. Basically, the physical plan is executable.
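To see these plans for any DataFrame, Spark’s explain method prints them (a quick sketch; df here stands for any previously built DataFrame):

// Prints the parsed (unresolved) logical plan, analyzed logical plan,
// optimized logical plan, and the physical plan
df.explain(true)

// Prints only the physical plan
df.explain()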

14. Convert DataFrame → RDD

To convert a DataFrame back to an RDD, simply use the .rdd method:

rdd = df.rdd

But the catch here is that this may not give the regular Spark RDD of plain values; it returns an RDD of Row objects. In order to have the regular RDD format, run the code below:

rdd = df.rdd.map(tuple)
or
rdd = df.rdd.map(list)

15. Create RDD → DataFrame

i) Using the toDF() function

Once we have an RDD, let’s use toDF() to create a DataFrame in Spark. By default, it creates the column names “_1” and “_2”, as we have two columns for each row.

val dfFromRDD1 = rdd.toDF()

ii) Using createDataFrame() from SparkSession

Using createDataFrame() from SparkSession is another way to create a DataFrame; it takes an RDD object as an argument, and you can chain it with toDF() to specify the column names. Alternatively, pass an explicit schema, as shown below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
  Row(8, "bat"),
  Row(64, "mouse"),
  Row(-27, "horse")
)

val schema = StructType(
  List(
    StructField("number", IntegerType, true),
    StructField("word", StringType, true)
  )
)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

16. If a Spark application writes duplicated records into a target file, how do we remove the duplicate rows from the target?

Duplicate rows can be removed by using the distinct() and dropDuplicates() functions.

distinct() is used when the same values are present in all columns, whereas dropDuplicates() removes the rows that have the same values in the specified columns.

val disDF = df.distinct()

val dropDupDF = df.dropDuplicates("column_name1", "column_name2")

17. Basic operations on DataFrames

  • Count how many rows there are.

df.count()

  • The names of the columns in the DataFrame can be accessed.

df.columns

  • The DataType of columns within the DataFrame can be accessed.

df.dtypes

  • Examine how Spark saves the DataFrame’s schema.

df.schema

  • In a hierarchical way, print the DataFrame’s schema.

df.printSchema()

  • The contents of the DataFrame are shown.

df.show()

  • Choose certain columns from the DataFrame.

df.select("col1", "col2")

  • Filter

Filter the rows according to a set of criteria. Let’s see if we can locate the rows with id = 1. The condition can be specified in a variety of ways.

from pyspark.sql.functions import col

df.filter(df["id"] == 1)

df.filter(df.id == 1)

df.filter(col("id") == 1)

df.filter("id = 1")

Note: We must import the "col" function before we can use it.

  • Remove a certain column

newdf = df.drop("id")

Note: Because DataFrames are immutable, this action will not remove the column from the "df" DataFrame. It will, however, return a new DataFrame that is missing that column.

18. Aggregations on DF

  • groupBy

The groupBy function may be used to group the data, and then the “agg” function can be used to aggregate the grouped data.

from pyspark.sql.functions import count, sum, max, min, avg

(df.groupBy("col1")
    .agg(
        count("col2").alias("count"),
        sum("col2").alias("sum"),
        max("col2").alias("max"),
        min("col2").alias("min"),
        avg("col2").alias("avg")
    )
    .show()
)


19. Sorting on DF

  • Sort the information by “id.” Sorting is done in ascending order by default.

df.sort(“id”)

  • Sort the information in decreasing order.

from pyspark.sql.functions import desc

df.sort(desc("id")).show()

20. Joins

On several DataFrames, we may execute various sorts of joins. Let’s say you want to combine two DataFrames df1 and df2 based on the “id” column.

df1.join(df2, df1["id"] == df2["id"])

An inner join is done by default. Other joins, such as "left_outer", "right_outer", and "full_outer", can be performed by passing the join type as the third parameter.

df1.join(df2, df1["id"] == df2["id"], "left_outer")

21. Executing SQL like queries

We may also do data analysis using SQL-like queries. We must register the DataFrame as a Temporary View in order to conduct SQL-like queries.

df.createOrReplaceTempView("temp_table")

Now we can execute SQL-like queries as below:

spark.sql("select * from temp_table where id = 1").show()

22. Hive operations on DF

  • Creating a Hive table from the DataFrame (via the temporary view registered above)

spark.sql("CREATE TABLE TBL_NAME AS SELECT * FROM temp_table")

  • Saving the DataFrame as a Hive table

df.write.saveAsTable("DB_NAME.TBL_NAME")

For "overwrite", "append", "error", and other options, we may use the "mode" parameter.

df.write.saveAsTable("DB_NAME.TBL_NAME", mode="overwrite")

Note: The DataFrame will be saved as a Hive managed table by default.

  • Creating a Hive external table from the DataFrame

df.write.saveAsTable("DB_NAME.TBL_NAME", path=<location_of_external_table>)

23. CSV operation on DF

  • Make a DataFrame out of a CSV file

We may use a CSV file to construct a DataFrame and provide different parameters such as the separator, header, schema, inferSchema, and other options. Let’s assume we have a CSV file with a header, delimited by "|", and we want to infer the schema automatically.

df = spark.read.csv("path_to_csv_file", sep="|", header=True, inferSchema=True)

  • A CSV file can be created from a DataFrame.

If, after conducting our analysis, we need to save the DataFrame back to a CSV file, we may do it as follows:

df.write.csv("path_to_CSV_File", sep="|", header=True, mode="overwrite")

24. Relational table operations on DF

  • From a relational table, create a DataFrame.

A JDBC URL can be used to access data from relational databases.

relational_df = spark.read.format("jdbc").options(url=jdbc_url, dbtable=<TBL_NAME>, user=<USER_NAME>, password=<PASSWORD>).load()

  • As a relational table, save the DataFrame.

Using a JDBC URL, we may store the DataFrame as a relational table.

relational_df.write.format("jdbc").options(url=jdbc_url, dbtable=<TBL_NAME>, user=<USER_NAME>, password=<PASSWORD>).mode("overwrite").save()

25. withColumn() on DF

  1. Change DataType

The statement below changes the column’s datatype from the source datatype to Integer:

df.withColumn("column_name",col("column_name").cast("Integer")).show()

2. Update The Value of an Existing Column

df.withColumn("Column_name",col("Column_name")+500).show()

3. Create a Column from an Existing column

df.withColumn("New_Column",col("Old_Column")*100).show()

4. Add a New Column using withColumn()

Make sure this new column is not already present on the DataFrame; if it is present, withColumn() updates the value of that column.

from pyspark.sql.functions import lit

df.withColumn("New_Column", lit("Sample")).show()

5. Rename Column Name

df.withColumnRenamed("Old_Column","New_Column").show()

26. What are the lists of available read modes in Spark with examples?

When to specify the read mode?

In some scenarios, we might get improper/corrupted data. In that case, we have to follow one of the options below.

  • Drop the corrupted records
  • Handle the corrupted records and store them in a separate location
  • Fail the job if we get a corrupted record

So, if we want to follow any one of the options above, we have to specify the read mode explicitly.

1. DROPMALFORMED:

employee_df = (spark.read
    .option("inferSchema", True)
    .option("header", True)
    .option("mode", "DROPMALFORMED")
    .csv("./employee.csv")
    .cache())

When we specify the mode as DROPMALFORMED, the Spark program will drop the corrupted records entirely.

2. PERMISSIVE

This is the default read mode. When Spark encounters a corrupted record, it sets the malformed fields to null and can put the whole malformed record into a dedicated string column (by default named _corrupt_record, configurable via the columnNameOfCorruptRecord option).

Whenever we read a file without specifying the mode, the Spark program uses the default mode, i.e. PERMISSIVE.
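A minimal sketch of PERMISSIVE mode, shown in Scala to match the earlier DataFrame examples (the employee.csv path and the id/name schema are assumptions; the same options apply from PySpark):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Include a _corrupt_record column in the schema so PERMISSIVE mode
// has somewhere to put the raw text of any malformed line
val schema = StructType(Seq(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("_corrupt_record", StringType, true)
))

val employeeDf = spark.read
  .option("header", true)
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .csv("./employee.csv")

// Well-formed rows parse normally; malformed rows show nulls plus the raw line
employeeDf.show(false)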

3. FAILFAST

employee_df = (spark.read
    .option("inferSchema", True)
    .option("header", True)
    .option("mode", "FAILFAST")
    .csv("./employee.csv")
    .cache())

When the Spark program meets a corrupted record, this mode throws an exception and stops the Spark job.

27. What are the lists of available write/save modes in Spark with examples?

1. append mode

employee_df.write.mode("append")
.format("csv")
.option("header", True)
.save("./output/employee")

When we write or save a data frame into a data source if the data or folder already exists then the data will be appended to the existing folder.

2. overwrite mode

employee_df.write.mode("overwrite")
.format("csv")
.option("header", True)
.save("./output/employee")

When we write or save a data frame into a data source if the data or folder already exists then the existing folder is completely removed or overwritten by the spark program and we can see only the latest data in that folder.

3. ignore mode

employee_df.write.mode("ignore")
.format("csv")
.option("header", True)
.save("./output/employee")

When we write or save a data frame into a data source if the data or folder already exists then the current data won’t be stored in the mentioned location. The spark program will skip the write operation without throwing any error.

4. errorifexists mode

employee_df.write.mode("errorifexists")
.format("csv")
.option("header", True)
.save("./output/employee")

When we write or save a data frame into a data source, if the data or folder already exists then the spark program will throw an error and stops the spark job.
Whenever we write the file without specifying the mode, the spark program consider default mode i.e errorifexists

28. Types of Apache Spark tables and views

1. Global Managed Table

A managed table is a Spark SQL table for which Spark manages both the data and the metadata. A global managed table is available across all clusters. When you drop the table, both the data and the metadata get dropped.

dataframe.write.saveAsTable("my_table")

2. Global Unmanaged/External Table

Spark manages the metadata, while you control the data location. As soon as you add the ‘path’ option to the DataFrame writer, the table is treated as a global external/unmanaged table. When you drop the table, only the metadata gets dropped. A global unmanaged/external table is available across all clusters.

dataframe.write.option('path', "<your-storage-path>").saveAsTable("my_table")

3. Local Table (a.k.a) Temporary Table (a.k.a) Temporary View

Spark session scoped. A local table is not accessible from other clusters (or, if using a Databricks notebook, from other notebooks) and is not registered in the metastore.

dataframe.createOrReplaceTempView("my_local_view")

4. Global Temporary View

Spark application scoped. Global temporary views are tied to a system-preserved temporary database called global_temp. This view can be shared across different Spark sessions (or, if using Databricks notebooks, shared across notebooks).

dataframe.createOrReplaceGlobalTempView("my_global_view")

can be accessed as,

spark.read.table("global_temp.my_global_view")

5. Global Permanent View

Persist a DataFrame as a permanent view. The view definition is recorded in the underlying metastore. You can only create a permanent view on a global managed table or a global unmanaged table; you are not allowed to create a permanent view on top of a temporary view or a DataFrame. Note: permanent views are only available through the SQL API, not through the DataFrame API.

spark.sql("CREATE VIEW permanent_view AS SELECT * FROM table")

There isn’t a function like dataframe.createOrReplacePermanentView()
