我是新的avro格式。我正在尝试使用stormjms-spout从jms队列收集avro消息,并使用hdfs-bolt将它们发送到hdfs。队列正在发送avro,但我无法使用hdfs螺栓以avro格式获取它们。如何正确地收集avro消息并将它们发送到下游而不在hdfs中出现编码错误。
46scxncf1#
现有的hdfs bolt不支持编写avro文件,我们需要通过以下更改来克服这一点。在这个示例代码中,我使用的是从我的喷口获取jms消息,并将这些jms字节消息转换为avro并将它们发送到hdfs。此代码可以作为修改abstracthdfsbolt中方法的示例。
public void execute(Tuple tuple) { try { long length = bytesMessage.getBodyLength(); byte[] bytes = new byte[(int)length]; /////////////////////////////////////// bytesMessage.readBytes(bytes); String replyMessage = new String(bytes, "UTF-8"); datumReader = new SpecificDatumReader<IndexedRecord>(schema); decoder = DecoderFactory.get().binaryDecoder(bytes, null); result = datumReader.read(null, decoder); synchronized (this.writeLock) { dataFileWriter.append(result); dataFileWriter.sync(); this.offset += bytes.length; if (this.syncPolicy.mark(tuple, this.offset)) { if (this.out instanceof HdfsDataOutputStream) { ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); } else { this.out.hsync(); this.out.flush(); } this.syncPolicy.reset(); } dataFileWriter.flush(); } if(this.rotationPolicy.mark(tuple, this.offset)){ rotateOutputFile(); // synchronized this.offset = 0; this.rotationPolicy.reset(); } } catch (IOException | JMSException e) { LOG.warn("write/sync failed.", e); this.collector.fail(tuple); } } @Override void closeOutputFile() throws IOException { this.out.close(); } @Override Path createOutputFile() throws IOException { Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); this.out = this.fs.create(path); dataFileWriter.create(schema, out); return path; } @Override void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException { // TODO Auto-generated method stub LOG.info("Preparing HDFS Bolt..."); try { schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc")); } catch (IOException e1) { e1.printStackTrace(); } this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); datumWriter = new SpecificDatumWriter<IndexedRecord>(schema); dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter); JMSAvroUtils JASV = new JMSAvroUtils(); }
1条答案
按热度按时间46scxncf1#
现有的hdfs bolt不支持编写avro文件,我们需要通过以下更改来克服这一点。在这个示例代码中,我使用的是从我的喷口获取jms消息,并将这些jms字节消息转换为avro并将它们发送到hdfs。
此代码可以作为修改abstracthdfsbolt中方法的示例。