Chronicle, czyli saga/process manager dla .NET Core

W poprzednim wpisie poruszyłem tematykę transakcji biznesowych w systemach rozproszonych, przedstawiając różne sposoby na ich projektowania i kontrolowanie. Dziś, zgodnie z zapowiedzią przedstawię kod C#, który będzie niczym innym jak implementacją hybrydową wzorca saga/oraz process manager. Bez zbędnego przedłużania, zaczynajmy!

 

Chronicle – potrzeba matką wynalazków…

Jeżeli czytałeś/aś poprzedni wpis to zapewne pamiętasz, że przyrównałem sam wzorzec do Yeti. Niby każdy coś na ten temat słyszał, ale gdy przychodzi co do czego, okazuje się, że mało kto miał okazję widzieć kod sagi na własne oczy. Podwód tego zjawiska zrozumiałem rok temu, gdy zgłębiając zagadnienia związane z tą tematyką (m.in. oglądając rózne prezentacje na YouTube) postanowiłem w końcu spróbować swoich sił z implementacją. Naturalnym ruchem było wpisanie w Google:

„saga pattern .NET Core”.

Wierz lub nie, ale był to pierwszy od dawna moment, gdzie przeszedłem na drugą stronę Google, aby znaleźć coś co skierowałoby mnie na stronę GitHub jakiegoś projektu. Wszystko na co trafiałem było bardziej implementacją wzorca workflow, a część projektów po prostu była zaimplementowana w innych językach (Ruby, Java). Dużo później bo już po publikacji kursu „Distributed ‚NET Core”, widzowie podesłali coś czego tak na prawdę szukałem tj. sagę od MassTransit. Było to jednak o jakieś 4-5 miesięcy za późno, a ja przyznam szczerze, że nie wiedziałem o istnieniu tej biblioteki (dziwi trochę jej słabe pozycjonowanie). Zatem, co mogłem zrobić chcąc zaimplementować sagę w projekcie DShop? W oparciu o wcześniej studiowane materiały (m.in. opisy Udiego Dahana) stworzyłem swoją biblioteką dla .NET Core. Jest ona oczywiście Open Source i znajduje się na GitHubie. Nazwałem ją Chronicle.

W dużym skrócie Chronicle oparty jest na podejściu, które przedstawił Udi w swoich wpisach. Nie ma tu skomplikowanego fluent API na modelowanie całego procesu, ponieważ do tego (w tym przypadku) nadaje się doskonale sam język C#. Sama saga to nic więcej niż klasa, która implementuje kilka interejsów ISagaStartAction<T> oraz ISagaAction<T>, które definiują jeden wybrany krok w procesie oraz mechanizm do ewentualnej kompensacji ów kroku. Wszystko sterowane jest poprzez zdarzenia, które wymieniane są w naszym systemie. Prosty przykład obrazujący użycie i implementację znajdziesz w samym repozytorium, a w pliku README.MD znajdziesz opis kolejnych kroków implementacyjnych.

 

Jaki proces będziemy modelować?

Omówmy po krótce jaki proces biznesowy umieścimy w środku naszej sagi. Z początku moją intencję był przykład z DShop, ale nie ukrywam, że jest on dość trywialny i mógłby z powodzeniem zostać oparty np. o choreografię zdarzeń. Zamiast tego proponuję dużo ciekawszy przykład z nowej aplikacji o nazwie Pacco. W dużym skrócie jest to system do obsługi przewozów kurierskich, ALE dość nietypowych bo mowa np. o organach ludzkich, broni lub innych „specjalnych” paczkach, które wymagają specjalnego transportu.

W systemie znajduje się kilak usług, z których każda realizuje część domeny:

  • Orders – uproszczona usługa zamówienia/koszyka zamówienia
  • Vehicles – usługa zarządzająca pojazdami
  • Parcels – usługa zarządzająca paczkami
  • Availability – usługa zarządzająca dostępnością różnych zasobów (np. pojazdów) w czasie
  • Deliveries – usługa zajmująca cyklem życia paczki podczas dostawy

Na tę chwilę możemy ograniczyć się do tych usług. W jaki sposób realizowany jest cały przebieg zamówienia?

  1. Klient tworzy paczki w systemie
  2. Klient tworzy puste zamówienie
  3. Klient przypisuje wybrane paczki do zamówienia
  4. Klient otrzymuje propozycję pojazdów do obsługi jego przewozu (np. dla broni będzie to pojazd opancerzony z sejfem)
  5. Klient wybiera pojazd i wybiera datę realizacji zamówienia
  6. Zamówienie zostaje zatwierdzone i czeka na odbiór przez kuriera

To wszystko dzieje się z poziomu UI, gdzie klient kilkoma klikami może zdefiniować swój przewóz. Aż tu nagle „byznez” wpada na kolejny ficzer w systemie! Automatyczne tworzenie zamówienia w oparciu o wybrane paczki! Po co klienta ma sam klikać i wybierać, skoro AI zrobi to lepiej i szybciej! Mamy zatem wymaganie, które ewidentnie przechodzi przez wiele usług i próba modelowania tego np. choreografią zdarzeń skończyła by się najprawdopodobniej wielkim makaronem. Co zatem zrobimy? Stowrzymy dedykowaną mikrosuługę o nazwie OrderMaker, w której to umieścimy… sage 😉

 

 

OrderMaker, czyli implementacja saga pattern dla scenariusz w Pacco

Zacznijmy od samego wpięcia Chronicle do usługi, która oparta jest oczywiście o ASP.NET Core (jeszcze 2.2, ale na dniach pojawi się pewnie 3.0). Odbywa się ona poprzez wywołanie metody na obiekcie IServiceCollection:

 


services.AddChronicle();

 

Powyższa konstrukcja dokona automatycznej rejestracji wszystkich klas, które dziedziczą po klasie Saga lub Saga<T>oraz zarejestruje dodatkowe komponenty, z których zaraz skorzystamy. Warto wspomnieć, że domyślnym trybem persystencji dla sag jest zapis do pamięci. Można ten proces nadpisać używając wybranego rozszerzenia. Póki co wsparcie posiada MongoDB, a już na review znajduje się wersja dla Redisa oraz EF Core.

Po zarejestrowaniu Chronicle możemy przejść do implementacji samej sagi:

 

public class AIMakingOrderData
    {
        public Guid OrderId { get; set; }
        public Guid CustomerId { get; set; }
        public Guid VehicleId { get; set; }
        public DateTime ReservationDate { get; set; }
        public List<Guid> ParcelIds { get; set; } = new List<Guid>();
        public List<Guid> AddedParcelIds { get; set; } = new List<Guid>();
        public bool AllPackagesAddedToOrder => AddedParcelIds.All(ParcelIds.Contains);
    }
    
    public class AIOrderMakingSaga : Saga<AIMakingOrderData>,
        ISagaStartAction<MakeOrder>,
        ISagaAction<OrderCreated>,
        ISagaAction<ParcelAddedToOrder>,
        ISagaAction<VehicleAssignedToOrder>,
        ISagaAction<OrderApproved>
    {
        private readonly IBusPublisher _publisher;
        private readonly ICorrelationContextAccessor _accessor;
        private readonly IAvailabilityServiceClient _client;

        private const string VehicleId = "d718feb1-2bc1-4a9a-8b3c-6048d89bc1ad";

        public AIOrderMakingSaga(IBusPublisher publisher, ICorrelationContextAccessor accessor,
            IAvailabilityServiceClient client)
        {
            _publisher = publisher;
            _accessor = accessor;
            _client = client;
            _accessor.CorrelationContext = new CorrelationContext
            {
                User = new CorrelationContext.UserContext()
            };
        }

        public override SagaId ResolveId(object message, ISagaContext context)
        {
            switch (message)
            {
                case MakeOrder m: return m.OrderId.ToString();
                case OrderCreated m: return m.OrderId.ToString();
                case ParcelAddedToOrder m: return m.OrderId.ToString();
                case VehicleAssignedToOrder m: return m.OrderId.ToString();
                case OrderApproved m: return m.OrderId.ToString();
            }

            return base.ResolveId(message, context);
        }

        public async Task HandleAsync(MakeOrder message, ISagaContext context)
        {            
            Data.ParcelIds.Add(message.ParcelId);
            Data.OrderId = message.OrderId;
            Data.CustomerId = message.CustomerId;
            await _publisher.SendAsync(new CreateOrder(Data.OrderId, message.CustomerId), _accessor.CorrelationContext);
        }

        public async Task HandleAsync(OrderCreated message, ISagaContext context)
        {
            var tasks = Data.ParcelIds.Select(id =>
                _publisher.SendAsync(new AddParcelToOrder(Data.OrderId, id, Data.CustomerId),
                    _accessor.CorrelationContext));

            await Task.WhenAll(tasks);
        }

        public async Task HandleAsync(ParcelAddedToOrder message, ISagaContext context)
        {
            Data.AddedParcelIds.Add(message.ParcelId);

            if (Data.AllPackagesAddedToOrder)
            {
                Data.VehicleId = true? new Guid(VehicleId) : Guid.Empty; // typical AI in startups

                var resource = await _client.GetResourceReservationsAsync(Data.VehicleId);
                var latestReservation = resource.Reservations.Any() 
                    ? resource.Reservations.OrderBy(r => r.DateTime).Last() : null;
                
                Data.ReservationDate = latestReservation?.DateTime.AddDays(1) ?? DateTime.UtcNow.AddDays(5);

                await _publisher.SendAsync(new AssignVehicleToOrder(Data.OrderId, Data.VehicleId, Data.ReservationDate),
                    _accessor.CorrelationContext);
            }
        }

        public Task HandleAsync(VehicleAssignedToOrder message, ISagaContext context)
            => _publisher.SendAsync(new ReserveResource(Data.VehicleId, Data.ReservationDate, 9999, Data.CustomerId),
                _accessor.CorrelationContext);

        public Task HandleAsync(OrderApproved message, ISagaContext context)
            => CompleteAsync();

        public Task CompensateAsync(MakeOrder message, ISagaContext context)
            => Task.CompletedTask;
        
        public Task CompensateAsync(OrderCreated message, ISagaContext context)
            => Task.CompletedTask;

        public Task CompensateAsync(ParcelAddedToOrder message, ISagaContext context)
            => _publisher.SendAsync(new CancelOrder(message.OrderId, "Because I'm saga"), 
                _accessor.CorrelationContext);

        public Task CompensateAsync(VehicleAssignedToOrder message, ISagaContext context)
            => Task.CompletedTask;
        
        public Task CompensateAsync(OrderApproved message, ISagaContext context)
            => Task.CompletedTask;

    }

 

Zacznijmy od samej góry czyli klasy AIMakingOrderData. Jest to nic innego jak „pojemnik” na dane, które razem ze stanem sagi zostaje zapisany do wybranego źródła danych. W naszym przypadku potrzebujemy kilku pól, które stopniową będą uzupełniane i wykorzystywane w kolejnych krokach procesu. Zwróć uwagę, że wykorzystana została generyczna wersja klasy Saga co oznacza, że nasz pojemnik na dane będzie dostępny po właściwością o nazwie Data. Następnie pojawia się kilka interfejsów, o których już wcześniej wspomniałem czyli:

  • ISagaStartAction<T> – oznacza, że wiadomość T rozpoczyna cały proces. Implementuje dwie metody: HandleAsync (1 krok w procesie) oraz CompensateAsync (kompensacja kroku)
  • ISagaAction<T> – oznacza, że wiadomość T jest N-tym krokiem w procesie. Implementuje dwie metody: HandleAsync (krok w procesie) oraz CompensateAsync (kompensacja kroku)

Proste? Proste. W naszym przypadku wiadomością, która inicjalizuje proces jest komenda o nazwie MakeOrder:

 


    public class MakeOrder : ICommand
    {
        public Guid OrderId { get; }
        public Guid CustomerId { get; }
        public Guid ParcelId { get; }

        public MakeOrder(Guid orderId, Guid customerId, Guid parcelId)
        {
            OrderId = orderId;
            CustomerId = customerId;
            ParcelId = parcelId;
        }
    }

 

Dla uproszczenia zawiera ona jedynie Id jednej paczki, a nie wielu. Kolejny ważny element sagi to metoda ResolveId. Jest celem jest określenie co jest identyfikatorem konkrentej sagi podczas otrzymania wiadomości. Może przecież dojśc do sytuacji, w której dwóch klientów jednocześnie uruchomi nasz automatyczny ficzer kompletowania zamówienia. Wówczas po otrzymaniu wiadomości musimy wiedzieć w ramach jakiej „instancji” sagi będziemy procesować krok. W tym przypadku identyfikatorem będzie identyfikator zamówienia. Jest on unikalny więc nie dojdzie do sytuacji, w której klient A ingeruje w proces klienta B. Dalej mamy już konkretne implementacje kroków procesu. Przejdźmy je po kolei:

 

  1. Po otrzymaniu komendy MakeOrder przypisujemy potrzebne informacje do Data, a następnie publikujemy komendę do usługi zamówień, która w powinna utworzyć puste zamówienie dla wskazanego klienta.
  2. Po utworzeniu zamówienia usługa Orders publikuje zdarzenie integracyjne o nazwie OrderCreated. Jest to dla nas znak, że możemy przejść do kroku drugiego. W nim, dla każdego identyfikatora paczki z tablicy, którą przypisaliśmy w 1 kroku (ParcelIds) publikujemy komendę (znów do usługi orders) o nazwie AddParcelToOrder. W naszym przypadku, ponieważ komenda posiadała jedynie jedno ID, a nie tablicę de facto zostanie opublikowana tylko jedna komenda.
  3. Każdorazowe przypisanie paczki do konkretnego zamówienia skutkuje opublikowaniem zdarzenia integracyjnego ParcelAddedToOrder. Mamy tutaj przykład rozgałęzienia procesu. Nie chcemy iść dalej do momentu gdy wszystkie paczki nie zostaną przypisane. Rozwiązaniem tego problemu jest… prosty if. Dodajemy na samym początku identyfikator z przypisanej paczki do kolekcji AddedParcelIds, a następnie sprawdzamy czy wszystkie zdarzenia do nas dotarły. Jeśli nie, po prostu kończymy krok i czekamy na ponowne wywołanie….i tak aż do skutku. Jeżeli paczki zostały pomyślnie dodane do zamówienia możemy przypisać pojazd. Normalnie wystąpiłaby tu jakaś niesamowita heurystyka typu. najbliżej klienta, najtańsze, najszybsze itp. ale aby nie komplikować kodu w tym przykładzie „zwycięzca” jest zahardcodowany. Po jego wybraniu wykonujemy żądnie HTTP do usługi dostępności, aby otrzymać listę rezerwacji zasobu. Do pola ReservationDate przypisujemy „obliczoną” datę dostawy i publikujemy komendę AssignVehivleToOrder.
  4. Gdy pojazd został przypisany do zamówienia można już dokonać rezerwacji w usłudze dostępności poprzez wysłanie komendy ReserveResource.
  5. Gdy pojazd zostanie zarezerwowany usługa zamówień nasłuchująca na ten fakt zatwierdza zamówienie i publikuje zdarzenie OrderApproved. Po otrzymaniu go w sadze możemy oznaczyć, że saga dokonała się poprzez wywołanie metody CompleteAsync().

 

Warto zwrócić uwagę, że zaimplementowana została jedna metoda kompensacyjna, która anuluje zamówienie. Oczywiście można by pokusić się o implementację kompensacji dla każdego kroku, ale znów… dla celów szkoleniowych jest to wystarczające rozwiązanie. Nie wiemy jednak kiedy ów kompensacja następuje. Co do zasady kompensacja polega na wywoływaniu metod CompensateAsync() dla każdej wiadomości od tyłu tj. od ostatniego kroku do pierwszego. W tym przypadku wywołałyby się kroki kolejno dla:

  1. OrderApproved
  2. VehicleAssignedToOrder
  3. ParcelAddedToOrder
  4. OrderCreated
  5. MakeOrder

Dzięki temu jesteśmy w stanie wycofywać zmiany w systemie bez problemów w stylu „nie można usunąć zamówienia, póki nie usuniesz rezerwacji pojazdu”. Wydaje się to dość naturalne. Samo wywołanie kompensacji zachodzi w 2 przypadkach:

  1. Jeżeli podczas procesowania dowolnego kroku zostanie rzucony wyjątek, który nie zostanie ówcześnie złapany.
  2. Jeżeli jawnie wywołamy metodę Reject lub RejectAsync(). W naszym przypadku moglibyśmy np. nasłuchiwać na zdarzenia typu Rejected (pisałem o nich przy okazji opisu procesowania operacji asynchronicznych) i w ramach kroku wywoływać jawnie kompensację.

 

Pozostaje jednak jedna kwestia, która być może chodzi Ci po głowie od kiedy pojawił się kod. W jaki sposób wiadomość z kolejki trafia do sagi? Publikowanie jest jasne, ponieważ wstrzyknięty został publisher,  ale subskrypcja?! Już śpieszę wyjaśniać. Oczywiście sama usługa OrderMaker posiada odpowiednie subskrypcje do RabbitMQ:

 


  app
                .UseErrorHandler()
                .UseConsul()
                .UseMetrics()
                .UseRabbitMq()
                .SubscribeEvent<OrderApproved>()
                .SubscribeEvent<OrderCreated>()
                .SubscribeEvent<ParcelAddedToOrder>()
                .SubscribeEvent<ResourceReserved>()
                .SubscribeEvent<VehicleAssignedToOrder>();

 

Następnie utworzyłem jeden współdzielony event handler, do którego wstrzyknąłem ISagaCoordinatora:

 


public class AIOrderMakingHandler : 
        ICommandHandler<MakeOrder>, 
        IEventHandler<OrderApproved>, 
        IEventHandler<OrderCreated>, 
        IEventHandler<ParcelAddedToOrder>,
        IEventHandler<VehicleAssignedToOrder>,
        IEventHandler<ResourceReserved>
    {
        private readonly ISagaCoordinator _coordinator;

        public AIOrderMakingHandler(ISagaCoordinator coordinator)
        {
            _coordinator = coordinator;
        }
        
        public Task HandleAsync(MakeOrder command)
            => _coordinator.ProcessAsync(command, SagaContext.Empty);

        public Task HandleAsync(OrderApproved @event)
            => _coordinator.ProcessAsync(@event, SagaContext.Empty);

        public Task HandleAsync(OrderCreated @event)
            => _coordinator.ProcessAsync(@event, SagaContext.Empty);
        
        public Task HandleAsync(ParcelAddedToOrder @event)
            => _coordinator.ProcessAsync(@event, SagaContext.Empty);
        
        public Task HandleAsync(VehicleAssignedToOrder @event)
            => _coordinator.ProcessAsync(@event, SagaContext.Empty);

        public Task HandleAsync(ResourceReserved @event)
            => _coordinator.ProcessAsync(@event, SagaContext.Empty);
    }

 

Koordynator to klasa dostarczana przez Chronicle. Po wywołaniu metody ProcessAsync dzieje się następująca rzecz:

  1. Wyszukiwane są wszystkie sagi, które implementują ISagaStartAction<T> bądź ISagaAction<T> dla zadanego typu wiadomości. Mówiąc prościej, wyszukiwane są sagi, które w swoim procesie używają danej wiadomości. Może być ich od 0-N, ponieważ jedna wiadomość może być np. pierwszym krokiem procesu, a w innej sadze jest to krok ostatni.
  2. Na otrzymanym zbiorze sag jest wywołane odtworzenie stanu i zapisanych danych ze wskazanego źródła.
  3. Wywołana zostaje metoda HandleAsync(), która realizuje konkretny krok procesu.
  4. Zmiany zostają zapisane ponownie do wskazanego źródła.
  5. Jeżeli rzucony został wyjątek lub wywołany Reject następuje kompensacja.

Mamy więc wszystkie elementy układanki. Usługa nasłuchuje i publikuje wiadomości, a wszystkim koordynuje jedna klasa, która zawiera kompletny proces biznesowy. W tym miejscu warto wspomnieć, że do idealnej z punktu widzenia transakcyjności, implementacji brakuje jeszcze wzorca outbox, który zapobiegałby sytuacji w której opublikujemy z sagi wiadomość, a nie uda nam się np. zapisać stanu sagi do źródła danych. Nie jest to jednak przedmiotem tego wpisu, a o samym outboxie pojawi się zapewne osobny wpis 😉

 

Na dziś to tyle. Cały kod usługi jest dostępny na GitHubie. Zachęcam również do zapoznania się z samym Chronicle i zostawieniem gwiazdki 😀 W razie pytań i problemów nie zawahaj się użyć Issues 😉

 


 

 

Jeżeli interesuje Cię tematyka projektowania/implementacji/wdrażania i utrzymywania mikrousług to zapraszam Cię serdecznie do odwiedzenia strony  https://mikroserwisy.net Jest to nasz kompleksowy kurs online wprowadzający w świat nowoczesnej architektury mikroserwisów z wykorzystaniem metodyki Event Storming oraz najpopularniejszych technologii takich jak:

.NET Core, Docker, Kubernetes, Istio Service Mesh …i wiele innych!

You may also like...