连接穆勒索夫特和Kafka在希罗库举办

8e2ybdfx  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(514)

社区!
我正在尝试将我的mulesoft应用程序连接到托管在heroku的kafka服务器。anypoint studio 7(mule 4)有一个kafka连接器,它有几个连接选项(basic、kerberos、kerberos ssl和ssl):

根据heroku的文档,它“支持”ssl,但没有提到这是必需的。有人能证实吗?
我看到的heroku文档[https://devcenter.heroku.com/articles/kafka-on-heroku]
当我在heroku安装一个应用程序,添加kafka插件,创建主题时,我从运行命令得到引导服务器 heroku config:get KAFKA_URL 如果我尝试进行基本测试,会出现以下错误:

错误状态 org.mule.runtime.api.connection.ConnectionException: invalid connection! org.mule.runtime.api.connection.ConnectionException: invalid connection! Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. 我要建立的是一个有两个流的poc。。。一个产生一条消息到一个主题,另一个从中产生消息流。

任何关于如何设置连接器和heroku环境的帮助都是非常受欢迎的
更新:
我发现在heroku创建应用程序并包含kafka插件时,会得到以下变量:
kafka\u url:一个逗号分隔的ssl url列表,指向构成集群的kafka代理。例子:
Kafka+ssl://ec2-3-*-100.compute-1.amazonaws公司。com:9096,Kafka+ssl://ec2-3-**-127.compute-1.amazonaws公司。com:9096
kafka\u trusted\u cert:代理的ssl证书(pem格式),用于检查您是否连接到正确的服务器。例子:
-----开始证书-----miidfzcCamegawibagadanbgkqhkig9w0baqsfadaymtawlgydvqddcdcjys1l zjqwowuzny00njhhltrimgetogvkoc0wzwyxmmrhyjkyzwwhchcnmjkxmjeymtux nzu5whcnmjkxmjeymtuxnzu5wjaymtawlgydvqddcy1lzjqwowuzny00njhh
kafka\u client\u cert:根据代理对客户端进行身份验证所需的客户端证书(pem格式)。例子:
-----开始证书-----miidqzccaiugawibagibadanbgkqhkig9w0baqsfadaymtawlgydvqqddcdjys1l zjqwowuzny00njhhltrimgetogvkoc0wzwyxmmrhyjkyzwwhchcnmzawmte1mtu1 mju2wjazmrcwqydvqdda51nnztywzm2cwznmyjcc
kafka\u client\u cert\u key:根据代理对客户端进行身份验证所需的客户端证书密钥(采用pem格式)。kafka群集需要使用提供的客户端证书进行身份验证。任何不使用客户端证书的请求都将被拒绝。例子:
-----开始rsa私钥-----miiepaibakcaqeammu+j9dulvnqwiot02++6ehw9mg7kaocdiqjodvtvipc5ayk iahsdnvh9bgjqajststiv/6o1mclmjus/ymyieegmbvatfxcldpgbgcppvsv2r3q
我假设我需要将它们“转换”为连接器ssl配置的密钥库和信任库。。有人能证实吗?因为我从文档中看到,ssl是必需的
另一个更新:
我下载了上面描述的证书,并使用工具“key explorer”创建了keystore.jks并将kafka\u client\u cert和kafka\u client\u cert\u密钥导入其中,然后创建了truststore.jsk并将文件kafka\u trusted\u cert导入其中。在这两种情况下,我都设置了一个基本密码。。。看起来不错,但我有个错误:

org.mule.runtime.api.connection.ConnectionException: invalid connection!
  org.mule.runtime.api.connection.ConnectionException: invalid connection!
  Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
  Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
    at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1521)
    at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:528)
    at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1197)
    at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1165)
    at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:448)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:313)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1196)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
    at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1709)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:318)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:310)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1639)
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:223)
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1037)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:970)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:967)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1459)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
    ... 7 more
  Caused by: java.security.cert.CertificateException: No name matching ec2-3-220-121-33.compute-1.amazonaws.com found
    at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
    at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1626)
    ... 16 more

可能与线路有关,原因是:
java.security.cert.certificateexception:找不到与ec2-3-220-121-33.compute-1.amazonaws.com匹配的名称

iswrvxsc

iswrvxsc1#

我让它工作了。

使用这些命令创建jks文件(需要heroku cli)

client_key=`heroku config:get KAFKA_CLIENT_CERT_KEY --app <SET_HEROKU_APP_NAME_HERE>`
client_cert=`heroku config:get KAFKA_CLIENT_CERT --app <SET_HEROKU_APP_NAME_HERE>`
trusted_cert=`heroku config:get KAFKA_TRUSTED_CERT --app <SET_HEROKU_APP_NAME_HERE>`

# Write config vars to files.

echo "$client_key" >> keystore.pem
echo -n "$client_cert" >> keystore.pem
echo -n "$trusted_cert" > truststore.pem

# Set passwords

TRUSTSTORE_PASSWORD=<SET_PASSWORD_HERE>
KEYSTORE_PASSWORD=<SET_PASSWORD_HERE>
echo $TRUSTSTORE_PASSWORD
echo $KEYSTORE_PASSWORD

# Import cert.

keytool -importcert -file truststore.pem -keystore kafka.client.truststore.jks -deststorepass $TRUSTSTORE_PASSWORD -noprompt

# Create PKCS12 file.

openssl pkcs12 -export -in keystore.pem -out keystore.pkcs12 -password pass:$KEYSTORE_PASSWORD

# Create jks files.

keytool -importkeystore -srcstoretype PKCS12 \
    -destkeystore kafka.client.keystore.jks -deststorepass $KEYSTORE_PASSWORD \
    -srckeystore keystore.pkcs12 -srcstorepass $KEYSTORE_PASSWORD

然后使用ssl选项配置连接器并添加以下内联:

相关问题