Erfolgreich mit RxJS Teil 2 – Ohne in die Tischkante zu beißen
Janis Krasemann
10 Minuten
Dies ist der zweite und finale Teil meiner kleinen Artikelserie zum sehr umfangreichen und komplexen Thema RxJs. In Teil 1 haben wir uns bereits die Grundlagen zu RxJs sowie die wichtigsten Must-Have Pipeable Operators angeschaut. Hier geht es jetzt weiter mit den Must-Know Pipeable Operators – also denen, die man als RxJS-Entwickler:in zwar kennen sollte, aber nicht unbedingt ohne in die Dokumentation zu gucken aus dem Ärmel schütteln können muss. Anschließend geht es weiter zu den Combination Operators, also den Operatoren, die aus dem gegebenen Input ein neues Observable erzeugen. Auch hier unterscheide ich dann wieder zwischen Must-Have und Must-Know.
Must-Know Pipeable Operators
Passenderweise fallen in diese Kategorie – so wie bei den Must-Have Operators – auch wieder vier Operatoren. Diese sind iif
, take
, takeUntil
, und from
.
iif
Für komplexere asynchrone Prozesse, in denen anhand des Ergebnisses eines Observables verschiedene Observables ausgeführt werden sollen, bietet sich oft iif
an. Anhand einer angegebenen Bedingung wird hierbei entweder auf das erste oder das zweite angegebene Observable subscribed. Da hier wieder mit inneren Observables gearbeitet wird, taucht iif
meistens in Kombination mit mergeMap
auf.
Im Beispiel wird via iif
das Ergebnis eines Requests zum Bezahlen der Bestellung geprüft und, je nach Ergebnis, eine Rechnung angefordert oder die Bestellung abgebrochen. Da das Codebeispiel hier ein wenig komplexer ist, habe ich darin mit Codekommentaren gearbeitet:
// veranlasst die Zahlung:
const pay$ = this.httpClient.post(`${baseUrl}/order/${orderId}`);
// fordert für ein angegebene Rechnungs-ID die gesamte Rechnung an:
const fetchReceipt$ = (receiptId) =>
this.httpClient.get(`${baseUrl}/receipt/${receiptId}`);
// bricht bei Bedarf eine Bestellung ab:
const cancelOrder$ = this.httpClient.post(`${baseUrl}/order/${orderId}/cancel`);
pay$
.pipe(
mergeMap(
// sobald die Bezahlung veranlasst wurde...
(response) =>
iif(
// ...prüfe ob die Bezahlung erfolreich war
() => response.statusCode === 200 && !!response.data.receiptId,
// wenn die Bezahlung erfolgreich war, lade die Rechnung anhand der Rechnungs-ID aus dem pay$-Request
fetchReceipt$(response.data.receiptId).pipe(mapTo(true)),
// wenn die Bezahlung fehlgeschlagen ist, breche den Bestellungsvorgang ab
cancelOrder$.pipe(mapTo(false))
)
)
)
.subscribe(console.log);
Tipp 1: Es bietet sich oft an, innere Observables (hier fetchReceipt$
und cancelOrder$
) als Konstanten oder Funktionen – falls sie einen Parameter aus dem Kontext des äußeren Observables benötigen – auszulagern um die Lesbarkeit zu erhöhen.
Tipp 2: Auch innere Observables können via pipe
weiterverarbeitet werden, was bei der Verwendung von iif
oft auch nötig ist weil sie komplett unterschiedliche Werte ausgeben. Hier habe ich die Ergebnisse der inneren Observables via mapTo
auf einen boolschen Wert gemappt. Im subscribe
könnte dann anhand des resultierenden success
Parameters eine Erfolgs- oder Fehlernachricht angezeigt werden.
take / takeWhile
Wir haben mittlerweile gelernt, dass der Observer jedes mal benachrichtigt wird, wenn das beobachtete Observable einen neuen Wert ausgibt. Manchmal ist das nicht exakt das Verhalten, das benötigt wird. Wenn stattdessen nur die ersten x
Werte benötigt werden, kann innerhalb der Pipe take(x)
verwendet werden. Der take
Operator kommt oft in der Variante take(1)
vor (alternativ kann man hier auch first()
verwenden, wobei hier die Semantik eine etwas andere ist).
Wenn das x
für take(x)
nicht bekannt ist bzw. von einer Bedingung abhängt, die sich dynamisch ändern kann, ist takeWhile
die richtige Wahl. Es erwartet eine Funktion als Parameter, welche einen boolschen Wert zurückgibt. Nur wenn der Wert auf true
evaluiert, wird der jeweilige Wert des Observables verwendet.
// Erzeugt ein Observable, das bei jedem Klick ein MouseEvent als Wert ausgibt:
const clickEvent$ = fromEvent(document, 'click');
clickEvent.pipe(take(1)).subscribe(() => console.log('Erster Klick!'));
clickEvent
.pipe(takeWhile((evt) => evt.buttons === 1))
.subscribe(() => console.log('Linksklick!'));
Für diese beiden Operatoren existiert jeweils auch ein Gegenstück: skip
und skipWhile
. Via skip(x)
können die ersten x
Werte eines Observables ignoriert werden, während skipWhile
eine Bedingung erfüllen muss, damit der emittierte Wert ignoriert werden kann.
takeUntil
Nachdem wir take
kennengelernt haben, wenden wir uns nun dem etwas spezielleren takeUntil
zu. Wenn dieser Operator Teil der Pipe ist, werden nur solange Werte emittiert, bis das als Parameter in takeUntil
verwendete Observable einen Wert ausgibt. An sich sind die Use Cases für diesen Operator sehr speziell und würden daher hier normalerweise keine Erwähnung finden. Allerdings ist dieser Operator in Angular-Projekten mitunter sehr häufig im Einsatz, weswegen ich das Pattern hier kurz aufzeigen möchte (für Angular-Entwickler:innen ist takeUntil
wahrscheinlich sogar ein Must-Have Operator).
Bei langlebigen Observables kann es nämlich vorkommen, dass die Subscriptions unerwünschterweise weiter laufen, obwohl die Komponente eigentlich gar nicht mehr angezeigt wird. Es existieren verschiedene Lösungsmöglichkeiten hierfür, ich möchte die meiner Meinung nach beste Variante via Subject
und ngOnDestroy
hier kurz zeigen. Dabei wird ein eigenes Subject
erstellt, welches beim Zerstören der Komponente einen Wert ausgibt und somit alle Subscriptions beendet:
@Component({
selector: 'my-component',
templateUrl: '<span>takeUntil Demo</span>',
})
export class MyComponent implements OnDestroy, OnInit {
private unsubscribe$ = new Subject<void>();
constructor(private route: ActivatedRoute) {}
ngOnInit(): void {
this.route.queryParams
.pipe(
takeUntil(this.unsubscribe$),
)
.subscribe((queryParams) => {
this.someExpensiveOperation(queryParams);
});
}
ngOnDestroy(): void {
this.unsubscribe$.next();
this.unsubscribe$.complete();
}
}
from
Sozusagen der große Bruder von of
. Dieser Operator ist aber noch ein bisschen mächtiger, da hier auch Arrays oder Maps in Observables umgewandelt werden können. Es wird dann für jeden Wert in dem Array oder der Map ein Wert emittiert. Genau wie of
ist from
damit sehr nützlich beim Schreiben von Unit Tests, allerdings sind darüber hinaus auch noch weitere Anwendungsfälle denkbar:
// Array mit Produkt-IDs als Strings
const productIds = this.warenkorb.getProducts();
from(productIds)
.pipe(
mergeMap((productId) =>
this.httpClient.get(`${baseUrl}/product/${productId}`)
),
map((res) => res.data)
)
.subscribe((product) =>
console.log(`Produkt ${product.description} erfolgreich geladen!`)
);
Hier wird from
verwendet, um für eine variable Menge von Produkten ihre jeweiligen Beschreibungen aus dem Backend zu laden. In dem Codebeispiel kommt außerdem wieder mergeMap
zum Einsatz, da wir nicht mit dem Observable aus get
weiterarbeiten wollen, sondern mit der tatsächlichen Response.
Man kann from
außerdem verwenden, um ein Promise in ein Observable zu verwandeln. Das kann nützlich sein, wenn eine zu verwendende Bibliothek ihre API auf Promises aufgebaut hat, man selber aber Observables verwenden möchte.
Must-Have Creation Operators
Wenden wir uns nun den Creation Operatoren zu, also den Operatoren, die aus einem Input ein Observable erzeugen. Hier ist meiner Meinung nach forkJoin
der wichtigste, außerdem fällt of
in diese Kategorie.
forkJoin
In meiner täglichen Arbeit kommt mir forkJoin
sehr regelmäßig unter die Augen. Es erwartet ein Array oder Dictionary von Observables und endet genau dann wenn alle übergebenen Observables einen Wert ausgegeben und completed haben. Die resultierenden Werte sind dann im subscribe
Callback als Parameter (je nachdem entweder als Array oder als Dictionary) verfügbar.
forkJoin
ist sehr nützlich, wenn mehrere Operationen parallel ablaufen sollen und mit der Verarbeitung der Ergebnisse gewartet werden muss, bis alle Operationen erfolgreich ausgeführt wurden:
const receiptId = 123;
// Lädt die Bestellbestätigung bzw. einen Gutschein:
const fetchConfirmation$ = this.httpClient.get(
`${baseUrl}/order/confirmation/${receiptId}`
);
const fetchVoucher$ = this.httpClient.get(`${baseUrl}/voucher/${receiptId}`);
forkJoin([fetchConfirmation$, fetchVoucher$]).subscribe(
([confirmation, voucher]) => {
console.log('Bestellbestätigung geladen: ' + confirmation);
console.log('Gutschein geladen: ' + voucher);
}
);
Das obige Codebeispiel nimmt an, dass die receiptId
bereits bekannt ist - im iif
-Codebeispiel haben wir aber gesehen, dass die receiptId
erst nach dem erfolgreichen Absenden der Zahlungsinformationen vorliegt. In der Kombination mit mergeMap
können solche komplexeren Abläufe leicht umgesetzt werden:
// veranlasst die Zahlung:
const pay$ = this.httpClient.post(`${baseUrl}/order/${orderId}`);
// Lädt die Bestellbestätigung bzw. einen Gutschein:
const fetchConfirmation$ = (receiptId) =>
this.httpClient.get(`${baseUrl}/order/confirmation/${receiptId}`);
const fetchVoucher$ = (receiptId) =>
this.httpClient.get(`${baseUrl}/voucher/${receiptId}`);
pay$
.pipe(
mergeMap(
// sobald die Bezahlung veranlasst wurde...
(response) =>
iif(
// ...prüfe ob die Bezahlung erfolreich war
() => response.statusCode === 200 && !!response.data.receiptId,
// wenn die Bezahlung erfolgreich war, lade die Rechnung UND den Gutschein anhand der Rechnungs-ID aus dem pay$-Request
forkJoin({
confirmation: fetchConfirmation$,
voucher: fetchVoucher$,
}),
// wenn die Bezahlung fehlgeschlagen ist, breche den Bestellungsvorgang ab
cancelOrder$.pipe(throwError(() => of('Fehler beim Bestellvorgang')))
)
)
)
.subscribe(({ confirmation, voucher }) => {
console.log('Bestellbestätigung geladen: ' + confirmation);
console.log('Gutschein geladen: ' + voucher);
});
Im ersten Codebeispiel wurde die Array-Variante von forkJoin
eingesetzt, im zweiten Beispiel die Dictionary-Variante. Für eine überschaubare Anzahl auszuführender Observables ziehe ich die Array-Variante vor; wenn es mehr als eine handvoll Observables werden, die Dictionary-Variante. Letztere hat außerdem den Vorteil, dass bei komplexeren Observables leichter explizit die Typen der jeweiligen Parameter angegeben werden können (falls TypeScript eingesetzt wird).
of
Der zweite Creation Operator in dieser Kategorie ist of
. Es kann benutzt werden, um ein Observable zu erstellen, welches den angegebenen Wert zurückgibt. Ich benutze of
fast ausschließlich beim Schreiben von Tests oder um in einem API Service einen Endpunkt zu mocken welchen die Kolleg:innen im Backend gerade noch entwickeln. Das könnte dann wie folgt aussehen:
function finishOrder(shoppingCartId) {
return of({ status: 200, data: { payment: { amount: 165.42 } } }).pipe(
map((response) => response.data.payment)
);
}
// Verwendung:
finishOrder(user.currentShoppingCartId).subscribe((payment) => {
console.log(
`Bestellung erfolgreich aufgegeben. Zu zahlen sind ${payment.amount}€`
);
});
Wenn im Backend schließlich der Endpunkt implementiert ist, muss lediglich das of(...)
ausgetauscht werden - der Aufruf von finishOrder
ist nicht betroffen. Auch die Weiterverarbeitung der Response via map
kann bestehen bleiben.
Must-Know Combination Operators
Es gibt eine Unmenge an Combination Operators in RxJs, bei denen man leicht die Übersicht verlieren kann. Viele von ihnen ähneln forkJoin
, daher ist es meiner Meinung nach nützlich, wenn man diesen Operator gut kennt und alle anderen Combination Operators dann anhand ihrer Unterschiede zu forkJoin
bewerten kann. Diese sind nur in Einzelfällen anwendbar, weswegen ich hier auch keine unrealistischen Beispiele konstruiere.
zip
Wie forkJoin
, aber emittiert für jeden ausgegebenen Wert, nicht nur wenn die Observables complete sind. Außerdem wird immer solange gewartet bis jedes Observable einmal emittiert hat. Nützlich wenn die Werte der Observables immer nur paarweise (bzw. zu dritt, ...) weiterverarbeitet werden können.
merge
Wie forkJoin
, aber emittiert jedes mal wenn eines der übergebenen Observables einen Wert ausgibt. Es werden also mehrere Observables in ein einzelnes Observable kombiniert, ohne die Ausgabe weiter zu modifizieren. Nützlich nur dann wenn alle Observables denselben Typen zurückgeben.
race / raceWith
Wählt aus den übergebenen Observables dasjenige aus, das als erstes einen Wert ausgibt, und emittiert dann nur noch die Werte aus diesem Observable. Alle anderen Observables werden nicht weiter berücksichtigt! Nützlich wenn verschiedene Strategien für eine aufwändige Berechnung/Operation vorliegen, die abhängig von bestimmten Einflüssen mal mehr, mal weniger schnell sind und deshalb die Ergebnisse für die schnellste Berechnung verwendet werden sollen. Ein anderer Anwendungsfall wäre, für eine potentiell sehr zeitaufwändige Operation eine Abbruchbedingung zu implementieren. Bisher heißt der Operator noch race
, wird aber in RxJs v8 durch raceWith
abgelöst.
Abschluss
Hiermit endet meine Mini-Serie zum Thema RxJs, hoffentlich konnte ich ein wenig Licht ins Dunkel bringen! Der wichtigste Operator in diesem Artikel war definitiv forkJoin
. Falls ihr Teil 1 noch nicht gelesen habt, sei euch dieser Artikel noch wärmstens ans Herz gelegt.
Beim Programmieren oder Lesen von RxJs-Code kann man leicht mal den Überblick verlieren, insbesondere die schiere Menge an unterschiedlichen Operatoren kann einem leicht mal den Kopf schwirren lassen. Im Zweifel sollte man lieber nochmal einen Schritt zurück machen und einen Blick in diese Artikelserie oder in die verschiedenen verlinkten Ressourcen aus Teil 1 werfen.
Diesen Artikel teilen über: