Menü
Zurück zur Blog-Übersichtsseite
Eine von RxJS-Programmierung genervte Katze.

Erfolgreich mit RxJS Teil 2 – Ohne in die Tischkante zu beißen


Janis Krasemann

10 Minuten

Dies ist der zweite und finale Teil meiner kleinen Artikelserie zum sehr umfangreichen und komplexen Thema RxJs. In Teil 1 haben wir uns bereits die Grundlagen zu RxJs sowie die wichtigsten Must-Have Pipeable Operators angeschaut. Hier geht es jetzt weiter mit den Must-Know Pipeable Operators – also denen, die man als RxJS-Entwickler:in zwar kennen sollte, aber nicht unbedingt ohne in die Dokumentation zu gucken aus dem Ärmel schütteln können muss. Anschließend geht es weiter zu den Combination Operators, also den Operatoren, die aus dem gegebenen Input ein neues Observable erzeugen. Auch hier unterscheide ich dann wieder zwischen Must-Have und Must-Know.

Must-Know Pipeable Operators

Passenderweise fallen in diese Kategorie – so wie bei den Must-Have Operators – auch wieder vier Operatoren. Diese sind iif, take, takeUntil, und from.

iif

Für komplexere asynchrone Prozesse, in denen anhand des Ergebnisses eines Observables verschiedene Observables ausgeführt werden sollen, bietet sich oft iif an. Anhand einer angegebenen Bedingung wird hierbei entweder auf das erste oder das zweite angegebene Observable subscribed. Da hier wieder mit inneren Observables gearbeitet wird, taucht iif meistens in Kombination mit mergeMap auf.

Im Beispiel wird via iif das Ergebnis eines Requests zum Bezahlen der Bestellung geprüft und, je nach Ergebnis, eine Rechnung angefordert oder die Bestellung abgebrochen. Da das Codebeispiel hier ein wenig komplexer ist, habe ich darin mit Codekommentaren gearbeitet:

// veranlasst die Zahlung: const pay$ = this.httpClient.post(`${baseUrl}/order/${orderId}`); // fordert für ein angegebene Rechnungs-ID die gesamte Rechnung an: const fetchReceipt$ = (receiptId) => this.httpClient.get(`${baseUrl}/receipt/${receiptId}`); // bricht bei Bedarf eine Bestellung ab: const cancelOrder$ = this.httpClient.post(`${baseUrl}/order/${orderId}/cancel`); pay$ .pipe( mergeMap( // sobald die Bezahlung veranlasst wurde... (response) => iif( // ...prüfe ob die Bezahlung erfolreich war () => response.statusCode === 200 && !!response.data.receiptId, // wenn die Bezahlung erfolgreich war, lade die Rechnung anhand der Rechnungs-ID aus dem pay$-Request fetchReceipt$(response.data.receiptId).pipe(mapTo(true)), // wenn die Bezahlung fehlgeschlagen ist, breche den Bestellungsvorgang ab cancelOrder$.pipe(mapTo(false)) ) ) ) .subscribe(console.log);

Tipp 1: Es bietet sich oft an, innere Observables (hier fetchReceipt$ und cancelOrder$) als Konstanten oder Funktionen – falls sie einen Parameter aus dem Kontext des äußeren Observables benötigen – auszulagern um die Lesbarkeit zu erhöhen.

Tipp 2: Auch innere Observables können via pipe weiterverarbeitet werden, was bei der Verwendung von iif oft auch nötig ist weil sie komplett unterschiedliche Werte ausgeben. Hier habe ich die Ergebnisse der inneren Observables via mapTo auf einen boolschen Wert gemappt. Im subscribe könnte dann anhand des resultierenden success Parameters eine Erfolgs- oder Fehlernachricht angezeigt werden.

take / takeWhile

Wir haben mittlerweile gelernt, dass der Observer jedes mal benachrichtigt wird, wenn das beobachtete Observable einen neuen Wert ausgibt. Manchmal ist das nicht exakt das Verhalten, das benötigt wird. Wenn stattdessen nur die ersten x Werte benötigt werden, kann innerhalb der Pipe take(x) verwendet werden. Der take Operator kommt oft in der Variante take(1) vor (alternativ kann man hier auch first() verwenden, wobei hier die Semantik eine etwas andere ist).

Wenn das x für take(x) nicht bekannt ist bzw. von einer Bedingung abhängt, die sich dynamisch ändern kann, ist takeWhile die richtige Wahl. Es erwartet eine Funktion als Parameter, welche einen boolschen Wert zurückgibt. Nur wenn der Wert auf true evaluiert, wird der jeweilige Wert des Observables verwendet.

// Erzeugt ein Observable, das bei jedem Klick ein MouseEvent als Wert ausgibt: const clickEvent$ = fromEvent(document, 'click'); clickEvent.pipe(take(1)).subscribe(() => console.log('Erster Klick!')); clickEvent .pipe(takeWhile((evt) => evt.buttons === 1)) .subscribe(() => console.log('Linksklick!'));

Für diese beiden Operatoren existiert jeweils auch ein Gegenstück: skip und skipWhile. Via skip(x) können die ersten x Werte eines Observables ignoriert werden, während skipWhile eine Bedingung erfüllen muss, damit der emittierte Wert ignoriert werden kann.

takeUntil

Nachdem wir take kennengelernt haben, wenden wir uns nun dem etwas spezielleren takeUntil zu. Wenn dieser Operator Teil der Pipe ist, werden nur solange Werte emittiert, bis das als Parameter in takeUntil verwendete Observable einen Wert ausgibt. An sich sind die Use Cases für diesen Operator sehr speziell und würden daher hier normalerweise keine Erwähnung finden. Allerdings ist dieser Operator in Angular-Projekten mitunter sehr häufig im Einsatz, weswegen ich das Pattern hier kurz aufzeigen möchte (für Angular-Entwickler:innen ist takeUntil wahrscheinlich sogar ein Must-Have Operator).

Bei langlebigen Observables kann es nämlich vorkommen, dass die Subscriptions unerwünschterweise weiter laufen, obwohl die Komponente eigentlich gar nicht mehr angezeigt wird. Es existieren verschiedene Lösungsmöglichkeiten hierfür, ich möchte die meiner Meinung nach beste Variante via Subject und ngOnDestroy hier kurz zeigen. Dabei wird ein eigenes Subject erstellt, welches beim Zerstören der Komponente einen Wert ausgibt und somit alle Subscriptions beendet:

@Component({ selector: 'my-component', templateUrl: '<span>takeUntil Demo</span>', }) export class MyComponent implements OnDestroy, OnInit { private unsubscribe$ = new Subject<void>(); constructor(private route: ActivatedRoute) {} ngOnInit(): void { this.route.queryParams .pipe( takeUntil(this.unsubscribe$), ) .subscribe((queryParams) => { this.someExpensiveOperation(queryParams); }); } ngOnDestroy(): void { this.unsubscribe$.next(); this.unsubscribe$.complete(); } }

from

Sozusagen der große Bruder von of. Dieser Operator ist aber noch ein bisschen mächtiger, da hier auch Arrays oder Maps in Observables umgewandelt werden können. Es wird dann für jeden Wert in dem Array oder der Map ein Wert emittiert. Genau wie of ist from damit sehr nützlich beim Schreiben von Unit Tests, allerdings sind darüber hinaus auch noch weitere Anwendungsfälle denkbar:

// Array mit Produkt-IDs als Strings const productIds = this.warenkorb.getProducts(); from(productIds) .pipe( mergeMap((productId) => this.httpClient.get(`${baseUrl}/product/${productId}`) ), map((res) => res.data) ) .subscribe((product) => console.log(`Produkt ${product.description} erfolgreich geladen!`) );

Hier wird from verwendet, um für eine variable Menge von Produkten ihre jeweiligen Beschreibungen aus dem Backend zu laden. In dem Codebeispiel kommt außerdem wieder mergeMap zum Einsatz, da wir nicht mit dem Observable aus get weiterarbeiten wollen, sondern mit der tatsächlichen Response.

Man kann from außerdem verwenden, um ein Promise in ein Observable zu verwandeln. Das kann nützlich sein, wenn eine zu verwendende Bibliothek ihre API auf Promises aufgebaut hat, man selber aber Observables verwenden möchte.

Must-Have Creation Operators

Wenden wir uns nun den Creation Operatoren zu, also den Operatoren, die aus einem Input ein Observable erzeugen. Hier ist meiner Meinung nach forkJoin der wichtigste, außerdem fällt of in diese Kategorie.

forkJoin

In meiner täglichen Arbeit kommt mir forkJoin sehr regelmäßig unter die Augen. Es erwartet ein Array oder Dictionary von Observables und endet genau dann wenn alle übergebenen Observables einen Wert ausgegeben und completed haben. Die resultierenden Werte sind dann im subscribe Callback als Parameter (je nachdem entweder als Array oder als Dictionary) verfügbar.

forkJoin ist sehr nützlich, wenn mehrere Operationen parallel ablaufen sollen und mit der Verarbeitung der Ergebnisse gewartet werden muss, bis alle Operationen erfolgreich ausgeführt wurden:

const receiptId = 123; // Lädt die Bestellbestätigung bzw. einen Gutschein: const fetchConfirmation$ = this.httpClient.get( `${baseUrl}/order/confirmation/${receiptId}` ); const fetchVoucher$ = this.httpClient.get(`${baseUrl}/voucher/${receiptId}`); forkJoin([fetchConfirmation$, fetchVoucher$]).subscribe( ([confirmation, voucher]) => { console.log('Bestellbestätigung geladen: ' + confirmation); console.log('Gutschein geladen: ' + voucher); } );

Das obige Codebeispiel nimmt an, dass die receiptId bereits bekannt ist - im iif-Codebeispiel haben wir aber gesehen, dass die receiptId erst nach dem erfolgreichen Absenden der Zahlungsinformationen vorliegt. In der Kombination mit mergeMap können solche komplexeren Abläufe leicht umgesetzt werden:

// veranlasst die Zahlung: const pay$ = this.httpClient.post(`${baseUrl}/order/${orderId}`); // Lädt die Bestellbestätigung bzw. einen Gutschein: const fetchConfirmation$ = (receiptId) => this.httpClient.get(`${baseUrl}/order/confirmation/${receiptId}`); const fetchVoucher$ = (receiptId) => this.httpClient.get(`${baseUrl}/voucher/${receiptId}`); pay$ .pipe( mergeMap( // sobald die Bezahlung veranlasst wurde... (response) => iif( // ...prüfe ob die Bezahlung erfolreich war () => response.statusCode === 200 && !!response.data.receiptId, // wenn die Bezahlung erfolgreich war, lade die Rechnung UND den Gutschein anhand der Rechnungs-ID aus dem pay$-Request forkJoin({ confirmation: fetchConfirmation$, voucher: fetchVoucher$, }), // wenn die Bezahlung fehlgeschlagen ist, breche den Bestellungsvorgang ab cancelOrder$.pipe(throwError(() => of('Fehler beim Bestellvorgang'))) ) ) ) .subscribe(({ confirmation, voucher }) => { console.log('Bestellbestätigung geladen: ' + confirmation); console.log('Gutschein geladen: ' + voucher); });

Im ersten Codebeispiel wurde die Array-Variante von forkJoin eingesetzt, im zweiten Beispiel die Dictionary-Variante. Für eine überschaubare Anzahl auszuführender Observables ziehe ich die Array-Variante vor; wenn es mehr als eine handvoll Observables werden, die Dictionary-Variante. Letztere hat außerdem den Vorteil, dass bei komplexeren Observables leichter explizit die Typen der jeweiligen Parameter angegeben werden können (falls TypeScript eingesetzt wird).

of

Der zweite Creation Operator in dieser Kategorie ist of. Es kann benutzt werden, um ein Observable zu erstellen, welches den angegebenen Wert zurückgibt. Ich benutze of fast ausschließlich beim Schreiben von Tests oder um in einem API Service einen Endpunkt zu mocken welchen die Kolleg:innen im Backend gerade noch entwickeln. Das könnte dann wie folgt aussehen:

function finishOrder(shoppingCartId) { return of({ status: 200, data: { payment: { amount: 165.42 } } }).pipe( map((response) => response.data.payment) ); } // Verwendung: finishOrder(user.currentShoppingCartId).subscribe((payment) => { console.log( `Bestellung erfolgreich aufgegeben. Zu zahlen sind ${payment.amount}€` ); });

Wenn im Backend schließlich der Endpunkt implementiert ist, muss lediglich das of(...) ausgetauscht werden - der Aufruf von finishOrder ist nicht betroffen. Auch die Weiterverarbeitung der Response via map kann bestehen bleiben.

Must-Know Combination Operators

Es gibt eine Unmenge an Combination Operators in RxJs, bei denen man leicht die Übersicht verlieren kann. Viele von ihnen ähneln forkJoin, daher ist es meiner Meinung nach nützlich, wenn man diesen Operator gut kennt und alle anderen Combination Operators dann anhand ihrer Unterschiede zu forkJoin bewerten kann. Diese sind nur in Einzelfällen anwendbar, weswegen ich hier auch keine unrealistischen Beispiele konstruiere.

zip

Wie forkJoin, aber emittiert für jeden ausgegebenen Wert, nicht nur wenn die Observables complete sind. Außerdem wird immer solange gewartet bis jedes Observable einmal emittiert hat. Nützlich wenn die Werte der Observables immer nur paarweise (bzw. zu dritt, ...) weiterverarbeitet werden können.

merge

Wie forkJoin, aber emittiert jedes mal wenn eines der übergebenen Observables einen Wert ausgibt. Es werden also mehrere Observables in ein einzelnes Observable kombiniert, ohne die Ausgabe weiter zu modifizieren. Nützlich nur dann wenn alle Observables denselben Typen zurückgeben.

race / raceWith

Wählt aus den übergebenen Observables dasjenige aus, das als erstes einen Wert ausgibt, und emittiert dann nur noch die Werte aus diesem Observable. Alle anderen Observables werden nicht weiter berücksichtigt! Nützlich wenn verschiedene Strategien für eine aufwändige Berechnung/Operation vorliegen, die abhängig von bestimmten Einflüssen mal mehr, mal weniger schnell sind und deshalb die Ergebnisse für die schnellste Berechnung verwendet werden sollen. Ein anderer Anwendungsfall wäre, für eine potentiell sehr zeitaufwändige Operation eine Abbruchbedingung zu implementieren. Bisher heißt der Operator noch race, wird aber in RxJs v8 durch raceWith abgelöst.

Abschluss

Hiermit endet meine Mini-Serie zum Thema RxJs, hoffentlich konnte ich ein wenig Licht ins Dunkel bringen! Der wichtigste Operator in diesem Artikel war definitiv forkJoin. Falls ihr Teil 1 noch nicht gelesen habt, sei euch dieser Artikel noch wärmstens ans Herz gelegt.

Beim Programmieren oder Lesen von RxJs-Code kann man leicht mal den Überblick verlieren, insbesondere die schiere Menge an unterschiedlichen Operatoren kann einem leicht mal den Kopf schwirren lassen. Im Zweifel sollte man lieber nochmal einen Schritt zurück machen und einen Blick in diese Artikelserie oder in die verschiedenen verlinkten Ressourcen aus Teil 1 werfen.

Donnerstag, 27.10.2022

Diesen Artikel teilen über:




enpit GmbH & Co. KG

Marienplatz 11a
33098 Paderborn
+49 5251 2027791
Zum enpit-Newsletter anmelden
© 2024 – enpit GmbH & Co. KGDatenschutzerklärung | Impressum