如何修复在{serverip}:9042打开到cassandra的本机连接失败

cetgtptt  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(446)

我试图连接Spark和Cassandra使用SparkCassandra连接器。连接已经建立,但是当我试图在javardd上执行操作时,我遇到了一个问题。

java.io.IOException: Failed to open native connection to Cassandra at {10.0.21.92}:9042

下面是我尝试实现的配置和代码:

SparkConf sparkConf = new SparkConf().setAppName("Data Transformation").set("spark.serializer","org.apache.spark.serializer.KryoSerializer").setMaster("local[4]");
    sparkConf.set("spark.cassandra.connection.host", server ip);
    sparkConf.set("spark.cassandra.connection.port", "9042");
    sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
    sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
    sparkConf.set("spark.cassandra.auth.username", user_name);
    sparkConf.set("spark.cassandra.auth.password", password);

    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

下面是我在javardd上执行操作的代码:

CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(keySpaceName, tableName);
    JavaRDD<GenericTriggerEntity> rdd = cassandraRDD.map(new Function<CassandraRow, GenericTriggerEntity>() {

    private static final long serialVersionUID = -165799649937652815L; 

    @Override
    public GenericTriggerEntity call(CassandraRow row) throws Exception {
    GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
    if(row.getString("end") != null)                        genericTriggerEntity.setEnd(row.getString("end"));
    if(row.getString("key") != null)
    genericTriggerEntity.setKey(row.getString("key"));
    genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
    genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
    genericTriggerEntity.setRowdeleted(row.getString("rowDeleted"));
    genericTriggerEntity.setRows(row.getString("rows"));
    genericTriggerEntity.setStart(row.getString("start"));
    genericTriggerEntity.setTablename("tablename");
    genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
    genericTriggerEntity.setTriggertime(row.getString("triggertime"));
    genericTriggerEntity.setUuid(row.getUUID("uuid"));
    return genericTriggerEntity;
    }               

    });

下面是我正在执行的javardd操作

JavaRDD<String> jsonDataRDDwords = rdd.flatMap(s -> Arrays.asList(SPACE.split((CharSequence) s)));
    JavaPairRDD<String, Integer> jsonDataRDDones = jsonDataRDDwords.mapToPair(s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> jsonDataRDDcounts = jsonDataRDDones.reduceByKey((i1, i2) -> i1 + i2);
    List<Tuple2<String, Integer>> jsonDatRDDoutput = jsonDataRDDcounts.collect();

我甚至试过telnet到cassandra服务器,端口是开放的。
我能够建立连接,但是在执行reducebykey时,出现了上述异常。
我搞不清楚是什么问题。javardd操作有问题。任何帮助都将不胜感激。提前谢谢。

e5nszbig

e5nszbig1#

您可以使用socat命令将本地端口转发到远程cassandra端口:

apt-get install socat
socat tcp-listen:9042,fork tcp:10.0.21.92:9042 &
zengzsys

zengzsys2#

上述错误是由于cassandra驱动器内核的某些依赖性问题造成的。通过在pom.xml中添加度量依赖关系解决了这个问题

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.2</version>
</dependency>

相关问题