gpt4 book ai didi

java - 我的 KafkaSpout 不使用 HDP 中来自 Kafka Brokers 的消息

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:53:57 24 4
gpt4 key购买 nike

我开发了 storm 拓扑来从 hortonworks 上的 kafka 代理接收 JSONArray 数据,

我不知道为什么我的 kafkaSpout 不使用 HDP 中 Kafka Brokers 的消息,但是 Storm 拓扑已成功提交,但是当我可视化拓扑时:0% 数据已被消耗!!

topology visualisation

这是我的 Scheme 类:

public class ClientInfosSheme implements Scheme{
private static final long serialVersionUID = -2990121166902741545L;
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class);
public String codeBanque;
public String codeAgence;
public String codeGuichet;
public String devise;
public String numCompte;
public String codeClient;
public String codeOperation;
public String sensOperation;
public String montantOperation;
public String dateValeur;
public String dateComptable;
public String utilisateur;

public static final String CODEBANQUE="codeBanque";
public static final String CODEAGENCE="codeAgence";
public static final String CODEGUICHET="codeGuichet";
public static final String DEVISE="devise";
public static final String NUMCOMPTE="numCompte";
public static final String CODECLIENT="codeClient";
public static final String CODEOPERATION="codeOperation";
public static final String SENSOPERATION="sensOperation";
public static final String MONTANTOPERATION="montantOperation";
public static final String DATEVALEUR="dateValeur";
public static final String DATECOMPTABLE="dateComptable";
public static final String UTILISATEUR="utilisateur";

public List<Object> deserialize(byte[] bytes) {

try{
String clientInfos = new String(bytes, "UTF-8");
JSONArray JSON = new JSONArray(clientInfos);
for(int i=0;i<JSON.length();i++) {
JSONObject object_clientInfos=JSON.getJSONObject(i);
try{

//Récupérations des données

this.codeBanque=object_clientInfos.getString("codeBanque");
this.codeAgence=object_clientInfos.getString("codeAgence");
this.codeGuichet=object_clientInfos.getString("codeGuichet");
this.devise=object_clientInfos.getString("devise");
this.numCompte=object_clientInfos.getString("numCompte");
this.codeClient=object_clientInfos.getString("codeClient");
this.codeOperation=object_clientInfos.getString("codeOperation");
this.sensOperation=object_clientInfos.getString("sensOperation");
this.montantOperation=object_clientInfos.getString("montantOperation");
this.dateValeur=object_clientInfos.getString("dateValeur");
this.dateComptable=object_clientInfos.getString("dateComptable");
this.utilisateur=object_clientInfos.getString("utilisateur");

}
catch(Exception e)
{
e.printStackTrace();
}


}// End For Loop



} catch (JSONException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation,
montantOperation,dateValeur, dateComptable,utilisateur);

}// End Function deserialize

public Fields getOutputFields() {
return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE,
CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR);
}


}

和属性文件:

#Broker host
kafka.zookeeper.host.port=sandbox.hortonworks.com

#Kafka topic to consume.
kafka.topic=INFOCLIENT

#Location in ZK for the Kafka spout to store state.
kafka.zkRoot=/client_infos_sprout

#Kafka Spout Executors.
spout.thread.count=1

当我使用另一个消费者时,存储在 Kafka Brokers 中的数据如下:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..},
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..},
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}]

所以我的问题是为什么它不使用来自 Kafka Brokers 的消息?

我需要帮助

最佳答案

正如您在日志中发现的那样,您的 Spout 不会“消费”消息,因为拓扑有错误并且不会确认元组 - 因此 Spout 将重播它们。这是按设计工作的。

一旦您的拓扑结构稳定,您将观察到偏移量正在增加。在此之前,Spout 会将消息发送到拓扑中,但您将无法观察到结果。

没有看到 calculCleRib 方法,也没有看到它是如何集成到您的拓扑中的,我们无法帮助您调试这方面的问题。

关于java - 我的 KafkaSpout 不使用 HDP 中来自 Kafka Brokers 的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37012056/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com