Python async orm postgresql

Блог Степана Родионова

Нет времени ждать! Нет времени ждать блокирующие I/O операции, поэтому практически каждый backend-разработчик рано или поздно задумывается об использовании асинхронного веб-фреймворка.

На данный момент у Python-разработчика существует достаточно большой выбор фреймворков с различными реализациями event loop’а: от Twisted, больше похожего на сетевую библиотеку, до http клиента и сервера для asyncio aiohttp (>6500 звезд на GitHub), Flask-like фреймворка sanic (>11000 звезд на GitHub) и http клиента и сервера Tornado (>17000 звезд на GitHub).

Редкий веб-сервер обходится без работы с хранилищами данных. И здесь приверженцев реляционных СУБД поджидает неприятный сюрприз: SQLAlchemy ORM, самая популярная ORM для Python, не поддерживает асинхронную работу. Рассмотрим пути решения возникшей задачи удобной работы с РСУБД без использования самой популярной Python-ORM.

Техническая постановка задачи

  1. Драйвер базы данных (Database Driver) – это программный компонент, реализующий программный интерфейс (API) доступа к базам данных (ODBC, JDBC, DBAPI, ADO.NET и т.д.). Это адаптер, который соединяет общий интерфейс с конкретной реализацией СУБД. Может иметь как синхронную, так и асинхронную реализацию. Примеры асинхронных драйверов для Python: aiopg, asyncpg (PostgreSQL), aiomysql (MySQL). Драйвер позволяет создавать подключение к СУБД и в рамках него выполнять SQL-запросы через курсоры.
  2. Набор абстракций над SQL – это набор инструментов, обеспечивающий абстракции для различных реализаций драйверов, а также язык выражений, позволяющий выражать язык SQL с помощью выражений языка Python, включая DDL и отображение типов языка Python на типы СУБД. Пример такого инструмента: SQLAlchemy Core. Синхронность/асинхронность зависит от выбранного драйвера базы данных.
  3. ORM – инструмент, который дает возможность представлять табличные сущности в виде обычных классов и обращаться с ними как с простыми классами, практически не применяя SQL. Чем абстрактнее инструмент, тем сложнее его реализация, создатель SQLAlchemy Майк Байер подробно описал проблемы создания асинхронных ORM в своем блоге. Примеры асинхронных ORM для Python: peewee-async, GINO, Tortoise ORM.
Читайте также:  Где нужны питон разработчики

Итого, нам нужно определиться, на каком слое становиться, и какие реализации какого слоя использовать.

Варианты решения

Базовый вариант с привлечением в проект минимума сторонних сущностей: берем драйвер для выбранной СУБД (если он существует, конечно) и пишем/генерируем сырой SQL-код. Скорее всего придется писать свою систему очистки данных (защита от SQL-инъекций, экранирование спецсимволов и т.д.). Подход хорош тем, что разработчик полностью контролирует, как его код взаимодействует с СУБД: от точных текстов SQL-запросов до понимания, когда какое действие с какой таблицей будет произведено.

cur = yield from conn.cursor() yield from cur.execute("SELECT host,user FROM user") r = yield from cur.fetchall()

Когда нам надоест писать сырой SQL, мы обратимся, например, к SQLAlchemy Core и опишем нашу таблицу на Python:

user = Table('user', metadata, Column('user', String(256), nullable=False), Column(host, String(256)), )
s = select([(user.c.user + ": " + user.c.host). label('identity')]).\ where( or_( user.c.user.like('%admin%'), user.c.host.like('%.com') ) )
r = yield from conn.execute(s).fetchall()

Peewee-async – библиотека, позволяющая использовать замечательную понятную, легкую и мощную ORM peewee с asyncio. На данный момент peewee-async работает с Python версии 3.4 и выше, поддерживает PostgreSQL через aiopg и MySQL через aiomysql, поддерживает все базовые операции и транзакции. В процессе использования peewee-async я столкнулась с вполне предсказуемой проблемой ленивой подгрузки связанных сущностей. При обращении к полям неявно выгруженной сущности peewee-async может намертво повесить event loop, причем происходит это не каждый раз, поэтому не всегда удается заметить такое поведение во время разработки. Во избежание данной проблемы приходится делать “лишний” явный запрос для связанных сущностей, если ожидается работа с их полями.

Модели описываются следующим образом:

class User(peewee.Model): user = peewee.CharField() host = peewee.CharField() class Meta: database = database
users = await objects.execute(User.select())

Нежелание разбираться с вышеописанными вариантами может мотивировать разработчика еще раз подумать о целесообразности использования реляционной модели данных в разрабатываемой системе. Может быть, предметная область полна вложенных сущностей или древовидных структур данных? В таком случае можно облегчить себе жизнь и посмотреть в сторону MongoDB и Motor.

Вывод

Самым гибким и простым вариантом на первый взгляд является использование сырого SQL. Однако, после очередного бага в методах очистки данных или после очередного метода выборки/обновления данных, который вы по вашим ощущениям написали уже без малого сотню раз, можно начать жалеть о сделанном выборе.

Для меня оптимальным вариантом является использование ORM peewee-async. Она генерирует предсказуемый SQL, который всегда можно посмотреть на этапе разработки, и я ни разу не столкнулась с тем, чтобы оверхед от ее использования серьезно повлиял на быстродействие разрабатываемой мной системы. Такой вывод я сделала после использования peewee-async в проектах, в которых было много CRUD-операций с использованием большого количества связанных сущностей, много нетривиальных выборок, и относительно небольшие объемы данных.

Полезные ссылки

Автор: Анна Вейс, Python developer @ Antida software.

Источник

asyncpg Usage

The interaction with the database normally starts with a call to connect() , which establishes a new database session and returns a new Connection instance, which provides methods to run queries and manage transactions.

import asyncio import asyncpg import datetime async def main(): # Establish a connection to an existing database named "test" # as a "postgres" user. conn = await asyncpg.connect('postgresql://postgres@localhost/test') # Execute a statement to create a new table. await conn.execute(''' CREATE TABLE users( id serial PRIMARY KEY, name text, dob date ) ''') # Insert a record into the created table. await conn.execute(''' INSERT INTO users(name, dob) VALUES($1, $2) ''', 'Bob', datetime.date(1984, 3, 1)) # Select a row from the table. row = await conn.fetchrow( 'SELECT * FROM users WHERE name = $1', 'Bob') # *row* now contains # asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1)) # Close the connection. await conn.close() asyncio.get_event_loop().run_until_complete(main()) 

asyncpg uses the native PostgreSQL syntax for query arguments: $n .

Type Conversion

asyncpg automatically converts PostgreSQL types to the corresponding Python types and vice versa. All standard data types are supported out of the box, including arrays, composite types, range types, enumerations and any combination of them. It is possible to supply codecs for non-standard types or override standard codecs. See Custom Type Conversions for more information.

The table below shows the correspondence between PostgreSQL and Python types.

char , name , varchar , text , xml

smallint , integer , bigint

All other types are encoded and decoded as text by default.

Prior to version 0.20.0, asyncpg erroneously treated inet values with prefix as IPvXNetwork instead of IPvXInterface .

Inexact single-precision float values may have a different representation when decoded into a Python float. This is inherent to the implementation of limited-precision floating point types. If you need the decimal representation to match, cast the expression to double or numeric in your query.

Custom Type Conversions

asyncpg allows defining custom type conversion functions both for standard and user-defined types using the Connection.set_type_codec() and Connection.set_builtin_type_codec() methods.

Example: automatic JSON conversion

The example below shows how to configure asyncpg to encode and decode JSON values using the json module.

import asyncio import asyncpg import json async def main(): conn = await asyncpg.connect() try: await conn.set_type_codec( 'json', encoder=json.dumps, decoder=json.loads, schema='pg_catalog' ) data = 'foo': 'bar', 'spam': 1> res = await conn.fetchval('SELECT $1::json', data) finally: await conn.close() asyncio.get_event_loop().run_until_complete(main()) 

Example: automatic conversion of PostGIS types

The example below shows how to configure asyncpg to encode and decode the PostGIS geometry type. It works for any Python object that conforms to the geo interface specification and relies on Shapely, although any library that supports reading and writing the WKB format will work.

import asyncio import asyncpg import shapely.geometry import shapely.wkb from shapely.geometry.base import BaseGeometry async def main(): conn = await asyncpg.connect() try: def encode_geometry(geometry): if not hasattr(geometry, '__geo_interface__'): raise TypeError('  does not conform to ' 'the geo interface'.format(g=geometry)) shape = shapely.geometry.asShape(geometry) return shapely.wkb.dumps(shape) def decode_geometry(wkb): return shapely.wkb.loads(wkb) await conn.set_type_codec( 'geometry', # also works for 'geography' encoder=encode_geometry, decoder=decode_geometry, format='binary', ) data = shapely.geometry.Point(-73.985661, 40.748447) res = await conn.fetchrow( '''SELECT 'Empire State Building' AS name, $1::geometry AS coordinates ''', data) print(res) finally: await conn.close() asyncio.get_event_loop().run_until_complete(main()) 

Example: decoding numeric columns as floats

By default asyncpg decodes numeric columns as Python Decimal instances. The example below shows how to instruct asyncpg to use floats instead.

import asyncio import asyncpg async def main(): conn = await asyncpg.connect() try: await conn.set_type_codec( 'numeric', encoder=str, decoder=float, schema='pg_catalog', format='text' ) res = await conn.fetchval("SELECT $1::numeric", 11.123) print(res, type(res)) finally: await conn.close() asyncio.get_event_loop().run_until_complete(main()) 

Example: decoding hstore values

hstore is an extension data type used for storing key/value pairs. asyncpg includes a codec to decode and encode hstore values as dict objects. Because hstore is not a builtin type, the codec must be registered on a connection using Connection.set_builtin_type_codec() :

import asyncpg import asyncio async def run(): conn = await asyncpg.connect() # Assuming the hstore extension exists in the public schema. await conn.set_builtin_type_codec( 'hstore', codec_name='pg_contrib.hstore') result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore") assert result == 'a': '1', 'b': '2', 'c': None> asyncio.get_event_loop().run_until_complete(run()) 

Transactions

To create transactions, the Connection.transaction() method should be used.

The most common way to use transactions is through an async with statement:

async with connection.transaction(): await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)") 

When not in an explicit transaction block, any changes to the database will be applied immediately. This is also known as auto-commit.

See the Transactions API documentation for more information.

Connection Pools

For server-type type applications, that handle frequent requests and need the database connection for a short period time while handling a request, the use of a connection pool is recommended. asyncpg provides an advanced pool implementation, which eliminates the need to use an external connection pooler such as PgBouncer.

To create a connection pool, use the asyncpg.create_pool() function. The resulting Pool object can then be used to borrow connections from the pool.

Below is an example of how asyncpg can be used to implement a simple Web service that computes the requested power of two.

import asyncio import asyncpg from aiohttp import web async def handle(request): """Handle incoming requests.""" pool = request.app['pool'] power = int(request.match_info.get('power', 10)) # Take a connection from the pool. async with pool.acquire() as connection: # Open a transaction. async with connection.transaction(): # Run the query passing the request argument. result = await connection.fetchval('select 2 ^ $1', power) return web.Response( text="2 ^ <> is <>".format(power, result)) async def init_app(): """Initialize the application server.""" app = web.Application() # Create a database connection pool app['pool'] = await asyncpg.create_pool(database='postgres', user='postgres') # Configure service routes app.router.add_route('GET', '/', handle) app.router.add_route('GET', '/', handle) return app loop = asyncio.get_event_loop() app = loop.run_until_complete(init_app()) web.run_app(app) 

See Connection Pools API documentation for more information.

© Copyright 2016-present, the asyncpg authors and contributors.

Источник

Оцените статью