Documentation Index
Fetch the complete documentation index at: https://mintlify.com/apache/arrow/llms.txt
Use this file to discover all available pages before exploring further.
Acero is Arrow’s query engine that enables the execution of complex analytical queries on Arrow data. It provides a declarative API for building and executing query plans.
Core Concepts
Execution Plans
An execution plan is a directed acyclic graph (DAG) of nodes that process data:
- Source nodes: Generate data (e.g., from tables, files, datasets)
- Transform nodes: Modify data (e.g., filter, project, aggregate)
- Sink nodes: Consume data (e.g., collect results, write to storage)
Declarations
Declarations describe execution nodes without actually constructing them:
#include <arrow/acero/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/testing/gtest_util.h>  // arrow::TableFromJSON (Arrow testing helper)
using namespace arrow::acero;
// Sample input table. TableFromJSON is an Arrow *testing* utility; real
// applications would typically obtain the table from a reader or builders.
auto table = arrow::TableFromJSON(
    arrow::schema({arrow::field("a", arrow::int32()),
                   arrow::field("b", arrow::int32())}),
    {R"([{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}])"});
// Build declaration sequence: scan -> filter (a > 1) -> project (rename).
// The declarations only describe the nodes; nothing executes yet.
auto declarations = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(table)},
    {"filter", FilterNodeOptions(arrow::compute::greater(
                   arrow::compute::field_ref("a"), arrow::compute::literal(1)))},
    {"project", ProjectNodeOptions({arrow::compute::field_ref("a"),
                                    arrow::compute::field_ref("b")},
                                   {"col_a", "col_b"})},
});
// Execute and collect results. DeclarationToTable returns an arrow::Result,
// so unwrap it to obtain the table (ValueOrDie aborts on error).
std::shared_ptr<arrow::Table> result_table =
    DeclarationToTable(declarations).ValueOrDie();
import pyarrow as pa
import pyarrow.acero as ac
import pyarrow.compute as pc

# Sample input: two integer columns, three rows
table = pa.table({"a": [1, 2, 3], "b": [10, 20, 30]})

# Pipeline stages, in order: scan the table, keep rows where a > 1,
# then emit the surviving columns under new names.
source_step = ("table_source", ac.TableSourceNodeOptions(table))
filter_step = ("filter", ac.FilterNodeOptions(pc.field("a") > 1))
project_step = (
    "project",
    ac.ProjectNodeOptions([pc.field("a"), pc.field("b")], names=["col_a", "col_b"]),
)
declaration = ac.Declaration.from_sequence([source_step, filter_step, project_step])

# The declaration is only a description; executing it produces the table
result_table = declaration.to_table()
Building Query Plans
Filtering Data
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
using namespace arrow::acero;
// `table` is assumed to be an existing std::shared_ptr<arrow::Table> with
// "age" and "city" columns.
// Create execution plan
auto plan = ExecPlan::Make().ValueOrDie();
// Add source node
auto source = MakeExecNode(
    "table_source",
    plan.get(),
    {},
    TableSourceNodeOptions(table)
).ValueOrDie();
// Add filter node: keep rows where age > 25 AND city == "NYC"
auto filter = MakeExecNode(
    "filter",
    plan.get(),
    {source},
    FilterNodeOptions(
        arrow::compute::and_(
            arrow::compute::greater(arrow::compute::field_ref("age"),
                                    arrow::compute::literal(25)),
            arrow::compute::equal(arrow::compute::field_ref("city"),
                                  arrow::compute::literal("NYC"))
        )
    )
).ValueOrDie();
// Add a sink node. A plan must end in a sink; otherwise the filtered batches
// have no downstream consumer. "table_sink" collects them into output_table.
std::shared_ptr<arrow::Table> output_table;
MakeExecNode(
    "table_sink",
    plan.get(),
    {filter},
    TableSinkNodeOptions(&output_table)
).ValueOrDie();
// Start execution
plan->StartProducing();
// Wait for completion and check whether execution succeeded
arrow::Status status = plan->finished().status();
if (!status.ok()) {
  // Handle the error, e.g. report status.ToString()
}
import pyarrow.acero as ac
import pyarrow.compute as pc

# Build filter expression: age > 25 AND city == "NYC"
filter_expr = (pc.field("age") > 25) & (pc.field("city") == "NYC")

# Create declaration. A list of (factory_name, options) pairs must go through
# Declaration.from_sequence; the Declaration constructor builds a single node
# with signature Declaration(factory_name, options, inputs=None).
declaration = ac.Declaration.from_sequence([
    ("table_source", ac.TableSourceNodeOptions(table)),
    ("filter", ac.FilterNodeOptions(filter_expr)),
])
result = declaration.to_table()
Aggregation
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/compute/api_aggregate.h>
using namespace arrow::acero;
// Group by and aggregate.
// Each entry is {function, options (nullptr = defaults), target column, output name}.
// Grouped (keyed) aggregation requires the "hash_" variants of the aggregate
// functions; plain "sum"/"mean"/"count" only apply to scalar (unkeyed) aggregation.
std::vector<arrow::compute::Aggregate> aggregates = {
    {"hash_sum", nullptr, "amount", "total_amount"},
    {"hash_mean", nullptr, "price", "avg_price"},
    {"hash_count", nullptr, "id", "count"}
};
auto aggregate_decl = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(table)},
    {"aggregate", AggregateNodeOptions(
        aggregates,
        /*keys=*/{"category", "region"}
    )}
});
// DeclarationToTable returns an arrow::Result; unwrap it to get the table
auto result = DeclarationToTable(aggregate_decl).ValueOrDie();
import pyarrow.acero as ac

# Define aggregations as (target, function, options, output_name) tuples.
# options=None uses the function's defaults. Grouped (keyed) aggregation
# requires the "hash_" variants of the aggregate functions.
aggregates = [
    ("amount", "hash_sum", None, "total_amount"),
    ("price", "hash_mean", None, "avg_price"),
    ("id", "hash_count", None, "count"),
]
declaration = ac.Declaration.from_sequence([
    ("table_source", ac.TableSourceNodeOptions(table)),
    ("aggregate", ac.AggregateNodeOptions(
        aggregates,
        keys=["category", "region"],
    )),
])
result = declaration.to_table()
Joins
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
using namespace arrow::acero;
// Input tables; assign them before running the query.
std::shared_ptr<arrow::Table> left_table;   // e.g. contains "id" and "name"
std::shared_ptr<arrow::Table> right_table;  // e.g. contains "user_id" and "amount"
// One source node per input table
Declaration left_source("table_source", TableSourceNodeOptions(left_table));
Declaration right_source("table_source", TableSourceNodeOptions(right_table));
// Inner hash join on left.id == right.user_id, keeping id/name from the left
// input and amount from the right input
Declaration join("hash_join",
                 {left_source, right_source},
                 HashJoinNodeOptions(
                     JoinType::INNER,
                     /*left_keys=*/{"id"},
                     /*right_keys=*/{"user_id"},
                     /*left_output=*/{"id", "name"},
                     /*right_output=*/{"amount"}));
// DeclarationToTable returns an arrow::Result; unwrap it to get the table
auto result = DeclarationToTable(join).ValueOrDie();
import pyarrow.acero as ac

# Create join: one source node per input table
left_source = ac.Declaration(
    "table_source",
    ac.TableSourceNodeOptions(left_table),
)
right_source = ac.Declaration(
    "table_source",
    ac.TableSourceNodeOptions(right_table),
)
# Declaration's signature is (factory_name, options, inputs=None), so the
# join options come second and the upstream declarations go in `inputs`.
join_decl = ac.Declaration(
    "hash_join",
    ac.HashJoinNodeOptions(
        join_type="inner",
        left_keys=["id"],
        right_keys=["user_id"],
        left_output=["id", "name"],
        right_output=["amount"],
    ),
    inputs=[left_source, right_source],
)
result = join_decl.to_table()
Sorting
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/ordering.h>
using namespace arrow::acero;
// Sort by multiple columns: price descending, then date ascending.
// OrderByNodeOptions takes an arrow::compute::Ordering (not SortOptions).
arrow::compute::Ordering ordering({
    arrow::compute::SortKey("price", arrow::compute::SortOrder::Descending),
    arrow::compute::SortKey("date", arrow::compute::SortOrder::Ascending)
});
auto sort_decl = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(table)},
    {"order_by", OrderByNodeOptions(ordering)}
});
// DeclarationToTable returns an arrow::Result; unwrap it to get the table
auto result = DeclarationToTable(sort_decl).ValueOrDie();
import pyarrow.acero as ac

# Sort by multiple columns. OrderByNodeOptions takes the sort keys directly
# as (column, order) pairs; it does not accept a pc.SortOptions object.
sort_keys = [
    ("price", "descending"),
    ("date", "ascending"),
]
declaration = ac.Declaration.from_sequence([
    ("table_source", ac.TableSourceNodeOptions(table)),
    ("order_by", ac.OrderByNodeOptions(sort_keys)),
])
result = declaration.to_table()
Execution Modes
Synchronous Execution
// Execute and wait for completion
auto result = arrow::acero::DeclarationToTable(
declaration,
/*use_threads=*/true
).ValueOrDie();
# Synchronous execution (default): to_table() returns once the full table is built
use_threads = True
result = declaration.to_table(use_threads=use_threads)
Asynchronous Execution
#include <arrow/util/future.h>
// Launch the query; the call returns a Future immediately instead of blocking.
auto future = arrow::acero::DeclarationToTableAsync(declaration, /*use_threads=*/true);
// ... other work can happen here while the query runs ...
// result() blocks until the Future completes; the returned arrow::Result holds
// either the table or the error that stopped the query.
arrow::Result<std::shared_ptr<arrow::Table>> result = future.result();
import asyncio

# pyarrow.acero.Declaration has no awaitable API (there is no to_table_async).
# Run the blocking to_table() call in a worker thread so the event loop stays
# responsive while the query executes.
async def execute_query():
    return await asyncio.to_thread(declaration.to_table, use_threads=True)

result = asyncio.run(execute_query())
Streaming Results
// Get results as a RecordBatchReader
auto reader = arrow::acero::DeclarationToReader(
declaration
).ValueOrDie();
// Process batches as they arrive
while (true) {
auto batch = reader->Next().ValueOrDie();
if (!batch) break;
// Process batch
}
# Get a streaming reader instead of materializing the whole table
reader = declaration.to_reader()
# Iterate over the reader to handle one record batch at a time
for record_batch in reader:
    # Process batch
    print(record_batch)
Query Options
Customize query execution with QueryOptions:
#include <arrow/acero/exec_plan.h>
using namespace arrow::acero;
// Gather execution settings in one QueryOptions object, then hand it to the runner.
QueryOptions query_opts;
query_opts.memory_pool = arrow::default_memory_pool();  // memory pool the query allocates from
query_opts.use_legacy_batching = false;
query_opts.use_threads = true;                          // allow parallel execution
auto result = DeclarationToTable(declaration, query_opts);
# In Python, query options are passed as method arguments. Declaration.to_table()
# only accepts use_threads; other QueryOptions fields such as the memory pool
# are not exposed, so the default memory pool is used.
result = declaration.to_table(
    use_threads=True,
)
- Use streaming execution for large datasets to avoid loading everything in memory
- Enable multi-threading with `use_threads=true` (C++) / `use_threads=True` (Python) for parallel processing
- Push down filters early in the pipeline to reduce data volume
- Batch operations to amortize overhead
- Use appropriate join algorithms based on data size and distribution
Next Steps