Published on

使用 sqlite-utils upsert 失败 Troubleshooting

Authors
  • avatar
    Name
    Wang Zhiwei
    Twitter

借助 sqlite-utils 实现缓存类:

class Cache:
    _columns: Dict = dict()
    _pk: Tuple = tuple()
    _not_null: Set = set()
    _defaults: Dict = dict()
    _column_order: List = list()
    _json_columns: Set = set()

    def __init__(self, sqlite_db: str, name: str, recreate: bool = False):
        self.name = name
        self.db = Database(sqlite_db, recreate=recreate)
        self.table = self.db.create_table(
            name=name,
            columns=self._columns,
            pk=self._pk,
            not_null=self._not_null,
            defaults=self._defaults,
            column_order=self._column_order,
            if_not_exists=True,
        )
	   ...


class TaskManager(Cache):
    _columns = {
        "task_id": str,
        "db_name": str,
        "ms_table_name": str,
        "rs_table_name": str,
        "local_path": str,
        "s3_path": str,
        "file_size": int,
        "need_staging": int,
        "bcp_status": int,
        "split_status": int,
        "compress_status": int,
        "s3_upload_status": int,
        "redshift_load_status": int,
        "redshift_staging_status": int,
        "bcp_cost": float,
        "split_cost": float,
        "compress_cost": float,
        "s3_upload_cost": float,
        "redshift_load_cost": float,
        "redshift_staging_cost": float,
    }
    _pk: str = "task_id"
    _not_null: Set = {"task_id", "db_name", "ms_table_name", "rs_table_name"}
    _column_order: List = list(_columns.keys())
    _defaults = {
        "file_size": 0,
        "need_staging": 0,
        "bcp_status": 0,
        "split_status": 0,
        "compress_status": 0,
        "s3_upload_status": 0,
        "redshift_load_status": 0,
        "redshift_staging_status": 0,
        "bcp_cost": 0,
        "split_cost": 0,
        "compress_cost": 0,
        "s3_upload_cost": 0,
        "redshift_load_cost": 0,
        "redshift_staging_cost": 0,
    }

    def __init__(self, sqlite_db: str = "tasks.sqlite", name: str = "tasks", recreate: bool = False):
        super().__init__(sqlite_db, name, recreate)
		...

新增一条记录

tm = TaskManager()
record = {
        "task_id": "ab0dff35eff14ea43c4a545ff4f29363",
        "db_name": "PRESTAGE",
        "ms_table_name": "[COACHING].[Dictionary]",
        "rs_table_name": "\"enriched_prestage_coaching\".\"dictionary\"",
        "local_path": "xxx",
        "s3_path": "s3://xxx",
        "file_size": 87521,
        "need_staging": 0,
        "bcp_status": 1,
        "split_status": 1,
        "compress_status": 1,
        "s3_upload_status": 1,
        "redshift_load_status": 1,
        "redshift_staging_status": 0,
        "bcp_cost": 0.05672574043273926,
        "split_cost": 0,
        "compress_cost": 0,
        "s3_upload_cost": 1.8196437358856201,
        "redshift_load_cost": 14.933900356292725,
        "redshift_staging_cost": 0
}
tm.table.upsert(record, pk="task_id")

上述操作没有任务报错,但是却并未在 tasks 表中看到任何新增数据,很奇怪,那就通过打断点的方式逐步去 DEBUG。

sqlite-utils Table 类的 upsert 方法调用链:

Table.upsert
-> Table.upsert_all
-> Table.insert_all
-> Table.insert_chunk

insert_chunk 函数中进行数据插入的主要部分代码:

queries_and_params = self.build_insert_queries_and_params(
            extracts,
            chunk,
            all_columns,
            hash_id,
            hash_id_columns,
            upsert,
            pk,
            conversions,
            num_records_processed,
            replace,
            ignore,
)

with self.db.conn:
     result = None
     for query, params in queries_and_params:
	       try:
	           result = self.db.execute(query, params)
				 ...

queries_and_params 的值:

[
    [
        "INSERT OR IGNORE INTO [tasks]([task_id]) VALUES(?);",
        [
            "ab0dff35eff14ea43c4a545ff4f29363"
        ]
    ],
    [
        "UPDATE [tasks] SET [bcp_cost] = ?, [bcp_status] = ?, [compress_cost] = ?, [compress_status] = ?, [db_name] = ?, [file_size] = ?, [local_path] = ?, [ms_table_name] = ?, [need_staging] = ?, [redshift_load_cost] = ?, [redshift_load_status] = ?, [redshift_staging_cost] = ?, [redshift_staging_status] = ?, [rs_table_name] = ?, [s3_path] = ?, [s3_upload_cost] = ?, [s3_upload_status] = ?, [split_cost] = ?, [split_status] = ? WHERE [task_id] = ?",
        [
            0.05672574043273926,
            1,
            0,
            1,
            "PRESTAGE",
            87521,
            "xxx",
            "[COACHING].[Dictionary]",
            0,
            14.933900356292725,
            1,
            0,
            0,
            "\"enriched_prestage_coaching\".\"dictionary\"",
            "xxx",
            1.8196437358856201,
            1,
            0,
            1,
            "ab0dff35eff14ea43c4a545ff4f29363"
        ]
    ]
]

可以看到,一次 upsert 插入,最后被拆分成了两条 SQL 语句去执行。第一步插入一条只包含主键的记录,然后再更新一条记录的其它字段值。

INSERT OR IGNORE INTO [tasks]([task_id]) VALUES(?);
UPDATE [tasks] SET [bcp_cost] = ?, [bcp_status] = ?, [compress_cost] = ?, [compress_status] = ?, [db_name] = ?, [file_size] = ?, [local_path] = ?, [ms_table_name] = ?, [need_staging] = ?, [redshift_load_cost] = ?, [redshift_load_status] = ?, [redshift_staging_cost] = ?, [redshift_staging_status] = ?, [rs_table_name] = ?, [s3_path] = ?, [s3_upload_cost] = ?, [s3_upload_status] = ?, [split_cost] = ?, [split_status] = ? WHERE [task_id] = ?

问题在于插入使用了 INSERT OR IGNORE INTO 的语法,插入失败的话也会忽略部分异常,不会提示用户。这里的异常是,对于 tasks 表定义了非空字段 "task_id", "db_name", "ms_table_name", "rs_table_name" ,在执行第一步 INSERT 时只给了主键的值,违背了 NOT NULL 约束,所以无法插入成功,又因为用了 OR IGNORE 语法所以直接忽略了异常不反馈给用户。

在 MySQL 中有 INSERT INTO ON DUPLICATE 语法实现 upsert,是否 SQLite 不支持所以 sqlite-utils 中才这样实现。

Google 搜索关键词 sqlite insert into on duplicate key update 显示的第一条结果为来自 Stack Overflow 的提问,高赞回答提供了相同的 Solution。

INSERT OR IGNORE INTO visits VALUES ($ip, 0);
UPDATE visits SET hits = hits + 1 WHERE ip LIKE $ip;

https://stackoverflow.com/a/2718352/7700479


在另一个后提交的答案中提到, SQLite version 3.24.0 (2018-06-04) 增加了 upsert 语法支持。

https://www.sqlite.org/draft/lang_UPSERT.html


sqlite-utils upsert 实现的代码提交日期晚于 2018-06-04,作者按理不会不知道 SQLite 的新 feature,我在 commit 里搜索了一番,最后找到这个 issue。


作者描述到,sqlite-utils upsert 方法实现的功能非 SQLite upsert 语法声明,方法命名不恰当。早期的 upsert 方法实现基于 INSERT OR REPLACE INTO 语法, 有些问题,后期改成了 INSERT OR IGNORE INTO + UPDATE 的实现。

下面是一段 SQLite 官方文档中关于 INSERT OR REPLACE INTO 的描述:

When a UNIQUE or PRIMARY KEY constraint violation occurs, the REPLACE algorithm deletes pre-existing rows that are causing the constraint violation prior to inserting or updating the current row and the command continues executing normally. If a NOT NULL constraint violation occurs, the REPLACE conflict resolution replaces the NULL value with the default value for that column

https://www.sqlite.org/lang_conflict.html