FlinkQueryableState: configuration issues on a local cluster
I am running Flink from the IDE. Storing data in queryable state works, but somehow when I query it, it throws an exception.
Exception:
Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)])
My code:
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")
@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
  else {
    // At startup some failures are expected
    // due to races. Make sure that they don't
    // fail this test.
    return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
      @throws[Exception]
      def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
    })
  }
}

@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(queryName: String,
                                  keyHash: Int,
                                  serializedKey: Array[Byte]): Future[Array[Byte]] = {
  val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
  kvState.recoverWith(recover(queryName, keyHash, serializedKey))
}

def onSuccess = new OnSuccess[Array[Byte]]() {
  @throws(classOf[Throwable])
  override def onSuccess(result: Array[Byte]): Unit = {
    println("found record ")
    val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
    println(value)
  }
}

override def invoke(query: QueryMetaData): Unit = {
  println("getting inside querystore" + query.record)
  val serializedResult = flinkQuery.getResult(query.record, queryName)
  serializedResult.onSuccess(onSuccess)
I am not spawning a new mini-cluster or calling cluster.submit like https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java does, because I want to do this in the same cluster, in the same environment as the main app running with env.execute. Is that step necessary?
According to the documentation, Flink runs on localhost:6123 by default. Is there a problem with the connection? Do I need to submit the job to a separate cluster?
After a lot of googling, I found the solution.
I was using LocalStreamEnvironment and getting the same error, until I found this thread in a topic used for tests: RemoteEnv connect failed. The error described there is for a different setup (not local), but the gist example was creating a LocalFlinkMiniCluster with the parameter "useSingleActorSystem" set to false.
Looking at the implementation of LocalStreamEnvironment, the MiniCluster is created with "useSingleActorSystem" set to true. With a single, purely local actor system the JobManager actor is not exposed at akka.tcp://flink@127.0.0.1:6123, which would explain the ActorNotFound error above.
I simply created a class LocalQueryableStreamEnvironment extending LocalStreamEnvironment, in which the mini cluster is created with "useSingleActorSystem" set to false, and everything started working from the IDE.
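A minimal sketch of such a class, assuming the LocalFlinkMiniCluster(Configuration, boolean useSingleActorSystem) constructor and mirroring what LocalStreamEnvironment.execute does on 1.3 (the exact body may differ between 1.3.x versions; the createLocalEnvironment factory is just a convenience so the call below compiles, it is not part of LocalStreamEnvironment itself):
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class LocalQueryableStreamEnvironment extends LocalStreamEnvironment {

    private final Configuration config;

    private LocalQueryableStreamEnvironment(Configuration config) {
        super(config);
        this.config = config;
    }

    public static LocalQueryableStreamEnvironment createLocalEnvironment(int parallelism, Configuration config) {
        LocalQueryableStreamEnvironment env = new LocalQueryableStreamEnvironment(config);
        env.setParallelism(parallelism);
        return env;
    }

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        // Same flow as LocalStreamEnvironment.execute, but the mini cluster is created
        // with useSingleActorSystem = false, so the JobManager actor system listens on
        // the configured address/port and is reachable by the QueryableStateClient.
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        JobGraph jobGraph = streamGraph.getJobGraph();

        Configuration clusterConfig = new Configuration();
        clusterConfig.addAll(jobGraph.getJobConfiguration());
        clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
        clusterConfig.addAll(this.config);

        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(clusterConfig, false);
        try {
            cluster.start();
            return cluster.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
        } finally {
            transformations.clear();
            cluster.stop();
        }
    }
}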
Now my code looks like the following:
Configuration:
Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue());
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue());
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
Note: queryable state only worked with this configuration once LOCAL_NUMBER_TASK_MANAGER was set to a value greater than 1!
Instantiate/execute the environment:
LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config);
...
env.addSource(anySource)
.keyBy(anyAttribute)
.flatMap(new UpdateMyStateToBeQueriedLaterMapper())
.addSink(..); //etc
...
env.execute("JobNameHere");
And to create the client:
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
.createHighAvailabilityServices(
config,
Executors.newSingleThreadScheduledExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION
);
return new QueryableStateClient(config,highAvailabilityServices);
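And a rough sketch of querying with that client (this goes inside a method, with client being the QueryableStateClient returned above; the query name "query-name", the String key and the Long value type match the illustrative mapper sketch, and the JobID has to be the one of the running job, e.g. copied from the web UI). On 1.3 getKvState returns a Scala Future, so the result is simply awaited here:
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
...
JobID jobId = JobID.fromHexString("<id of the running job>");
String key = "some-key";

ExecutionConfig execConfig = new ExecutionConfig();
TypeSerializer<String> keySerializer = BasicTypeInfo.STRING_TYPE_INFO.createSerializer(execConfig);
TypeSerializer<Long> valueSerializer = BasicTypeInfo.LONG_TYPE_INFO.createSerializer(execConfig);

// Key and namespace are serialized together; keyed state uses VoidNamespace.
byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
        key, keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

Future<byte[]> future = client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);

byte[] serializedValue = Await.result(future, new FiniteDuration(10, TimeUnit.SECONDS));
Long value = KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
System.out.println("current value for " + key + ": " + value);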
For more information, see:
Queryable States in ApacheFlink - Implementation
Queryable State Client with 1.3.0-rc0
My dependencies:
compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1'