OMOP transformations: Custom mapping examples

This article provides examples of how to extend the OMOP (Observational Medical Outcomes Partnership) Common Data Model (CDM) by customizing mappings and configurations. These examples guide you through scenarios such as adding new tables, columns, and updating existing mappings within the OMOP framework. For an overview of how to create these customized mappings, see Customize and extend OMOP mappings in healthcare data solutions.

Prerequisites

Follow the steps in Customize and extend OMOP mappings to copy the configuration files and update the library to use your custom configuration files instead of the default ones.

Example 1: Add columns to a table

To add a column, add a column object to the table's column collection. You can copy an existing column and adjust it as needed. If the table already exists, the library doesn't modify its schema, so you must run a Spark SQL script to add the column to the table. Likewise, if data is already loaded in the target table, you must reload it after you add the column because there's no automatic backfill.

In this example, let's add a new column, custom_myid, to the OMOP PERSON table to store a value from the FHIR patient identifier collection.

  1. Add the custom column to the PERSON table's column collection in the dbTargetSchema.json file.

    {
      "name": "custom_myid",
      "originDataTypeName": {
        "typeName": "string",
        "length": 50,
        "properties": {
          "minValue": null,
          "maxValue": null,
          "description": "Custom myid"
        }
      },
      "isNullable": true
    }
    
  2. Update the dmfAdapter.json file to map the first identifier from the patient's identifier collection, where the system is the Synthea system, to the new column.

    {
      "fieldName": "custom_myid_alias",
      "fieldCalculatedValue": "try_element_at(filter(patient.identifier, x -> x.system == 'https://github.com/synthetichealth/synthea'), 1).value",
      "enabled": true,
      "fieldType": "string",
      "targetFields": {
        "fields": [
          {
            "tableName": "person",
            "fieldName": "custom_myid"
          }
        ]
      }
    }
    
  3. Create a new notebook in your healthcare data solutions environment and add the following Spark SQL script. Run this script against the OMOP lakehouse to modify the PERSON table with your new column.

    ALTER TABLE healthcare1_msft_gold_omop.PERSON ADD COLUMNS (custom_myid string);

  4. In the admin lakehouse files, update the deploymentParametersConfiguration.json file to set the omop_config_path parameter to the path of your custom mapping files.

    For example:

    "omop_config_path": "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/healthcare1_msft_gold_omop.Lakehouse/Files/Mappings"

  5. Save all the JSON files to commit the changes to your Fabric workspace.

  6. Run the OMOP notebook/pipeline.
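
Because the library doesn't alter an existing table, the column object in dbTargetSchema.json and the ALTER TABLE statement have to be kept in sync by hand. The following Python sketch shows one way to derive the statement from the column object. The helper names add_column and alter_statement are hypothetical and aren't part of the healthcare data solutions library. The sketch assumes the table object keeps its columns under storageDescriptor.columns, as in the table object shown in Example 2, and that typeName can be used directly as the Spark SQL type.

```python
import json


def add_column(table: dict, column: dict) -> bool:
    """Append a column object to the table's column collection.

    Returns False (and changes nothing) if a column with the same
    name, compared case-insensitively, is already present.
    """
    columns = table.setdefault("storageDescriptor", {}).setdefault("columns", [])
    if any(c["name"].lower() == column["name"].lower() for c in columns):
        return False
    columns.append(column)
    return True


def alter_statement(lakehouse: str, table_name: str, column: dict) -> str:
    """Build the Spark SQL statement that adds the column to an existing table."""
    spark_type = column["originDataTypeName"]["typeName"]  # assumed to be a valid Spark SQL type
    return f"ALTER TABLE {lakehouse}.{table_name} ADD COLUMNS ({column['name']} {spark_type});"


# The column object from step 1, parsed from JSON.
custom_myid = json.loads("""
{
  "name": "custom_myid",
  "originDataTypeName": {
    "typeName": "string",
    "length": 50,
    "properties": {"minValue": null, "maxValue": null, "description": "Custom myid"}
  },
  "isNullable": true
}
""")

# Stand-in for the PERSON table object; the real object holds many more columns.
person = {"name": "person", "storageDescriptor": {"columns": []}}

added = add_column(person, custom_myid)
statement = alter_statement("healthcare1_msft_gold_omop", "PERSON", custom_myid)
print(statement)
```

The printed statement can be pasted into the notebook from step 3. Calling add_column a second time with the same column returns False, which guards against duplicate entries when the script is rerun.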

Example 2: Add a new table

You can copy an existing table object and modify it as needed. However, if you need to add any relationships, make sure you update the relationships collection on the table object. For more information, review the relationships property in dbTargetSchema.json.

When you add a table, include the following columns to maintain consistency with other tables and their mappings in the dmfAdapter.json file.

  • msftSourceTableName: Stores the domain resource type of the silver row that primarily contributes to this OMOP record.
  • msftModifiedDatetime: Stores the timestamp of when the row was upserted into the silver lakehouse.
  • msftSourceRecordId: Stores the ID of the silver domain resource primarily responsible for this row.
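
Before running the pipeline, it can help to confirm that a new table object actually carries these three columns. The following Python sketch is one possible check, not a feature of the library. The function name missing_tracking_columns is hypothetical, the table dictionary is a trimmed stand-in for a real table object, and names are compared case-insensitively as a precaution.

```python
# Column names as they are spelled in the dbTargetSchema.json example.
REQUIRED_COLUMNS = ("msftSourceTableName", "msftModifiedDatetime", "msftSourceRecordId")


def missing_tracking_columns(table: dict) -> list:
    """Return the required column names that the table object doesn't define."""
    present = {c["name"].lower() for c in table["storageDescriptor"]["columns"]}
    return [name for name in REQUIRED_COLUMNS if name.lower() not in present]


# Trimmed, illustrative table object with only two columns defined.
table = {
    "name": "custom_diagnostic_report",
    "storageDescriptor": {
        "columns": [
            {"name": "custom_diagnostic_report_id"},
            {"name": "msftSourceRecordId"},
        ]
    },
}

missing = missing_tracking_columns(table)
print(missing)  # ['msftSourceTableName', 'msftModifiedDatetime']
```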

For example, let's add a table to store DiagnosticReport.

  1. Update dbTargetSchema.json with a table for DiagnosticReport.

    {
      "namespace": {
        "databaseName": ""
      },
      "tableType": "EXTERNAL",
      "storageDescriptor": {
        "columns": [
          {
            "name": "custom_diagnostic_report_id",
            "originDataTypeName": {
              "typeName": "bigint",
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "A unique identifier for the DiagnosticReport record."
              },
              "isNullable": false
            }
          },
          {
            "name": "custom_person_id",
            "originDataTypeName": {
              "typeName": "bigint",
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "The person record this Diagnostic Report is related to."
              },
              "isNullable": false
            }
          },
          {
            "name": "custom_conclusion",
            "originDataTypeName": {
              "typeName": "string",
              "length": 1000,
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "Holds the conclusion of the Diagnostic Report."
              },
              "isNullable": true
            }
          },
          {
            "name": "msftSourceRecordId",
            "originDataTypeName": {
              "typeName": "string",
              "length": 50,
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "This field is used to store the original record id from the source data. It is not intended for use in standard analytics and is for reference only."
              },
              "isNullable": true
            }
          },
          {
            "name": "msftSourceTableName",
            "originDataTypeName": {
              "typeName": "string",
              "length": 50,
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "This field is used to store the original table name from the source data. It is not intended for use in standard analytics and is for reference only."
              },
              "isNullable": true
            }
          },
          {
            "name": "msftModifiedDatetime",
            "originDataTypeName": {
              "typeName": "timestamp",
              "properties": {
                "minValue": null,
                "maxValue": null,
                "timestampFormat": "yyyy-MM-ddTHH:mm:ssZ",
                "description": ""
              },
              "isNullable": true
            }
          }
        ]
      },
      "name": "custom_diagnostic_report",
      "entityType": "TABLE",
      "properties": {
        "businessArea": "",
        "path": "custom_diagnostic_report.cdm.json/custom_diagnostic_report",
        "description": "Holds basic info for the diagnostic report.",
        "displayName": "custom_diagnostic_report",
        "isDay0Entity": "False",
        "fromBusinessAreas": "",
        "primaryKeys": "custom_diagnostic_report_id",
        "industries": "",
        "relationships": "[{\"joinPairs\": [{\"fromAttribute\": \"custom_person_id\", \"toAttribute\": \"person_id\"}], \"fromEntity\": \"custom_diagnostic_report.cdm.json/custom_diagnostic_report\", \"toEntity\": \"person.cdm.json/person\"}]"
      }
    }
    
  2. Update dmfAdapter.json to include the basic diagnostic report mappings.

    1. Add DiagnosticReport to the query tables.

      {
        "name": "DiagnosticReport"
      }
      
    2. Add a new source table with the basic diagnostic report mappings.

      {
          "tableName": "DiagnosticReport",
          "description": "diagnostic report",
          "query": "select *, msftModifiedDatetime as d_msftModifiedDatetime from DiagnosticReport",
          "modifiedonField": "msftModifiedDatetime",
          "targetAnchorTables": [
            {
              "tableName": "custom_diagnostic_report"
            }
          ],
          "enabled": true,
          "sourceFields": [
            {
              "fieldName": "id",
              "fieldType": "string",
              "isPrimaryKey": true,
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "custom_diagnostic_report_id"
                  },
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "msftSourceRecordId"
                  }
                ]
              }
            },
            {
              "fieldName": "conclusion",
              "fieldType": "string",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "custom_conclusion"
                  }
                ]
              }
            },
            {
              "fieldName": "subject_id",
              "fieldCalculatedValue": "subject.id",
              "fieldType": "string",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "person",
                    "fieldName": "person_id",
                    "targetField": "custom_person_id"
                  }
                ]
              }
            },
            {
              "fieldName": "d_msftModifiedDatetime",
              "fieldType": "datetime",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "msftModifiedDatetime"
                  }
                ]
              }
            },
            {
              "fieldName": "resourceType",
              "fieldType": "string",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "msftSourceTableName"
                  }
                ]
              }
            }
          ]
      }
      
  3. Update dbTargetSchemaConfig.json and include custom_diagnostic_report in the tables property for both SourceModifiedOn and SourceTable.

    [
          {
            "name": "SourceModifiedOn",
            "description": "Used for comparing with source system records to know when a record was changed in the source.",
            "tables": [
              "person",
              "observation_period",
              "visit_occurrence",
              "visit_detail",
              "condition_occurrence",
              "drug_exposure",
              "procedure_occurrence",
              "device_exposure",
              "measurement",
              "observation",
              "death",
              "note",
              "specimen",
              "location",
              "provider",
              "care_site",
              "note_nlp",
              "image_occurrence",
              "custom_diagnostic_report"
            ],
            "type": "timestamp",
            "timestampFormat": "yyyy-MM-ddTHH:mm:ssZ",
            "enabled": true
          },
          {
            "name": "SourceTable",
            "description": "Used for comparing with source system records to know when a record was changed in the source.",
            "tables": [
              "person",
              "observation_period",
              "visit_occurrence",
              "visit_detail",
              "condition_occurrence",
              "drug_exposure",
              "procedure_occurrence",
              "device_exposure",
              "measurement",
              "observation",
              "death",
              "note",
              "specimen",
              "location",
              "provider",
              "care_site",
              "note_nlp",
              "image_occurrence",
              "custom_diagnostic_report"
            ],
            "type": "string",
            "enabled": true
          }
    ]
    
  4. In the admin lakehouse files, update the deploymentParametersConfiguration.json file to set the omop_config_path parameter to the path of your custom mapping files.

    For example:

    "omop_config_path": "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/healthcare1_msft_gold_omop.Lakehouse/Files/Mappings"

  5. Save all the JSON files to commit the changes to your Fabric workspace.

  6. Run the OMOP notebook/pipeline.
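
Step 3 is easy to get half right, for example by adding the table to SourceModifiedOn but not to SourceTable. The following Python sketch shows one way to apply the change to both entries at once. The function name register_table is hypothetical and not part of the library, and the config list is a shortened stand-in for the parsed dbTargetSchemaConfig.json content.

```python
def register_table(config: list, table_name: str,
                   entry_names=("SourceModifiedOn", "SourceTable")) -> list:
    """Add table_name to the tables list of each named entry.

    Returns the names of the entries that were changed, so a second
    call with the same table returns an empty list.
    """
    changed = []
    for entry in config:
        if entry.get("name") in entry_names and table_name not in entry["tables"]:
            entry["tables"].append(table_name)
            changed.append(entry["name"])
    return changed


# Shortened, illustrative version of dbTargetSchemaConfig.json after json.load().
config = [
    {"name": "SourceModifiedOn", "tables": ["person", "observation"],
     "type": "timestamp", "enabled": True},
    {"name": "SourceTable", "tables": ["person", "observation"],
     "type": "string", "enabled": True},
]

changed = register_table(config, "custom_diagnostic_report")
print(changed)  # ['SourceModifiedOn', 'SourceTable']
```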

Domain mapping

Many tables in OMOP have a concept column used primarily for analysis, such as observation_concept_id, procedure_concept_id, and measurement_concept_id. When working with source tables, you can use this primary analysis column's domain to determine which target table to write to. For example, the FHIR observation.code can be mapped to the OMOP vocabulary. Depending on the domain, this FHIR observation might map to an OMOP observation, measurement, condition, or procedure. To incorporate domain-based mappings into OMOP, follow the steps in this section.

Shortcut the gold lakehouse tables fhir_system_to_omop_vocab_mapping and concept to the silver lakehouse. This shortcut lets the library access relevant vocabulary tables from the source lakehouse.

Update the dmfAdapter.json file with these changes:

  1. Add fhir_system_to_omop_vocab_mapping and concept to the queryTables section.

  2. For the sourceTables where domain mapping is required, follow these steps:

    1. Modify each query that needs the domain so that its SELECT statement returns c.domain_id as omop_domain.

    2. Add two SQL LEFT JOIN clauses after the initial FROM clause of each query that needs the domain. For example, if the source table is the observation table, aliased as o:

      1. left join fhir_system_to_omop_vocab_mapping fhiromop on o.code.coding[0].system = fhiromop.fhir_uri
      2. left join concept c on c.concept_code = o.code.coding[0].code and fhiromop.vocabulary_id = c.vocabulary_id
    3. The complete query for this observation source table then becomes:

        SELECT
            o.*,
            o.id AS o_id,
            p.id AS p_id,
            e.id AS encounter_id,
            c.domain_id AS omop_domain
        FROM Observation o
        LEFT JOIN fhir_system_to_omop_vocab_mapping fhiromop
            ON o.code.coding[0].system = fhiromop.fhir_uri
        LEFT JOIN concept c
            ON c.concept_code = o.code.coding[0].code
            AND fhiromop.vocabulary_id = c.vocabulary_id
        LEFT JOIN (
            SELECT
                MAX(child.id) AS id,
                child_i.value AS value,
                child_i.system AS system,
                child_i.type.coding.code AS type_code
            FROM Patient child
            LATERAL VIEW EXPLODE(identifier) AS child_i
            GROUP BY child_i.value, child_i.system, child_i.type.coding.code
        ) p
        ON p.value = o.subject.identifier.value
            AND (p.system = o.subject.identifier.system OR (p.system IS NULL AND o.subject.identifier.system IS NULL))
            AND (p.type_code = o.subject.identifier.type.coding.code OR (p.type_code IS NULL AND o.subject.identifier.type.coding.code IS NULL))
        LEFT JOIN (
            SELECT
                MAX(enc.id) AS id,
                enc_i.value AS value,
                enc_i.system AS system,
                enc_i.type.coding.code AS type_code
            FROM Encounter enc
            LATERAL VIEW EXPLODE(identifier) AS enc_i
            GROUP BY enc_i.value, enc_i.system, enc_i.type.coding.code
        ) e
        ON e.value = o.encounter.identifier.value
            AND (e.system = o.encounter.identifier.system OR (e.system IS NULL AND o.encounter.identifier.system IS NULL))
            AND (e.type_code = o.encounter.identifier.type.coding.code OR (e.type_code IS NULL AND o.encounter.identifier.type.coding.code IS NULL));
      
    4. In the sourceFields mapping of the primary key, add a target field for each potential target table. Each of these target fields must include a condition that selects it based on the domain. If the primary key condition for a target table isn't met, the record isn't written to that table. For example, the observation field mapping can be modified to allow mapping to observation, measurement, or procedure, depending on the domain.

      {
          "sourceFields": [
            {
              "fieldName": "o_id",
              "fieldType": "string",
              "isPrimaryKey": true,
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "observation",
                    "fieldName": "observation_id",
                    "condition": "subject.type = 'Patient' and (isnull(omop_domain) or (omop_domain != 'Measurement' and omop_domain != 'Procedure'))"
                  },
                  {
                    "tableName": "measurement",
                    "fieldName": "measurement_id",
                    "condition": "subject.type = 'Patient' and omop_domain == 'Measurement'"
                  },
                  {
                    "tableName": "procedure_occurrence",
                    "fieldName": "procedure_occurrence_id",
                    "condition": "subject.type = 'Patient' and omop_domain == 'Procedure'"
                  },
                  { "tableName": "observation", "fieldName": "msftSourceRecordId" },
                  { "tableName": "measurement", "fieldName": "msftSourceRecordId" },
                  { "tableName": "procedure_occurrence", "fieldName": "msftSourceRecordId" }
                ]
              }
            }
          ]
      }
      
    5. Add any necessary sourceFields mappings for the new target tables. For example, you might need to map FHIR observation.effectiveDateTime to OMOP's procedure_occurrence.procedure_datetime.
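
The three primary key conditions amount to a small routing rule. The following Python sketch restates that rule so the expected target table for a given record is easy to reason about. It's an illustration only, not how the library evaluates conditions: the function name target_table is hypothetical, and the sketch assumes the subject.type check is meant to apply to every branch, including the fallback to observation.

```python
from typing import Optional

# Domains that have a dedicated target table in this example;
# every other domain, and a missing domain, falls back to observation.
DOMAIN_TO_TABLE = {
    "Measurement": "measurement",
    "Procedure": "procedure_occurrence",
}


def target_table(subject_type: Optional[str], omop_domain: Optional[str]) -> Optional[str]:
    """Return the OMOP table a FHIR observation is routed to, or None if it's skipped."""
    if subject_type != "Patient":
        return None  # no primary key condition is met, so nothing is written
    return DOMAIN_TO_TABLE.get(omop_domain, "observation")


print(target_table("Patient", "Measurement"))  # measurement
print(target_table("Patient", None))           # observation
print(target_table("Group", "Procedure"))      # None
```

Supporting another domain, such as Condition, would mean adding an entry to the dictionary here and, in dmfAdapter.json, another target field with its own condition plus an updated fallback condition.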

Troubleshoot

Review the OMOP notebook cell output to diagnose execution issues. You can also check Azure Log Analytics for detailed library logs.

Warnings

The following warnings can appear during the first run. These warnings are expected because the target tables haven't been created yet.

  • Could not read table from 'abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/d5dd0e6f-0d4a-4d66-8de7-d80312418f52/DMHCheckpoint/dtt/dtt_state_db/KEY_MAPPING/CUSTOM_DIAGNOSTIC_REPORT_ID_MAPPING', error message: [PATH_NOT_FOUND] Path does not exist: abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/d5dd0e6f-0d4a-4d66-8de7-d80312418f52/DMHCheckpoint/dtt/dtt_state_db/KEY_MAPPING/CUSTOM_DIAGNOSTIC_REPORT_ID_MAPPING. Traceback: <traceback object at 0x793a44b03500>

  • Could not read table from 'abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>/Tables/custom_diagnostic_report', error message: [PATH_NOT_FOUND] Path does not exist: abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>/Tables/custom_diagnostic_report. Traceback: <traceback object at 0x793a3c5c2780>