[Spark By Example] DataFrame Columns

The following sample code (in Python and C#) shows how to work with DataFrame columns: you can check how many columns a DataFrame has, add a new column, rename an existing column, or remove columns.

Columns

  • Add: add a new column, or replace an existing column if the given name is already in use
    • withColumn(column name, new column)
  • Rename: rename an existing column
    • withColumnRenamed(old name, new name)
  • Remove: remove one or more existing columns
    • drop(column name, …)

Data File

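  • Create a CSV file named people.csv with the following content (the Python example reads it from test/data/people.csv, the C# example from data/people.csv).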
first name,last name,age
Paul,Henderson,20
Grace,Carr,32
Julia,Jackson,14
Alexandra,May,43
Anna,Jones,24

Result


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    # Demonstrates DataFrame column operations: rename, add, drop, and
    # inspecting column names/counts. Usage: script.py [path/to/people.csv]
    import sys

    # Optional first CLI argument overrides the default CSV location.
    data_file = sys.argv[1] if len(sys.argv) > 1 else "test/data/people.csv"

    spark = (SparkSession
             .builder
             .appName("column-test")
             .getOrCreate())

    try:
        # Explicit schema: without one (and without inferSchema), CSV columns
        # are read as strings, so "age" would not be an integer.
        # NOTE(review): Spark file sources may treat columns as nullable
        # regardless of the False flags below — confirm before relying on it.
        custom_schema = StructType([
            StructField("first name", StringType(), False),
            StructField("last name", StringType(), False),
            StructField("age", IntegerType(), False)])

        # header=True keeps the first line of the file out of the data rows.
        original_df = (spark.read.format("csv")
                       .option("header", True)
                       .schema(custom_schema)
                       .load(data_file))
        original_df.show()

        # Rename an existing column (the new name has no space in it).
        renamed_df = original_df.withColumnRenamed("last name", "surname")
        renamed_df.show()

        # Add a new column computed from two existing ones.
        added_df = renamed_df.withColumn(
            "fullname",
            concat_ws(" ", col("first name"), col("surname")))
        added_df.show()

        # Remove existing columns; drop accepts several names at once.
        final_df = added_df.drop("first name", "surname")
        final_df.show()

        # Number of columns of each DataFrame.
        print("original_df: ", len(original_df.columns))  # 3
        print("renamed_df: ", len(renamed_df.columns))    # 3
        print("added_df: ", len(added_df.columns))        # 4
        print("final_df: ", len(final_df.columns))        # 2

        # Print each column name of the original DataFrame.
        for column_name in original_df.columns:
            print(column_name)
    finally:
        # Stop the session even if loading or a transformation raises.
        spark.stop()

C# Application

using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    /// <summary>
    /// Sample showing how to inspect, rename, add, and drop DataFrame columns
    /// with .NET for Apache Spark.
    /// </summary>
    internal class People
    {
        /// <summary>CSV file read when no path is supplied.</summary>
        private const string DefaultFilePath = "data/people.csv";

        /// <summary>
        /// Runs the sample against the default CSV file (<c>data/people.csv</c>).
        /// </summary>
        public static void Run() => Run(DefaultFilePath);

        /// <summary>
        /// Loads a people CSV file, renames "last name" to "surname", adds a
        /// "fullname" column, drops "first name" and "surname", and prints each
        /// intermediate DataFrame together with its column count.
        /// </summary>
        /// <param name="filePath">
        /// Path to a CSV file with a header row and the columns
        /// "first name", "last name", and "age".
        /// </param>
        /// <exception cref="ArgumentException">
        /// <paramref name="filePath"/> is null, empty, or whitespace.
        /// </exception>
        public static void Run(string filePath)
        {
            if (string.IsNullOrWhiteSpace(filePath))
            {
                throw new ArgumentException("A CSV file path must be provided.", nameof(filePath));
            }

            SparkSession spark = SparkSession
                .Builder()
                .AppName("column-test")
                .GetOrCreate();

            try
            {
                // Explicit schema: without one, CSV columns are read as strings,
                // so "age" would not be an integer.
                StructField[] fields =
                {
                    new StructField("first name", new StringType(), false),
                    new StructField("last name", new StringType(), false),
                    new StructField("age", new IntegerType(), false)
                };
                StructType schema = new StructType(fields);

                // Initial DataFrame; header=true keeps the first line out of the data rows.
                DataFrame originalDF = spark.Read()
                    .Format("csv")
                    .Option("header", true)
                    .Schema(schema)
                    .Load(filePath);
                originalDF.Show();

                // Rename an existing column.
                DataFrame renamedDF = originalDF.WithColumnRenamed("last name", "surname");
                renamedDF.Show();

                // Add a new column computed from two existing ones.
                DataFrame addedDF = renamedDF.WithColumn(
                    "fullname",
                    ConcatWs(" ", Col("first name"), Col("surname")));
                addedDF.Show();

                // Remove existing columns; Drop accepts several names at once.
                DataFrame finalDF = addedDF.Drop("first name", "surname");
                finalDF.Show();

                // Columns() returns the column names, so Count is the number of columns.
                Console.WriteLine($"originalDF: {originalDF.Columns().Count}"); // 3
                Console.WriteLine($"renamedDF: {renamedDF.Columns().Count}");   // 3
                Console.WriteLine($"addedDF: {addedDF.Columns().Count}");       // 4
                Console.WriteLine($"finalDF: {finalDF.Columns().Count}");       // 2

                // Print each column name of the original DataFrame.
                foreach (string columnName in originalDF.Columns())
                {
                    Console.WriteLine(columnName);
                }
            }
            finally
            {
                // Stop the session even if loading or a transformation throws.
                spark.Stop();
            }
        }
    }
}

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. ( Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. ( Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. ( Log Out /  Change )

Connecting to %s