Vds.new_combiner

The vds.new_combiner overview is missing on this page. Could that section be updated?
https://hail.is/docs/0.2/vds/index.html

Also, is there a tutorial for the new variant dataset API?

Should vds.new_combiner now be used instead of hail.experimental.run_combiner for combining GVCFs?

Yes, definitely. The VCF combiner / VDS combiner took off a bit faster than we anticipated! I expect some basic docs to be up before the end of the week.

Docs for the combiner can now be found here. Documentation on the VDS representation and associated classes can be found here.
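
For anyone skimming this thread later, basic usage looks roughly like the following sketch (all paths are placeholders; note that new_combiner builds a plan and run() executes it, which comes up below):

import hail as hl

# Build a combiner plan from a list of GVCFs (placeholder paths).
combiner = hl.vds.new_combiner(
    gvcf_paths=['/path/to/sample1.g.vcf.gz', '/path/to/sample2.g.vcf.gz'],
    output_path='/path/to/dataset.vds',
    temp_path='/path/to/tmp',
    reference_genome='GRCh38',
    use_genome_default_intervals=True,
)
combiner.run()  # new_combiner only builds the plan; run() does the combining

vds = hl.vds.read_vds('/path/to/dataset.vds')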

Thanks @dking! I just tried the new_combiner and I’m getting the following warning, after which the job suddenly stops. Any suggestions on this?

Running on Apache Spark version 3.1.2
SparkUI available at http://scc-hadoop.bu.edu:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.97-ccc85ed41b15
LOGGING: writing to /restricted/projectnb/kageproj/gatk/combiner/combiner.log
n gvcfs: 150
vds:/project/kageproj/dragen/kage.gvcf.chr22.vds
2022-09-21 13:20:25 Hail: INFO: Coerced sorted dataset
2022-09-21 13:20:40 Hail: WARN: generated combiner save path of tmp/combiner-plans/vds-combiner-plan_fef8d273ee43de7affbc317e5237cef1fa4af84bda70d343a40dc1841d971c42_0.2.97.json
2022-09-21 13:20:42 Hail: WARN: gvcf_batch_size of 100 would produce too many tasks using 58 instead

I decreased the number of GVCFs from ~3200 to 150 and still get this warning before the job stops.

The Hail script is this:

import os
import sys
import hail as hl
chr = sys.argv[1]
hl.init(log='/restricted/projectnb/kageproj/gatk/combiner/combiner.log', tmp_dir='/project/kageproj/tmp')
os.system("hdfs dfs -ls -C /project/kageproj/gvcf.rb/chr"+chr+"/*.rb.g.vcf.gz |head -150 >combiner_"+chr+".list")
os.system("hdfs dfs -put -f combiner_"+chr+".list /project/kageproj/gvcf.rb")

path_to_input_list = '/project/kageproj/gvcf.rb/combiner_'+chr+'.list'  # a file with one GVCF path per line
gvcfs = []
with hl.hadoop_open(path_to_input_list, 'r') as f:
    for line in f:
        gvcfs.append(line.strip())
print("n gvcfs: "+str(len(gvcfs)))
vds_path = "/project/kageproj/dragen/kage.gvcf.chr"+chr+".vds"  # output destination
print("vds:"+vds_path)
temp_bucket = '/project/kageproj/tmp'  # bucket for storing intermediate files

hl.vds.new_combiner(reference_genome='GRCh38',
                    temp_path='tmp',
                    gvcf_paths=gvcfs,
                    output_path=vds_path,
                    use_genome_default_intervals=True)

new_combiner doesn’t actually run the combiner; it only builds the plan. Do this instead:

combiner = hl.vds.new_combiner(reference_genome='GRCh38',
                               temp_path='tmp',
                               gvcf_paths=gvcfs,
                               output_path=vds_path,
                               use_genome_default_intervals=True)

combiner.run()

The warning is about the batch size, not the number of input GVCF files. The batch size controls how many GVCF files are combined in one iteration of the combiner; I believe we’re attempting to defend against Spark’s task-graph size limitations. You shouldn’t need to take any action. If you want to silence the warning, pass batch_size=50 to new_combiner.
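
For example (reusing the gvcfs and vds_path variables from your script; 50 here is just an illustrative value):

combiner = hl.vds.new_combiner(reference_genome='GRCh38',
                               temp_path='tmp',
                               gvcf_paths=gvcfs,
                               output_path=vds_path,
                               use_genome_default_intervals=True,
                               batch_size=50)  # combine at most 50 GVCFs per merge round
combiner.run()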

Thanks! I then tried to convert the VDS to a matrix table and a VCF with the following steps. During the VCF export, an Invalid type for format field ‘gvcf_info’ error was generated (see below). How should this error be dealt with? Is there a better approach to create a VCF from a VDS?

import os
import sys
import hail as hl
chr = sys.argv[1]
hl.init(log='/restricted/projectnb/kageproj/gatk/combiner/combiner.log', tmp_dir='/project/kageproj/tmp')
os.system("hdfs dfs -ls -C /project/kageproj/gvcf.rb/chr"+chr+"/*.rb.g.vcf.gz >combiner_"+chr+".list")
os.system("hdfs dfs -put -f combiner_"+chr+".list /project/kageproj/gvcf.rb")

path_to_input_list = '/project/kageproj/gvcf.rb/combiner_'+chr+'.list'  # a file with one GVCF path per line
print(path_to_input_list)
gvcfs = []
with hl.hadoop_open(path_to_input_list, 'r') as f:
    for line in f:
        gvcfs.append(line.strip())
print("n gvcfs: "+str(len(gvcfs)))
vds_path = "/project/kageproj/dragen/kage.gvcf.chr"+chr+".vds"  # output destination
print("vds:"+vds_path)
temp_bucket = '/project/kageproj/tmp'  # bucket for storing intermediate files

combiner = hl.vds.new_combiner(reference_genome='GRCh38',
                               temp_path='tmp',
                               gvcf_paths=gvcfs,
                               output_path=vds_path,
                               use_genome_default_intervals=True)
combiner.run()
vds = hl.vds.read_vds(vds_path)
print("before split")
vds_split = hl.vds.split_multi(vds)
print("after split")
mt = hl.vds.to_dense_mt(vds_split)
print("after to dense")
mt.count()
mt.describe()
mt.summarize()
mt.show(10)
mt.write("/project/kageproj/mt/gard.chr"+chr+".raw.mt")
mt = hl.read_matrix_table("/project/kageproj/mt/gard.chr"+chr+".raw.mt")
print(mt.count())
mt.describe()
mt.show()
hl.export_vcf(mt, "/project/kageproj/pVCF/gard.chr"+chr+".raw.vcf.bgz")

During the export, the following error occurred:

showing top 10 rows
showing the first 0 of 3449 columns
2022-09-22 10:43:43 Hail: WARN: export_vcf: ignored the following fields:
    'a_index' (row)
    'was_split' (row)
Traceback (most recent call last):
  File "/restricted/projectnb/kageproj/gatk/vds.pipeline/./check_vds.py", line 44, in <module>
    hl.export_vcf(mt, "/project/kageproj/pVCF/gard.chr"+chr+".raw.vcf.bgz")
  File "", line 2, in export_vcf
  File "/share/pkg.7/hail/0.2.97/install/lib/python3.7/site-packages/hail/typecheck/check.py", line 577, in wrapper
    return original_func(*args, **kwargs)
  File "/share/pkg.7/hail/0.2.97/install/lib/python3.7/site-packages/hail/methods/impex.py", line 554, in export_vcf
    Env.backend().execute(ir.MatrixWrite(dataset._mir, writer))
  File "/share/pkg.7/hail/0.2.97/install/lib/python3.7/site-packages/hail/backend/py4j_backend.py", line 104, in execute
    self._handle_fatal_error_from_backend(e, ir)
  File "/share/pkg.7/hail/0.2.97/install/lib/python3.7/site-packages/hail/backend/backend.py", line 181, in _handle_fatal_error_from_backend
    raise err
  File "/share/pkg.7/hail/0.2.97/install/lib/python3.7/site-packages/hail/backend/py4j_backend.py", line 98, in execute
    result_tuple = self._jbackend.executeEncode(jir, stream_codec)
  File "/share/pkg.7/spark/3.1.2/install/spark-3.1.2-bin-scc-spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/share/pkg.7/hail/0.2.97/install/lib/python3.7/site-packages/hail/backend/py4j_backend.py", line 31, in deco
    raise fatal_error_from_java_error_triplet(deepest, full, error_id) from None
hail.utils.java.FatalError: HailException: Invalid type for format field 'gvcf_info'. Found 'struct{AC: array, AF: array, AN: int32, AS_BaseQRankSum: array, AS_FS: array, AS_InbreedingCoeff: array, AS_MQ: array, AS_MQRankSum: array, AS_QD: array, AS_QUALapprox: array, AS_RAW_BaseQRankSum: str, AS_RAW_MQ: array, AS_RAW_MQRankSum: array<tuple(float64, int32)>, AS_RAW_ReadPosRankSum: array<tuple(float64, int32)>, AS_ReadPosRankSum: array, AS_SB_TABLE: array<array>, AS_SOR: array, AS_VarDP: array, BaseQRankSum: float64, DRAGstrInfo: array, DRAGstrParams: array, ExcessHet: float64, FS: float64, InbreedingCoeff: float64, MQ: float64, MQRankSum: float64, MQ_DP: int32, QD: float64, QUALapprox: int32, RAW_GT_COUNT: array, RAW_MQandDP: array, ReadPosRankSum: float64, SOR: float64, VarDP: int32}'.

Java stack trace:
is.hail.utils.HailException: Invalid type for format field 'gvcf_info'. Found 'struct{AC: array, AF: array, AN: int32, AS_BaseQRankSum: array, AS_FS: array, AS_InbreedingCoeff: array, AS_MQ: array, AS_MQRankSum: array, AS_QD: array, AS_QUALapprox: array, AS_RAW_BaseQRankSum: str, AS_RAW_MQ: array, AS_RAW_MQRankSum: array<tuple(float64, int32)>, AS_RAW_ReadPosRankSum: array<tuple(float64, int32)>, AS_ReadPosRankSum: array, AS_SB_TABLE: array<array>, AS_SOR: array, AS_VarDP: array, BaseQRankSum: float64, DRAGstrInfo: array, DRAGstrParams: array, ExcessHet: float64, FS: float64, InbreedingCoeff: float64, MQ: float64, MQRankSum: float64, MQ_DP: int32, QD: float64, QUALapprox: int32, RAW_GT_COUNT: array, RAW_MQandDP: array, ReadPosRankSum: float64, SOR: float64, VarDP: int32}'.
at is.hail.utils.ErrorHandling.fatal(ErrorHandling.scala:17)
at is.hail.utils.ErrorHandling.fatal$(ErrorHandling.scala:17)
at is.hail.utils.package$.fatal(package.scala:78)
at is.hail.io.vcf.ExportVCF$.$anonfun$checkFormatSignature$1(ExportVCF.scala:91)
at is.hail.io.vcf.ExportVCF$.$anonfun$checkFormatSignature$1$adapted(ExportVCF.scala:85)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at is.hail.io.vcf.ExportVCF$.checkFormatSignature(ExportVCF.scala:85)
at is.hail.expr.ir.MatrixVCFWriter.lower(MatrixWriter.scala:411)
at is.hail.expr.ir.WrappedMatrixWriter.lower(MatrixWriter.scala:54)
at is.hail.expr.ir.lowering.LowerTableIR$.apply(LowerTableIR.scala:679)
at is.hail.expr.ir.lowering.LowerToCDA$.lower(LowerToCDA.scala:73)
at is.hail.expr.ir.lowering.LowerToCDA$.apply(LowerToCDA.scala:18)
at is.hail.expr.ir.lowering.LowerToDistributedArrayPass.transform(LoweringPass.scala:77)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:27)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:67)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:69)
at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:16)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:16)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:14)
at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:13)
at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:64)
at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:15)
at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:13)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:416)
at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$2(SparkBackend.scala:452)
at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:70)
at is.hail.utils.package$.using(package.scala:640)
at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:70)
at is.hail.utils.package$.using(package.scala:640)
at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:59)
at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:310)
at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$1(SparkBackend.scala:449)
at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:448)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)

Hail version: 0.2.97-ccc85ed41b15
Error summary: HailException: Invalid type for format field 'gvcf_info'. Found 'struct{AC: array, AF: array, AN: int32, AS_BaseQRankSum: array, AS_FS: array, AS_InbreedingCoeff: array, AS_MQ: array, AS_MQRankSum: array, AS_QD: array, AS_QUALapprox: array, AS_RAW_BaseQRankSum: str, AS_RAW_MQ: array, AS_RAW_MQRankSum: array<tuple(float64, int32)>, AS_RAW_ReadPosRankSum: array<tuple(float64, int32)>, AS_ReadPosRankSum: array, AS_SB_TABLE: array<array>, AS_SOR: array, AS_VarDP: array, BaseQRankSum: float64, DRAGstrInfo: array, DRAGstrParams: array, ExcessHet: float64, FS: float64, InbreedingCoeff: float64, MQ: float64, MQRankSum: float64, MQ_DP: int32, QD: float64, QUALapprox: int32, RAW_GT_COUNT: array, RAW_MQandDP: array, ReadPosRankSum: float64, SOR: float64, VarDP: int32}'.

VCFs cannot have struct-typed FORMAT fields. In particular, you can’t represent an entry like:

{"some_struct_entry_field": {
    "a_nested_entry_field": 123, 
    "another_nested_entry_field": 456
 },
 "GT": "0/1"
}

You can .drop("gvcf_info") to throw away all that information, or you can flatten the structure by lifting all of its fields to the top level:

mt = mt.select_entries(**mt.entry.flatten())
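
Or, if you don’t need any of that information in the VCF, a minimal sketch of the drop option (the output path is a placeholder):

mt = mt.drop('gvcf_info')  # discard the struct-typed entry field entirely
hl.export_vcf(mt, '/path/to/output.vcf.bgz')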

I should warn you that generating a dense project VCF will create a file much larger than the VDS! That’s fine if it’s what you need, but it will be rather large, particularly as your sample sizes get into the hundreds of thousands.