并发-两个线程从不同的服务器查询多个数据库并同步比较每个记录

czfnxgou  于 2021-06-27  发布在  Java
关注(0)|答案(0)|浏览(201)

我想让两个线程查询(jdbc)两个表(来自不同的服务器/数据库,但相关)以获得有序的输出,然后比较它们或逐个记录地应用一些逻辑。
表的大小可能非常大,所以我认为使用线程将是以最少的占用空间完成此任务的最有效方法。
例子:
thread1—按1排序的查询表server1.database1.schema1.tablea;
thread2-查询表server2.database2.schema2.tableb,其中[conditions/logics related to a]按1排序;
在两个线程中对结果集中的每条记录进行同步,并应用比较或数据逻辑。
例如:resultset from thread1=[1,2,3,4,5]
来自thread2的结果集=[2,4,6,8,10]
我希望能够同步每个索引(0…4),并比较它们。假设thread1.resultset[0]=thread2.resultset[0]/2。
这意味着:
1 = 2/2
2 = 4/2
等。。。
这是我到目前为止得到的,基于我在研究中得到的另一个答案。我使用atomicinteger来同步resultset迭代。

//Main class
public class App {
    public static void main(String[] args) {
        try {
            ReaderThread t1 = new ReaderThread();
            ReaderThread t2 = new ReaderThread();
            List<ReaderThread> list = new ArrayList<ReaderThread>();
            list.add(t1);
            list.add(t2);

            HelperThread helperThread = new HelperThread(list);
            helperThread.start();

            t1.setName("Reader1");
            t2.setName("Reader2");
            t1.start();
            t2.start();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

//Database ReaderThread 
public class ReaderThread extends Thread {

    private DatabaseAccessLayer dal = new DatabaseAccessLayer(); //access layer to instantiate connection, statement and execute query and return ResultSet
    private ResultSet rs;
    private final Object hold = new Object();
    private final AtomicInteger lineCount = new AtomicInteger(0);
    private String currentLine;

    public ReaderThread() throws SQLException {
        this.rs = dal.executeStatement(); //execute SQL query on instantiation and get the resultset
    }

    @Override
    public void run() {
        synchronized (hold) {
            try {
                while (rs.next()) {
                    currentLine = rs.getString(1) + rs.getString(2) + rs.getString(3) + rs.getString(4)
                            + rs.getString(5) + rs.getString(5);
                    lineCount.getAndIncrement();
                    System.out.println(this.getName() + " ||| " + currentLine);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }

        }
    }

    public void hold () throws InterruptedException {
        this.hold.wait();
    }

    public void release() {
        this.hold.notify();
    }

    public boolean isLocked() {
        return getState().equals(State.WAITING);
    }

    public Object getHold() {
        return hold;
    }

    public AtomicInteger getLineCount() {
        return lineCount;
    }

    public String getCurrentLine() {
        return currentLine;
    }

}

// THe helper class which look at two threads and determine lock conditions and subsequence logic
public class HelperThread extends Thread {
    private List<ReaderThread> threads;

    @Override
    public void run() {
        while (true) {
            threads.forEach(t -> {
                try {
                    int r1 = 0;
                    int r2 = 0;
                    //======== lock and synchronize logic here =========
                    if (t.getName().equals("Reader1")) r1 = t.getLineCount().get();
                    if (t.getName().equals("Reader2")) r2 = t.getLineCount().get();
                    if (t.getName().equals("Reader1") && r1 == r2) t.hold();
                    if (t.getName().equals("Reader2") && r2 == r1) t.hold();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            if (threads.stream().allMatch(ReaderThread::isLocked)) {
                System.out.println("next line:");

                threads.forEach(t -> {
                    synchronized (t.getLock()) {
                        System.out.println(t.getCurrentLine());
                        t.release();
                    }
                });

                System.out.println("\n");
            }
        }

    }

    public HelperThread(List<ReaderThread> threads) {
        this.threads = threads;
    }
}

上面的代码能够在表上并发执行查询,并打印出每个表的结果集。但是,锁定/保持逻辑不起作用。当两个线程中的atomicinteger变量相同时,我尝试将线程搁置。在我的代码中,这意味着它将逐个遍历结果集。对于每一个线程,atomicinteger变量都递增,并将等待另一个线程的atomicinteger变量达到相同的值。然后发生比较逻辑,然后释放两个线程继续。
我不确定atomicinteger在这里的用法是否正确。
任何建议都非常感谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题