Substrait is a cross-language specification for data compute operations. Arrow’s Substrait integration enables serializing and deserializing query plans for interoperability between different query engines and data processing systems.
Overview
Arrow’s Substrait support provides:
- Plan serialization: Convert Acero execution plans to Substrait format
- Plan deserialization: Execute Substrait plans using the Acero engine
- Expression conversion: Translate between Arrow compute expressions and Substrait
- Type mapping: Bidirectional conversion between Arrow and Substrait types
- Cross-system interoperability: Exchange query plans between systems
Key Concepts
Substrait Plans
A Substrait Plan represents a complete query execution plan with:
- Relations: Operations like scans, filters, projections, aggregations
- Expressions: Computations on data (literals, field references, functions)
- Types: Schema information for data flowing through the plan
- Extensions: Custom functions and types via extension URIs
Arrow Acero Integration
Arrow executes Substrait plans using the Acero query engine:
- Substrait relations map to Acero execution nodes
- Substrait expressions map to Arrow compute expressions
- Plans execute using Arrow’s streaming execution model
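The relation-to-node mapping can be pictured as a lookup table. The sketch below is a standalone illustration, not Arrow's implementation: `AceroFactoryFor` is a hypothetical helper, and the pairing of Substrait relation names with Acero node factory names is an approximation that may differ between Arrow versions.

```cpp
#include <map>
#include <optional>
#include <string>

// Hypothetical helper: returns the Acero node factory name that roughly
// corresponds to a Substrait relation type, or std::nullopt if the relation
// is not in this (approximate) table.
std::optional<std::string> AceroFactoryFor(const std::string& substrait_rel) {
  static const std::map<std::string, std::string> kMapping = {
      {"ReadRel", "scan"},           {"FilterRel", "filter"},
      {"ProjectRel", "project"},     {"AggregateRel", "aggregate"},
      {"JoinRel", "hashjoin"},       {"SortRel", "order_by"},
      {"FetchRel", "fetch"},         {"SetRel", "union"},
  };
  auto it = kMapping.find(substrait_rel);
  if (it == kMapping.end()) return std::nullopt;
  return it->second;
}
```

A lookup that returns no value corresponds to a relation the consumer cannot translate, which is the case where deserialization would be expected to fail.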
Deserializing Substrait Plans
Basic Plan Execution
Execute a Substrait plan using Arrow:
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include "arrow/acero/api.h"
#include "arrow/compute/api.h"
#include "arrow/engine/api.h"
namespace eng = arrow::engine;
namespace ac = arrow::acero;
// Consumer that processes output batches
class MyConsumer : public ac::SinkNodeConsumer {
 public:
  arrow::Status Init(
      const std::shared_ptr<arrow::Schema>& schema,
      ac::BackpressureControl* backpressure_control,
      ac::ExecPlan* plan) override {
    schema_ = schema;
    return arrow::Status::OK();
  }
  arrow::Status Consume(arrow::compute::ExecBatch batch) override {
    std::cout << "Received " << batch.length << " rows" << std::endl;
    batches_.push_back(std::move(batch));
    return arrow::Status::OK();
  }
  arrow::Future<> Finish() override {
    std::cout << "Finished: " << batches_.size()
              << " batches" << std::endl;
    return arrow::Future<>::MakeFinished();
  }
 private:
  std::shared_ptr<arrow::Schema> schema_;
  std::vector<arrow::compute::ExecBatch> batches_;
};
// Load and execute Substrait plan
arrow::Status ExecutePlan(const std::shared_ptr<arrow::Buffer>& plan_buffer) {
  // Create consumer for output
  auto consumer = std::make_shared<MyConsumer>();
  // Deserialize plan to Acero execution plan
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<ac::ExecPlan> exec_plan,
      eng::DeserializePlan(*plan_buffer, consumer));
  // Validate, then start execution. In recent Arrow releases
  // StartProducing() returns void; older releases returned a Status.
  ARROW_RETURN_NOT_OK(exec_plan->Validate());
  exec_plan->StartProducing();
  // Block until the plan finishes and return its final status
  return exec_plan->finished().status();
}
Multiple Consumer Factory
Handle plans with multiple output relations:
std::vector<std::shared_ptr<ac::SinkNodeConsumer>> consumers;
// Factory is called once per output relation; each consumer is kept in
// `consumers` so its results can be inspected after execution
auto consumer_factory = [&]() -> std::shared_ptr<ac::SinkNodeConsumer> {
  auto consumer = std::make_shared<MyConsumer>();
  consumers.push_back(consumer);
  return consumer;
};
// Deserialize plan with multiple relations
ARROW_ASSIGN_OR_RAISE(
    std::vector<ac::Declaration> declarations,
    eng::DeserializePlans(*plan_buffer, consumer_factory));
std::cout << "Plan has " << declarations.size()
          << " output relations" << std::endl;
Writing to Filesystem
Write plan output directly to files:
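#include "arrow/dataset/api.h"
#include "arrow/filesystem/api.h"
namespace ds = arrow::dataset;
// Factory for write options. The output settings live on
// FileSystemDatasetWriteOptions, which WriteNodeOptions wraps.
auto write_options_factory = []() -> std::shared_ptr<ds::WriteNodeOptions> {
  ds::FileSystemDatasetWriteOptions write_options;
  // Configure output format
  auto format = std::make_shared<ds::ParquetFileFormat>();
  write_options.file_write_options = format->DefaultWriteOptions();
  // Set output filesystem, directory, and file name template
  write_options.filesystem = std::make_shared<arrow::fs::LocalFileSystem>();
  write_options.base_dir = "/path/to/output";
  write_options.basename_template = "part-{i}.parquet";
  // Partitioning scheme
  write_options.partitioning =
      std::make_shared<ds::HivePartitioning>(
          arrow::schema({
              arrow::field("year", arrow::int32()),
              arrow::field("month", arrow::int32())
          }));
  return std::make_shared<ds::WriteNodeOptions>(std::move(write_options));
};
// Deserialize the plan into declarations that write to the filesystem
ARROW_ASSIGN_OR_RAISE(
    std::vector<ac::Declaration> declarations,
    eng::DeserializePlans(*plan_buffer, write_options_factory));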
Serializing to Substrait
Serialize Acero Plan
Convert an Acero execution plan to Substrait:
// Build Acero plan
auto table_source = ac::Declaration(
    "table_source",
    ac::TableSourceNodeOptions{table});
auto filter = ac::Declaration(
    "filter",
    {table_source},
    ac::FilterNodeOptions{arrow::compute::greater(
        arrow::compute::field_ref("value"),
        arrow::compute::literal(100))});
auto project = ac::Declaration(
    "project",
    {filter},
    ac::ProjectNodeOptions{{arrow::compute::field_ref("name")}});
// Initialize extension set
eng::ExtensionSet ext_set;
// Serialize to Substrait
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializePlan(project, &ext_set));
// Save or transmit serialized plan
std::ofstream out("plan.substrait", std::ios::binary);
out.write(reinterpret_cast<const char*>(serialized->data()),
          serialized->size());
Serialize Expressions
Serialize standalone expressions:
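// Describe the schema the expressions refer to
auto schema = arrow::schema({
    arrow::field("a", arrow::int32()),
    arrow::field("b", arrow::int32())
});
// Create the expression and bind it to the schema
arrow::compute::Expression expr = arrow::compute::call(
    "add",
    {arrow::compute::field_ref("a"),
     arrow::compute::field_ref("b")});
ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*schema));
// Each entry pairs an expression with an output name
// ("a_plus_b" is an illustrative name)
eng::BoundExpressions bound_exprs;
bound_exprs.named_expressions.push_back(eng::NamedExpression{expr, "a_plus_b"});
bound_exprs.schema = schema;
// Serialize expressions
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializeExpressions(bound_exprs));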
Deserialize Expressions
// Deserialize expressions
ARROW_ASSIGN_OR_RAISE(
    eng::BoundExpressions bound_exprs,
    eng::DeserializeExpressions(*serialized));
std::cout << "Schema: " << bound_exprs.schema->ToString() << std::endl;
std::cout << "Expressions: " << bound_exprs.named_expressions.size() << std::endl;
for (const auto& named : bound_exprs.named_expressions) {
  std::cout << "  " << named.name << ": "
            << named.expression.ToString() << std::endl;
}
Type Conversion
Serialize Arrow Types
// Serialize Arrow type to Substrait
auto arrow_type = arrow::struct_({
    arrow::field("x", arrow::float64()),
    arrow::field("y", arrow::float64())
});
eng::ExtensionSet ext_set;
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializeType(*arrow_type, &ext_set));
Deserialize Substrait Types
// Deserialize Substrait type to Arrow
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::DataType> arrow_type,
    eng::DeserializeType(*serialized, ext_set));
std::cout << "Type: " << arrow_type->ToString() << std::endl;
Schema Conversion
Serialize Schema
auto schema = arrow::schema({
    arrow::field("id", arrow::int64()),
    arrow::field("name", arrow::utf8()),
    arrow::field("value", arrow::float64())
});
eng::ExtensionSet ext_set;
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializeSchema(*schema, &ext_set));
Deserialize Schema
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Schema> schema,
    eng::DeserializeSchema(*serialized, ext_set));
for (const auto& field : schema->fields()) {
  std::cout << field->name() << ": "
            << field->type()->ToString() << std::endl;
}
Extension Sets
Extension sets manage custom functions and types:
// Create extension set
eng::ExtensionSet ext_set;
// Use during serialization
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializePlan(declaration, &ext_set));
// Extensions are tracked in ext_set
std::cout << "Functions used: "
          << ext_set.num_functions() << std::endl;
std::cout << "Types used: "
          << ext_set.num_types() << std::endl;
// Reuse ext_set for consistent anchors
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized2,
    eng::SerializePlan(declaration2, &ext_set));
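To see why reusing one extension set keeps anchors consistent, here is a simplified standalone model. It is not Arrow's `ExtensionSet`: `AnchorTable` and its methods are hypothetical names, and any URI passed to it is just an opaque string. The model assigns each (URI, function name) pair a small integer anchor the first time it is seen and returns the same anchor on every later lookup.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified model of anchor bookkeeping. A real extension set
// also tracks types and validates names against a registry.
class AnchorTable {
 public:
  // Returns the anchor for (uri, name), assigning the next free one if the
  // pair has not been seen before.
  std::uint32_t Encode(const std::string& uri, const std::string& name) {
    assert(!name.empty() && "function name must not be empty");
    auto key = std::make_pair(uri, name);
    auto it = anchors_.find(key);
    if (it != anchors_.end()) return it->second;
    const auto anchor = static_cast<std::uint32_t>(anchors_.size());
    anchors_.emplace(std::move(key), anchor);
    return anchor;
  }

  std::size_t num_functions() const { return anchors_.size(); }

 private:
  std::map<std::pair<std::string, std::string>, std::uint32_t> anchors_;
};
```

In this model, two plans serialized against the same table refer to a given function by the same number, while a fresh table may hand out a different number for it. That is the situation the shared `ext_set` above is meant to avoid.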
Conversion Options
Control conversion behavior:
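eng::ConversionOptions options;
// Strictness level
options.strictness = eng::ConversionStrictness::BEST_EFFORT;
// Named table provider: resolves a table name path (and the schema the plan
// expects) to an Acero declaration
options.named_table_provider =
    [](const std::vector<std::string>& names,
       const arrow::Schema& schema) -> arrow::Result<ac::Declaration> {
  // LoadTable stands in for application code that returns a declaration
  // (for example a "table_source") for the given name path
  return LoadTable(names);
};
// Use options in deserialization
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<ac::ExecPlan> plan,
    eng::DeserializePlan(*buffer, consumer,
                         /*registry=*/nullptr,
                         /*ext_set_out=*/nullptr,
                         options));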
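A named table provider is essentially a lookup from a name path to a data source. The sketch below models that lookup with standard-library types only, so it stays independent of Arrow. `TableCatalog`, `Register`, and `Resolve` are hypothetical names, and the stored value is just a location string where a real provider would return an Acero declaration.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical catalog keyed by a name path such as {"sales", "orders"}.
class TableCatalog {
 public:
  // Associates a name path with a location; a later call with the same path
  // overwrites the earlier entry.
  void Register(std::vector<std::string> names, std::string location) {
    assert(!names.empty() && "a table needs at least one name component");
    tables_[std::move(names)] = std::move(location);
  }

  // Returns the location for the name path, or std::nullopt if unknown.
  std::optional<std::string> Resolve(
      const std::vector<std::string>& names) const {
    auto it = tables_.find(names);
    if (it == tables_.end()) return std::nullopt;
    return it->second;
  }

 private:
  std::map<std::vector<std::string>, std::string> tables_;
};
```

A provider built on such a catalog would typically turn the "not found" case into an error status so that deserialization fails with a clear message instead of producing a plan with a missing source.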
JSON Conversion
Convert between binary and JSON formats:
// Binary to JSON
ARROW_ASSIGN_OR_RAISE(
    std::string json,
    eng::internal::SubstraitToJSON("Plan", *binary_buffer));
std::cout << "Plan as JSON:\n" << json << std::endl;
// JSON to binary
std::string json_plan = R"({
  "relations": [
    {"rel": {
      "read": {
        "base_schema": {
          "names": ["a", "b"],
          "struct": {
            "types": [{"i32": {}}, {"i32": {}}]
          }
        },
        "local_files": {
          "items": [{"uri_file": "data.parquet", "parquet": {}}]
        }
      }
    }}
  ]
})";
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> binary,
    eng::internal::SubstraitFromJSON("Plan", json_plan));
Complete Example
Full workflow from serialization to execution:
#include <fstream>
#include <iostream>
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/engine/api.h>
#include <arrow/acero/api.h>
#include <arrow/io/api.h>
// Producer: Serialize plan
arrow::Status SerializePlan(
    const arrow::acero::Declaration& declaration,
    const std::string& output_path) {
  arrow::engine::ExtensionSet ext_set;
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::Buffer> serialized,
      arrow::engine::SerializePlan(declaration, &ext_set));
  std::ofstream out(output_path, std::ios::binary);
  out.write(reinterpret_cast<const char*>(serialized->data()),
            serialized->size());
  if (!out) {
    return arrow::Status::IOError("Failed to write ", output_path);
  }
  std::cout << "Serialized plan to " << output_path << std::endl;
  return arrow::Status::OK();
}
// Consumer: Execute plan (MyConsumer is the class from Basic Plan Execution)
arrow::Status ExecutePlan(const std::string& plan_path) {
  // Read the whole serialized plan into a buffer
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::io::ReadableFile> file,
      arrow::io::ReadableFile::Open(plan_path));
  ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize());
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::Buffer> buffer,
      file->Read(size));
  // Create consumer
  auto consumer = std::make_shared<MyConsumer>();
  // Deserialize and execute
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::acero::ExecPlan> plan,
      arrow::engine::DeserializePlan(*buffer, consumer));
  ARROW_RETURN_NOT_OK(plan->Validate());
  // StartProducing() returns void in recent Arrow releases
  plan->StartProducing();
  return plan->finished().status();
}
Supported Operations
Arrow supports these Substrait relation types:
- Read: Scan data from files or tables
- Filter: Select rows based on predicates
- Project: Compute derived columns
- Aggregate: Group and aggregate data
- Join: Combine data from multiple sources
- Sort: Order data
- Fetch: Limit and offset
- Set operations: Union all (other set operations such as intersect and except may not be implemented, depending on the Arrow version)
Best Practices
Extension Set Management
// Share extension set across related operations
eng::ExtensionSet ext_set;
// Serialize multiple related plans
for (const auto& decl : declarations) {
  ARROW_ASSIGN_OR_RAISE(
      auto buffer,
      eng::SerializePlan(decl, &ext_set));  // Reuse ext_set
  // Save buffer...
}
Error Handling
auto result = eng::DeserializePlan(*buffer, consumer);
if (!result.ok()) {
  std::cerr << "Failed to deserialize plan: "
            << result.status().ToString() << std::endl;
  // Try JSON conversion for debugging
  auto json_result = eng::internal::SubstraitToJSON("Plan", *buffer);
  if (json_result.ok()) {
    std::cerr << "Plan contents:\n" << *json_result << std::endl;
  }
  return result.status();
}
Version Compatibility
// A Substrait plan records the producer's Substrait version in the plan's
// version field. Be prepared to handle plans from different Substrait versions.
eng::ConversionOptions options;
options.strictness = eng::ConversionStrictness::BEST_EFFORT;
// Best-effort mode maps each construct to a functionally equivalent form
// instead of requiring an exact round trip
ARROW_ASSIGN_OR_RAISE(
    auto plan,
    eng::DeserializePlan(*buffer, consumer,
                         /*registry=*/nullptr,
                         /*ext_set_out=*/nullptr,
                         options));
When to Use Substrait
Substrait is beneficial for:
- Multi-system workflows: Pass queries between different engines
- Query federation: Distribute query execution across systems
- Plan optimization: Serialize, optimize externally, then execute
- Remote execution: Send query plans to remote workers
- Testing: Validate query plans across implementations
Direct Acero usage may be better for:
- Single-system execution: No need for serialization overhead
- Performance-critical: Avoid serialization/deserialization cost
- Dynamic plans: Plans built and executed immediately
Important Considerations
- Substrait integration requires Arrow built with ARROW_SUBSTRAIT=ON
- Not all Acero operations have Substrait equivalents (yet)
- Extension functions must be available in consuming system
- File paths in plans must be accessible from execution environment
- Substrait is evolving - newer features may not be supported
- Always validate deserialized plans before execution
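The point about file paths can be checked before a plan is executed. The following is a minimal sketch under stated assumptions: `MissingLocalFiles` is a hypothetical helper, the caller is assumed to have already collected the file URIs referenced by the plan, and only URIs with a `file://` prefix are checked (other schemes are skipped rather than validated).

```cpp
#include <filesystem>
#include <string>
#include <system_error>
#include <vector>

// Hypothetical pre-flight helper: returns the "file://" URIs whose local path
// does not exist in the current environment. URIs with any other scheme are
// ignored.
std::vector<std::string> MissingLocalFiles(
    const std::vector<std::string>& uris) {
  const std::string prefix = "file://";
  std::vector<std::string> missing;
  for (const std::string& uri : uris) {
    if (uri.compare(0, prefix.size(), prefix) != 0) continue;  // not local
    const std::filesystem::path path(uri.substr(prefix.size()));
    std::error_code ec;  // keeps exists() from throwing on access errors
    if (!std::filesystem::exists(path, ec)) missing.push_back(uri);
  }
  return missing;
}
```

Running a check like this on the worker that will execute the plan, rather than on the machine that produced it, is what makes it meaningful: the paths only need to resolve in the execution environment.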