我正在从ide运行flink。在queryable中存储数据是可行的,但是当我查询它时,它会抛出一个异常。
例外情况
Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)])
我的代码:
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")
@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
else {
// At startup some failures are expected
// due to races. Make sure that they don't
// fail this test.
return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
@throws[Exception]
def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
})
}
}
@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(queryName: String,
keyHash: Int,
serializedKey: Array[Byte]): Future[Array[Byte]] = {
val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
kvState.recoverWith(recover(queryName, keyHash, serializedKey))
}
def onSuccess = new OnSuccess[Array[Byte]]() {
@throws(classOf[Throwable])
override def onSuccess(result: Array[Byte]): Unit = {
println("found record ")
val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
println(value)
}
}
override def invoke(query: QueryMetaData): Unit = {
println("getting inside querystore"+query.record)
val serializedResult = flinkQuery.getResult(query.record, queryName)
serializedResult.onSuccess(onSuccess)
我没有产生一个新的小集群或集群。提交喜欢https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/queryablestateitcase.java 因为我想把它放在同一个集群中,在同一个环境中,作为运行env.execute的主应用程序。这一步有必要吗。
默认情况下,flink运行于localhost:6123 is 连接有问题吗?我需要在单独的集群中提交作业吗?
1条答案
按热度按时间7d7tgy0s1#
在谷歌搜索了很多遍之后,我找到了一个解决办法。
我正在使用localstreamenvironment并得到相同的错误,直到发现此线程remoteenv connect失败。所描述的错误是针对不同的设置(不是本地设置),但是用于测试的主题中包含的示例是创建localflinkminicluster,参数“usesingleactorsystem”设置为false。
查看localstreamenvironment的实现,创建微型集群时将“usesingleactorsystem”设置为true。
我只是创建了一个扩展localstreamenvironment的localqueryablestreamenvironment类,在这个类中,创建迷你集群时将“usesingleactorsystem”设置为true,并且一切都在ide中工作。
现在我的代码如下:
配置:
注意:queryablestate只适用于此配置本地\u编号\u任务\u管理器设置为大于1的值!
示例化/执行环境:
要创建客户端:
有关更多信息访问:
apacheflink中的可查询状态-实现
带1.3.0-rc0的可查询状态客户端
我的依赖项: