Java之SpringBoot集成redis实现消息队列

x33g5p2x  于2022-07-22 转载在 Java  
字(2.6k)|赞(0)|评价(0)|浏览(546)

一、设置好Redis的配置文件(application.yml)

spring:
  # Redis configuration
  redis:
    # Server host
    host: localhost
    # Port (defaults to 6379)
    port: 6379
    # Database index
    database: 2
    # Password (commented out below, so no password is configured)
    #password: root
    # Connection timeout
    timeout: 10s
    lettuce:
      pool:
        # Minimum number of idle connections kept in the pool
        min-idle: 0
        # Maximum number of idle connections kept in the pool
        max-idle: 8
        # Maximum number of connections in the pool
        max-active: 8
        # Maximum time to block waiting for a connection (a negative value means no limit)
        max-wait: -1ms

二、消息接收者类(RedisMessage)

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Redis pub/sub message receiver.
 *
 * <p>Registered as a Spring component. Each delivered message body is decoded with the
 * injected template's string serializer and written to the class logger.
 *
 * @author itbluebox
 */
@Slf4j
@Component
public class RedisMessage implements MessageListener {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Handles a single delivered message (demo code): decodes the body to a string and logs it.
     *
     * @param message the delivered message; only its body is read
     * @param pattern the pattern argument supplied by the listener contract; not used here
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
        String msg = serializer.deserialize(message.getBody());
        // Use the @Slf4j-generated logger rather than System.out so the output goes through
        // the application's logging configuration.
        log.info("接收到的消息是1:{}", msg);
    }

}

三、消息队列订阅者配置(RedisSubConfig)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Subscriber-side Redis configuration for the message queue.
 *
 * @author itbluebox
 */
@Configuration
public class RedisSubConfig {

    /**
     * Creates the message listener container and subscribes the adapter to the
     * {@code userInfo} and {@code org} topics.
     *
     * @param connectionFactory connection factory assigned to the container
     * @param adapter listener adapter registered for both topics
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer container(
            RedisConnectionFactory connectionFactory, MessageListenerAdapter adapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Register the same adapter for each topic, in the original order.
        for (String topicName : new String[] {"userInfo", "org"}) {
            listenerContainer.addMessageListener(adapter, new PatternTopic(topicName));
        }
        return listenerContainer;
    }

    /**
     * Wraps the {@link RedisMessage} receiver in a listener adapter.
     *
     * @param message the receiver bean that the adapter delegates to
     * @return an adapter targeting the receiver's {@code onMessage} method
     */
    @Bean
    public MessageListenerAdapter adapter(RedisMessage message) {
        // "onMessage": if RedisMessage did not implement the listener interface, this argument
        // would have to match the name of the method in RedisMessage that reads the message.
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(message, "onMessage");
        return listenerAdapter;
    }
}

四、发送消息(消息生产者)

/**
 * Message producer: a REST controller that publishes demo messages through the injected
 * {@code StringRedisTemplate}.
 */
@RestController
public class TestController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publishes one message to the {@code userInfo} channel and one to the {@code org} channel.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        publish("userInfo", "Hello world-userInfo");
        publish("org", "Hello world-org");
        return "ok";
    }

    /** Sends {@code payload} to {@code channel} via the injected template. */
    private void publish(String channel, String payload) {
        stringRedisTemplate.convertAndSend(channel, payload);
    }
}

相关文章