Unable to connect Flink to an Elasticache Redis cluster - FlinkJedisClusterConfig cannot parse the cport in the CLUSTER NODES response

oyxsuwqo  posted on 2022-12-09  in  Apache

How can I use an Elasticache Redis Replication Group as a data sink in Flink for Kinesis Analytics?
I have created an Elasticache Redis Replication Group, and would like to compute something in Flink and store the results in this group.
My Java code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
// The snippet below builds a FlinkJedisClusterConfig, so that is the class to import.
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import java.net.InetSocketAddress;
import java.util.Set;
...

var endpoint = "foo.bar.clustercfg.usw2.cache.amazonaws.com";
var port = 6379;
var node = new InetSocketAddress(endpoint, port);
var jedisConfig = new FlinkJedisClusterConfig.Builder().setNodes(Set.of(node))
                                                       .build();
var redisMapper = new MyRedisMapper();
var redisSink = new RedisSink<>(jedisConfig, redisMapper);

This gives me the following error:

java.lang.NumberFormatException: For input string: "6379@1122"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.base/java.lang.Integer.parseInt(Integer.java:652)
    at java.base/java.lang.Integer.valueOf(Integer.java:983)
    at redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39)
    at redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14)
    at redis.clients.jedis.JedisClusterInfoCache.discoverClusterNodesAndSlots(JedisClusterInfoCache.java:50)
    at redis.clients.jedis.JedisClusterConnectionHandler.initializeSlotsCache(JedisClusterConnectionHandler.java:39)
    at redis.clients.jedis.JedisClusterConnectionHandler.<init>(JedisClusterConnectionHandler.java:28)
    at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:21)
    at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:16)
    at redis.clients.jedis.BinaryJedisCluster.<init>(BinaryJedisCluster.java:39)
    at redis.clients.jedis.JedisCluster.<init>(JedisCluster.java:45)

This occurs while parsing the response to CLUSTER NODES. Each node's address is expected to have the form ip:port@cport (see https://redis.io/commands/cluster-nodes/ ), but Jedis treats everything after the colon as the port and fails to parse it as an integer.
Am I doing something wrong here, or is this a bug in Jedis?

shyt4zoc


After a little digging I found that this is a bug affecting Jedis 2.8 and earlier when used with Redis 4.0 or later: https://github.com/redis/jedis/issues/1958
My Redis cluster is running 6.2.6 and my Apache Flink is 1.13, which is old but is the newest version AWS currently supports.
To solve this, I upgraded Jedis to the latest 2.x release, which includes the fix while remaining compatible with the Flink 1.13 libraries. Upgrading to Jedis 3.x or 4.x broke Flink.

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.10.2</version>
</dependency>
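
For illustration, the failure in the stack trace comes down to treating everything after the colon in the node address as an integer. The following is a minimal sketch of that failure and of a parse that tolerates the @cport suffix. It is not the actual Jedis code; the class name ClusterNodeAddress, the method clientPort, and the sample address 10.0.0.1 are hypothetical and used only for this example.

```java
public class ClusterNodeAddress {

    /**
     * Extracts the client port from the address field of a CLUSTER NODES line.
     * The field looks like "ip:port" (older Redis) or "ip:port@cport" (Redis 4.0+).
     * Hypothetical helper, not part of Jedis.
     */
    public static int clientPort(String address) {
        // Drop the "@cport" suffix if it is present.
        int at = address.indexOf('@');
        String hostAndPort = at >= 0 ? address.substring(0, at) : address;
        // Take everything after the last colon as the client port.
        int colon = hostAndPort.lastIndexOf(':');
        return Integer.parseInt(hostAndPort.substring(colon + 1));
    }

    public static void main(String[] args) {
        // Hypothetical address in the Redis 4.0+ format.
        String address = "10.0.0.1:6379@1122";

        // Naive approach: split on ':' and parse the second part as an int.
        // The second part is "6379@1122", so this throws NumberFormatException.
        try {
            Integer.valueOf(address.split(":")[1]);
        } catch (NumberFormatException e) {
            System.out.println("naive parse failed: " + e.getMessage());
        }

        // Tolerant approach: strip the cluster bus port before parsing.
        System.out.println("client port: " + clientPort(address));
    }
}
```

Since the parsing happens inside Jedis itself, this cannot be worked around from the Flink job, which is why the fix is to change the Jedis version on the classpath.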
