Projektując złożone aplikacje często zachodzi potrzeba wykonania jakiejś funkcji w przyszłości - jednorazowo lub cyklicznie. Wystarczy wziąć na warsztat aplikację, która przypomina nam o różnych wydarzeniach, wysyła cyklicznie maile lub powiadomienia, czy też dokonuje płatności w modelu subskrypcyjnym.

Wymienione wyżej funkcjonalności wymagają implementacji jakiegoś mechanizmu, który pozwoli nam to zrealizować w tle czyli asynchronicznie. W tym artykule postaram się to omówić na przykładzie ekosystemu Java.

Dlaczego asynchronicznie?

Zadania w tle są przydatnym mechanizmem i możliwością dla systemu sprawdzenia swojego wewnętrznego stanu i na jego podstawie w razie potrzeby wykonania pewnych akcji np. wysłanie maila, przygotowanie raportu czy też innego rodzaju uspójnienie systemu.

Wyobraźmy sobie, że chcemy aby nasz system wygenerował jakiś skomplikowany raport analityczny. Wygenerowanie wspomnianego raportu może być dość czasochłonne gdyż system musi najpierw pobrać dane z jakichś zewnętrznych źródeł i je przetworzyć w sensowny sposób w odpowiednim formacie.

Bez względu na to czy jest to aplikacja desktopowa, webowa czy mobilna, nie byłoby dobrym pomysłem zmuszanie użytkownika do czekania aż wspomniany wyżej raport się wygeneruje. Lepszym sposobem byłoby umożliwienie użytkownikowi kontynuowanie korzystania z aplikacji po uprzednim wrzuceniu generowania raportu w tło czyli wykonanie tego procesu asynchronicznie. Następnie po ukończeniu tego procesu powiadomienie użytkownika o rezultacie i możliwości pobrania raportu.

Typowym technicznym rozwiązaniem tego problemu jest skorzystanie z kolejki, gdzie główny system przyjmujący żądania od użytkownika wrzuca zdarzenia na kolejkę a system raportowy je odbiera i podejmuje odpowiednie działania. Dzięki temu, że wykonamy to zadanie asynchronicznie, odciążymy nasz główny mikroserwis udostępniający API, które powinno działać szybko i niezawodnie - nieobciążone przez jakieś "ciężkie" zadania, które działają w tle.

Java / Async

W tej sekcji skupimy się na tym jakie mamy możliwości przetwarzania asynchronicznego w ekosystemie Java. Sprawdźmy najpierw jakie mamy możliwości pomijająć wykorzystanie zewnętrznych bibliotek i frameworków.

Pierwszym co może przyjść do głowy jest użycie wątków.

public static void main(String[] args) throws InterruptedException {
        var thread = new Thread(() -> {
                System.out.println("Process something in the background");
        });
        thread.start();
        thread.join();
}

Powyższe rozwiązanie, mimo że proste, ma istotne wady. Tworzenie wątków jest kosztowne pod względem zasobów, więc tworzenie ich na żądanie raczej nie jest dobrym pomysłem. Poza tym nie mamy kontroli nad ich ilością, więc w łatwy sposób moglibyśmy wysycić nasz system z zasobów.

Rozwiązaniem jest pula wątków, która może zwiększyć wydajność naszego systemu i pozwolić na nieco więcej kontroli.

public static void main(String[] args) throws InterruptedException {
        var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        pool.execute(() -> {
                System.out.println("Process something in the background");
        });
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
}

Jeżeli korzystamy z frameworków Spring i Spring Boot możemy wykorzystać do tego samego celu klasę TaskExecutor.

@Component
public class Foo {

    private final TaskExecutor taskExecutor;

    public Foo(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void bar() {
        taskExecutor.execute(() -> {
            System.out.println("Process something in the background");
        });
    }
}

Implementacja tej klasy jest dostarczana dzięki autokonfiguracji w TaskExecutionAutoConfiguration. Domyślnie na starcie otrzymamy pulę 8 wątków, która może rosnąć lub maleć w zależności od obciążenia (ilość zadań w kolejce).

Konfigurację możemy w łatwy sposób tuningować.

spring:
  task:
    execution:
      pool:
        max-size: 16
        queue-capacity: 100
        keep-alive: "10s"

Powyższa konfiguracja ustawia maksymalną liczbę wątków w puli na 16, w związku z tym jeżeli kolejka zadań do wykonania zapełni się (100 zadań), pula wątków zwiększy się do 16 wątków. Z kolei jeżeli wątki staną się bezczynne minimum przez 10 sekund, zostaną wyrzucone z puli (w przeciwieństwie do domyślnej wartości 60 sekund).

Możemy również ręcznie skonfigurować ten mechanizm dostarczając kawałek własnej klasy konfiguracyjnej.

@Configuration
public class AppConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        var taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(16);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setThreadNamePrefix("task-");
        return taskExecutor;
    }
}

Spring dostarcza również adnotację @Async, dzięki której w łatwy sposób możemy wywoływać dowolną metodę asynchronicznie korzystając z puli wątków, którą wcześniej utworzyliśmy.

Mechanizm ten trzeba najpierw włączyć adnotacją @EnableAsync co spowoduje zaimportowanie odpowiedniej autokonfiguracji.

@Configuration
@EnableAsync
public class AppConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        var taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(16);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setThreadNamePrefix("task-");
        return taskExecutor;
    }

    @Bean
    public Clock clock() {
        return Clock.system(ZoneId.of("Europe/Warsaw"));
    }
}

Następnie zamiast ręcznie wstrzykiwać TaskExecutor wykorzystamy wspomnianą adnotację @Async.

@Slf4j
@Component
public class Foo {

    @Async
    public void bar() {
        log.info("Hello world");
    }

    @Async
    public Future xyz() {
        return new AsyncResult<>("Hello world as a result");
    }
}

@SpringBootApplication
public class DemoApplication {

        public static void main(String[] args) {
                SpringApplication.run(DemoApplication.class, args);
        }

        @Autowired
        private Foo foo;

        @EventListener(ApplicationReadyEvent.class)
        public void ready() throws ExecutionException, InterruptedException {
                foo.bar();
                String result = foo.xyz().get();
                System.out.println(result);
        }
}

Scheduler

Wiemy już w jaki sposób możemy wrzucać zadania w tło. W tej sekcji poświęcimy czas jak takie zadania możemy planować i wykonywać w przyszłości jednorazowo lub cyklicznie.

Implementowanie takiego mechanizmu samodzielnie byłoby dość czasochłonne dlatego będziemy posiłkować się tym co dostarcza nam Spring. Istnieją również zewnętrzne alternatywy tj. Quartz, którymi możesz się zainteresować.

Analogicznie jak w przypadku TaskExecutor w celu planowania zadań mamy do dyspozycji klasę TaskScheduler. Spring dzięki autokonfiguracji w klasie TaskSchedulingAutoConfiguration dostarcza nam implementację tej klasy.

@SpringBootApplication
public class DemoApplication {

        public static void main(String[] args) {
                SpringApplication.run(DemoApplication.class, args);
        }

        @Autowired
        private MailSender mailSender;

        @Autowired
        private TaskScheduler taskScheduler;

        @EventListener(ApplicationReadyEvent.class)
        public void ready() {
                // 1
                taskScheduler.schedule(() -> {
                        mailSender.sendMail();
                }, Instant.now().plusSeconds(3600));

                // 2
                taskScheduler.scheduleAtFixedRate(() -> {
                        System.out.println("Hello world");
                }, 1000);
        }
}

Powyższy przykład pokazuje jak możemy dynamicznie zaplanować jednorazowe zadanie do wykonania na później, w tym przypadku wysyłka maila po upłynięciu godziny a także zadanie wykonywane cyklicznie.

Spring dostarcza nam również adnotację @Scheduled, dzięki której w łatwy sposób możemy wyrazić w jakich okolicznościach nasze job'y mają być uruchamiane i wykonywane. Mechanizm ten trzeba najpierw włączyć adnotacją @EnableScheduling co spowoduje zaimportowanie odpowiedniej autokonfiguracji.

@Configuration
@EnableAsync
@EnableScheduling
public class AppConfig {

    @Bean
    public Clock clock() {
        return Clock.system(ZoneId.of("Europe/Warsaw"));
    }
}

Następnie możemy przejść do stworzenia przykładowego job'a.

@Slf4j
@Component
public class JobExample {

    @Scheduled(fixedRate = 1000)
    public void hello() {
        log.info("Hello world");
    }
}

W tym przypadku po prostu wypisujemy na konsolę 'Hello world' co sekundę. Powyższa adnotacja ma więcej parametrów:

  • zone - strefa czasowa, według której ma zostać uruchomione zadanie
  • initialDelay - opóźnienie pierwszego uruchomienia w milisekundach (jednostkę można zmienić parametrem timeUnit)
  • fixedDelay - czas pomiędzy zakończeniem ostatniego wykonania a rozpoczęciem kolejnego (wykorzystujemy do zadań zależnych od siebie)
  • fixedRate - czas pomiędzy rozpoczęciem ostatniego i kolejnego wykonania (wykorzystujemy do zadań nieposiadających zasobów współdzielonych między sobą)
  • fixedDelayString - liczba w formacie zrozumiałym przez Duration np. PT48H
  • cron - wyrażenie cron w postaci ciągu znaków
    "a b c d e f" (cron expression)
    a - sekunda (0-59) b - minuta (0-59) c - godzina (0-23) d - dzien msca (1-31) e - miesiac (1-12) f - dzien tygodnia
    przykłady:
    9 12 * * * – co sekundę przez minutę od godz. 12:09:00 każdego dnia
    0 0 2-4 * * * – godz.2:00:00, 3:00:00 i 4:00:00 każdego dnia
    0 * 6,19 * * 2 – 6:00:00 i 19:00:00 w każdy wtorek
    0 0/30 10 * JAN * – 10:00:00, 10:30:00 każdego dnia stycznia
    0 0 12 * * MON-FRI – 12:00:00 od poniedziałku do piątku
    0 0 0 3 5 ? – każdego 3 maja o północy

Istnieje możliwość jednej metodzie przypisać wiele wywołań cyklicznych, które zgrupowane tworzą harmonogram. Należy do tego wykorzystać adnotacje @Schedules i w jej argumencie przekazać zadania do wykonania.

@Schedules({
        @Scheduled(cron = "0 30 10-13 ? * WED,FRI"),
        @Scheduled(fixedDelay = 10000)
})

Warto pamiętać o następujących regułach związanych z metodą pod adnotacją @Scheduled:

  • powinna zwracać void
  • nie powinna mieć żadnych parametrów w innym przypadku zostanie rzucony wyjątek typu IllegalStateException

Wielkość puli

Domyślnie w celu wykonania zadań Spring tworzy pulę wątków o wielkości 1. W związku z tym, jeżeli uruchomimy sekwencję kilku niezależnych zadań używając parametru fixedRate, implementacja zadziała podobnie jak fixedDelay ponieważ w tym przypadku w jednym momencie tylko jedno zadanie może być uruchomione.

Możemy w prosty sposób zmienić liczbę wątków w puli aby zmienić to zachowanie dodając odpowiednią konfigurację.

spring:
  task:
    scheduling:
      thread-name-prefix: "scheduling-"
      pool:
        size: 2

Oczywiście nic nie stoi na przeszkodzie aby dostarczyć dla frameworka własną implementację.

@Configuration
public class AppConfig {

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(4);
        threadPoolTaskScheduler.setThreadNamePrefix("scheduling-");
        return threadPoolTaskScheduler;
    }
}

Synchronizacja

Zazwyczaj jeżeli system robi jakieś cykliczne operacje np. wysyłanie maila do klienta z fakturą raz na miesiąc, raczej chcielibyśmy uniknąć wielokrotnego wysyłania tych samych rzeczy (chyba, że mieliśmy jakieś problemy infrastrukturalne i chcemy ponowić wysyłkę). Problem ten może powstać jeżeli nasza aplikacja jest uruchomiona na kilku instancjach. Na szczęście istnieją rozwiązania tego problemu. Możemy np. skorzystać z biblioteki ShedLock, która pozwala na synchronizację zadań. Kolejnym rozwiązaniem może być wydzielenie funkcjonalności odpowiedzialnych za harmonogram zadań i planowanie do osobnego mikroserwisu i upewnienie się, że w klastrze istnieje tylko jedna jego instancja.

Praktyczne przykłady

Weźmy na warsztat 2 przykłady.

Załóżmy, że mamy aplikację, która działa w modelu subskrypcyjnym (np. Spotify). Obciążamy użytkownika co miesiąc za oferowane usługi. Jak technicznie podejść do tego tematu? W końcu każdy użytkownik mógł wykupić abonament dowolnego dnia miesiąca, więc kolejne pobranie środków powinno nastąpić dokładnie za miesiąc. Rozwiązaniem może być przygotowanie joba, który codziennie będzie się "budził" i przedłużał subskrypcję wszystkim użytkownikom, którym usługa już wygasła uprzednio pobierając środki.

Innym pomysłem, którego osobiście nie testowałem mogłoby być skorzystanie z opóźniania wiadomości na kolejce używając np. RabbitMq delayed message exchange lub innego podobnego mechanizmu.

Kolejny przykład. Wyobraźmy sobie, że chcemy wysyłać powiadomienia sms/mail/push do wszystkich użytkowników w naszym systemie o godzinie 10 rano biorąc pod uwagę to, że użytkownicy mogą być w różnych strefach czasowych (więc nie chcielibyśmy budzić niektórych o 3 w nocy). Jak moglibyśmy podejść do tematu? Jednym z rozwiązań może być job uruchamiany co godzinę, który za każdym razem wysyłałby powiadomienie tylko do użytkowników, u których aktualnie jest godzina 10. Oczywiście pamiętając o przechowywaniu strefy czasowej użytkownika w bazie danych aby móc stwierdzić wspomniany warunek.

Testowanie

Załóżmy, że mamy implementację joba, który codziennie od poniedziałku do piątku o godzinie 12 wysyła maila.

@Component
public class MailSenderJob {

    private final MailSender mailSender;

    public MailSenderJob(MailSender mailSender) {
        this.mailSender = mailSender;
    }

    @Scheduled(cron = "0 0 12 * * MON-FRI", zone = "Europe/Warsaw")
    public void sendMail() {
        mailSender.sendMail("Hello world");
    }
}

W jaki sposób możemy przetestować taką implementację? Adnotacja @Scheduled działa tylko w porozumieniu z springiem, więc koniecznym będzie podniesienie kontekstu springowego w testach tj. przygotowanie testu integracyjnego.

Zobaczmy najpierw jak może wyglądać poprawna konfiguracja schedulera.

@Configuration
@EnableScheduling
public class SchedulerConfig {

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler(Clock clock) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5);
        taskScheduler.setClock(clock);
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        taskScheduler.setThreadNamePrefix("scheduling-");
        return taskScheduler;
    }

    @Profile("!it")
    @Bean
    public Clock clock() {
        return Clock.system(ZoneId.of("Europe/Warsaw"));
    }
}

Jak widać w powyższym przykładzie taskScheduler tworzony jest na podstawie wstrzykniętego obiektu klasy Clock. Dzięki temu w testach będziemy mogli w łatwy sposób podmienić tą implementację co pozwoli nam na przesuwanie się w czasie np. ustawiając czas kiedy job powinien wystartować, następnie wykonanie asercji czy faktycznie tak się stało. Odsyłam również do mojego poprzedniego artykułu gdzie także o tym pisałem.

Rozpocznijmy od stworzenia pomocnej implementacji Clock'a.

public class TestClock extends Clock {

    private final Clock baseClock;
    private Instant initializationTime;

    public TestClock(Instant fixed, ZoneId zone) {
        this(Clock.fixed(fixed, zone));
    }

    private TestClock(Clock baseClock) {
        this.baseClock = baseClock;
    }

    @Override
    public ZoneId getZone() {
        return baseClock.getZone();
    }

    @Override
    public Clock withZone(ZoneId zone) {
        return new TestClock(baseClock.withZone(zone));
    }

    @Override
    public long millis() {
        return instant().toEpochMilli();
    }

    @Override
    public Instant instant() {
        synchronized (this) {
            if (initializationTime == null) {
                initializationTime = Instant.now();
            }
        }
        long elapsedTime = ChronoUnit.MILLIS.between(initializationTime, Instant.now());
        return baseClock.instant().plusMillis(elapsedTime);
    }
}

Następnie przetestujmy naszą funkcjonalność.

@SpringBootTest
@ActiveProfiles("it")
class MailSenderJobTest {

    private static final Instant NOW = ZonedDateTime.parse("2023-04-12T11:59:57.000+02:00[Europe/Warsaw]").toInstant();

    @SpyBean
    private MailSender mailSender;

    @Test
    void shouldSendMailOnTime() {
        await().atMost(Duration.ofSeconds(5))
                .untilAsserted(() -> verify(mailSender).sendMail("Hello world"));
    }

    @TestConfiguration
    static class TestConfig {
        @Bean
        public Clock clock() {
            return new TestClock(NOW, ZoneId.of("Europe/Warsaw"));
        }
    }
}

Alternatywne podejście

Schedulery są silnym narzędziem, które pomagają w wykonywaniu cyklicznych operacji jednak rodzą też pewne problemy:

  • hardcodowanie konfiguracji
  • testowanie nie jest proste
  • zmiana cykliczności wykonywanego zadania
    może spowodować przebudowanie znacznej części aplikacji

Lepszym rozwiązaniem z punktu widzenia utrzymania aplikacji może się okazać utworzenie metody webowej, która będzie przygotowana na wywołanie cykliczne. Wtedy środowisko CI/CD mogłoby pełnić rolę inicjatora i wywoływać metodę cykliczną w odpowiednich momentach w zależności od konfiguracji.

Podsumowanie

Mam nadzieję, że w pewnym stopniu przybliżyłem Tobie jak w praktyce możemy wykorzystywać narzędzia dostarczone w ekosystemie Java & Spring związane z planowaniem, tak aby ułatwić sobie pracę a zarazem zrobić to poprawnie. Jeżeli masz pytania lub chciałbyś podzielić się czymś ciekawym zostaw komentarz. Po więcej informacji odsyłam do dokumentacji. Do następnego! 👋