Miesięcznik informatyków i menedżerów IT sektora publicznego

Paweł Ziółkowski

Podstawowe funkcje biblioteki RxJS

RxJS pozwala uporać się z asynchronicznymi zdarzeniami w kodzie przez pisanie kodu synchronicznego bez zagnieżdżeń powstałych np. przez wywołania zwrotne (callback hell). RxJS ułatwia także obsługę błędów lub implementację odpowiedniej logiki.

W poprzednim artykule („Technika programowania reaktywnego”, „IT w Administracji” 2019, nr 10) zrobiliśmy wstęp do techniki programowania reaktywnego oraz do biblioteki ReactiveX dla języka JavaScript (RxJS), w tym poznamy kolejne operatory i funkcje oraz przykłady ich zastosowań. Wykorzystamy najnowszą, szóstą wersję biblioteki, jej dokumentację oraz API można znaleźć na stronie itwa.pl/gp.

Operator merge

Aby poznać kolejne operatory oraz ich zasadę działania, posłużmy się stroną, na której chcemy zareagować na zdarzenia użytkownika, np. kliknięcie myszką. Reakcją na kliknięcie może być wysłanie żądania HTTP lub wyświetlenie komunikatu. Nasz testowy kod będziemy wykonywali na stronie stackblitz.com, która umożliwia pracę w wybranym środowisku i języku. Po wejściu na stronę klikamy w przycisk Start a new app, a następnie wybieramy typ RxJS TypeScript. Dzięki temu otrzymamy możliwość pracy z RxJS z wykorzystaniem języka TypeScript (więcej na jego temat w artykule „Framework Angular w nowej wersji”, „IT w Administracji” 2019, nr 3) bez konieczności stawiania własnego serwera. Po utworzeniu projektu, po lewej stronie widoku projektu, w sekcji Dependiences zauważymy, że została dodana wymagana biblioteka rxjs we wskazanej przez nas wersji. Automatycznie zostanie otworzony plik index.ts, w którym został wygenerowany przykładowy kod. Plik z kodem HTML znajduje się w pliku index.html. Po wprowadzeniu jakichkolwiek zmian w plikach projektu i ich zapisaniu wynik będzie widoczny w ostatnim panelu po prawej. 
W następnym kroku wykorzystamy kolejne przydatne narzędzie – stronę beeceptor.com, do której będziemy wysyłać żądania HTTP. Po wejściu na nią wpisujemy w polu Endpoint name dowolną nazwę projektu (np. itwarxjs). Po zatwierdzeniu nazwy otrzymamy adres, na który możemy wysłać żądania, w naszym przypadku jest to https://itwarxjs.free.beeceptor.com. Utwórzmy na stronie przycisk o id btn1 oraz sekcji div w pliku html:

<button id="btn1">Wyślij</button>
<div></div>

W pliku index.ts dodajmy kod:

import { fromEvent } from 'rxjs'; 
import { map } from 'rxjs/operators';
const testAPIuri = 'https://itwarxjs.free.beeceptor.com';
const div = document.querySelector('div');
const btn1 = document.querySelector('#btn1');
const btn1click$ = fromEvent(btn1, 'click');
btn1click$.pipe(
map(e => (fetch(testAPIuri)))
)
.subscribe(x => x.then(res =>div.innerHTML = res.status));

W pierwszych linijkach kodu importujemy funkcję from­Event oraz operator map. Główne narzędzia i funkcje z biblioteki importujemy z pakietu głównego rxjs, natomiast operatory z rxjs/operators. Następne linijki kodu to definiowanie stałych, takich jak adres serwera oraz elementy HTML ze strony. Do stałej btn1click$ przypisujemy strumień (Observable) utworzony dzięki operatorowi fromEvent. Strumień ten to wynik naciśnięcia przycisku o identyfikatorze btn1. Wywołuje on wysłanie żądania za pomocą metody fetch pod wskazany adres. Operacja wykonywana jest przez operator map. Na koniec dokonujemy subskrypcji na utworzonym strumieniu. Musimy tu wykorzystać metodę then obiektu Promise zwracanego przez fetch, w której dodajemy status żądania do elementu div na stronie. Wynikiem powinno być pojawienie się napisu '200' pod przyciskiem. 
Jeżeli po publikacji takiego kodu zdecydowalibyśmy się na zmianę działania programu polegającą na wysłaniu żądania nie tylko przez przycisk, ale też wciśnięcie klawisza na klawiaturze, to jednym z wyjść byłoby powtórzenie kodu dla nowego zdarzenia. RxJS ułatwia nam jednak pracę i udostępnia odpowiednie operatory. Dodanie poniższego fragmentu kodu do poprzedniego przykładu pokazuje, jak uporać się ze wspomnianą zmianą:

import { map, tap, filter, merge } from 'rxjs/operators';
const keyUp$ = fromEvent(document, 'keyup');
btn1click$.pipe(
merge(keyUp$.pipe(filter(k => k.key === 'j'))),
map(e => (fetch(testAPIuri)))
)
.subscribe(x => x.then(res =>div.innerHTML = res.status));

Program będzie teraz komunikował się z serwerem po kliknięciu przycisku lub po naciśnięcia klawisza [j]. Na początku dodaliśmy do importu dodatkowe operatory filter oraz merge. Do stałej keyUp$ zapisaliśmy Observable tworzone ze zdarzenia keyup. Następnie na strumieniu kliknięcia przycisku dodaliśmy operator merge. Jego zadaniem jest połączyć kilka strumieni w jeden, przez co zarówno kliknięcie, jak i naciśnięcie [j] będzie przekazane w dół do map i subscribe. Oba te zdarzenia wywołają metodę fetch. Merge jednocześnie nasłuchuje obydwóch zdarzeń (btn1click$ i keyUp$) i każde z nich przekazuje od razu do obserwatora (subscribe).
Niektóre operatory i funkcje w RxJS obecnie mogą być wywoływane na instancji, obiekcie strumienia za pomocą operatora kropki lub statycznie, czyli bez takiego odwołania. W kodzie użyliśmy metodę instancji pipe, a statycznie merge i map. Dokumentacja merge informuje nas, że postać instancyjna oznaczona jest jako deprecated, co oznacza, że w kolejnych wydaniach biblioteki już nie będzie ona istniała i nie zaleca się jej używać. Zamiast tego możemy użyć jej statyczną postać, ale znajduje się ona w innym pakiecie (wskazany w dokumentacji). Przy imporcie należy usunąć w naszym kodzie merge z importu z pakietu rxjs/operators i dodać do rxjs. W poniższym fragmencie kodu wykorzystujemy poprawną, statyczną wersję merge. Jej argumentami są nasze dwa strumienie, które chcemy połączyć w jeden:

import { fromEvent, merge } from 'rxjs';
merge(btn1click$, keyUp$)
.pipe(
map(e => (fetch(testAPIuri)))
) // koniec pipe
.subscribe(x => x.then(res => div.innerHTML = res.status));

Zarówno w pierwotnym kodzie, jak i tym po zmianach zastosowaliśmy funkcję pipe. Służy ona do grupowania operatorów i funkcji. W poprzednich wersjach RxJS mogliśmy wywoływać operatory na instancjach strumieni, ale obecnie należy je „zapakować” w funkcję pipe, w której po przecinku możemy przekazać inne funkcje lub operatory. W ostatnim fragmencie kodu do pipe przekazaliśmy tylko jeden operator – map. 

Operator concat

Podczas pracy z Observable (strumieniami) możemy poznać ich kilka stanów po dokonaniu subskrypcji. Obiekt obserwujący strumień (Observer, obserwator) reaguje na te stany przez wywołanie jednej z trzech funkcji: next, error lub complete. Funkcje te przekazywane są metodzie subscribe. W naszych przykładach implementowaliśmy tylko funkcję next, która była wywoływana za każdym razem, gdy pojawiło się nowe zdarzenie kliknięcia lub naciśnięcia klawisza. Zdarzenia z interfejsu użytkownika nie wywołują same funkcje complete, ponieważ nie zostają one zakończone, klikać w przycisk możemy bez końca. Mamy jednak możliwość posłużenia się operatorami, aby wymusić uruchomienie funkcji complete. W następnym przykładzie wzbogacimy metodę subscribe o argumenty pozwalające reagować na stany obserwowanych strumieni.

import { fromEvent, merge } from 'rxjs'; 
import { take } from 'rxjs/operators';
const div = document.querySelector('div');
const btn1 = document.querySelector('#btn1');
const btn1click$ = \
fromEvent(btn1, 'click').pipe(take(3));
const keyUp$ = \
fromEvent(document, 'keyup').pipe(take(3));
merge(btn1click$, keyUp$)
.subscribe(
x => div.innerHTML += `<br>${x.type}`,
e => console.log(`Error: ${e}`),
() => console.log('done'));

Ze zdarzeń interfejsu bierzemy tylko trzy pierwsze, a następnie łączymy strumienie operatorem merge. Zdarzenie, które zostanie wywołane jako pierwsze, dojdzie do metody subscribe i wywołany zostanie jej pierwszy argument next, czyli dołączenie stringa z typem zdarzenia (click lub keyup) do elementu div. Każde źródło zostanie przepuszczone tylko po trzy razy. Dopiero po sześciu zdarzeniach zostanie wywołana funkcja complete (ostatni argument subscribe). Wynik na stronie uzyskamy, klikając na przemian przycisk i naciskając klawisz. Dodatkowo w konsoli zobaczymy napis „done”, który został przekazany z funkcji complete po zakończeniu wszystkich zdarzeń. Operator merge stworzył jeden strumień z dwóch i przekazał go do subskrypcji (obserwatorowi). Została tu zachowana kolejność pierwotnych strumieni – ten, który został wywołany, pojawiał się od razu na wyjściu. 
Trochę odmienne działanie ma kolejny operator – concat. W jego przypadku, w odróżnieniu od merge, zdarzenia z każdego strumienia nie są od razu przekazywane do wyjścia, ale zachowany jest porządek, kolejka. W naszym przykładzie najpierw operator przepuściłby trzy zdarzenia click, a dopiero potem trzy keyup. Naciśnięcia klawisza przed trzema kliknięciami przycisku nie uruchomi funkcji next i te zdarzenia przepadną. Dopiero kolejne, po trzech kliknięciach, zostaną przekazane obserwatorowi. W merge nasłuchiwane były oba zdarzenia jednocześnie i każde z nich, zaraz po pojawieniu się, przekazane było do subskrypcji. Z kolei concat nasłuchuje najpierw pierwszy strumień (btn1click$) aż do momentu zakończenia nadawania (po trzech zdarzeniach dzięki operatorowi take), przestaje nasłuchiwać i przełącza się do następnego strumienia (keyUp$). Na koniec uruchamia jeden raz funkcję complete obserwatora. 
Poniższy kod (z pominięciem importów) tworzy dwa Observable – jeden licznik, który emituje jedną liczbę całkowitą co 1000 ms (1 sekundę), a drugi to znane nam już kliknięcie przycisku. Dodatkowo do każdego z nich zostanie dodany znacznik czasu przez funkcję timestamp z pakietu rxjs/operators: 

const licznik1$ = interval(1000).pipe(map(i => \
`Licznik 1 – ${i}`), timestamp(), take(2));
const btn1click$ = fromEvent(btn1, \
'click').pipe(timestamp(), take(3));
concat(licznik1$, btn1click$)
.subscribe(
x => div.innerHTML += `<br>${x.value}, \
czas ${x.timestamp}`,
e => console.log(`Error: ${e}`),
() => console.log('done'));

Strumień licznik1$ zacznie emitować pierwszą cyfrę po jednej sekundzie od załadowania strony. Jeżeli w tym czasie zostanie kliknięty przycisk, to nie pojawi się o tym informacja. Nawet jeśli licznik1$ skończy emitować liczby, to nie pojawią się wcześniejsze zdarzenia z btn1click$, co możemy zaobserwować przez pole timestamp przy każdym zdarzeniu. Podmienienie w kodzie operatora concat na merge będzie skutkowało tym, że od razu zobaczymy wywoływany strumień. 

[...]

Autor jest absolwentem Uniwersytetu Ekonomicznego we Wrocławiu, pracuje jako informatyk w Urzędzie Gminy w Miękini.

Pełna treść artykułu jest dostępna w papierowym wydaniu pisma. Zapraszamy do składania zamówień na prenumeratę i numery archiwalne.
 
 

Polecamy

Biblioteka Informacja Publiczna

Specjalistyczne publikacje książkowe dla pracowników administracji publicznej

więcej