[Spark By Example] Date & Timestamp

Handling dates and times is one of the most important parts of data processing. Because date and time values come in many different formats, you need to know how to convert between types and how to manipulate dates and times.

Date & Time

Spark has two data types for this: DateType and TimestampType.

  • Default Format
    • DateType: yyyy-MM-dd
    • TimestampType: yyyy-MM-dd HH:mm:ss[.SSSSSS] (optional fractional seconds, up to microsecond precision)
  • Convert strings to date/timestamp
    • to_date(column, {format})
    • to_timestamp(column, {format})
  • Get the current date or time
    • current_date()
    • current_timestamp()
  • Extract parts of a date
    • year(column)
    • month(column)
    • dayofweek(column): 1 (Sunday) to 7 (Saturday)
    • dayofmonth(column)
    • dayofyear(column)
  • Extract parts of a time
    • hour(column)
    • minute(column)
    • second(column)
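You can check what these extraction functions return without a Spark cluster. The minimal pure-Python sketch below approximates the values that year(), month(), dayofmonth(), dayofweek(), dayofyear(), hour(), minute() and second() produce for a single timestamp. The helper name spark_like_parts is hypothetical and is not part of Spark. Pay attention to the weekday shift: Python's isoweekday() counts from 1 (Monday) to 7 (Sunday), while Spark's dayofweek() counts from 1 (Sunday) to 7 (Saturday).

```python
from datetime import datetime


def spark_like_parts(ts: datetime) -> dict:
    """Hypothetical helper: approximates Spark's date/time part functions in plain Python."""
    return {
        "Year": ts.year,
        "Month": ts.month,
        "Day": ts.day,                        # like dayofmonth()
        # isoweekday() gives Mon=1 .. Sun=7; shift it to Sun=1 .. Sat=7 like dayofweek()
        "DayofWeek": ts.isoweekday() % 7 + 1,
        "DayofYear": ts.timetuple().tm_yday,  # like dayofyear()
        "Hour": ts.hour,
        "Minute": ts.minute,
        "Second": ts.second,                  # whole seconds; the fraction is dropped
    }


if __name__ == "__main__":
    # First date_time value from the sample CSV: 2022-5-3 10:23:31.301
    print(spark_like_parts(datetime(2022, 5, 3, 10, 23, 31, 301000)))
```

For 2022-05-03 (a Tuesday), the sketch reports DayofWeek 3 and DayofYear 123.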

Data File

  • Create a CSV file named date-time.csv with the following content.
from_date,to_date,date_time
2022-01-01,12/31/2022,2022-5-3 10:23:31.301
2023-01-01,12/31/2023,2023-12-25 0:0:0
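If you would rather generate the file from a script, this minimal sketch writes the same three lines with the Python standard library. It defaults to test/data/date-time.csv, the path the Python application below reads. The C# application reads data/date-time.csv, so pass that path instead if you are following the C# version. The names CSV_LINES and write_sample_csv are illustrative only.

```python
from pathlib import Path

# Sample rows: from_date in the default yyyy-MM-dd format, to_date in MM/dd/yyyy,
# and date_time as a timestamp string that uses single-digit fields.
CSV_LINES = [
    "from_date,to_date,date_time",
    "2022-01-01,12/31/2022,2022-5-3 10:23:31.301",
    "2023-01-01,12/31/2023,2023-12-25 0:0:0",
]


def write_sample_csv(path: str = "test/data/date-time.csv") -> Path:
    """Write the sample CSV, creating parent directories if they are missing."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("\n".join(CSV_LINES) + "\n", encoding="utf-8")
    return target


if __name__ == "__main__":
    print(write_sample_csv())
```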

Python Application

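from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, DateType,
                               StringType, TimestampType)
from pyspark.sql.functions import (col, to_date, year, month, dayofmonth,
                                   dayofweek, dayofyear, hour, minute, second,
                                   current_date, current_timestamp)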

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("date-time-test")
          .getOrCreate())

  data_file = "test/data/date-time.csv"

  # schema
  custom_schema = StructType([
        StructField("from_date", DateType(), False),
        StructField("to_date", StringType(), False),
        StructField("date_time", TimestampType(), False)])

  original_df = (spark.read.format("csv")
          .option("header", True)          
          .schema(custom_schema)
          .load(data_file))
  original_df.show()
  original_df.printSchema()

  # conversion
  converted_df = original_df.withColumn(
    "to_date", to_date(col("to_date"), "MM/dd/yyyy"))
  converted_df.show()
  converted_df.printSchema()

  # extract info
  converted_df.select(
    col("date_time"),
    year(col("date_time")).alias("Year"),
    month(col("date_time")).alias("Month"),
    dayofmonth(col("date_time")).alias("Day"),
    dayofweek(col("date_time")).alias("DayofWeek"),
    dayofyear(col("date_time")).alias("DayofYear"),
    hour(col("date_time")).alias("Hour"),
    minute(col("date_time")).alias("Minute"),
    second(col("date_time")).alias("Second")
  ).show()

  # add current date and timestamp
  current_date_df = converted_df.withColumn(
    "current_date", current_date()
  ).withColumn(
    "current_timestamp", current_timestamp()
  )
  current_date_df.show()
  current_date_df.printSchema()

  spark.stop()
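The pattern "MM/dd/yyyy" passed to to_date() is a Spark datetime pattern, not a Python strptime format. As a rough plain-Python analogue (an illustration only, not how Spark parses internally), the same strings can be parsed with "%m/%d/%Y". The helper parse_to_date is hypothetical. It returns None for input it cannot parse, which is meant to resemble Spark's non-ANSI behavior of producing null. Depending on your Spark version and its ANSI setting, Spark may raise an error instead.

```python
from datetime import date, datetime
from typing import Optional


def parse_to_date(value: str) -> Optional[date]:
    """Hypothetical helper: rough stdlib analogue of to_date(col, "MM/dd/yyyy")."""
    try:
        # Spark's MM/dd/yyyy corresponds to strptime's %m/%d/%Y for these inputs
        return datetime.strptime(value, "%m/%d/%Y").date()
    except ValueError:
        # Unparseable input: return None, similar to a null in non-ANSI Spark
        return None


if __name__ == "__main__":
    for raw in ["12/31/2022", "12/31/2023", "2022-12-31"]:
        print(raw, "->", parse_to_date(raw))
```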

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class DateTimestamp
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("date-time-test")
                    .GetOrCreate();

            string filePath = "data/date-time.csv";

            // schema
            StructField[] fields = {
                new StructField("from_date", new DateType(), false),
                new StructField("to_date", new StringType(), false),
                new StructField("date_time", new TimestampType(), false)
            };
            StructType schema = new StructType(fields);

            // initial data frame
            DataFrame originalDF = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            originalDF.Show();
            originalDF.PrintSchema();

            // conversion
            DataFrame convertedDF = originalDF.WithColumn(
                    "to_date", ToDate(Col("to_date"), "MM/dd/yyyy")
                );
            convertedDF.Show();
            convertedDF.PrintSchema();

            // extract info
            convertedDF.Select(
              Col("date_time"),
              Year(Col("date_time")).Alias("Year"),
              Month(Col("date_time")).Alias("Month"),
              DayOfMonth(Col("date_time")).Alias("Day"),
              DayOfWeek(Col("date_time")).Alias("DayofWeek"),
              DayOfYear(Col("date_time")).Alias("DayofYear"),
              Hour(Col("date_time")).Alias("Hour"),
              Minute(Col("date_time")).Alias("Minute"),
              Second(Col("date_time")).Alias("Second")
            ).Show();

            // add current date and timestamp
            DataFrame currentDateDF = convertedDF
                .WithColumn("current_date", CurrentDate())
                .WithColumn("current_timestamp", CurrentTimestamp());
            currentDateDF.Show();
            currentDateDF.PrintSchema();

            spark.Stop();
        }
    }
}
