I have an Apache Beam based Dataflow job that uses the VCF source to read from a single text file (stored in Google Cloud Storage), transforms the text lines into Datastore entities, and writes them into the Datastore sink. The workflow works fine, but I noticed these downsides:


  • The write speed into Datastore is at most around 25-30 entities per second.
  • I tried to use --autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100, but the execution seems to prefer one worker (see graph below: the target worker count briefly increased to 2 but was reduced to 1 "based on the ability to parallelize the work in the currently running step").

I did not use an ancestor path for the keys; all the entities are of the same kind.


The pipeline code looks like below:


def write_to_datastore(project, user_options, pipeline_options):
  """Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
     | 'Create my entity' >> beam.ParDo(
         ToEntityFn(), user_options.kind)
     | 'Write to datastore' >> WriteToDatastore(project))

Because I have millions of rows to write into Datastore, writing at 30 entities/sec would take far too long.


Question: The input is just one huge gzipped file. Do I need to split it into multiple small files to trigger multiple workers? Is there any other way to make the import faster? Am I missing something in the num_workers setup? Thanks!


2 Answers



I'm not familiar with Apache Beam; this answer is from a general flow perspective.

Assuming there are no dependencies between the entity data in the various input file sections, then yes, working with multiple input files should definitely help, as all these files could then be processed virtually in parallel (depending, of course, on the maximum number of available workers).


You might not need to split the huge gzipped file beforehand. It might be possible to simply hand off segments of the single input data stream to separate data segment workers for writing, if the overhead of the handoff itself is negligible compared to the actual data segment processing.

The overall performance limitation would then be the speed of reading the input data, splitting it into segments, and handing them off to the segment data workers.


A data segment worker would further split the data segment it receives into smaller chunks of up to 500 entities, the maximum that can be converted to entities and written to the datastore in a single batch operation. Depending on the datastore client library used, it may be possible to perform this operation asynchronously, allowing the splitting into chunks and conversion to entities to continue without waiting for the previous datastore writes to complete.
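The 500-per-batch chunking described above can be sketched with a small helper. This is illustrative only; `chunk` is a hypothetical name, not part of any Datastore client library:

```python
def chunk(items, size=500):
    """Split a sequence into batches of at most `size` items,
    matching Datastore's 500-entity limit per batch write."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Example: 1203 records become batches of 500, 500, and 203,
# each small enough for one batch write.
batches = list(chunk(list(range(1203))))
```

Each batch would then be converted to entities and submitted as one batch write.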


The performance limitation at the data segment worker would then be the speed at which the data segment can be split into chunks and each chunk converted to entities.


If async ops aren't available, or for even higher throughput, yet another handoff of each chunk could be performed, to chunk workers that carry out the conversion to entities and the datastore batch write.
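As a rough sketch of that handoff, here is one way to fan chunks out to parallel workers with Python's `concurrent.futures`. Note `write_batch` is a placeholder standing in for a real Datastore batch write, not an actual client call:

```python
from concurrent.futures import ThreadPoolExecutor


def write_batch(batch):
    """Placeholder for a Datastore batch write of up to 500 entities.
    Here it just reports how many items it 'wrote'."""
    return len(batch)


def process_segment(records, chunk_size=500, max_workers=4):
    """Split a data segment into chunks and hand each chunk to a
    worker thread; returns the total number of items written."""
    chunks = [records[i:i + chunk_size]
              for i in range(0, len(records), chunk_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(write_batch, chunks))
```

With this shape, the segment worker only splits and hands off; the (slow) writes overlap in the pool instead of blocking the split loop.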


The performance limitation at the data segment worker level would then be just the speed at which the data segment can be split into chunks and the chunks handed over to the chunk workers.


With such an approach, the actual conversion to entities and the batch writes (async or not) would no longer sit in the critical path of splitting the input data stream, which is, I believe, the performance limitation in your current approach.




I looked into the design of vcfio. I suspect (if I understand correctly) that the reason I always get one worker when the input is a single file is a limitation of _VcfSource combined with a constraint of the VCF format: the format has a header part that defines how to interpret the non-header lines, so each worker reading the source has to process an entire file. When I split the single file into 5 separate files that share the same header, I successfully get up to 5 workers (but no more, probably for the same reason).
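For reference, the pre-splitting idea can be sketched as follows: every shard repeats the header lines (those starting with `#`) so that each shard remains a parseable VCF on its own. `split_vcf_lines` is a hypothetical helper written for illustration, not part of vcfio:

```python
def split_vcf_lines(lines, num_shards=5):
    """Split VCF lines into `num_shards` shards, copying the header
    lines (starting with '#') into every shard so each one is a
    self-contained VCF; records are dealt out round-robin."""
    header = [line for line in lines if line.startswith('#')]
    records = [line for line in lines if not line.startswith('#')]
    shards = [list(header) for _ in range(num_shards)]
    for i, record in enumerate(records):
        shards[i % num_shards].append(record)
    return shards
```

Each shard would then be written out as its own .vcf file and the pipeline pointed at the resulting file pattern.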


One thing I don't understand is why, if the number of workers reading can be limited to 5 (in this case), we are also limited to only 5 workers for writing. Anyway, I think I have found an alternative way to trigger multiple workers with the Beam Dataflow runner (use pre-split VCF files). There is also a related approach in the gcp variant transforms project, in which vcfio has been significantly extended; it seems to support multiple workers with a single input VCF file. I hope the changes in that project can be merged into the Beam project too.