CQRS i mikroserwisy…i async: jak poinformować użytkownika o stanie przetwarzanej operacji?

Dziś odpowiedź na zdecydowanie najczęściej zadawane pytanie w kontekście naszego projektu DShop. Brzmi ono „w jaki sposób użytkownik końcowy aplikacji wie czy jego komenda została (i kiedy została) przetworzona?”. Na pierwszy rzut oka odpowiedź na to pytanie może wydawać się trywialna, prawda? Wszak większość typowych aplikacji webowych po wykonaniu jakiejś akcji informuje nas o jej powodzeniu lub błędzie poprzez np. toastry czy alerty. Możemy zatem przypuszczać się, że nie jest to nic skomplikowanego, a dodatkowo będąc programistami spodziewamy się, że wszystko oparte jest zapewne o jakiś rezultat/flagę, która zwracana jest z backendu. I owszem, taki schemat doskonale sprawdza się w typowych aplikacjach webowych, które zazwyczaj oparte są o znaną dobrze architekturę trójwarstwową osadzoną w monolicie. W przypadku naszego projektu był tylko jeden problem. Nie był on typowym przykładem aplikacji webowej.

 

Co jest nie tak z DShopem?

Zanim przejdziemy dalej chciałbym nieco rozjaśnić jakie założenia/podejścia przyjęte w projekcie sprawiły, że tak trywialne zadanie okazało się relatywnie wymagające do zaimplementowania:

  • CQRS – nadal uważam, że dzięki temu podejściu nasz kod jest łatwo rozszerzalny, czytelny i  niesie za sobą kilka innych, ciekawych benefitów. Jednakże, podążanie za nim w 100% skomplikowało znacznie sprawę w kwestii rezultatów procesowania komend. Powód jest bardzo prosty. Z definicji podczas pisania do aplikacji (patrz przetwarzania komendy) nie zwracamy żadnych danych. Przekładając to na kod typem zwracanym jest void lub Task. Nie możemy zatem ot tak zwrócić jakiegoś IResult bądź prostego bool-a.
  • Asynchroniczna komunikacja – kolejnym problem również wynika bezpośrednio z przyjętych założeń projektowych tj. komendy z API do konkretnej usługi są „transportowane” poprzez kolejkę (w naszym przypadku RabbitMQ) w modelu publish/subscribe. Oznacza to, że każda usługa subskrybuje się pod interesujące dla niej (bardziej dla jej domeny) wiadomości i kiedy takowa się pojawi, zostaje „ściągnięta” i przetworzona. Problem tego modelu znów opera się na braku wiadomości zwrotnej do API, ponieważ jedyne co otrzymamy to ACK od konsumenta. Jest to więc bardziej potwierdzenie na poziomie infrastruktury niż na poziomie domeny więc dla użytkownika końcowego jest to informacja bezwartościowa.
  • Rozproszenie domeny –  ostatni problem jest implikacją architektury mikroserwisowej, a mianowicie każda usługa zarządza swoim małym, niezależnym, fragmentem domeny. Pytanie w jaki sposób koordynować cały proces i dodatkowo informować użytkownika o stanie jego operacji, jeśli składa się ona z N kroków, z których każdy jest obsługiwany przez inną usługę?

Nad ostatnim punktem porozwodzę się intensywnie w osobnym wpisie, ponieważ jest to iście interesujący problem!

 

Rozwiązanie problemu w DShop

Zanim przedstawię rozwiązanie dzisiejszego problemu chciałbym nadmienić jedną rzecz. Na tym etapie „serii o DShop” zakładam, że znasz już mniej więcej architekturę i założenia, które przyświecały nam podczas implementacji systemu. Jest to o tyle istotne, że dzisiejszy kod będzie niejednokrotnie używał komponentów, które opisywałem w poprzednich artykułach dlatego gorąco zachęcam Ciebie do ich przeczytania nim przejdziesz dalej 😉

Jeżeli jesteś gotowy/a możemy zaczynać! Standardowo najpierw przedstawię schemat konceptu, który następnie skrupulatnie przeanalizujemy:

 

 

 

Wszystko oczywiście rozpoczyna się od żądania HTTP, który poprzez UI wykonuje użytkownik aplikacji. Zostaje ono przechwycone przez API Gateway (a dokładniej jedną z akcji MVC) po czym z body zostaje zdeserializowana komenda, która trafia na kolejkę. Jak dotąd nic czego nie opisałbym na blogu. Zwróć jednak uwagę, że powyższy schemat oprócz komendy (w tym przypadku CreateProduct) wyróżnia także metadaną samej wiadomości, o nazwie correlationId. W naszym przypadku jest to GUID generowany w API bezpośrednio przed umieszczeniem wiadomości w exchange. Pytanie do czego jest to potrzebne? Jak sugeruje nazwa, correlationId służy do korelowania wiadomości wymienianych pomiędzy usługami z konkretną operacją jak np. dodanie produktu na stronie. Odnosząc się do rzeczywistości moglibyśmy porównać to do numeru przesyłki kurierskiej, dzięki któremu widzimy co dzieje się z naszą paczką. Jak zapewne się domyślasz correlationId musi być zatem immutable, ponieważ identyfikator musi być cały czas ten sam tj. od momentu jego utworzenia w API, aż do wykonania ostatniej akcji w ramach procesu. Inaczej nie miało by to sensu 😉 Oczywiście sam identyfikator to za mało, potrzebujemy w jakiś sposób poinformować użytkownika o tym, że operacja o danym correlationId się np. zakończyła. Tu do  akcji wkraczają kolejna dwie mikrousługi.

Pierwsza to Operations service, która subskrybuje się pod wszystkie wiadomości w systemie tj. komendy i zdarzenia. Odebranie komendy skutkuje utworzeniem obiektu operacji (o id = correlationId i statusie pending),  który zostaje umieszczony w lokalnej bazie danych. Odebranie zdarzenia skutkuje zmianą statusu operacji na:

  • jeśli zdarzenie implementuje interfejs IEventcompleted
  • jeśli zdarzenie implementuje interfejs IRejectedEventrejected

Mały update: Celowo zostawiłem powyższy fragment mimo iż wykryłem to przed publikacją, ale… powyższe rozwiązanie zostało uproszczone. Odebranie komendy nie skutkuje utworzeniem obiektu operacji. Po odebraniu zdarzenia operacja dodawana jest do bazy ze statusem completed lub rejected. Powód zmiany był dość prosty. Ponieważ wymiana wiadomości odbywa się w sposób asynchroniczny przez kolejki mogło dojść do sytuacji gdy zdarzenie przyszłoby przed komendą. Oczywiście można było to obejść ifami sprawdzającymi czy zapisana została już operacja o podanym Id, ale… tak jest prościej ponieważ w większości sytuacji informacja na frontendzie o tym, że dany proces trwa nie jest istotna. Ważne jest to czy i kiedy się zakończyła. Wracamy do wpisu.

Mamy zatem kolejny fragment infrastruktury, dzięki któremu jesteśmy w stanie stwierdzić jaki jest stan konkretnej operacji znając jej identyfikator. ALE… zanim przejdziemy dalej warto dopowiedzieć jedną kwestię, aby ubiec potencjalne pytania na ten temat. Być może zwróciłeś/aś uwagę, że powyższa logika działa wyłącznie dla prostych przypadków tj. takich, które de facto składają się z jednego kroku. Przykładowo utworzenie produktu jest jednym krokiem i może się udać lub nie. Nie ma innego stanu. Co jednak w przypadku gdy po utworzeniu ów produktu chcielibyśmy jeszcze np. wysłać mailing z nową ofertą? W tym przypadku otrzymując zdarzenie ProductCreated nie możemy zapisać operacji o statusie completed, ponieważ wykonaliśmy dopiero połowę operacji/procesu biznesowego. No dobrze, to może podążając konwencją warto wprowadzić jeszcze jeden interfejs jak np. IPendingEvent? Jeżeli zdarzenie byłoby tego typu, oznaczałoby to, że obiekt operacji nie powienien zostać jeszcze utworzony. Pojawia się jednak kolejny problem, ponieważ nic nie stoi na przeszkodzie, aby jedno zdarzenie było częścią kilku procesów biznesowych! Przykładowo utworzenie produktu mogłoby być pierwszym z dwóch kroków w procesie z mailingiem, ale oprócz tego mogłoby być np. ostatnim krokiem w szkoleniu administratorów aplikacji. W takim wypadku usługa operacji nie miałaby pojęcia czy w danym kontekście otrzymana wiadomość sygnalizuje zakończenie całego flow czy jedynie jednego z kroków. Mam nadzieję, że jest to jasne 😉 Jest to jak pisałem we wstępie pochodna rozproszenia domeny, w związku czym proces opiszę skrupulatnie w osobnym wpisie. Dziś przypadek jednokrokowy, który realnie jest najczęściej występującym.

Wróćmy do naszego flow. Po utworzeniu obiektu operacji, Operations service publikuje do RabbitMQ jedno z dwóch zdarzeń: OperationCompleted oraz OperationRejected. Pod wiadomości subskrybuje się ostatnia w całym łańcuszku usługa – SignalR service. Jej zadanie jest bardzo proste, po otrzymaniu wiadomości komunikuje się ona z aplikacją kliencką używając Web Socketów. Jest to więc model komunikacyjny push, w którym to backend publikuje wiadomość kiedy jest taka potrzeba, zamiast klasycznego modelu pull, w którym to klient był zobligowany do periodycznego pytania o ewentualne nowe wiadomości. Pytanie skąd frontend wie, która operacja ma jakie correlationId, aby móc później zinterpretować wiadomość WS z SignalR service? Jak zapewne pamiętasz po umieszczeniu wiadomości w RabbitMQ użytkownikowi zwrócona zostaje odpowiedź HTTP 202 (Accepted). To co pomijałem do tej pory to fakt, że ów zwrotka posiada niestandardowy nagłówek, w którym to znajduje się…correlationId!

Domyślam się, że cały proces może wydawać się skomplikowany, dlatego przed przejściem do samego kodu przedstawmy kolejne kroki na przykładzie dodania produktu:

  1. Użytkwonik poprzez UI tworzy komendę CreateProduct, która zostaje umieszczone w body żądania HTTP
  2. Akcja MVC (poprzez routing) odbiera obiekt komendy
  3. W API Gateway wygenerowany zostaje correlationId, który dołączony zostaje do koemndy jako metadana
  4. API Gateway publikuje wiadomość w kolejce
  5. API Gateway zwraca do frontendu odpowiedźHTTP z kodem 202, która zawiera nagłówek z identyfikatorem
  6. Komenda CreateProduct zostaje odebrana przez usługę produktów
  7. Usługa produktów dodaje produkt do bazy danych
  8. Usługa produktów publikuje zdarzenie ProductCreated (które implementuje interfejs IEvent)
  9. Usługa operacji odbiera zdarzenie i tworzy obiekt operacji ze statusem completed
  10. Usługa operacji publikuje zdarzenie OperationCompleted
  11.  Usługa SignalR otrzymuje zdarzenie i wysyła wiadomość do frontendu z informacją o pomyślnym zakończeniu operacji

 

Implementacja mechanizmu

Na wstępie tej części chciałbym zaznaczyć, że pokażę jedynie kluczowe punkty w kodzie. W przeciwnym przypadku ten wpis byłby zdecydowanie za długi, a i fragmenty na pewno powieliłaby się względem poprzednich części tej „serii”. Zacznijmy od API Gateway, a dokładniej metody SendAsync, która to odpowiada za umieszczenie wiadomości w RabbitMQ:

 


        protected async Task<IActionResult> SendAsync<T>(T command, 
            Guid? resourceId = null, string resource = "") where T : ICommand 
        {
            var context = GetContext<T>(resourceId, resource);
            await _busPublisher.SendAsync(command, context);

            return Accepted(context);
        }

 

Jak widzisz dzieje się dokładnie to co opisałem. Utworzony zostaje correlation context (zawierający correlationId), który zostaje dołączony jako metadana wiadomości. Następnie zwracana zostaje odpowiedź HTTP 202 tj. Accepted. Zobaczmy definicję samego kontekstu jak i metody, która odpowiada za jego utworzenie:

 


    public class CorrelationContext : ICorrelationContext
    {
        public Guid Id { get; }
        public Guid UserId { get; }
        public Guid ResourceId { get; }
        public string TraceId { get; }
        public string SpanContext { get; }
        public string ConnectionId { get; }
        public string Name { get; }
        public string Origin { get; }
        public string Resource { get; }
        public string Culture { get; }
        public int Retries { get; set; }
        public DateTime CreatedAt { get; }

        //dalej mamy statyczne metody tworzące obiekt
    }

        protected ICorrelationContext GetContext<T>(Guid? resourceId = null, string resource = "") where T : ICommand
        {
            if (!string.IsNullOrWhiteSpace(resource))
            {
                resource = $"{resource}/{resourceId}";
            }

            return CorrelationContext.Create<T>(Guid.NewGuid(), UserId, resourceId ?? Guid.Empty, 
               HttpContext.TraceIdentifier, HttpContext.Connection.Id, _tracer.ActiveSpan.Context.ToString(),
               Request.Path.ToString(), Culture, resource);
        }

 

Jak widzisz poza samym identyfikatorem, kontekst posiada wiele innych informacji, które mogą okazać się przydatne podczas przetwarzania wiadomości. Część z nich zostanie omówiona przy okazji omawiania kolejnych składowych systemu więc bez obaw 😉 Sama metoda GetContext nie robi nic innego jak uzupełnia te dane i zwraca nowo utworzony obiekt kontekstu. Przejdź do metody Accepted:

 


        protected IActionResult Accepted(ICorrelationContext context)
        {
            Response.Headers.Add(OperationHeader, $"operations/{context.Id}");
            if (!string.IsNullOrWhiteSpace(context.Resource))
            {
                Response.Headers.Add(ResourceHeader, context.Resource);
            }

            return base.Accepted();
        }

 

Tu dzieje się cała „magia”. Do odpowiedzi dodany zostaje nagłówek „X-Operation”, który zawiera identyfikator (a w zasadzie gotowy link do zasobu alla HATEOS) operacji.

Warto wspomnieć także w jaki sposób CorrelationContext jest publikowany wraz z wiadomością. Do obsługi tego mechanizmu potrzebowaliśmy dodać dwie paczki RawRabbit:

  • RawRabbit.Enrichers.MessageContext
  • RawRabbit.Enrichers.MessageContext.Subscribe

Następnie podczas konfiguracji IInstanceFactory w sekcji Plugins dopięliśmy nas†ępujący kod:

 


 .UseMessageContext<CorrelationContext>()
 .UseContextForwarding()

 

Samo publikowanie i subskrybowanie się wygląda natomiast w ten sposób:

 


        public async Task SendAsync<TCommand>(TCommand command, ICorrelationContext context)
            where TCommand : ICommand
            => await _busClient.PublishAsync(command, ctx => ctx.UseMessageContext(context)
                .UsePublishConfiguration(p => p.WithRoutingKey(GetRoutingKey(@command))));

        public IBusSubscriber SubscribeCommand<TCommand>(string @namespace = null, string queueName = null,
            Func<TCommand, DShopException, IRejectedEvent> onError = null)
            where TCommand : ICommand
        {
            _busClient.SubscribeAsync<TCommand, CorrelationContext>(async (command, correlationContext) =>
                {
                    /...
                },
                ctx => ctx.UseSubscribeConfiguration(cfg =>
                    cfg.FromDeclaredQueue(q => q.WithName(GetQueueName<TCommand>(@namespace, queueName)))));
            return this;
        }

 

Jest to więc stosunkowo proste gdyż sprowadza się to do wywołania metody UseMessageContext ww przypadku publikowania i rozszerzenia lambdy o parametr kontekstu w przypadku subskrybcji.

Idźmy dalej! Po odebraniu komendy i jej obsłużeniu publikowane zostaje zdarzenie, które posiada dokładnie ten sam CorrelationContext co komenda. Zobaczmy to na przykładzie metody HandleAsyncCreateProductHandler:

 








<pre>public async Task HandleAsync(CreateProduct command, ICorrelationContext context)
{
   //logika i zapis do MongoDB

    await _busPublisher.PublishAsync(new ProductCreated(command.Id, command.Name,
        command.Description, command.Vendor, command.Price, command.Quantity), context);
}</pre>

 

Zwróć uwagę, że w środku metody HandleAsync mamy dostęp do kontekstu (w trybie read-only) i który wykorzystujemy przy publikacji zdarzenia celem zaznaczenia, że jest ono częścią operacji tworzenia nowego produktu. Pod spodem metoda PublishAsync wykorzystuje dokładnie ten sam mechanizm rejestracji kontekstu co metoda SendAsync przedstawiona powyżej.

Przejdźmy do usługi operacji zaczynając od jej mechanizmu subskrybcji pod wszystkie wiadomości:

 


    public static class Subscriptions
    {
        private static readonly Assembly MessagesAssembly = typeof(Subscriptions).Assembly;

        private static readonly ISet<Type> ExcludedMessages = new HashSet<Type>(new[]
        {
            typeof(OperationCompleted),
            typeof(OperationRejected)
        });

        public static IBusSubscriber SubscribeAllMessages(this IBusSubscriber subscriber)
            => subscriber.SubscribeAllCommands().SubscribeAllEvents();

        private static IBusSubscriber SubscribeAllCommands(this IBusSubscriber subscriber)
            => subscriber.SubscribeAllMessages<ICommand>(nameof(IBusSubscriber.SubscribeCommand));

        private static IBusSubscriber SubscribeAllEvents(this IBusSubscriber subscriber)
            => subscriber.SubscribeAllMessages<IEvent>(nameof(IBusSubscriber.SubscribeEvent));

        private static IBusSubscriber SubscribeAllMessages<TMessage>
            (this IBusSubscriber subscriber, string subscribeMethod)
        {
            var messageTypes = MessagesAssembly
                .GetTypes()
                .Where(t => t.IsClass && typeof(TMessage).IsAssignableFrom(t))
                .Where(t => !ExcludedMessages.Contains(t))
                .ToList();

            messageTypes.ForEach(mt => subscriber.GetType()
                .GetMethod(subscribeMethod)
                .MakeGenericMethod(mt)
                .Invoke(subscriber,
                    new object[] {mt.GetCustomAttribute<MessageNamespaceAttribute>()?.Namespace, null, null}));

            return subscriber;
        }
    }

 

Jak widzisz wszystko jest znów upakowane w metody rozszerzające. Powyższy kod może wyglądać strasznie, ale de facto jego działanie jest proste. Metoda SubscribeAllMessages w zależności od tego jak została wywołana pobiera wszystkie typy w ramach swojego Assembly, a następnie filtruje je ,a by uzyskać jedynie kolekcję lokalnych wersji komend lub zdarzeń. Z rezultatu zostają wykluczone dwa zdarzenia, które publikuje sama usługa operacji, ponieważ nie miałoby to sensu. Następnie posiadając kolekcję typów wszystkich komend lub zdarzeń, używając również refleksji następuje wywołanie generycznej metody SubcribeCommand lub SubscribeEvent. Dlaczego w odbywa się to w tak dziwny sposób? Posiadając typ jako zmienna nie jesteśmy inaczej wywołać metody generycznej. Przykładowo taki zapis w C# nie jest dozwolony:

 


var type = myMessage.GetType();

_busSubscriber.SubscribeCommand<type>();

 

Przejdźmy dalej i zobaczmy jak wygląda generyczny handler, do którego wpadają wszystkie odebrane zdarzenia (handler dla komend możemy pominąć, ponieważ nowe flow nie tworzy tam już obiektu operacji):

 


    public class GenericEventHandler<T> : IEventHandler<T> where T : class, IEvent
    {
        private readonly ISagaCoordinator _sagaCoordinator;
        private readonly IOperationPublisher _operationPublisher;
        private readonly IOperationsStorage _operationsStorage;

        public GenericEventHandler(ISagaCoordinator sagaCoordinator,
            IOperationPublisher operationPublisher,
            IOperationsStorage operationsStorage)
        {
            _sagaCoordinator = sagaCoordinator;
            _operationPublisher = operationPublisher;
            _operationsStorage = operationsStorage;
        }

        public async Task HandleAsync(T @event, ICorrelationContext context)
        {
            if (@event.BelongsToSaga())
            {
                var sagaContext = SagaContext.FromCorrelationContext(context);
                await _sagaCoordinator.ProcessAsync(@event, sagaContext);
            }

            switch (@event)
            {
                case IRejectedEvent rejectedEvent:
                    await _operationsStorage.SetAsync(context.Id, context.UserId,
                        context.Name, OperationState.Rejected, context.Resource,
                        rejectedEvent.Code, rejectedEvent.Reason);
                    await _operationPublisher.RejectAsync(context,
                        rejectedEvent.Code, rejectedEvent.Reason);
                    return;
                case IEvent _:
                    await _operationsStorage.SetAsync(context.Id, context.UserId,
                        context.Name, OperationState.Completed, context.Resource);
                    await _operationPublisher.CompleteAsync(context);
                    return;
            }
        }
    }

 

Pierwszego if-a pominiemy, ponieważ omówimy go w osobnym wpisie. To co istotne z punktu widzenia dzisiejszej tematyki to pattern matching. Jak widzisz w zależności od typu zdarzenia do lokalnego storage zapisany zostaje obiekt operacji z odpowiednim identyfikatorem, statusem i innymi danymi, a poza tym opublikowane zostaje zdarzenie poprzez obiekt IOperationPublisher:

 


    public class OperationPublisher : IOperationPublisher
    {
        private readonly IBusPublisher _busPublisher;

        public OperationPublisher(IBusPublisher busPublisher)
        {
            _busPublisher = busPublisher;
        }

        public async Task CompleteAsync(ICorrelationContext context)
            => await _busPublisher.PublishAsync(new OperationCompleted(context.Id,
                context.UserId, context.Name, context.Resource), context);

        public async Task RejectAsync(ICorrelationContext context, string code, string message)
            => await _busPublisher.PublishAsync(new OperationRejected(context.Id,
                context.UserId, context.Name, context.Resource, code, message), context);
    }

 

Pora przejść do ostatniej usługi tj. SignalR service. Usługa subskrybuje się pod oba zdarzenia w swojej klasie Startup. W tym przypadku obie wiadomości są obsługiwane przez jedną klasę handlera:

 


    public class OperationUpdatedHandler : IEventHandler<OperationCompleted>, IEventHandler<OperationRejected>
    {
        private readonly IHubService _hubService;
        
        public OperationUpdatedHandler(IHubService hubService)
        {
            _hubService = hubService;
        }

        public async Task HandleAsync(OperationCompleted @event, ICorrelationContext context)
            => await _hubService.PublishOperationCompletedAsync(@event);

        public async Task HandleAsync(OperationRejected @event, ICorrelationContext context)
            => await _hubService.PublishOperationRejectedAsync(@event);
    }

 

Obie metody HandleAsync wywołują jedynie metody obiektu typu IHubService, który to push-uje anonimowy obiekt:

 


    public class HubService : IHubService
    {
        private readonly IHubWrapper _hubContextWrapper;

        public HubService(IHubWrapper hubContextWrapper)
        {
            _hubContextWrapper = hubContextWrapper;
        }

        public async Task PublishOperationCompletedAsync(OperationCompleted @event)
            => await _hubContextWrapper.PublishToUserAsync(@event.UserId, 
                "operation_completed",
                new
                {
                    id = @event.Id,
                    name = @event.Name,
                    resource = @event.Resource
                }
            );

        public async Task PublishOperationRejectedAsync(OperationRejected @event)
            => await _hubContextWrapper.PublishToUserAsync(@event.UserId, 
                "operation_rejected",
                new
                {
                    id = @event.Id,
                    name = @event.Name,
                    resource = @event.Resource,
                    code = @event.Code,
                    reason = @event.Message 
                }
            );
    }

I tak oto całe flow dobiega końca! Nie było tego mało, nie było to na pewno tak proste jak w monolicie, ale cóż… trzeba to sobie jasno powiedzieć. Mikroserwisy to nie tylko lans i powód do naśmiewania się z osób, które piszą monolity. To często nierówna walka z problemami, które implikuje ich rozproszona natura, a o których wcześniej nigdy byśmy nie pomyśleli w kategorii problemu.

 

Jak zwykle po więcej detali i na demo z prostą aplikacją JS zapraszam do naszego darmowego kursu na YouTube 🙂

You may also like...