是否可以模拟Kafka环境并在本地运行Flink junit以进行测试?[关闭]

nkoocmlb  于 2023-06-20  发布在  Apache
关注(0)|答案(1)|浏览(144)

已关闭,此问题需要更focused。目前不接受答复。
**想改善这个问题吗?**更新问题,使其仅通过editing this post关注一个问题。

7天前关闭
Improve this question
我们可以测试下面的flink工作吗?如果是的话,我们怎么做呢?

@Component

public class RedisController {

    @Value("${flink.consumer.topic.name}")

    private String flinkConsumerTopic;

    /**

     * Method: run()

     * Description: Entry point for running Flink job that consumes messages from Kafka,

     * enriches them with patient details retrieved from Redis, and merges them together.

     *

     * @throws Exception if Flink job fails to execute

     */

    public void run() throws Exception {

        // Set up the execution environment

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the Kafka properties

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Create a Kafka consumer

        FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);

// Create a data stream from the Kafka consumer

        DataStream<ObjectNode> stream = env.addSource(consumer);

// Define the sliding window with a window size of 10 seconds and slide interval of 5 seconds

        DataStream<String> windowedStream = stream

                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))

                .apply(new RichAllWindowFunction<ObjectNode, String, TimeWindow>() {

                    private Socket socket;

                    private OutputStream outputStream;

                    private InputStream inputStream;

                    @Override

                    public void open(Configuration parameters) throws Exception {

                        String serverAddress = "localhost";

                        int serverPort = 8888;

                        // Create a socket and connect to the server

                        socket = new Socket(serverAddress, serverPort);

                        outputStream = socket.getOutputStream();

                        inputStream = socket.getInputStream();

                    }

                    @Override

                    public void apply(TimeWindow timeWindow, Iterable<ObjectNode> input, Collector<String> out) throws Exception {

                        // Serialize the input data to a JSON string

                        String inputJson = input.toString();

                        byte[] inputBytes = inputJson.getBytes();

                        // Send the input data to the server

                        outputStream.write(inputBytes);

                        outputStream.flush();

                        // Measure the start time

                        long startTime = System.currentTimeMillis();

                        // Read the response from the server

                        byte[] responseBytes = new byte[15034];

                        String response = "";

                        int bytesRead = inputStream.read(responseBytes);

                        if (bytesRead != -1) {

                            response = response + new String(responseBytes, 0, bytesRead);

                            System.out.println("Response from server: " + response);

                        }

                        // Measure the end time

                        long endTime = System.currentTimeMillis();

                        // Calculate the time taken for the response

                        long responseTime = endTime - startTime;

                        System.out.println("Response time: " + responseTime + " ms");

                        out.collect(response);

                    }

                });

        // Create a Kafka producer

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(

                "output_topic", new SimpleStringSchema(), properties);

        // Send the windowed stream to the Kafka producer

        windowedStream.addSink(kafkaProducer);

        // Execute the job

        env.execute("Tumbling Window Example");

    }

}

我需要为这个flink运行junit。依赖版本工作不正常。

6yoyoihd

6yoyoihd1#

您可以使用kafka-junit来启动一个对测试有用的嵌入式Kafka主题。依赖关系如下所示:

<dependency>
            <groupId>net.mguenther.kafka</groupId>
            <artifactId>kafka-junit</artifactId>
            <version>2.8.0</version>
            <scope>test</scope>
        </dependency>

您需要根据您的Flink版本将版本设置为适当的版本。
请注意,你没有提到你的Flink版本(这总是一个很好的信息,包括在一个问题),但FlinkKafkaProducer已经被弃用了一段时间。你应该使用KafkaSink。

相关问题