Real Time Event Streaming In Healthcare

07/25/2024

Looking in the Rear View Mirror

Most healthcare data sources are not available in real time. A 30- to 90-day lag for claims processing remains the industry standard. This delay drives many healthcare data organizations to concentrate their efforts on optimizing batch ETL (Extract, Transform, Load) technologies. Common ETL tools used in the industry include:

  • Informatica
  • Talend
  • Fivetran
  • Oracle Data Integrator

At Astrana Health, we recognize that real-time insights are the future, even as we currently rely heavily on batch processing. One area where we leverage real-time streaming technologies is in ingesting ADT (Admission, Discharge, Transfer) data to support Astrana Health’s Inpatient Management workflows.

When one of our patients visits the hospital, timely communication is crucial. How quickly can our hospitalists be informed that our patient is in the ER? What about our post-discharge team, which coordinates care between the hospital and the patient’s primary care physician? In all cases, the faster the information is relayed, the better. Given this need for speed, the ingestion and use of ADT data are particularly unsuited for batch processing.

Streaming Healthcare Data

The main technologies used in our event-driven streaming system are AWS (Amazon Web Services) Lambda, Apache Kafka, and Databricks Delta Live Tables (DLT). This architecture allows us to scale elastically with data volume and stream the data into as many software applications as needed.

Key Components of Each Technology

AWS Lambda is a serverless compute service provided by AWS that allows us to run code without provisioning or managing servers. It automatically scales and manages the infrastructure needed to run our code in response to triggers, such as data changes or incoming requests. For instance, if we experience an overnight spike in volume from 10,000 ADT messages per day to 100,000 ADT messages per day, Lambda will automatically allocate the necessary compute resources to handle the increased load and then scale down once the demand decreases.
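
As a minimal sketch of this pattern (the topic name, environment variable, and choice of the kafka-python producer are illustrative assumptions, and MSK IAM authentication details are omitted), a Lambda handler might receive an ADT message from API Gateway and forward it to Kafka:

import base64
import json
import os

from kafka import KafkaProducer  # kafka-python, bundled with the Lambda deployment package

# Created once per Lambda container so the connection is reused across invocations
producer = KafkaProducer(bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"].split(","))

def lambda_handler(event, context):
    # API Gateway proxy integration delivers the HL7 payload in the request body
    body = event.get("body", "")
    if event.get("isBase64Encoded"):
        body = base64.b64decode(body).decode("utf-8")

    # "adt-events" is a hypothetical topic name
    producer.send("adt-events", value=body.encode("utf-8"))
    producer.flush()
    return {"statusCode": 202, "body": json.dumps({"status": "queued"})}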

Apache Kafka is a high-performance, distributed streaming platform that allows us to publish, subscribe to, store, and process streams of records in real time. It acts as a central nervous system for our data infrastructure. Kafka comes with features like topic-based data labeling. For instance, consider a use case where a patient sends a text message that we want to combine with an ADT diagnosis in a consolidated alert to a Care Manager in real time. With Kafka, we can label both the patient_text and the ADT diagnosis, then route the information appropriately for seamless integration and timely alerts.
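
As a rough sketch of that routing pattern (topic names, broker address, and message format are hypothetical, and authentication is omitted), a single consumer can subscribe to both topics and fold the two streams into one alert:

import json

from kafka import KafkaConsumer  # kafka-python

consumer = KafkaConsumer(
    "patient_text",     # hypothetical topic for inbound patient text messages
    "adt_diagnosis",    # hypothetical topic for ADT diagnosis events
    bootstrap_servers=["<broker>:9092"],
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for record in consumer:
    # The topic label tells us which source each record came from, so both
    # streams can feed one consolidated Care Manager alert
    print(f"[{record.topic}] care manager alert: {record.value}")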

Databricks Delta Live Tables (DLT) is a Databricks feature that enables real-time data ingestion through a structured streaming interface for continuously updated tables. This technology allows us to query and update tabular data in real time, and it serves as the final destination for our real-time data streams.

Security First

In any healthcare business, security is a top concern. Therefore, our Databricks environment and Kafka clusters are each hosted in their own VPC and communicate with each other over a private subnet via an AWS VPC peering connection. The API Gateway can interact with the MSK VPC only through a Lambda function, which requires a valid API key issued by Astrana Health. This ensures that only authorized entities, with the correct API key distributed by us, can interact with the system.

Apache Kafka & Databricks

One of the key achievements of this architecture is enabling our MSK Clusters (Kafka) to communicate seamlessly with our existing Databricks environment. In addition to the networking security measures previously described, we also had to configure specific policies to allow Databricks to access and utilize certain AWS services. For security reasons, we won’t share our exact policy, but you can view an example below:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

To enable Databricks to utilize this policy, we configured the Kafka consumer with specific parameters to successfully access the MSK Cluster via AWS IAM. The key configuration parameters are:

"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
  "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
  "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

The Kafka consumer we created is now capable of actively listening to a vendor-specific topic and is ready to read any new messages as they come in. This real-time data ingestion ensures that our systems are always up-to-date and responsive to the latest information.

HL7 Parser

With data successfully ingested into our Databricks environment, we needed a way to transform the HL7 messages into a format that meets our business needs. We decided to use an established HL7 Python package, as it allows us to write code that closely matches the structure of HL7 messages. Below is a code snippet of our parser and an example of how vendors define these variables:

data = {
    'facility_name': msg_parsed.segments('MSH')[0][3][0],  # MSH.3 - Sending Application
    'patient_name': str(msg_parsed['PID'][0][5][0][1]) + str(msg_parsed['PID'][0][5][0][0]),  # PID.5.1 - Family Name, PID.5.2 - Given Name
    'patient_id': str(msg_parsed['PID'][0][3][0][0]),  # PID.3 - ID
    'patient_dob': msg_parsed.segments('PID')[0][7][0],  # PID.7 - Date/Time of Birth
}
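
Below is a minimal sketch of how a msg_parsed object can be produced, assuming the python-hl7 package and an illustrative ADT message:

import hl7

# Illustrative ADT^A01 message; python-hl7 expects segments separated by carriage returns
raw_message = (
    "MSH|^~\\&|HOSP_ADT|GENERAL HOSPITAL|RCV_APP|RCV_FAC|20240725120000||ADT^A01|MSG00001|P|2.3\r"
    "PID|1||12345^^^MRN||DOE^JANE||19800101|F"
)

msg_parsed = hl7.parse(raw_message)
print(msg_parsed.segments('PID')[0][5])  # PID.5 - Patient Name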

With security and logic in place, we were now able to start ingesting data and stress-testing our new architecture. This ensured that our systems were robust and capable of handling the expected load, providing reliable and timely data processing for our healthcare operations.

Load Testing the Architecture

To test the robustness of the architecture, we sent multiple batches of 100,000 ADT messages to the API and measured the average time it took each message to reach the first table. Initially, we sent these messages using as many threads as possible, with no limit on the thread count.

However, we discovered that opening too many threads actually reduced the number of requests we could send per second. After fine-tuning the test, we successfully sent 100,000 ADT messages in approximately 30 minutes, with each message taking an average of 4 to 5 seconds to reach the first table from the time it was sent.
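
A simplified sketch of this kind of test harness is shown below; the endpoint, API key header, thread count, and message payloads are hypothetical:

import time
from concurrent.futures import ThreadPoolExecutor

import requests

API_URL = "https://api.example.com/adt"   # hypothetical endpoint
HEADERS = {"x-api-key": "<redacted>"}     # API key issued by Astrana Health

def send_message(adt_message: str) -> float:
    start = time.time()
    requests.post(API_URL, data=adt_message, headers=HEADERS, timeout=30)
    return time.time() - start

messages = ["MSH|^~\\&|..."] * 100_000    # pre-generated test ADT messages

# A bounded pool: in our tests, too many threads reduced requests per second
with ThreadPoolExecutor(max_workers=32) as pool:
    latencies = list(pool.map(send_message, messages))

print(f"average request latency: {sum(latencies) / len(latencies):.2f}s")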

Please note that the bottleneck in these tests was the cluster sending the 100,000 ADT messages, rather than the architecture receiving and processing them. Therefore, we expect the system to perform even faster in production!

Summarized

  • Scale Seamlessly: Kafka’s distributed architecture, AWS Lambda functions, and Delta Live Tables allow us to scale elastically to meet growing data demands without compromising performance or requiring significant engineering resources.
  • Integrate Anywhere: Currently, Databricks Delta Live Tables is the only client consuming from Kafka. However, we can build several other clients on top of the same Kafka topics without disrupting the flow to existing clients or applications (patient applications, provider applications, care management applications, utilization management applications, etc.).
  • Ensure Data Security: All data flows within our private VPC and is encrypted when stored in Kafka. Only trusted clients can access the data from Kafka, and only specified users can utilize the API via OAuth authentication.
  • React Instantly: With Delta Live Tables and additional monitoring set up through AWS CloudWatch, we can monitor the data from start to finish, pinpoint the exact location of any errors, and address them in real time.

Thinking about working with us? We’d love to chat! Take a look at our open positions here.

Introducing Our HEDIS Engine

Investing in Technology to Speed Up the Quality Improvement Cycle


Mon 08 Jan 2024


The Healthcare Effectiveness Data and Information Set (HEDIS®) by NCQA plays a central role in many Value-Based Care (VBC) operations. HEDIS is a set of measure definitions that track healthcare processes, outcomes, and patient satisfaction. Improving these measures, or “closing care gaps,” is vital to providing high-value care and is often associated with financial incentives.

However, Improving HEDIS® is More Challenging Than It Seems.

It is hard for many reasons, but in this article, let’s focus on the administrative-side challenges.

First, the HEDIS® logic definitions are not straightforward to implement. Healthcare data are inherently complex and messy; thus, to capture the true “quality,” the logic definitions must also identify the correct cohorts for the measurements and handle various exception cases.

Second, HEDIS® outputs often do not reflect the latest data. Most value-based care organizations calculate HEDIS® outputs quarterly, or monthly at best, stacking additional latency on top of the glacial claims adjudication process. As a result, the HEDIS® outputs show results from one to two months ago, or more. Such outdated (and hence inaccurate) results are one of the primary sources of provider abrasion in quality improvement initiatives.

Third, many new data sources are available to augment HEDIS®-related insights, but vendors are slow to adopt them. Traditionally, HEDIS® uses administrative data, i.e., claims, to calculate the outputs. However, the community is adopting more data sources, such as Electronic Health Records, additional evidence from community workers, and other social determinants of health (SDOH) data. Integrating these new data sources with existing vendor outputs often becomes a roadblock to further innovation in the area.

Finally, vendor outputs are not transparent enough to provide actionable insights. In many HEDIS® improvement projects, it is critical to understand why a particular care gap is closed or open. If the care gap is closed, we need to know which claim qualified for the gap closure and which doctor was involved. If it is open, we must develop appropriate outreach strategies, e.g., scheduling an appointment or following up with a PCP. However, most HEDIS® vendors fail to provide that level of granularity.

We at Astrana Health believe accurate, reliable, and timely updates of HEDIS® measures are prerequisites for providing high-quality patient care. Based on the technical challenges above, and as a leading VBC provider organization, we embarked on building our own HEDIS® engine in 2022. Since then, we have received NCQA certifications for our HEDIS® engine and have been working hard to tackle the current HEDIS® challenges. We want to share one of our exciting new developments in this article.

The diagram below shows our current HEDIS® engine and data pipeline:

Let’s Build a Fast and Transparent HEDIS® Engine!

When implementing the HEDIS® logic in 2022, we decided to use Python, a general-purpose programming language. Having a Python-based HEDIS® engine gave us a lot of flexibility, such as the ability to run the engine on various platforms. At first, we ran the engine on an Ubuntu Virtual Machine (VM). It was already faster than any other vendor on the market and ran reliably. However, we wanted to push the envelope further. In 2023, we transitioned our quality engine and data to our Databricks Delta Lake. With the shift to Databricks, we achieved a 5x runtime improvement over our previous approach. Our HEDIS® engine runs over a million members through 20+ measures for two measurement years in roughly 2.5 hours! Frankly, we are thrilled with the result. With this setup, we can:

  • consolidate a complicated process into a single tool
  • minimize runtime
  • provide enhanced transparency to our stakeholders
  • save money

Scaling Is Trivial with PySpark

Our first implementation on an Ubuntu VM extracted HEDIS® inputs as JSON documents from an on-prem SQL server. The VM was costly (running 24×7), with 16 CPUs to support Python pool-based parallelization.

With PySpark, we register a Spark User-Defined Function (UDF) and rely on the framework to manage parallelization. This modification yielded significant performance benefits, and the resulting code is more straightforward to read and to explain to teammates. With the ability to trivially scale clusters as needed, our implementation can support the needs of a growing business. Moreover, with Databricks, you pay only for what you use: we reduced compute costs by at least half by transitioning to Databricks Jobs clusters.

Parallel Processing Before…
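
As an illustrative sketch of the earlier pool-based approach (function, file, and field names are hypothetical):

import json
from multiprocessing import Pool

def run_measures(member: dict) -> dict:
    # ...evaluate the HEDIS measure logic for a single member (hypothetical)...
    return {"member_id": member["member_id"], "measures": {}}

if __name__ == "__main__":
    with open("hedis_inputs.json") as f:
        member_documents = json.load(f)   # per-member JSON extracted from the on-prem SQL Server
    with Pool(processes=16) as pool:      # one worker per CPU on the always-on VM
        results = pool.map(run_measures, member_documents)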

Parallel Processing After…
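
And an illustrative sketch of the PySpark version, where the same per-member logic is wrapped in a UDF and Spark manages the parallelization (table, column, and function names are hypothetical):

import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

def run_measures(member_json: str) -> str:
    member = json.loads(member_json)
    # ...evaluate the HEDIS measure logic for a single member (hypothetical)...
    return json.dumps({"member_id": member["member_id"], "measures": {}})

run_measures_udf = udf(run_measures, StringType())

members = spark.table("quality.hedis_member_inputs")   # hypothetical Delta table
results = members.withColumn("hedis_output", run_measures_udf("member_json"))
results.write.mode("overwrite").saveAsTable("quality.hedis_member_outputs")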

Traceability Is Enabled by Default

As data practitioners, we are frequently asked by stakeholders to explain unexpected changes. While cumbersome, this task is critical to retaining users’ confidence and ensuring they trust our data. If we’ve performed our job well, changes to measures reflect variance in the underlying data distributions. Other times, we make mistakes. Either way, we are accountable for understanding the source of a change.

A screenshotted measure. “Why did this change?! It was 52% last Thursday”

In the Databricks Delta Lake, the ability to revert to previous versions of a dataset is available by default. With a Delta table, we can quickly inspect an earlier version of a member’s record to debug an issue.
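
As a minimal sketch, assuming a hypothetical member-level results table, Delta time travel lets us list prior versions and query one directly:

# List prior versions of a hypothetical member-level results table
spark.sql("DESCRIBE HISTORY quality.member_measure_results").show()

# Inspect a member's record as it looked at an earlier version
previous = spark.sql("""
    SELECT *
    FROM quality.member_measure_results VERSION AS OF 42
    WHERE member_id = '12345'
""")
previous.show()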

We have also enabled change data capture on aggregate tables (provider level, provider group level), which automatically tracks detail-level changes. This capability allows us to reproduce how a rate changes over time.
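
A minimal sketch of reading that change data feed, assuming a hypothetical aggregate table with delta.enableChangeDataFeed set to true:

changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 10)   # illustrative starting version
    .table("quality.provider_group_rates")
)

# _change_type, _commit_version, and _commit_timestamp show how each rate moved over time
changes.select("provider_group_id", "rate", "_change_type", "_commit_version", "_commit_timestamp").show()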

Reliable Pipelines with Standardized Formats

Our quality performance estimates are a key driver for developing a comprehensive patient data repository. Rather than learning of poorly controlled diabetes once a month via health plan reports, at ApolloMed we ingest HL7 lab feeds daily and update our HEDIS® outputs accordingly. In the long term, setting up reliable data pipelines on raw, standardized data sources will facilitate broader use cases, e.g., custom quality measures and machine learning models trained on comprehensive patient records using FHIR bundles from HIEs. As Micky Tripathi marches on and U.S. interoperability improves, we are cultivating the internal capacity to ingest raw data sources as they become available.

Enhanced Transparency

We coded the ApolloMed quality engine to provide the actionable details we’ve always wanted as consumers of quality reports. Which lab met the numerator for this measure? Why was this member excluded from a denominator? This transparency helps report consumers understand the measures they’re accountable to and facilitates gap closures.

Interested in learning more?

The Astrana Health quality engine, with 20+ HEDIS® Certified Measures, is now available for deployment in your Databricks environment or any Python-runnable environment. To learn more, please review our Databricks solution accelerator or reach out to da_sales@astranahealth.com for further details.

In this article, we shared our journey of building a more accurate, reliable, and timely HEDIS® engine to provide high-quality patient care. We thank everyone who was involved in this project. Let’s keep pushing the envelope of value-based care analytics!