Dask Futures: Do's & Don'ts

A few considerations with Dask Futures to improve computation efficiency and data management.


Breaking up your Dask tasks into smaller, single-operation chunks can improve the parallelizability of a workflow. That is true, but it also requires some thought about how those tasks are submitted to the scheduler. This article provides a few helpful considerations for working with Dask Futures.

The code that led to this article was written to build the input array for a Dask KMeans algorithm. The input was Very High Resolution (VHR) imagery (represented as `arr` in the code below), transformed and reshaped from (1, 2269, 2029) to (4.6M, 16): 4.6 million rows of 16 features at 2.5x2.5m resolution. The 16 features include RGB raster data and calculated indices from both drone and Sentinel-2 imagery. The original spatial resolution of this data is 0.15x0.15m, and it is currently resampled to 2.5x2.5m (down-sampled for the drone imagery and up-sampled for Sentinel-2 to match) to handle the sheer amount of data being processed.
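For context, a minimal sketch of that kind of reshaping is shown below. This is an assumption on my part rather than code from the original workflow; the random stand-in rasters and chunk sizes are hypothetical.

import dask.array as da

# Hypothetical stand-ins for the 16 feature rasters (RGB bands, indices, ...),
# each a dask array with the same (1, 2269, 2029) shape as the VHR data
features = [da.random.random((1, 2269, 2029), chunks=(1, 512, 512)) for _ in range(16)]

# Drop the leading axis, stack the bands, then flatten the spatial dimensions
# so that each pixel becomes a row of 16 features
stacked = da.stack([f.squeeze(axis=0) for f in features], axis=-1)  # (2269, 2029, 16)
arr = stacked.reshape(-1, len(features))                            # (~4.6M, 16)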

A typical approach to fitting a KMeans algorithm is to take a random sample of points from the input data to determine the cluster centroids. Keep that in mind; it plays an important part in discovering the proper way of using Dask Futures.

Below is a stub for creating a KMeans cluster algorithm in Dask.

import dask_ml.cluster

# dask_ml's KMeans takes these as keyword arguments
km = dask_ml.cluster.KMeans(
    n_clusters=n_clusters, max_iter=n_iterations, oversampling_factor=oversampling
)
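The snippets that follow also assume a `dask.distributed` client connected to the cluster, along with the `dask.array` and `random` imports. A minimal sketch (with a hypothetical scheduler address) would be:

import random

import dask.array as da
from dask.distributed import Client

# Hypothetical scheduler address; in practice this points at the remote cluster
client = Client("tcp://scheduler-address:8786")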

DON’T

It seems to make sense to break the work into logical steps, passing in array operations and pre-building Futures to build out the computation graph that the Dask scheduler will process.

def get_arr(arr, i):
    # Replace NaN/inf values in a single row with -1
    return da.where(
        (da.isnan(arr[i])) | (da.isinf(arr[i])), -1, arr[i]
    )

# One Future per sampled row, submitted sequentially from the client
smpl = random.sample(range(arr.shape[0]), sample_size)
stk = [client.submit(get_arr, arr, i) for i in smpl]

# Stack the row Futures, then fit on the stacked array
X = client.submit(da.vstack, stk)
km_future = client.submit(km.fit, X)
km = client.gather(km_future)

Why should you not do this?

Python list comprehensions are known for their efficiency over explicit for-loop iteration, and this approach _is_ faster in that regard. However, because it submits eager tasks to the scheduler one at a time, it underutilizes the distributed abilities of Dask.

This approach also puts a lot of RAM pressure on the Scheduler, especially with large amounts of data and when chunk sizes are not managed properly. Reducing the chunk size may alleviate some of that pressure, but significant leakage remains. Eventually the RAM pressure causes the Scheduler itself to halt operations, which only shows up as a “hang” with tasks processing indefinitely. At some points, when the RAM pressure became too much, the Scheduler pod was evicted from the node-pool and the workflow had to be started over from the beginning.
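If you do want to experiment with chunk sizes, the dask array can be inspected and rechunked before any of this work is submitted. The chunk shape below is a hypothetical example, not a value from the original workflow:

# Inspect the current chunking and in-memory size of the feature matrix
print(arr.chunks, arr.nbytes / 1e9, "GB")

# Hypothetical chunk shape: smaller row-chunks, all 16 feature columns kept together
arr = arr.rechunk((100_000, 16))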

Coming back to the random sample of points mentioned earlier, this approach struggled to process even 512 datapoints, often crashing with a `KilledWorker` error due to the inefficient input building and stacking. And even at 512 datapoints, preparing and fitting the data took four minutes.

DO

Now, 512 datapoints is a tiny fraction of the 4.6M available points to sample from, not to mention the high resource demands placed on the scheduler, the frequent hangs, and the lost progress. This needed to be addressed if any meaningful analysis was to come out of the subsequent work. Below, after taking a step back and thinking about what it really means to submit tasks to a remote scheduler/cluster, is a rework of the previous approach.

A couple of considerations were made:

  • Where does the data live? Where should the data live?
  • Dask prefers “pure” functions.

def get_arr(arr, smpl):
    # Build the whole list of cleaned rows in one remote task, where the data lives
    fit_stk = [
        da.where(
            (da.isnan(arr[i])) | (da.isinf(arr[i])),
            -1, arr[i]) for i in smpl
    ]
    return fit_stk

def stack_arr(fit_stk):
    # Stack the sampled rows and collapse them into a single chunk
    X = da.vstack(fit_stk)
    X = X.rechunk({0: -1, 1: -1})
    return X

def fit_arr(km, X):
    # Fit the KMeans estimator on the cluster
    return km.fit(X)

smpl = random.sample(range(arr.shape[0]), sample_size)
stk_list = client.submit(get_arr, arr, smpl)
X = client.submit(stack_arr, stk_list)
km_future = client.submit(fit_arr, km, X)
km = client.gather(km_future)

Why should we do this?

This approach breaks the Scheduler's tasks down into actual remote functions. Before, the array's methods were being passed in as the function, which required distributing the array, along with its methods, to the workers.

Shifting the construction of the input stack onto the cluster, by placing the list comprehension inside the remote function where the data lives, eliminates the sequential submission of indices in `get_arr`. Instead of waiting on 512 sequential steps that typically took a few minutes, this now returns a Future (processed eagerly on the cluster) within milliseconds.
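Because `client.submit` returns immediately, you can keep working (or simply watch) while the cluster does the heavy lifting. The calls below are standard `dask.distributed` API, shown here as an optional way to monitor the submitted work rather than something from the original code:

from dask.distributed import progress, wait

print(km_future.status)   # 'pending' while the cluster is still working
progress(km_future)       # live progress bar for the submitted work
wait(km_future)           # block only when the result is actually needed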

In terms of input datapoints, this approach has successfully fitted the KMeans algorithm on 16k datapoints (which took 1h 35m). The previous “Don’t” approach managed 512 datapoints in four minutes; this “Do” approach processes 2048 datapoints in the same amount of time. A huge improvement! Of course, fitting on more datapoints increases the precision of the algorithm when predicting on unseen points, and that was a main driver for revising the initial approach.
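Once `client.gather` returns the fitted estimator, using it on unseen points is standard `dask_ml` usage. The sketch below is a hypothetical next step, where `X_full` is assumed to be the full (4.6M, 16) feature matrix cleaned the same way as the training sample:

# Centroids learned from the sampled datapoints
print(km.cluster_centers_.shape)   # (n_clusters, 16)

# Hypothetical: assign every one of the ~4.6M pixels to a cluster
labels = km.predict(X_full)
labels = labels.compute()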

Just as importantly, the RAM pressure was eliminated, which avoided any Scheduler evictions from Kubernetes. No more starting from scratch!

Summary

Distribution requires taking a step back and considering your goals. _How_ an operation behaves in a distributed setting is not inherently obvious, especially when it is hidden behind an API. Hopefully this article helps in deciding how to implement functions to be submitted as Futures to a Dask cluster.
