The following sample code (in C#) shows how to read data from Microsoft SQL Server.
How to access SQL Server from Spark
- Download the JDBC driver
- Pick the jar file that matches your Java version.
- ex) sqljdbc_11.2\enu\mssql-jdbc-11.2.1.jre8.jar
- Create a Connection String
# Connection String Examples
jdbc:sqlserver://localhost:1433;encrypt=true;databaseName=AdventureWorks;integratedSecurity=true;
jdbc:sqlserver://localhost;databaseName=AdventureWorks;encrypt=true;user=MyUserName;password=*****;
# The username and password are passed later as options
jdbc:sqlserver://localhost;databaseName=AdventureWorks;encrypt=true;trustServerCertificate=true;
Create a test table and data
CREATE TABLE People (
Id INT IDENTITY(1,1) PRIMARY KEY,
FirstName NVARCHAR(255) NOT NULL,
LastName NVARCHAR(255) NOT NULL,
Age INT
);
INSERT INTO People (FirstName, LastName, Age)
VALUES
('Carl', 'Adams', 29),
('Abigail', 'Hills', 33),
('Eric', 'Baker', 40);
INSERT INTO People (FirstName, LastName)
VALUES
('Anne', 'Smith'),
('Emily', 'Klein');
Setup SQL Server TCP Connection
- Open “SQL Server Configuration Manager” and enable the “TCP/IP” protocol for your SQL Server instance (restart the SQL Server service for the change to take effect).
Result
C# Application
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples.AccessDB
{
    /// <summary>
    /// Sample that reads data from Microsoft SQL Server through Spark's JDBC data source,
    /// once by table name and twice by SQL query.
    /// </summary>
    internal class AccessSQLServer
    {
        /// <summary>
        /// Creates a Spark session, shows the whole <c>People</c> table, then shows the
        /// results of two queries. The session is stopped even if one of the reads fails.
        /// </summary>
        public static void Run()
        {
            SparkSession spark =
                SparkSession.Builder()
                    .AppName("dataframe-schema")
                    .GetOrCreate();

            try
            {
                // Read the table
                DataFrame df = ReadTable(spark, "People");
                df.Show();

                // Query 1: the ordering is applied on the resulting DataFrame,
                // not inside the SQL text sent to the server.
                var query1 = "SELECT FirstName, LastName, Age FROM People";
                DataFrame df1 = ExecuteQuery(spark, query1)
                    .OrderBy(Col("LastName").Desc());
                df1.Show();

                // Query 2: rows with a NULL Age are filtered out by the WHERE clause.
                var query2 = "SELECT FirstName, LastName, Age FROM People WHERE Age IS NOT NULL";
                DataFrame df2 = ExecuteQuery(spark, query2);
                df2.Show();
            }
            finally
            {
                // Always release the session, including when a read or Show() throws.
                spark.Stop();
            }
        }

        /// <summary>
        /// Returns the JDBC URL of the target database. Credentials are not part of the
        /// URL; they are supplied separately by <see cref="GetBaseOptions"/>.
        /// </summary>
        private static string GetConnectionString()
        {
            return @"jdbc:sqlserver://localhost;databaseName=Test;encrypt=true;trustServerCertificate=true;";
        }

        /// <summary>
        /// Builds a fresh dictionary with the options shared by every JDBC read
        /// (URL, user, password and driver class).
        /// </summary>
        /// <returns>A new, mutable dictionary; callers add their own entries to it.</returns>
        private static Dictionary<string, string> GetBaseOptions()
        {
            var connectionString = GetConnectionString();

            // NOTE(review): sample placeholder credentials only -- do not keep real
            // secrets in source; load them from configuration or a secret store.
            return new Dictionary<string, string>
            {
                { "url", connectionString },
                { "user", "dbuser" },
                { "password", "password1234" },
                { "driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver" }
            };
        }

        /// <summary>
        /// Loads a table as a <see cref="DataFrame"/> using the JDBC <c>dbtable</c> option.
        /// </summary>
        /// <param name="session">Spark session used to create the reader.</param>
        /// <param name="table">Name of the table to read.</param>
        /// <returns>The DataFrame produced by the JDBC reader.</returns>
        private static DataFrame ReadTable(SparkSession session, string table)
        {
            return LoadJdbc(session, "dbtable", table);
        }

        /// <summary>
        /// Loads the result of a SQL statement as a <see cref="DataFrame"/> using the
        /// JDBC <c>query</c> option.
        /// </summary>
        /// <param name="session">Spark session used to create the reader.</param>
        /// <param name="query">SQL text passed to the JDBC source.</param>
        /// <returns>The DataFrame produced by the JDBC reader.</returns>
        private static DataFrame ExecuteQuery(SparkSession session, string query)
        {
            return LoadJdbc(session, "query", query);
        }

        /// <summary>
        /// Shared reader pipeline: base options plus one source-selecting option
        /// (<c>dbtable</c> or <c>query</c>), loaded through the <c>jdbc</c> format.
        /// </summary>
        private static DataFrame LoadJdbc(SparkSession session, string sourceOption, string sourceValue)
        {
            // A new dictionary per call keeps the source option of one read from
            // leaking into the next.
            var options = GetBaseOptions();
            options.Add(sourceOption, sourceValue);

            return session.Read()
                .Format("jdbc")
                .Options(options)
                .Load();
        }
    }
}