Shuffler Design v4
The shuffler will have one “leader” and N “stores”.
At any point in time, the leader has a mapping from key ranges to storage node
sets (“store-sets”). A set has at least three nodes. A set of size 2f+1
tolerates as many as f node failures. Each store-set owns many key ranges. The
nodes in a store-set are ordered, so they are really store-sequences. The
ordering gives a bijection between two store-sets of the
same size. We use this correspondence when the cluster is changing ownership of
A query job can ask for the mapping for a shuffle. Query cores use the mapping
to determine the destinations of each record. They issue requests to every
member of the store-set. The data is durably stored when they have received a
confirmation from every node in the store-set.
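A minimal sketch of the client-side routing described above. It assumes the mapping is represented as a sorted list of split points plus one store-set per key range; the names `destinations` and `is_durable` and this representation are hypothetical, not part of the design.

```python
import bisect

def destinations(boundaries, store_sets, key):
    """Return the store-set that owns `key`.

    `boundaries` is a sorted list of split points; `store_sets[i]` owns the
    i-th key range, so there is one more store-set than there are boundaries.
    """
    return store_sets[bisect.bisect_right(boundaries, key)]

def is_durable(store_set, confirmed):
    """A record is durable once every node in the store-set has confirmed it."""
    return set(store_set) <= set(confirmed)

boundaries = ["g", "p"]
store_sets = [("s1", "s2", "s3"), ("s2", "s3", "s4"), ("s3", "s4", "s5")]
targets = destinations(boundaries, store_sets, "apple")
```

A query core would send the record to every node in `targets` and treat it as durable only once `is_durable` holds.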
If the stores become overloaded (in connections, CPU, or disk), the leader will
add a new store. Using a consistent-hashing-like scheme, we will create a new
class of store-sets that includes the new store node. The mapping will be
updated to map key ranges to the new class of store-sets. What happens next is
called an epoch transition.
The leader broadcasts the new store-sets and key-range mappings with a new epoch
number. If a store owns a key range in epoch e but not in epoch e+1, the store
attempts to transfer the data to the new owner. During this time, clients still
interact with the cluster as if it is in epoch e. If a store receives data it
owns in epoch e but not e+1, the data is not durably stored unless it is also
stored at the corresponding new owner. When every key-range is stored in f+1
stores that own it in epoch e+1, the cluster advances to epoch e+1. At this time
all stores are free to delete data they do not own in epoch e+1.
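The condition for advancing the epoch can be sketched as a simple check. This assumes the leader knows, for each key range, which stores currently hold its data; the function name and the dictionary layout are illustrative assumptions.

```python
def can_advance_epoch(new_owners, holders, f):
    """Return True when every key range is held by at least f+1 of the
    stores that own it in epoch e+1.

    new_owners: key range -> tuple of stores owning it in epoch e+1
    holders:    key range -> set of stores that currently hold its data
    """
    return all(
        len(set(owners) & holders.get(key_range, set())) >= f + 1
        for key_range, owners in new_owners.items()
    )

new_owners = {"range-a": ("s1", "s2", "s4")}
mid_transfer = {"range-a": {"s1", "s3"}}        # only one new owner has the data
transferred = {"range-a": {"s1", "s2", "s3"}}   # two new owners have the data
```

With f = 1, `mid_transfer` does not yet allow the cluster to advance, while `transferred` does.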
We wait for an epoch transition to complete before initiating a new epoch
transition. If epoch transition time bottlenecks cluster scale-up speed, we can
create multiple stores in one epoch transition.
If a node fails, the leader replaces it with a new node which assumes the
identity of the old node. The new node retrieves data from the other nodes in
the store-set. This can happen without an epoch transition. It can happen during
an epoch transition. Clients will time out connecting to the failed/dead
store. In response to the timeout, they will attempt to contact other stores in
the store-set. (How do I inform clients of the new node? Perhaps every 5 minutes
they check in with the leader for a mapping update?)
If a store-set has substantially higher load (cpu, disk, or connections) than
the median, the leader will reassign some key ranges from that store-set to
another store-set. This triggers an epoch transition.
If a key range grows substantially larger than the median key range size (in
bytes) for the shuffle, the key range is split into two or more equal-sized key
ranges. If the key-range:store ratio becomes too small, every key range will be
split in two. Key range splitting does not trigger an epoch transition, because
there is no change in ownership.
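The two splitting triggers can be sketched as follows. The thresholds (`factor=2.0` for "substantially larger" and `min_ratio=4` for "too small") are placeholder assumptions; the design does not fix them.

```python
from statistics import median

def ranges_to_split(sizes, factor=2.0):
    """Return the key ranges whose size in bytes exceeds factor * median."""
    m = median(sizes.values())
    return sorted(k for k, size in sizes.items() if size > factor * m)

def should_split_all(n_ranges, n_stores, min_ratio=4):
    """Return True when the key-range:store ratio is below min_ratio."""
    return n_ranges / n_stores < min_ratio

sizes = {"a": 10, "b": 12, "c": 11, "d": 50}
oversized = ranges_to_split(sizes)
```

Here the median is 11.5 bytes, so only range "d" crosses the threshold.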
In summary, there are three critical feedback systems:
- (nodes) The leader adds nodes to a shuffle in response to store disk
load statistics (e.g. average, 99th-percentile, … of IOPS, bandwidth, …).
- (key-range-balancing) The leader reassigns key ranges from loaded machines to
- (key-range-splitting) The leader splits key ranges that grow substantially
larger than the median size and also splits all key ranges if the
key-range:store ratio becomes too small.
The first feedback system ensures there is ample IOPS, bandwidth, and space for
the shuffle. The second feedback system ensures key ranges are roughly evenly
distributed across the stores. The third feedback system ensures two
things. First, it ensures key ranges are roughly even in size. Second, it
ensures a granular enough division of data to enable the key-range-balancing
system to re-balance the data.
Key ranges should contain a relatively large amount of data, say megabytes. We
want to be bandwidth bound not IOPS bound.
As keys arrive, stores incorporate them into an approximate CDF. At any time,
the leader can poll the stores for their CDFs. The leader can determine
reasonably balanced key ranges from the approximate CDFs.
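One way the leader might derive balanced key ranges is sketched below. It assumes each store's approximate CDF is represented as a sample of the keys it received; that representation and the name `balanced_boundaries` are assumptions made for illustration only.

```python
def balanced_boundaries(samples, n_ranges):
    """Merge per-store key samples and return n_ranges - 1 split points
    that divide the merged sample into roughly equal parts."""
    merged = sorted(key for sample in samples for key in sample)
    if not merged or n_ranges < 2:
        return []
    return [merged[(i * len(merged)) // n_ranges] for i in range(1, n_ranges)]

# Two stores report samples of the keys they received.
samples = [[1, 3, 5, 7], [2, 4, 6, 8]]
splits = balanced_boundaries(samples, 4)
```

With these samples the split points are 3, 5, and 7, so each of the four ranges covers two sampled keys.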
A store’s CDF represents those keys which it personally received. If ownership
of data changes, the key is still present in the old CDF and not present in the
new owner’s CDF. If a node dies, its CDF is lost. That information is gone
The Mapping and Store-sets
We assume f, the failure tolerance parameter, is fixed. To modify f, the shuffle
cluster must shut down and restart from scratch.
A store-set is a (2f+1)-tuple of store IP-port pairs.
The mapping associates intervals of virtual types with store-sets. The leader
maintains the mapping. The mapping is stored in a durable store (probably
MySQL). Read-only leader replicas can serve the mapping to clients. Clients may
have out-of-date views of the mapping. In the case of connection failure, they
simply try a different store. If all 2f+1 stores fail, they contact the leader
for a mapping update.
A key range is hashed to a floating-point number in [0,1). Node-sets (that is,
store-sets) are each given a randomly chosen floating-point number in [0,1). The
node-set with the least number that is an upper bound of the key range's hash is
the owning node-set for that key range. This is essentially consistent hashing.
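A minimal sketch of this lookup, assuming SHA-256 as the hash and assuming that a key range whose hash exceeds every node-set's number wraps around to the smallest one (the text does not say what happens in that case). The names `unit_hash` and `owner` are hypothetical.

```python
import bisect
import hashlib

def unit_hash(name):
    """Hash a string to a float in [0, 1) using the top 53 bits of SHA-256."""
    digest = hashlib.sha256(name.encode()).digest()
    return (int.from_bytes(digest[:8], "big") >> 11) / 2**53

def owner(points, key_range_hash):
    """Return the node-set with the least number >= key_range_hash.

    `points` is a sorted list of (number, node_set_name) pairs. Wrapping
    around to the first entry is an assumption of this sketch.
    """
    i = bisect.bisect_left(points, (key_range_hash,))
    return points[i % len(points)][1]

points = [(0.2, "set-A"), (0.6, "set-B"), (0.9, "set-C")]
chosen = owner(points, unit_hash("key-range-17"))
```

In practice the leader would compute the ownership once per epoch and publish the resulting mapping rather than have clients hash on every request.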
- n1’s or n2’s
- some number of Local SSDs (multiples of 375 GB)
- 8 or 16 cores (ergo 16 or 32 Gbps egress and 32 or 64 GB of RAM)
- no public IP address
- internal IP route-able from query nodes
- LSM-tree for data storage
- one process per core? concurrently accessing the LSM
I discuss various node sizes in Concrete Numbers.
Some cost figures:
|type||cores||SSDs||price per minute (USD)|
With 1, 8, and 16 SSDs we can theoretically write 350 MB/s, 1.4 GB/s, and 3.1
GB/s. We can read back roughly twice as fast. If this is the limiting factor,
then we might think of the cost to us of a shuffle in terms of bytes
shuffled. For row 2 (n1-8-8) shuffling 1 TB costs 0.14 USD for roughly 12
minutes of one node, judging only by theoretical, peak bandwidth.
It’s hard to give good estimates without knowing the incoming bandwidth from
query. Straggling query tasks are of significant concern. One slow task (say a
logistic regression that’s an input to a shuffle) requires us to hold all the
data for an extended period of time. This cost is not well captured by a service
fee because only one query node is alive.
Suppose a store node is an n1-standard-8 with 3 TB of Local SSD. The RAM to disk
ratio is 1:100. This node should have about 680k read IOPS or 360k write IOPS
and aggregate throughput of 2.65 GB/s reading and 1.4 GB/s writing.
This node, with sustained use discounts, costs ~200 USD per month. If it ran
continuously for one-quarter of that time it loses sustained use discounts but
still only costs ~50 USD per month. It’s worth noting that an equivalent
non-preemptible node respectively costs ~440 and ~125 USD. We address failure
with a 3x replication, which is about 30% more expensive than using one
Several shuffles on the order of 100 GB can easily share a store-set comprising
three n1-standard-8s. Instead, suppose we need to shuffle a 1PB dataset. We will
need ~350 stores of this size. Suppose the query job is using 100k
cores. Further suppose the worst case scenario in which all 100k cores start
writing shuffle input at the same time.
All 100k cores will attempt to ask the shuffle leader for the key mapping. We
can address this with read-only replicas of the leader. Moreover, failure to
connect to the leader should trigger exponential back-off and eventually all
100k cores will learn the key mapping. In the future, a gossip protocol may be
The query cores now have a trivial mapping sending all keys to one
node-set. They all attempt to open connections to nodes of said node-set. Said
node-set quickly becomes overwhelmed triggering exponential back-off in most of
the query cores. We probably want some explicit connection rate-limiting via
nginx or the k8s service (if possible) or in Python directly.
In response to high connection counts and CPU load, the leader creates more
stores. The new stores join with no assigned key ranges. The key range splitting
loop, using CDF information, splits some ranges. The key range balancing loop,
in parallel, is reassigning key ranges to new nodes. Epoch transitions occur to
safely migrate data.
Eventually the cluster grows large enough to comfortably handle the query
cores. How large is this? There are no ingress limits on internal IP addresses.
There is a 16 Gbps egress limit on an n1-standard-8. Suppose Hail
can saturate that 16 Gbps. It seems unlikely a GCE VM will be able to accept
more than 16 Gbps. Therefore, network saturation occurs at 8:1 query-core:shuffle-node. 16
Gbps is 2 GB/s, which exceeds the disk bandwidth, 1.4 GB/s. Therefore, one
shuffle node could handle 5 or 6 cores at once.
A 100k core query cluster needs a 20k node shuffle cluster. Such a cluster would
have 60PB of SSD space, which is far more than necessary to shuffle the 1PB
dataset. At 1.4 GB/s per node and 20k nodes, we can theoretically move 1PB of
data in 35.7 seconds. Such a cluster costs 400 USD for five minutes.
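The sizing arithmetic above can be reproduced with a short sketch. It assumes each query core sends 0.25 GB/s (2 Gbps, inferred from the 8:1 ratio at 16 Gbps) and rounds the 5.6 cores per node down to 5; the function name and parameters are illustrative.

```python
import math

def size_cluster(query_cores, core_gb_s, node_write_gb_s, node_ssd_tb, dataset_gb):
    """Size a shuffle cluster that is bound by per-node disk write bandwidth."""
    cores_per_node = math.floor(node_write_gb_s / core_gb_s)
    nodes = math.ceil(query_cores / cores_per_node)
    return {
        "cores_per_node": cores_per_node,
        "nodes": nodes,
        "ssd_pb": nodes * node_ssd_tb / 1000,
        "seconds": dataset_gb / (node_write_gb_s * nodes),
    }

# n1-standard-8 with 3 TB of Local SSD, 1 PB dataset, 100k query cores.
plan = size_cluster(100_000, 0.25, 1.4, 3, 1_000_000)
```

This yields 20k nodes, 60 PB of SSD, and roughly 35.7 seconds of theoretical transfer time, matching the figures in the text.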
Suppose we use a 1:10 RAM to disk ratio. The same network constraints
apply. Disk bandwidth is not posted, but a 1:10 ratio implies one SSD. One SSD
yields 660 MB/s read and 350 MB/s write. Such a node is disk
bottlenecked. The theoretical time to move 1PB of data with 20k nodes is 142
seconds. This cluster costs ~160 USD for five minutes.
Suppose we use 6TB of disk and an n1-standard-16, a 1:100 RAM-disk ratio. This
yields 3.1 GB/s of write bandwidth and likely at least that much network
bandwidth. Again, assuming ideal query cores, we can get to 12:1
query-core:shuffler-node, or ~8.3k shuffler nodes for 100k query cores. The
theoretical data movement time is ~38 seconds. This cluster is 383 USD for five
minutes.
Background Reading on Large-Scale Shufflers
Hyper Dimension Shuffle: Efficient Data Repartition at Petabyte Scale in SCOPE
Microsoft has a data storage and processing system called Cosmos which includes
a map-reduce-like system called SCOPE. They’ve written a few papers on it. This
paper describes “HD Shuffle”. The main contribution is the hyper-dimensional
partitioner. The key problem with a shuffle is that every node sends data to
every other node in an NxN dependency nightmare.
The hyper-dimension partitioner addresses this by partitioning the partitions!
Suppose we have a function partition that maps a key to the index of the
partition that owns that key:
partition_index = partition(key)
When shuffling, partition_index lies in [0, N) where N is the number of
partitions. An HD partitioner conceives of a partition_index as a
tuple. Although HD shuffle handles shuffles that change the degree of
parallelism, consider instead a shuffle from 8 partitions to 8 partitions. HD
Shuffle might restructure the dense, one-dimensional range [0, 8) into a
sparse two-dimensional grid: [(0, 0), (3, 3)).
Call this mapping hdpartition. We will re-cast both the input partitions and
the output partitions using this mapping.
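A minimal sketch of such a mapping, assuming a row-major layout on a 3 x 3 grid. That layout is an assumption, chosen because it agrees with the examples used in this section (partition 0 is (0, 0) and partition 3 is (1, 0)); the paper's actual partitioner may differ.

```python
def hdpartition(index, side=3):
    """Map a one-dimensional partition index onto a side x side grid."""
    if not 0 <= index < side * side:
        raise ValueError(f"partition index {index} does not fit the grid")
    return divmod(index, side)

# Eight partitions occupy eight of the nine grid cells, so the grid is sparse.
grid = {i: hdpartition(i) for i in range(8)}
```

The cell (2, 2) is unused here, which is what makes the two-dimensional grid sparse.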
The shuffle operation then proceeds in two stages. The first stage splits the
input into a separate file for each distinct value of the first dimension of the
input_key, input_record = read_one_input()
output_key, output_record = operation(input_key, input_record)
phase_one, _ = hdpartition(partition(output_key))
This produces at most three files per partition. Each file corresponds to a
a, b = hdpartition(partition(input_key))
c, _ = hdpartition(partition(output_key))
(a, b, c)
In total there are many files/partitions: at most 3 * 3 * 3 = 27 files. Our next
step is to reduce the number of partitions by forgetting the left-most dimension:
(a, b, c) becomes (b, c)
This returns us to, at most, 9 partitions. Visually I represent a few input
partitions, their output files, and the (b, c) partition they end up in:
Notice that partition 0 = (0,0) and partition 3 = (1,0) both contribute
files to the output-of-phase-one partition 0 = (0,0). Every
output-of-phase-one partition receives at most three files.
We continue in this manner of refining our knowledge of the output partition by
fraying into files and then collapsing them.
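The first phase can be checked with a small simulation of the 8-to-8 example. It assumes a row-major hdpartition on a 3 x 3 grid and assumes every input partition holds records for every output partition; both are assumptions of this sketch, not claims about the paper.

```python
from collections import defaultdict

def hdpartition(index, side=3):
    """Row-major mapping of a partition index onto a side x side grid."""
    return divmod(index, side)

def phase_one_files(n_partitions=8):
    """Enumerate the (a, b, c) files written in phase one."""
    files = set()
    for input_partition in range(n_partitions):
        a, b = hdpartition(input_partition)
        for output_partition in range(n_partitions):
            c, _ = hdpartition(output_partition)
            files.add((a, b, c))
    return files

def collapse(files):
    """Forget the left-most dimension: file (a, b, c) lands in partition (b, c)."""
    fan_in = defaultdict(set)
    for a, b, c in files:
        fan_in[(b, c)].add((a, b, c))
    return fan_in

files = phase_one_files()
fan_in = collapse(files)
largest_fan_in = max(len(group) for group in fan_in.values())
```

With 8 input partitions this produces 24 files, within the 3 * 3 * 3 bound, and no output-of-phase-one partition receives more than three of them.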
Implications for Shuffler
This strategy seems useful when a full NxN communication is necessary. However,
we hope to avoid ever moving data. In particular, if a shuffle cluster of size N
adds one node, we only want to transfer the fraction 1/(N+1) of the data.
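A small simulation illustrates why a consistent-hashing-like scheme fits this goal. It assumes one random point per node on [0, 1) with wrap-around; under that assumption the moved fraction is 1/(N+1) only in expectation, and a single run can differ noticeably. All names here are hypothetical.

```python
import bisect
import random

def owner(ring, h):
    """Return the node with the least point >= h, wrapping to the first node."""
    i = bisect.bisect_left(ring, (h,))
    return ring[i % len(ring)][1]

def simulate_add_node(n_nodes, n_keys=10_000, seed=0):
    """Add one node to a ring of n_nodes and report what moved.

    Returns (fraction of keys that changed owner,
             whether every moved key now belongs to the new node).
    """
    rng = random.Random(seed)
    ring = sorted((rng.random(), f"node-{i}") for i in range(n_nodes))
    grown = sorted(ring + [(rng.random(), "node-new")])
    keys = [rng.random() for _ in range(n_keys)]
    moved = [k for k in keys if owner(ring, k) != owner(grown, k)]
    only_to_new = all(owner(grown, k) == "node-new" for k in moved)
    return len(moved) / n_keys, only_to_new

fraction, only_to_new = simulate_add_node(9)
```

The useful property is the second return value: keys only ever move to the new node, so existing nodes never exchange data with each other.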
Riffle: Optimized Shuffle Service for Large-Scale Data Analytics
They concisely summarize the trouble with Shuffle I/O:
To avoid disk spills, the task input size (S) should be appropriate to fit in
memory, and thus is determined by the underlying hardware. As the size of job
data increases, the number of map (M) and reduce (R) tasks has to grow
proportionally. Because each reduce task needs to fetch from all map tasks,
the number of shuffle I/O requests M · R increases quadratically, and the
average block size S for each fetch decreases linearly.
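The scaling in the quoted passage can be worked through numerically. This sketch assumes R equals M and that each map task's output is about as large as its input, so each fetch reads roughly S / R bytes; both are simplifying assumptions, not figures from the paper.

```python
import math

def shuffle_io(data_bytes, task_input_bytes):
    """Return (number of shuffle fetches, average bytes per fetch)."""
    m = math.ceil(data_bytes / task_input_bytes)  # map tasks
    r = m                                         # reduce tasks, assumed equal to M
    return m * r, task_input_bytes / r

small = shuffle_io(10**12, 10**9)      # 1 TB of data, 1 GB per task
large = shuffle_io(2 * 10**12, 10**9)  # 2 TB of data, 1 GB per task
```

Doubling the data from 1 TB to 2 TB quadruples the fetch count from 1 million to 4 million and halves the average fetch from 1 MB to 500 KB.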
Riffle’s key insight is that many map tasks share a VM or physical node. The
worker process on the node can track map completions and perform merges before
shuffle inputs are read.
Riffle goes on to note that disaggregated datacenter designs place storage
systems in separate racks from compute systems. As a result a whole rack of
compute systems have equal bandwidth to a whole rack of storage
systems. Therefore, any compute system can serve as a merger for shuffle
In the end, for a very large (100s of TB) job Riffle reduces a job to ~75% of
total core-days. That’s certainly something.