How to create an SQL Table and perform UPSERT using SQLAlchemy ORM in 3 minutes
Many engineers often require compact and portable SQL storage to maintain the project’s data
SQLite serves as the perfect solution for this. However, some of programmers find it cumbersome to switch between SQL and Python regularly. This is where a SqlAlchemy ORM can come in handy
In the following steps, I will guide you on how to:
(1) Generate a table via declarative_base()
(2) Convert a pandas.DataFrame
into a list of sqlalchemy.schema.Table
elements
(3) Perform either an UPSERT or an INSERT operation depending on your specific requirements
Create a table
Let’s suppose we need to store different datatypes in our SQL, including JSON and DateTime. Without SQLAlchemy, it may be tricky to store JSON
and DateTime
because of a very limited list of supported data types by SQLite.
Let’s define the table (we will use declarative_base
for that):
from sqlalchemy import Column, Integer, String, create_engine, DateTime, JSON
from sqlalchemy.orm import sessionmaker, declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
uuid = Column(String, primary_key=True)
first_name = Column(String)
email = Column(String)
registration_date = Column(DateTime)
config = Column(JSON)
role = Column(Integer, default=0)
Let’s define an initialization function to create DB. The function will return engine
to re-use it later
def prepare_db(echo=False):
engine = create_engine("sqlite:///database.db", echo=echo)
Base.metadata.create_all(engine)
return engine
prepare_db()
Let’s create a dataset to test that everything works
import time
import pandas as pd
import uuid
import datetime
data = [
[uuid.uuid4(), "Bob", "bob@mail.com", datetime.datetime.now(), {"theme": "dark"}, 3],
[uuid.uuid4(), "Jane", "jain@mail.com", datetime.datetime.now(), {"theme": "light"}, 1],
[uuid.uuid4(), "Chris", "chris@mail.com", datetime.datetime.now(), {"theme": "light"}, 2]
]
columns = ['uuid', 'first_name', 'email', 'registration_date', 'config', 'role']
d = pd.DataFrame(data=data, columns=columns)
You will see something like this
Finally, let’s define a function upsert
that will ignore or update existing values depending onupdate=True
parameter
def upsert(users: dict, engine: sqlalchemy.engine.base.Engine, update=True):
entries_to_update = 0
entries_to_put = []
with sessionmaker(bind=engine)() as sess:
# Find all rows that needs to be updated and merge
for each in (
sess.query(User.uuid)
.filter(User.uuid.in_(users.keys()))
.all()
):
values = users.pop(each.uuid)
entries_to_update += 1
if update:
sess.merge(User(**values))
# Bulk mappings for everything that needs to be inserted
for u in users.values():
entries_to_put.append(u)
sess.bulk_insert_mappings(User, entries_to_put)
sess.commit()
return (
f" inserted:\t{len(entries_to_put)}\n"
f" {'updated' if update else 'not updated'}:\t{str(entries_to_update)}"
)
Nowe you can test it
# Let's update keys of the dic to match 'uuid' value
users = {v['uuid']: v for v in d.to_dict("index").values()}
# and insert data
result = upsert(users, engine=engine)
and see the result
print(result)
>> inserted: 0
>> updated: 3
That is it. Less than 100 lines of code
import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, DateTime, JSON
from sqlalchemy.orm import sessionmaker, declarative_base
import pandas as pd
import uuid
import datetime
Base = declarative_base()
class User(Base):
__tablename__ = "users"
uuid = Column(String, primary_key=True)
first_name = Column(String)
email = Column(String)
registration_date = Column(DateTime)
config = Column(JSON)
role = Column(Integer, default=0)
def prepare_db(echo=False):
engine = create_engine("sqlite:///database.db", echo=echo)
Base.metadata.create_all(engine)
return engine
engine = prepare_db()
data = [
[
str(uuid.uuid4()),
"Bob",
"bob@mail.com",
datetime.datetime.now(),
{"theme": "dark"},
3,
],
[
str(uuid.uuid4()),
"Jane",
"jain@mail.com",
datetime.datetime.now(),
{"theme": "light"},
1,
],
[
str(uuid.uuid4()),
"Chris",
"chris@mail.com",
datetime.datetime.now(),
{"theme": "light"},
2,
],
]
columns = ["uuid", "first_name", "email", "registration_date", "config", "role"]
d = pd.DataFrame(data=data, columns=columns)
def upsert(users: dict, engine: sqlalchemy.engine.base.Engine, update=True):
entries_to_update = 0
entries_to_put = []
with sessionmaker(bind=engine)() as sess:
# Find all rows that needs to be updated and merge
for each in sess.query(User.uuid).filter(User.uuid.in_(users.keys())).all():
values = users.pop(each.uuid)
entries_to_update += 1
if update:
sess.merge(User(**values))
# Bulk mappings for everything that needs to be inserted
for u in users.values():
entries_to_put.append(u)
sess.bulk_insert_mappings(User, entries_to_put)
sess.commit()
return (
f" inserted:\t{len(entries_to_put)}\n" f" {'updated' if update else 'not updated'}:\t{str(entries_to_update)}"
)
# Let's update keys of the dic to match 'uuid' value
users = {v["uuid"]: v for v in d.to_dict("index").values()}
# Finally, let's insert data
result = upsert(users, engine=engine)
print(result)
🖤 If you found this article helpful in any way, please consider giving it a like. Your support motivates me to continue creating content 💛