Stream Bulk to SQL DB (Python)

Import required libraries

from pyspark.sql.functions import *
from pyspark.sql import *

Set the connection property variables.

Copy and paste the appropriate values for your Azure SQL Database.

Note: I've hardcoded the password in this example, but good practice would be to pull it from Azure Key Vault, e.g. dbutils.secrets.get(scope = "my-azure-key-vault-scope", key = "secret-access-key")

jdbcHostname = "<Server Name>.database.windows.net"
jdbcDatabase = "<Database Name>"
jdbcPort = 1433
jdbcUsername = "<User Name>"
jdbcPassword = "<Password>"
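
If you do use a Key Vault-backed secret scope, the lookup is a one-liner. This is only a sketch; the scope and key names below are placeholders for your own setup:

# Pull the password from a Databricks secret scope backed by Azure Key Vault
# (replace the scope and key names with your own)
jdbcPassword = dbutils.secrets.get(scope = "my-azure-key-vault-scope", key = "secret-access-key")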

Set the connection string

This could also be passed inline in the JDBC call, but I find it cleaner to set up a connection string ahead of time.

Example:

jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase}") \
    .option("dbtable", "Employees") \
    .option("user", jdbcUsername) \
    .option("password", jdbcPassword) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
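
Before starting the stream, you can sanity-check these settings with a plain batch read through the same jdbcUrl and connectionProperties. This is just a sketch; it assumes the target table (here, rates) already exists in the database:

# Optional: verify connectivity with a one-off batch read before streaming
testDF = spark.read.jdbc(url=jdbcUrl, table="rates", properties=connectionProperties)
testDF.show(10)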

Batch write function

Create a function that pushes a DataFrame into Azure SQL Database using JDBC. This will be called within our Structured Streaming query via foreachBatch (see below).

The parameters passed to df.write.jdbc are as follows:

  • url : the JDBC connection string built previously
  • table : the target table name
  • mode : overwrite or append
  • properties : the connection details, which include the username, password and driver

def writeToSQLDB(df, epochId):
  # epochId is the unique id of the micro-batch, supplied by foreachBatch
  df.write.jdbc(url=jdbcUrl, table="rates", mode="append", properties=connectionProperties)
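
Note that foreachBatch gives at-least-once guarantees, so a retried micro-batch can be written twice. One sketch of a mitigation is to tag every row with the epochId so duplicates can be reconciled downstream; the batch_id column here is hypothetical and would need to exist in the target table:

from pyspark.sql.functions import lit

def writeToSQLDBWithBatchId(df, epochId):
  # Tag each row with the micro-batch id so retried batches can be identified later
  df.withColumn("batch_id", lit(epochId)) \
    .write.jdbc(url=jdbcUrl, table="rates", mode="append", properties=connectionProperties)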

Start the structured streaming source using the rate format (RateStreamSource). RateStreamSource is a streaming source that generates consecutive numbers with a timestamp, which makes it useful for testing and PoCs.

The options available are:

  • numPartitions (default: default parallelism) : number of partitions to use
  • rampUpTime (default: 0 seconds) : how long to ramp up before the generation rate reaches rowsPerSecond
  • rowsPerSecond (default: 1) : number of rows to generate per second (has to be greater than 0)

This will generate a dataframe with this structure:

  • timestamp : TimestampType
  • value : LongType

Here's a good source of information : https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-RateStreamSource.html

# Prepare streaming source; this could be Kafka, Kinesis, or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()
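
The streaming df matches the structure described above; printSchema() is a quick way to confirm it:

# Expected output (roughly):
# root
#  |-- timestamp: timestamp (nullable = true)
#  |-- value: long (nullable = true)
df.printSchema()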

Write with foreachBatch

Write the df out as part of the structured streaming query.

foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.

See documentation on this : https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch

In our example, we pass the writeToSQLDB function we wrote above, which executes once per structured streaming micro-batch.

df.writeStream \
    .foreachBatch(writeToSQLDB) \
    .outputMode("update") \
    .start()

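
In a longer-running job you would usually also keep a handle to the streaming query, give it a checkpoint location so it can recover after a restart, and block or stop it explicitly. A sketch, assuming /tmp/rates-checkpoint is a writable path in your environment:

query = df.writeStream \
    .foreachBatch(writeToSQLDB) \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/rates-checkpoint") \
    .start()

# Block this cell until the query stops, or call query.stop() from another cell
# query.awaitTermination()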