如何以编程方式将文本写入flink套接字?

x0fgdtte  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(373)

我的目标是将字符串“success”发送到套接字,让apache flink的datastream提取该字符串并用单词“success”更新本地文本文件我已经设置了一个“consumer datastream”,它监视从终端发送到本地主机上9999端口的文本。请参阅下面的代码,以获取有效的使用者数据流代码:

package p1;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class TestClientStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> textStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
            public boolean filter(String value) throws Exception {
                return value.equals("SUCCESS");
            }
        });

        filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);

        env.execute("Reading Flink Stream");
    }
}

这对我有用;当我在终端上运行“nc-l 9999”时,只有在终端中键入“success”并按回车键时,在文件路径中找到的“consumer text file”才会更新。到目前为止棒极了!
现在,我想避免使用终端,但仍然要向这个套接字写入文本,这样我的“消费者数据流”就可以获取它。这个文本可以是任何文本,因为我的“消费者数据流”会过滤掉它想要的文本,也就是“成功”。
我知道的只有两种方法可以写入套接字,一种是使用终端(如前面所讨论的),另一种是使用producer datastream,它不断地从“producer text file”读取数据,并调用datastream.writetosocket()将读取的文本写入套接字。然后,我的“消费者数据流”将接收到这一点,并按预期的方式运行。
有没有其他方法可以将文本写入socket以便数据流可以提取该文本?我试着用socket库写“success”给localhost:9999 but 无济于事。
此图像可能有助于形象化我要解决的问题,以及我已经解决的问题:https://ibb.co/41gznwm
谢谢你的时间!

gg0vcinb

gg0vcinb1#

你需要创建一个 nc -l 又名套接字服务器。
示例代码为:

public class SocketWriter {

    public static void main(String[] args) {
        int port = 9999;
        boolean stop = false;
        ServerSocket serverSocket = null;

        try {

            serverSocket = new ServerSocket(port);

            while (!stop) {
                Socket echoSocket = serverSocket.accept();
                PrintWriter out =
                        new PrintWriter(echoSocket.getOutputStream(), true);
                BufferedReader in =
                        new BufferedReader(
                                new InputStreamReader(echoSocket.getInputStream()));

                out.println("Hello Amazing");
                // do whatever logic you want.

                in.close();
                out.close();
                echoSocket.close();

            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try {
                serverSocket.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

相关问题