Dask

Overview

Dask can be used to parallelize compute tasks. Dask can be very powerful for modern data science tasks like those based on pandas and several machine learning libraries. It can also be used to parallelize any kind of Python function across the different cores on a compute node, which we will demonstrate below. Note that Dask is not currently suitable for distributing work over multiple nodes in a single Slurm allocation. If that is a feature you need, use Parsl instead.
To install Dask with its relevant add-ons, run pip install dask[distributed]. If you are running calculations on Slurm, also run pip install dask-jobqueue.

Local Parallelization

Consider the following toy example. Normally, you would expect this calculation to take a total of 10 seconds, since there are two sequential 5-second calls to the add function. We will pretend that the add function is a surrogate for some compute-heavy task (e.g. a DFT calculation).
import time

def add(a, b):
    time.sleep(5)  # stand-in for a compute-heavy task
    return a + b

def workflow(a, b):
    output1 = add(a, b)
    output2 = add(a, b)
    return output1 + output2

result = workflow(1, 2)
With Dask, the above code could be run in only 5 seconds by parallelizing the work over two CPU cores. This is achieved by decorating the compute task with the @delayed decorator.
First, load the default Dask client in an IPython console or Jupyter Notebook.
from dask.distributed import Client

client = Client()
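By default, Client() launches a local cluster that uses the available cores on your machine. If you want explicit control over the parallelism, you can instead pass arguments like the following (the values shown are purely illustrative):
from dask.distributed import Client

# Explicitly request two single-threaded workers (illustrative values)
client = Client(n_workers=2, threads_per_worker=1)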
Then, in the same session run the modified code block:
import time
from dask import delayed

@delayed
def add(a, b):
    time.sleep(5)
    return a + b

def workflow(a, b):
    output1 = add(a, b)
    output2 = add(a, b)
    return output1 + output2

future = client.compute(workflow(1, 2))
result = future.result()
The code will now finish in 5 seconds instead of 10. Note that add(a, b) now returns a Delayed object rather than the actual result, and client.compute() submits the assembled task graph and returns a Future. To fetch the final value, call .result() on the Future. Keep in mind that .result() "blocks" execution, meaning Python waits until the value is returned before continuing, so be careful about where you call it.
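If you have several independent workflows, one common pattern is to submit them all before retrieving any results, so that the blocking call happens only once all of the work has been dispatched. A minimal sketch (the loop bounds are arbitrary):
# Submit several workflows up front; nothing blocks yet
futures = [client.compute(workflow(i, i + 1)) for i in range(4)]

# Gather all of the results in one blocking call
results = client.gather(futures)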

Slurm Parallelization

If you only need to parallelize a Python task on your local machine, you can use many of Dask's native features, such as Dask DataFrames, without having to launch a Slurm cluster as in the example below.
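For instance, a pandas-style analysis can often be parallelized locally with minimal changes. A minimal sketch (the file name and column names are placeholders):
import dask.dataframe as dd

# Lazily read a CSV into a Dask DataFrame split across partitions
df = dd.read_csv("data.csv")

# Operations build a task graph; .compute() runs it in parallel
mean_by_group = df.groupby("group")["value"].mean().compute()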
This example will show how to parallelize a Python function call via Slurm with Dask. An example is provided below that works on Tiger. For other machines (e.g. Della), you may need to change some flags such as modifying the cores_per_node and removing the account flag. The example below will create a single Slurm job on one node that is ready for work, running up to 112 concurrent Python function calls.
The Slurm job launched by the cluster.scale() call will remain active and waiting for work. To stop the Slurm job entirely, call client.shutdown() or manually scancel the job ID.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

slurm_jobs = 1
cores_per_node = 112
job_mem = "512G"
cluster_kwargs = {
    "cores": cores_per_node,    # cores available to the Dask workers per job
    "memory": job_mem,          # memory available to the Dask workers per job
    "shebang": "#!/bin/bash",
    "account": "rosengroup",
    "walltime": "00:00:30",
    "interface": "ib0",         # communicate over the InfiniBand interface
    "job_cpu": cores_per_node,  # CPUs requested from Slurm per job
    "job_mem": job_mem,         # memory requested from Slurm per job
    "job_script_prologue": [
        "source ~/.bashrc",
        "module load anaconda3/2024.10",
        "conda activate cms",
    ],
}
cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())

cluster.scale(jobs=slurm_jobs)
client = Client(cluster)
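Since the Slurm job may wait in the queue for some time, it can be helpful to block until at least one worker has connected before submitting any tasks. A minimal sketch:
# Optionally block until at least one Dask worker has started
client.wait_for_workers(1)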
Then you can run the following code, which dispatches the add() tasks across the compute node so that the workflow finishes in 5 seconds instead of 10. To launch the calculation, use client.compute(), which returns a Future that can be resolved to its true value with a blocking .result() call. The @delayed decorator allows the tasks to be executed asynchronously.
import time
from dask import delayed

@delayed
def add(a, b):
    time.sleep(5)
    return a + b

def workflow(a, b):
    output1 = add(a, b)
    output2 = add(a, b)
    return output1 + output2

future = client.compute(workflow(1, 2))
result = future.result()
Dask will automatically handle task dependencies. For instance, if output1 is passed to add(output1, b), Dask will know not to call the second add() task until the first one is finished.
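For example, using the add() function defined above, a chained workflow like the following would take about 10 seconds, since the second task cannot start until the first one finishes:
def chained_workflow(a, b):
    output1 = add(a, b)
    output2 = add(output1, b)  # depends on output1, so it runs second
    return output2

future = client.compute(chained_workflow(1, 2))
result = future.result()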