我正在flink中创建一个应用程序
阅读主题中的消息
做一些简单的处理
将结果写入其他主题
我的代码可以工作,但是它不能并行运行
我该怎么做?
我的代码似乎只在一个线程/块上运行?
在flink web Jmeter 板上:
应用进入运行状态
但是,在overview子任务中只显示了一个块
和接收/发送的字节数,接收/发送的记录始终为零(无更新)
这是我的代码,请帮助我学习如何分割我的应用程序,以便能够并行运行,我写的应用程序正确吗?
public class SimpleApp {
public static void main(String[] args) throws Exception {
// create execution environment INPUT
StreamExecutionEnvironment env_in =
StreamExecutionEnvironment.getExecutionEnvironment();
// event time characteristic
env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// production Ready (Does NOT Work if greater than 1)
env_in.setParallelism(Integer.parseInt(args[0].toString()));
// configure kafka consumer
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("auto.offset.reset", "earliest");
// create a kafka consumer
final DataStream<String> consumer = env_in
.addSource(new FlinkKafkaConsumer09<>("test", new
SimpleStringSchema(), properties));
// filter data
SingleOutputStreamOperator<String> result = consumer.filter(new
FilterFunction<String>(){
@Override
public boolean filter(String s) throws Exception {
return s.substring(0, 2).contentEquals("PS");
}
});
// Process Data
// Transform String Records to JSON Objects
SingleOutputStreamOperator<JSONObject> data = result.map(new
MapFunction<String, JSONObject>()
{
@Override
public JSONObject map(String value) throws Exception
{
JSONObject jsnobj = new JSONObject();
if(value.substring(0, 2).contentEquals("PS"))
{
// 1. Raw Data
jsnobj.put("Raw_Data", value.substring(0, value.length()-6));
// 2. Comment
int first_index_comment = value.indexOf("$");
int last_index_comment = value.lastIndexOf("$") + 1;
// - set comment
String comment =
value.substring(first_index_comment, last_index_comment);
comment = comment.substring(0, comment.length()-6);
jsnobj.put("Comment", comment);
}
else {
jsnobj.put("INVALID", value);
}
return jsnobj;
}
});
// Write JSON to Kafka Topic
data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
"FilteredData",
new SimpleJsonSchema()));
env_in.execute();
}
}
我的代码确实可以工作,但它似乎只在web界面中的单个线程(显示了一个块)上运行(没有数据传递,因此发送/接收的字节不会更新)。
如何使它并行运行?
1条答案
按热度按时间eqzww0vc1#
要并行运行您的工作,您可以做两件事:
在env级别增加工作的并行性-例如
streamexecutionenvironment env_in=streamexecutionenvironment.getexecutionenvironment().setparallelism(4);
但这只会在flink端读取数据后增加并行性,因此如果源代码生成数据的速度更快,则可能无法充分利用它。
要完全并行化作业,请为kafka主题设置多个分区,理想情况下是flink作业所需的并行量。因此,在创建Kafka主题时,您可能需要执行以下操作:
bin/kafka-topics.sh--创建--缩放器localhost:2181 --replication-factor 3--分区4--主题测试