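I want to write Rows to Cassandra. First, I convert the Avro stream into a Row stream. The code compiles without errors. See the code below (the Kafka consumer and the Cassandra sink each work on their own in other jobs):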
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize KafkaConsumer
FlinkKafkaConsumer010 kafkaConsumer = KafkaConnection.getKafkaConsumer(AvroSchemaClass.class, inTopic, schemaRegistryUrl, properties);
// Set KafkaConsumer as source
DataStream<AvroSchemaClass> avroInputStream = environment.addSource(kafkaConsumer);
// converting avro message to flink's row datatype.
// see https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.html
AvroRowDeserializationSchema avroToRow = new AvroRowDeserializationSchema(AvroSchemaClass.class);
DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<AvroSchemaClass, Row>() {
    @Override
    public Row map(AvroSchemaClass orders_value) throws Exception {
        // Serialize the Avro record to bytes and decode those bytes into a Row
        return avroToRow.deserialize(orders_value.toByteBuffer().array());
    }
});
// Example transformation
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()));
CassandraSink streamSink = CassandraConnection.getSink(rowOutputStream,
cassandraURL,
cassandraPort,
cassandraCluster,
cassandraUser,
cassandraPass,
insertQuery);
streamSink.name("Write something to Cassandra");
environment.execute();
But when I run the job in Flink, I get the following error:
java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType<org.apache.flink.types.Row>
at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:255)
at servingLayer.CassandraConnection.getSink(CassandraConnection.java:24)
at speedLayer.KafkaToCassandra.main(KafkaToCassandra.java:84)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
java.lang.NullPointerException
Is the solution to make a specific change to the DataStream's type? If so, how do I implement it? Let me know if you need more information.
1 answer
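It looks like CassandraSink should support Row out of the box. The problem is that the RowTypeInfo of rowOutputStream somehow got lost, so Flink used the fallback strategy GenericType (that is, Kryo, which means inefficient serialization). AvroRowDeserializationSchema returns the type information correctly, but the DataStream API does not pick it up automatically. So, if everything else works, the fix is to explicitly set the type information on rowInputStream/rowOutputStream.

In general, things are easier if you stick to a single API. In this case, I would suggest using the Table API throughout.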
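A minimal sketch of setting the type information explicitly, assuming the Flink 1.10-era DataStream API with flink-streaming-java on the classpath. To keep it self-contained, a hypothetical two-field "id,country" schema and a hypothetical ParseRow function stand in for the Avro schema and the avroToRow conversion. The explicit hint is given with returns(...) on the map output, so the stream reports RowTypeInfo rather than a GenericType.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class RowTypeHintExample {

    // Hypothetical stand-in for the Avro-to-Row conversion: parses "id,country" into a Row.
    static class ParseRow implements MapFunction<String, Row> {
        @Override
        public Row map(String line) {
            String[] parts = line.split(",");
            return Row.of(Long.parseLong(parts[0]), parts[1]);
        }
    }

    // Hypothetical two-field schema; in the real job this would be avroToRow.getProducedType().
    static final TypeInformation<Row> ROW_TYPE =
            Types.ROW_NAMED(new String[] {"id", "country"}, Types.LONG, Types.STRING);

    // Builds a Row stream whose type information is RowTypeInfo instead of GenericType<Row>.
    static DataStream<Row> typedRows(StreamExecutionEnvironment env) {
        return env.fromElements("1,DE", "2,FR")
                .map(new ParseRow())
                .returns(ROW_TYPE) // explicit type hint on the map output
                .filter(row -> "DE".equals(row.getField(1))); // filter keeps its input's type info
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without a hint, Flink only sees the class Row and falls back to a Kryo-based GenericType.
        DataStream<Row> untyped = env.fromElements("1,DE").map(new ParseRow());
        System.out.println("without hint: " + untyped.getType());

        // With the hint, the stream reports RowTypeInfo, the type CassandraSink.addSink handles for Row.
        System.out.println("with hint:    " + typedRows(env).getType());
    }
}
```

In the question's job, the same idea would presumably read avroInputStream.map(...).returns(avroToRow.getProducedType()). Because filter() inherits its input's type, rowOutputStream would then carry the RowTypeInfo as well.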