Original post

If you have a large collection of data and have to do similar computations on each element, data parallelism is an easy way to speedup computation using multiple CPUs and machines as well as GPU(s). While this is not the only kind of parallelism, it covers a vast class of compute-intensive programs. A major hurdle for using data parallelism is that you need to unlearn some habits useful in sequential computation (i.e., patterns result in mutations of data structure). In particular, it is important to use libraries that help you describe what to compute rather than how to compute. Practically, it means to use generalized form of map and reduce operations and learn how to express your computation in terms of them. Luckily, if you already know how to write iterator comprehensions, there is not much more to learn for accessing a large class of data parallel computations.

This introduction primary focuses on the Julia packages that I (Takafumi Arakaki @tkf) have developed. As a result, it currently focuses on thread-based parallelism. There is simple distributed computing support. GPU support is a frequently requested feature but it hasn’t been implemented yet. See also other parallel-computation libraries in Julia.

Also note that this introduction does not discuss how to use threading primitives such as Threads.@spawn since it is too low-level and error-prone. For data parallelism, a higher-level description is much more appropriate. It also helps you write more reusable code; e.g., using the same code for single-threaded, multi-threaded, and distributed computing.

Most of the examples here may work in all Julia 1.x releases. However, for the best result, it is highly recommended to get the latest released version (1.5.2 as of writing). You can download it at https://julialang.org/.

Once you get julia, you can get the dependencies required for this tutorial by running using Pkg; Pkg.add(["Transducers", "ThreadsX", "OnlineStats", "FLoops", "MicroCollections", "BangBang", "Plots", "BenchmarkTools"]) in Julia REPL.

If you prefer using exactly the same environment used for testing this tutorial, run the following commands

git clone https://github.com/JuliaFolds/data-parallelism
cd data-parallelism
julia --project

and then in the Julia REPL:

julia> using Pkg

julia> Pkg.instantiate()

To use multi-threading in Julia, you need to start it with multiple execution threads. If you have Julia 1.5 or higher, you can start it with the -t auto (or, equivalently, --threads auto) option:

$ julia -t auto
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.5.2 (2020-09-23)
 _/ |__'_|_|_|__'_|  |   https://julialang.org/ release
|__/                   |

julia> Threads.nthreads()  # number of core you have
8

The command line option -t/--threads can also take the number of threads to be used. In older Julia releases, use the JULIA_NUM_THREADS environment variable. For example, on Linux and macOS, JULIA_NUM_THREADS=4 julia starts juila with 4 execution threads.

For more information, see Starting Julia with multiple threads in the Julia manual.

Starting julia with multiple worker processes

A few examples below mention Distributed.jl-based parallelism. Like how multi-threading is setup, you need to setup multiple worker processes to get speedup. You can start julia with -p auto (or, equivalently, --procs auto). Distributed.jl also lets you add worker processes after starting Julia with addprocs:

using Distributed
addprocs(8)

For more information, see Starting and managing worker processes section in the Julia manual.

Mapping is probably the most frequently used function in data parallelism. Recall how Julia’s sequential map works:

a1 = map(string, 1:9, 'a':'i')
9-element Array{String,1}:
 "1a"
 "2b"
 "3c"
 "4d"
 "5e"
 "6f"
 "7g"
 "8h"
 "9i"

We can simply replace it with ThreadsX.map for thread-based parallelism (see also other libraries):

using ThreadsX
a2 = ThreadsX.map(string, 1:9, 'a':'i')
@assert a1 == a2

Julia’s standard library Distributed.jl contains pmap as a distributed version of map:

using Distributed
a3 = pmap(string, 1:9, 'a':'i')
@assert a1 == a3

๐Ÿ”ฌ Test Code

using Test
    @testset begin
        @test a1 == a2
        @test a1 == a3
    end

โ˜‘ Pass

Test Summary: | Pass  Total
test set      |    2      2

Practical example: Stopping time of Collatz function

As a slightly more “practical” example, let’s play with the Collatz conjecture which states that recursive application the Collatz function defined as

collatz(x) =
    if iseven(x)
        x รท 2
    else
        3x + 1
    end

reaches the number 1 for all positive integers.

I’ll skip the mathematical background of it (as I don’t know much about it) but let me mention that there are plenty of fun-to-watch explanations in YouTube ๐Ÿ™‚

If the conjecture is correct, the number of iteration required for the initial value is finite. In Julia, we can calculate it with

function collatz_stopping_time(x)
    n = 0
    while true
        x == 1 && return n
        n += 1
        x = collatz(x)
    end
end

Just for fun, let’s plot the stopping time of the initial values from 1 to 10,000:

using Plots
plt = scatter(
    map(collatz_stopping_time, 1:10_000),
    xlabel = "Initial value",
    ylabel = "Stopping time",
    label = "",
    markercolor = 1,
    markerstrokecolor = 1,
    markersize = 3,
    size = (450, 300),
)

We can easily parallelize map(collatz_stopping_time, 1:10_000) and get a good speedup:

julia> Threads.nthreads()  
4

julia> using BenchmarkTools

julia> @btime map(collatz_stopping_time, 1:100_000);
  18.116 ms (2 allocations: 781.33 KiB)

julia> @btime ThreadsX.map(collatz_stopping_time, 1:100_000);
  5.391 ms (1665 allocations: 7.09 MiB)

Julia’s iterator comprehension syntax is a powerful tool for composing mapping, filtering, and flattening. Recall that mapping can be written as an array or iterator comprehension:

b1 = map(x -> x + 1, 1:3)
b2 = [x + 1 for x in 1:3]         
b3 = collect(x + 1 for x in 1:3)  
@assert b1 == b2 == b3
b1
3-element Array{Int64,1}:
 2
 3
 4

The iterator comprehension can be executed with threads by using ThreadsX.collect:

b4 = ThreadsX.collect(x + 1 for x in 1:3)
@assert b1 == b4

๐Ÿ”ฌ Test Code

using Test
    @testset begin
        @test b1 == b2 == b3
    end

โ˜‘ Pass

Test Summary: | Pass  Total
test set      |    1      1

Note that more complex composition of mapping, filtering, and flattening can also be executed in parallel:

c1 = ThreadsX.collect(y for x in 1:3 if isodd(x) for y in 1:x)
4-element Array{Int64,1}:
 1
 1
 2
 3

Transducers.dcollect is for using iterator comprehensions with a distributed backend:

using Transducers
c2 = dcollect(y for x in 1:3 if isodd(x) for y in 1:x)
@assert c1 == c2

๐Ÿ”ฌ Test Code

@test c1 == c2 == [1, 1, 2, 3]

Functions such as sum, prod, maximum, and all are the examples of reduction (aka fold) that can be parallelized. They are very powerful tools when combined with iterator comprehensions. Using ThreadsX.jl, a sum of a iterator created by the comprehension syntax

d1 = sum(x + 1 for x in 1:3)
9

can easily be parallelized by

d2 = ThreadsX.sum(x + 1 for x in 1:3)
9

๐Ÿ”ฌ Test Code

@test d1 == d2

For the full list of pre-defined reductions and other parallelized functions, type ThreadsX. and press TAB in the REPL.

Practical example: Maximum stopping time of Collatz function

We can use maximum to compute the maximum stopping time of Collatz function on a given the range of initial values

max_time = ThreadsX.maximum(collatz_stopping_time, 1:100_000)
350

๐Ÿ”ฌ Test Code

@test max_time == 350

We get a speedup similar to the map example above:

julia> @btime maximum(collatz_stopping_time, 1:100_000)
  17.625 ms (0 allocations: 0 bytes)
350

julia> @btime ThreadsX.maximum(collatz_stopping_time, 1:100_000)
  5.024 ms (1214 allocations: 69.17 KiB)
350

OnlineStats.jl

OnlineStats.jl provides a very rich and composable set of reductions. You can pass it as the first argument to ThreadsX.reduce:

using OnlineStats: Mean
e1 = ThreadsX.reduce(Mean(), 1:10)
Mean: n=10 | value=5.5

๐Ÿ”ฌ Test Code

using OnlineStats; @test e1 == fit!(Mean(), 1:10)

๐Ÿ’ก Note

While OnlineStats.jl often does not provide the fastest way to compute the given statistics when all the intermediate data can fit in memory, in many cases you don’t really need the absolute best performance. However, it may be worth considering other ways to compute statistics if ThreadsX.jl + OnlineStats.jl becomes the bottleneck.

For non-trivial parallel computations, you need to write a custom reduction. FLoops.jl provides a concise set of syntax for writing custom reductions. For example, this is how to compute sums of two quantities in one sweep:

using FLoops

@floop for (x, y) in zip(1:3, 1:2:6)
    a = x + y
    b = x - y
    @reduce(s += a, t += b)
end
(s, t)
(15, -3)

๐Ÿ”ฌ Test Code

@test (s, t) == (15, -3)

In this example, we do not initialize s and t; but it is not a typo. In parallel sum, the only reasonable value of the initial state of the accumulators like s and t is zero. So, @reduce(s += a, t += b) works as if s and t are initialized to appropriate type of zero. However, since there are many zeros in Julia (0::Int, 0.0::Float64, (0x00 + 0x00im)::Complex{UInt8}, …), s and t are undefined if the input collection (i.e., the value of xs in for x in xs) is empty.

To control the type of the accumulators and also to avoid UndefVarError in the empty case, you can set the initial value with accumulator = initial_value op input syntax

@floop for (x, y) in zip(1:3, 1:2:6)
    a = x + y
    b = x - y
    @reduce(s2 = 0.0 + a, t2 = 0im + b)
end
(s2, t2)
(15.0, -3 + 0im)

๐Ÿ”ฌ Test Code

@test (s2, t2) === (15.0, -3 + 0im)

To understand the computation of @floop with @reduce(accumulator = initial_value op input) syntax, you can get a rough idea by just ignoring @reduce( and corresponding ,s and ). More concretely:

  1. Extract expressions accumulator = initial_value (“initializers”) from accumulator = initial_value op input and put them in front of the for loop.

  2. Convert accumulator = initial_value op input to inplace update accumulator = accumulator op input.

  3. Strip off @reduce.

So, the above example of @floop is equivalent to

let
    s2 = 0.0  
    t2 = 0im  
    for (x, y) in zip(1:3, 1:2:6)
        a = x + y
        b = x - y
        s2 = s2 + a  
        t2 = t2 + b  
    end      
    (s2, t2) 
end
(15.0, -3 + 0im)

๐Ÿ”ฌ Test Code

@test (s3, t3) === (s2, t2)

The short-hand version @reduce(s += a, t += b) is implemented by using the first element of the input collection as the initial value.

This transformation is used for generating the base case that is executed in a single Task. Multiple results from tasks are combined by the operators and functions specified by @reduce. More explicitly, (s2_right, t2_right) is combined into (s2_left, t2_left) by

s2_left = s2_left + s2_right
t2_left = t2_left + t2_right

โš  Warning

Don’t use locks or atomics! (unless you know what you are doing)

In particular, do not write

acc = Threads.Atomic{Int}(0)
Threads.@thread fors x in xs
    Threads.atomic_add!(acc, x + 1)
end

Locks and atomics help you write correct concurrent programs when used appropriately. However, they do so by limiting parallel execution. Using data parallel pattern is the easiest way to get high performance.

Parallel findmin/findmax with @reduce() do

@reduce() do syntax is the most flexible way in FLoops.jl for expressing custom reductions. It is very useful when one variable can influence other variable(s) in reduction (e.g., index and value in the example below). Note also that @reduce can be used multiple times in the loop body. Here is a way to compute findmin and findmax in parallel:

@floop for (i, x) in pairs([0, 1, 3, 2])
    @reduce() do (imin = -1; i), (xmin = Inf; x)
        if xmin > x
            xmin = x
            imin = i
        end
    end
    @reduce() do (imax = -1; i), (xmax = -Inf; x)
        if xmax < x
            xmax = x
            imax = i
        end
    end
end

@show imin xmin imax xmax
imin = 1
xmin = 0
imax = 3
xmax = 3

๐Ÿ”ฌ Test Code

@test (imin, xmin, imax, xmax) == (1, 0, 3, 3)

We can understand the computation of @floop roughly by ignoring the lines with @reduce() do and corresponding end. More concretely:

  1. Extract expressions accumulator = initial_value (“initializers”) from (accumulator = initial_value; input) or (accumulator; input) and put them in front of the for loop.

  2. Remove @reduce() do ... and corresponding end.

let
    imin2 = -1    
    xmin2 = Inf   
    imax2 = -1    
    xmax2 = -Inf  

    for (i, x) in pairs([0, 1, 3, 2])
        if xmin2 > x   
            xmin2 = x  
            imin2 = i  
        end            
        if xmax2 < x   
            xmax2 = x  
            imax2 = i  
        end            
    end

    @show imin2 xmin2 imax2 xmax2
end
imin2 = 1
xmin2 = 0
imax2 = 3
xmax2 = 3

๐Ÿ”ฌ Test Code

@test (imin2, xmin2, imax2, xmax2) == (1, 0, 3, 3)

The above computation is used for each partition of the input collection and combined by the reducing function defined by @reduce() do block. That is to say, (imin2_right, xmin2_right, imax2_right, xmax2_right) is combined into (imin2_left, xmin2_left, imax2_left, xmax2_left) by

if xmin_left > xmin_right
    xmin_left = xmin_right
    imin_left = imin_right
end
if xmax_left < xmax_right
    xmax_left = xmax_right
    imax_left = imax_right
end

Remark: Observe that x and i of the first @reduce() do block are replaced with xmin_right and imin_right while x and i of the second @reduce() do block are replaced with xmax_right and imax_right. This is why we used two @reduce() do blocks; we need to “pair” x/i with xmin/imin or with xmax/imax depending on which if block we are in.

Parallel findmin/findmax with ThreadsX.reduce (tedious!)

Note that it is not necessary to use @floop for writing a custom reduction. For example, you can write an equivalent code with ThreadsX.reduce:

(imin3, xmin3, imax3, xmax3) = ThreadsX.reduce(
    ((i, x, i, x) for (i, x) in pairs([0, 1, 3, 2]));
    init = (-1, Inf, -1, -Inf)
) do (imin, xmin, imax, xmax), (i1, x1, i2, x2)
    if xmin > x1
        xmin = x1
        imin = i1
    end
    if xmax < x2
        xmax = x2
        imax = i2
    end
    return (imin, xmin, imax, xmax)
end

@assert (imin3, xmin3, imax3, xmax3) == (imin, xmin, imax, xmax)

๐Ÿ”ฌ Test Code

@test (imin3, xmin3, imax3, xmax3) == (imin, xmin, imax, xmax)

However, as you can see, it is much more verbose and error-prone (e.g., the initial values and the variables are declared in different place).

Histogram with reduce

mapreduce and reduce are useful when combining pre-existing operations. For example, we can easily implement histogram by combining mapreduce, Dict, and mergewith!:

str = "dbkgbjkahbidcbcfhfdeedhkggdigfecefjiakccjhghjcgefd"
f1 = mapreduce(x -> Dict(x => 1), mergewith!(+), str)
Dict{Char,Int64} with 11 entries:
  'f' => 5
  'd' => 6
  'e' => 5
  'j' => 4
  'h' => 5
  'i' => 3
  'k' => 4
  'a' => 2
  'c' => 6
  'g' => 6
  'b' => 4

Note that this code has a performance problem: Dict(x => 1) allocates an object for each iteration. This is bad in particular in threaded Julia code because it frequently invokes garbage collection. To avoid this situation, we can replace Dict with MicroCollections.SingletonDict which does not allocate the dictionary in the heap. SingletonDict can be “upgraded” to a Dict by calling BangBang.mergewith!!. It will then create a mutable object for each task to mutate. We can then compose an efficient parallel histogram operation:

using BangBang: mergewith!!
using MicroCollections: SingletonDict

f2 = ThreadsX.mapreduce(x -> SingletonDict(x => 1), mergewith!!(+), str)
@assert f1 == f2

๐Ÿ”ฌ Test Code

@test f1 == f2

(For more information, see Transducers.jl’s ad-hoc histogram tutorial.)

Practical example: Histogram of stopping time of Collatz function

Let’s compute the histogram of collatz_stopping_time over some range of initial values. Unlike the histogram example above, we know that the stopping time is a positive integer. So, it makes sense to use an array as the data structure that maps a bin (index) to a count. There is no pre-defined reducing function like mergewith! we can use. Fortunately, it is easy to write it using @reduce() do syntax in @floop:

using FLoops
using MicroCollections: SingletonDict

maxkey(xs::AbstractVector) = lastindex(xs)
maxkey(xs::SingletonDict) = first(keys(xs))

function collatz_histogram(xs, executor = ThreadedEx())
    @floop executor for x in xs
        n = collatz_stopping_time(x)
        n > 0 || continue
        obs = SingletonDict(n => 1)
        @reduce() do (hist = Int[]; obs)
            l = length(hist)
            m = maxkey(obs)  
            if l < m
                
                resize!(hist, m)
                fill!(view(hist, l+1:m), 0)
            end
            
            for (k, v) in pairs(obs)
                @inbounds hist[k] += v
            end
        end
    end
    return hist
end

As we discussed above, @reduce() do blocks are used in two contexts; for the sequential base case and for combining the accumulated results from two base cases. Thus, for combining hist_left and hist_right, we need to substitute hist_right to obs. This is why we need to handle the cases where obs is a SingletonDict and a Vector. Thanks to multiple dispatch, it is very easy to absorb the difference in the two containers. We can just use what Base defines for pairs and only need to define maxkey for absorbing the remaining difference.

๐Ÿ’ก Note

When writing @reduce() do (Lโ‚ = Iโ‚; Rโ‚), (Lโ‚‚ = Iโ‚‚; Rโ‚‚), ..., (Lโ‚™ = Iโ‚™; Rโ‚™), make sure that the do block body can handle arbitrary possible value of Lแตข substituted to Rแตข and not just Rแตขs that are calculated directly in the for loop body.

Example usage:

using Plots
plt = plot(
    collatz_histogram(1:1_000_000),
    xlabel = "Stopping time",
    ylabel = "Counts",
    label = "",
    size = (450, 300),
)

We use @floop executor for ... syntax so that it is easy to switch between different kind of execution mechanisms; i.e., sequential, threaded, and distributed execution:

hist1 = collatz_histogram(1:1_000_000, SequentialEx())
hist2 = collatz_histogram(1:1_000_000, ThreadedEx())
hist3 = collatz_histogram(1:1_000_000, DistributedEx())
@assert hist1 == hist2 == hist3

๐Ÿ”ฌ Test Code

@test hist1 == hist2 == hist3

For example, we can easily compare the performance of sequential and threaded execution:

julia> @btime collatz_histogram(1:1_000_000, SequentialEx());
  220.022 ms (9 allocations: 13.81 KiB)

julia> @btime collatz_histogram(1:1_000_000, ThreadedEx());
  58.271 ms (155 allocations: 60.81 KiB)

Quick notes on @threads and @distributed

Julia itself has Threads.@threads macro for threaded for loop and @distributed macro for distributed for loop. They are adequate for simple use cases but come with some limitations. For example, @threads does not have built-in reducing function support. Although @distributed macro has reducing function support, it is limited to pre-defined functions and it is tedious to handle multiple variables. Both of these macros only have simple static scheduler and lacks an option like basesize supported by FLoops.jl and ThreadsX.jl to tune load balancing. Furthermore, the code written with @threads cannot be reused for @distributed and vice versa.

Hopefully, this tutorial covers a bare minimum for you to start writing data-parallel programs and the documentations of FLoops.jl and ThreadsX.jl are now a bit more accessible. These two libraries are based on the protocol designed for Transducers.jl which also contains various tools for data parallelism.

Transducers.jl’s parallel processing tutorial covers a similar topic with explanations for more low-level details. Parallel word count tutorial based on Guy L. Steele Jr.’s 2009 ICFP talk is more advanced but I find it a very good example to follow for understanding what is possible with a clever design of the reducing function.

Note that ideas presented in this tutorial are very general and should be applicable also when using other libraries. For example, the idea of custom reduction is useful in GPU computing when using mapreduce on CuArray.

๐Ÿ’ก Note

Work in progress. TODO: Add more tutorials and how-to guides.