The Shuffler is a service with one leader and many followers.
The leader knows the list of followers and the list of shuffles. For each shuffle, it keeps an approximate CDF of the keys.
Each follower has an entry for each shuffle. A shuffle entry is:
- an ordered data structure of the received records
- a CDF of the received keys
- a list of key intervals and the follower that owns each interval
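A minimal in-memory sketch of a follower's shuffle entry follows. It assumes string keys, half-open intervals `[lo, hi)` where `None` means unbounded, and an exact empirical CDF computed from the sorted keys. The design only asks for “a CDF”, and a real follower would likely keep a compact approximate sketch instead. The names `ShuffleEntry`, `Interval`, `put`, `cdf`, and `owner_of` are hypothetical.

```python
import bisect
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class Interval:
    # Half-open key interval [lo, hi); None means unbounded on that side.
    lo: Optional[str]
    hi: Optional[str]
    owner: int  # id of the follower that owns this interval

    def contains(self, key: str) -> bool:
        return (self.lo is None or self.lo <= key) and (self.hi is None or key < self.hi)


@dataclass
class ShuffleEntry:
    # Parallel lists kept sorted by key: the "ordered data structure" of records.
    keys: List[str] = field(default_factory=list)
    values: List[Any] = field(default_factory=list)
    # Ownership map, as most recently received from the leader.
    intervals: List[Interval] = field(default_factory=list)

    def put(self, key: str, value: Any) -> None:
        # Insert after any equal keys so duplicates keep arrival order.
        i = bisect.bisect_right(self.keys, key)
        self.keys.insert(i, key)
        self.values.insert(i, value)

    def cdf(self, key: str) -> float:
        # Empirical CDF: fraction of received keys that are <= key.
        if not self.keys:
            return 0.0
        return bisect.bisect_right(self.keys, key) / len(self.keys)

    def owner_of(self, key: str) -> Optional[int]:
        # Linear scan; a sorted interval list would allow binary search.
        for iv in self.intervals:
            if iv.contains(key):
                return iv.owner
        return None
```

Keeping keys and values in parallel lists means values never need to be comparable, even when two records share a key.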
A client asks the leader to create a new shuffle. The leader generates an id, contacts each follower, and informs it of the new shuffle; the followers enter the “put” state. The leader then returns the new id and the list of followers to the client.
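The create step can be sketched in-process as below, with direct method calls standing in for network messages. `Leader`, `Follower`, `create_shuffle`, and `on_new_shuffle` are hypothetical names, and using a random UUID as the shuffle id is an assumption; the design only says the leader generates an id.

```python
import uuid
from enum import Enum
from typing import Dict, List, Set, Tuple


class State(Enum):
    PUT = "put"
    READY_FOR_GET = "ready for get"
    GET = "get"


class Follower:
    def __init__(self, fid: int) -> None:
        self.fid = fid
        self.shuffles: Dict[str, State] = {}  # shuffle id -> per-shuffle state

    def on_new_shuffle(self, shuffle_id: str) -> None:
        # A newly announced shuffle starts in the "put" state.
        self.shuffles[shuffle_id] = State.PUT


class Leader:
    def __init__(self, followers: List[Follower]) -> None:
        self.followers = list(followers)
        self.shuffles: Set[str] = set()

    def create_shuffle(self) -> Tuple[str, List[int]]:
        shuffle_id = uuid.uuid4().hex
        self.shuffles.add(shuffle_id)
        # Inform every follower before replying, so puts can start immediately.
        for f in self.followers:
            f.on_new_shuffle(shuffle_id)
        return shuffle_id, [f.fid for f in self.followers]
```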
The client adds data to all the followers concurrently. Periodically, the client asks the leader for the current list of followers. The followers periodically send their CDFs to the leader, which updates its overall CDF accordingly. The leader periodically sends the followers an updated list of owned key intervals. The followers periodically send data they do not own to its true owner.
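The design does not say how the leader turns its CDF into owned key intervals. One plausible choice, sketched here, is equal-mass splitting: merge the followers' reports, then cut the key space wherever the cumulative mass crosses another 1/n of the total. The report format (a `{key: count}` histogram) and the names `merge_cdfs` and `owned_intervals` are assumptions, and the sketch assumes at least one follower.

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional, Tuple

IntervalT = Tuple[Optional[str], Optional[str], int]  # (lo, hi, owner), half-open [lo, hi)


def merge_cdfs(reports: Iterable[Dict[str, int]]) -> Counter:
    # Sum per-key counts from every follower into one cluster-wide histogram.
    total: Counter = Counter()
    for report in reports:
        total.update(report)
    return total


def owned_intervals(hist: Dict[str, int], follower_ids: List[int]) -> List[IntervalT]:
    n = len(follower_ids)
    keys = sorted(hist)
    total = sum(hist.values())
    splits: List[str] = []
    running = 0  # mass of all keys strictly before the current key
    target = 1   # index of the next 1/n boundary to place
    for k in keys:
        # Compare running/total >= target/n using integers to avoid rounding.
        if target < n and running * n >= target * total:
            splits.append(k)
            # Skip boundaries already passed so split keys stay distinct.
            while target < n and running * n >= target * total:
                target += 1
        running += hist[k]
    bounds: List[Optional[str]] = [None, *splits, None]
    # A heavily skewed histogram can yield fewer than n intervals; the
    # remaining followers simply own nothing in this sketch.
    return [(bounds[i], bounds[i + 1], follower_ids[i]) for i in range(len(bounds) - 1)]
```

Because the leader's CDF may lag behind the data, the resulting split is only approximately balanced, which is consistent with the design tolerating an out-of-date CDF.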
The client then asks the leader to end input to the shuffle. The leader computes a final list of owned key intervals (using whatever CDF it has on hand, which may be out of date), sends the key intervals to each follower, and informs them that input has ended. The leader responds to the client with the owned key intervals. Each follower sends data it does not own to the true owner. When it holds no unowned data, it enters the “ready for get” state and informs the leader. When all the followers have entered “ready for get,” the leader sends all followers a message to enter the “get” state.
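The leader's end-of-input barrier can be sketched as follows, assuming a hypothetical `send(follower_id, message)` transport and tuple-shaped messages (`EndInputBarrier` is likewise a hypothetical name). Tracking ready followers in a set makes duplicate “ready for get” notifications harmless, and the `in_get` flag ensures the “get” broadcast happens only once.

```python
from typing import Any, Callable, Iterable, Set


class EndInputBarrier:
    """Leader-side state for ending input on one shuffle."""

    def __init__(self, follower_ids: Iterable[int], send: Callable[[int, Any], None]) -> None:
        self.follower_ids: Set[int] = set(follower_ids)
        self.ready: Set[int] = set()
        self.send = send  # hypothetical transport: send(follower_id, message)
        self.in_get = False

    def end_input(self, intervals: Any) -> Any:
        # Broadcast the final layout; each follower then forwards unowned data.
        for fid in sorted(self.follower_ids):
            self.send(fid, ("end_input", intervals))
        return intervals  # the leader also returns the layout to the client

    def on_ready_for_get(self, fid: int) -> None:
        self.ready.add(fid)
        # Only when every follower holds no unowned data may anyone serve gets.
        if not self.in_get and self.ready == self.follower_ids:
            self.in_get = True
            for f in sorted(self.follower_ids):
                self.send(f, ("enter_get",))
```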
The client then requests intervals from the followers concurrently. If a follower has not yet entered the “get” state, it returns 503, and the client backs off exponentially before retrying. If the follower does not own the requested interval, it returns 400; the client asks the leader for the current owned key intervals and tries again.
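The client's retry loop for a get can be sketched as below. `get_owned_intervals()` (ask the leader) and `fetch(owner, lo, hi)` (ask a follower, returning an HTTP-like `(status, body)` pair) are hypothetical callables, and the base delay, cap, and attempt limit are illustrative values not taken from the design. The sketch also assumes each requested interval lies inside a single owned interval, so the owner of `lo` serves it.

```python
import time
from typing import Any, Callable, List, Optional, Tuple

IntervalT = Tuple[Optional[str], Optional[str], int]  # (lo, hi, owner), half-open


def find_owner(intervals: List[IntervalT], key: str) -> Optional[int]:
    for lo, hi, owner in intervals:
        if (lo is None or lo <= key) and (hi is None or key < hi):
            return owner
    return None


def get_interval(
    lo: str,
    hi: str,
    get_owned_intervals: Callable[[], List[IntervalT]],
    fetch: Callable[[Optional[int], str, str], Tuple[int, Any]],
    *,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    max_attempts: int = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    intervals = get_owned_intervals()
    delay = base_delay
    for _ in range(max_attempts):
        status, body = fetch(find_owner(intervals, lo), lo, hi)
        if status == 200:
            return body
        if status == 503:
            # Follower not yet in the "get" state: back off exponentially.
            sleep(delay)
            delay = min(delay * 2, max_delay)
        elif status == 400:
            # Follower does not own the interval: refresh the layout and retry.
            intervals = get_owned_intervals()
        else:
            raise RuntimeError(f"unexpected status {status}")
    raise TimeoutError(f"gave up on [{lo}, {hi}) after {max_attempts} attempts")
```

A production client would likely also add random jitter to the delay so that many clients retrying at once do not synchronize.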
The client asks the leader to delete the shuffle. The leader immediately responds with success and then tells the followers to delete the shuffle. The followers remove all remaining data.
A follower can join the cluster at any time by notifying the leader. The leader informs the cluster of its presence by sending an updated list of owned key intervals.
In the “put” state, the followers will eventually send data to the new follower. Moreover, the clients will eventually ask the leader for an updated follower list.
In the “get” state, the new follower initially returns 503 to all get requests. Each existing follower sends the data that the new intervals assign elsewhere to its new owner; once it has successfully sent all of it, it informs the leader. In the interim, the follower can still respond to client requests, and it should not delete the data it has sent away. Once every follower has reported, the leader informs all followers that they should switch to the new owned key intervals. At this point, the followers can delete the data they’ve sent away.
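The “get”-state rebalance can be sketched in-process as below. Several things are assumptions: direct method calls stand in for messages, `on_new_intervals` returning the follower's id stands in for “informs the leader”, records are `(key, value)` pairs, and a newly joined follower is modeled as having no layout (`intervals=None`) until the switch, which is why it answers 503. `GetStateFollower`, `owner_of`, and `rebalance` are hypothetical names.

```python
from typing import Any, Dict, List, Optional, Tuple

IntervalT = Tuple[Optional[str], Optional[str], int]  # (lo, hi, owner), half-open


def owner_of(intervals: List[IntervalT], key: str) -> Optional[int]:
    for lo, hi, owner in intervals:
        if (lo is None or lo <= key) and (hi is None or key < hi):
            return owner
    return None


class GetStateFollower:
    def __init__(self, fid: int, intervals: Optional[List[IntervalT]] = None) -> None:
        self.fid = fid
        self.records: List[Tuple[str, Any]] = []
        # None models a newly joined follower that is not yet an authority.
        self.intervals = intervals
        self.pending: Optional[List[IntervalT]] = None  # received, not yet in effect

    def receive(self, key: str, value: Any) -> None:
        self.records.append((key, value))

    def serve(self, key: str) -> Tuple[int, Any]:
        if self.intervals is None:
            return 503, None
        if owner_of(self.intervals, key) != self.fid:
            return 400, None
        return 200, [v for k, v in self.records if k == key]

    def on_new_intervals(self, new_intervals: List[IntervalT], peers: Dict[int, "GetStateFollower"]) -> int:
        self.pending = new_intervals
        # Copy, do not move: the old layout stays authoritative until the switch.
        for key, value in self.records:
            owner = owner_of(new_intervals, key)
            if owner is not None and owner != self.fid:
                peers[owner].receive(key, value)
        return self.fid  # stands in for "handoff complete" sent to the leader

    def on_switch(self) -> None:
        self.intervals = self.pending
        self.pending = None
        # Only now is it safe to drop records that were sent to their new owners.
        self.records = [(k, v) for k, v in self.records if owner_of(self.intervals, k) == self.fid]


def rebalance(peers: Dict[int, GetStateFollower], new_intervals: List[IntervalT]) -> None:
    # Leader side: announce the new layout, then switch only once all have reported.
    done = {f.on_new_intervals(new_intervals, peers) for f in list(peers.values())}
    if done == set(peers):
        for f in peers.values():
            f.on_switch()
```

Between `on_new_intervals` and `on_switch`, an existing follower still answers gets under the old layout while the new follower answers 503, so clients never see a gap in authority.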
At any time, a follower knows whether it is the authority on an interval. Until a follower enters the “get” state, it is not an authority. In the “get” state, it is always the authority on the intervals it owns, even if the cluster has added a new follower of which it is unaware.
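The authority rule is purely local, which a short predicate makes concrete. The `State` enum and `is_authority` are hypothetical names, and `owned` is assumed to hold the half-open `(lo, hi)` intervals this follower currently owns. Nothing in the check depends on the full follower list, which is why an unannounced new follower cannot make an existing one answer incorrectly.

```python
from enum import Enum
from typing import List, Optional, Tuple


class State(Enum):
    PUT = "put"
    READY_FOR_GET = "ready for get"
    GET = "get"


def is_authority(state: State, owned: List[Tuple[Optional[str], Optional[str]]], key: str) -> bool:
    # Before the "get" state a follower is never an authority.
    if state is not State.GET:
        return False
    # In the "get" state, it is an authority on exactly the intervals it owns.
    return any((lo is None or lo <= key) and (hi is None or key < hi) for lo, hi in owned)
```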
For resilience, I think each follower could have a unique replica that simply duplicates its data. If the nodes are named 0, 1, 2, then node i’s replica can be node (i+1) mod 3. If a node disappears, the leader will eventually detect this and adjust the owned key intervals accordingly.
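The replica ring can be written down directly. `replica_of` generalizes the three-node example to `n` nodes, and `fail_over` shows one assumed way the leader might “adjust the owned key intervals”: hand the failed node's intervals to its replica, which already holds a copy of the data. Both names and the failover policy are assumptions, not part of the design.

```python
from typing import List, Optional, Tuple

IntervalT = Tuple[Optional[str], Optional[str], int]  # (lo, hi, owner)


def replica_of(i: int, n: int) -> int:
    # Node i is replicated on node (i + 1) mod n, forming a ring.
    return (i + 1) % n


def fail_over(intervals: List[IntervalT], failed: int, n: int) -> List[IntervalT]:
    # Assumed policy: the failed node's replica takes over its intervals.
    r = replica_of(failed, n)
    return [(lo, hi, r if owner == failed else owner) for lo, hi, owner in intervals]
```

This sketch does not re-establish a replica for the data the survivor takes over, nor does it handle a node and its replica failing together; both would need further design.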