How to build a pipeline that converts CDAs, FHIR version STU3, and FHIR version DSTU2 data to FHIR R4 and writes it to Azure Synapse
Databricks is a unified data analytics platform that provides a collaborative environment for data scientists, data engineers, and business analysts. It is built on top of Apache Spark and provides features that simplify building and deploying data pipelines. This case study uses Databricks with Delta Live Tables to build a pipeline that converts CDAs, FHIR version STU3, and FHIR version DSTU2 data to FHIR R4 and writes it to Azure Synapse.
Overview
This guide walks through the technical flow: upload sample data to Azure Blob Storage, use it as the source for a Databricks pipeline, set up the infrastructure needed for continued watermarked processing, transform the data with Databricks and the Orchestrate SDK, and write the transformed data to Azure Synapse.
Note: This case study uses an Azure account, the databricks CLI for Azure Databricks, and the az CLI for Azure interactions. Install these prerequisites by following the instructions in the Microsoft documentation for the Databricks CLI and the Azure CLI.
Secret Storage
Secrets must be available to access both the Orchestrate API and the Azure Blob Storage accounts. To store these secrets in Databricks, create a secret scope.
databricks secrets create-scope orchestrate-api
Then add a secret for the Orchestrate API key.
"<your-api-key>" | databricks secrets put-secret orchestrate-api case-study-api-key
Source Setup
An Azure blob storage account and container will serve as the source of data for this case study. Name the container datasets, then create a secret case-study-storage-account for the storage account key.
'{ "storageAccountName": "<your-storage-account-name>", "storageAccountKey": "<your-storage-account-key>" }' | databricks secrets put-secret orchestrate-api case-study-storage-account
Next, upload some sample data to the blob storage account, organized by format. In a real scenario, these files would likely come from an external source. For this case study, create some sample data in local cda/, dstu2/, and stu3/ directories, then upload them to the blob storage account under the same prefixes.
"dstu2/bundle1.json",
"dstu2/bundle2.json",
"stu3/bundle1.json",
"stu3/bundle2.json",
"cda/cda0.xml",
"cda/cda1.xml",
"cda/cda2.xml" |
ForEach-Object {
az storage blob upload `
-c "datasets" `
--account-name orchestratecasestudy `
--file $_ `
--name $_
}
After uploading the data, view it in the Azure portal.
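The upload can also be confirmed from the CLI without opening the portal, using the same account name as above.
az storage blob list -c "datasets" --account-name orchestratecasestudy -o table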
Databricks Pipelines
To start working with Azure Databricks, create a new workspace, then create a new notebook. In order to transform CDAs and FHIR bundles, install the Orchestrate SDK. This can be done with a %pip command in the notebook.
%pip install orchestrate-api
dbutils.library.restartPython()
import orchestrate
print(orchestrate.__version__)
Now mount the blob storage account in the notebook.
import json
blob_storage_container = "datasets"
blob_config = json.loads(dbutils.secrets.get("orchestrate-api", "case-study-storage-account"))
fs_uri = f"fs.azure.account.key.{blob_config['storageAccountName']}.blob.core.windows.net"
storage_uri = f"wasbs://{blob_storage_container}@{blob_config['storageAccountName']}.blob.core.windows.net"
mount_point = f"/mnt/{blob_storage_container}/"
spark.conf.set(fs_uri, blob_config["storageAccountKey"])
if mount_point not in [mount.mountPoint for mount in dbutils.fs.mounts()]:
    dbutils.fs.mount(
        source=storage_uri,
        mount_point=mount_point,
        extra_configs={fs_uri: blob_config["storageAccountKey"]},
    )
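As a quick sanity check, list the mounted container; the cda/, dstu2/, and stu3/ prefixes uploaded earlier should appear.
display(dbutils.fs.ls(mount_point))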
To use the Orchestrate SDK in Spark data frames, define a Spark UDF to map the source data to the appropriate conversion function. This function will take the source data and source type as input, then return the converted FHIR R4 data.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from orchestrate import OrchestrateApi
import os
os.environ['ORCHESTRATE_API_KEY'] = dbutils.secrets.get(scope="orchestrate-api", key="case-study-api-key")
api = OrchestrateApi()
def map_convert_function(content: str, source_type: str) -> str:
    # Route each payload to the appropriate Orchestrate conversion based on its source format
    fhir_r4 = None
    match source_type:
        case "STU3":
            fhir_r4 = api.convert.fhir_stu3_to_fhir_r4(content)
        case "DSTU2":
            fhir_r4 = api.convert.fhir_dstu2_to_fhir_r4(content)
        case "CDA":
            fhir_r4 = api.convert.cda_to_fhir_r4(content)
    if fhir_r4 is None:
        raise ValueError("UnhandledType")
    return json.dumps(fhir_r4)
convert_source_data = udf(map_convert_function, StringType())
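Before wiring the UDF into a pipeline, the underlying function can be smoke-tested on one of the sample files uploaded earlier. This is an optional, illustrative check that reads a single CDA through the /dbfs FUSE path of the mount.
# Optional smoke test: convert a single sample CDA outside of Spark
sample_cda = open(f"/dbfs{mount_point}cda/cda0.xml").read()
print(map_convert_function(sample_cda, "CDA")[:500])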
Now define the Delta Live Tables to read the source data, convert it to FHIR R4, and write it to one final table of consolidated FHIR R4. Define a source view and a raw table for each source format, then define a table for the consolidated FHIR R4 data. Note the LIVE schema reference, which allows Databricks to identify the table within the pipeline. Structuring the data pipeline in this way approximates the medallion architecture:
- The source layer keeps the raw data in its original format and in its original location.
- The raw layer is a direct representation of the source data, with no transformations, but moves the data into the defined blob storage. In a real scenario, either the source or the raw layer could be omitted, depending on the data lake architecture.
- The converted layer is the transformed, silver-level data that follows a consistent format with cleansed terminology appropriate for feature refinement and gold-level pipelines.
Note: In larger data pipelines with streaming data, it is a good idea to consider partition keys and more specific watermarking behavior. For this case study, the pipeline is intentionally simple.
import dlt
from pyspark.sql.functions import lit
@dlt.view
def cda_source():
    cda_directory = spark.sparkContext.wholeTextFiles(f"{mount_point}/cda/")
    return cda_directory.toDF(["filename", "content"]).withColumn("source_type", lit("CDA"))

@dlt.view
def stu3_source():
    stu3_directory = spark.sparkContext.wholeTextFiles(f"{mount_point}/stu3/")
    return stu3_directory.toDF(["filename", "content"]).withColumn("source_type", lit("STU3"))

@dlt.view
def dstu2_source():
    dstu2_directory = spark.sparkContext.wholeTextFiles(f"{mount_point}/dstu2/")
    return dstu2_directory.toDF(["filename", "content"]).withColumn("source_type", lit("DSTU2"))

@dlt.table
def cda_raw():
    return dlt.read("cda_source")

@dlt.table
def dstu2_raw():
    return dlt.read("dstu2_source")

@dlt.table
def stu3_raw():
    return dlt.read("stu3_source")

@dlt.table
def converted_fhir():
    def read_and_convert(table: str):
        frame = spark.table(table)
        return frame.withColumn("fhir", convert_source_data(frame["content"], frame["source_type"]))

    cda = read_and_convert("LIVE.cda_raw")
    dstu2 = read_and_convert("LIVE.dstu2_raw")
    stu3 = read_and_convert("LIVE.stu3_raw")
    return cda.unionAll(dstu2).unionAll(stu3)
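As mentioned in the note above, larger deployments may benefit from partition keys. As an illustrative sketch only (this case study keeps the unpartitioned definition), the converted table could be partitioned by source format with the partition_cols argument:
# Sketch: a partitioned variant of the converted table (not used in this case study)
@dlt.table(partition_cols=["source_type"])
def converted_fhir_partitioned():
    return dlt.read("converted_fhir")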
Next, create a pipeline to execute the conversion notebook. Ensure that the output schema is set to public, then run the pipeline.
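The pipeline can be created in the workspace UI, or from a settings file with databricks pipelines create --json (flag support varies by CLI version). The settings below are a minimal sketch: the notebook path is a placeholder, and Unity Catalog pipelines use catalog and schema instead of target.
{
  "name": "orchestrate-case-study",
  "target": "public",
  "continuous": false,
  "libraries": [
    { "notebook": { "path": "/Users/<your-user>/convert-to-fhir-r4" } }
  ]
}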
Setup Synapse
Back in Azure, create a Synapse server and database. When asked, supply the name database for the storage account container name.
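These resources can be created in the portal, or with the az CLI along the lines of the sketch below. The resource names are placeholders and the exact flags may differ by CLI version; note the file system named database, matching the container name above.
az synapse workspace create `
  --name <your-synapse-workspace> `
  --resource-group <your-resource-group> `
  --storage-account <your-synapse-storage-account> `
  --file-system database `
  --sql-admin-login-user <admin-user> `
  --sql-admin-login-password <admin-password> `
  --location <region>
az synapse sql pool create `
  --name <your-database-name> `
  --workspace-name <your-synapse-workspace> `
  --resource-group <your-resource-group> `
  --performance-level DW100c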
Then create a table to store the converted FHIR R4 data. This will house both the original content and the converted bundles.
create table dbo.converted_fhir(
filename nvarchar(256),
content nvarchar(max),
source_type nvarchar(256),
fhir nvarchar(max)
)
with ( clustered index (filename) );
Add two more secrets (case-study-synapse-connection and case-study-synapse-storage-account) for the new Synapse server and its associated storage account.
'{ "storageAccountName": "<your-storage-account-name>", "storageAccountKey": "<your-storage-account-key>" }' | databricks secrets put-secret orchestrate-api case-study-synapse-storage-account
'{ "endpoint": "<your-endpoint>", "database": "<database>", "user": "<user>", "password": "<password>" }' | databricks secrets put-secret orchestrate-api case-study-synapse-connection
Load Synapse
Create another Databricks notebook, then configure Spark to write to Synapse and its underlying storage. The storage account housing the source data will be used as a temporary location.
import json
table_name = "converted_fhir"
blob_storage_container = "datasets"
synapse_storage_container = "database"
synapse_storage_configuration = json.loads(
dbutils.secrets.get(
scope="orchestrate-api",
key="case-study-synapse-storage-account"
)
)
synapse_configuration = json.loads(
dbutils.secrets.get(
scope="orchestrate-api",
key="case-study-synapse-connection"
)
)
blob_config = json.loads(
dbutils.secrets.get(
"orchestrate-api",
"case-study-storage-account"
)
)
blob_fs_uri = f"fs.azure.account.key.{blob_config['storageAccountName']}.blob.core.windows.net"
blob_storage_uri = f"wasbs://{blob_storage_container}@{blob_config['storageAccountName']}.blob.core.windows.net"
synapse_fs_uri = f"fs.azure.account.key.{synapse_storage_configuration['storageAccountName']}.blob.core.windows.net"
# abfss URIs use the Data Lake Storage Gen2 (dfs) endpoint
synapse_storage_uri = f"abfss://{synapse_storage_container}@{synapse_storage_configuration['storageAccountName']}.dfs.core.windows.net"
synapse_server_name = synapse_configuration["endpoint"]
synapse_database_name = synapse_configuration["database"]
synapse_user = synapse_configuration["user"]
synapse_password = synapse_configuration["password"]
synapse_url = f"jdbc:sqlserver://{synapse_server_name}:1433;database={synapse_database_name};user={synapse_user};password={synapse_password};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
spark.conf.set(synapse_fs_uri, synapse_storage_configuration["storageAccountKey"])
spark.conf.set(blob_fs_uri, blob_config["storageAccountKey"])
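Before starting the stream, the connection details can be verified with a one-off batch read against the (still empty) table created earlier. This is an optional check using the same Synapse connector and temp directory as the streaming write below.
# Optional connectivity check: run a simple query through the Synapse connector
test_df = (
    spark.read.format("com.databricks.spark.sqldw")
    .option("url", synapse_url)
    .option("tempDir", f"{blob_storage_uri}/temp/")
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("query", "select count(*) as row_count from dbo.converted_fhir")
    .load()
)
display(test_df)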
Finally, read the consolidated converted_fhir table and write it to Synapse. If this notebook is configured with a pipeline, the Synapse table will be automatically refreshed as new data is added to the source blob storage account.
converted_fhir = spark.readStream.table(f"public.{table_name}")
(
    converted_fhir.writeStream
    .format("com.databricks.spark.sqldw")
    .option("url", synapse_url)
    .option("dbTable", table_name)
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("tempDir", f"{blob_storage_uri}/temp/")
    .option("checkpointLocation", "/checkpoints")
    .start()
)
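Once the stream has processed the initial files, a quick query in Synapse confirms the load, for example:
select source_type, count(*) as document_count
from dbo.converted_fhir
group by source_type;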
Orchestrate empowers healthcare data where you already are.
Whether writing an app for longitudinal patient records or converting a CDA data lake in Databricks, Orchestrate provides a path for standardized and enriched healthcare data. Get a free API key and find out how simple it is to integrate into any platform.