我的目标是将字符串“success”发送到套接字,让apache flink的datastream提取该字符串并用单词“success”更新本地文本文件我已经设置了一个“consumer datastream”,它监视从终端发送到本地主机上9999端口的文本。请参阅下面的代码,以获取有效的使用者数据流代码:
package p1;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
public class TestClientStreaming {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
public boolean filter(String value) throws Exception {
return value.equals("SUCCESS");
}
});
filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);
env.execute("Reading Flink Stream");
}
}
这对我有用;当我在终端上运行“nc-l 9999”时,只有在终端中键入“success”并按回车键时,在文件路径中找到的“consumer text file”才会更新。到目前为止棒极了!
现在,我想避免使用终端,但仍然要向这个套接字写入文本,这样我的“消费者数据流”就可以获取它。这个文本可以是任何文本,因为我的“消费者数据流”会过滤掉它想要的文本,也就是“成功”。
我知道的只有两种方法可以写入套接字,一种是使用终端(如前面所讨论的),另一种是使用producer datastream,它不断地从“producer text file”读取数据,并调用datastream.writetosocket()将读取的文本写入套接字。然后,我的“消费者数据流”将接收到这一点,并按预期的方式运行。
有没有其他方法可以将文本写入socket以便数据流可以提取该文本?我试着用socket库写“success”给localhost:9999 but 无济于事。
此图像可能有助于形象化我要解决的问题,以及我已经解决的问题:https://ibb.co/41gznwm
谢谢你的时间!
1条答案
按热度按时间gg0vcinb1#
你需要创建一个
nc -l
又名套接字服务器。示例代码为: