I am trying to insert data into Azure Databricks from my local Spark cluster using the CData JDBC driver. My first attempt was to write directly from a DataFrame, like this:
df.write
  .format("jdbc")
  .option("driver", DatabricksUtil.driver)
  .option("url", url)
  .option("dbtable", s"$schemaName.$tableName")
  .option("createTableColumnTypes", JdbcUtils.schemaString(df, url, None))
  .mode(SaveMode.Overwrite)
  .save()
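For reference, my understanding is that createTableColumnTypes also accepts an explicit column list in CREATE TABLE syntax, so spelling the types out by hand would look roughly like the sketch below (whether Databricks accepts STRING and INT there is my assumption, I have not verified it):
df.write
  .format("jdbc")
  .option("driver", DatabricksUtil.driver)
  .option("url", url)
  .option("dbtable", s"$schemaName.$tableName")
  // assumption: Databricks SQL type names instead of the generated schema string
  .option("createTableColumnTypes", "State STRING, Abbreviation STRING, Code INT")
  .mode(SaveMode.Overwrite)
  .save()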
I even tried this simple query:
connection.createStatement.execute("CREATE TABLE `default`.`state2` (State Text, Abbreviation Text, Code Int)")
In both cases (the DataFrame write and the plain CREATE TABLE), I get this error:
Exception in thread "main" XcoreXsparksqlX190X7354.ezf: Error running query: org.apache.spark.sql.catalyst.parser.ParseException:
DataType text is not supported.(line 1, pos 40)
== SQL ==
CREATE TABLE `default`.`state2`(`State` Text, `Abbreviation` Text, `Code` Int)
----------------------------------------^^^
at XcoreXsparksqlX190X7354.vjf.a(Unknown Source)
at XcoreXsparksqlX190X7354.nud.execute(Unknown Source)
at c3.spark.util.DatabricksApp$.main(DatabricksUtil.scala:208)
at c3.spark.util.DatabricksApp.main(DatabricksUtil.scala)
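My guess is that, for the DataFrame write, the rejected Text type comes from Spark itself: when no registered JdbcDialect recognizes the connection URL, JdbcUtils.schemaString falls back to the generic mapping that turns StringType into TEXT. A minimal sketch of registering a custom dialect that emits Databricks-style type names instead (the URL prefix, object name and type names are my assumptions):
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

// Hypothetical dialect: map Spark SQL types to Databricks type names
// instead of the generic defaults (StringType -> TEXT).
object DatabricksJdbcDialect extends JdbcDialect {
  // assumption: adjust the prefix to whatever the CData connection URL starts with
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:databricks")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType  => Some(JdbcType("STRING", java.sql.Types.VARCHAR))
    case IntegerType => Some(JdbcType("INT", java.sql.Types.INTEGER))
    case _           => None // fall back to the default mapping
  }
}

JdbcDialects.registerDialect(DatabricksJdbcDialect)
With something like this registered before the write, schemaString should generate STRING instead of TEXT, although I have not verified that against the CData driver.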
Now, if I try to insert a few rows manually, either with plain SQL statements or with a prepared statement, like this:
connection.createStatement.execute("DROP TABLE IF EXISTS `default`.`state2`")
connection.createStatement.execute("CREATE TABLE `default`.`state2` (State String, Abbreviation String, Code Int)")
// insert prepared statement
val preparedStatement = connection.prepareStatement("INSERT INTO `default`.`state2` (State,Abbreviation,Code) VALUES (?,?,?)")
preparedStatement.setString(1, "Alabama")
preparedStatement.setString(2, "AL")
preparedStatement.setInt(3, 1)
preparedStatement.addBatch()
preparedStatement.setString(1, "Alaska")
preparedStatement.setString(2, "AK")
preparedStatement.setInt(3, 2)
preparedStatement.addBatch()
println("Inserted rows: " + preparedStatement.executeBatch().mkString(","))
// insert statement
connection.createStatement().execute("INSERT INTO `default`.`state2` (State,Abbreviation,Code) VALUES ('Arizona','AZ',3), ('Arkansas','AR',4)")
// read table
val result = connection.createStatement().executeQuery("SELECT * FROM `default`.`state2`")
val rows = Iterator.continually(result).takeWhile(_.next()).map { rs =>
  val row = Seq("String", "String", "Int").zipWithIndex.map { case (coltype, colindex) =>
    coltype match {
      case "String" => rs.getString(colindex + 1)
      case "Int"    => rs.getInt(colindex + 1)
    }
  }
  row.mkString(",")
}.toList
println(rows)
I can see that rows do get inserted, but they contain no data (the strings come back as null and the integers as 0):
Inserted rows: 1,1
List(null,null,0, null,null,0, null,null,0, null,null,0)
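For what it's worth, here is a plain-JDBC sketch that would show what the driver itself reports for the table's columns (the helper name is mine, nothing CData-specific):
import java.sql.Connection

// Print the column names and the type names the driver reports,
// to check whether the read side maps the columns the way I expect.
def describeStateTable(connection: Connection): Unit = {
  val rs = connection.createStatement().executeQuery("SELECT * FROM `default`.`state2` LIMIT 1")
  val meta = rs.getMetaData
  (1 to meta.getColumnCount).foreach { i =>
    println(s"${meta.getColumnName(i)}: ${meta.getColumnTypeName(i)}")
  }
}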
What is the correct way to insert data into this database?