如何处理spark中的cassandra“duration”数据类型?

b91juud3  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(453)

我在使用datastacks/spark cassandra连接器将cassandra(锡拉)数据加载到apache spark时遇到问题:

scala> val rdd = sc.cassandraTable[(String)](keyspace, table).select("url").limit(10).collect()
java.util.NoSuchElementException: key not found: duration
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at com.datastax.spark.connector.types.ColumnType$$anonfun$1.applyOrElse(ColumnType.scala:117)

我用scala和python尝试了不同的spark和spark-cassandra连接器版本,但没有成功。我猜是因为datastacks/spark cassandra connector中缺少duration数据类型支持,所以我检查了columntype.scala,它们只有:

private[connector] val primitiveTypeMap = Map[DataType, ColumnType[_]](
    DataType.text() -> TextType,
    DataType.ascii() -> AsciiType,
    DataType.varchar() -> VarCharType,
    DataType.cint() -> IntType,
    DataType.bigint() -> BigIntType,
    DataType.smallint() -> SmallIntType,
    DataType.tinyint() -> TinyIntType,
    DataType.cfloat() -> FloatType,
    DataType.cdouble() -> DoubleType,
    DataType.cboolean() -> BooleanType,
    DataType.varint() -> VarIntType,
    DataType.decimal() -> DecimalType,
    DataType.timestamp() -> TimestampType,
    DataType.inet() -> InetType,
    DataType.uuid() -> UUIDType,
    DataType.timeuuid() -> TimeUUIDType,
    DataType.blob() -> BlobType,
    DataType.counter() -> CounterType,
    DataType.date() -> DateType,
    DataType.time() -> TimeType
  )

有人知道在这种情况下如何处理cassandra的duration数据类型吗?

83qze16e

83qze16e1#

spark支持的类型定义不包含duration,但它们声明
您可以将符合cql标准的字符串转换为数字、日期、地址或uuid。
而且,java驱动程序已经有了类duration,构造函数将字符串值强制转换为类duration duration . 考虑到这一点,您应该能够创建一个自定义的Spark连接器类似于这里的例子。
我不是spark用户,建议只是基于文档。

相关问题