读写锁分离实战

x33g5p2x  于2022-04-17 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(218)

一 点睛

两个线程对资源的访问动作,除了多线程在同一时间都进行读操作时不会引起冲突外,其余的情况都会导致访问的冲突,需要对资源进行同步处理。

| <br>线程<br> | 读 | <br>写<br> |
| <br>读<br> | <br>不冲突<br> | <br>冲突<br> |
| <br>写<br> | <br>冲突<br> | <br>冲突<br> |

本篇实现一个读写分离的锁。

二 实战

1 Lock

package concurrent.readwritelock;

/**
* @className: Lock
* @description: 定义锁的基本操作
* @date: 2022/4/16
* @author: cakin
*/
public interface Lock {
    // 获取显示锁,没有获得锁的线程将被阻塞
    void lock() throws InterruptedException;

    // 释放获取的锁
    void unlock();
}

2 ReadWriteLock

package concurrent.readwritelock;

/**
* @className: ReadWriteLock
* @description: 创建读写锁,并且提供一些必要查询工作
* @date: 2022/4/16
* @author: cakin
*/
public interface ReadWriteLock {
    // 创建读锁
    Lock readLock();

    // 创建写锁
    Lock writeLock();

    // 获取当前有多少个线程正在进行写操作,最多只能为1,如果大于0,则 reader 的个数等于0
    int getWritingWrites();

    // 获取当前有多少个线程正在等待获取写入锁
    int getWaitingWrites();

    // 获取当前有多少线程正在等待获取 reader 锁,如果大于0,则 writer 的个数等于0
    int getReadingReaders();

    // 工厂方法,创建读写锁
    static ReadWriteLock readWriteLock() {
        return new ReadWriteLockImpl();
    }

    // 工厂方法,创建读写锁,并且传入 preferWrite
    static ReadWriteLock readWriteLock(boolean preferWriter) {
        return new ReadWriteLockImpl(preferWriter);
    }
}

3 ReadWriteLockImpl

package concurrent.readwritelock;

public class ReadWriteLockImpl implements ReadWriteLock {
    // 定义对象锁
    private final Object MUTEX = new Object();
    // 当前有多少个线程正在写入
    private int writingWrites = 0;
    // 当前有多少个线程正在等待写入
    private int waitingWrites = 0;
    // 当前有多少个线程正在 read
    private int readingReaders = 0;
    // read 和 write 的偏好设置
    private boolean preferWriter;

    // 默认情况下 perferWriter 为 true
    public ReadWriteLockImpl() {
        this(true);
    }

    // 构造器
    public ReadWriteLockImpl(boolean preferWriter) {
        this.preferWriter = preferWriter;
    }

    // 创建读锁
    @Override
    public Lock readLock() {
        return new ReadLock(this);
    }

    // 创建写锁
    @Override
    public Lock writeLock() {
        return new WriteLock(this);
    }

    // 使写线程数量增加
    void incrementWritingWrites() {
        this.writingWrites++;
    }

    // 使等待写入的线程数量增加
    void incrementWaitingWrites() {
        this.waitingWrites++;
    }

    // 使读线程的数量增加
    void incrementReadingReaders() {
        this.readingReaders++;
    }

    // 使写线程数量减少
    void decrementWritingWrites() {
        this.writingWrites--;
    }

    // 使等待写入的线程数量增加
    void decrementWaitingWrites() {
        this.waitingWrites--;
    }

    // 使读线程的数量增加
    void decrementReadingReaders() {
        this.readingReaders--;
    }

    // 获取当前有多少个线程正在进行写操作
    @Override
    public int getWritingWrites() {
        return this.writingWrites;
    }

    // 获取当前有多少个线程正在等待获取写入锁
    @Override
    public int getWaitingWrites() {
        return this.waitingWrites;
    }

    // 获取当前有多少个线程正在进行读操作
    @Override
    public int getReadingReaders() {
        return this.readingReaders;
    }

    public Object getMUTEX() {
        return this.MUTEX;
    }

    public boolean isPreferWriter() {
        return this.preferWriter;
    }

    // 控制倾向性
    public void setPreferWriter(boolean preferWriter) {
        this.preferWriter = preferWriter;
    }
}

4 ReadLock

package concurrent.readwritelock;

/**
* @className: ReadLock
* @description: 读锁实现
* @date: 2022/4/16
* @author: cakin
*/
public class ReadLock implements Lock {
    private final ReadWriteLockImpl readWriteLock;

    public ReadLock(ReadWriteLockImpl readWriteLock) {
        this.readWriteLock = readWriteLock;
    }

    @Override
    public void lock() throws InterruptedException {
        // 使用 Mutex 作为锁
        synchronized (readWriteLock.getMUTEX()) {
            // 若此时有线程在进行写操作,或者有写线程在等待并且偏写写锁为 true,就会无法获得读锁,只能被挂起
            while (readWriteLock.getWritingWrites() > 0 || (readWriteLock.isPreferWriter() && readWriteLock.getWaitingWrites() > 0)) {
                readWriteLock.getMUTEX().wait();
            }
            // 成功获得读锁,并且使 readingReaders 的数量增加
            readWriteLock.incrementReadingReaders();
        }
    }

    @Override
    public void unlock() {
        // 使用 Mutex 作为锁,并且进行同步
        synchronized (readWriteLock.getMUTEX()){
            // 使得当前 ReadingReaders 的数量减一
            readWriteLock.decrementReadingReaders();
            // 使得 write 线程获得更多的机会
            readWriteLock.setPreferWriter(true);
            // 通知唤醒与 mutex 关联 monitor waitset 中的线程
            readWriteLock.getMUTEX().notifyAll();
        }
    }
}

5 WriteLock

package concurrent.readwritelock;

public class WriteLock implements Lock {
    private final ReadWriteLockImpl readWriteLock;

    public WriteLock(ReadWriteLockImpl readWriteLock) {
        this.readWriteLock = readWriteLock;
    }

    @Override
    public void lock() throws InterruptedException {
        synchronized (readWriteLock.getMUTEX()) {
            try {
                // 首先使等待获取写入锁的数字加一
                readWriteLock.incrementWaitingWrites();
                // 如果此时有其他线程正在进行读操作,或者写操作,那么当前线程被挂起
                while (readWriteLock.getReadingReaders() > 0 || readWriteLock.getWritingWrites() > 0) {
                    readWriteLock.getMUTEX().wait();
                }
            } finally {
                // 成功获取到写锁,使得等待获取写锁的计数器减一
                this.readWriteLock.decrementWaitingWrites();
            }
            // 将正在写入的线程数量加一
            readWriteLock.incrementWritingWrites();
        }
    }

    @Override
    public void unlock() {
        synchronized (readWriteLock.getMUTEX()) {
            // 减少正式写入锁的线程计数器
            readWriteLock.decrementWritingWrites();
            // 将偏好状态修改为false,可以使得读锁被最快速的获得
            readWriteLock.setPreferWriter(false);
            // 通知唤醒其他在 Mutex monitor waitset 中的线程;
            readWriteLock.getMUTEX().notifyAll();
        }
    }
}

6 ShareData

package concurrent.readwritelock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ShareData {
    // 定义共享资源
    private final List<Character> container = new ArrayList<>();
    // 构造 ReadWriteLock
    private final ReadWriteLock readWriteLock = ReadWriteLock.readWriteLock();
    // 创建读取锁
    private final Lock readLock = readWriteLock.readLock();
    // 创建写入锁
    private final Lock writeLock = readWriteLock.writeLock();
    private final int length;

    public ShareData(int length) {
        this.length = length;
        for (int i = 0; i < length; i++) {
            container.add(i,'c');
        }
    }

    public char[] read() throws InterruptedException{
        try {
            // 首先使用读锁进行 lock
            readLock.lock();
            int size = container.size();
            char[] newBuffer = new char[size];
            for (int i = 0; i < size; i++) {
                newBuffer[i] = container.get(i);
            }
            slowly();
            return newBuffer;
        } finally {
            // 当操作结束之后,将锁释放
            readLock.unlock();
        }
    }

    public void write(char c) throws InterruptedException{
        try {
            //  使用写锁进行 lock
            writeLock.lock();
            this.container.add(c);
            slowly();
        } finally {
            // 当所有的操作都完成后,对写锁进行释放
            writeLock.unlock();
        }
    }

    private void slowly(){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

7 Test

package concurrent.readwritelock;

import static java.lang.Thread.currentThread;

public class Test {
    private final static String text = "fjdslkkkkkkkkkkkfsdkfjdsljfd";

    public static void main(String[] args) {
        // 定义共享数据
        final ShareData shareData = new ShareData(50);
        // 创建两个线程进行数据写操作
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int index = 0; index < text.length(); index++) {
                    try {
                        char c = text.charAt(index);
                        shareData.write(c);
                        System.out.println(currentThread() + " writes " + c);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        // 创建 10 个线程进行读操作
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while(true){
                    try {
                        System.out.println(currentThread()+" read "+new String(shareData.read()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

三 测试

Thread[Thread-0,5,main] writes f
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-1,5,main] writes f
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-0,5,main] writes j
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-0,5,main] writes d
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-1,5,main] writes j

相关文章