Chapter 5
Asynchronous Operations

Thus far, all HyperDex operations we’ve seen are synchronous: the application calls the HyperDex operation and blocks until the operation is complete. HyperDex is very fast, but even so, clients may spend a non-trivial amount of time waiting on HyperDex operations. For each operation in HyperDex, there is an asynchronous variant that accomplishes the same task but allows the client to perform other work while the operation completes.

In this chapter, we’ll look at the asynchronous operations HyperDex supports, and see different ways to use them to increase concurrency in the system.

5.1 Setup

As in the previous chapters, the first step is to deploy the cluster and connect a client. First we launch and initialize the coordinator:

  hyperdex coordinator -f -l -p 1982

Next, let’s launch a few daemon processes to store data. Execute the following commands (note that each instance binds to a different port and has a different /path/to/data):

  hyperdex daemon -f --listen= --listen-port=2012 \
                     --coordinator= --coordinator-port=1982 --data=/path/to/data1
  hyperdex daemon -f --listen= --listen-port=2013 \
                     --coordinator= --coordinator-port=1982 --data=/path/to/data2
  hyperdex daemon -f --listen= --listen-port=2014 \
                     --coordinator= --coordinator-port=1982 --data=/path/to/data3

We now have three different daemons ready to serve in the HyperDex cluster. Finally, we create a space which makes use of all three systems in the cluster. In this example, let’s create a space that may be suitable for storing friend lists in a social network:

  >>> import hyperdex.admin
  >>> a = hyperdex.admin.Admin('', 1982)
  >>> a.add_space('''
  ... space friendlists
  ... key username
  ... attributes
  ...    string first,
  ...    string last,
  ...    set(string) friends
  ... ''')
  >>> import hyperdex.client
  >>> c = hyperdex.client.Client('', 1982)

With the space in place, we insert objects for John Smith and a few other users:

  >>> c.put('friendlists', 'jsmith1', {'first': 'John', 'last': 'Smith'})
  >>> c.put('friendlists', 'jd', {'first': 'John', 'last': 'Doe'})
  >>> c.put('friendlists', 'bjones1', {'first': 'Brian', 'last': 'Jones'})

5.2 Asynchronous Operations

Asynchronous operations permit applications to retrieve or modify multiple objects simultaneously, and to perform local computation while those operations are in flight. In previous chapters, we submitted synchronous operations to the key-value store: each client had a single outstanding request and waited for it to complete before issuing the next. In high-throughput applications, clients often have a batch of operations that can be performed simultaneously. The standard practice in such cases is to issue *asynchronous* operations, where the client does not immediately wait for each individual operation to complete. HyperDex provides a versatile interface for this use case.

Asynchronous operations separate the request and response portions of a single operation into two separate parts. Each asynchronous operation returns a small token that identifies the outstanding operation, which can then be used by the client, if and when needed, to wait for the completion of the selected operation.
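The request/response split can be illustrated without a HyperDex cluster. The following is a minimal sketch that uses Python's standard concurrent.futures module as a stand-in: the Future returned by submit() plays the role of the token. The slow_store function and its arguments are hypothetical, and this is only an analogy, not how the HyperDex bindings are implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_store(key, value):
    # Hypothetical stand-in for a network round trip to a data store.
    time.sleep(0.05)
    return True


with ThreadPoolExecutor(max_workers=2) as executor:
    # Request half: submit() returns immediately with a token (a Future).
    token = executor.submit(slow_store, "jj", {"first": "John"})

    # The caller is free to do unrelated local work in the meantime.
    local_result = sum(range(1000))

    # Response half: block only at the point the outcome is needed.
    stored = token.result()
```

The token costs nothing to hold on to, so a caller that never needs the outcome of a particular request at a particular moment simply defers the blocking call until it does.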

Every API method covered in the tutorials so far (e.g. get) has a corresponding asynchronous version, usually prefixed with async_ (e.g. async_get), for performing asynchronous operations. Those without an async_ prefix are natively asynchronous. The basic pattern of usage for asynchronous operations is:

* Initiate the asynchronous operation
* Do some work and perhaps issue more operations, async or otherwise
* Wait for selected asynchronous operations to complete

This enables the application to continue doing other work while HyperDex performs the requested operations. Here’s how we could insert an object for user John Jackson asynchronously:

  >>> d = c.async_put('friendlists', 'jj', {'first': 'John', 'last': 'Jackson'})
  >>> d
  <hyperdex.client.Deferred object at ...>
  >>> # do some work
  >>> d.wait()
  True
  >>> d = c.async_get('friendlists', 'jj')
  >>> d.wait()
  {'first': 'John', 'last': 'Jackson', 'friends': set([])}

Notice that the return value of the first d.wait() is True. This is the same return value that would have come from performing c.put(...), except the client was free to do other computations while HyperDex servers were processing the put request. Similarly, the second asynchronous operation, async_get, queues up the request on the servers, frees the client to perform other work, and yields its results only when wait is called. In fact, the Python bindings implement all synchronous operations using their asynchronous equivalents. For example, here’s a sample definition of get:

  >>> def get(client, space, key):
  ...     return client.async_get(space, key).wait()
  >>> get(c, 'friendlists', 'jj')
  {'first': 'John', 'last': 'Jackson', 'friends': set([])}

By itself, an asynchronous operation is not very useful if it is waited on right away because that makes it equivalent to a synchronous operation. The true power comes from requesting multiple concurrent operations. For example, to establish a bidirectional friendship between John Smith and John Jackson:

  >>> d1 = c.async_set_add('friendlists', 'jj', {'friends': 'jsmith1'})
  >>> d2 = c.async_set_add('friendlists', 'jsmith1', {'friends': 'jj'})
  >>> d1.wait()
  >>> d2.wait()

Note that the order in which operations are waited on does not matter. We could just as easily execute them in a different order, and still get the desired effect. Similarly, we could concurrently add multiple friends for John Smith:

  >>> d1 = c.async_set_add('friendlists', 'jsmith1', {'friends': 'bjones1'})
  >>> d2 = c.async_set_add('friendlists', 'bjones1', {'friends': 'jsmith1'})
  >>> d3 = c.async_set_add('friendlists', 'jsmith1', {'friends': 'jd'})
  >>> d4 = c.async_set_add('friendlists', 'jd', {'friends': 'jsmith1'})
  >>> d1.wait()
  >>> d2.wait()
  >>> d3.wait()
  >>> d4.wait()

Going a step further, HyperDex allows a client to wait for whichever outstanding operation completes next, without imposing an order among asynchronous operations. For instance, the first names of all of John Smith’s friends can be fetched in parallel:

  >>> friends_usernames = c.get('friendlists', 'jsmith1')['friends']
  >>> outstanding = set()
  >>> for username in friends_usernames:
  ...     outstanding.add(c.async_get('friendlists', username))
  >>> friends = []
  >>> while outstanding:
  ...     d = c.loop()
  ...     outstanding.remove(d)
  ...     friend = d.wait()['first']
  ...     friends.append(friend)
  >>> sorted(friends)
  ['Brian', 'John', 'John']

Using the loop() method, it is possible to issue thousands of requests and then wait for each one in turn without having to serialize the round trips to the server.

5.3 Potential Pitfalls

Keep in mind that HyperDex may (and will!) maximize performance by executing concurrent asynchronous operations in any order. If a client wants to ensure that an operation B is performed only after asynchronous operation A is committed to the data store, that client needs to call wait() on A before issuing B. When wait() returns success, the client is guaranteed that HyperDex has committed the data to sufficiently many replicas to tolerate the number of failures specified in the space definition.
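As a rough sketch of this ordering rule, the helper below creates a user and only then adds that user to someone else's friend set. It relies solely on the async_put, async_set_add, and wait() calls shown earlier in this chapter; the function name put_then_befriend and its parameters are hypothetical, and error handling is omitted.

```python
def put_then_befriend(c, space, new_user, attrs, existing_user):
    """Create new_user, then add them to existing_user's friends (hypothetical helper)."""
    # Operation A: create the new user's object.
    a = c.async_put(space, new_user, attrs)
    # Block until A completes. Without this wait, B may be applied before A.
    a.wait()
    # Operation B: issued only after A has returned from wait().
    b = c.async_set_add(space, existing_user, {'friends': new_user})
    return b.wait()
```

Only the dependency between A and B forces a wait here. Operations that do not depend on one another can still be issued together and waited on afterwards.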

On a related note, a client that has not explicitly performed a wait() on an outstanding asynchronous operation should not assume anything about the disposition of that operation. Specifically, queued asynchronous operations may not have made it to any of the servers, and therefore may not be committed anywhere. A client that simply issues asynchronous requests and terminates without calling wait() on them is not guaranteed to have any of those operations execute. Unless the system explicitly tells a client that the data is committed, it is not safe to assume that it will be committed behind the scenes. (The flip side is that, when HyperDex says the data is committed, it really is committed to sufficiently many replicas to withstand the number of simultaneous failures in the space specification.)
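One simple way to honor this rule is to keep every token and wait on all of them before the program moves on or exits. The sketch below assumes only the async_put and wait() calls shown earlier; wait_all and store_users are hypothetical helper names, and users is assumed to be a list of (username, attributes) pairs.

```python
def wait_all(deferreds):
    """Wait on each outstanding operation; return the results in issue order."""
    return [d.wait() for d in deferreds]


def store_users(c, users):
    """Issue one async_put per (username, attrs) pair, then wait for all of them."""
    # All puts are in flight before the first wait() is called.
    pending = [c.async_put('friendlists', username, attrs)
               for username, attrs in users]
    return wait_all(pending)
```

Holding every token is fine for small batches. For very large batches the list of outstanding operations grows without bound, which is the motivation for bounding the number of requests in flight.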

5.4 A Common Window Pattern

The last point necessitates a common pattern, where a client will want to keep a fixed-size window of outstanding requests so as to not issue too many operations concurrently, and will appropriately wait for all asynchronous operations to complete. Such code will look, approximately, like this:

  import hyperdex.client

  c = hyperdex.client.Client(...)
  outstanding = 0
  for line in file:
      while outstanding >= 1024: # 1024 is window size
          d = c.loop()
          outstanding -= 1
      # parse line and issue an asynchronous put here
      outstanding += 1
  # flush remaining
  while outstanding > 0:
      d = c.loop()
      outstanding -= 1
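The same window logic can be packaged as a reusable function. This is a sketch under stated assumptions rather than part of the HyperDex API: run_windowed is a hypothetical name, and issuers is assumed to be an iterable of zero-argument callables that each start exactly one asynchronous operation on the client. Only loop(), as used above, is called on the client, and the results of completed operations are not inspected.

```python
def run_windowed(client, issuers, window=1024):
    """Run every issuer while keeping at most `window` operations outstanding.

    Returns the number of operations that completed (hypothetical helper).
    """
    outstanding = 0
    completed = 0
    for issue in issuers:
        # When the window is full, drain one completion before issuing more.
        while outstanding >= window:
            client.loop()
            outstanding -= 1
            completed += 1
        issue()
        outstanding += 1
    # Flush the operations that are still in flight.
    while outstanding > 0:
        client.loop()
        outstanding -= 1
        completed += 1
    return completed
```

A caller might build the issuers lazily, for example with a generator that yields one callable wrapping c.async_put(...) per input line, so that the whole input never has to be held in memory at once.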