DuckDB ADBC 客户端的最新稳定版本为 1.5.0。
Arrow 数据库连接 (ADBC) 与 ODBC 和 JDBC 类似,是一种 C 风格的 API,旨在实现不同数据库系统之间的代码可移植性。这使得开发人员可以轻松构建与数据库系统通信的应用程序,而无需针对特定系统编写代码。ADBC 与 ODBC/JDBC 的主要区别在于,ADBC 使用 Arrow 在数据库系统和应用程序之间传输数据。DuckDB 拥有一个 ADBC 驱动程序,利用了 DuckDB 与 Arrow 之间的零拷贝集成来高效传输数据。
请参阅 ADBC 文档页面,了解关于 ADBC 的更深入讨论及详细的 API 说明。
已实现的功能
DuckDB-ADBC 驱动程序实现了完整的 ADBC 规范,但 ConnectionReadPartition 和 StatementExecutePartitions 函数除外。这两个函数存在是为了支持那些在内部对查询结果进行分区的系统,这不适用于 DuckDB。在本节中,我们将描述 ADBC 中存在的主要功能,以及它们所需的参数,并为每个函数提供示例。
数据库
对数据库进行操作的一组函数。
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
DatabaseNew |
分配一个新的(但未初始化)数据库。 | (AdbcDatabase *database, AdbcError *error) |
AdbcDatabaseNew(&adbc_database, &adbc_error) |
DatabaseSetOption |
设置 char* 选项。 | (AdbcDatabase *database, const char *key, const char *value, AdbcError *error) |
AdbcDatabaseSetOption(&adbc_database, "path", "test.db", &adbc_error) |
DatabaseInit |
完成设置选项并初始化数据库。 | (AdbcDatabase *database, AdbcError *error) |
AdbcDatabaseInit(&adbc_database, &adbc_error) |
DatabaseRelease |
销毁数据库。 | (AdbcDatabase *database, AdbcError *error) |
AdbcDatabaseRelease(&adbc_database, &adbc_error) |
连接
用于创建和销毁与数据库交互的连接的一组函数。
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
ConnectionNew |
分配一个新的(但未初始化)连接。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionNew(&adbc_connection, &adbc_error) |
ConnectionSetOption |
选项可以在 ConnectionInit 之前设置。 | (AdbcConnection*, const char*, const char*, AdbcError*) |
AdbcConnectionSetOption(&adbc_connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &adbc_error) |
ConnectionInit |
完成设置选项并初始化连接。 | (AdbcConnection*, AdbcDatabase*, AdbcError*) |
AdbcConnectionInit(&adbc_connection, &adbc_database, &adbc_error) |
ConnectionRelease |
销毁此连接。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionRelease(&adbc_connection, &adbc_error) |
一组用于检索数据库元数据的函数。通常,这些函数会返回 Arrow 对象,具体为 ArrowArrayStream。
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
ConnectionGetObjects |
获取所有目录、数据库模式、表和列的分层视图。 | (AdbcConnection*, int, const char*, const char*, const char*, const char**, const char*, ArrowArrayStream*, AdbcError*) |
AdbcDatabaseInit(&adbc_database, &adbc_error) |
ConnectionGetTableSchema |
获取表的 Arrow Schema。 | (AdbcConnection*, const char*, const char*, const char*, ArrowSchema*, AdbcError*) |
AdbcDatabaseRelease(&adbc_database, &adbc_error) |
ConnectionGetTableTypes |
获取数据库中的表类型列表。 | (AdbcConnection*, ArrowArrayStream*, AdbcError*) |
AdbcDatabaseNew(&adbc_database, &adbc_error) |
一组具有连接事务语义的函数。默认情况下,所有连接都以自动提交模式启动,但这可以通过 ConnectionSetOption 函数关闭。
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
ConnectionCommit |
提交任何挂起的事务。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionCommit(&adbc_connection, &adbc_error) |
ConnectionRollback |
回滚任何挂起的事务。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionRollback(&adbc_connection, &adbc_error) |
语句 (Statement)
语句保持与查询执行相关的状态。它们既代表一次性查询,也代表预编译语句。它们可以被复用;但是,这样做会使该语句之前的任何结果集失效。
用于创建、销毁和设置语句选项的函数
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
StatementNew |
为给定连接创建一个新语句。 | (AdbcConnection*, AdbcStatement*, AdbcError*) |
AdbcStatementNew(&adbc_connection, &adbc_statement, &adbc_error) |
StatementRelease |
销毁一个语句。 | (AdbcStatement*, AdbcError*) |
AdbcStatementRelease(&adbc_statement, &adbc_error) |
StatementSetOption |
在语句上设置字符串选项。 | (AdbcStatement*, const char*, const char*, AdbcError*) |
StatementSetOption(&adbc_statement, ADBC_INGEST_OPTION_TARGET_TABLE, "TABLE_NAME", &adbc_error) |
与查询执行相关的函数
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
StatementSetSqlQuery |
设置要执行的 SQL 查询。然后可以使用 StatementExecuteQuery 执行该查询。 | (AdbcStatement*, const char*, AdbcError*) |
AdbcStatementSetSqlQuery(&adbc_statement, "SELECT * FROM TABLE", &adbc_error) |
StatementSetSubstraitPlan |
设置要执行的 Substrait 计划。然后可以使用 StatementExecuteQuery 执行该查询。 | (AdbcStatement*, const uint8_t*, size_t, AdbcError*) |
AdbcStatementSetSubstraitPlan(&adbc_statement, substrait_plan, length, &adbc_error) |
StatementExecuteQuery |
执行一个语句并获取结果。 | (AdbcStatement*, ArrowArrayStream*, int64_t*, AdbcError*) |
AdbcStatementExecuteQuery(&adbc_statement, &arrow_stream, &rows_affected, &adbc_error) |
StatementPrepare |
将此语句转换为预编译语句,以便多次执行。 | (AdbcStatement*, AdbcError*) |
AdbcStatementPrepare(&adbc_statement, &adbc_error) |
与绑定相关的函数,用于批量插入或预编译语句。
| 函数名 | 描述 | 参数 | 示例 |
|---|---|---|---|
StatementBindStream |
绑定 Arrow Stream。这可用于批量插入或预编译语句。 | (AdbcStatement*, ArrowArrayStream*, AdbcError*) |
StatementBindStream(&adbc_statement, &input_data, &adbc_error) |
设置 DuckDB ADBC 驱动程序
在使用 DuckDB 作为 ADBC 驱动程序之前,必须在系统上安装 libduckdb 共享库,并使其可供应用程序使用。该库包含 ADBC 驱动程序与之交互的 DuckDB 核心引擎。
下载 libduckdb
从 DuckDB 发布页面下载适合您平台的 libduckdb 库
- Linux:
libduckdb-linux-amd64.zip(包含libduckdb.so) - macOS:
libduckdb-osx-universal.zip(包含libduckdb.dylib) - Windows:
libduckdb-windows-amd64.zip(包含duckdb.dll)
解压压缩包以获取共享库文件。
安装库
Linux
- 从下载的压缩包中解压
libduckdb.so文件 -
确保您的代码可以使用该库。您可以:
-
将其复制到系统库目录(需要 root 权限)
sudo cp libduckdb.so /usr/local/lib/ sudo ldconfig -
或将其放在自定义目录中,并将该目录添加到您的
LD_LIBRARY_PATHmkdir -p ~/lib cp libduckdb.so ~/lib/ export LD_LIBRARY_PATH=~/lib:$LD_LIBRARY_PATH
-
macOS
- 从下载的压缩包中解压
libduckdb.dylib文件 -
确保您的代码可以使用该库。您可以:
-
将其复制到系统库目录
sudo cp libduckdb.dylib /usr/local/lib/ -
或将其放在自定义目录中,并将该目录添加到您的
DYLD_LIBRARY_PATHmkdir -p ~/lib cp libduckdb.dylib ~/lib/ export DYLD_LIBRARY_PATH=~/lib:$DYLD_LIBRARY_PATH
-
Windows
- 从下载的压缩包中解压
duckdb.dll文件 - 将其放置在以下位置之一
- 与您的应用程序可执行文件相同的目录
PATH环境变量中列出的目录- Windows 系统目录(例如
C:\Windows\System32)
理解库路径
LD_LIBRARY_PATH (Linux) 和 DYLD_LIBRARY_PATH (macOS) 是环境变量,用于告知系统在运行时到何处查找共享库。当您的应用程序尝试加载 libduckdb 时,系统会搜索这些路径以定位库文件。
验证安装
您可以验证库是否已正确安装且可访问
Linux/macOS
ldd path/to/your/application # Linux
otool -L path/to/your/application # macOS
示例
无论使用何种编程语言,使用 DuckDB 的 ADBC 都需要两个数据库选项。第一个是 driver,它采用 DuckDB 库的路径(有关安装说明,请参阅上方的设置 DuckDB ADBC 驱动程序)。第二个选项是 entrypoint,它是 DuckDB-ADBC 驱动程序导出的一个函数,用于初始化所有 ADBC 函数。配置这两个选项后,我们可以选择设置 path 选项,提供磁盘上的路径以存储 DuckDB 数据库。如果未设置,则会创建一个内存数据库。配置所有必要选项后,我们就可以继续初始化数据库了。以下是如何在各种不同的语言环境中进行此操作。
C++
我们通过声明通过 ADBC 查询数据所需的基本变量来开始 C++ 示例。这些变量包括错误处理、数据库、连接、语句处理以及用于在 DuckDB 和应用程序之间传输数据的 Arrow Stream。
AdbcError adbc_error;
AdbcDatabase adbc_database;
AdbcConnection adbc_connection;
AdbcStatement adbc_statement;
ArrowArrayStream arrow_stream;
然后我们可以初始化我们的数据库变量。在初始化数据库之前,我们需要如上所述设置 driver 和 entrypoint 选项。然后我们设置 path 选项并初始化数据库。driver 选项应指向您安装的 libduckdb 库 – 有关安装说明,请参阅设置 DuckDB ADBC 驱动程序。
AdbcDatabaseNew(&adbc_database, &adbc_error);
AdbcDatabaseSetOption(&adbc_database, "driver", "path/to/libduckdb.dylib", &adbc_error);
AdbcDatabaseSetOption(&adbc_database, "entrypoint", "duckdb_adbc_init", &adbc_error);
// By default, we start an in-memory database, but you can optionally define a path to store it on disk.
AdbcDatabaseSetOption(&adbc_database, "path", "test.db", &adbc_error);
AdbcDatabaseInit(&adbc_database, &adbc_error);
初始化数据库后,我们必须创建并初始化与它的连接。
AdbcConnectionNew(&adbc_connection, &adbc_error);
AdbcConnectionInit(&adbc_connection, &adbc_database, &adbc_error);
现在我们可以初始化语句并通过连接运行查询。在 AdbcStatementExecuteQuery 之后,arrow_stream 将填充结果。
AdbcStatementNew(&adbc_connection, &adbc_statement, &adbc_error);
AdbcStatementSetSqlQuery(&adbc_statement, "SELECT 42", &adbc_error);
int64_t rows_affected;
AdbcStatementExecuteQuery(&adbc_statement, &arrow_stream, &rows_affected, &adbc_error);
arrow_stream.release(arrow_stream)
除了运行查询外,我们还可以通过 arrow_streams 摄取数据。为此,我们需要设置一个带有我们要插入的表名的选项,绑定流,然后执行查询。
StatementSetOption(&adbc_statement, ADBC_INGEST_OPTION_TARGET_TABLE, "AnswerToEverything", &adbc_error);
StatementBindStream(&adbc_statement, &arrow_stream, &adbc_error);
StatementExecuteQuery(&adbc_statement, nullptr, nullptr, &adbc_error);
Python
首先要做的是使用 pip 安装 ADBC 驱动程序管理器。您还需要安装 pyarrow 以直接访问 Apache Arrow 格式的结果集(例如使用 fetch_arrow_table)。
pip install adbc_driver_manager pyarrow
有关
adbc_driver_manager包的详细信息,请参阅adbc_driver_manager包文档。
与 C++ 一样,我们需要提供包含 libduckdb 共享对象位置和入口点函数的初始化选项。请注意,DuckDB 的 path 参数是通过 db_kwargs 字典传递的。
import adbc_driver_duckdb.dbapi
with adbc_driver_duckdb.dbapi.connect("test.db") as conn, conn.cursor() as cur:
cur.execute("SELECT 42")
# fetch a pyarrow table
tbl = cur.fetch_arrow_table()
print(tbl)
除了 fetch_arrow_table 之外,DBApi 的其他方法也在游标上实现,例如 fetchone 和 fetchall。数据也可以通过 arrow_streams 摄取。我们只需要在语句上设置选项以绑定数据流并执行查询即可。
import adbc_driver_duckdb.dbapi
import pyarrow
data = pyarrow.record_batch(
[[1, 2, 3, 4], ["a", "b", "c", "d"]],
names = ["ints", "strs"],
)
with adbc_driver_duckdb.dbapi.connect("test.db") as conn, conn.cursor() as cur:
cur.adbc_ingest("AnswerToEverything", data)
Go
确保首先安装 libduckdb 库 – 有关详细的安装说明,请参阅设置 DuckDB ADBC 驱动程序。
以下示例使用内存中 DuckDB 数据库通过 SQL 查询修改内存中 Arrow RecordBatch
package main
import (
"bytes"
"context"
"fmt"
"io"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/drivermgr"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apache/arrow-go/v18/arrow/memory"
)
func _makeSampleArrowRecord() arrow.Record {
b := array.NewFloat64Builder(memory.DefaultAllocator)
b.AppendValues([]float64{1, 2, 3}, nil)
col := b.NewArray()
defer col.Release()
defer b.Release()
schema := arrow.NewSchema([]arrow.Field{{Name: "column1", Type: arrow.PrimitiveTypes.Float64}}, nil)
return array.NewRecord(schema, []arrow.Array{col}, int64(col.Len()))
}
type DuckDBSQLRunner struct {
ctx context.Context
conn adbc.Connection
db adbc.Database
}
func NewDuckDBSQLRunner(ctx context.Context) (*DuckDBSQLRunner, error) {
var drv drivermgr.Driver
db, err := drv.NewDatabase(map[string]string{
"driver": "duckdb",
"entrypoint": "duckdb_adbc_init",
"path": ":memory:",
})
if err != nil {
return nil, fmt.Errorf("failed to create new in-memory DuckDB database: %w", err)
}
conn, err := db.Open(ctx)
if err != nil {
return nil, fmt.Errorf("failed to open connection to new in-memory DuckDB database: %w", err)
}
return &DuckDBSQLRunner{ctx: ctx, conn: conn, db: db}, nil
}
func serializeRecord(record arrow.Record) (io.Reader, error) {
buf := new(bytes.Buffer)
wr := ipc.NewWriter(buf, ipc.WithSchema(record.Schema()))
if err := wr.Write(record); err != nil {
return nil, fmt.Errorf("failed to write record: %w", err)
}
if err := wr.Close(); err != nil {
return nil, fmt.Errorf("failed to close writer: %w", err)
}
return buf, nil
}
func (r *DuckDBSQLRunner) importRecord(sr io.Reader) error {
rdr, err := ipc.NewReader(sr)
if err != nil {
return fmt.Errorf("failed to create IPC reader: %w", err)
}
defer rdr.Release()
_, err = adbc.IngestStream(r.ctx, r.conn, rdr, "temp_table", adbc.OptionValueIngestModeCreate, adbc.IngestStreamOptions{})
return err
}
func (r *DuckDBSQLRunner) runSQL(sql string) ([]arrow.Record, error) {
stmt, err := r.conn.NewStatement()
if err != nil {
return nil, fmt.Errorf("failed to create new statement: %w", err)
}
defer stmt.Close()
if err := stmt.SetSqlQuery(sql); err != nil {
return nil, fmt.Errorf("failed to set SQL query: %w", err)
}
out, n, err := stmt.ExecuteQuery(r.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer out.Release()
result := make([]arrow.Record, 0, n)
for out.Next() {
rec := out.Record()
rec.Retain() // .Next() will release the record, so we need to retain it
result = append(result, rec)
}
if out.Err() != nil {
return nil, out.Err()
}
return result, nil
}
func (r *DuckDBSQLRunner) RunSQLOnRecord(record arrow.Record, sql string) ([]arrow.Record, error) {
serializedRecord, err := serializeRecord(record)
if err != nil {
return nil, fmt.Errorf("failed to serialize record: %w", err)
}
if err := r.importRecord(serializedRecord); err != nil {
return nil, fmt.Errorf("failed to import record: %w", err)
}
result, err := r.runSQL(sql)
if err != nil {
return nil, fmt.Errorf("failed to run SQL: %w", err)
}
if _, err := r.runSQL("DROP TABLE temp_table"); err != nil {
return nil, fmt.Errorf("failed to drop temp table after running query: %w", err)
}
return result, nil
}
func (r *DuckDBSQLRunner) Close() {
r.conn.Close()
r.db.Close()
}
func main() {
rec := _makeSampleArrowRecord()
fmt.Println(rec)
runner, err := NewDuckDBSQLRunner(context.Background())
if err != nil {
panic(err)
}
defer runner.Close()
resultRecords, err := runner.RunSQLOnRecord(rec, "SELECT column1+1 FROM temp_table")
if err != nil {
panic(err)
}
for _, resultRecord := range resultRecords {
fmt.Println(resultRecord)
resultRecord.Release()
}
}
运行它会产生以下输出
record:
schema:
fields: 1
- column1: type=float64
rows: 3
col[0][column1]: [1 2 3]
record:
schema:
fields: 1
- (column1 + 1): type=float64, nullable
rows: 3
col[0][(column1 + 1)]: [2 3 4]