Making a node gRPC client reactive with rxjs
UPDATE (05/05/2018): I updated the rxjs examples to rxjs 6. These changes are based on the API changes between rxjs 5 and 6.
Let’s set up the situation:
- You have a service in your back-end that acts as a gRPC client and talks with another service that acts as a gRPC server
- The gRPC client runs on NodeJS
- And you love the reactive paradigm, know how to work with the famous rxjs and think that gRPC calls are a good fit
Are we talking about similar situations? Yes? Great! So this post might be useful for you.
TL;DR? Go to the “Making the calls reactive” section
Setting up a gRPC client in NodeJS
If you have a gRPC client built in Node, you will probably have something similar to this, where you load the .proto
file with the gRPC service definitions (you can compile the definitions instead of loading them dynamically, but that’s another topic).
import path from 'path';
import grpc from 'grpc';
const PROTO_PATH = path.resolve("path/to/your/.proto");
// Note: grpc.load is deprecated in later releases of the grpc package
// in favor of @grpc/proto-loader
const protoDescriptor = grpc.load(PROTO_PATH);
const grpcService = protoDescriptor.path.to.my.service;
export const client = new grpcService(
  // localhost or whatever address you need, same for port
  'localhost:6565',
  // this will probably need to be made secure
  grpc.credentials.createInsecure()
);
When this client is built, you are ready to call your gRPC server.
Making the calls reactive
The basic idea of these helper functions is to wrap the gRPC callback and streaming mechanisms and convert them into rxjs Observables.
The code is pretty straightforward, so I’ll let it do the talking:
import { Observable } from 'rxjs';
export default {
  // Server-streaming call: every 'data' event becomes a next notification
  grpcStreamtoObservable(grpcStreamFn, opts) {
    return Observable.create(observer => {
      const streamCall = grpcStreamFn(opts);
      streamCall.on('data', next => observer.next(next));
      streamCall.on('end', () => observer.complete());
      streamCall.on('error', err => observer.error(err));
    });
  },
  // Unary (callback-style) call: emits the single response, then completes
  grpcToObservable(grpcFn, opts) {
    return Observable.create(observer => {
      grpcFn(opts, (err, next) => {
        if (err) {
          observer.error(err);
          return; // do not emit or complete after an error
        }
        observer.next(next);
        observer.complete();
      });
    });
  }
};
Once you have the observable you can do all the magic you already know with rxjs.
With the helpers imported as RxGrpcUtils, an example usage can be:
RxGrpcUtils.grpcToObservable(
  // I need the binding because of a change in context I was
  // facing while running the call, check if it's necessary in your case
  grpcClient.myRpcCall.bind(grpcClient),
  { /* payload */ }
);
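The likely reason for the context problem: the helper receives the method as a bare function reference, so when it is invoked later, `this` no longer points at the client. A plain JavaScript sketch of the effect, using a hypothetical `FakeClient` class and `invoke` helper in place of a real gRPC client and `grpcToObservable`:

```javascript
// Hypothetical stand-in for a gRPC client (illustration only)
class FakeClient {
  constructor() {
    this.address = 'localhost:6565';
  }
  myRpcCall(payload, callback) {
    // Relies on `this`, which is where the context problem shows up
    callback(null, { servedBy: this.address, payload });
  }
}

// Calls the function the way grpcToObservable does: without a receiver
function invoke(grpcFn, opts) {
  let result;
  try {
    grpcFn(opts, (err, response) => { result = response; });
  } catch (e) {
    result = e; // `this` was undefined inside myRpcCall
  }
  return result;
}

const grpcClient = new FakeClient();
const unbound = invoke(grpcClient.myRpcCall, { id: 1 });
const bound = invoke(grpcClient.myRpcCall.bind(grpcClient), { id: 1 });
```

Here `unbound` ends up holding a TypeError, while `bound` holds the response. An arrow function such as `(opts, cb) => grpcClient.myRpcCall(opts, cb)` would keep the context as well.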
A simple trick I found useful is catching the error from the callback or stream and adapting it to something meaningful for my service.
import { catchError } from 'rxjs/operators';
// ...
grpcToObservable(grpcFn, opts, i18n) {
  return Observable.create(observer => {
    // ...
  })
  .pipe(
    catchError((err, caught) => {
      // Replace the raw gRPC error with one that is meaningful for your service
      throw new Error("My custom error");
    })
  );
}
Whether this makes sense is totally up to you and your service. And as I said, you have all the power of streams in your hands to keep playing with it.
As a matter of fact, I will keep showing some tricks and integrations built on this in the following posts.
Future posts
- We will integrate this with a GraphQL server
- And add Circuit Breaking to the gRPC calls
- And test the gRPC calls by mocking the responses and streams
Let me know if you found it useful and/or if you’d like to read more about this. Cheers!