How to run Spark + Cassandra + Mesos (DCOS) with dynamic resource allocation?

Posted by u4vypkhs on 2021-06-26 in Mesos

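On every slave node we run the Mesos external shuffle service via Marathon. When we submit a Spark job through the DCOS CLI in coarse-grained mode without dynamic allocation, everything works as expected. But when we submit the same job with dynamic allocation, it fails.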

16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file:/tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at   org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)

Full description:

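We installed Mesos (DCOS) and Marathon through the Azure Portal.
Through Universe packages we installed Cassandra, Spark and marathon-lb.
We generated test data in Cassandra.
I installed the DCOS CLI on my laptop.
When I submit a job as follows, everything works as expected: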

./dcos spark run --submit-args="--properties-file coarse-grained.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
Run job succeeded. Submission id: driver-20161208185927-0043

cqlsh:sp> select count(*) from product_model_per_alerts_by_date ;

count
-------
476

coarse-grained.conf:

spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.executor.instances 2
spark.submit.deployMode cluster
spark.cores.max 4

portal.spark.cassandra.app.ProductModelPerNrOfAlerts:

package portal.spark.cassandra.app

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object ProductModelPerNrOfAlerts {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf(true)
      .setAppName("cassandraSpark-ProductModelPerNrOfAlerts")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    // Read the source table from Cassandra, keeping only the columns needed
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "asset_history", "keyspace" -> "sp"))
      .load()
      .select("datestamp", "product_model", "nr_of_alerts")

    // Average nr_of_alerts per (datestamp, product_model); the groupBy causes a shuffle
    val dr = df
      .groupBy("datestamp", "product_model")
      .avg("nr_of_alerts")
      .toDF("datestamp", "product_model", "nr_of_alerts")

    // Write the aggregated result back to Cassandra
    dr.write
      .mode(SaveMode.Overwrite)
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "product_model_per_alerts_by_date", "keyspace" -> "sp"))
      .save()

    sc.stop()
  }
}

Dynamic allocation

Through Marathon we run the Mesos external shuffle service:

{
  "id": "spark-mesos-external-shuffle-service-tt",
  "container": {
     "type": "DOCKER",
     "docker": {
        "image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
        "network": "BRIDGE",
        "portMappings": [
           { "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
         ],
       "forcePullImage":true,
       "volumes": [
         {
           "containerPath": "/tmp",
           "hostPath": "/tmp",
           "mode": "RW"
         }
       ]
     }
   },
   "instances": 9,
   "cpus": 0.2,
   "mem": 512,
   "constraints": [["hostname", "UNIQUE"]]
 }

Dockerfile for jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1:

FROM mesosphere/spark:1.0.4-2.0.1
WORKDIR /opt/spark/dist
ENTRYPOINT ["./bin/spark-class", "org.apache.spark.deploy.mesos.MesosExternalShuffleService"]

Now, when I submit the job with dynamic allocation, it fails:

./dcos spark run --submit-args="--properties-file dynamic-allocation.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
 Run job succeeded. Submission id: driver-20161208191958-0047

select count(*) from product_model_per_alerts_by_date ;

count
-------
 5

dynamic-allocation.conf:

spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.submit.deployMode cluster
spark.cores.max 4

spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 5
spark.dynamicAllocation.cachedExecutorIdleTimeout 120s
spark.dynamicAllocation.schedulerBacklogTimeout 10s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 20s
spark.mesos.executor.docker.volumes /tmp:/tmp:rw
spark.local.dir /tmp

Logs from Mesos:

16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.0 KB, free 366.0 MB)
16/12/08 19:20:42 INFO TorrentBroadcast: Reading broadcast variable 7 took 21 ms
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 38.6 KB, free 366.0 MB)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.32.0.4:45422)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Got the output locations
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 58 blocks
16/12/08 19:20:42 INFO TransportClientFactory: Successfully created connection to /10.32.0.11:7337 after 2 ms (0 ms spent in bootstraps)
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 13 ms
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at   org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
 Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)

Logs from the Marathon app spark-mesos-external-shuffle-service-tt:

...
16/12/08 19:20:29 INFO MesosExternalShuffleBlockHandler: Received registration request from app 704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047 (remote address /10.32.0.4:49710, heartbeat timeout 120000 ms).
16/12/08 19:20:31 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047, execId=2} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
16/12/08 19:20:38 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8157825166903585542
java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index (No such file or directory)
...

But the file exists on the given slave box:

$ ls -l /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
-rw-r--r-- 1 root root 1608 Dec  8 19:20 /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index

$ stat shuffle_0_55_0.index
  File: 'shuffle_0_55_0.index'
  Size: 1608        Blocks: 8          IO Block: 4096   regular file
  Device: 801h/2049d    Inode: 1805493     Links: 1
  Access: (0644/-rw-r--r--)  Uid: (    0/    root)   Gid: (    0/    root)
  Access: 2016-12-08 19:20:38.163188836 +0000
  Modify: 2016-12-08 19:20:38.163188836 +0000
  Change: 2016-12-08 19:20:38.163188836 +0000
  Birth: -

Answer #1 (5m1hhzi4)

There was an error in the external shuffle service configuration: instead of the path container.docker.volumes, we should use the path container.volumes.
Correct configuration:

{
  "id": "mesos-external-shuffle-service-simple",
  "container": {
     "type": "DOCKER",
     "docker": {
        "image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
        "network": "BRIDGE",
        "portMappings": [
           { "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
         ],
       "forcePullImage":true
     },
    "volumes": [
         {
           "containerPath": "/tmp",
           "hostPath": "/tmp",
           "mode": "RW"
         }
    ]
   },
   "instances": 9,
   "cpus": 0.2,
   "mem": 512,
   "constraints": [["hostname", "UNIQUE"]]
 }
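
To catch this kind of mistake before deploying, the placement of the volumes can be checked programmatically. Below is a minimal sketch in Python, assuming the app definition has already been parsed into a dict. The helper names misplaced_volumes and mounts_host_path are made up for this illustration and are not part of Marathon or DCOS, and the two dicts are trimmed-down versions of the definitions in this thread.

```python
def misplaced_volumes(app):
    """Return volumes declared under container.docker (the placement reported as broken here)."""
    return app.get("container", {}).get("docker", {}).get("volumes", [])


def mounts_host_path(app, path):
    """True if container.volumes maps host `path` onto the same path inside the container."""
    volumes = app.get("container", {}).get("volumes", [])
    return any(
        v.get("hostPath") == path and v.get("containerPath") == path
        for v in volumes
    )


tmp_volume = {"containerPath": "/tmp", "hostPath": "/tmp", "mode": "RW"}

# Hypothetical, trimmed-down versions of the two app definitions above.
broken = {"container": {"type": "DOCKER", "docker": {"volumes": [tmp_volume]}}}
fixed = {"container": {"type": "DOCKER", "docker": {}, "volumes": [tmp_volume]}}

for name, app in (("broken", broken), ("fixed", fixed)):
    print(name,
          "| volumes under container.docker:", len(misplaced_volumes(app)),
          "| /tmp mounted via container.volumes:", mounts_host_path(app, "/tmp"))
```

For a real definition, the dict would come from something like json.load on the file submitted to Marathon. The check only looks at where the volumes key sits; it does not confirm that the running container actually sees the host directory.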

Answer #2 (vuv7lop3)

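Although I'm not familiar with DCOS, Marathon and Azure, I use dynamic resource allocation (the Mesos external shuffle service) on Mesos and Aurora with Docker.
Does every Mesos agent node have its own external shuffle service (that is, one external shuffle service per Mesos agent)?
Is the spark.local.dir setting exactly the same string, pointing to the same directory? Your spark.local.dir for the shuffle service is /tmp, though I don't know the DCOS settings.
Is the spark.local.dir directory readable and writable? If both the Mesos agent and the external shuffle service are launched by Docker, spark.local.dir on the host must be mounted into both containers.
Edit
If the SPARK_LOCAL_DIRS environment variable (Mesos or Standalone) is set, spark.local.dir will be overridden.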
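
The override and the read/write check can be sketched as follows. This is a simplified model in Python, not Spark's actual resolution logic: it only covers the precedence mentioned above (SPARK_LOCAL_DIRS wins over spark.local.dir) and assumes /tmp as the fallback when neither is set. The function names and the /mnt/spark path are hypothetical.

```python
import os


def effective_local_dirs(spark_conf, env):
    """Simplified model: SPARK_LOCAL_DIRS, when set, wins over spark.local.dir."""
    # Assumption: fall back to /tmp when neither setting is present.
    raw = env.get("SPARK_LOCAL_DIRS") or spark_conf.get("spark.local.dir") or "/tmp"
    # Both settings may hold a comma-separated list of directories.
    return [d.strip() for d in raw.split(",") if d.strip()]


def is_readable_writable(path):
    """True if `path` is an existing directory the current user can read and write."""
    return os.path.isdir(path) and os.access(path, os.R_OK | os.W_OK)


conf = {"spark.local.dir": "/tmp"}

# Without the environment variable, the configured value is used.
print(effective_local_dirs(conf, {}))
# With it, the configured value is ignored (hypothetical path).
print(effective_local_dirs(conf, {"SPARK_LOCAL_DIRS": "/mnt/spark"}))

# Check the directories that would apply in the current environment.
for d in effective_local_dirs(conf, os.environ):
    print(d, "readable/writable:", is_readable_writable(d))
```

Running the same check inside both the executor container and the shuffle service container is one way to compare what each side resolves and whether the directory is usable there.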
