Hidden costs of streaming

TL;DR

The shift to real-time data processing, facilitated by cloud platforms like Azure Databricks, allows businesses to react instantaneously to dynamic data streams, eliminating traditional scheduling constraints. However, this setup comes with hidden costs, primarily related to storage operations and metadata overhead, which can become significant as data volumes and file counts grow. To mitigate these costs, you can increase the trigger (batch) interval of the stream and use the Delta table features that address the small file problem: setting logRetentionDuration and deletedFileRetentionDuration, and running VACUUM and OPTIMIZE.

Introduction

The ability to process data in real time fascinates many of us. It redefines how we interact with and utilize information. Imagine a data engineer as a modern-day plumber, not working with pipes and water, but with streams of data—creating intricate pipelines that allow data to flow seamlessly from its source to its destination. Just as a plumber ensures the steady flow of water, the data engineer ensures a continuous, reliable stream of information, ready to be consumed by a thirsty business user or data-consuming model/machine.

In this ideal setup, the need for traditional scheduling becomes obsolete. Instead of relying on predetermined times for data processing, the rate of flow is dictated by the natural change in the data itself. This allows businesses to respond to events as they happen, whether it's customer behaviour, market fluctuations, or operational data. The pipeline becomes a living system, adjusting dynamically, ensuring that insights and actions are based on the freshest data possible—giving companies a real-time pulse on their world.

Selling this ideal scenario is easy. Until they ask the question.

[Image: data-engineer-plumber]

'How much is this going to cost us?'

Unlike a lead pipe (in reality it should probably be PVC, but let's stick with the romantic notion of an old-school plumber carefully crafting a flow system from solid materials), a data pipeline is powered by an engine that requires constant energy and regular maintenance. Running this engine 24/7 can be expensive, and keeping it running smoothly demands frequent upkeep.

Fortunately, with cloud computing and the scalable infrastructure that platforms like Azure Databricks provide, many of these costs become predictable and manageable. The flexibility to scale up or down based on demand allows businesses to optimize resource use and only pay for what they need, minimizing unnecessary expenses. But there are still hidden variables in the equation—some costs that remain less predictable.

These could include unexpected surges in data volume, system complexities that arise from new integrations, or performance bottlenecks that require unplanned optimizations. While cloud technology offers a solid framework for managing data pipelines, it’s the unforeseen factors that can still challenge even the most finely tuned systems.

Below is a first-hand testimony of setting up a streaming platform in Azure Databricks.

Streaming testimony

As data engineers building a streaming platform, we were tasked with setting up a real-time data processing pipeline that could handle a large volume of data with low latency. We chose Azure Databricks as the core of the architecture and an Azure Storage Account as the main storage for data, a combination that had already proved extremely valuable in many previous projects. Data for this project comes from an on-premises SAP server and is streamed into a Lakehouse in Databricks using Fivetran.

Challenge

The challenge was to process data in real time from the source to the data lake, ready for reporting. The main source in this case provides a semi-real-time data stream with new rows every few seconds. We needed to ensure scalability, resilience, and accurate results from this complex data environment, which required both batch and streaming capabilities.

Given that the data was already being processed and stored within Databricks as Delta tables, we decided to create a Databricks view for each streaming source. This allowed us to pull data from the source with consistency and flexibility, applying some adaptations defined in the view: adding business keys, renaming columns to meaningful names, and casting where needed.

Setting Up the Streaming Platform using Spark Structured Streaming

1. Creating the Databricks View:

CREATE OR REPLACE VIEW catalog.bronze.vw_sap4hana_cabn
AS SELECT
    ATINN AS BK_ATINN
    ,ATNAM
    ,ingestion_timestamp
    ,deleted     
FROM catalog.bronze.sap4hana_cabn   

2. Ingesting Data from the View: Instead of pulling raw data from (multiple) tables, we used the view as the source for the streaming process. The view picks up new data whenever a REFRESH command is executed.

With the 'ignoreChanges' option, changes in the source are re-emitted downstream as appended rows.

df = spark.readStream\
        .option("ignoreChanges", "true")\
        .table("catalog.bronze.vw_sap4hana_cabn")

3. Processing the data:

First, the view is refreshed. Then, a MERGE statement is executed within a foreachBatch function. This function gives each micro-batch access to the entire target table, so matching rows can be found before performing updates or inserts.

def merge_data(df, batch_id):
    # Expose the micro-batch as a temp view so it can be referenced in SQL.
    df.createOrReplaceTempView("cabn_bronze")

    sql_query = """
      MERGE INTO catalog.silver.sap4hana_cabn c
      USING cabn_bronze m
      ON c.BK_ATINN = m.BK_ATINN
      WHEN MATCHED THEN
        UPDATE SET
          ATNAM = m.ATNAM,
          ingestion_timestamp = m.ingestion_timestamp,
          deleted = m.deleted
      WHEN NOT MATCHED THEN
        INSERT *
      """

    # Refresh the source view, then upsert the micro-batch into the silver table.
    df._jdf.sparkSession().sql("REFRESH TABLE catalog.bronze.vw_sap4hana_cabn")
    df._jdf.sparkSession().sql(sql_query)

4. Writing the Processed Data:

(df.writeStream
  .format('delta')
  .foreachBatch(merge_data)
  .outputMode('update')
  .start())

5. Monitoring and Autoscaling: Azure Databricks' autoscaling clusters automatically adjust based on the stream load, ensuring that the system can handle fluctuating data volumes without manual intervention.
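
As an illustration, the autoscale range is part of the cluster definition itself. Below is a minimal sketch using the Databricks Clusters REST API; the workspace URL, token, runtime version, node type and worker counts are all illustrative assumptions, not values from our setup.

# Minimal sketch: create an autoscaling cluster via the Databricks Clusters API.
# All values below are illustrative; replace them with your own.
import requests

workspace_url = "https://<your-workspace>.azuredatabricks.net"  # hypothetical workspace
token = "<personal-access-token>"                               # hypothetical token

cluster_spec = {
    "cluster_name": "streaming-cluster",
    "spark_version": "14.3.x-scala2.12",                 # example Databricks runtime
    "node_type_id": "Standard_DS3_v2",                   # example Azure VM size
    "autoscale": {"min_workers": 1, "max_workers": 4},   # Databricks scales within this range
}

response = requests.post(
    f"{workspace_url}/api/2.0/clusters/create",
    headers={"Authorization": f"Bearer {token}"},
    json=cluster_spec,
)
response.raise_for_status()
print(response.json())  # contains the new cluster_id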

Hidden costs started popping up

Initially, the Azure Databricks streaming platform ran smoothly and effectively met our real-time processing needs. However, over time, we noticed some unexpected costs emerging—not from the compute resources or cluster operations, but from storage account expenses. Specifically, costs categorized under "other operations" began to spike, and after investigation, we found the root cause to be metadata operations related to storage access.

~ Nerdy explanation

In the Azure setup, Databricks accesses Azure Data Lake Storage (ADLS) through the Hadoop FileSystem API (the ABFS driver) and communicates with it constantly. For each query or write operation, Databricks performs numerous metadata operations, such as getPathStatus requests, to read Delta tables. These operations check the status of the files in the underlying data lake, which leads to frequent API calls.

Over time, as the volume of data increases and more files are created (particularly small files), these metadata requests surge, resulting in higher costs. The category "other operations" in the Azure billing is directly related to this. Small files, in particular, exacerbate the problem, as each small file requires individual metadata reads and status checks.

Here's a breakdown of why this became problematic:

  1. Metadata Overhead: Every small file requires multiple API calls to the storage account to perform file status checks. Operations like getPathStatus are used to check the existence and state of files, and these requests accumulate as the number of small files increases.
  2. Frequent Access: Our streaming platform continuously accessed the data lake to read and process data. For each read, Spark and Databricks needed to resolve file paths and metadata, which translated into additional API calls.
  3. Cost of ‘Other Operations’: In Azure, "other operations" in storage costs are often driven by these metadata requests. These operations include listing directories, checking file statuses, and interacting with the storage account in ways that go beyond simple data retrieval.
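
To get a feel for how many files sit behind a Delta table, and therefore how many status checks a single read can trigger, the file count and total size can be inspected. A minimal sketch, reusing the silver table from the example above:

# Inspect the number and average size of the data files backing a Delta table.
detail = spark.sql("DESCRIBE DETAIL catalog.silver.sap4hana_cabn").collect()[0]

num_files = detail["numFiles"]
size_bytes = detail["sizeInBytes"]
avg_file_mb = (size_bytes / num_files) / (1024 * 1024) if num_files else 0

print(f"files: {num_files}, average file size: {avg_file_mb:.1f} MiB")
# Many files with a small average size means many metadata calls per read.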

A solution to hidden storage costs

After analyzing the situation, we realized the best way to reduce costs was to minimize the number of small files in our Delta Lake. The goal was to reduce the frequency of metadata operations while still maintaining the performance and (near) real-time capabilities of our streaming platform.

Here’s how we tackled the issue.

1. Adjusting batch interval:

We increased the trigger interval for our streaming job, allowing the system to accumulate data over a longer period before writing to storage. This resulted in fewer but larger files rather than many small files.
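
For reference, the trigger interval is set on the write side of the stream. A minimal sketch, reusing the merge_data function from above; the five-minute interval and checkpoint path are illustrative:

# Accumulate five minutes of changes per micro-batch before merging.
(df.writeStream
   .foreachBatch(merge_data)
   .outputMode('update')
   .trigger(processingTime='5 minutes')                             # longer interval, fewer/larger files
   .option('checkpointLocation', '/tmp/checkpoints/sap4hana_cabn')  # hypothetical path
   .start())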

2. OPTIMIZE, VACUUM, logRetentionDuration and deletedFileRetentionDuration

Delta Lake has built-in features to address the issue of small files, including but not limited to VACUUM, logRetentionDuration, deletedFileRetentionDuration, and OPTIMIZE.

Delta Lake stores data in Parquet files and tracks them using JSON log files in the _delta_log folder. Both data and log files can accumulate over time. The retention periods for these files are controlled by the Delta settings deletedFileRetentionDuration (for data files) and logRetentionDuration (for log files). Data files are never deleted automatically, while log files older than the retention period are cleaned up whenever a checkpoint is written. Running VACUUM periodically removes data files that are no longer referenced by the table and have passed the retention period. Below is an example of how we set both retention settings to one day.

ALTER TABLE catalog.silver.sap4hana_cabn
  SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 1 days',
    'delta.deletedFileRetentionDuration' = 'interval 1 days'
  );
VACUUM catalog.silver.sap4hana_cabn;

Additionally, active data files can be rewritten into optimally sized files—and optionally sorted—using the OPTIMIZE command. Running OPTIMIZE and VACUUM regularly is considered best practice and effectively addresses the small file issue.
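
As an illustration, both maintenance commands can be scheduled from a notebook or job. The sketch below reuses the silver table from earlier; the Z-ORDER column is just an example of an optional sort key, not something our setup requires.

# Compact small files into larger ones, optionally co-locating rows on a frequently filtered column.
spark.sql("OPTIMIZE catalog.silver.sap4hana_cabn ZORDER BY (BK_ATINN)")

# Remove data files that are no longer referenced and are older than the retention period set above.
spark.sql("VACUUM catalog.silver.sap4hana_cabn")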