Архив за месяц: Июль 2019

Сохранение обученной ML модели (объекта python) в базе данных (BLOB)

Стандартная архитектура при создании сервисов с использованием машинного обучения подразумеват обучение модели на одном сервере и её использование на другом сервере, который непосредственно работает на предикт. Однако, на предикт может работать много серверов, и возникает вопрос: как доставить обученную модель на все сервера? Обычно модель сохраняют в виде файла. Поэтому можно сделать mount какого-то общего сетевого ресурса, и на нём хранить модель. Но более гибко будет сохранить модель в базе данных, тем более что все инстансы уже скорее всего имеют соединение с общей базой.
Ниже приведу пример того, как сохранить объект python в базе данных в колонке типа блоб.

import _pickle
import pymysql.cursors

## Для простоты возьмём лёгкий объект
# однако обученная модель может достигать сотни мегабайт, а то и ещё больше
listToPickle = [(10, 10), ("example", 10.0), (1.0, 2.0), "object"]

## Преобразование нашего объекта в строку
pickledList = _pickle.dumps(listToPickle)

## Соединение с базой данных mysql
connection = pymysql.connect(host='localhost', port=3306, user='dbuser', password='pass', db='somedb', cursorclass=pymysql.cursors.DictCursor)

## создание курсора
with connection.cursor() as cursor:
    ## Вставка в БД
    cursor.execute("""INSERT INTO pickleTest VALUES (NULL, 'testCard', %s)""", (pickledList, ))
    connection.commit()

    ## Выборка по сохраненной модели
    cursor.execute("""SELECT features FROM pickleTest WHERE card = 'testCard'""")

    ## Получим и распечатаем результат
    res = cursor.fetchone()

    ## Обратное преобразование
    unpickledList = _pickle.loads(res['pickledStoredList'])
    print(unpickledList)

Как видим всё просто, достаточно перед сохранием в блоб преобразовать объект с помощью _pickle.dumps() а при загрузке объекта обратно преобразовать в строку в объект с помощью _pickle.loads().
Также отмечу, что в третьем питоне не надо устанавливать дополнительных библиотек, используется встроенная библиотека _pickle. Таким образом Pickling в Python — это способ хранения весьма сложных структур данных в двоичном представлении, которые требуется восстановить через некоторое время, для получения той же структуры данных.

Как обойти строки dataframe в цикле (pandas)

В первую очередь хочется сказать, что обходить датафрейм не самая лучшая затея из-за плохой производительности и гораздо лучше будет воспользоваться альтернативными методами в виде функции apply (рассмотрим ниже). Если же все-таки потребовалось проитерироваться по строкам в DataFrame, то приведу код ниже. Однако использовать его стоит лишь для небольших дата-сетов.

import pandas as pd

dataframe_from_list = [[1,2], [3,4], [10,20]]
df = pd.DataFrame(dataframe_from_list, columns=['col1', 'col2'])

for index, row in df.iterrows():
    print(index, row)
    print(row['col1'], row['col2'], row['col1'] + row['col2'])

В данном примере использовалась функция iterrows для обхода датафрейма. Для обращения к колоночным значениям в строке используется row['название_колонки'].

А теперь давайте подумаем, зачем нам итерироваться по датафрейму: самое очевидно это взять некоторые колоночные значения из строки и подсчитать некоторую функцию.
Но это можно сделать и с помощью apply метода с указанием направления по оси x:

result = df.apply(lambda row: row['col1'] + row['col2'], axis=1)
print(result)
# 3, 7, 30

Соответсвенно, вместо lambda функции можно поставить свою, или в простом случае использовать оптимизированные numpy функции, например np.sum

Ну а на последок, давайте представим что у нас большое кол-во строк, сравнимое с миллионом, и нам надо подсчитать некую функцию. Проблема в том, что pandas вычисляет apply в один процесс, а все современные процессоры имют несколько ядер. Поэтому необходимо распараллелить apply функцию для оптимального расчета. Для распараллеливания и быстрого расчета функции по датафрейму воспользуемя функцией ниже:

from multiprocessing import Pool
import numpy as np

# для примера возьмем функцию суммы по строке, приведенную выше
def calculate_sum_column(df):
    df['sum_column'] = df.apply(lambda row: row['col1'] + row['col2'], axis=1)
    return df

# в данном примере расспараллеливаем на восемь потоков. Будьте аккуратны - при распараллеливании тратится больше оперативной памяти
def parallelize_dataframe(df, func):
    a,b,c,d,e,f,g,h = np.array_split(df, 8)
    pool = Pool(8)
    some_res = pool.map(func, [a,b,c,d,e,f,g,h])
    df = pd.concat(some_res)
    pool.close()
    pool.join()
    return df

# имитация большого датасета
df = pd.concat([df, df, df], ignore_index=True)

df = parallelize_dataframe(df, calculate_sum_column)
print(df.head(10))

С помощью данного гибкого «многоядерного» подхода можно многократно ускорить обход датафрейма и вычислить необходимую функцию