代码优化
我有一个flink应用程序,它从url/端口读取数据并对其执行处理并返回json。然后我将json转换为字符串并将其转换为kafka。
当前绩效和注意事项
如果我只是执行处理->我可以通过函数运行大约30000个字符串,但是当我添加函数将其转换为字符串,然后下沉到kafka时,我的吞吐量下降到每秒17000个字符串。
在我沉入Kafka之前,我需要把json转换成字符串吗?如果不是,我如何将json objectnode接收到kafka?
还有什么其他的解决办法。我认为瓶颈是to string函数
我尝试使用多种方法将json转换为字符串(.tostring function,stringbuilder to string)。
// Read from Source
val in_stream = env.socketTextStream(url, port, socket_stream_deliminator, socket_connection_retries).setParallelism(1)
// Perform Process
.map(x=>{Process(x)}).setParallelism(1)
// Convert to STring
.map(x => ObjectNodeToString({
val json_string_builder = StringBuilder.newBuilder
json_string_builder.append(x)
return json_string_builder.toString()
})).setParallelism(1)
// sink data
.addSink(new FlinkKafkaProducer[String](broker_hosts, global_topic, new SimpleStringSchema()))
我想保持每秒30000个字符串的处理速度。我可以通过convert to string函数得到它。我能把objectnode直接放到kafka吗?
1条答案
按热度按时间mjqavswn1#
你可以。sink正在将给定对象序列化为字节数组,然后再将其发送给kafka。确保为sink函数提供了能够将objectnode转换为字节数组的序列化程序。
还要确保使用者准备好接收objectnode对象,而不是字符串。