> fuse [长度; splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> 长度;-6ren">
gpt4 book ai didi

f# - 并行流水线

转载 作者:行者123 更新时间:2023-12-04 07:47:06 25 4
gpt4 key购买 nike

(fileNameToCharStream "大文件"
|>> fuse [长度;
splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> 长度;
splitBy (fun x -> x = '\n') keepEmpty |>> 长度;
])
(*fuse“融合”三个函数同时运行*)
|> run 2(*强制在两个线程上并行运行*)
|> (有趣 [num_chars; num_words; num_lines] ->
printfn "%d %d %d"
num_chars num_words, num_lines))

我想让这段代码按以下方式工作:
将原始流正好在中间一分为二;然后
对于每一半运行一个单独的计算
计算 3 件事:长度(即字符数),
字数,行数。
但是,我不想有问题,如果
我错误地 split 了一个词。这必须是
已搞定。该文件应该只读一次。

我应该如何对指定的函数和运算符 |>> 进行编程?
是否可以?

最佳答案

看起来你的要求有点高。我将让您自行解决字符串操作,但我将向您展示如何定义一个并行执行一系列操作的运算符。

第 1 步:写一个 fuse功能

您的 fuse 函数似乎使用多个函数映射单个输入,这很容易编写如下:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list
let fuse functionList input = [ for f in functionList -> f input]

请注意,所有映射函数都需要具有相同的类型。

第 2 步:定义运算符以并行执行函数

标准的并行映射函数可以写成如下:
//val pmap : ('a -> 'b) -> seq<'a> -> 'b array
let pmap f l =
seq [for a in l -> async { return f a } ]
|> Async.Parallel
|> Async.RunSynchronously

据我所知, Async.Parallel将并行执行异步操作,其中在任何给定时间执行的并行任务数等于机器上的内核数(如果我错了,有人可以纠正我)。所以在双核机器上,当这个函数被调用时,我们最多应该有 2 个线程在我的机器上运行。这是一件好事,因为我们不希望通过每个内核运行一个以上的线程来提高速度(实际上,额外的上下文切换可能会减慢速度)。

我们可以定义一个运算符 |>>pmap 方面和 fuse :
//val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array
let (|>>) input functionList = pmap (fuse functionList) input

所以 |>>运算符接受一堆输入并使用许多不同的输出映射它们。到目前为止,如果我们把所有这些放在一起,我们得到以下(在 fsi 中):
> let countOccurrences compareChar source =
source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
testData
|>> [length; countOccurrences 'J'; countOccurrences 'o'];;

val countOccurrences : 'a -> seq<'a> -> int
val length : string -> int
val testData : string [] =
[|"Juliet is awesome"; "Someone should give her a medal"|]
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|]
testOutput包含两个元素,这两个元素都是并行计算的。

第 3 步:将元素聚合为单个输出

好的,现在我们有由数组中的每个元素表示的部分结果,我们希望将我们的部分结果合并到一个聚合中。我假设数组中的每个元素都应该合并相同的函数,因为输入中的每个元素都具有相同的数据类型。

这是我为这项工作编写的一个非常丑陋的函数:
> let reduceMany f input =
input
|> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);;

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list

> reduceMany (+) testOutput;;
val it : int list = [48; 1; 4]
reduceMany接受 n 长度序列的序列,并返回 n 长度数组作为输出。如果你能想到一个更好的方法来编写这个函数,请成为我的客人:)

要解码上面的输出:
  • 48 = 我的两个输入字符串的长度总和。请注意,原始字符串是 49 个字符,但将其拆分为“|”每个“|”消耗一个字符。
  • 1 = 我输入中所有 'J' 实例的总和
  • 4 = 'O' 的所有实例的总和。

  • 第 4 步:将所有内容放在一起
    let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

    let fuse functionList input = [ for f in functionList -> f input]

    let (|>>) input functionList = pmap (fuse functionList) input

    let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ])

    let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

    let length (s : string) = s.Length

    let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
    let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o']
    |> reduceMany (+)

    关于f# - 并行流水线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1495861/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com