如何使用flink pulsar connector auth by tls使用pulsar?

fdbelqdn  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(370)

//转换导入org.apache.flink.api.scala_

val s1mmePulsarProp = new Properties(){
  put("service.url", "pulsar+ssl://xxx1:6651,xxx2:6651,xxx3:6651")
  //https://xxx1:8443,xxx2:8443,xxx3:8443
  put("admin.url", "https://xxx2:8443")
  //put("partitionDiscoveryIntervalMillis", "5000")
  put("startingOffsets", "latest")
  put("topic", "persistent://bigdatatest/bigdatatest-namespace/5g_xdr_s1mme_testp4")
}
//Authentication: certFilePath, keyFilePath
/*val tlsAuth: Authentication = AuthenticationFactory.TLS(
  "/usr/tools/pulsartls/bigdatatest.cert.pem",
  "/usr/tools/pulsartls/bigdatatest.key-pk8.pem")*/
val tlsAuthMap = new util.HashMap[String,String]()
tlsAuthMap.put("tlsCertFile", "/usr/tools/pulsartls/bigdatatest.cert.pem")
tlsAuthMap.put("tlsKeyFile", "/usr/tools/pulsartls/bigdatatest.key-pk8.pem")
val tlsAuth: Authentication = AuthenticationFactory.create(
  "org.apache.pulsar.client.impl.auth.AuthenticationTls",
  tlsAuthMap)

//ClientConfigurationData
val pulsarClientConf = new ClientConfigurationData
pulsarClientConf.setServiceUrl("pulsar+ssl://xxx1:6651,xxx2:6651,xxx3:6651")
pulsarClientConf.setUseTls(true)
pulsarClientConf.setTlsAllowInsecureConnection(true)
pulsarClientConf.setTlsTrustCertsFilePath("/usr/tools/pulsartls/ca.cert.pem")
//clientConf.setTlsHostnameVerificationEnable(false)
pulsarClientConf.setAuthentication(tlsAuth)
pulsarClientConf.setUseTcpNoDelay(true)

val s1mmePulsarConsumer = new FlinkPulsarSource("https://xxx.xxx.xxx.xxx:8443",
                                        pulsarClientConf,
                                        new SimpleStringSchema(),
                                        s1mmePulsarProp)
                               .setStartFromLatest()

val s1mmePulsarSource = env.addSource(s1mmePulsarConsumer)
  .filter(_.trim.nonEmpty)
  .assignTimestampsAndWatermarks(new timestampExtractor(8))
  .name("s1mmesource")

s1mmePulsarSource.addSink(hadoopSink("hdfs://encrypt_data/ninecon/s1mmepulsar/s1mme/",8))
  .name("s1mme2hdfs")

我刚刚编写了这些代码,但是flink代理总是抛出一个错误:由org.apache.pulsar.client.admin.pulsaradminexception$notauthorizedexception:http401unauthorized引起

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题