Apache Spark 如何申请疾控中心?

7xzttuei  于 2023-02-24  发布在  Apache
关注(0)|答案(1)|浏览(124)

请我如何可以应用CDC(变更数据捕获),我用Spark读取的数据库,然后将其保存为 parquet 到HADOOP HDFS.这是代码:

spark = SparkSession \
        .builder \
        .appName("Ingest") \
        .master("local[*]") \
        .config("spark.driver.extraClassPath", "/home.../mysql-connector-java-5.1.30.jar") \
        .getOrCreate()
df = spark.read\
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/classicmodels") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "employees") \
        .option("user", "...") \
        .option("password", "...").load()
print(df.show())
dataframe_mysql.write.parquet("hdfs://localhost:9000/...")

代码返回在 Dataframe 中读取的数据。

fnx2tebb

fnx2tebb1#

Spark不做变更数据捕获,任何批处理/轮询JDBC客户机也不做,因为您总是查询after状态,而不是实际的变更事件
为此,Debezium+Kafka经常被使用,尽管也有替代选择
一旦数据被存储在Kafka中,它就可以被KafkaConnect、Spark等使用,并被写为Parquet

相关问题