NServiceBus Message Mutators Sample
In NServiceBus V2.6 it was a bit tricky to change messages as they were sent to and from endpoints. Message mutators make this easier and may be used, for example, to encrypt all or part of a message. The encryption message mutator comes as part of the NServiceBus library and can be used at any time.
To see message mutators in action, open the MessageMutator sample. First, run the solution; you should see two console applications start up. Find the client application by looking for the one with "Client" in its path, then press 's' followed by 'Enter' in its window. Then press 'e' followed by 'Enter'. The client window should then show an exception message; this is expected.
Now let's go look at the code:
Code Walk-Through
This sample shows how to create a custom message mutator.
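Let's first take a quick look at the interfaces involved: IMessageMutator and IMutateTransportMessages.
Each interface gives us access to the message so that we can mutate it on the inbound and/or outbound path.
For the sake of this sample, two mutators are implemented: ValidationMessageMutator, which validates messages using data annotations, and TransportMessageCompressionMutator, which compresses the transport message body.
Let's have a look at the MessageMutators assembly.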
Both interfaces are implemented in the MessageMutators project.
    public class ValidationMessageMutator : IMessageMutator
    {
        private static readonly ILog Logger = LogManager.GetLogger("ValidationMessageMutator");

        public object MutateOutgoing(object message)
        {
            ValidateDataAnnotations(message);
            return message;
        }

        public object MutateIncoming(object message)
        {
            ValidateDataAnnotations(message);
            return message;
        }

        private static void ValidateDataAnnotations(Object message)
        {
            var context = new ValidationContext(message, null, null);
            var results = new List<ValidationResult>();
            var isValid = Validator.TryValidateObject(message, context, results, true);

            if (isValid)
            {
                Logger.Info("Validation succeeded for message: " + message.ToString());
                return;
            }

            var errorMessage = new StringBuilder();
            errorMessage.Append(
                string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine,
                    message.ToString()));

            foreach (var validationResult in results)
                errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);

            Logger.Error(errorMessage.ToString());
            throw new Exception(errorMessage.ToString());
        }
    }
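The validation above depends on data annotation attributes declared on the message type, which this walkthrough does not list. The following is a minimal standalone sketch of the same Validator.TryValidateObject call, without NServiceBus. The ProductMessageSketch class, its attribute limits, and the ValidationSketch helper are hypothetical names and values chosen for illustration; they are not taken from the sample.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Hypothetical message type. The limits below are assumptions, not the
// sample's real CreateProductCommand definition.
public class ProductMessageSketch
{
    [Required]
    public string ProductId { get; set; }

    [StringLength(20, ErrorMessage = "The product name must not exceed 20 characters.")]
    public string ProductName { get; set; }

    [Range(1.0, 5.0, ErrorMessage = "The list price must be between 1 and 5.")]
    public double ListPrice { get; set; }
}

public static class ValidationSketch
{
    // Returns the validation error messages; an empty list means the message is valid.
    public static List<string> Validate(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        // Passing true validates every annotated property, not only [Required] ones.
        Validator.TryValidateObject(message, context, results, true);

        var errors = new List<string>();
        foreach (var result in results)
            errors.Add(result.ErrorMessage);
        return errors;
    }

    public static void Main()
    {
        var valid = new ProductMessageSketch
        {
            ProductId = "XJ128", ProductName = "Milk", ListPrice = 4
        };
        var invalid = new ProductMessageSketch
        {
            ProductId = "XJ128", ProductName = "Milk Milk Milk Milk Milk", ListPrice = 15
        };

        Console.WriteLine("Errors for valid message: " + Validate(valid).Count);
        foreach (var error in Validate(invalid))
            Console.WriteLine("Invalid message: " + error);
    }
}
```

The sketch returns the errors instead of throwing, which makes it easy to inspect them; the mutator in the sample throws so that an invalid message never continues through the pipeline.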
ValidationMessageMutator implements the two interface methods, the outgoing and the incoming. As can be seen in the code, both the incoming and outgoing methods run exactly the same validation, so the mutation is symmetrical.
Let's have a look at the other message mutator, the TransportMessageCompressionMutator code:
    public class TransportMessageCompressionMutator : IMutateTransportMessages
    {
        private static readonly ILog Logger = LogManager.GetLogger("TransportMessageCompressionMutator");

        public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
        {
            Logger.Info("transportMessage.Body size before compression: " + transportMessage.Body.Length);

            var mStream = new MemoryStream(transportMessage.Body);
            var outStream = new MemoryStream();

            using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
            {
                mStream.CopyTo(tinyStream);
            }

            // copy the compressed buffer only after the GZipStream is disposed,
            // otherwise, not all the compressed message will be copied.
            transportMessage.Body = outStream.ToArray();
            transportMessage.Headers["IWasCompressed"] = "true";
            Logger.Info("transportMessage.Body size after compression: " + transportMessage.Body.Length);
        }

        public void MutateIncoming(TransportMessage transportMessage)
        {
            if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
                return;

            using (var bigStream = new GZipStream(new MemoryStream(transportMessage.Body), CompressionMode.Decompress))
            {
                var bigStreamOut = new MemoryStream();
                bigStream.CopyTo(bigStreamOut);
                transportMessage.Body = bigStreamOut.ToArray();
            }
        }
    }
The TransportMessageCompressionMutator is a transport message mutator, meaning that NServiceBus allows you to mutate the outgoing and/or incoming transport message.
In the TransportMessageCompressionMutator class, both the incoming and outgoing methods are implemented. This mutator acts on all transport messages, regardless of what message types the transport message carries.
The compression code is straightforward and uses the .NET Framework GZipStream class to do the compression. Decompression is done in the incoming method only if the header key "IWasCompressed" exists. If the key is missing, the message is returned unmutated. Otherwise, the incoming method replaces the compressed content of the transport message Body with the uncompressed content.
Now all we have to do is hook those two mutators into the NServiceBus message flow.
Configuring NServiceBus to use the Message mutators
Hooking the sample message mutators into the NServiceBus messaging flow is done using the following code:
    public class HookMyMessageMutators : IWantCustomInitialization
    {
        public void Init()
        {
            Configure.Instance.Configurer.ConfigureComponent<ValidationMessageMutator>(
                DependencyLifecycle.InstancePerCall);
            Configure.Instance.Configurer.ConfigureComponent<TransportMessageCompressionMutator>(
                DependencyLifecycle.InstancePerCall);
        }
    }
Implementing IWantCustomInitialization signals NServiceBus to call the Init method during its initialization phase. The Init method in this sample uses the NServiceBus builder (its dependency injection mechanism) to register ValidationMessageMutator and TransportMessageCompressionMutator. NServiceBus then uses them in its messaging flow.
Since the HookMyMessageMutators class is defined in the MessageMutators class library assembly, you can drop that assembly into the Client and Server executable folders and mutation will happen automatically. Because the assembly is simply dropped into the executable folder, the client and server are agnostic to the fact that message mutation is taking place and to its nature. The message mutation can be replaced, updated, or removed without the client and server knowing about it.
Client and Server Code
Since registration is done automatically by the framework, the server and client code is standard NServiceBus code for sending and handling a message.
Nothing special there. The following is how the client sends a valid message:
    Bus.Send<CreateProductCommand>(m =>
    {
        m.ProductId = "XJ128";
        m.ProductName = "Milk";
        m.ListPrice = 4;
        m.SellEndDate = new DateTime(2012, 1, 3);
        // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
        // before it reaches MSMQ.
        m.Image = new byte[1024 * 1024 * 7];
    });
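The comment in the snippet above relies on a zero-filled buffer compressing very well. The following standalone sketch, which does not use NServiceBus, compresses a 7 MB zeroed array with GZipStream, prints the sizes, and decompresses it again. CompressionSketch and its method names are hypothetical helpers introduced for illustration; the 4 MB figure is the MSMQ message size limit mentioned in this article.

```csharp
using System;
using System.IO;
using System.IO.Compression;

public static class CompressionSketch
{
    // Compresses a byte array with GZip and returns the compressed bytes.
    public static byte[] Compress(byte[] body)
    {
        using (var output = new MemoryStream())
        {
            using (var gzip = new GZipStream(output, CompressionMode.Compress))
            {
                gzip.Write(body, 0, body.Length);
            }
            // Read the buffer only after the GZipStream is disposed, so that all
            // compressed data has been flushed. ToArray works on a closed MemoryStream.
            return output.ToArray();
        }
    }

    // Reverses Compress and returns the original bytes.
    public static byte[] Decompress(byte[] compressed)
    {
        using (var gzip = new GZipStream(new MemoryStream(compressed), CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }

    public static void Main()
    {
        const int msmqLimit = 4 * 1024 * 1024;        // 4 MB limit, as stated in the article
        var image = new byte[1024 * 1024 * 7];         // 7 MB, all zeros

        var compressed = Compress(image);
        var restored = Decompress(compressed);

        Console.WriteLine("Size before compression: " + image.Length);
        Console.WriteLine("Size after compression: " + compressed.Length);
        Console.WriteLine("Fits under the MSMQ limit: " + (compressed.Length < msmqLimit));
        Console.WriteLine("Round trip restored the original size: " + (restored.Length == image.Length));
    }
}
```

A buffer holding real image data would compress far less than a zero-filled one, so this result should not be read as a general way around the MSMQ size limit.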
Since the message's Image buffer contains only zeros, the GZipStream in the outgoing transport message mutator will easily compress it to a size under the MSMQ limit of 4 MB, and the message will get to the Server.
The following is how the client sends an invalid message, which will never reach the server since an exception will be thrown in the outgoing message mutator:
    Bus.Send<CreateProductCommand>(m =>
    {
        m.ProductId = "XJ128";
        m.ProductName = "Milk Milk Milk Milk Milk";
        m.ListPrice = 15;
        m.SellEndDate = new DateTime(2011, 1, 3);
        // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
        // before it reaches MSMQ.
        m.Image = new byte[1024 * 1024 * 7];
    });
The message is invalid for several reasons: the product name is over the 20-character limit, the list price is too high, and the sell end date is not in the valid range. The mutator logs these validation errors before throwing the exception.
The server code is simple and straightforward:
    public class Handler : IHandleMessages<CreateProductCommand>
    {
        public void Handle(CreateProductCommand createProductCommand)
        {
            Console.WriteLine("Received a CreateProductCommand message: " + createProductCommand);
        }
    }
As can be seen, the server handler code does not need to change on account of the message mutation.
This article was based on an article written by Adam Fyles; the original blog post can be found here.
Where to go next?
It might be a good idea now to cover the unobtrusive mode subject, here.