- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想在 Apache Beam(使用 Java)中使用 FileIO
和 writeDynamic()
将键、值对中的值通过键写入 GCS 中的文本文件。
到目前为止,我正在从 Big Query 读取数据,将其转换为键、值对,然后尝试使用 FileIO 和 writeDynamic()
将值写入每个键一个文件中.
PCollection<TableRow> inputRows = p.apply(BigQueryIO.readTableRows()
.from(tableSpec)
.withMethod(Method.DIRECT_READ)
.withSelectedFields(Lists.newArrayList("id", "string1", "string2", "string3", "int1")));
inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to("gs://bucket/output")
.withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
我收到错误:
The method apply
(PTransform<? super PCollection<KV<Integer,String>>,OutputT>)
in the type PCollection<KV<Integer,String>>
is not applicable for the arguments
(FileIO.Write<String,KV<String,String>>)
最佳答案
存在类型不匹配。请注意 TableRow
元素被解析为 KV<Integer, String>
在MapElements
(即 key 是 Integer
)。然后,写入步骤需要 String
键如 .apply(FileIO.<String, KV<String, String>>writeDynamic()
:
inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
...
避免在使用 .by(KV::getKey)
时再次输入 key 我建议将其转换为 String
之前:
inputRows
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
作为示例,我使用公共(public)表 bigquery-public-data:london_bicycles.cycle_stations
对此进行了测试我将每个自行车站写入不同的文件:
$ cat output/file-746-00000-of-00004.txt
Lots Road, West Chelsea
$ bq query --use_legacy_sql=false "SELECT name FROM \`bigquery-public-data.london_bicycles.cycle_stations\` WHERE id = 746"
Waiting on bqjob_<ID> ... (0s) Current status: DONE
+-------------------------+
| name |
+-------------------------+
| Lots Road, West Chelsea |
+-------------------------+
完整代码:
package com.dataflow.samples;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
public abstract class DynamicGCSWrites {
public interface Options extends PipelineOptions {
@Validation.Required
@Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
String getOutput();
void setOutput(String s);
}
public static void main(String[] args) {
DynamicGCSWrites.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicGCSWrites.Options.class);
Pipeline p = Pipeline.create(options);
String output = options.getOutput();
PCollection<TableRow> inputRows = p
.apply(BigQueryIO.readTableRows()
.from("bigquery-public-data:london_bicycles.cycle_stations")
.withMethod(Method.DIRECT_READ)
.withSelectedFields(Lists.newArrayList("id", "name")));
inputRows
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to(output)
.withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
p.run().waitUntilFinish();
}
}
关于java - Apache Beam : Writing values of key, 根据 key 对文件进行值对,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58725111/
我喜欢 smartcase,也喜欢 * 和 # 搜索命令。但我更希望 * 和 # 搜索命令区分大小写,而/和 ?搜索命令遵循 smartcase 启发式。 是否有隐藏在某个地方我还没有找到的设置?我宁
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题? Update the question所以它是on-topic对于堆栈溢出。 10年前关闭。 Improve this qu
从以下网站,我找到了执行java AD身份验证的代码。 http://java2db.com/jndi-ldap-programming/solution-to-sslhandshakeexcepti
似乎 melt 会使用 id 列和堆叠的测量变量 reshape 您的数据框,然后通过转换让您执行聚合。 ddply,从 plyr 包看起来非常相似..你给它一个数据框,几个用于分组的列变量和一个聚合
我的问题是关于 memcached。 Facebook 使用 memcached 作为其结构化数据的缓存,以减少用户的延迟。他们在 Linux 上使用 UDP 优化了 memcached 的性能。 h
在 Camel route ,我正在使用 exec 组件通过 grep 进行 curl ,但使用 ${HOSTNAME} 的 grep 无法正常工作,下面是我的 Camel 路线。请在这方面寻求帮助。
我正在尝试执行相当复杂的查询,在其中我可以排除与特定条件集匹配的项目。这是一个 super 简化的模型来解释我的困境: class Thing(models.Model) user = mod
我正在尝试执行相当复杂的查询,我可以在其中排除符合特定条件集的项目。这里有一个 super 简化的模型来解释我的困境: class Thing(models.Model) user = mod
我发现了很多嵌入/内容项目的旧方法,并且我遵循了在这里找到的最新方法(我假设):https://blog.angular-university.io/angular-ng-content/ 我正在尝试
我正在寻找如何使用 fastify-nextjs 启动 fastify-cli 的建议 我曾尝试将代码简单地添加到建议的位置,但它不起作用。 'use strict' const path = req
我正在尝试将振幅 js 与 React 和 Gatsby 集成。做 gatsby developer 时一切看起来都不错,因为它发生在浏览器中,但是当我尝试 gatsby build 时,我收到以下错
我试图避免过度执行空值检查,但同时我想在需要使代码健壮的时候进行空值检查。但有时我觉得它开始变得如此防御,因为我没有实现 API。然后我避免了一些空检查,但是当我开始单元测试时,它开始总是等待运行时异
尝试进行包含一些 NOT 的 Kibana 搜索,但获得包含 NOT 的结果,因此猜测我的语法不正确: "chocolate" AND "milk" AND NOT "cow" AND NOT "tr
我正在使用开源代码共享包在 iOS 中进行 facebook 集成,但收到错误“FT_Load_Glyph failed: glyph 65535: error 6”。我在另一台 mac 机器上尝试了
我正在尝试估计一个标准的 tobit 模型,该模型被审查为零。 变量是 因变量 : 幸福 自变量 : 城市(芝加哥,纽约), 性别(男,女), 就业(0=失业,1=就业), 工作类型(失业,蓝色,白色
我有一个像这样的项目布局 样本/ 一种/ 源/ 主要的/ java / java 资源/ .jpg 乙/ 源/ 主要的/ java / B.java 资源/ B.jpg 构建.gradle 设置.gr
如何循环遍历数组中的多个属性以及如何使用map函数将数组中的多个属性显示到网页 import React, { Component } from 'react'; import './App.css'
我有一个 JavaScript 函数,它进行 AJAX 调用以返回一些数据,该调用是在选择列表更改事件上触发的。 我尝试了多种方法来在等待时显示加载程序,因为它当前暂停了选择列表,从客户的 Angul
可能以前问过,但找不到。 我正在用以下形式写很多语句: if (bar.getFoo() != null) { this.foo = bar.getFoo(); } 我想到了三元运算符,但我认
我有一个表单,在将其发送到 PHP 之前我正在执行一些验证 JavaScript,验证后的 JavaScript 函数会发布用户在 中输入的文本。页面底部的标签;然而,此消息显示短暂,然后消失...
我是一名优秀的程序员,十分优秀!