Generators and stream processing – Part 3


 Usually, when I mention RxJS for instance, I get rather feedback like: “There are a handful of use cases”, “I don’t like Rx”, or “It doesn’t make sense, we can use promises, observables are overkill”. To be honest, I was one of those developers, even though I was working with Angular, where observables are ubiquitous. My opinion about RX started to change with the RabbitMQ integration I mentioned before, and I decided to push it a little bit forward. Probably a little bit too much, as I disgusted by my React developer friend was and called this idea library abuse, to skip React state – I used RXJS in react. I committed an even bigger crime though. I used ‘Inversify’ to inject dependencies. And Elf for data storing.. I hear those voices of react developers in my head. But why…. why WHY ???

1. React is good but for VIEW layer.

React is just a view library with state management and some other toys. With hooks, it is the perfect tool for writing functional components, but it’s really poor with handling side effects and managing dependencies. Not once, not twice I have seen projects where logic was tightly coupled with views where large parts of business logic were implemented in useEffect hooks, useCallbacks, and whatnot. On top of that everything depends on react rendering cycle. Components should represent the presentation layer, be consumers of data, and at best trigger side effects – requests, data persistence, etc – not 1 man army. A little bit more about React and RxJS was posted by Nils here

2. Not sufficient Inversion of Control capabilities

One could argue that there is a Context API that should be used for this purpose. Context certainly helps, but it is simply an object that can be accessed from child components through  context provider. There is nothing less or more. If you want to adhere to SOLID principles, you will struggle with L and D without an IoC container. I’ve seen similar attempts here, however the offered method is dependent on a concretion rather than abstraction. Inversify performs an excellent job at this.

3. Reactive programming

 One interesting fact about React is that it is not reactive. Or, at the very least, not completely, and any “reactive” method couples code to React. Because of the variety of interactions and event streams produced by interactions, using RxJS on the frontend makes even more sense than using it on the backend. Even simple activities like throttling, initiating queries, and so on are vexing with plain react, let alone more complex logic in side effects. Furthermore, RxJS allows us to separate business logic away from the rendering cycle. Components using the useObserable hook will only subscribe and unsubscribe from observables, effectively becoming a data consumer and side effect trigger at best. On emits react, the necessary bits are re-rendered. In contrast to context, which updates all children when context changes.



4. Testing

When business logic is represented with classes rather than react components, it is much easier to test. RxJS provides some useful tools for deterministic testing of observables. Component tests can therefore be reduced to mocking external data sources, which is simple with the tools stated, and interacting with components and making assertions on a view. Business logic is tested independently.

5. Why not Angular?

“Okay, mate, why not just use angular then?” – My response is to use a tool that does the job better. React is both easy to use and powerful. Not to mention Nextjs, which seamlessly integrates SSR features into your projects. Furthermore, I would like my code, performing business logic, to be as isolated from the presentation layer as possible.

Let's cut to the chase

Let’s warm up with a simple example that makes me laugh – retries. How would we do that in react? 

const Component = () => {
    const [retryCount, setRetryCount] = useState(0);
    const [response, setResponse] = useState({})
    useEffect(() => {
        fetch("").then((res) => {
        }).catch(() => {
            if(retryCount > 4) {
            setRetryCount(retryCount + 1)
    }, [retryCount])


It is not uncommon for things to go wrong with our application. Then we must either retry or gracefully handle an error. This is how you would hack your way to repeat the operation in react, believe it or not. By coupling it more closely to the react rendering pipeline! However, requests are not the only operations that may require a retry. What if I need to retry any 3rd party library method call in the same way? Furthermore, we may wish to throttle submitted requests so that user interactions do not result in an excessive number of requests. More state variables, more logic in use effects, and complete loss of control over a flow are the solution. How would you ever attempt to create such logic? Make a request, and if it fails, attempt it in 1-second intervals. Perhaps something like this:


const Component = () => {
    const [retryCount, setRetryCount] = useState(0);
    const [response, setResponse] = useState({})
    useEffect(() => {
        fetch("").then((res) => {
        }).catch(() => {
            if(retryCount > 4) {
            setTimeout(() => {
                setRetryCount(retryCount + 1)                
            }, 1000)

    }, [retryCount])


This only becomes worse if you require more operations. Not to mention that even if we removed the requesting logic from the component, it would still be dependent on the state setter method to trigger re-render…

How can we implement this in a SOLID manner and make it easily testable so that retries, delays, or other processes similar to this may be added to whatever type of operation we perform in our application? The solution is to be reactive!

Let me introduce some dependencies I recently used in a project, that come in really handy.

@ngify/http – a reactive http client

@ngneat/effects – a framework agnostic RxJS effects implementation

inversify – IoC container

inversify-react – react hooks for using Inversify

rxjs – reactive extensions for javascript

@ngneat/react-rxjs – utility hooks for rxjs, i used only useObservable, which is pretty easu to write ourselves though

Oki doki let’s get to work, by implementing something

import {HttpClient, HttpHeaders} from "@ngify/http";
import {injectable} from "inversify";
import {switchMap} from "rxjs";

import {ChromeEventsSource} from "../chrome/chrome-events-source";

const GOOGLE_URL= ''

export class ISomeService {
    abstract getData();

export class SomeService implements ISomeService {

        private readonly http: HttpClient
    ) {

    getData() {
        return this.http.get(`${GOOGLE_URL}`);

We need to make a module provider and bind out dependencies now

import {Container, decorate, injectable} from "inversify";
import {Provider} from "inversify-react";

//Makes Inversify aware about HttpClient
decorate(injectable(), HttpClient)

export const ModuleProvider = () => {
    const container = new Container();

    container.bind(HttpClient).toDynamicValue( () => new HttpClient()).inSingletonScope();

    return container

export const Module = ({children}: any) => {

    return (<Provider container={ GlobalModuleProvider }>
        { children }

And finally we can get to our component and some extra hook.

import {useObservable} from "@ngneat/react-rxjs";
import {useInjection} from "inversify-react";
import {Observable, retry} from "rxjs";

import {ISomeService} from "./some.service";

const useRetryableObservable = (stream$: Observable<any>) => {
    return useObservable(stream$.pipe(
            count: 5,
            delay: 1000

const Component = () => {

    const service = useInjection(ISomeService);
    const [data] = useRetryableObservable(service.getData());

    return (
            { JSON.stringify(data) }


export default Component;

And in this way, we created a component that is independent of concretion because it is dependent on whatever is injected with the ISomeService interface (I know it’s technically an abstract class, but Typescript doesn’t leave a trace of interfaces in resulting JS code, so they can’t be used for DI, but I use this construct for bindings in DI containers; also, ISomeService consists only of signatures, not implementations). Furthermore, we created logic that is not dependent on the React rendering flow. I propose looking into elf if we wish to store info for later access. It is a reactive storage that works well with the notions discussed previously.


What about testing? “I don’t test frontend,” “Nothing to test there,” and “Or testing components is difficult” were all phrases I heard several times. I believe that bad React code is difficult to test. All of these couplings and side effects, together with the rendering pipeline, make things considerably more difficult. However, testing observables can be enjoyable.

As previously stated, RxJS has useful testing tools. Marble diagrams can be used to test your observables. Here’s an example of a test that makes use of an IoC container, mocks requests, and asserts results.

describe("Test suite", () => {
    beforeEach(() => {
        container = ModuleProvider();
        subject = container.get(ChromeEventsSource);
        testScheduler = new TestScheduler((actual, expected) => {
            return expect(actual).toEqual(expected);

    it('should filter incomming request', () => {

        //We simulate responses
        const mockedRequests = {
            a: [new CapturedRequestMock({ userId: "4862536117",users: [{pk: 1}]}, "/followers/")] as any,
            b: [new CapturedRequestMock({ userId: "4862536117",users: [{pk: 2}]}, "/somethign/else")] as any,
            c: [new CapturedRequestMock({ userId: "4862536117",users: [{pk: 3}]}, "/followers/")] as any

        //We expect that middle response will be filtered out from emitted stream values
        const expectedValues = {
            a: {body: {userId: "4862536117", users: [{pk: 1}]}, url: "/followers/"},
            c: {body: {userId: "4862536117", users: [{pk: 3}]}, url: "/followers/"}
        }{ expectObservable, cold }) => {
            const mockRequests$ = cold('-a--b--c--(|)', mockedRequests)
            jest.spyOn(subject, "getRequestFinishedStream").mockImplementation(() => mockRequests$ );

            const mapped$ = subject.onRequestsFinished("/followers");

            expectObservable(mapped$).toBe('-a-----c---------|', expectedValues);


We can easily represent mistake throws or describe delays in a marble diagram. One disadvantage is that observables which are made from promises. There are strategies to get around this by mocking functions that yield promises and returning plain value instead. from() would perform the work for us, creating an Observable that we could test with the RxJS toolset.


Hooks and contexts are fantastic, but when creating reusable components that must communicate with one another, we don’t want to rely on external libraries like RxJS. However, when it comes to implementing domain-specific components, RxJS and reactive tools provide numerous advantages.

Throughout my trip, though, I realized why popular frameworks like as NestJS and Angular made RxJS and Observables first-class citizens. I can summarize it in one sentence, and direct you for further, interesting readings – Everything is a stream.

Adrian Jutrowski

Software Developer