多个线程同时访问同一资源时,只有所有线程都在进行读操作才不会引起冲突;其余情况(读与写、写与写)都会导致访问冲突,需要对资源进行同步处理。
| 线程 | 读 | 写 |
| --- | --- | --- |
| 读 | 不冲突 | 冲突 |
| 写 | 冲突 | 冲突 |
本篇实现一个读写分离的锁。
package concurrent.readwritelock;
/**
 * Basic operations of an explicit lock: a blocking acquire and a release.
 *
 * <p>Created 2022/4/16.
 *
 * @author cakin
 */
public interface Lock {

    /**
     * Releases the lock previously obtained through {@link #lock()}.
     */
    void unlock();

    /**
     * Acquires the explicit lock; a thread that cannot obtain it is blocked.
     *
     * @throws InterruptedException declared so that implementations can report an
     *                              interruption that happens while the caller is blocked
     */
    void lock() throws InterruptedException;
}
package concurrent.readwritelock;
/**
 * Factory and query facade for a read/write lock pair.
 *
 * <p>Created 2022/4/16.
 *
 * @author cakin
 */
public interface ReadWriteLock {

    /**
     * Factory method: creates a read/write lock using the implementation's default preference.
     *
     * @return a new {@link ReadWriteLockImpl}
     */
    static ReadWriteLock readWriteLock() {
        return new ReadWriteLockImpl();
    }

    /**
     * Factory method: creates a read/write lock with an explicit preference.
     *
     * @param preferWriter initial writer-preference flag handed to the implementation
     * @return a new {@link ReadWriteLockImpl}
     */
    static ReadWriteLock readWriteLock(boolean preferWriter) {
        return new ReadWriteLockImpl(preferWriter);
    }

    /**
     * Creates the lock used for read access.
     *
     * @return the read lock
     */
    Lock readLock();

    /**
     * Creates the lock used for write access.
     *
     * @return the write lock
     */
    Lock writeLock();

    /**
     * Reports how many threads are currently performing a read operation.
     * While this is greater than zero, the number of active writers is intended to be zero.
     *
     * @return the number of active readers
     */
    int getReadingReaders();

    /**
     * Reports how many threads are currently waiting to acquire the write lock.
     *
     * @return the number of waiting writers
     */
    int getWaitingWrites();

    /**
     * Reports how many threads are currently performing a write operation.
     * The intended maximum is 1; while it is greater than zero, the number of
     * active readers is intended to be zero.
     *
     * @return the number of active writers
     */
    int getWritingWrites();
}
package concurrent.readwritelock;
/**
 * {@link ReadWriteLock} implementation that owns the shared monitor object, the
 * reader/writer counters and the writer-preference flag which {@link ReadLock}
 * and {@link WriteLock} coordinate through.
 *
 * <p>All state is guarded by the monitor returned from {@link #getMUTEX()}. The public
 * accessors take that monitor themselves so values read from outside the lock classes
 * are up to date; the monitor is reentrant, so calling them while already holding it
 * (as {@code ReadLock} and {@code WriteLock} do) is safe.
 */
public class ReadWriteLockImpl implements ReadWriteLock {

    /** Monitor object guarding every field below. */
    private final Object mutex = new Object();

    /** Number of threads currently writing. */
    private int writingWrites = 0;

    /** Number of threads currently waiting to write. */
    private int waitingWrites = 0;

    /** Number of threads currently reading. */
    private int readingReaders = 0;

    /** Preference between readers and writers; {@code true} favours writers. */
    private boolean preferWriter;

    /**
     * Creates a lock whose preference flag starts as {@code true} (writers preferred).
     */
    public ReadWriteLockImpl() {
        this(true);
    }

    /**
     * Creates a lock with the given initial preference.
     *
     * @param preferWriter initial value of the writer-preference flag
     */
    public ReadWriteLockImpl(boolean preferWriter) {
        this.preferWriter = preferWriter;
    }

    /**
     * Creates a read lock bound to this instance.
     *
     * @return a new {@link ReadLock}
     */
    @Override
    public Lock readLock() {
        return new ReadLock(this);
    }

    /**
     * Creates a write lock bound to this instance.
     *
     * @return a new {@link WriteLock}
     */
    @Override
    public Lock writeLock() {
        return new WriteLock(this);
    }

    // The six counter mutators below do not synchronize themselves: callers are
    // expected to hold the monitor from getMUTEX() while invoking them.

    /** Increases the number of writing threads by one. */
    void incrementWritingWrites() {
        this.writingWrites++;
    }

    /** Increases the number of threads waiting to write by one. */
    void incrementWaitingWrites() {
        this.waitingWrites++;
    }

    /** Increases the number of reading threads by one. */
    void incrementReadingReaders() {
        this.readingReaders++;
    }

    /** Decreases the number of writing threads by one. */
    void decrementWritingWrites() {
        this.writingWrites--;
    }

    /** Decreases the number of threads waiting to write by one. */
    void decrementWaitingWrites() {
        this.waitingWrites--;
    }

    /** Decreases the number of reading threads by one. */
    void decrementReadingReaders() {
        this.readingReaders--;
    }

    /**
     * Returns how many threads are currently performing a write operation.
     *
     * @return the writing-thread counter
     */
    @Override
    public int getWritingWrites() {
        synchronized (mutex) {
            return this.writingWrites;
        }
    }

    /**
     * Returns how many threads are currently waiting to acquire the write lock.
     *
     * @return the waiting-writer counter
     */
    @Override
    public int getWaitingWrites() {
        synchronized (mutex) {
            return this.waitingWrites;
        }
    }

    /**
     * Returns how many threads are currently performing a read operation.
     *
     * @return the reading-thread counter
     */
    @Override
    public int getReadingReaders() {
        synchronized (mutex) {
            return this.readingReaders;
        }
    }

    /**
     * Exposes the monitor object that guards this lock's state.
     *
     * @return the shared monitor; always the same instance
     */
    public Object getMUTEX() {
        return this.mutex;
    }

    /**
     * Returns the current preference flag.
     *
     * @return {@code true} when writers are preferred
     */
    public boolean isPreferWriter() {
        synchronized (mutex) {
            return this.preferWriter;
        }
    }

    /**
     * Changes the preference between readers and writers.
     *
     * @param preferWriter {@code true} to prefer writers, {@code false} to prefer readers
     */
    public void setPreferWriter(boolean preferWriter) {
        synchronized (mutex) {
            this.preferWriter = preferWriter;
        }
    }
}
package concurrent.readwritelock;
/**
 * Read-side lock of {@link ReadWriteLockImpl}.
 *
 * <p>Created 2022/4/16.
 *
 * @author cakin
 */
public class ReadLock implements Lock {

    private final ReadWriteLockImpl readWriteLock;

    /**
     * Binds this read lock to the read/write lock whose state it manipulates.
     *
     * @param readWriteLock the owning read/write lock
     */
    public ReadLock(ReadWriteLockImpl readWriteLock) {
        this.readWriteLock = readWriteLock;
    }

    /**
     * Acquires the read lock, waiting on the shared monitor while a writer is active,
     * or while writers are queued and the writer preference is switched on.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public void lock() throws InterruptedException {
        final Object monitor = readWriteLock.getMUTEX();
        synchronized (monitor) {
            while (mustWait()) {
                monitor.wait();
            }
            // Read lock obtained: record one more active reader.
            readWriteLock.incrementReadingReaders();
        }
    }

    /**
     * Releases the read lock: drops the reader count, switches the preference to
     * writers and wakes every thread waiting on the shared monitor.
     */
    @Override
    public void unlock() {
        final Object monitor = readWriteLock.getMUTEX();
        synchronized (monitor) {
            readWriteLock.decrementReadingReaders();
            // Give writer threads a better chance next time.
            readWriteLock.setPreferWriter(true);
            monitor.notifyAll();
        }
    }

    /**
     * Tells whether a reader has to keep waiting. Invoked with the shared monitor held.
     *
     * @return {@code true} if a write is in progress, or writers are preferred and waiting
     */
    private boolean mustWait() {
        if (readWriteLock.getWritingWrites() > 0) {
            return true;
        }
        return readWriteLock.isPreferWriter() && readWriteLock.getWaitingWrites() > 0;
    }
}
package concurrent.readwritelock;
/**
 * Write-side lock of {@link ReadWriteLockImpl}.
 */
public class WriteLock implements Lock {

    private final ReadWriteLockImpl readWriteLock;

    /**
     * Binds this write lock to the read/write lock whose state it manipulates.
     *
     * @param readWriteLock the owning read/write lock
     */
    public WriteLock(ReadWriteLockImpl readWriteLock) {
        this.readWriteLock = readWriteLock;
    }

    /**
     * Acquires the write lock, waiting on the shared monitor while any thread is
     * reading or writing. The caller is counted as a waiting writer for the
     * duration of the wait.
     *
     * @throws InterruptedException if the thread is interrupted while waiting; the
     *                              waiting-writer count is restored and other waiters
     *                              are woken before the exception propagates
     */
    @Override
    public void lock() throws InterruptedException {
        final Object monitor = readWriteLock.getMUTEX();
        synchronized (monitor) {
            // Register as a waiting writer first.
            readWriteLock.incrementWaitingWrites();
            try {
                // Park while other threads are reading or writing.
                while (readWriteLock.getReadingReaders() > 0 || readWriteLock.getWritingWrites() > 0) {
                    monitor.wait();
                }
            } catch (InterruptedException e) {
                // This writer is giving up. Readers may be parked solely because a writer
                // was waiting, so wake them; they re-check their condition only after this
                // block releases the monitor, by which time the count below is updated.
                monitor.notifyAll();
                throw e;
            } finally {
                // Either the lock is about to be taken or the wait was aborted:
                // in both cases this thread is no longer a waiting writer.
                readWriteLock.decrementWaitingWrites();
            }
            // Write lock obtained: record the active writer.
            readWriteLock.incrementWritingWrites();
        }
    }

    /**
     * Releases the write lock: drops the writer count, switches the preference to
     * readers and wakes every thread waiting on the shared monitor.
     */
    @Override
    public void unlock() {
        final Object monitor = readWriteLock.getMUTEX();
        synchronized (monitor) {
            readWriteLock.decrementWritingWrites();
            // Prefer readers now so that the read lock can be obtained quickly.
            readWriteLock.setPreferWriter(false);
            monitor.notifyAll();
        }
    }
}
package concurrent.readwritelock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Shared character buffer whose reads and writes are guarded by a read/write lock.
 * Every operation is deliberately slowed down by one second.
 */
public class ShareData {

    /** The shared resource. */
    private final List<Character> container = new ArrayList<>();

    /** Read/write lock guarding {@link #container}. */
    private final ReadWriteLock readWriteLock = ReadWriteLock.readWriteLock();

    /** Lock taken by {@link #read()}. */
    private final Lock readLock = readWriteLock.readLock();

    /** Lock taken by {@link #write(char)}. */
    private final Lock writeLock = readWriteLock.writeLock();

    /**
     * Creates the buffer pre-filled with {@code length} copies of {@code 'c'}.
     *
     * @param length number of initial elements
     */
    public ShareData(int length) {
        for (int i = 0; i < length; i++) {
            container.add('c');
        }
    }

    /**
     * Copies the current content of the buffer while holding the read lock.
     *
     * @return a snapshot of the buffer content
     * @throws InterruptedException if interrupted while acquiring the read lock
     */
    public char[] read() throws InterruptedException {
        // Acquire BEFORE the try block: if lock() throws, the lock was never taken,
        // so unlock() must not run.
        readLock.lock();
        try {
            int size = container.size();
            char[] newBuffer = new char[size];
            for (int i = 0; i < size; i++) {
                newBuffer[i] = container.get(i);
            }
            slowly();
            return newBuffer;
        } finally {
            // Release the lock once the operation is finished.
            readLock.unlock();
        }
    }

    /**
     * Appends one character to the buffer while holding the write lock.
     *
     * @param c the character to append
     * @throws InterruptedException if interrupted while acquiring the write lock
     */
    public void write(char c) throws InterruptedException {
        // Acquire BEFORE the try block, for the same reason as in read().
        writeLock.lock();
        try {
            this.container.add(c);
            slowly();
        } finally {
            // Release the lock once all work is done.
            writeLock.unlock();
        }
    }

    /**
     * Simulates a slow operation by sleeping for one second. If the sleep is
     * interrupted, the interrupt flag is restored so callers can still observe it.
     */
    private void slowly() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
package concurrent.readwritelock;
import static java.lang.Thread.currentThread;
/**
 * Demo driver: two writer threads append the characters of a fixed text to a
 * {@link ShareData} instance while ten reader threads keep printing its content.
 */
public class Test {

    private static final String text = "fjdslkkkkkkkkkkkfsdkfjdsljfd";

    /**
     * Starts the writer threads first, then the reader threads. The reader threads
     * loop forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The shared data, pre-filled with 50 elements.
        final ShareData shareData = new ShareData(50);

        // Writer task: pushes every character of the text, one write per character.
        final Runnable writerTask = () -> {
            int position = 0;
            while (position < text.length()) {
                try {
                    char c = text.charAt(position);
                    shareData.write(c);
                    System.out.println(currentThread() + " writes " + c);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                position++;
            }
        };

        // Reader task: reads and prints the shared content endlessly.
        final Runnable readerTask = () -> {
            for (;;) {
                try {
                    System.out.println(currentThread() + " read " + new String(shareData.read()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        // Two threads performing writes.
        for (int writers = 2; writers > 0; writers--) {
            new Thread(writerTask).start();
        }

        // Ten threads performing reads.
        for (int readers = 10; readers > 0; readers--) {
            new Thread(readerTask).start();
        }
    }
}
Thread[Thread-0,5,main] writes f
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccf
Thread[Thread-1,5,main] writes f
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccff
Thread[Thread-0,5,main] writes j
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffj
Thread[Thread-0,5,main] writes d
Thread[Thread-11,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-6,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-10,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-7,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-2,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-5,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-9,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-8,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-3,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-4,5,main] read ccccccccccccccccccccccccccccccccccccccccccccccccccffjd
Thread[Thread-1,5,main] writes j
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/chengqiuming/article/details/124210006
内容来源于网络,如有侵权,请联系作者删除!