我对春靴和Cassandra还不熟悉。我试图通过flink cassandra连接器将数据保存到cassandra表中。
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--Flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<!--Flink Cassandra-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.10</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
日期.java
package com.example.demo;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import org.springframework.cassandra.core.PrimaryKeyType;
import org.springframework.data.cassandra.mapping.PrimaryKeyColumn;
@Table(keyspace = "testing" ,name = "dates")
public class Date {
public Date(int patientid, long date, long timestamp) {
super();
this.patientid = patientid;
this.date = date;
this.timestamp = timestamp;
}
@PrimaryKeyColumn(name = "patientid", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
private int patientid;
@PrimaryKeyColumn(name = "date", ordinal = 1, type = PrimaryKeyType.CLUSTERED)
private long date;
@Column(name = "timestamp")
private long timestamp;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public int getPatientid() {
return patientid;
}
public void setPatientid(int patientid) {
this.patientid = patientid;
}
public long getDate() {
return date;
}
public void setDate(long date) {
this.date = date;
}
}
测试.java
package com.example.demo;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class Test implements CommandLineRunner {
private final static Collection<Date> collection = new ArrayList<>(2);
static {
for (int i = 1; i <= 2; ++i) {
collection.add(new Date(i, i, i));
}
}
@Override
public void run(String... strings) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Date> fromCollection = env.fromCollection(collection);
try {
CassandraSink
.addSink(fromCollection)
.setHost("192.168.1.20")
.build();
env.execute();
} catch (Exception e) {
}
}
}
完整堆栈跟踪
java.lang.NoSuchMethodError: com.datastax.driver.core.BoundStatement.set(ILjava/lang/Object;Lorg/apache/flink/cassandra/shaded/com/google/common/reflect/TypeToken;)Lcom/datastax/driver/core/BoundStatement;
at com.datastax.driver.mapping.Mapper.setObject(Mapper.java:230)
at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:206)
at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
at com.datastax.driver.mapping.Mapper.saveAsync(Mapper.java:271)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:65)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:75)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
我找不到合适的解决办法。请帮帮我。提前谢谢。
2条答案
按热度按时间vql8enpb1#
java.lang.nosuchmethoderror:com.datastax.driver.core.boundstatement.set(iljava/lang/object;lorg/apache/flink/cassandra/shaded/com/google/common/reflect/typetoken;)lcom/数据税/驱动程序/核心/边界声明;在com.datastax.driver.mapping.mapper.setobject(mapper。java:230)
上面的方法表明,没有方法
setObject
在Mapper
上课时间com.datastax.driver.mapping
包裹。看了jar版本的
cassandra-driver-mapping
在这里,您很可能下载了旧版本的驱动程序flink
pom中指定的依赖项。你能确认你是否有旧版本的吗
com.datastax.driver.mapping.Mapper
类在类路径中?希望这有帮助,祝你好运!
0lvr5msh2#
在我的例子中,我有一个复合分区键:主键(metric\u id,data\u type)
通过向我的对象的coresponding属性添加@partitionkey注解,所有操作都很好:
以下是pom.xml中的依赖项:
链接:https://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/
希望有帮助!
当做,
阿里