stormjms收集avro消息并发送到下游?

v440hwme  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(306)

我是新的avro格式。我正在尝试使用stormjms-spout从jms队列收集avro消息,并使用hdfs-bolt将它们发送到hdfs。
队列正在发送avro,但我无法使用hdfs螺栓以avro格式获取它们。
如何正确地收集avro消息并将它们发送到下游而不在hdfs中出现编码错误。

46scxncf

46scxncf1#

现有的hdfs bolt不支持编写avro文件,我们需要通过以下更改来克服这一点。在这个示例代码中,我使用的是从我的喷口获取jms消息,并将这些jms字节消息转换为avro并将它们发送到hdfs。
此代码可以作为修改abstracthdfsbolt中方法的示例。

public void execute(Tuple tuple) {          
        try {               
            long length = bytesMessage.getBodyLength();
            byte[] bytes = new byte[(int)length];
            ///////////////////////////////////////
            bytesMessage.readBytes(bytes);
            String replyMessage = new String(bytes, "UTF-8");

            datumReader = new SpecificDatumReader<IndexedRecord>(schema);
            decoder = DecoderFactory.get().binaryDecoder(bytes, null);

            result = datumReader.read(null, decoder);                               
            synchronized (this.writeLock) {                 
                dataFileWriter.append(result);                                      
                dataFileWriter.sync();
                this.offset += bytes.length;                    
               if (this.syncPolicy.mark(tuple, this.offset)) {
                   if (this.out instanceof HdfsDataOutputStream) {
                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
                    } else {
                        this.out.hsync();
                        this.out.flush();
                    }
                    this.syncPolicy.reset();
                }
               dataFileWriter.flush();
            }

            if(this.rotationPolicy.mark(tuple, this.offset)){
                rotateOutputFile(); // synchronized
                this.offset = 0;
                this.rotationPolicy.reset();
            }
        } catch (IOException | JMSException e) {
            LOG.warn("write/sync failed.", e);
            this.collector.fail(tuple);
        } 
    }

@Override
void closeOutputFile() throws IOException {
    this.out.close();
}

@Override
Path createOutputFile() throws IOException {
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    this.out = this.fs.create(path);
    dataFileWriter.create(schema, out);
    return path;
}

@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
    // TODO Auto-generated method stub
     LOG.info("Preparing HDFS Bolt...");
     try {

            schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
        } catch (IOException e1) {              
            e1.printStackTrace();
        }
     this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
     dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
     JMSAvroUtils JASV = new JMSAvroUtils();         
}

相关问题