- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试通过在 DoFn 步骤中查询数据存储来增强管道中的数据。来自类 CustomClass 的对象的字段用于对数据存储区表执行查询,返回的值用于增强对象。
代码如下所示:
public class EnhanceWithDataStore extends DoFn<CustomClass, CustomClass> {
private static Datastore datastore = DatastoreOptions.defaultInstance().service();
private static KeyFactory articleKeyFactory = datastore.newKeyFactory().kind("article");
@Override
public void processElement(ProcessContext c) throws Exception {
CustomClass event = c.element();
Entity article = datastore.get(articleKeyFactory.newKey(event.getArticleId()));
String articleName = "";
try{
articleName = article.getString("articleName");
} catch(Exception e) {}
CustomClass enhanced = new CustomClass(event);
enhanced.setArticleName(articleName);
c.output(enhanced);
}
当它在本地运行时,速度很快,但当它在云端运行时,这一步会显着降低管道速度。这是什么原因造成的?有没有解决方法或更好的方法来做到这一点?
管道的图片可以在这里找到(最后一步是增强步骤): pipeline architecture
最佳答案
您在这里所做的是在您的输入 PCollection<CustomClass>
之间进行连接以及 Datastore 中的增强功能。
对于您的 PCollection
的每个分区对 Datastore 的调用将是单线程的,因此会产生大量延迟。我希望这在 DirectPipelineRunner
中会很慢和 InProcessPipelineRunner
以及。通过自动缩放和动态工作重新平衡,您应该在数据流服务上运行时看到并行性,除非您的结构导致我们对其优化不佳,因此您可以尝试增加 --maxNumWorkers
.但是您仍然无法从批量操作中受益。
最好在您的管道中使用 DatastoreIO.readFrom(...)
表达此连接其次是 CoGroupByKey
转换。这样,Dataflow 将对所有增强功能进行批量并行读取,并使用高效的 GroupByKey
。使他们与事件保持一致的机器。
// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);
// Key them both by the common id
PCollection<KV<Long, CustomClass>> keyedEvents =
events.apply(WithKeys.of(event -> event.getArticleId()))
PCollection<KV<Long, Entity>> =
articles.apply(WithKeys.of(article -> article.getKey().getId())
// Set up the join by giving tags to each collection
TupleTag<CustomClass> eventTag = new TupleTag<CustomClass>() {};
TupleTag<Entity> articleTag = new TupleTag<Entity>() {};
KeyedPCollectionTuple<Long> coGbkInput =
KeyedPCollectionTuple
.of(eventTag, keyedEvents)
.and(articleTag, keyedArticles);
PCollection<CustomClass> enhancedEvents = coGbkInput
.apply(CoGroupByKey.create())
.apply(MapElements.via(CoGbkResult joinResult -> {
for (CustomClass event : joinResult.getAll(eventTag)) {
String articleName;
try {
articleName = joinResult.getOnly(articleTag).getString("articleName");
} catch(Exception e) {
articleName = "";
}
CustomClass enhanced = new CustomClass(event);
enhanced.setArticleName(articleName);
return enhanced;
}
});
另一种可能性,如果只有很少的文章足以在内存中存储查找,则使用 DatastoreIO.readFrom(...)
然后通过 View.asMap()
将它们全部读取为 map 端输入并在本地表中查找它们。
// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);
// Key the articles and create a map view
PCollectionView<Map<Long, Entity>> = articleView
.apply(WithKeys.of(article -> article.getKey().getId())
.apply(View.asMap());
// Do a lookup join by side input to a ParDo
PCollection<CustomClass> enhanced = events
.apply(ParDo.withSideInputs(articles).of(new DoFn<CustomClass, CustomClass>() {
@Override
public void processElement(ProcessContext c) {
Map<Long, Entity> articleLookup = c.sideInput(articleView);
String articleName;
try {
articleName =
articleLookup.get(event.getArticleId()).getString("articleName");
} catch(Exception e) {
articleName = "";
}
CustomClass enhanced = new CustomClass(event);
enhanced.setArticleName(articleName);
return enhanced;
}
});
根据您的数据,这些中的任何一个都可能是更好的选择。
关于java - Dataflow DoFn 中的数据存储查询在云中运行时会减慢管道速度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40049621/
据我了解,它是服务器中长时间运行的进程。能否覆盖网络游戏服务器等长时间运行的程序实例? 最佳答案 您可以将辅助角色视为 Windows 服务或 Unix 守护进程。正如您所说,它是一个永久执行的进程(
尝试创建ChirpStack Azure 云中的 Docker-Compose 容器: docker login azure docker context create aci myacicontex
我找到了关于这个主题的优秀教程,但它留下了一些悬而未决的问题 http://www.silverlightshow.net/items/Silverlight-in-the-Azure-cloud-P
我们正在尝试使用 ADF(Azure 数据工厂)管道从 DB2 数据库(可在本地基础设施上使用)查询示例数据。我们在本地基础设施的虚拟机中托管了 ADF SHIR(自托管集成运行时)。 我们已经为 D
我在尝试将我的 intelliJ 项目与 JetStream Xsede 云中的 JDK 关联时遇到问题。当尝试通过 intelliJ 访问该目录时,我可以看到一个锁定的文件夹图标,不允许访问它。 我
我是一名优秀的程序员,十分优秀!