Apache Spark withColumn VS Select
I have been working with Spark Scala for some years now. The two most common Spark functions I work with are select() and withColumn(). Both functions are very useful when it comes to selecting certain columns, adding new columns, renaming columns, performing some aggregation on a column, and so on.
I will explain the use of each of them below.
Note: there are many other ways to utilise them. I’m sure if you do more research, you will come across different ways apart from the ones explained below.
I’m going to be using Spark with Scala for the code examples.
First of all, let’s create a Session trait and define a SparkSession there. I will be importing the Session trait into my class later. You can skip this part and jump straight to the code section, since many of you already know how to create a SparkSession.
import org.apache.spark.sql.SparkSession

trait Session {
  val sparkSn: SparkSession = SparkSession.builder()
    .appName("com.stephen")
    .master("local[*]")
    .getOrCreate()

  sparkSn.sparkContext.setLogLevel("ERROR")
}
Now, let’s create a simple dataframe with just three records.
import com.stephen.Session
import org.apache.spark.sql.functions._

class Tutorial1 extends Session {
  val data = Seq(("0", "Mango", "20"), ("1", "Apple", "16"), ("2", null, "10"))
  val columns = Seq("id", "fruit", "price")

  import sparkSn.implicits._

  val df1 = data.toDF(columns: _*)
  df1.show(false)
}
+---+-----+-----+
|id |fruit|price|
+---+-----+-----+
|0  |Mango|20   |
|1  |Apple|16   |
|2  |null |10   |
+---+-----+-----+
WithColumn
You can use withColumn to transform your data in a variety of ways, such as adding new columns or computing aggregations. First of all, let’s add a new column.
I’m going to add a new column called delivery_date and populate it with the current date.
Adding New Column Using withColumn
val df2 = df1.withColumn("delivery_date", current_date())
df2.show(false)
+---+-----+-----+-------------+
|id |fruit|price|delivery_date|
+---+-----+-----+-------------+
|0  |Mango|20   |2023-09-21   |
|1  |Apple|16   |2023-09-21   |
|2  |null |10   |2023-09-21   |
+---+-----+-----+-------------+
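As mentioned at the start, renaming columns is another common task. withColumn always needs an expression, so a rename is usually done with its sibling withColumnRenamed instead; here is a minimal sketch (the new name unit_price is my own choice, not part of the tutorial’s data):

// Rename price to unit_price on the DataFrame built above.
// withColumnRenamed is a no-op if the source column does not exist.
val dfRenamed = df2.withColumnRenamed("price", "unit_price")
dfRenamed.show(false)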
Spark Aggregation Using withColumn
Now, let’s perform some aggregation on the price column. For a better illustration, I will be adding two additional records to the DataFrame.
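The code for this step isn’t shown in the walkthrough; the full code at the end simply builds the DataFrame from all five records up front. As a hedged sketch of an alternative, the two extra rows could also be appended to df2 with a union (extraRows and df2Extended are my own names):

// Build the two extra rows with the same schema as df2
// (id, fruit, price, delivery_date), then append them.
// Assumes sparkSn.implicits._ is in scope, as in the class above.
val extraRows = Seq(("3", "Mango", "25"), ("4", "Apple", "14"))
  .toDF("id", "fruit", "price")
  .withColumn("delivery_date", current_date())

val df2Extended = df2.union(extraRows)
df2Extended.show(false)

In the rest of the walkthrough, this five-row DataFrame is still referred to as df2, which matches how the full code at the end builds it.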
Below is the new DataFrame. You can see Mango and Apple are now duplicated, with different id and price values.
I’m going to use a Window function to partition the DataFrame by the fruit column and keep only the row with the highest price for each fruit.
+---+-----+-----+-------------+
|id |fruit|price|delivery_date|
+---+-----+-----+-------------+
|0  |Mango|20   |2023-09-21   |
|1  |Apple|16   |2023-09-21   |
|2  |null |10   |2023-09-21   |
|3  |Mango|25   |2023-09-21   |
|4  |Apple|14   |2023-09-21   |
+---+-----+-----+-------------+
import org.apache.spark.sql.expressions.Window

val df3 = df2.withColumn("price", max("price").over(Window.partitionBy("fruit")))
  .dropDuplicates("fruit")
df3.show(false)
+---+-----+-----+-------------+
|id |fruit|price|delivery_date|
+---+-----+-----+-------------+
|2  |null |10   |2023-09-21   |
|0  |Mango|25   |2023-09-21   |
|1  |Apple|16   |2023-09-21   |
+---+-----+-----+-------------+
You can see that only the record with the highest price in each partition is kept.
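For comparison, the same per-fruit maximum can be computed with groupBy and agg instead of a window function; the trade-off is that only the grouping and aggregated columns survive. A minimal sketch (dfMax is my own name):

// Group by fruit and keep the maximum price per group.
// Note: price is a string column in this example, so max compares
// lexicographically; for these particular values it matches numeric order.
val dfMax = df2.groupBy("fruit").agg(max("price").as("price"))
dfMax.show(false)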
Select
You can use the Apache Spark select() function to pick specific columns from your DataFrame, and it can do many of the things withColumn does. The Spark documentation warns that calling withColumn repeatedly (for example, in a loop) can perform worse than a single select, because each withColumn call adds a projection to the internal query plan. What I have observed, though, is that it depends on how you use withColumn.
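If you want to see the difference yourself, a rough sketch is to compare the query plans of a chained-withColumn version and a single-select version with explain (the column names a, b, c and the lit values are just placeholders of mine):

// Each withColumn call introduces its own projection in the analyzed plan.
val chained = df1
  .withColumn("a", lit(1))
  .withColumn("b", lit(2))
  .withColumn("c", lit(3))

// A single select adds all three columns in one projection.
val single = df1.select(col("*"), lit(1).as("a"), lit(2).as("b"), lit(3).as("c"))

chained.explain(true)
single.explain(true)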
I’m not going to go deep into performance here, so let’s start replicating the code above using the select function.
First of all, let’s select all the columns.
val df4 = df1.select("*")
df4.show(false)
+---+-----+-----+
|id |fruit|price|
+---+-----+-----+
|0  |Mango|20   |
|1  |Apple|16   |
|2  |null |10   |
|3  |Mango|25   |
|4  |Apple|14   |
+---+-----+-----+
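Of course, select does not have to keep everything; here is a quick sketch of projecting only a subset of columns (dfSubset is my own name):

// Keep only the fruit and price columns.
val dfSubset = df4.select("fruit", "price")
dfSubset.show(false)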
Adding New Column Using Select
val df5 = df1.select(col("*"), current_date().as("delivery_date"))
df5.show(false)
+---+-----+-----+-------------+
|id |fruit|price|delivery_date|
+---+-----+-----+-------------+
|0  |Mango|20   |2023-09-22   |
|1  |Apple|16   |2023-09-22   |
|2  |null |10   |2023-09-22   |
|3  |Mango|25   |2023-09-22   |
|4  |Apple|14   |2023-09-22   |
+---+-----+-----+-------------+
Spark Aggregation Using Select
val df6 = df5.select(col("id"), col("fruit"), col("delivery_date"),
  max("price").over(Window.partitionBy("fruit")).as("price"))
  .dropDuplicates("fruit")
df6.show(false)
+---+-----+-------------+-----+
|id |fruit|delivery_date|price|
+---+-----+-------------+-----+
|2  |null |2023-09-22   |10   |
|0  |Mango|2023-09-22   |25   |
|1  |Apple|2023-09-22   |16   |
+---+-----+-------------+-----+
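And for completeness on the renaming use case mentioned at the start, select handles it with as (or alias) on the column expression; a minimal sketch (unit_price is again my own name):

// Rename price to unit_price while selecting; every column you want to
// keep has to be listed explicitly.
val dfRenamed2 = df6.select(col("id"), col("fruit"), col("delivery_date"),
  col("price").as("unit_price"))
dfRenamed2.show(false)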
Full Code
package com.stephen.tutorial

import com.stephen.Session
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

class Tutorial1 extends Session {
  val data = Seq(("0", "Mango", "20"), ("1", "Apple", "16"),
    ("2", null, "10"), ("3", "Mango", "25"), ("4", "Apple", "14"))
  val columns = Seq("id", "fruit", "price")

  import sparkSn.implicits._

  // Build the DataFrame and show it
  val df1 = data.toDF(columns: _*)
  df1.show(false)

  // withColumn: add a delivery_date column
  val df2 = df1.withColumn("delivery_date", current_date())
  df2.show(false)

  // withColumn: keep the highest price per fruit using a window function
  val df3 = df2.withColumn("price", max("price").over(Window.partitionBy("fruit")))
    .dropDuplicates("fruit")
  df3.show(false)

  // select: all columns
  val df4 = df1.select("*")
  df4.show(false)

  // select: add a delivery_date column
  val df5 = df1.select(col("*"), current_date().as("delivery_date"))
  df5.show(false)

  // select: keep the highest price per fruit using a window function
  val df6 = df5.select(col("id"), col("fruit"), col("delivery_date"),
    max("price").over(Window.partitionBy("fruit")).as("price"))
    .dropDuplicates("fruit")
  df6.show(false)
}
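Note that Tutorial1 is a plain class, so nothing runs until it is instantiated; the statements live in its constructor. A minimal, hypothetical entry point (the object name Tutorial1Runner is my own) could look like this:

package com.stephen.tutorial

// Hypothetical runner: instantiating Tutorial1 executes the statements
// in its constructor, i.e. the whole walkthrough above.
object Tutorial1Runner extends App {
  new Tutorial1()
}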
Conclusion
As shown in the code above, both withColumn and select can be used to transform the data and produce the same outcome.
With withColumn, your transformation is always applied on top of the entire parent DataFrame. With select, you can also cut the DataFrame down to just the columns you need, which reduces the amount of data carried through the transformation.
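For example, if only each fruit and its highest price matter downstream, a hedged sketch (dfSlim is my own name) could drop the other columns before the window aggregation:

// Project only the columns needed for the aggregation, then apply the
// same window logic on the slimmed-down DataFrame.
val dfSlim = df5.select(col("fruit"),
  max("price").over(Window.partitionBy("fruit")).as("price"))
  .dropDuplicates("fruit")
dfSlim.show(false)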