我有一个kafka spark作业,它是一个日常批处理作业,它从hadoop位置顺序读取hdfs部分文件,并按顺序生成它们。
在最后一批中,我选取批中的最后一条记录,并在其上创建一个新的Dataframe(df2),其余记录转到另一个Dataframe(df1)。
我首先生产df1,然后生产df2,因为我希望使用者(这是一个流作业)在从producer发送的所有批中的最后一批中使用df2。但这并没有发生。在消费端,df2中的记录不是在最后一批中被消费的,而是在最后第二批或最后第三批中被消费的,但不是在最后一批中。
下面是示例代码:
class XYZ{
create new Kafka Instance
.
ABC.getInstance(topic,<required Kafka Parameters>);
.
.
f(!(df2.toJavaRDD().isEmpty()) && !(df1.toJavaRDD().isEmpty())) {
calling produceDF method(df1, <required Kafka Parameters>);
calling produceDF method(df2, <required Kafka Parameters>);
}
public void produceDF(Dataset<Row> df, Broadcast<ABC> abc>){
df.foreachPartition(partitionOfRecords ->{
final ABC kafkaProducer = abc.value();
while (partitionOfRecords.hasNext()) {
Row row = partitionOfRecords.next();
kafkaProducer.produceDF(<key>, <value>);
}
kafkaProducer.flush();
});
}
}
class ABC implements Serializable{
private transient KafkaProducer<String,String> kafkaProducer = null;
private static ABC abcInstance;
public static ABC getInstance(String topic, Map<String,Object> kafkaParams) {
if(abcInstance==null){
synchronized (ABC.class){
if(abcInstance==null){
abcInstance = new ABC(topic,kafkaParams);
}
}
}
return abcInstance;
}
public void produceDF(String key,String val) {
if( null==kafkaProducer) {
kafkaProducer = new KafkaProducer<String, String>(<Kafka Parameters>);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
kafkaProducer.close();
}
});
}
if(key!=null) {
kafkaProducer.send(new ProducerRecord(topic, key, val), new ProducerCallback(key, val));
}
public void flush(){
if(null!=kafkaProducer)
kafkaProducer.flush();
}
}
}
请告知我在最后一批中使用df2需要采用什么方法
暂无答案!
目前还没有任何答案,快来回答吧!