Lakehouse 格式(通常称为开放表格式)是用于在对象存储中存储数据,同时保持 ACID 事务或保留快照历史记录等保证的规范。随着时间的推移,出现了多种 Lakehouse 格式,每种格式都有其独特的元数据(又称目录)管理方法。在本页中,我们将介绍 DuckDB 对其中一些格式的支持,以及你可以使用的一些变通方法,以便在继续使用 DuckDB 的同时,实现与这些格式接近完全的互操作性。
DuckDB Lakehouse 支持矩阵
DuckDB 将 Iceberg、Delta 和 DuckLake 作为一等公民进行支持。下表展示了 DuckDB 通过核心扩展原生支持的功能。
| DuckLake | Iceberg | Delta | |
|---|---|---|---|
| 扩展 | ducklake |
iceberg |
delta |
| 读取 | |||
| 写入 | |||
| 删除 | |||
| 更新 | |||
| Upsert(更新插入) | |||
| 创建表 | |||
| 创建分区表 | |||
| 附加到目录 | * | ||
| 重命名表 | |||
| 重命名列 | |||
| 添加/删除列 | |||
| 更改列类型 | |||
| 压缩与维护 | |||
| 加密 | |||
| 管理表属性 | |||
| 时间旅行 (Time travel) | |||
| 查询表变更 |
* 通过 unity_catalog 扩展。
DuckDB 旨在构建具有最小依赖项的原生扩展。例如,iceberg 扩展不依赖于第三方 Iceberg 库,这意味着所有数据和元数据操作都是在 DuckDB 扩展中原生实现的。对于 delta 扩展,我们使用了 delta-kernel-rs 项目,它旨在为引擎构建尽可能接近原生的 Delta 集成提供轻量级平台。
为什么原生实现很重要? 原生实现允许 DuckDB 进行更多的性能优化,例如复杂的谓词下推(通过文件级和行组级修剪)并改善内存管理。
针对不支持功能的变通方法
如果 DuckDB 核心扩展无法满足你的使用场景,你仍然可以使用 DuckDB 处理数据,并利用外部库来帮助完成不支持的操作。如果你使用 Python 客户端,有一些非常出色的现成库可以为你提供帮助。这些示例都有一个共同点:它们使用 Arrow 作为与 DuckDB 进行高效、零拷贝数据交换的接口。
在 DuckDB 中使用 PyIceberg
在此示例中,我们将使用 PyIceberg 来创建和更改表模式(Schema),并使用 DuckDB 对表进行读取和写入。
点击此处查看完整示例。
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
TimestampType,
FloatType,
DoubleType,
StringType,
NestedField,
)
import duckdb
# Create a table with PyIceberg
catalog = load_catalog(
"docs",
**{
"uri": "http://127.0.0.1:8181",
"s3.endpoint": "http://127.0.0.1:9000",
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
}
)
schema = Schema(
NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False)
)
catalog.create_table(
identifier="default.bids",
schema=schema,
partition_spec=partition_spec,
)
# Write and read the table with DuckDB
with duckdb.connect() as conn:
conn.execute("""
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL false
);
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);
""")
conn.execute("""
INSERT INTO my_datalake.default.bids VALUES ('2024-01-01 10:00:00', 'AAPL', 150.0, 150.5);
""")
conn.sql("SELECT * FROM my_datalake.default.bids;").show()
# Alter schema with PyIceberg
table = catalog.load_table("default.bids")
with table.update_schema() as update:
update.add_column("retries", IntegerType(), "Number of retries to place the bid")
在 DuckDB 中使用 delta-rs
在此示例中,我们使用 delta-rs Python 绑定创建一个 Delta 表,然后使用 DuckDB 的 delta 扩展进行读取。我们还展示了如何使用 DuckDB 执行其他读取操作,例如使用 Arrow 零拷贝集成读取变更数据馈送(Change Data Feed)。如果读取的数据量较大,通过使用 Arrow Datasets,此操作也可以是惰性的(Lazy)。
点击此处查看完整示例。
import deltalake as dl
import pyarrow as pa
# Create a delta table and read it with DuckDB Delta extension
dl.write_deltalake(
"tmp/some_table",
pa.table({
"id": [1, 2, 3],
"value": ["a", "b", "c"]
})
)
with duckdb.connect() as conn:
conn.execute("""
INSTALL delta;
LOAD delta;
""")
conn.sql("""
SELECT * FROM delta_scan('tmp/some_table')
""").show()
# Append some data and read the data change feed using the PyArrow integration
dl.write_deltalake(
"tmp/some_table",
pa.table({
"id": [4, 5],
"value": ["d", "e"]
}),
mode="append"
)
table = dl.DeltaTable("tmp/some_table").load_cdf(starting_version=1, ending_version=2)
with duckdb.connect() as conn:
conn.register("t", table)
conn.sql("SELECT * FROM t").show()