flink从cassandra获取数据用作数据集的问题

vaqhlq81  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(363)

我试图从cassandra表中提取数据用作数据集,但遇到了两个问题。
第一个是cassandrainputformat只返回一个元组,我不希望有一个元组12,而只是使用pojo来定义它将要返回的内容。所以我不知道这是否只是我必须接受的东西,是否有一种方法可以像cassandraconnector那样使用pojo(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html),或者使用cassandrainputformat不是获取数据的最佳方式。
另一个问题是,即使我从cassandrainputformat获取数据(不管是不是tuple),我也不知道如何将其设置为数据源。对于文件、csv和hdfs有很多方法(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/executionenvironment.html#executionenvironment--)但没有一个是明确的Cassandra。所以我猜我需要使用cassandrainputformat提取数据,并使用类似.fromements()或.fromcollecton()的方法,以及正确的方法。
谢谢你的帮助!
更新:
这“管用”(感谢chesnay schepler的帮助):

DataSet<Tuple2<String, String>> testSet = 
exEnv.createInput(cassandraInputFormat, TypeInformation.of(newTypeHint<Tuple2<String, String>>(){}));

但是这个错误正在发生。。。

Exception in thread "main" org.apache.flink.optimizer.CompilerException: 
Error translating node 'Data Source "at execute(CodeBatchProcessorImpl.java:85) 
(org.apache.flink.batch.connectors.cassandra.CassandraInputFormat)" : NONE 
[[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]':
Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: flink.streaming.code.CodeBatchProcessorImpl

再往下包括: Caused by: java.io.NotSerializableException: org.apache.flink.api.java.LocalEnvironment 更新2:
必须将环境设置为 transient 。现在修好了!

798qvoo8

798qvoo81#

通过调用executionenvironment#createinput(inputformat),您可以使用cassandrainputformat和所有inputformat。
目前没有将元素作为pojo直接读取的选项。最简单的解决方法是在sink之后添加一个mapfunction,将元组转换为所需的pojo。

相关问题