- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在从 spout 中获取数据。每个 bolt 都会将映射字段插入到我数据库中的不同表中。但是我的数据库表有约束。在我的测试表中,我有两个名为 user-details 和 my-details 的表,它们的约束允许首先填充用户表(首先应该插入),然后仅插入我的详细信息表。当我运行拓扑时,仅插入用户表,因为当 bolt 对数据库执行插入查询时,它允许只有 psqlbolt 首先插入(由于约束)和 psqlbolt1 抛出异常说找不到用户 id。所以当我这样做时,我在 psqlbolt1 中保持 (1000)sleep 两个 bolt 正在工作。但是当我对许多 bolt 应用相同时( 12) 等待时间增加, bolt 执行失败,表示超出 bolt 等待时间。我如何才能先执行用户字段,然后只有 psql1 应该开始插入。
我的拓扑类
public class Topology {
ConnectionProvider cp;
protected static final String JDBC_CONF = "jdbc.conf";
protected static final String TABLE_NAME = "users";
protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
" and user_department.user_id = ?";
public static void main(String[] args) throws Exception{
String argument = args[0];
JdbcMapper jdbcMapper;
TopologyBuilder builder = new TopologyBuilder();
Map map = Maps.newHashMap();
map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
map.put("dataSource.url","jdbc:postgresql://localhost:5432/twitter_analysis?user=postgres");
ConnectionProvider cp = new MyConnectionProvider(map);
jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
List<Column> schemaColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER), new Column ("user_name",Types.VARCHAR),new Column("create_date", Types.TIMESTAMP));
JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
.withInsertQuery("insert into user_details (id, user_name, created_timestamp) values (?,?,?)");
builder.setSpout("myspout", new UserSpout(), 1);
builder.setBolt("Psql_Bolt", userPersistanceBolt,1).shuffleGrouping("myspout");
jdbcMapper = new SimpleJdbcMapper("My_details", cp);
List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id", Types.INTEGER), new Column ("my_name",Types.VARCHAR));
JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
.withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");
//builder.setSpout("myspout", new UserSpout(), 1);
builder.setBolt("Psql_Bolt1", userPersistanceBolt1,1).shuffleGrouping("myspout");
Config conf = new Config();
conf.put(JDBC_CONF, map);
conf.setDebug(true);
conf.setNumWorkers(3);
if (argument.equalsIgnoreCase("runLocally")){
System.out.println("Running topology locally...");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Twitter Test Storm-postgresql", conf, builder.createTopology());
}
else {
System.out.println("Running topology on cluster...");
StormSubmitter.submitTopology("Topology_psql", conf, builder.createTopology());
}
}}
我的 bolt :psql1
public class PsqlBolt1 extends AbstractJdbcBolt {
private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);
private String tableName;
private String insertQuery;
private JdbcMapper jdbcMapper;
public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
super(connectionProvider);
this.jdbcMapper = jdbcMapper;
}
public PsqlBolt1 withInsertQuery(String insertQuery) {
this.insertQuery = insertQuery;
System.out.println("query passsed.....");
return this;
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
super.prepare(map, topologyContext, collector);
if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
throw new IllegalArgumentException("You must supply either a tableName or an insert Query.");
}
}
@Override
public void execute(Tuple tuple) {
try {
Thread.sleep(1000);
List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
if(!StringUtils.isBlank(tableName)) {
this.jdbcClient.insert(this.tableName, columnLists);
} else {
this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
}
this.collector.ack(tuple);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}}
psql bolt :
public class PsqlBolt extends AbstractJdbcBolt {
private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
private String tableName;
private String insertQuery;
private JdbcMapper jdbcMapper;
public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
super(connectionProvider);
this.jdbcMapper = jdbcMapper;
}
public PsqlBolt withTableName(String tableName) {
this.tableName = tableName;
return this;
}
public PsqlBolt withInsertQuery(String insertQuery) {
this.insertQuery = insertQuery;
System.out.println("query passsed.....");
return this;
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
super.prepare(map, topologyContext, collector);
if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
throw new IllegalArgumentException("You must supply either a tableName or an insert Query.");
}
}
@Override
public void execute(Tuple tuple) {
try {
List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
if(!StringUtils.isBlank(tableName)) {
this.jdbcClient.insert(this.tableName, columnLists);
} else {
this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
}
this.collector.ack(tuple);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}}
当我对许多 bolt 应用相同的方法时,我的拓扑颜色变为红色(等待状态)。
这是 bolt 等待时间。第一个 bolt 没有任何 sleep 。我在第二个 bolt 中保持 1 秒 sleep ,其余所有 bolt 都有 2 秒 sleep 。
如何替换那个 sleep 来执行我的工作,或者如果我增加 supervisor 的数量,问题会得到解决吗?
最佳答案
您可以重组您的拓扑结构,以便 spout 将消息 M 发送到 bolt 1。 bolt 1 可以对此消息执行一些操作,并且仅当操作成功时才将相同的消息转发到 bolt 2。这样, Action 之间就有了严格的顺序。
关于database - 当每个 bolt 从同一点获取数据时,如何一个接一个地执行 bolt ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35079414/
平台:Mac OSX Maverics Web 服务器:内置 apache,支持 mod-rewrite,启用 php5_module 重现步骤: cd/Users/用户名/站点/ mkdir bol
如何在 Bolt 中列出分类法中的所有术语?不是应用于记录的术语而是所有现有术语(如标签云或类别列表侧边菜单)? 最佳答案 直接在模板中,可以这样做: {% for category in app.c
我有两个可序列化的类A和B。并且有两种spout A_spout和B_spout。每个spout向bolt C发出并行类。但是方法execute中的元组没有区别,所以我如何区分它们? 最佳答案 每个输
我有一个拓扑,例如由 1 个喷嘴和 4 个 bolt 组成 spout 1 -> bolt A -> bolt B -> bolt C -> bolt D 如果 bolt A 中的某些条件不满足,我们
我正在从 spout 中获取数据。每个 bolt 都会将映射字段插入到我数据库中的不同表中。但是我的数据库表有约束。在我的测试表中,我有两个名为 user-details 和 my-details 的
在我的 Storm 拓扑中,我将一个大的程序逻辑保存在单个 Bolt 中。现在我把大的程序逻辑分成线性排列的小 bolt 。它的性能有什么不同吗? 最佳答案 根据您评论中的描述: In my topo
我正在使用 Bolt.cm 并且在编辑页面或条目时,右侧有一个称为“堆栈”的部分。在 Bolt 网站上,它说 Our Stack functionality contains your latest
我正在使用 Apache Storm,我想知道是否可以像这样用另一个 bolt 组合一个 bolt : public class MyNewBolt extends BaseRichBolt {
已关闭。此问题旨在寻求有关书籍、工具、软件库等的建议。不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以
将 BoltA 和 BoltB 的输出发送到 BoltC 的最简单方法是什么。我必须使用 Joins 还是有任何更简单的解决方案。 A 和 B 具有相同的字段(ts、metric_name、metri
注意: Bolt1 包含前三个质数(2,3,5)的列表。 Bolt2 包含后三组质数(7,11,13)的列表。 在 Bolt3 中,它只是检查数字是否为素数。 从第一个 bolt 开始,我可以从 sp
bolt 是否可以从不同的 spout/bolt 接收多个输入元组?例如,Bolt C 接收来自 Spout A 的输入元组和来自 Bolt B 的输入元组以进行处理。我应该如何实现它?我的意思是为
得到这个错误 framework not found Bolts for architecture x86_64 逐字逐句地遵循 Facebook 的指南,但我唯一能想到的链接错误发生在框架搜索路径的
我正在尝试整合 Amazon Web SDK适用于 iOS。我手动安装了它(不使用 CocoPods)并且在文档中它说如果你安装了 facebook SDK 不包括 Bolts.framework 因
我正在尝试使用 Facebook/Parse Bolts 框架在 Android 上并行运行多个任务。 documentation for running tasks in parallel似乎表明
Error:(39, 13) Failed to resolve: com.parse.bolts:bolts-android:1.+ 这是我的build.gradle dependencies {
我想创建一个内容类型,其中包含诸如“年月”和“产品类型”之类的选择字段,并根据这两个字段的值自动生成标题字段。 这是因为两个选择字段值的描述性足够,我想减少 CMS 的最终用户为使标题显示在管理界面中
操作系统:Windows 10 专业版 Node :6.1.0 NPM:3.8.6 Gulp:CLI 版本 3.9.1 因此,firebase-bolt 已使用 npm install -g fire
我是 phone gap 的新手。在我的项目中集成了 facebook 插件。运行后显示错误 `Error:A problem occurred configuring root project 'a
我正在尝试构建我的项目。由于此错误,构建失败。 Users/company/Desktop/app/DemoApp/Pods/Bolts/Bolts/iOS/BFAppLink.m:11:9:找不到“
我是一名优秀的程序员,十分优秀!