I am trying to read a file with 34 fields and print it to the console using NetBeans. However, I can only print the schema, because this particular version of Flink's CsvReader has no print option. Please look at the code and help me understand what I should correct. I would use CSVReader, but it turns out it does not support more than 22 fields, so I can only use the Table API. I also tried CsvTableSource on Flink 1.5.1, but I cannot get the syntax right: `.field("%CPU", Types.FLOAT())` keeps giving a "cannot find symbol" error for the FLOAT type. My main goal is simply to read the CSV file and then send it to a Kafka topic, but before that I want to verify that the file is actually being read, with no luck so far.
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.api.java.Slide;
public class CsvReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",
new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",
"OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
"io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",
"mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",
"mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
"cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
"container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
new TypeInformation<?>[] {
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT()
});// lenient
tEnv.registerTableSource("container", csvTableSource);
Table result = tEnv
.scan("container");
System.out.println(result);
result.printSchema();
}
}
/*tEnv.toAppendStream(result, Row.class).print();
result.writeToSink(null);print();
env.execute();*/
Here is the output:
root
|-- %CPU: Float
|-- MEM: Float
|-- VSZ: Float
|-- RSS: Float
|-- timestamp: Float
|-- OOM_Score: Float
|-- io_read_count: Float
|-- io_write_count: Float
|-- io_read_bytes: Float
|-- io_write_bytes: Float
|-- io_read_chars: Float
|-- io_write_chars: Float
|-- num_fds: Float
|-- num_ctx_switches_voluntary: Float
|-- num_ctx_switches_involuntary: Float
|-- mem_rss: Float
|-- mem_vms: Float
|-- mem_shared: Float
|-- mem_text: Float
|-- mem_lib: Float
|-- mem_data: Float
|-- mem_dirty: Float
|-- mem_uss: Float
|-- mem_pss: Float
|-- mem_swap: Float
|-- num_threads: Float
|-- cpu_time_user: Float
|-- cpu_time_system: Float
|-- cpu_time_children_user: Float
|-- cpu_time_children_system: Float
|-- container_nr_sleeping: Float
|-- container_nr_running: Float
|-- container_nr_stopped: Float
|-- container_nr_uninterruptible: Float
|-- container_nr_iowait: Float
Here is another version of the code, which also does not work:
package wikiedits;
import static com.sun.xml.internal.fastinfoset.alphabet.BuiltInRestrictedAlphabets.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableEnvironment;
public class Csv {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
//TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("home/merlin/Experiments/input_container/container_data1.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
// env.execute();
}
}
If I then have to pass it on to a Kafka topic inside a function call, do I need further edits? This is what I tried:
DataStreamSink<Row> stream = addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties));
//DataStreamSink<Row> stream = tEnv.toAppendStream(table, Row.class).print();
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
SendToRestAPI sendrest= new SendToRestAPI(value);
String String1= sendrest.converttoJson();
return "Stream Value: " + String1;
}
})
.addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties)); /*.print();*/
env.execute();
}
}
The stream.map line throws the error:
Cannot find symbol: method map
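For reference, `map` is defined on `DataStream`, not on `DataStreamSink`, so the sink must be attached last, after the transformation. A minimal sketch of the intended order, assuming the same `FlinkKafkaProducer09` connector, `properties`, and the asker's own `SendToRestAPI` helper (not a library class), might look like:

```java
// Convert the Table to an append-only stream of rows first.
DataStream<Row> rows = tEnv.toAppendStream(table, Row.class);

// map() is called on the DataStream; the Kafka sink is attached afterwards.
rows.map(new MapFunction<Row, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(Row value) throws Exception {
            // SendToRestAPI is the asker's helper class, assumed to take a String.
            SendToRestAPI sendrest = new SendToRestAPI(value.toString());
            return "Stream Value: " + sendrest.converttoJson();
        }
    })
    .addSink(new FlinkKafkaProducer09<>("my-second-topic",
            new SimpleStringSchema(), properties));

env.execute(); // nothing runs until execute() is called
```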
2 Answers

9ceoxa921#
This code:
with these libraries (some of which may be redundant):
is at least running. I am not sure it produces the output you need, but it is almost exactly what you typed in your latest edit, and it works in the IDE. Does that help?
If your delimiter is still a space, remember to change
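The answer's original code block and dependency list did not survive extraction. A sketch of what a compiling builder-based version could look like on Flink 1.5 (assuming flink-streaming-java and flink-table on the classpath; only the first two of the 35 fields are shown, the rest follow the same pattern):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

public class Csv {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // toAppendStream needs a StreamTableEnvironment, not the base TableEnvironment.
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        CsvTableSource csvTableSource = CsvTableSource
            .builder()
            // note the leading slash: "home/..." in the question is a relative path
            .path("/home/merlin/Experiments/input_container/container_data1.csv")
            // Types.FLOAT here is the constant from
            // org.apache.flink.api.common.typeinfo.Types, not the method Types.FLOAT()
            .field("%CPU", Types.FLOAT)
            .field("MEM", Types.FLOAT)
            // ... remaining 33 fields, same pattern ...
            .fieldDelimiter(",")
            .lineDelimiter("\n")
            .ignoreFirstLine()
            .ignoreParseErrors()
            .build();

        tEnv.registerTableSource("container", csvTableSource);
        Table table = tEnv.scan("container");
        DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
        stream.print();
        env.execute(); // required: print() alone does not start the job
    }
}
```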
.fieldDelimiter(",")
mftmpeh82#
You have to convert the Table into a DataStream in order to print it. The easiest way is to convert it into a DataStream<Row>, as follows. See the documentation for more details.
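The answer's inline example was lost in extraction; the conversion it describes, under the Flink 1.5 Table API (assuming `tEnv` is a `StreamTableEnvironment` and `"container"` is the registered source from the question), is roughly:

```java
// Turn the registered table into an append-only stream of rows, then print it.
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
stream.print();
env.execute(); // execute() actually runs the pipeline
```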