Proposal: Shuffler (Attempt 2)

Background Reading

I took ideas and inspiration from these documents.


Cassandra is an elastic key-value store. The Dynamo section of their
architecture page describes how they partition data. In particular, they
leverage consistent hashing to limit data movement when the cluster grows or
shrinks. They also use consistent hashing to decide how to replicate data to
enable failure tolerance.

Cassandra also describes its local data structure: a Log-structured Merge
Tree. I didn’t read Cassandra’s explanation. A few explanations that I found

Problem Description

The Hail Query system pervasively relies on key-ordered datasets. Joins, among
other operations, rely on a “shuffle,” a fully connected data transfer that
re-orders and re-partitions the dataset. Query uses unreliable “preemptible”
nodes. If any one node dies, the entire data transfer must restart from scratch
because every node depends on the dead node.

We intend to replace the shuffle operation with an elastic, fault-tolerant
service. This service will need to concurrently ingest key-value records from
tens of thousands of clients. After ingest, it must describe an approximately
balanced partitioning of the dataset. Finally, it must concurrently serve reads
of each partition to tens of thousands of clients.

Architectural Sketch

There is one leader node which manages a cluster of at most ~1000 nodes. The
leader tracks the follower names and all shuffles in the system. Clients start
and stop shuffles by communicating with the leader.

An important concept in our system is a “range”: a key-ordered sequence of
records. A range should be a couple of orders of magnitude larger than a single
record; perhaps one hundred records comprise a range. Ranges are the unit of
partition. Each range is assigned an identity, and a consistent hash function
assigns it to a configurable number of followers. The use of a consistent hash
function limits the data transferred when a follower joins the cluster. We
believe the system will balance follower load if there exist at least an order
of magnitude more ranges than followers.
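
To make the range-to-follower assignment concrete, here is a minimal sketch of
a consistent hash ring. Everything in it is an assumption for illustration: the
proposal does not specify a hash function, the use of virtual nodes, or the
names Ring, replicas, and vnodes.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    """Map a string to a point on the ring (a 64-bit integer)."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class Ring:
    """Hypothetical consistent hash ring mapping range ids to followers."""

    def __init__(self, followers, vnodes=64):
        # Each follower owns several points ("virtual nodes") on the ring,
        # which evens out the load when there are few followers.
        self._points = sorted(
            (_hash(f"{f}#{i}"), f) for f in followers for i in range(vnodes)
        )
        self._keys = [h for h, _ in self._points]
        self._n_followers = len({f for _, f in self._points})

    def replicas(self, range_id: str, n_replicas: int):
        """Return up to n_replicas distinct followers, walking clockwise."""
        n = min(n_replicas, self._n_followers)
        start = bisect.bisect_right(self._keys, _hash(range_id))
        chosen = []
        k = 0
        while len(chosen) < n:
            follower = self._points[(start + k) % len(self._points)][1]
            if follower not in chosen:
                chosen.append(follower)
            k += 1
        return chosen


if __name__ == "__main__":
    before = Ring([f"follower-{i}" for i in range(10)])
    after = Ring([f"follower-{i}" for i in range(11)])
    ids = [f"range-{i}" for i in range(1000)]
    moved = sum(before.replicas(r, 3) != after.replicas(r, 3) for r in ids)
    print(f"{moved} of {len(ids)} ranges changed replicas after one join")
```

In this sketch, a joining follower can only insert itself into a range's
replica list; it never reshuffles ranges among the existing followers, which is
the property that limits data transfer.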

Without any knowledge of the key distribution, we cannot choose the set of
intervals for the ranges in advance. We therefore design the system with three
levels: the clients, the buffer followers, and the storage followers. The
clients send data to the buffer followers, which buffer the records. At some
point, the leader (in concert with the buffer followers) calculates an
approximate CDF of the buffered data and estimates balanced ranges for the full
dataset (we should know roughly the fraction of data seen thus far). Each range
is assigned an identity and is consistently hashed to its storage followers.
The buffer followers send data to the
responsible storage followers. In general, each buffer follower may send data to
every storage follower.
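
One way to turn buffered data into range boundaries is sketched below. It
assumes each buffer follower reports a small sample of its keys and that the
leader pools the samples and cuts at evenly spaced quantiles of the pooled
empirical CDF. The function names and the sampling scheme are hypothetical; the
proposal only says that an approximate CDF is calculated.

```python
import bisect


def split_points(samples, n_ranges):
    """Pick up to n_ranges - 1 keys that cut the pooled sample into
    near-equal pieces. `samples` holds one list of keys per buffer follower."""
    keys = sorted(k for sample in samples for k in sample)
    if not keys or n_ranges < 2:
        return []
    # The i-th split point is the empirical i/n_ranges quantile of the sample.
    points = [keys[(i * len(keys)) // n_ranges] for i in range(1, n_ranges)]
    # Heavily duplicated keys can yield repeated split points; keep each once.
    return sorted(set(points))


def range_index(points, key):
    """Index of the range holding `key`. Range i covers keys in
    [points[i - 1], points[i]); the first and last ranges are unbounded."""
    return bisect.bisect_right(points, key)


if __name__ == "__main__":
    samples = [[1, 5, 9], [2, 6, 10], [3, 7, 11], [4, 8, 12]]
    points = split_points(samples, n_ranges=4)
    print(points)
    print([range_index(points, k) for k in range(1, 13)])
```

A sample that is unrepresentative of the full dataset yields unbalanced ranges,
so a real implementation would still need a way to correct the boundaries later.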

Even after the ranges are calculated, we believe this system will outperform
clients directly sending data to the responsible storage followers because the
buffer followers effectively batch records from several clients. This suggests
that the system should use an order of magnitude (or more) fewer buffer
followers than it has clients.

Buffer followers and storage followers may be implemented by the same process
acting in two different roles.

Failure Tolerance

There are three kinds of nodes that can fail, and we address each one separately.

A buffer follower will not send a success response to a client until all data
from the client’s partition is stored on a storage follower. If a buffer
follower dies, then a client will mark that partition as failed and the Query
system will restart that partition. Separate attempts of the same partition use
unique attempt ids which distinguish the records. Data in the system is
immutable, so failed attempts are filtered on read.
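
Filtering on read could look like the sketch below. It assumes every record
carries the id of the client partition and of the attempt that wrote it, and
that the reader can learn which attempt of each partition succeeded (for
example, from the leader). The Record fields and the successful_attempt mapping
are hypothetical names, not part of the proposal.

```python
from typing import Dict, Iterable, Iterator, NamedTuple


class Record(NamedTuple):
    key: str
    value: bytes
    partition_id: int  # which client partition produced the record
    attempt_id: str    # unique per attempt of that partition


def committed_records(
    records: Iterable[Record], successful_attempt: Dict[int, str]
) -> Iterator[Record]:
    """Yield only records written by the successful attempt of their partition.

    Records from failed attempts stay in storage (data is immutable) and are
    simply skipped here.
    """
    for r in records:
        if successful_attempt.get(r.partition_id) == r.attempt_id:
            yield r


if __name__ == "__main__":
    stored = [
        Record("a", b"1", partition_id=0, attempt_id="attempt-1"),  # failed
        Record("a", b"1", partition_id=0, attempt_id="attempt-2"),  # succeeded
        Record("b", b"2", partition_id=1, attempt_id="attempt-1"),  # succeeded
    ]
    winners = {0: "attempt-2", 1: "attempt-1"}
    for record in committed_records(stored, winners):
        print(record.key, record.attempt_id)
```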

Every range is replicated a configurable number of times. A range survives as
long as at least one of its replicas does, so we tolerate the failure of all but
one of the storage followers holding it. All nodes periodically send a
heartbeat to the leader. The leader detects failure of storage followers by
missing heartbeats. When a storage follower disappears, the leader computes a
new assignment of ranges to storage followers (based on the consistent hash) and
communicates this to the storage followers which transfer data accordingly to
restore full replication of the data. In the meantime, clients which do not find
a range at the expected replica will contact another replica. Under normal
operation, clients will round-robin through the replicas for a given
range. Moreover, if a replica is overloaded for any reason, it can reject client
requests and clients will try another replica. If all replicas for a range fail,
clients should back off exponentially before retrying the cluster.
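
The client-side read path described above might be sketched as follows. The
fetch callable, the ReplicaUnavailable exception, and the retry parameters are
all hypothetical; the jitter on the delay is an addition that the proposal does
not mention.

```python
import random
import time


class ReplicaUnavailable(Exception):
    """Raised by `fetch` when a replica is down, overloaded, or lacks the range."""


def read_range(replicas, fetch, start=0, max_rounds=5, base_delay=0.1,
               sleep=time.sleep):
    """Try each replica once per round, beginning at offset `start`.

    Callers advance `start` between reads to round-robin across replicas. If a
    whole round fails, wait exponentially longer before the next round.
    """
    for round_no in range(max_rounds):
        for i in range(len(replicas)):
            replica = replicas[(start + i) % len(replicas)]
            try:
                return fetch(replica)
            except ReplicaUnavailable:
                continue  # rejected or missing: move on to the next replica
        if round_no < max_rounds - 1:
            # Jitter keeps many clients from retrying in lockstep.
            sleep(base_delay * (2 ** round_no) * random.uniform(0.5, 1.0))
    raise ReplicaUnavailable(f"no replica answered after {max_rounds} rounds")


if __name__ == "__main__":
    def fetch(replica):
        if replica == "follower-1":
            raise ReplicaUnavailable(replica)
        return f"records from {replica}"

    print(read_range(["follower-1", "follower-2", "follower-3"], fetch))
```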

The leader stores all critical state in a MySQL database. We anticipate
designing the leader to have a warm backup running at all times, perhaps using a
lease system to ensure only one leader is active.


The sample used to calculate ranges may not be representative of the full

A dataset is rebalanced in two steps. First, the leader collects an approximate
CDF from the storage followers. Next, it calculates new ranges. Consistent
hashing limits data movement when data identity is immutable. We endeavor to
limit data movement by propagating the identity of old ranges to new ranges when
possible. In the general case, where a set of old ranges all overlap with a set
of new ranges, we calculate (using the approximate CDF) the density shared
between each pair of ranges. We then propagate identities of old ranges to new
ranges in decreasing order of density shared, skipping any pair whose old or
new range has already been matched. For example, if we have old ranges 1 2 3 and
new ranges 4 5 6 7 8 and the highest shared-densities are:

1,4 0.2
1,5 0.15
3,4 0.1
3,5 0.09
2,6 0.08

Then we’ll propagate the identity of range 1 to range 4, the identity of range 2
to range 6, and the identity of range 3 to range 5. All remaining ranges are
assigned new identities. The leader then calculates replicas using consistent
hashing and informs the cluster of the new partitioning. The followers transfer
data until the new partitioning is achieved. Each follower holds a range until
it is successfully transferred, ensuring the replication minimum. In the
meantime, clients are informed of both the old and new range assignments and try
replicas until they find the range. If an old replica does not have the range then
at least one new replica must have the range, so they will eventually find the