我正在尝试用spark结构化流连接远程cassandra节点。
我可以在本地计算机上连接到现有的cassandra节点。
这是我可以在本地计算机上连接cassandra的代码:
parsed = parsed_df \
.withWatermark("sourceTimeStamp", "10 minutes") \
.groupBy(
window(parsed_df.sourceTimeStamp, "4 seconds"),
parsed_df.id
) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg")\
.withColumnRenamed("window", "sourceTime")
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table="opc", keyspace="poc")\
.save()
parsed.writeStream \
.foreachBatch(writeToCassandra) \
.outputMode("update") \
.start()
但是,我想连接远程cassandra节点。我该怎么说呢?
1条答案
按热度按时间pdsfdshx1#
要连接到远程主机,您需要在
spark.cassandra.connection.host
spark的configuration属性-这可以通过命令行参数(最灵活)或在代码中完成。如果cassandra集群使用身份验证,那么您需要提供spark.cassandra.auth.username
以及spark.cassandra.auth.password
属性。对于ssl和其他内容,请参阅参数参考。