作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是apache flink的新手。我的输入中有一个未绑定(bind)的数据流(通过 kakfa 输入到 flink 0.10)。
我想获得每个主键的第一次出现(主键是contract_num和event_dt)。
这些“重复”几乎立即发生在彼此之后。
源系统无法为我过滤这个,所以 flink 必须这样做。
这是我的输入数据:
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
这是我想要的输出数据:
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
请注意,第二行已被删除,因为 A001 和“2016-02-24 10:25:08”的组合键已经出现在第一行。
keyBy(0,1)
但在那之后我不知道该怎么办!
@Test
public void test() {
DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
DateTime oneSecondsAgo = (new DateTime()).minusSeconds(2);
DataStream<Tuple3<String, Date, String>> testStream =
createTimedTestStreamWith(
Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
.emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
.emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
.emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
.close();
testStream.keyBy(0,1);
}
最佳答案
这是我刚刚写的另一种方法。它的缺点是它有更多的自定义代码,因为它不使用内置的 Flink 窗口函数,但它没有 Till 提到的延迟损失。 GitHub 上的完整示例.
package com.dataartisans.filters;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import java.io.Serializable;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* This class filters duplicates that occur within a configurable time of each other in a data stream.
*/
public class DedupeFilterFunction<T, K extends Serializable> extends RichFilterFunction<T> implements CheckpointedAsynchronously<HashSet<K>> {
private LoadingCache<K, Boolean> dedupeCache;
private final KeySelector<T, K> keySelector;
private final long cacheExpirationTimeMs;
/**
* @param cacheExpirationTimeMs The expiration time for elements in the cache
*/
public DedupeFilterFunction(KeySelector<T, K> keySelector, long cacheExpirationTimeMs){
this.keySelector = keySelector;
this.cacheExpirationTimeMs = cacheExpirationTimeMs;
}
@Override
public void open(Configuration parameters) throws Exception {
createDedupeCache();
}
@Override
public boolean filter(T value) throws Exception {
K key = keySelector.getKey(value);
boolean seen = dedupeCache.get(key);
if (!seen) {
dedupeCache.put(key, true);
return true;
} else {
return false;
}
}
@Override
public HashSet<K> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return new HashSet<>(dedupeCache.asMap().keySet());
}
@Override
public void restoreState(HashSet<K> state) throws Exception {
createDedupeCache();
for (K key : state) {
dedupeCache.put(key, true);
}
}
private void createDedupeCache() {
dedupeCache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpirationTimeMs, TimeUnit.MILLISECONDS)
.build(new CacheLoader<K, Boolean>() {
@Override
public Boolean load(K k) throws Exception {
return false;
}
});
}
}
关于java - apache flink 0.10 如何从无界输入数据流中获取第一次出现的复合键?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35599069/
我有一个Foldable的Integer s 是无界的,因此我无法应用 Max折叠到它。 定义 Max 有意义吗? Nothing 的实例当一个值不存在时?有Ord a => Semigroup (M
我有两个简单的问题。我有一个 LinkedBlockingQueue,我将其简单地创建为 新的 LinkedBlockingQueue() 所以我认为这保证是无限的,对吗? 如果确实如此,那么说在此队
是否可以在未指定边界之一的情况下使用 git bisect。例如,如果我发现 HEAD 上有问题但我怀疑它在过去在某个时候有效,有没有办法告诉 git “尝试一次提交之前,如果那不起作用尝试两次提交之
我是一名优秀的程序员,十分优秀!