- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
本文整理了Java中com.hazelcast.jet.pipeline.WindowDefinition.sliding()
方法的一些代码示例,展示了WindowDefinition.sliding()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。WindowDefinition.sliding()
方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.WindowDefinition
类名称:WindowDefinition
方法名:sliding
[英]Returns a WindowKind#SLIDING window definition with the given parameters.
Find more information in the Hazelcast Jet Reference Manual, Sliding and Tumbling Window.
[中]返回带有给定参数的WindowKind#滑动窗口定义。
在Hazelcast Jet参考手册的滑动和翻滚窗口中找到更多信息。
代码示例来源:origin: hazelcast/hazelcast-jet-demos
.window(sliding(MINUTES.toMillis(120), MINUTES.toMillis(15)))
.aggregate(linearTrend(CarCount::getTime, CarCount::getCount))
.map((TimestampedEntry<String, Double> e) ->
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
.addTimestamps(Trade::getTime, 3000)
.groupingKey(Trade::getTicker)
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting(),
(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
.drainTo(Sinks.logger());
return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(pv -> pv.timestamp(), 100)
.window(sliding(10, 1))
.aggregate(counting())
.drainTo(Sinks.logger());
return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(pv -> pv.timestamp(), 100)
.window(sliding(10, 1))
.groupingKey(pv -> pv.userId())
.aggregate(toList())
.drainTo(Sinks.logger());
return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
* This code is the main point of the sample: use the source builder to
* create an HTTP source connector, then create a Jet pipeline that
* performs windowed aggregation over its data.
*/
private static Pipeline buildPipeline() {
StreamSource<TimestampedItem<Long>> usedMemorySource = SourceBuilder
.timestampedStream("used-memory", x -> new PollHttp())
.fillBufferFn(PollHttp::fillBuffer)
.destroyFn(PollHttp::close)
.build();
Pipeline p = Pipeline.create();
p.drawFrom(usedMemorySource)
.window(sliding(100, 20))
.aggregate(linearTrend(TimestampedItem::timestamp, TimestampedItem::item))
.map(tsItem -> entry(tsItem.timestamp(), tsItem.item()))
.drainTo(Sinks.map(MAP_NAME));
return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<PriceUpdateEvent, String, Tuple2<Integer, Long>>mapJournal(
"prices",
mapPutEvents(),
e -> new PriceUpdateEvent(e.getKey(), e.getNewValue().f0(), e.getNewValue().f1()),
START_FROM_CURRENT
))
.addTimestamps(PriceUpdateEvent::timestamp, LAG_SECONDS * 1000)
.setLocalParallelism(1)
.groupingKey(PriceUpdateEvent::ticker)
.window(WindowDefinition.sliding(WINDOW_SIZE_SECONDS * 1000, 1000))
.aggregate(AggregateOperations.counting())
.drainTo(Sinks.logger());
return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroup() {
Pipeline p = Pipeline.create();
StreamStageWithKey<PageVisit, Integer> pageVisits = p
.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(pv -> pv.timestamp(), 100)
.groupingKey(pv -> pv.userId());
StreamStageWithKey<Payment, Integer> payments = p
.drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(pm -> pm.timestamp(), 100)
.groupingKey(pm -> pm.userId());
StreamStageWithKey<AddToCart, Integer> addToCarts = p
.drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(atc -> atc.timestamp(), 100)
.groupingKey(atc -> atc.userId());
StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
windowStage.aggregate3(toList(), addToCarts, toList(), payments, toList());
coGrouped.drainTo(Sinks.logger());
return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroupWithBuilder() {
Pipeline p = Pipeline.create();
StreamStageWithKey<PageVisit, Integer> pageVisits = p
.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(pv -> pv.timestamp(), 100)
.groupingKey(pv -> pv.userId());
StreamStageWithKey<AddToCart, Integer> addToCarts = p
.drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(atc -> atc.timestamp(), 100)
.groupingKey(atc -> atc.userId());
StreamStageWithKey<Payment, Integer> payments = p
.drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
.addTimestamps(pm -> pm.timestamp(), 100)
.groupingKey(pm -> pm.userId());
StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
WindowGroupAggregateBuilder<Integer, List<PageVisit>> builder = windowStage.aggregateBuilder(toList());
Tag<List<PageVisit>> pageVisitTag = builder.tag0();
Tag<List<AddToCart>> addToCartTag = builder.add(addToCarts, toList());
Tag<List<Payment>> paymentTag = builder.add(payments, toList());
StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
builder.build((winStart, winEnd, key, r) -> new TimestampedEntry<>(
winEnd, key, tuple3(r.get(pageVisitTag), r.get(addToCartTag), r.get(paymentTag))));
coGrouped.drainTo(Sinks.logger());
return p;
}
我在 docs 中找不到这个或 javadocs : 我是否需要为每个线程创建一个客户端,或者是由以下人员创建的客户端: client = HazelcastClient.newHazelcastCl
据我所见,Hazelcast 的应用最常见于具有 50 多个节点的架构中。在 1 到 4 节点架构上使用 Hazelcast 是否有意义?如果是,我应该遵循的关于分区和 hazelcast 实例的最佳
在 hazelcast 文档中有一些对名为“default”的缓存的简短引用 - 例如,此处: http://docs.hazelcast.org/docs/3.6/manual/html-singl
我们正在针对我们的一个用例评估 Hazelcast,我对 Hazelcast 中的复制有疑问。 在 http://docs.hazelcast.org/docs/latest-development/
Hazelcast 是否会延迟创建主题。在下面的示例中,我们调用 getTopic('default')。如果默认主题不存在,Hazelcast 会自动创建它吗? 这是否意味着没有理由初始化宇宙中所有
我正在研究hazelcast用于以多播方式查找主节点的算法。首先我找到了查找主节点的函数:com.hazelcast.cluster.MulticastJoiner.findMasterWithMul
我正在查看 Hazelcast 的文档,我注意到驱逐政策的差异,并且我注意到其中一个我并不完全理解。 map_size_per_jvm: Max map size per JVM. partition
我已经编写了用于缓存的 hazelcast 缓存框架,并通过构造函数注入(inject)专门编写了一个用于缓存的 hazelcast 服务器。只是想知道 IMap 对象存储在哪里?它是在我的 haze
我们的服务器端解决方案利用 Hazelcast 提供的分布式数据结构来提供与居住在特定集群成员上的实体相关的可用状态。 当一个集群成员加入或离开集群时,我们需要让其他集群节点知道“发生了什么变化”:例
我的 Spring 应用程序由十几个微服务组成。每个微服务都提供不经常更改的数据。为了减少微服务之间的通信,我正在考虑开始使用 Hazelcast。 我的想法是每个微服务都会嵌入 Hazelcast。
我需要使用 csv/平面文件在 hazelcast 中加载 3 亿条记录(每条记录 60KB)。以最快的方式加载所有这些数据的最佳方法是什么?我可以逐条读取记录并在 hazelcast 中执行 map
我正在探索使用 Hazelcast(或任何其他缓存框架)在集群内宣传服务的概念。理想情况下,当集群成员离开时,其服务(或宣传它们的对象)应从缓存中删除。 这全部可能吗? 最佳答案 这是肯定有可能的。
我刚刚开始使用 hazelcast [3.3.1]。按照 hazelcast 应用程序和客户端教程,我创建了一个 hazelcast 应用程序实例和一个客户端(使用 eclipse IDE)。 从客户
出于测试目的,我想在单个节点上运行多个 Hazelcast 实例。 Hazelcast 假定它正在管理整个节点,因此它创建了足够多的线程来完全(实际上是过度)加载所有内核。在我的 8 核 Linux
我使用 IP 127.0.0.1 和端口 5701 启动 Hazelcast。为什么它尝试连接另外两个端口 5702 和 5703?它们的用途是什么,为什么无法连接? 以下是我以编程方式配置的方式:
我使用 启动了 hazelcast 服务器 java -jar hazelcast-3.10.1/lib/hazelcast-3.10.1.jar 服务器启动于 Members {size:1
最近阅读 JSR-107 和 JCache。 想知道 Hazelcast 或 Ehcache 是否遵循此 JSR? 最佳答案 JSR107 (JCache) 取得了良好的进展,我们已经通知规范委员会
背景 Hazelcast 集群的两个节点,每个节点都位于一个离散的子网上,因此多播不适合也不适合节点定位。 我想使用最简单的 XML 配置文件(例如 hazelcast.xml)来配置 Hazelca
我正在创建一个始终启动并运行的服务(ReST)。因此,在这个服务中,我从 spring 上下文中调用 init 方法,该方法会访问数据库并将所需的数据加载到 hazelcast 实例中。 现在我必须确
当我的java服务启动时,我从数据库中获取字符串列表,因此将它们缓存在hazelcast中,键是字符串列表,值(例如)100。可能有多个具有不同值的列表。这个想法是,下次当我的方法接收 String
我是一名优秀的程序员,十分优秀!