Python async http request

Asynchronous HTTP requests in Python

In python, you can make HTTP request to API using the requests module
or native urllib3 module. However, requests and urllib3 are synchronous. It means that only one HTTP call can be made at a time in a single thread. Sometimes you have to make multiples HTTP call and synchronous code will perform baldy. To avoid this, you can use multi-threading or since python 3.4 asyncio module.

Test case

In order to show the difference of time between sync and async code, i made a script that read a file with 500 cities names and perform HTTP call to an API to retrieve information about location, population and so on from the city name.

Sync code performance

@timeit def fetch_all(cities): responses = [] with requests.session() as session: for city in cities: resp = session.get(f"https://geo.api.gouv.fr/communes?nom=city>&fields=nom,region&format=json&geometry=centr") responses.append(resp.json()) return responses 

Async code performance

async def fetch(session, url): """Execute an http call async Args: session: contexte for making the http call url: URL to call Return: responses: A dict like object containing http response """ async with session.get(url) as response: resp = await response.json() return resp async def fetch_all(cities): """ Gather many HTTP call made async Args: cities: a list of string Return: responses: A list of dict like object containing http response """ async with aiohttp.ClientSession() as session: tasks = [] for city in cities: tasks.append( fetch( session, f"https://geo.api.gouv.fr/communes?nom=city>&fields=nom,region&format=json&geometry=centr", ) ) responses = await asyncio.gather(*tasks, return_exceptions=True) return responses @timeit def run(cities): responses = asyncio.run(fetch_all(cities)) return responses 

Conclusion

As you can see, the async version is a lot faster than the sync version so if you run into a situation where your code is performing multiple I/O calls then you should consider concurrency to improve performance. However asynchronous version requires more work as you can see. If you want to see the threading version that works with the requests module and also see how to implement automatic retry and caching on your API call check out this tutorial.

Источник

Как я писал асинхронные веб-запросы на Python, или почему провайдер считает, что я бандит

На днях по работе потребовалось сделать утилиту, которая прямо вот из консоли ходит в апи нашего клауд сервиса и берет оттуда кое-какую информацию. Подробности что и зачем — вне этого рассказа. Принципиальный вопрос здесь другой — скорость. Скорость реально важна (порядок количества запросов — десятки и сотни). Потому что ждать — не кайф.
Здесь я хочу поделиться своим ресёрчем на тему запросов, как делать круто, а как нет. С примерами кода конечно. А так же рассказать, как я тупил.

Есть только я и неприятности

Начнем, пожалуй, с классики

Последовательные синхронные запросы. Будем использовать всем известную либу requests и tqdm для красивого вывода в консоль. В качестве игрушечного примера выбрал первую-попавшуюся публичную апишку: https://catfact.ninja/ . Метрикой качества будет RPS (Request per second). Чем выше — тем соотвественно лучше.

import time import requests from tqdm import tqdm URL = 'https://catfact.ninja/' class Api: def __init__(self, url: str): self.url = url def http_get(self, path: str, times: int): content = [] for _ in tqdm(range(times), desc='Fetching data. ', colour='GREEN'): response = requests.get(self.url + path) content.append(response.json()) return content if __name__ == '__main__': N = 10 api = Api(URL) start_timestamp = time.time() print(api.http_get(path='fact/', times=N)) task_time = round(time.time() - start_timestamp, 2) rps = round(N / task_time, 1) print( f"| Requests: ; Total time: s; RPS: . |\n" )

Получаем следующий вывод в терминале:

Fetching data. 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:06, , , , , , , , , ] | Requests: 10; Total time: 6.61 s; RPS: 1.5. | 

RPS — 1.5. Очень грустно. У меня еще и интернет не самый быстрый сейчас дома. Ну тут добавить нечего.

Что можно оптимизировать уже сейчас? Ответ: использовать requests.Session
Eсли делать несколько запросов к одному и тому же хосту, базовое TCP-соединение будет использоваться повторно, что приводит к значительному увеличению производительности. (цитата из документации requests)

Используем сессию

 def http_get_with_session(self, path: str, times: int): content = [] with requests.session() as session: for _ in tqdm(range(times), desc='Fetching data. ', colour='GREEN'): response = session.get(self.url + path) content.append(response.json()) return content

Немного изменив метод, и вызвав его, видим следующее:

Fetching data. 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:02, , , , , , , , , ] | Requests: 10; Total time: 2.58 s; RPS: 3.9. | 

Почти 4 RPS, в сравнении с 1.5 уже прорыв.
Но не секрет, что для сокращения i/o time есть практика использования асихнронных/многопоточных программ. Это как раз такой случай, потому что во время ожидания ответа от сервера наша программа ничего не делает, хотя могла бы отправлять уже другой запрос, а потом другой и т.д. Попробуем реализовать асинхронный подход к решению кейса.

async / await

Для удобства вызовов сделаем функцию-оболочку:

def run_case(func, path, times): start_timestamp = time.time() asyncio.run(func(path, times)) task_time = round(time.time() - start_timestamp, 2) rps = round(times / task_time, 1) print( f"| Requests: ; Total time: s; RPS: . |\n" )

И собственно сама реализация метода (не забудьте поставить aiohttp, обычные реквесты не работают в асинхронной парадигме):

 async def async_http_get(self, path: str, times: int): async with aiohttp.ClientSession() as session: content = [] for _ in tqdm(range(times), desc='Async fetching data. ', colour='GREEN'): response = await session.get(url=self.url + path) content.append(await response.text(encoding='UTF-8')) return content
if __name__ == '__main__': N = 50 api = Api(URL) run_case(api.async_http_get, path='fact/', times=N)
Async fetching data. 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:12 

Удивительно, но разницы с предыдущим случаем особо нет. Я думаю выигрыш в i/o тайме компенсируется издержками на передачу управления потоком функциями друг другу (слышал что очень ругают эти моменты в python). Как на самом деле не знаю.

На словах такой подход можно объяснить так.
- В цикле создается корутина, которая отправляет запрос.
- Не дожидаясь ответа, управление потоком отдается снова event loop'у, который создает следующую по "for циклу" корутину, которая тоже отправляет запрос.
- Но теперь прежде чем отдать управление эвент лупу проверяется статус ответа первой корутины. Если она может быть продолжена (получила ответ на запрос), то управление потоком возвращается ей, если нет, то см. пункт 2.
- И так далее

В итоге код-то впринципе асихнронный, но запросы не отправляются "разом". Принципиально иначе подойти к этой ситуации поможет asyncio.gather.

Используем asyncio.gather

Gather - как ни банально с английского собирать. Метод gather собирает коллекцию корутин и запускает их разом (тоже условно конечно). То есть, в отличии от предыдущего случая, мы в цикле создаем корутины, а потом их запускаем.

Было:
[cоздали корутину] -> [запустили корутину] -> [cоздали корутину] -> [запустили корутину] ->
[cоздали корутину] -> [запустили корутину] ->[cоздали корутину] -> [запустили корутину]

А стало:
[cоздали корутину] -> [создали корутину] ->[cоздали корутину] -> [создали корутину] ->
[запустили корутину] -> [запустили корутину] -> [запустили корутину] -> [запустили корутину]

 async def async_gather_http_get(self, path: str, times: int): async with aiohttp.ClientSession() as session: tasks = [] for _ in tqdm(range(times), desc='Async gather fetching data. ', colour='GREEN'): tasks.append(asyncio.create_task(session.get(self.url + path))) responses = await asyncio.gather(*tasks) return [await r.json() for r in responses]
if __name__ == '__main__': N = 50 api = Api(URL) run_case(api.async_gather_http_get, path='fact/', times=N)

И получаем. получаем. ничего не получаем. Курсор продолжает многозначительно мигать в окне терминала. Не работает - подумал Штирлиц.

Путем мучительного дебага и попыток понять, почему мой код не работает, я понял - причина в моем VPN. Его узлы находятся где-то в юрисдикции Cloudflare. А они такое поведение не поощряют, считая, что я бот. Нормальный человек столько запросов в секунду делать не будет, поэтому мои запросы. теряются где-то в пучинах интернета. Ответа на них не будет. Никогда. Корутины просто не заканчиваются.

Окей, поняв откуда ноги растут, запускаем код:

Async gather fetching data. 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00

Цифры выросли, круто, 21 не 1.5 - это уж точно.
Однако же есть какой-то предел, увеличим N (число запросов) до 200.

| Requests: 200; Total time: 2.97 s; RPS: 67.3. |

Это что же получается, можно и так? На самом деле нет. Если внимательно рассмотреть, что же все-таки нам отвечает сервер, то заметим, что большая часть ответов это

Конкретный лимит запросв, который я установил в ходе эксперимента с этим сервисом - это около 60 ответов за раз, остальное он не переваривает. Так что если будете так ходить в сервисы, которые не хотите перегрузить или вообще положить, то подходите к этом вопросу обдуманно, не превышайте определенных рамок.

Что там с threading?

Тут особо смысла нет - практический эксперимент показал, что threading показывает такие же результаты (плюс - минус), как асихнронный код из пункта 3.

multiprocessing

Не буду врать, просто посмотрел как нечто похожее делал какой-то индус с Ютуба. Результаты сильно хуже, чем у предыдущих способов. Да и писать такой код - это насилие над своей психикой. А я свою психику берегу.

Подведу итоги:

  • На днях упал Cloudlfare - извините, это из-за меня, больше не буду.
  • Хотите быть чемпионом по запросам - используйте asyncio.gather, но с тщательно подобранными лимитами. Если вы ходите не на один хост, а в разные источники, то вообще не стестяйтесь.

Источник

Читайте также:  Printstream to string java
Оцените статью