python轻量级orm框架 peewee常用功能速查
Peewee是一种简单而小的ORM。它有很少的(但富有表现力的)概念,使它易于学习和直观的使用。
peewee常用功能速查
peewee 简介
Peewee是一种简单而小的ORM。它有很少的(但富有表现力的)概念,使它易于学习和直观的使用。
常见orm数据库框架
- Django ORM
- peewee
- SQLAlchemy
Django ORM
优点
:
易用,学习曲线短
和Django紧密集合,用Django时使用约定俗成的方法去操作数据库缺点
:
QuerySet速度不给力,会逼我用Mysqldb来操作原生sql语句。
Peewee
优点
:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成缺点
:
不支持自动化 schema 迁移
不能像Django那样,使线上的mysql表结构生成结构化的模型。
SQLAlchemy
优点
:
巨牛逼的API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询缺点
:
工作单元概念不常见
重量级 API,导致长学习曲线
peewee 简单demo
import datetime
from peewee import *
db = MySQLDatabase(
"test", host="127.0.0.1", port=3306, user="root", passwd="123456"
)
db.connect()
class BaseModel(Model):
class Meta:
database = db
class Person(BaseModel):
name = CharField()
age = IntegerField()
height = IntegerField()
sex = BooleanField(default=\'male\')
if __name__ == "__main__":
Person.create_table()
# 创建
Person.create(name=\'tom\', age=30, height=177)
# 查询
res = Person.select().where(Person.name==\'tom\')
print(res)
print(res[0])
print(res[0].name)
print(res[0].age)
print(res[0].height)
print(res[0].sex)
>>>>
SELECT `t1`.`id`, `t1`.`name`, `t1`.`age`, `t1`.`High`, `t1`.`sex` FROM `person` AS `t1` WHERE (`t1`.`name` = \'ljk\')
1
tom
30
177
True
Model 和 Field 关系
在ORM对象关系数据库中 Model是一个类,映射到数据库表中就是一个表。Filed是字段,映射到表中就是字段。model实例就是数据库中的一条记录。在peewee中Model和Field的关系如下:
Thing | 对应关系 |
---|---|
Model 类 | 表 |
Field 实例 | 表中字段 |
Model 实例 | 表中数据 |
数据库连接和model类定义的典型使用
import datetime
from peewee import *
db = SqliteDatabase(\'my_app.db\')
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
username = CharField(unique=True)
class Tweet(BaseModel):
user = ForeignKeyField(User, backref=\'tweets\')
message = TextField()
created_date = DateTimeField(default=datetime.datetime.now)
is_published = BooleanField(default=True)
- 创建一个数据库实例
db = SqliteDatabase(\'my_app.db\')
- 创建一个基础model类
class BaseModel(Model):
class Meta:
database = db
定义一个用于建立数据库连接的基模类是一种推荐的做法,因为将不必为后续表指定数据库。
3.定义一个普通 model 类
class User(BaseModel):
username = CharField(unique=True)
模型定义使用的是其他流行的orm(如SQLAlchemy或Django)中看到的声明式风格。因为User继承了BaseModel 类,所以User类可以继承数据库连接。
User已经明确定义了一个具有唯一约束的用户名列。因为我们没有指定主键,peewee 会自动添加一个自增整数主键字段,名为 id。没有指定主键的表peewee会自动创建一个名字为id的自增主键。
Model 模型
为了不污染model的命名空间,model的配置放在特殊的元属性类中。这是从Django的框架中借鉴过来的。
contacts_db = SqliteDatabase(\'contacts.db\')
class Person(Model):
name = CharField()
class Meta:
database = contacts_db
在简单model示例中,你会注意到,我们创建了一个定义数据库的BaseModel,然后扩展了它。这是定义数据库和创建模型的首选方法。
你可以通过ModelClass._meta
来使用:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: type object \'Person\' has no attribute \'Meta\'
>>> Person._meta
<peewee.modeloptions object="" at="" 0x7f51a2f03790="">
ModelOptions
实现了几个查看model metadata的方法:
{\'id\': <peewee.autofield object="" at="" 0x7f51a2e92750="">,
\'name\': <peewee.charfield object="" at="" 0x7f51a2f0a510="">}
>>> Person._meta.primary_key
<peewee.autofield object="" at="" 0x7f51a2e92750="">
>>> Person._meta.database
<peewee.sqlitedatabase object="" at="" 0x7f519bff6dd0="">
Model 在ORM数据中就是一张表,那么表的属性可以有如下选项。它们是被定义在Meta中元数据。
Option | Meaning | 是否可继承? |
---|---|---|
database | 指定表创建依附的数据库 | yes |
table_name | 表名 | no |
table_function | 生成表名的函数 | yes |
indexes | 多行索引 | yes |
primary_key | 主键 | yes |
constraints | 表约束的列表 | yes |
schema | 模型的数据库架构 | yes |
only_save_dirty | 调用model.save()时,仅保存脏字段,指定字段? | yes |
options | 创建表扩展的选项字典 | yes |
table_settings | 在右括号后设置字符串的列表 | yes |
temporary | 指示临时表 | yes |
legacy_table_names | 使用旧表名生成(默认情况下启用) | yes |
depends_on | 指示此表依赖于另一个表进行创建 | no |
without_rowid | 指示表不应具有rowid(仅限SQLite) | no |
strict_tables | 指示严格的数据类型(仅限SQLite,3.37+) | yes |
Filed 字段
Field类是用来将Model属性映射到数据库列。每个字段类型都有一个相应的SQL存储类,将python数据类型转化为基本的存储类型。
当创建Model类时,fields被定义成类的属性。它看起来和django的数据库框架很类似。
class User(Model):
username = CharField()
join_date = DateTimeField()
about_me = TextField()
在上面的例子中,因为没有field有主键属性primary_key=True,所以会创建一个名字是id的自增主键。
peewee中可用的字段包括:
字段类型 | Sqlite | Postgresql | MySQL |
---|---|---|---|
AutoField | integer | serial | integer |
BigAutoField | integer | bigserial | bigint |
IntegerField | integer | integer | integer |
BigIntegerField | integer | bigint | bigint |
SmallIntegerField | integer | smallint | smallint |
IdentityField | not supported | int identity | not supported |
FloatField | real | real | real |
DoubleField | real | double precision | double precision |
DecimalField | decimal | numeric | numeric |
CharField | varchar | varchar | varchar |
FixedCharField | char | char | char |
TextField | text | text | text |
BlobField | blob | bytea | blob |
BitField | integer | bigint | bigint |
BigBitField | blob | bytea | blob |
UUIDField | text | uuid | varchar(40) |
BinaryUUIDField | blob | bytea | varbinary(16) |
DateTimeField | datetime | timestamp | datetime |
DateField | date | date | date |
TimeField | time | time | time |
TimestampField | integer | integer | integer |
IPField | integer | bigint | bigint |
BooleanField | integer | boolean | bool |
BareField | untyped | not supported | not supported |
ForeignKeyField | integer | integer | integer |
字段初始化参数
所有字段类型接受的参数及其默认值
- null = False 允许空值
- index = False 创建索引
- unique = False 创建唯一索引
- column_name = None 显式指定数据库中的列名
- default = None 默认值,可以使任意值或可调用对象
- primary_key = False 指明主键
- constraints = None 约束条件
- sequence = None 序列名字(如果数据库支持)
- collation = None 排序字段
- unindexed = False 虚表上的字段不应该被索引
- choices = None 两种可选项:value display
- help_text = None 帮助说明字段。表示此字段的任何有用文本的字符串
- verbose_name = None 表示此字段的用户友好名称的字符串
- index_type = None 索引类型
字段特有参数
在一些字段中有些自己特有的参数,如下:
字段类型 | 特有参数 |
---|---|
CharField | max_length |
FixedCharField | max_length |
DateTimeField | formats |
DateField | formats |
TimeField | formats |
TimestampField | resolution, utc |
DecimalField | max_digits, decimal_places, auto_round, rounding |
ForeignKeyField | model, field, backref, on_delete, on_update, deferrable lazy_load |
BareField | adapt |
字段默认参数
peewee可以为每一个字段提供默认值,比如给intergerField 默认值0而不是NULL。你可以申明字段时指定默认值:
class Message(Model):
context = TextField()
read_count = IntegerField(default=0)
在某些情况下,默认值是动态的会更有意义。一个可能的场景就是当前时间。Peewee 允许您在这些情况下指定一个函数,该函数的返回值将在创建对象时使用。注意,使用时只提供了函数,并不需要实际调用它。
class Message(Model):
context = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
如果你正在使用一个接受可变类型(list, dict等)的字段,并想提供一个默认值。将默认值包装在一个简单的函数中是个好主意,这样,多个模型实例就不会共享对同一底层对象的引用。
def house_defaults():
return {\'beds\': 0, \'baths\': 0}
class House(Model):
number = TextField()
street = TextField()
attributes = JSONField(default=house_defaults)
索引
peewee可以通过单列索引和多列索引。可选地包括UNIQUE约束。Peewee还支持对模型和字段的用户定义约束。
单列索引
单列索引使用字段初始化参数定义。下面的示例在用户名字段上添加一个惟一索引,在电子邮件字段上添加一个普通索引
class User(Model):
username = CharField(unique=True)
email = CharField(index=True)
在列上添加用户定义的约束。你可以使用constraints参数。例如,您可能希望指定一个默认值,或者添加一个CHECK约束
class Product(Model):
name = CharField(unique=True)
price = DecimalField(constraints=[Check(\'price < 10000\')])
created = DateTimeField(
constraints=[SQL("DEFAULT (datetime(\'now\'))")])
多列索引
可以使用嵌套元组将多列索引定义为元属性。每个表的索引是一个2元组,第一部分是索引字段名称的元组,可以有多个字段,第二部分是一个布尔值,指示索引是否应该唯一。
class Transaction(Model):
from_acct = CharField()
to_acct = CharField()
amount = DecimalField()
date = DateTimeField()
class Meta:
indexes = (
# create a unique on from/to/date
((\'from_acct\', \'to_acct\', \'date\'), True),
# create a non-unique on from/to
((\'from_acct\', \'to_acct\'), False),
)
记住,如果索引元组只包含一项,则添加末尾逗号
基本操作 增删改查
peewee中关于增删改查的基本操作方法如下:
增
:
create():最常用创建,返回创建实例
save():第一次执行的save是插入,第二次是修改
insert: 插入数据,不创建数据库实例。返回id
insert_many: 批量插入
bulk_create:批量插入,类似于insert_many。可指定单次插入的数量
batch_commit: 自动添加了一个事务,然后一条条的插入
insert_from: 从另一个表中查询的数据作为插入的数据
删除
:
delete().where().execute()
delete_instance() 直接执行删除了,不用调用execute() 方法
修改
:
save(): 第一次执行的save是插入,第二次是修改
update() 用于多字段更新
查询
:
Model.get(): 检索与给定查询匹配的单个实例。报 Model.DoesNotExist 异常。如果有多条记录满足条件,则返回第一条
get_or_none() :与get使用方法相同。区别是找不到结果时不会报错
get_by_id() :通过主键查找,是一种快捷方式
Model[\’id_num\’]: 和上面的get_by_id一样是通过主键查找。
get_or_create(): 首先查询,如果查不到将创建一个新的记录
select() 查询多条数据
创建
单条插入
你可以用Model.create()
创建一个新的实例。这个方法接收关键字参数,参数要和表定义的字段一致。返回值是新的实例
>>> User.create(username=\'Charlie\')
<__main__.User object at 0x2529350>
批量插入
有几种方法可以快速加载大量数据,缺乏经验的做法是在循环中调用Model.create来创建
data_source = [
{\'field1\': \'val1-1\', \'field2\': \'val1-2\'},
{\'field1\': \'val2-1\', \'field2\': \'val2-2\'},
# ...
]
for data_dict in data_source:
MyModel.create(**data_dict)
上面的方法比较慢的原因有几个:
- 如果没有在事务中装饰循环,那么每个对create()的调用都发生在它自己的事务中。这将会非常缓慢
- 必须生成每个InsertQuery并将其解析为SQL
- 需要原生SQL语句传入到数据库中解析
- 检索最后一个insert id,这在某些情况下会导致执行额外的查询
可以通过一个简单的装饰:atomic
来大幅度提高速度
# This is much faster.
with db.atomic():
for data_dict in data_source:
MyModel.create(**data_dict)
上面的代码仍然没有解决2、3、4这三点。我们可以通过 insert_many
带来一个大的速度提升。这个方法接收多列元组或字典,然后在一次SQL语句中插入多行数据。
data_source = [
{\'field1\': \'val1-1\', \'field2\': \'val1-2\'},
{\'field1\': \'val2-1\', \'field2\': \'val2-2\'},
# ...
]
# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()
insert_many()
方法还接收多行元组,同时需要提供一个对应的字段。
# We can INSERT tuples as well...
data = [(\'val1-1\', \'val1-2\'),
(\'val2-1\', \'val2-2\'),
(\'val3-1\', \'val3-2\')]
# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()
在装饰中批量插入是一个好的方法。
# You can, of course, wrap this in a transaction as well:
with db.atomic():
MyModel.insert_many(data, fields=fields).execute()
插入大量数据
在大量数据的插入场景下,根据数据源中的行数,您可能需要将其分解为多个块。SQLite通常有999或32766的限制
您可以编写一个循环来将数据批处理成块(在这种情况下,强烈建议您使用事务)
# Insert rows 100 at a time.
with db.atomic():
for idx in range(0, len(data_source), 100):
MyModel.insert_many(data_source[idx:idx+100]).execute()
peewwee提供了一个chunked函数帮助你高效的将普通可迭代对象拆分成为可批处理对象。
from peewee import chunked
# Insert rows 100 at a time.
with db.atomic():
for batch in chunked(data_source, 100):
MyModel.insert_many(batch).execute()
Model.bulk_create()
的行为有点像insert_many(),但是可以用来插入没有保存的数据库实例,并且可以指定每次插入的数量。如一共插入345,如果指定了一次插入100条记录,那么就是4次插入,3 * 100 + 1 * 45
什么叫没有保存的数据库实例呢?就是类似于User(username=\'kk\')
,创建的数据库实例。
# Read list of usernames from a file, for example.
with open(\'user_list.txt\') as fh:
# Create a list of unsaved User instances.
users = [User(username=line.strip()) for line in fh.readlines()]
# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
User.bulk_create(users, batch_size=100)
bulk_update()
和bulk_create
类似,可以用来插入没有保存的数据库实例,自动添加了一个事务,然后一条条的插入
# List of row data to insert.
row_data = [{\'username\': \'u1\'}, {\'username\': \'u2\'}, ...]
# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
User.create(**row)
从另一个表批量装载
Model.insert_from()
如果要批量插入的数据存储在另一个表中,还可以创建源为SELECT查询的INSERT查询。
res = (TweetArchive
.insert_from(
Tweet.select(Tweet.user, Tweet.message),
fields=[TweetArchive.user, TweetArchive.message])
.execute())
删除
要删除单个模型实例,可以使用model.delete_instance()快捷方式。delete_instance()将删除给定的模型实例,并且可以选择递归地删除任何依赖对象(通过指定recursive=True)。
删除一个记录:Model.delete_instance()
删除任意记录:Model.delete()
更新
save()
:单个更新
一旦模型实例有了主键,随后对save()的任何调用都将导致一个UPDATE而不是另一个INSERT。模型的主键不会改变
>>> user.save() # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2
update
:批量更新
接受关键字参数,其中键对应于模型的字段名称
>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute() # Returns the number of rows that were updated.
4
查询
单条记录查询
你可以通过Model.get()方法查询到给条件的数据。如果是通过主键查找,也可以用一个快捷方法 Model.get_by_id()。
此方法是使用给定查询调用Model.select()的快捷方式,但将结果集限制为一行。需要注意的是使用get()方法,如果没有找到匹配的数据会抛出错误:DoesNotExist
get
>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>
>>> User.get_by_id(1) # Same as above.
<__main__.User object at 0x252df10>
>>> User[1] # Also same as above.
<__main__.User object at 0x252dd10>
>>> User.get(User.id == 1).username
u\'Charlie\'
>>> User.get(User.username == \'Charlie\')
<__main__.User object at 0x2529410>
>>> User.get(User.username == \'nobody\')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: [\'nobody\']
单条记录查询方法:
- Model.get()
- Model.get_by_id()
- Model.get_or_none() – if no matching row is found, return None.
- Model.select()
- SelectBase.get()
- SelectBase.first() – return first record of result-set or None.
查询或创建
Model.get_or_create() 它首先尝试检索匹配的行。如果失败,将创建一个新行。
通常,可以依赖唯一约束或主键来防止创建重复对象。例如,假设我们希望使用示例用户模型实现注册新用户帐户。用户模型对用户名字段有唯一的约束,因此我们将依赖数据库的完整性保证,以确保不会出现重复的用户名:
try:
with db.atomic():
return User.create(username=username)
except peewee.IntegrityError:
# `username` is a unique column, so this username already exists,
# making it safe to call .get().
return User.get(User.username == username)
上面的例子首先尝试创建,然后回退到查询,依靠数据库来强制执行唯一约束。
如果您希望首先尝试检索记录,可以使用get_or_create()。该函数返回一个2元组,其中包含实例和一个布尔值,该值指示对象是否被创建。
user, created = User.get_or_create(username=username)
person, created = Person.get_or_create(
first_name=first_name,
last_name=last_name,
defaults={\'dob\': dob, \'favorite_color\': \'green\'})
查询多行记录
可以通过Model.select()获取多行数据。peewee允许你迭代这些数据,同时也可以索引和切片。
>>> query = User.select()
>>> [user.username for user in query]
[\'Charlie\', \'Huey\', \'Peewee\']
>>> query[1]
<__main__.User at 0x7f83e80f5550>
>>> query[1].username
\'Huey\'
>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]
select()是很智能的,在查询一次的前提下可以多次迭代,切片,下标取值等。
在缓存结果时,同一查询的后续迭代不会命中数据库。要禁用此行为(以减少内存使用),请在迭代时调用Select.iterator()。
除了返回模型实例外,Select查询还可以返回字典、元组和命名元组。根据您的用例,您可能会发现将行作为字典使用更容易
>>> query = User.select().dicts()
>>> for row in query:
... print(row)
{\'id\': 1, \'username\': \'Charlie\'}
{\'id\': 2, \'username\': \'Huey\'}
{\'id\': 3, \'username\': \'Peewee\'}
iterator()
:不缓存查询结果
默认情况下,peewee将缓存迭代Select查询时返回的行。这是一种优化,允许多次迭代以及索引和切片,而不会导致额外的查询。但是,当您计划在大量行上进行迭代时,这种缓存可能会有问题。
为了减少内存的消耗,使用iterator()方法。这个方法允许返回结果不缓存数据。使用更少的内存。
stats = Stat.select()
# Our imaginary serializer class
serializer = CSVSerializer()
# Loop over all the stats and serialize.
for stat in stats.iterator():
serializer.serialize_object(stat)
对于简单的查询,您可以通过将行作为字典返回来进一步提高速度。namedtuples或元组。以下方法可用于任何Select查询,以更改结果行类型。
dicts()
namedtuples()
tuples()
objects
: 将多个查询表放在一个实例中
当对包含多个表中的列的大量行进行迭代时,peewee将为返回的每一行构建查询模型。对于复杂查询,此操作可能很慢。例如,如果我们选择一个tweet列表以及tweet作者的用户名和头像,Peewee必须为每一行创建两个对象(tweet和用户)。除了上述行类型之外,还有第四个方法objects(),它将作为模型实例返回行,但不会分解模型查询。
query = (Tweet
.select(Tweet, User) # Select tweet and user data.
.join(User))
# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
print(tweet.user.username, tweet.content)
# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
print(tweet.username, tweet.content)
为了获得最佳性能,您可以执行查询,然后使用底层数据库游标对结果进行迭代。
Database.execute()。接受查询对象,执行查询,并返回DB-API 2.0游标对象。光标将返回原始行元组:
query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
print(username, \'->\', content)
事务
数据库事务
(Transaction)是一种机制,包含了一组数据库操作命令
事务把所有的命令作为一个整体一起向系统提交或撤销操作请求,即这一组数据库命令要么都执行,要么都不执行,因此事务是一个不可分割的工作逻辑单元。
事务具有 4 个特性,即原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability),这 4 个特性通常简称为 ACID。
peewee事务
Peewee实现事务的方法是Database.atomic()
方法,非常简单
当事务执行成功之后,它会自动commit(),不需要我们手动调。当事务的代码块中抛出异常时,它会自动调用rollback(),将数据库状态恢复到操作之前,保证要么命令全部执行,要么全部不执行。
Peewee中实现事务有两种使用方式,一种是将atomic当做Context manager使用,另外一种将atomic当修饰器使用。Context manager
with db.atomic():
for data_dict in data_source:
MyModel.create(**data_dict)
装饰器
@db.atomic()
def insert_data()
for data_dict in data_source:
MyModel.create(**data_dict)
事务其他特性:
- 除了自动commit()和rollback()之外,也可以手动调用commit()和rollback()方法
- 事务支持嵌套使用
- 在一个事务中对数据库操作能够有效减少事务的耗时,增加操作效率
过滤
您可以使用普通的python操作符过滤特定的记录。
>>> user = User.get(User.username == \'Charlie\')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
... print(tweet.user.username, \'->\', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun
>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
... print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00
...
print(tweet.message)
hello world
this is fun
look at this picture of my food
记录分类
给返回的数据排序,可以使用order_by
1.普通使用
>>> for t in Tweet.select().order_by(Tweet.created_date):
... print(t.pub_date)
2.倒序排列
可以使用desc或者-
号
Tweet.select().order_by(Tweet.created_date.desc())
Tweet.select().order_by(-Tweet.created_date) # Note the "-" prefix.
3.正序排列
User.select().order_by(+User.username)
4.高级使用
对计算值进行排序时,可以包括必要的SQL表达式,也可以引用指定给该值的别名。
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias(\'num_tweets\'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username))
您可以使用select子句中使用的相同计数表达式进行订购。在下面的示例中,我们按tweet ID的COUNT()降序排序:
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias(\'num_tweets\'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(fn.COUNT(Tweet.id).desc()))
或者,可以在select子句中引用指定给计算值的别名。这种方法的优点是易于阅读。请注意,我们不是直接引用命名别名,而是使用SQL帮助程序对其进行包装:
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias(\'num_tweets\'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(SQL(\'num_tweets\').desc()))
同样,也可以使用如上
ntweets = fn.COUNT(Tweet.id)
query = (User
.select(User.username, ntweets.alias(\'num_tweets\'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(ntweets.desc())
计数
可以使用count来计算返回数量
>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50
分页
paginate()
方法可以很简单的获取一个分页的数据。paginate有两个参数:page_number 和 items_per_page。第一个参数是取回数据的页数;第二个参数是每一页多少元素。这两个参数加起来才能完成分页
>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
... print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19
分页的功能也可以用limit()
和offset()
来实现
Tweet.select().order_by(Tweet.id).offset(10).limit(10)
offset(10) 跳过10个记录
limit(10) 取10个记录
聚合查询
聚合查询:对查询出来的结果进一步处理,包括统计,分组,求最大值,求平均值等。
聚合常用的函数:
COUNT:计算表中的记录数(行数)
SUM:计算表中数值列中数据的合计值
AVG:计算表中数值列中数据的平均值
MAX:求出表中任意列中数据的最大值
MIN:求出表中任意列中数据的最小值
用于汇总的函数称为聚合函数或者聚集函数。所谓聚合,就是将多行汇总为一行。实际上,所有的聚合函数都是这样,输入多行输出一行。
聚合函数的使用:
mysql> select * from person;
+----+------+-----+------+-----+
| id | name | age | High | sex |
+----+------+-----+------+-----+
| 1 | ljk | 30 | 177 | 1 |
| 2 | aghj | 23 | 168 | 1 |
+----+------+-----+------+-----+
2 rows in set (0.00 sec)
************************************
* 聚合函数 *
************************************
mysql> select count(*) from person;
+----------+
| count(*) |
+----------+
| 2 |
+----------+
1 row in set (0.00 sec)
----------------------------------------
mysql> select sum(age) from person;
+----------+
| sum(age) |
+----------+
| 53 |
+----------+
1 row in set (0.00 sec)
----------------------------------------
mysql> select avg(high) from person;
+-----------+
| avg(high) |
+-----------+
| 172.5000 |
+-----------+
1 row in set (0.00 sec)
----------------------------------------
mysql> select max(high) from person;
+-----------+
| max(high) |
+-----------+
| 177 |
+-----------+
1 row in set (0.00 sec)
mysql> select * from person;
+----+------+-----+------+-----+
| id | name | age | High | sex |
+----+------+-----+------+-----+
| 1 | ljk | 30 | 177 | 1 |
| 2 | aghj | 23 | 168 | 1 |
| 3 | 0 | 22 | 165 | 0 |
+----+------+-----+------+-----+
3 rows in set (0.00 sec)
mysql> select avg(High) from person group by sex;
+-----------+
| avg(High) |
+-----------+
| 172.5000 |
| 165.0000 |
+-----------+
2 rows in set (0.00 sec)
# 使用having对分组的数据筛选
mysql> select avg(High) as high from person group by sex having high > 170;
+----------+
| high |
+----------+
| 172.5000 |
+----------+
1 row in set (0.00 sec)
where
:分组之前筛选数据
where 子句的作用是在对查询结果进行分组前,将不符合where条件的行去掉,即在分组之前过滤数据,where条件中不能包含聚组函数,使用where条件过滤出特定的行。
having
: 对分组之后筛选分组的数据
having 子句的作用是筛选满足条件的组,即在分组之后过滤数据,条件中经常包含聚组函数,使用having 条件过滤出特定的组,也可以使用多个分组标准进行分组。
总结一下过滤的顺序
on->join->where->group by->having
分组
查询用户以及每个人拥有的tweet账号数量。这里使用了group_by,将结果根据User表分类。
query = (User
.select(User, fn.Count(Tweet.id).alias(\'count\'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User))
假设有如下数据库,一个多对多的关系。
class Photo(Model):
image = CharField()
class Tag(Model):
name = CharField()
class PhotoTag(Model):
photo = ForeignKeyField(Photo)
tag = ForeignKeyField(Tag)
查询Tag记录,按照Tag分组,筛选出每组Tag里Photo数量超过5个的记录。
query = (Tag
.select()
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
HAVING 子句可以让我们筛选分组后的各组数据。
HAVING,它与 GROUP BY 配合使用,为聚合操作指定条件。
WHERE 子句只能指定行的条件,而不能指定组的条件。所以当数据分组之后就需要 HAVING 对分组的数据筛选。
具体区别:
- where 用在group_by前,having用在group_by之后。
- 聚合函数(avg、sum、max、min、count),不能作为条件放在where之后,但可以放在having之后
Scalar
对查询出来的数据做处理
可以通过调用Query.scalar()来检索标量值。例如
>>> Employee.select(
... fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)
您可以通过传递来检索多个标量值
>>> Employee.select(
... fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)
窗口
窗口函数是指对作为SELECT查询一部分处理的数据滑动窗口进行操作的聚合函数。窗口功能可以执行以下操作:
对结果集的子集执行聚合。
计算一个运行总数。
排名结果。
将行值与前面(或后面!)行中的值进行比较。
peewee支持SQL窗口函数,可以通过调用Function.over()并传入分区或排序参数来创建这些函数。
复杂筛选
peewee支持以下类型的比较
查询中支持的筛选运算符
Comparison | Meaning |
---|---|
== | x equals y |
< | x is less than y |
<= | x is less than or equal to y |
> | x is greater than y |
>= | x is greater than or equal to y |
!= | x is not equal to y |
<< | x IN y, where y is a list or query |
>> | x IS y, where y is None/NULL |
% | x LIKE y where y may contain wildcards |
** | x ILIKE y where y may contain wildcards |
^ | x XOR y |
~ | Unary negation (e.g., NOT x) |
筛选方法
因为用完了要重写的操作符,所以有一些额外的查询操作可以作为方法使用
Method | Meaning |
---|---|
.in_(value) | 查询在范围内 |
.not_in(value) | 查询不在范围内 |
.is_null(is_null) | 为空或不为空。接受布尔参数 |
.contains(substr) | 通配符搜索子字符串 |
.startswith(prefix) | 查询以prefix开头的数据 |
.endswith(suffix) | 查询以prefix结尾的数据 |
.between(low, high) | 查询在low和high中间的值 |
.regexp(exp) | 正则表达式匹配匹配的数据,贪婪模式 |
.iregexp(exp) | 正则表达式匹配匹配的数据,非贪婪模式 |
.bin_and(value) | 二进制加 |
.bin_or(value) | 二进制或 |
.concat(other) | Concatenate two strings or objects using ||. |
.distinct() | 标记重复的数据 |
.collate(collation) | 指定具有给定排序规则的列 |
.cast(type) | 将列的值强制转换为给定类型 |
联合查询逻辑操作
使用逻辑操作的联合查询
Operator | Meaning | Example |
---|---|---|
& | AND | (User.is_active == True) & (User.is_admin == True) |
| | OR | (User.is_admin) | (User.is_superuser) |
~ | NOT (unary negation) | ~(User.username.contains(\’admin\’)) |
# Find the user whose username is "charlie".
User.select().where(User.username == \'charlie\')
# Find the users whose username is in [charlie, huey, mickey]
User.select().where(User.username.in_([\'charlie\', \'huey\', \'mickey\']))
Employee.select().where(Employee.salary.between(50000, 60000))
Employee.select().where(Employee.name.startswith(\'C\'))
Blog.select().where(Blog.title.contains(search_string))
请注意,实际的比较用括号括起来。 Python 的运算符优先级要求将比较括在括号中。
# Find any users who are active administrations.
User.select().where(
(User.is_admin == True) &
(User.is_active == True))
可能你尝试使用python语法中的in and or 和not操作,但是在查询中是不生效的。所有的操作返回都是一个布尔值。
建议如下:
- 用
.in_()
和.not_in()
替换 in和not in
- 用&替换and
- 用|替换or
- 用~替换not
- 用.is_null()替换 is None 或 == None
SQL 方法
SQL方法,如like
,sum
等,可以通过fn
来表达
从peewee中导入fn:from peewee import fn
query = (User
.select(User, fn.COUNT(Tweet.id).alias(\'tweet_count\'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User)
.order_by(fn.COUNT(Tweet.id).desc()))
for user in query:
print(\'%s -- %s tweets\' % (user.username, user.tweet_count))
fn可以表达任何SQL方法,它的参数可以是字段,值,子查询甚至嵌套函数
基础使用
- fn.AVG() 返回指定列的平均值,NULL值不包括在计算中。
- fn.SUM() 返回指定列的数目,NULL值不包括在计算中。
- fn.MIN() 返回指定列的最小值,NULL值不包括在计算中。
- fn.MAX() 返回指定列的最大值,NULL值不包括在计算中。
- fn.DATE() 返回指定日期时间格式列的日期格式
- fn.DECIMAL(10, 2) ===> decimal(10,2)中的“2”表示小数部分的位数
进阶使用
- fn.to_char() 返回指定列格式化后的字符串 e.g.: fn.to_char(18.88, \’99.999\’) ===> 18.888; fn.to_char(model.field, \’\’)。
- fn.char_length(str) 返回字符串字符数
- fn.array_agg() 接受一组值并返回一个数组。
- fn.array_agg(model.name).order_by(model.id.asc()) # array_agg(name order by id asc)
- fn.rank().over(partition_by=[field1, field2, or aggregation_field1], order_by=[fn.SUM(Booking.slots).desc()]) 实现rank() over(partition by filed order by filed)分区功能。
- fn.length() 返回指定列的长度。也可应用于order_by。e.g.: .order_by(fn.length(model.field).asc())。
- fn.CONCAT() 返回合并的字符串(CONCAT一定要大写,小写的concat用法不一样)。fn.CONCAT(model.id, \’-\’, model.name) ===> \’188-张三\’
SQL helper
有时,您可能想在sql中传一些任意的sql语句。您可以使用特殊的SQL类来实现这一点
# We\'ll query the user table and annotate it with a count of tweets for
# the given user
query = (User
.select(User, fn.Count(Tweet.id).alias(\'ct\'))
.join(Tweet)
.group_by(User))
# Now we will order by the count, which was aliased to "ct"
query = query.order_by(SQL(\'ct\'))
# You could, of course, also write this as:
query = query.order_by(fn.COUNT(Tweet.id))
使用peewee执行手工SQL语句有两种方法
- Database.execute_sql() 用于执行任何类型的查询
- RawQuery 执行SELECT查询并返回模型实例
query = MyModel.raw(\'SELECT * FROM my_table WHERE data = %s\', user_data)
query.execute_sql()
安全和SQL注入
默认情况下,peewee将参数化查询,因此用户传入的任何参数都将被转义。
请确保将任何用户定义的数据作为查询参数传入,而不是作为实际SQL查询的一部分传入:
query = MyModel.raw(\'SELECT * FROM my_table WHERE data = %s\' % (user_data,))
# Good. `user_data` will be treated as a parameter to the query.
query = MyModel.raw(\'SELECT * FROM my_table WHERE data = %s\', user_data)
# Bad! DO NOT DO THIS!
query = MyModel.select().where(SQL(\'Some SQL expression %s\' % user_data))
# Good. `user_data` will be treated as a parameter.
query = MyModel.select().where(SQL(\'Some SQL expression %s\', user_data))
MySQL和Postgresql使用“%s”表示参数。另一方面,SQLite使用“?”。请确保使用适合数据库的字符。还可以通过检查Database.param来查找此参数。
小结
个人水平问题翻译并不是很准确,由于方法太多使用也未给出适当例子。后面有时间挑增删改查等功能写详细操作。
</peewee.sqlitedatabase></peewee.autofield></peewee.charfield></peewee.autofield></peewee.modeloptions>