Spark Streaming - reading files from S3/COS fails

Posted by mnowg1ta on 2021-07-13 in Spark

I am trying to submit a wordcount Spark job to run on a Kubernetes cluster, using the command below.

./bin/spark-submit \
    --master k8s://https://c111.us-south.containers.cloud.ibm.com:32206 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --packages com.ibm.stocator:stocator:1.1.3 \
    --conf spark.executor.instances=5 \
    --conf spark.hadoop.fs.cos.myobjectstorage.access.key= \
    --conf spark.hadoop.fs.cos.myobjectstorage.secret.key= \
    --conf spark.hadoop.fs.stocator.scheme.list=cos \
    --conf spark.hadoop.fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem \
    --conf spark.hadoop.fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient \
    --conf spark.hadoop.fs.stocator.cos.scheme=cos \
    --conf spark.jars.ivy=/tmp/.ivy \
    --conf spark.kubernetes.container.image=us.icr.io/mods15/spark-py:v1 \
    --conf spark.hadoop.fs.cos.myobjectstorage.endpoint=http://s3.us.cloud-object-storage.appdomain.cloud \
    --conf spark.hadoop.fs.cos.myobjectstorage.v2.signer.type=false \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    local:///opt/spark/examples/src/main/python/wordcount.py \
    cos://vmac-code-engine-bucket.myobjectstorage/book.txt

Everything worked until I added Spark Streaming to the Python code, shown below. I use StreamingContext.textFileStream because fileStream is not available in Python. I see no errors in the logs, but the output written to the COS folder is empty (no word counts).

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

def time_in_seconds():
    return time.time()

timeInSeconds = time_in_seconds()

# The master is hardcoded here, even though spark-submit also passes --master
sc = SparkContext("local[2]", "WordCount")
ssc = StreamingContext(sc, 60)  # 60-second batch interval

# Monitor the COS directory for newly arriving files
lines = ssc.textFileStream("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/ES_TOPIC_NAME/")

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

wordCounts.saveAsTextFiles(f"cos://COS_BUCKET_NAME.COS_SERVICE_NAME/results/wordcount-result-{timeInSeconds}")

ssc.start()
ssc.awaitTermination()
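As a sanity check that the counting logic itself is not the problem (independent of COS, Stocator, and Kubernetes), the same flatMap → map → reduceByKey pipeline can be sketched in plain Python. This is only an illustrative equivalent, not the Spark API; unlike the DStream version it drops empty tokens produced by split(" "):

```python
from collections import Counter

def word_count(lines):
    # Equivalent of: flatMap(split) -> map((word, 1)) -> reduceByKey(+)
    words = (word for line in lines for word in line.split(" ") if word)
    return dict(Counter(words))

counts = word_count(["hello world", "hello spark"])
print(counts)  # {'hello': 2, 'world': 1, 'spark': 1}
```

If this logic produces the expected counts locally, the empty output in COS points at the input side (textFileStream only picks up files that appear in the monitored directory after the stream starts) rather than at the word-count code.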

I could not find any documentation on running Spark Streaming on Kubernetes. I assume the read from the COS bucket is failing. Is anything missing from the command or from the Python wordcount example?
