我尝试使用flink的table和sqlapi作为一个简单的示例,在这个示例中,我从文件中读取字符串,将其转换为tuple2并尝试将其插入表中。这是我的密码。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class table_streaming_test
{
public static void main (String[] args) throws Exception
{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env);
env.setParallelism(1);
DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
DataStream<Tuple2<String,Integer>> ds= datastream_in
.flatMap(new Splitter()); // transformation flatmap
Table msg=tEnv.fromDataStream(ds).as("symbol,price");
Table result = msg.select("symbol ='A'");
DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
ds2.print();
env.execute();
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] token= sentence.split(",");
out.collect(new Tuple2<String, Integer>(token[0],Integer.parseInt(token[1])));
}
}
}
错误如下:(发生在第行) DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
)
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.table.Table.<init>(Lorg/apache/flink/api/table/TableEnvironment;Lorg/apache/flink/api/table/plan/logical/LogicalNode;)V
at org.apache.flink.api.table.StreamTableEnvironment.ingest(StreamTableEnvironment.scala:97)
at org.apache.flink.api.java.table.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:61)
at table_streaming_test.main(table_streaming_test.java:87)
包含的jar文件包括:
flink-dist_2.10-1.1.3.jar文件
flink-python_2.10-1.1.3.jar文件
flink表格2.10-1.1.3.jar
日志4j-1.2.17.jar
slf4j-log4j12-1.7.7.jar
javase-1.7版本
javase-1.7版本
2条答案
按热度按时间bf1o4zei1#
出现错误“java.lang.nosuchmethoderror”的一个可能原因是使用的flink版本与系统上安装的不同。
对我来说,我有Flink1.4.2,我使用的版本是1.3.2。所以我更新了我的pom文件,使其具有相同的版本,并且运行良好。
jecbmhm32#
我认为你需要使用阴影插件,并按照这个步骤来解决这个问题
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html#resolving-依赖关系与使用maven shade插件的flink冲突