How to create an SQL Table and perform UPSERT using SQLAlchemy ORM in 3 minutes

Oleg Khomenko
3 min read · Apr 27, 2023
UPSERT operation

Many engineers often require compact and portable SQL storage to maintain the project’s data

SQLite serves as the perfect solution for this. However, some programmers find it cumbersome to switch between SQL and Python regularly. This is where the SQLAlchemy ORM can come in handy

In the following steps, I will guide you on how to:
(1) Generate a table via declarative_base()
(2) Convert a pandas.DataFrame into a list of sqlalchemy.schema.Table elements
(3) Perform either an UPSERT or an INSERT operation depending on your specific requirements

Create a table

Let’s suppose we need to store different data types in our SQL database, including JSON and DateTime. Without SQLAlchemy, it may be tricky to store JSON and DateTime because SQLite supports only a very limited list of data types.

Let’s define the table (we will use declarative_base for that):

import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, DateTime, JSON
from sqlalchemy.orm import sessionmaker, declarative_base

# Shared declarative base: the ORM model below derives from it
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Primary key, stored as text
    uuid = Column(String, primary_key=True)

    # Plain text attributes
    first_name = Column(String)
    email = Column(String)

    # Columns declared with SQLAlchemy's DateTime and JSON types
    registration_date = Column(DateTime)
    config = Column(JSON)

    # Integer role with a column default of 0
    role = Column(Integer, default=0)

Let’s define an initialization function to create the DB. The function returns the engine so that it can be reused later

def prepare_db(echo=False, url="sqlite:///database.db"):
    """Create the engine and create every table declared on ``Base``.

    Args:
        echo: Forwarded to ``create_engine`` as its ``echo`` flag.
        url: Database URL handed to ``create_engine``; defaults to the
            SQLite file ``database.db``.

    Returns:
        The engine, so that callers can reuse it for later sessions.
    """
    engine = create_engine(url, echo=echo)
    Base.metadata.create_all(engine)
    return engine


# Keep the returned engine: the upsert call later in the walkthrough needs it
engine = prepare_db()

Let’s create a dataset to test that everything works

import time
import pandas as pd
import uuid
import datetime

# Three sample users. The uuid is converted to str because the ``uuid``
# column is declared as String and sqlite3 cannot bind uuid.UUID objects.
data = [
    [str(uuid.uuid4()), "Bob", "bob@mail.com", datetime.datetime.now(), {"theme": "dark"}, 3],
    [str(uuid.uuid4()), "Jane", "jain@mail.com", datetime.datetime.now(), {"theme": "light"}, 1],
    [str(uuid.uuid4()), "Chris", "chris@mail.com", datetime.datetime.now(), {"theme": "light"}, 2],
]

# Column names, in the same order as the values of each row above
columns = ['uuid', 'first_name', 'email', 'registration_date', 'config', 'role']

d = pd.DataFrame(data=data, columns=columns)

You will see something like this

Our test data

Finally, let’s define a function upsert that will either ignore or update existing values, depending on the update parameter (update=True by default)

def upsert(users: dict, engine: sqlalchemy.engine.base.Engine, update=True):
    """Insert new users and optionally overwrite the ones already stored.

    Args:
        users: Mapping of uuid -> dict of ``User`` column values. The
            mapping itself is left unmodified.
        engine: Engine the session is bound to.
        update: When True, rows whose uuid is already stored are merged
            with the new values; when False they are only counted.

    Returns:
        A two-line summary string: the number of inserted rows and the
        number of already-existing rows (updated or not updated).
    """
    # Work on a shallow copy so the caller's dict is not emptied as a side effect
    pending = dict(users)
    entries_to_update = 0

    with sessionmaker(bind=engine)() as sess:

        # Find all rows that already exist; merge them when update is requested.
        # in_() is given a plain list of keys rather than a dict view.
        existing = sess.query(User.uuid).filter(User.uuid.in_(list(pending))).all()
        for each in existing:
            values = pending.pop(each.uuid)
            entries_to_update += 1
            if update:
                sess.merge(User(**values))

        # Everything still pending has no stored row yet: bulk-insert it
        entries_to_put = list(pending.values())

        sess.bulk_insert_mappings(User, entries_to_put)
        sess.commit()

    return (
        f" inserted:\t{len(entries_to_put)}\n"
        f" {'updated' if update else 'not updated'}:\t{entries_to_update}"
    )

Now you can test it

# Re-key the row dicts so that each one is stored under its own 'uuid' value
users = {row['uuid']: row for row in d.to_dict("records")}

# and insert data
result = upsert(users, engine=engine)

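and see the result (the output below is what a repeated run with the same rows prints; the very first run reports 3 inserted and 0 updated)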

print(result)
>> inserted: 0
>> updated: 3

That is it. Less than 100 lines of code

import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, DateTime, JSON
from sqlalchemy.orm import sessionmaker, declarative_base
import pandas as pd
import uuid
import datetime

# Shared declarative base: the ORM model below derives from it
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Primary key, stored as text
    uuid = Column(String, primary_key=True)

    # Plain text attributes
    first_name = Column(String)
    email = Column(String)

    # Columns declared with SQLAlchemy's DateTime and JSON types
    registration_date = Column(DateTime)
    config = Column(JSON)

    # Integer role with a column default of 0
    role = Column(Integer, default=0)


def prepare_db(echo=False, url="sqlite:///database.db"):
    """Create the engine and create every table declared on ``Base``.

    Args:
        echo: Forwarded to ``create_engine`` as its ``echo`` flag.
        url: Database URL handed to ``create_engine``; defaults to the
            SQLite file ``database.db``.

    Returns:
        The engine, so that callers can reuse it for later sessions.
    """
    engine = create_engine(url, echo=echo)
    Base.metadata.create_all(engine)
    return engine


# Module-level engine reused by the upsert call at the bottom of the script
engine = prepare_db()

# Sample rows: a fresh string uuid and the current timestamp are generated per
# user; the remaining fields come from the (name, email, theme, role) tuples.
data = [
    [
        str(uuid.uuid4()),
        name,
        email,
        datetime.datetime.now(),
        {"theme": theme},
        role,
    ]
    for name, email, theme, role in (
        ("Bob", "bob@mail.com", "dark", 3),
        ("Jane", "jain@mail.com", "light", 1),
        ("Chris", "chris@mail.com", "light", 2),
    )
]

# Column names, in the same order as the values of each row above
columns = [
    "uuid",
    "first_name",
    "email",
    "registration_date",
    "config",
    "role",
]

d = pd.DataFrame(data, columns=columns)


def upsert(users: dict, engine: sqlalchemy.engine.base.Engine, update=True):
    """Insert new users and optionally overwrite the ones already stored.

    Args:
        users: Mapping of uuid -> dict of ``User`` column values. The
            mapping itself is left unmodified.
        engine: Engine the session is bound to.
        update: When True, rows whose uuid is already stored are merged
            with the new values; when False they are only counted.

    Returns:
        A two-line summary string: the number of inserted rows and the
        number of already-existing rows (updated or not updated).
    """
    # Work on a shallow copy so the caller's dict is not emptied as a side effect
    pending = dict(users)
    entries_to_update = 0

    with sessionmaker(bind=engine)() as sess:

        # Find all rows that already exist; merge them when update is requested.
        # in_() is given a plain list of keys rather than a dict view.
        existing = sess.query(User.uuid).filter(User.uuid.in_(list(pending))).all()
        for each in existing:
            values = pending.pop(each.uuid)
            entries_to_update += 1
            if update:
                sess.merge(User(**values))

        # Everything still pending has no stored row yet: bulk-insert it
        entries_to_put = list(pending.values())

        sess.bulk_insert_mappings(User, entries_to_put)
        sess.commit()

    return (
        f" inserted:\t{len(entries_to_put)}\n"
        f" {'updated' if update else 'not updated'}:\t{entries_to_update}"
    )


# Re-key the row dicts so that each one is stored under its own "uuid" value
users = {row["uuid"]: row for row in d.to_dict("records")}

# Finally, run the upsert against the engine and show its summary
result = upsert(users, engine=engine)
print(result)

🖤 If you found this article helpful in any way, please consider giving it a like. Your support motivates me to continue creating content 💛

--

--