Event Forwarding
TIP
As of Wolverine 2.2, you can use IEvent<T>
as the message type in a handler as part of the event forwarding when you need to utilize Marten metadata
WARNING
The Wolverine team recommends against combining this functionality with also using events as either a handler response or cascaded messages as the behavior can easily become confusing. Instead, prefer using custom types for handler responses or HTTP response bodies instead of the raw event types when using the event forwarding.
The "Event Forwarding" feature immediately pushes any event captured by Marten through Wolverine's persistent outbox where there is a known subscriber (either a local message handler or a known subscriber rule to that event type). The "Event Forwarding" publishes the new events as soon as the containing transaction is successfully committed. This is different from the Event Subscriptions in that there is no ordering guarantee, and does require you to use the Wolverine transactional middleware for Marten.
TIP
The strong recommendation is to use either subscriptions or event forwarding, but not both in the same application.
To be clear, this will work for:
- Any event type where the Wolverine application has a message handler for either the event type itself, or
IEvent<T>
whereT
is the event type - Any event type where there is a known message subscription for that event type or its wrapping
IEvent<T>
to an external transport
Timing wise, the "event forwarding" happens at the time of committing the transaction for the original message that spawned the new events, and the resulting event messages go out as cascading messages only after the original transaction succeeds -- just like any other outbox usage. There is no guarantee about ordering in this case. Instead, Wolverine is trying to have these events processed as soon as possible.
To opt into this feature, chain the Wolverine AddMarten().EventForwardingToWolverine()
call as shown in this application bootstrapping sample shown below:
builder.Services.AddMarten(opts =>
{
var connString = builder
.Configuration
.GetConnectionString("marten");
opts.Connection(connString);
// There will be more here later...
opts.Projections
.Add<AppointmentDurationProjection>(ProjectionLifecycle.Async);
// OR ???
// opts.Projections
// .Add<AppointmentDurationProjection>(ProjectionLifecycle.Inline);
opts.Projections.Add<AppointmentProjection>(ProjectionLifecycle.Inline);
opts.Projections
.Snapshot<ProviderShift>(SnapshotLifecycle.Async);
})
// This adds a hosted service to run
// asynchronous projections in a background process
.AddAsyncDaemon(DaemonMode.HotCold)
// I added this to enroll Marten in the Wolverine outbox
.IntegrateWithWolverine()
// I also added this to opt into events being forward to
// the Wolverine outbox during SaveChangesAsync()
.EventForwardingToWolverine();
This does need to be paired with a little bit of Wolverine configuration to add subscriptions to event types like so:
builder.Host.UseWolverine(opts =>
{
// I'm choosing to process any ChartingFinished event messages
// in a separate, local queue with persistent messages for the inbox/outbox
opts.PublishMessage<ChartingFinished>()
.ToLocalQueue("charting")
.UseDurableInbox();
// If we encounter a concurrency exception, just try it immediately
// up to 3 times total
opts.Policies.OnException<ConcurrencyException>().RetryTimes(3);
// It's an imperfect world, and sometimes transient connectivity errors
// to the database happen
opts.Policies.OnException<NpgsqlException>()
.RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
// Automatic usage of transactional middleware as
// Wolverine recognizes that an HTTP endpoint or message handler
// persists data
opts.Policies.AutoApplyTransactions();
});
This forwarding of events is using an outbox that can be awaited in your tests using this extension method:
public static Task<ITrackedSession> SaveInMartenAndWaitForOutgoingMessagesAsync(this IHost host, Action<IDocumentSession> action, int timeoutInMilliseconds = 5000)
{
var factory = host.Services.GetRequiredService<OutboxedSessionFactory>();
return host.ExecuteAndWaitAsync(async context =>
{
var session = factory.OpenSession(context);
action(session);
await session.SaveChangesAsync();
// Shouldn't be necessary, but real life says do it anyway
await context.As<MessageContext>().FlushOutgoingMessagesAsync();
}, timeoutInMilliseconds);
}
To be used in your tests such as this:
[Fact]
public async Task execution_of_forwarded_events_can_be_awaited_from_tests()
{
var host = await Host.CreateDefaultBuilder()
.UseWolverine()
.ConfigureServices(services =>
{
services.AddMarten(Servers.PostgresConnectionString)
.IntegrateWithWolverine().EventForwardingToWolverine(opts =>
{
opts.SubscribeToEvent<SecondEvent>().TransformedTo(e =>
new SecondMessage(e.StreamId, e.Sequence));
});
}).StartAsync();
var aggregateId = Guid.NewGuid();
await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session =>
{
session.Events.Append(aggregateId, new SecondEvent());
}, 100_000);
using var store = host.Services.GetRequiredService<IDocumentStore>();
await using var session = store.LightweightSession();
var events = await session.Events.FetchStreamAsync(aggregateId);
events.Count.ShouldBe(2);
events[0].Data.ShouldBeOfType<SecondEvent>();
events[1].Data.ShouldBeOfType<FourthEvent>();
}
Where the result contains FourthEvent
because SecondEvent
was forwarded as SecondMessage
and that persisted FourthEvent
in a handler such as:
public static Task HandleAsync(SecondMessage message, IDocumentSession session)
{
session.Events.Append(message.AggregateId, new FourthEvent());
return session.SaveChangesAsync();
}