import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import com.quotes.Quotes;
/**
 * Command-line utility that decodes {@code Quotes} records from a local data file
 * and prints each one to standard output.
 */
public class BinaryDecoderApp {

    /**
     * Parses the schema at {@code src/main/resources/lpquotes.avsc}, then reads
     * {@code Quotes} records from {@code src/main/resources/FlumeData.1619451750874}
     * with a binary decoder until the decoder reports the end of input, printing
     * every record.
     *
     * @param args command-line arguments; not used
     * @throws IOException if the schema or the data file cannot be read, or if a
     *     record cannot be decoded from the remaining bytes
     */
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("src/main/resources/lpquotes.avsc"));
        File avroFile = new File("src/main/resources/FlumeData.1619451750874");
        SpecificDatumReader<Quotes> datumReader = new SpecificDatumReader<>(schema);

        // try-with-resources guarantees the file handle is released even when
        // decoding fails part-way through the file.
        try (FileInputStream input = new FileInputStream(avroFile)) {
            // NOTE(review): this treats the file as back-to-back raw binary records.
            // If the file was produced by Flume's avro_event serializer it is presumably
            // an Avro container file of Flume events instead, which would need a
            // container-file reader rather than a bare BinaryDecoder; confirm.
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(input, null);
            while (!decoder.isEnd()) {
                Quotes record = datumReader.read(null, decoder);
                System.out.println(record);
            }
        }
    }
}
上面这段简单的代码用于解码由 Flume 接收并写入 HDFS 的 Avro 事件。我遇到的问题是:文件中的第一个事件可以被正确读取并打印,但随后就抛出了如下异常:
Exception in thread "main" java.io.EOFException
at org.apache.avro.io.BinaryDecoder.readDouble(BinaryDecoder.java:272)
at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:197)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:201)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at com.quotes.reader.BinaryDecoderApp.main(BinaryDecoderApp.java:27)
看起来 Flume 可能在每个 Avro 事件的末尾追加了额外的字符,从而导致了这个异常。
但是我找不到正确读取该文件的方法。
Flume配置如下:
# Flume agent "tier1": declares one channel (c1), one source (r1) and one sink (k1).
tier1.channels = c1
tier1.sources = r1
tier1.sinks = k1
# AvroSource r1
# Source r1 is of type avro, bound to 0.0.0.0 on port 60001, and feeds channel c1.
tier1.sources.r1.channels = c1
tier1.sources.r1.type = avro
tier1.sources.r1.bind = 0.0.0.0
tier1.sources.r1.port = 60001
# Channel c1
# Channel c1 is of type file.
tier1.channels.c1.type = file
# HdfsSink k1
# Sink k1 is of type hdfs and takes its events from channel c1.
tier1.sinks.k1.channel=c1
tier1.sinks.k1.type=hdfs
## HdfsSink k1 sinking properties
# NOTE(review): rollSize and rollCount are 0 while rollInterval is 60; presumably
# this leaves only time-based file rolling active. Confirm against the Flume HDFS sink docs.
tier1.sinks.k1.hdfs.path=/
tier1.sinks.k1.hdfs.fileType = DataStream
tier1.sinks.k1.hdfs.batchSize = 100000
tier1.sinks.k1.hdfs.rollSize = 0
tier1.sinks.k1.hdfs.rollCount = 0
tier1.sinks.k1.hdfs.rollInterval = 60
tier1.sinks.k1.hdfs.threadPoolSize = 500
tier1.sinks.k1.hdfs.callTimeout = 180000
# NOTE(review): the avro_event serializer presumably determines the on-disk format
# (an Avro container file of Flume events); any reader of the output must match it. Confirm.
tier1.sinks.k1.hdfs.serializer = avro_event
tier1.sinks.k1.hdfs.writeFormat = Text
暂无答案!
目前还没有任何答案,快来回答吧!