我的kafkaspout不会在hdp中使用来自kafka代理的消息

6vl6ewon  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(287)

我开发了storm拓扑来接收kafka经纪人在hortonworks上提供的jsonarray数据,
我不知道为什么我的kafkaspout不使用来自hdp中kafka代理的消息,但是成功提交了storm拓扑,但是当我可视化拓扑时:0%的数据已被使用!!
拓扑可视化
这是我的计划课:

public class ClientInfosSheme implements Scheme{
private static final long serialVersionUID = -2990121166902741545L;
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class);
public String codeBanque;
public String codeAgence;
public String codeGuichet;
public String devise;
public String numCompte;
public String codeClient;
public String codeOperation;
public String sensOperation;
public String montantOperation;
public String dateValeur;
public String dateComptable;
public String utilisateur;

public static final String CODEBANQUE="codeBanque";
public static final String CODEAGENCE="codeAgence";
public static final String CODEGUICHET="codeGuichet";
public static final String DEVISE="devise";
public static final String NUMCOMPTE="numCompte";
public static final String CODECLIENT="codeClient";
public static final String CODEOPERATION="codeOperation";
public static final String SENSOPERATION="sensOperation";
public static final String MONTANTOPERATION="montantOperation";
public static final String DATEVALEUR="dateValeur";
public static final String DATECOMPTABLE="dateComptable";
public static final String UTILISATEUR="utilisateur";

public List<Object> deserialize(byte[] bytes) {

        try{
            String clientInfos = new String(bytes, "UTF-8");
               JSONArray JSON = new JSONArray(clientInfos);
                for(int i=0;i<JSON.length();i++) {
                    JSONObject object_clientInfos=JSON.getJSONObject(i);   
                try{     

                    //Récupérations des données

                        this.codeBanque=object_clientInfos.getString("codeBanque");
                        this.codeAgence=object_clientInfos.getString("codeAgence");
                        this.codeGuichet=object_clientInfos.getString("codeGuichet");
                        this.devise=object_clientInfos.getString("devise");
                        this.numCompte=object_clientInfos.getString("numCompte");
                        this.codeClient=object_clientInfos.getString("codeClient");
                        this.codeOperation=object_clientInfos.getString("codeOperation");
                        this.sensOperation=object_clientInfos.getString("sensOperation");
                        this.montantOperation=object_clientInfos.getString("montantOperation");
                        this.dateValeur=object_clientInfos.getString("dateValeur");
                        this.dateComptable=object_clientInfos.getString("dateComptable");
                        this.utilisateur=object_clientInfos.getString("utilisateur");

                    }
                    catch(Exception e) 
                              {
                                  e.printStackTrace(); 
                              }

    }// End For Loop

      } catch (JSONException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (UnsupportedEncodingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
         return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation,
                 montantOperation,dateValeur, dateComptable,utilisateur); 

}// End Function deserialize

public Fields getOutputFields() {
        return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE,
                CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR);
    }

}

以及属性文件:


# Broker host

kafka.zookeeper.host.port=sandbox.hortonworks.com

# Kafka topic to consume.

kafka.topic=INFOCLIENT

# Location in ZK for the Kafka spout to store state.

kafka.zkRoot=/client_infos_sprout

# Kafka Spout Executors.

spout.thread.count=1

当我使用另一个消费者时,存储在kafka代理中的数据如下:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..},
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..},
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}]

所以我的问题是为什么它不消费Kafka经纪人的信息?
求你了我需要帮助

vdzxcuhz

vdzxcuhz1#

正如您在日志中发现的那样,您的喷口不会“消费”消息,因为拓扑有一个错误并且没有确认元组-因此喷口将重放它们。这是工作的设计。
一旦拓扑结构稳定,您将观察到偏移量正在增加。在此之前,喷口将向拓扑发送消息,但您将无法观察结果。
如果没有看到calculclerib方法,以及它是如何集成到您的拓扑中的,我们将无法帮助您调试该方面。

相关问题