apache-kafka 我是否可以通过Azure Databricks将消息作为批处理作业发送到KAFKA群集(一旦我发送的消息用完,就关闭我的连接)?

0s7z1bwu  于 2022-11-01  发布在  Apache
关注(0)|答案(2)|浏览(161)

我想通过Azure数据库每天向Kafka发送一次消息。我想以批处理作业的形式接收消息。
我需要将它们发送到Kafka服务器,但我们不想让集群整天运行此任务。
我看到了databricks的writeStream方法(我还不能让它工作,但这不是我的问题的目的),看起来我需要日夜流才能使它运行。
有没有办法把它作为一个批处理作业来使用?我可以把消息发送到Kafka服务器,并在收到消息后关闭我的集群吗?

df = spark \
        .readStream \
        .format("delta") \
        .option("numPartitions", 5) \
        .option("rowsPerSecond", 5) \
        .load('/mnt/sales/marketing/numbers/DELTA/')

(df.select("Sales", "value")
 .writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "rferferfeez.eu-west-1.aws.confluent.cloud:9092")
 .option("topic", "bingofr")
 .option("kafka.sasl.username", "jakich")
 .option("kafka.sasl.password", 'ozifjoijfziaihufzihufazhufhzuhfzuoehza')
 .option("checkpointLocation", "/mnt/sales/marketing/numbers/temp/")
 .option("spark.kafka.clusters.cluster.sasl.token.mechanism", "cluster-buyit")
 .option("request.timeout.ms",30) \
 .option("includeHeaders", "true") \
 .start()
)

kafkashaded.org.apache.kafka.common.errors.TimeoutException:主题bingofr在60000毫秒后不存在于元数据中。

值得注意的是,我们也有事件中心。我是否更好地发送消息到我们的事件中心,并实现一个触发的函数,写Kafka?

vof42yt1

vof42yt11#

通常KAFKA是一个持续的服务/能力。至少,在我去过的地方。
我会考虑像AZURE这样的云服务,其中事件中心是基于每条消息使用的,使用KAFKA API。
否则,您将需要一个批处理作业来启动KAFKA,执行您的执行,然后停止KAFKA。

efzxgjgh

efzxgjgh2#

只是想详细说明@亚历克斯奥特评论,因为它似乎工作。
通过添加“.trigger(availableNow=True)",您可以
定期启动群集,处理自上一个周期以来可用的所有内容,然后关闭群集。在某些情况下,这可能会导致显著的成本节约。
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers


**(

df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
     'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password)) 
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option("kafka.security.protocol", "SASL_SSL")

相关问题