我想将我的rdd持久化到mysql数据库表中。我使用了一个map函数来迭代rdd,并将每个元组传递到我的函数中,在那里我进行持久化。这里我想把我的工作并行化为主节点和从节点。
但是它不能正常工作,也不能调用使数据库持久化的函数。
如果我使用collect()例如 courseSet.collect().map(m => sendCourseInfo(m))
而不是 courseSet.map(m => sendCourseInfo(m))
这很管用。
我不想在这里使用collect()。
我在很多文章中都找过这个,但都找不出来。有谁能帮我解决这个问题吗。
下面是我的密码,
.....
x.toString().split(",")(1),
x.toString().split(",")(2),
x.toString().split(",")(3)))
courseSet.map(m => sendCourseInfo(m))
}
def sendCourseInfo(courseData: (Int, String, String, String)): Unit = {
try {
DatabaseUtil.setJDBCConfiguration()
val jdbcConnection: java.sql.Connection = DatabaseUtil.getConnection
val statement = "{call insert_course (?,?,?,?)}"
val callableStatement = jdbcConnection.prepareCall(statement)
callableStatement.setInt(1, courseData._1)
callableStatement.setString(2, courseData._2)
callableStatement.setString(3, courseData._3)
callableStatement.setString(4, courseData._4)
callableStatement.executeUpdate
} catch {
case e: SQLException => println(e.getStackTrace)
}
}
1条答案
按热度按时间w41d8nur1#
您正在rdd上调用map(),这是一个转换,而不是一个操作。所以,为了执行它,你需要调用一些动作,比如,
对你所做的额外建议,
无论什么
x
就是,你把它转换成字符串,拆分它,然后从这个拆分中提取一些东西。对于rdd/集合中的每个元素,您将执行三次此操作。所以,你可以用这样的方法来优化它,接下来您必须将这些数据持久化到db中,在本例中是mysql。为此,您使用java常用的jdbc连接,为每个元素创建一个新的连接和操作。相反,使用spark 2.x做一些类似的事情,
让我知道如果这有帮助,干杯。