Reactive Pattern tot RxJS: Observable Pattern

In dit artikel leg ik uit hoe het Observable Pattern kan worden geïmplementeerd. Links is het diagram van het Observable Pattern. Er zijn een paar problemen die bij dit pattern spelen, en die in dit artikel worden opgelost, overigens geheel volgens standaard Angular/RxJS wijze.

Het belangrijkste probleem is dat Subject op dezelfde interface de mogelijkheid biedt om data te observeren, maar tegelijkertijd diezelfde data te zenden. Overal binnen de applicatie ontstaat hierdoor directe toegang tot het Subject. Om dit probleem op te heffen willen we graag een onderscheid maken tussen de mogelijkheid om je aan te melden (subscribe) en de mogelijkheid om te observeren (observe).

Laten we om te beginnen eerst een en ander hernoemen zodat het overeen gaat komen met RxJS, waar we uiteindelijk zullen uitkomen.

We hebben de Observer interface, die heeft in RxJS enkele methoden meer (error-afhandeling etc), maar daar kom ik later op terug. De belangrijkste nu is “next”

export interface Observer {
    next(data:any);
}

We hebben de Subject interface, die heeft ook een “next” functie. De Subject is een uitbreiding op Observer en op Observable, deze relatie geven we expliciet aan.

interface Subject extends Observer, Observable{
}

En we hebben een geëxporteerde interface Observable

export interface Observable {
  subscribe(obs:Observer);
  unsubscribe(obs:Observer);
}

We gaan nu de applicatie bouwen rondom deze drie interfaces. Het doel dient te worden dat we beter kunnen uitschalen in complexiteit, we zullen timing-problemen voorkomen (volgorde van creëren van objecten). Ik herinner graag aan de uitspraak van Venkat Subramaniam: “Do not walk away from Complexity, Run!!!”

We beginnen met een implementatie van Subject, want tot nu toe hebben we alleen maar interfaces. De Subject is i feite een event-bus, en dat is hoe we deze implementeren. Maar we houden Subject private in de applicatie. Dit is heel belangrijk in de Observable pattern.

import * as _ from 'lodash';
class SubjectImplementation implements Subject {

  private observers: Observer[] = [];

  next(data: any) {
    this.observers.forEach(obs => obs.next(data));
  }

  subscribe(obs: Observer) {
    this.observers.push(obs);
  }

  unsubscribe(obs: Observer) {
    _.remove(this.observers, el => el === obs);
  }
}

We hebben een array waarin de Observers worden opgeslagen, we hebben een subscribe, en unsubscribe, ze zullen voor zichzelf spreken, (de “remove” functie komt uit de “lodash” library, zie import-statement).

We maken van de data een Observable, voor dit moment een variabele. De dollar achter de naam geeft aan dat het een stream betreft. Andere delen van de applicatie kunnen nu “subscribe” op deze data.

export let booksList$: Observable;

 

Laten we nu kijken hoe Componenten hier mee om moeten gaan. Het component moet de Observer interface implementeren, en daarom ook de “next” methode implementeren. Op deze manier zullen meerdere Componenten op de data subscriben als Observer. Hieronder een component dat het aantal items op de lijst bijhoudt. We zien nu al voordelen van deze code. Het maakt niet meer uit op welke volgorde de Componenten onderling worden gecreëerd, omdat ze niet langer de eigenaar zijn van de data, maar deze observeren.

export class BooksCounterComponent implements Observer, OnInit {

    booksCounter = 0;

    ngOninit() {
        booksList$.subscribe(this);
    }

    next(data: Book[]) {
        console.log('counter component received data ..');
        this.booksCounter = data.length;
    }
}

Een volgend probleem komt op. We hebben ook een Lijst-component die een array van de data houdt. Dit component kan ook data verwijderen. We moeten ervoor zorgen dat deze Lijst dat niet zelf gaat doen (en dan owner wordt van data of van een kopie ervan), maar dat dit wordt uitbesteed aan de centrale data-owner, de Observable booksList$. We doen dit (tijdelijk) op volgende wijze, we voegen een array toe om data in op te slaan

export let booksList$: Observable;
let books : Book[] = [];

export function initializeBooksList(newList: Book[]){
  books = _.cloneDeep(newList);
}

Dit zal one initiële lijst worden. Deze array is private voor dit bestand. We bieden de gelegenheid om de boekenlijst voor de eerste keer te initialiseren. Om te voorkomen dat ergens een component door gebruik van deze functie de owner wordt van de lijst, gebruiken we cloneDeep van lodash. Ergens kan er nu een Component of service bestaan die de lijst bij opstart initialiseert, zodat de interne books-array is geïnitialiseerd.

We hebben nu componenten en services die niet weten waarvan de data komen. Komen ze uit een back-end, uit een netwerk, of gewoon een lokale source? Componenten weten het niet, en dit maakt dat de data losgekoppeld zijn van de applicatie, en dus wordt de applicatie, bijvoorbeeld, makkelijk om te testen.

Echter, er is nog een probleem. De booksList$ is niet geinitialiseerd, dus die kan nog niet zomaar worden aangeroepen. We gaan dat oplossen. We creeren een SubjectImplementation-object en gaan dat gebruiken binnen de booksList$. We krijgen nu dit:

const booksListSubject = new SubjectImplementation();

export let booksList$: Observable = {
  subscribe: obs => { 
     booksListSubject.subscribe(obs);
     obs.next();
  }
  unsubscribe: obs => booksListSubject.unsubscribe(obs)
}

let books : Book[];

export function initializeBooksList(newList: Book[]){
  books = _.cloneDeep(newList);
  booksListSubject.next(books);
}

Zo, we hebben nu alles met elkaar verbonden. De book-data zijn leeg, maar die kunenn door een externe routine met data gevuld worden. Alle componenten kunnen op de lijst inschrijven, en beschikken dan over de data zonder eigenaar van deze data te worden. Maar er is nog een probleem. Alleen componenten die zijn ingetekend krijgen data-updates, en omdat ze dat een voor een doen, tonen diegenen die nog niet zijn ingetekend geen data. Er is dus nog steeds een timing-probleem, de ene componentn wordt voor de andere gecreeerd en ge”subscribed”. Dit probleem is gekoppeld aan het soort Subject wordt gebruikt.

We gaan een nieuw pattern gebruiken, dat eigenlijk al in ontwikkeling is.

Zie regel 5 en 6, in plaats van alleen te subscriben naar de booksListSubject, we gaan gelijk (op regel 6) een callback naar de server doen om de value die we hebben door te geven. Dus “vroege” subscribers ontvangen een lege array, late subscribers ontvangen de gevulde array.

Dus hoever zijn we nu?

  • Meerdere delen van het programma kunnen intekenen op de data, en krijgen notificaties als er data inkomen. We hoeven niet meer na te denken over de volgorde van creëren.
  • Data zijn niet meer in eigendom van de componenten, de componenten observeren de data en updaten zichzelf als de data updaten.
  • Data kunnen niet meer direct worden gemanipuleerd door de componenten.

We gaan over naar een nieuw pattern, het is een bekend pattern. We hebben nu te maken met data en functies die nauw met elkaar zijn verbonden. We kunnen ze net zo goed in een gezamenlijke class zetten. Dat geeft dan goed weer wat we hier doen.

We gaan een nieuwe gecentraliseerde service inrichten, en we noemen deze service “store”.

Laat ons een kijken wat er in de store moet.

class DataStore {
  private books : Book[];
  private booksListSubject = new SubjectImplementation();

  public booksList$: Observable = {
    subscribe: obs => {
      this.booksListSubject.subscribe(obs),
        obs.next(this.books);
    },
    unsubscribe: obs => this.booksListSubject.unsubscribe(obs)
  }

  initializeBooksList(newList: Book[]){
    this.books = _.cloneDeep(newList);
  }
}

We hebben in de store, volgende zaken:

  • De initializer-functie, die houden we public.
  • De data, in de vorm van de books-array
  • De subject-implementatie, die is ook private.
  • De bookList$ Observable, die is public. Hoewel in typescript public default is, mag het ook expliciet worden aangegeven.

En dan creeren we een constant die de DataStore initialiseert en bekend maakt

export const store = new DataStore();

Nu hebben we een enkele store die beschikbaar is voor de hele applicatie. De componenten hebben nu niet meer de booksList$ beschikbaar, maar moeten het doen via de

store.booksList$.subscribe(this);

Zo, nu zijn we weer op precies hetzelfde punt als voordat we een store hadden. Wel valt op dat we nu twee pattern gebruiken. We hebben de Observable pattern die het mogelijk maakt om te subsciben, en functionaliteit om een nieuwe set data te initialiseren. Nog steeds hebben we bereikt dat slechts een deel van de applicatie eigenaar is van de data, de rest observeert alleen maar, en vraagt de eigenaar om wijzigingen.

Nu gaan we kijken hoe componenten dat moeten doen. Bijvoorbeeld nieuwe data toevoegen aan de store:

We richten een addBook functie in, in de store, zo blijft alles bij elkaar, en die addBook functie wordt aangeroepen door de componenten die een boek willen toevoegen

store.addBook(newBook);
addBook(newBook: Book) {
    this.books.push(_.cloneDeep(newBook));
    this.booksListSubject.next(this.books);
}

De betreffende functie is erg eenvoudig. Een kopie van het boek wordt op de data-array gezet. Het is een kopie zodat de store decoupled blijft van het component. Denk aan “lodash” bij de cloneDeep functie. Vervolgens worden alle Observers van de Observable genotificeerd dat de dataset is veranderd. Wat ze ermee doen is hun zaak. Het Count-component telt het aantal boeken, het ListComponent zal zijn lijst opnieuw inrichten.

Toch is het nog niet perfect. We hebben nog steeds een interne private lijst van data nodig, en modificaties worden een op een door gegeven naar de store. Nog steeds zijn er interacties tussen componenten mogelijk via de store. We gaan daar iets aan doen. Ook hebben we nog geen delete-functie in de store. We willen ook af van de lijst van Observers die eenvoudig via de next-functie op de hoogte worden gehouden van de internal state van de store (namelijk de private books array. De books array is niet immutable en zou dus niet zichtbaar mogen zijn buiten de store.

Er komt een broadcast-functie waarin zorg wordt gedragen voor de problemen met de interne lijst die op de Componenten terecht kwam. De broadcast-functie dient ertoe om de bescherming van de lijst af te splitsen van de addBook, en later ook andere functies die voor wijzigingen zorgen. In de broadcast-functie vindt een cloneDeep aanroep plaats waardoor de interne data-array wordt afgeschermd voor de buitenwereld.

addBook(newBook:Book) {
  this.books.push(_.cloneDeep(newBook));
  this.broadcast();
}

deleteBook(deleted: Book) {
    _.remove(this.books, book => book.id === deleted.id );
    this.broadcast();
}

updateBook(update: Book) {
    const book = _.find(this.books, book => book.id === updated.id );
    book = _.cloneDeep(updated);
    this.broadcast();
}

broadcast() {
  this.booksListSubject.next(_.cloneDeep(this.books));
}

 

Een overzicht van wat we nu hebben bereikt. Laten we wel vaststellen dat dit een kleine applicatie is. In een grote applicatie zijn componenten verder verwijderd, zodat ze niet eenvoudig via inputs en outputs kunnen werken. We komen dan tot andere oplossingen. Maar voor die andere oplossingen is het goed om te begrijpen wat er in de basis belangrijk is, en dat is wat we hier zien.

  • De applicatie is veel simpeler dan in het begin
  • De applicatie werkt foutloos, de componenten lopen elkaar niet meer in de weg.
  • De data zijn opgeslagen op een centrale plaats en zijn niet van buitenaf benaderbaar.
  • De componenten zijn in staat om alle acties van de store te volgen, en daaruit conclusies voor hun eigen functionaliteit te verbinden.
  • De volgorde waarin componenten worden gecreëerd is niet meer van belang.

We zijn bijna op het punt beland waar we de RxJS library gaan introduceren.

We kunnen constateren dat de DataStore en de Observable veel dichter bij elkaar staan dan in de eerste instantie lijkt. Nu bevat de DataStore een Observable als member-variabele. Alle componenten die toegang tot de data nodig hebben doen dat via deze observable. Maar we kunnen ook zien dat de DataStore zelf een Observable is. Dit betekent dat de functies subscribe en unsubscribe op DataStore niveau moeten worden geïmplementeerd.

De DataStore komt er zo uit te zien

class DataStore implements Observable{
  private books : Book[];
  private booksListSubject = new SubjectImplementation();

  initializeBooksList(newList: Book[]) {
    this.books = _.cloneDeep(newList);
  }

  addBook(newBook: Book) {
    this.books.push(_.cloneDeep(newBook));
    this.broadcast();
  }

  broadcast() {
    this.booksListSubject.next(_.cloneDeep(this.books));
  }

  subscribe(obs: Observer) {
    this.booksListSubject.subscribe(obs);
      obs.next(this.books);
  }

  unsubscribe(obs: Observer) {
    this.booksListSubject.unsubscribe(obs);
  }
}

Het spreekt dat de componenten dan gelijk op de store subscriben, en niet meer op de store.booksList$

RxJS

Kijk nog eens naar de interfaces die we in het begin hebben gecreëerd. Observer met next, en Observable met subscribe en unsubscribe. Om verder te komen kijk naar de documentatie van RxJS, deze is geheel vernieuwd en erg duidelijk. Het gaat allemaal over Subjects, Observable en Observers. Met de kennis die we tot nu toe hebben opgedaan moet dat eenvoudig te begrijpen zijn. We gaan nog eens naar onze code kijken en zien hoe dit erin zou passen.

import * as _ from 'lodash';
import {Book} from '../shared/model/book';

import { Subject, Observable } from 'rxjs';


class DataStore {

    private books: Book[]  = [];

    private booksListSubject = new Subject<Book[]>();

    public booksList$: Observable<Book[]> = this.booksListSubject.asObservable();

    initializeBooksList(newList: Book[]) {
        this.books = _.cloneDeep(newList);
        this.broadcast();
    }

    addBook(newBook: Book) {
        this.books.push(_.cloneDeep(newBook));
        this.broadcast();
    }

    deleteBook(deleted: Book) {
        _.remove(this.books,
            book => book.id === deleted.id );
        this.broadcast();
    }

    toggleBookViewed(toggled: Book) {
        const book = _.find(this.books, book => book.id === toggled.id);

        book.completed = ! book.completed;
        this.broadcast();


    }

    broadcast() {
        this.booksListSubject.next(_.cloneDeep(this.books));
    }
}

export const store = new DataStore();

Na importeren van RxJS zoiet onze sourcecode er zo uit. We hebben alle interfaces weggedaan, niet meer nodig. We hebben de Subject implementatie weggedaan, die zit in RxJS. We houden het Subject private. Een gouden regel is dat het Subject altijd aanwezig is als er een Observable is, en dat het Subject altijd private is. Het Subject is wat Observable is. We zien dat RxJS is een simpele toolkit om het werk te vervangen dat wij hadden gedaan.
We moeten nog de componenten updaten. Ze subscriben en updaten op deze wijze (Lijstcomponent)

ngOnInit() {
    store.booksList$.subscribe((books: Book[]) => {
      this.books = books;
    });
}

next(data:Book[]) {
    console.log('Books list component received data ..');
    this.books = data;
}

 

Als laatste stap halen we uit de DataStore de schaduwlijst weg, die is een bron van bugs, en we vervangen Subject door BehaviorSubject. BehaviorSubject onthoudt de laatste waarde die gebroadcast is, zodat componenten die net opkomen een waarde krijgen, en niet afhankelijk zijn van de laatste broadcast. De DataStore komt er als volgt uit te zien:

class DataStore {

    private booksListSubject = new BehaviorSubject([]);

    public booksList$: Observable<Book[]> = this.booksListSubject.asObservable();

    initializeBooksList(newList: Book[]) {
      const books = _.cloneDeep(newList);
      this.booksListSubject.next(books);
    }

    addBook(newBook: Book) {
      console.log("adding book")
      const books = this.cloneBooks();
      books.push(_.cloneDeep(newBook));
      this.booksListSubject.next(books);
    }

    deleteBook(deleted: Book) {
      const books = this.cloneBooks();
      _.remove(books,
            book => book.id === deleted.id );
      this.booksListSubject.next(books);
    }

    toggleBookViewed(toggled: Book) {
      const books = this.cloneBooks();
      const book = _.find(books, book => book.id === toggled.id);

      book.completed = ! book.completed;
      this.booksListSubject.next(books);
    }

    private cloneBooks(){
      return _.cloneDeep(this.booksListSubject.getValue());
    }
}

Ter afsluiting, wat hebben we bereikt:

  • We hebben het Observable Pattern geimplementeerd.
  • Op deze wijze kunnen modules en componenten over een data-source beschikken en samenwerken op een ontkoppelde wijze.
  • Iedere module reageert op het aankomen van nieuwe data, of het verwijderen van data in een andere module, zonder met die andere module contact te hebben.
  • Het is voldoende om in te schrijven in de Datastore, en te reageren op de updates die dan aankomen.

Hierna hebben we RxJS geintroduceerd, en van daaruit dezelfde functionaliteit geleverd. RxJS zal de manier worden om reactive programming te implementeren in Angular.