我在使用datastacks/spark cassandra连接器将cassandra(锡拉)数据加载到apache spark时遇到问题:
scala> val rdd = sc.cassandraTable[(String)](keyspace, table).select("url").limit(10).collect()
java.util.NoSuchElementException: key not found: duration
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at com.datastax.spark.connector.types.ColumnType$$anonfun$1.applyOrElse(ColumnType.scala:117)
我用scala和python尝试了不同的spark和spark-cassandra连接器版本,但没有成功。我猜是因为datastacks/spark cassandra connector中缺少duration数据类型支持,所以我检查了columntype.scala,它们只有:
private[connector] val primitiveTypeMap = Map[DataType, ColumnType[_]](
DataType.text() -> TextType,
DataType.ascii() -> AsciiType,
DataType.varchar() -> VarCharType,
DataType.cint() -> IntType,
DataType.bigint() -> BigIntType,
DataType.smallint() -> SmallIntType,
DataType.tinyint() -> TinyIntType,
DataType.cfloat() -> FloatType,
DataType.cdouble() -> DoubleType,
DataType.cboolean() -> BooleanType,
DataType.varint() -> VarIntType,
DataType.decimal() -> DecimalType,
DataType.timestamp() -> TimestampType,
DataType.inet() -> InetType,
DataType.uuid() -> UUIDType,
DataType.timeuuid() -> TimeUUIDType,
DataType.blob() -> BlobType,
DataType.counter() -> CounterType,
DataType.date() -> DateType,
DataType.time() -> TimeType
)
有人知道在这种情况下如何处理cassandra的duration数据类型吗?
1条答案
按热度按时间83qze16e1#
spark支持的类型定义不包含duration,但它们声明
您可以将符合cql标准的字符串转换为数字、日期、地址或uuid。
而且,java驱动程序已经有了类duration,构造函数将字符串值强制转换为类duration
duration
. 考虑到这一点,您应该能够创建一个自定义的Spark连接器类似于这里的例子。我不是spark用户,建议只是基于文档。