Tutorial: Using Cerbos with SQLAlchemy
If you maintain an application that handles any state at all, it’s likely that you’ve had to figure out both how to store that state and how to load it into the application layer and act on it in whatever way your business logic requires.
Perhaps, in your case, a lot of the computational "heavy lifting" is done by the database, and the application is just an abstraction layer where you write your database queries. Or maybe on the contrary, the database is just a basic store which provides the data for the application to manage all of the tricky logic itself.
Regardless, there are many ways to build an application (as the common idiom doesn’t go). Application design is a vast and complex process, but one thing we can do to make that process more manageable is to use tools that take a lot of the implementation complexity away…
Enter SQLAlchemy
SQLAlchemy has established itself as one of the standard database abstraction layers in the Python world. It offers two distinct ways of communicating with the DB: via its lower-level Core SQL abstraction toolkit, or via its ORM component, which extends Core to offer some convenient, higher-level abstractions.
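To make the distinction concrete, here is a minimal, self-contained sketch (the User model and values below are illustrative stand-ins, not the demo’s real models) showing a similar query issued through Core and through the ORM:
from sqlalchemy import Column, String, create_engine, select, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "user"
    id = Column(String, primary_key=True)
    username = Column(String(255))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

# Core: execute SQL expressions (or textual SQL) directly against a connection
with engine.connect() as conn:
    rows = conn.execute(text("SELECT id, username FROM user")).all()

# ORM: work with mapped classes and Session objects
with Session(engine) as session:
    users = session.scalars(select(User).where(User.username == "gandalf")).all()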
What we’re building
In this run-through, we’ll be building an application that manages a "Contact directory", enabling users to keep track of their contacts, along with useful details such as employment information (current company, etc.).
We’re going to explore how to model our data, map it to Cerbos entities, and interact with it in a clean, efficient and reusable way.
We’ll be building a Python FastAPI server and securing it using the following Cerbos APIs:
- CheckResources: e.g. can User X from the Sales department access Contact Y?
- PlanResources: e.g. which contacts can User X from the Marketing department access?
The full source code for this demo can be found in our repo here.
Prerequisites
- Python 3.10
- SQLAlchemy 1.4 / 2.0
- Docker running locally
The database
Setting up our models
We have the following entities within our application:
- User: the person interacting with the application
- Contact: a person within a User’s directory (a User can have many Contacts)
- Company: the company that a Contact is currently employed with (a Company can have many Contacts)
In order to persist and manage these models, we need to be able to represent them in code in a way that can be mapped to our database layer. This is where SQLAlchemy comes in.
SQLAlchemy allows us to represent our relational database tables as classes, with attributes representing the columns of those tables. An object instance of one of these classes will represent a single row in the table. An example is shown below:
from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
    __tablename__ = "user"
    id = Column(String, primary_key=True)
    username = Column(String(255))
    email = Column(String(255))
    # ...
It also allows us to go a step further, and model relationships between these tables (via variations of table joins). In our case, we want to be able to model the one-to-many relationships mentioned above:
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy.orm import relationship

class User(Base):
    __tablename__ = "user"
    # ...
    contacts = relationship("Contact", back_populates="owner")

class Contact(Base):
    __tablename__ = "contact"
    id = Column(String, primary_key=True)
    # ...
    owner_id = Column(String, ForeignKey("user.id"))
    owner = relationship("User", back_populates="contacts", lazy="joined")
You can see how we relate the two tables via the relationship function. Setting a relationship field on each linked class establishes a bidirectional relationship between the objects (with the "reverse" side being a many-to-one). In this particular case, the ForeignKey placed on the child table infers the many-to-one side, and as such allows child table objects to reference the parent via child.owner/child.owner_id. The lazy="joined" parameter tells SQLAlchemy to eagerly load the related User via a JOIN in the same query, rather than lazily issuing a separate SELECT when the attribute is first accessed (the default, lazy="select").
The full table definitions can be found in this module.
You can see how SQLAlchemy ORM entity objects can then be used to reference one another in code:
from sqlalchemy import select

# Session is a SQLAlchemy sessionmaker instance
with Session() as s:
    user = s.scalars(select(User).where(User.username == "gandalf")).first()
    user.email  # "greybeard99@midearth.com"

    # Note: in order to reference contacts with a `lazy` loading pattern, the
    # attribute lookup needs to occur in the context of a session - hence it's
    # in the Session() context manager scope.
    contact = user.contacts[0]
    contact.owner_id == user.id  # True
Check out the excellent SQLAlchemy documentation for more information on relationships.
Connecting to our database
SQLAlchemy is a wonderful abstraction layer between Python and a whole array of different relational databases. By specifying the "dialect" when connecting to a DB engine, we tell it which relational database it is connecting to.
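For example, switching databases is largely a matter of changing the connection URL’s dialect (and driver) portion. The URLs below are illustrative only and assume the relevant driver packages are installed:
from sqlalchemy import create_engine

# SQLite, persisted to a local file
engine = create_engine("sqlite:///contacts.db")

# PostgreSQL via the psycopg2 driver (placeholder credentials)
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/contacts")

# MySQL via the PyMySQL driver (placeholder credentials)
engine = create_engine("mysql+pymysql://user:password@localhost:3306/contacts")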
For our demo, we’ll be setting up a simple, ephemeral SQLite instance. We won’t even persist it to disk; each time the application is started, it’ll build the DB in memory and populate it with a migration script.
We create the engine like so:
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
engine = create_engine(
    "sqlite://",  # the absence of a specified URL infers a `:memory:` database (i.e. no disk persistence)
    # in FastAPI, when using sync (def) functions, more than one thread could interact with the
    # database for the same request, so we need to let SQLite know that it should allow that
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,  # use a static pool to persist state with an in-memory SQLite instance
)
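The Session factory used in the earlier snippets (and throughout the rest of this tutorial) can then be derived from this engine. The demo repo wires this up in its own module; a minimal sketch with sessionmaker would look like this:
from sqlalchemy.orm import sessionmaker

# Session is a factory: each Session() call yields a new session bound to our engine
Session = sessionmaker(bind=engine)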
Tables and metadata
To start using the SQLAlchemy Expression Language, we will want to have Table objects constructed that represent all of the database tables we are interested in working with. Each Table may be declared, meaning we explicitly spell out in source code what the table looks like, or may be reflected, which means we generate the object based on what’s already present in a particular database. Whichever approach we take, we start out with a collection known as the MetaData object, which is where we place our tables. This object is essentially a facade around a Python dictionary that stores a series of Table objects keyed to their string name.
Our classes above inherit from a base class generated by a call to declarative_base(). This "declarative" method allows us to declare user-defined classes and Table metadata at once. Each time a class inherits from this Base class, it is added to this collection, or registry. The following call will generate the database tables from the metadata:
Base.metadata.create_all(engine)
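As an aside, the "reflected" approach mentioned above works in the opposite direction, building Table metadata by inspecting an existing database. We don’t use it in this demo, but a minimal sketch (against the engine defined earlier) looks like this:
from sqlalchemy import MetaData, Table

metadata = MetaData()
# Build a Table object from the live database schema, rather than declaring it in code
user_table = Table("user", metadata, autoload_with=engine)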
Populating the database
We can then generate a session from our engine instance, and use it to populate our newly generated tables:
with Session() as s:
    coca_cola = Company(name="Coca Cola")
    s.add(coca_cola)
    s.commit()

    john = User(
        name="John",
        username="john",
        email="john@cerbos.demo",
        role="user",
        department="Sales",
    )
    s.add(john)
    s.commit()

    s.add(Contact(
        first_name="Nick",
        last_name="Smyth",
        marketing_opt_in=True,
        is_active=True,
        owner=john,
        company=coca_cola,
    ))
    s.commit()
You can see in the example above (in the Contact definition) how we can define relationships by referencing instances of the table classes.
Again, the full source code for this section can be found here.
The API
We now have a database which can be declared and populated on demand, and models which allow us to interact with it. The next step is to build an API layer to expose the data, and to secure the endpoints and resources with Cerbos.
We’ll be creating our server with FastAPI. The source code for this section can be found here.
Dependency injection with FastAPI dependables
FastAPI allows you to define callables called "dependables", which are functions that take all of the same arguments as a "path operation function" and return whatever we might require for the handler. Declaring a parameter with a default value of Depends(fn) tells FastAPI to call fn and inject its return value into the handler. We define a few dependables which we can use across our endpoints.
Firstly, one to retrieve the Cerbos Principal instance from the username (which in itself is retrieved via the FastAPI-provided HTTPBasic dependable):
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
security = HTTPBasic()
def get_principal(credentials: HTTPBasicCredentials = Depends(security)) -> Principal:
    username = credentials.username
    with Session() as s:
        # retrieve `user` from the DB to access the attributes
        user = s.scalars(select(User).where(User.username == username)).first()
        if user is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )
        return Principal(user.id, roles={user.role}, attr={"department": user.department})
This can then be used on all endpoints to authenticate the user, and then assert that the user exists in the database:
@app.get("/contacts")
def get_contacts(p: Principal = Depends(get_principal)):
# do something with the principal "p"
We then create a dependable which attempts to retrieve the Contact from the database, based on the contact_id path parameter in the URL:
def get_db_contact(contact_id: str) -> Contact:
    with Session() as s:
        contact = s.scalars(select(Contact).where(Contact.id == contact_id)).first()
        if contact is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Contact not found",
            )
        return contact
This in turn can then be nested in another dependable which attempts to return the Contact as a Cerbos Resource instance:
def get_resource_from_contact(
    db_contact: Contact = Depends(get_db_contact),
) -> Resource:
    return Resource(
        id=db_contact.id,
        kind="contact",
        attr=jsonable_encoder(
            {n.name: getattr(db_contact, n.name) for n in Contact.__table__.c}
        ),
    )
These can then be used to retrieve a Cerbos Resource or a SQLAlchemy Contact instance, respectively, on routes which include the contact_id path parameter:
@app.delete("/contacts/{contact_id}")
def delete_contact(
r: Resource = Depends(get_resource_from_contact),
p: Principal = Depends(get_principal),
):
# do something with the resource
@app.get("/contacts/{contact_id}")
def get_contact(
db_contact: Contact = Depends(get_db_contact),
p: Principal = Depends(get_principal)
):
# optionally, call the dependable direct to retrieve the resource from the db Contact instance
resource = get_resource_from_contact(db_contact)
Defining our API schema
Some of our routes will require specific parameters in the payload in order to carry out the given request. For example, we might need routes for creating new Contacts or updating existing ones.
FastAPI provides a nice interface to enforce request schema via "Pydantic" models:
from pydantic import BaseModel

class ContactSchema(BaseModel):
    first_name: str
    last_name: str
    owner_id: str
    company_id: str
    is_active: bool = False
    marketing_opt_in: bool = False

    class Config:
        # tell the Pydantic model to read the data even if it is not a dict, but an ORM model
        # (or any other arbitrary object with attributes)
        orm_mode = True
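The orm_mode flag is what lets a schema instance be populated straight from a SQLAlchemy object rather than a dict. A quick sketch, assuming a db_contact ORM instance like the ones created earlier:
# Read attributes directly off the ORM object (requires orm_mode = True)
contact_out = ContactSchema.from_orm(db_contact)
contact_out.dict()  # {"first_name": "Nick", "last_name": "Smyth", ...}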
Once we’ve defined these schema models, we can use them in the FastAPI routes:
@app.post("/contacts/new")
def create_contact(
contact_schema: ContactSchema, p: Principal = Depends(get_principal)
):
with CerbosClient(host="http://localhost:3592") as c:
if not c.is_allowed(
"create",
p,
Resource(
id="new",
kind="contact",
),
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Unauthorized"
)
db_contact = Contact(**contact_schema.dict())
with Session() as s:
s.add(db_contact)
s.commit()
s.refresh(db_contact)
return {"result": "Created contact", "contact": db_contact}
FastAPI will automatically validate the input payload to ensure required fields are present, and types are correct (as well as other optional checks). We can then use the schema model attributes to generate SQLAlchemy models as you can see above.
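For instance, a payload missing required fields never reaches our handler; FastAPI responds with a 422 and a description of what’s wrong. The equivalent check at the Pydantic level looks roughly like this:
from pydantic import ValidationError

try:
    ContactSchema(first_name="frodo")  # last_name, owner_id and company_id are missing
except ValidationError as exc:
    print(exc)  # lists each missing field; FastAPI would return this as a 422 response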
The schema for our demo can be found in this module.
Protecting the routes
Now we have our dependables and API schema defined, we can start to define the routes and secure them using Cerbos.
We can make granular checks against specific principal:resource:action mappings using Cerbos' CheckResources API (via the is_allowed method):
@app.get("/contacts/{contact_id}")
def get_contact(
db_contact: Contact = Depends(get_db_contact), p: Principal = Depends(get_principal)
):
r = get_resource_from_contact(db_contact)
with CerbosClient(host="http://localhost:3592") as c:
if not c.is_allowed("read", p, r):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Unauthorized"
)
return db_contact
However, sometimes we want to establish which resources a Principal has access to. To do this, we can use the PlanResources API.
The query planner
If we provide Cerbos with a Principal and a description of the resource they’re trying to access (a ResourceDesc), we can ask it for a query plan. The PlanResources call returns one of the following:
- KIND_ALWAYS_ALLOWED
- KIND_ALWAYS_DENIED
- KIND_CONDITIONAL
In the final case, it’ll also return an abstract syntax tree (AST) of the condition that must be satisfied to allow the action:
@app.get("/contacts")
def get_contacts(p: Principal = Depends(get_principal)):
with CerbosClient(host="http://localhost:3592") as c:
rd = ResourceDesc("contact")
# Get the query plan for "read" action
plan = c.plan_resources("read", p, rd)
print(json.dumps(plan.to_dict(), sort_keys=False, indent=4))
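To give a feel for the response, a conditional plan for a Sales user might print something roughly like the following (trimmed and purely illustrative - the exact fields and conditions depend on your policies and SDK version):
{
    "action": "read",
    "resourceKind": "contact",
    "filter": {
        "kind": "KIND_CONDITIONAL",
        "condition": {
            "expression": {
                "operator": "eq",
                "operands": [
                    {"variable": "request.resource.attr.owner_id"},
                    {"value": "1"}
                ]
            }
        }
    }
}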
Cerbos provides a SQLAlchemy adapter library with an API that takes the query plan response, and uses it to generate a SQLAlchemy query object. Continuing below:
query = get_query(
    plan,
    Contact,
    {
        "request.resource.attr.owner_id": User.id,
        "request.resource.attr.department": User.department,
        "request.resource.attr.is_active": Contact.is_active,
        "request.resource.attr.marketing_opt_in": Contact.marketing_opt_in,
    },
    [(User, Contact.owner_id == User.id)],
)

# Optionally reduce the returned columns (`with_only_columns` returns a new `select`)
# NOTE: this is wise to do as standard, to avoid implicit joins generated by sqla `relationship()` usage, if present
query = query.with_only_columns(
    Contact.id,
    Contact.first_name,
    Contact.last_name,
    Contact.is_active,
    Contact.marketing_opt_in,
)

print(query.compile(compile_kwargs={"literal_binds": True}))
The provided get_query function accepts the following parameters, respectively:
- the query plan
- a primary SQLAlchemy Table or ORM DeclarativeMeta type (the FROM part of the resulting query)
- the "attribute map": responsible for mapping the Cerbos resource attribute strings to the associated SQLAlchemy columns (of type Column or ORM InstrumentedAttribute)
- OPTIONAL: a list of explicit table joins, required only if more than one table is specified across the primary table and the attribute map
It returns a SQLAlchemy Selectable, which can be further extended/reduced, and then used to query the database:
# ...
with Session() as s:
    rows = s.execute(query).fetchall()

return rows
Run the server
Now we understand how everything works, let’s fire up the server and the Cerbos PDP, and test it out.
Clone the repo:
git clone git@github.com:cerbos/python-sqlalchemy-cerbos.git
cd python-sqlalchemy-cerbos
Start up the Cerbos PDP instance docker container:
cd cerbos
./start.sh
Install Python dependencies:
# from project root
pdm install
Start the FastAPI dev server:
pdm run demo
Example requests
Get a single contact
Sales user, contact owned ⇒ 200 OK
curl -i http://john@localhost:8000/contacts/1
Sales user, contact not owned or active ⇒ 403 Forbidden
curl -i http://john@localhost:8000/contacts/4
Create a contact
Sales user ⇒ 200 OK
curl -i http://john@localhost:8000/contacts/new \
-H 'Content-Type: application/json' \
-X POST \
-d '{"first_name": "frodo", "last_name": "baggins", "owner_id": "2", "company_id": "2"}'
Marketing user (e.g. geri) ⇒ 403 Forbidden
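For example, assuming the demo data seeds a Marketing user with the username geri:
curl -i http://geri@localhost:8000/contacts/new \
    -H 'Content-Type: application/json' \
    -X POST \
    -d '{"first_name": "frodo", "last_name": "baggins", "owner_id": "2", "company_id": "2"}'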
Delete a contact
Contact owner ⇒ 200 OK
curl -i http://john@localhost:8000/contacts/1 -X DELETE
Non-owner ⇒ 403 Forbidden
curl -i http://john@localhost:8000/contacts/3 -X DELETE
If you have any questions or feedback, or would like to chat with us and other like-minded technologists, please join our Slack community!