我的目标是将apachespark中作为rdd加载的一些文档加载到neo4j图中。问题似乎出在创建这样的节点上,因为eclipse报告任务不可序列化 Exception in thread "main" org.apache.spark.SparkException: Task not serializable
我的代码如下:
public class Spark {
public static void createRdd() {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnector")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/TheFoodPlanner.join")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaMongoRDD<Document> rddRecipes = MongoSpark.load(jsc);
Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "estrella100"));
try (Session session = driver.session()) {
rddRecipes.foreach(f -> {
String id = f.getString("id");
session.run("CREATE (n: Recipe {id:'" + id + "'})");
});
session.close();
}
driver.close();
jsc.close();
}
}
错误出现在行中 session.run("CREATE (n: Recipe {id:'" + id + "'})");
以并行方式加载节点的正确方法是什么?
1条答案
按热度按时间btqmn9zl1#
问题是,您将来自mongo的基于spark的数据连接器与neo4j的“传统”java连接器混为一谈。在这种情况下,通常的问题是,如果没有额外的代码来处理在多个执行器上运行的代码,那么这些驱动程序就不能优化为与spark一起使用—这里的一个常见问题是
Driver
仅在驱动程序节点上打开连接,而不在执行器上打开连接。对于您的情况,您有两种可能:
使用neo4j现有的spark连接器-您需要将mongo中的数据转换为neo4j连接器可以使用的格式-它可以比“手动”解决方案更优化,而且更易于使用
继续使用
foreach
,但您需要更改方法-驱动程序的初始化应该在每个执行器上进行-最简单的方法是使用类似的方法(伪代码,未测试)-这将使每个分区由持有该分区的执行器处理-在这种情况下,每个执行器都有自己到neo4j的连接: