sqlite_utils Python 库¶
入门¶
下面是如何创建一个新的 SQLite 数据库文件,其中包含一个新的 chickens
表,并填充四条记录
from sqlite_utils import Database
db = Database("chickens.db")
db["chickens"].insert_all([{
"name": "Azi",
"color": "blue",
}, {
"name": "Lila",
"color": "blue",
}, {
"name": "Suna",
"color": "gold",
}, {
"name": "Cardi",
"color": "black",
}])
你可以像这样遍历这些行
for row in db["chickens"].rows:
print(row)
输出如下
{'name': 'Azi', 'color': 'blue'}
{'name': 'Lila', 'color': 'blue'}
{'name': 'Suna', 'color': 'gold'}
{'name': 'Cardi', 'color': 'black'}
要运行 SQL 查询,请使用 db.query()
for row in db.query("""
select color, count(*)
from chickens group by color
order by count(*) desc
"""):
print(row)
输出如下
{'color': 'blue', 'count(*)': 2}
{'color': 'gold', 'count(*)': 1}
{'color': 'black', 'count(*)': 1}
连接或创建数据库¶
Database 对象可以通过传入磁盘文件的路径或现有的 SQLite3 数据库连接来构造
from sqlite_utils import Database
db = Database("my_database.db")
如果 my_database.db
不存在,这将创建它。
如果你想从头开始重新创建数据库(首先删除磁盘上已有的文件),可以使用 recreate=True
参数
db = Database("my_database.db", recreate=True)
除了文件路径,你还可以传入一个现有的 SQLite 连接
import sqlite3
db = Database(sqlite3.connect("my_database.db"))
如果你想创建一个内存数据库,可以这样做
db = Database(memory=True)
你还可以创建一个命名的内存数据库。与普通内存数据库不同,只要至少有一个对该数据库的引用存在,这些数据库就可以被多个线程访问。del db 将从内存中清除数据库。
db = Database(memory_name="my_shared_database")
默认情况下,连接使用 PRAGMA recursive_triggers=on
。如果你不想使用 递归触发器,可以通过以下方式将其关闭
db = Database(memory=True, recursive_triggers=False)
默认情况下,任何实现 prepare_connection(conn) 钩子的 sqlite-utils 插件 在创建 Database
对象时都会对连接执行。你可以通过 execute_plugins=False
退出执行插件,例如这样
db = Database(memory=True, execute_plugins=False)
你可以传入 strict=True
来为使用此数据库对象创建的所有表启用 SQLite STRICT 模式
db = Database("my_database.db", strict=True)
附加其他数据库¶
SQLite 支持跨数据库 SQL 查询,可以将来自多个数据库文件中的表数据进行连接。
你可以使用 .attach()
方法附加一个额外的数据库,提供该数据库的别名以及磁盘上 SQLite 文件的路径。
db = Database("first.db")
db.attach("second", "second.db")
# Now you can run queries like this one:
print(db.query("""
select * from table_in_first
union all
select * from second.table_in_second
"""))
你可以使用传入 db.attach(alias, filepath)
的别名作为前缀来引用附加数据库中的表,例如上面 SQL 查询中的 second.table_in_second
引用。
跟踪查询¶
你可以使用 tracer
机制查看 SQLite 正在执行的 SQL 查询。tracer 是你提供的一个函数,每次执行 SQL 时都会调用该函数并传入 sql
和 params
参数,例如
def tracer(sql, params):
print("SQL: {} - params: {}".format(sql, params))
你可以将此函数传递给 Database()
构造函数,如下所示
db = Database(memory=True, tracer=tracer)
你还可以使用 with db.tracer(...)
上下文管理器临时启用 tracer 函数,只对一个代码块生效
db = Database(memory=True)
# ... later
with db.tracer(print):
db["dogs"].insert({"name": "Cleo"})
这个例子只会在 with
代码块期间打印查询。
执行查询¶
The Database
类提供了几种直接执行 SQL 查询的方法。
db.query(sql, params)¶
db.query(sql)
函数执行 SQL 查询并返回一个迭代器,该迭代器生成表示结果行的 Python 字典
db = Database(memory=True)
db["dogs"].insert_all([{"name": "Cleo"}, {"name": "Pancakes"}])
for row in db.query("select * from dogs"):
print(row)
# Outputs:
# {'name': 'Cleo'}
# {'name': 'Pancakes'}
db.execute(sql, params)¶
db.execute()
和 db.executescript()
方法是对底层 SQLite 连接的 .execute()
和 .executescript()
方法的包装。如果注册了 tracer 函数,这些包装器会记录日志到该函数。
db.execute(sql)
返回用于执行 SQL 的 sqlite3.Cursor 对象。
db = Database(memory=True)
db["dogs"].insert({"name": "Cleo"})
cursor = db.execute("update dogs set name = 'Cleopaws'")
print(cursor.rowcount)
# Outputs the number of rows affected by the update
# In this case 2
其他游标方法如 .fetchone()
和 .fetchall()
也可用,请参阅标准库文档。
传递参数¶
db.query()
和 db.execute()
都接受一个可选的第二个参数,用于向 SQL 查询传递参数。
参数可以是元组/列表或字典的形式,取决于查询中使用的参数类型。以这种方式传递的值将正确引用和转义,有助于避免 SQL 注入漏洞。
SQL 查询中的 ?
参数可以使用列表填充
db.execute("update dogs set name = ?", ["Cleopaws"])
# This will rename ALL dogs to be called "Cleopaws"
使用 :name
的命名参数可以使用字典填充
dog = next(db.query(
"select rowid, name from dogs where name = :name",
{"name": "Cleopaws"}
))
# dog is now {'rowid': 1, 'name': 'Cleopaws'}
在此示例中,使用 next()
从 db.query()
方法返回的迭代器中检索第一个结果。
访问表¶
表可以使用索引操作符访问,如下所示
table = db["my_table"]
如果表尚不存在,则在你第一次尝试向其中插入或增补数据时会创建它。
你还可以使用 .table()
方法访问表,如下所示
table = db.table("my_table")
使用此工厂函数可以设置 表配置选项。
列出表¶
你可以使用 .table_names()
方法列出数据库中表的名称
>>> db.table_names()
['dogs']
要仅查看 FTS4 表,请使用 .table_names(fts4=True)
。对于 FTS5,使用 .table_names(fts5=True)
。
你还可以使用 .tables
属性迭代表对象本身
>>> db.tables
[<Table dogs>]
列出视图¶
.view_names()
会显示数据库中的视图列表
>>> db.view_names()
['good_dogs']
你还可以使用 .views
属性迭代视图对象
>>> db.views
[<View good_dogs>]
视图对象类似于 Table 对象,但任何尝试插入或更新数据的操作都会抛出错误。视图对象上可用的方法和属性完整列表如下
columns
columns_dict
count
schema
rows
rows_where(where, where_args, order_by, select)
drop()
列出行¶
要遍历表中每一行的字典,请使用 .rows
>>> db = sqlite_utils.Database("dogs.db")
>>> for row in db["dogs"].rows:
... print(row)
{'id': 1, 'age': 4, 'name': 'Cleo'}
{'id': 2, 'age': 2, 'name': 'Pancakes'}
你还可以使用 .rows_where(where, where_args)
根据 WHERE 子句过滤行
>>> db = sqlite_utils.Database("dogs.db")
>>> for row in db["dogs"].rows_where("age > ?", [3]):
... print(row)
{'id': 1, 'age': 4, 'name': 'Cleo'}
第一个参数是 SQL 片段。第二个可选参数是要传递给该片段的值 - 你可以使用 ?
占位符并传递数组,或者使用 :named
参数并传递字典,如下所示
>>> for row in db["dogs"].rows_where("age > :age", {"age": 3}):
... print(row)
{'id': 1, 'age': 4, 'name': 'Cleo'}
要返回自定义列(而不是使用 select *
的默认列),请传递 select="column1, column2"
>>> db = sqlite_utils.Database("dogs.db")
>>> for row in db["dogs"].rows_where(select='name, age'):
... print(row)
{'name': 'Cleo', 'age': 4}
要指定顺序,请使用 order_by=
参数
>>> for row in db["dogs"].rows_where("age > 1", order_by="age"):
... print(row)
{'id': 2, 'age': 2, 'name': 'Pancakes'}
{'id': 1, 'age': 4, 'name': 'Cleo'}
你可以使用 order_by="age desc"
进行降序排序。
通过排除 where
参数,你可以对表中所有记录进行排序
>>> for row in db["dogs"].rows_where(order_by="age desc"):
... print(row)
{'id': 1, 'age': 4, 'name': 'Cleo'}
{'id': 2, 'age': 2, 'name': 'Pancakes'}
此方法还接受 offset=
和 limit=
参数,用于为 SQL 查询指定 OFFSET 和 LIMIT
>>> for row in db["dogs"].rows_where(order_by="age desc", limit=1):
... print(row)
{'id': 1, 'age': 4, 'name': 'Cleo'}
计数行¶
要计算由 where 过滤器返回的行数,请使用 .count_where(where, where_args)
>>> db["dogs"].count_where("age > ?", [1])
2
列出带主键的行¶
有时,检索每行的主键会很有用,以便将该键(或主键元组)传递给 .get()
或 .update()
方法。
.pks_and_rows_where()
方法采用与 .rows_where()
相同的签名(除了 select=
参数),但返回一个生成器,该生成器生成 (primary key, row dictionary)
对。
主键值通常是单个值,但如果表具有复合主键,也可以是元组。
如果表是 rowid
表(没有显式主键列),则将返回该 ID。
>>> db = sqlite_utils.Database(memory=True)
>>> db["dogs"].insert({"name": "Cleo"})
>>> for pk, row in db["dogs"].pks_and_rows_where():
... print(pk, row)
1 {'rowid': 1, 'name': 'Cleo'}
>>> db["dogs_with_pk"].insert({"id": 5, "name": "Cleo"}, pk="id")
>>> for pk, row in db["dogs_with_pk"].pks_and_rows_where():
... print(pk, row)
5 {'id': 5, 'name': 'Cleo'}
>>> db["dogs_with_compound_pk"].insert(
... {"species": "dog", "id": 3, "name": "Cleo"},
... pk=("species", "id")
... )
>>> for pk, row in db["dogs_with_compound_pk"].pks_and_rows_where():
... print(pk, row)
('dog', 3) {'species': 'dog', 'id': 3, 'name': 'Cleo'}
检索特定记录¶
你可以使用 table.get()
按主键检索记录
>>> db = sqlite_utils.Database("dogs.db")
>>> print(db["dogs"].get(1))
{'id': 1, 'age': 4, 'name': 'Cleo'}
如果表具有复合主键,你可以将主键值作为元组传入
>>> db["compound_dogs"].get(("mixed", 3))
如果记录不存在,将抛出 NotFoundError
异常
from sqlite_utils.db import NotFoundError
try:
row = db["dogs"].get(5)
except NotFoundError:
print("Dog not found")
显示模式¶
db.schema
属性以字符串形式返回数据库的完整 SQL 模式
>>> db = sqlite_utils.Database("dogs.db")
>>> print(db.schema)
CREATE TABLE "dogs" (
[id] INTEGER PRIMARY KEY,
[name] TEXT
);
创建表¶
创建新表最简单的方法是向其中插入一条记录
from sqlite_utils import Database
import sqlite3
db = Database("dogs.db")
dogs = db["dogs"]
dogs.insert({
"name": "Cleo",
"twitter": "cleopaws",
"age": 3,
"is_good_dog": True,
})
这将自动创建一个名为“dogs”的新表,其模式如下
CREATE TABLE dogs (
name TEXT,
twitter TEXT,
age INTEGER,
is_good_dog INTEGER
)
你还可以通过将 pk=
参数传递给 .insert()
调用来指定主键。只有当插入的记录导致表被创建时,此参数才会生效
dogs.insert({
"id": 1,
"name": "Cleo",
"twitter": "cleopaws",
"age": 3,
"is_good_dog": True,
}, pk="id")
像这样插入一行后,dogs.last_rowid
属性将返回分配给最近插入记录的 SQLite rowid
。
如果你指定了主键,dogs.last_pk
属性将返回最后插入的主键值。这在编写创建外键或多对多关系的代码时非常有用。
自定义列顺序和列类型¶
如果你使用的是 Python 3.6 或更高版本,表的列顺序将从字典中键的顺序派生。
如果你想显式设置列的顺序,可以使用 column_order=
参数
db["dogs"].insert({
"id": 1,
"name": "Cleo",
"twitter": "cleopaws",
"age": 3,
"is_good_dog": True,
}, pk="id", column_order=("id", "twitter", "name"))
你无需将所有列都传递给 column_order
参数。如果你只传递列的一个子集,其余列将根据字典中键的顺序进行排序。
列类型是根据提供的示例数据检测的。有时你可能需要覆盖这些检测到的类型 - 例如,为作为字符串提供的数据创建一个整数列,或确保第一个示例为 None
的表创建为 INTEGER
列而不是 TEXT
列。你可以使用 columns=
参数来做到这一点
db["dogs"].insert({
"id": 1,
"name": "Cleo",
"age": "5",
}, pk="id", columns={"age": int, "weight": float})
这将创建一个具有以下模式的表
CREATE TABLE [dogs] (
[id] INTEGER PRIMARY KEY,
[name] TEXT,
[age] INTEGER,
[weight] FLOAT
)
显式创建表¶
你可以使用 .create()
方法直接创建一个新表,而无需向其中插入任何数据
db["cats"].create({
"id": int,
"name": str,
"weight": float,
}, pk="id")
此处的第一个参数是一个字典,用于指定要创建的列。每列都与一个 Python 类型配对,表示列的类型。有关这些类型如何工作的完整详细信息,请参阅 添加列。
此方法接受可选参数 pk=
、column_order=
、foreign_keys=
、not_null=set()
和 defaults=dict()
- 下面会解释。
如果已存在同名表,将抛出 sqlite_utils.utils.sqlite3.OperationalError
异常。
你可以传递 ignore=True
来忽略该错误。你还可以使用 if_not_exists=True
来使用 SQL CREATE TABLE IF NOT EXISTS
模式实现相同的效果
db["cats"].create({
"id": int,
"name": str,
}, pk="id", if_not_exists=True)
要删除并替换任何已存在的同名表,请传递 replace=True
。这是一项危险的操作,将导致表中现有数据丢失。
你还可以传递 transform=True
,以便对任何现有表进行 转换 以匹配你的新表规范。这是一项危险的操作,因为它会删除 .create()
调用中不再列出的列,因此在运行此操作时请务必小心。
db["cats"].create({
"id": int,
"name": str,
"weight": float,
}, pk="id", transform=True)
如果以下任何项发生更改,transform=True
选项将更新表模式
指定的列或其类型
指定的主键
使用
column_order=
定义的列顺序not_null=
或defaults=
参数
transform=True
目前不会检测和应用 foreign_keys=
的更改。
你可以传递 strict=True
以在 STRICT
模式下创建表
db["cats"].create({
"id": int,
"name": str,
}, strict=True)
复合主键¶
如果你想创建一个具有跨多个列的复合主键的表,可以通过将列名元组传递给任何接受 pk=
参数的方法来实现。例如
db["cats"].create({
"id": int,
"breed": str,
"name": str,
"weight": float,
}, pk=("breed", "id"))
这也适用于 .insert()
、.insert_all()
、.upsert()
和 .upsert_all()
方法。
指定外键¶
任何可以创建表的操作(.create()
、.insert()
、.insert_all()
、.upsert()
和 .upsert_all()
)都接受一个可选的 foreign_keys=
参数,可用于为正在创建的表设置外键约束。
如果你将数据库与 Datasette 一起使用,Datasette 将检测这些约束并使用它们生成指向相关记录的超链接。
foreign_keys
参数接受一个列表,该列表指示应创建哪些外键。该列表可以采用多种形式。最简单的是列名列表
foreign_keys=["author_id"]
该库将根据 添加外键约束 中描述的规则猜测你想引用的表。
你也可以更明确,通过传入元组列表
foreign_keys=[
("author_id", "authors", "id")
]
这意味着 author_id
列应该是一个外键,引用 authors
表中的 id
列。
你也可以省略元组中的第三项,让引用的列自动设置为该表的主键。完整示例
db["authors"].insert_all([
{"id": 1, "name": "Sally"},
{"id": 2, "name": "Asheesh"}
], pk="id")
db["books"].insert_all([
{"title": "Hedgehogs of the world", "author_id": 1},
{"title": "How to train your wolf", "author_id": 2},
], foreign_keys=[
("author_id", "authors")
])
表配置选项¶
.insert()
、.upsert()
、.insert_all()
和 .upsert_all()
方法都接受多个关键字参数,其中一些影响在它们导致创建表时发生的情况,另一些影响这些方法的行为。
你可以通过 db.table(...)
方法(而不是使用 db["table_name"]
)访问表来为这些方法设置默认值,如下所示
table = db.table(
"authors",
pk="id",
not_null={"name", "score"},
column_order=("id", "name", "score", "url")
)
# Now you can call .insert() like so:
table.insert({"id": 1, "name": "Tracy", "score": 5})
可以通过这种方式指定的配置选项包括 pk
、foreign_keys
、column_order
、not_null
、defaults
、batch_size
、hash_id
、hash_id_columns
、alter
、ignore
、replace
、extracts
、conversions
、columns
、strict
。这些都将在下面进行文档说明。
设置默认值和非空约束¶
每个可以导致创建表的方法都接受可选参数 not_null=set()
和 defaults=dict()
。接受这些可选参数的方法包括
db.create_table(...)
table.create(...)
table.insert(...)
table.insert_all(...)
table.upsert(...)
table.upsert_all(...)
你可以使用 not_null=
传递一个列名集合,这些列在创建时应设置 NOT NULL
约束。
你可以使用 defaults=
传递一个字典,将列映射到应在 CREATE TABLE
语句中指定的默认值。
这是一个使用这些功能的示例
db["authors"].insert_all(
[{"id": 1, "name": "Sally", "score": 2}],
pk="id",
not_null={"name", "score"},
defaults={"score": 1},
)
db["authors"].insert({"name": "Dharma"})
list(db["authors"].rows)
# Outputs:
# [{'id': 1, 'name': 'Sally', 'score': 2},
# {'id': 3, 'name': 'Dharma', 'score': 1}]
print(db["authors"].schema)
# Outputs:
# CREATE TABLE [authors] (
# [id] INTEGER PRIMARY KEY,
# [name] TEXT NOT NULL,
# [score] INTEGER NOT NULL DEFAULT 1
# )
重命名表¶
db.rename_table(old_name, new_name)
方法可用于重命名表
db.rename_table("my_table", "new_name_for_my_table")
这会执行以下 SQL
ALTER TABLE [my_table] RENAME TO [new_name_for_my_table]
复制表¶
table.duplicate()
方法创建表的副本,复制表模式和该表中的所有行
db["authors"].duplicate("authors_copy")
新的 authors_copy
表现在将包含 authors
表数据的副本。
如果表不存在,此方法将引发 sqlite_utils.db.NoTable
异常。
批量插入¶
如果你有不止一条记录要插入,insert_all()
方法是一种效率更高的插入方式。就像 insert()
一样,它会自动检测应该创建的列,但它会检查前 100 条记录以帮助决定列类型。
像这样使用它
db["dogs"].insert_all([{
"id": 1,
"name": "Cleo",
"twitter": "cleopaws",
"age": 3,
"is_good_dog": True,
}, {
"id": 2,
"name": "Marnie",
"twitter": "MarnieTheDog",
"age": 16,
"is_good_dog": True,
}], pk="id", column_order=("id", "twitter", "name"))
CREATE TABLE
语句中使用的列类型是根据第一批数据中的数据类型自动派生的。后续批次中的任何额外列都将导致 sqlite3.OperationalError
异常被抛出,除非提供了 alter=True
参数,在这种情况下将创建新列。
该函数可以接受行的迭代器或生成器,并将根据批次大小提交它们。默认批次大小为 100,但你可以使用 batch_size
参数指定不同的批次大小
db["big_table"].insert_all(({
"id": 1,
"name": "Name {}".format(i),
} for i in range(10000)), batch_size=1000)
你还可以使用 ignore=True
跳过插入任何已存在主键的记录。这适用于 .insert({...}, ignore=True)
和 .insert_all([...], ignore=True)
。
你还可以使用 truncate=True
在插入新记录之前删除表中的所有现有行。这在你想要替换表中的数据时很有用。
插入新记录后,传递 analyze=True
以对表运行 ANALYZE
。
插入替换数据¶
如果你尝试使用已存在的主键插入数据,.insert()
或 .insert_all()
方法将抛出 sqlite3.IntegrityError
异常。
这个示例捕获了该异常
from sqlite_utils.utils import sqlite3
try:
db["dogs"].insert({"id": 1, "name": "Cleo"}, pk="id")
except sqlite3.IntegrityError:
print("Record already exists with that primary key")
从 sqlite_utils.utils.sqlite3
导入可以确保你的代码即使在使用 pysqlite3
库而不是 Python 标准库 sqlite3
模块时也能正常工作。
使用 ignore=True
参数忽略此错误
# This fails silently if a record with id=1 already exists
db["dogs"].insert({"id": 1, "name": "Cleo"}, pk="id", ignore=True)
要替换任何具有匹配主键的现有记录,请使用 replace=True
参数传递给 .insert()
或 .insert_all()
db["dogs"].insert_all([{
"id": 1,
"name": "Cleo",
"twitter": "cleopaws",
"age": 3,
"is_good_dog": True,
}, {
"id": 2,
"name": "Marnie",
"twitter": "MarnieTheDog",
"age": 16,
"is_good_dog": True,
}], pk="id", replace=True)
注意
在 sqlite-utils 2.0 之前,.upsert()
和 .upsert_all()
方法的工作方式与现在 2.x 中的 .insert(replace=True)
和 .insert_all(replace=True)
相同。有关 2.0 中引入的这些方法的新行为,请参阅 增补数据。
更新特定记录¶
你还可以使用 table.update()
按主键更新记录
>>> db = sqlite_utils.Database("dogs.db")
>>> print(db["dogs"].get(1))
{'id': 1, 'age': 4, 'name': 'Cleo'}
>>> db["dogs"].update(1, {"age": 5})
>>> print(db["dogs"].get(1))
{'id': 1, 'age': 5, 'name': 'Cleo'}
update()
的第一个参数是主键。如果表具有复合主键,这可以是单个值,也可以是元组
>>> db["compound_dogs"].update((5, 3), {"name": "Updated"})
第二个参数是要更新的列及其新值的字典。
你还可以使用 alter=True
使任何缺失的列自动添加
>>> db["dogs"].update(1, {"breed": "Mutt"}, alter=True)
删除特定记录¶
你还可以使用 table.delete()
删除记录
>>> db = sqlite_utils.Database("dogs.db")
>>> db["dogs"].delete(1)
delete()
方法接受记录的主键。如果行具有复合主键,这可以是值的元组
>>> db["compound_dogs"].delete((5, 3))
删除多条记录¶
你还可以使用 table.delete_where()
删除表中与特定 WHERE 语句匹配的所有记录
>>> db = sqlite_utils.Database("dogs.db")
>>> # Delete every dog with age less than 3
>>> with db.conn:
>>> db["dogs"].delete_where("age < ?", [3])
在不带其他参数的情况下调用 table.delete_where()
将删除表中的所有行。
删除行后,传递 analyze=True
对表运行 ANALYZE
。
增补数据¶
增补允许你在记录不存在时插入它们,如果记录存在,则根据主键匹配来更新它们。
例如,给定 dogs 数据库,你可以像这样增补 Cleo 的记录
db["dogs"].upsert({
"id": 1,
"name": "Cleo",
"twitter": "cleopaws",
"age": 4,
"is_good_dog": True,
}, pk="id", column_order=("id", "twitter", "name"))
如果存在 id=1 的记录,它将被更新以匹配这些字段。如果不存在,将创建它。
.upsert()
中未引用的任何现有列将保持不变。如果你想完全替换记录,请改用 .insert(doc, replace=True)
。
请注意,如果你确定表已经创建,这里的 pk
和 column_order
参数是可选的。如果在第一次增补时表可能不存在,则应该传递它们。
upsert_all()
方法也可用,其行为类似于 insert_all()
但执行增补操作。
注意
sqlite-utils 1.x 中的 .upsert()
和 .upsert_all()
的工作方式与现在 2.x 中的 .insert(..., replace=True)
和 .insert_all(..., replace=True)
相同。有关此更改的详细信息,请参阅 issue #66。
转换列中的数据¶
table.convert(...)
方法可用于对列中的值应用转换函数,以更新该列或填充新列。它是 sqlite-utils convert 命令的 Python 库对应方法。
此功能的工作原理是注册一个应用 Python 转换的自定义 SQLite 函数,然后运行等效于 UPDATE table SET column = convert_value(column);
的 SQL 查询。
要将特定列转换为大写,可以使用以下方法
db["dogs"].convert("name", lambda value: value.upper())
你可以传递一个列列表,在这种情况下,转换将应用于每一列
db["dogs"].convert(["name", "twitter"], lambda value: value.upper())
要将转换的输出保存到不同的列,请使用 output=
参数
db["dogs"].convert("name", lambda value: value.upper(), output="name_upper")
如果新列尚不存在,这将添加新列。你可以传递 output_type=int
或其他类型来控制新列的类型 - 否则默认为文本。
如果你想在将结果保存在单独的输出列后删除原始列,请传递 drop=True
。
默认情况下,对于列值为假值(例如 0
或 None
)的任何行都将被跳过。传递 skip_false=False
可禁用此行为。
通过传递 multi=True
和返回 Python 字典的转换函数,可以从单个输入列创建多个新列。此示例创建新的 upper
和 lower
列,它们由单个 title
列填充
table.convert(
"title", lambda v: {"upper": v.upper(), "lower": v.lower()}, multi=True
)
.convert()
方法接受可选的 where=
和 where_args=
参数,可用于将转换应用于由 where 子句指定的行子集。以下是如何仅将转换应用于 id
大于 20 的行
table.convert("title", lambda v: v.upper(), where="id > :id", where_args={"id": 20})
这些参数的行为与 .rows_where() 方法的相应参数相同,因此你可以使用 ?
占位符和值列表,而不是使用字典的 :named
占位符。
使用查找表¶
在填充大表时,一个有用的模式是将常见值分解到查找表中。考虑一个 Trees 表,其中每棵树都有一个物种。理想情况下,这些物种应分解到一个单独的 Species 表中,每个物种都被分配一个整数主键,可以在 Trees 表的 species_id 列中引用。
显式创建查找表¶
调用 db["Species"].lookup({"name": "Palm"})
会创建一个名为 Species
的表(如果不存在),其中包含两列:id
和 name
。它在 name
列上设置唯一约束,以保证不包含重复行。然后它插入一个 name
设置为 Palm 的新行,并返回新的整数主键值。
如果 Species
表已存在,它将插入新行并返回主键。如果已存在具有该 name
的行,它将直接返回相应的主键值。
如果你对没有唯一约束的现有表调用 .lookup()
,它将尝试添加约束,如果无法创建约束,则会引发 IntegrityError
。
如果你传入包含多个值的字典,这两个值都将用于插入或检索相应的 ID,并且创建的任何唯一约束将覆盖所有这些列,例如
db["Trees"].insert({
"latitude": 49.1265976,
"longitude": 2.5496218,
"species": db["Species"].lookup({
"common_name": "Common Juniper",
"latin_name": "Juniperus communis"
})
})
.lookup()
方法有一个可选的第二个参数,可用于填充表中的其他列,但仅当该行尚不存在时。这些列不会包含在唯一索引中。
要创建包含首次出现时间的备注的物种记录,可以使用此方法
db["Species"].lookup({"name": "Palm"}, {"first_seen": "2021-03-04"})
第一次调用时,将为 name="Palm"
创建记录。之后使用该名称的任何调用将忽略第二个参数,即使它包含不同的值。
.lookup()
还接受关键字参数,这些参数会传递给 insert() method,可用于影响创建表的形状。支持的参数包括
pk
- 默认值为id
foreign_keys
column_order
not_null
defaults
extracts
conversions
columns
strict
在插入/增补时自动填充查找表¶
使用查找表更高效的方式是使用 extracts=
参数定义它们,.insert()
、.upsert()
、.insert_all()
、.upsert_all()
以及 .table(...)
工厂函数都接受此参数。
extracts=
指定在数据插入期间应“提取”到单独查找表中的列。
它可以是列名列表,在这种情况下,提取的表名将与列名完全匹配;也可以是字典,将列名映射到所需提取表名。
要将 species
列提取到单独的 Species
表中,可以这样做
# Using the table factory
trees = db.table("Trees", extracts={"species": "Species"})
trees.insert({
"latitude": 49.1265976,
"longitude": 2.5496218,
"species": "Common Juniper"
})
# If you want the table to be called 'species', you can do this:
trees = db.table("Trees", extracts=["species"])
# Using .insert() directly
db["Trees"].insert({
"latitude": 49.1265976,
"longitude": 2.5496218,
"species": "Common Juniper"
}, extracts={"species": "Species"})
使用多对多关系¶
sqlite-utils
包含一个用于使用多对多关系创建记录的快捷方式,即 table.m2m(...)
方法。
以下是如何用一行代码创建两个新记录并通过多对多表连接它们
db["dogs"].insert({"id": 1, "name": "Cleo"}, pk="id").m2m(
"humans", {"id": 1, "name": "Natalie"}, pk="id"
)
运行此示例实际上创建了三个表:dogs
、humans
和一个多对多表 dogs_humans
。它将在每个表中插入一条记录。
.m2m()
方法对受 .insert()
或 .update()
影响的最后一条记录执行操作 - 即由 table.last_pk
属性标识的记录。要对特定记录执行 .m2m()
,可以先通过将其主键传递给 .update()
来选择它
db["dogs"].update(1).m2m(
"humans", {"id": 2, "name": "Simon"}, pk="id"
)
.m2m()
的第一个参数可以是作为字符串的表名,也可以是表对象本身。
第二个参数可以是单个字典记录或字典列表。这些字典将传递给对指定表的 .upsert()
操作。
这是创建狗记录并向其添加两个人的备选代码
db = Database(memory=True)
dogs = db.table("dogs", pk="id")
humans = db.table("humans", pk="id")
dogs.insert({"id": 1, "name": "Cleo"}).m2m(
humans, [
{"id": 1, "name": "Natalie"},
{"id": 2, "name": "Simon"}
]
)
该方法将尝试通过查找与关系中两个表都具有外键关系的表来找到现有的多对多表。
如果找不到这样的表,它将使用两个表的名称创建一个新表 - 在此示例中为 dogs_humans
。你可以使用 .m2m()
的 m2m_table=
参数来自定义此表的名称。
如果找到多个与两个指定表都具有外键的候选表,它将引发 sqlite_utils.db.NoObviousTable
异常。你可以通过使用 m2m_table=
指定正确的表来避免此错误。
.m2m()
方法还接受一个可选的 pk=
参数,用于指定创建表时应使用的主键;以及一个可选的 alter=True
参数,用于指定如果现有表需要缺失的列,则应添加这些列。
同时使用 m2m 和查找表¶
你还可以使用 lookup=
参数作为 .m2m()
调用的一部分来使用(或创建)查找表。这接受与 table.lookup()
相同的参数 - 一个用于在查找表中查找或创建行的值字典。
此示例创建一个 dogs
表并填充它,创建一个 characteristics
表并填充它,并在两者之间建立多对多关系。它连续调用两次 .m2m()
以创建两个相关的特性
db = Database(memory=True)
dogs = db.table("dogs", pk="id")
dogs.insert({"id": 1, "name": "Cleo"}).m2m(
"characteristics", lookup={
"name": "Playful"
}
).m2m(
"characteristics", lookup={
"name": "Opinionated"
}
)
你可以像这样检查数据库以查看结果
>>> db.table_names()
['dogs', 'characteristics', 'characteristics_dogs']
>>> list(db["dogs"].rows)
[{'id': 1, 'name': 'Cleo'}]
>>> list(db["characteristics"].rows)
[{'id': 1, 'name': 'Playful'}, {'id': 2, 'name': 'Opinionated'}]
>>> list(db["characteristics_dogs"].rows)
[{'characteristics_id': 1, 'dogs_id': 1}, {'characteristics_id': 2, 'dogs_id': 1}]
>>> print(db["characteristics_dogs"].schema)
CREATE TABLE [characteristics_dogs] (
[characteristics_id] INTEGER REFERENCES [characteristics]([id]),
[dogs_id] INTEGER REFERENCES [dogs]([id]),
PRIMARY KEY ([characteristics_id], [dogs_id])
)
分析列¶
table.analyze_column(column)
方法由 analyze-tables CLI 命令使用。
它接受以下参数和选项
column
- 必需要分析的列名
common_limit
返回的最常见值的数量。默认为 10。
value_truncate
如果设置为整数,长于此长度的值将被截断为该长度。默认为 None。
most_common
如果设置为 False,返回的
ColumnDetails
的most_common
字段将设置为 None。默认为 True。least_common
如果设置为 False,返回的
ColumnDetails
的least_common
字段将设置为 None。默认为 True。
并返回一个具有以下字段的 ColumnDetails
命名元组
table
表名
column
列名
total_rows
表中的总行数
num_null
此列为空值的行数
num_blank
此列为空白(空字符串)的行数
num_distinct
此列中唯一值的数量
most_common
N
个最常见的值,作为(value, count)
元组的列表;如果表完全由唯一值组成,则为None
least_common
N
个最不常见的值,作为(value, count)
元组的列表;如果表完全唯一或唯一值的数量小于 N(因为它们已在most_common
中返回),则为None
添加列¶
你还可以使用 .add_column(col_name, col_type)
方法向表添加新列
db["dogs"].add_column("instagram", str)
db["dogs"].add_column("weight", float)
db["dogs"].add_column("dob", datetime.date)
db["dogs"].add_column("image", "BLOB")
db["dogs"].add_column("website") # str by default
你还可以使用作为字符串的 SQLite 类型指定 col_type
参数,或直接传递 Python 类型,例如 str
或 float
。
col_type
是可选的 - 如果省略它,将使用 TEXT
类型。
你可以指定的 SQLite 类型包括 "TEXT"
、"INTEGER"
、"FLOAT"
或 "BLOB"
。
如果你传递 Python 类型,它将被映射到 SQLite 类型,如下所示
float: "FLOAT"
int: "INTEGER"
bool: "INTEGER"
str: "TEXT"
bytes: "BLOB"
datetime.datetime: "TEXT"
datetime.date: "TEXT"
datetime.time: "TEXT"
datetime.timedelta: "TEXT"
# If numpy is installed
np.int8: "INTEGER"
np.int16: "INTEGER"
np.int32: "INTEGER"
np.int64: "INTEGER"
np.uint8: "INTEGER"
np.uint16: "INTEGER"
np.uint32: "INTEGER"
np.uint64: "INTEGER"
np.float16: "FLOAT"
np.float32: "FLOAT"
np.float64: "FLOAT"
注意
在 sqlite-utils 3.x 中,当正确列类型实际上是 REAL 时,FLOAT
用于浮点列。如果你指定 strict=True
,在 strict 模式下创建的表将改用正确的列类型 REAL
。我们计划在 sqlite-utils 4.x 中更改此行为以始终使用 REAL
,但这将是一个小的破坏性更改,因此将保留到下一个主要版本,详见 issue #645。
你还可以使用 fk
参数添加一个列,该列是另一个表的外键引用
db["dogs"].add_column("species_id", fk="species")
这将自动检测 species 表上的主键名称,并将其(及其类型)用于新列。
你还可以使用 fk_col
显式指定要引用的列
db["dogs"].add_column("species_id", fk="species", fk_col="ref")
你还可以使用 not_null_default
在新列上设置 NOT NULL DEFAULT 'x'
约束
db["dogs"].add_column("friends_count", int, not_null_default=0)
在插入/更新时自动添加列¶
你还可以插入或更新包含新列的数据,并使用 alter=True
参数自动修改表以适应新模式。此参数可以传递给 .insert()
、.upsert()
、.insert_all()
和 .upsert_all()
这四个方法,或者传递给 db.table(table_name, alter=True)
以在该表实例的所有方法调用中默认启用它。
db["new_table"].insert({"name": "Gareth"})
# This will throw an exception:
db["new_table"].insert({"name": "Gareth", "age": 32})
# This will succeed and add a new "age" integer column:
db["new_table"].insert({"name": "Gareth", "age": 32}, alter=True)
# You can see confirm the new column like so:
print(db["new_table"].columns_dict)
# Outputs this:
# {'name': <class 'str'>, 'age': <class 'int'>}
# This works too:
new_table = db.table("new_table", alter=True)
new_table.insert({"name": "Gareth", "age": 32, "shoe_size": 11})
添加外键约束¶
SQLite 的 ALTER TABLE
语句不具备向现有列添加外键引用的能力。
此处的 add_foreign_key()
方法是 table.transform() 的方便包装。
也可以通过直接更新 sqlite_master 表来添加外键。sqlite-utils-fast-fks 插件实现了这种模式,使用了 sqlite-utils 3.35 版本之前包含的代码。
这是一个此机制的实际示例
db["authors"].insert_all([
{"id": 1, "name": "Sally"},
{"id": 2, "name": "Asheesh"}
], pk="id")
db["books"].insert_all([
{"title": "Hedgehogs of the world", "author_id": 1},
{"title": "How to train your wolf", "author_id": 2},
])
db["books"].add_foreign_key("author_id", "authors", "id")
table.add_foreign_key(column, other_table, other_column)
方法接受列名、被引用的表以及该表中键列的名称。如果你省略 other_column
参数,将自动使用该表的主键。如果你省略 other_table
参数,将根据一些简单规则猜测表名
如果列的格式为
author_id
,则查找名为author
或authors
的表如果列名不以
_id
结尾,则尝试查找与列名完全相同的表或该名称加上“s”的表
此方法首先检查指定的外键引用现有的表和列,并且不与现有外键冲突。如果这些检查失败,将抛出 sqlite_utils.db.AlterError
异常。
要忽略键已存在的情况,请使用 ignore=True
db["books"].add_foreign_key("author_id", "authors", "id", ignore=True)
一次添加多个外键约束¶
你还可以使用 db.add_foreign_keys(...)
一次添加多个外键。此方法接受一个由四个元素组成的元组列表,每个元组指定一个 table
、column
、other_table
和 other_column
。
这是一个一次添加两个外键的示例
db.add_foreign_keys([
("dogs", "breed_id", "breeds", "id"),
("dogs", "home_town_id", "towns", "id")
])
此方法运行与 .add_foreign_keys()
相同的检查,如果这些检查失败,将抛出 sqlite_utils.db.AlterError
异常。
为所有外键添加索引¶
如果你想确保数据库中的每个外键列都有相应的索引,可以这样做
db.index_foreign_keys()
删除表或视图¶
你还可以使用 .drop()
方法删除表或视图
db["my_table"].drop()
如果你想忽略表或视图不存在导致的错误,请传递 ignore=True
。
db["my_table"].drop(ignore=True)
转换表¶
SQLite 的 ALTER TABLE
语句功能有限。它可以添加和删除列、重命名表,但不能更改列类型、更改 NOT NULL
状态或更改表的主键。
table.transform()
方法可以通过执行 SQLite 文档中描述的多步模式来完成所有这些操作
开始一个事务
使用所需更改
CREATE TABLE tablename_new_x123
使用
INSERT INTO tablename_new_x123 SELECT * FROM tablename;
将旧数据复制到新表中DROP TABLE tablename;
ALTER TABLE tablename_new_x123 RENAME TO tablename;
提交事务
.transform()
方法接受多个参数,所有参数都是可选的。
额外的好处是,调用 .transform()
会重新格式化存储在 SQLite 中的表模式,使其更易读。即使您不带任何参数调用它,此功能也有效。
要保留原始表而不是删除它,请传递 keep_table=
选项并指定您希望将其重命名为什么名称
table.transform(types={"age": int}, keep_table="original_table")
如果表无法转换,通常是因为存在与列修改不兼容的现有约束或索引,则此方法会引发 sqlite_utils.db.TransformError
异常。
修改列类型¶
要修改列的类型,请使用 types=
参数
# Convert the 'age' column to an integer, and 'weight' to a float
table.transform(types={"age": int, "weight": float})
有关可用类型列表,请参阅添加列。
重命名列¶
rename=
参数可以重命名列
# Rename 'age' to 'initial_age':
table.transform(rename={"age": "initial_age"})
删除列¶
要删除列,请将它们作为集合传递给 drop=
# Drop the 'age' column:
table.transform(drop={"age"})
更改主键¶
要更改表的主键,请使用 pk=
。对于常规主键,可以传递单个列;对于复合主键,可以传递一个列元组。传递 pk=None
将移除主键并将表转换为 rowid
表。
# Make `user_id` the new primary key
table.transform(pk="user_id")
更改非空(not null)状态¶
您可以使用 not_null=
更改列的 NOT NULL
状态。您可以将一个列集合传递给它,以将这些列设置为 NOT NULL
# Make the 'age' and 'weight' columns NOT NULL
table.transform(not_null={"age", "weight"})
如果您想将现有 NOT NULL
列更改为允许空值,可以通过传递 true/false 值的字典来实现
# 'age' is NOT NULL but we want to allow NULL:
table.transform(not_null={"age": False})
# Make age allow NULL and switch weight to being NOT NULL:
table.transform(not_null={"age": False, "weight": True})
修改列默认值¶
defaults=
参数可用于设置或更改不同列的默认值
# Set default age to 1:
table.transform(defaults={"age": 1})
# Now remove the default from that column:
table.transform(defaults={"age": None})
更改列顺序¶
column_order=
参数可用于更改列的顺序。如果您传递部分列的名称,这些列将排在前面,而您省略的列将按其现有顺序排在后面。
# Change column order
table.transform(column_order=("name", "age", "id")
添加外键约束¶
您可以使用 add_foreign_keys=
参数向表添加一个或多个外键约束
db["places"].transform(
add_foreign_keys=(
("country", "country", "id"),
("continent", "continent", "id")
)
)
这接受与指定外键中描述的相同参数 - 因此您可以将它们指定为 (column, other_table, other_column)
的完整元组,或者如果您仅传递列名,并且可以从列名自动推断出表,则可以采用快捷方式。
db["places"].transform(
add_foreign_keys=(("country", "continent"))
)
替换外键约束¶
foreign_keys=
参数类似于 add_foreign_keys=
,但可用于替换表上的所有外键约束,删除任何未明确提及的约束
db["places"].transform(
foreign_keys=(
("continent", "continent", "id"),
)
)
删除外键约束¶
您可以使用 .transform()
从表中移除外键约束。
此示例删除了两个外键 - 一个是从 places.country
到 country.id
的,另一个是从 places.continent
到 continent.id
的
db["places"].transform(
drop_foreign_keys=("country", "continent")
)
使用 .transform_sql() 进行自定义转换¶
.transform()
方法可以处理大多数情况,但它不会自动升级与正在转换的表关联的索引、视图或触发器。
如果您想执行更高级的操作,可以使用与传递给 table.transform(...)
相同的参数调用 table.transform_sql(...)
方法。
此方法将返回应执行的 SQL 语句列表以实现更改。然后,您可以在执行这些 SQL 之前对其进行修改 - 或添加额外的 SQL 语句。
将列提取到单独的表中¶
table.extract()
方法可用于将指定的列提取到单独的表中。
想象一下,一个 Trees
表如下所示
id |
TreeAddress |
Species |
---|---|---|
1 |
52 Vine St |
Palm |
2 |
12 Draft St |
Oak |
3 |
51 Dark Ave |
Palm |
4 |
1252 左街 |
Palm |
1252 Left St
Species
列包含重复值。通过将该列提取到单独的 Species
表中并使用外键列指向它,可以改进此数据库。
CREATE TABLE [Trees] (
[id] INTEGER PRIMARY KEY,
[TreeAddress] TEXT,
[Species] TEXT
)
上述表的模式为
db["Trees"].extract("Species")
以下是使用 .extract()
提取 Species
列的方法
CREATE TABLE "Trees" (
[id] INTEGER PRIMARY KEY,
[TreeAddress] TEXT,
[Species_id] INTEGER,
FOREIGN KEY(Species_id) REFERENCES Species(id)
)
运行此代码后,表模式现在如下所示
CREATE TABLE [Species] (
[id] INTEGER PRIMARY KEY,
[Species] TEXT
)
将创建一个新的 Species
表,其模式如下
.extract()
方法默认创建一个与被提取列同名的表,并添加一个名为 tablename_id
的外键列。
db["Trees"].extract("Species", table="tree_species", fk_column="tree_species_id")
您可以使用 table=
指定自定义表名,使用 fk_column=
指定自定义外键名。此示例创建一个名为 tree_species
的表和一个名为 tree_species_id
的外键列
CREATE TABLE "Trees" (
[id] INTEGER PRIMARY KEY,
[TreeAddress] TEXT,
[tree_species_id] INTEGER,
FOREIGN KEY(tree_species_id) REFERENCES tree_species(id)
)
CREATE TABLE [tree_species] (
[id] INTEGER PRIMARY KEY,
[Species] TEXT
)
结果模式如下所示
id |
TreeAddress |
您还可以将多个列提取到同一个外部表中。例如,假设您有一个如下所示的表 |
CommonName |
---|---|---|---|
1 |
52 Vine St |
Palm |
LatinName |
2 |
12 Draft St |
Oak |
Arecaceae |
3 |
51 Dark Ave |
Palm |
LatinName |
4 |
1252 左街 |
Palm |
LatinName |
Quercus
db["Trees"].extract(["CommonName", "LatinName"])
您可以将 ["CommonName", "LatinName"]
传递给 .extract()
来提取这两列
CREATE TABLE "Trees" (
[id] INTEGER PRIMARY KEY,
[TreeAddress] TEXT,
[CommonName_LatinName_id] INTEGER,
FOREIGN KEY(CommonName_LatinName_id) REFERENCES CommonName_LatinName(id)
)
CREATE TABLE [CommonName_LatinName] (
[id] INTEGER PRIMARY KEY,
[CommonName] TEXT,
[LatinName] TEXT
)
这将生成以下模式
db["Trees"].extract(["CommonName", "LatinName"], table="Species", fk_column="species_id")
您可以将 ["CommonName", "LatinName"]
传递给 .extract()
来提取这两列
CREATE TABLE "Trees" (
[id] INTEGER PRIMARY KEY,
[TreeAddress] TEXT,
[species_id] INTEGER,
FOREIGN KEY(species_id) REFERENCES Species(id)
)
CREATE TABLE [Species] (
[id] INTEGER PRIMARY KEY,
[CommonName] TEXT,
[LatinName] TEXT
)
表名 CommonName_LatinName
来自于提取的列。您可以使用 table=
和 fk_column=
指定自定义名称,如下所示
db["Trees"].extract(
["CommonName", "LatinName"],
table="Species",
fk_column="species_id",
rename={"CommonName": "name", "LatinName": "latin"}
)
您可以使用 rename=
参数重命名查找表中的列。要创建一个名为 Species
,包含名为 name
和 latin
的列的表,您可以这样做
CREATE TABLE [Species] (
[id] INTEGER PRIMARY KEY,
[name] TEXT,
[latin] TEXT
)
这将生成一个如下所示的查找表
有时,您会遇到处理没有明确提供的 ID 的数据集的情况,但您希望为其分配一个 ID,以便以后可以 upsert 到该表中而不会创建重复记录。
在这种情况下,一个有用的技术是创建一个由行内容的 sha1 哈希值派生的 ID。
db = sqlite_utils.Database("dogs.db")
db["dogs"].upsert({"name": "Cleo", "twitter": "cleopaws"}, hash_id="id")
print(list(db["dogs]))
sqlite-utils
可以使用 hash_id=
选项为您完成此操作。例如
[{'id': 'f501265970505d9825d8d9f590bfab3519fb20b1', 'name': 'Cleo', 'twitter': 'cleopaws'}]
输出
dog_id = db["dogs"].upsert({
"name": "Cleo",
"twitter": "cleopaws"
}, hash_id="id").last_pk
# dog_id is now "f501265970505d9825d8d9f590bfab3519fb20b1"
如果您将立即使用该 ID,可以使用 last_pk
访问它
db["dogs"].upsert(
{"name": "Cleo", "twitter": "cleopaws", "age": 7},
hash_id_columns=("name", "twitter")
)
将使用所有列值创建哈希。要使用列的子集创建哈希,请传递 hash_id_columns=
参数
如果您指定了 hash_id_columns=
,则 hash_id=
参数是可选的 - 它将默认将哈希值放入名为 id
的列中。
您可以使用 hash_record(record, keys=…) 工具函数手动计算这些哈希值。
db.create_view("good_dogs", """
select * from dogs where is_good_dog = 1
""")
数据库类上的 .create_view()
方法可用于创建视图
如果已存在具有该名称的视图,这将引发 sqlite_utils.utils.OperationalError
异常。
db.create_view("good_dogs", """
select * from dogs where is_good_dog = 1
""", replace=True)
您可以传递 ignore=True
以静默忽略现有视图并不做任何操作,或者传递 replace=True
以在新定义与当前视图的 select 语句不同时替换现有视图
SQLite 具有出色的 JSON 支持,sqlite-utils
可以帮助您利用这一点:如果您尝试插入可以表示为 JSON 列表或字典的值,sqlite-utils
将创建一个 TEXT 列并将您的数据存储为序列化的 JSON。这意味着您可以快速将甚至复杂的数据结构存储在 SQLite 中,并使用 JSON 功能查询它们。
db["niche_museums"].insert({
"name": "The Bigfoot Discovery Museum",
"url": "http://bigfootdiscoveryproject.com/"
"hours": {
"Monday": [11, 18],
"Wednesday": [11, 18],
"Thursday": [11, 18],
"Friday": [11, 18],
"Saturday": [11, 18],
"Sunday": [11, 18]
},
"address": {
"streetAddress": "5497 Highway 9",
"addressLocality": "Felton, CA",
"postalCode": "95018"
}
})
db.execute("""
select json_extract(address, '$.addressLocality')
from niche_museums
""").fetchall()
# Returns [('Felton, CA',)]
例如
有时,在插入值之前通过 SQL 函数处理它们可能很有用。一个简单的示例可能是在插入值时将其转换为大写。
conversions={...}
参数可用于指定自定义 SQL,作为 INSERT
或 UPDATE
SQL 语句的一部分。
db["example"].insert({
"name": "The Bigfoot Discovery Museum"
}, conversions={"name": "upper(?)"})
# list(db["example"].rows) now returns:
# [{'name': 'THE BIGFOOT DISCOVERY MUSEUM'}]
您可以像这样为特定列指定大写转换
字典键是要转换的列名。值是要使用的 SQL 片段,其中 ?
占位符代表原始值。
import sqlite3
import sqlite_utils
from shapely.geometry import shape
import httpx
db = sqlite_utils.Database("places.db")
# Initialize SpatiaLite
db.init_spatialite()
# Use sqlite-utils to create a places table
places = db["places"].create({"id": int, "name": str})
# Add a SpatiaLite 'geometry' column
places.add_geometry_column("geometry", "MULTIPOLYGON")
# Fetch some GeoJSON from Who's On First:
geojson = httpx.get(
"https://raw.githubusercontent.com/whosonfirst-data/"
"whosonfirst-data-admin-gb/master/data/404/227/475/404227475.geojson"
).json()
# Convert to "Well Known Text" format using shapely
wkt = shape(geojson["geometry"]).wkt
# Insert the record, converting the WKT to a SpatiaLite geometry:
db["places"].insert(
{"name": "Wales", "geometry": wkt},
conversions={"geometry": "GeomFromText(?, 4326)"},
)
一个更有用的示例:如果您正在使用 SpatiaLite,您可能希望从 WKT 值创建几何值。执行此操作的代码可能如下所示
此示例使用来自 Who’s On First 的地理数据,并依赖于 Shapely 和 HTTPX Python 库。
>>> db.sqlite_version
(3, 36, 0)
db.sqlite_version
属性返回一个整数元组,表示用于该数据库对象的 SQLite 版本
full_sql = "".join(db.iterdump())
db.iterdump()
方法返回表示数据库完整转储的 SQL 字符串序列。像这样使用它
这使用了 sqlite3.Connection.iterdump() 方法。
pip install sqlite-dump
如果您正在使用 pysqlite3
或 sqlean.py
,可能缺少底层方法。如果您安装 sqlite-dump 包,则 db.iterdump()
方法将改用该实现
>>> db["PlantType"]
<Table PlantType (id, value)>
如果您已加载现有表或视图,可以使用自省来了解更多信息
>>> db["PlantType"].exists()
True
>>> db["PlantType2"].exists()
False
.exists()
方法可用于判断表是否存在
>>> db["PlantType"].count
3
>>> db["Street_Tree_List"].count
189144
.count
属性显示当前行数(select count(*) from table
)
如果数据库上设置了 use_counts_table
属性,此属性将利用 使用触发器缓存表计数。您可以通过调用 table.count_where()
而不是访问该属性来完全避免此优化。
>>> db["PlantType"].columns
[Column(cid=0, name='id', type='INTEGER', notnull=0, default_value=None, is_pk=1),
Column(cid=1, name='value', type='TEXT', notnull=0, default_value=None, is_pk=0)]
.columns
属性显示表或视图中的列。它返回 Column(cid, name, type, notnull, default_value, is_pk)
命名元组的列表。
>>> db["PlantType"].columns_dict
{'id': <class 'int'>, 'value': <class 'str'>}
.columns_dict
属性返回列的字典版本,其中仅包含名称和 Python 类型
>>> db["table_with_defaults"].default_values
{'score': 5}
.default_values
属性返回具有默认值的每个列的默认值字典
>>> db["PlantType"].pks
['id']
.pks
属性返回表的 प्राइमरी key 列名字符串列表
如果表没有主键但是一个 rowid 表,则此属性将返回 ['rowid']
。
>>> db["PlantType"].use_rowid
False
几乎所有 SQLite 表都具有 rowid
列,但没有明确定义主键的表必须使用该 rowid
作为标识单独行的主键。.use_rowid
属性检查表是否需要以这种方式使用 rowid
- 如果表没有明确定义的主键,则返回 True
,否则返回 False
。
>>> db["Street_Tree_List"].foreign_keys
[ForeignKey(table='Street_Tree_List', column='qLegalStatus', other_table='qLegalStatus', other_column='id'),
ForeignKey(table='Street_Tree_List', column='qCareAssistant', other_table='qCareAssistant', other_column='id'),
ForeignKey(table='Street_Tree_List', column='qSiteInfo', other_table='qSiteInfo', other_column='id'),
ForeignKey(table='Street_Tree_List', column='qSpecies', other_table='qSpecies', other_column='id'),
ForeignKey(table='Street_Tree_List', column='qCaretaker', other_table='qCaretaker', other_column='id'),
ForeignKey(table='Street_Tree_List', column='PlantType', other_table='PlantType', other_column='id')]
.foreign_keys
属性返回表的外键关系,作为 ForeignKey(table, column, other_table, other_column)
命名元组的列表。它在视图上不可用。
>>> print(db["Street_Tree_List"].schema)
CREATE TABLE "Street_Tree_List" (
"TreeID" INTEGER,
"qLegalStatus" INTEGER,
"qSpecies" INTEGER,
"qAddress" TEXT,
"SiteOrder" INTEGER,
"qSiteInfo" INTEGER,
"PlantType" INTEGER,
"qCaretaker" INTEGER,
"qCareAssistant" INTEGER,
"PlantDate" TEXT,
"DBH" INTEGER,
"PlotSize" TEXT,
"PermitNotes" TEXT,
"XCoord" REAL,
"YCoord" REAL,
"Latitude" REAL,
"Longitude" REAL,
"Location" TEXT
,
FOREIGN KEY ("PlantType") REFERENCES [PlantType](id),
FOREIGN KEY ("qCaretaker") REFERENCES [qCaretaker](id),
FOREIGN KEY ("qSpecies") REFERENCES [qSpecies](id),
FOREIGN KEY ("qSiteInfo") REFERENCES [qSiteInfo](id),
FOREIGN KEY ("qCareAssistant") REFERENCES [qCareAssistant](id),
FOREIGN KEY ("qLegalStatus") REFERENCES [qLegalStatus](id))
.schema
属性将表的模式作为 SQL 字符串输出
>>> db["ny_times_us_counties"].strict
False
.strict
属性标识表是否为 SQLite STRICT table。
>>> db["Street_Tree_List"].indexes
[Index(seq=0, name='"Street_Tree_List_qLegalStatus"', unique=0, origin='c', partial=0, columns=['qLegalStatus']),
Index(seq=1, name='"Street_Tree_List_qCareAssistant"', unique=0, origin='c', partial=0, columns=['qCareAssistant']),
Index(seq=2, name='"Street_Tree_List_qSiteInfo"', unique=0, origin='c', partial=0, columns=['qSiteInfo']),
Index(seq=3, name='"Street_Tree_List_qSpecies"', unique=0, origin='c', partial=0, columns=['qSpecies']),
Index(seq=4, name='"Street_Tree_List_qCaretaker"', unique=0, origin='c', partial=0, columns=['qCaretaker']),
Index(seq=5, name='"Street_Tree_List_PlantType"', unique=0, origin='c', partial=0, columns=['PlantType'])]
.indexes
属性返回为表创建的所有索引,作为 Index(seq, name, unique, origin, partial, columns)
命名元组的列表。它在视图上不可用。
>>> db["ny_times_us_counties"].xindexes
[
XIndex(
name='idx_ny_times_us_counties_date',
columns=[
XIndexColumn(seqno=0, cid=0, name='date', desc=1, coll='BINARY', key=1),
XIndexColumn(seqno=1, cid=-1, name=None, desc=0, coll='BINARY', key=0)
]
),
XIndex(
name='idx_ny_times_us_counties_fips',
columns=[
XIndexColumn(seqno=0, cid=3, name='fips', desc=0, coll='BINARY', key=1),
XIndexColumn(seqno=1, cid=-1, name=None, desc=0, coll='BINARY', key=0)
]
)
]
.xindexes
属性使用 SQLite PRAGMA index_xinfo() 机制返回表中索引的更详细信息。它返回一个 XIndex(name, columns)
命名元组列表,其中 columns
是 XIndexColumn(seqno, cid, name, desc, coll, key)
命名元组的列表。
>>> db["authors"].triggers
[Trigger(name='authors_ai', table='authors', sql='CREATE TRIGGER [authors_ai] AFTER INSERT...'),
Trigger(name='authors_ad', table='authors', sql="CREATE TRIGGER [authors_ad] AFTER DELETE..."),
Trigger(name='authors_au', table='authors', sql="CREATE TRIGGER [authors_au] AFTER UPDATE")]
>>> db.triggers
... similar output to db["authors"].triggers
.triggers
属性列出数据库触发器。它可用于数据库对象和表对象。它返回一个 Trigger(name, table, sql)
命名元组列表。
>>> db["authors"].triggers_dict
{'authors_ai': 'CREATE TRIGGER [authors_ai] AFTER INSERT...',
'authors_ad': 'CREATE TRIGGER [authors_ad] AFTER DELETE...',
'authors_au': 'CREATE TRIGGER [authors_au] AFTER UPDATE'}
.triggers_dict
属性将该表的触发器作为字典返回,将其名称映射到其 SQL 定义。
>>> db.triggers_dict
{'authors_ai': 'CREATE TRIGGER [authors_ai] AFTER INSERT...',
'authors_ad': 'CREATE TRIGGER [authors_ad] AFTER DELETE...',
'authors_au': 'CREATE TRIGGER [authors_au] AFTER UPDATE'}
数据库上存在相同的属性,它将返回所有表中的所有触发器
>>> db["authors"].detect_fts()
"authors_fts"
detect_fts()
方法返回关联的 SQLite FTS 表名(如果存在且与此表关联)。如果表未配置全文搜索,则返回 None
。
>>> db["authors"].enable_fts(["name"])
>>> db["authors_fts"].virtual_table_using
"FTS5"
.virtual_table_using
属性揭示了表是否为虚拟表。对于常规表,它返回 None
,否则返回虚拟表类型的全大写版本。例如
>>> db["authors"].has_counts_triggers
False
>>> db["authors"].enable_counts()
>>> db["authors"].has_counts_triggers
True
.has_counts_triggers
属性显示表是否已配置触发器以更新 _counts
表,如 使用触发器缓存表计数中所述。
>>> db.supports_strict
True
数据库对象上的此属性返回 True
,如果可用的 SQLite 版本支持 STRICT 模式,该模式在 SQLite 3.37.0(2021-11-27)中添加。
SQLite 包含实现强大全文搜索的捆绑扩展。
db["dogs"].enable_fts(["name", "twitter"])
您可以使用 .enable_fts(columns)
为表启用全文搜索
rows = list(db["dogs"].search("cleo"))
然后,您可以使用 .search()
方法运行搜索
此方法返回一个生成器,可以迭代该生成器以获取每行的字典,类似于列出行。
db["dogs"].insert({
"id": 2,
"name": "Marnie",
"twitter": "MarnieTheDog",
"age": 16,
"is_good_dog": True,
}, pk="id")
db["dogs"].populate_fts(["name", "twitter"])
如果您向表中插入更多记录,则需要使用 populate_fts()
刷新搜索索引
db["dogs"].enable_fts(["name", "twitter"], create_triggers=True)
更好的解决方案是使用数据库触发器。您可以使用 create_triggers=True
设置数据库触发器以自动更新全文索引
db["dogs"].enable_fts(["name", "twitter"], fts_version="FTS4")
.enable_fts()
默认使用 FTS5。如果您希望改用 FTS4,请使用以下内容
db["articles"].enable_fts(["headline", "body"], tokenize="porter")
您可以使用 tokenize=
参数自定义为表配置的分词器。例如,要启用 Porter 词干提取,以便像“running”这样的英文单词可以匹配像“run”这样的词干替代词,请使用 tokenize="porter"
SQLite 文档提供了更多关于 FTS5 分词器和 FTS4 分词器的信息。porter
是两者的有效选项。
如果您尝试配置已存在的 FTS 表,将引发 sqlite3.OperationalError
异常。
db["articles"].enable_fts(["headline"], tokenize="porter", replace=True)
您可以使用 replace=True
将现有表替换为新配置
如果 FTS 表已经存在,这将不起作用;否则,它将删除并使用新设置重新创建该表。这考虑了列、分词器、使用的 FTS 版本以及表是否具有触发器。
db["dogs"].disable_fts()
要移除您创建的 FTS 表和触发器,请使用 disable_fts()
表方法
SQLite 支持高级搜索查询语法。在某些情况下,您可能希望禁用此功能,因为像 .
这样的字符可能具有特殊含义,这会导致在搜索用户提供的字符串时出错。
db.quote_fts("Search term.")
# Returns: '"Search" "term."'
db.quote_fts(query)
方法返回应用了 SQLite 全文搜索引号的查询,以便该查询可以安全地用于搜索
for article in db["articles"].search("jquery"):
print(article)
table.search(q)
方法返回一个生成器,用于迭代与搜索短语 q
匹配的行的 Python 字典,结果按相关性排序,最相关的结果排在前面。
.search()
方法还接受以下可选参数order_by
字符串- 用于排序的列。默认为相关性评分。可以可选地包含
desc
,例如rowid desc
。 columns
字符串数组- 要返回的列。默认为所有列。
limit
整数- 要返回的结果数量。默认为所有结果。
offset
整数- 与 limit 参数一起使用的偏移量。
where
字符串- 用于 WHERE 子句的额外 SQL 片段
where_args
字典- 用于额外 WHERE 子句中
:param
占位符的参数 include_rank
布尔值- 如果设置,将包含一个
rank
列,其中包含 BM25 排名分数 - 仅适用于 FTS5 表。 quote
布尔值
将FTS 引用规则应用于搜索查询,禁用高级查询语法,从而避免意外错误。
for article in db["articles"].search(
"dog",
order_by="published desc",
limit=3,
where="id > :min_id",
where_args={"min_id": 10},
columns=["title", "published"]
):
print(article)
要只返回与 "dog"
匹配的三个结果的 title 和 published 列,其中 id
大于 10,并按 published
排序(最新在前),请使用以下代码
使用 table.search_sql() 构建 SQL 查询¶
print(db["articles"].search_sql(columns=["title", "author"]))
sqlite-utils
可以使用 hash_id=
选项为您完成此操作。例如
with original as (
select
rowid,
[title],
[author]
from [articles]
)
select
[original].[title],
[original].[author]
from
[original]
join [articles_fts] on [original].rowid = [articles_fts].rowid
where
[articles_fts] match :query
order by
[articles_fts].rank
您可以使用 table.search_sql()
方法生成用于搜索的 SQL 查询。它接受与 table.search()
相同的参数,但搜索查询和 where_args
参数除外,因为这些参数应在执行返回的 SQL 时提供。
此方法检测 SQLite 表是使用 FTS4 还是 FTS5,并根据搜索类型输出按相关性排序的正确 SQL。
with original as (
select
rowid,
[title],
[author]
from [articles]
)
select
[original].[title],
[original].[author]
from
[original]
join [articles_fts] on [original].rowid = [articles_fts].rowid
where
[articles_fts] match :query
order by
rank_bm25(matchinfo([articles_fts], 'pcnalx'))
FTS4 输出如下所示
db.register_fts4_bm25()
这使用了来自 sqlite-fts4 的 rank_bm25()
自定义 SQL 函数。您可以使用此方法将该自定义函数注册到 Database
连接上
db["dogs"].rebuild_fts()
您可以使用 table.rebuild_fts()
方法重建表。这对于表配置更改或索引数据以某种方式损坏时很有用。
db["dogs_fts"].rebuild_fts()
此方法可以在已配置全文搜索的表(此例中为 dogs
)上调用,也可以直接在 _fts
表上调用
INSERT INTO dogs_fts (dogs_fts) VALUES ("rebuild");
这会运行以下 SQL
db["dogs"].optimize()
此方法可以在已配置全文搜索的表(此例中为 dogs
)上调用,也可以直接在 _fts
表上调用
INSERT INTO dogs_fts (dogs_fts) VALUES ("optimize");
填充 FTS 表后,您可以对其进行优化以大幅减小其大小,如下所示
SQLite 中的 select count(*)
查询需要对主键索引进行完全扫描,并且随着表变大,所需时间会越来越长。
db["dogs"].enable_counts()
table.enable_counts()
方法可用于配置触发器,以持续更新 _counts
表中的记录。然后可以使用此值快速检索关联表中的行数。
CREATE TABLE [_counts] (
[table] TEXT PRIMARY KEY,
[count] INTEGER DEFAULT 0
)
如果 _counts
表尚不存在,这将创建它,其模式如下
db.enable_counts()
您可以使用数据库的 enable_counts()
方法为数据库中的每个表启用缓存计数(虚拟表和 _counts
表本身除外)
启用后,表计数将存储在 _counts
表中。当行被添加或删除时,计数记录将由触发器自动保持最新。
>>> db.cached_counts()
{'global-power-plants': 33643,
'global-power-plants_fts_data': 136,
'global-power-plants_fts_idx': 199,
'global-power-plants_fts_docsize': 33643,
'global-power-plants_fts_config': 1}
要访问这些计数,您可以直接查询 _counts
表,也可以使用 db.cached_counts()
方法。此方法返回一个字典,将表映射到其计数
>>> db.cached_counts(["global-power-plants"])
{'global-power-plants': 33643}
您可以向此方法传递一个表名列表,以仅检索这些计数
table.count
属性默认执行 select count(*)
查询,除非将 db.use_counts_table
属性设置为 True
。
db = Database("global-power-plants.db", use_counts_table=True)
您可以在实例化数据库对象时将 use_counts_table
设置为 True
如果该属性为 True
,则对 table.count
属性的任何调用将首先尝试在 _counts
表中查找缓存的计数,如果值不可用或表丢失,则回退到 count(*)
查询。
在数据库或表对象上调用 .enable_counts()
方法将把该数据库对象的 use_counts_table
设置为 True
,在其生命周期内有效。
db.reset_counts()
如果 _counts
表与实际表计数不同步,可以使用 .reset_counts()
方法修复它
db["dogs"].create_index(["is_good_dog"])
您可以使用 .create_index(columns)
方法在表上创建索引。该方法接受一个列列表
默认情况下,索引将被命名为 idx_{table-name}_{columns}
。如果您传递 find_unique_name=True
且自动派生的名称已存在,则将通过递增后缀数字(例如 idx_items_title_2
)找到一个可用名称。
db["dogs"].create_index(
["is_good_dog", "age"],
index_name="good_dogs_by_age"
)
您可以通过传递 index_name
参数来自定义创建的索引的名称
from sqlite_utils.db import DescIndex
db["dogs"].create_index(
["is_good_dog", DescIndex("age")],
index_name="good_dogs_by_age"
)
要为某列创建降序索引,请像这样将列名包装在 db.DescIndex()
中
db["dogs"].create_index(["name"], unique=True)
您可以通过传递 unique=True
创建唯一索引
使用 if_not_exists=True
可以实现在同名索引已存在时不做任何操作。
传递 analyze=True
在创建新索引后对其运行 ANALYZE
。
SQLite ANALYZE 命令构建一个统计信息表,查询计划程序可以使用该表更好地决定针对给定查询使用哪些索引。
如果您的数据库很大,并且您认为您的索引没有被有效利用,则应该运行 ANALYZE
。
db.analyze()
要对数据库中的每个索引运行 ANALYZE
,请使用以下代码
db.analyze("idx_countries_country_name")
要仅针对特定的命名索引运行,请将索引名称传递给该方法
db["dogs"].analyze()
要针对附加到特定表的所有索引运行,您可以将表名传递给 db.analyze(...)
,也可以直接在表上调用该方法,如下所示
Database("my_database.db").vacuum()
您可以通过对数据库运行 VACUUM 来优化它,如下所示
Database("my_database.db").enable_wal()
您可以使用 .enable_wal()
为数据库启用 Write-Ahead Logging
Database("my_database.db").disable_wal()
您可以使用 .disable_wal()
禁用 WAL 模式
journal_mode = Database("my_database.db").journal_mode
您可以使用 journal_mode
属性检查数据库当前的日志模式
这通常是 wal
或 delete
(表示 WAL 已禁用),但也可能具有其他值 - 请参阅PRAGMA journal_mode 文档。
当您为插入或 upsert 的 Python 字典列表创建新表时,这些方法会根据您传入的数据检测数据库列的正确类型。
在某些情况下,您可能需要干预此过程,以某种方式自定义正在创建的列 - 请参阅明确创建表。
db["cats"].create({
"id": int,
"name": str,
"weight": float,
})
该表 .create()
方法接受一个字典,将列名映射到它们应该存储的 Python 类型
SQLite 具有出色的 JSON 支持,sqlite-utils
可以帮助您利用这一点:如果您尝试插入可以表示为 JSON 列表或字典的值,sqlite-utils
将创建一个 TEXT 列并将您的数据存储为序列化的 JSON。这意味着您可以快速将甚至复杂的数据结构存储在 SQLite 中,并使用 JSON 功能查询它们。
from sqlite_utils import Database, suggest_column_types
cats = [{
"id": 1,
"name": "Snowflake"
}, {
"id": 2,
"name": "Crabtree",
"age": 4
}]
types = suggest_column_types(cats)
# types now looks like this:
# {"id": <class 'int'>,
# "name": <class 'str'>,
# "age": <class 'int'>}
# Manually add an extra field:
types["thumbnail"] = bytes
# types now looks like this:
# {"id": <class 'int'>,
# "name": <class 'str'>,
# "age": <class 'int'>,
# "thumbnail": <class 'bytes'>}
# Create the table
db = Database("cats.db")
db["cats"].create(types, pk="id")
# Insert the records
db["cats"].insert_all(cats)
# list(db["cats"].rows) now returns:
# [{"id": 1, "name": "Snowflake", "age": None, "thumbnail": None}
# {"id": 2, "name": "Crabtree", "age": 4, "thumbnail": None}]
# The table schema looks like this:
# print(db["cats"].schema)
# CREATE TABLE [cats] (
# [id] INTEGER PRIMARY KEY,
# [name] TEXT,
# [age] INTEGER,
# [thumbnail] BLOB
# )
您可以使用 suggest_column_types()
助手函数从记录列表中派生出列名和类型字典,适合传递给 table.create()
。
SQLite 支持注册用 Python 编写的自定义 SQL 函数。db.register_function()
方法允许您注册这些函数,并跟踪已注册的函数。
from sqlite_utils import Database
db = Database(memory=True)
def reverse_string(s):
return "".join(reversed(list(s)))
db.register_function(reverse_string)
print(db.execute('select reverse_string("hello")').fetchone()[0])
# This prints "olleh"
如果您将其用作方法,它将自动检测函数所需的名称和参数数量
@db.register_function
def reverse_string(s):
return "".join(reversed(list(s)))
print(db.execute('select reverse_string("hello")').fetchone()[0])
您也可以像这样使用该方法作为函数装饰器
@db.register_function(name="rev")
def reverse_string(s):
return "".join(reversed(list(s)))
print(db.execute('select rev("hello")').fetchone()[0])
默认情况下,Python 函数的名称将用作 SQL 函数的名称。您可以使用 name=
关键字参数自定义此名称
@db.register_function(deterministic=True)
def reverse_string(s):
return "".join(reversed(list(s)))
Python 3.8 添加了注册确定性 SQLite 函数的能力,允许您指明函数对于任何给定输入将返回完全相同的结果,从而允许 SQLite 应用一些性能优化。您可以使用 deterministic=True
将函数标记为确定性函数,如下所示
如果您在 3.8 之前的 Python 版本上运行此代码,您的代码仍将正常工作,但 deterministic=True
参数将被忽略。
默认情况下,注册同名同参数的函数不会产生任何影响 - Database
实例会跟踪已注册的函数,并在第二次调用 @db.register_function
时跳过注册。
@db.register_function(deterministic=True, replace=True)
def reverse_string(s):
return s[::-1]
如果您想特意用新实现替换已注册的函数,请使用 replace=True
参数
Unexpected error: user-defined function raised exception
用户定义函数中发生的异常默认返回以下错误
from sqlite_utils.utils import sqlite3
sqlite3.enable_callback_tracebacks(True)
您可以通过在自定义函数执行之前执行以下代码,使 sqlite3
返回更有用的错误,包括来自自定义函数的 traceback
在几乎所有情况下,您都应该使用 db.query()
的可选 parameters
参数将值传递给您的 SQL 查询,如传递参数中所述。
>>> db = Database(memory=True)
>>> db.quote("hello")
"'hello'"
>>> db.quote("hello'this'has'quotes")
"'hello''this''has''quotes'"
如果该选项不适用于您的用例,您可以使用 db.quote()
方法引用字符串以在 SQLite 中使用,如下所示
-
sqlite_utils.utils.rows_from_file()
助手函数可以从 CSV、TSV、JSON 或按行分隔的 JSON 文件中读取行(字典序列)。 sqlite_utils.utils.rows_from_file(fp, format=None, dialect=None, encoding=None, ignore_extras=False, extras_key=None)[source]
from sqlite_utils.utils import rows_from_file import io rows, format = rows_from_file(io.StringIO("id,name\n1,Cleo"))) print(list(rows), format) # Outputs [{'id': '1', 'name': 'Cleo'}] Format.CSV
从包含四种不同格式之一的文件类对象中加载字典序列。
默认情况下,它会尝试自动检测数据的格式,或者您可以使用 format= 选项传递明确的格式。
class Format(enum.Enum): CSV = 1 TSV = 2 JSON = 3 NL = 4
返回一个元组
(rows_generator, format_used)
,其中rows_generator
可以迭代返回字典,而format_used
是来自sqlite_utils.utils.Format
枚举的值如果 CSV 或 TSV 文件中的行包含的字段数多于标头中声明的字段数,当您遍历生成器时,将引发
sqlite_utils.utils.RowError
异常。您也可以通过传递
ignore_extras=True
来忽略额外的数据。- 或者传递
extras_key="rest"
将这些额外的值放入一个名为rest
的键的列表中。 参数:
fp (BinaryIO) – 包含二进制数据的文件类对象
format (Format | None) – 要使用的格式 - 省略此项以检测格式
dialect (Type[Dialect] | None) – 要使用的 CSV 方言 - 省略此项以检测方言
encoding (str | None) – 读取 CSV/TSV 数据时要使用的字符编码
ignore_extras (bool | None) – 忽略行上的任何额外字段
- extras_key (str | None) – 将任何额外字段放入以此键命名的列表中
返回类型:
- 或者传递
Tuple[Iterable[dict], Format]
_csv.Error: field larger than field limit (131072)
有时处理包含极长字段的 CSV 文件时,您可能会看到如下错误
Python 标准库 csv
模块强制执行字段大小限制。您可以使用 csv.field_size_limit(new_limit)
方法(此处有文档)增加该限制,但如果您不想选择新的级别,可以将其增加到最大可能值。
此值的最大可能值未记录,并且因系统而异。
from sqlite_utils.utils import maximize_csv_field_size_limit
maximize_csv_field_size_limit()
调用 sqlite_utils.utils.maximize_csv_field_size_limit()
将值设置为当前系统的最高可能值
from sqlite_utils.utils import ORIGINAL_CSV_FIELD_SIZE_LIMIT
import csv
csv.field_size_limit(ORIGINAL_CSV_FIELD_SIZE_LIMIT)
如果在调用此函数后需要重置为原始值,可以这样做
有时您会发现自己处理缺乏类型信息的数据 - 例如来自 CSV 文件的数据。
TypeTracker
类可用于尝试自动识别最初表示为字符串的数据的最可能类型。
import csv, io
csv_file = io.StringIO("id,name\n1,Cleo\n2,Cardi")
rows = list(csv.DictReader(csv_file))
# rows is now this:
# [{'id': '1', 'name': 'Cleo'}, {'id': '2', 'name': 'Cardi'}]
考虑这个例子
from sqlite_utils import Database
db = Database(memory=True)
db["creatures"].insert_all(rows)
print(db.schema)
# Outputs:
# CREATE TABLE [creatures] (
# [id] TEXT,
# [name] TEXT
# );
如果我们将这些数据直接插入到表中,我们将得到一个完全由 TEXT
列组成的模式
from sqlite_utils.utils import TypeTracker
tracker = TypeTracker()
db["creatures2"].insert_all(tracker.wrap(rows))
print(tracker.types)
# Outputs {'id': 'integer', 'name': 'text'}
我们可以使用 TypeTracker
实例检测最佳列类型
db["creatures2"].transform(types=tracker.types)
print(db["creatures2"].schema)
# Outputs:
# CREATE TABLE [creatures2] (
# [id] INTEGER,
# [name] TEXT
# );
然后,我们可以使用table.transform()方法将这些类型应用于我们的新表
SpatiaLite 是 SQLite 的地理扩展(类似于 PostgreSQL + PostGIS)。使用它需要查找、加载和初始化扩展,向现有表添加几何列,并可选地创建空间索引。这里的实用程序有助于简化该设置。
- 初始化 SpatiaLite¶
Database.init_spatialite(path=None)[source]
init_spatialite
方法将加载和初始化 SpatiaLite 扩展。path
参数应为已编译扩展的绝对路径,可以使用find_spatialite
找到该路径。from sqlite_utils.db import Database from sqlite_utils.utils import find_spatialite db = Database("mydb.db") db.init_spatialite(find_spatialite())
如果 SpatiaLite 成功初始化,则返回
True
。from sqlite_utils.db import Database from sqlite_utils.utils import find_spatialite db = Database("mydb.db") db.init_spatialite("./local/mod_spatialite.dylib")
- 或者传递
extras_key="rest"
将这些额外的值放入一个名为rest
的键的列表中。 如果您在意外的位置安装了 SpatiaLite(例如,用于测试备用版本),可以传入一个绝对路径
- extras_key (str | None) – 将任何额外字段放入以此键命名的列表中
path (str | None) – SpatiaLite 模块在磁盘上的路径
- 或者传递
bool
- 查找 SpatiaLite¶
sqlite_utils.utils.find_spatialite()[source]¶
find_spatialite()
函数在一些常见位置搜索 SpatiaLite SQLite 扩展。它返回位置的字符串路径,如果未找到 SpatiaLite,则返回None
。from sqlite_utils import Database from sqlite_utils.utils import find_spatialite db = Database("mydb.db") spatialite = find_spatialite() if spatialite: db.conn.enable_load_extension(True) db.conn.load_extension(spatialite) # or use with db.init_spatialite like this db.init_spatialite(find_spatialite())
- extras_key (str | None) – 将任何额外字段放入以此键命名的列表中
您可以在代码中这样使用它
str | None
- 添加几何列¶
Table.add_geometry_column(column_name, geometry_type, srid=4326, coord_dimension='XY', not_null=False)[source]
在 SpatiaLite 中,几何列只能添加到现有表中。要执行此操作,请使用
table.add_geometry_column
,并传入几何类型。默认情况下,这将使用 SRID 4326 添加一个可为空的列。可以使用
column_name
、srid
和not_null
参数自定义此设置。from sqlite_utils.db import Database from sqlite_utils.utils import find_spatialite db = Database("mydb.db") db.init_spatialite(find_spatialite()) # the table must exist before adding a geometry column table = db["locations"].create({"name": str}) table.add_geometry_column("geometry", "POINT")
- 或者传递
extras_key="rest"
将这些额外的值放入一个名为rest
的键的列表中。 如果成功添加列,则返回
True
,否则返回False
。column_name (str) – 要添加的列名
geometry_type (str) – 几何列的类型,例如
"GEOMETRY"
或"POINT" 或 ``"POLYGON"
srid (int) – 整数 SRID,默认为 WGS84 的 4326
coord_dimension (str) – 要使用的维度,默认为
"XY"
- 设置为"XYZ"
以在三维空间中工作
- extras_key (str | None) – 将任何额外字段放入以此键命名的列表中
path (str | None) – SpatiaLite 模块在磁盘上的路径
- 或者传递
not_null (bool) – 列是否为 NOT NULL
- 创建空间索引¶
Table.create_spatial_index(column_name)[source]
空间索引可以显着加快边界框查询。要创建空间索引,请使用
create_spatial_index
并传入现有几何列的名称。# assuming SpatiaLite is loaded, create the table, add the column table = db["locations"].create({"name": str}) table.add_geometry_column("geometry", "POINT") # now we can index it table.create_spatial_index("geometry") # the spatial index is a virtual table, which we can inspect print(db["idx_locations_geometry"].schema) # outputs: # CREATE VIRTUAL TABLE "idx_locations_geometry" USING rtree(pkid, xmin, xmax, ymin, ymax)
- 或者传递
extras_key="rest"
将这些额外的值放入一个名为rest
的键的列表中。 如果成功创建索引,则返回
True
,否则返回False
。如果索引已存在,则调用此函数无效。- extras_key (str | None) – 将任何额外字段放入以此键命名的列表中
path (str | None) – SpatiaLite 模块在磁盘上的路径
- 或者传递