Peewee Cookbook¶
Below are outlined some of the ways to perform typical database-related tasks with peewee.
Examples will use the following models:
import peewee
class Blog(peewee.Model):
creator = peewee.CharField()
name = peewee.CharField()
class Entry(peewee.Model):
blog = peewee.ForeignKeyField(Blog)
title = peewee.CharField()
body = peewee.TextField()
pub_date = peewee.DateTimeField()
published = peewee.BooleanField(default=True)
Database and Connection Recipes¶
Creating a database connection and tables¶
While it is not necessary to explicitly connect to the database before using it, managing connections explicitly is a good practice. This way if the connection fails, the exception can be caught during the “connect” step, rather than some arbitrary time later when a query is executed.
>>> database = SqliteDatabase('stats.db')
>>> database.connect()
It is possible to use multiple databases (provided that you don’t try and mix models from each):
>>> custom_db = peewee.SqliteDatabase('custom.db')
>>> class CustomModel(peewee.Model):
... whatev = peewee.CharField()
...
... class Meta:
... database = custom_db
...
>>> custom_db.connect()
>>> CustomModel.create_table()
Best practice: define a base model class that points at the database object you wish to use, and then all your models will extend it:
custom_db = peewee.SqliteDatabase('custom.db')
class CustomModel(peewee.Model):
class Meta:
database = custom_db
class Blog(CustomModel):
creator = peewee.CharField()
name = peewee.TextField()
class Entry(CustomModel):
# etc, etc
Using with Postgresql¶
Point models at an instance of PostgresqlDatabase
.
psql_db = peewee.PostgresqlDatabase('my_database', user='code')
class PostgresqlModel(peewee.Model):
"""A base model that will use our MySQL database"""
class Meta:
database = psql_db
class Blog(PostgresqlModel):
creator = peewee.CharField()
# etc, etc
Using with MySQL¶
Point models at an instance of MySQLDatabase
.
mysql_db = peewee.MySQLDatabase('my_database', user='code')
class MySQLModel(peewee.Model):
"""A base model that will use our MySQL database"""
class Meta:
database = mysql_db
class Blog(MySQLModel):
creator = peewee.CharField()
# etc, etc
# when you're ready to start querying, remember to connect
mysql_db.connect()
Multi-threaded applications¶
Some database engines may not allow a connection to be shared across threads, notably
sqlite. If you would like peewee to maintain a single connection per-thread,
instantiate your database with threadlocals=True
:
concurrent_db = SqliteDatabase('stats.db', threadlocals=True)
Deferring initialization¶
Sometimes the database information is not known until run-time, when it might
be loaded from a configuration file/etc. In this case, you can “defer” the initialization
of the database by passing in None
as the database_name.
deferred_db = peewee.SqliteDatabase(None)
class SomeModel(peewee.Model):
class Meta:
database = deferred_db
If you try to connect or issue any queries while your database is uninitialized you will get an exception:
>>> deferred_db.connect()
Exception: Error, database not properly initialized before opening connection
To initialize your database, you simply call the init
method with the database_name
and any additional kwargs:
database_name = raw_input('What is the name of the db? ')
deferred_db.init(database_name)
Creating, Reading, Updating and Deleting¶
Creating a new record¶
You can use the Model.create()
method on the model:
>>> Blog.create(creator='Charlie', name='My Blog')
<__main__.Blog object at 0x2529350>
This will INSERT
a new row into the database. The primary key will automatically
be retrieved and stored on the model instance.
Alternatively, you can build up a model instance programmatically and then save it:
>>> blog = Blog()
>>> blog.creator = 'Chuck'
>>> blog.name = 'Another blog'
>>> blog.save()
>>> blog.id
2
Updating existing records¶
Once a model instance has a primary key, any attempt to re-save it will result
in an UPDATE
rather than another INSERT
:
>>> blog.save()
>>> blog.id
2
>>> blog.save()
>>> blog.id
2
If you want to update multiple records, issue an UPDATE
query. The following
example will update all Entry
objects, marking them as “published”, if their
pub_date is less than today’s date.
>>> update_query = Entry.update(published=True).where(pub_date__lt=datetime.today())
>>> update_query.execute()
4 # <--- number of rows updated
For more information, see the documentation on UpdateQuery
.
Deleting a record¶
To delete a single model instance, you can use the Model.delete_instance()
shortcut:
>>> blog = Blog.get(id=1)
>>> blog.delete_instance()
1 # <--- number of rows deleted
>>> Blog.get(id=1)
BlogDoesNotExist: instance matching query does not exist:
SQL: SELECT "id", "creator", "name" FROM "blog" WHERE "id" = ? LIMIT 1
PARAMS: [1]
To delete an arbitrary group of records, you can issue a DELETE
query. The
following will delete all Entry
objects that are a year old.
>>> delete_query = Entry.delete().where(pub_date__lt=one_year_ago)
>>> delete_query.execute()
7 # <--- number of entries deleted
For more information, see the documentation on DeleteQuery
.
Selecting a single record¶
You can use the Model.get()
method to retrieve a single instance matching
the given query (passed in as a mix of Q
objects and keyword arguments).
This method is a shortcut that calls Model.select()
with the given query,
but limits the result set to 1. Additionally, if no model matches the given query,
a DoesNotExist
exception will be raised.
>>> Blog.get(id=1)
<__main__.Blog object at 0x25294d0>
>>> Blog.get(id=1).name
u'My Blog'
>>> Blog.get(creator='Chuck')
<__main__.Blog object at 0x2529410>
>>> Blog.get(id=1000)
BlogDoesNotExist: instance matching query does not exist:
SQL: SELECT "id", "creator", "name" FROM "blog" WHERE "id" = ? LIMIT 1
PARAMS: [1000]
For more information see notes on SelectQuery
and Querying API in general.
Selecting multiple records¶
To simply get all instances in a table, call the Model.select()
method:
>>> for blog in Blog.select():
... print blog.name
...
My Blog
Another blog
When you iterate over a SelectQuery
, it will automatically execute
it and start returning results from the database cursor. Subsequent iterations
of the same query will not hit the database as the results are cached.
Another useful note is that you can retrieve instances related by ForeignKeyField
by iterating. To get all the related instances for an object, you can query the related name.
Looking at the example models, we have Blogs and Entries. Entry has a foreign key to Blog,
meaning that any given blog may have 0..n entries. A blog’s related entries are exposed
using a SelectQuery
, and can be iterated the same as any other SelectQuery:
>>> for entry in blog.entry_set:
... print entry.title
...
entry 1
entry 2
entry 3
entry 4
The entry_set
attribute is just another select query and any methods available
to SelectQuery
are available:
>>> for entry in blog.entry_set.order_by(('pub_date', 'desc')):
... print entry.title
...
entry 4
entry 3
entry 2
entry 1
Filtering records¶
>>> for entry in Entry.select().where(blog=blog, published=True):
... print '%s: %s (%s)' % (entry.blog.name, entry.title, entry.published)
...
My Blog: Some Entry (True)
My Blog: Another Entry (True)
>>> for entry in Entry.select().where(pub_date__lt=datetime.datetime(2011, 1, 1)):
... print entry.title, entry.pub_date
...
Old entry 2010-01-01 00:00:00
You can also filter across joins:
>>> for entry in Entry.select().join(Blog).where(name='My Blog'):
... print entry.title
Old entry
Some Entry
Another Entry
If you are already familiar with Django’s ORM, you can use the “double underscore” syntax:
>>> for entry in Entry.filter(blog__name='My Blog'):
... print entry.title
Old entry
Some Entry
Another Entry
To perform OR lookups, use the special Q
object. These work in
both calls to filter()
and where()
:
>>> User.filter(Q(staff=True) | Q(superuser=True)) # get staff or superusers
To perform lookups against another column in a given row, use the F
object:
>>> Employee.filter(salary__lt=F('desired_salary'))
Sorting records¶
>>> for e in Entry.select().order_by('pub_date'):
... print e.pub_date
...
2010-01-01 00:00:00
2011-06-07 14:08:48
2011-06-07 14:12:57
>>> for e in Entry.select().order_by(peewee.desc('pub_date')):
... print e.pub_date
...
2011-06-07 14:12:57
2011-06-07 14:08:48
2010-01-01 00:00:00
You can also order across joins. Assuming you want to order entries by the name of the blog, then by pubdate desc:
>>> qry = Entry.select().join(Blog).order_by(
... (Blog, 'name'),
... (Entry, 'pub_date', 'DESC'),
... )
>>> qry.sql()
('SELECT t1.* FROM entry AS t1 INNER JOIN blog AS t2 ON t1.blog_id = t2.id ORDER BY t2.name ASC, t1.pub_date DESC', [])
Paginating records¶
The paginate method makes it easy to grab a “page” or records – it takes two parameters, page_number, and items_per_page:
>>> for entry in Entry.select().order_by('id').paginate(2, 10):
... print entry.title
...
entry 10
entry 11
entry 12
entry 13
entry 14
entry 15
entry 16
entry 17
entry 18
entry 19
Counting records¶
You can count the number of rows in any select query:
>>> Entry.select().count()
100
>>> Entry.select().where(id__gt=50).count()
50
Iterating over lots of rows¶
To limit the amount of memory used by peewee when iterating over a lot of rows (i.e.
you may be dumping data to csv), use the iterator()
method on the QueryResultWrapper
.
This method allows you to iterate without caching each model returned, using much less
memory when iterating over large result sets:
# let's assume we've got 1M stat objects to dump to csv
stats_qr = Stat.select().execute()
# our imaginary serializer class
serializer = CSVSerializer()
# loop over all the stats and serialize
for stat in stats_qr.iterator():
serializer.serialize_object(stat)
For simple queries you can see further speed improvements by using the SelectQuery.naive()
query method. See the documentation for details on this optimization.
stats_query = Stat.select().naive() # note we are calling "naive()"
stats_qr = stats_query.execute()
for stat in stats_qr.iterator():
serializer.serialize_object(stat)
Performing atomic updates¶
Use the special F
object to perform an atomic update:
>>> MessageCount.update(count=F('count') + 1).where(user=some_user)
Aggregating records¶
Suppose you have some blogs and want to get a list of them along with the count of entries in each. First I will show you the shortcut:
query = Blog.select().annotate(Entry)
This is equivalent to the following:
query = Blog.select({
Blog: ['*'],
Entry: [Count('id')],
}).group_by(Blog).join(Entry)
The resulting query will return Blog objects with all their normal attributes plus an additional attribute ‘count’ which will contain the number of entries. By default it uses an inner join if the foreign key is not nullable, which means blogs without entries won’t appear in the list. To remedy this, manually specify the type of join to include blogs with 0 entries:
query = Blog.select().join(Entry, 'left outer').annotate(Entry)
You can also specify a custom aggregator:
query = Blog.select().annotate(Entry, peewee.Max('pub_date', 'max_pub_date'))
Let’s assume you have a tagging application and want to find tags that have a certain number of related objects. For this example we’ll use some different models in a Many-To-Many configuration:
class Photo(Model):
image = CharField()
class Tag(Model):
name = CharField()
class PhotoTag(Model):
photo = ForeignKeyField(Photo)
tag = ForeignKeyField(Tag)
Now say we want to find tags that have at least 5 photos associated with them:
>>> Tag.select().join(PhotoTag).join(Photo).group_by(Tag).having('count(*) > 5').sql()
SELECT t1."id", t1."name"
FROM "tag" AS t1
INNER JOIN "phototag" AS t2
ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3
ON t2."photo_id" = t3."id"
GROUP BY
t1."id", t1."name"
HAVING count(*) > 5
Suppose we want to grab the associated count and store it on the tag:
>>> Tag.select({
... Tag: ['*'],
... Photo: [Count('id', 'count')]
... }).join(PhotoTag).join(Photo).group_by(Tag).having('count(*) > 5').sql()
SELECT t1."id", t1."name", COUNT(t3."id") AS count
FROM "tag" AS t1
INNER JOIN "phototag" AS t2
ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3
ON t2."photo_id" = t3."id"
GROUP BY
t1."id", t1."name"
HAVING count(*) > 5
SQL Functions, Subqueries and “Raw expressions”¶
Suppose you need to want to get a list of all users whose username begins with “a”.
There are a couple ways to do this, but one method might be to use some SQL functions
like LOWER
and SUBSTR
. To use arbitrary SQL functions, use the special R
object to construct queries:
# select the users' id, username and the first letter of their username, lower-cased
query = User.select(['id', 'username', R('LOWER(SUBSTR(username, 1, 1))', 'first_letter')])
# now filter this list to include only users whose username begins with "a"
a_users = query.where(R('first_letter=%s', 'a'))
>>> for user in a_users:
... print user.first_letter, user.username
This same functionality could be easily exposed as part of the where clause, the only difference being that the first letter is not selected and therefore not an attribute of the model instance:
a_users = User.filter(R('LOWER(SUBSTR(username, 1, 1)) = %s', 'a'))
We can write subqueries as part of a SelectQuery
, for example counting
the number of entries on a blog:
entry_query = R('(SELECT COUNT(*) FROM entry WHERE entry.blog_id=blog.id)', 'entry_count')
blogs = Blog.select(['id', 'name', entry_query]).order_by(('entry_count', 'desc'))
for blog in blogs:
print blog.title, blog.entry_count
It is also possible to use subqueries as part of a where clause, for example finding blogs that have no entries:
no_entry_query = R('NOT EXISTS (SELECT * FROM entry WHERE entry.blog_id=blog.id)')
blogs = Blog.filter(no_entry_query)
for blog in blogs:
print blog.name, ' has no entries'
Working with transactions¶
Context manager¶
You can execute queries within a transaction using the transaction
context manager,
which will issue a commit if all goes well, or a rollback if an exception is raised:
db = SqliteDatabase(':memory:')
with db.transaction():
blog.delete_instance(recursive=True) # delete blog and associated entries
Decorator¶
Similar to the context manager, you can decorate functions with the commit_on_success
decorator:
db = SqliteDatabase(':memory:')
@db.commit_on_success
def delete_blog(blog):
blog.delete_instance(recursive=True)
Changing autocommit behavior¶
By default, databases are initialized with autocommit=True
, you can turn this
on and off at runtime if you like. The behavior below is roughly the same as the
context manager and decorator:
db.set_autocommit(False)
try:
blog.delete_instance(recursive=True)
except:
db.rollback()
raise
else:
db.commit()
finally:
db.set_autocommit(True)
If you would like to manually control every transaction, simply turn autocommit off when instantiating your database:
db = SqliteDatabase(':memory:', autocommit=False)
Blog.create(name='foo blog')
db.commit()
Introspecting databases¶
If you’d like to generate some models for an existing database, you can try out the database introspection tool “pwiz” that comes with peewee.
Usage:
python pwiz.py my_postgresql_database
It works with postgresql, mysql and sqlite:
python pwiz.py test.db --engine=sqlite
pwiz will generate code for:
- database connection object
- a base model class to use this connection
- models that were introspected from the database tables
The generated code is written to stdout.