No support for the type of the given DataStream: GenericType<org.apache.flink.types.Row> (Flink, Cassandra)

Posted by 2ic8powd on 2021-08-25 in Java

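I want to write Rows to Cassandra. First, I convert the Avro stream into a stream of Rows. The code compiles without errors. See the code below (the Kafka consumer and the Cassandra sink each work fine on their own in other jobs):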

StreamExecutionEnvironment environment =  StreamExecutionEnvironment.getExecutionEnvironment();

// Initialize KafkaConsumer
FlinkKafkaConsumer010 kafkaConsumer = KafkaConnection.getKafkaConsumer(AvroSchemaClass.class, inTopic, schemaRegistryUrl, properties);

// Set KafkaConsumer as source
DataStream<AvroSchemaClass> avroInputStream = environment.addSource(kafkaConsumer);

// converting avro message to flink's row datatype.
// see https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.html
AvroRowDeserializationSchema avroToRow = new AvroRowDeserializationSchema(AvroSchemaClass.class);
DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<AvroSchemaClass, Row>() {
                @Override
                public Row map(AvroSchemaClass orders_value) throws Exception {
                    return avroToRow.deserialize(orders_value.toByteBuffer().array());
                }
            });

// Example transformation
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()));

CassandraSink streamSink = CassandraConnection.getSink(rowOutputStream,
                    cassandraURL,
                    cassandraPort,
                    cassandraCluster,
                    cassandraUser,
                    cassandraPass,
                    insertQuery);
streamSink.name("Write something to Cassandra");

environment.execute();

But when I run the job in Flink, I get the following error:

java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType<org.apache.flink.types.Row>
        at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:255)
        at servingLayer.CassandraConnection.getSink(CassandraConnection.java:24)
        at speedLayer.KafkaToCassandra.main(KafkaToCassandra.java:84)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
java.lang.NullPointerException

Is the solution to make a specific change to the DataStream's type? If so, how do I implement it? Let me know if you need more information.

Answer #1, by arknldoa

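It looks like CassandraSink should support Row out of the box. The problem is that the RowTypeInfo of rowOutputStream somehow gets lost, so Flink falls back to GenericType (which means inefficient serialization with Kryo). AvroRowDeserializationSchema does return the correct type information, but the DataStream API does not pick it up automatically: the output type of map() is derived from the MapFunction's signature, which only says Row and carries no field types.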
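To see why the signature alone is not enough, here is a stdlib-only sketch. It assumes, as I understand Flink's type extraction, that the output type of map() is read from the function's generic signature via reflection. FakeRow and FakeMapFunction are hypothetical stand-ins for Row and MapFunction, not Flink classes. The declared output type turns out to be just the raw row class, whose fields are stored as Object[], so there is nothing from which a RowTypeInfo with per-field types could be built; the concrete field types only exist on runtime instances.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Stdlib-only sketch: FakeRow and FakeMapFunction are hypothetical stand-ins
// for org.apache.flink.types.Row and Flink's MapFunction.
public class RowErasureDemo {

    /** Like Row, this holds untyped fields, so the class itself says nothing about field types. */
    public static final class FakeRow {
        public final Object[] fields;
        public FakeRow(Object... fields) { this.fields = fields; }
    }

    public interface FakeMapFunction<IN, OUT> {
        OUT map(IN value) throws Exception;
    }

    /** Returns the OUT type argument declared by a FakeMapFunction implementation, found via reflection. */
    public static Type declaredOutputType(FakeMapFunction<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == FakeMapFunction.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        throw new IllegalStateException("no FakeMapFunction type arguments found");
    }

    /** An anonymous class (not a lambda) keeps its generic interface in the class file. */
    public static FakeMapFunction<String, FakeRow> toRow() {
        return new FakeMapFunction<String, FakeRow>() {
            @Override
            public FakeRow map(String value) {
                return new FakeRow(value, value.length());
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Type out = declaredOutputType(toRow());
        Field fields = FakeRow.class.getField("fields");
        // Only the raw class is visible statically: no per-field types such as String or Integer.
        System.out.println("declared output type: " + out.getTypeName());
        System.out.println("field storage type:   " + fields.getType().getSimpleName());
        // The concrete field types exist only on runtime instances.
        FakeRow row = toRow().map("DE");
        System.out.println("runtime field types:  "
                + row.fields[0].getClass().getSimpleName() + ", "
                + row.fields[1].getClass().getSimpleName());
    }
}
```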
So, assuming everything else is correct, the fix is to set the type information of rowInputStream and rowOutputStream explicitly:

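// Chain returns() directly: it is defined on SingleOutputStreamOperator (what map()/filter() return), not on DataStream
DataStream<Row> rowInputStream = avroInputStream.map(...).returns(avroToRow.getProducedType());
...
// filter() keeps the type of its input, so this second hint is redundant but harmless
DataStream<Row> rowOutputStream = rowInputStream.filter(...).returns(avroToRow.getProducedType());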
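Why the explicit hint is enough can be modeled without Flink. As far as I can tell from the connector source, CassandraSink.addSink inspects the stream's TypeInformation and only builds a sink for tuple, Row, POJO and Scala case-class types, throwing IllegalArgumentException for anything else. The sketch below is a simplified stdlib-only model under that assumption: TypeInfo, GenericTypeInfo, RowTypeInfo and ToyStream are hypothetical stand-ins, only the Row branch is kept, and returns() here simply swaps the type info, which is the idea behind the fix (in the real job, avroToRow.getProducedType() plays the role of the RowTypeInfo).

```java
import java.util.Arrays;
import java.util.List;

// Simplified, stdlib-only model. None of these classes are Flink's; only the shape of the check is borrowed.
public class SinkDispatchDemo {

    /** Hypothetical stand-in for Flink's TypeInformation. */
    public interface TypeInfo {}

    /** Fallback used when only the raw class is known (real Flink would serialize it with Kryo). */
    public static final class GenericTypeInfo implements TypeInfo {
        final String className;
        public GenericTypeInfo(String className) { this.className = className; }
        @Override public String toString() { return "GenericType<" + className + ">"; }
    }

    /** Carries the per-field types that a row-aware sink needs. */
    public static final class RowTypeInfo implements TypeInfo {
        final List<Class<?>> fieldTypes;
        public RowTypeInfo(List<Class<?>> fieldTypes) { this.fieldTypes = fieldTypes; }
        @Override public String toString() { return "Row" + fieldTypes; }
    }

    /** A stream reduced to the one thing the sink inspects: its element type info. */
    public static final class ToyStream {
        final TypeInfo type;
        public ToyStream(TypeInfo type) { this.type = type; }
        /** Same idea as returns(): replace the extracted type info with an explicit one. */
        public ToyStream returns(TypeInfo explicit) { return new ToyStream(explicit); }
    }

    /** Dispatches on the type info; anything unrecognized is rejected, like GenericType<Row> in the question. */
    public static String addSink(ToyStream input) {
        if (input.type instanceof RowTypeInfo) {
            return "row sink with " + ((RowTypeInfo) input.type).fieldTypes.size() + " fields";
        }
        throw new IllegalArgumentException(
                "No support for the type of the given DataStream: " + input.type);
    }

    public static void main(String[] args) {
        // What extraction produced in the question: only the raw Row class was known.
        ToyStream extracted = new ToyStream(new GenericTypeInfo("org.apache.flink.types.Row"));
        try {
            addSink(extracted);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // The fix: declare the row schema explicitly before adding the sink.
        ToyStream typed = extracted.returns(
                new RowTypeInfo(Arrays.<Class<?>>asList(String.class, Integer.class)));
        System.out.println("accepted: " + addSink(typed));
    }
}
```

Running main prints the same message as in the question for the generic case, then accepts the stream once the row schema has been declared.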

In general, things are easier if you stick to a single API. In this case, I would recommend using the Table API throughout.
