- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如果要由数据流处理的输入文件有数据,我正在执行一项任务,以清除 memorystore 的缓存。这意味着,如果输入文件没有记录,则不会刷新内存存储,但输入文件即使有一条记录,也应刷新内存存储,然后应对输入文件进行处理。
我的数据流应用程序是一个多管道应用程序,它读取、处理然后将数据存储在 memorystore 中。管道正在成功执行。但是,内存存储的刷新正在工作,但刷新后,插入没有发生。
我编写了一个函数,在检查输入文件是否有记录后刷新内存。
FlushingMemorystore.java
package com.click.example.functions;
import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class FlushingMemorystore {
private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);
public static FlushingMemorystore.Read read() {
return (new AutoValue_FlushingMemorystore_Read.Builder())
.setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
}
@AutoValue
public abstract static class Read extends PTransform<PCollection<Long>, PDone> {
public Read() {
}
@Nullable
abstract RedisConnectionConfiguration connectionConfiguration();
@Nullable
abstract Long expireTime();
abstract FlushingMemorystore.Read.Builder toBuilder();
public FlushingMemorystore.Read withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host cannot be null");
Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}
public FlushingMemorystore.Read withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth cannot be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}
public FlushingMemorystore.Read withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}
public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
}
public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
}
public PDone expand(PCollection<Long> input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
return PDone.in(input.getPipeline());
}
private static class ReadFn extends DoFn<Long, String> {
private static final int DEFAULT_BATCH_SIZE = 1000;
private final FlushingMemorystore.Read spec;
private transient Jedis jedis;
private transient Pipeline pipeline;
private int batchCount;
public ReadFn(FlushingMemorystore.Read spec) {
this.spec = spec;
}
@Setup
public void setup() {
this.jedis = this.spec.connectionConfiguration().connect();
}
@StartBundle
public void startBundle() {
this.pipeline = this.jedis.pipelined();
this.pipeline.multi();
this.batchCount = 0;
}
@ProcessElement
public void processElement(DoFn<Long, String>.ProcessContext c) {
Long count = c.element();
batchCount++;
if(count==null && count < 0) {
LOGGER.info("No Records are there in the input file");
} else {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
jedis.flushDB();
}
LOGGER.info("*****The memorystore is flushed*****");
}
}
@FinishBundle
public void finishBundle() {
if (this.pipeline.isInMulti()) {
this.pipeline.exec();
this.pipeline.sync();
}
this.batchCount=0;
}
@Teardown
public void teardown() {
this.jedis.close();
}
}
@AutoValue.Builder
abstract static class Builder {
Builder() {
}
abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);
abstract FlushingMemorystore.Read build();
abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
}
}
}
我在 Starter Pipeline 代码中使用该函数。
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StorageToRedisOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from(options.getInputFile()));
/**
* Flushing the Memorystore if there are records in the input file
*/
lines.apply("Checking Data in input file", Count.globally())
.apply("Flushing the data store", FlushingMemorystore.read()
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
清除缓存后插入处理数据的代码片段:
dataset.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
.withMethod(RedisIO.Write.Method.SADD)
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
数据流执行良好,它也会刷新内存存储,但此后插入不起作用。你能指出我哪里出错了吗?
abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423
在这种情况下,数据流将计算记录数,其中 2 并将根据逻辑处理 id、名字等,然后将其存储在 memorystore 中。此输入文件每天都会出现,因此,如果输入文件有记录,则应清除(或刷新)内存库。
最佳答案
我怀疑这里的问题是您需要确保“刷新”步骤在 RedisIO.write 步骤发生之前运行(并完成)。梁有Wait.on您可以为此使用转换。
为了实现这一点,我们可以使用刷新 PTransform 的输出作为我们已经刷新数据库的信号 - 我们只在完成刷新后写入数据库。 process
调用您的冲洗 DoFn 将如下所示:
@ProcessElement
public void processElement(DoFn<Long, String>.ProcessContext c) {
Long count = c.element();
if(count==null && count < 0) {
LOGGER.info("No Records are there in the input file");
} else {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
jedis.flushDB();
}
LOGGER.info("*****The memorystore is flushed*****");
}
c.output("READY");
}
一旦我们有一个信号指出数据库已被刷新,我们可以使用它在向其写入新数据之前等待:
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from(options.getInputFile()));
/**
* Flushing the Memorystore if there are records in the input file
*/
PCollection<String> flushedSignal = lines
.apply("Checking Data in input file", Count.globally())
.apply("Flushing the data store", FlushingMemorystore.read()
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
// Then we use the flushing signal to start writing to Redis:
dataset
.apply(Wait.on(flushedSignal))
.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
.withMethod(RedisIO.Write.Method.SADD)
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
关于google-cloud-platform - 如何在使用 Google Cloud Dataflow 清除 Cloud Memorystore 中的缓存后插入数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64653112/
我有一个网站,我正在通过学校参加比赛,但我在清除 float 元素方面遇到了问题。 该网站托管在 http://www.serbinprinting.com/corey/development/
我有一个清除按钮,需要使用 JQuery 函数清除该按钮单击时的 TextBox 值(输入的)。 最佳答案 您只需将单击事件附加到按钮即可将输入元素的值设置为空。 $("#clearButton").
我们已经创建了一个保存到 CoreData 然后同步到 CloudKit 的 iOS 应用程序。在测试中,我们还没有找到一种方法来清除应用程序 iCloud 容器中的数据(用于用户私有(private
这是一个普遍的问题,也是我突然想到并且似乎有道理的问题。我看到很多人使用清除div 并且知道这有时不受欢迎,因为它是额外的标记。我最近开始使用 因为它接缝代表了它的实际用途。 当然都引用了:.clea
我有两个单选按钮。如果我检查第一个单选按钮下面的数据将填充在组合框中。之后我将检查另一个单选按钮,我想清除组合框值。 EmployeeTypes _ET = new EmployeeTypes(
我一直在玩 Canvas ,我正在尝试制作一个可以移动和跳跃的正方形,移动部分已经完成,但是跳跃部分有一个问题:每次跳跃时它都会跳得更快 here's a jsfiddle 这是代码: ///////
我该如何在 Dart 上做到这一点? 抓取tbody元素后,我想在其上调用empty(),但这似乎不存在: var el = query('#search_results_tbody'); el.em
我需要创建一个二维模拟,但是在设置新的“框架”时,旧的“框架”不会被清除。 我希望一些圆圈在竞技场中移动,并且每个循环都应删除旧圆圈并生成新圆圈。一切正常,但旧的没有被清除并且仍然可见,这就是我需要改
无论我使用set statusline将状态行更改为什么,我的状态行都不会改变。看起来像 ".vimrc" 39L, 578C
在 WPF 应用程序中,我有一个 ListView 绑定(bind)到我的 ViewModel 上的一个 ObservableCollection。 在应用程序运行期间,我需要删除并重新加载集合中的所
我有一个大型程序,一个带有图形的文本扭曲游戏。在我的代码中的某处,我使用 kbhit() 我执行此代码来清除我的输入缓冲区: while ((c = getchar()) != '\n' && c !
我正在将所有网站的页面加载到主索引页面中,并通过将 href 分成段并在主域名后使用 .hash 函数添加段来更新 URL 显示,如下所示: $('a').click(function(event)
我有一个带有 的表单和 2 控件来保存和重置表单。我正在触发 使用 javascript __doPostBack()函数并在其中传递一个值 __EVENTARGUMENT如果面板应该重置。 我的代
我目前有一堆 UIViewController,每个都是在前一个之上呈现的模式 ViewController。我的问题是我不需要一堆 UIViewController,我只需要最后一个。因此,当出现新
我在一个类中有一些属性方法,我想在某个时候清除这个属性的缓存。 示例: class Test(): def __init__(self): pass @property
在此Test Link我试图将标题和主站点导航安装到博客脚本的顶部。 我清除:两者;在主要网站脚本上工作,但现在把所有东西都扔到了一边。尝试了无数次 fixex 都没有成功!提前感谢 Ant 指点解决
我似乎无法正确清除布局。看this 我无法阻止左栏中的元素向下推右栏中的元素。谁能帮忙? Screenshot with some pointy arrows (死链接) 最佳答案 问题标记/样式似
我希望能够在某个类 (sprite-empos) 之后清除 '' 中的内容,想知道是否有不添加任何新类或不使用 js 的方法(我在下面尝试过不工作)? 为了明确它是“985”,我想在某个视口(view
我想清除ptr_array boost::ptr_array a; ... a.clear(); // missing 如何清理 ptr 容器? 最佳答案 它应该表现得像一个数组,您不能在 C++
这是我使用多 map 制作的一个简单的事件系统;当我使用 CEvents::Add(..) 方法时,它应该插入并进入多重映射。问题是,当我触发这些事件时, multimap 似乎是空的。我确定我没有调
我是一名优秀的程序员,十分优秀!