Use GraphQL subscription to show progress of time-consuming operations

in

GraphQL is feature-rich query language designed to fit majority of needs of generic online applications. The 3 main types of operations in GraphQL are query, mutation and subscription. While queries and mutations are send in HTTP requests, subscriptions are running on WebSocket, which enables it to receive updates in real time. In this article, I would like to talk about using the subscription to report real-time results and/or progresses to time-consuming operations.

Usually, for time-consuming tasks like server-side downloads, or gathering data from multiple remote sources, it is usually favourable to provide a visual feedback to the user about the progress – or even better – partial results. In GraphQL, we can make use of subscription for the server to stream responses while during the operation.

Schema design

Suppose that we have a query that looks like this:

query {
    multiSearch(keyword: String!): [SearchResult!]!
}

We can refactor it as follows for progress reports:

type MultiSearchProgress {
    totalSources: Int!
    completedSources: Int!
    success: Boolean
}

type subscription {
    multiSearchStatus(sessionId: String!): MultiSearchProgress!
}

type query {
    multiSearch(keyword: String!, sessionId: String): [SearchResult!]!
}

So here, we allow the multiSearch query to take in an optional string parameter sessionId. The client generates a strong random string as a session ID, supplied when making the query. In the meantime, the client can subscribe to to the multiSearchStatus channel with the same session ID to get real time progress from the server.

When the server receives the query and starts its computation, it can take the session ID from the query, and report it to the corresponding subscription channel with progresses. In the subscription payload, we included some common data like total and loaded numbers.

We have also included the succeeded: Boolean to indicate the status of query. This property can has 3 states:

  • null: query in progress
  • true: succeeded
  • false: failed

This can also give the client a hint of when the query finishes, as an additional safeguard to the actual query outcome.

More payloads

If you want to include more information like partial results, we can also add that to the progress type:

type MultiSearchProgress {
    totalSources: Int!
    completedSources: Int!
    newResults: [SearchResult!]!
    success: Boolean
}

In this new definition, we can stream any new search results retrieved during the progress. These new results can be displayed to the user in real time during the query. Optionally, the final query result can be used to replace data retrieved in the subscription, to avoid any loss of results in WebSocket.

Points to note

  • As subscriptions require a WebSocket to set up, and not commonly used in most GraphQL servers, it could be left out in your environment if GraphQL have already been used. In this case, additional effort is needed throughout the connection path (server, forwarder, load balancer, CDN, etc.) in order for WebSocket to work.
  • The client is recommended to subscribe with the generated ID before the query is sent, so as not to lose any update issued right after the query started.
  • If you have no authorization in place for subscriptions, all such progress reports will be available to anyone with the correct session ID at the right time. If the data sent through subscriptions are sensitive or private. It is recommended to use a cryptographically strong random string generator on the client to generate session IDs, and apply authentication wherever possible.

Code sample

Here is a small sample of implementation of this concept in TypeScript. This sample makes use of TypeGraphQL, React, Apollo Server and Client.

// Mock search source interface
interface SearchSource {
  search(query: string): Promise<SearchResult[]>;
}

// Generic interface for data with session ID
interface PubSubSessionPayload<T> {
  sessionId: string;
  data: T | null;
}

@Resolver()
export class MultiSearchResolver {

  sources: SearchSource[];

  @Query(returns => [LyricsKitLyricsEntry])
  public async multiSourceSearch(
    @Arg("query") query: string,
    @Arg("sessionId", { nullable: true }) sessionId: string | null,
    @PubSub("MULTI_SOURCE_SEARCH_PROGRESS") publish: Publisher<PubSubSessionPayload<MultiSourceSearchStatus>>,
  ): Promise<SearchResult[]> {

    const outcomes = await Promise.all(this.sources.map(source => source.search(query).then(data => {
      if (sessionId !== null) {
        // send partial results for each source
        publish({sessionId, data});
        return data;
      }
    })));

    // Send end-of-query update
    await publish({ sessionId, data: null });

    // join SearchResult[][] into SearchResult[]
    const results = outcomes.reduce((prev, curr) => [...prev, ...curr], []);

    return results;
  }

  @Subscription(
    () => MultiSourceSearchStatus,
    {
      topics: "MULTI_SOURCE_SEARCH_PROGRESS",
      filter: ({ payload, args }) => args.sessionId === payload.sessionId,
      nullable: true,
    }
  )
  multiSourceSearchProgress(
    @Root() payload: PubSubSessionPayload<MultiSourceSearchStatus>,
    @Arg("sessionId") sessionId: string,
  ): MultiSourceSearchStatus | null {
    return payload.data;
  }
}
const MULTI_ENGINE_SEARCH = gql`
  query($query: String!, $sessionId: String) {
    multiSourceSearch(query: $query, sessionId: $sessionId) {
      // fields go here
    }
  }
`;

const MULTI_ENGINE_SEARCH_PROGRESS_SUBSCRIPTION = gql`
  subscription ($sessionId: String!) {
    multiSourceSearchProgress(sessionId: $sessionId) {
      // fields go here
    }
  }
`;

const apolloClient = useApolloClient();
const [searchResults, setSearchResults] = useState([]);
const [query, setQuery] = useState([]);

function search() {
  const sessionId = getSecureRandomString();
  setSearchResults([]);

  const subscription = apolloClient.subscribe<{ multiSourceSearchProgress: SearchResult }>({
    query: MULTI_ENGINE_SEARCH_PROGRESS_SUBSCRIPTION,
    variables: { sessionId }
  });

  const zenSubscription = subscription.subscribe({
    next(x) {
      if (x.data.multiSourceSearchProgress !== null) {
        setSearchResults((results) => [...results, x.data.multiSourceSearchProgress]);
      }
    },
    error(err) {
      console.error(`Finished with error: ${err}`);
    },
    complete() {
      console.log("Finished");
    }
  });

  const query = apolloClient.query<{ multiSourceSearch: SearchResult[] }>({
    query: MULTI_ENGINE_SEARCH,
    variables: { query, sessionId },
  });

  const result = await query;

  if (result.data) {
    setSearchResults(result.data.multiSourceSearch);
  }

  zenSubscription.unsubscribe();
}


Comments

Leave a Reply

Your email address will not be published. Required fields are marked *