- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试运行连续异步查询。在我的 Windows 盒子上,我通过双击 ignite.bat 文件启动了 Apache Ignite,并尝试运行以下代码 -
Data Publisher 客户端包点燃;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgniteStreamPublisher {
public static void main(String[] args) throws Exception {
System.out.println("Run Spring example!!");
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<Integer, Person>("myStreamCache");
cacheConfiguration.setIndexedTypes(Integer.class, Person.class);
cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheConfiguration);
IgniteDataStreamer<Integer, Person> stmr = ignite.dataStreamer("myStreamCache");
stmr.allowOverwrite(true);
try {
for (int i = 0; i < 100; i++) {
Person person = new Person(i, i, "name_" + i, (i * 100) % 3000);
System.out.println("putting--" + person);
stmr.addData(i, person);
Thread.sleep(1*1000);
stmr.flush();
}
}finally{
stmr.close();
}
}
}
数据接收器客户端
package ignite;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.lang.IgniteBiPredicate;
public class IgniteAsyncStreamReceiver {
public static void main(String[] args) throws Exception {
System.out.println("Run Spring example!!");
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<Integer, Person>("myStreamCache");
cacheConfiguration.setIndexedTypes(Integer.class, Person.class);
cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(cacheConfiguration);
System.out.println();
System.out.println(">>> Cache continuous query example started.");
// Create new continuous query.
ContinuousQuery<Integer, Person> qry = new ContinuousQuery<>();
IgniteBiPredicate<Integer, Person> filter = new MyIgniteBiPredicate();
Query<Cache.Entry<Integer, Person>> scanQuery = new ScanQuery<>(filter);
qry.setInitialQuery(scanQuery);
// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Person>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Person>> evts) {
for (CacheEntryEvent<? extends Integer, ? extends Person> e : evts)
System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
}
});
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
Factory<CacheEntryEventFilter<Integer, Person>> rmtFilterFactory = new MyRemoteFilterFactory();
qry.setRemoteFilterFactory(rmtFilterFactory);
// Execute query.
try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) {
// Iterate through existing data.
for (Cache.Entry<Integer, Person> e : cur)
System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
Thread.sleep(2000);
}
}
}
*RemoteFileFilterFactory 实现 *
package ignite;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import org.apache.ignite.Ignite;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.resources.IgniteInstanceResource;
public class MyRemoteFilterFactory implements Factory<CacheEntryEventFilter<Integer, Person>> {
private static final long serialVersionUID = 1L;
@Override
public CacheEntryEventFilter<Integer, Person> create() {
return new CacheEntryFilter();
}
@IgniteAsyncCallback
private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, Person> {
/** Ignite instance. */
@IgniteInstanceResource
private Ignite ignite;
/** {@inheritDoc} */
@Override
public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Person> cache) throws CacheEntryListenerException {
System.out.println("Event : "+ (cache.getValue()));
return true;
}
}
}
IgniteBiPredicate 实现
package ignite;
import org.apache.ignite.lang.IgniteBiPredicate;
public class MyIgniteBiPredicate implements IgniteBiPredicate<Integer, Person> {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public boolean apply(Integer key, Person person) {
return person.getSal() < 1000;
}
}
人是 POJO -
package ignite;
public class Person {
int id;
int age;
String name;
int sal;
public Person(int id, int age, String name, int sal) {
super();
this.id = id;
this.age = age;
this.name = name;
this.sal = sal;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("Person [id=");
builder.append(id);
builder.append(", age=");
builder.append(age);
builder.append(", name=");
builder.append(name);
builder.append(", sal=");
builder.append(sal);
builder.append("]");
return builder.toString();
}
}
我在 Ignite 服务器控制台或发布者/接收者客户端上没有收到任何错误。但我的接收器在初始缓存快照后仅收到 1 或 2 条记录。我指的是https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java -
Run Spring example!!
[16:45:24] (wrn) Default Spring XML file not found (is IGNITE_HOME set?): config/default-config.xml
Mar 07, 2017 4:45:24 PM java.util.logging.LogManager$RootLogger log
SEVERE: Failed to resolve default logging config file: config/java.util.logging.properties
[16:45:24] __________ ________________
[16:45:24] / _/ ___/ |/ / _/_ __/ __/
[16:45:24] _/ // (7 7 // / / / / _/
[16:45:24] /___/\___/_/|_/___/ /_/ /___/
[16:45:24]
[16:45:24] ver. 1.9.0#20170302-sha1:a8169d0a
[16:45:24] 2017 Copyright(C) Apache Software Foundation
[16:45:24]
[16:45:24] Ignite documentation: http://ignite.apache.org
[16:45:24]
[16:45:24] Quiet mode.
[16:45:24] ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat}
[16:45:24]
[16:45:24] OS: Windows 7 6.1 x86
[16:45:24] VM information: Java(TM) SE Runtime Environment 1.8.0_60-b27 Oracle Corporation Java HotSpot(TM) Client VM 25.60-b23
[16:45:24] Initial heap size is 16MB (should be no less than 512MB, use -Xms512m -Xmx512m).
[16:45:24] Configured plugins:
[16:45:24] ^-- None
[16:45:24]
[16:45:25] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
[16:45:25] Security status [authentication=off, tls/ssl=off]
[16:45:26] REST protocols do not start on client node. To start the protocols on client node set '-DIGNITE_REST_START_ON_CLIENT=true' system property.
[16:45:27] Performance suggestions for grid (fix if possible)
[16:45:27] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true
[16:45:27] ^-- Enable server mode for JVM (add '-server' to JVM options)
[16:45:27] ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options)
[16:45:27] ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
[16:45:27] ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
[16:45:27] ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options)
[16:45:27] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning
[16:45:27]
[16:45:27] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}
[16:45:27]
[16:45:27] Ignite node started OK (id=98218fc2)
[16:45:27] Topology snapshot [ver=9, servers=1, clients=2, CPUs=4, heap=1.5GB]
>>> Cache continuous query example started.
Queried existing entry [key=0, val=Person [id=0, age=0, name=name_0, sal=0]]
Queried existing entry [key=1, val=Person [id=1, age=1, name=name_1, sal=100]]
Queried existing entry [key=2, val=Person [id=2, age=2, name=name_2, sal=200]]
Queried existing entry [key=3, val=Person [id=3, age=3, name=name_3, sal=300]]
Queried existing entry [key=4, val=Person [id=4, age=4, name=name_4, sal=400]]
Queried existing entry [key=5, val=Person [id=5, age=5, name=name_5, sal=500]]
Queried existing entry [key=6, val=Person [id=6, age=6, name=name_6, sal=600]]
Queried existing entry [key=7, val=Person [id=7, age=7, name=name_7, sal=700]]
Queried existing entry [key=8, val=Person [id=8, age=8, name=name_8, sal=800]]
Queried existing entry [key=9, val=Person [id=9, age=9, name=name_9, sal=900]]
Queried existing entry [key=30, val=Person [id=30, age=30, name=name_30, sal=0]]
Queried existing entry [key=31, val=Person [id=31, age=31, name=name_31, sal=100]]
Queried existing entry [key=32, val=Person [id=32, age=32, name=name_32, sal=200]]
Queried existing entry [key=33, val=Person [id=33, age=33, name=name_33, sal=300]]
Queried existing entry [key=34, val=Person [id=34, age=34, name=name_34, sal=400]]
Queried existing entry [key=35, val=Person [id=35, age=35, name=name_35, sal=500]]
Queried existing entry [key=36, val=Person [id=36, age=36, name=name_36, sal=600]]
Queried existing entry [key=37, val=Person [id=37, age=37, name=name_37, sal=700]]
Updated entry [key=10, val=Person [id=10, age=10, name=name_10, sal=1000]]
Updated entry [key=11, val=Person [id=11, age=11, name=name_11, sal=1100]]
<<NO RECORD after key 11. SOme time it publish 3-4 *Updated Entry* and some time only 1-2>>
最佳答案
当QueryCursor
关闭时,连续查询被取消。如果你摆脱了 try-with-resources block ,它将按照你的预期工作。 IE。这段代码
try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) {
// Iterate through existing data.
for (Cache.Entry<Integer, Person> e : cur)
System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
Thread.sleep(2000);
}
应该替换为:
QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry);
for (Cache.Entry<Integer, Person> e : cur)
System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
关于java - Apache 点燃: Continuous Async Query is not working continuous,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42659281/
我想避免创建 std::thread 的开销,因此我要实现一个线程池。我正在为一个设计决策而苦苦挣扎: 工作队列中的工作是否应该能够将工作添加到工作队列中?如果是,如何? 问题出现了,因为我想让我添加
color 属性正常工作,但其他两个属性(font-size 和 text-shadow)不起作用。当链接被访问时,它的字体大小应该减小到 20 px 并且应用 text-shadow 属性,但它没有
我已经安装并配置了 supervisor。 ps -ax 显示 10 个进程,例如:php/home/vagrant/Sites/mysite/artisan queue:work --tries=1
我对 php artisan queue::work 命令感到不安。 我的命令不起作用,但我的作业已插入作业表但从未执行。 我正在为队列使用 mongodb 驱动程序。 我做错了什么,请给我建议。 最
为什么我可以找到很多关于“工作窃取”的信息而没有关于“工作耸肩”作为动态负载平衡策略的信息? 通过“工作耸肩”,我的意思是将多余的工作从繁忙的处理器转移到负载较低的邻居上,而不是让空闲的处理器从忙碌的
首先,我正在为 MySQL 使用 DATE_ADD 函数。当试图在 php 中使用 $sqlA 时,由于某种原因它说语法错误(主要是 WHERE 之后的区域)。为什么? $sqlA = "SELECT
a:hover { color: #237ca8 !important; font-weight: bold; } a:active { color: #cccccc !imp
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 7 年前。 Improve this q
我试图让只能使用 Tab 键的用户可以访问我的网站。我遇到的问题是,当我尝试使用 tab 键选择 float 的 div 时,不会触发 :focus in css;我不知道为什么它没有被触发。鼠标悬停
我在尝试将 2 个 div 并排放置时遇到了问题。 display: inline 它会删除我的边框并且不会将两个 div 放在同一行上。 请指教: .gig { outline: 1px s
这是 fiddle :http://jsfiddle.net/j9Gmx/ 我怎样才能得到最小高度:100%;上类? 最佳答案 它正在 工作,但由于 div 的父级(正文)没有高度,100% 基本上是
我正在使用 Flutter WebRTC 来创建 P2P 视频通话。 我遇到了一个与网络相关的问题:我已经完成了应用程序,但它只适用于移动数据。 将网络更改为WiFi时,它不起作用并且连接状态挂起Ch
我是 JavaScript 和 jQuery 的初学者。我的 css 和 JavaScript 代码位于 html 文件外部。这个问题已经有了答案,我尝试了所有代码,但滚动不起作用。我不知道我错过了什
我正在使用 Sprin AMQP 的rabbittemplate 通过 RabbitMQ 发送和接收消息。我能够发送和接收消息,但是,我想优先处理消息。 例如,如果我推送 1000 条消息,假设奇数消
我已经在 WorkManager 中加入了一个PeriodicWork,并希望每次完成时都获取它的 Worker 的输出数据,但以下代码似乎不起作用,因为 Log 消息没有出现在 Logcat 中:
我有一个名为 areaOne 的 AngularJS 指令。当我使用 template 时,会显示模板,但当我在 area1.js 中使用 templateUrl 时,不会呈现模板 HTML。 我在这
“:after”选择器在应用于带有 FF 和 IE 的输入时不起作用 input:after { content: "title"; } 而它正在处理 p、a 等。 这是一个错
下面是适用于 oracle 但不适用于 PostgreSQL 的 Sql 查询。 select count(*) from users where id>1 order by username; 我知
position?:fixed 在 chrome 浏览器上不工作,但在 firefox 中工作正常。 我有一个侧边栏可以停止滚动并固定在顶部。它在 firefox 中运行完美,但在 chrome 中,
我有一段代码无法在 Firefox 中运行。当按钮悬停时,.icon 图像不会改变。它在 Chrome 中完美运行。 button.add-to-cart-button .button-left .i
我是一名优秀的程序员,十分优秀!