如何使用pyspark将数据流到mysql数据库?

n3h0vuf2  于 2021-06-18  发布在  Mysql
关注(0)|答案(1)|浏览(405)

我目前正在开发一个单页web应用程序,它允许用户将大型csv文件(目前正在测试一个~7gb的文件)上传到flask服务器,然后将数据流传输到数据库。上传大约需要一分钟,文件将完全保存到flask服务器上的临时文件中。现在我需要能够流式处理这个文件并将其存储到数据库中。我做了一些研究,发现pyspark非常适合流式数据,我选择mysql作为数据库来流式传输csv数据(但我对其他dbs和流式方法持开放态度)。我是一个初级开发人员和pyspark的新成员,所以我不知道该怎么做。spark流媒体指南说数据必须通过kafka、flume、tcp socets等源接收,所以我想知道是否必须使用这些方法中的任何一种才能将csv文件导入spark。然而,我遇到了一个很好的例子,他们正在将csv数据流式传输到azure sql数据库中,看起来他们只是直接使用spark读取文件,而不需要通过kafka这样的流式源接收它,这个例子唯一让我困惑的是,他们使用hdinsight spark集群将数据流传输到db中,我不知道如何将所有这些都与flask服务器结合起来。我对缺少代码表示歉意,但目前我只有一个flask服务器文件,其中一个路由执行文件上载。任何例子,教程,或建议将不胜感激。

ma8fv8wu

ma8fv8wu1#

我不确定流媒体部分,但spark可以有效地处理大型文件-并且存储到db表将并行进行,因此不太了解您的详细信息,如果您的服务器上有上载的文件,我会说:
如果我想在表中保存一个像csv这样的大型结构化文件,我会这样开始:


# start with some basic spark configuration, e.g. we want the timezone to be UTC

conf = SparkConf()
conf.set('spark.sql.session.timeZone', 'UTC')

# this is important: you need to have the mysql connector jar for the right mysql version:

conf.set('jars', 'path to mysql connector jar you can download from here: https://dev.mysql.com/downloads/connector/odbc/')

# instantiate a spark session: the first time it will take a few seconds

spark = SparkSession.builder \
    .config(conf=conf) \
    .appName('Huge File uploader') \
    .getOrCreate()

# read the file first as a dataframe

df = spark.read.csv('path to 7GB/ huge csv file')

# optionally, add a filename column

from pyspark.sql import functions as F
df = df.withColumn('filename', F.lit('thecurrentfilename'))

# write it to the table

df.write.format('jdbc').options(
            url='e.g. localhost:port',
            driver='com.mysql.cj.jdbc.Driver',  # the driver for MySQL
            dbtable='the table name to save to',
            user='user',
            password='secret',
        ).mode('append').save()

注意这里的“append”模式:这里的问题是spark不能对表执行更新,它要么追加新行,要么替换表中的内容。
所以,如果你的csv是这样的:

id, name, address....

您将得到一个具有相同字段的表。
这是我能想到的最基本的例子,所以你可以从spark开始,不考虑spark集群或其他相关的东西。我建议你拿着这个转一转,看看这是否适合你的需要:)
另外,请记住,这可能需要几秒钟或更长时间,这取决于您的数据、数据库所在的位置、您的计算机和数据库负载,因此最好与api保持异步,同样,我不知道您的任何其他详细信息。
希望这有帮助。祝你好运!

相关问题