haskell 用于将数据处理表示为流水线的框架

yhxst69z  于 2023-02-16  发布在  其他
关注(0)|答案(5)|浏览(130)

大多数数据处理都可以想象为一个组件的管道,一个组件的输出会输入到另一个组件的输入中。典型的处理管道包括:

reader | handler | writer

作为开始讨论的陪衬,让我们考虑该管道的面向对象实现,其中每个段都是一个对象。handler对象包含对readerwriter对象的引用,并具有run方法,如下所示:

define handler.run:
  while (reader.has_next) {
    data = reader.next
    output = ...some function of data...
    writer.put(output)
  }

示意性的相关性为:

reader <- handler -> writer

现在假设我想在读取器和处理程序之间插入一个新的管道段:

reader | tweaker | handler | writer

同样,在此OO实现中,tweaker将是reader对象的 Package 器,并且tweaker方法可能类似于(在某些伪命令式代码中):

define tweaker.has_next:
  return reader.has_next

define tweaker.next:
  value = reader.next
  result = ...some function of value...
  return result

我发现这不是一个很容易组合的抽象概念,有些问题是:

  1. tweaker只能用在handler的左边,也就是说,我不能使用tweaker的上述实现来形成此管道:
    阅读器|处理程序|微调器|作家
    1.我想利用管道的关联属性,这样这个管道:
    阅读器|处理程序|作家
    可以表示为:
reader | p

其中p是管道handler | writer。在此OO实现中,我必须部分示例化handler对象
1.在某种程度上重述(1),对象必须知道它们是"推"还是"拉"数据。
我正在寻找一个框架(不一定是面向对象的)来创建数据处理管道,以解决这些问题。
我用Haskellfunctional programming标记它,因为我觉得函数式编程概念在这里可能有用。
作为一个目标,能够创建这样的管道将是很好的:

handler1
                   /          \
reader | partition              writer
                   \          /
                     handler2

从某种Angular 来看,Unix shell管道通过以下实现决策解决了许多这样的问题:
1.管道组件在单独的进程中异步运行
1.管道对象在"推送器"和"拉取器"之间传递数据;即,它们阻止写数据太快的写入器和试图读数据太快的读取器。
1.使用特殊连接器<>将无源组件(即文件)连接到管道
我对代理之间不使用线程或消息传递的方法特别感兴趣。也许这是最好的方法,但如果可能的话,我希望避免线程。
谢谢!

t3irkdon

t3irkdon1#

是的,arrows几乎肯定是你要找的人。
我怀疑你是相当新的Haskell,只是根据你在你的问题中说的东西的种类。箭头可能会看起来相当抽象,特别是如果你正在寻找的是一个“框架”。我知道我花了一段时间才真正摸索到箭头是怎么回事。
所以你可能会看着那一页说“是的,那看起来像我想要的”,然后发现自己相当迷失在如何开始使用箭头来解决问题上。所以这里有一点指导,这样你就知道你在看什么了。
箭头解决不了问题。相反,它们给予你一种语言,你可以用它来表达你的问题,你可能会发现一些预定义的箭头就可以完成这项工作--也许是一些kleisli箭头--但是在一天结束的时候,你会想要 * 实现 * 一个箭头(预定义的只是给予你简单的方法来实现它们),它表达了你所指的“数据处理器”。作为一个几乎微不足道的例子,假设你想用简单的函数来实现你的数据处理器,你可以写:

newtype Proc a b = Proc { unProc :: a -> b }

-- I believe Arrow has recently become a subclass of Category, so assuming that.

instance Category Proc where
    id = Proc (\x -> x)
    Proc f . Proc g = Proc (\x -> f (g x))

instance Arrow Proc where
    arr f = Proc f
    first (Proc f) = Proc (\(x,y) -> (f x, y))

这就给了你使用各种箭头组合符(***)(&&&)(>>>)等的机制,以及箭头符号,如果你在做复杂的事情,这是相当不错的。所以,正如丹尼尔Fischer在评论中指出的,你在问题中描述的管道可以组成为:

reader >>> partition >>> (handler1 *** handler2) >>> writer

但最酷的是,处理器的含义由您决定,您可以使用不同的处理器类型,以类似的方式实现您提到的每个处理器分叉线程:

newtype Proc' a b = Proc (Source a -> Sink b -> IO ())

然后适当地实现组合子。
这就是你所看到的:这是一个讨论组合过程的词汇表,其中有一点代码可以重用,但主要是在您实现这些组合子以定义在您的领域中有用的处理器时,帮助指导您的思维。
我的第一个重要的Haskell项目之一是实现一个arrow for quantum entanglement;那个项目是让我真正开始理解Haskell思维方式的项目,是我编程生涯的一个重大转折点。也许你的这个项目也会对你有同样的作用?:-)

watbbzwu

watbbzwu2#

多亏了惰性求值,我们可以在Haskell中用普通的函数组合来表示管道。下面是一个计算文件中一行的最大长度的例子:

main = interact (show . maximum . map length . lines)

这里的一切都是普通的函数,比如

lines :: String -> [String]

但是由于延迟求值,这些函数只能增量地处理输入,并且只在需要时才处理,就像UNIX管道一样。

nbewdwxp

nbewdwxp3#

Haskell的enumerator package是一个很好的框架,它定义了三种类型的对象:
1.以块为单位生成数据的枚举器。
1.使用数据块并在使用足够的数据块后返回值的迭代对象。
1.位于管道中间的枚举对象。它们消耗块并生成块,可能会产生副作用。
这三种类型的对象组成了一个流处理管道,您甚至可以在一个管道中拥有多个枚举数和迭代对象(当一个完成后,下一个就会取而代之)从头写出这些对象中的一个可能会很复杂,但是有很多组合子可以用来把常规函数变成数据流处理器,比如,此管道从stdin读取所有字符,使用函数toUpper将其转换为大写,然后将其写入stdout:

ET.enumHandle stdin $$ ET.map toUpper =$ ET.iterHandle stdout

其中模块Data.Enumerator.Text已作为ET导入。

nbewdwxp

nbewdwxp4#

Yesod框架以conduit包的形式使用Haskell管道库。

ohtdti5x

ohtdti5x5#

在Java中,Spring Reactor和Apache Camel做了你想做的事情,正如其他人提到的haskell,让我介绍一下非haskell语言。
例如,在Apache Camel中,数据以管道方式处理,由Exchange(因为它主要用于消息处理,所以这个术语)携带,在整个过程中需要访问的属性,您将它们存储到Exchange的属性中(Map〈String,Object〉)。该过程是一个Route,您可以在其中定义几个子路由,将一些步骤分组为一个步骤,并给予它命名。
Reactor是Java中的React式编程模型,在单个线程上处理任务,常见的返回类型是Mono(单个对象)或Flux(对象的集合),它使用管道方式。
一开始,从基于堆栈的编码转变为流水线方式并不容易,但一段时间后,您会发现可能很难再回头。

相关问题