我刚刚建立了datatorrentrts(apacheapex)平台并运行了pi演示。我想使用来自kafka的“avro”消息,然后将数据聚合并存储到hdfs中。我能得到这个或Kafka的示例代码吗?
mrwjdhj31#
下面是一个完整的工作应用程序的代码使用新的kafka输入操作符和apex malhar的文件输出操作符。它将字节数组转换为字符串,并使用大小有界的滚动文件(本例中为1k)将它们写入hdfs;在文件大小达到绑定之前,它将有一个带有 .tmp 分机。您可以按照中devt的建议在这两个操作符之间插入其他操作符https://stackoverflow.com/a/36666388):
.tmp
package com.example.myapexapp; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator; import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator; import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; @ApplicationAnnotation(name="MyFirstApplication") public class KafkaApp implements StreamingApplication { @Override public void populateDAG(DAG dag, Configuration conf) { KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator()); in.setInitialPartitionCount(1); in.setTopics("test"); in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); //in.setClusters("localhost:2181"); in.setClusters("localhost:9092"); // NOTE: need broker address, not zookeeper LineOutputOperator out = dag.addOperator("out", new LineOutputOperator()); out.setFilePath("/tmp/FromKafka"); out.setFileName("test"); out.setMaxLength(1024); // max size of rolling output file // create stream connecting input adapter to output adapter dag.addStream("data", in.outputPort, out.input); } } /** * Converts each tuple to a string and writes it as a new line to the output file */ class LineOutputOperator extends AbstractFileOutputOperator<byte[]> { private static final String NL = System.lineSeparator(); private static final Charset CS = StandardCharsets.UTF_8; private String fileName; @Override public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); } @Override protected String getFileName(byte[] tuple) { return fileName; } public String getFileName() { return fileName; } public void setFileName(final String v) { fileName = v; } }
hl0ma9xz2#
在较高的层次上,您的应用程序代码类似于,kafkasingleportstringinputoperator->avrotopojo->dimensions aggregator->abstractfileoutputoperator的实现kafkasingleportstringinputooperator-如果您使用的是其他数据类型,则可以使用kafkasingleportbytearrayinputoperator或编写自定义实现。阿夫罗托波霍-https://github.com/apache/incubator-apex-malhar/blob/5075ce0ef75afccdff2edf4c044465340176a148/contrib/src/main/java/com/datatorrent/contrib/avro/avrotopojo.java.此运算符将genericrecord转换为用户指定的pojo。用户需要指定应发出的pojo类,否则将使用反射。当前,此运算符用于从容器文件中读取genericrecords,并且只支持基元类型。要从kafka中读取,您可以沿类似的线对运算符建模,并向其添加架构对象解析传入的记录。processtuple方法中的类似操作应该可以工作,schema schema=new schema.parser().parse());genericdatumreader=新的genericdatumreader(模式);维度聚合器-您可以选择此处给定的聚合器之一-https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensions 或者按照同样的思路编写一个自定义聚合器。filewriter-来自上面文章中的示例。
2条答案
按热度按时间mrwjdhj31#
下面是一个完整的工作应用程序的代码使用新的kafka输入操作符和apex malhar的文件输出操作符。它将字节数组转换为字符串,并使用大小有界的滚动文件(本例中为1k)将它们写入hdfs;在文件大小达到绑定之前,它将有一个带有
.tmp
分机。您可以按照中devt的建议在这两个操作符之间插入其他操作符https://stackoverflow.com/a/36666388):hl0ma9xz2#
在较高的层次上,您的应用程序代码类似于,
kafkasingleportstringinputoperator->avrotopojo->dimensions aggregator->abstractfileoutputoperator的实现
kafkasingleportstringinputooperator-如果您使用的是其他数据类型,则可以使用kafkasingleportbytearrayinputoperator或编写自定义实现。
阿夫罗托波霍-https://github.com/apache/incubator-apex-malhar/blob/5075ce0ef75afccdff2edf4c044465340176a148/contrib/src/main/java/com/datatorrent/contrib/avro/avrotopojo.java.
此运算符将genericrecord转换为用户指定的pojo。用户需要指定应发出的pojo类,否则将使用反射。当前,此运算符用于从容器文件中读取genericrecords,并且只支持基元类型。要从kafka中读取,您可以沿类似的线对运算符建模,并向其添加架构对象解析传入的记录。processtuple方法中的类似操作应该可以工作,schema schema=new schema.parser().parse());genericdatumreader=新的genericdatumreader(模式);
维度聚合器-您可以选择此处给定的聚合器之一-https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensions 或者按照同样的思路编写一个自定义聚合器。
filewriter-来自上面文章中的示例。