Асинхронная загрузка файлов python

Асинхронный файловый api-сервис

Идея написать данную статью родилась после моего фейла по разработке данного сервиса. Суть задачи была проста — написать сервер с базовыми методами сохранения и отдачи файлов и сервисными методами по специфичной обработке файлов. Обмен данными (тело запроса, возвращаемые данные) я реализовал через json, про асинхрон идею упустил. По началу всё было хорошо, файлы не превышали размер нескольких мегабайтов, методы использовались редко. Но буквально через пару месяцев размеры файлов стали измеряться десятками мегабайт, количество запросов сотни в минуту. Сервис стал тормозить, возникали ошибки совместного доступа к файлам. «Никогда Штирлиц не был так близок к провалу».

В этом кейсе я покажу как я переписал код базовых методов.

В проекте будут использованы библиотеки asyncio, aiohttp для обеспечения асинхронности сервиса.

На этот раз я использовал специальный метод передачи данных порциями по HTTP протоколу — Chunked transfer encoding. Зачем он нужен и как работает подробно описано тут — в англоязычной статье Википедии. Хорошо, что aiohttp поддерживает такой метод из «коробки» — StreamResponse.enable_chunked_encoding.

Ссылка на полный код проекта будет внизу статьи.

Пишем скрипт потокового вызова post-метода сохранения файла

Этот скрипт вспомогательный, его задача — стримить байтовый поток в файловый сервис. Основа скрипта взята из документации aiohttp, в которой показано показано как отправлять большие файлы, не считывая их полностью в память.

Читайте также:  Css все основные команды

Также в заголовок запроса я добавил информацию об имени файла. Для этого создал в заголовке запроса раздел CONTENT‑DISPOSITION.

Итого скрипт выглядит так:

async def file_sender(file_name: str, chunk_size: int) -> Generator[bytes, None, None]: """ Генератор считывания файла по частям Параметры: file_name (str): имя файла, включая путь chunk_size (int): размер порции для считывания файла в память Возвращаемое значение: chunk (bytes): часть байтового потока файла """ async with aiofiles.open(file_name, 'rb') as f: chunk = await f.read(chunk_size) while chunk: yield chunk chunk = await f.read(chunk_size) async def main() -> None: """Функция генерации post-запроса в адрес файлового сервиса""" args = get_args() # код этой функции доступен в репозитории для этой статьт url = urljoin(f'://:', args.url) headers = < 'CONTENT-DISPOSITION': f'attachment;filename=', > async with aiohttp.ClientSession() as session: async with session.post( url, headers=headers, data=file_sender(file_name=args.path, chunk_size=args.chunk_size) ) as resp: logger.info(await resp.text())

Разрабатываем post-метод сохранения файла

В первом приближении код хендлера по обработке post‑метода выглядит следующим образом:

from aiofile import async_open async def save_archive(request: Request) -> web.Response: """Хендлер сохранения байтового потока из запроса в файл""" async with async_open(file_path, 'bw') as afp: # https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support # Выполняет итерацию по блокам данных в порядке их ввода в поток async for data in request.content.iter_any(): await afp.write(data) return web.Response(text='file accepted')

Далее я научил хэндлер доставать из заголовка запроса имя файла и сохранять информацию в таблицу базу данных. Таблица имеет два поля — айдишник и имя файла, которое мы получаем из CONTENT‑DISPOSITION.

async def save_file(request: Request) -> web.Response: """ Хендлер сохранения байтового потока из http-запроса в файл Параметры: request (aiohttp.web_request): объект http-запроса Возвращаемое значение: response (aiohttp.Response): объект ответа """ _, params = cgi.parse_header(request.headers['CONTENT-DISPOSITION']) file_name = params['filename'] file_id = str(uuid.uuid4()) file_path = os.path.join(app['folder'], file_id) async with async_open(file_path, 'bw') as afp: # https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support # Выполняет итерацию по блокам данных в порядке их ввода в поток async for data in request.content.iter_any(): await afp.write(data) await logger.debug(f'Файл принят и записан на диск') async with engine.begin() as conn: response = await conn.execute(files.insert().values(id=file_id, name=file_name)) await logger.debug(f'Файл сохранен под return web.Response(status=201, reason='OK', text=response.inserted_primary_key[0])

Обратите внимание, файл мы сохраняем под именем айдишника. В нашем случае это UUID4. Данный прием позволил мне избежать проблем с сохранением файлов с одинаковым именем.

Для работы с БД я использовал sqlalchemy. Код подключения к БД ниже:

metadata = sqlalchemy.MetaData() engine = create_async_engine(os.environ["FILE_SERVICE_DATABASE_URL"], echo=True) files = sqlalchemy.Table( "file", metadata, sqlalchemy.Column("id", sqlalchemy.String(38), primary_key=True), sqlalchemy.Column("name", sqlalchemy.String(255)), ) async def init_db(): """ Cоздает таблицу для хранения информации о файлах Параметры: Возвращаемое значение: None """ async with engine.begin() as conn: await conn.run_sync(metadata.create_all)

Разрабатываем get-метод получения файла

У пользователей бывает разная скорость сетевого соединения. Поэтому процесс отдачи файла я делил на части. Это основная идея в интерфейсе файлового сервиса. Все будут в позиции «выиграл — выиграл»: клиент начнет скачивать файл сразу, а нам не придется хранить в памяти сервера файл целиком:

async def get_file(request: Request) -> web.StreamResponse: """ Хендлер формирования архива и скачивания его в файл Параметры: request (aiohttp.web_request): объект http-запроса Возвращаемое значение: response (aiohttp.StreamResponse): объект ответа в виде байтового потока """ file_id = request.match_info['id'] folder_path = os.path.join(os.getcwd(), app['folder']) if not (os.path.exists(folder_path) and os.path.isdir(folder_path)): await logger.warning(f'Запрошена несуществующая папка ') raise web.HTTPNotFound(text='Архив не существует или был удален') async with engine.connect() as conn: statement = select(files.c.id, files.c.name).where(files.c.id == file_id) file_rows = await conn.execute(statement) file = file_rows.fetchone() if file is None: raise web.HTTPNotFound(text='Файла по указанному id не существует') file_path = os.path.join(app['folder'], file_id) response = web.StreamResponse( status=200, reason='OK', headers=< 'Content-Type': 'multipart/x-mixed-replace', 'CONTENT-DISPOSITION': f'attachment;filename=' > ) # Отправляет клиенту HTTP заголовки await response.prepare(request) try: async with async_open(file_path, 'rb') as f: chunk = await f.read(app['chunk_size']) while chunk: await response.write(chunk) chunk = await f.read(app['chunk_size']) except asyncio.CancelledError: await logger.error("Download was interrupted ") # отпускаем перехваченный CancelledError raise return response

Итого

Я создал асинхронный файловый сервис, который помог разгрузить ОЗУ сервера и при этом был способен работать в HL‑режиме, что важно для приложений, которыми пользуется большое количество клиентов.

Репозиторий файлового сервиса для данной статьи доступен по ссылке. В README.md вы найдете инструкцию по запуску проекта.

Источник

darwing1210 / async_download_files.py

This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters

import os
import asyncio
import aiohttp # pip install aiohttp
import aiofile # pip install aiofile
REPORTS_FOLDER = «reports»
FILES_PATH = os . path . join ( REPORTS_FOLDER , «files» )
def download_files_from_report ( urls ):
os . makedirs ( FILES_PATH , exist_ok = True )
sema = asyncio . BoundedSemaphore ( 5 )
async def fetch_file ( session , url ):
fname = url . split ( «/» )[ — 1 ]
async with sema :
async with session . get ( url ) as resp :
assert resp . status == 200
data = await resp . read ()
async with aiofile . async_open (
os . path . join ( FILES_PATH , fname ), «wb»
) as outfile :
await outfile . write ( data )
async def main ():
async with aiohttp . ClientSession () as session :
tasks = [ fetch_file ( session , url ) for url in urls ]
await asyncio . gather ( * tasks )
loop = asyncio . get_event_loop ()
loop . run_until_complete ( main ())
loop . close ()

Источник

aiofile 3.8.7

Real asynchronous file operations with asyncio support.

Status

Features

  • Since version 2.0.0 using caio, which contains linux libaio and two thread-based implementations (c-based and pure-python).
  • AIOFile has no internal pointer. You should pass offset and chunk_size for each operation or use helpers (Reader or Writer). The simples way is to use async_open for creating object with file-like interface.
  • For Linux using implementation based on libaio.
  • For POSIX (MacOS X and optional Linux) using implementation based on threadpool.
  • Otherwise using pure-python thread-based implementation.
  • Implementation chooses automatically depending on system compatibility.

Limitations

  • Linux native AIO implementation is not able to open special files. Asynchronous operations against special fs like /proc/ /sys/ are not supported by the kernel. It’s not a aiofile`s or `caio issue. In this cases, you might switch to thread-based implementations (see troubleshooting section). However, when used on supported file systems, the linux implementation has a smaller overhead and is preferred but it’s not a silver bullet.

Code examples

All code examples requires python 3.6+.

High-level API

Helper mimics python file-like objects, it returns file-like objects with similar but async methods.

  • async def read(length = -1) — reading chunk from file, when length is -1 , will be reading file to the end.
  • async def write(data) — writing chunk to file
  • def seek(offset) — setting file pointer position
  • def tell() — returns current file pointer position
  • async def readline(size=-1, newline=»\n») — read chunks until newline or EOF. Since version 3.7.0 __aiter__ returns LineReader . This method is suboptimal for small lines because it doesn’t reuse read buffer. When you want to read file by lines please avoid using async_open use LineReader instead.
  • def __aiter__() -> LineReader — iterator over lines.
  • def iter_chunked(chunk_size: int = 32768) -> Reader — iterator over chunks.
  • .file property contains AIOFile object
Example without context manager:
Concatenate example program ( cat ):
Copy file example program ( cp ):
Example with opening already open file pointer:
Linux native aio doesn’t support reading and writing special files (e.g. procfs/sysfs/unix pipes/etc.), so you can perform operations with these files using compatible context objects.

Low-level API

The AIOFile class is a low-level interface for asynchronous file operations, and the read and write methods accept an offset=0 in bytes at which the operation will be performed.

This allows you to do many independent IO operations on an once open file without moving the virtual carriage.

For example, you may make 10 concurrent HTTP requests by specifying the Range header, and asynchronously write one opened file, while the offsets must either be calculated manually, or use 10 instances of Writer with specified initial offsets.

In order to provide sequential reading and writing, there is Writer , Reader and LineReader . Keep in mind async_open is not the same as AIOFile, it provides a similar interface for file operations, it simulates methods like read or write as it is implemented in the built-in open.

The Low-level API in fact is just little bit sugared caio API.

Reader and Writer

When you want to read or write file linearly following example might be helpful.

LineReader — read file line by line

LineReader is a helper that is very effective when you want to read a file linearly and line by line.

It contains a buffer and will read the fragments of the file chunk by chunk into the buffer, where it will try to find lines.

The default chunk size is 4KB.

When you want to read file by lines please avoid to use async_open use LineReader instead.

More examples

Useful examples with aiofile

Async CSV Dict Reader

Troubleshooting

The caio linux implementation works normal for modern linux kernel versions and file systems. So you may have problems specific for your environment. It’s not a bug and might be resolved some ways:

  1. Upgrade the kernel
  2. Use compatible file systems
  3. Use threads based or pure python implementation.

The caio since version 0.7.0 contains some ways to do this.

1. In runtime use the environment variable CAIO_IMPL with possible values:

  • linux — use native linux kernels aio mechanism
  • thread — use thread based implementation written in C
  • python — use pure python implementation

2. File default_implementation located near __init__.py in caio installation path. It’s useful for distros package maintainers. This file might contains comments (lines starts with # symbol) and the first line should be one of linux thread or python .

Источник

Оцените статью