The Ops Community ⚙️

Kai Walter


How to tune Dapr bulk publish/subscribe for maximum throughput

TL;DR

In this post I show

  • how to convert a .NET C# program processing messages from Azure Service Bus with Dapr input bindings to Dapr bulk pubsub
  • how to activate and remove Dapr components even with an incremental ARM/Bicep deployment
  • how to use Dapr multi run for testing locally
  • what to consider and where to measure to achieve desired throughput goals

Motivation

Real motivation: In my day job as an architect I am often too far away from coding artifacts that really go into production (and would deliver value and be sustained there). Folks keep me away from production code for a good reason: my prime days for that kind of work are over, as practices have evolved too far for me to keep up with by just coding from time to time.
However, to unwind, I grant myself the occasional excursion to try out and combine stuff that is hard to do in product or project contexts with ambitious deadlines and value-delivery expectations.

This post builds on, and is a sequel to, my previous post on comparing Azure Functions vs Dapr throughput on Azure Container Apps. Back then I found that processing a given workload of 10k messages with an ASP.NET application backed by Dapr seemed to be faster than doing the same with a Functions container or the Functions on ACA footprint.

Some of the underpinnings of that offering are going to be improved; I will write a post about it once this is released. These changes, however, already show a significant improvement in the processing speed of the Functions stacks, with the Dapr variant now falling behind:

timestamp stack cycle time in seconds
2024-02-05T17:09:54 FUNC 30
2024-02-05T17:19:17 ACAF 36
2024-02-05T17:28:49 DAPR 46
2024-02-05T17:38:35 FUNC 32
2024-02-06T05:50:08 ACAF 47
2024-02-06T06:11:08 DAPR 53
2024-02-06T06:32:11 FUNC 31
2024-02-06T06:52:40 ACAF 34
2024-02-06T07:15:33 DAPR 46
2024-02-06T07:37:28 FUNC 38
2024-02-08T13:38:10 ACAF 50
2024-02-08T13:45:23 DAPR 63
2024-02-08T13:52:40 FUNC 36
2024-02-08T13:59:45 ACAF 37
2024-02-08T14:06:37 DAPR 43
2024-02-08T14:13:58 FUNC 35
2024-02-08T14:21:01 ACAF 36
2024-02-08T14:27:43 DAPR 47
2024-02-08T14:35:01 FUNC 37
2024-02-08T14:42:00 ACAF 38
2024-02-08T14:48:43 DAPR 46
stack: average / standard deviation in seconds
ACAF = Functions on ACA: 39.7 / 6.2
DAPR = Dapr in Container on ACA: 49.1 / 6.8
FUNC = Function in Container on ACA: 34.1 / 3.1
  1. measured with Azure Container Apps Consumption workload profile, not a dedicated workload profile!
  2. for a more realistic measurement than in the original post, I no longer measure the time between the first and last request processed (taken from telemetry in Application Insights) but the time from when the 10k messages are scheduled until all messages have been stored into blobs
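
The reported figures appear to be the mean and the sample standard deviation (dividing by n - 1, which is my assumption, not something the post states) of the seven cycle times per stack. A minimal sketch that recomputes the DAPR row from the values in the table:

```csharp
using System;
using System.Globalization;
using System.Linq;

// Cycle times in seconds for the DAPR stack, copied from the table above.
double[] dapr = { 46, 53, 46, 63, 43, 47, 46 };

double mean = dapr.Average();

// Sample standard deviation: squared deviations divided by (n - 1).
double variance = dapr.Sum(x => Math.Pow(x - mean, 2)) / (dapr.Length - 1);
double stdDev = Math.Sqrt(variance);

// Invariant culture keeps the decimal point independent of the machine's locale.
Console.WriteLine(
    mean.ToString("F1", CultureInfo.InvariantCulture) + " / " +
    stdDev.ToString("F1", CultureInfo.InvariantCulture));
// prints "49.1 / 6.8"
```

Swapping in the ACAF or FUNC values should reproduce the other two rows the same way.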

Back then I did not max out Dapr's message processing capabilities, as the point we had to prove internally - "What is faster: Dapr or Functions?" - already showed the desired results. Also, the Dapr implementation with Azure Service Bus Dapr input bindings reflected the majority of our implementations.

This post is about what I did to squeeze more performance out of the Dapr implementation. The goal is to reach a cycle time of 30-35 seconds for the Dapr implementation as well.

Basic Assumption

In the Dapr implementation so far, the Dapr sidecar frantically hits an input binding endpoint for each of the 10k messages, and the application in turn invokes the respective output binding endpoints at the same frequency:

app.MapPost($"/q-order-ingress-{testCase}-input", async (
    [FromBody] Order order,
    [FromServices] DaprClient daprClient
    ) =>
{
    switch (order.Delivery)
    {
        case Delivery.Express:
            await daprClient.InvokeBindingAsync($"q-order-express-{testCase}-output", "create", order);
            break;
        case Delivery.Standard:
            await daprClient.InvokeBindingAsync($"q-order-standard-{testCase}-output", "create", order);
            break;
    }

    return Results.Ok(order);
});

While exploring other hosting and compute options like Dapr, WebAssembly and Spin, I realized that this kind of implementation causes a lot of traffic between Dapr and the application, which can easily stand in the way of faster compute processing.

Dapr has an option available (currently in alpha status) to reduce this kind of noise by reducing the number of invocations exchanged between Dapr and the app: Publish and subscribe to bulk messages.
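
To get a feeling for the reduction, here is a rough sketch of how many sidecar-to-app invocations the 10k message workload needs on one hop for different bulk sizes. It assumes every bulk is completely full, so the numbers are only a lower bound; the bulk sizes are the maxMessagesCount values tried in this post plus 1 for single message delivery.

```csharp
using System;

// Workload size used throughout the post.
const int totalMessages = 10_000;

// 1 = single message delivery; the others are bulk sizes tried in this post.
int[] bulkSizes = { 1, 100, 250, 500 };

foreach (int bulkSize in bulkSizes)
{
    // Ceiling division: the minimum number of calls if every bulk were full.
    int invocations = (totalMessages + bulkSize - 1) / bulkSize;
    Console.WriteLine($"bulk size {bulkSize}: at least {invocations} invocations");
}
```

In practice the sidecar can hand over bulks smaller than the maximum, so the real number of invocations will be higher.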

Conversion

All code snippets shown below can be found and followed along in the dapr-pubsub branch of the repository I already used in the previous post.

Convert To Single PubSub

I try to take small steps forward when converting or transforming implementations - just to be able to keep track of cause and effect.

The changes required in the .NET code to get from input bindings to pubsub are minor. I still keep Azure Service Bus queues as the messaging resource and will not switch to topics. This is a preference on my end, as I prefer the dedicated dead letter handling of queues on single edges of a given messaging path over dead letter handling with topics.

When using Azure Service Bus queues with Dapr pubsub, order-pubsub in PublishEventAsync below refers to a single Dapr component in the ACA (Azure Container Apps) environment - compared to the individual components required for input and output bindings - while q-order-... determines the queue itself.

Also, at this point I do not burden myself with a conversion to CloudEvents and keep message processing "raw": { "rawPayload", "true"} when publishing, and no app.UseCloudEvents(); on application startup. I noticed that, when combining bulk pubsub with CloudEvents, the message payload is suddenly rendered in JSON camelCase (!) instead of the expected PascalCase, and that is definitely a worry for another day.

app.MapPost($"/q-order-ingress-{testCase}-pubsub-single", async (
    [FromBody] Order order,
    [FromServices] DaprClient daprClient
    ) =>
{
    var metadata = new Dictionary<string, string>
        {
          { "rawPayload", "true"}
        };

    switch (order.Delivery)
    {
        case Delivery.Express:
            await daprClient.PublishEventAsync("order-pubsub", $"q-order-express-{testCase}", order, metadata);
            break;
        case Delivery.Standard:
            await daprClient.PublishEventAsync("order-pubsub", $"q-order-standard-{testCase}", order, metadata);
            break;
    }

    return Results.Ok(order);
});

As I want to manage pubsub subscriptions programmatically, an endpoint to configure these subscriptions is required:

app.MapGet("/dapr/subscribe", () => Results.Ok(new[]{
new {
    pubsubname = "order-pubsub",
    topic = $"q-order-ingress-{testCase}",
    route = $"/q-order-ingress-{testCase}-pubsub-single"
}}));

This is the daprdistributor implementation; the daprreceiver implementation is adapted accordingly, receiving messages from pubsub instead of from an input binding.

Dapr Component Switching

To be able to switch the environment between bindings and pubsub, as the general flow is controlled by Dapr components and not by the application, I need a Bicep parameter like ...

@description('determines whether bindings or pubsub is deployed for the experiment')
@allowed([
  'bindings'
  'pubsub'
])
param daprComponentsModel string

... which then controls relevance of Dapr components through scoping for the individual applications:

// switch valid application ids for the respective deployment model
var scopesBindings = daprComponentsModel == 'bindings' ? {
  distributor: 'daprdistributor'
  recvexp: 'daprrecvexp'
  recvstd: 'daprrecvstd'
} : {
  distributor: 'skip'
  recvexp: 'skip'
  recvstd: 'skip'
}

var scopesPubSub = daprComponentsModel == 'pubsub' ? [
  'daprdistributor'
  'daprrecvexp'
  'daprrecvstd'
] : [
  'skip'
]

These scopes are then linked to Dapr components relevant for the desired model.

  resource pubSubComponent 'daprComponents' = {
    name: 'order-pubsub'
    properties: {
      componentType: 'pubsub.azure.servicebus.queues'
      version: 'v1'
      secrets: [
        {
          name: 'sb-root-connectionstring'
          value: '${listKeys('${sb.id}/AuthorizationRules/RootManageSharedAccessKey', sb.apiVersion).primaryConnectionString};EntityPath=orders'
        }
      ]
      metadata: [
        {
          name: 'connectionString'
          secretRef: 'sb-root-connectionstring'
        }
        {
          name: 'maxActiveMessages'
          value: '1000'
        }
        {
          name: 'maxConcurrentHandlers'
          value: '8'
        }
      ]
      scopes: scopesPubSub
    }
  }

Although this way of switching Dapr components in an ARM/Bicep incremental deployment context is not production grade and requires a post-deployment step like ...

az containerapp env dapr-component list -g $RESOURCE_GROUP_NAME -n $ACAENV_NAME \
  --query "[?properties.scopes[0]=='skip'].name" -o tsv | \
  xargs -n1 az containerapp env dapr-component remove -g $RESOURCE_GROUP_NAME -n $ACAENV_NAME --dapr-component-name

... to clean up unused components, it is OK-ish for such an evaluation environment at the moment.

In the end I just need to modify the .env file in the repository's root to switch to the desired model:

AZURE_LOCATION=eastus
AZURE_ENV_NAME=my-env
DAPR_COMPONENTS_MODEL=bindings
DAPR_PUBSUB_MODEL=single

Local Testing

With all those small changes and re-tests ahead, I invested some more time in local testability of the whole environment. For that I created a Dapr Multi-App Run file, which allows running all applications in concert:

version: 1
apps:
  - appID: distributor
    appDirPath: ./src/daprdistributor/
    resourcesPath: ./components-pubsub
    configFilePath: ./components-pubsub/config.yaml
    appProtocol: http
    appPort: 3001
    daprHTTPPort: 3501
    appHealthCheckPath: "/health"
    placementHostAddress: ""
    logLevel: "debug"
    command: ["dotnet", "run"]
    env:
      ASPNETCORE_URLS: http://localhost:3001
      ASPNETCORE_ENVIRONMENT: Development
      TESTCASE: dapr
  - appID: receiver-express
    appDirPath: ./src/daprreceiver/
    resourcesPath: ./components-pubsub-express
    configFilePath: ./components-pubsub-express/config.yaml
    appProtocol: http
    appPort: 3002
    daprHTTPPort: 3502
    appHealthCheckPath: "/health"
    placementHostAddress: ""
    logLevel: "debug"
    command: ["dotnet", "run"]
    env:
      ASPNETCORE_URLS: http://localhost:3002
      ASPNETCORE_ENVIRONMENT: Development
      TESTCASE: dapr
      INSTANCE: express
  - appID: receiver-standard
    appDirPath: ./src/daprreceiver/
    resourcesPath: ./components-pubsub-standard
    configFilePath: ./components-pubsub-standard/config.yaml
    appProtocol: http
    appPort: 3003
    daprHTTPPort: 3503
    appHealthCheckPath: "/health"
    placementHostAddress: ""
    logLevel: "debug"
    command: ["dotnet", "run"]
    env:
      ASPNETCORE_URLS: http://localhost:3003
      ASPNETCORE_ENVIRONMENT: Development
      TESTCASE: dapr
      INSTANCE: standard

Accompanied by a small script to publish both types of messages into the local environment and observe the flow:

#!/bin/bash

set -e

ORDERID=0

function generate_message()
{
  UUID=`uuidgen`
  if [[ $((ORDERID % 2)) -eq 0 ]]; then
    DELIVERY=Express
  else
    DELIVERY=Standard
  fi
  jq -c -n \
        --arg uuid "$UUID" \
        --arg orderid "$ORDERID" \
        --arg delivery "$DELIVERY" \
        '{OrderId: $orderid|tonumber, OrderGuid: $uuid, Delivery: $delivery}'
}

ORDERID=$((ORDERID + 1))
dapr publish --publish-app-id distributor \
  --topic q-order-ingress-dapr \
  --pubsub order-pubsub \
  --data "$(generate_message)" \
  --metadata '{"rawPayload":"true"}'

ORDERID=$((ORDERID + 1))
dapr publish --publish-app-id distributor \
  --topic q-order-ingress-dapr \
  --pubsub order-pubsub \
  --data "$(generate_message)" \
  --metadata '{"rawPayload":"true"}'

First Pit Stop

It seems using pubsub takes a toll on performance.

implementation | average, 5 cycles | standard deviation
Dapr input bindings, before conversion | 48.6s | 6.2s
Dapr single pubsub | 103.4s | 7.5s

The Valley Of Tears

I thought that just by implementing bulk pubsub, performance would increase dramatically. I reduce single message transfers to block transfers, and it seems logical that this alone would burst messages through the environment at light speed. Well, ...

Converting To Bulk PubSub

To apply bulk pubsub to the programmatic subscription, only the subscription metadata needs to be extended:

app.MapGet("/dapr/subscribe", () => Results.Ok(new[]{
new {
    pubsubname = "order-pubsub",
    topic = $"q-order-ingress-{testCase}",
    route = $"/q-order-ingress-{testCase}-pubsub",
    bulkSubscribe = new {
      enabled = true,
      maxMessagesCount = 100,
      maxAwaitDurationMs = 40,
    }
}}));

On the processing endpoint changes are more substantial:

  • messages are passed in a bulk, hence looping is required to process single messages
  • response collections need to be maintained to tell Dapr which of the incoming entries/messages were processed successfully and which need to be retried or dropped
  • messages to be published in a bulk also need to be collected for sending outbound
  • again, let no CloudEvents slip in at that moment
app.MapPost($"/q-order-ingress-{testCase}-pubsub", async (
    [FromBody] BulkSubscribeMessage<Order> bulkOrders,
    [FromServices] DaprClient daprClient,
    ILogger<Program> log
    ) =>
{
    log.LogInformation("{Count} Orders to distribute", bulkOrders.Entries.Count);
    List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>(bulkOrders.Entries.Count);
    var expressEntries = new List<BulkSubscribeMessageEntry<Order>>(bulkOrders.Entries.Count);
    var standardEntries = new List<BulkSubscribeMessageEntry<Order>>(bulkOrders.Entries.Count);

    foreach (var entry in bulkOrders.Entries)
    {
        var order = entry.Event;

        switch (order.Delivery)
        {
            case Delivery.Express:
                expressEntries.Add(entry);
                break;
            case Delivery.Standard:
                standardEntries.Add(entry);
                break;
        }
    }

    var metadata = new Dictionary<string, string>
        {
          { "rawPayload", "true"}
        };

    if (expressEntries.Count > 0)
    {
        try
        {
            var orders = from e in expressEntries select e.Event;
            await daprClient.BulkPublishEventAsync("order-pubsub", $"q-order-express-{testCase}", orders.ToArray(), metadata);
            foreach (var entry in expressEntries)
            {
                responseEntries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
            }
        }
        catch (Exception e)
        {
            log.LogError(e.Message);
            foreach (var entry in expressEntries)
            {
                responseEntries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
            }
        }
    }

    if (standardEntries.Count > 0)
    {
        try
        {
            var orders = from e in standardEntries select e.Event;
            await daprClient.BulkPublishEventAsync("order-pubsub", $"q-order-standard-{testCase}", orders.ToArray(), metadata);
            foreach (var entry in standardEntries)
            {
                responseEntries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
            }
        }
        catch (Exception e)
        {
            log.LogError(e.Message);
            foreach (var entry in standardEntries)
            {
                responseEntries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
            }
        }
    }


    return new BulkSubscribeAppResponse(responseEntries);
});

This is the daprdistributor implementation; the daprreceiver implementation is adapted accordingly, receiving bulk messages but storing to blob with single requests.

Second Pit Stop

Even with 2 cycles faster than 40s, there are still outliers which take 60-70s, which in turn results in this average:

implementation | maxMessagesCount | average, 5 cycles | standard deviation
Dapr input bindings | n.a. | 48.6s | 6.2s
Dapr single pubsub | n.a. | 103.4s | 7.5s
Dapr bulk pubsub | 100 | 57.6s | 19.2s

Crank It Up

A maxMessagesCount of 100 does not seem to make a dent. Let's increase:

      maxMessagesCount = 500,

Although half of the cycles show a significantly better cycle time, the other half still took twice as long (standard deviation ~21s on a 64s average):

timestamp cycle time
2024-02-12T06:15:09 36
2024-02-12T06:22:50 43
2024-02-12T06:29:41 80
2024-02-12T06:37:04 81
2024-02-12T06:45:04 78
2024-02-12T06:52:34 38

Using a query like ...

requests
| where timestamp between( todatetime('2024-02-12T06:45:04.7285069Z') .. todatetime('2024-02-12T06:52:34.2132486Z') )
| where name startswith "POST" and cloud_RoleName matches regex "^[\\d\\w\\-]+dapr"
| where success == true
| summarize count() by cloud_RoleName, bin(timestamp, 5s)
| render columnchart

... shows a processing gap towards the end of processing ...

Figure: 78 seconds cycle time with spread of request processing

... while in this sample processing is in one block without a gap:

Figure: 38 seconds cycle time with cohesive request processing

Looking at the replica count for all 3 services during the testing period shows that those go consistently up to 10 and down again. Also, no replica restarts are reported. Hence non-availability of replicas can be ruled out as a cause of these gaps.

Figure: replica count of all 3 services during test period

Additionally I found many RETRY errors captured during bulk subscribe processing, ...

time="2024-02-12T06:30:00.08158319A3Z" level=error msg="App handler returned an error for message 5888d18e-da39-44b4-81d9-a2e11bef21bb on queue q-order-ingress-dapr: RETRY required while processing bulk subscribe event for entry id: 27180f25-502b-4d13-a738-77306a6d3f01"

... spread out over all test cycles but only logged for the Distributor service.

Figure: occurrences of bulk subscribe RETRY errors

Looking for the root cause of these retries I found ...

dependencies
| where timestamp between( todatetime('2024-02-12T06:15:00') .. todatetime('2024-02-12T07:00:00') )
| where cloud_RoleName endswith "daprdistributor"
| where name == "/dapr.proto.runtime.v1.dapr/bulkpublisheventalpha1"
| order by timestamp asc

error when publish to topic q-order-express-dapr in pubsub order-pubsub: the message is too large

... which means that a maxMessagesCount of 500 can cause the Distributor to generate bulk publish packets that are too large.
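
In this post I simply lower maxMessagesCount to stay below the limit. An alternative that I did not try here would be to split the outbound list by serialized size before publishing. The following is only a sketch of that idea: the Order record is a hypothetical shape modeled on the fields of the test script, the byte budget is deliberately small and made up for illustration, and the real wire size will be larger than the plain JSON size because of envelope and metadata overhead.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Made-up, deliberately small budget; the real limit depends on the
// Service Bus tier and the Dapr payload size settings.
const int maxBatchBytes = 16 * 1024;

var orders = new List<Order>();
for (int i = 0; i < 500; i++)
{
    orders.Add(new Order(i, Guid.NewGuid(), i % 2 == 0 ? "Express" : "Standard"));
}

foreach (var chunk in ChunkBySize(orders, maxBatchBytes))
{
    // Each chunk could be handed to one bulk publish call.
    Console.WriteLine($"chunk with {chunk.Count} orders");
}

// Splits items into consecutive chunks whose summed JSON size stays within maxBytes.
// An item that alone exceeds maxBytes still ends up in a chunk of its own.
static IEnumerable<List<T>> ChunkBySize<T>(IEnumerable<T> items, int maxBytes)
{
    var chunk = new List<T>();
    int chunkBytes = 0;

    foreach (var item in items)
    {
        int itemBytes = JsonSerializer.SerializeToUtf8Bytes(item).Length;

        // Close the current chunk before it would exceed the budget.
        if (chunk.Count > 0 && chunkBytes + itemBytes > maxBytes)
        {
            yield return chunk;
            chunk = new List<T>();
            chunkBytes = 0;
        }

        chunk.Add(item);
        chunkBytes += itemBytes;
    }

    if (chunk.Count > 0)
    {
        yield return chunk;
    }
}

// Hypothetical shape, modeled on the fields used in the local test script.
record Order(int OrderId, Guid OrderGuid, string Delivery);
```

The trade-off is one extra serialization pass per message, which would have to be weighed against the retries it avoids.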

OK, so let's increase carefully ...

      maxMessagesCount = 250,

... and then check for errors again.

dependencies
| where customDimensions.error != ""
| order by timestamp desc

Third Pit Stop

Not quite, but close.

implementation | maxMessagesCount | average, 5 cycles | standard deviation
Dapr input bindings | n.a. | 48.6s | 6.2s
Dapr single pubsub | n.a. | 103.4s | 7.5s
Dapr bulk pubsub | 100 | 57.6s | 19.2s
Dapr bulk pubsub | 250 | 42.8s | 13.1s

Still for the slow test cycles there is a gap until all records are processed:

Figure: 76 seconds cycle time with spread of request processing

Exactly for this kind of measurement I log the bulk message count with log.LogInformation("{Count} Orders to distribute", bulkOrders.Entries.Count); to have an indication of how many messages are handed over in one bulk.

At this point I even tried to switch to a dedicated workload profile type - instead of consumption - to rule out that "noisy neighbors" are causing the processing to lag. But no, the same processing gap also occurred with a dedicated profile.

Side note: I had to go up to the E8 workload profile type to get to a processing speed similar to the consumption model.

Beware Of Message Locks

Fine. Back to telemetry. Checking what else is logged on the containers ...

ContainerAppConsoleLogs_CL
| where TimeGenerated between( todatetime('2024-02-13T16:07:05.8171428Z') .. todatetime('2024-02-13T16:45:47.5552143Z') )
| where ContainerAppName_s contains "daprdist"
| where Log_s !contains "level=info"
| order by TimeGenerated asc

... reveals that almost exactly at the point before final processing continues, this type of error is logged:

...
time="2024-02-13T16:14:05.778260655Z" level=warning msg="Error renewing message locks for queue q-order-ingress-dapr (failed: 43/758):
couldn't renew active message lock for message ff3d2095-effb-428e-87bf-e131c961ac4e: lock has been lost
...

Looking at the order-pubsub component in containerapps.bicep, I realized I had missed a spot. When measuring earlier how many bulks could be processed at full size (e.g. 100), I observed that very soon during the processing cycle the Dapr Service Bus implementation was creating smaller bulks (even of size 1), as the 8 maxConcurrentHandlers, after waiting maxAwaitDurationMs = 40 (see pubsub subscribe above), were just pulling "what they get". Hence I had tuned down maxConcurrentHandlers to 1 to avoid that kind of congestion.

resource pubSubComponent 'daprComponents' = {
    name: 'order-pubsub'
    properties: {
      componentType: 'pubsub.azure.servicebus.queues'
      version: 'v1'
      secrets: [
        {
          name: 'sb-root-connectionstring'
          value: '${listKeys('${sb.id}/AuthorizationRules/RootManageSharedAccessKey', sb.apiVersion).primaryConnectionString};EntityPath=orders'
        }
      ]
      metadata: [
        {
          name: 'connectionString'
          secretRef: 'sb-root-connectionstring'
        }
        {
          name: 'maxActiveMessages'
          value: '1000'
        }
        {
          name: 'maxConcurrentHandlers'
          value: '1'
        }
      ]
      scopes: scopesPubSub
    }
  }

However, I did not pay attention to maxActiveMessages - which before were divided by 8 handlers, which in turn allowed the Dapr sidecar and the app to process a block of 125 locked messages in time (before the lock expires). Now 1 handler was locking all 1000 messages, and even with bulk processing some message locks could expire.

Consequently, I think it makes more sense to tune the number of active messages pulled at a time to the potential bulk size:

        {
          name: 'maxActiveMessages'
          value: '250'
        }
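
The arithmetic behind this can be spelled out in a few lines. This is only a simplified model, assuming the active messages are spread evenly over the handlers as described above; it does not claim to reflect how the Dapr component schedules work internally.

```csharp
using System;

// (maxActiveMessages, maxConcurrentHandlers) combinations used in this post.
var settings = new (int MaxActiveMessages, int MaxConcurrentHandlers)[]
{
    (1000, 8),
    (1000, 1),
    (250, 1),
};

foreach (var (active, handlers) in settings)
{
    // Simplified model: locked messages are divided evenly between handlers.
    int perHandler = active / handlers;
    Console.WriteLine($"{active} active / {handlers} handler(s) = {perHandler} locked messages per handler");
}
```

With 250 active messages and 1 handler, the number of locked messages matches the bulk size of 250, so one bulk can drain everything that is currently locked.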

The Last Mile

Changing this setting did not make the warning go away completely. As Alessandro Segala states in this issue: "This happens because when it's time to renew the locks, at the interval, we "snapshot" the active messages and then renew each one's lock in sequence. If you have a high number for maxActiveMessages, it takes time for the component to renew the locks for each one, and if the app has ACK'd the message in the meanwhile, then the lock renewal fails."

However, as the results show: the more noise and congestion are reduced in the environment, the better and more reliable the throughput.

implementation | maxMessagesCount | maxActiveMessages | maxConcurrentHandlers | average, 5 cycles | standard deviation
Dapr input bindings | n.a. | 1000 | 8 | 48.6s | 6.2s
Dapr single pubsub | n.a. | 1000 | 8 | 103.4s | 7.5s
Dapr bulk pubsub | 100 | 1000 | 1 | 57.6s | 19.2s
Dapr bulk pubsub | 250 | 1000 | 1 | 42.8s | 13.1s
Dapr bulk pubsub | 250 | 250 | 1 | 28.0s | 3.6s

Disclaimer: This combination of configuration values is tuned for this particular scenario only. With this post I just want to show what to consider, what to look out for and where to calibrate.

Conclusion

Running all 3 stacks on the same environment now shows that an investment in bulk pubsub definitely pays off.

stack | average, 6 cycles | standard deviation
ACAF = Functions on ACA | 36.0s | 3.5s
DAPR = Dapr in Container on ACA | 27.8s | 1.6s
FUNC = Function in Container on ACA | 34.0s | 2.4s

As always there is no free lunch and some points have to be considered:

  • Dapr HTTP and gRPC payload size
  • just switching to bulk pubsub is not enough; the additional aggregation and collection times (see maxAwaitDurationMs) need to be outweighed by the gains achieved with packaging data
  • with larger payloads moving through an environment, impact on all involved components needs to be considered and then balanced for optimum outcome

When I set out to write this post, I had already observed some of the lags shown above and was not sure whether I would be able to identify and overcome them. But hey, those are exactly the challenges I seek.
