Perl:读取fifos非阻塞

kzmpq1sx  于 2023-03-09  发布在  Perl
关注(0)|答案(1)|浏览(124)

我最初的解决方案(https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top)会把数据副本保存在磁盘上。
现在我已经创建了第二个版本,在内存中缓冲一行。
它可以工作,但需要在启动前连接所有FIFO。

window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: *

这不会给出任何输出(因为 FIFO 100 没有任何写入方连接):

window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: {1..99}

我尝试使用open '+<'。这解决了上述问题,但现在它不会在EOF处停止。
我该怎么做呢?
最小版本(不支持超过 2GB 的行,也没有指数退避式的等待):

#!/usr/bin/perl

use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);

# Open every FIFO named on the command line and make it non-blocking.
# The handles live in the global %fh, keyed by file name;
# non_blocking_read() removes an entry once that file is done.
for my $file (@ARGV) {
    # NOTE: a read-only open of a FIFO waits for a writer to connect, so
    # every FIFO needs a writer before this loop can finish.
    open($fh{$file}, "<", $file) or die "Cannot open $file: $!";
    # F_GETFL returns the current flags as fcntl's return value ("0 but
    # true" when they are 0); the third argument is ignored.
    my $flags = fcntl($fh{$file}, F_GETFL, 0)
        or die "Cannot read flags of $file: $!";
    # Add O_NONBLOCK to the existing flags instead of replacing them.
    fcntl($fh{$file}, F_SETFL, $flags | O_NONBLOCK)
        or die "Cannot make $file non-blocking: $!";
}

# Poll each remaining handle and print whatever complete lines it has
# produced; the loop ends once every handle has been removed from %fh.
while (keys %fh) {
    for my $file (keys %fh) {
        my ($string, $something_read) = non_blocking_read($file);
        # $string is undef when nothing was available this round.
        print $string if defined $string;
    }
    # Sleep 1 ms before polling again
    select(undef, undef, undef, 1/1000);
}

{
    # Per-file bytes that were read but not yet returned (a trailing
    # partial line), keyed by the same file name as %fh.
    my %buffer;

    sub non_blocking_read {
        # Read whatever is currently available from the non-blocking
        # handle $fh{$file} and return only complete lines; a trailing
        # partial line stays buffered until its "\n" arrives or the file
        # ends.
        # Input:
        #   $file = key of an open handle in the global %fh
        # Returns:
        #   (undef, undef) nothing available right now (EAGAIN/EINTR)
        #   ($lines, 1)    every complete line read so far, or "" if data
        #                  arrived but no full line is available yet
        #   ($rest, 1)     at EOF or on a read error: the remaining
        #                  buffered data ("" if none); the handle is then
        #                  closed and removed from %fh
        my $file = shift;
        my $in = $fh{$file};
        $buffer{$file} = "" unless defined $buffer{$file};

        # Append to the buffer. sysread returns undef on error and 0 at
        # EOF, and it does not touch $! on success or at EOF, so $! may
        # only be inspected when it returned undef.
        my $rv = sysread($in, $buffer{$file}, 327680, length $buffer{$file});
        if (!defined $rv and ($! == EAGAIN or $! == EINTR)) {
            # Would block (or was interrupted): nothing read
            return (undef, undef);
        }
        if (!$rv) {
            # EOF or a real read error: this file is done
            close $in;
            delete $fh{$file};
            my $buf = delete $buffer{$file};
            return ($buf, 1);
        }

        # Find the last \n; everything up to and including it is complete.
        my $i = rindex($buffer{$file}, "\n") + 1;
        if (!$i) {
            # Something read, but not a full line
            return ("", 1);
        }
        # Remove the complete lines from the buffer and return them.
        my $lines = substr($buffer{$file}, 0, $i, "");
        return ($lines, 1);
    }
}

完整版本:重要的代码在前 40 行,其余部分是经过充分测试的代码。

#!/usr/bin/perl

use Symbol qw(gensym);
use IPC::Open3;

# Open every FIFO named on the command line and make it non-blocking.
# The handles live in the global %fh, keyed by file name.
for my $file (@ARGV) {
    open($fh{$file}, "<", $file) or die "Cannot open $file: $!";
    set_fh_non_blocking($fh{$file});
}

# Poll all handles until every one has been removed from %fh. Between
# rounds sleep $ms milliseconds; exp_usleep() returns the next (longer)
# delay, and the delay drops back to 0.1 ms whenever any handle yields
# data.
$ms = 1;
while (keys %fh) {
    for my $file (keys %fh) {
        my ($string, $something_read) = non_blocking_read($file);
        if ($something_read) {
            $ms = 0.1;
            print $string if defined $string;
        }
    }
    $ms = exp_usleep($ms);
}

{
    # Per-file bytes that were read but not yet returned (a trailing
    # partial line), keyed by the same file name as %fh.
    my %buffer;

    sub non_blocking_read {
        # Read whatever is currently available from the non-blocking
        # handle $fh{$file} and return only complete lines (ending in
        # "\n", or "\r" if there is no "\n"); a trailing partial line
        # stays buffered until it is completed or the file ends.
        # Input:
        #   $file = key of an open handle in the global %fh
        # Returns:
        #   (undef, undef) nothing available right now (EAGAIN/EINTR)
        #   ($lines, 1)    every complete line read so far, or "" if data
        #                  arrived but no full line is available yet
        #   ($rest, 1)     at EOF or on a read error: the remaining
        #                  buffered data ("" if none); the handle is then
        #                  closed and removed from %fh
        use POSIX qw(:errno_h);

        my $file = shift;
        my $in = $fh{$file};
        $buffer{$file} = "" unless defined $buffer{$file};

        # Unbuffered sysread (not PerlIO read) for a non-blocking handle.
        # It returns undef on error and 0 at EOF and leaves $! untouched
        # otherwise, so $! is only meaningful when it returned undef.
        my $rv = sysread($in, $buffer{$file}, 327680, length $buffer{$file});
        if (!defined $rv and ($! == EAGAIN or $! == EINTR)) {
            # Would block (or was interrupted): nothing read
            return (undef, undef);
        }
        if (!$rv) {
            # EOF or a real read error: this file is done
            close $in;
            delete $fh{$file};
            my $buf = delete $buffer{$file};
            return ($buf, 1);
        }

        #### Well-tested code below

        # Find \n or \r for full line
        my $i = (::rindex64(\$buffer{$file}, "\n") + 1) ||
            (::rindex64(\$buffer{$file}, "\r") + 1);
        if (!$i) {
            # Something read, but not a full line
            return ("", 1);
        }
        # Remove the complete lines from the buffer and return them.
        my $lines = substr($buffer{$file}, 0, $i, "");
        return ($lines, 1);
    }
}

sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB
    # Input:
    #   as rindex except STR which must be passed as a reference
    #   optional 4th argument: window size (default 2**31-1); mainly
    #   useful to exercise the windowed search on small strings
    # Output:
    #   as rindex
    my ($ref, $match, $pos, $block_size) = @_;
    $block_size = 2**31-1 unless defined $block_size;
    my $strlen = length($$ref);
    # Default: search from end
    $pos = defined $pos ? $pos : $strlen;
    # No point in doing extra work if we don't need to.
    if ($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    }

    my $matchlen = length($match);
    # The first window ends at $pos + $matchlen, so a match starting
    # exactly at $pos still fits inside it.
    my $offset = $pos - $block_size + $matchlen;
    if ($offset < 0) {
        # The offset is less than a $block_size:
        # shrink the window so it starts at 0
        $block_size += $offset;
        $offset = 0;
    }
    while (1) {
        my $ret = rindex(substr($$ref, $offset, $block_size), $match);
        return $ret + $offset if $ret != -1;
        # The start of the string has been searched: no match
        last if $offset == 0;
        # Step left. Consecutive windows overlap by $matchlen + 1 chars
        # so a match straddling a window boundary is not missed.
        $offset -= $block_size - $matchlen - 1;
        # Clamp instead of stopping, so the leftmost part is searched too
        $offset = 0 if $offset < 0;
    }
    return -1;
}

sub exp_usleep {
    # Pause for the given number of milliseconds, then hand back the
    # delay to use next time (exponential back-off).
    # Input:
    #   $ms = milliseconds to sleep (fractions allowed)
    # Returns:
    #   $ms unchanged once it has reached 1000 (the back-off stops
    #   growing there); otherwise $ms increased by 10%
    my ($ms) = @_;
    my $seconds = $ms / 1000;
    select(undef, undef, undef, $seconds);
    return $ms if $ms >= 1000;
    return $ms * 1.1;
}

sub set_fh_non_blocking {
    # Set filehandle as non-blocking, keeping its other status flags.
    # Inputs:
    #   $fh = filehandle to make non-blocking
    # Returns:
    #   N/A (dies if Fcntl cannot be loaded or fcntl fails)
    my $fh = shift;
    # Load Fcntl on first use only; fail loudly if that is impossible.
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"
        or die "Cannot load Fcntl: $@";
    # F_GETFL returns the current flags as fcntl's return value ("0 but
    # true" when they are 0); the third argument is ignored.
    my $flags = fcntl($fh, Fcntl::F_GETFL(), 0)
        or die "fcntl(F_GETFL) failed: $!";
    # Add non-blocking to the existing flags instead of replacing them.
    fcntl($fh, Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK())
        or die "fcntl(F_SETFL) failed: $!";
}
2wnc66cl

2wnc66cl1#

这个解决方案打开一个伪写入器,一旦接收到任何数据,这个伪写入器就会关闭。除了在输入为空时不会结束之外,它做的事情是正确的:

mkfifo {1..100}
parcat {1..100} &
parallel -j2 echo works '>' {} ::: {1..100}

parcat {1..100} &
# Fails (parcat does not exit)
parallel -j2 cat /dev/null '>' {} ::: {1..100}

代码:

#!/usr/bin/perl
    
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use IO::Select;
use strict;

# All real read handles, waited on together with select().
my $s = IO::Select->new();
# %fhr: real read handles that have not reached EOF (keyed by handle).
my %fhr;
# %fhw: the fake writer belonging to each read handle; it is closed as
# soon as that reader receives data.
my %fhw;

for my $name (@ARGV) {
    # Open the file with a fake writer that will never write, so the
    # read-only open below does not have to wait for a real writer.
    open(my $fhw, "+<", $name) or die "Cannot open $name read-write: $!";
    # Open the file for real
    open(my $fhr, "<", $name) or die "Cannot open $name: $!";
    set_fh_non_blocking($fhr);
    $s->add($fhr);
    $fhr{$fhr}++;
    $fhw{$fhr} = $fhw;
}

# Per-handle bytes read but not yet printed (a trailing partial line).
my %buffer;
while (keys %fhr) {
    for my $file ($s->can_read(undef)) {
        $buffer{$file} = "" unless defined $buffer{$file};
        # sysread returns undef on error and 0 at EOF; $! is only
        # meaningful when it returned undef (it is left stale otherwise).
        my $rv = sysread($file, $buffer{$file}, 327680, length $buffer{$file});
        if (!defined $rv and ($! == EAGAIN or $! == EINTR)) {
            # Would block (or was interrupted): nothing read
            next;
        }
        if (!$rv) {
            # EOF or a real read error: this file is done.
            # Print whatever is left in its buffer.
            $s->remove($file);
            delete $fhr{$file};
            my $rest = delete $buffer{$file};
            print $rest;
            # Closing the $file causes it to block
            # close $file;
            next;
        }
        if ($fhw{$file}) {
            # We have received data from $file:
            # Close the fake writer
            close $fhw{$file};
            delete $fhw{$file};
        }

        # Find \n or \r for full line
        my $i = (::rindex64(\$buffer{$file}, "\n") + 1) ||
            (::rindex64(\$buffer{$file}, "\r") + 1);
        # Print the complete lines and remove them from the buffer;
        # a partial line stays buffered until more data arrives.
        print substr($buffer{$file}, 0, $i, "") if $i;
    }
}

sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB
    # Input:
    #   as rindex except STR which must be passed as a reference
    #   optional 4th argument: window size (default 2**31-1); mainly
    #   useful to exercise the windowed search on small strings
    # Output:
    #   as rindex
    my ($ref, $match, $pos, $block_size) = @_;
    $block_size = 2**31-1 unless defined $block_size;
    my $strlen = length($$ref);
    # Default: search from end
    $pos = defined $pos ? $pos : $strlen;
    # No point in doing extra work if we don't need to.
    if ($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    }

    my $matchlen = length($match);
    # The first window ends at $pos + $matchlen, so a match starting
    # exactly at $pos still fits inside it.
    my $offset = $pos - $block_size + $matchlen;
    if ($offset < 0) {
        # The offset is less than a $block_size:
        # shrink the window so it starts at 0
        $block_size += $offset;
        $offset = 0;
    }
    while (1) {
        my $ret = rindex(substr($$ref, $offset, $block_size), $match);
        return $ret + $offset if $ret != -1;
        # The start of the string has been searched: no match
        last if $offset == 0;
        # Step left. Consecutive windows overlap by $matchlen + 1 chars
        # so a match straddling a window boundary is not missed.
        $offset -= $block_size - $matchlen - 1;
        # Clamp instead of stopping, so the leftmost part is searched too
        $offset = 0 if $offset < 0;
    }
    return -1;
}

sub set_fh_non_blocking {
    # Set filehandle as non-blocking, keeping its other status flags.
    # Inputs:
    #   $fh = filehandle to make non-blocking
    # Returns:
    #   N/A (dies if Fcntl cannot be loaded or fcntl fails)
    my $fh = shift;
    # Load Fcntl on first use only; fail loudly if that is impossible.
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"
        or die "Cannot load Fcntl: $@";
    # F_GETFL returns the current flags as fcntl's return value ("0 but
    # true" when they are 0); the third argument is ignored.
    my $flags = fcntl($fh, Fcntl::F_GETFL(), 0)
        or die "fcntl(F_GETFL) failed: $!";
    # Add non-blocking to the existing flags instead of replacing them.
    fcntl($fh, Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK())
        or die "fcntl(F_SETFL) failed: $!";
}

它的一个更好的版本现在作为parcat随GNU Parallel一起分发。

相关问题