这个问题可能看起来相当大,但我有两个具体的情况,最好放在一起,而不是分开。首先,我把Kafka的数据读入 dstream
使用spark流kafka api。假设我有以下两种情况之一:
// something goes wrong on the driver
dstream.transform { rdd =>
throw new Exception
}
// something goes wrong on the executors
dstream.transform { rdd =>
rdd.foreachPartition { partition =>
throw new Exception
}
}
这通常描述了我需要停止应用程序时可能发生的一些情况—在驱动程序或其中一个执行器上引发异常(例如,未能到达对处理至关重要的某个外部服务)。如果您在本地尝试此操作,应用程序会立即失败。更多代码:
dstream.foreachRDD { rdd =>
// write rdd data to some output
// update the kafka offsets
}
这是我的应用程序中发生的最后一件事-将数据推入Kafka,然后确保移动Kafka中的偏移量以避免重新处理。
其他注意事项:
我正在用马拉松在mesos上运行spark 2.0.1
已禁用检查点和预写日志
我希望应用程序在抛出异常的情况下关闭(就像我在本地运行它一样),因为我需要一个快速失败的行为。现在经常发生的情况是,在异常发生后,应用程序仍然显示为正在运行;更糟糕的是,在某些情况下仍然可以访问spark ui,尽管不再处理任何内容。
原因可能是什么?
1条答案
按热度按时间nkcskrwz1#
您的示例只显示了转换。对于spark-only操作,抛出异常是因为它们懒散地执行转换。我猜任何试图把结果写在某个地方的尝试都会很快失败。