- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
问题陈述是我们正在自定义 Google 提供的 PubSubToBQ Dataflow 流式 java 模板,其中我们配置要读取的多个订阅/主题并将数据推送到多个 Bigquery 表中,这需要作为单个执行数据流管道,用于从源读取所有流并推送到 Bigquery 表中。但是,当我们从 eclipse 执行模板时,我们必须传递订阅/主题和 BQ 详细信息,以及 gcs 存储桶上的模板阶段,然后当我们使用具有不同订阅和 BQ 详细信息的 gcloud 命令运行模板时。数据流作业不会被新的订阅或 BQ 表覆盖。
目标:我的目标是使用 Google 提供的 PubSubTOBQ.java 类模板并传递具有相应 Bigquery 表的订阅列表,并创建每个表传递订阅的管道。因此,单个作业中有 n-n、n 个管道。
我正在使用 Google 提供的 PubSubTOBQ.java 类模板,该模板将输入作为单个订阅或单个主题以及相应的大查询表详细信息。
现在我需要对其进行自定义,以将输入作为主题列表或以逗号分隔的订阅列表。我可以使用 ValueProvider> 并在 main 或 run 方法内部迭代字符串数组并将订阅/主题和 bq 表作为字符串传递。请查看下面的代码以获取更多信息。
我在 gcp 文档上读到的是,如果我们想在 rumtime 期间覆盖或使用值来创建动态 Piepline,则我们无法在 DoFn 之外传递 ValueProvider 变量。不确定我们是否可以阅读 DoFn 内的消息。
**PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i])**
如果是,请告诉我。这样我的目的就达到了。
代码:
public static void main(String[] args) {
StreamingDataflowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(StreamingDataflowOptions.class);
List<String> listOfSubStr = new ArrayList<String>();
List<String> listOfTopicStr = new ArrayList<String>();
List<String> listOfTableStr = new ArrayList<String>();
String[] providedSubscriptionArray = null;
String[] providedTopicArray = null;
String[] providedTableArray = null;
if (options.getInputSubscription().isAccessible()) {
listOfSubStr = options.getInputSubscription().get();
providedSubscriptionArray = new String[listOfSubStr.size()];
providedSubscriptionArray = createListOfProvidedStringArray(listOfSubStr);
}
if (options.getInputTopic().isAccessible()) {
listOfTopicStr = options.getInputTopic().get();
providedTopicArray = new String[listOfSubStr.size()];
providedTopicArray = createListOfProvidedStringArray(listOfTopicStr);
}
if (options.getOutputTableSpec().isAccessible()) {
listOfTableStr = options.getOutputTableSpec().get();
providedTableArray = new String[listOfSubStr.size()];
providedTableArray = createListOfProvidedStringArray(listOfTableStr);
}
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> readPubSubMessage = null;
for (int i = 0; i < providedSubscriptionArray.length; i++) {
if (options.getUseSubscription()) {
readPubSubMessage = pipeline
.apply(PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i]));
} else {
readPubSubMessage = pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(providedTopicArray[i]));
}
readPubSubMessage
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("Convert Message To TableRow", ParDo.of(new PubsubMessageToTableRow()))
.apply("Insert Data To BigQuery",
BigQueryIO.writeTableRows().to(providedTableArray[i])
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
}
pipeline.run().waitUntilFinish();
}
应该能够将单个 Dataflow PubSubTOBQ 模板用于与单个 Dataflow 流处理作业中的 bigquery 模板数量相对应的订阅数量的多个管道。
最佳答案
问题是,到目前为止,数据流模板需要知道暂存/创建时的管道图,以便它在运行时不能有所不同。如果您仍然想使用非模板化管道来执行此操作,并将逗号分隔的 Pub/Sub 主题列表传递为 --topicList
选项参数,那么你可以这样做:
String[] listOfTopicStr = options.getTopicList().split(",");
PCollection[] p = new PCollection[listOfTopicStr.length];
for (int i = 0; i < listOfTopicStr.length; i++) {
p[i] = pipeline
.apply(PubsubIO.readStrings().fromTopic(listOfTopicStr[i]))
.apply(ParDo.of(new DoFn<String, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Log.info(String.format("Message=%s", c.element()));
}
}));
}
完整代码here .
如果我们用 3 个主题来测试它,例如:
mvn -Pdataflow-runner compile -e exec:java \
-Dexec.mainClass=com.dataflow.samples.MultipleTopics \
-Dexec.args="--project=$PROJECT \
--topicList=projects/$PROJECT/topics/topic1,projects/$PROJECT/topics/topic2,projects/$PROJECT/topics/topic3 \
--stagingLocation=gs://$BUCKET/staging/ \
--runner=DataflowRunner"
gcloud pubsub topics publish topic1 --message="message 1"
gcloud pubsub topics publish topic2 --message="message 2"
gcloud pubsub topics publish topic3 --message="message 3"
输出和数据流图将符合预期:
将这种方法强制纳入模板的一个可能的解决方法是拥有足够多的主题 N
对于最坏的情况。当我们使用 n
执行模板时主题(满足 n <= N
)我们需要指定 N - n
要填写的未使用/虚拟主题。
关于java - GCP数据流流模板: Not able to customize google provided java based PubSubToBQ template,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57053573/
namespace std { template <> class hash{ public : size_t operator()( cons
我正在构建一个 Javascript 交互性有限的 Django 应用程序,并且正在研究如何将 Vue 模板与 Django 模板合并以实现相同的内容。 想象一个无限滚动的页面,其中 SEO 非常重要
我需要一个由游戏逻辑组成的外部类,调用 LitElement 组件,并向其传递一个 html 模板文字,该组件将使用该文字来更新其自己的 html 模板文字的一部分。 在下面的代码中,您将看到组件的一
很简单,我不想在 html 文件中定义所有 Handlebars 模板 我试过了 但这并没有奏效。我是否可以不以编程方式定义模板,甚至只是加载 Handlebars 文件,以便我可以重用,而且我觉得
在此代码中,j 正确地成为对象:j.name、j.addr、j.city、j.state 和 j.zip。但是,成功函数有一个 JavaScript 错误 .tmpl() 不是函数。 {{t
Django模板不会?点进来,总结了模板语法传值取值、过滤器和自定义过滤器、模板标签的分类、中间件403报错如何解决、如何继承模板~👆 Django 模板 模板传值取值 后端传值 键值对形式:{‘n
哈喽大家好,我是鹿 九 丸 \color{red}{鹿九丸}鹿九丸,今天给大家带来的是C++模板。 如果大家在看我的博客的过程中或者学习的过程中以及在学习方向上有什么问题或者想跟我交流的话可以加我的企
我正在用 PHP 编写一个简单的模板层,但我遇到了一些困难。目前它是这样工作的: 首先,我使用 fetch_template 从数据库中加载模板内容 - 这可行(如果您有兴趣,我会在启动时收集所有模板
我正在制作有关模板的 Django 教程。我目前处于此代码: from django.template import Template, Context >>> person = {'name': '
我正在使用 Jquery 模板来显示传入的 JSON 数据我想将模板加载到可缓存的外部文件中。我该怎么做? 更新 http://encosia.com/2010/12/02/jquery-templa
这是我的观点.py: from django.http import HttpResponse from django.template.loader import get_template from
我试图说服一位同事在项目的前端使用 Mustache/Hogan,我提出了以下建议: 有一个 templates.js 文件,大致如下所示: var tpl_alert = '{{msg}}'; va
我想创建一个通用的数组函数。在我的 API 中,我有一个通用容器,我需要将其转换为正确的类,但我想让它通用 template void UT::printArray(CCArray* arr, T t
有谁知道是否有办法在 Genshi 中创建 javascript 模板?我的意思是,我需要一个 .js 文件,可以在其中使用 等指令。等等。 有什么想法吗?谢谢! 最佳答案 你可以直接在html中这
我想知道是否可以设置某种 HTML 模板系统,基本上我有 3 个不同的文件: - header.html - footer.html - landing.html(landing.html 是包含页面
我正在尝试构建以下 HTML 模板: 这很简单,如果我使用红色容器 1-4,语法如下: 1 2 3 4 5 6 7 8 9 https://jsfi
#include "boost/numeric/ublas/matrix.hpp" using namespace boost::numeric::ublas; template class Lay
我在一个类中有一个函数,它传递了一个函数及其参数,然后将它们绑定(bind)到一个函数调用中并调用该函数等。 这已经被快速组合在一起以测试我知道代码不是很好的概念。 class Profiling {
是否有一个 c++ 结构或模板(在任何库中)允许我在十进制和任何其他基数之间进行转换(很像 bitset 可以做的)? 最佳答案 是的,你可以使用unsigned int: unsigned int
数据类型给程序设计带来的困扰及解决方案 int maxt(int, int); double maxt(double, double); 若有一种占位符T,能够代替类型,便可以简化代码的冗余编写
我是一名优秀的程序员,十分优秀!