scala—使用flink将json导入kafka的最快方法

dkqlctbz  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(568)

代码优化

我有一个flink应用程序,它从url/端口读取数据并对其执行处理并返回json。然后我将json转换为字符串并将其转换为kafka。

当前绩效和注意事项

如果我只是执行处理->我可以通过函数运行大约30000个字符串,但是当我添加函数将其转换为字符串,然后下沉到kafka时,我的吞吐量下降到每秒17000个字符串。
在我沉入Kafka之前,我需要把json转换成字符串吗?如果不是,我如何将json objectnode接收到kafka?
还有什么其他的解决办法。我认为瓶颈是to string函数
我尝试使用多种方法将json转换为字符串(.tostring function,stringbuilder to string)。

// Read from Source
 val in_stream = env.socketTextStream(url, port,      socket_stream_deliminator, socket_connection_retries).setParallelism(1)

 // Perform Process
 .map(x=>{Process(x)}).setParallelism(1)

 // Convert to STring
 .map(x => ObjectNodeToString({
     val json_string_builder = StringBuilder.newBuilder
     json_string_builder.append(x)
     return json_string_builder.toString()
 })).setParallelism(1)

 // sink data
 .addSink(new FlinkKafkaProducer[String](broker_hosts, global_topic, new SimpleStringSchema()))

我想保持每秒30000个字符串的处理速度。我可以通过convert to string函数得到它。我能把objectnode直接放到kafka吗?

mjqavswn

mjqavswn1#

你可以。sink正在将给定对象序列化为字节数组,然后再将其发送给kafka。确保为sink函数提供了能够将objectnode转换为字节数组的序列化程序。
还要确保使用者准备好接收objectnode对象,而不是字符串。

相关问题