filereader—如何使用storm将输出写入文件

mpgws1up  于 2021-06-24  发布在  Storm
关注(0)|答案(3)|浏览(351)

嗨,我创建了一个风暴程序读取文本文件 input.txt 逐行使用spout类并将这些元组发送到bolt,在bolt类中,我想将元组写入 output.txt . 我几乎做了,但问题是风暴写了很多次在输出文件。看看我的眼睛 input.txt 以及 output.txt 文件
输入文件

Kaveen,bigdata,29
varadha,cshart,30
vignesh,winrt,21

输出。

varadha,cshart,30
vignesh,winrt,21
Kaveen,bigdata,29
varadha,cshart,30
Kaveen,bigdata,29
Kaveen,bigdata,29
vignesh,winrt,21

我想把输出文件写得和inputfile完全一样,但顺序不是问题。我该怎么做请帮帮我。

8nuwlpux

8nuwlpux1#

看起来有多个spout示例正在读取输入文件,导致输出中出现重复记录。

kh212irz

kh212irz2#

在将内容写入 output.txt 文件,打开 output.txt 在附加模式下。每当发生write语句时,只需将内容附加到此语句 output.txt 通过检查文件中的重复记录,就可以实现这一点。

e37o9pze

e37o9pze3#

我也面临同样的问题,所以找到了以下解决方法。
在spout中,当您读取文件时,在open()方法中创建filereader对象,因为此时它会初始化worker节点的reader对象。并在nexttuple()方法中使用该对象。
(带一个喷嘴和一个螺栓)
以下是open()和nexttuple方法的代码:

public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
            try {
            this.context = context;
            File file = new File(filename);
            this.fileReader = new FileReader(file);
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+ filename + "]");
        }
        this.collector = collector;
    }

    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                /**
                 * By each line emit a new value with the line as a their
                 */
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

输出:

Kaveen,bigdata,29
varadha,cshart,30
vignesh,winrt,21

另一个问题可能是:
您可能正在为spout运行多个示例,这可能会导致流的重复发射,或者文件是以append模式写入的。

相关问题