请我如何可以应用CDC(变更数据捕获),我用Spark读取的数据库,然后将其保存为 parquet 到HADOOP HDFS.这是代码:
spark = SparkSession \
.builder \
.appName("Ingest") \
.master("local[*]") \
.config("spark.driver.extraClassPath", "/home.../mysql-connector-java-5.1.30.jar") \
.getOrCreate()
df = spark.read\
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/classicmodels") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("dbtable", "employees") \
.option("user", "...") \
.option("password", "...").load()
print(df.show())
dataframe_mysql.write.parquet("hdfs://localhost:9000/...")
代码返回在 Dataframe 中读取的数据。
1条答案
按热度按时间fnx2tebb1#
Spark不做变更数据捕获,任何批处理/轮询JDBC客户机也不做,因为您总是查询after状态,而不是实际的变更事件
为此,Debezium+Kafka经常被使用,尽管也有替代选择
一旦数据被存储在Kafka中,它就可以被KafkaConnect、Spark等使用,并被写为Parquet