package concurrent.activeobjects;
/**
 * Common view of a computation result: callers obtain the value through
 * {@link #getResultValue()} regardless of the concrete implementation.
 */
public interface Result {
/**
 * Returns the value carried by this result.
 *
 * @return the result value, typed as {@code Object}
 */
Object getResultValue();
}
package concurrent.activeobjects;
/**
 * A {@link Result} whose value is already known when the object is created.
 */
public class RealResult implements Result {

    /** Value handed out by {@link #getResultValue()}; assigned once in the constructor. */
    private final Object result;

    /**
     * Wraps an already-computed value.
     *
     * @param value the value to expose; stored as given, without copying or validation
     */
    public RealResult(Object value) {
        result = value;
    }

    /**
     * Returns the wrapped value.
     *
     * @return the exact object passed to the constructor
     */
    @Override
    public Object getResultValue() {
        return this.result;
    }
}
package concurrent.activeobjects;
/**
 * A {@link Result} whose value is supplied later, possibly by another thread.
 *
 * <p>{@link #getResultValue()} blocks until {@link #setResult(Result)} has been
 * called. Both methods synchronize on this instance and use its monitor for
 * wait/notify.
 */
public class FutureResult implements Result {

    /** The delegate that holds the real value; only meaningful once {@code ready} is true. */
    private Result result;

    /** Set to true by {@link #setResult(Result)}; guards the wait loop in {@link #getResultValue()}. */
    private boolean ready = false;

    /**
     * Publishes the result and wakes every thread blocked in {@link #getResultValue()}.
     *
     * @param result the completed result to delegate to
     */
    public synchronized void setResult(Result result) {
        this.result = result;
        this.ready = true;
        this.notifyAll();
    }

    /**
     * Blocks until a result has been set, then returns its value.
     *
     * <p>The wait is uninterruptible: if the calling thread is interrupted while
     * waiting, this method keeps waiting for the result, but the thread's
     * interrupt status is restored before returning so the caller can still
     * observe the interruption (previously it was printed and discarded).
     *
     * @return the value of the result passed to {@link #setResult(Result)}
     */
    @Override
    public synchronized Object getResultValue() {
        boolean interrupted = false;
        try {
            // Loop (not "if") so a wake-up without a published result goes back to waiting.
            while (!ready) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; re-asserting it here
                    // would make the next wait() throw immediately and spin.
                    interrupted = true;
                }
            }
            return this.result.getResultValue();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package concurrent.activeobjects;
/**
* @className: MethodRequest
* @description: Corresponds to each method of ActiveObject. Holds the Servant and
*               FutureResult given at construction; subclasses implement execute().
* @date: 2022/5/3
* @author: cakin
*/
public abstract class MethodRequest {
// Servant supplied to the constructor; accessible to subclasses.
protected final Servant servant;
// FutureResult supplied to the constructor; DisplayStringRequest passes null here.
protected final FutureResult futureResult;
/**
 * Stores the collaborators for later use by {@link #execute()}.
 *
 * @param servant the servant reference to keep
 * @param futureResult the future reference to keep (stored as given, may be null)
 */
public MethodRequest(Servant servant, FutureResult futureResult) {
this.servant = servant;
this.futureResult = futureResult;
}
/**
 * Performs the request; the concrete behavior is defined by each subclass.
 */
public abstract void execute();
}
package concurrent.activeobjects;
/**
 * Request object for {@code makeString}: captures the call arguments and, when
 * executed, runs the call on the servant and publishes the outcome to the future.
 */
public class MakeStringRequest extends MethodRequest {

    /** Character passed to the servant's {@code makeString}. */
    private final char fillChar;

    /** Count passed to the servant's {@code makeString}. */
    private final int count;

    /**
     * Captures everything needed to perform the call later.
     *
     * @param servant the servant that will perform the call
     * @param futureResult the future that receives the servant's result
     * @param count the count argument for {@code makeString}
     * @param fillChar the fill character argument for {@code makeString}
     */
    public MakeStringRequest(Servant servant, FutureResult futureResult, int count, char fillChar) {
        super(servant, futureResult);
        this.fillChar = fillChar;
        this.count = count;
    }

    /**
     * Calls {@code servant.makeString(count, fillChar)} and hands the returned
     * result to the future.
     */
    @Override
    public void execute() {
        futureResult.setResult(servant.makeString(count, fillChar));
    }
}
package concurrent.activeobjects;
/**
 * Request object for {@code displayString}: captures the text and, when
 * executed, passes it to the servant. No future is associated with it.
 */
public class DisplayStringRequest extends MethodRequest {

    /** Text forwarded to the servant's {@code displayString}. */
    private final String message;

    /**
     * Captures the text to display.
     *
     * @param servant the servant that will perform the call
     * @param text the text to forward to {@code displayString}
     */
    public DisplayStringRequest(Servant servant, String text) {
        // This request produces no value, so the future slot is left null.
        super(servant, null);
        message = text;
    }

    /**
     * Calls {@code servant.displayString} with the captured text.
     */
    @Override
    public void execute() {
        servant.displayString(message);
    }
}
package concurrent.activeobjects;
import java.util.LinkedList;
/**
 * Bounded FIFO queue of {@link MethodRequest}s guarded by this instance's monitor.
 *
 * <p>{@link #put(MethodRequest)} blocks while the queue holds
 * {@code MAX_METHOD_REQUEST_QUEUE_SIZE} requests; {@link #take()} blocks while it
 * is empty. Both waits are uninterruptible: an interrupt received while waiting
 * does not abort the operation, but the thread's interrupt status is restored
 * before the method returns (previously it was printed and discarded).
 */
public class ActivationQueue {

    /** Maximum number of queued requests before {@link #put(MethodRequest)} blocks. */
    private static final int MAX_METHOD_REQUEST_QUEUE_SIZE = 100;

    /** Pending requests, oldest first. */
    private final LinkedList<MethodRequest> methodQueue;

    /** Creates an empty queue. */
    public ActivationQueue() {
        methodQueue = new LinkedList<>();
    }

    /**
     * Appends a request, waiting for free capacity if the queue is full.
     *
     * @param request the request to enqueue
     */
    public synchronized void put(MethodRequest request) {
        boolean interrupted = false;
        try {
            while (methodQueue.size() >= MAX_METHOD_REQUEST_QUEUE_SIZE) {
                interrupted |= waitSwallowingInterrupt();
            }
            this.methodQueue.addLast(request);
            // Wake threads blocked in take() waiting for an element.
            this.notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest request, waiting if the queue is empty.
     *
     * @return the request at the head of the queue
     */
    public synchronized MethodRequest take() {
        boolean interrupted = false;
        try {
            while (methodQueue.isEmpty()) {
                interrupted |= waitSwallowingInterrupt();
            }
            MethodRequest methodRequest = methodQueue.removeFirst();
            // Wake threads blocked in put() waiting for free capacity.
            this.notifyAll();
            return methodRequest;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Waits once on this monitor; must be called while holding it.
     *
     * @return {@code true} if the wait ended because the thread was interrupted
     */
    private boolean waitSwallowingInterrupt() {
        try {
            this.wait();
            return false;
        } catch (InterruptedException e) {
            // Reported to the caller, which re-asserts the flag once it is done waiting;
            // re-asserting it here would make the next wait() throw immediately.
            return true;
        }
    }
}
package concurrent.activeobjects;
/**
 * Thread that drains an {@link ActivationQueue}: it repeatedly takes the next
 * {@link MethodRequest} and executes it on this thread.
 */
public class SchedulerThread extends Thread {

    /** Queue shared between callers of {@link #invoke(MethodRequest)} and {@link #run()}. */
    private final ActivationQueue activationQueue;

    /**
     * @param activationQueue the queue this scheduler puts requests into and takes them from
     */
    public SchedulerThread(ActivationQueue activationQueue) {
        this.activationQueue = activationQueue;
    }

    /**
     * Enqueues a request for later execution by {@link #run()}.
     *
     * @param request the request to enqueue
     */
    public void invoke(MethodRequest request) {
        activationQueue.put(request);
    }

    /**
     * Takes and executes requests one at a time, forever; the loop has no exit condition.
     */
    @Override
    public void run() {
        for (;;) {
            MethodRequest next = activationQueue.take();
            next.execute();
        }
    }
}
package concurrent.activeobjects;
/**
* @className: ActiveObject
* @description: An active object that accepts asynchronous messages
* @date: 2022/5/3
* @author: cakin
*/
public interface ActiveObject {
/**
 * Requests a string made of {@code count} copies of {@code fillChar}.
 *
 * @param count number of characters in the requested string
 * @param fillChar character to repeat
 * @return a {@link Result} through which the string can be obtained
 */
Result makeString(int count, char fillChar);
/**
 * Requests that the given text be displayed.
 *
 * @param text the text to display
 */
void displayString(String text);
}
package concurrent.activeobjects;
import java.util.concurrent.TimeUnit;
/**
 * Performs the actual {@link ActiveObject} work synchronously on the calling thread.
 *
 * <p>Both operations sleep to simulate slow work. If the executing thread is
 * interrupted during a sleep, the operation still completes, and the thread's
 * interrupt status is restored before returning (previously it was printed and
 * discarded).
 */
class Servant implements ActiveObject {

    /**
     * Builds a string of {@code count} copies of {@code fillChar}, pausing 10 ms
     * after each character.
     *
     * <p>Once an interrupt is observed the remaining pauses are skipped, so the
     * string is completed without further delay.
     *
     * @param count length of the string to build
     * @param fillChar character to repeat
     * @return a {@link RealResult} wrapping the built string
     */
    @Override
    public Result makeString(int count, char fillChar) {
        char[] buf = new char[count];
        boolean interrupted = false;
        for (int i = 0; i < count; i++) {
            buf[i] = fillChar;
            if (!interrupted) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Stop pausing, finish the work, and re-assert the flag afterwards.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return new RealResult(new String(buf));
    }

    /**
     * Prints {@code "Display is " + text} to standard output, then pauses for one second.
     *
     * @param text the text to print
     */
    @Override
    public void displayString(String text) {
        System.out.println("Display is " + text);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Preserve the interruption for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
package concurrent.activeobjects;
/**
 * {@link ActiveObject} implementation that does no work itself: each call is
 * wrapped in a {@link MethodRequest} and handed to the scheduler thread.
 */
class ActiveObjectProxy implements ActiveObject {

    /** Servant passed into every request created by this proxy. */
    private final Servant servant;

    /** Scheduler that receives the requests. */
    private final SchedulerThread schedulerThread;

    /**
     * @param schedulerThread the scheduler that receives the requests
     * @param servant the servant the requests will be executed against
     */
    public ActiveObjectProxy(SchedulerThread schedulerThread, Servant servant) {
        this.servant = servant;
        this.schedulerThread = schedulerThread;
    }

    /**
     * Submits a make-string request and returns the future tied to it.
     *
     * @param count length of the requested string
     * @param fillChar character to repeat
     * @return the {@link FutureResult} that the request fills in when executed
     */
    @Override
    public Result makeString(int count, char fillChar) {
        FutureResult pending = new FutureResult();
        MethodRequest request = new MakeStringRequest(servant, pending, count, fillChar);
        schedulerThread.invoke(request);
        return pending;
    }

    /**
     * Submits a display request; nothing is returned to the caller.
     *
     * @param text the text to display
     */
    @Override
    public void displayString(String text) {
        MethodRequest request = new DisplayStringRequest(servant, text);
        schedulerThread.invoke(request);
    }
}
package concurrent.activeobjects;
/**
 * Wires together the servant, activation queue, scheduler thread and proxy that
 * make up an {@link ActiveObject}.
 */
public final class ActiveObjectFactory {

    /**
     * Kept public for backward compatibility; the factory methods are static, so
     * an instance is not needed to use this class.
     */
    public ActiveObjectFactory() {
    }

    /**
     * Creates a new active object backed by its own, already started, scheduler thread.
     *
     * @return a proxy through which requests can be submitted
     */
    public static ActiveObject createActiveObject() {
        Servant servant = new Servant();
        ActivationQueue queue = new ActivationQueue();
        SchedulerThread schedulerThread = new SchedulerThread(queue);
        ActiveObjectProxy proxy = new ActiveObjectProxy(schedulerThread, servant);
        schedulerThread.start();
        return proxy;
    }

    /**
     * Misspelled original name of {@link #createActiveObject()}, retained so
     * existing callers keep compiling. Behaves identically.
     *
     * @return a proxy through which requests can be submitted
     */
    public static ActiveObject createAcitveObject() {
        return createActiveObject();
    }
}
package concurrent.activeobjects;
/**
 * Client thread that sends an endless series of display requests to an
 * {@link ActiveObject}, one every 200 ms.
 */
public class DisplayClientThread extends Thread {

    /** Target of the display requests. */
    private final ActiveObject activeObject;

    /**
     * @param name the thread name, also used as the prefix of each displayed text
     * @param activeObject the active object to send requests to
     */
    public DisplayClientThread(String name, ActiveObject activeObject) {
        super(name);
        this.activeObject = activeObject;
    }

    /**
     * Sends {@code "<thread name>=><n>"} for n = 0, 1, 2, ... pausing 200 ms after
     * each request. The loop ends only when the pause is interrupted, in which
     * case the stack trace is printed and the method returns.
     */
    @Override
    public void run() {
        int sequence = 0;
        try {
            while (true) {
                activeObject.displayString(Thread.currentThread().getName() + "=>" + sequence);
                Thread.sleep(200);
                sequence++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package concurrent.activeobjects;
/**
 * Client thread that endlessly asks an {@link ActiveObject} for ever longer
 * strings and prints each one.
 */
public class MakeClientThread extends Thread {

    /** Character requested in every string: the first character of the thread name. */
    private final char fillChar;

    /** Target of the make-string requests. */
    private final ActiveObject activeObject;

    /**
     * @param activeObject the active object to send requests to
     * @param name the thread name; its first character becomes the fill character
     */
    public MakeClientThread(ActiveObject activeObject, String name) {
        super(name);
        this.fillChar = name.charAt(0);
        this.activeObject = activeObject;
    }

    /**
     * Requests strings of length 1, 2, 3, ... For each request it pauses 20 ms,
     * then obtains the value from the returned result and prints
     * {@code "<thread name>: value <string>"}. The loop ends only when the pause
     * is interrupted, in which case the stack trace is printed and the method returns.
     */
    @Override
    public void run() {
        try {
            int length = 1;
            while (true) {
                Result pending = activeObject.makeString(length, fillChar);
                Thread.sleep(20);
                String value = (String) pending.getResultValue();
                System.out.println(Thread.currentThread().getName() + ": value " + value);
                length++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package concurrent.activeobjects;
/**
 * Demo entry point: creates one active object and starts three client threads
 * that share it.
 */
public class Test {

    /**
     * Starts two make-string clients ("Alice", "Bobby") and one display client
     * ("Chris") against the same active object.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ActiveObject shared = ActiveObjectFactory.createAcitveObject();

        Thread alice = new MakeClientThread(shared, "Alice");
        alice.start();

        Thread bobby = new MakeClientThread(shared, "Bobby");
        bobby.start();

        Thread chris = new DisplayClientThread("Chris", shared);
        chris.start();
    }
}
Alice: value A
Bobby: value B
Display is Chris=>0
Alice: value AA
Display is Chris=>1
Bobby: value BB
Display is Chris=>2
Display is Chris=>3
Display is Chris=>4
Display is Chris=>5
Alice: value AAA
Display is Chris=>6
Bobby: value BBB
Display is Chris=>7
Display is Chris=>8
Display is Chris=>9
Display is Chris=>10
Display is Chris=>11
Display is Chris=>12
Display is Chris=>13
Display is Chris=>14
Display is Chris=>15
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/chengqiuming/article/details/124555365
内容来源于网络,如有侵权,请联系作者删除!