Apache Arrow defines efficient binary formats for serializing record batches, crucial for high-performance data processing and interchange. These formats are categorized into two main types: streaming and file formats, each serving distinct purposes in data manipulation. Understanding these formats is key to leveraging Arrow’s capabilities for optimized data workflows.
Delving into Writing and Reading Streams
Arrow’s streaming format is designed for transmitting sequences of record batches of arbitrary length. This format is inherently sequential, requiring processing from the beginning to the end, and does not support random access. Conversely, the file format, also known as the Random Access format, is tailored for serializing a fixed number of record batches. Its support for random access makes it particularly advantageous when working with memory maps, enabling efficient data retrieval and manipulation.
To effectively utilize the examples in this section, it is recommended to first familiarize yourself with the concepts outlined in the Memory and IO section of the Arrow documentation.
Practical Guide to Using Streams
Let’s begin by constructing a simple record batch to illustrate the streaming process:
```python
In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]

In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3
```
This code snippet initializes a record batch with three columns of different data types: integers, strings, and booleans, showcasing the versatility of Arrow in handling diverse data structures.
Now we can create a stream containing multiple instances of this batch. For this purpose, we employ `RecordBatchStreamWriter`, which writes to a writable `NativeFile` object or any writable Python object. For convenience, the [new_stream()](generated/pyarrow.ipc.new_stream.html#pyarrow.ipc.new_stream) function provides an easy way to instantiate a `RecordBatchStreamWriter`:
```python
In [6]: sink = pa.BufferOutputStream()

In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
   ...:     for i in range(5):
   ...:         writer.write_batch(batch)
   ...:
```
In this example, we use an in-memory Arrow buffer stream (`sink`). However, this could equally be a socket, a file, or any other IO sink, demonstrating the adaptability of Arrow streams to various data transport mechanisms.
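For example, the same code can write the stream straight to disk. The sketch below swaps the in-memory buffer for an `OSFile`; the file name `example.arrows` is just an illustrative choice, not part of the walkthrough above:

```python
import pyarrow as pa

# Build a small batch to stream (same shape as in the example above).
batch = pa.record_batch(
    [pa.array([1, 2, 3, 4]), pa.array(['foo', 'bar', 'baz', None])],
    names=['f0', 'f1'],
)

# Any writable sink works: here an OSFile on disk instead of a BufferOutputStream.
with pa.OSFile('example.arrows', 'wb') as sink:
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        for _ in range(5):
            writer.write_batch(batch)
```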
When initializing the `RecordBatchStreamWriter`, we provide the schema of the record batch. This is crucial because Arrow streams require that all batches within a single stream share the same schema, ensuring data consistency and predictability throughout the stream.
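As a quick illustration of this constraint (a sketch rather than part of the original walkthrough; the exact exception type is an assumption), writing a batch whose schema differs from the one declared for the stream should be rejected by the writer:

```python
import pyarrow as pa

sink = pa.BufferOutputStream()
good_batch = pa.record_batch([pa.array([1, 2, 3])], names=['f0'])
# A batch whose schema does not match the one declared for the stream.
bad_batch = pa.record_batch([pa.array(['a', 'b', 'c'])], names=['other'])

with pa.ipc.new_stream(sink, good_batch.schema) as writer:
    writer.write_batch(good_batch)       # matching schema: accepted
    try:
        writer.write_batch(bad_batch)    # mismatched schema: expected to fail
    except pa.ArrowInvalid as exc:       # assumed error type
        print("rejected:", exc)
```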
Following the stream creation and writing process, we can retrieve the stream content as an in-memory byte buffer and examine its size:
```python
In [8]: buf = sink.getvalue()

In [9]: buf.size
Out[9]: 1984
```
The variable `buf` now holds the complete stream as a byte buffer in memory, ready for further processing or transmission. To read this stream, we can use `RecordBatchStreamReader` or the more convenient [pyarrow.ipc.open_stream()](generated/pyarrow.ipc.open_stream.html#pyarrow.ipc.open_stream) function:
```python
In [10]: with pa.ipc.open_stream(buf) as reader:
   ....:     schema = reader.schema
   ....:     batches = [b for b in reader]
   ....:

In [11]: schema
Out[11]:
f0: int64
f1: string
f2: bool

In [12]: len(batches)
Out[12]: 5
```
This demonstrates how to open a stream for reading, extract the schema, and iterate through the batches contained within the stream.
Verifying that the retrieved batches are identical to the original input batches is straightforward:
```python
In [13]: batches[0].equals(batch)
Out[13]: True
```
A significant advantage of Arrow streams is their ability to perform zero-copy reads when the input source supports it, such as a memory map or a `pyarrow.BufferReader`. In such cases, reading data from the stream does not allocate new memory, leading to substantial performance gains with large datasets. This efficiency is particularly relevant where minimizing memory overhead is critical, such as real-time data streaming and processing.
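To make the zero-copy path explicit, the serialized buffer can be wrapped in a `pyarrow.BufferReader` before opening the stream. A minimal, self-contained sketch:

```python
import pyarrow as pa

# Serialize a batch into an in-memory stream, as in the examples above.
batch = pa.record_batch([pa.array([1, 2, 3, 4])], names=['f0'])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
buf = sink.getvalue()

# BufferReader exposes the buffer as a readable NativeFile; reading record
# batches from it references the existing memory rather than copying it.
with pa.ipc.open_stream(pa.BufferReader(buf)) as reader:
    batches = list(reader)

print(batches[0].equals(batch))  # True
```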
Working with Random Access Files
The `RecordBatchFileWriter` in Arrow provides an API mirroring that of `RecordBatchStreamWriter`, but for file-based random access. You can create a `RecordBatchFileWriter` using the `new_file()` function:
```python
In [14]: sink = pa.BufferOutputStream()

In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
   ....:     for i in range(10):
   ....:         writer.write_batch(batch)
   ....:

In [16]: buf = sink.getvalue()

In [17]: buf.size
Out[17]: 4226
```
The crucial distinction between `RecordBatchFileReader` and `RecordBatchStreamReader` lies in the input source requirements: `RecordBatchFileReader` requires an input source that implements a `seek` method to enable random access, whereas `RecordBatchStreamReader` only requires sequential reads. Opening a file for random access reading is done with the `open_file()` function:
```python
In [18]: with pa.ipc.open_file(buf) as reader:
   ....:     num_record_batches = reader.num_record_batches
   ....:

In [19]: b = reader.get_batch(3)
```
Because the entire payload is accessible in the file format, we can determine the total number of record batches within the file and retrieve any batch at random, providing flexibility in data access patterns.
```python
In [20]: num_record_batches
Out[20]: 10

In [21]: b.equals(batch)
Out[21]: True
```
This demonstrates the random access capability, where we directly fetch the 4th batch (index 3) from the file, bypassing sequential reading, which is highly beneficial for applications requiring selective data retrieval.
Seamless Integration with pandas DataFrames
For users working with pandas, Arrow simplifies reading the stream and file formats into pandas DataFrames. Both the `RecordBatchFileReader` and `RecordBatchStreamReader` classes offer a `read_pandas` method, which reads multiple record batches and converts them into a single DataFrame, streamlining data ingestion into pandas workflows:
```python
In [22]: with pa.ipc.open_file(buf) as reader:
   ....:     df = reader.read_pandas()
   ....:

In [23]: df[:5]
Out[23]:
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True
```
This integration significantly simplifies the transition between Arrow’s efficient data representation and pandas’ data analysis capabilities, making Arrow a valuable tool in pandas-centric data pipelines.
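Going the other way is just as direct. The sketch below is not part of the walkthrough above: it starts from a pandas DataFrame, converts it with `pa.Table.from_pandas`, serializes it with the streaming format, and reads it back:

```python
import pandas as pd
import pyarrow as pa

# Start from a pandas DataFrame and convert it into an Arrow Table.
df = pd.DataFrame({'f0': [1, 2, 3, 4], 'f1': ['foo', 'bar', 'baz', None]})
table = pa.Table.from_pandas(df)

# Serialize with the streaming format; write_table splits the Table into
# record batches under the hood.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)

# Read the stream back and convert to pandas again.
with pa.ipc.open_stream(sink.getvalue()) as reader:
    roundtripped = reader.read_pandas()

print(roundtripped)
```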
Achieving Efficiency in Writing and Reading Arrow Data
Arrow’s architecture, optimized for zero-copy operations and memory-mapped data, facilitates highly efficient reading and writing of arrays with minimal memory footprint. When dealing with raw Arrow data, both the Arrow File Format and the Arrow Streaming Format provide robust mechanisms for serialization and deserialization.
To persist an array to a file, the `new_file()` function is used to create a `RecordBatchFileWriter` instance. This writer lets us segment the data into batches and write them to the designated file.
For instance, to write an array containing 10 million integers, we can divide it into 1000 batches, each holding 10,000 entries:
```python
In [24]: BATCH_SIZE = 10000

In [25]: NUM_BATCHES = 1000

In [26]: schema = pa.schema([pa.field('nums', pa.int32())])

In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
   ....:     with pa.ipc.new_file(sink, schema) as writer:
   ....:         for row in range(NUM_BATCHES):
   ....:             batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
   ....:             writer.write(batch)
   ....:
```
While record batches can hold multiple columns, in practical scenarios data is often organized and written in a tabular format, akin to a `Table`.
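For instance, an existing `Table` can be handed to the writer directly. A minimal sketch, where the file name and the 10,000-row chunk size are arbitrary illustrative choices:

```python
import pyarrow as pa

# Assemble a Table from a single column of integers.
table = pa.table({'nums': pa.array(range(100_000), type=pa.int32())})

# write_table splits the Table into record batches; max_chunksize caps the
# number of rows per batch.
with pa.OSFile('bigtable.arrow', 'wb') as sink:
    with pa.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table, max_chunksize=10_000)
```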
Batch-wise writing is advantageous as it limits the in-memory footprint to only the current batch being processed. However, reading data back can be further optimized using memory mapping, which allows direct referencing of data from disk, eliminating the need for memory allocation during read operations.
Under normal read conditions, loading the file might consume a noticeable amount of memory:
```python
In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
   ....:     loaded_array = pa.ipc.open_file(source).read_all()
   ....:

In [29]: print("LEN:", len(loaded_array))
LEN: 10000000

In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB
```
To maximize read efficiency for large datasets, memory mapping can be employed. This technique enables Arrow to directly access data mapped from disk, bypassing memory allocation and leveraging the operating system’s memory management capabilities. The OS can lazily page in and page out mapped memory as needed, allowing for the processing of arrays larger than available RAM, without incurring write-back overhead.
```python
In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
   ....:     loaded_array = pa.ipc.open_file(source).read_all()
   ....:

In [32]: print("LEN:", len(loaded_array))
LEN: 10000000

In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB
```
Utilizing memory mapping in this manner can drastically reduce the resident memory footprint, as demonstrated by the RSS (Resident Set Size) dropping to 0MB in the example above. This is particularly advantageous when dealing with extremely large datasets that exceed available system memory.
It’s worth noting that other high-level Arrow APIs, such as `read_table()` in the Parquet module, also provide a `memory_map` option. In those contexts, however, memory mapping does not necessarily reduce resident memory consumption, because Parquet data must still be decompressed and decoded into Arrow structures in memory. For a detailed discussion, refer to Reading Parquet and Memory Mapping.
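For reference, the Parquet-side option looks like the following sketch; the file name and array size are illustrative choices, not values from the examples above:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Write a small Parquet file so there is something to read back.
table = pa.table({'nums': pa.array(range(1_000_000), type=pa.int32())})
pq.write_table(table, 'bignums.parquet')

# memory_map=True maps the file instead of buffering it, but the decoded
# Arrow columns are still materialized in memory, so RSS savings are limited.
mapped = pq.read_table('bignums.parquet', memory_map=True)
print(mapped.num_rows)  # 1000000
```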
In conclusion, Apache Arrow’s streaming and file formats, combined with memory mapping capabilities, offer a powerful toolkit for efficient data serialization, inter-process communication (IPC), and high-performance data handling, especially in scenarios demanding minimal memory usage and rapid data access. Whether you need to stream large datasets, serialize data for storage, or optimize data exchange between processes, Arrow provides the tools to achieve significant performance improvements.