Generators and stream processing – Part 2

I presented basic generator examples in the preceding section. Although it is difficult to find use cases for generators on their own, I decided to give it a try. Reactive programming was the inspiration for this part of the article, and I took my time so that I could go into a little more detail. I decided to see how much of the RxJS API I could reproduce using async generators. I would still use RxJS for reactive programming in practice, but I am happy with the outcome of this experiment.

Reactive programming was not particularly easy for me to understand at first; I believe it takes a small mental adjustment before it starts to make sense. It becomes much simpler once you think in terms of modular pipelines, where several operators are applied to define a sequence of transformations on the emitted data. Each value emitted by the stream travels through the whole pipeline on its own, rather than waiting for each step to finish processing all the data before handing it to the next processor. I picture it as a kind of CI/CD pipeline whose behaviour we can control at any step with a set of operators.
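To make the difference concrete, here is a minimal sketch using plain (synchronous) generators like the ones from the previous part. The names numbers, double and addOne are made up for this illustration. The logs show that, with generators, each value passes through every step before the next value is produced, whereas chained array methods finish one step for all values first.

```javascript
const lazyLog = [];
const eagerLog = [];

// Source: produces the values one at a time, only when asked.
function* numbers() {
  yield 1;
  yield 2;
}

// Step 1 of the lazy pipeline.
function* double(source) {
  for (const n of source) {
    lazyLog.push(`double ${n}`);
    yield n * 2;
  }
}

// Step 2 of the lazy pipeline.
function* addOne(source) {
  for (const n of source) {
    lazyLog.push(`addOne ${n}`);
    yield n + 1;
  }
}

// Lazy: every value travels through both steps before the next one starts.
const lazyResults = [...addOne(double(numbers()))];

// Eager: the first map() handles all values before the second map() runs.
const eagerResults = [1, 2]
  .map((n) => {
    eagerLog.push(`double ${n}`);
    return n * 2;
  })
  .map((n) => {
    eagerLog.push(`addOne ${n}`);
    return n + 1;
  });

console.log(lazyLog);  // double 1, addOne 2, double 2, addOne 4
console.log(eagerLog); // double 1, double 2, addOne 2, addOne 4
console.log(lazyResults, eagerResults); // both contain 3 and 5
```

Both versions produce the same results; only the order in which the steps run differs, and that order is what makes it possible to react to each value as soon as it is emitted.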

There is an amazing website called RxJS marbles that illustrates the effects of individual operators applied to a stream. I will be using some of its diagrams to illustrate certain things as we go.

Asynchronous generators in JS share many characteristics with regular generators, with the important exception that they produce promises rather than plain values. Building on the earlier examples, we can look at the new syntax right away:


async function* pairs(obj) {
  const keys = Object.keys(obj);
  while (keys.length > 0) {
    const k = keys.splice(0, 1)[0];
    yield [k, obj[k]];
  }
}
const obj = {
  a: 1,
  b: 2
};

(async () => {
  for await (const t of pairs(obj)) {
    console.log(t);
  }
})();

[LOG]: ["a", 1]
[LOG]: ["b", 2]

The async function* syntax declares an async generator. In the for await loop, the generated promises are awaited sequentially, one after another. The switch from values to promises is the key difference. In the real world, we frequently work with streams and want practical control over what happens when new data is emitted. Whether it’s a web socket connection or a queue consumer, their stream-like nature is easy to recognize.
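As a rough illustration of what the for await loop does behind the scenes, the sketch below drives an async generator by hand. The helpers countTo and collectManually are names invented for this example; the point is only that every call to next() returns a promise that has to be awaited.

```javascript
// A tiny async generator used only for this demonstration.
async function* countTo(limit) {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}

// Roughly what `for await` does for us: ask for the next result,
// await the returned promise, and repeat until the iterator is done.
async function collectManually(asyncIterable) {
  const iterator = asyncIterable[Symbol.asyncIterator]();
  const values = [];
  while (true) {
    const pending = iterator.next(); // a Promise, not a { value, done } object
    const { value, done } = await pending;
    if (done) {
      return values;
    }
    values.push(value);
  }
}

collectManually(countTo(3)).then((values) => {
  console.log(values); // the array with 1, 2 and 3
});
```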

Observable and Subject are the foundation of the whole idea. A Subject, in this case, is just an observable that can also emit future data into the stream. An observable encapsulates one or more async iterators and also implements a method for piping operators.
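A minimal sketch of how these two building blocks could look on top of async iterators is shown below. This is not the code from the sandbox or the RxJS implementation: SketchObservable, SketchSubject and this map operator are hypothetical names, and the subject is simplified to a single consumer with a buffer, whereas an RxJS Subject multicasts to all of its subscribers.

```javascript
// Hypothetical operator: takes an async iterable, returns a transformed one.
const map = (fn) =>
  async function* (source) {
    for await (const value of source) {
      yield fn(value);
    }
  };

// Wraps an async iterable and adds pipe() for chaining operators.
class SketchObservable {
  constructor(source) {
    this.source = source;
  }

  pipe(...operators) {
    // Each operator wraps the result of the previous one.
    return new SketchObservable(operators.reduce((acc, op) => op(acc), this));
  }

  [Symbol.asyncIterator]() {
    return this.source[Symbol.asyncIterator]();
  }
}

// An observable that values can be pushed into from the outside.
class SketchSubject extends SketchObservable {
  constructor() {
    super(null);
    this.buffer = [];  // values pushed before the consumer asked for them
    this.waiting = []; // resolvers of pending next() calls
    this.closed = false;
  }

  next(value) {
    if (this.closed) return;
    const resolve = this.waiting.shift();
    if (resolve) resolve({ value, done: false });
    else this.buffer.push(value);
  }

  complete() {
    this.closed = true;
    for (const resolve of this.waiting.splice(0)) {
      resolve({ value: undefined, done: true });
    }
  }

  [Symbol.asyncIterator]() {
    return {
      next: () => {
        if (this.buffer.length > 0) {
          return Promise.resolve({ value: this.buffer.shift(), done: false });
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }
        return new Promise((resolve) => this.waiting.push(resolve));
      },
    };
  }
}

const subject = new SketchSubject();
const doubled = subject.pipe(map((x) => x * 2));

subject.next(1);
subject.next(2);
subject.complete();

(async () => {
  for await (const value of doubled) {
    console.log(value); // logs 2, then 4
  }
})();
```

The design choice worth noting is that pipe() starts the chain from the observable itself, so the same method works for a plain observable and for a subject that supplies its own iterator.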

More operators built with composability in mind can be found in the Sandbox. Let’s take a look at an example: a stream of numbers emitted every second. Let’s assume that even and odd numbers require distinct handlers. As an example of such a stream, consider the following:

With the code from the sandbox, it would look like the example below. Following the same principles, it would work the same way with RxJS.

import { Observable } from './observable';
import { of } from './sources/of.source';

import { interval } from './sources/interval.source';
import { take } from './operators/take.operator';
import { map } from './operators/map.operator';
import { tap } from './operators/tap.operator';
import { merge } from './sources/merge.source';
import { catchError } from './operators/catchError.operator';
import { retry } from './operators/retry.operator';
import { connect } from './operators/connect.operator';
import { filter } from './operators/filter.operator';


async function main() {
  try {
    const sequence = new Observable(of(() => interval(1000).pipe(take(6))));

    const seq = sequence.pipe(
      connect((shared$) =>
        merge(
          shared$.pipe(
            tap((v) => {
              if (Math.floor((Math.random() * 100)) % 5 == 0) {
                throw new Error('Simulated error');
              }
            }),
            filter((x: number) => x % 2 == 0),
            map(() => 'even'),
          ),
          shared$.pipe(filter((x: number) => x % 2 != 0)),
        ),
      ),
      tap((v) => {
        console.log('Second Tap', v);
      }),

      catchError((e) => console.log('Error happened', e.message)),

      retry(2),
    );

    const sub = seq.subscribe((k) => {});

    await sub.lastValue;
  } catch (e) {
    console.log('Error when processing', e);
  }
}

main();

Second Tap 1
Second Tap even
Second Tap 3
Second Tap even
Error happened Simulated error
Error happened Simulated error
Second Tap 1
Second Tap even
Second Tap 3
Second Tap even
Second Tap 5
Second Tap even

In production code, we should ideally support retries and error handling. This example generates random errors to show how thinking in terms of streams makes it easy to add retries and error handlers. The operators are implemented so that the underlying promises reject when an error occurs. If we add the retry operator to the pipe, it retries the specified number of times, starting again from the beginning of the stream.
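To show why a retry has to start from the beginning of the stream, here is a simplified sketch of a retry helper for async generators. It is an assumption about how such an operator might be written, not the sandbox implementation: unlike the pipeable retry(2) used above, it takes a hypothetical factory function (createSource), because a finished async generator cannot be restarted. The helpers makeFlakySource and collect are likewise made up for the demo.

```javascript
// Re-creates the source and replays it from the start after a failure.
// Gives up and rethrows once `maxRetries` retries have been used.
async function* retry(createSource, maxRetries) {
  let retries = 0;
  while (true) {
    try {
      for await (const value of createSource()) {
        yield value;
      }
      return; // the source completed without an error
    } catch (error) {
      if (retries >= maxRetries) {
        throw error;
      }
      retries += 1;
    }
  }
}

// Demo source: emits 1, then fails on the first `failures` runs;
// later runs emit 1 and 2 and complete normally.
function makeFlakySource(failures) {
  let runs = 0;
  return async function* () {
    runs += 1;
    yield 1;
    if (runs <= failures) {
      throw new Error('Simulated error');
    }
    yield 2;
  };
}

// Gathers all values of an async iterable into an array.
async function collect(source) {
  const values = [];
  for await (const value of source) {
    values.push(value);
  }
  return values;
}

collect(retry(makeFlakySource(2), 2)).then((values) => {
  console.log(values); // 1, 1, 1, 2: the value 1 is emitted again on every retry
});
```

As in the output above, values that were already emitted before the error show up again after each retry, so downstream handlers need to tolerate duplicates.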

I can give an example from my own experience, where we integrated RabbitMQ as a message bus. At first I used only promises and event callbacks, but I soon ran into problems: the code became messy and tangled very quickly. After my panic over missing the deadline subsided, I decided to redesign the integration using a reactive approach. RxJS performed flawlessly in this case. Retry and error handling logic suddenly became simpler, and all the stages involved in message consumption were combined into one error-resilient pipeline block. I now understand why RxJS is used as an underlying technology at the heart of Angular and NestJS: used properly, it can simplify asynchronous code.

Enough with JavaScript, though. I’ve recently begun experimenting with Kotlin. It resembles Java’s less verbose sibling and has the wonderful feature of giving users access to the entire delightful Java ecosystem. Sequences, Flows and Coroutines are available in Kotlin for asynchronous processing; in a future part, we’ll examine whether the same functionality can be achieved with a comparable interface in Kotlin. I also want to explore reactive programming patterns on the front-end side with React. Stay tuned.

Adrian Jutrowski

Software Developer