Dealing with Concurrency

Lions and tigers and bears, oh my!

Based on a little research today -- and, unfortunately, my own experience -- here's a list of problems that concurrent message processing can cause when handlers try to access or modify the same resources or data:

  • Race conditions
  • Deadlocks
  • Consistency errors when multiple threads overwrite the same data and some changes get lost
  • Out-of-order processing that may lead to erroneous results
  • Exceptions from tools like Marten that helpfully try to stop concurrent changes through optimistic concurrency

Because these issues are so common in the kind of systems you would want to use a tool like Wolverine on in the first place, the Wolverine community has invested quite heavily in features to help you manage concurrent access in your system.

Error Retries on Concurrency Errors

If you don't expect many concurrency exceptions, you can probably get away with some kind of optimistic concurrency. Using the aggregate handler workflow integration with Marten as an example, Marten has some built-in optimistic concurrency to protect your system from simultaneous writes to the same event stream. When Marten determines that something else has written to an event stream between the start of your command handling and the attempt to commit changes, it throws the JasperFx.ConcurrencyException.
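To make the "something else has written to the stream" check concrete, here is a minimal in-memory sketch of an optimistic version check. It is not Marten's implementation: `InMemoryStream`, `VersionConflictException`, and `Demo` are hypothetical names invented for this illustration, and a real event store would perform the check inside the database.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the exception a real store would throw;
// this is NOT Marten's JasperFx.ConcurrencyException
public class VersionConflictException : Exception
{
    public VersionConflictException(string message) : base(message) { }
}

// Hypothetical in-memory event stream, for illustration only
public class InMemoryStream
{
    private readonly List<object> _events = new();
    private readonly object _gate = new();

    // The version is simply the number of events appended so far
    public int Version
    {
        get { lock (_gate) return _events.Count; }
    }

    // Append only if nobody else has written since the caller
    // read the stream at expectedVersion
    public void Append(int expectedVersion, params object[] events)
    {
        lock (_gate)
        {
            if (_events.Count != expectedVersion)
            {
                throw new VersionConflictException(
                    $"Expected version {expectedVersion}, but the stream is at {_events.Count}");
            }

            _events.AddRange(events);
        }
    }
}

public static class Demo
{
    public static void Run()
    {
        var stream = new InMemoryStream();

        // Two handlers both read the stream at the same version...
        var seenByFirst = stream.Version;
        var seenBySecond = stream.Version;

        // ...the first one to commit wins...
        stream.Append(seenByFirst, "ItemReady");

        try
        {
            // ...and the second is rejected because the stream moved on
            stream.Append(seenBySecond, "ItemReady");
        }
        catch (VersionConflictException e)
        {
            Console.WriteLine(e.Message);
        }
    }
}
```

The rejected writer has lost nothing but time: if it reloads the stream and tries again, it will usually succeed, which is why a retry policy is often a reasonable response to this kind of failure.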

If we're doing simplistic optimistic checks, we might be perfectly fine with a global error handler that simply retries any failure due to this exception a few times:

cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts
        // On optimistic concurrency failures from Marten
        .OnException<ConcurrencyException>()
        .RetryWithCooldown(100.Milliseconds(), 250.Milliseconds(), 500.Milliseconds())
        .Then.MoveToErrorQueue();
});

Sometimes, though, you opt into a more stringent form of optimistic concurrency where the handler should fail fast if an event stream has advanced beyond a specific version number, as in the usage of this command message:

csharp
public record MarkItemReady(Guid OrderId, string ItemName, int Version);

In that case, there's absolutely no value in retrying the message, so we should use a different error handling policy to move that message off immediately like one of these:

cs
public static class MarkItemReadyHandler
{
    // This will let us specify error handling policies specific
    // to only this message handler
    public static void Configure(HandlerChain chain)
    {
        // Can't ever process this message, so send it directly 
        // to the DLQ
        // Do not pass Go, do not collect $200...
        chain.OnException<ConcurrencyException>()
            .MoveToErrorQueue();
        
        // Or instead (choose one of these two policies)...
        // Can't ever process this message, so just throw it away
        // Do not pass Go, do not collect $200...
        chain.OnException<ConcurrencyException>()
            .Discard();
    }
    
    public static IEnumerable<object> Post(
        MarkItemReady command, 
        
        // Wolverine + Marten will assert that the Order stream
        // in question has not advanced from command.Version
        [WriteAggregate] Order order)
    {
        // process the message and emit events
        yield break;
    }
}

Exclusive Locks or Serializable Transactions

You can try to deal with concurrency problems by using whatever exclusive lock or serializable transaction support your database tooling provides. The integration with Marten has an option for exclusive locks with the "Aggregate Handler Workflow." With EF Core, you should be able to opt into starting your own serializable transaction.
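As a rough sketch of the EF Core option, the code below opens a serializable transaction explicitly with EF Core's `BeginTransactionAsync(IsolationLevel, CancellationToken)`. The `Order` entity, `OrdersDbContext`, and `OrderUpdater` are hypothetical names made up for this example, it assumes a relational EF Core provider, and it deliberately leaves out any Wolverine middleware, so treat it as an outline rather than the recommended integration.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

// Hypothetical entity and DbContext, for illustration only
public class Order
{
    public Guid Id { get; set; }
    public int ReadyItemCount { get; set; }
}

public class OrdersDbContext : DbContext
{
    public OrdersDbContext(DbContextOptions<OrdersDbContext> options) : base(options)
    {
    }

    public DbSet<Order> Orders => Set<Order>();
}

public static class OrderUpdater
{
    public static async Task MarkItemReadyAsync(
        OrdersDbContext db, Guid orderId, CancellationToken ct)
    {
        // Explicitly open a serializable transaction rather than relying
        // on the implicit transaction that SaveChangesAsync would use
        await using var tx = await db.Database
            .BeginTransactionAsync(IsolationLevel.Serializable, ct);

        var order = await db.Orders.SingleAsync(x => x.Id == orderId, ct);
        order.ReadyItemCount++;

        await db.SaveChangesAsync(ct);

        // Nothing is durable until the commit succeeds. If the transaction
        // is disposed without a commit, the changes are rolled back
        await tx.CommitAsync(ct);
    }
}
```

How a conflict surfaces depends on the database: some engines block the competing transaction until this one finishes, while others fail one of the two with a provider-specific exception that you would then need an error handling policy for.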

The Wolverine team considers these approaches to be a necessary evil at times, but hopefully a temporary solution. In most cases, we would recommend protecting your system from concurrent access through selective queueing as much as possible, as discussed in the next section.

Using Queueing

In many cases you can use queueing of some sort to reduce concurrent access to sensitive resources within your system. The most draconian way to do this is to say that all messages in a given queue will be executed single file in strict order on one single node within your application like so:

cs
var builder = Host.CreateApplicationBuilder()
    .UseWolverine(opts =>
    {
        opts.UseRabbitMq();

        // Wolverine will *only* listen to this queue
        // on one single node and process messages in strict
        // order
        opts.ListenToRabbitQueue("control").ListenWithStrictOrdering();

        opts.Publish(x =>
        {
            // Just keying off a made up marker interface
            x.MessagesImplementing<IControlMessage>();
            x.ToRabbitQueue("control");
        });
    });

The strict ordering usage definitely limits the throughput of your system while largely eliminating issues due to concurrency. This option is useful for fast-processing messages that coordinate long-running work throughout the rest of your system. It has proven useful in file ingestion processes and in systems that have to manage long-running processes on other nodes.
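The reason single-file processing sidesteps so many concurrency problems can be shown without any message broker at all. The sketch below uses `System.Threading.Channels` from the .NET base class library with a single consumer loop. It is only a conceptual model, not how Wolverine implements `ListenWithStrictOrdering()`, and `SingleFileProcessor` is a hypothetical name. It also models only one process, whereas the Wolverine option additionally restricts the listener to a single node.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical single-file processor, for illustration only
public class SingleFileProcessor
{
    private readonly Channel<int> _queue = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleReader = true });

    // A plain List<T> is not thread safe, which is fine here because
    // only the one consumer loop below ever writes to it
    private readonly List<int> _processed = new();

    // Only read this after RunAsync() has completed
    public IReadOnlyList<int> Processed => _processed;

    // Any number of threads may enqueue concurrently
    public bool Enqueue(int message) => _queue.Writer.TryWrite(message);

    // Signal that no more messages are coming
    public void Complete() => _queue.Writer.Complete();

    // Exactly one loop drains the queue, one message at a time,
    // in the order the messages were written
    public async Task RunAsync()
    {
        await foreach (var message in _queue.Reader.ReadAllAsync())
        {
            _processed.Add(message);
        }
    }
}
```

Because only one message is ever in flight, the state touched by the consumer needs no locks or version checks. The cost is the same one described above: throughput is capped at whatever a single consumer can handle.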

More likely though, to protect resources that are prone to concurrency problems while still allowing for greater throughput, you may want to reach for either:
