This article collects Java code examples for the com.hazelcast.jet.pipeline.WindowDefinition class and shows how the class is used in practice. The examples are taken from selected open-source projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of the WindowDefinition class:

Package: com.hazelcast.jet.pipeline
Class name: WindowDefinition

The definition of the window for a windowed aggregation operation. The WindowKind enum enumerates the kinds of windows that Jet supports. To obtain a window definition, use the factory methods provided in this interface.
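As a quick reference, the factory methods mentioned in the description can be used roughly as in the following minimal sketch. It assumes the older Jet pipeline API (around Jet 0.7) that the samples below appear to use; the wrapper class name and the concrete durations are made up for illustration, and window lengths are expressed in the same unit as the event timestamps (milliseconds in these samples).

import com.hazelcast.jet.pipeline.WindowDefinition;

public class WindowDefinitionSketch {
    public static void main(String[] args) {
        // Sliding window: 60-second windows that advance in 1-second steps (illustrative values).
        WindowDefinition sliding = WindowDefinition.sliding(60_000, 1_000);

        // Tumbling window: non-overlapping 1-second windows
        // (a sliding window whose step equals its length).
        WindowDefinition tumbling = WindowDefinition.tumbling(1_000);

        // Session window: a key's window closes after 10 seconds of inactivity.
        WindowDefinition session = WindowDefinition.session(10_000);

        // kind() reports which WindowKind each definition represents.
        System.out.println(sliding.kind() + " / " + tumbling.kind() + " / " + session.kind());
    }
}

A definition created this way is passed to a stage's window(...) method, as the examples below show.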
Code example source: hazelcast/hazelcast-jet
@Override
public void addToDag(Planner p) {
    if (wDef.kind() == SESSION) {
        addSessionWindow(p, wDef.downcast());
    } else if (aggrOp.combineFn() == null) {
        // We don't use single-stage even when optimizing for memory because the
        // single-stage setup doesn't save memory with just one global key.
        addSlidingWindowSingleStage(p, wDef.downcast());
    } else {
        addSlidingWindowTwoStage(p, wDef.downcast());
    }
}
Code example source: hazelcast/hazelcast-jet
@Override
public long preferredWatermarkStride() {
    return wDef.preferredWatermarkStride();
}
Code example source: hazelcast/hazelcast-jet-demos
.window(sliding(MINUTES.toMillis(120), MINUTES.toMillis(15)))
.aggregate(linearTrend(CarCount::getTime, CarCount::getCount))
.map((TimestampedEntry<String, Double> e) ->
Code example source: hazelcast/hazelcast-jet
private static String createName(WindowDefinition wDef) {
    return wDef.kind().name().toLowerCase() + "-window";
}
Code example source: hazelcast/hazelcast-jet-demos
/**
 * Builds and returns the Pipeline which represents the actual computation.
 */
private static Pipeline buildPipeline(String modelPath) {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(WebcamSource.webcam(500))
            .mapUsingContext(classifierContext(modelPath),
                    (ctx, img) -> {
                        Entry<String, Double> classification = classifyWithModel(ctx, img);
                        return tuple3(img, classification.getKey(), classification.getValue());
                    }
            )
            .window(tumbling(1000))
            .aggregate(maxBy(comparingDouble(Tuple3::f2)))
            .drainTo(buildGUISink());
    return pipeline;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    // We'll calculate two aggregations over the same input data:
    // 1. number of viewed product listings
    // 2. set of purchased product IDs
    // The output of the aggregation is a Tuple2<Long, Set<String>>.
    AggregateOperation1<ProductEvent, ?, Tuple2<Long, Set<String>>> aggrOp = allOf(
            summingLong(e -> e.getProductEventType() == VIEW_LISTING ? 1 : 0),
            mapping(e -> e.getProductEventType() == PURCHASE ? e.getProductId() : null, toSet())
    );
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<ProductEvent>streamFromProcessor("generator",
            ProcessorMetaSupplier.of(GenerateEventsP::new, 1)))
     .addTimestamps(ProductEvent::getTimestamp, 0)
     .groupingKey(ProductEvent::getUserId)
     .window(WindowDefinition.session(SESSION_TIMEOUT))
     .aggregate(aggrOp, SessionWindow::sessionToString)
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
     .addTimestamps(Trade::getTime, 3000)
     .groupingKey(Trade::getTicker)
     .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
     .aggregate(counting(),
             (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
            mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
     .addTimestamps(pv -> pv.timestamp(), 100)
     .window(sliding(10, 1))
     .aggregate(counting())
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet
@Override
public void addToDag(Planner p) {
    if (wDef.kind() == SESSION) {
        addSessionWindow(p, wDef.downcast());
    } else if (aggrOp.combineFn() == null || getOptimization() == MEMORY) {
        addSlidingWindowSingleStage(p, wDef.downcast());
    } else {
        addSlidingWindowTwoStage(p, wDef.downcast());
    }
}
Code example source: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
            mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
     .addTimestamps(pv -> pv.timestamp(), 100)
     .window(sliding(10, 1))
     .groupingKey(pv -> pv.userId())
     .aggregate(toList())
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
/**
 * This code is the main point of the sample: use the source builder to
 * create an HTTP source connector, then create a Jet pipeline that
 * performs windowed aggregation over its data.
 */
private static Pipeline buildPipeline() {
    StreamSource<TimestampedItem<Long>> usedMemorySource = SourceBuilder
            .timestampedStream("used-memory", x -> new PollHttp())
            .fillBufferFn(PollHttp::fillBuffer)
            .destroyFn(PollHttp::close)
            .build();
    Pipeline p = Pipeline.create();
    p.drawFrom(usedMemorySource)
     .window(sliding(100, 20))
     .aggregate(linearTrend(TimestampedItem::timestamp, TimestampedItem::item))
     .map(tsItem -> entry(tsItem.timestamp(), tsItem.item()))
     .drainTo(Sinks.map(MAP_NAME));
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<PriceUpdateEvent, String, Tuple2<Integer, Long>>mapJournal(
            "prices",
            mapPutEvents(),
            e -> new PriceUpdateEvent(e.getKey(), e.getNewValue().f0(), e.getNewValue().f1()),
            START_FROM_CURRENT
     ))
     .addTimestamps(PriceUpdateEvent::timestamp, LAG_SECONDS * 1000)
     .setLocalParallelism(1)
     .groupingKey(PriceUpdateEvent::ticker)
     .window(WindowDefinition.sliding(WINDOW_SIZE_SECONDS * 1000, 1000))
     .aggregate(AggregateOperations.counting())
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroup() {
    Pipeline p = Pipeline.create();
    StreamStageWithKey<PageVisit, Integer> pageVisits = p
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
                    mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(pv -> pv.timestamp(), 100)
            .groupingKey(pv -> pv.userId());
    StreamStageWithKey<Payment, Integer> payments = p
            .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
                    mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(pm -> pm.timestamp(), 100)
            .groupingKey(pm -> pm.userId());
    StreamStageWithKey<AddToCart, Integer> addToCarts = p
            .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
                    mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(atc -> atc.timestamp(), 100)
            .groupingKey(atc -> atc.userId());
    StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
    StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
            windowStage.aggregate3(toList(), addToCarts, toList(), payments, toList());
    coGrouped.drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroupWithBuilder() {
    Pipeline p = Pipeline.create();
    StreamStageWithKey<PageVisit, Integer> pageVisits = p
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
                    mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(pv -> pv.timestamp(), 100)
            .groupingKey(pv -> pv.userId());
    StreamStageWithKey<AddToCart, Integer> addToCarts = p
            .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
                    mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(atc -> atc.timestamp(), 100)
            .groupingKey(atc -> atc.userId());
    StreamStageWithKey<Payment, Integer> payments = p
            .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
                    mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(pm -> pm.timestamp(), 100)
            .groupingKey(pm -> pm.userId());
    StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
    WindowGroupAggregateBuilder<Integer, List<PageVisit>> builder = windowStage.aggregateBuilder(toList());
    Tag<List<PageVisit>> pageVisitTag = builder.tag0();
    Tag<List<AddToCart>> addToCartTag = builder.add(addToCarts, toList());
    Tag<List<Payment>> paymentTag = builder.add(payments, toList());
    StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
            builder.build((winStart, winEnd, key, r) -> new TimestampedEntry<>(
                    winEnd, key, tuple3(r.get(pageVisitTag), r.get(addToCartTag), r.get(paymentTag))));
    coGrouped.drainTo(Sinks.logger());
    return p;
}
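The methods above only build Pipeline objects; they do not run them. As a rough sketch (not part of the original samples) of how such a pipeline is typically submitted to Jet, assuming the same Jet 0.7-era API, the runner class name below is made up for illustration:

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;

public class RunPipelineSketch {

    // Submits the given pipeline to an embedded Jet member and waits for it.
    // For the streaming examples above, join() blocks until the job is cancelled.
    static void run(Pipeline pipeline) {
        JetInstance jet = Jet.newJetInstance();   // start an embedded Jet member
        try {
            jet.newJob(pipeline).join();          // submit the job and wait
        } finally {
            Jet.shutdownAll();                    // stop all Jet instances in this JVM
        }
    }
}

A sample's main method would then simply call something like RunPipelineSketch.run(buildPipeline()).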