//转换导入org.apache.flink.api.scala_
val s1mmePulsarProp = new Properties(){
put("service.url", "pulsar+ssl://xxx1:6651,xxx2:6651,xxx3:6651")
//https://xxx1:8443,xxx2:8443,xxx3:8443
put("admin.url", "https://xxx2:8443")
//put("partitionDiscoveryIntervalMillis", "5000")
put("startingOffsets", "latest")
put("topic", "persistent://bigdatatest/bigdatatest-namespace/5g_xdr_s1mme_testp4")
}
//Authentication: certFilePath, keyFilePath
/*val tlsAuth: Authentication = AuthenticationFactory.TLS(
"/usr/tools/pulsartls/bigdatatest.cert.pem",
"/usr/tools/pulsartls/bigdatatest.key-pk8.pem")*/
val tlsAuthMap = new util.HashMap[String,String]()
tlsAuthMap.put("tlsCertFile", "/usr/tools/pulsartls/bigdatatest.cert.pem")
tlsAuthMap.put("tlsKeyFile", "/usr/tools/pulsartls/bigdatatest.key-pk8.pem")
val tlsAuth: Authentication = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationTls",
tlsAuthMap)
//ClientConfigurationData
val pulsarClientConf = new ClientConfigurationData
pulsarClientConf.setServiceUrl("pulsar+ssl://xxx1:6651,xxx2:6651,xxx3:6651")
pulsarClientConf.setUseTls(true)
pulsarClientConf.setTlsAllowInsecureConnection(true)
pulsarClientConf.setTlsTrustCertsFilePath("/usr/tools/pulsartls/ca.cert.pem")
//clientConf.setTlsHostnameVerificationEnable(false)
pulsarClientConf.setAuthentication(tlsAuth)
pulsarClientConf.setUseTcpNoDelay(true)
val s1mmePulsarConsumer = new FlinkPulsarSource("https://xxx.xxx.xxx.xxx:8443",
pulsarClientConf,
new SimpleStringSchema(),
s1mmePulsarProp)
.setStartFromLatest()
val s1mmePulsarSource = env.addSource(s1mmePulsarConsumer)
.filter(_.trim.nonEmpty)
.assignTimestampsAndWatermarks(new timestampExtractor(8))
.name("s1mmesource")
s1mmePulsarSource.addSink(hadoopSink("hdfs://encrypt_data/ninecon/s1mmepulsar/s1mme/",8))
.name("s1mme2hdfs")
我刚刚编写了这些代码,但是flink代理总是抛出一个错误:由org.apache.pulsar.client.admin.pulsaradminexception$notauthorizedexception:http401unauthorized引起
暂无答案!
目前还没有任何答案,快来回答吧!