Ускорение применения скалярной (невекторизированной) функции к объекту pandas.Series / pandas.DataFrame

Отвечая на этот вопрос, я задумался о том как можно ускорить обработку данных, представленных в виде Pandas Series/DataFrame, при помощи невекторизированной функции, которую мы, к тому же, не можем изменить.

В указанном вопросе обработка данных заключалась в парсинге и "гармонизировании" адресов при помощи модуля natasha.

код для создания функции, использование которой мы хотим ускорить:

from natasha import MorphVocab, AddrExtractor

morph_vocab = MorphVocab()
addr_extractor = AddrExtractor(morph_vocab)


PART_TYPES = [
    "индекс", "город", 
    "улица", "проспект", "шоссе", "переулок", "проезд",
    "дом", "корпус"
]


def parse_addr(
        addr_str, 
        addr_extractor=addr_extractor, 
        min_addr_len=10
):
    if not addr_str or len(str(addr_str)) < min_addr_len:
        return None
    ext = addr_extractor.find(addr_str)
    if not ext:
        return None
    return {part.type: part.value for part in ext.fact.parts}


def combine_addr_str(parsed_addr, part_types=PART_TYPES):
    if not parsed_addr:
        return None
    return " ".join(
        parsed_addr[part_type]
        for part_type in part_types
        if parsed_addr.get(part_type)
    )

    
def get_addr_str(addr_str):
    return combine_addr_str(parse_addr(addr_str))

Цель задачи не распараллеливание ради распараллеливания, а именно ускорение вызова функции, принимающей и возвращающей скалярные значения, к pandas.Series / pandas.DataFrame.

Ссылка на пример данных - Excel файл, содержащий 200 случайных адресов взятых из примера данных из этого вопроса

Чтобы прочитать данные в Pandas DataFrame:

import pandas as pd

filename = "/tmp/sample.xlsx" # NOTE:у кажите правильный путь к файлу!
df = pd.read_excel(filename)

Предлагайте ваши варианты решения! :)


Ответы (1 шт):

Автор решения: MaxU

Собственные попытки найти оптимальное решение на MacBook Pro (Processor: 2,6 GHz 6-Core Intel Core i7):

вспомогательные функции:

from multiprocessing import Pool
from joblib import Parallel, delayed


def get_addr_series(ser):
    return ser.map(get_addr_str)


def parallelize_pd_func(arr, func, n_cores=4):
    arr_split = np.split(arr, n_cores)
    pool = Pool(n_cores)
    res = pd.concat(pool.map(func, arr_split))
    pool.close()
    pool.join()
    return res

сравнение производительности различных решений:

# читаем данные
df = pd.read_excel("/tmp/sample.xlsx")

In [346]: %timeit res = df["address_str"].apply(get_addr_str)
5.37 s ± 161 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [335]: %timeit res = df["address_str"].map(get_addr_str)
5.16 s ± 32 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [336]: %timeit res = [get_addr_str(x) for x in df["address_str"]]
5.27 s ± 50.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [339]: %timeit res = parallelize_pd_func(df["address_str"], get_addr_series, n_cores=5)
1.58 s ± 49.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [340]: %timeit res = Parallel(n_jobs=5, require='sharedmem')(delayed(get_addr_str)(x) for x in df["address_str"])
5.42 s ± 99.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
→ Ссылка