I have the code below, which has three methods: zip extraction, getting file paths, and data restructuring.
The code should first run zip_extract: if there are any zip files in the GCP bucket it only extracts them, and it does not return anything.
Next it should run get_file_path, which walks the whole bucket, collects every file path present in it into a list, and returns that list to data_restructure.
data_restructure takes each file path in that list and checks whether the file is a DICOM file; if it is, it is stored under one structure in the target bucket, and if it is not, it is stored under a different hierarchy in the target bucket.
I wrote a Dataflow pipeline for this code as follows:
with beam.Pipeline(options=pipeline_options) as p:
    file_paths = (p | "Get File Paths" >> beam.Create(get_file_path()))
    file_paths | "Data Restructure" >> beam.Map(lambda x: data_restructure(x))
but this produces the following error message in the Dataflow logs: "The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. Please check the worker logs in Stackdriver Logging. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support."
Main code:
import argparse
import datetime
import io
import logging
import os
import pathlib
import re
import subprocess
from zipfile import ZipFile, is_zipfile

import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions, PipelineOptions,
                                                  SetupOptions, StandardOptions, WorkerOptions)
from pydicom import dcmread


def zip_extract():
    '''
    Function to unzip a folder in a bucket under a specific hierarchy
    '''
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(landing_bucket)
    blobs_specific = list(bucket.list_blobs(prefix=data_folder))
    for file_name in blobs_specific:
        file_extension = pathlib.Path(file_name.name).suffix
        try:
            if file_extension == ".zip":
                destination_blob_pathname = file_name.name
                blob = bucket.blob(destination_blob_pathname)
                zipbytes = io.BytesIO(blob.download_as_string())
                if is_zipfile(zipbytes):
                    with ZipFile(zipbytes, 'r') as myzip:
                        for contentfilename in myzip.namelist():
                            contentfile = myzip.read(contentfilename)
                            blob = bucket.blob(f'{file_name.name.replace(".zip","")}/{contentfilename}')
                            blob.upload_from_string(contentfile)
                logging.info("Unzip completed")
        except Exception:
            logging.info('Skipping : {} file format found.'.format(file_extension))
            continue
    client.close()
def get_file_path():
    '''
    Function to store all the file paths present in landing bucket into a list
    '''
    zip_extract()
    file_paths = []
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(landing_bucket)
    blobs_specific = list(bucket.list_blobs(prefix=data_folder))
    try:
        for blob in blobs_specific:
            file_paths.append("gs://{}/".format(landing_bucket) + blob.name)
        client.close()
        logging.info("List is ready with data")
        return file_paths
    except Exception as err:
        logging.error("Error while appending data to list : {}".format(err))
        raise
def data_restructure(line):
    '''
    params line: String which has the file path
    Function to read each file and check if it is a DICOM file or not, if yes,
    store it in Study-Series-SOP hierarchy else store it in Descriptive folder in Intermediate bucket.
    '''
    from google.cloud import storage
    InstanceUID = {}
    client = storage.Client()
    destination_bucket = client.bucket(intermediate_bucket)
    cmd = "gsutil cp {} .\local_folder".format(line)
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    file_name = os.listdir(".\local_folder").pop(0)
    try:
        dicom_data = dcmread(".\local_folder\{}".format(file_name))
        logging.info("Started reading Dicom file")
        for element in dicom_data:
            if element.name in ("Study Instance UID", "Series Instance UID", "SOP Instance UID", "Modality"):
                InstanceUID[element.name] = element.value
        destination_bucket = client.bucket(intermediate_bucket)
        blob = destination_bucket.blob('Client/Test/DICOM/{}/{}/{}/{}.dcm'.format(list(InstanceUID.values())[1], list(InstanceUID.values())[2], list(InstanceUID.values())[3], list(InstanceUID.values())[0]))
        blob.upload_from_filename(".\local_folder\{}".format(file_name))
        InstanceUID.clear()
        logging.info("DICOM file {} uploaded into Intermediate Bucket".format(file_name))
        os.remove(".\local_folder\{}".format(file_name))
    except Exception as e:
        file_extension = file_name.split("/")[-1].split(".")[-1]
        if file_extension != "zip" and "report" not in file_name and file_extension != "":
            blob = destination_bucket.blob('Test/Descriptive/{}'.format(file_name))
            blob.upload_from_filename(".\local_folder\{}".format(file_name))
            logging.info("Stored file into Descriptive folder")
            os.remove(".\local_folder\{}".format(file_name))
        else:
            blob = destination_bucket.blob('Test/Reports/{}'.format(file_name))
            blob.upload_from_filename(".\local_folder\{}".format(file_name))
            logging.info("Stored Report file into Reports folder")
            os.remove(".\local_folder\{}".format(file_name))
    client.close()
def call_main():
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)
    setup_options = pipeline_options.view_as(SetupOptions)
    setup_options.setup_file = './setup.py'
    setup_options.save_main_session = True
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = "dataflow" + re.sub("[^0-9]+", "-", str(datetime.datetime.now()))
    google_cloud_options.service_account_email = "service_email"
    pipeline_options.view_as(StandardOptions).runner = "DataflowRunner"
    google_cloud_options.staging_location = config["staging_location"]
    google_cloud_options.temp_location = config["temp_location"]
    google_cloud_options.region = config["region"]
    pipeline_options.view_as(WorkerOptions).num_workers = 2
    pipeline_options.view_as(WorkerOptions).machine_type = "n1-standard-2"
    pipeline_options.view_as(WorkerOptions).disk_size_gb = 1024
    pipeline_options.view_as(WorkerOptions).network = vpc_name
    pipeline_options.view_as(WorkerOptions).subnetwork = f'regions/{config["region"]}/subnetworks/{subnet_name}'
    pipeline_options.view_as(WorkerOptions).use_public_ips = False

    with beam.Pipeline(options=pipeline_options) as p:
        file_paths = (p | "Get File Paths" >> beam.Create(get_file_path()))
        file_paths | "Data Restructure" >> beam.Map(lambda x: data_restructure(x))


if __name__ == '__main__':
    call_main()
setup.py file:
import setuptools

setuptools.setup(
    name='Installing Packages',
    version='1.0.0',
    install_requires=['google-cloud-datastore==1.15.3',
                      'google.cloud.storage==1.16.1',
                      'apache-beam[gcp]==2.31.0',
                      'google-api-core==1.33.2',
                      'google-cloud-core==1.7.3',
                      'google-cloud-logging == 1.15.1',
                      'pydicom == 2.3.1',
                      'uuid == 1.30',
                      'google-cloud-secret-manager',
                      'psycopg2-binary'],
    packages=setuptools.find_packages())
I am new to apache_beam and Dataflow, so please help me. I have tried other ways of writing the Dataflow pipeline, but nothing has worked.
If I am doing something wrong here, please correct me.
Please tell me whether the way I wrote the transforms is correct or not; if it is not, please show me the right way. I am stuck and cannot make progress on this.
Thanks in advance.
1 Answer
This error

    The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. Please check the worker logs in Stackdriver Logging. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

usually occurs when there is a problem with the installation of the dependencies (it is not related to your transforms);
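As an illustration only (the concrete fix may differ from what this answer goes on to recommend), a common way to rule out a dependency conflict on the workers is to stop hard-pinning transitive libraries such as google-api-core and google-cloud-core and pin only the packages the pipeline imports directly, letting apache-beam[gcp] resolve compatible versions of the rest. A minimal sketch of such a setup.py, with a hypothetical package name:

import setuptools

# Minimal sketch, illustration only: pin only the directly imported packages and
# let apache-beam[gcp] pull in compatible google-cloud-* / google-api-core versions.
setuptools.setup(
    name='dicom-restructure-pipeline',   # hypothetical package name
    version='1.0.0',
    install_requires=[
        'apache-beam[gcp]==2.31.0',
        'pydicom==2.3.1',
        'google-cloud-storage',          # left unpinned so pip can pick a Beam-compatible release
    ],
    packages=setuptools.find_packages())

The worker-startup logs in Cloud Logging contain the pip output for these installs, which is where a failed install or version conflict would surface.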