RFC: Batch, Pipeline, CI roadmap

cc: @cseed @jigold @akotlar


This is the two-month roadmap. Hopefully a secure, usable prototype will be available to Liam within one month (say, by the Ides of March).

The goal is for Batch to handle copying output files from parent jobs to child jobs. This will be handled by a pair of pods that are executed before and after the main pod of the Job. All three pods will share a volume unique to the Job. The data will be stored in a Bucket owned by the user. All pods will run with access to a GCS (nb: not k8s) service account (owned by Hail) token. The user will grant said service account access to their buckets. The Batch client will authenticate with the Batch server in some manner (presumably via the authentication service).
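This layout can be illustrated by describing a Job's three pods as plain Kubernetes pod manifests (Python dicts) that all mount one PersistentVolumeClaim. A per-pod emptyDir would not work here, because it does not outlive the pod. This is a sketch under assumptions: the helper names, the claim naming scheme, and the /io mount point are hypothetical, and credentials are omitted.

```python
# Hypothetical sketch: make_pod, make_job_pods, the claim naming scheme, and the
# /io mount point are illustrative assumptions, not Batch's actual API.

def make_pod(name, image, command, claim_name):
    """Return a minimal Kubernetes pod manifest that mounts the Job's shared volume."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "restartPolicy": "Never",
            "containers": [{
                "name": "main",
                "image": image,
                "command": command,
                "volumeMounts": [{"name": "job-volume", "mountPath": "/io"}],
            }],
            # A PersistentVolumeClaim rather than an emptyDir, because the
            # volume must survive from the first pod through the last.
            "volumes": [{
                "name": "job-volume",
                "persistentVolumeClaim": {"claimName": claim_name},
            }],
        },
    }


def make_job_pods(job_id, user_image, user_command):
    """Build the input, main, and output pods for one Job, in execution order."""
    claim = f"batch-job-{job_id}-volume"
    sdk = "google/cloud-sdk"
    return [
        make_pod(f"job-{job_id}-input", sdk, ["echo", "hi"], claim),
        make_pod(f"job-{job_id}-main", user_image, user_command, claim),
        make_pod(f"job-{job_id}-output", sdk, ["echo", "hi"], claim),
    ]
```

Batch would start these pods one after another. Only the volume claim ties them together.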

Batch will use a SQL database as a persistent store of its state.
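The sketch below shows the batch and job tables described later in this roadmap. It uses Python's built-in sqlite3 module only so the example is self-contained; the real deployment would target a SQL server such as MySQL. Column types are approximations, and the parent/child id columns and the log column are left out. set_job_state is a hypothetical helper that shows one transaction per state change.

```python
import json
import sqlite3

# Approximation of the roadmap's batch and job tables (parent/child ids and
# the log column omitted for brevity).
SCHEMA = """
CREATE TABLE IF NOT EXISTS batch (
    id INTEGER PRIMARY KEY,
    attributes TEXT,                 -- JSON-encoded
    callback TEXT,
    ttl INTEGER,
    is_open BOOLEAN NOT NULL DEFAULT 1,
    create_time TEXT DEFAULT CURRENT_TIMESTAMP,
    owner TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS job (
    id INTEGER PRIMARY KEY,
    batch_id INTEGER NOT NULL REFERENCES batch(id),
    attributes TEXT,                 -- JSON-encoded
    callback TEXT,
    exit_code INTEGER,
    pod_name TEXT,
    state TEXT NOT NULL,
    owner TEXT NOT NULL
);
-- Secondary indexes on job.batch_id and job.pod_name.
CREATE INDEX IF NOT EXISTS job_batch_id ON job (batch_id);
CREATE INDEX IF NOT EXISTS job_pod_name ON job (pod_name);
"""


def set_job_state(conn: sqlite3.Connection, job_id: int, new_state: str) -> None:
    """Hypothetical helper: each state change runs in its own transaction."""
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.execute("UPDATE job SET state = ? WHERE id = ?", (new_state, job_id))


# Small demo: one batch with one job, then a state transition.
conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
with conn:
    conn.execute(
        "INSERT INTO batch (id, attributes, owner) VALUES (?, ?, ?)",
        (1, json.dumps({"name": "example"}), "alice"),
    )
    conn.execute(
        "INSERT INTO job (id, batch_id, state, owner) VALUES (?, ?, ?, ?)",
        (1, 1, "Ready", "alice"),
    )
set_job_state(conn, 1, "Running")
```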

Pipeline will use the new Batch system as a backend. CI will use pipeline to implement CI pipelines.

Batch will have a UI that displays batch jobs, their parents and children, and their logs to the owning users (or to the public, if a flag is set).


  • Batches have a “scratch” bucket parameter

  • Jobs have a “public” parameter indicating that their logs, input, and output are all publicly readable

  • Jobs have three pods that run sequentially. The first and last pods use the image google-cloud-sdk and the command echo hi. A Job is complete if all three pods are complete or if any pod completes with a non-zero exit code. If a pod exits with a non-zero exit code, subsequent pods are not started.

  • Jobs are associated with a shared volume that is created before the first pod is started and destroyed when the job is canceled, in-error, or complete.

  • Jobs are owned by a user account

  • User accounts are associated with a GCP (nb: not k8s) service account and a GCS bucket

  • The batch client can authenticate with the batch server as a particular user in a secure way

  • All pods run with access to the GCP service account token of the owning user.

  • Jobs have a new parameter non_job_input_files (array of GCS urls). All pods in a Job now mount the shared volume. The first pod of a Job copies all the files specified by non_job_input_files to the Job’s volume at /input.

  • Jobs have a new parameter output_files (array of glob expressions). The last pod of a Job copies all the matching files to a GCS folder associated with that Job in the Batch’s “scratch” bucket. The first pod of every child Job copies the contents of that folder to /input.

  • Batch creates a batch database in a SQL database server on startup if the database does not exist. Batch creates two tables ([foo] indicates the unique Primary Key):

    • batch: [id: Int], attributes: JSON, callback: String, ttl: Int, is_open: Boolean, create_time: DateTime, owner: String
    • job: [id: Int], batch_id: ForeignKey, attributes: JSON, callback: String, child_ids: Int, parent_ids: Int, exit_code: Int, pod_name: String, state: String, owner: String, log: String

    Batch creates secondary indexes in the job table on batch_id and pod_name.

    Batch uses the SQL table for all state, making a SQL transaction for each state change or state read.

  • The Batch database name is configurable. make test generates a UUID and uses it as the batch database name, which lets two batch instances run side by side in k8s.

  • Port pipeline to the new batch API

  • Port CI & hail-ci-build.sh’s to the new batch API

  • Enhance Batch UI to show job parent-child dependencies

  • Enhance Batch UI to have a DAG visualization

  • Investigate Luigi and how it relates to us: https://github.com/spotify/luigi

  • The batch image-fetcher should also fetch google/cloud-sdk
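The pod-sequencing rule in the list above can be written as a pure function over the exit codes of a Job's finished pods. The function name and return values are hypothetical. The function encodes only the stated rule: run the pods in order, stop at the first non-zero exit code, and treat the Job as complete once all three pods have succeeded.

```python
from typing import List, Tuple

N_PODS = 3  # input pod, main pod, output pod


def job_step(exit_codes: List[int]) -> Tuple[str, int]:
    """Decide the next action for a Job, given the exit codes of its finished pods.

    Returns ("start", i) to start pod i, or ("complete", code) once the Job is done.
    """
    for code in exit_codes:
        if code != 0:
            # A non-zero exit completes the Job; later pods are never started.
            return ("complete", code)
    if len(exit_codes) == N_PODS:
        # All three pods succeeded.
        return ("complete", 0)
    # Otherwise start the next pod in sequence.
    return ("start", len(exit_codes))
```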

This all sounds right to me. Some small comments:

  • In addition to the log, the job status page should show status (completed vs cancelled vs pending) and exit status.

  • “scratch” should be a directory (e.g. gs://my-bucket/scratch/pipeline). The user bucket will be used for various things (including Jupyter notebooks).

  • You wrote:

    All pods run with access to the GCP service account token of the owning user.

    We need to be able to run the “meat” pod with no credentials (for example, when CI tests 3rd-party code).

  • Does batch table need state?

  • job table needs log (or location where log is stored)

  • You wrote:

    Jobs have a new parameter non_job_input_files

    Alternative is to have an inputs section with two kinds of inputs: files or job outputs.

  • Jobs also need to specify that outputs should be copied to Google Storage (instead of, or in addition to, the scratch bucket)

scratch_bucket added: https://github.com/hail-is/hail/pull/5393

The tables also need the owner.


I think we should have two options, then: credentialed and non-credentialed jobs. The first and last pods always have credentials, but in non-credentialed jobs the middle pod lacks any credentials. CI will use non-credentialed pods. Pipeline users can choose either type of job, but will need to use credentialed pods if they’re writing to some long-term storage location. We can add this as an extension PR after the all-credentialed change goes in. Does that sound right, Cotton? I think this is the most straightforward way to let a user copy data out of the batch system. (Ah, also see my response to your last point below.)

I don’t think it needs any summary of its jobs’ states. Are you thinking of something else? It does have the open/closed status of the batch.

Yeah, good point. My gut says we shouldn’t store that in the table, but maybe that’s the easiest approach for now. (I’ll edit the original post.)

Hmm, true. Meaning you could filter down the files you get from certain parents? I suppose that might be useful, but I’m inclined to delay implementing that until someone has a compelling use-case.

Ah, yes, see the discussion of credentialed pods. The interface is simpler if they’re obligated to copy data out of the batch system themselves. I suppose we could have something like extract_output_to that is a GCS folder to copy the output to. I think again, we can get something useful without this by requiring the user to add a copy-out-of-batch step themselves and follow up with this.

Yes, (non-)credentialed pods sounds right.

My gut says we shouldn’t store that in the table

I agree, since the log could be large. We can just throw it in a bucket instead of storing locally. I was thinking you’d want the URL of the log in the table, but now I think you could just use the job id. I’m happy with basically any of the above.

Meaning you could filter down the files you get from certain parents?

100% you need to be able to specify only the files you want from a parent.

But I’m basically suggesting treating files from outside and files from parents analogously:

  - fromGoogleStorage: gs://bucket/path
  - parent: foo
    path: path/in/parent
  - ...

And same for outputs:

  - path: path/in/self
    toGoogleStorage: gs://bucket/path/in/gs
  - path: path2/in/self
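A sketch of how this unified inputs/outputs spec could be expanded into (source, destination) copy pairs for the first and last pods. The YAML is mirrored as Python lists of dicts. The scratch layout (<scratch>/<batch_id>/<job_name>/...), the /io volume root, and placing /input under that volume are all hypothetical choices for illustration.

```python
import posixpath

# Hypothetical layout: a Job's outputs live under <scratch>/<batch_id>/<job_name>/,
# the shared volume is mounted at /io, and inputs land in /io/input.


def input_copies(inputs, scratch, batch_id, input_root="/io/input"):
    """Expand an inputs section into (source, destination) copy pairs."""
    pairs = []
    for item in inputs:
        if "fromGoogleStorage" in item:
            src = item["fromGoogleStorage"]
            dst = posixpath.join(input_root, posixpath.basename(src.rstrip("/")))
        else:
            # A file produced by a parent Job, read from that parent's scratch folder.
            src = posixpath.join(scratch, str(batch_id), item["parent"], item["path"])
            dst = posixpath.join(input_root, item["parent"], item["path"])
        pairs.append((src, dst))
    return pairs


def output_copies(outputs, scratch, batch_id, job_name, volume_root="/io"):
    """Expand an outputs section into (source, destination) copy pairs."""
    pairs = []
    for item in outputs:
        local = posixpath.join(volume_root, item["path"])
        # Always save to scratch so that child Jobs can consume the file...
        pairs.append((local, posixpath.join(scratch, str(batch_id), job_name, item["path"])))
        # ...and also export to Google Storage when requested.
        if "toGoogleStorage" in item:
            pairs.append((local, item["toGoogleStorage"]))
    return pairs
```

Each pair could then be handed to a copy tool such as gsutil inside the first or last pod.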

The interface is simpler if they’re obligated to copy data out of the batch system themselves.

This won’t work for pipeline. The “meat” pod is provided by the user, but the pipeline API allows you to copy to/from external sources. The alternative is a dummy step and an extra copy to/from Google Storage, but then the DAG picture doesn’t match what the user submitted and, well, performance.

OK, I’m on the same page for final product goal.

I think we should start with the simpler version: copy all inputs from parents, and either add a separate step or copy output from your tool yourself in the meat pod (by making gsutil available in the meat pod’s image). We’ll add the things you’ve noted before the final product.

Regarding batch is_open: a job exiting with a non-zero status is distinct from Kubernetes not fulfilling its promise. The latter could happen frequently if we allow non-user jobs to be preempted, and less often because of cluster stability issues. Additionally, since state is now handled in two places (etcd, MySQL), and therefore state operations cannot be atomic, I think we need to account for race conditions and network instability (wherein we are unable to persist a state update that Kubernetes intends to send).

So, do we want to give batch the ability to ask Kubernetes to retry something that doesn’t exit but is missing?

  • On startup batch could check whether the job is running, finished, or gone (not present, but no job exit code previously received). This would also allow the upstream initiating agent (user, non-interactive service) to make decisions about when to retry.
  • If so, is_open could be an enum (so -1, 0, 1), or there could be an additional state parameter.
  • This could also be handled at the individual job level instead, to allow users to retry subgraphs.


is_open means the batch is still being constructed. The create batch endpoint creates an empty batch. The create job endpoint (optionally) adds a job to a batch. There is a close batch endpoint to indicate the full graph has been constructed.

The alternative to this is to have create batch take the entire graph. That might be preferable. I think all current clients (pipeline and ci) know the entire graph statically.
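A minimal in-memory sketch of the open/close lifecycle described above. Jobs can be added only while the batch is open, and a batch can be considered finished only after it is closed and every job is in a terminal state. The class, method, and state names are hypothetical and do not come from the Batch code.

```python
from dataclasses import dataclass, field
from typing import Dict

# Hypothetical terminal job states.
TERMINAL = {"Complete", "Cancelled", "Error"}


@dataclass
class Batch:
    id: int
    is_open: bool = True
    job_states: Dict[int, str] = field(default_factory=dict)

    def add_job(self, job_id: int) -> None:
        # The create-job endpoint is only valid while the graph is under construction.
        if not self.is_open:
            raise ValueError(f"batch {self.id} is closed; no more jobs may be added")
        self.job_states[job_id] = "Pending"

    def close(self) -> None:
        # Signals that the full graph has been constructed.
        self.is_open = False

    def is_complete(self) -> bool:
        # While the batch is open, more jobs may still arrive, so it cannot be complete.
        return not self.is_open and all(s in TERMINAL for s in self.job_states.values())
```

Taking the whole graph in a single create-batch call would amount to calling add_job for every job and then close in one step.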

we want to give batch the ability to ask Kubernetes to retry something that doesn’t exit but is missing

Batch already does this. It follows the Kubernetes “controller” pattern: it attempts to reconcile the batch state with the k8s state. If a job pod has successfully exited on k8s but has not been recorded in the batch state, it is recorded. If a job that isn’t complete isn’t running, Batch creates a new pod to run it.
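A single pass of that controller-style reconciliation might look like the sketch below. It assumes hypothetical in-memory views of both sides, keyed by job id: Batch's job records and the pods Kubernetes reports. Succeeded and Failed are Kubernetes pod phases; the Batch state names are made up for the example.

```python
from typing import Dict, List


def reconcile(batch_jobs: Dict[int, dict], pods: Dict[int, dict]) -> List[int]:
    """One reconciliation pass.

    batch_jobs: job id -> {"state": ..., "exit_code": ...}  (Batch's view)
    pods:       job id -> {"phase": ..., "exit_code": ...}  (Kubernetes' view)
    Returns the ids of jobs whose pods need to be (re)created.
    """
    to_create = []
    for job_id, job in batch_jobs.items():
        if job["state"] in ("Complete", "Cancelled"):
            continue  # already finished in Batch's view
        pod = pods.get(job_id)
        if pod is None:
            # Not complete and not running: schedule a new pod.
            to_create.append(job_id)
        elif pod["phase"] in ("Succeeded", "Failed"):
            # The pod exited on k8s, but Batch has not recorded it yet.
            job["state"] = "Complete"
            job["exit_code"] = pod["exit_code"]
    return to_create
```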

we need to account for race conditions

Yes, batch will have to do leader election to make sure only one process is managing the state. This will also allow us to do fast-failover if we want high availability. I was planning to do this with (our own) etcd since it is pretty trivial there.

There is a close batch endpoint to indicate the full graph has been constructed

We need this because we have “cleanup” finalizer jobs that run when the batch is finished (whether it failed or not). Thus, we need to know when the graph is complete to know if it is done or not.

An alternative is to give finalizer jobs dependencies and have them always run when their parents are done. This is an option, but it slightly increases the burden on the graph builder.
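The finalizer-with-dependencies alternative reduces to a readiness rule. An ordinary job runs only when all its parents succeeded, while a finalizer runs once all its parents have finished, whatever their outcome. The always_run flag and the state names are hypothetical.

```python
# Hypothetical terminal job states.
TERMINAL = {"Complete", "Cancelled", "Error"}


def ready_to_run(job: dict, jobs: dict) -> bool:
    """job: {"parents": [ids], "always_run": bool}; jobs: id -> {"state", "exit_code"}."""
    parents = [jobs[p] for p in job["parents"]]
    if not all(p["state"] in TERMINAL for p in parents):
        return False  # some parent is still pending or running
    if job.get("always_run"):
        return True  # finalizer: run regardless of how the parents ended
    # Ordinary job: every parent must have completed successfully.
    return all(p["state"] == "Complete" and p["exit_code"] == 0 for p in parents)
```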

@cseed @jigold

I am thinking about the security around volumes for the Liam/Konrad-beta.

Currently, every pod has access to the vdc-sa GCP service account credentials which doesn’t give them access to, for example, gs://hail-ci-0-1/.

To test copying of volumes, we need a bucket to which test jobs have read/write access. Privilege to access a GS bucket is conferred by a GCP SA key file. GKE SAs do not have any relationship to GCP SAs. To grant a pod privilege to access a GS bucket, we must create a secret containing a GCP SA key file and mount that secret into the pod. AFAICT, it is not possible to simply grant a GKE SA access to a GS bucket.
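The sketch below mounts a secret containing a GCP SA key file into pod manifests and combines that with the credentialed/non-credentialed split discussed above. The first and last pods always receive the key; the main pod receives it only for credentialed Jobs. The secret name, mount path, and key file name are hypothetical. GOOGLE_APPLICATION_CREDENTIALS is the variable that Google Cloud client libraries read to locate a service account key.

```python
import copy

KEY_MOUNT_DIR = "/gsa-key"  # hypothetical mount point for the secret


def with_gcp_key(pod: dict, secret_name: str, key_file: str = "key.json") -> dict:
    """Return a copy of `pod` with the SA key secret mounted into every container."""
    pod = copy.deepcopy(pod)
    spec = pod["spec"]
    spec.setdefault("volumes", []).append(
        {"name": "gsa-key", "secret": {"secretName": secret_name}})
    for container in spec["containers"]:
        container.setdefault("volumeMounts", []).append(
            {"name": "gsa-key", "mountPath": KEY_MOUNT_DIR, "readOnly": True})
        # Google Cloud client libraries use this to find Application Default Credentials.
        container.setdefault("env", []).append(
            {"name": "GOOGLE_APPLICATION_CREDENTIALS",
             "value": f"{KEY_MOUNT_DIR}/{key_file}"})
    return pod


def apply_credentials(pods, secret_name, credentialed):
    """Input/output pods always get the key; the main pod only if the Job is credentialed."""
    input_pod, main_pod, output_pod = pods
    return [
        with_gcp_key(input_pod, secret_name),
        with_gcp_key(main_pod, secret_name) if credentialed else main_pod,
        with_gcp_key(output_pod, secret_name),
    ]
```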

So, to successfully copy data in/out, we need access to a GCP SA key file locally (when testing with a local batch server), during tests (when testing in the batch-pods namespace inside a batch job), and at run time.

Moreover, we need at least two distinct security contexts: access to Liam/Konrad’s data and no access. We must prevent PR builds from accessing Liam/Konrad’s data.

So, I think tomorrow I can just create a garbage bucket that all of the VDC has access to and use that to test volumes.

However, I think we need to figure out a basic authentication scheme before we give this to Liam/Konrad. We also need to hammer out our story for managing GKE and GCP SAs and how these are related (or not).

A few thoughts that came up on https://github.com/hail-is/hail/pull/6569

start_job checks job._state == "Ready", but jobs that are pushed to start_job have already been checked for this same status. Besides performing two O(n) comparisons, which may or may not matter, only the second check should be done. The first could introduce subtle race-like bugs, because we probably want to check whether the state is “Ready” at the last moment.

  • We may also want to check the database record of the state before starting the job, in start_job. Having a single source of state is less complex, and therefore less likely to fail (we’re passing a pointer to a job whose _state could be mutated). This would, however, put more pressure on the SQL database.
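One way to check the state "at the last moment" without a separate read is a conditional UPDATE that does the check and the transition in a single statement. Only the caller that still sees Ready changes the row, and rowcount reports whether it won. This sketch uses sqlite3 to stay self-contained; try_start_job and the state names are hypothetical.

```python
import sqlite3


def try_start_job(conn: sqlite3.Connection, job_id: int) -> bool:
    """Atomically move a job from Ready to Running; return True if this caller won."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "UPDATE job SET state = 'Running' WHERE id = ? AND state = 'Ready'",
            (job_id,),
        )
    # rowcount is 1 only if the row was still Ready when the update ran.
    return cur.rowcount == 1
```

A caller that gets False should skip the job, because some other path has already changed its state.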

It feels like passing the full list of jobs to the queue once, instead of calling .put() N times, would improve performance and decrease the likelihood of an unwanted state transition. Additionally, if we ever use bounded queues, we will be much more likely to hit that limit if we put N items than if we put one collection of N items.
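A minimal asyncio sketch of the single-put idea. The producer enqueues the whole list once, so a bounded queue (maxsize=1 here) holds one item rather than N. enqueue_batch and consume are hypothetical names, and appending to started stands in for actually starting each job.

```python
import asyncio


async def enqueue_batch(queue: asyncio.Queue, jobs) -> None:
    # One put for the whole collection: the queue sees 1 item, not len(jobs).
    await queue.put(list(jobs))


async def consume(queue: asyncio.Queue, started: list) -> None:
    while True:
        jobs = await queue.get()
        for job in jobs:
            started.append(job)  # stand-in for start_job(job)
        queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=1)
    started = []
    consumer = asyncio.create_task(consume(queue, started))
    await enqueue_batch(queue, range(5))
    await queue.join()  # wait until the consumer has processed the collection
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return started
```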

N start_job consumers are spawned, but they all run on the same thread (via scale_queue_consumers, which calls asyncio.ensure_future on the function passed to it, in this case an infinite loop). It would seem reasonable to use N kernel threads or processes instead.
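For the thread-based alternative, here is a minimal sketch that assumes the per-job work is blocking (for example, a synchronous client call). asyncio's run_in_executor hands each call to a ThreadPoolExecutor, so the calls can overlap instead of blocking the single event-loop thread. start_job_blocking is a hypothetical stand-in. Threads mainly help with blocking I/O; CPU-bound Python code would need a process pool because of the GIL.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def start_job_blocking(job_id):
    # Stand-in for blocking work; reports which thread ran it.
    return (job_id, threading.current_thread().name)


async def run_all(job_ids, n_threads=4):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=n_threads, thread_name_prefix="start-job") as pool:
        futures = [loop.run_in_executor(pool, start_job_blocking, j) for j in job_ids]
        # gather preserves the order of job_ids in its results.
        return await asyncio.gather(*futures)
```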

Queues can be used with threads, but collections.deque is an apparently neat alternative. Deque performs better and has several atomic, lockless operations. Combine a process pool and a deque to send and consume work quickly.
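A sketch of the deque-based variant using threads. The collections.deque documentation promises thread-safe appends and pops from either end, and the workers rely on that here. One caveat: a deque lives in a single process's memory, so worker processes in a process pool cannot share it directly, and a multiprocessing.Queue (or similar) would be needed in that case. drain and run are hypothetical names, and job * 2 is placeholder work.

```python
import threading
from collections import deque


def drain(work: deque, results: deque) -> None:
    """Worker loop: pop items until the shared deque is empty."""
    while True:
        try:
            job = work.popleft()  # thread-safe pop from the left end
        except IndexError:
            return  # empty: deque has no blocking get, so the worker just exits
        results.append(job * 2)  # placeholder for starting the job


def run(items, n_workers=4):
    work = deque(items)
    results = deque()
    workers = [threading.Thread(target=drain, args=(work, results)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return sorted(results)
```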

cc @jigold, @cseed, @dking