python - Dataflow pipeline not starting after the pipeline options are defined

c6ubokkw · asked 2023-02-07 in Python

I have the code below, which contains three functions: zip_extract, get_file_path and data_restructure.
The code should first run zip_extract, which only extracts any zip files present in the GCP bucket and does not return anything.
Next it should run get_file_path, which walks the entire bucket, collects the paths of all the files it contains into a list and returns that list to data_restructure.
data_restructure takes each file path in that list and checks whether it is a DICOM file; if it is, the file is stored under one hierarchy in the target bucket, and if it is not, it is stored under a different hierarchy in the target bucket.
I wrote a Dataflow pipeline for this code as follows:

with beam.Pipeline(options=pipeline_options) as p:
        file_paths = (p | "Get File Paths" >> beam.Create(get_file_path()))
        file_paths | "Data Restructure" >> beam.Map(lambda x: data_restructure(x))

but this produces the following error message in the Dataflow logs:
The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. Please check the worker logs in Stackdriver Logging. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support."
Main code:

import argparse
import datetime
import io
import logging
import os
import pathlib
import re
import subprocess
from zipfile import ZipFile, is_zipfile

import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions,
                                                  SetupOptions,
                                                  StandardOptions,
                                                  WorkerOptions)
from pydicom import dcmread

# landing_bucket, data_folder, intermediate_bucket, project_id, config,
# vpc_name and subnet_name are module-level settings defined elsewhere.

def zip_extract():
    '''
    Function to unzip a folder in a bucket under a specific hierarchy
    '''
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(landing_bucket)   
    blobs_specific = list(bucket.list_blobs(prefix=data_folder))
    for file_name in blobs_specific:        
    
        file_extension = pathlib.Path(file_name.name).suffix 

        try:

            if file_extension==".zip":    
                destination_blob_pathname = file_name.name        
                blob = bucket.blob(destination_blob_pathname)
                zipbytes = io.BytesIO(blob.download_as_string())

                if is_zipfile(zipbytes):
                    with ZipFile(zipbytes, 'r') as myzip:
                        for contentfilename in myzip.namelist():
                            contentfile = myzip.read(contentfilename)             
                            blob = bucket.blob(f'{file_name.name.replace(".zip","")}/{contentfilename}')
                            blob.upload_from_string(contentfile)
                            
            logging.info("Unzip completed")

        except:
            logging.info('Skipping : {} file format found.'.format(file_extension))
            continue

    client.close()

def get_file_path():
    '''
    Function to store all the file paths present in landing bucket into a list 
    '''
    zip_extract()
    file_paths = []
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(landing_bucket)
    blobs_specific = list(bucket.list_blobs(prefix=data_folder))

    try:

        for blob in blobs_specific:         
            file_paths.append("gs://{}/".format(landing_bucket)+blob.name)     
        client.close()
        logging.info("List is ready with data")
        return file_paths  
    except Exception as err:
        logging.error("Error while appending data to list : {}".format(err))
        raise       

def data_restructure(line):
    '''
    params line: String which has the file path
    Function to read each file and check if it is a DICOM file or not, if yes,
    store it in Study-Series-SOP hierarchy else store it in Descriptive folder in Intermediate bucket.
    '''
    from google.cloud import storage
    InstanceUID={}
    client = storage.Client()
    destination_bucket = client.bucket(intermediate_bucket)
    cmd = "gsutil cp {} .\local_folder".format(line)
    result = subprocess.run(cmd,shell=True,capture_output=True,text=True)
    file_name=os.listdir(".\local_folder").pop(0) 

    try:
        dicom_data = dcmread(".\local_folder\{}".format(file_name))
        logging.info("Started reading Dicom file")
            
        for element in dicom_data:

            if element.name in ("Study Instance UID","Series Instance UID","SOP Instance UID","Modality"):                   
                InstanceUID[element.name]=element.value

        destination_bucket = client.bucket(intermediate_bucket)
        blob = destination_bucket.blob('Client/Test/DICOM/{}/{}/{}/{}.dcm'.format(list(InstanceUID.values())[1],list(InstanceUID.values())[2],list(InstanceUID.values())[3],list(InstanceUID.values())[0]))
        blob.upload_from_filename(".\local_folder\{}".format(file_name))
        InstanceUID.clear()   
        logging.info("DICOM file {} uploaded into Intermediate Bucket".format(file_name))        
        os.remove(".\local_folder\{}".format(file_name))    

    except Exception as e:
        
        file_extension = file_name.split("/")[-1].split(".")[-1]

        if file_extension != "zip" and "report" not in file_name and file_extension != "":

            blob = destination_bucket.blob('Test/Descriptive/{}'.format(file_name))
            blob.upload_from_filename(".\local_folder\{}".format(file_name))
            logging.info("Stored file into Descriptive folder")
            os.remove(".\local_folder\{}".format(file_name))

        else:

            blob = destination_bucket.blob('Test/Reports/{}'.format(file_name))
            blob.upload_from_filename(".\local_folder\{}".format(file_name))
            logging.info("Stored Report file into Reports folder")
            os.remove(".\local_folder\{}".format(file_name))

    client.close()

def call_main():
    
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)
    setup_options= pipeline_options.view_as(SetupOptions)
    setup_options.setup_file='./setup.py'
    setup_options.save_main_session=True
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = "dataflow"+re.sub("[^0-9]+", "-", str(datetime.datetime.now()))
    google_cloud_options.service_account_email = "service_email"
    pipeline_options.view_as(StandardOptions).runner = "DataflowRunner"
    google_cloud_options.staging_location = config["staging_location"]
    google_cloud_options.temp_location = config["temp_location"]
    google_cloud_options.region = config["region"]
    pipeline_options.view_as(WorkerOptions).num_workers = 2
    pipeline_options.view_as(WorkerOptions).machine_type = "n1-standard-2"
    pipeline_options.view_as(WorkerOptions).disk_size_gb = 1024
    pipeline_options.view_as(WorkerOptions).network = vpc_name
    pipeline_options.view_as(WorkerOptions).subnetwork = f'regions/{config["region"]}/subnetworks/{subnet_name}'
    pipeline_options.view_as(WorkerOptions).use_public_ips=False
    

    with beam.Pipeline(options=pipeline_options) as p:
        file_paths = (p | "Get File Paths" >> beam.Create(get_file_path()))
        file_paths | "Data Restructure" >> beam.Map(lambda x: data_restructure(x))


if __name__ == '__main__':
    call_main()

setup.py file:

import setuptools

setuptools.setup(
   name='Installing Packages',
   version='1.0.0',
   install_requires=['google-cloud-datastore==1.15.3',
    'google.cloud.storage==1.16.1', 
   'apache-beam[gcp]==2.31.0',  
'google-api-core==1.33.2',
'google-cloud-core==1.7.3',
'google-cloud-logging == 1.15.1',
'pydicom == 2.3.1',
'uuid == 1.30',
'google-cloud-secret-manager',
'psycopg2-binary'],
   packages=setuptools.find_packages())

I am new to apache_beam and Dataflow, so please help me. I have tried other ways of writing the Dataflow pipeline, but nothing has worked.
If I am doing something wrong here, please correct me.
Please also tell me whether the way I have written the transforms is correct; if it is not, please show me the right way. I am stuck and cannot make progress.
Thanks in advance.

Answer 1, by ttcibm8c:

This error:

The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. Please check the worker logs in Stackdriver Logging. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

usually happens because of a problem with dependency installation (it is unrelated to your transforms):

  • You can debug this by looking at the worker-startup logs in Cloud Logging. You will most likely see pip failing while installing the dependencies.
  • You can try other forms of dependency management (https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) - custom containers are less error-prone.
  • As a side note, there is no need to pin the Beam SDK version. It is picked up automatically, and pinning one version while using a different one locally can cause errors; see the adjusted setup.py sketch below.
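For reference, here is a minimal sketch of what the setup.py from the question could look like after applying that last point, i.e. with apache-beam[gcp] left unpinned. Everything else mirrors the pins from the question, and the package name is only a placeholder:

import setuptools

setuptools.setup(
    name='dataflow-dicom-restructure',  # placeholder package name
    version='1.0.0',
    install_requires=[
        'google-cloud-datastore==1.15.3',
        'google-cloud-storage==1.16.1',
        # Leave the Beam SDK unpinned (or omit it entirely): the Dataflow
        # workers pick up the SDK version used to submit the job, and pinning
        # a version that differs from the local one can break worker startup.
        'apache-beam[gcp]',
        'google-api-core==1.33.2',
        'google-cloud-core==1.7.3',
        'google-cloud-logging==1.15.1',
        'pydicom==2.3.1',
        'uuid==1.30',
        'google-cloud-secret-manager',
        'psycopg2-binary',
    ],
    packages=setuptools.find_packages(),
)

If one of the remaining pins is what pip rejects, the exact conflict should show up in the worker-startup log mentioned in the first point.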
