Czym są Pipeline Behaviors?

Pipeline Behaviors w MediatR to mechanizm pozwalający na przechwytywanie i modyfikowanie przetwarzania żądań (requestów) przed wykonaniem właściwego handlera oraz po jego zakończeniu. Koncepcyjnie przypominają middleware w ASP.NET Core, z tą różnicą, że działają na poziomie logiki biznesowej, a nie warstwy HTTP.

Każdy behavior implementuje interfejs IPipelineBehavior<TRequest, TResponse> i otrzymuje:

  • Żądanie (TRequest)
  • Delegat next do wywołania kolejnego elementu w łańcuchu
  • Token anulowania

Dzięki temu behaviors tworzą łańcuch odpowiedzialności (Chain of Responsibility pattern), przez który przechodzi każde żądanie.

Dlaczego są niezbędne?

W aplikacjach biznesowych nieuchronnie pojawiają się zagadnienia przekrojowe (cross-cutting concerns) – aspekty funkcjonalności, które przewijają się przez wiele komponentów systemu:

  • Walidacja – sprawdzanie poprawności danych wejściowych
  • Logowanie – rejestrowanie operacji w celach audytowych
  • Zarządzanie transakcjami – zapewnienie spójności danych
  • Buforowanie – optymalizacja wydajności dla często używanych zapytań
  • Obsługa błędów – ustandaryzowane przetwarzanie wyjątków
  • Monitorowanie wydajności – pomiar czasów wykonania operacji
  • Autoryzacja – kontrola dostępu do zasobów

Tradycyjne podejścia do rozwiązania tych problemów prowadzą do:

  • Duplikacji kodu – każdy handler powtarza tę samą logikę walidacji, logowania, itp.
  • Trudności w utrzymaniu – zmiana wymaga modyfikacji dziesiątek klas
  • Niejasnych zależności – kod biznesowy miesza się z infrastrukturą
  • Testowania – konieczność mockowania infrastruktury w każdym teście

Pipeline behaviors rozwiązują te problemy poprzez wydzielenie zagadnień przekrojowych do osobnych, reużywalnych komponentów.

Szczegółowy przykład: Behavior walidacyjny

public class ValidationBehavior<TRequest, TResponse> 
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;
    private readonly ILogger<ValidationBehavior<TRequest, TResponse>> _logger;
    
    public ValidationBehavior(
        IEnumerable<IValidator<TRequest>> validators,
        ILogger<ValidationBehavior<TRequest, TResponse>> logger)
    {
        _validators = validators;
        _logger = logger;
    }
    
    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        
        // Jeśli brak walidatorów, pomijamy walidację
        if (!_validators.Any())
        {
            _logger.LogDebug(
                "Brak walidatorów dla {RequestName}, pomijam walidację", 
                requestName);
            return await next();
        }
        
        _logger.LogDebug(
            "Walidacja {RequestName} przy użyciu {ValidatorCount} walidatorów",
            requestName,
            _validators.Count());
        
        // Tworzymy kontekst walidacji
        var context = new ValidationContext<TRequest>(request);
        
        // Wykonujemy wszystkie walidatory równolegle
        var validationResults = await Task.WhenAll(
            _validators.Select(v => v.ValidateAsync(context, cancellationToken))
        );
        
        // Zbieramy wszystkie błędy walidacji
        var failures = validationResults
            .SelectMany(result => result.Errors)
            .Where(failure => failure != null)
            .ToList();
        
        if (failures.Any())
        {
            _logger.LogWarning(
                "Walidacja {RequestName} zakończona niepowodzeniem. Liczba błędów: {ErrorCount}",
                requestName,
                failures.Count);
                
            throw new ValidationException(failures);
        }
        
        _logger.LogDebug("Walidacja {RequestName} zakończona sukcesem", requestName);
        
        // Przechodzimy dalej w łańcuchu
        return await next();
    }
}

Kluczowe aspekty implementacji:

  1. Równoległe wykonanie walidatorówTask.WhenAll pozwala na równoczesne uruchomienie wszystkich walidatorów, co poprawia wydajność
  2. Logowanie diagnostyczne – rejestrujemy rozpoczęcie i zakończenie walidacji dla celów debugowania
  3. Zbieranie wszystkich błędów – użytkownik otrzymuje kompletną listę problemów, nie tylko pierwszy napotkany błąd

Rejestracja w kontenerze DI

services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssembly(Assembly.GetExecutingAssembly());
    
    // Behaviors wykonują się w kolejności rejestracji
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(AuthorizationBehavior<,>));
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(TransactionBehavior<,>));
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(PerformanceBehavior<,>));
});

// Rejestracja walidatorów FluentValidation
services.AddValidatorsFromAssembly(Assembly.GetExecutingAssembly());

Uwagi dotyczące kolejności:

  • Logowanie powinno być pierwsze, aby rejestrować wszystkie operacje
  • Walidacja przed autoryzacją – nie ma sensu sprawdzać uprawnień do niepoprawnych danych
  • Transakcja najbliżej handlera – otwieramy ją najpóźniej, zamykamy najwcześniej
  • Performance monitoring może być na początku lub końcu, w zależności od tego, co chcemy mierzyć

Kolejność wykonania – szczegółowy przebieg

Pipeline behaviors tworzą strukturę przypominającą “matriószkę” – każdy behavior otacza kolejny:

Żądanie (Request)
  │
  ├─→ LoggingBehavior START
  │     │
  │     ├─→ ValidationBehavior START
  │     │     │
  │     │     ├─→ AuthorizationBehavior START
  │     │     │     │
  │     │     │     ├─→ TransactionBehavior START
  │     │     │     │     │
  │     │     │     │     ├─→ HANDLER
  │     │     │     │     │
  │     │     │     │     └─← TransactionBehavior END (commit)
  │     │     │     │
  │     │     │     └─← AuthorizationBehavior END
  │     │     │
  │     │     └─← ValidationBehavior END
  │     │
  │     └─← LoggingBehavior END
  │
  └─← Odpowiedź (Response)

Istotne konsekwencje:

  • Błąd w wewnętrznym behaviorze “wypływa” przez wszystkie zewnętrzne, które mogą go przechwycić
  • Timing zewnętrznego behaviora obejmuje czas wykonania wszystkich wewnętrznych
  • Transakcja powinna być jak najbliżej handlera, aby minimalizować czas jej trwania

Behavior zarządzający transakcjami

public class TransactionBehavior<TRequest, TResponse> 
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IDbContext _context;
    private readonly ILogger<TransactionBehavior<TRequest, TResponse>> _logger;
    
    public TransactionBehavior(
        IDbContext context,
        ILogger<TransactionBehavior<TRequest, TResponse>> logger)
    {
        _context = context;
        _logger = logger;
    }
    
    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        
        // Tylko komendy wymagają transakcji
        // Zapytania (queries) są read-only i nie modyfikują stanu
        if (request is IQuery<TResponse>)
        {
            return await next();
        }
        
        _logger.LogDebug("Rozpoczynam transakcję dla {RequestName}", requestName);
        
        // Rozpoczynamy transakcję z odpowiednim poziomem izolacji
        await using var transaction = await _context.BeginTransactionAsync(
            IsolationLevel.ReadCommitted,
            cancellationToken);
        
        try
        {
            var response = await next();
            
            await transaction.CommitAsync(cancellationToken);
            
            _logger.LogDebug(
                "Transakcja dla {RequestName} zakończona sukcesem",
                requestName);
                
            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "Transakcja dla {RequestName} wycofana z powodu błędu",
                requestName);
                
            await transaction.RollbackAsync(cancellationToken);
            throw;
        }
    }
}

Kluczowe decyzje projektowe:

  1. Różnicowanie komend i zapytań – tylko komendy modyfikujące stan wymagają transakcji
  2. Poziom izolacjiReadCommitted to rozsądny domyślny wybór w większości scenariuszy
  3. Jawne wycofywanie – choć nie jest technicznie konieczne (transakcja zostanie wycofana przy Dispose), jawny rollback poprawia czytelność logów

Warunkowe wykonanie – zaawansowane scenariusze

Rozróżnienie komend i zapytań

public interface ICommand : IRequest<Result> { }
public interface ICommand<TResponse> : IRequest<Result<TResponse>> { }
public interface IQuery<TResponse> : IRequest<Result<TResponse>> { }

public async Task<TResponse> Handle(...)
{
    if (request is ICommand || request is ICommand<TResponse>)
    {
        // Logika specyficzna dla komend
        // np. walidacja biznesowa, audyt, transakcje
    }
    
    if (request is IQuery<TResponse>)
    {
        // Logika specyficzna dla zapytań
        // np. cache, read-only connection
    }
    
    return await next();
}

Atrybuty kontrolujące behaviors

[AttributeUsage(AttributeTargets.Class)]
public class SkipTransactionAttribute : Attribute { }

public async Task<TResponse> Handle(...)
{
    if (request.GetType().GetCustomAttribute<SkipTransactionAttribute>() != null)
    {
        return await next();
    }
    
    // Normalna logika transakcji
}

Behavior monitorujący wydajność

public class PerformanceBehavior<TRequest, TResponse> 
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly ILogger<PerformanceBehavior<TRequest, TResponse>> _logger;
    private readonly IMetrics _metrics;
    private const int PerformanceThresholdMs = 500;
    
    public PerformanceBehavior(
        ILogger<PerformanceBehavior<TRequest, TResponse>> logger,
        IMetrics metrics)
    {
        _logger = logger;
        _metrics = metrics;
    }
    
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        var stopwatch = Stopwatch.StartNew();
        
        try
        {
            var response = await next();
            stopwatch.Stop();
            
            var elapsedMs = stopwatch.ElapsedMilliseconds;
            
            // Metryki dla systemu monitoringu
            _metrics.RecordRequestDuration(requestName, elapsedMs);
            
            if (elapsedMs > PerformanceThresholdMs)
            {
                _logger.LogWarning(
                    "Wydajność: {RequestName} wykonał się w {ElapsedMs}ms (próg: {ThresholdMs}ms). Żądanie: {@Request}",
                    requestName,
                    elapsedMs,
                    PerformanceThresholdMs,
                    request);
            }
            else
            {
                _logger.LogDebug(
                    "Wydajność: {RequestName} wykonał się w {ElapsedMs}ms",
                    requestName,
                    elapsedMs);
            }
            
            return response;
        }
        catch (Exception)
        {
            stopwatch.Stop();
            _metrics.RecordRequestDuration(requestName, stopwatch.ElapsedMilliseconds);
            throw;
        }
    }
}

Korzyści w projektach bankowych

W systemach bankowych pipeline behaviors przynoszą wymierne korzyści:

1. Audytowalność i zgodność z regulacjami

public class AuditBehavior<TRequest, TResponse> 
    : IPipelineBehavior<TRequest, TResponse>
{
    // Centralne logowanie wszystkich operacji zgodnie z wymogami PSD2/RODO
    // - Kto wykonał operację (użytkownik, system)
    // - Kiedy (timestamp z timezone)
    // - Co zostało zmienione (przed/po)
    // - Kontekst biznesowy (numer konta, kwota)
}

2. Bezpieczeństwo wielowarstwowe

public class AuthorizationBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
{
    // - Weryfikacja tożsamości (authentication)
    // - Kontrola uprawnień (authorization)
    // - Sprawdzenie limitów operacyjnych
    // - Wykrywanie podejrzanych wzorców
}

3. Spójność operacyjna

  • Każda operacja przechodzi przez te same kontrole
  • Nie ma możliwości “pominięcia” walidacji czy audytu
  • Jednolite formaty błędów i logów
  • Przewidywalne zachowanie systemu

4. Testowalność

[Fact]
public async Task ValidationBehavior_Should_Reject_Invalid_Transfer()
{
    // Testowanie behaviora w izolacji, bez konieczności
    // uruchamiania całego handlera czy bazy danych
    
    var validators = new[] { new TransferCommandValidator() };
    var behavior = new ValidationBehavior<TransferCommand, Result>(validators);
    
    var invalidCommand = new TransferCommand 
    { 
        Amount = -100 // Niepoprawna kwota
    };
    
    await Assert.ThrowsAsync<ValidationException>(
        () => behavior.Handle(invalidCommand, () => Task.FromResult(Result.Success()), default)
    );
}

5. Łatwość utrzymania

  • Zmiana logiki walidacji w jednym miejscu wpływa na wszystkie operacje
  • Dodanie nowego wymogu regulacyjnego to dodanie jednego behaviora
  • Refactoring nie wymaga modyfikacji dziesiątek handlerów
  • Jasny punkt integracji dla zewnętrznych systemów (np. fraud detection)

6. Separacja zagadnień

┌─────────────────────────────────────┐
│   Warstwa prezentacji (API)         │  ← Deserializacja, routing
├─────────────────────────────────────┤
│   Pipeline Behaviors                │  ← Cross-cutting concerns
├─────────────────────────────────────┤
│   Handlery (logika biznesowa)       │  ← Czysta logika domeny
├─────────────────────────────────────┤
│   Warstwa dostępu do danych         │  ← Persystencja
└─────────────────────────────────────┘

Każda warstwa ma jasno określoną odpowiedzialność. Handlery zawierają tylko logikę biznesową, bez “szumu” infrastrukturalnego.

Pułapki i dobre praktyki

Kolejność ma znaczenie

Źle: Transakcja przed walidacją

cfg.AddBehavior<TransactionBehavior<,>>(); // Otwiera transakcję
cfg.AddBehavior<ValidationBehavior<,>>();  // Może odrzucić żądanie
// Transakcja była otwarta niepotrzebnie!

Dobrze: Walidacja przed transakcją

cfg.AddBehavior<ValidationBehavior<,>>();  // Najpierw sprawdź
cfg.AddBehavior<TransactionBehavior<,>>(); // Potem otwieraj zasoby

Nie dubluj logiki z handlerów

Źle: Behavior zawiera logikę biznesową

// To NIE jest zagadnienie przekrojowe, to logika biznesowa!
if (request is TransferCommand transfer && transfer.Amount > 10000)
{
    await _fraudDetection.CheckHighValueTransfer(transfer);
}

Dobrze: Behavior wywołuje odpowiednią usługę

// Behavior tylko orkiestruje, logikę zostawia dla serwisów
if (request is IRequiresFraudCheck)
{
    await _fraudDetection.CheckRequest(request);
}

Uważaj na wydajność

Behaviors wykonują się dla każdego żądania. Unikaj:

  • Ciężkich operacji I/O w każdym behaviorze
  • Nadmiernego logowania w środowiskach produkcyjnych
  • Zbędnych alokacji pamięci

Testuj behaviors osobno

Każdy behavior to niezależny komponent z jasną odpowiedzialnością – powinien mieć własne testy jednostkowe.

Podsumowanie

Pipeline Behaviors to fundamentalny element architektury opartej na MediatR. Umożliwiają:

  • Separację zagadnień – logika biznesowa oddzielona od infrastruktury
  • Zasadę DRY – brak duplikacji kodu cross-cutting concerns
  • Testowalność – każdy aspekt można testować osobno
  • Łatwość utrzymania – zmiany w jednym miejscu
  • Spójność – te same zasady dla wszystkich operacji

W aplikacjach enterprise, szczególnie w sektorze finansowym, gdzie wymagania dotyczące audytowalności, bezpieczeństwa i spójności są kluczowe, pipeline behaviors stanowią nie tyle opcję, co konieczność architektury.

W następnym artykule przedstawię zaawansowane wzorce łączące pipeline behaviors z result pattern oraz strategię obsługi błędów w systemach rozproszonych.


Dodatkowe zasoby