[Figure: CPU usage comparison (/pd_mp/cpu_high.png, /pd_mp/cpu_low.png). Captions: "Give me alllll of youuuuu" - Pandas + hacky multiprocessing | "I'll be done when I'm done" - Pandas]

TL;DR - Instead of doing df.apply(function), use the following code…

import multiprocessing as mp
from multiprocessing import Pool

import numpy as np
import pandas as pd

cores = mp.cpu_count()
partitions = cores

def parallelise(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def wrapper(rows):

    def function(row):
        # do the expensive stuff
        return  # result

    # df.apply syntax
    return rows.apply(function)

_ = parallelise(df, wrapper)


Be wary of mindlessly using df.apply()

Say you have a function that you need to apply for every row in a pandas data frame.

Typically, if the function cannot be vectorised or done using pandas functions, one would resort to df.apply().

For the uninitiated, df.apply(f) takes a function f and applies it along a particular axis of the data frame df. For example, if you want to square every element in column A of the data frame df, you would do df.A.apply(lambda x: x**2). This is a terrible way to square numbers, though, and should never be done in practice: df.A.apply() takes about 5 times longer to do the same computation, and the code is much less readable than df.A**2.

Don't do this
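To make that concrete, here is a minimal sketch of the two approaches (the data frame is made up, and the exact slowdown will depend on your machine):

import numpy as np
import pandas as pd

df = pd.DataFrame({"A": np.random.rand(1_000_000)})

# The slow way: apply calls the Python lambda once per element
squared_slow = df.A.apply(lambda x: x**2)

# The fast (and more readable) way: vectorised arithmetic
squared_fast = df.A**2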

What about these expensive functions?

OK, so… assuming that you have no other option than to use df.apply() for this operation, how can we speed it up?

Let's start with a concrete example. Say you have a data frame with two columns, A and B, that hold paths to images. We want to convert the images at these paths to HSV, calculate the mean HSV values for each image, and compute the difference between the mean values of the image at A and the image at B. The code below outlines this operation (rgb2hsv and imread are from skimage).

import numpy as np
from skimage.color import rgb2hsv
from skimage.io import imread

def someExpensiveFunction(row):
    imgA_mean = np.mean(rgb2hsv(imread(row.A)), axis=(0, 1))
    imgB_mean = np.mean(rgb2hsv(imread(row.B)), axis=(0, 1))
    hsv_mean_diff = imgA_mean - imgB_mean
    return hsv_mean_diff
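For context, df here is just two columns of file paths, one image pair per row; something like the sketch below (the paths are hypothetical):

import pandas as pd

# Hypothetical layout of df: one image pair per row
df = pd.DataFrame({
    "A": ["images/pair_000_a.png", "images/pair_001_a.png"],
    "B": ["images/pair_000_b.png", "images/pair_001_b.png"],
})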

If I were to use vanilla df.apply(), it takes 1 min 2 s to run on a data frame with about 600 image pairs (590 to be exact).

    %%time
    _ = df.apply(someExpensiveFunction,axis=1)
    ### Wall time: 1min 2s

Currently, this barely uses a single core on my PC, so let's parallelise it.

Parallellllll

The parallelisation uses Python's built-in multiprocessing module and is composed of two parts: the parallelise function and the worker function.

The worker function defines the actual calculation that needs to run and is quite similar to someExpensiveFunction. But for this to work, we need to put someExpensiveFunction inside another function, which I will refer to as the wrapper function. This is someExpensiveFunction_wrapper in the code below. The wrapper function receives a block of data and applies the worker function to that data; the return statement of the wrapper function defines how this is done.

Think of rows as a smaller data frame, and you are applying someExpensiveFunction to that¹.

def someExpensiveFunction_wrapper(rows):
    def someExpensiveFunction(row):
        imgA_mean = np.mean(rgb2hsv(imread(row.A)), axis=(0, 1))
        imgB_mean = np.mean(rgb2hsv(imread(row.B)), axis=(0, 1))
        hsv_mean_diff = imgA_mean - imgB_mean
        return hsv_mean_diff
    return rows.apply(someExpensiveFunction, axis=1)

The parallelise function splits the data frame into blocks and assigns each core to run the wrapper function on one of the blocks.

import multiprocessing
from multiprocessing import Pool

import numpy as np
import pandas as pd

cores = multiprocessing.cpu_count()
partitions = cores

def parallelise(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data
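As a small aside, the same thing can be written with the pool as a context manager, which guarantees the pool is cleaned up even if the map raises. This is just a variant sketch (parallelise_cm is an illustrative name), not what the timings below use:

def parallelise_cm(data, func):
    data_split = np.array_split(data, partitions)
    # The context manager tears the pool down automatically on exit
    with Pool(cores) as pool:
        result = pd.concat(pool.map(func, data_split))
    return result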

And now, you invoke the parallelisation as follows

    %%time
    _ = parallelise(df, someExpensiveFunction_wrapper)

    ### Wall time: 22.3 s

The parallelised code ran about 3x faster than the original with much heavier use of the CPU.

Quick note for working in Jupyter notebooks on Windows - to get this to work, I've had to copy someExpensiveFunction_wrapper to an external Python file and import it into my Jupyter notebook. The structure is the same as before, but now I have to do parallelise(df, extFile.someExpensiveFunction_wrapper) instead of parallelise(df, someExpensiveFunction_wrapper). This is really a consequence of how multiprocessing works on Windows: there is no fork, so worker processes are spawned fresh and have to import the target function by name, which they cannot do for a function defined interactively inside a notebook.
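For reference, a minimal sketch of that layout, reusing the names from the example (the file name extFile.py matches the import used above):

# extFile.py -- saved next to the notebook so the spawned workers can import it
import numpy as np
from skimage.color import rgb2hsv
from skimage.io import imread

def someExpensiveFunction_wrapper(rows):
    def someExpensiveFunction(row):
        imgA_mean = np.mean(rgb2hsv(imread(row.A)), axis=(0, 1))
        imgB_mean = np.mean(rgb2hsv(imread(row.B)), axis=(0, 1))
        return imgA_mean - imgB_mean
    return rows.apply(someExpensiveFunction, axis=1)

And then, in the notebook:

import extFile

_ = parallelise(df, extFile.someExpensiveFunction_wrapper)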

So the complete code looks something like this…

Original

import numpy as np
from skimage.color import rgb2hsv
from skimage.io import imread

def someExpensiveFunction(row):
    imgA_mean = np.mean(rgb2hsv(imread(row.A)), axis=(0, 1))
    imgB_mean = np.mean(rgb2hsv(imread(row.B)), axis=(0, 1))
    hsv_mean_diff = imgA_mean - imgB_mean
    return hsv_mean_diff

_ = df.apply(someExpensiveFunction, axis=1)

### Wall time: 1min 2s

Modified

import multiprocessing as mp
from multiprocessing import Pool

import numpy as np
import pandas as pd
from skimage.color import rgb2hsv
from skimage.io import imread

cores = mp.cpu_count()
partitions = cores

def parallelise(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def someExpensiveFunction_wrapper(rows):
    def someExpensiveFunction(row):
        imgA_mean = np.mean(rgb2hsv(imread(row.A)), axis=(0, 1))
        imgB_mean = np.mean(rgb2hsv(imread(row.B)), axis=(0, 1))
        hsv_mean_diff = imgA_mean - imgB_mean
        return hsv_mean_diff
    return rows.apply(someExpensiveFunction, axis=1)

_ = parallelise(df, someExpensiveFunction_wrapper)

### Wall time: 22.3 s

And finally, a code prototype to be copied…

import multiprocessing as mp
from multiprocessing import Pool

import numpy as np
import pandas as pd

cores = mp.cpu_count()
partitions = cores

def parallelise(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def wrapper(rows):

    def function(row):
        # do the expensive stuff
        return  # result

    # df.apply syntax
    return rows.apply(function)

_ = parallelise(df, wrapper)
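One extra note if you copy this into a plain .py script rather than a notebook: on Windows, where worker processes are spawned rather than forked, the call that creates the pool should sit under an if __name__ == "__main__": guard, because the spawned children re-import the main module. A minimal sketch:

if __name__ == "__main__":
    # Without this guard on Windows, each spawned child would re-run the
    # module top level and try to create its own pool
    _ = parallelise(df, wrapper)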

Be wary of mindlessly using multiprocessing for everything

Although the parallelised code ran much faster than the original, the multiprocessing overhead needs to be accounted for. Spawning processes is expensive. Consequently, for cheaper worker functions, parallelising can actually make the code run slower. Parallelising the squaring example from before using this approach made it about 3 times slower. Yes… df.apply() with multiprocessing was slower than vanilla df.apply(). Given that df.A.apply() was already 5x slower than df.A**2, adding multiprocessing makes it roughly 15x slower.

On top of that, if you are running this on a laptop, chances are the sustained CPU clock under a multiprocessing load is lower than under a single-core load, due to thermal and power limits. So, as you might expect, the speed-up is smaller than simply dividing the single-threaded time by the number of cores would suggest. Better performance could potentially be achieved on a desktop PC with more headroom to its TDP.

In general, how expensive the worker function has to be before multiprocessing becomes worthwhile will vary depending on your hardware.
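If in doubt, the simplest check is to time both versions on a small sample of your own data; a rough sketch, assuming the definitions from the example above:

import time

sample = df.head(50)  # a small slice is enough to compare

start = time.perf_counter()
_ = sample.apply(someExpensiveFunction, axis=1)
print(f"vanilla apply : {time.perf_counter() - start:.2f} s")

start = time.perf_counter()
_ = parallelise(sample, someExpensiveFunction_wrapper)
print(f"parallelised  : {time.perf_counter() - start:.2f} s")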


Off to the races!

Not really though… Don't gamble, kids… being fiscally responsible is cool!

[Figure: CPU usage comparison (/pd_mp/cpu_high.png, /pd_mp/cpu_low.png). Captions: "I am Speed!" | Pandas playing Where's Wally]

  1. I know this looks like a job for decorators…