Published on
🍵 6 min read

Saga Implementation Using MassTransit Framework

Authors

Photo

Overview

Introduction

TL;DR

In this blog, we covered how to implement the Saga pattern using MassTransit in .NET. When dealing with distributed transactions, we should consider solutions like the Saga pattern or Routing Slip to manage transactions involving multiple processes, rather than relying on a single process. In the case of heavy workloads, we need distributed transaction solutions to ensure resilience and reliability.

In microservices development, we often deal with complex code for handling transactions between services, typically involving verbose try-catch blocks. To ensure consistency in distributed systems, the Saga pattern offers a straightforward approach. Today, I'll walk you through implementing this pattern using MassTransit, a reliable framework for orchestrating distributed systems.

There are two types of Saga implementations in MassTransit. In this blog, we will cover Saga State Machines. You can also explore the Routing Slip if you need to route a message through a series of components.

Roadmap to Implementing the Saga Pattern with MassTransit

Here is the roadmap to implement the Saga pattern using MassTransit:

  • Define events: Outline or visualize the basic structure of events you need to implement.
  • Create a state machine instance.
  • Define states for events.
  • Define behaviors between states.
  • Implement the state machine.
  • Register handlers and the Saga state machine.

To define a state machine, we should implement the class MassTransitStateMachine<T> like this:

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
}

Instance

An instance contains the data for the state of the state machine.

Whenever we switch from one state to another, a new instance is created with a correlation ID.

public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}

States

An instance can only be in one state at a given time. The Final state is also defined for all state machines and is used to signify that the instance has reached the final state.

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
}

Event

An event is something that happens, which may result in a state change. Events can modify the instance data and the current state.

public interface SubmitOrder
{
    Guid OrderId { get; }
}

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
    }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

Configuration

services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();
});

Simple scenario

Let's say we have a simple scenario for an e-commerce platform where a customer orders an item. We should implement the "happy path" but also handle rollback scenarios for any errors. For example, if a "payment rejected" event occurs, we should not reserve the stock and should also send an "order failed" event.

Events:

OrderSubmit,
StockReserved,
StockNotReserved,
PaymentConfirmed,
PaymentRejected

Here is a basic overview of the state's behaviors that we want to implement.

saga

Here is an example of behavior for handling the "happy path" as well as errors:

During(StockReserved,
    When(PaymentConfirmedEvent)
        .TransitionTo(PaymentConfirmed)
        .Send(OrderCompletedCommandQueue, context => new OrderCompletedCommand
        {
            OrderId = context.Instance.OrderId
        })
        .Finalize(),
    When(PaymentRejectedEvent)
        .TransitionTo(PaymentRejected)
        .Send(OrderFailedCommandQueue, context => new OrderFailedCommand
        {
            OrderId = context.Instance.OrderId,
            Reason = context.Data.Reason
        })
        .Send(CompensateStockCommandQueue, context => new CompensateStockCommand
        {
            Items = context.Data.Items
        }));

As you can see, if we encounter an error in the PaymentConfirmed state, we send rollback events: OrderFailedCommandQueue and CompensateStockCommandQueue. If no error occurs, we finalize the transaction.

This approach provides a clean method for handling distributed transactions, ensuring resiliency in case of errors. MassTransit manages persistence when dealing with Sagas, preventing them from treating every event as new. You can use various databases for persistence, such as MongoDB, Redis, or SQL.

Here is an example of using SQL for persistence:

services.AddMassTransit(cfg =>
{
    cfg.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // Or use Optimistic, which requires RowVersion

            r.AddDbContext<DbContext, OrderStateDbContext>((provider, builder) =>
            {
                builder.UseSqlServer(connectionString, m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                });
            });
        });
});

Implementation

Here is an example of the complete implementation for a state machine:

namespace Library.Components.StateMachines
{
    using Contracts;
    using MassTransit;

    public sealed class BookStateMachine : MassTransitStateMachine<Book>
    {
        static BookStateMachine()
        {
            MessageContracts.Initialize();
        }

        public BookStateMachine()
        {
            InstanceState(x => x.CurrentState, Available, Reserved);

            Event(() => ReservationRequested, x => x.CorrelateById(m => m.Message.BookId));

            Initially(
                When(Added)
                    .CopyDataToInstance()
                    .TransitionTo(Available));

            During(Available,
                When(ReservationRequested)
                    .Then(context => context.Saga.ReservationId = context.Message.ReservationId)
                    .PublishBookReserved()
                    .TransitionTo(Reserved)
            );

            During(Reserved,
                When(ReservationRequested)
                    .If(context => context.Saga.ReservationId.HasValue && context.Saga.ReservationId.Value == context.Message.ReservationId,
                        x => x.PublishBookReserved())
            );

            During(Reserved,
                When(BookReservationCanceled)
                    .TransitionTo(Available));

            During(Available, Reserved,
                When(BookCheckedOut)
                    .TransitionTo(CheckedOut)
            );
        }

        public Event<BookAdded> Added { get; }
        public Event<BookCheckedOut> BookCheckedOut { get; }
        public Event<BookReservationCanceled> BookReservationCanceled { get; }
        public Event<ReservationRequested> ReservationRequested { get; }

        public State Available { get; }
        public State Reserved { get; }
        public State CheckedOut { get; }
    }

    public static class BookStateMachineExtensions
    {
        public static EventActivityBinder<Book, BookAdded> CopyDataToInstance(this EventActivityBinder<Book, BookAdded> binder)
        {
            return binder.Then(x =>
            {
                x.Saga.DateAdded = x.Message.Timestamp.Date;
                x.Saga.Title = x.Message.Title;
                x.Saga.Isbn = x.Message.Isbn;
            });
        }

        public static EventActivityBinder<Book, ReservationRequested> PublishBookReserved(this EventActivityBinder<Book, ReservationRequested> binder)
        {
            return binder.PublishAsync(context => context.Init<BookReserved>(new
            {
                context.Message.ReservationId,
                context.Message.MemberId,
                context.Message.Duration,
                context.Message.BookId,
                InVar.Timestamp
            }));
        }
    }
}

I chose MassTransit, but other frameworks like NServiceBus or Brighter can also be used.

Frequently Asked Questions


Resources