gpt4 book ai didi

java - 在 trident 中实现事务拓扑的问题

转载 作者:行者123 更新时间:2023-11-30 11:24:24 28 4
gpt4 key购买 nike

我的用例是调用查询以使用不同的输入参数从数据库中获取记录。取出记录后,做一些处理,最后写入文件。我的输入参数值取决于上一个查询的完整处理。我的问题是,我如何在 spout 中知道先前查询的处理已完成,即记录已成功写入文件。

我尝试实现 ITridentSpout 但仍然没有得到任何解决方案。下面是我的 ITridentSpout 代码:

三叉戟协调器.java

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{

ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
boolean result=true;

@Override
public void success(long txid) {
System.out.println("inside success mehod with txid as "+txid);
if(prevMetadata.containsKey(txid)){
prevMetadata.replace(txid, "SUCCESS");
}
}

@Override
public boolean isReady(long txid) {
if(!prevMetadata.isEmpty()){
result=true;
for(Long txId:prevMetadata.keySet()){
System.out.println("txId:---- "+txId +" value"+prevMetadata.get(txId) );
if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
prevMetadata.put(txid, "STARTED");
result= true;
}
}
}
else{
prevMetadata.put(txid, "STARTED");
result= true;
}

System.out.println("inside isReady function with txid as:---- "+txid+"result value:-- "+result);

return result;
}

@Override
public void close() {
// TODO Auto-generated method stub

}

@Override
public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
System.out.println("inside initialize transaction method with values as:----- "+txid+" "+prevMetadata+" "+currMetadata);

return prevMetadata;
}
}

TridentEmitterImpl.java

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {

@Override
public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
System.out.println("tx.getAttemptId() "+tx.getAttemptId()+"tx.getTransactionId() "+tx.getTransactionId()+"tx.getId() "+tx.getId().toString());
collector.emit(new Values("preeti"));
}

@Override
public void success(TransactionAttempt tx) {
System.out.println("inside success of emitter with tx id as "+tx.getTransactionId());

}

@Override
public void close() {
// TODO Auto-generated method stub

}
}

三叉戟SpoutImpl.java

package com.TransactionlTopology;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {

@Override
public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {

return new TridentCoordinator();
}

@Override
public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {

return new TridentEmitterImpl();
}

@Override
public Map getComponentConfiguration() {

Map<String,String> newMap=new HashMap<String, String>();
newMap.put("words","preeti");
return newMap;
}

@Override
public Fields getOutputFields() {

return new Fields("word");
}

}

也无法理解 initializeTransaction 中将出现哪些值作为 prevMetaDatacurMetada。请提供一些解决方案

最佳答案

您有多种选择。不过,也许最简单的方法是在您的拓扑中添加最后一个 bolt ,在写入文件后,通知 spout 可以通过您的 spout 可以监视的消息队列开始下一个查询。当 spout 接收到这个通知时,它就可以处理下一个查询。

但是,更一般地说,这似乎是 Storm 的一个有问题的用例。您的拓扑结构的很多资源可能会在很多时候处于空闲状态,因为您一次只有一个事务通过它运行。显然我不知道你问题的所有细节,但是事务之间的这种依赖性限制了使用 Storm 所增加的复杂性的值(value)。

关于java - 在 trident 中实现事务拓扑的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20681276/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com