Background:

How to speed up bulk importing into Google Cloud Datastore with multiple workers?

Source: Internet

I have an apache-beam based Dataflow job that reads from a single text file (stored in Google Cloud Storage) using a vcf source, transforms the text lines into Datastore entities, and writes them to the Datastore sink. The workflow works fine, but the drawbacks I noticed are:

  • The write speed into the datastore is at most around 25-30 entities per second.
  • I tried --autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100, but the execution seems to prefer a single worker (see the graph below: the target number of workers briefly increased to 2 but was reduced to 1 "based on the ability to parallelize the work in the currently running step").

I did not use ancestor paths for the keys; all the entities are of the same kind.

The pipeline code is shown below:

def write_to_datastore(project, user_options, pipeline_options):
  """Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
     | 'Create my entity' >> beam.ParDo(
         ToEntityFn(), user_options.kind)
     | 'Write to datastore' >> WriteToDatastore(project))

Because I have millions of rows to write into the datastore, writing at 30 entities/sec would take far too long.

Question: The input is just one huge gzipped file. Do I need to split it into multiple small files to trigger multiple workers? Is there any other way I can make the import faster? Am I missing something in the num_workers setup? Thanks!

2 Solutions

#1



I'm not familiar with apache beam; this answer is from a general flow perspective.

Assuming there are no dependencies to be considered between entity data in various input file sections then yes, working with multiple input files should definitely help as all these files could then be processed virtually in parallel (depending, of course, on the max number of available workers).


You might not need to split the huge zipfile beforehand. It might be possible to simply hand off segments of the single input data stream to separate data segment workers for writing, provided the overhead of the handoff itself is negligible compared to the actual data segment processing.
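As a rough sketch of that flow (hypothetical helper names; a real Beam pipeline would do this internally): one serial reader splits the stream into segments, and a worker pool processes segments in parallel.

```python
import concurrent.futures
import itertools

def segments(stream, size):
    """Yield successive lists of up to `size` items from a single input stream."""
    it = iter(stream)
    while True:
        seg = list(itertools.islice(it, size))
        if not seg:
            return
        yield seg

def process_segment(seg):
    # Placeholder for the real per-segment work (convert lines, write entities).
    return len(seg)

def run(stream, size=1000, workers=4):
    # Only reading and splitting stay serial; segment processing runs in parallel.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(process_segment, segments(stream, size)))
```

The point of the sketch is that the reader never waits for a segment to finish processing before handing off the next one.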

The overall performance limitation would then be the speed of reading the input data, splitting it into segments, and handing them off to the segment workers.


A data segment worker would further split the data segment it receives into smaller chunks of up to 500 entities, the maximum that can be converted and written to the datastore in a single batch operation. Depending on the datastore client library used, it may be possible to perform this operation asynchronously, allowing the splitting into chunks and conversion to entities to continue without waiting for the previous datastore writes to complete.
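A minimal sketch of that chunking, assuming the google-cloud-datastore Python client (its put_multi call writes one batch of entities per request; the 500-entity limit is Datastore's per-commit cap):

```python
def chunks(entities, size=500):
    """Split a list of entities into batches of at most `size` each
    (500 is Datastore's limit for a single batch write)."""
    for i in range(0, len(entities), size):
        yield entities[i:i + size]

def write_in_batches(client, entities):
    # client would be a google.cloud.datastore.Client in a real run.
    for batch in chunks(entities):
        client.put_multi(batch)
```

For example, 1200 entities would be written as three batches of 500, 500, and 200.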

The performance limitation at the data segment worker would then be the speed at which the data segment can be split into chunks and each chunk converted to entities.

If async ops aren't available, or for even higher throughput, yet another handoff of each chunk to a chunk worker could be performed, with the chunk worker performing the conversion to entities and the datastore batch write.

The performance limitation at the data segment worker level would then be just the speed at which the data segment can be split into chunks and handed over to the chunk workers.


With such an approach, the actual conversion to entities and batch writing them to the datastore (async or not) would no longer sit on the critical path of splitting the input data stream, which is, I believe, the performance limitation in your current approach.

#2



I looked into the design of vcfio. I suspect (if I understand correctly) that the reason I always get one worker when the input is a single file is a limitation of _VcfSource combined with a VCF format constraint: the format has a header section that defines how to interpret the non-header lines, so each worker reading the source has to process an entire file. When I split the single file into 5 separate files that share the same header, I successfully got up to 5 workers (but no more, probably for the same reason).
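The pre-splitting described above can be sketched in plain Python (hypothetical function name): every header line (lines starting with "#", i.e. the "##" metadata lines and the "#CHROM" column line) is replicated into each shard so that every shard is a valid standalone VCF, and the records are dealt out round-robin.

```python
def split_vcf_lines(lines, n_shards):
    """Split VCF lines into n_shards groups, each prefixed with the shared header."""
    header = [ln for ln in lines if ln.startswith('#')]
    records = [ln for ln in lines if not ln.startswith('#')]
    shards = [list(header) for _ in range(n_shards)]
    for i, rec in enumerate(records):
        shards[i % n_shards].append(rec)  # round-robin distribution of records
    return shards
```

In practice you would stream each shard out to its own (optionally gzipped) file rather than holding everything in memory, but the header-replication idea is the same.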

One thing I don't understand is why, if the number of workers that read can be limited to 5 (in this case), we are also limited to only 5 workers for writing. Anyway, I think I have found an alternative way to trigger multiple workers with the Beam Dataflow runner (use pre-split VCF files). There is also a related approach in the gcp variant transforms project, in which vcfio has been significantly extended; it seems to support multiple workers with a single input VCF file. I wish the changes in that project could be merged into the Beam project too.

