Spark NLP: Remote RPC client disassociated on DataFrame.show()

ymzxtsji · posted 2021-05-27 in Spark

Environment:
Spark NLP version: 2.5.0
Apache Spark version: 2.4.5
Java version (java -version): 1.8
Windows PC used to run the Spark job in client mode
Spark cluster (standalone) on a remote server (8 cores, 16 GB RAM):
1 dockerized master (docker image: bitnami/spark:latest)
1 dockerized worker with a 16 GB memory limit (docker image: bitnami/spark:latest)
1 HDFS namenode and 1 HDFS datanode, dockerized (docker images: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 and bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8)
I have to use John Snow Labs' Spark NLP pretrained pipeline recognize_entities_dl to extract entities from a string field of a DataFrame. The pipeline has been downloaded correctly to HDFS and works, but when I try to run the following script, the feed_tags DataFrame cannot be shown.
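
(For reference, a minimal sketch of loading the same pipeline by name instead of from the cached HDFS path used in the script below; Spark NLP resolves and caches it automatically:)

import com.johnsnowlabs.nlp.pretrained.PretrainedPipeline

// downloads recognize_entities_dl into cache_pretrained on first use, then
// loads it; equivalent in effect to the PipelineModel.load(...) call below
val ner = PretrainedPipeline("recognize_entities_dl", lang = "en")
val annotated = ner.transform(test_df)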

import com.couchbase.spark.sql._ // enables spark.read.couchbase (Couchbase Spark connector)
import com.johnsnowlabs.nlp.pretrained.PretrainedPipeline
import com.johnsnowlabs.util.{ConfigLoader, PipelineModels}
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.SizeEstimator // for SizeEstimator.estimate below
import utils.Schemas.feedSchema

object tags_extraction_eng {

System.setProperty("HADOOP_USER_NAME", "root")

val (spark, sc) = start_spark_session()

val fs = FileSystem.get(sc.hadoopConfiguration)
println("HDFS_IMPL:", sc.hadoopConfiguration.get("fs.hdfs.impl"))  //null
println("DEFAULT_FS:", sc.hadoopConfiguration.get("fs.defaultFS"))  // hdfs://remote_ip:remote_port
println("FS_URI:", fs.getUri.toString)  // hdfs://remote_ip:remote_port
println("FS_SCHEME:", fs.getScheme)  // hdfs

def start_spark_session(): (SparkSession, SparkContext) = {
  val spark = SparkSession.builder()
    .appName("feeds_tags_extraction_nlp_eng")
    .master("spark://remote_ip:7077")
    .config("spark.driver.host", "host_ip")

    .config("spark.submit.deployMode", "client")
    .config("spark.cores.max", "8")
    .config("spark.executor.memory", "8g")

    .config("spark.ui.showConsoleProgress", "true")
    .config("spark.sql.tungsten.enabled", "true")
    .config("spark.sql.execution.arrow.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.worker.cleanup.enabled", "true")

    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")

    .config("spark.jars", "hdfs://remote_ip:remote_port/sparkscala.jar," +
      "https://repo1.maven.org/maven2/com/couchbase/client/java-client/2.7.6/java-client-2.7.6.jar," +
      "https://repo1.maven.org/maven2/com/couchbase/client/core-io/1.7.6/core-io-1.7.6.jar," +
      "https://repo1.maven.org/maven2/io/reactivex/rxjava/1.3.8/rxjava-1.3.8.jar," +
      "https://repo1.maven.org/maven2/io/reactivex/rxscala_2.11/0.26.5/rxscala_2.11-0.26.5.jar," +
      "https://repo1.maven.org/maven2/io/opentracing/opentracing-api/0.31.0/opentracing-api-0.31.0.jar," +
      "https://repo1.maven.org/maven2/com/couchbase/client/spark-connector_2.11/2.3.0/spark-connector_2.11-2.3.0.jar," +
      "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/spark-nlp-assembly-2.5.0.jar," +
    )

  spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://remote_ip:remote_port")
  spark.sparkContext.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
  spark.sparkContext.hadoopConfiguration.set("dfs.datanode.use.datanode.hostname", "true")
  spark.sparkContext.hadoopConfiguration.set("dfs.replication", "1")
  spark.sparkContext.hadoopConfiguration.set("hadoop.tmp.dir", "/tmp")

  val sc = spark.sparkContext
  sc.getConf.getAll.foreach(println)

  (spark, sc)
}

def main(args: Array[String]) {
  val test_df = spark.createDataFrame(Seq(
      (1, "I would like to come over and see you in 01/02/2019."),
      (2, "Donald John Trump (born June 14, 1946) is the 45th and current president of the United States")))
    .toDF("id", "text")

  val df = spark.read.couchbase(schema = feedSchema, options = Map("bucket" -> "feeds"))
  val feeds_df = prepare_dataframe(df)

  println(test_df)
  println(feeds_df)
  feeds_df.show() // it works
  println(test_df.getClass.toString)
  println(feeds_df.getClass.toString)

  val pipeline = PipelineModel.load("cache_pretrained/recognize_entities_dl_en_2.4.3_2.4_1584626752821")

  pipeline.transform(test_df).show(truncate = false) // shows the DF

  val feed_tags = pipeline.transform(feeds_df)

  feed_tags.printSchema()
  println(feed_tags)
  println(feed_tags.getClass.toString)
  println(SizeEstimator.estimate(feed_tags))
  println(feed_tags.count()) // it works

  feed_tags.show() // Error
}

}

The strange thing is that test_df goes through the pipeline and test_df.show() succeeds (as long as it is not cached), while feed_tags is transformed as well, but feed_tags.show() produces the following error:

[ INFO] Starting job: show at tags_extraction_eng.scala:222
[ INFO] Got job 21 (show at tags_extraction_eng.scala:222) with 1 output partitions
[ INFO] Final stage: ResultStage 22 (show at tags_extraction_eng.scala:222)
[ INFO] Parents of final stage: List()
[ INFO] Missing parents: List()
[ INFO] Submitting ResultStage 22 (MapPartitionsRDD[76] at show at tags_extraction_eng.scala:222), which has no missing parents
[ INFO] Block broadcast_40 stored as values in memory (estimated size 91.8 KB, free 869.6 MB)
[ INFO] Block broadcast_40_piece0 stored as bytes in memory (estimated size 30.4 KB, free 869.6 MB)
[ INFO] Added broadcast_40_piece0 in memory on 192.168.224.54:57868 (size: 30.4 KB, free: 887.0 MB)
[ INFO] Created broadcast 40 from broadcast at DAGScheduler.scala:1163
[ INFO] Submitting 1 missing tasks from ResultStage 22 (MapPartitionsRDD[76] at show at tags_extraction_eng.scala:222) (first 15 tasks are for partitions Vector(0))
[ INFO] Adding task set 22.0 with 1 tasks
[ INFO] Starting task 0.0 in stage 22.0 (TID 42, 10.128.16.2, executor 1, partition 0, ANY, 7870 bytes)
[ INFO] Added broadcast_40_piece0 in memory on 10.128.16.2:46747 (size: 30.4 KB, free: 2004.5 MB)
[Stage 22:>                                                         (0 + 1) / 1][ INFO] Added broadcast_21_piece0 in memory on 10.128.16.2:46747 (size: 369.0 B, free: 2004.5 MB)
[ INFO] Added broadcast_35_piece3 in memory on 10.128.16.2:46747 (size: 2.6 MB, free: 2001.8 MB)
[ INFO] Added broadcast_35_piece2 in memory on 10.128.16.2:46747 (size: 4.0 MB, free: 1997.8 MB)
[ INFO] Added broadcast_35_piece0 in memory on 10.128.16.2:46747 (size: 4.0 MB, free: 1993.8 MB)
[ INFO] Added broadcast_35_piece1 in memory on 10.128.16.2:46747 (size: 4.0 MB, free: 1989.8 MB)
[ERROR] Lost executor 1 on 10.128.16.2: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[ WARN] Lost task 0.0 in stage 22.0 (TID 42, 10.128.16.2, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[ INFO] Executor updated: app-20200531114104-0001/1 is now EXITED (Command exited with code 134)
[ INFO] Existing executor 1 has been removed (new total is 0)
[ INFO] Executor lost: 1 (epoch 1)
[ INFO] Trying to remove executor 1 from BlockManagerMaster.
[ WARN] Attempted to mark unknown executor 1 idle
[ INFO] Removing block manager BlockManagerId(1, 10.128.16.2, 46747, None)
[ INFO] Removed 1 successfully in removeExecutor
[ INFO] Executor app-20200531114104-0001/1 removed: Command exited with code 134
[ INFO] Trying to remove executor 1 from BlockManagerMaster.
[ INFO] Removal of executor 1 requested
[ INFO] Asked to remove non-existent executor 1
[Stage 22:>                                                         (0 + 0) / 1][ INFO] Executor added: app-20200531114104-0001/2 on worker-20200531110310-10.128.16.2-39637 (10.128.16.2:39637) with 4 core(s)
[ INFO] Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
[ INFO] Granted executor ID app-20200531114104-0001/2 on hostPort 10.128.16.2:39637 with 4 core(s), 4.0 GB RAM
[ INFO] Executor updated: app-20200531114104-0001/2 is now RUNNING
[ INFO] Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.150.71:56528) with ID 2
[ INFO] Starting task 0.1 in stage 22.0 (TID 43, 10.128.16.2, executor 2, partition 0, ANY, 7870 bytes)
[ INFO] New executor 2 has registered (new total is 1)
[Stage 22:>                                                         (0 + 1) / 1][ INFO] Registering block manager 10.128.16.2:39891 with 2004.6 MB RAM, BlockManagerId(2, 10.128.16.2, 39891, None)
[ INFO] Added broadcast_21_piece0 in memory on 10.128.16.2:39891 (size: 369.0 B, free: 2004.5 MB)
[ INFO] Added broadcast_35_piece3 in memory on 10.128.16.2:39891 (size: 2.6 MB, free: 2001.8 MB)
[ INFO] Added broadcast_35_piece2 in memory on 10.128.16.2:39891 (size: 4.0 MB, free: 1997.8 MB)
[ INFO] Added broadcast_35_piece0 in memory on 10.128.16.2:39891 (size: 4.0 MB, free: 1993.8 MB)
[ INFO] Added broadcast_35_piece1 in memory on 10.128.16.2:39891 (size: 4.0 MB, free: 1989.8 MB)
[ERROR] Lost executor 2 on 10.128.16.2: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

    and so on...

A few clarifications:
test_df also works with 180 rows (the same size as feed_tags);
I tried both caching and not caching feed_tags, and noticed no difference;
if I call feed_tags.show() only on the fields that were not added by the pipeline, everything works fine;
the number of workers in the cluster does not seem to affect the result;
I tried running the script with dynamic allocation both enabled and disabled;
I have experimented with the worker's memory and cores, but saw no difference;
I am running the script through IntelliJ, so I cannot increase the driver memory (if there is a way, please tell me how; see the sketch after this list).
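
(On the driver-memory point, a sketch of the two usual workarounds: in client mode the driver is the JVM that IntelliJ itself launches, so setting spark.driver.memory in SparkSession.builder comes too late. The 8g value is only an example:)

# Option 1: IntelliJ -> Run -> Edit Configurations -> VM options (sizes the driver heap directly)
-Xmx8g

# Option 2: package the job and launch it with spark-submit instead of the IDE
spark-submit --master spark://remote_ip:7077 --deploy-mode client --driver-memory 8g sparkscala.jar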
I think the error stems from a memory problem rather than a network issue (otherwise it would never work at all), but as the attached log shows there is still plenty of free memory on both the driver and the worker, and feed_tags.count() works (which it shouldn't if the driver were running out of memory), so I don't know what else to try.
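
(One further check that follows from the clarification above that non-pipeline fields show fine: display each pipeline-produced column on its own to see which annotation triggers the crash. The column names below assume the usual recognize_entities_dl outputs; verify them with feed_tags.printSchema():)

// show one pipeline output column at a time to isolate the failing one
Seq("document", "sentence", "token", "embeddings", "ner", "entities").foreach { c =>
  println(s"--- $c ---")
  feed_tags.select(c).show(1, truncate = false)
}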
