长话短说:当使用beam管道简单地读取tf记录文件并将其写入另一个文件时,它可以工作。但是,当使用它读取匹配多个tf记录文件的输入文件模式并将内容写入单个输出文件时,它会自动删除输出文件,并在运行结束时发出警告。我是新来的Apacheè束和Spark,所以任何帮助是感激的。
具体情况如下
我试着在spark上运行一个简单的beam脚本。spark设置如下:
./sbin/start-master.sh -h localhost -p 1313
./sbin/start-slave.sh spark://localhost:1313 -c 1 -m 128G
我设立了一个就业服务如下(以下https://beam.apache.org/documentation/runners/spark/):
docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:1313
beam python脚本如下(它只读取一个或多个tf记录并将其写入输出文件):
import apache_beam as beam
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions
import sys
input_file = sys.argv[1]
output_file = sys.argv[2]
options = PipelineOptions(
[
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK",
]
)
with beam.Pipeline(options=options) as pipeline:
filepattern = input_file
readers = []
readers.append(pipeline
| 'ReadTFRecordFiles_{}[{}]'.format(0, filepattern) >> beam
.io.ReadFromTFRecord(filepattern, coder=coders.BytesCoder()))
input_data = readers | 'Flatten' >> beam.Flatten()
_ = input_data | beam.io.WriteToTFRecord(
file_path_prefix=output_file,
file_name_suffix='.tfrecord.gz',
coder=coders.BytesCoder())
我使用spark submit以以下方式运行它: ./bin/spark-submit --master spark://localhost:1313 --driver-memory 128G --executor-memory 256G ~/test_spark_writer.py $input_file /tmp/portable
当我设置 $input_file
到单个文件(例如, $FILEPATH/training_set.with_label.tfrecord-00000-of-00030.gz
),输出文件可用。但是,当我 $input_file
文件模式(例如, $FILEPATH/training_set.with_label.tfrecord-?????-of-00030.gz
),我看到以下警告,并且没有可用的输出文件 WARNING:apache_beam.io.filebasedsink:Deleting 1 existing files in target path matching: -*-of-%(num_shards)05d
我确保在每次操作之前清除输出目录中以前的任何输出文件 spark-submit
.
注意:使用directrunner而不是portablerunner时的行为是相同的,portablerunner不需要spark设置。
暂无答案!
目前还没有任何答案,快来回答吧!