I am working on a use case in which I read Event Hub events with Databricks. The specific scenario is: every time a file is added to a storage account directory, an event is fired (via Event Grid) and routed through Event Hub, where Databricks processes it.
Every time a file is uploaded, the JSON that arrives at the Event Hub has the following structure:
{
    "topic": "/subscriptions/b87ed442-9d87-4e71-8784-f72e6da5b77e/resourceGroups/rsg/providers/Microsoft.Storage/storageAccounts/storage_account_name",
    "subject": "/blobServices/default/containers/container/blobs/path/to/file.xml",
    "eventType": "Microsoft.Storage.BlobCreated",
    "id": "02e27363-a01e-0000-7218-fb8871065026",
    "data": {
        "api": "PutBlob",
        "requestId": "02e27363-a01e-0000-7218-fb8871000000",
        "eTag": "0x8D8C92FA9FDAB6D",
        "contentType": "application/octet-stream",
        "contentLength": 103809024,
        "blobType": "BlockBlob",
        "blobUrl": "https://storage_account_name.blob.core.windows.net/container/path/to/file.xml",
        "url": "https://storage_account_name.blob.core.windows.net/container/path/to/file.xml",
        "sequencer": "000000000000000000000000000027c30000000000005204",
        "storageDiagnostics": {
            "batchId": "00ea9f48-5f8c-42ee-8323-da91a026e701"
        }
    },
    "dataVersion": "",
    "metadataVersion": "1",
    "eventTime": "2021-02-04T17:09:42.5557255Z",
    "EventProcessedUtcTime": "2021-02-04T17:20:27.8325561Z",
    "PartitionId": 0,
    "EventEnqueuedUtcTime": "2021-02-04T17:09:42.9170000Z"
}
In Databricks, I read this event with Structured Streaming and extract the url field of the JSON.
The idea is, for each micro-batch, to create a PySpark DataFrame with:
path = "<<URL value of the JSON>>"
spark.read.format("binaryFile").load(path)
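To make the question concrete, this is roughly the kind of rewrite I imagine would be needed if https:// cannot be loaded directly. It is only a sketch: it assumes the cluster already has credentials configured for the storage account, and the helper name blob_url_to_abfss is just something I made up.

from urllib.parse import urlparse

def blob_url_to_abfss(blob_url):
    # Turn https://<account>.blob.core.windows.net/<container>/<path>
    # into abfss://<container>@<account>.dfs.core.windows.net/<path>,
    # which Spark file sources such as binaryFile can read
    # (assuming the cluster is already authorized against the storage account).
    parsed = urlparse(blob_url)
    account = parsed.netloc.split(".")[0]
    container, _, blob_path = parsed.path.lstrip("/").partition("/")
    return f"abfss://{container}@{account}.dfs.core.windows.net/{blob_path}"

path = blob_url_to_abfss("https://storage_account_name.blob.core.windows.net/container/path/to/file.xml")
binary_df = spark.read.format("binaryFile").load(path)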
Is there any way to load the https:// URL directly, or should I implement some logic to mount the directory before reading the file?
I would like to know if there is a faster way.
Thanks a lot!
PS: below is the code in which I tried to implement the whole solution:
import json

from pyspark.sql.functions import udf

# Read the raw events from Event Hubs as a stream.
stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()

# The event payload arrives as binary; cast it to a JSON string.
df = stream_data.withColumn("body", stream_data["body"].cast("string"))

def parse_json(array_str):
    # The body is an array of Event Grid events; take the url of the first one.
    json_obj = json.loads(array_str)
    return json_obj[0]['data']['url']

extract_url = udf(parse_json)
url_df = df.withColumn("url", extract_url(df.body))

def getData(url):
    # Load the file referenced by the event as a binary DataFrame.
    binary = spark.read.format("binaryFile").load(url)
    binary.show()

def loadData(batchDf, batchId):
    # Collect the urls of the current micro-batch and load each file.
    url_collect = batchDf.select("url").collect()
    [getData(row.url) for row in url_collect]

url_df.writeStream.foreachBatch(loadData).outputMode("append").start().awaitTermination()
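For completeness, I also considered extracting the url with from_json and an explicit schema instead of a Python UDF. This is only a sketch of that idea; the field names and the array-of-events assumption are taken from the sample event above, and it is meant as a drop-in alternative for the url_df line:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Only the fields needed to reach data.url, named after the sample event above.
event_schema = ArrayType(StructType([
    StructField("data", StructType([
        StructField("url", StringType())
    ]))
]))

# The body is assumed to be an array of events, so getItem(0) mirrors json_obj[0] above.
url_df = df.withColumn("events", from_json(col("body"), event_schema)) \
           .withColumn("url", col("events").getItem(0)["data"]["url"])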