The exercise task is referenced here.
The data is loaded, exploded by resource, and placed into a Delta Lake table raw.fhir
where it can be queried.
The data is still fairly raw and could be further refined, or the resource column (a JSON string) can be queried ad hoc, e.g. in Databricks using JSON dot notation.
| col_name| data_type|comment|
|--------------------|--------------------|-------|
| fullUrl| string| |
| resourceType| string| |
| id| string| |
| resource| string| |
| _KEY| string| |
| _LOAD_TIMESTAMP| timestamp| |
| _SOURCE| string| |
| _CORRUPT_COLUMN| string| |
After executing the pipeline, in the project home directory run:
$SPARK_HOME/bin/pyspark
Examples:
df = spark.sql("select resourceType, count(1) from raw.fhir group by resourceType")
df.show()
Output:
+--------------------+--------+
| resourceType|count(1)|
+--------------------+--------+
| DocumentReference| 4200|
| DiagnosticReport| 7269|
| MedicationRequest| 3418|
| Observation| 21648|
| Device| 6|
| CarePlan| 278|
|ExplanationOfBenefit| 4200|
| Provenance| 79|
| ImagingStudy| 24|
|MedicationAdminis...| 18|
| Claim| 7618|
| Immunization| 1030|
| Procedure| 6193|
| Medication| 18|
| Patient| 79|
| SupplyDelivery| 156|
| Condition| 2795|
| CareTeam| 278|
| Encounter| 4200|
| AllergyIntolerance| 57|
+--------------------+--------+
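The resource column itself is a JSON string, so individual fields can also be pulled out ad hoc with Spark SQL's get_json_object. A minimal sketch (the $.gender path on Patient resources is just an illustrative field, not something guaranteed by the schema):

```python
# Ad hoc extraction of a field from the resource JSON string.
# '$.gender' is an illustrative JSON path; substitute any field of interest.
df = spark.sql("""
    select id, get_json_object(resource, '$.gender') as gender
    from raw.fhir
    where resourceType = 'Patient'
""")
df.show()
```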
Running the build script builds the Python project and deploys it to the Docker Compose setup. As a quickstart I've used a Spark base Docker image open-sourced from the RockTheJVM courses, of which I'm a paid subscriber.
This obviously requires a desktop installation of Docker.
Build the Docker images; this may take several minutes:
cd spark-standalone
chmod +x ./build-images.sh
./build-images.sh
Build and submit the fhir_pipeline app from the project root dir:
sh build.sh
cd spark-standalone
docker-compose up
To query the resulting table, run and attach to the Docker container. From the project root dir:
cd spark-standalone
# ensure that the docker-compose run of the app is down.
docker-compose down
sh inspect.sh
Once inside the running container's bash shell, fire up pyspark in the app home directory with the correct dependencies:
cd /opt/spark-apps
/spark/bin/pyspark --packages io.delta:delta-core_2.12:1.2.1 --conf spark.sql.catalogImplementation=hive --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Once pyspark is running, query the schemas and the table, e.g.:
dbs = spark.sql("SHOW SCHEMAS")
dbs.show()
df = spark.sql("select resourceType, count(1) from raw.fhir group by resourceType")
df.show()
The application is in a python module ./fhir_pipeline:
- __main__.py - executes the pipeline
- pipeline.py - has the pipeline logic (a rough, illustrative sketch of the load step is shown below)
- validate.py - has the data load validation
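For orientation, here is a rough sketch of what the landing-to-raw step conceptually does. This is not the actual pipeline.py; the landing path, column handling and config wiring are assumptions:

```python
# Illustrative sketch only, not the project's pipeline.py.
# Assumes the landing files are FHIR bundles with an "entry" array.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

bundles = spark.read.json("data/landing/*.json")        # hypothetical landing path
entries = bundles.select(F.explode("entry").alias("entry"))

raw = (
    entries.select(
        F.col("entry.fullUrl").alias("fullUrl"),
        F.col("entry.resource.resourceType").alias("resourceType"),
        F.col("entry.resource.id").alias("id"),
        F.to_json("entry.resource").alias("resource"),  # keep the full resource as a JSON string
    )
    .withColumn("_LOAD_TIMESTAMP", F.current_timestamp())
    .withColumn("_SOURCE", F.input_file_name())
)

raw.write.format("delta").mode("append").saveAsTable("raw.fhir")
```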
The configuration is held in ./config; see the configuration section below.
This is a Spark application with Delta Lake; the following dependencies must be installed in order to run it locally.
Ensure that SPARK_HOME is set and added to your PATH, e.g.:
export SPARK_HOME="$HOME/opt/spark-3.2.1-bin-hadoop3.2"
Enable Delta Lake by:
cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
Add the following to spark-defaults.conf:
spark.jars.packages io.delta:delta-core_2.12:1.2.1
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.catalogImplementation hive
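A quick way to sanity-check the Delta setup, once pyspark is started with the config above, is a throwaway Delta round trip (the /tmp path is just an example):

```python
# Verify the Delta extension is picked up and that a Delta write/read works.
print(spark.conf.get("spark.sql.extensions"))  # expect io.delta.sql.DeltaSparkSessionExtension
spark.range(3).write.format("delta").mode("overwrite").save("/tmp/delta_check")
spark.read.format("delta").load("/tmp/delta_check").show()
```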
Create a virtual environment and install dependencies for local development:
python -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
pip install --editable .
It can be executed in VS Code by hitting F5 with the settings in .vscode.
It will load the data using Spark into a raw Delta table called raw.fhir, persisted in the Hive metastore.
Run pyspark in the project root and explore the data in raw.fhir.
The local environment setup can be restored to its original state by executing:
sh cleanup.sh
There is a single end-to-end integration test for this pipeline; it requires a local development environment. To execute it, run:
pytest
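For illustration, the shape of such a test looks roughly like this (hypothetical names; the real test and its entry point live in the repository and may differ):

```python
# Hypothetical sketch of an end-to-end check, not the project's actual test:
# run the pipeline against the local config, then assert the raw table is populated.
from pyspark.sql import SparkSession


def test_landing_to_raw():
    # run_pipeline is an assumed entry point name within fhir_pipeline
    from fhir_pipeline.pipeline import run_pipeline

    run_pipeline(env="local")
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    assert spark.table("raw.fhir").count() > 0
```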
Load schemas are abstracted into config settings that can be found in ./config:
- config/schema/fhir.yaml is the source data Spark schema
Load configurations are abstracted into config settings that can be found in ./config/[ENV]. For example, the local and docker configurations:
- config/pipeline/local/config.yaml is the general local configuration for the spark project
- config/pipeline/local/landing_to_raw.yaml is the local pipeline configuration for loading raw
- config/pipeline/docker/config.yaml is the general docker configuration for the spark project
- config/pipeline/docker/landing_to_raw.yaml is the docker pipeline configuration for loading raw
The ENV can be set in the .env file for development and in an environment variable for operation:
ENVIRONMENT=local
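As a sketch of how an environment-specific configuration could be resolved at runtime (illustrative only; the actual loading lives in the fhir_pipeline module and the key names live in the YAML files above):

```python
# Illustrative only: resolve the environment and load its pipeline config.
import os
import yaml  # requires PyYAML

env = os.getenv("ENVIRONMENT", "local")
with open(f"config/pipeline/{env}/landing_to_raw.yaml") as f:
    cfg = yaml.safe_load(f)
print(cfg)
```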