Обработка больших файлов с использованием Python
В последний год или около того, и с моим повышенным вниманием к данным ribo-seq я полностью осознал, что означает термин большие данные. Исследования ribo-seq в их необработанном виде могут легко охватить сотни ГБ, что означает, что их обработка как своевременной, так и эффективной требует некоторого обдумывания. В этом посте, и, надеюсь, в следующем, я хочу подробно описать некоторые из методов, которые я придумала (собрал из разных статей в интернете), которые помогают мне получать данные такого масштаба. В частности, я буду подробно описывать методы для Python, хотя некоторые методы можно перенести на другие языки.
Мой первый большой совет по Python о том, как разбить ваши файлы на более мелкие блоки (или куски) таким образом, чтобы вы могли использовать несколько процессоров. Давайте начнем с самого простого способа чтения файла на python.
with open("input.txt") as f: data = f.readlines() for line in data: process(line)
Эта ошибка, сделанная выше в отношении больших данных, заключается в том, что она считывает все данные в ОЗУ, прежде чем пытаться обрабатывать их построчно. Это, вероятно, самый простой способ вызвать переполнение памяти и возникновение ошибки. Давайте исправим это, читая данные построчно, чтобы в любой момент времени в оперативной памяти сохранялась только одна строка.
with open("input.txt") as f: for line in f: process(line)
Это большое улучшение, и именно оно не перегружает ОЗУ при загрузке большого файла. Затем мы должны попытаться немного ускорить это, используя все эти бездействующие ядра.
import multiprocessing as mp pool = mp.Pool(cores) jobs = [] with open("input.txt") as f: for line in f: jobs.append( pool.apply_async(process,(line)) ) # дождаться окончания всех работ for job in jobs: job.get() pool.close()
При условии, что порядок обработки строк не имеет значения, приведенный выше код генерирует набор (пул) обработчиков, в идеале один для каждого ядра, перед созданием группы задач (заданий), по одной для каждой строки. Я склонен использовать объект Pool, предоставляемый модулем multiprocessing, из-за простоты использования, однако, вы можете порождать и контролировать отдельные обработчики, используя mp.Process, если вы хотите более точное управление. Для простого вычисления числа объект Pool очень хорош.
Хотя вышеперечисленное теперь использует все эти ядра, к сожалению, снова возникают проблемы с памятью. Мы специально используем функцию apply_async, чтобы пул не блокировался во время обработки каждой строки. Однако при этом все данные снова считываются в память; это время сохраняется в виде отдельных строк, связанных с каждым заданием, ожидая обработки в строке. Таким образом, память снова будет переполнена. В идеале метод считывает строку в память только тогда, когда подходит ее очередь на обработку.
import multiprocessing as mp def process_wrapper(lineID): with open("input.txt") as f: for i, line in enumerate(f): if i != lineID: continue else: process(line) break pool = mp.Pool(cores) jobs = [] with open("input.txt") as f: for ID, line in enumerate(f): jobs.append( pool.apply_async(process_wrapper,(ID)) ) # дождаться окончания всех работ for job in jobs: job.get() pool.close()
Выше мы изменили функцию, переданную в пул обработчика, чтобы она включала в себя открытие файла, поиск указанной строки, чтение ее в память и последующую обработку. Единственный вход, который теперь сохраняется для каждой порожденной задачи — это номер строки, что предотвращает переполнение памяти. К сожалению, накладные расходы, связанные с необходимостью найти строку путем итеративного чтения файла для каждого задания, являются несостоятельными, поскольку по мере того, как вы углубляетесь в файл, процесс занимает все больше времени. Чтобы избежать этого, мы можем использовать функцию поиска файловых объектов, которая пропускает вас в определенное место в файле. Сочетание с функцией tell, которая возвращает текущее местоположение в файле, дает:
import multiprocessing as mp def process_wrapper(lineByte): with open("input.txt") as f: f.seek(lineByte) line = f.readline() process(line) pool = mp.Pool(cores) jobs = [] with open("input.txt") as f: nextLineByte = f.tell() for line in f: jobs.append( pool.apply_async(process_wrapper,(nextLineByte)) ) nextLineByte = f.tell() for job in jobs: job.get() pool.close()
Используя поиск, мы можем перейти непосредственно к правильной части файла, после чего мы читаем строку в память и обрабатываем ее. Мы должны быть осторожны, чтобы правильно обрабатывать первую и последнюю строки, но в противном случае это будет именно то, что мы излагаем, а именно использование всех ядер для обработки данного файла без переполнения памяти.
Я закончу этот пост с небольшим обновлением вышеупомянутого, поскольку есть разумные накладные расходы, связанные с открытием и закрытием файла для каждой отдельной строки. Если мы обрабатываем несколько строк файла за один раз, мы можем сократить эти операции. Самая большая техническая сложность при этом заключается в том, что при переходе к месту в файле вы, скорее всего, не находитесь в начале строки. Для простого файла, как в этом примере, это просто означает, что вам нужно вызвать readline, который читает следующий символ новой строки. Более сложные типы файлов, вероятно, требуют дополнительного кода, чтобы найти подходящее место для начала / конца чанка.
import multiprocessing as mp, os def process_wrapper(chunkStart, chunkSize): with open("input.txt") as f: f.seek(chunkStart) lines = f.read(chunkSize).splitlines() for line in lines: process(line) def chunkify(fname,size=1024*1024): fileEnd = os.path.getsize(fname) with open(fname,'r') as f: chunkEnd = f.tell() while True: chunkStart = chunkEnd f.seek(size,1) f.readline() chunkEnd = f.tell() yield chunkStart, chunkEnd - chunkStart if chunkEnd > fileEnd: break pool = mp.Pool(cores) jobs = [] for chunkStart,chunkSize in chunkify("input.txt"): jobs.append( pool.apply_async(process_wrapper,(chunkStart,chunkSize)) ) for job in jobs: job.get() pool.close()
Во всяком случае, я надеюсь, что некоторые из вышеперечисленных примеров были новыми и возможно, полезными для вас. Если вы знаете лучший способ сделать что-то (на python), мне было бы очень интересно узнать об этом. В следующем посте, который будет опубликован в ближайшем будущем, я расширю этот код, превратив его в родительский класс, из которого создается несколько дочерних элементов для использования с различными типами файлов.
Четыре функции для быстрой работы с Big Data
Я часто пользуюсь функциями для работы с большими данными. Они позволяют упросить и ускорить работу. Некоторые я нашел на просторах интернета, другие написал сам. Сегодня хочу поделиться четырьмя из них, может кому-то будет полезно.
Быстрое чтение больших файлов
Один из первых инструментов, с которым сталкивается аналитик либо Data Scientist — это Pandas, библиотека Python для обработки и анализа данных. С её помощью мы импортируем и сортируем данные, делаем выборки и находим зависимости. Например, чтобы прочитать файл средствами Pandas в Python мы пишем:
import pandas as pd data = pd.read_csv('data_file.csv')
Такой подход простой и понятный. Каждый Data Scientist или аналитик знает это. Но если данных много? Скажем 100 000 000 строк, они постоянно меняются, сроки горят, и до обеда надо проверить еще 100 гипотез?
Возьмем исследовательский набор данных о диабете с сайта Kaggle и продублируем каждую строку 100 000 для создания нашего тестового набора данных. В результате получается 76 800 000 строк.
Изначальный датасет выглядит так:
Преобразовываем его для наших экспериментальных задач:
# Чтение датасета из файла df = pd.read_csv('diabetes.csv') # Создание тестового файла df = df.loc[df.index.repeat(100000)] # Сохранение в файл для экспирементов df.to_csv('benchmark.csv') print(len(df), 'строк')
Преобразование было долгим, но будем надеется, что время потрачено не зря. Сколько же займет чтение такого файла?
df = pd.read_csv('benchmark.csv')
Если каждый раз загружать изменившийся датасет заново, на наши 100 гипотез уйдет 13 часов. Сделать до обеда не получится.
Мы хотим загружать данные быстрее, но при этом не терять всех преимуществ, которые даёт Pandas. Простая функция использующая datatable поможет нам сделать это:
import datatable as dt import pandas as pd def read_fast_csv(f): frame = dt.fread(f) ds = frame.to_pandas() return ds
Попробуем теперь прочитать наш большой датасет:
ds = read_fast_csv('benchmark.csv')
Это же в 6 раз быстрее! Значит до обеда успеваем!
Быстрое знакомство с датасетом
Данная функция выводит несколько параметров датасета с целью ознакомления с его содержимым, в частности:
- Размер датасета
- Информацию о дубликатах
- Процент отсутствующих значений
- Количество уникальных значений
- Первые пять строчек датасета
def brief_df (df): # Подсчитываем пустые значения и уникальные значение rows_na =df.isna().sum().reset_index().rename(columns=) rows_notna = df.notna().sum().reset_index().rename(columns=) rows_analysis = pd.merge(rows_na, rows_notna, on="index", how= "outer") rows_analysis["completeRatio"] = round((rows_analysis["valuesNotNa"]) / (rows_analysis["valuesNotNa"]+rows_analysis["valuesNa"])*100,2) cardinality = df.nunique().reset_index().rename(columns=) rows_analysis = pd.merge(rows_analysis, cardinality) # Размер датасета и кол-во дубликатов print("Размер:", df.shape) dup_raw = df.duplicated ().sum() dup_per = round((dup_raw*100)/df.shape[0],2) print ("Дубликаты:", dup_raw, "->", dup_per, "%") # Статистика по пустым значениям print("Проверка на отсутсвующие значениия") display(rows_analysis) # Первые пять строк print("Первые пять строк") display(df.head())
Попробуем проанализировать наш большой датасет, который мы создали в первом разделе:
Размер: (76800000, 10)
Дубликаты: 76799232 -> 100.0 %
Обзор пустых значений:
Да, дубликатов у нас и правда 100%, ведь мы продублировали каждую строку 100 000 раз. Так что все сходится.
Столбцы с датами в разных форматах
Для анализа и визуализации данных часто требуется работать с датами в разных форматах, когда дата дана только одна. А еще хуже, если все даты прописаны как попало. Приходится приводить все даты к стандартному виду, а потом программным путем добывать из каждой даты разную информацию. Я написал функцию, которая сильно упрощает этот процесс.
Сначала смоделируем нашу ситуацию, и к нашему исходному датасету добавим колонку со случайными датами в диапазоне двух последних лет:
import numpy as np # Чтение датасета из файла df = pd.read_csv('diabetes.csv') df['date'] = np.random.choice(pd.date_range('2020-01-01', '2022-12-31'), 768) df.to_csv('diabetes_dates.csv')
Ниже код функции, которая к существующему датасету добавляет колонки с наиболее частыми требуемыми форматами, чтобы сделать выборки по годам, по кварталам, месяцам или неделям, либо взять полную дату для целей визуализации.
import datetime from datetime import timedelta def granular_dates(df, col): df['ts_date'] = pd.to_datetime(df[col]).dt.normalize() # Полная дата с названием месяца df['ts_date_str'] = df["ts_date"].dt.strftime("%d %B %Y") # Краткая дата с сокращением месяца df['ts_date_str_short'] = df["ts_date"].dt.strftime("%d %b %Y") # Только год df['ts_year'] = df.ts_date.dt.year # Только номер месяца df['ts_month'] = df.ts_date.dt.month # Только число df['ts_day'] = df.ts_date.dt.day # Год и квартал df['ts_quarter'] = pd.PeriodIndex(df.ts_date, freq="Q").astype(str) # Номер недели df['ts_dayweek'] = df.ts_date.dt.dayofweek # День недели df['ts_dayweektext'] = df.ts_date.dt.strftime("%a") # Дата конца недели (воскресенья) df['ts_week_start'] = df.ts_date.apply(lambda x: x - timedelta(days=x.weekday())).dt.strftime("%b-%d") # Дата конца недели (воскресенья) df['ts_week_end'] = df.ts_date.apply(lambda x: x - timedelta(days=x.weekday()) + timedelta(days=6)).dt.strftime("%b-%d")
Теперь всего одна строчка кода (не считая чтение файла):
# Чтение датасета df = pd.read_csv('diabetes_dates.csv') # Добавление колонок с датами разного формата granular_dates(df, 'date')
В результате к датасету добавляются такие колонки: