Integrating reactive gRPC in Node with GraphQL

UPDATE (05/05/2018): I updated the rxjs examples to rxjs 6. These changes are based on the differences between rxjs 5 and 6.

Hey! This is the continuation of my previous post about how to make your NodeJS gRPC client calls reactive:

  1. Making a node gRPC client reactive with rxjs
  2. Integrating reactive gRPC in Node with GraphQL (this post)
  3. Circuit Breaking the gRPC calls
  4. Testing the gRPC calls mocking the responses and streams

We are now going to integrate the reactive gRPC client with a GraphQL server, in this case Apollo Server with Express.

The schema resolvers

First, let's look at what apollo-server resolvers can return. From the official Apollo documentation:

Resolvers in GraphQL can return different kinds of results which are treated differently:

  • null or undefined - this indicates the object could not be found. If your schema says that field is nullable, then the result will have a null value at that position. If the field is non-null, the result will “bubble up” to the nearest nullable field and that result will be set to null. This is to ensure that the API consumer never gets a null value when they were expecting a result.
  • An array - this is only valid if the schema indicates that the result of a field should be a list. The sub-selection of the query will run once for every item in this array.
  • A promise - resolvers often do asynchronous actions like fetching from a database or backend API, so they can return promises. This can be combined with arrays, so a resolver can return a promise that resolves to an array, or an array of promises, and both are handled correctly.
  • A scalar or object value - a resolver can also return any other kind of value, which doesn’t have any special meaning but is simply passed down into any nested resolvers, as described in the next section.

In our case, since the gRPC calls are asynchronous, we will use Promises.
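
To make the list above concrete, here is a minimal sketch of a resolver map that returns each kind of value. The field names (`missing`, `tags`, `user`, `version`) and the `fetchUser` helper are hypothetical, made up for illustration; they are not part of Apollo or of this post's code.

```javascript
// Hypothetical stand-in for an asynchronous backend call.
const fetchUser = (id) =>
  new Promise((resolve) => setTimeout(() => resolve({ id, name: 'Ada' }), 10));

const resolvers = {
  Query: {
    // null: the object could not be found
    missing: () => null,
    // array: only valid when the schema declares the field as a list
    tags: () => ['grpc', 'rxjs'],
    // promise: the GraphQL executor waits for it to settle
    user: (obj, args) => fetchUser(args.id),
    // scalar: passed through as-is
    version: () => '1.0.0',
  },
};
```

The `user` resolver is the shape we are after: anything asynchronous, a gRPC call included, only has to end up as a Promise.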

Adapting the streams to GraphQL resolvers

Using the RxGrpcUtils.js we created in the previous post to make gRPC calls reactive, we transform the resulting Observable into a Promise.

// toArray is a named export of 'rxjs/operators' in rxjs 6
import { toArray } from 'rxjs/operators';
import grpcClient from './grpcClient';
import RxGrpcUtils from './RxGrpcUtils';

export default {
  Query: {
    // Callback-style (non-stream) gRPC call
    getSomething: (obj, args, context, info) => {
      return RxGrpcUtils.grpcToObservable(
        // I need the binding because of a change in context I was
        // facing while running the call, check if it's necessary in your case
        grpcClient.getSomething.bind(grpcClient),
        {
          /* some data to pass to the grpc call? */
        }
      ).toPromise();
    },

    // Streaming gRPC call
    streamSomething: (obj, args, context, info) => {
      return RxGrpcUtils.grpcStreamtoObservable(
        grpcClient.streamSomething.bind(grpcClient),
        {
          /* some data to pass to the grpc call? */
        }
      )
        .pipe(
          // wait until stream is completed and make an array
          toArray()
        )
        .toPromise();
    }
  },
  Mutation: { /* ... */ }
};
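
The `.bind(grpcClient)` calls above guard against a general JavaScript pitfall: a method passed around as a plain function reference loses its `this`. The sketch below reproduces it with a hypothetical `FakeClient` class and a hypothetical `callDetached` wrapper; neither is the real gRPC client nor RxGrpcUtils, and whether your client needs the binding depends on how its methods use `this`.

```javascript
// Hypothetical stand-in for a generated gRPC client.
class FakeClient {
  constructor() {
    this.channel = 'localhost:50051';
  }
  getSomething(request, callback) {
    // Relies on `this`, so it breaks when called detached from the instance.
    callback(null, { from: this.channel, request });
  }
}

const client = new FakeClient();

// Calls `method` the way a generic wrapper would: as a bare function.
const callDetached = (method, request) =>
  new Promise((resolve, reject) =>
    method(request, (err, res) => (err ? reject(err) : resolve(res))));

// Unbound: `this` is undefined inside getSomething, so the promise rejects.
const unbound = callDetached(client.getSomething, { id: 1 }).catch((e) => e);
// Bound: `this` keeps pointing at the client.
const bound = callDetached(client.getSomething.bind(client), { id: 1 });
```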

  • In the first resolver, where the gRPC call takes a callback (it’s not a stream), just calling .toPromise() at the end is enough!
  • In the second resolver, where the gRPC call is a stream, we first collect the elements received in the stream into an array with the toArray() operator inside .pipe(), and then call the same .toPromise() to give the Apollo server the Promise it needs.
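
As a dependency-free illustration of what `toArray()` followed by `.toPromise()` amounts to for a stream, the sketch below buffers every `data` event until `end` and then resolves with the array. The `collectStream` helper is hypothetical, and a Node `EventEmitter` stands in for the gRPC stream; this is not how rxjs implements it internally.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: buffer all items, resolve once the stream ends.
function collectStream(stream) {
  return new Promise((resolve, reject) => {
    const items = [];
    stream.on('data', (item) => items.push(item));
    stream.on('error', reject);
    stream.on('end', () => resolve(items));
  });
}

// Fake stream that emits three items asynchronously and then ends.
const fakeStream = new EventEmitter();
const collected = collectStream(fakeStream);
setImmediate(() => {
  [1, 2, 3].forEach((n) => fakeStream.emit('data', { n }));
  fakeStream.emit('end');
});
```

The trade-off is visible here: nothing is resolved until the stream has ended, and every item is held in memory in the meantime.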

I would like to explore the possibility of streaming the list of items to the client instead of waiting for the stream to complete and returning the items as an array, but that’s research for another time.

Let me know if you found it useful and/or if you’d like to read more about this. Cheers!

Tomas Alabes

Software Engineer, author, blogger and obsessive learner, from Argentina living in Silicon Valley

