[Spark By Example] Read SQL Server

The following sample code (in C#) shows how to read data from Microsoft SQL Server.

How to access SQL Server from Spark

  1. Download the JDBC driver
  2. Pick the jar file that matches your Java version.
    • ex) sqljdbc_11.2\enu\mssql-jdbc-11.2.1.jre8.jar
  3. Create a Connection String
# Connection String Examples

jdbc:sqlserver://localhost:1433;encrypt=true;databaseName=AdventureWorks;integratedSecurity=true;

jdbc:sqlserver://localhost;databaseName=AdventureWorks;encrypt=true;user=MyUserName;password=*****;

# will pass the username and password later as options
jdbc:sqlserver://localhost;databaseName=AdventureWorks;encrypt=true;trustServerCertificate=true;

Create a test Table and Data

-- Test table used by the Spark examples below.
-- Id is generated by the server (IDENTITY starting at 1, step 1).
CREATE TABLE People
(
    Id        INT IDENTITY(1,1) PRIMARY KEY,
    FirstName NVARCHAR(255) NOT NULL,
    LastName  NVARCHAR(255) NOT NULL,
    Age       INT
);

-- Rows that supply an Age.
INSERT INTO People
    (FirstName, LastName, Age)
VALUES
    ('Carl', 'Adams', 29),
    ('Abigail', 'Hills', 33),
    ('Eric', 'Baker', 40);

-- Rows that omit Age; the column list leaves it out, so no value is supplied
-- for it (expected to be stored as NULL, which the
-- "WHERE Age IS NOT NULL" example query then filters out).
INSERT INTO People
    (FirstName, LastName)
VALUES
    ('Anne', 'Smith'),
    ('Emily', 'Klein');

Setup SQL Server TCP Connection

  • Open “SQL Server Configuration Manager” and enable the “TCP/IP” protocol for your SQL Server instance.

Result


C# Application

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples.AccessDB
{
    /// <summary>
    /// Sample that loads a SQL Server table and two SQL queries into Spark
    /// DataFrames through the Spark "jdbc" data source and prints them.
    /// </summary>
    internal class AccessSQLServer
    {
        // Environment variables that override the sample credentials, so real
        // user names and passwords do not have to be committed to source.
        private const string UserEnvironmentVariable = "SQLSERVER_USER";
        private const string PasswordEnvironmentVariable = "SQLSERVER_PASSWORD";

        // Sample values used when the environment variables above are not set.
        private const string DefaultUser = "dbuser";
        private const string DefaultPassword = "password1234";

        /// <summary>
        /// Creates (or reuses) a Spark session, shows the whole People table,
        /// then shows the results of two queries, and finally stops the session.
        /// </summary>
        public static void Run()
        {
            SparkSession spark =
                SparkSession.Builder()
                .AppName("dataframe-schema")
                .GetOrCreate();

            try
            {
                // Read the table
                DataFrame df = ReadTable(spark, "People");
                df.Show();

                // Query 1: the ordering is applied on the DataFrame, after the load.
                var query1 = "SELECT FirstName, LastName, Age FROM People";
                DataFrame df1 = ExecuteQuery(spark, query1)
                    .OrderBy(Col("LastName").Desc());
                df1.Show();

                // Query 2: the NULL filter is part of the SQL text sent via the "query" option.
                var query2 = "SELECT FirstName, LastName, Age FROM People WHERE Age IS NOT NULL";
                DataFrame df2 = ExecuteQuery(spark, query2);
                df2.Show();
            }
            finally
            {
                // Stop the session even when a read or Show() call throws.
                spark.Stop();
            }
        }

        /// <summary>
        /// Returns the JDBC URL for the sample database. Credentials are not part
        /// of the URL; they are supplied separately as reader options.
        /// </summary>
        private static string GetConnectionString()
        {
            return @"jdbc:sqlserver://localhost;databaseName=Test;encrypt=true;trustServerCertificate=true;";
        }

        /// <summary>
        /// Returns the value of the given environment variable, or
        /// <paramref name="fallback"/> when the variable is unset or empty.
        /// </summary>
        private static string GetSetting(string variable, string fallback)
        {
            var value = Environment.GetEnvironmentVariable(variable);
            return string.IsNullOrEmpty(value) ? fallback : value;
        }

        /// <summary>
        /// Builds a new dictionary with the options shared by every JDBC read:
        /// url, user, password and driver class. A fresh instance is returned on
        /// each call so callers can add their own entries.
        /// </summary>
        private static Dictionary<string, string> GetBaseOptions()
        {
            return new Dictionary<string, string>
            {
                { "url", GetConnectionString() },
                { "user", GetSetting(UserEnvironmentVariable, DefaultUser) },
                { "password", GetSetting(PasswordEnvironmentVariable, DefaultPassword) },
                { "driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver" }
            };
        }

        /// <summary>
        /// Loads a DataFrame through the "jdbc" format using the base options plus
        /// one extra option that selects what to read.
        /// </summary>
        /// <param name="session">Spark session used to create the reader.</param>
        /// <param name="optionName">Name of the extra option ("dbtable" or "query").</param>
        /// <param name="optionValue">Value of the extra option.</param>
        /// <exception cref="ArgumentNullException"><paramref name="session"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="optionValue"/> is null or blank.</exception>
        private static DataFrame LoadWithOption(SparkSession session, string optionName, string optionValue)
        {
            if (session is null)
            {
                throw new ArgumentNullException(nameof(session));
            }

            if (string.IsNullOrWhiteSpace(optionValue))
            {
                throw new ArgumentException(
                    $"A non-empty value is required for the '{optionName}' option.",
                    nameof(optionValue));
            }

            var options = GetBaseOptions();
            options[optionName] = optionValue;

            return session.Read()
                .Format("jdbc")
                .Options(options)
                .Load();
        }

        /// <summary>
        /// Reads a table by passing its name as the "dbtable" option.
        /// </summary>
        /// <param name="session">Spark session used to create the reader.</param>
        /// <param name="table">Name of the table to read.</param>
        /// <returns>The DataFrame returned by the JDBC reader.</returns>
        private static DataFrame ReadTable(SparkSession session, string table)
        {
            return LoadWithOption(session, "dbtable", table);
        }

        /// <summary>
        /// Reads the result of a SQL statement by passing it as the "query" option.
        /// </summary>
        /// <param name="session">Spark session used to create the reader.</param>
        /// <param name="query">SQL text to send through the JDBC reader.</param>
        /// <returns>The DataFrame returned by the JDBC reader.</returns>
        private static DataFrame ExecuteQuery(SparkSession session, string query)
        {
            return LoadWithOption(session, "query", query);
        }
    }
}

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s