Prefect is a popular and powerful workflow orchestration service. For calculations on the compute clusters, you can spin up your own Prefect server to orchestrate the work, with jobs submitted to the Slurm queue via Dask. For calculations on your local machine, you can use Prefect Cloud without needing Dask or a self-hosted server. In a dedicated Anaconda environment on the tiger-arrk login node, install the necessary dependencies via
pip install prefect[dask] dask[distributed] dask-jobqueue
and then run the following to configure the Prefect settings:
prefect config set PREFECT_SERVER_API_HOST="10.36.48.21"
prefect config set PREFECT_API_URL="http://10.36.48.21:4200/api"
prefect config set PREFECT_UI_API_URL="http://10.36.48.21:4200/api"
Then, to start the server, run
prefect server start
When you need to stop the server, you can run prefect server stop. To keep the server running in the background, see 👻Background Tasks. While the server is running, you can view the GUI dashboard: on mytiger.princeton.edu, start an interactive session using the Desktop on Tiger3 Vis nodes feature, open Firefox, and enter the address printed to the screen when you started the Prefect server. Note that if you are using a compute cluster whose compute nodes have network access (e.g., NERSC), you should use Prefect Cloud rather than spinning up your own Prefect server. We can run a Prefect server on the tiger-arrk login node because it is our dedicated resource.
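To quickly confirm that the Prefect API is reachable from a given node, you can run prefect config view to check the active settings, or query the server's health endpoint. A minimal sketch using httpx (which ships as a Prefect dependency); the IP address and port are the ones configured above:
import httpx

# Query the Prefect server's REST API health endpoint
response = httpx.get("http://10.36.48.21:4200/api/health")
print(response.status_code)  # 200 indicates the server is up and reachable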
In the example below, we create a Dask cluster (see 🐸Dask) before launching any work. This will submit a job to the Slurm queue that will continually pull in new work from the Prefect server until the walltime is reached. Run the following in an IPython console or Jupyter Notebook on the login node:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Resources requested for each Slurm job (one Dask worker job per node)
cores_per_node = 112
job_mem = "512G"
slurm_jobs = 1  # number of Slurm jobs (i.e., nodes) to request

cluster_kwargs = {
    "cores": cores_per_node,
    "memory": job_mem,
    "shebang": "#!/bin/bash",
    "account": "rosengroup",
    "walltime": "00:00:30",  # Slurm walltime in HH:MM:SS; adjust for real workloads
    "interface": "ib0",  # use the InfiniBand interface for worker communication
    "job_cpu": cores_per_node,
    "job_mem": job_mem,
    "job_script_prologue": [
        "source ~/.bashrc",
        "module load anaconda3/2024.10",
        "conda activate cms",
    ],
}

cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())  # inspect the generated Slurm submission script
cluster.scale(jobs=slurm_jobs)  # submit the Slurm job(s) to the queue
client = Client(cluster)
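The Slurm job may sit in the queue for a while. If you want to block until at least one Dask worker has connected before submitting any flows, you can optionally use the standard dask.distributed call:
# Optional: block until at least one worker (i.e., one Slurm job) has connected
client.wait_for_workers(n_workers=1)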
With the Dask cluster in place, run the following minimal example in the same session:
from prefect_dask import DaskTaskRunner
from prefect import flow, task

@task
def add(a, b):
    return a + b

# Point the task runner at the Dask cluster created above
@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def workflow(a, b):
    output1 = add.submit(a, b)
    output2 = add.submit(output1, b)  # task futures can be passed directly to downstream tasks
    return output2.result()

result = workflow(1, 2)
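When you are finished submitting work, tear down the Dask cluster so the Slurm job(s) do not sit idle until the walltime expires; these are the standard dask.distributed/dask-jobqueue shutdown calls:
# Release the Slurm allocation(s) held by the Dask cluster
client.close()
cluster.close()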
In the example below, each @flow-decorated function is submitted as its own Slurm job via a temporary Dask cluster created for that flow run. Note that each flow runs on a 112-core node, so you should make sure each flow can take full advantage of all the compute cores on the node (a variation that does this by submitting many independent tasks is sketched after the example).
from prefect_dask import DaskTaskRunner
from prefect import flow, task

# Resources for the temporary Dask cluster created for each flow run
cores_per_node = 112
job_mem = "512G"

cluster_kwargs = {
    "cores": cores_per_node,
    "memory": job_mem,
    "shebang": "#!/bin/bash",
    "account": "rosengroup",
    "walltime": "00:00:30",  # Slurm walltime in HH:MM:SS; adjust for real workloads
    "interface": "ib0",  # use the InfiniBand interface for worker communication
    "job_cpu": cores_per_node,
    "job_mem": job_mem,
    "job_script_prologue": [
        "source ~/.bashrc",
        "module load anaconda3/2024.10",
        "conda activate cms",
    ],
}

@task
def add(a, b):
    return a + b

# A temporary SLURMCluster is spun up (and torn down) for each run of this flow
@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs=cluster_kwargs,
    )
)
def workflow(a, b):
    output1 = add.submit(a, b)
    output2 = add.submit(output1, b)
    return output2.result()

result = workflow(1, 2)
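To illustrate the point about keeping the node busy, the hypothetical variation below (many_adds and its inputs are illustrative, not part of the original example) submits many independent tasks within a single flow so Dask can spread them across the node's cores:
# Hypothetical variation: many independent tasks in one flow, run concurrently by Dask
@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs=cluster_kwargs,
    )
)
def many_adds(pairs):
    futures = [add.submit(a, b) for a, b in pairs]
    return [future.result() for future in futures]

results = many_adds([(i, i + 1) for i in range(10)])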