Redis——Lettuce基本使用

x33g5p2x  于2022-05-05 转载在 Redis  
字(5.9k)|赞(0)|评价(0)|浏览(574)

Lettuce比Jedis好的点之一就是有官网:Lettuce

Lettuce的使用由4块组成:

  1. URI:定义连接信息;
  2. Client:redis客户端,集群使用专用的RedisClusterClient;
  3. Connection:redis连接,有多种类型(单机,哨兵,集群,订阅发布);
  4. Command:用户操作api,基本包含了redis全部命令;

使用流程也很简单:使用指定的URI创建Client,使用Client创建Connection,使用Connection创建Command,然后就可以操作Command的api了,最后关闭连接关闭客户端;

Command有三种类型:

  1. sync:同步,跟jedis差不多
  2. async:异步
  3. reactive:响应式

RedisURI的创建有3种方式:

  1. 使用字符串:RedisURI.create("redis://localhost/");
  2. 使用builder:RedisURI.Builder.redis("localhost", 6379).auth("password").database(1).build();
  3. 使用new对象:new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

另外,使用字符串的时候几种不同的语法:

  • 单机:redis://[[username:]password@]host:port][/database][?[timeout=timeout[d|h|m|s|ms|us|ns]]
  • 单机(ssl):rediss://[[username:]password@]host:port][/database][?[timeout=timeout[d|h|m|s|ms|us|ns]]
  • 单机(Unix Domain Sockets):redis-socket://[[username:]password@]path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&database=database]]
  • 哨兵:redis-sentinel://[[username:]password@]host1[:port1][,host2[:port2]],hostN[:portN]][/database][?[timeout=timeout[d|h|m|s|ms|us|ns]][&sentinelMasterId=sentinelMasterId]

超时时间单位:

  • d 天
  • h 小时
  • m 分钟
  • s 秒
  • ms 毫秒
  • us 微秒
  • ns 纳秒

简单Demo:(使用的redis版本为6.2.6,Lettuce为6.1.8)

maven导入

<dependency>
	    <groupId>io.lettuce</groupId>
	    <artifactId>lettuce-core</artifactId>
	    <version>6.1.8.RELEASE</version>
	</dependency>

测试代码

/**
 * 2022年4月25日上午11:10:59
 */
package testlettuce;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.push.PushListener;
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.lettuce.core.resource.Delay;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author XWF
 *
 */
public class TestLettuce {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
//		RedisURI uri = RedisURI.create("redis://default:123456@192.168.1.31:6379/0?timeout=5s");
		
//		RedisURI uri = new RedisURI("192.168.1.31", 6379, Duration.ofSeconds(5));
//		uri.setPassword("123456".toCharArray());
		
		RedisURI uri = RedisURI.builder()
				.withHost("192.168.1.31")
				.withPort(6379)
				.withAuthentication("default", "123456")
				.build();
		ClientResources resources = DefaultClientResources.builder()
                .ioThreadPoolSize(4)	//设置I/O线程池大小(默认cpu数)仅在没有提供eventLoopGroupProvider时有效
                .computationThreadPoolSize(4)	//设置用于计算的任务线程数(默认cpu数)仅在没有提供eventExecutorGroup时有效
//                .reconnectDelay(Delay.constant(Duration.ofSeconds(10)))	//设置无状态尝试重连延迟,默认延迟上限30s
                .build();
		RedisClient client = RedisClient.create(resources, uri);
		ClientOptions options = ClientOptions.builder()
				.autoReconnect(true)	//设置自动重连
				.pingBeforeActivateConnection(true)	//激活连接前执行PING命令
//				.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(5)))	//命令超时
				.build();
		client.setOptions(options);
		client.setDefaultTimeout(Duration.ofSeconds(3));	//为客户端创建的连接设置默认超时时间,适用于尝试连接和非阻塞命令
		StatefulRedisConnection<String, String> conn = client.connect();
		
		System.out.println("==sync==");
		RedisCommands<String, String> syncCmd = conn.sync();
		System.out.println(syncCmd.set("a", "aaaa"));
		String geta = syncCmd.get("a");
		System.out.println(geta);
		
		System.out.println("==async==");
		RedisAsyncCommands<String, String> asyncCmd = conn.async();
		RedisFuture<String> setb = asyncCmd.set("b", "bbbb");
		try {
			System.out.println(setb.get(1, TimeUnit.SECONDS));
		} catch (InterruptedException | ExecutionException | TimeoutException e1) {
			e1.printStackTrace();
		}
		RedisFuture<String> getb = asyncCmd.get("b");
//		a2.thenAccept(x -> {
//			System.out.println(x + x.length());
//		});
		getb.whenCompleteAsync((x, t) -> {
			System.out.println(x + t);
		});
		
		System.out.println("==reactive==");
		RedisReactiveCommands<String, String> reactiveCmd = conn.reactive();
		Mono<String> setc = reactiveCmd.set("c", "cccc");
		System.out.println(setc.block());
		Mono<String> getc = reactiveCmd.get("c");
		getc.subscribe(System.out::println);
		Flux<String> keys = reactiveCmd.keys("*");
		keys.subscribe(System.out::println);
		
		try {Thread.sleep(500);} catch (InterruptedException e) {}
		
		System.out.println("==pubsub==");
		StatefulRedisPubSubConnection<String, String> pubsubConn = client.connectPubSub();
		pubsubConn.addListener(new RedisPubSubListener<String, String>() {
			@Override
			public void unsubscribed(String channel, long count) {
				System.out.println("[unsubscribed]" + channel);
			}
			@Override
			public void subscribed(String channel, long count) {
				System.out.println("[subscribed]" + channel);
			}
			@Override
			public void punsubscribed(String pattern, long count) {
			}
			@Override
			public void psubscribed(String pattern, long count) {
			}
			@Override
			public void message(String pattern, String channel, String message) {
			}
			@Override
			public void message(String channel, String message) {
				System.out.println("[message]" + channel + " -> " + message);
			}
		});
		RedisPubSubAsyncCommands<String, String> pubsubCmd = pubsubConn.async();
		pubsubCmd.subscribe("CH");
		pubsubCmd.subscribe("CH2");
		pubsubCmd.publish("CH", "helloworld");
		pubsubCmd.publish("CH2", "HELLOWORLD");
		pubsubCmd.unsubscribe("CH");
		pubsubCmd.unsubscribe("CH2");
		
		try {Thread.sleep(500);} catch (InterruptedException e) {}
	}

}

运行结果

参考:Lettuce Reference Guide

相关文章