已关闭,此问题需要更focused。目前不接受答复。
**想改善这个问题吗?**更新问题,使其仅通过editing this post关注一个问题。
7天前关闭
Improve this question
我们可以测试下面的flink工作吗?如果是的话,我们怎么做呢?
@Component
public class RedisController {
@Value("${flink.consumer.topic.name}")
private String flinkConsumerTopic;
/**
* Method: run()
* Description: Entry point for running Flink job that consumes messages from Kafka,
* enriches them with patient details retrieved from Redis, and merges them together.
*
* @throws Exception if Flink job fails to execute
*/
public void run() throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Create a Kafka consumer
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);
// Create a data stream from the Kafka consumer
DataStream<ObjectNode> stream = env.addSource(consumer);
// Define the sliding window with a window size of 10 seconds and slide interval of 5 seconds
DataStream<String> windowedStream = stream
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
.apply(new RichAllWindowFunction<ObjectNode, String, TimeWindow>() {
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
@Override
public void open(Configuration parameters) throws Exception {
String serverAddress = "localhost";
int serverPort = 8888;
// Create a socket and connect to the server
socket = new Socket(serverAddress, serverPort);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
}
@Override
public void apply(TimeWindow timeWindow, Iterable<ObjectNode> input, Collector<String> out) throws Exception {
// Serialize the input data to a JSON string
String inputJson = input.toString();
byte[] inputBytes = inputJson.getBytes();
// Send the input data to the server
outputStream.write(inputBytes);
outputStream.flush();
// Measure the start time
long startTime = System.currentTimeMillis();
// Read the response from the server
byte[] responseBytes = new byte[15034];
String response = "";
int bytesRead = inputStream.read(responseBytes);
if (bytesRead != -1) {
response = response + new String(responseBytes, 0, bytesRead);
System.out.println("Response from server: " + response);
}
// Measure the end time
long endTime = System.currentTimeMillis();
// Calculate the time taken for the response
long responseTime = endTime - startTime;
System.out.println("Response time: " + responseTime + " ms");
out.collect(response);
}
});
// Create a Kafka producer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"output_topic", new SimpleStringSchema(), properties);
// Send the windowed stream to the Kafka producer
windowedStream.addSink(kafkaProducer);
// Execute the job
env.execute("Tumbling Window Example");
}
}
我需要为这个flink运行junit。依赖版本工作不正常。
1条答案
按热度按时间6yoyoihd1#
您可以使用kafka-junit来启动一个对测试有用的嵌入式Kafka主题。依赖关系如下所示:
您需要根据您的Flink版本将版本设置为适当的版本。
请注意,你没有提到你的Flink版本(这总是一个很好的信息,包括在一个问题),但
FlinkKafkaProducer
已经被弃用了一段时间。你应该使用KafkaSink。