The following sample code (in Python and C#) shows how to work with columns in a DataFrame. You can check the number of columns, add a new column, rename an existing column, or remove columns.
Columns
- Add: add a new column or replace the current one if you use the same name
- withColumn(column name, new column)
- Rename: rename the existing column
- withColumnRenamed(old name, new name)
- Remove: remove the existing columns
- drop(list of columns …)
Data File
- Create a CSV file with the following content.
first name,last name,age
Paul,Henderson,20
Grace,Carr,32
Julia,Jackson,14
Alexandra,May,43
Anna,Jones,24
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

if __name__ == "__main__":
    # Start (or reuse) a Spark session for this example.
    spark = (SparkSession
             .builder
             .appName("column-test")
             .getOrCreate())

    data_file = "test/data/people.csv"

    # Explicit schema: all three columns are marked non-nullable (False).
    custom_schema = StructType([
        StructField("first name", StringType(), False),
        StructField("last name", StringType(), False),
        StructField("age", IntegerType(), False)])

    # header=True skips the first CSV line; the schema supplies the types.
    original_df = (spark.read.format("csv")
                   .option("header", True)
                   .schema(custom_schema)
                   .load(data_file))
    original_df.show()

    # Rename the existing column: "last name" -> "surname" (removes the space).
    renamed_df = original_df.withColumnRenamed("last name", "surname")
    renamed_df.show()

    # Add a new column derived from two existing ones.
    added_df = renamed_df.withColumn(
        "fullname",
        concat_ws(" ", col("first name"), col("surname"))
    )
    added_df.show()

    # Remove the columns that are no longer needed.
    final_df = added_df.drop("first name", "surname")
    final_df.show()

    # Check the number of columns of each dataframe.
    print("original_df: ", len(original_df.columns))  # 3
    print("renamed_df: ", len(renamed_df.columns))    # 3
    print("added_df: ", len(added_df.columns))        # 4
    print("final_df: ", len(final_df.columns))        # 2

    # Retrieve the column names.
    for column_name in original_df.columns:
        print(column_name)

    spark.stop()
C# Application
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Demonstrates basic DataFrame column operations with .NET for Apache Spark:
    /// renaming a column, adding a derived column, dropping columns, and
    /// inspecting the column list.
    /// </summary>
    internal class People
    {
        public static void Run()
        {
            // Start (or reuse) a Spark session for this example.
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("column-test")
                    .GetOrCreate();

            string filePath = "data/people.csv";

            // Explicit schema: all three columns are marked non-nullable (false).
            StructField[] fields =
            {
                new StructField("first name", new StringType(), false),
                new StructField("last name", new StringType(), false),
                new StructField("age", new IntegerType(), false)
            };
            StructType schema = new StructType(fields);

            // header=true skips the first CSV line; the schema supplies the types.
            DataFrame originalDF = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            originalDF.Show();

            // Rename the existing column: "last name" -> "surname" (removes the space).
            DataFrame renamedDF = originalDF
                .WithColumnRenamed("last name", "surname");
            renamedDF.Show();

            // Add a new column derived from two existing ones.
            DataFrame addedDF = renamedDF
                .WithColumn("fullname",
                    ConcatWs(" ", Col("first name"), Col("surname")));
            addedDF.Show();

            // Remove the columns that are no longer needed.
            DataFrame finalDF = addedDF
                .Drop("first name", "surname");
            finalDF.Show();

            // Check the number of columns of each DataFrame.
            Console.WriteLine("originalDF: {0}", originalDF.Columns().Count); // 3
            Console.WriteLine("renamedDF: {0}", renamedDF.Columns().Count);   // 3
            Console.WriteLine("addedDF: {0}", addedDF.Columns().Count);       // 4
            Console.WriteLine("finalDF: {0}", finalDF.Columns().Count);       // 2

            // Retrieve the column names.
            foreach (string columnName in originalDF.Columns())
            {
                Console.WriteLine(columnName);
            }

            spark.Stop();
        }
    }
}