我可以从python调用bluemix消息中心服务吗?

qlfbtfca  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(406)

kafkapython客户机支持kafka0.9,但显然不包括新的身份验证和加密功能,因此我猜它只适用于开放服务器(与以前的版本一样)。在任何情况下,甚至java客户机都需要一个特殊的messagehub登录模块来连接(或者在示例中看起来是这样),这意味着除非有一个类似的模块可用于python,否则什么都不起作用。
我的具体场景是,我希望使用同样托管在bluemix(apachespark服务)中的jupyter笔记本中的消息中心服务。

i86rm4rw

i86rm4rw1#

我们在非java语言支持的文档中添加了一些文本—请参阅“在非java应用程序中连接和验证”部分:https://www.ng.bluemix.net/docs/services/messagehub/index.html
我们当前的身份验证方法是非标准的,不受apache项目的支持,只是一个临时解决方案。消息中心团队正在与apachekafka社区合作开发kip-43。一旦完成,我们将更改消息中心身份验证实现以匹配,并且可以用任何语言实现该规范的客户机。

55ooxyrt

55ooxyrt2#

在bluemixpachespark服务本机支持之前,您可以采用与realtime情绪分析项目相同的方法。这方面的帮助程序代码可以在cds labs spark samples github repo上找到。

8aqjt8rx

8aqjt8rx3#

我可以使用kafka python库进行连接:

$ pip install --user kafka-python

然后。。。

from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl

############################################ 

# Service credentials from Bluemix UI:

############################################ 

bootstrap_servers =   # kafka_brokers_sasl
sasl_plain_username = # user
sasl_plain_password = # password

############################################ 

sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2

context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism,
                         api_version=(0,10))

# Asynchronous by default

future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends

try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset

print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

我从BlueMixSpark将其作为jupyter笔记本的一项服务来使用,但是请注意,这种方法没有使用spark。代码只是在驱动程序主机上运行。

bksxznpy

bksxznpy4#

已请求kafka python客户端中的sasl支持:https://github.com/dpkp/kafka-python/issues/533 但在messagehub使用的用户名/密码登录方法得到支持之前,它将无法工作

相关问题