CQRS i mikroserwisy: odczyt danych

W poprzednim wpisie dosyć obszernie przedstawiłem ogólny koncept oraz implementację zapisu danych w aplikacji DShop, która opiera się na architekturze mikroserwisowej oraz wzorcu CQRS. W tym wpisie zajmiemy się drugim flow przepływu danych w aplikacji tj. ich odczytem. Zaczynajmy!

 

Ogólny zarys podejścia

Przed omówieniem kodu znów pozwolę sobie na uprzednią prezentację koncepcji posługując się prostym diagramem:

 

 

Całe flow rozpoczyna się oczywiście od otrzymania żądania HTTP, które w swoim URI zawiera (choć nie musi) dane potrzebne do pobrania konkretnego zasobu na serwerze (np. id zasobu). Gdy API Gateway (kontroler ASP.NET Core) odbierze żądanie, używa klienta biblioteki RestEase, aby przesłać je dalej, do konkretnej usługi. Usługa odbiera request kierowany do jej wewnętrznego (i niewidocznego dla świata) API po czym tworzy obiektową reprezentację zapytania użytkownika – Query, które przekazane zostaje do odpowiedniego query handlera. Jak nie trudno się domyślić jest to klasa, która po prostu wie co zrobić z konkretnym query tzn. wie w jaki sposób odpytać bazę danych (poprzez repozytorium) używając zawartych w query danych, po czym zwrócić oczekiwany rezultat. Następnie rezultat zostaje przekazany w body odpowiedzi HTTP do API Gateway i dalej do użytkownika końcowego.

Całe flow jest w moim odczuciu dość podobne do tego prezentowanego przy okazji zapisu danych. Posiada ono jednak trzy istotne różnice, na które powinieneś zwrócić uwagę:

  • dane przekazywane do API Gateway znajdują się w URI żądania, a nie w body. Wynika to ze specyfikacji RFC i ogólnej interpretacji HTTP GET. Cytując „The GET method means retrieve whatever information (in the form of an entity) is identified by the Request-URI”
  • komunikacja API Gateway – usługa, obywa się poprzez synchroniczny protokół HTTP, a nie kolejkę. O zasadności tego podejścia pisałem we wpisie „Ciemna strona mikroserwisów”, sekcja „Mikroserwisy == asynchroniczna komunikacja”
  • z uwagi na to, że stosujemy się do CQRS, query handler zwraca rezultat, ale NIE modyfikuje stanu aplikacji (nie wykonuje operacji CUD). To oznacza, że wykonanie query jest dla aplikacji operacją idempotentną (również zgodnie ze specyfikacją HTTP GET 😉 ).

 

Implementacja API Gateway

W poprzednim wpisie poruszaliśmy się wokół domeny produktów, dlatego zostańmy przy niej również tym razem. Poniżej znajduje się akcja kontrolera, która zwraca obiekt produktu na podstawie otrzymanego w żądaniu Id:

 


[AdminAuth]
public class ProductsController : BaseController
{
    private readonly IProductsService _productsService;

    public ProductsController(IBusPublisher busPublisher, 
        IProductsService productsService) : base(busPublisher)
    {
        _productsService = productsService;
    }

    //...

  
    [HttpGet("{id}")]
    [AllowAnonymous]
    public async Task<IActionResult> Get(Guid id)
        => Single(await _productsService.GetAsync(id));

    //...
}

 

Jak widzisz, jest to typowy kod w ASP.NET Core. Akcja posiada atrybut AllowAnonymous, aby umożliwić pobranie zasobu użytkownikom niezalogowanym. Ze ścieżki, poprzez binding „wyciągnięte” zostaje id produktu (typu GUID), a następnie wywołana zostaje metoda Single kontrolera bazowego, której to argumentem jest rezultat metody GetAsync obiektu _productsService. Zacznijmy od implementacji Single:

 


protected IActionResult Single<T>(T model, Func<T,bool> criteria = null)
{
    if (model == null)
    {
        return NotFound();
    }
    var isValid = criteria == null || criteria(model);
    if (isValid)
    {
        return Ok(model);
    }

    return NotFound();
}

 

Jest to nic innego jak opakowany null-check, który na podstawie wartości model, zwraca albo:

  • w przypadku gdy model istnieje – odpowiedź HTTP 200  – OK, z rezultatem w body
  • w przypadku gdy model jest null – odpowiedź HTTP 404 – not found

Sama metoda przyjmuje jeszcze opcjonalny predykat criteria, który może nałożyć dodatkowe restrykcje na zapytanie np. sprawdź czy Id przekazane w zapytaniu jest tym samym z JWT itp.

Przejdźmy teraz do obiektu _productsService. Jest to wcześniej wspomniany klient biblioteki RestEase. Jeżeli nie jesteś z nią zaznajomiony to zapraszam Ciebie serdecznie do przeczytania mojego anglojęzycznego wpisu na ten temat. Zobaczmy w jaki sposób zdefiniowane zostało API usługi produktów dla biblioteki RestEase:

 


[SerializationMethods(Query = QuerySerializationMethod.Serialized)]
public interface IProductsService
{
    [AllowAnyStatusCode]
    [Get("products/{id}")]
    Task<Product> GetAsync([Path] Guid id);

    //...
}


 

Jeżeli znasz już RestEase to kod nie powinien być żadnym zaskoczeniem. Warto zwrócić jedynie uwagę na atrybut AllowAnyStatusCode. Dzięki niemu nawet, jeżeli żądanie zwróci odpowiedź HTTP ze statusem 404 (not found), to nie zostanie rzucony wyjątek, a rezultatem metody będzie null obsłużony następnie w omówionej metodzie Single.
Posiadając definicję dla internal API konkrentej usługi, należy jeszcze dokonać jej rejestracji. Kod za to odpowiedzialny znajduje się w klasie Startup:

 


services.RegisterServiceForwarder<IProductsService>("products-service");

 

Metoda rozszerzająca RegisterServiceForwarder jest opakowaniem rejestracji klienta dla biblioteki RestEase (kodu znajdziesz tutaj). Jako parametr przyjmuje nazwę obiektu konfiguracyjnego z pliku appsettings.json, w którym to znajduja się następujące informacje:

 

      {
        "name": "products-service",
        "host": "localhost",
        "scheme": "http",
        "port": "5006"
      }

 

Posiadając powyższe dane RestEase jest w stanie wykonać żądanie HTTP pod konkretny adres usługi. Z pliku konfiguracyjnego tworzy część hosta adresu tj.

{scheme}://{host}:{port}

A z definicji klienta (czyli interfejsu) wyciaga dalszą część URI konkretnego zasobu. Na tym etapie być może zastanawiać się czy warto było aż tak komplikować sobie życie i „bawić się” w dodatkową konfigurację hosta w zewnętrzynych plikach. Odpowiedź to TAK-było warto! Wynika to z bardzo prostego powodu. Zwróć proszę uwagę, że powyższy host (tj. localhost) będzie działać… no własnie, lokalnie na maszynie deweloperskiej. Problem pojawi się kiedy nasze API + usługi popakujemy w dedykowane kontenery Dockerowe i umieścimy w Docker Swarm/ Kubernetes. Wyciągnięcie części hosta URI do zewnętrzengo pliku konfiguracyjnego daje nam łatwą do zarządzania elastyczność. Kiedy pracujemy lokalnie z kodem, używamy localhost, a kiedy chcemy zbudować obraz dockerowy dla API, wtedy sięgamy do pliku appsettings.docker.json, ktory zawiera zmodyfikowaną wersję:

 

     {
        "name": "products-service",
        "host": "products-service",
        "scheme": "http",
        "port": "5000"
      }

 

Jak widzisz localhost został zastąpiony przez products-service, który jest niczym innym jak nazwą obrazu z dokcer-compose. Dzieki temu nie znając konkretnego IP możemy komunikować się poprzez HTTP z kontenerem dockerowym.

Podsumujmy zatem flow odczytu danych do tego momentu:

  1. API Gateway na starcie aplikacji konfiguruje klientów RestEase. Z pliku appsettings.json pobrane zostają informacje o nazwach hostów konkretnych mikroserwisów.
  2. Odebrane zostaje żądanie HTTP z danymi w URI.
  3. Kontroler ASP.NET Core używa wstrzyknietego klienta RestEase, aby przekazać żądanie HTTP do internal API odpowiedniej usługi.

 

Implementacja usługi

Na tym etapie API Gateway wykonało żądniae HTTP do mikroserwisu produktów, przekazując w URI identyfikator otrzymany od klienta. Zobaczmy na kod internal API usługi:

 


    [Route("[controller]")]
    public class ProductsController : BaseController
    {
        public ProductsController(IDispatcher dispatcher)
            :base(dispatcher)
        {
        }
        //...

        [HttpGet("{id}")]
        public async Task<ActionResult<ProductDto>> GetAsync([FromRoute] GetProduct query)
            => Single(await QueryAsync(query));
    }

 

Kod jest dość podobny do tego z API Gateway, jednak zawiera drobne różnice. Po pierwsze, zwróć proszę uwagę, że na tym etapie operujemy już na obiekcie Query. W tym przypadku nasza kwerenda nazywa się GetProduct, a jej kod prezentuję się następująco:

 


    public class GetProduct : IQuery<ProductDto>
    {
        public Guid Id { get; set; }
    }

 

Klasa kwerendy implementuje generyczny interfejs IQuery<TResult> (jest to wyłącznie marker) i posiada jedną właściwość. Jest to oczywiście identyfikator przekazany z API Gateway. Mamy zatem obiekt, który reprezentuje zapytanie użytkownika po stronie usługi produktów. Warto wspomnieć, że „konstrukcja” tego obiektu (mimo braku body w GET) była możliwa dzieki atrybutowi FromRoute, który zbindował dane zawarte w URI do modelu GetProduct.

Po otrzymaniu Query po stronie mikrousługi, należy wykonać proces taki sam jak w przypadku komendy – trzeba przekazać obiekt do odpowiedniego handlera. W tym celu do kontrolera wstrzykniety został IDispatcher. Jego implementacja prezentuje się następująco:

 


    public class Dispatcher : IDispatcher
    {
        private readonly ICommandDispatcher _commandDispatcher;
        private readonly IQueryDispatcher _queryDispatcher;

        public Dispatcher(ICommandDispatcher commandDispatcher,
            IQueryDispatcher queryDispatcher)
        {
            _commandDispatcher = commandDispatcher;
            _queryDispatcher = queryDispatcher;
        }

        public async Task SendAsync<TCommand>(TCommand command) where TCommand : ICommand
            => await _commandDispatcher.SendAsync(command);

        public async Task<TResult> QueryAsync<TResult>(IQuery<TResult> query)
            => await _queryDispatcher.QueryAsync<TResult>(query);
    }

    public class QueryDispatcher : IQueryDispatcher
    {
        private readonly IComponentContext _context;

        public QueryDispatcher(IComponentContext context)
        {
            _context = context;
        }

        public async Task<TResult> QueryAsync<TResult>(IQuery<TResult> query)
        {
            var handlerType = typeof(IQueryHandler<,>)
                .MakeGenericType(query.GetType(), typeof(TResult));

            dynamic handler = _context.Resolve(handlerType);

            return await handler.HandleAsync((dynamic)query);
        }
    }

 

Sam IDsipatcher to nic innego jak kompozycja dwóch dispatcherów:

  • ICommandDispatcher
  • IQueryDispatcher

Nas w tym przypadku interesuje drugi. Jego implementacja nie jest skomplikowana i opiera się na prostym wyszukaniu odpowiedniej implementacji handlera w Autofac-u. Jak nie trudno wywnioskować z powyższego kodu, każdy query handler musi implementować interfejs:

 


    public interface IQueryHandler<TQuery,TResult> where TQuery : IQuery<TResult>
    {
        Task<TResult> HandleAsync(TQuery query);
    }

 

Po dostarczeniu odpowiedniej implementacji handlera (poprzez metodę Resolve), na instancji wywołana zostaje metoda HandleAsync, a jako parametr przekazany zostaje obiekt kwerendy. Spójrz zatem w jaki sposób zaimplementowany został query handler dla GetProduct:

 


    public sealed class GetProductHandler : IQueryHandler<GetProduct, ProductDto>
    {
        private readonly IProductsRepository _productsRepository;

        public GetProductHandler(IProductsRepository productsRepository)
            => _productsRepository = productsRepository;

        public async Task<ProductDto> HandleAsync(GetProduct query)
        {
            var product = await _productsRepository.GetAsync(query.Id);

            return product == null ? null : new ProductDto
            {
                Id = product.Id,
                Name = product.Name,
                Description = product.Description,
                Vendor = product.Vendor,
                Price = product.Price
            };
        }
    }

 

Podobnie jak to miało miejsce w przypadku komend, handler robi dokładnie to co sugeruje nazwa obiektu. Pobiera na podstawie identyfikatora dane produktu, po czym dokonuje transformacji encji na DTO. Rezultat zostaje następnie zwrócony do internal API usługi -> API Gateway -> Klienta. I na tym całe flow się kończy 🙂 Podsumujmy zatem kolejne kroki flow w usłudze produktów:

  1. Dane zawarte w URI żądania HTTP zostają przekształcone w kwerendę za pośrednictwem atrybutu FromRoute
  2. Query zostaje odbrane przez akcę kontrolera i przkezane do IDispatcher->IQueryDispatcher
  3. IQueryDispatcher dostarcza odpowiednią implementację query handlera i wywołuje na niej metodę HandleAsync
  4. GetProductHandler wykorzystuje dane zawarte w Query (tj. Id) do pobrania encji produktu poprzez repozytorium danych.
  5. Encja zostaje przekształcona na DTO
  6. Query handler zwraca DTO do kontrolera (internal API)
  7. Internal API zwraca odpowiedź HTTP do API Gateway z DTO w request body.
  8. API Gateway przekazuje DTO produktu dalej do klienta.

 

To wszystko na dziś. Mam nadzieję, że choć w małym stopniu pomogłem Ci zrozumieć w jaki sposób możesz połączyć wzorzez CQRS z architekturą mikrosusług. Jeżeli masz jakieś pytania dotyczące tej serii, to tradycyjnie zapraszam Ciebie do sekcji komentarzy 😉

 

LINKI DO PREZENTOWANEGO KODU:

 

You may also like...

  • disqus_wsfjvYpB9i

    Macie jakąś datę z Piotrkiem ustaloną jeśli chodzi o wydanie całego kursu z tymi mikroserwisami ?

  • Pingback: dotnetomaniak.pl()

  • Patryk Roguszewski

    Dlaczego nie skorzystaliście z AutoMappera?

    • Ja osobiście nie jestem fanem AutoMappera. Poza tym mieliśmy mało QH i małe DTO, więc szybciej było zrobić to ręcznie niż bawić się w konfigurowanie map.