Documentation Index
Fetch the complete documentation index at: https://mintlify.com/apache/arrow/llms.txt
Use this file to discover all available pages before exploring further.
Arrow Flight is a general-purpose client-server framework for high-performance transport of large datasets over network interfaces. Built on top of gRPC and the Arrow columnar format, Flight enables efficient data transfer with minimal serialization overhead.
Overview
Flight provides a set of RPC methods for:
- Data retrieval: Download datasets from a server (DoGet)
- Data upload: Upload datasets to a server (DoPut)
- Bidirectional streaming: Exchange data in both directions (DoExchange)
- Metadata operations: List available datasets, get schemas, execute actions
- Authentication: Secure connections with multiple auth mechanisms
Core Concepts
FlightDescriptor
A FlightDescriptor identifies a dataset or command:
// Identifies a dataset either by path or by an opaque command.
message FlightDescriptor {
  // Selects which of `cmd` / `path` is meaningful.
  enum DescriptorType {
    UNKNOWN = 0;  // proto3 requires the first enum value to be zero
    PATH = 1;     // Named dataset path
    CMD = 2;      // Opaque command
  }
  DescriptorType type = 1;
  bytes cmd = 2;             // Used when type is CMD
  repeated string path = 3;  // Used when type is PATH
}
FlightInfo
FlightInfo describes how to access a dataset, including endpoints and schema:
// Describes how a dataset can be accessed: its schema and the endpoints
// that serve it.
message FlightInfo {
// Schema of the dataset, carried as opaque bytes.
bytes schema = 1;
// Descriptor associated with this dataset (the server examples on this
// page echo back the descriptor from the request).
FlightDescriptor flight_descriptor = 2;
// Endpoints from which the data can be retrieved.
repeated FlightEndpoint endpoint = 3;
// NOTE(review): the server examples on this page pass -1 for both totals,
// presumably meaning "unknown" -- confirm against Flight.proto.
int64 total_records = 4;
int64 total_bytes = 5;
}
FlightEndpoint
A FlightEndpoint pairs a ticket with the locations from which the data for that ticket can be retrieved:
// One retrievable piece of a dataset: a ticket plus where to use it.
message FlightEndpoint {
// Token passed to DoGet to fetch this endpoint's data.
Ticket ticket = 1;
// Servers from which the data can be retrieved.
repeated Location location = 2;
// NOTE(review): presumably the time after which this endpoint is no
// longer valid -- confirm against Flight.proto.
google.protobuf.Timestamp expiration_time = 3;
}
Client Usage
#include <iostream>

#include <arrow/flight/client.h>
#include <arrow/flight/types.h>
#include <arrow/table.h>

using namespace arrow;
using namespace arrow::flight;

// NOTE: ARROW_ASSIGN_OR_RAISE returns an arrow::Status from the enclosing
// function on failure, so the statements below must live inside a function
// whose return type is arrow::Status (or arrow::Result<T>).

// Connect to Flight server
ARROW_ASSIGN_OR_RAISE(auto location,
                      Location::ForGrpcTcp("localhost", 8815));
ARROW_ASSIGN_OR_RAISE(auto client,
                      FlightClient::Connect(location));

// Get dataset information
FlightDescriptor descriptor;
descriptor.type = FlightDescriptor::PATH;
descriptor.path = {"dataset", "table1"};
ARROW_ASSIGN_OR_RAISE(auto info,
                      client->GetFlightInfo(descriptor));

// The endpoint list may be empty; indexing [0] on an empty vector is
// undefined behaviour, so fail with a Status instead.
if (info->endpoints().empty()) {
  return Status::Invalid("Server returned no endpoints for the requested dataset");
}

// Retrieve data from the first endpoint (bound by reference: no copy needed)
const FlightEndpoint& endpoint = info->endpoints()[0];
ARROW_ASSIGN_OR_RAISE(auto stream,
                      client->DoGet(endpoint.ticket));

// Read all batches
ARROW_ASSIGN_OR_RAISE(auto table,
                      stream->ToTable());
std::cout << "Rows: " << table->num_rows() << '\n';
import pyarrow as pa
import pyarrow.flight as flight
# Connect to Flight server
client = flight.FlightClient(("localhost", 8815))

# Get dataset information
descriptor = flight.FlightDescriptor.for_path("dataset", "table1")
info = client.get_flight_info(descriptor)

# The endpoint list may be empty; fail with a clear message instead of an
# IndexError from endpoints[0].
if not info.endpoints:
    raise RuntimeError("Server returned no endpoints for the requested dataset")

# Retrieve data from the first endpoint
endpoint = info.endpoints[0]
reader = client.do_get(endpoint.ticket)

# Read all batches into a table
table = reader.read_all()
print(f"Rows: {len(table)}")
import org.apache.arrow.flight.*;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
// Create allocator and connect
BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder()
.allocator(allocator)
.location(Location.forGrpcInsecure("localhost", 8815))
.build();
// Get dataset information
FlightDescriptor descriptor = FlightDescriptor
.path("dataset", "table1");
FlightInfo info = client.getInfo(descriptor);
// Retrieve data
Ticket ticket = info.getEndpoints().get(0).getTicket();
FlightStream stream = client.getStream(ticket);
// Read batches
while (stream.next()) {
VectorSchemaRoot root = stream.getRoot();
System.out.println("Batch rows: " + root.getRowCount());
}
Server Implementation
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

#include <arrow/flight/server.h>
#include <arrow/flight/types.h>
#include <arrow/table.h>

using namespace arrow;
using namespace arrow::flight;
class MyFlightServer : public FlightServerBase {
public:
Status GetFlightInfo(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<FlightInfo>* info) override {
// Build schema for the dataset
auto schema = arrow::schema({
arrow::field("id", arrow::int64()),
arrow::field("name", arrow::utf8())
});
// Create location and ticket
ARROW_ASSIGN_OR_RAISE(auto location,
Location::ForGrpcTcp("localhost", 8815));
FlightEndpoint endpoint;
endpoint.ticket.ticket = "ticket-123";
endpoint.locations.push_back(location);
// Build FlightInfo
ARROW_ASSIGN_OR_RAISE(*info,
FlightInfo::Make(*schema, request, {endpoint}, -1, -1));
return Status::OK();
}
Status DoGet(const ServerCallContext& context,
const Ticket& request,
std::unique_ptr<FlightDataStream>* stream) override {
// Create sample data
auto schema = arrow::schema({
arrow::field("id", arrow::int64()),
arrow::field("name", arrow::utf8())
});
// Build record batch (simplified)
// In production, fetch from database or storage
std::shared_ptr<RecordBatch> batch;
// ... create batch ...
// Create reader from batches
std::vector<std::shared_ptr<RecordBatch>> batches = {batch};
ARROW_ASSIGN_OR_RAISE(auto reader,
RecordBatchReader::Make(batches, schema));
*stream = std::make_unique<RecordBatchStream>(reader);
return Status::OK();
}
};
// Start server
int main() {
ARROW_ASSIGN_OR_RAISE(auto location,
Location::ForGrpcTcp("0.0.0.0", 8815));
FlightServerOptions options(location);
auto server = std::make_unique<MyFlightServer>();
ARROW_RETURN_NOT_OK(server->Init(options));
std::cout << "Server listening on port "
<< server->port() << std::endl;
return server->Serve();
}
import pyarrow as pa
import pyarrow.flight as flight
class MyFlightServer(flight.FlightServerBase):
    """Minimal Flight server exposing one dataset with columns `id` and `name`."""

    # Shared by get_flight_info and do_get so the advertised schema and the
    # streamed schema cannot drift apart.
    SCHEMA = pa.schema([
        ('id', pa.int64()),
        ('name', pa.string())
    ])

    def get_flight_info(self, context, descriptor):
        """Describe the dataset: its schema plus one endpoint (ticket + location)."""
        # Create endpoint
        endpoint = flight.FlightEndpoint(
            ticket=b"ticket-123",
            locations=[flight.Location.for_grpc_tcp("localhost", 8815)]
        )
        return flight.FlightInfo(
            schema=self.SCHEMA,
            descriptor=descriptor,
            endpoints=[endpoint],
            total_records=-1,
            total_bytes=-1
        )

    def do_get(self, context, ticket):
        """Stream the sample data back to the client (the ticket is not inspected)."""
        # Create sample data
        data = [
            pa.array([1, 2, 3]),
            pa.array(['Alice', 'Bob', 'Charlie'])
        ]
        batch = pa.record_batch(data, schema=self.SCHEMA)
        return flight.RecordBatchStream(pa.Table.from_batches([batch]))
# Start server
if __name__ == '__main__':
    location = flight.Location.for_grpc_tcp("0.0.0.0", 8815)
    server = MyFlightServer(location)
    # Report the port the server actually bound to, matching the C++ example.
    print(f"Server listening on port {server.port}")
    server.serve()
Uploading Data (DoPut)
// Client: Upload data to server
// (statements belong inside a function returning arrow::Status; `client` is
// an already-connected FlightClient)
FlightDescriptor descriptor;
descriptor.type = FlightDescriptor::CMD;
descriptor.cmd = "insert_data";
auto schema = arrow::schema({
    arrow::field("id", arrow::int64()),
    arrow::field("value", arrow::float64())
});
ARROW_ASSIGN_OR_RAISE(auto do_put_result,
                      client->DoPut(descriptor, schema));

// Write batches
std::shared_ptr<RecordBatch> batch;
// ... create batch ... (must be non-null before it is dereferenced below)
ARROW_RETURN_NOT_OK(do_put_result.writer->WriteRecordBatch(*batch));

// Signal that no more data will be written, but keep the call open so the
// server's response can still be read.
ARROW_RETURN_NOT_OK(do_put_result.writer->DoneWriting());

// Read server response metadata. This has to happen BEFORE Close(): closing
// finishes the call, after which nothing more can be read from it.
std::shared_ptr<Buffer> metadata;
ARROW_RETURN_NOT_OK(do_put_result.reader->ReadMetadata(&metadata));

// Finish the call and surface any error reported by the server.
ARROW_RETURN_NOT_OK(do_put_result.writer->Close());
# Client: Upload data to server
descriptor = flight.FlightDescriptor.for_command(b"insert_data")
schema = pa.schema([
    ('id', pa.int64()),
    ('value', pa.float64())
])
writer, reader = client.do_put(descriptor, schema)

# Write batches
batch = pa.record_batch(
    [pa.array([1, 2, 3]), pa.array([1.1, 2.2, 3.3])],
    schema=schema
)
writer.write_batch(batch)
# Signal end of data but keep the call open so the response can be read
writer.done_writing()

# Read server response
metadata = reader.read()

# Finish the call; this also surfaces any error reported by the server
writer.close()
Authentication
// Basic authentication
// (statements belong inside a function returning arrow::Status; `client` and
// `descriptor` are set up as in the client example)
FlightCallOptions options;
std::string username = "user";
std::string password = "password";

// AuthenticateBasicToken returns a complete header as a pair:
//   first  = header name  ("authorization")
//   second = header value ("Bearer <token>")
// so `.first` is NOT the token and must not be prefixed with "Bearer ".
ARROW_ASSIGN_OR_RAISE(auto bearer_header,
                      client->AuthenticateBasicToken(options, username, password));

// Use token in subsequent calls by attaching the returned header as-is
FlightCallOptions authenticated_options;
authenticated_options.headers.push_back(bearer_header);
ARROW_ASSIGN_OR_RAISE(auto info,
                      client->GetFlightInfo(authenticated_options, descriptor));
# Basic authentication
# authenticate_basic_token returns the (header name, header value) pair that
# carries the bearer token.
token_pair = client.authenticate_basic_token(b"user", b"password")

# The token is NOT attached automatically: pass it as a header on each
# subsequent call via FlightCallOptions.
options = flight.FlightCallOptions(headers=[token_pair])
info = client.get_flight_info(descriptor, options)
Actions
Actions allow custom operations beyond standard data transfer:
// List available actions
ARROW_ASSIGN_OR_RAISE(auto actions, client->ListActions());
for (const auto& action : actions) {
std::cout << "Action: " << action.type
<< " - " << action.description << std::endl;
}
// Execute a custom action
Action action;
action.type = "drop_dataset";
action.body = Buffer::FromString("{\"name\": \"old_data\"}");
ARROW_ASSIGN_OR_RAISE(auto results, client->DoAction(action));
// Process results
std::unique_ptr<Result> result;
while (true) {
ARROW_ASSIGN_OR_RAISE(result, results->Next());
if (!result) break;
// Process result.body
}
# List available actions
actions = client.list_actions()
for action_type in actions:
    print(f"Action: {action_type.type} - {action_type.description}")

# Execute a custom action
action = flight.Action("drop_dataset", b'{"name": "old_data"}')
results = client.do_action(action)

# Process results
for result in results:
    # result.body is a pyarrow Buffer; convert it to bytes to see the payload
    print(result.body.to_pybytes())
- Batch Size: Use appropriate batch sizes (typically 64K-1M rows)
- Parallel Endpoints: Leverage multiple endpoints for parallel data transfer
- Compression: Enable IPC compression for network-bound workloads
- Connection Pooling: Reuse client connections when possible
- Streaming: Use DoExchange for bidirectional streaming workloads