Supercharging Your Big Data Analysis: Harnessing the Power of Databricks and Marketplace Models with Mistral

More than a year ago OpenAI released ChatGPT, and the entire world seems to be using it. Since then they have released several more models, and many people pay a monthly fee for premium features. However, OpenAI models all sit behind a REST API, and sooner or later you, as an AI developer, will run into its limitations. Whether you use the public OpenAI endpoints or Azure OpenAI endpoints, the models are exposed as a REST API in both flavors, and the scaling limitations are the same.

Now don't get me wrong: for certain use cases I still believe the standard OpenAI models are very useful and even cheaper than open source models. Why? Because you don't have to host the model yourself and the setup is pretty straightforward.

However, for some use cases where you need to use an LLM with big data, OpenAI won't work for you. For the sake of this post, imagine the following scenario: your client ACME CORP sends you a big zip file with 1 TB of text containing all the emails received at support@acmecorp.com over the last few years, and they want to know what their customers complain about most.

With OpenAI, doing it for one email is easy: you add the text of the email to a prompt that says something like this: Classify the following email sent to our customer email address. [email]

But looping over 10M rows with OpenAI is a no-go: each request takes 1-3 seconds at minimum. Now do the math and tell me when it will finish.
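To make "do the math" concrete, here is a minimal sketch that converts the figures above (10M rows, 1 to 3 seconds per request) into days of strictly sequential processing. The function name is just an illustration, and the estimate ignores retries and rate limiting, which would only make it worse.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def sequential_days(num_requests: int, seconds_per_request: float) -> float:
    """Days needed to send num_requests one after another."""
    return num_requests * seconds_per_request / SECONDS_PER_DAY


rows = 10_000_000
for latency in (1.0, 3.0):
    days = sequential_days(rows, latency)
    print(f"{latency:.0f} s per request -> {days:.1f} days")
```

In other words, a plain loop would run for somewhere between a few months and almost a year.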

You might think that with Python you can parallelize the work on a big compute and use as many cores as possible. Well, I tested this hypothesis myself using Python's standard ThreadPoolExecutor, and the speed of the responses didn't get any better. Remember, there is an API there, right? OpenAI and Azure have built rate limit protections at the network level, so at some point you will hit this limit regardless of how big your VM is, and then you will be forced to wait several seconds before you can send another request.
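A simplified way to picture this is a toy model in which throughput grows with the number of workers only until it reaches the API's rate limit. The sketch below is an assumption for illustration: the rate limit of 10 requests per second and the 2 second latency are hypothetical numbers, not values published by OpenAI or Azure.

```python
def effective_throughput(workers: int,
                         seconds_per_request: float,
                         rate_limit_per_second: float) -> float:
    """Requests per second: what the workers could send, capped by the API rate limit."""
    client_side = workers / seconds_per_request
    return min(client_side, rate_limit_per_second)


# Hypothetical figures: 2 s per request, API capped at 10 requests per second.
for workers in (1, 8, 64, 512):
    print(workers, "workers ->", effective_throughput(workers, 2.0, 10.0), "requests/s")
```

Past the point where the cap is reached, adding more threads or a bigger VM changes nothing in this model.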

So, how do we solve this?

The power of Databricks, Marketplace models and model serving

Recently Databricks has added models such as Llama, Mistral, MPT and others to the Marketplace. The good news is that you can use these models in your Databricks environment with your own data, leverage GPU-backed VM sizes when required, and host the models easily with model serving.

Does this sound like an interesting topic? Stay with me, let's go through all the steps together.

Mistral

For this blog post we will use Mistral from the Marketplace. There are 2 ways to consume a model in Databricks: you can create an endpoint and load the model there, or you can use the model directly in your own code. For simplicity, let's focus on the first one.

Step 1: Install Dependencies

To create and query the model serving endpoint, Databricks recommends installing the newest Databricks SDK for Python.

 

# Upgrade to use the newest Databricks SDK
%pip install --upgrade databricks-sdk
# Install the dependencies for batch inference
%pip install --upgrade "transformers>=4.34.0"
dbutils.library.restartPython()

Step 2: Choose the Right Instance Type

Creating and querying a serving endpoint doesn't require a specific runtime version or GPU instance type, but for batch inference Databricks suggests the following:

  • Databricks Runtime for Machine Learning version 14.2 or greater
  • Recommended instance types:

Model Name               | Suggested instance type (AWS) | Suggested instance type (Azure) | Suggested instance type (GCP)
mistral_7b_instruct_v0_1 | g5.8xlarge                    | Standard_NV36ads_A10_v5         | g2-standard-4

Selecting the appropriate instance type is crucial for efficient utilization of resources. Depending on your cloud provider, choose the suggested instance type that aligns with the Mistral model you're using (e.g., Standard_NV36ads_A10_v5 for Azure).

 

Step 3: Initialize some variables

Here we define which model to use, which version, and where it comes from (the Marketplace). We also set the workload type.

# Select the model from the dropdown list
model_names = ['mistral_7b_instruct_v0_1']
dbutils.widgets.dropdown("model_name", model_names[0], model_names)

# Default catalog name when installing the model from Databricks Marketplace.
# Replace with the name of the catalog containing this model
catalog_name = "databricks_mistral_models"

# You should specify the newest model version to load for inference
version = "1"
model_name = dbutils.widgets.get("model_name")
model_uc_path = f"{catalog_name}.models.{model_name}"
endpoint_name = f'{model_name}_marketplace'

# Choose the right workload types based on the model size
workload_type = "GPU_LARGE"

Step 4: Create the model serving endpoint or reuse

If you haven't created the endpoint yet, you can create it with the code below. If your endpoint already exists, use the get method instead. Both lines are provided below.

import datetime

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput
w = WorkspaceClient()

config = EndpointCoreConfigInput.from_dict({
    "served_models": [
        {
            "name": endpoint_name,
            "model_name": model_uc_path,
            "model_version": version,
            "workload_type": workload_type,
            "workload_size": "Small",
            # Use a real boolean: the string "False" is truthy in Python
            "scale_to_zero_enabled": False,
        }
    ]
})

# Create the endpoint and wait until it is ready (use this if the endpoint does not exist yet)
model_details = w.serving_endpoints.create(name=endpoint_name, config=config).result(timeout=datetime.timedelta(minutes=60))
# If the endpoint already exists, comment out the line above and use get instead
#model_details = w.serving_endpoints.get(name=endpoint_name)
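Instead of commenting lines in and out, the create-or-reuse decision can also be made in code. The helper below is a sketch, not part of the Databricks SDK: get_or_create_endpoint is a hypothetical name, and it assumes a client object that exposes serving_endpoints.list(), .get() and .create() the way WorkspaceClient does.

```python
import datetime


def get_or_create_endpoint(client, name, config, timeout_minutes=60):
    """Return the existing endpoint called `name`, or create it and wait until it is ready.

    `client` is assumed to behave like databricks.sdk.WorkspaceClient.
    """
    # Collect the names of the endpoints that already exist in the workspace
    existing_names = {endpoint.name for endpoint in client.serving_endpoints.list()}
    if name in existing_names:
        return client.serving_endpoints.get(name=name)

    # create() returns a waiter; result() blocks until the endpoint is ready or the timeout expires
    waiter = client.serving_endpoints.create(name=name, config=config)
    return waiter.result(timeout=datetime.timedelta(minutes=timeout_minutes))


# Usage with the variables defined earlier (needs a real workspace):
# model_details = get_or_create_endpoint(w, endpoint_name, config)
```

Listing the endpoints first avoids relying on a specific exception type when the endpoint is missing.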

Step 5: Use the model from the service endpoint

Define your dataset, then query the endpoint by name with WorkspaceClient. Very straightforward.

from databricks.sdk import WorkspaceClient
import re
# Change it to your own input
dataframe_records = [
    {"prompt": "You are a helpful assistant which only task is to analyze the text of emails sent to a customer service mailbox.  From this text you must extract a category.  No category labels will be provided. Only return the category. Email: Dear Customer Support   I am writing to inquire about the status of my recent order (Order #12345). I placed the order two weeks ago and have not received any updates on the shipping or delivery. Can you please provide me with an update on when I can expect to receive my order?  Thank you for your prompt attention to this matter.  Sincerely ", "max_tokens": 512}
]

w = WorkspaceClient()
response = w.serving_endpoints.query(
    name="mistral_7b_instruct_v0_1_marketplace",
    dataframe_records=dataframe_records,
)

if response.predictions and 'candidates' in response.predictions[0]:
    category_text = response.predictions[0]['candidates'][0]['text']
    match = re.search(r'\[Category: (.+?)\]', category_text)

    if match:
        category_name = match.group(1)
        print(f"Category: {category_name}")
    else:
        print("No category found in the response.")
else:
    print("Category not found in the response.")
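The parsing above only succeeds when the model wraps its answer as [Category: ...]. Since the prompt asks the model to return only the category, a slightly more forgiving parser can be useful. The sketch below keeps the same regular expression and, as an assumption of mine, falls back to the first non-empty line of the reply; extract_category is a hypothetical helper name.

```python
import re
from typing import Optional


def extract_category(text: str) -> Optional[str]:
    """Return the category from a model reply, or None if the reply is empty."""
    # Preferred form: the category wrapped as [Category: ...]
    match = re.search(r"\[Category:\s*(.+?)\]", text)
    if match:
        return match.group(1).strip()
    # Fallback (assumption): treat the first non-empty line as the category
    for line in text.splitlines():
        if line.strip():
            return line.strip()
    return None


print(extract_category("[Category: Order Status]"))
print(extract_category("\n Order Status \n"))
```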

Across several tests, inference for this request took about 0.12 seconds.

Step 6: Batch Inference with the endpoint

Now you have a serving endpoint with Mistral and you have tested it with one prompt. What if we test it with lots of prompts in one request? This is not parallelization: we are just sending one giant request and receiving one giant response. The following code loads a dataframe with 1000 rows, then tests different batch sizes (request sizes) and prints the average time per record for each batch size. For ease of use, I am limiting the test to a maximum of 50 records from the CSV per request, with batch sizes from 1 to 50. When I tested batch sizes of 100 and 200, I got the same average of 0.12 seconds per row. With a batch size of 500 I got a timeout, which suggests the request is too big for the serving endpoint :)
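The batching idea itself is independent of Databricks: split the list of records into consecutive chunks and send each chunk as one request. Here is a minimal sketch; chunk_records is a hypothetical helper name and the sample records are made up.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunk_records(records: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(records), batch_size):
        yield list(records[start:start + batch_size])


# Five made-up records split into requests of two records each
records = [{"prompt": f"email {i}", "max_tokens": 512} for i in range(5)]
for batch in chunk_records(records, 2):
    print(len(batch), "records in this request")
```

Keeping the chunk size below the point where the endpoint times out is what makes larger datasets workable.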

6.A No parallelization

import pandas as pd
from databricks.sdk import WorkspaceClient
import time

def read_csv_file(csv_file_path):
    """
    Reads a CSV file into a pandas DataFrame.

    Parameters:
        csv_file_path (str): Path to the CSV file.

    Returns:
        pd.DataFrame: DataFrame containing the CSV data.
    """
    return pd.read_csv(csv_file_path)

def add_additional_text(data_frame, additional_text):
    """
    Adds additional text to the 'Body' column of a DataFrame.

    Parameters:
        data_frame (pd.DataFrame): DataFrame to modify.
        additional_text (str): Additional text to append to 'Body'.

    Returns:
        pd.DataFrame: Modified DataFrame.
    """
    data_frame['Body'] = additional_text + data_frame['Body']
    return data_frame

def convert_to_data_records(data_frame, text_column_name):
    """
    Converts a DataFrame into a list of records suitable for serving endpoint.

    Parameters:
        data_frame (pd.DataFrame): DataFrame to convert.
        text_column_name (str): Name of the column containing text data.

    Returns:
        list: List of records.
    """
    data_records = []
    for index, row in data_frame.iterrows():
        record = {
            "prompt": row[text_column_name],
            "max_tokens": 512  # You can adjust this as needed
        }
        data_records.append(record)
    return data_records

def query_serving_endpoint(endpoint_name, data_records):
    """
    Queries a serving endpoint with the provided data records.

    Parameters:
        endpoint_name (str): Name of the serving endpoint.
        data_records (list): List of records to query.

    Returns:
        tuple: Tuple containing response and elapsed time.
    """
    w = WorkspaceClient()
    start_time = time.time()
    response = w.serving_endpoints.query(name=endpoint_name, dataframe_records=data_records)
    end_time = time.time()
    elapsed_time = end_time - start_time
    return response, elapsed_time

def test_performance(csv_file_path, endpoint_name, max_records_per_batch, text_column_name, additional_text):
    """
    Tests the performance of a serving endpoint with varying batch sizes.

    Parameters:
        csv_file_path (str): Path to the CSV file.
        endpoint_name (str): Name of the serving endpoint.
        max_records_per_batch (int): Maximum number of records per batch.
        text_column_name (str): Name of the column containing text data.
        additional_text (str): Additional text to append to 'Body'.
    """
    df = read_csv_file(csv_file_path)
    df = add_additional_text(df, additional_text)

    batch_sizes = [1, 2, 5, 10, 25, 50]
    best_batch_size = None
    best_average_time_per_record = float('inf')  # Initialize with infinity for comparison

    for batch_size in batch_sizes:
        print(f"\nTesting with batch size: {batch_size}")

        # Split the DataFrame into batches of batch_size rows, never exceeding max_records_per_batch
        rows_per_batch = min(batch_size, max_records_per_batch)
        batches = [df.iloc[i:i+rows_per_batch] for i in range(0, len(df), rows_per_batch)]

        total_elapsed_time = 0
        total_records_processed = 0

        for batch in batches:
            data_records = convert_to_data_records(batch, text_column_name)
            response, elapsed_time = query_serving_endpoint(endpoint_name, data_records)
  
            total_elapsed_time += elapsed_time
            total_records_processed += len(batch)

        average_time_per_record = total_elapsed_time / total_records_processed
        print(f"\nAverage time per record (batch size {batch_size}): {average_time_per_record:.4f} seconds")

        # Update the best batch size if the current one is better
        if average_time_per_record < best_average_time_per_record:
            best_batch_size = batch_size
            best_average_time_per_record = average_time_per_record

    print(f"\nBest Batch Size: {best_batch_size}, Best Average Time per Record: {best_average_time_per_record:.4f} seconds")

if __name__ == "__main__":
    text_column_name = "Body"
    csv_file_path = 'file:/Workspace/Users/luis.valencia@element61.be/.ide/BigDataParallelAPIProcessing-0fc9a8d0/processed_data.csv'
    endpoint_name = "mistral_7b_instruct_v0_1_marketplace"
    max_records_per_batch = 50
    additional_text = "You are a helpful assistant which only task is to analyze the text of emails sent to a customer service mailbox.  From this text you must extract a category.  No category labels will be provided. Email: "

    test_performance(csv_file_path, endpoint_name, max_records_per_batch, text_column_name, additional_text)


The results are below. In this run the average time per record was practically identical for every batch size, but it is a lot better than OpenAI, where each request takes around 3 seconds on average for this specific prompt size.

Results

  • Testing with batch size: 1
  • Average time per record (batch size 1): 0.1295 seconds
  • Testing with batch size: 2
  • Average time per record (batch size 2): 0.1300 seconds
  • Testing with batch size: 5
  • Average time per record (batch size 5): 0.1298 seconds
  • Testing with batch size: 10
  • Average time per record (batch size 10): 0.1295 seconds
  • Testing with batch size: 25
  • Average time per record (batch size 25): 0.1294 seconds
  • Testing with batch size: 50
  • Average time per record (batch size 50): 0.1294 seconds
  • Best Batch Size: 50, Best Average Time per Record: 0.1294 seconds

 

6.B Parallelize requests with the standard Python library

I made some changes to the code to parallelize the requests; see the code below.

import pandas as pd
from databricks.sdk import WorkspaceClient
import time
import concurrent.futures

def read_csv_file(csv_file_path):
    return pd.read_csv(csv_file_path)

def add_additional_text(data_frame, additional_text):
    data_frame['Body'] = additional_text + data_frame['Body']
    return data_frame

def convert_to_data_records(data_frame, text_column_name):
    data_records = []
    for index, row in data_frame.iterrows():
        record = {
            "prompt": row[text_column_name],
            "max_tokens": 512  # You can adjust this as needed
        }
        data_records.append(record)
    return data_records

def query_serving_endpoint(endpoint_name, data_records):
    w = WorkspaceClient()
    start_time = time.time()
    response = w.serving_endpoints.query(name=endpoint_name, dataframe_records=data_records)
    end_time = time.time()
    elapsed_time = end_time - start_time
    return response, elapsed_time

def process_batch(batch, endpoint_name, text_column_name):
    data_records = convert_to_data_records(batch, text_column_name)
    response, elapsed_time = query_serving_endpoint(endpoint_name, data_records)
    return elapsed_time, len(batch)

def test_performance_parallel(csv_file_path, endpoint_name, max_records_per_batch, text_column_name, additional_text):
    df = read_csv_file(csv_file_path)
    df = add_additional_text(df, additional_text)

    batch_sizes = [1, 2, 5, 10, 25, 50]
    best_batch_size = None
    best_average_time_per_record = float('inf')  # Initialize with infinity for comparison

    for batch_size in batch_sizes:
        print(f"\nTesting with batch size: {batch_size}")

        # Split the DataFrame into batches of batch_size rows, never exceeding max_records_per_batch
        rows_per_batch = min(batch_size, max_records_per_batch)
        batches = [df.iloc[i:i+rows_per_batch] for i in range(0, len(df), rows_per_batch)]

        total_elapsed_time = 0
        total_records_processed = 0

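        # Note: each elapsed_time is the latency of a single request, and the requests overlap in time,
        # so the sum computed below is not the wall-clock duration of the parallel run
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(executor.map(lambda b: process_batch(b, endpoint_name, text_column_name), batches))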

        for elapsed_time, num_records in results:
            total_elapsed_time += elapsed_time
            total_records_processed += num_records

        average_time_per_record = total_elapsed_time / total_records_processed
        print(f"\nAverage time per record (batch size {batch_size}): {average_time_per_record:.4f} seconds")

        # Update the best batch size if the current one is better
        if average_time_per_record < best_average_time_per_record:
            best_batch_size = batch_size
            best_average_time_per_record = average_time_per_record

    print(f"\nBest Batch Size: {best_batch_size}, Best Average Time per Record: {best_average_time_per_record:.4f} seconds")

if __name__ == "__main__":
    text_column_name = "Body"
    csv_file_path = 'file:/Workspace/Users/luis.valencia@element61.be/.ide/BigDataParallelAPIProcessing-0fc9a8d0/processed_data.csv'
    endpoint_name = "mistral_7b_instruct_v0_1_marketplace"
    max_records_per_batch = 50
    additional_text = "You are a helpful assistant which only task is to analyze the text of emails sent to a customer service mailbox.  From this text you must extract a category.  No category labels will be provided. Email: "

    test_performance_parallel(csv_file_path, endpoint_name, max_records_per_batch, text_column_name, additional_text)


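Results didn't improve. In fact, the average per row roughly doubled. Keep in mind that this figure adds up the latency of requests that ran concurrently, so it shows that each request got slower under parallel load, not how long the whole run took. In this test, parallelizing on the client didn't make the endpoint answer any faster.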

  • Testing with batch size: 1
  • Average time per record (batch size 1): 0.2517 seconds
  • Testing with batch size: 2
  • Average time per record (batch size 2): 0.2455 seconds
  • Testing with batch size: 5
  • Average time per record (batch size 5): 0.2515 seconds
  • Testing with batch size: 10
  • Average time per record (batch size 10): 0.2443 seconds
  • Testing with batch size: 25
  • Average time per record (batch size 25): 0.2508 seconds
  • Testing with batch size: 50
  • Average time per record (batch size 50): 0.2538 seconds
  • Best Batch Size: 10, Best Average Time per Record: 0.2443 seconds
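When requests run in parallel, the sum of the individual request times and the wall-clock time of the whole run are two different numbers. The sketch below illustrates the difference with a stand-in for the endpoint call: fake_request simply sleeps, and the request count, sleep duration and worker count are arbitrary assumptions.

```python
import concurrent.futures
import time


def fake_request(seconds: float) -> float:
    """Stand-in for an endpoint call: sleeps, then returns its own latency."""
    start = time.perf_counter()
    time.sleep(seconds)
    return time.perf_counter() - start


def measure(num_requests: int, seconds_per_request: float, workers: int):
    """Return (sum of per-request latencies, wall-clock time of the whole run)."""
    wall_start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        latencies = list(executor.map(fake_request, [seconds_per_request] * num_requests))
    wall_clock = time.perf_counter() - wall_start
    return sum(latencies), wall_clock


summed, wall = measure(num_requests=8, seconds_per_request=0.05, workers=4)
print(f"summed latency: {summed:.2f} s, wall clock: {wall:.2f} s")
```

To judge whether parallel requests shorten a batch job, the wall-clock figure divided by the number of records is the one to compare against the sequential run.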

Cost and Time Analysis of Email Processing with Mistral and OpenAI

In our comprehensive exploration of large-scale email processing, we compared Mistral running on your own Databricks environment with OpenAI on Azure. Let's delve into the key findings:

Mistral on Databricks:

  • In Databricks we pay for the compute that serves the model behind a REST API, so we pay for the uptime of this virtual machine. At 0.13 seconds per email, 10 million emails take about 1.3 million seconds, or roughly 15.05 days.
  • Compute size as suggested by Databricks: Standard_NV36ads_A10_v5. Price per hour: 5.36 USD.
  • Total Compute Cost: Approximately $1,935.56 (about 361.1 hours at 5.36 USD per hour)
  • Total Time: Approximately 15.05 days

Mistral on Databricks offers a cost-effective solution with the flexibility to pay only for the compute size required. The overall cost is driven by the processing time per row and the chosen compute configuration.

OpenAI with Azure:

  • In Azure OpenAI, the cost is per API usage. We have assumed GPT-3.5, 1,000 tokens per email, 10M emails, and 0.002 USD per 1,000 tokens. We assume each request takes 3 seconds.
  • Total Usage Cost: Approximately $20,000 (10M emails at 0.002 USD each)
  • Total Time: Approximately 347.22 days

OpenAI with Azure provides a fully managed option for text analysis with nothing to host. The cost is determined by the number of tokens, and the processing time by the average time per request.
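Both estimates can be recomputed from the assumptions listed above, which makes it easy to plug in your own latency, price or token count. This is a back-of-the-envelope sketch: the function names are hypothetical, requests are assumed to run strictly one after another, and the compute is assumed to be billed only while it is processing.

```python
SECONDS_PER_HOUR = 3600
HOURS_PER_DAY = 24


def self_hosted_estimate(num_emails, seconds_per_email, usd_per_hour):
    """Return (days, cost in USD) for a model served on compute billed by the hour."""
    hours = num_emails * seconds_per_email / SECONDS_PER_HOUR
    return hours / HOURS_PER_DAY, hours * usd_per_hour


def pay_per_token_estimate(num_emails, tokens_per_email, usd_per_1k_tokens, seconds_per_request):
    """Return (days, cost in USD) for an API billed per 1,000 tokens."""
    cost = num_emails * tokens_per_email / 1000 * usd_per_1k_tokens
    days = num_emails * seconds_per_request / SECONDS_PER_HOUR / HOURS_PER_DAY
    return days, cost


mistral_days, mistral_cost = self_hosted_estimate(10_000_000, 0.13, 5.36)
openai_days, openai_cost = pay_per_token_estimate(10_000_000, 1000, 0.002, 3.0)
print(f"Mistral on Databricks: {mistral_days:.2f} days, {mistral_cost:,.2f} USD")
print(f"Azure OpenAI:          {openai_days:.2f} days, {openai_cost:,.2f} USD")
```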

Conclusions:

  • Cost Considerations: Mistral on Databricks proves to be a more cost-effective choice for our specific use case, with significantly lower overall expenses compared to OpenAI with Azure.
  • Processing Time: OpenAI with Azure exhibits longer processing times due to the sheer volume of tokens and the time per request, making Mistral a quicker option for the given scenario.
  • Flexibility vs. Ease of Setup: Mistral on Databricks provides flexibility in compute size and control over the environment, while OpenAI with Azure can be set up in a matter of minutes.

Ultimately, the choice between Mistral on Databricks and OpenAI with Azure depends on specific processing requirements, cost considerations, and the desired level of control over the environment.