google-cloud-platform - How to count the number of lines in the input file in Google Dataflow file processing?


I am trying to count the number of lines in the input file, and I am creating a template with the Cloud Dataflow runner. In the code below, I read a file from a GCS bucket, process it, and then store the output in a Redis instance.

However, I am unable to count the number of lines in the input file.

Main class

public static void main(String[] args) {
    /*
     * Construct a StorageToRedisOptions object using PipelineOptionsFactory.fromArgs
     * to read the options from the command line.
     */
    StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StorageToRedisOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
            .apply("Transforming data...",
                    ParDo.of(new DoFn<String, String[]>() {
                        @ProcessElement
                        public void TransformData(@Element String line, OutputReceiver<String[]> out) {
                            // Split each pipe-delimited line into its fields.
                            String[] fields = line.split("\\|");
                            out.output(fields);
                        }
                    }))
            .apply("Processing data...",
                    ParDo.of(new DoFn<String[], KV<String, String>>() {
                        @ProcessElement
                        public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
                            if (fields[RedisIndex.GUID.getValue()] != null) {
                                // Emit one field:value -> GUID entry per indexed field.
                                out.output(KV.of("firstname:"
                                        .concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
                                out.output(KV.of("lastname:"
                                        .concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
                                out.output(KV.of("dob:"
                                        .concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));
                                out.output(KV.of("postalcode:"
                                        .concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));
                            }
                        }
                    }))
            .apply("Writing field indexes into redis",
                    RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
                            .withEndpoint(options.getRedisHost(), options.getRedisPort()));
    p.run();
}
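(The post never shows the StorageToRedisOptions interface itself, only the getters the pipeline calls. A minimal sketch consistent with those calls and the command-line flags further below might look like the following; the descriptions and the default port are assumptions, not from the original.)

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical reconstruction: inferred from options.getInputFile(),
// options.getRedisHost(), and options.getRedisPort() used above.
public interface StorageToRedisOptions extends PipelineOptions {

    @Description("Path of the input file(s) to read, e.g. a gs:// glob")
    String getInputFile();
    void setInputFile(String value);

    @Description("Redis host to write to")
    String getRedisHost();
    void setRedisHost(String value);

    @Description("Redis port")
    @Default.Integer(6379) // assumed default; the original may differ
    int getRedisPort();
    void setRedisPort(int value);
}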

Sample input file

xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666
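(The RedisIndex enum is not shown either; judging from the sample rows, which read as guid|firstname|lastname|dob|postalcode, and the keys the pipeline builds, a plausible reconstruction is:)

// Hypothetical reconstruction of RedisIndex, inferred from the sample input
// and the field order the pipeline assumes; the original class is not shown.
public enum RedisIndex {
    GUID(0),
    FIRSTNAME(1),
    LASTNAME(2),
    DOB(3),
    POSTAL_CODE(4);

    private final int value;

    RedisIndex(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}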

Command to execute the pipeline

mvn compile exec:java \
-Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
-Dexec.args="--project=my-project-id \
--jobName=dataflow-job \
--inputFile=gs://my-input-bucket/*.txt \
--redisHost=127.0.0.1 \
--stagingLocation=gs://pipeline-bucket/stage/ \
--dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
--runner=DataflowRunner"

I tried the following code from a StackOverflow solution, but it did not work for me. (That snippet targets the old Dataflow SDK 1.x; DirectPipelineRunner and its EvaluationResults no longer exist in Apache Beam 2.x, so it cannot be used with recent SDKs.)

PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);

I have also gone through the Apache Beam documentation but did not find anything helpful there. Any help on this would be appreciated.

Best Answer

I solved this by applying Count.globally() to the PCollection<String> after the pipeline reads the file. Count.globally() yields a PCollection<Long> containing a single element: the total number of elements in the input.

I added the following code:

PCollection<String> lines = p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()));

lines.apply(Count.globally()).apply("Count the total records", ParDo.of(new RecordCount()));

Then I created a new class (RecordCount.java) that extends DoFn and simply logs the count.

RecordCount.java

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordCount extends DoFn<Long, Void> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);

    @ProcessElement
    public void processElement(@Element Long count) {
        // SLF4J needs a {} placeholder; without it the count is never printed.
        LOGGER.info("The total number of records in the input file is: {}", count);
    }
}
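As an aside, Beam's Metrics API offers another way to count elements: increment a Counter inside a pass-through DoFn and query it from the PipelineResult once the job finishes. The sketch below is an assumption layered on the same pipeline, not part of the original answer; the namespace "line-count" and counter name "lines-read" are illustrative. Note that querying the result in main() only works when the launcher waits for the job to finish, which is not the case when merely creating a template.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class MetricsLineCount {

    // Pass-through DoFn that bumps a counter for every line it sees.
    static class CountingFn extends DoFn<String, String> {
        private final Counter lineCounter = Metrics.counter("line-count", "lines-read");

        @ProcessElement
        public void processElement(@Element String line, OutputReceiver<String> out) {
            lineCounter.inc();
            out.output(line);
        }
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply("Reading Lines...", TextIO.read().from("gs://my-input-bucket/*.txt"))
         .apply("Counting lines", ParDo.of(new CountingFn()));
        // ... the rest of the pipeline (transform, RedisIO.write()) would chain here ...

        PipelineResult result = p.run();
        result.waitUntilFinish();

        // Query the counter value back from the finished job.
        MetricQueryResults metrics = result.metrics().queryMetrics(
                MetricsFilter.builder()
                        .addNameFilter(MetricNameFilter.named("line-count", "lines-read"))
                        .build());
        for (MetricResult<Long> counter : metrics.getCounters()) {
            System.out.println("Total lines read: " + counter.getAttempted());
        }
    }
}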

Regarding "google-cloud-platform - How to count the number of lines in the input file in Google Dataflow file processing?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63944012/
