java—如何为每个循环发出不同的元组并在单个字段中声明storm bolt方法?

v9tzhpje  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(283)

我正在从元组中获取所需的字符串。我的字符串附加了一个字符串,因此我将它们拆分并在for each循环中发出。如果我编写一个emit(如果我只发出“id”),bolt不会显示任何异常,但当我为每个添加一个来拆分其他字符串时,bolt会抛出异常,如下所示

java.lang.IllegalArgumentException: Tuple created with wrong number of fields. Expected 2 fields but got 1 fields at backtype.storm.tuple.TupleImpl.<init>(TupleImpl.java:58) at backtype.storm.daemon.executor$fn__5694$fn__5707$bolt_emit__5736.invoke(executor.clj:739) at backtype.storm.daemon.executor$fn__5694$fn$reify__5742.emit(executor.clj:763) at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203) at backtype.storm.task.OutputCollector.emit(OutputCollector.java:63) at backtype.storm.task.OutputCollector.emit(OutputCollector.java:101) at test.bolts.TInserts.execute(TInsert.java:264)

这是我的密码

public void execute(Tuple tuple) {
   try {
String screenname=tuple.getStringByField("s_name");
    String mentionname=tuple.getStringByField("n_name");
    String mentionid=tuple.getStringByField("n_id");

                 if(mentionid != null && !mentionid.isEmpty()){
                     for(String id:mentionid.split(",")){ 
                        id = id.trim();
                          this.collector.emit(new Values(id));
 this.collector.ack(tuple);
 }//for close
 }//if condtion for id
 if(mentionname != null && !mentionname.isEmpty()){
                     for(String mns:mentionname.split(",")){ 
                        mns = mns.trim();
                          this.collector.emit(new Values(mns));
this.collector.ack(tuple);
   }//for close
 }//if condtion for mentionname

  } //if close
    }
             catch (Exception e) {
           this.collector.reportError(e);
          this.collector.fail(tuple);
             }
       } 

@Override
   public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

//   outputFieldsDeclarer.declare(new Fields("id","mns"));//doesn't throw any   exception if i emit only id(1st for)                     
    outputFieldsDeclarer.declare(new Fields("id","mns"));//thows exception 
}
}

如何在单个emit方法中发出id和mns?我认为这不会引发任何错误或异常。有什么方法可以这样做吗?

this.collector.emit(new Values(id,mns));
owfi6suc

owfi6suc1#

如错误消息所示,输出元组应该包含两个字段,但是您将发出一个包含单个字段的元组。
storm需要一个具有两个输出字段的元组,因为您将输出模式声明为 new Fields("id","mms"))declareOutputFields(...) ;
你自己已经回答了你的问题。发出2字段元组的正确方法是

this.collector.emit(new Values(id,mns));

编辑:
您需要同时跨过两个标记化字段。。。像这样:

String mentionname = tuple.getStringByField("n_name");
String mentionid = tuple.getStringByField("n_id");

String[] idTokens = null;
if(mentionid != null) {
    idTokens = mentionid.split(",");
}
String[] nameTokens = null;
if(mentionname != null){
    nameTokens = mentionname.split(",");
}

for(int i = 0, j = 0; i < idTokens.length && j < nameTokens.lenght; ++i, ++j) {
    this.collector.emit(new Values(idTokens[i].trim(), nameTokens[j].trim()));
}
this.collector.ack(tuple);

相关问题