val hello : obj
Full name: CDocument.hello
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = System.String
Full name: Microsoft.FSharp.Core.string
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Core.Operators.seq
--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
Full name: Microsoft.FSharp.Collections.List.map
module Array
from Microsoft.FSharp.Collections
val sum : array:'T [] -> 'T (requires member ( + ) and member get_Zero)
Full name: Microsoft.FSharp.Collections.Array.sum
val id : x:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.id
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []
Full name: Microsoft.FSharp.Collections.Array.map
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = System.Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
MBrace
large-scale programming with F#
About Me
- Gian Ntzik (aka Jan Dzik)
- @anirothan
- Imperial College, Nessos
MBrace: A Programming Model
- Large-scale distributed computation and big data
- Declarative, compositional, higher-order
Based on F# computation expressions
Inspired by F# asynchronous workflows
Cloud workflows
Express distributed computation.
Hello World!
1:
2:
3:
4:
|
let hello = cloud {
return "Hello World!"
}
val hello: Cloud<string>
|
MBrace: A Distributed Runtime
- Implemented in F#
- Elastic, fault-tolerant, multitasking
Cloud workflow composition
Express distribution and parallelism patterns
Sequential Composition
1:
2:
3:
4:
5:
6:
7:
8:
|
let first = cloud { return 15 }
let second = cloud { return 17 }
cloud {
let! x = first
let! y = second
return x + y
}
|
Parallel Composition
1:
2:
3:
4:
5:
6:
7:
8:
|
val (<||>): Cloud<'T> -> Cloud<'S> -> Cloud<'T * 'S>
cloud {
let first = cloud { return 15 }
let second = cloud { return 17 }
let! x, y = first <||> second
return x + y
}
|
Parallel Composition
1:
2:
3:
4:
5:
6:
7:
8:
|
val Cloud.Parallel: seq<Cloud<'T>> -> Cloud<'T []>
cloud {
let sqr x = cloud { return x * x }
let jobs = List.map sqr [1..10000]
let! sqrs = Cloud.Parallel jobs
return Array.sum sqrs
}
|
MapReduce
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
let rec mapReduce (mapF: 'T -> 'R)
(reduceF: 'R -> 'R -> 'R)
(id : 'R) (input: 'T list) =
cloud {
match input with
| [] -> return id
| [value] -> return mapF value
| _ ->
let left, right = List.split input
let! r, r' =
(mapReduce mapF reduceF id left)
<||>
(mapReduce mapF reduceF id right)
return reduceF r r'
}
|
About that MapReduce
It's a naive implementation.
About that MapReduce
Can you spot potential issues/problems?
Communication Overhead
Processed data is needlessly passed and copied around worker machines.
Granularity
- Scheduling overhead of binary decomposition
- Cluster size not considered
- Ignoring multicore capacity of workers
Cloud Storage Backends
Azure, SQL, Filesystem
Distributed Data Primitives
CloudRef
- Conceptually similar to ref cells
- Allocation/dereference effected in cloud workflows
- Immutable/mutable
CloudRef
1:
2:
3:
|
CloudRef.New: 'T -> Cloud<CloudRef<'T>>
CloudRef.Read: CloudRef<'T> -> Cloud<'T>
|
Distributed Trees
1:
2:
3:
4:
|
type CloudTree<'T> =
| EmptyLeaf
| Leaf of 'T
| Branch of ICloudRef<CloudTree<'T>> * ICloudRef<CloudTree<'T>>
|
CloudTree based MapReduce
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
|
let rec createTree (input: 'T list) = cloud {
match input with
| [] -> return! CloudRef.New EmptyLeaf
| [value] -> return! CloudRef.New (Leaf value)
| _ ->
let left, right = List.split input
let! l, r =
(createTree left) <||> (createTree right)
return! CloudRef.New <| Branch(l, r)
}
|
CloudTree based MapReduce
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
let rec mapReduceTree (mapF: 'T -> 'R)
(reduceF: 'R -> 'R -> 'R)
(id : 'R) (input: ICloudRef<CloudTree<'T>>) =
cloud {
let! tree = CloudRef.Read input
match tree with
| EmptyLeaf -> return id
| Leaf value -> return mapF value
| Branch(left, right) ->
let! l, r =
(mapReduceTree mapF reduceF id left)
<||>
(mapReduceTree mapF reduceF id right)
return reduceF l r
}
|
CloudFile
Distributed binary blob
CloudFile
1:
2:
3:
|
CloudFile.New: (Stream -> unit) -> Cloud<CloudFile>
CloudFile.Read: CloudFile -> (Stream -> 'T) -> Cloud<'T>
|
Cluster size
1:
|
Cloud.GetWorkerCount: unit -> Cloud<int>
|
Local execution of cloud workflows
1:
|
Cloud.ToLocal: Cloud<'T> -> Cloud<'T>
|
F# Streams
A lightweight F#/C# library for efficient functional-style pipelines on streams of data.
Inspired by Java 8 Streams
Typical Pipeline Pattern
1:
|
source |> inter |> inter |> inter |> terminal
|
- inter : intermediate (lazy) operations, e.g. map, filter
- terminal : produces result or side-effects, e.g. reduce, iter
Pull vs Push
1:
|
source |> inter |> inter |> inter |> terminal
|
- F# Seq / IEnumerable pull
- Streams push
Example
1:
2:
3:
4:
5:
|
let data = [| 1..10000000 |] |> Array.map int64
Stream.ofArray data //source
|> Stream.filter (fun i -> i % 2L = 0L) //lazy
|> Stream.map (fun i -> i + 1L) //lazy
|> Stream.sum //eager, forcing evaluation
|
4x speedup compared to Seq. or Array. pipelines
Conclusions
- Declarative, composable cloud workflows
- Explicit & dynamic control of parallelism and granularity
- F# interactive