Overview
Acero is Apache Arrow’s streaming query execution engine, designed for processing large (potentially infinite) data streams. Unlike the compute module, which operates on in-memory data, Acero provides a dataflow execution model with pipelined processing and backpressure control.
Note: Acero is experimental and its API is still evolving. While the engine is production-ready for many use cases, breaking changes may occur in future releases.
Architectural Position
Acero sits as the top layer in Arrow’s computational stack:
- Core Arrow: Buffers, arrays, types (no data transformation)
- Compute Module: Scalar and aggregate functions on in-memory data
- Acero: Streaming execution plans combining compute operations
Acero vs. Compute Module
| Aspect | Compute Module | Acero |
|---|---|---|
| Input | Complete arrays in memory | Streams of batches |
| Processing | Single function call | Graph of operators |
| Memory | All data loaded | Bounded memory usage |
| Execution | Synchronous | Asynchronous/pipelined |
| Use case | Fast in-memory operations | Large dataset processing |
// Compute Module: Operate on complete data
Result<Datum> result = compute::CallFunction(
"add", {array1, array2}, &exec_context);
// Acero: Stream through execution plan
Declaration plan = Declaration::Sequence({
  {"table_source", TableSourceNodeOptions{table}},
  {"project", ProjectNodeOptions{{add_expression}}}
});
// DeclarationToTable appends its own sink and runs the plan to completion
Result<std::shared_ptr<Table>> output_table = DeclarationToTable(std::move(plan));
Core Concepts
ExecBatch
The fundamental unit of data flow in Acero:
// Simplified view of ExecBatch; the full definition is in cpp/src/arrow/compute/exec.h
struct ExecBatch {
// Column data: Array or Scalar
std::vector<Datum> values;
// Number of rows (semantic length)
int64_t length = 0;
// Ordering information
std::optional<int64_t> index;
// Guarantee information
struct Guarantee {
bool all_valid = false; // No nulls
bool all_same = false; // All values identical
};
std::vector<Guarantee> guarantee;
};
Key differences from RecordBatch:
- No schema: Schema is stored at the node level (all batches in a stream share one schema)
- Scalar columns: A Scalar represents a constant value across all rows
- Batch index: Tracks position in ordered streams
- Guarantees: Optimization hints (e.g., “this column has no nulls”)
Scalar vs Array columns:
// Pseudocode notation. Options 1 and 2 are semantically equivalent;
// Options 3 and 4 hold different data.
// Option 1: All arrays
ExecBatch{.values = {Array[1,2,3], Array[10,10,10]}, .length = 3}
// Option 2: Mixed (constant value as scalar), same rows as Option 1
ExecBatch{.values = {Array[1,2,3], Scalar(10)}, .length = 3}
// Both are semantically: [(1,10), (2,10), (3,10)]
// Option 3: All scalars
ExecBatch{.values = {Scalar(1), Scalar(10)}, .length = 3}
// This is semantically: [(1,10), (1,10), (1,10)]
// Option 4: Different constant
ExecBatch{.values = {Array[1,2,3], Scalar(5)}, .length = 3}
// This is semantically: [(1,5), (2,5), (3,5)]
Design Rationale: Scalar columns avoid materializing constant values, saving memory and computation. For example, a computed constant in a projection can remain as a scalar through subsequent operations.
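To make the scalar-versus-array equivalence concrete, here is a minimal sketch in plain C++ that does not use Arrow at all. `ToyArray`, `ToyScalar`, `ToyBatch`, `ValueAt`, and `SemanticallyEqual` are hypothetical names invented for this illustration; the only idea taken from the text is that a scalar column stands for one value repeated `length` times.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <variant>
#include <vector>

// Toy stand-ins for Arrow's Array and Scalar (hypothetical, not Arrow types).
using ToyArray = std::vector<int64_t>;
using ToyScalar = int64_t;
using ToyColumn = std::variant<ToyArray, ToyScalar>;

struct ToyBatch {
  std::vector<ToyColumn> values;
  int64_t length = 0;  // semantic row count; scalars carry no length of their own
};

// Read column `col` at row `row`, broadcasting scalars to every row.
int64_t ValueAt(const ToyBatch& batch, std::size_t col, int64_t row) {
  assert(row >= 0 && row < batch.length);
  const ToyColumn& column = batch.values.at(col);
  if (const ToyArray* array = std::get_if<ToyArray>(&column)) {
    return (*array)[static_cast<std::size_t>(row)];
  }
  return std::get<ToyScalar>(column);
}

// Two batches are semantically equal if every cell matches after broadcasting.
bool SemanticallyEqual(const ToyBatch& a, const ToyBatch& b) {
  if (a.length != b.length || a.values.size() != b.values.size()) return false;
  for (std::size_t col = 0; col < a.values.size(); ++col) {
    for (int64_t row = 0; row < a.length; ++row) {
      if (ValueAt(a, col, row) != ValueAt(b, col, row)) return false;
    }
  }
  return true;
}
```

Under this model, a batch holding `{[1,2,3], [10,10,10]}` compares equal to one holding `{[1,2,3], scalar 10}`, while `{scalar 1, scalar 10}` with the same length does not, because its first column broadcasts to `[1,1,1]`.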
ExecNode
The building blocks of execution plans:
// Abridged from cpp/src/arrow/acero/exec_plan.h
class ExecNode {
public:
virtual ~ExecNode() = default;
// Node identification
virtual const char* kind_name() const = 0;
const std::string& label() const { return label_; }
// Graph structure
const NodeVector& inputs() const { return inputs_; }
const ExecNode* output() const { return output_; }
// Data schema
const std::shared_ptr<Schema>& output_schema() const;
// Ordering guarantees
const Ordering& ordering() const;
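// Lifecycle (these return Status; the simplified examples later on this page use void for brevity)
virtual Status Validate() const;
virtual Status StartProducing() = 0;
Status StopProducing();
// Data flow (push-based)
virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;
virtual Status InputFinished(ExecNode* input, int total_batches) = 0;
// Backpressure (called by a downstream node on its input)
virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;
virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;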
protected:
ExecPlan* plan_;
NodeVector inputs_;
ExecNode* output_;
std::shared_ptr<Schema> output_schema_;
std::string label_;
};
Node types:
- Source nodes (0 inputs): Generate or read data
  - table_source: Emit batches from in-memory table
  - scan: Read from dataset/files
- Transform nodes (1+ inputs, 1 output): Modify data
  - project: Compute new columns
  - filter: Remove rows
  - aggregate: Compute summaries
  - hashjoin: Combine streams
- Sink nodes (1+ inputs, 0 outputs): Consume data
  - table_sink: Accumulate into table
  - write: Write to files
  - consuming_sink: Custom callback
Push-based execution: Acero uses a push model where source nodes push data downstream through InputReceived() calls. This enables efficient pipelining and backpressure control.
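The push model can be illustrated with a small single-threaded sketch in plain C++. Everything here (`ToyNode`, `CollectSink`, `FilterNode`, `RunSource`) is a hypothetical stand-in written for this page, not Acero code; real nodes are asynchronous and return `Status`. The point is only the direction of control: the source calls into its output, which calls into its own output, and so on.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

using ToyBatch = std::vector<int>;

// Hypothetical minimal node interface: data is pushed in via InputReceived().
class ToyNode {
 public:
  virtual ~ToyNode() = default;
  virtual void InputReceived(ToyBatch batch) = 0;
  virtual void InputFinished(int total_batches) = 0;
};

// Sink: collects every row it is handed.
class CollectSink : public ToyNode {
 public:
  void InputReceived(ToyBatch batch) override {
    rows.insert(rows.end(), batch.begin(), batch.end());
    ++batches_seen;
  }
  void InputFinished(int total_batches) override {
    assert(total_batches == batches_seen);  // nothing was lost on the way
    finished = true;
  }
  std::vector<int> rows;
  int batches_seen = 0;
  bool finished = false;
};

// Transform: keeps rows matching a predicate, then pushes the result downstream.
class FilterNode : public ToyNode {
 public:
  FilterNode(std::function<bool(int)> keep, ToyNode* output)
      : keep_(std::move(keep)), output_(output) {}
  void InputReceived(ToyBatch batch) override {
    ToyBatch kept;
    for (int v : batch) {
      if (keep_(v)) kept.push_back(v);
    }
    output_->InputReceived(std::move(kept));
  }
  void InputFinished(int total_batches) override {
    output_->InputFinished(total_batches);
  }

 private:
  std::function<bool(int)> keep_;
  ToyNode* output_;
};

// Source: drives the pipeline by pushing each batch to its output.
void RunSource(const std::vector<ToyBatch>& batches, ToyNode* output) {
  for (const ToyBatch& batch : batches) output->InputReceived(batch);
  output->InputFinished(static_cast<int>(batches.size()));
}
```

Wiring a `FilterNode` in front of a `CollectSink` and calling `RunSource` pushes every batch through the filter into the sink without the sink ever asking for data.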
ExecPlan
A graph of connected nodes representing a complete query:
class ExecPlan {
public:
// Maximum batch size (allows 16-bit indices)
static const uint32_t kMaxBatchSize = 1 << 15; // 32768 rows
// Create empty plan
static Result<std::shared_ptr<ExecPlan>> Make(
ExecContext exec_context = *threaded_exec_context());
// Add nodes
ExecNode* AddNode(std::unique_ptr<ExecNode> node);
template <typename Node, typename... Args>
Node* EmplaceNode(Args&&... args);
// Execution control
void StartProducing(); // Begin execution
void StopProducing(); // Request stop
Future<> finished(); // Wait for completion
// Introspection
const NodeVector& nodes() const;
std::string ToString() const;
};
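Execution lifecycle: Create the plan with ExecPlan::Make(), add nodes, call StartProducing(), then wait on finished(). StopProducing() requests early termination.
Startup order: Nodes start in reverse topological order (sinks first, sources last), so that backpressure paths are established before data flows.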
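As a rough illustration of "sinks first, sources last", the sketch below computes such an order for a toy plan in plain C++. `ToyPlanNode`, `Visit`, and `StartupOrder` are hypothetical names, and the plan is assumed to be acyclic; this shows the ordering rule only, not how Acero stores or starts its nodes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical plan description: each node lists the indices of its input nodes.
struct ToyPlanNode {
  std::string name;
  std::vector<std::size_t> inputs;
};

// Depth-first post-order visit: a node is appended only after all of its inputs.
void Visit(const std::vector<ToyPlanNode>& nodes, std::size_t i,
           std::vector<bool>& seen, std::vector<std::size_t>& topo) {
  if (seen[i]) return;
  seen[i] = true;
  for (std::size_t input : nodes[i].inputs) {
    assert(input < nodes.size());
    Visit(nodes, input, seen, topo);
  }
  topo.push_back(i);
}

// Returns node names in startup order: every node comes before its own inputs.
std::vector<std::string> StartupOrder(const std::vector<ToyPlanNode>& nodes) {
  std::vector<bool> seen(nodes.size(), false);
  std::vector<std::size_t> topo;  // topological order: sources first, sinks last
  for (std::size_t i = 0; i < nodes.size(); ++i) Visit(nodes, i, seen, topo);
  std::vector<std::string> order;
  for (auto it = topo.rbegin(); it != topo.rend(); ++it) {
    order.push_back(nodes[*it].name);  // reversed: sinks first, sources last
  }
  return order;
}
```

For a plan with two sources feeding a join that feeds a sink, the sink is started first and both sources last, so each consumer is ready before anything upstream can push to it.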
Declaration
The public API for building execution plans:
// A Declaration is a blueprint for an ExecNode
struct Declaration {
std::string factory_name; // Node type (e.g., "filter")
std::vector<Declaration> inputs; // Input declarations
std::shared_ptr<ExecNodeOptions> options; // Node configuration
std::string label; // Optional debug label
};
// Example: Project node declaration
Declaration project_decl{
"project",
{source_decl}, // Input
ProjectNodeOptions{
{compute::field_ref("x"), compute::field_ref("y")}
}
};
Why Declarations?
- Serializable: Can be converted to/from Substrait plans
- Reusable: Same declaration can instantiate multiple ExecPlans
- Testable: Easier to construct and validate than ExecPlans
- Composable: Build complex plans from simple components
// Compose declarations into pipeline
Declaration pipeline = Declaration::Sequence({
{"table_source", TableSourceNodeOptions{input_table}},
{"filter", FilterNodeOptions{filter_expr}},
{"project", ProjectNodeOptions{project_exprs}},
{"aggregate", AggregateNodeOptions{agg_options}}
});
// Execute the plan
Result<std::shared_ptr<Table>> result =
DeclarationToTable(std::move(pipeline));
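The way a linear sequence turns into a nested blueprint can be sketched without Arrow. In the toy model below, `ToyDeclaration`, `Sequence`, and `Describe` are hypothetical names; the assumption, consistent with the examples above, is that each step of a sequence becomes the sole input of the step that follows it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, stripped-down blueprint: a factory name plus input blueprints.
struct ToyDeclaration {
  std::string factory_name;
  std::vector<ToyDeclaration> inputs;
};

// Chain a linear list so that each entry becomes the sole input of the next one.
ToyDeclaration Sequence(std::vector<ToyDeclaration> steps) {
  assert(!steps.empty());
  ToyDeclaration current = std::move(steps.front());
  for (std::size_t i = 1; i < steps.size(); ++i) {
    ToyDeclaration next = std::move(steps[i]);
    next.inputs.push_back(std::move(current));
    current = std::move(next);
  }
  return current;  // the last step is the root of the nested blueprint
}

// Render the blueprint from source to root, for example "a -> b -> c".
std::string Describe(const ToyDeclaration& decl) {
  std::string text;
  for (const ToyDeclaration& input : decl.inputs) {
    text += Describe(input) + " -> ";
  }
  return text + decl.factory_name;
}
```

Because the blueprint is plain data, the same `ToyDeclaration` value can be copied, inspected, or rendered before any node exists, which is the property the "Reusable" and "Testable" bullets rely on.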
Node Types and Operations
Source Nodes
TableSource: Emit batches from in-memory table
// Both values are passed to the constructor; max_batch_size is optional
TableSourceNodeOptions options{input_table, /*max_batch_size=*/4096};
Declaration source{"table_source", std::move(options)};
Scan: Read from datasets/files (provided by datasets module)
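// Filter and projection are set on ScanOptions; projection is a compute::Expression
auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->filter = filter_expression;
scan_options->projection = projection_expression;
dataset::ScanNodeOptions options{dataset, scan_options};
Declaration scan{"scan", std::move(options)};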
Transform Nodes
Project: Compute new columns from expressions
// Add computed column: z = x + y
ProjectNodeOptions options{
  /*expressions=*/{
    compute::field_ref("x"),
    compute::field_ref("y"),
    compute::call("add", {
      compute::field_ref("x"),
      compute::field_ref("y")
    })
  },
  /*names=*/{"x", "y", "z"}
};
Declaration project{"project", {input}, std::move(options)};
Filter: Remove rows based on predicate
// Keep rows where age >= 18
FilterNodeOptions options{compute::call("greater_equal", {
  compute::field_ref("age"),
  compute::literal(18)
})};
Declaration filter{"filter", {input}, std::move(options)};
Aggregate: Compute summaries
// SELECT category, SUM(amount), AVG(price)
// FROM input
// GROUP BY category
// Grouped (keyed) aggregation uses the "hash_" variants of the aggregate functions
AggregateNodeOptions options{
  /*aggregates=*/{
    {"hash_sum", nullptr, "amount", "sum_amount"},
    {"hash_mean", nullptr, "price", "avg_price"}
  },
  /*keys=*/{"category"}
};
Declaration agg{"aggregate", {input}, std::move(options)};
Join: Combine two streams
// Inner join on key column
HashJoinNodeOptions options;
options.join_type = JoinType::INNER;
options.left_keys = {FieldRef("id")};
options.right_keys = {FieldRef("user_id")};
options.output_suffix_for_left = "";
options.output_suffix_for_right = "_right";
Declaration join{"hashjoin", {left, right}, options};
Sink Nodes
TableSink: Accumulate into in-memory table
std::shared_ptr<Table> output_table;
// The sink fills output_table once the plan has finished
TableSinkNodeOptions options{&output_table};
Declaration sink{"table_sink", {input}, std::move(options)};
Write: Write to dataset/files (datasets module)
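// Write settings are set on FileSystemDatasetWriteOptions; `filesystem` is assumed to exist
auto format = std::make_shared<dataset::ParquetFileFormat>();
dataset::FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = format->DefaultWriteOptions();
write_options.filesystem = filesystem;
write_options.base_dir = "/path/to/output";
write_options.partitioning = partitioning;
write_options.basename_template = "part{i}.parquet";
dataset::WriteNodeOptions options{write_options};
Declaration write{"write", {input}, std::move(options)};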
Data Ordering
Acero tracks ordering guarantees through execution:
// Simplified model of Acero's ordering information
struct Ordering {
  // Explicit sort keys (empty unless type == kOrdered)
  std::vector<SortKey> sort_keys;
  // Ordering type
  enum Type {
    kUnordered,  // No guaranteed order (e.g., hash join)
    kImplicit,   // Ordered but not by any column (e.g., row number)
    kOrdered     // Ordered by sort_keys
  };
  Type type = kUnordered;
};
struct SortKey {
FieldRef field;
SortOrder order; // Ascending or Descending
};
Ordering through nodes:
- Scan/TableSource: May provide implicit ordering (row order)
- Filter/Project: Preserve input ordering
- OrderBy: Establish new explicit ordering
- HashAggregate: Destroy ordering (hash-based)
- GroupAggregate: May preserve ordering if pre-sorted
- HashJoin: Destroy ordering
- AsofJoin: Requires and preserves ordering
// Example: Ordering through pipeline
// 1. TableSource: Implicit ordering (row order)
Ordering{.type = kImplicit}
// 2. OrderBy: Establish explicit ordering
Ordering{
.sort_keys = {{"timestamp", SortOrder::Ascending}},
.type = kOrdered
}
// 3. Filter: Preserve ordering
Ordering{
.sort_keys = {{"timestamp", SortOrder::Ascending}},
.type = kOrdered
}
// 4. HashJoin: Destroy ordering
Ordering{.type = kUnordered}
Batch indices: Even if data is unordered, nodes assign sequential indices to batches. This enables downstream operators to re-establish order if needed.
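One way a downstream operator could use batch indices to restore order is to buffer early arrivals and release batches only when the next expected index shows up. The sketch below shows that idea in plain C++; `IndexedBatch` and `Resequencer` are hypothetical names for this illustration and are not Acero classes. It assumes indices start at 0, are unique, and have no gaps.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batch carrying only its index and a payload label.
struct IndexedBatch {
  int64_t index;
  std::string payload;
};

// Buffers batches that arrive early and releases them in index order.
class Resequencer {
 public:
  // Accept a batch; return every batch that is now ready, in index order.
  std::vector<IndexedBatch> Push(IndexedBatch batch) {
    assert(batch.index >= next_index_);  // an index is never delivered twice
    const int64_t index = batch.index;
    pending_.emplace(index, std::move(batch));
    std::vector<IndexedBatch> ready;
    auto it = pending_.find(next_index_);
    while (it != pending_.end()) {
      ready.push_back(std::move(it->second));
      pending_.erase(it);
      ++next_index_;
      it = pending_.find(next_index_);
    }
    return ready;
  }

  std::size_t buffered() const { return pending_.size(); }

 private:
  int64_t next_index_ = 0;
  std::map<int64_t, IndexedBatch> pending_;
};
```

The cost of this approach is memory: if batch 0 arrives last, every other batch has to be held until it does, which is one reason ordering-sensitive operators interact with backpressure.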
Execution Model
Push-Based Streaming
Acero uses a push-based model where producers drive execution:
// Simplified execution flow
void SourceNode::StartProducing() {
// Start background thread/async tasks
scheduler_->Submit([this]() {
while (HasMoreData()) {
ExecBatch batch = ProduceNextBatch();
output_->InputReceived(this, std::move(batch));
}
output_->InputFinished(this, total_batches_);
});
}
void TransformNode::InputReceived(ExecNode* input, ExecBatch batch) {
// Process batch
ExecBatch transformed = TransformBatch(batch);
// Push to next node
output_->InputReceived(this, std::move(transformed));
}
Backpressure
Nodes can apply backpressure to prevent memory overflow:
// Slow consumer signals backpressure
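// paused_ and backpressure_counter_ are hypothetical members of SlowNode
void SlowNode::InputReceived(ExecNode* input, ExecBatch batch) {
  queue_.push(std::move(batch));
  if (!paused_ && queue_.size() > kMaxQueueSize) {
    // Pause input; the increasing counter orders pause/resume signals
    paused_ = true;
    input->PauseProducing(this, ++backpressure_counter_);
  }
  // Process later, then resume once the queue has drained
  ProcessQueue();
  if (paused_ && queue_.size() < kResumeThreshold) {
    paused_ = false;
    input->ResumeProducing(this, ++backpressure_counter_);
  }
}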
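The pause and resume thresholds can be exercised in isolation with a small plain C++ model. `BackpressureQueue` is a hypothetical class written for this page, not part of Arrow. The two thresholds are deliberately different (pause above a high mark, resume below a low mark) so that the signal does not flip on every batch, and the counter is an illustration of how successive signals could be told apart.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical bounded queue with pause/resume thresholds (not an Arrow class).
class BackpressureQueue {
 public:
  BackpressureQueue(std::size_t pause_above, std::size_t resume_below)
      : pause_above_(pause_above), resume_below_(resume_below) {
    assert(resume_below_ <= pause_above_);
  }

  // Producer side: enqueue and report whether the producer should be paused.
  bool Push(int batch) {
    queue_.push_back(batch);
    if (!paused_ && queue_.size() > pause_above_) {
      paused_ = true;
      ++counter_;  // every pause/resume transition gets a fresh counter value
    }
    return paused_;
  }

  // Consumer side: dequeue and report whether the producer may run.
  bool Pop(int* batch) {
    assert(!queue_.empty());
    *batch = queue_.front();
    queue_.pop_front();
    if (paused_ && queue_.size() < resume_below_) {
      paused_ = false;
      ++counter_;
    }
    return !paused_;
  }

  bool paused() const { return paused_; }
  int counter() const { return counter_; }

 private:
  std::size_t pause_above_;
  std::size_t resume_below_;
  std::deque<int> queue_;
  bool paused_ = false;
  int counter_ = 0;
};
```

With `pause_above = 2` and `resume_below = 1`, the third push triggers a pause and the producer stays paused until the queue is completely drained.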
Parallelism
Intra-node parallelism: Process multiple batches concurrently
// ThreadedTaskGroup processes batches in parallel
void ProjectNode::InputReceived(ExecNode* input, ExecBatch batch) {
task_group_->Append([this, batch = std::move(batch)]() mutable {
ExecBatch result = EvaluateExpressions(batch);
output_->InputReceived(this, std::move(result));
});
}
Inter-node parallelism: Multiple independent paths execute concurrently
Sources A and B can produce batches in parallel.
Thread safety: Nodes must handle concurrent InputReceived() calls safely. Most nodes use task groups or mutexes for synchronization.
Expression Integration
Acero integrates with Arrow’s compute expression system:
// Expressions compose function calls
auto expr = compute::call("add", {
compute::call("multiply", {
compute::field_ref("x"),
compute::literal(2)
}),
compute::field_ref("y")
});
// Represents: (x * 2) + y
// Use in project node
ProjectNodeOptions options{/*expressions=*/{expr}, /*names=*/{"result"}};
Expression evaluation:
Expressions are:
- Bound to the input schema (field references and functions resolved)
- Simplified (constant folding, etc.)
- Executed on each batch
- Emitted as the output columns of the node
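The "simplify once, then execute per batch" idea can be shown on a tiny expression tree in plain C++. `ToyExpr`, `Literal`, `Field`, `Call`, `FoldConstants`, and `Evaluate` are hypothetical helpers for this sketch and do not mirror Arrow's `compute::Expression` internals; only integer `add` and `multiply` are modeled.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct ToyExpr;
using ExprPtr = std::shared_ptr<const ToyExpr>;

// Hypothetical mini expression tree: literal, field reference, or binary call.
struct ToyExpr {
  enum Kind { kLiteral, kField, kAdd, kMultiply };
  Kind kind = kLiteral;
  long value = 0;     // used by kLiteral
  std::string field;  // used by kField
  ExprPtr lhs, rhs;   // used by kAdd and kMultiply
};

ExprPtr Literal(long v) {
  auto e = std::make_shared<ToyExpr>();
  e->value = v;
  return e;
}

ExprPtr Field(std::string name) {
  auto e = std::make_shared<ToyExpr>();
  e->kind = ToyExpr::kField;
  e->field = std::move(name);
  return e;
}

ExprPtr Call(ToyExpr::Kind op, ExprPtr lhs, ExprPtr rhs) {
  assert((op == ToyExpr::kAdd || op == ToyExpr::kMultiply) && lhs && rhs);
  auto e = std::make_shared<ToyExpr>();
  e->kind = op;
  e->lhs = std::move(lhs);
  e->rhs = std::move(rhs);
  return e;
}

// Simplify: collapse any call whose two arguments are both literals.
ExprPtr FoldConstants(const ExprPtr& e) {
  if (e->kind == ToyExpr::kLiteral || e->kind == ToyExpr::kField) return e;
  ExprPtr lhs = FoldConstants(e->lhs);
  ExprPtr rhs = FoldConstants(e->rhs);
  if (lhs->kind == ToyExpr::kLiteral && rhs->kind == ToyExpr::kLiteral) {
    return Literal(e->kind == ToyExpr::kAdd ? lhs->value + rhs->value
                                           : lhs->value * rhs->value);
  }
  return Call(e->kind, lhs, rhs);
}

// Execute: evaluate against one row, given as field name -> value.
long Evaluate(const ExprPtr& e, const std::map<std::string, long>& row) {
  switch (e->kind) {
    case ToyExpr::kLiteral: return e->value;
    case ToyExpr::kField: return row.at(e->field);
    case ToyExpr::kAdd: return Evaluate(e->lhs, row) + Evaluate(e->rhs, row);
    case ToyExpr::kMultiply: return Evaluate(e->lhs, row) * Evaluate(e->rhs, row);
  }
  return 0;  // not reached for well-formed trees
}
```

Folding `(x * (1 + 1)) + y` once yields `(x * 2) + y`, so the addition of the two literals is not repeated for every row that is evaluated afterwards.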
Architecture Patterns
Operator Fusion
Multiple operations can be fused into single node:
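// Instead of: Scan -> Filter -> Project
// Use: a scan node with filter and projection pushed down
auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->filter = filter_expr;       // Pushdown
scan_options->projection = project_expr;  // Pushdown (a compute::Expression)
dataset::ScanNodeOptions options{dataset, scan_options};
Declaration scan{"scan", std::move(options)};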
Benefits:
- Fewer batch copies
- Better cache locality
- Reduced materialization
Batch Size Adaptation
Nodes can adjust batch sizes:
// Small input batches -> accumulate
if (input_batch.length < kMinBatchSize) {
accumulator_.push_back(std::move(input_batch));
if (accumulator_.total_length() >= kTargetBatchSize) {
ExecBatch large_batch = Concatenate(accumulator_);
output_->InputReceived(this, std::move(large_batch));
accumulator_.clear();
}
}
// Large input batches -> split
if (input_batch.length > kMaxBatchSize) {
for (ExecBatch slice : SplitBatch(input_batch, kTargetBatchSize)) {
output_->InputReceived(this, std::move(slice));
}
}
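The splitting and accumulating logic above can be made concrete with two small helpers in plain C++. `SliceRanges` and `BatchAccumulator` are hypothetical names for this sketch; batches are modeled as `std::vector<int>` rather than `ExecBatch`, and the sketch copies rows where a real engine would prefer zero-copy slices.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Returns (offset, length) slices covering `length` rows, each at most `target` rows.
std::vector<std::pair<int64_t, int64_t>> SliceRanges(int64_t length, int64_t target) {
  assert(length >= 0 && target > 0);
  std::vector<std::pair<int64_t, int64_t>> ranges;
  for (int64_t offset = 0; offset < length; offset += target) {
    ranges.emplace_back(offset, std::min(target, length - offset));
  }
  return ranges;
}

// Collects small batches and releases one combined batch once `target` rows accrue.
class BatchAccumulator {
 public:
  explicit BatchAccumulator(std::size_t target) : target_(target) {}

  // Returns true and fills `out` when a combined batch is ready.
  bool Add(const std::vector<int>& small, std::vector<int>* out) {
    rows_.insert(rows_.end(), small.begin(), small.end());
    if (rows_.size() < target_) return false;
    *out = std::move(rows_);
    rows_.clear();  // a moved-from vector is valid but unspecified, so reset it
    return true;
  }

  // Rows still waiting; a real node would flush these when its input finishes.
  std::size_t pending() const { return rows_.size(); }

 private:
  std::size_t target_;
  std::vector<int> rows_;
};
```

One detail the accumulate branch has to handle in practice is the tail: rows left in the accumulator when the input finishes must still be emitted, which is why the sketch exposes `pending()`.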
Streaming Aggregation
Aggregates accumulate state across batches:
class StreamingAggregateNode : public ExecNode {
// Accumulator state
struct GroupState {
std::vector<std::unique_ptr<KernelState>> agg_states;
};
std::unordered_map<GroupKey, GroupState> groups_;
void InputReceived(ExecNode* input, ExecBatch batch) override {
// Update accumulators
for (int64_t i = 0; i < batch.length; ++i) {
GroupKey key = ExtractKey(batch, i);
GroupState& state = groups_[key];
UpdateAggregates(state, batch, i);
}
}
void InputFinished(ExecNode* input, int total) override {
// Emit final results
ExecBatch result = FinalizeAggregates(groups_);
output_->InputReceived(this, std::move(result));
output_->InputFinished(this, 1);
}
};
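A self-contained version of the same pattern, reduced to SUM and MEAN over string keys, might look like the following. `KeyedBatch` and `StreamingSumMean` are hypothetical names, and the row-at-a-time loop is chosen for clarity; it is not how Arrow's grouped kernels are implemented. What it does show is that only per-group state is kept between batches, never the batches themselves.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// One toy batch: parallel key and value columns (hypothetical layout).
struct KeyedBatch {
  std::vector<std::string> keys;
  std::vector<double> values;
};

// Keeps a running SUM and COUNT per group; the mean is derived at finalization.
class StreamingSumMean {
 public:
  // Called once per incoming batch; the batch can be discarded afterwards.
  void Consume(const KeyedBatch& batch) {
    assert(batch.keys.size() == batch.values.size());
    for (std::size_t i = 0; i < batch.keys.size(); ++i) {
      State& state = groups_[batch.keys[i]];
      state.sum += batch.values[i];
      state.count += 1;
    }
  }

  // Called once after the last batch: returns (sum, mean) per key.
  std::map<std::string, std::pair<double, double>> Finalize() const {
    std::map<std::string, std::pair<double, double>> out;
    for (const auto& entry : groups_) {
      const State& state = entry.second;
      out[entry.first] = {state.sum, state.sum / static_cast<double>(state.count)};
    }
    return out;
  }

 private:
  struct State {
    double sum = 0.0;
    long count = 0;
  };
  std::map<std::string, State> groups_;
};
```

Memory use grows with the number of distinct keys rather than with the number of rows, which is what lets this style of aggregation run over inputs larger than memory as long as the key cardinality stays manageable.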
Substrait Integration
Acero can consume Substrait query plans:
// From arrow/engine/substrait/serde.h and util.h (optional parameters omitted)
// Convert a serialized Substrait plan to Acero declarations
Result<PlanInfo> DeserializePlan(const Buffer& buf /* , ... */);
// Execute a serialized Substrait plan and stream the results
Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
const Buffer& substrait_buffer /* , ... */);
This enables:
- Cross-engine plan interchange
- Language-agnostic query building
- Optimizer integration
Performance Considerations
Memory Management
- Bounded memory: Configure max batch size and queue depths
- Streaming: Don’t materialize entire result set
- Zero-copy: Use slicing and buffer reuse
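// Bound memory by limiting the batch size where data enters the plan
TableSourceNodeOptions source_options{input_table, /*max_batch_size=*/8192};  // Smaller batches
// QueryOptions carries plan-wide settings such as use_threads and memory_pool
QueryOptions query_options;
auto plan = *ExecPlan::Make(query_options, exec_context);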
Batch Size Tuning
- Too small: High overhead, poor cache usage
- Too large: Memory pressure, latency spikes
- Sweet spot: 1K-64K rows (depends on row width)
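The dependence on row width is simple arithmetic: bytes per batch is roughly rows times bytes per row. The helper below is a hypothetical sizing rule for illustration, not something Acero provides; the byte budget (for example, a share of a CPU cache) and the clamping limits are assumptions the caller has to supply.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Pick a row count so that one batch stays within `budget_bytes`,
// clamped to the range [min_rows, max_rows].
int64_t RowsPerBatch(int64_t budget_bytes, int64_t row_width_bytes,
                     int64_t min_rows, int64_t max_rows) {
  assert(budget_bytes > 0 && row_width_bytes > 0 && min_rows <= max_rows);
  const int64_t rows = budget_bytes / row_width_bytes;
  return std::max(min_rows, std::min(rows, max_rows));
}
```

With an assumed 1 MiB budget and limits of 1,024 to 65,536 rows, 64-byte rows give 16,384 rows per batch, 8-byte rows hit the upper limit, and 4 KiB rows fall back to the lower limit.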
Parallelism
- CPU-bound: Increase thread pool size
- I/O-bound: Increase I/O concurrency
- Memory-bound: Reduce batch size
// Configure thread pool (the pool must outlive the ExecContext)
ARROW_ASSIGN_OR_RAISE(auto thread_pool, arrow::internal::ThreadPool::Make(16));
ExecContext exec_context(default_memory_pool(), thread_pool.get());
Architecture Decisions
Why Push-Based?
Design Rationale: Push-based execution provides:
- Natural backpressure: Slow consumers naturally slow producers
- Efficient pipelining: Data flows through pipeline without polling
- Low latency: Results available as soon as produced
- Simple scheduling: No need for complex work-stealing
Pull-based would require cooperative scheduling and more complex coordination.
Why ExecBatch vs. RecordBatch?
- Scalar optimization: Avoid materializing constants
- Lightweight: No schema overhead per batch
- Extensible: Can add optimization hints
- Internal: Can change without breaking API
Why Declarations?
- Stability: Public API doesn’t expose internal ExecPlan/ExecNode
- Serialization: Can convert to/from Substrait
- Testing: Easier to construct and validate
- Flexibility: Implementation can change without API breaks
Related Components
- Compute Module: Provides functions and expressions
- Datasets: Provides scan and write nodes
- IPC/Flight: Can stream batches over network
- Substrait: Plan serialization format