如何使用trident存储redis散列密钥

6jjcrrmo  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(293)

我正在从事一个实时数据项目,目前正在使用trident redis库https://github.com/kstyrc/trident-redis 存储带有计数的键集。我想存储一些更高级的细分,包括每个键的纬度和经度值。使用命令行上的redis,我可以使用:

HSET 123 lat "40"
HSET 123 lon "-37"

得到

1) "lat"
2) "40"
3) "lon"
4) "-37"

具有

HGETALL 123

如何使用trident redis实现同样的效果?我的拓扑结构当前如下所示:

public class TridentEventTopology {

    public static final StormTopology buildTopology(LocalDRPC drpc, StateFactory state) throws IOException {

        final int batchSize = 500;
        final BatchSpout spout = new BatchSpout(batchSize);

        final TridentTopology topology = new TridentTopology();
        TridentState batchedCounts = topology.newStream("spout", spout)
                                               .groupBy(new Fields("id"))
                                               .persistentAggregate(state, new Count(), new Fields("count"));

        topology.newDRPCStream("stream", drpc)
                .groupBy(new Fields("args"))
                .stateQuery(batchedCounts, new Fields("args"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static final void executeTopology() throws IOException {

        final StateFactory redis = RedisState.nonTransactional(new InetSocketAddress("localhost", 6379));
        final Config conf = new Config();
        final LocalDRPC drpc = new LocalDRPC();
        final LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("test", conf, buildTopology(drpc, redis));
    }
}
s71maibg

s71maibg1#

我也遇到过同样的情况。我认为您可以修改redistate的multiput和multiget函数,在该函数中您可以hget或hput所需的键和字段。实际上,它已经为一个固定的哈希键实现了。因此,您需要做的就是从multiget或multiput键参数中提取字段。

相关问题