Making a node gRPC client reactive with rxjs

UPDATE (05/05/2018): I updated the rxjs examples with rxjs 6. This changes are based on this change between rxjs 5 and 6.

Let’s setup the situation:

  • You have a service in your back-end that acts as a gRPC client and talks with another service that acts as a gRPC server
  • The gRPC client runs on NodeJS
  • And you love the reactive paradigm, know how to work with the famous rxjs and think that gRPC calls are a good fit

Are we talking about similar situations? Yes? Great! So this post might be useful for you.

TL;DR? Go to the “Making the calls reactive” section

Setting up a gRPC client in NodeJS

If you have a gRPC client built in Node you will probably have something similar to this, where you load the .proto with the gRPC services definitions (you can compile the definitions instead of loading them dynamically but that’s another topic).

import path from 'path';
import grpc from 'grpc';

const PROTO_PATH = path.resolve("path/to/your/.proto");

const protoDescriptor = grpc.load(PROTO_PATH);
const grpcService = protoDescriptor.path.to.my.service

export const client = new grpcService(
	// localhost or whatever address you need, same for port
	'localhost:6565',
	// this will probably need to be made secure
	grpc.credentials.createInsecure()
);

When this client is built, you are ready to call your gRPC server.

Making the calls reactive

The basic idea of this helper functions is to use the gRPC callbacks and streaming mechanism to convert them in rxjs Observables.

The code is pretty straight-forward so I’ll let it do the talking:

import { Observable } from 'rxjs';

export default {
	grpcStreamtoObservable(grpcStreamFn, opts) {
		return Observable.create(observer => {
			const streamCall = grpcStreamFn(opts);
			streamCall.on('data', next => observer.next(next));
			streamCall.on('end', () => observer.complete());
			streamCall.on('error', err => observer.error(err));
		});
	},

	grpcToObservable(grpcFn, opts) {
		return Observable.create(observer => {
			grpcFn(opts, (err, next) => {
				if (err) observer.error(err);
				observer.next(next);
				observer.complete();
			});
		});
	}
};

Once you have the observable you can do all the magic you already know with rxjs.

An example usage can be:

RxGrpcUtils.grpcToObservable(
	// I need the binding because of a change in context I was 
	// facing while running the call, check if it's necessary in your case
	grpcClient.myRpcCall.bind(grpcClient), 
	{ /* payload */ }
);

A simple trick I found useful is catching the error from the callback or stream and adapt it to something meaningful for my service.

import { catchError } from 'rxjs/operators';

// ...

	grpcToObservable(grpcFn, opts, i18n) {
		return Observable.create(observer => {
			// ...
		})
		.pipe(
		  catchError((err, caught) => {
                throw new Error("My custom error");
            })
		);
	}

This is totally up to you and your service, if it makes sense to do it. And as I said, you have all the power of streams on your hands to continue playing with it.

As a matter of fact, I will continue showing some tricks and integrations with this in the following posts.

Future posts

  • We will integrate this with a graphQL server
  • And add Circuit Breaking to the gRPC calls
  • Testing the gRPC calls mocking the responses and streams

Let me know if you found it useful and/or if you’d like to read more about this. Cheers!

Blog Logo

Tomas Alabes

Software Engineer, author, blogger and obsessive learner, from Argentina living in Silicon Valley


Published

Image

Tomas Alabes' Blog

My personal site's blog

Back to Overview