# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from numbers import Integral
import warnings
from pyarrow.lib import Table
import pyarrow._orc as _orc
from pyarrow.fs import _resolve_filesystem_and_path
class ORCFile:
    """
    Reader interface for a single ORC file

    Parameters
    ----------
    source : str or pyarrow.NativeFile
        Readable source. For passing Python file objects or byte buffers,
        see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
    """

    def __init__(self, source):
        # The Cython-level ORCReader does all of the actual work.
        self.reader = _orc.ORCReader()
        self.reader.open(source)

    @property
    def metadata(self):
        """File-level metadata, as an arrow KeyValueMetadata"""
        return self.reader.metadata()

    @property
    def schema(self):
        """Arrow schema of the file"""
        return self.reader.schema()

    @property
    def nrows(self):
        """Total number of rows stored in the file"""
        return self.reader.nrows()

    @property
    def nstripes(self):
        """Number of stripes contained in the file"""
        return self.reader.nstripes()

    @property
    def file_version(self):
        """ORC format version of the file, either 0.11 or 0.12"""
        return self.reader.file_version()

    @property
    def software_version(self):
        """Software instance and version that produced this file"""
        return self.reader.software_version()

    @property
    def compression(self):
        """Name of the compression codec used by the file"""
        return self.reader.compression()

    @property
    def compression_size(self):
        """Buffer size in bytes used for the compression codec"""
        return self.reader.compression_size()

    @property
    def writer(self):
        """Name of the writer that produced this file, or its
        Writer ID (a number) when the writer is unknown"""
        return self.reader.writer()

    @property
    def writer_version(self):
        """Version of the writer that produced this file"""
        return self.reader.writer_version()

    @property
    def row_index_stride(self):
        """Rows per entry of the row index, or 0 when the file
        has no row index"""
        return self.reader.row_index_stride()

    @property
    def nstripe_statistics(self):
        """Number of stripe statistics entries"""
        return self.reader.nstripe_statistics()

    @property
    def content_length(self):
        """Size in bytes of the data stripes in the file"""
        return self.reader.content_length()

    @property
    def stripe_statistics_length(self):
        """Compressed size in bytes of the file's stripe statistics"""
        return self.reader.stripe_statistics_length()

    @property
    def file_footer_length(self):
        """Compressed size in bytes of the file footer"""
        return self.reader.file_footer_length()

    @property
    def file_postscript_length(self):
        """Size in bytes of the file postscript"""
        return self.reader.file_postscript_length()

    @property
    def file_length(self):
        """Total size of the file in bytes"""
        return self.reader.file_length()

    def _select_names(self, columns=None):
        # Normalize a caller-supplied column selection: integer indices
        # are mapped to the matching field names, while a selection that
        # already contains names is handed back untouched.
        if columns is None:
            return None

        schema = self.schema
        resolved = []
        for entry in columns:
            if not isinstance(entry, Integral):
                # Not an index, so treat the whole selection as names.
                return columns
            idx = int(entry)
            if not 0 <= idx < len(schema):
                raise ValueError("Column indices must be in 0 <= ind < %d,"
                                 " got %d" % (len(schema), idx))
            resolved.append(schema[idx].name)
        return resolved

    def read_stripe(self, n, columns=None):
        """Read a single stripe from the file.

        Parameters
        ----------
        n : int
            The stripe index
        columns : list
            If not None, only these columns will be read from the stripe. A
            column name may be a prefix of a nested field, e.g. 'a' will
            select 'a.b', 'a.c', and 'a.d.e'

        Returns
        -------
        pyarrow.RecordBatch
            Content of the stripe as a RecordBatch.
        """
        return self.reader.read_stripe(n, columns=self._select_names(columns))

    def read(self, columns=None):
        """Read the whole file.

        Parameters
        ----------
        columns : list
            If not None, only these columns will be read from the file. A
            column name may be a prefix of a nested field, e.g. 'a' will
            select 'a.b', 'a.c', and 'a.d.e'. Output always follows the
            ordering of the file and not the `columns` list.

        Returns
        -------
        pyarrow.Table
            Content of the file as a Table.
        """
        return self.reader.read(columns=self._select_names(columns))
# Shared numpydoc text for the writer options, interpolated into the
# docstrings of both ORCWriter and write_table so they stay in sync.
_orc_writer_args_docs = """file_version : {"0.11", "0.12"}, default "0.12"
    Determine which ORC file version to use.
    `Hive 0.11 / ORC v0 <https://orc.apache.org/specification/ORCv0/>`_
    is the older version
    while `Hive 0.12 / ORC v1 <https://orc.apache.org/specification/ORCv1/>`_
    is the newer one.
batch_size : int, default 1024
    Number of rows the ORC writer writes at a time.
stripe_size : int, default 64 * 1024 * 1024
    Size of each ORC stripe in bytes.
compression : string, default 'uncompressed'
    The compression codec.
    Valid values: {'UNCOMPRESSED', 'SNAPPY', 'ZLIB', 'LZ4', 'ZSTD'}
    Note that LZO is currently not supported.
compression_block_size : int, default 64 * 1024
    Size of each compression block in bytes.
compression_strategy : string, default 'speed'
    The compression strategy i.e. speed vs size reduction.
    Valid values: {'SPEED', 'COMPRESSION'}
row_index_stride : int, default 10000
    The row index stride i.e. the number of rows per
    an entry in the row index.
padding_tolerance : double, default 0.0
    The padding tolerance.
dictionary_key_size_threshold : double, default 0.0
    The dictionary key size threshold. 0 to disable dictionary encoding.
    1 to always enable dictionary encoding.
bloom_filter_columns : None, set-like or list-like, default None
    Columns that use the bloom filter.
bloom_filter_fpp : double, default 0.05
    Upper limit of the false-positive rate of the bloom filter.
"""
class ORCWriter:
    __doc__ = """
    Writer interface for a single ORC file

    Parameters
    ----------
    where : str or pyarrow.io.NativeFile
        Writable target. For passing Python file objects or byte buffers,
        see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
        or pyarrow.io.FixedSizeBufferWriter.
    {}
    """.format(_orc_writer_args_docs)

    # Tracks whether the underlying writer still needs closing; defined at
    # class level so close() is safe even when __init__ failed part-way.
    is_open = False

    def __init__(self, where, *,
                 file_version='0.12',
                 batch_size=1024,
                 stripe_size=64 * 1024 * 1024,
                 compression='uncompressed',
                 compression_block_size=65536,
                 compression_strategy='speed',
                 row_index_stride=10000,
                 padding_tolerance=0.0,
                 dictionary_key_size_threshold=0.0,
                 bloom_filter_columns=None,
                 bloom_filter_fpp=0.05,
                 ):
        options = dict(
            file_version=file_version,
            batch_size=batch_size,
            stripe_size=stripe_size,
            compression=compression,
            compression_block_size=compression_block_size,
            compression_strategy=compression_strategy,
            row_index_stride=row_index_stride,
            padding_tolerance=padding_tolerance,
            dictionary_key_size_threshold=dictionary_key_size_threshold,
            bloom_filter_columns=bloom_filter_columns,
            bloom_filter_fpp=bloom_filter_fpp,
        )
        self.writer = _orc.ORCWriter()
        self.writer.open(where, **options)
        self.is_open = True

    def __del__(self):
        # Best-effort cleanup; close() is a no-op once already closed.
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def write(self, table):
        """
        Write a table into the ORC file.

        The schema of the table must be equal to the schema used when
        opening the ORC file.

        Parameters
        ----------
        table : pyarrow.Table
            The table to be written into the ORC file
        """
        assert self.is_open
        self.writer.write(table)

    def close(self):
        """
        Close the ORC file. Calling close() more than once is harmless.
        """
        if self.is_open:
            self.writer.close()
            self.is_open = False
def read_table(source, columns=None, filesystem=None):
    # Resolve the source through the filesystem layer first; when a
    # filesystem applies, re-open the resolved path as an input file.
    filesystem, path = _resolve_filesystem_and_path(source, filesystem)
    if filesystem is not None:
        source = filesystem.open_input_file(path)

    orc_file = ORCFile(source)
    if columns is not None and len(columns) == 0:
        # Explicitly empty selection: read the file, then drop every
        # column while keeping the row count intact.
        return orc_file.read().select(columns)
    return orc_file.read(columns=columns)
# Attach the user-facing docstring to read_table via a __doc__ assignment.
read_table.__doc__ = """
Read a Table from an ORC file.
Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
If a string passed, can be a single file name. For file-like objects,
only read a single file. Use pyarrow.BufferReader to read a file
contained in a bytes or buffer-like object.
columns : list
If not None, only these columns will be read from the file. A column
name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
'a.c', and 'a.d.e'. Output always follows the ordering of the file and
not the `columns` list. If empty, no columns will be read. Note
that the table will still have the correct num_rows set despite having
no columns.
filesystem : FileSystem, default None
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as an URI to determine the filesystem.
"""
def write_table(table, where, *,
                file_version='0.12',
                batch_size=1024,
                stripe_size=64 * 1024 * 1024,
                compression='uncompressed',
                compression_block_size=65536,
                compression_strategy='speed',
                row_index_stride=10000,
                padding_tolerance=0.0,
                dictionary_key_size_threshold=0.0,
                bloom_filter_columns=None,
                bloom_filter_fpp=0.05):
    # Backwards compatibility: the function used to accept (where, table),
    # so detect a Table in the second slot, warn, and swap.
    if isinstance(where, Table):
        warnings.warn(
            "The order of the arguments has changed. Pass as "
            "'write_table(table, where)' instead. The old order will raise "
            "an error in the future.", FutureWarning, stacklevel=2
        )
        table, where = where, table

    writer_options = dict(
        file_version=file_version,
        batch_size=batch_size,
        stripe_size=stripe_size,
        compression=compression,
        compression_block_size=compression_block_size,
        compression_strategy=compression_strategy,
        row_index_stride=row_index_stride,
        padding_tolerance=padding_tolerance,
        dictionary_key_size_threshold=dictionary_key_size_threshold,
        bloom_filter_columns=bloom_filter_columns,
        bloom_filter_fpp=bloom_filter_fpp,
    )
    with ORCWriter(where, **writer_options) as sink:
        sink.write(table)
# Attach the user-facing docstring to write_table; the shared writer
# argument docs are interpolated via format() so this text stays in sync
# with ORCWriter's docstring.
write_table.__doc__ = """
Write a table into an ORC file.
Parameters
----------
table : pyarrow.lib.Table
The table to be written into the ORC file
where : str or pyarrow.io.NativeFile
Writable target. For passing Python file objects or byte buffers,
see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
or pyarrow.io.FixedSizeBufferWriter.
{}
""".format(_orc_writer_args_docs)