Using MQTT
WARNING
Wolverine requires the V5 version of MQTT for its broker support
The Wolverine 1.9 release added a new transport option for the MQTT standard common in IoT Messaging.
Installing
To use MQTT as a transport with Wolverine, first install the Wolverine.MQTT
library via nuget to your project. Behind the scenes, this package uses the MQTTnet managed library for accessing MQTT brokers and also for its own testing.
dotnet add WolverineFx.Mqtt
In its most simplistic usage you enable the MQTT transport through calling the WolverineOptions.UseMqtt()
extension method and defining which MQTT topics you want to publish or subscribe to with the normal subscriber rules as shown in this sample:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Connect to the MQTT broker
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
});
// Listen to an MQTT topic, and this could also be a wildcard
// pattern
opts.ListenToMqttTopic("app/incoming")
// In the case of receiving JSON data, but
// not identifying metadata, tell Wolverine
// to assume the incoming message is this type
.DefaultIncomingMessage<Message1>()
// The default is AtLeastOnce
.QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
// Publish messages to an outbound topic
opts.PublishAllMessages()
.ToMqttTopic("app/outgoing");
});
using var host = builder.Build();
await host.StartAsync();
INFO
The MQTT transport at this time only supports endpoints that are either Buffered
or Durable
.
WARNING
The MQTT transport does not really support the "Requeue" error handling policy in Wolverine. "Requeue" in this case becomes effectively an inline "Retry"
Broadcast to User Defined Topics
As long as the MQTT transport is enabled in your application, you can explicitly publish messages to any named topic through this usage:
public static async Task broadcast(IMessageBus bus)
{
var paymentMade = new PaymentMade(200, "EUR");
await bus.BroadcastToTopicAsync("region/europe/incoming", paymentMade);
}
Publishing to Derived Topic Names
INFO
The Wolverine is open to extending the options for determining the topic name from the message type, but is waiting for feedback from the community before trying to build anything else around this.
As a way of routing messages to MQTT topics, you also have this option:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Connect to the MQTT broker
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
});
// Publish messages to MQTT topics based on
// the message type
opts.PublishAllMessages()
.ToMqttTopics()
.QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
});
using var host = builder.Build();
await host.StartAsync();
In this approach, all messages will be routed to MQTT topics. The topic name for each message type would be derived from either Wolverine's message type name rules or by using the [Topic("topic name")]
attribute as shown below:
[Topic("one")]
public class TopicMessage1;
Publishing by Topic Rules
You can publish messages to MQTT topics based on user defined logic to determine the actual topic name.
As an example, say you have a marker interfaces for your messages like this:
public interface ITenantMessage
{
string TenantId { get; }
}
To publish any message implementing that interface to an MQTT topic, you could specify the topic name logic like this:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Connect to the MQTT broker
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
});
// Publish any message that implements ITenantMessage to
// MQTT with a topic derived from the message
opts.PublishMessagesToMqttTopic<ITenantMessage>(m => $"{m.GetType().Name.ToLower()}/{m.TenantId}")
// Specify or configure sending through Wolverine for all
// MQTT topic broadcasting
.QualityOfService(MqttQualityOfServiceLevel.ExactlyOnce)
.BufferedInMemory();
});
using var host = builder.Build();
await host.StartAsync();
Listening by Topic Filter
Wolverine supports topic filters for listening. The syntax is still just the same ListenToMqttTopic(filter)
as shown in this snippet from the Wolverine.MQTT test suite:
_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseMqttWithLocalBroker(port);
opts.ListenToMqttTopic("incoming/#").RetainMessages();
}).StartAsync();
In the case of receiving any message that matches the topic filter according to the MQTT topic filter rules, that message will be handled by the listening endpoint defined for that filter.
Integrating with Non-Wolverine
It's quite likely that in using Wolverine with an MQTT broker that you will be communicating with non-Wolverine systems or devices on the other end, so you can't depend on the Wolverine metadata being sent in MQTT UserProperties
data. Not to worry, you've got options.
In the case of the external system sending you JSON, but nothing else, if you can design the system such that there's only one type of message coming into a certain MQTT topic, you can just tell Wolverine to listen for that topic and what that message type would be so that Wolverine is able to deserialize the message and relay that to the correct message handler like so:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Connect to the MQTT broker
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
});
// Listen to an MQTT topic, and this could also be a wildcard
// pattern
opts.ListenToMqttTopic("app/payments/made")
// In the case of receiving JSON data, but
// not identifying metadata, tell Wolverine
// to assume the incoming message is this type
.DefaultIncomingMessage<PaymentMade>();
});
using var host = builder.Build();
await host.StartAsync();
For more complex interoperability, you can implement the IMqttEnvelopeMapper
interface in Wolverine to map between incoming and outgoing MQTT messages and the Wolverine Envelope
structure. Here's an example:
public class MyMqttEnvelopeMapper : IMqttEnvelopeMapper
{
public void MapEnvelopeToOutgoing(Envelope envelope, MqttApplicationMessage outgoing)
{
// This is the only absolutely mandatory item
outgoing.PayloadSegment = envelope.Data;
// Maybe enrich this more?
outgoing.ContentType = envelope.ContentType;
}
public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage incoming)
{
// These are the absolute minimums necessary for Wolverine to function
envelope.MessageType = typeof(PaymentMade).ToMessageTypeName();
envelope.Data = incoming.PayloadSegment.Array;
// Optional items
envelope.DeliverWithin = 5.Seconds(); // throw away the message if it
// is not successfully processed
// within 5 seconds
}
public IEnumerable<string> AllHeaders()
{
yield break;
}
}
And apply that to an MQTT topic like so:
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Connect to the MQTT broker
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
});
// Publish messages to MQTT topics based on
// the message type
opts.PublishAllMessages()
.ToMqttTopics()
// Tell Wolverine to map envelopes to MQTT messages
// with our custom strategy
.UseInterop(new MyMqttEnvelopeMapper())
.QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
});
using var host = builder.Build();
await host.StartAsync();
Clearing Out Retained Messages
MQTT brokers allow you to publish retained messages to a topic, meaning that the last message will always be retained by the broker and sent to any new subscribers. That's a little bit problematic if your Wolverine application happens to be restarted, because that last retained message may easily be resent to your Wolverine application when you restart.
Not to fear, the MQTT protocol allows you to "clear" out a topic by sending it a zero byte message, and Wolverine has a couple shortcuts for doing just that by returning a cascading message to "zero out" the topic a message was received on or a named topic like this:
public static AckMqttTopic Handle(ZeroMessage message)
{
// "Zero out" the topic that the original message was received from
return new AckMqttTopic();
}
public static ClearMqttTopic Handle(TriggerZero message)
{
// "Zero out" the designated topic
return new ClearMqttTopic("red");
}