我想通过Azure数据库每天向Kafka发送一次消息。我想以批处理作业的形式接收消息。
我需要将它们发送到Kafka服务器,但我们不想让集群整天运行此任务。
我看到了databricks的writeStream方法(我还不能让它工作,但这不是我的问题的目的),看起来我需要日夜流才能使它运行。
有没有办法把它作为一个批处理作业来使用?我可以把消息发送到Kafka服务器,并在收到消息后关闭我的集群吗?
df = spark \
.readStream \
.format("delta") \
.option("numPartitions", 5) \
.option("rowsPerSecond", 5) \
.load('/mnt/sales/marketing/numbers/DELTA/')
(df.select("Sales", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "rferferfeez.eu-west-1.aws.confluent.cloud:9092")
.option("topic", "bingofr")
.option("kafka.sasl.username", "jakich")
.option("kafka.sasl.password", 'ozifjoijfziaihufzihufazhufhzuhfzuoehza')
.option("checkpointLocation", "/mnt/sales/marketing/numbers/temp/")
.option("spark.kafka.clusters.cluster.sasl.token.mechanism", "cluster-buyit")
.option("request.timeout.ms",30) \
.option("includeHeaders", "true") \
.start()
)
kafkashaded.org.apache.kafka.common.errors.TimeoutException:主题bingofr在60000毫秒后不存在于元数据中。
值得注意的是,我们也有事件中心。我是否更好地发送消息到我们的事件中心,并实现一个触发的函数,写Kafka?
2条答案
按热度按时间vof42yt11#
通常KAFKA是一个持续的服务/能力。至少,在我去过的地方。
我会考虑像AZURE这样的云服务,其中事件中心是基于每条消息使用的,使用KAFKA API。
否则,您将需要一个批处理作业来启动KAFKA,执行您的执行,然后停止KAFKA。
efzxgjgh2#
只是想详细说明@亚历克斯奥特评论,因为它似乎工作。
通过添加“.trigger(availableNow=True)",您可以
定期启动群集,处理自上一个周期以来可用的所有内容,然后关闭群集。在某些情况下,这可能会导致显著的成本节约。
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers