Using Pulsar 3.0
INFO
Fun fact: the Pulsar transport was the very first messaging broker supported by Jasper/Wolverine, but for whatever reason it wasn't officially released until Wolverine 3.0.
Installing
To use Apache Pulsar as a messaging transport with Wolverine, first install the WolverineFx.Pulsar
library via NuGet to your project. Behind the scenes, this package uses the DotPulsar managed client library for accessing Pulsar brokers.
dotnet add package WolverineFx.Pulsar
To connect to Pulsar and configure senders and listeners, use this syntax:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);

        // Any other configuration you want to apply to your
        // Pulsar client
    });

    // Publish messages to a particular Pulsar topic
    opts.PublishMessage<Message1>()
        .ToPulsarTopic("persistent://public/default/one")

        // And all the normal Wolverine options...
        .SendInline();

    // Listen for incoming messages from a Pulsar topic
    opts.ListenToPulsarTopic("persistent://public/default/two")
        .SubscriptionName("two")
        .SubscriptionType(SubscriptionType.Exclusive)

        // And all the normal Wolverine options...
        .Sequential();
});
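The sample above reads the broker address from a configuration key named "pulsar". As a minimal sketch of where that value might come from, the snippet below seeds the key through the in-memory configuration provider and reads it back as a Uri. The address pulsar://localhost:6650 is an assumption (the default port of a local Pulsar broker); in a real application the value would more likely live in appsettings.json or an environment variable.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder();

// Assumption: a local broker on the default Pulsar port.
// Replace with your own broker address or move it to appsettings.json.
builder.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
    ["pulsar"] = "pulsar://localhost:6650"
});

// Same lookup as in the Wolverine configuration above
var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
Console.WriteLine(pulsarUri);
```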
The topic name format is set by Pulsar itself, and you can learn more about its format in Pulsar Topics.
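As a rough illustration, a Pulsar topic name is made up of a domain (persistent or non-persistent), a tenant, a namespace, and the topic itself. The helper below is hypothetical, not part of Wolverine or DotPulsar, and only shows how the strings used on this page are assembled.

```csharp
using System;

// Hypothetical helper for illustration only; it is not part of Wolverine or DotPulsar.
static string PulsarTopic(string tenant, string @namespace, string topic, bool persistent = true)
{
    // Pulsar topic names start with "persistent" or "non-persistent"
    var domain = persistent ? "persistent" : "non-persistent";
    return $"{domain}://{tenant}/{@namespace}/{topic}";
}

// Builds the same topic name used in the publishing sample above
Console.WriteLine(PulsarTopic("public", "default", "one"));
// prints "persistent://public/default/one"
```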
INFO
Depending on demand, the Pulsar transport will be enhanced to support conventional routing topologies and more advanced topic routing later.
Read Only Subscriptions 3.13
As part of Wolverine's "Requeue" error handling action, the Pulsar transport tries to quietly create a matching sender for each Pulsar topic it's listening to. Great, but that will blow up if your application only has receive-only permissions to Pulsar. In this case, you probably want to disable Pulsar requeue actions altogether with this setting:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);
    });

    // Listen for incoming messages from a Pulsar topic
    opts.ListenToPulsarTopic("persistent://public/default/two")
        .SubscriptionName("two")
        .SubscriptionType(SubscriptionType.Exclusive)

        // Disable the requeue for this topic
        .DisableRequeue()

        // And all the normal Wolverine options...
        .Sequential();

    // Disable requeue for all Pulsar endpoints
    opts.DisablePulsarRequeue();
});
If your application has receive-only access to a subscription and no permission to publish to Pulsar, you cannot use the Wolverine "Requeue" error handling policy.
Subscription behavior when closing connection
By default, the Pulsar transport automatically closes the subscription when an endpoint is stopped. If the subscription was created for you and should be kept after application shutdown, you can change this behavior.
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
    opts.UsePulsar(c =>
    {
        var pulsarUri = builder.Configuration.GetValue<Uri>("pulsar");
        c.ServiceUrl(pulsarUri);
    });

    // Disable unsubscribe on close for all Pulsar endpoints
    opts.UnsubscribePulsarOnClose(PulsarUnsubscribeOnClose.Disabled);
});
Interoperability
TIP
Also see the more generic Wolverine Guide on Interoperability
Pulsar interoperability is done through the IPulsarEnvelopeMapper interface.