从头开始学Redisson--------分布式集合之映射(RMap)

x33g5p2x  于2021-12-28 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(674)

一、分布式集合映射(Map)

      这里的分布式RMap类型,主要功能就是(1)实现了JavaSE的Map接口,方便操作(2)具有Redis的Map类型的缓存记忆功能。其底层数据类型就是Redis的hash数据类型。

      与RBucket对象桶的区别,RMap可以把多个对象存放到Map中,而RBucket只能存放一个对象。

      与RList集合的区别。RList就是一个集合,类似于Java中的List,存放的是单列集合。而RMap类似于双列集合,能够存放 key - value 类型的数据。

二、实战

package com.tyzhou.redisson.controller;

import com.tyzhou.Constant;
import com.tyzhou.mail.mapper.MUserMapper;
import com.tyzhou.mail.modol.MUser;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @author zhoutianyu
 * @date 2020/3/22
 * @time 19:44
 */
@RestController
@RequestMapping(value = "redisson/map")
public class MapController {

    private static final Logger LOGGER = LoggerFactory.getLogger(MapController.class);

    @Autowired
    private MUserMapper userMapper;

    @Autowired
    private RedissonClient redisson;

    @PostMapping("/addUser")
    public Integer insert(@RequestBody MUser user) {
        user.setId(null);
        int res = userMapper.insertSelective(user);
        if (res > 0) {
            RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
            map.put(user.getId(), user);
        }
        return user.getId();
    }

    @GetMapping("/getUser")
    public MUser insert(@RequestParam Integer userId) {
        MUser user;
        RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
        user = map.get(userId);
        if (user != null) {
            return user;
        }
        return userMapper.selectByPrimaryKey(userId.longValue());
    }

    @PostMapping("/updateUser")
    public Integer update(@RequestBody MUser user) {

        int res = userMapper.updateByPrimaryKeySelective(user);
        if (res > 0) {
            RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
            map.put(user.getId(), user);
        }
        return user.getId();
    }

    @PostMapping("/deleteUser")
    public MUser delete(@RequestParam Long id) {

        int res = userMapper.deleteByPrimaryKey(id);
        if (res > 0) {
            RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
            return map.remove(id.intValue());
        }
        return null;
    }

}

三、加锁

      既然是分布式的Map,那么就一定会有这种可能:其他客户端或者线程也在同一时刻操作相同的Map。那么这样的话,那么RMap就有可能出现线程安全方面的问题。

      RMap提供了两种字段锁。一种是常见的Lock锁,一种是进阶版的读写锁。

官方技术参考文档:
RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
   MyValue v = map.get(k);
   // 其他业务逻辑
} finally {
   keyLock.unlock();
}

RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
   MyValue v = map.get(k);
   // 其他业务逻辑
} finally {
   keyLock.readLock().unlock();
}

测试一:使用RLock锁,做更新用户与查询用户互斥锁

       我们为更新用户使用加锁操作。根据官方提供的文档,分布式Map "RMap"拥有加锁的能力。

       如下代码。只要调用RMap#getLock 方法就能获得一把 RLock锁。再看代码,在更新用户的操作上增加睡眠10秒钟,模拟更新业务的其他业务,例如消息推送、日志记录。最后就是使用finally语句释放锁。不用担心死锁问题,因为Redisson有"看门狗"机制,它在指定时间后会自动释放锁。

@PostMapping("/updateUser")
    public Integer update(@RequestBody MUser user) {

        int res = userMapper.updateByPrimaryKeySelective(user);
        if (res > 0) {
            RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
            //更新与查询互斥
            RLock rLock = map.getLock(user.getId());
            rLock.lock();
            try {
                LOGGER.info("更新用户:{}开始", user.getId());
                map.put(user.getId(), user);
                Thread.sleep(10000);
                LOGGER.info("更新用户:{}结束", user.getId());
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            } finally {
                rLock.unlock();
            }
        }
        return user.getId();
    }

      再看获取用户的接口代码。同样的,通过RMap#getLock获取一把相同的RLock 锁。调用RLock#lock方法,如果没有获取到锁,那么就会一直阻塞。当然了,这里有待商榷,在实际中可以使用Lock#tryLock防止获取锁时间过长而阻塞的问题。

@GetMapping("/getUser")
    public MUser insert(@RequestParam Integer userId) {
        MUser user;
        RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
        RLock rLock = map.getLock(userId);
        LOGGER.info("查询用户:{}开始", userId);
        rLock.lock();
        try {
            user = map.get(userId);
            if (user != null) {
                LOGGER.info("查询用户:{}结束", userId);
                return user;
            }
        } catch (Exception e) {
            LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
        } finally {
            rLock.unlock();
        }
        LOGGER.info("查询用户:{}结束", userId);
        return userMapper.selectByPrimaryKey(userId.longValue());
    }

      实际效果:

      先更新用户,更新成功后,再查询。这样的话,它们不会出现争抢锁的情况。

      接着,我再测一次在更新用户的途中,再次查询用户。因为更新接口已经抢占了同步锁,所以查询接口必须等待更新接口释放同步锁,也就是在更新用户成功后,查询接口才能查询到用户。

测试二:使用读写锁

       下面测试读写锁。读写锁的特征相信大家都知道。传送门:ReentrantReadWriteLock实现原理探索

        读写锁内部有两把锁,一把读锁,一把写锁。我们从操作的角度来思考问题。如果操作是查询,也就是查询数据库,多个线程之间不会出现相互影响,那么就可以在同一时刻允许多个读线程访问。如果操作互不影响,那么锁就可以被分离。这就是锁分离的思想。

        下面是RMap使用读锁的代码。看代码,首先通过Redisson#getMap拿到RMap,然后通过RMap#getReadWriteLock,拿到读写锁。读写锁里面有两把锁,一把读锁,一把写锁。我们这里先读锁,RReadWriteLock#readLock。使用lock加锁并使用unlock解锁。业务代码里面增加一个睡眠时间5秒,方便验证。

@GetMapping("/getUser")
    public MUser getUser(@RequestParam Integer userId) {
        MUser user;
        RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
        //获取读锁
        RReadWriteLock rLock = map.getReadWriteLock(userId);
        RLock readLock = rLock.readLock();
        readLock.lock();
        try {
            LOGGER.info("查询用户:{}开始", userId);
            user = map.get(userId);
            Thread.sleep(5000);
            if (user != null) {
                LOGGER.info("查询用户:{}结束", userId);
                return user;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }
        LOGGER.info("查询用户:{}结束", userId);
        return userMapper.selectByPrimaryKey(userId.longValue());
    }

       好,看实现效果图片。添加一个用户后,用户也会加入缓存。我在浏览器上多次访问,从日志打印来看,多个请求获取了相同的读锁,读锁之间不会出现互斥现象,所以再日志上打印出来多个线程同时进入同步代码块,打印"查询用户:18开始"。 

       最后测试写锁。读锁与写锁之间互斥,即拿到了写锁以后,同一把读写锁的读锁就不能进入同步代码块。道理很简单,我还在操作过程中,其它线程就应该被阻塞,不然就会出现比如脏读的现象。

       如下代码,先获取到读写锁RMap#getReadWriteLock。然后拿这把读写锁的写锁,ReadWriteLock#writeLock。读写锁的特性就是读写互斥,写写互斥。拿到写锁以后,开始更新用户更新成功后将最新的用户对象存放到缓存RMap中。

@PostMapping("/updateUser")
    public Integer update(@RequestBody MUser user) {

        int res = userMapper.updateByPrimaryKeySelective(user);
        if (res > 0) {
            RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
            //获取写锁
            RReadWriteLock rLock = map.getReadWriteLock(user.getId());
            RLock writeLock = rLock.writeLock();
            try {
                writeLock.lock();
                LOGGER.info("更新用户:{}开始", user.getId());
                map.put(user.getId(), user);
                Thread.sleep(10000);
                LOGGER.info("更新用户:{}结束", user.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
            }
        }
        return user.getId();
    }

    @GetMapping("/getUser")
    public MUser getUser(@RequestParam Integer userId) {
        MUser user;
        RMap<Integer, MUser> map = redisson.getMap(Constant.REDISSON_MAP);
        //获取读锁
        RReadWriteLock rLock = map.getReadWriteLock(userId);
        RLock readLock = rLock.readLock();
        readLock.lock();
        try {
            LOGGER.info("查询用户:{}开始", userId);
            user = map.get(userId);
            if (user != null) {
                LOGGER.info("查询用户:{}结束-----", userId);
                return user;
            }
        } finally {
            readLock.unlock();
        }
        LOGGER.info("查询用户:{}结束", userId);
        return userMapper.selectByPrimaryKey(userId.longValue());
    }

      从现象来看,我先发送的是更新请求,获取的是写锁。与此同时,我再在浏览器上发送一个读请求,拿到的是读锁。但是因为同一把读写锁的读锁与写锁之间互斥,先拿到到的是写锁,所以读请求必须要等到写请求释放写锁后才能请求到同步代码。


读锁与写锁互斥

相关文章