- Published on
- 🍵 6 min read
Saga Implementation Using MassTransit Framework
- Authors
- Name
- Emin Vergil
- @eminvergil
Overview
- Introduction
- Roadmap to Implementing the Saga Pattern with MassTransit
- Simple scenario
- Implementation
- Resources
Introduction
TL;DR
In this blog, we covered how to implement the Saga pattern using MassTransit in .NET. When dealing with distributed transactions, we should consider solutions like the Saga pattern or Routing Slip to manage transactions involving multiple processes, rather than relying on a single process. In the case of heavy workloads, we need distributed transaction solutions to ensure resilience and reliability.
In microservices development, we often deal with complex code for handling transactions between services, typically involving verbose try-catch blocks. To ensure consistency in distributed systems, the Saga pattern offers a straightforward approach. Today, I'll walk you through implementing this pattern using MassTransit, a reliable framework for orchestrating distributed systems.
There are two types of Saga implementations in MassTransit. In this blog, we will cover Saga State Machines. You can also explore the Routing Slip if you need to route a message through a series of components.
Roadmap to Implementing the Saga Pattern with MassTransit
Here is the roadmap to implement the Saga pattern using MassTransit:
- Define events: Outline or visualize the basic structure of events you need to implement.
- Create a state machine instance.
- Define states for events.
- Define behaviors between states.
- Implement the state machine.
- Register handlers and the Saga state machine.
To define a state machine, we should implement the class MassTransitStateMachine<T>
like this:
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
}
Instance
An instance contains the data for the state of the state machine.
Whenever we switch from one state to another, a new instance is created with a correlation ID.
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
}
States
An instance can only be in one state at a given time. The Final state is also defined for all state machines and is used to signify that the instance has reached the final state.
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public State Accepted { get; private set; }
}
Event
An event is something that happens, which may result in a state change. Events can modify the instance data and the current state.
public interface SubmitOrder
{
Guid OrderId { get; }
}
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
}
public Event<SubmitOrder> SubmitOrder { get; private set; }
}
Configuration
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.InMemoryRepository();
});
Simple scenario
Let's say we have a simple scenario for an e-commerce platform where a customer orders an item. We should implement the "happy path" but also handle rollback scenarios for any errors. For example, if a "payment rejected" event occurs, we should not reserve the stock and should also send an "order failed" event.
Events:
OrderSubmit,
StockReserved,
StockNotReserved,
PaymentConfirmed,
PaymentRejected
Here is a basic overview of the state's behaviors that we want to implement.
Here is an example of behavior for handling the "happy path" as well as errors:
During(StockReserved,
When(PaymentConfirmedEvent)
.TransitionTo(PaymentConfirmed)
.Send(OrderCompletedCommandQueue, context => new OrderCompletedCommand
{
OrderId = context.Instance.OrderId
})
.Finalize(),
When(PaymentRejectedEvent)
.TransitionTo(PaymentRejected)
.Send(OrderFailedCommandQueue, context => new OrderFailedCommand
{
OrderId = context.Instance.OrderId,
Reason = context.Data.Reason
})
.Send(CompensateStockCommandQueue, context => new CompensateStockCommand
{
Items = context.Data.Items
}));
As you can see, if we encounter an error in the PaymentConfirmed
state, we send rollback events: OrderFailedCommandQueue
and CompensateStockCommandQueue
. If no error occurs, we finalize the transaction.
This approach provides a clean method for handling distributed transactions, ensuring resiliency in case of errors. MassTransit manages persistence when dealing with Sagas, preventing them from treating every event as new. You can use various databases for persistence, such as MongoDB, Redis, or SQL.
Here is an example of using SQL for persistence:
services.AddMassTransit(cfg =>
{
cfg.AddSagaStateMachine<OrderStateMachine, OrderState>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // Or use Optimistic, which requires RowVersion
r.AddDbContext<DbContext, OrderStateDbContext>((provider, builder) =>
{
builder.UseSqlServer(connectionString, m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
});
});
});
});
Implementation
Here is an example of the complete implementation for a state machine:
namespace Library.Components.StateMachines
{
using Contracts;
using MassTransit;
public sealed class BookStateMachine : MassTransitStateMachine<Book>
{
static BookStateMachine()
{
MessageContracts.Initialize();
}
public BookStateMachine()
{
InstanceState(x => x.CurrentState, Available, Reserved);
Event(() => ReservationRequested, x => x.CorrelateById(m => m.Message.BookId));
Initially(
When(Added)
.CopyDataToInstance()
.TransitionTo(Available));
During(Available,
When(ReservationRequested)
.Then(context => context.Saga.ReservationId = context.Message.ReservationId)
.PublishBookReserved()
.TransitionTo(Reserved)
);
During(Reserved,
When(ReservationRequested)
.If(context => context.Saga.ReservationId.HasValue && context.Saga.ReservationId.Value == context.Message.ReservationId,
x => x.PublishBookReserved())
);
During(Reserved,
When(BookReservationCanceled)
.TransitionTo(Available));
During(Available, Reserved,
When(BookCheckedOut)
.TransitionTo(CheckedOut)
);
}
public Event<BookAdded> Added { get; }
public Event<BookCheckedOut> BookCheckedOut { get; }
public Event<BookReservationCanceled> BookReservationCanceled { get; }
public Event<ReservationRequested> ReservationRequested { get; }
public State Available { get; }
public State Reserved { get; }
public State CheckedOut { get; }
}
public static class BookStateMachineExtensions
{
public static EventActivityBinder<Book, BookAdded> CopyDataToInstance(this EventActivityBinder<Book, BookAdded> binder)
{
return binder.Then(x =>
{
x.Saga.DateAdded = x.Message.Timestamp.Date;
x.Saga.Title = x.Message.Title;
x.Saga.Isbn = x.Message.Isbn;
});
}
public static EventActivityBinder<Book, ReservationRequested> PublishBookReserved(this EventActivityBinder<Book, ReservationRequested> binder)
{
return binder.PublishAsync(context => context.Init<BookReserved>(new
{
context.Message.ReservationId,
context.Message.MemberId,
context.Message.Duration,
context.Message.BookId,
InVar.Timestamp
}));
}
}
}
I chose MassTransit, but other frameworks like NServiceBus or Brighter can also be used.