Event Bus 实战——监控目录变化

x33g5p2x  于2022-05-16 转载在 其他  
字(3.6k)|赞(0)|评价(0)|浏览(216)

一 点睛

有这样一个场景,就是监控某个硬件设备的运行时数据,然后记录到数据库。

可以这样做:该硬件设备在运行的过程中,将一些性能信息等写入特殊的数据文件中,需要做的就是监控文件的变化,读取最后一行数据,然后根据格式将其解析出来插入数据库,实现的思路大致是:在程序第一次启动时,获取文件的最后修改时间,并且做首次解析,然后每隔一段指定时间检查一次文件最后被修改的时间,如果与记录的时间相等则等待下次的采集,否则进行新一轮的采集并且更新时间。

程序实现的思路比较简单,但是上述的实现方式还是存在诸多问题,比如在采集时间间隔内,如果文件发生了 N 次变化,我们只能获取到最后一次,其根本原因是文件的变化不会通知应用程序,所以只能比较笨的主动去轮询。

JDK 从 1.7 版本开始,提供了 WatchService 类,该类可以基于事件通知的方式监控文件或者目录的任何变化,文件的改变相当于每一个事件(Event)的发生,针对不同的事件执行不同的动作,本篇将结合 NIO2.0 中提供的 WatchService 和 之前博客提供的 Event Bus 实现文件目录的监控的功能。

当 WatchService 遇到 EventBus,目录怎样监控呢?下面实现之。

二 实战

1 目录监控

package filechange;

import concurrent.eventbus.EventBus;

import java.nio.file.*;

public class DirectoryTargetMonitor {
    private WatchService watchService;

    private final EventBus eventBus;

    private final Path path;

    private volatile boolean start = false;

    public DirectoryTargetMonitor(final EventBus eventBus, final String path) {
        this(eventBus, path, "");
    }

    public DirectoryTargetMonitor(final EventBus eventBus, String path, final String... morePaths) {
        this.eventBus = eventBus;
        this.path = Paths.get(path, morePaths);
    }

    public void startMonitor() throws Exception {
        this.watchService = FileSystems.getDefault().newWatchService();
        // 为路径注册感兴趣的事件
        this.path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE);
        System.out.printf("The directory [%s] is monitoring... \n", path);
        this.start = true;
        while (start) {
            WatchKey watchKey = null;

            try {
                // 当有事件发生时会返回对应的 WatchKey
                watchKey = watchService.take();
                watchKey.pollEvents().forEach(event -> {
                    WatchEvent.Kind<?> kind = event.kind();
                    Path path = (Path) event.context();
                    Path child = DirectoryTargetMonitor.this.path.resolve(path);
                    // 提交 FileChangeEvent 到 EventBus
                    eventBus.post(new FileChangeEvent(child, kind));
                });
            } catch (Exception e) {
                this.start = false;
            } finally {
                if (watchKey != null) {
                    watchKey.reset();
                }
            }
        }
    }

    public void stopMonitor() throws Exception {
        System.out.printf("The directory [%s] monitor will be stop...\n", path);
        Thread.currentThread().interrupt();
        this.start = false;
        this.watchService.close();
        System.out.printf("The directory [%s] monitor will be stop done.\n", path);
    }
}

2 文件变化事件

package filechange;

import java.nio.file.Path;
import java.nio.file.WatchEvent;

/**
 * @className: FileChangeEvent
 * @description: 一旦目录发生任何改变,都会提交 FileChangeEvent 事件
 * @date: 2022/5/16
 * @author: cakin
 */
public class FileChangeEvent {
    private final Path path;
    private final WatchEvent.Kind<?> kind;

    public FileChangeEvent(Path path, WatchEvent.Kind<?> kind) {
        this.path = path;
        this.kind = kind;
    }

    public Path getPath() {
        return path;
    }

    public WatchEvent.Kind<?> getKind() {
        return kind;
    }
}

3 监听者

package filechange;

import concurrent.eventbus.Subscribe;

/**
 * @className: FileChangeListener
 * @description: 接受文件目录变化的 Subscriber,也就是当目录发生变化时用来接受事件的方法
 * @date: 2022/5/16
 * @author: cakin
 */
public class FileChangeListener {
    @Subscribe
    public void onChange(FileChangeEvent event) {
        System.out.printf("%s-%s\n", event.getPath(), event.getKind());
    }
}

4 测试

package filechange;

import concurrent.eventbus.AsyncEventBus;
import concurrent.eventbus.EventBus;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        final EventBus eventBus = new AsyncEventBus(executor);
        // 注册
        eventBus.register(new FileChangeListener());
        DirectoryTargetMonitor monitor = new DirectoryTargetMonitor(eventBus, "G:\\monitor");
        monitor.startMonitor();
    }
}

三 测试

The directory [G:\monitor] is monitoring...

G:\monitor\你好.bmp-ENTRY_DELETE

G:\monitor\test.txt-ENTRY_MODIFY

G:\monitor\新建文本文档.txt-ENTRY_CREATE

G:\monitor\新建文本文档.txt-ENTRY_DELETE

G:\monitor\test2.txt-ENTRY_CREATE

相关文章