Handling dates and times is one of the most important parts of data processing. Because of their complex formats, you need to know how to convert their types and manipulate date and time values.
Date & Time
In Spark, there are 2 types: DateType and TimestampType.
- Default Format
- DateType: yyyy-MM-dd
- TimestampType: yyyy-MM-dd HH:mm:ss.SSSS
- Convert strings to date/timestamp
- to_date(column, {format})
- to_timestamp(column, {format})
- Get the current date or time
- current_date()
- current_timestamp()
- Extract parts of a date
- year(column)
- month(column)
- dayofweek(column): 1 (Sunday) to 7 (Saturday)
- dayofmonth(column)
- dayofyear(column)
- Extract parts of a time
- hour(column)
- minute(column)
- second(column)
Data File
- Create a csv file with the following content.
from_date,to_date,date_time
2022-01-01,12/31/2022,2022-5-3 10:23:31.301
2023-01-01,12/31/2023,2023-12-25 0:0:0
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    # Build (or reuse) a Spark session for this example.
    spark = (SparkSession
             .builder
             .appName("date-time-test")
             .getOrCreate())

    data_file = "test/data/date-time.csv"

    # Schema: from_date parses with the default DateType format (yyyy-MM-dd);
    # to_date is read as a plain string because its MM/dd/yyyy layout does not
    # match the default, so it is converted explicitly below.
    custom_schema = StructType([
        StructField("from_date", DateType(), False),
        StructField("to_date", StringType(), False),
        StructField("date_time", TimestampType(), False)])

    # Initial data frame loaded straight from the CSV file.
    original_df = (spark.read.format("csv")
                   .option("header", True)
                   .schema(custom_schema)
                   .load(data_file))
    original_df.show()
    original_df.printSchema()

    # Conversion: parse the MM/dd/yyyy strings into a proper DateType column.
    converted_df = original_df.withColumn(
        "to_date", to_date(col("to_date"), "MM/dd/yyyy"))
    converted_df.show()
    converted_df.printSchema()

    # Extract the individual date/time parts from the timestamp column.
    converted_df.select(
        col("date_time"),
        year(col("date_time")).alias("Year"),
        month(col("date_time")).alias("Month"),
        dayofmonth(col("date_time")).alias("Day"),
        dayofweek(col("date_time")).alias("DayofWeek"),
        dayofyear(col("date_time")).alias("DayofYear"),
        hour(col("date_time")).alias("Hour"),
        minute(col("date_time")).alias("Minute"),
        second(col("date_time")).alias("Second")
    ).show()

    # Append the current date and current timestamp as new columns.
    current_date_df = converted_df.withColumn(
        "current_date", current_date()
    ).withColumn(
        "current_timestamp", current_timestamp()
    )
    current_date_df.show()
    current_date_df.printSchema()

    # Fix: release the session's resources when done, matching the
    # companion C# example, which calls spark.Stop().
    spark.stop()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Spark date/timestamp example: reads a CSV containing a date, a
    /// string-formatted date and a timestamp column, converts the string
    /// column to a date, extracts date/time parts, and appends
    /// current-date/current-timestamp columns.
    /// </summary>
    internal class DateTimestamp
    {
        public static void Run()
        {
            SparkSession spark = SparkSession
                .Builder()
                .AppName("date-time-test")
                .GetOrCreate();

            const string filePath = "data/date-time.csv";

            // to_date is read as a plain string because its MM/dd/yyyy layout
            // does not match the default date format; it is parsed explicitly
            // further down.
            var schema = new StructType(new[]
            {
                new StructField("from_date", new DateType(), false),
                new StructField("to_date", new StringType(), false),
                new StructField("date_time", new TimestampType(), false)
            });

            // Load the CSV into the initial data frame.
            DataFrame originalDF = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            originalDF.Show();
            originalDF.PrintSchema();

            // Parse the MM/dd/yyyy strings into a real date column.
            DataFrame convertedDF = originalDF.WithColumn(
                "to_date", ToDate(Col("to_date"), "MM/dd/yyyy"));
            convertedDF.Show();
            convertedDF.PrintSchema();

            // Pull the individual parts out of the timestamp column.
            Column ts = Col("date_time");
            convertedDF.Select(
                ts,
                Year(ts).Alias("Year"),
                Month(ts).Alias("Month"),
                DayOfMonth(ts).Alias("Day"),
                DayOfWeek(ts).Alias("DayofWeek"),
                DayOfYear(ts).Alias("DayofYear"),
                Hour(ts).Alias("Hour"),
                Minute(ts).Alias("Minute"),
                Second(ts).Alias("Second")).Show();

            // Append the current date and current timestamp as new columns.
            DataFrame currentDateDF = convertedDF
                .WithColumn("current_date", CurrentDate())
                .WithColumn("current_timestamp", CurrentTimestamp());
            currentDateDF.Show();
            currentDateDF.PrintSchema();

            spark.Stop();
        }
    }
}