Proposal: Shuffler

The Shuffler is a service with one leader and many followers.

The leader knows the list of followers and the list of shuffles. For each shuffle it has an approximate CDF.

Each follower has an entry for each shuffle. A shuffle entry is:

  • an ordered data structure of the received records
  • a CDF of the received keys
  • a list of key intervals and which followers own said interval
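
As a concrete sketch (not part of the proposal, just how I picture it), a follower’s per-shuffle entry might look roughly like this in Python; the plain sorted lists stand in for whatever ordered structure and approximate CDF are actually used:

    import bisect
    from dataclasses import dataclass, field

    @dataclass
    class ShuffleEntry:
        """Hypothetical per-shuffle state on a follower."""
        records: list = field(default_factory=list)    # (key, value) pairs, kept ordered by key
        cdf_keys: list = field(default_factory=list)   # keys received directly from clients
        intervals: dict = field(default_factory=dict)  # (lo, hi) key interval -> owning follower id

        def put(self, key, value):
            # keep both structures ordered as records arrive
            bisect.insort(self.records, (key, value))
            bisect.insort(self.cdf_keys, key)

        def cdf(self, key):
            # empirical CDF of the received keys; a real follower would keep an approximate sketch
            if not self.cdf_keys:
                return 0.0
            return bisect.bisect_right(self.cdf_keys, key) / len(self.cdf_keys)

        def owner(self, key):
            # follower that owns this key under the current interval assignment, if known
            for (lo, hi), follower in self.intervals.items():
                if lo <= key < hi:
                    return follower
            return None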

A client asks the leader to create a new shuffle. The leader generates an id, contacts each follower, and informs them of the new shuffle. The followers enter the “put” state. The leader then returns the new id and the list of followers to the client.
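
A minimal sketch of the create path, assuming one RPC stub per follower (notify_new_shuffle is a made-up name):

    import uuid

    class Leader:
        """Hypothetical leader-side handler for creating a shuffle."""

        def __init__(self, followers):
            self.followers = followers   # follower id -> RPC stub (assumed)
            self.shuffles = {}           # shuffle id -> approximate CDF (empty until updates arrive)

        def create_shuffle(self):
            shuffle_id = uuid.uuid4().hex
            self.shuffles[shuffle_id] = None
            # inform every follower; they enter the "put" state
            for stub in self.followers.values():
                stub.notify_new_shuffle(shuffle_id)   # assumed RPC
            # respond to the client with the new id and the current follower list
            return shuffle_id, list(self.followers.keys())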

The client concurrently adds data to all the followers. Periodically, the client checks the leader for the current list of followers. The followers periodically send their CDF to the leader which updates its overall CDF accordingly. The leader periodically sends the followers an updated list of owned key intervals. The followers periodically send data which they do not own to the true owner.
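
The follower side of this loop might look roughly like the sketch below; leader and peers are assumed RPC stubs and entry is the per-shuffle state sketched earlier:

    def follower_tick(entry, self_id, leader, peers, shuffle_id):
        """One round of a follower's periodic work during "put" (hypothetical)."""
        # 1. report our CDF of received keys to the leader
        #    (raw sorted keys stand in for an approximate CDF here)
        leader.update_cdf(shuffle_id, self_id, entry.cdf_keys)   # assumed RPC

        # 2. forward records we do not own to their true owner, keep the rest
        kept = []
        for key, value in entry.records:
            owner = entry.owner(key)
            if owner is None or owner == self_id:
                kept.append((key, value))
            else:
                peers[owner].put(shuffle_id, key, value)         # assumed RPC
        entry.records = kept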

The client then asks the leader to end input to the shuffle. The leader computes a final list of owned key intervals (using whatever CDF it has on hand, perhaps out-of-date) and sends the key intervals to each follower, informing them the input has ended. The leader responds to the client with the owned key intervals. Each follower sends data it does not own to the true owner. When a follower holds no unowned data, it enters the “ready for get” state and informs the leader. When all the followers have entered “ready for get,” the leader sends all followers a message to enter the “get” state.
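
The end-of-input barrier could be coordinated roughly like this sketch (end_input and enter_get are assumed RPC names):

    class EndInputCoordinator:
        """Hypothetical leader-side tracking of the end-of-input handshake for one shuffle."""

        def __init__(self, followers):
            self.followers = followers   # follower id -> RPC stub (assumed)
            self.ready = set()

        def end_input(self, shuffle_id, final_intervals):
            # push the final owned key intervals and tell every follower that input has ended
            for stub in self.followers.values():
                stub.end_input(shuffle_id, final_intervals)   # assumed RPC
            return final_intervals                            # also returned to the client

        def on_ready_for_get(self, shuffle_id, follower_id):
            # a follower reports it no longer holds unowned data
            self.ready.add(follower_id)
            if self.ready == set(self.followers):
                for stub in self.followers.values():
                    stub.enter_get(shuffle_id)                # assumed RPC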

The client concurrently asks for intervals from followers. If a follower has not entered the “get” state it returns 503. The client exponentially backs off. If the follower does not own the requested interval, it returns 400. The client asks the leader for the owned key intervals and tries again.
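
On the client, the retry behavior might look like this sketch (owned_intervals and get are assumed RPCs, and the interval lookup is simplified):

    import random
    import time

    def get_interval(leader, followers, shuffle_id, interval, max_attempts=8):
        """Hypothetical client-side fetch with the 503/400 handling described above."""
        ownership = leader.owned_intervals(shuffle_id)   # assumed RPC: interval -> follower id
        delay = 0.05
        for _ in range(max_attempts):
            status, records = followers[ownership[interval]].get(shuffle_id, interval)
            if status == 200:
                return records
            if status == 503:
                # follower not yet in the "get" state: exponential backoff with jitter
                time.sleep(delay * (1 + random.random()))
                delay *= 2
            elif status == 400:
                # follower does not own this interval: refresh ownership and try again
                ownership = leader.owned_intervals(shuffle_id)
        raise RuntimeError("interval not fetched after %d attempts" % max_attempts)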

The client asks the leader to delete the shuffle. The leader immediately responds with success. The leader tells the followers to delete the shuffle. The followers remove all remaining data.


A follower can join the cluster at any time. It notifies the leader. The leader informs the cluster of its presence by sending an updated list of owned key intervals.

In the “put” state, the followers will eventually send data to the new follower. Moreover, the clients will eventually ask the leader for an updated follower list.

In the “get” state, the new follower will initially return 503 to all get requests. Once a follower has successfully sent all data to the new owners, it informs the leader. In the interim, the follower can still respond to client requests. It should not delete the data it has sent away. The leader then informs all followers they should switch to the new owned key intervals. At this point, followers can delete the data they’ve sent away.


At any time, a follower always knows if it is the authority on an interval. Until a follower enters the “get” state, it is not an authority. In the “get” state, it is always an authority on the interval it owns. Even if the cluster has added a new follower of which it is unaware, it is still the authority on the data it owns.


For resilience, I think each follower can have a dedicated replica that just duplicates its data. If the nodes are named 0, 1, 2 then node i’s replica can be node (i + 1) mod 3. If a node disappears, the leader will eventually realize and adjust the owned key intervals accordingly.
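
Concretely, generalizing the 3 in the example to n nodes (which I assume is the intent):

    def replica_of(i, n):
        # node that duplicates node i's data under the ring scheme above
        return (i + 1) % n

    # with three nodes: 0 is replicated on 1, 1 on 2, and 2 on 0
    assert [replica_of(i, 3) for i in range(3)] == [1, 2, 0]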

ApproxCDF FTW!

How does the leader compute the owned key intervals? Since you didn’t specify, I’m assuming you’re thinking if there are n followers, the first should get the quantile range [0, 1/n), the second gets [1/n, 2/n), etc. Is it a concern that when a new follower joins the cluster, and new intervals are computed, a large fraction (I’d guess about half) of the data are now stored at the wrong follower and need to be sent over the network (and maybe worse, some nodes now own almost none of their data)?
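
To make that assumption concrete, the interval computation I have in mind is roughly the following, where quantile is the inverse of the leader’s approximate CDF and all names are made up:

    def equal_quantile_intervals(quantile, follower_ids):
        """Give follower k the quantile range [k/n, (k+1)/n), mapped into key space."""
        n = len(follower_ids)
        boundaries = [quantile(k / n) for k in range(n + 1)]
        return {
            follower: (boundaries[k], boundaries[k + 1])
            for k, follower in enumerate(follower_ids)
        }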

Maybe at the current scale, that isn’t a concern, but it seems like a weak point to keep an eye on. If that is a problem, I think there are ways to assign intervals, where each follower owns several disjoint intervals, that minimize the amount of reshuffling when followers join or leave.
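
One such scheme, purely as an illustration and not something the proposal commits to: carve the quantile range into many small fixed slices and assign each slice to a follower with rendezvous (highest-random-weight) hashing, so every follower owns several disjoint slices and a join only moves the slices the newcomer wins, roughly 1/(n+1) of the data in expectation:

    import hashlib

    NUM_SLICES = 256   # each slice is 1/256 of the quantile range

    def slice_owner(slice_index, follower_ids):
        # rendezvous hashing: the follower with the highest hash of (slice, follower) wins
        def score(follower):
            return int(hashlib.sha256(f"{slice_index}:{follower}".encode()).hexdigest(), 16)
        return max(follower_ids, key=score)

    def slice_assignment(follower_ids):
        return {k: slice_owner(k, follower_ids) for k in range(NUM_SLICES)}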

“The followers periodically send their CDF to the leader which updates its overall CDF accordingly. The leader periodically sends the followers an updated list of owned key intervals. The followers periodically send data which they do not own to the true owner.”

There are a few ways this could work. Do followers send only new keys they’ve received since the last update, or do they send a CDF of all keys they currently hold? When keys are transferred between followers, do they stay in the original owner’s CDF, or is it important for the CDFs to reflect the current state of the partitioning?

If all that is needed from the CDFs is for the CDF on the leader to be a good approximation of the total distribution, I think each follower’s CDF should contain only the keys they received directly from the client. If keys need to be removed from the CDF when they’re transferred to a different follower, that complicates things significantly. For the other question (delta or cumulative CDF updates), I think either could work, but sending deltas seems to require less synchronization among the updates to the leader, and could be made to require less data sent and less work by the leader to update the total CDF.
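
A sketch of the delta variant I have in mind, with raw keys standing in for whatever compact sketch would actually be sent:

    import bisect

    class DeltaReporter:
        """Follower side: buffer keys received directly from clients since the last report."""

        def __init__(self):
            self.pending = []

        def record_client_key(self, key):
            self.pending.append(key)

        def take_delta(self):
            delta, self.pending = self.pending, []
            return delta

    class LeaderCdf:
        """Leader side: fold deltas from every follower into one overall CDF."""

        def __init__(self):
            self.keys = []   # sorted; stand-in for an approximate CDF structure

        def apply_delta(self, delta):
            for key in delta:
                bisect.insort(self.keys, key)

        def cdf(self, key):
            return bisect.bisect_right(self.keys, key) / len(self.keys) if self.keys else 0.0

        def quantile(self, p):
            # inverse of the empirical CDF; this is what the interval computation needs
            return self.keys[min(int(p * len(self.keys)), len(self.keys) - 1)]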

Regarding the quantiles thing, I spent some time working this out. These are my notes. I think you send on the order of 1/n data.


Throughout, consider the case of one shuffle. Generalizing to multiple shuffles
seems straightforward.

example: 1 node cluster becomes a 2 node cluster

Suppose there is one node in the cluster and a second node joins. The leader
informs the new follower of the active shuffles, including their types.

The second node begins to receive traffic from the clients. This data is added
to its sorted tree.

Eventually the first node learns that the second node exists, along with a
recent version of its CDF (which is mostly empty). Suppose the second node’s name is
lexicographically after the first node’s name. The first node will send roughly
half of its data to the second node. If the send fails (suppose the second node
crashed), the data remains on the first node. After some delay the first node
will again assess the CDFs and attempt to redistribute data.

In this situation roughly half the data is transferred.

example: n node cluster becomes an n+1 node cluster

Suppose there are n nodes with names:

[n00, n10, n20, n30]

Suppose a node with name n25 joins the cluster:

[n00, n10, n20, n25, n30]

Assume the system has roughly balanced the data across the original four nodes:
each node has 1/4th of all data in the cluster. All nodes will eventually learn
of n25’s existence and will learn two things: they should now own 1/5th of the
data, and their partition boundaries have shifted slightly. Let q be the
quantile function of all shuffle data (not including data not yet sent to the
shuffler). The original partition boundaries are:

    n00,    n10,    n20,    n30
q(0)   q(1/4)  q(2/4)  q(3/4)  q(4/4)

The ideal new partition boundaries are:

    n00,    n10,    n20,    n25,    n30
q(0)   q(1/5)  q(2/5)  q(3/5)  q(4/5)  q(5/5)

Further assume each node has a pretty good approximation of q. n00 will
transfer 1/4 - 1/5 = 1/20 of the data to n10. n10 will transfer 2/20 of the
data to n20: its new fifth consists of the one twentieth from n00 plus three
of its own twentieths; the remaining two twentieths now belong to n20. n20’s
new fifth consists of those two twentieths from n10 plus its previous first
two twentieths. It sends its remaining three twentieths to n25. n25
additionally receives one twentieth from n30, bringing its total to four
twentieths. In the following cartoon each dot is one-twentieth of the data:

    n00       n10       n20       n30
. . . . .|. . . . .|. . . . .|. . . . .
  n00     n10     n20     n25     n30
. . . .|. . . .|. . . .|. . . .|. . . .

for 2 -> 3

 n00   n10
. . .|. . .
n00 n10 n20
. .|. .|. .

And for 5 -> 6 (each node goes from 6/30 of the data to 5/30; the new middle
node n15 receives two thirtieths from n10 and three from n20)

    n00         n10         n20         n25         n30
. . . . . .|. . . . . .|. . . . . .|. . . . . .|. . . . . .

    n00       n10       n15       n20       n25       n30
. . . . .|. . . . .|. . . . .|. . . . .|. . . . .|. . . . .

And for 6 -> 7

      n00           n10           n15           n20           n25           n30
. . . . . . .|. . . . . . .|. . . . . . .|. . . . . . .|. . . . . . .|. . . . . . .

    n00         n10         n15         n17         n20         n25         n30
. . . . . .|. . . . . .|. . . . . .|. . . . . .|. . . . . .|. . . . . .|. . . . . .

For n>3:

Let w = 1/n - 1/(n+1) = 1/(n(n+1)) and t = n/(n(n+1)) = n * w = 1/(n+1)

  • if n is even, let the old node names be 0,…,l,r,…,n, where l is n/2 - 1
    and r is n/2. Let the new node names be 0,…,h-1,h,h+1,…,n+1 where h is n/2.
    • new 0 receives t data from old 0.
    • new n+1 receives t data from old n.
    • new 1 receives w data from old 0 and t-w data from old 1
    • new n receives w data from old n and t-w data from old n-1
    • new nodes i in [2, h-1] receive 2w data from old i-1 and t-2w data
      from old i
    • new nodes i in [h+1, n-1] receive 2w data from old i+1 and t-2w data
      from old i
    • new h receives t/2 data from old l and t/2 data from old r.

If the new node is h, the middle node, all new nodes other than h receive at
most 2w data from someone else. The new node, h, receives t data in two chunks
from l and r. Data transferred is:

ends + penultimates +   others   + middle
  0  +    2 * w     + (n-2) * 2w +   t
= 2(n-1) * w + t
= 2(n-1) * w + n * w
= (n + 2n - 2) * w
= (3n - 2) * w
= (3n - 2) / (n^2 + n)

So in the limit, we’re looking at O(1/n) data transferred. Indeed for growing an
n=1 cluster, we transfer 1/2 of the data. For an n=2 cluster, if the middle node
is new, 1/3 of the data is transferred. If one of the ends is new, 1/2 of the
data is transferred. In general the least transfer occurs when a middle node is
new.

Regarding CDFs:

Hmm. For the prototype, I was planning to just regenerate the approx CDF from the data whenever I need to send an update to the leader. I like the idea of keeping a CDF of received keys. The followers do not care about the distribution of keys in their quantile. I just need the leader to know the CDF of the whole dataset without sending the leader every key.

I don’t think O(1/n) is right. The formula I get for moving from n to n+1 nodes, where the new node is inserted with i nodes to the left and j nodes to the right (so i + j = n), is

[ i(i+1) + j(j+1) ] / [ 2n(n+1) ]

So when the new node is on one end, i=n, j=0 or i=0, j=n, exactly 1/2 of the data is transferred.

When the new node is in the middle, i=j=n/2, (n+2)/(4n+4) of the data is transferred. In the limit, that’s 1/4.

So no matter what, between 1/4 and 1/2 of the data must be transferred between nodes.
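
For what it’s worth, a quick brute-force check of that formula under the equal quantile-range assignment, comparing old and new boundaries segment by segment:

    from fractions import Fraction

    def transferred_fraction(n, i):
        """Fraction of all data that changes owner when one node joins an n-node cluster
        with i old nodes to its left, under the equal quantile-range assignment."""
        old = [Fraction(k, n) for k in range(n + 1)]        # old boundaries k/n
        new = [Fraction(k, n + 1) for k in range(n + 2)]    # new boundaries k/(n+1)

        def range_index(boundaries, x):
            return max(k for k in range(len(boundaries) - 1) if boundaries[k] <= x)

        moved = Fraction(0)
        points = sorted(set(old + new))
        for lo, hi in zip(points, points[1:]):
            mid = (lo + hi) / 2
            old_owner = range_index(old, mid)
            new_idx = range_index(new, mid)
            # new index i is the freshly joined node; other new indices map back to old ids
            new_owner = new_idx if new_idx < i else (None if new_idx == i else new_idx - 1)
            if new_owner != old_owner:
                moved += hi - lo
        return moved

    def formula(n, i):
        j = n - i
        return Fraction(i * (i + 1) + j * (j + 1), 2 * n * (n + 1))

    for n in range(1, 8):
        for i in range(n + 1):
            assert transferred_fraction(n, i) == formula(n, i)
    print("formula matches brute force for n = 1..7")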