One of the features inside Microsoft Orleans is the support for ‘Observers’. Through observers it becomes possible to send asynchronous notifications to one or more clients. It allows you to create a ‘simple’ distributed pub/sub mechanism.
I didn’t find the documentation very clear on this part, so here is a step by step guide on how to implement an Orleans observer:
Step 1 - Create the observer client
An observer is a one-way asynchronous interface that inherits from IGrainObserver
, and all its methods must be void. So let’s create this interface first:
public interface IReceiveMessage : IGrainObserver | |
{ | |
void ReceiveMessage(EventMessage message); | |
} |
Inside our client we need to create a class that implements this interface:
public class OrleansObserver : IReceiveMessage | |
{ | |
private readonly IClusterClient _clusterClient; | |
public OrleansObserver(IClusterClient clusterClient) | |
{ | |
_clusterClient = clusterClient; | |
} | |
public void ReceiveMessage(EventMessage message) | |
{ | |
_logger.LogDebug($"OrleansObserver:ReceiveMessage -> message: {message}"); | |
} | |
} |
Step 2 – Handling registrations
To handle the registrations we need a grain that can store the list of registered clients. Let’s create a grain interface that can handle the registrations. Let us also add a Publish() method that can be called to publish a message to all registered clients:
public interface ISubscriptionManager : IGrainWithIntegerKey | |
{ | |
Task Subscribe(IReceiveMessage observer); | |
Task Unsubscribe(IReceiveMessage observer); | |
Task Publish(EventMessage eventMessage); | |
} |
Next it’s time to implement the corresponding grain. Inside this grain we use another class to handle the registrations. In previous versions of Orleans you could use the built-in ObserverSubscriptionManager class, but in Orleans 2 and 3 you’ll have to create this class yourself.
Let’s do that first:
/// <summary> | |
/// Maintains a collection of grain observers. | |
/// </summary> | |
/// <typeparam name="T"> | |
/// The grain observer type. | |
/// </typeparam> | |
public class ObserverSubscriptionManager<T> : IEnumerable<T> where T : IAddressable | |
{ | |
/// <summary> | |
/// The observers. | |
/// </summary> | |
private readonly Dictionary<T, DateTime> _observers = new Dictionary<T, DateTime>(); | |
/// <summary> | |
/// Initializes a new instance of the <see cref="GrainObserverManager{T}"/> class. | |
/// </summary> | |
public ObserverSubscriptionManager() | |
{ | |
this.GetDateTime = () => DateTime.UtcNow; | |
} | |
/// <summary> | |
/// Gets or sets the delegate used to get the date and time, for expiry. | |
/// </summary> | |
public Func<DateTime> GetDateTime { get; set; } | |
internal bool IsSubscribed(T observer) | |
{ | |
return _observers.ContainsKey(observer); | |
} | |
/// <summary> | |
/// Gets or sets the expiration time span, after which observers are lazily removed. | |
/// </summary> | |
public TimeSpan ExpirationDuration { get; set; } | |
/// <summary> | |
/// Gets the number of observers. | |
/// </summary> | |
public int Count => this._observers.Count; | |
/// <summary> | |
/// Removes all observers. | |
/// </summary> | |
public void Clear() | |
{ | |
this._observers.Clear(); | |
} | |
/// <summary> | |
/// Ensures that the provided <paramref name="observer"/> is subscribed, renewing its subscription. | |
/// </summary> | |
/// <param name="observer">The observer.</param> | |
public void Subscribe(T observer) | |
{ | |
// Add or update the subscription. | |
this._observers[observer] = this.GetDateTime(); | |
} | |
/// <summary> | |
/// Ensures that the provided <paramref name="observer"/> is unsubscribed. | |
/// </summary> | |
/// <param name="observer">The observer.</param> | |
public void Unsubscribe(T observer) | |
{ | |
this._observers.Remove(observer); | |
} | |
/// <summary> | |
/// Notifies all observers. | |
/// </summary> | |
/// <param name="notification"> | |
/// The notification delegate to call on each observer. | |
/// </param> | |
/// <param name="predicate">The predicate used to select observers to notify.</param> | |
/// <returns> | |
/// A <see cref="Task"/> representing the work performed. | |
/// </returns> | |
public async Task Notify(Func<T, Task> notification, Func<T, bool> predicate = null) | |
{ | |
var now = this.GetDateTime(); | |
var defunct = default(List<T>); | |
foreach (var observer in this._observers) | |
{ | |
if (observer.Value + this.ExpirationDuration < now) | |
{ | |
// Expired observers will be removed. | |
defunct = defunct ?? new List<T>(); | |
defunct.Add(observer.Key); | |
continue; | |
} | |
// Skip observers which don't match the provided predicate. | |
if (predicate != null && !predicate(observer.Key)) | |
{ | |
continue; | |
} | |
try | |
{ | |
await notification(observer.Key); | |
} | |
catch (Exception) | |
{ | |
// Failing observers are considered defunct and will be removed.. | |
defunct = defunct ?? new List<T>(); | |
defunct.Add(observer.Key); | |
} | |
} | |
// Remove defunct observers. | |
if (defunct != default(List<T>)) | |
{ | |
foreach (var observer in defunct) | |
{ | |
this._observers.Remove(observer); | |
} | |
} | |
} | |
/// <summary> | |
/// Notifies all observers which match the provided <paramref name="predicate"/>. | |
/// </summary> | |
/// <param name="notification"> | |
/// The notification delegate to call on each observer. | |
/// </param> | |
/// <param name="predicate">The predicate used to select observers to notify.</param> | |
public void Notify(Action<T> notification, Func<T, bool> predicate = null) | |
{ | |
var now = this.GetDateTime(); | |
var defunct = default(List<T>); | |
foreach (var observer in this._observers) | |
{ | |
if (observer.Value + this.ExpirationDuration < now) | |
{ | |
// Expired observers will be removed. | |
defunct = defunct ?? new List<T>(); | |
defunct.Add(observer.Key); | |
continue; | |
} | |
// Skip observers which don't match the provided predicate. | |
if (predicate != null && !predicate(observer.Key)) | |
{ | |
continue; | |
} | |
try | |
{ | |
notification(observer.Key); | |
} | |
catch (Exception) | |
{ | |
// Failing observers are considered defunct and will be removed.. | |
defunct = defunct ?? new List<T>(); | |
defunct.Add(observer.Key); | |
} | |
} | |
// Remove defunct observers. | |
if (defunct != default(List<T>)) | |
{ | |
foreach (var observer in defunct) | |
{ | |
this._observers.Remove(observer); | |
} | |
} | |
} | |
/// <summary> | |
/// Removed all expired observers. | |
/// </summary> | |
public void ClearExpired() | |
{ | |
var now = this.GetDateTime(); | |
var defunct = default(List<T>); | |
foreach (var observer in this._observers) | |
{ | |
if (observer.Value + this.ExpirationDuration < now) | |
{ | |
// Expired observers will be removed. | |
defunct = defunct ?? new List<T>(); | |
defunct.Add(observer.Key); | |
} | |
} | |
// Remove defunct observers. | |
if (defunct != default(List<T>)) | |
{ | |
foreach (var observer in defunct) | |
{ | |
this._observers.Remove(observer); | |
} | |
} | |
} | |
/// <summary> | |
/// Returns the enumerator for all observers. | |
/// </summary> | |
/// <returns>The enumerator for all observers.</returns> | |
public IEnumerator<T> GetEnumerator() | |
{ | |
return this._observers.Keys.GetEnumerator(); | |
} | |
/// <summary> | |
/// Returns the enumerator for all observers. | |
/// </summary> | |
/// <returns>The enumerator for all observers.</returns> | |
IEnumerator IEnumerable.GetEnumerator() | |
{ | |
return this._observers.Keys.GetEnumerator(); | |
} | |
} |
Now we can finally create our grain implementation which is rather simple as most of the work is done by the ObserverSubscriptionManager:
The work on the server side is done!
public class SubscriptionManagerGrain : Grain, ISubscriptionManager | |
{ | |
private ObserverSubscriptionManager<IReceiveMessage> _subscriptionManager; | |
public SubscriptionManagerGrain() | |
{ | |
} | |
public override async Task OnActivateAsync() | |
{ | |
_subscriptionManager = new ObserverSubscriptionManager<IReceiveMessage>(); | |
_subscriptionManager.ExpirationDuration = TimeSpan.FromMinutes(60); | |
await base.OnActivateAsync(); | |
} | |
public Task Publish(EventMessage eventMessage) | |
{ | |
_subscriptionManager.Notify(s => s.ReceiveMessage(eventMessage)); | |
return Task.CompletedTask; | |
} | |
public Task Subscribe(IReceiveMessage observer) | |
{ | |
//Adds or updates the subscription | |
_subscriptionManager.Subscribe(observer); | |
return Task.CompletedTask; | |
} | |
public Task Unsubscribe(IReceiveMessage observer) | |
{ | |
if (_subscriptionManager.IsSubscribed(observer)) | |
{ | |
_subscriptionManager.Unsubscribe(observer); | |
} | |
return Task.CompletedTask; | |
} | |
} |
Step 3 - Link the client to the server
As a last step we need to link the observer client to our SubscriptionManagerGrain. Therefore the client needs to call a static method on the observer factory, CreateObjectReference()
, to turn the class into a grain reference, which can then be passed to the subscription method on the notifying grain.
Let’s add an Init() method to do this:
public async Task Init() | |
{ | |
_subscriptionManager = _clusterClient.GetGrain<ISubscriptionManager>(0); | |
_receiveMessageReference = await _clusterClient.CreateObjectReference<IReceiveMessage>(this); | |
await _subscriptionManager.Subscribe(_receiveMessageReference); | |
} |
We also add the possibility to unsubscribe and publish a message through the client:
public async Task Cleanup() | |
{ | |
await _subscriptionManager.Unsubscribe(_receiveMessageReference); | |
} |
public async Task Publish<T>(T @event) where T : IEvent | |
{ | |
if (_isInitialized == false) | |
await Init(); | |
var payload = JsonConvert.SerializeObject(@event); | |
_logger.LogDebug($"Publish -> payload: {payload}"); | |
await _subscriptionManager.Publish(new EventMessage() { Id = Guid.NewGuid(), Payload = payload, Type = @event.GetType().AssemblyQualifiedName }); | |
} |