PolarSPARC

Introduction to SQLAlchemy :: Part - 1


Bhaskar S 04/04/2020


Overview

SQLAlchemy (pronounced as Sequel Alchemy) is a popular SQL database abstraction layer for Python.

SQLAlchemy consists of two layers - the Core and the Object Relational Mapping (ORM).

The following diagram illustrates a very high level view of SQLAlchemy:

SQLAlchemy Layers
SQLAlchemy Layers

In this article, we will focus on the Core layer for interacting with the SQL database.

Installation and Setup

The installation is on a Ubuntu 18.04 LTS based Linux desktop.

Ensure Docker is installed on the system. Else, follow the instructions provided in the article Introduction to Docker to complete the installation.

For our demonstrations, we will be leveraging PostgreSQL as our SQL database. Check the latest stable version for Postgres docker image. Version 12.2 was the latest at the time of this article.

To download the latest docker image for Postgres, execute the following command:

$ docker pull postgres:12.2

The following would be a typical output:

Output.1

12.2: Pulling from library/postgres
68ced04f60ab: Pull complete 
59f4081d08e6: Pull complete 
74fc17f00df0: Pull complete 
8e5e30d57895: Pull complete 
a1fd179b16c6: Pull complete 
7496d9eb4150: Pull complete 
0328931819fd: Pull complete 
8acde85a664a: Pull complete 
38e831e7d2d3: Pull complete 
582b4ba3b134: Pull complete 
cbf69ccc1db5: Pull complete 
1e1f3255b2e0: Pull complete 
c1c0cedd64ec: Pull complete 
6adde56874ed: Pull complete 
Digest: sha256:110d3325db02daa6e1541fdd37725fcbecb7d51411229d922562f208c51d35cc
Status: Downloaded newer image for postgres:12.2
docker.io/library/postgres:12.2

We need to specifiy a directory on the host that will be mounted as a data volume for the Postgres database.

To create a data directory on the host, execute the following command:

$ mkdir -p $HOME/Downloads/Docker/postgres

Now, we will need to initialze and start the Postgres database.

To initialze and start the Postgres database, execute the following command:

$ docker run -d --rm --name postgres-12.2 -e POSTGRES_USER=polarsparc -e POSTGRES_PASSWORD=polarsparc\$123 -p 5432:5432 -v $HOME/Downloads/Docker/postgres:/var/lib/postgresql/data postgres:12.2

The following would be a typical output:

Output.2

58916c2912fe4bff8ec2f727f0457011f9f8dcfd9f11f274503e7a839ce916d8

To check the Postgres database log, execute the following command:

$ docker logs postgres-12.2

The following would be a typical output:

Output.3

...SNIP...
2020-04-04 14:21:24.245 UTC [1] LOG:  starting PostgreSQL 12.2 (Debian 12.2-2.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2020-04-04 14:21:24.245 UTC [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
2020-04-04 14:21:24.245 UTC [1] LOG:  listening on IPv6 address "::", port 5432
2020-04-04 14:21:24.257 UTC [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2020-04-04 14:21:24.293 UTC [1] LOG:  database system is ready to accept connections

Finally, we will need to create the database called my_test_db. For that we need to enter the docker shell by executing the following command:

$ docker exec -it postgres-12.2 sh

The shell prompt will change to #

We need to enter the psql shell by executing the following command:

# psql -U polarsparc

The following would be a typical output:

Output.4

psql (12.2 (Debian 12.2-2.pgdg100+1))
Type "help" for help.

The shell prompt will change to polarsparc=#

Execute the following command:

polarsparc=# CREATE DATABASE my_test_db;

Next, execute the following command:

polarsparc=# GRANT ALL PRIVILEGES ON DATABASE my_test_db TO polarsparc;

To exit psql, execute the following command:

polarsparc=# \q

Finally, to exit docker shell, execute the following command:

# exit

Now, we need to install the sqlalchemy module for Python by executing the following command:

$ python -m pip install sqlalchemy

The following would be a typical output:

Output.5

Collecting sqlalchemy
  Cache entry deserialization failed, entry ignored
  Cache entry deserialization failed, entry ignored
  Downloading https://files.pythonhosted.org/packages/8c/30/4134e726dd5ed13728ff814fa91fc01c447ad8700504653fe99d91fdd34b/SQLAlchemy-1.3.15.tar.gz (6.1MB)
    100% |********************| 6.1MB 228kB/s 
Installing collected packages: sqlalchemy
  Running setup.py install for sqlalchemy ... done
Successfully installed sqlalchemy-1.3.15

Next, we need to install the psycopg2-binary module for Python by executing the following command:

$ python -m pip install psycopg2-binary

The following would be a typical output:

Output.6

Collecting psycopg2-binary
  Cache entry deserialization failed, entry ignored
  Downloading https://files.pythonhosted.org/packages/97/2a/b854019bcb9b925cd10ff245dbc9448a82fe7fdb40127e5cf1733ad0765c/psycopg2_binary-2.8.4-cp27-cp27mu-manylinux1_x86_64.whl (2.9MB)
    100% |********************| 2.9MB 400kB/s 
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.8.4

This completes the necessary setup. Time to get hands-on !!!

Hands-on with SQLAlchemy Core

To get started, one needs an instance of Engine, which under-the-hood uses a database connection pool and a dialect to interact with the specific database (Postgres in this case).

The following diagram illustrates the component view of the Engine:

Engine Components
Engine Components

The method create_db_engine in the following Python program (ex_sa_00.py) creates and returns an instance of Engine:

ex_sa_00.py
from sqlalchemy.engine import Engine
from sqlalchemy.engine.url import URL
from sqlalchemy.engine import create_engine

import logging

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)


def create_db_engine() -> Engine:
    postgres_db = {'drivername': 'postgres',
                   'username': 'polarsparc',
                   'password': 'polarsparc$123',
                   'host': 'localhost',
                   'port': 5432,
                   'database': 'my_test_db'}
    db_url = URL(**postgres_db)

    logging.info("Postgres database url: %s" % db_url)

    db_engine = create_engine(db_url)

    with db_engine.connect() as db_conn:
        logging.info("Connected to the Postgres database !!!")

    return db_engine

The following are brief descriptions for some of the Python classes and methods:

We will now demonstrate how to create a simple database table called securities and insert 3 records into the table.

The following diagram illustrates the securities database table:

Securities Table
Securities Table

The method create_securities_table in the following Python program (ex_sa_01.py) creates the database table securities and adds an index on the column symbol. The method insert_securities_recs inserts 3 records into the securities table:

ex_sa_01.py
from sqlalchemy.engine import Engine
from SQLAlchemy.ex_sa_00 import create_db_engine

import logging

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)


def create_securities_table(engine: Engine) -> bool:
    status = False

    if not engine.dialect.has_table(engine, 'securities'):
        engine.execute('CREATE TABLE securities ('
                       'id serial PRIMARY KEY,'
                       'symbol varchar(10) UNIQUE NOT NULL,'
                       'price NUMERIC(5, 2))')

        logging.info("Created the securities table !!!")

        engine.execute('CREATE INDEX idx_securities_symbol '
                       'ON securities(symbol)')

        logging.info("Created the idx_securities_symbol index !!!")

        status = True
    else:
        logging.info("The securities table already exists !!!")

    return status


def insert_securities_recs(engine: Engine):
    if engine.dialect.has_table(engine, 'securities'):
        with engine.connect() as db_conn:
            # Record - 1
            resp = db_conn.execute('INSERT INTO securities '
                                   '(symbol, price) '
                                   'VALUES (\'BULL.ST\', 25.75)')
            if resp.rowcount == 1:
                logging.info("Inserted record for BULL.ST")
            else:
                logging.info("Failed to insert record for BULL.ST")

            # Record - 2
            resp = db_conn.execute('INSERT INTO securities '
                                   '(symbol, price) '
                                   'VALUES (\'DOG.ST\', 54.15)')
            if resp.rowcount == 1:
                logging.info("Inserted record for DOG.ST")
            else:
                logging.info("Failed to insert record for DOG.ST")

            # Record - 3
            resp = db_conn.execute('INSERT INTO securities '
                                   '(symbol, price) '
                                   'VALUES (\'BARK.ST\', 144.90)')
            if resp.rowcount == 1:
                logging.info("Inserted record for BARK.ST")
            else:
                logging.info("Failed to insert record for BARK.ST")
    else:
        logging.info("The securities table *DOES NOT* exists !!!")


if __name__ == "__main__":
    db_engine = create_db_engine()
    if create_securities_table(db_engine):
        insert_securities_recs(db_engine)

The following are brief descriptions for some of the Python classes and methods:

To run the Python program ex_sa_01.py, execute the following command:

$ python ex_sa_01.py

The following would be a typical output:

Output.7

2020-04-04 14:33:57,051 - Postgres database url: postgres://polarsparc:polarsparc$123@localhost:5432/my_test_db
2020-04-04 14:33:57,100 - Connected to the Postgres database !!!
2020-04-04 14:33:57,123 - Created the securities table !!!
2020-04-04 14:33:57,133 - Created the idx_securities_symbol index !!!
2020-04-04 14:33:57,137 - Inserted record for BULL.ST
2020-04-04 14:33:57,138 - Inserted record for DOG.ST
2020-04-04 14:33:57,139 - Inserted record for BARK.ST

The following Python program (ex_sa_02.py) demonstrates the CRUD (Create, Read, Update, Delete) operations on the securities database table.

The method create_dummy_security creates a DUMMY record, the method query_dummy_security reads the DUMMY record, the method update_dummy_security updates the DUMMY record, and finally the method delete_dummy_security deletes the DUMMY record.

ex_sa_02.py
from sqlalchemy.engine import Engine
from SQLAlchemy.ex_sa_00 import create_db_engine

import logging

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)


def create_dummy_security(engine: Engine):
    if engine.dialect.has_table(engine, 'securities'):
        with engine.connect() as db_conn:
            resp = db_conn.execute('INSERT INTO securities '
                                   '(symbol, price) '
                                   'VALUES (\'DUMMY\', 1.00)')
            if resp.rowcount == 1:
                logging.info("Inserted record for DUMMY")
            else:
                logging.info("Failed to insert record for DUMMY")
    else:
        logging.info("The securities table *DOES NOT* exists !!!")


def query_dummy_security(engine: Engine):
    if engine.dialect.has_table(engine, 'securities'):
        with engine.connect() as db_conn:
            resp = db_conn.execute('SELECT symbol, price '
                                   'FROM securities '
                                   'WHERE symbol = \'DUMMY\'')
            if resp.rowcount == 1:
                logging.info("Selected record for DUMMY")

                row = resp.fetchone()

                logging.info('Symbol: %s, Price: %d' % (row['symbol'], row[1]))
            else:
                logging.info("Record for DUMMY *DOES NOT* exists !!!")
    else:
        logging.info("The securities table *DOES NOT* exists !!!")


def update_dummy_security(engine: Engine):
    if engine.dialect.has_table(engine, 'securities'):
        with engine.connect() as db_conn:
            resp = db_conn.execute('UPDATE securities '
                                   'SET price = 2.00 '
                                   'WHERE symbol = \'DUMMY\'')
            if resp.rowcount == 1:
                logging.info("Updated record for DUMMY")
            else:
                logging.info("Record for DUMMY *DOES NOT* exists !!!")
    else:
        logging.info("The securities table *DOES NOT* exists !!!")


def delete_dummy_security(engine: Engine):
    if engine.dialect.has_table(engine, 'securities'):
        with engine.connect() as db_conn:
            resp = db_conn.execute('DELETE FROM securities '
                                   'WHERE symbol = \'DUMMY\'')
            if resp.rowcount == 1:
                logging.info("Deleted record for DUMMY")
            else:
                logging.info("Record for DUMMY *DOES NOT* exists !!!")
    else:
        logging.info("The securities table *DOES NOT* exists !!!")


if __name__ == "__main__":
    db_engine = create_db_engine()
    create_dummy_security(db_engine)
    query_dummy_security(db_engine)
    update_dummy_security(db_engine)
    query_dummy_security(db_engine)
    delete_dummy_security(db_engine)
    query_dummy_security(db_engine)

To run the Python program ex_sa_02.py, execute the following command:

$ python ex_sa_02.py

The following would be a typical output:

Output.8

2020-04-04 15:09:59,489 - Postgres database url: postgres://polarsparc:polarsparc$123@localhost:5432/my_test_db
2020-04-04 15:09:59,538 - Connected to the Postgres database !!!
2020-04-04 15:09:59,543 - Inserted record for DUMMY
2020-04-04 15:09:59,545 - Selected record for DUMMY
2020-04-04 15:09:59,545 - Symbol: DUMMY, Price: 1
2020-04-04 15:09:59,549 - Updated record for DUMMY
2020-04-04 15:09:59,551 - Selected record for DUMMY
2020-04-04 15:09:59,551 - Symbol: DUMMY, Price: 2
2020-04-04 15:09:59,554 - Deleted record for DUMMY
2020-04-04 15:09:59,555 - Record for DUMMY *DOES NOT* exists !!!

Shifting gears, we will now demonstrate another way to create a simple database table called customer and insert 3 records into the table.

The following diagram illustrates the customer database table:

Customer Table
Customer Table

The method create_customer_table in the following Python program (ex_sa_03.py) creates the database table customer and adds an index on the column last_name. The method insert_customer_recs inserts 3 records into the customer table:

ex_sa_03.py
from sqlalchemy.engine import Engine
from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String
from SQLAlchemy.ex_sa_00 import create_db_engine

import logging

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)


def create_customer_table(engine: Engine) -> bool:
    status = False

    if not engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData()
        customer_table = Table(
            'customer',
            metadata,
            Column('id', Integer, autoincrement=True, primary_key=True),
            Column('first_name', String(25), nullable=False),
            Column('last_name', String(25), nullable=False, index=True),
            Column('email', String(50)),
            Column('mobile', String(10))
        )
        customer_table.create(engine)

        logging.info("Created the customer table !!!")

        status = True
    else:
        logging.info("The customer table already exists !!!")

    return status


def insert_customer_recs(engine: Engine):
    if engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData(bind=engine, reflect=True)
        customer_table = metadata.tables['customer']

        with engine.connect() as db_conn:
            # Record - 1
            rec_1 = customer_table.insert().values(
                first_name='Alice',
                last_name='Doctor',
                email='alice.d@timbuk2.do'
            )
            resp = db_conn.execute(rec_1)
            if resp.rowcount == 1:
                logging.info("Inserted record for Alice")
            else:
                logging.info("Failed to insert record for Alice")

            # Record - 2
            rec_2 = customer_table.insert().values(
                first_name='Bob',
                last_name='Builder',
                email='bbuilder@nomansland.bu'
            )
            resp = db_conn.execute(rec_2)
            if resp.rowcount == 1:
                logging.info("Inserted record for Bob")
            else:
                logging.info("Failed to insert record for Bob")

            # Record - 3
            rec_3 = customer_table.insert().values(
                first_name='Charlie',
                last_name='Driver',
                email='charlie.driver@vehicles.ve'
            )
            resp = db_conn.execute(rec_3)
            if resp.rowcount == 1:
                logging.info("Inserted record for Charlie")
            else:
                logging.info("Failed to insert record for Charlie")
    else:
        logging.info("The customer table *DOES NOT* exists !!!")


if __name__ == "__main__":
    db_engine = create_db_engine()
    if create_customer_table(db_engine):
        insert_customer_recs(db_engine)

The following are brief descriptions for some of the Python classes and methods:

To run the Python program ex_sa_03.py, execute the following command:

$ python ex_sa_03.py

The following would be a typical output:

Output.9

2020-04-04 20:27:47,367 - Postgres database url: postgres://polarsparc:polarsparc$123@localhost:5432/my_test_db
2020-04-04 20:27:47,418 - Connected to the Postgres database !!!
2020-04-04 20:27:47,447 - Created the customer table !!!
2020-04-04 20:27:47,475 - Inserted record for Alice
2020-04-04 20:27:47,477 - Inserted record for Bob
2020-04-04 20:27:47,478 - Inserted record for Charlie

The following Python program (ex_sa_04.py) demonstrates the CRUD (Create, Read, Update, Delete) operations on the customer database table.

The method create_dummy_customer creates a Dummy record, the method query_dummy_customer reads the Dummy record, the method update_dummy_customer updates the Dummy record, the method delete_dummy_customer deletes the Dummy record, and finally the method query_customer queries various records.

ex_sa_04.py
from sqlalchemy.engine import Engine
from sqlalchemy import MetaData
from SQLAlchemy.ex_sa_00 import create_db_engine

import logging

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)


def create_dummy_customer(engine: Engine):
    if engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData(bind=engine, reflect=True)
        customer_table = metadata.tables['customer']
        with engine.connect() as db_conn:
            dummy = customer_table.insert().values(
                first_name='Dummy',
                last_name='Joker',
                email='dj@nowhere.cc'
            )
            resp = db_conn.execute(dummy)
            if resp.rowcount == 1:
                logging.info("Inserted record for Dummy")
            else:
                logging.info("Failed to insert record for Dummy")
    else:
        logging.info("The customer table *DOES NOT* exists !!!")


def query_dummy_customer(engine: Engine):
    if engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData(bind=engine, reflect=True)
        customer_table = metadata.tables['customer']
        with engine.connect() as db_conn:
            # Select all columns
            dummy = customer_table.select() \
                .where(customer_table.columns.last_name == 'Joker')
            resp = db_conn.execute(dummy)
            if resp.rowcount == 1:
                logging.info("Selected record for Dummy")

                row = resp.fetchone()

                logging.info('First name: %s, Last name: %s, Email: %s' % (row['first_name'], row[2], row['email']))
            else:
                logging.info("Record for Dummy *DOES NOT* exists !!!")
    else:
        logging.info("The customer table *DOES NOT* exists !!!")


def update_dummy_customer(engine: Engine):
    if engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData(bind=engine, reflect=True)
        customer_table = metadata.tables['customer']
        with engine.connect() as db_conn:
            dummy = customer_table.update() \
                .where(customer_table.c.last_name == 'Joker') \
                .values(email='djoker@dummy.io')
            resp = db_conn.execute(dummy)
            if resp.rowcount == 1:
                logging.info("Updated record for Dummy")
            else:
                logging.info("Record for Dummy *DOES NOT* exists !!!")
    else:
        logging.info("The customer table *DOES NOT* exists !!!")


def delete_dummy_customer(engine: Engine):
    if engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData(bind=engine, reflect=True)
        customer_table = metadata.tables['customer']
        with engine.connect() as db_conn:
            dummy = customer_table.delete().where(customer_table.c.last_name == 'Joker')
            resp = db_conn.execute(dummy)
            if resp.rowcount == 1:
                logging.info("Deleted record for Dummy")
            else:
                logging.info("Record for Dummy *DOES NOT* exists !!!")
    else:
        logging.info("The customer table *DOES NOT* exists !!!")


def query_customer(engine: Engine):
    if engine.dialect.has_table(engine, 'customer'):
        metadata = MetaData(bind=engine, reflect=True)
        customer_table = metadata.tables['customer']
        with engine.connect() as db_conn:
            # Select all records and all columns
            query = customer_table.select()
            resp = db_conn.execute(query)
            if resp.rowcount > 0:
                for row in resp.fetchall():
                    logging.info('First name: %s, Last name: %s, Email: %s' % (row['first_name'], row[2], row['email']))
                logging.info("-------------------------")
            else:
                logging.info("No record(s) exists !!!")

            # Select all records and only columns last_name and email
            query = customer_table.select().with_only_columns([customer_table.c.last_name, customer_table.c.email])
            resp = db_conn.execute(query)
            if resp.rowcount > 0:
                for row in resp.fetchall():
                    logging.info('Last name: %s, Email: %s' % (row[0], row['email']))
                logging.info("-------------------------")
            else:
                logging.info("No record(s) exists !!!")

            # Select all records and only columns last_name and email order by last_name
            query = customer_table.select().with_only_columns([customer_table.c.last_name, customer_table.c.email]) \
                                  .order_by('last_name')
            resp = db_conn.execute(query)
            if resp.rowcount > 0:
                for row in resp.fetchall():
                    logging.info('Last name: %s, Email: %s' % (row[0], row['email']))
                logging.info("-------------------------")
            else:
                logging.info("No record(s) exists !!!")
    else:
        logging.info("The customer table *DOES NOT* exists !!!")


if __name__ == "__main__":
    db_engine = create_db_engine()
    create_dummy_customer(db_engine)
    query_dummy_customer(db_engine)
    update_dummy_customer(db_engine)
    query_dummy_customer(db_engine)
    delete_dummy_customer(db_engine)
    query_dummy_customer(db_engine)
    query_customer(db_engine)

The following are brief descriptions for some of the Python methods:

To run the Python program ex_sa_04.py, execute the following command:

$ python ex_sa_04.py

The following would be a typical output:

Output.10

2020-04-04 21:48:05,446 - Postgres database url: postgres://bswamina:bswamina$123@localhost:5432/my_test_db
2020-04-04 21:48:05,498 - Connected to the Postgres database !!!
2020-04-04 21:48:05,526 - Inserted record for Dummy
2020-04-04 21:48:05,553 - Selected record for Dummy
2020-04-04 21:48:05,553 - First name: Dummy, Last name: Joker, Email: dj@nowhere.cc
2020-04-04 21:48:05,571 - Updated record for Dummy
2020-04-04 21:48:05,590 - Selected record for Dummy
2020-04-04 21:48:05,590 - First name: Dummy, Last name: Joker, Email: djoker@dummy.io
2020-04-04 21:48:05,612 - Deleted record for Dummy
2020-04-04 21:48:05,636 - Record for Dummy *DOES NOT* exists !!!
2020-04-04 21:48:05,664 - First name: Alice, Last name: Doctor, Email: alice.d@timbuk2.do
2020-04-04 21:48:05,664 - First name: Bob, Last name: Builder, Email: bbuilder@nomansland.bu
2020-04-04 21:48:05,664 - First name: Charlie, Last name: Driver, Email: charlie.driver@vehicles.ve
2020-04-04 21:48:05,664 - -------------------------
2020-04-04 21:48:05,665 - Last name: Doctor, Email: alice.d@timbuk2.do
2020-04-04 21:48:05,665 - Last name: Builder, Email: bbuilder@nomansland.bu
2020-04-04 21:48:05,665 - Last name: Driver, Email: charlie.driver@vehicles.ve
2020-04-04 21:48:05,665 - -------------------------
2020-04-04 21:48:05,666 - Last name: Builder, Email: bbuilder@nomansland.bu
2020-04-04 21:48:05,666 - Last name: Doctor, Email: alice.d@timbuk2.do
2020-04-04 21:48:05,666 - Last name: Driver, Email: charlie.driver@vehicles.ve
2020-04-04 21:48:05,666 - -------------------------

This concludes the exploration of the basic capabilities in the SQLAlchemy Core layer.

References

SQLAlchemy

PySheet SQLAlchemy

Introduction to Docker



© PolarSPARC