# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# ----------------------------------------------------------------------
# HDFS IO implementation

# cython: language_level = 3

import re

from pyarrow.lib cimport check_status, _Weakrefable, NativeFile
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_fs cimport *
from pyarrow.lib import frombytes, tobytes, ArrowIOError

from queue import Queue, Empty as QueueEmpty, Full as QueueFull


_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')


def have_libhdfs():
    try:
        with nogil:
            check_status(HaveLibHdfs())
        return True
    except Exception:
        return False


def strip_hdfs_abspath(path):
    m = _HDFS_PATH_RE.match(path)
    if m:
        return m.group(3)
    else:
        return path


cdef class HadoopFileSystem(_Weakrefable):
    cdef:
        shared_ptr[CIOHadoopFileSystem] client

    cdef readonly:
        bint is_open
        object host
        object user
        object kerb_ticket
        int port
        dict extra_conf

    def _connect(self, host, port, user, kerb_ticket, extra_conf):
        cdef HdfsConnectionConfig conf

        if host is not None:
            conf.host = tobytes(host)
        self.host = host

        conf.port = port
        self.port = port

        if user is not None:
            conf.user = tobytes(user)
        self.user = user

        if kerb_ticket is not None:
            conf.kerb_ticket = tobytes(kerb_ticket)
        self.kerb_ticket = kerb_ticket

        with nogil:
            check_status(HaveLibHdfs())

        if extra_conf is not None and isinstance(extra_conf, dict):
            conf.extra_conf = {tobytes(k): tobytes(v)
                               for k, v in extra_conf.items()}
        self.extra_conf = extra_conf

        with nogil:
            check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
        self.is_open = True

    @classmethod
    def connect(cls, *args, **kwargs):
        return cls(*args, **kwargs)

    def __dealloc__(self):
        if self.is_open:
            self.close()

    def close(self):
        """
        Disconnect from the HDFS cluster
        """
        self._ensure_client()
        with nogil:
            check_status(self.client.get().Disconnect())
        self.is_open = False

    cdef _ensure_client(self):
        if self.client.get() == NULL:
            raise IOError('HDFS client improperly initialized')
        elif not self.is_open:
            raise IOError('HDFS client is closed')

    def exists(self, path):
        """
        Returns True if the path is known to the cluster, False if it does
        not (or there is an RPC error)
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_bool result
        with nogil:
            result = self.client.get().Exists(c_path)
        return result

    def isdir(self, path):
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_DIRECTORY

    def isfile(self, path):
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_FILE

    def get_capacity(self):
        """
        Get reported total capacity of file system

        Returns
        -------
        capacity : int
        """
        cdef int64_t capacity = 0
        with nogil:
            check_status(self.client.get().GetCapacity(&capacity))
        return capacity
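    # A minimal usage sketch for the space accounting methods below
    # (hedged: ``fs`` stands for a connected client; this cdef class is
    # normally driven through a Python-level subclass whose __init__ calls
    # _connect, so ``MyHadoopFileSystem`` and the host/port values here are
    # illustrative only):
    #
    #   fs = MyHadoopFileSystem.connect(host='namenode', port=8020)
    #   total = fs.get_capacity()      # bytes reported by the cluster
    #   used = fs.get_space_used()
    #   free = fs.df()                 # == total - used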
    def get_space_used(self):
        """
        Get space used on file system

        Returns
        -------
        space_used : int
        """
        cdef int64_t space_used = 0
        with nogil:
            check_status(self.client.get().GetUsed(&space_used))
        return space_used

    def df(self):
        """
        Return free space on disk, like the UNIX df command

        Returns
        -------
        space : int
        """
        return self.get_capacity() - self.get_space_used()

    def rename(self, path, new_path):
        cdef c_string c_path = tobytes(path)
        cdef c_string c_new_path = tobytes(new_path)
        with nogil:
            check_status(self.client.get().Rename(c_path, c_new_path))

    def info(self, path):
        """
        Return detailed HDFS information for path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        path_info : dict
        """
        cdef HdfsPathInfo info
        self._path_info(path, &info)
        return {
            'path': frombytes(info.name),
            'owner': frombytes(info.owner),
            'group': frombytes(info.group),
            'size': info.size,
            'block_size': info.block_size,
            'last_modified': info.last_modified_time,
            'last_accessed': info.last_access_time,
            'replication': info.replication,
            'permissions': info.permissions,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    def stat(self, path):
        """
        Return basic file system statistics about path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        stat : dict
        """
        cdef FileStatistics info
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Stat(c_path, &info))
        return {
            'size': info.size,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    cdef _path_info(self, path, HdfsPathInfo* info):
        cdef c_string c_path = tobytes(path)

        with nogil:
            check_status(self.client.get()
                         .GetPathInfo(c_path, info))

    def ls(self, path, bint full_info):
        cdef:
            c_string c_path = tobytes(path)
            vector[HdfsPathInfo] listing
            list results = []
            int i

        self._ensure_client()

        with nogil:
            check_status(self.client.get()
                         .ListDirectory(c_path, &listing))

        cdef const HdfsPathInfo* info
        # Cast the size_t container size down to the loop variable's C type
        for i in range(<int> listing.size()):
            info = &listing[i]

            # Try to trim off the hdfs://HOST:PORT piece
            name = strip_hdfs_abspath(frombytes(info.name))

            if full_info:
                kind = ('file' if info.kind == ObjectType_FILE
                        else 'directory')

                results.append({
                    'kind': kind,
                    'name': name,
                    'owner': frombytes(info.owner),
                    'group': frombytes(info.group),
                    'last_modified_time': info.last_modified_time,
                    'last_access_time': info.last_access_time,
                    'size': info.size,
                    'replication': info.replication,
                    'block_size': info.block_size,
                    'permissions': info.permissions
                })
            else:
                results.append(name)

        return results

    def chmod(self, path, mode):
        """
        Change file permissions

        Parameters
        ----------
        path : string
            absolute path to file or directory
        mode : int
            POSIX-like bitmask
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef int c_mode = mode
        with nogil:
            check_status(self.client.get()
                         .Chmod(c_path, c_mode))

    def chown(self, path, owner=None, group=None):
        """
        Change file owner and/or group

        Parameters
        ----------
        path : string
            absolute path to file or directory
        owner : string, default None
            New owner, None for no change
        group : string, default None
            New group, None for no change
        """
        cdef:
            c_string c_path
            c_string c_owner
            c_string c_group
            const char* c_owner_ptr = NULL
            const char* c_group_ptr = NULL

        self._ensure_client()

        c_path = tobytes(path)
        if owner is not None:
            c_owner = tobytes(owner)
            c_owner_ptr = c_owner.c_str()

        if group is not None:
            c_group = tobytes(group)
            c_group_ptr = c_group.c_str()

        with nogil:
            check_status(self.client.get()
                         .Chown(c_path, c_owner_ptr, c_group_ptr))
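    # Sketch of the permission/ownership calls above (hedged: the path,
    # owner, and group names are illustrative; ``fs`` is a connected client):
    #
    #   fs.chmod('/data/example.parquet', 0o644)    # POSIX bitmask: rw-r--r--
    #   fs.chown('/data/example.parquet', owner='etl', group='analytics')
    #   fs.chown('/data/example.parquet', group='analytics')  # owner unchanged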
    def mkdir(self, path):
        """
        Create indicated directory and any necessary parent directories
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .MakeDirectory(c_path))

    def delete(self, path, bint recursive=False):
        """
        Delete the indicated file or directory

        Parameters
        ----------
        path : string
        recursive : boolean, default False
            If True, also delete child paths for directories
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Delete(c_path, recursive == 1))

    def open(self, path, mode='rb', buffer_size=None, replication=None,
             default_block_size=None):
        """
        Open HDFS file for reading or writing

        Parameters
        ----------
        mode : string
            Must be one of 'rb', 'wb', 'ab'

        Returns
        -------
        handle : HdfsFile
        """
        self._ensure_client()

        cdef HdfsFile out = HdfsFile()

        if mode not in ('rb', 'wb', 'ab'):
            raise Exception("Mode must be 'rb' (read), "
                            "'wb' (write, new file), or 'ab' (append)")

        cdef c_string c_path = tobytes(path)
        cdef c_bool append = False

        # 0 in libhdfs means "use the default"
        cdef int32_t c_buffer_size = buffer_size or 0
        cdef int16_t c_replication = replication or 0
        cdef int64_t c_default_block_size = default_block_size or 0

        cdef shared_ptr[HdfsOutputStream] wr_handle
        cdef shared_ptr[HdfsReadableFile] rd_handle

        if mode in ('wb', 'ab'):
            if mode == 'ab':
                append = True

            with nogil:
                check_status(
                    self.client.get()
                    .OpenWritable(c_path, append, c_buffer_size,
                                  c_replication, c_default_block_size,
                                  &wr_handle))

            # Upcast the HDFS-specific handle to the generic stream type
            out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
            out.is_writable = True
        else:
            with nogil:
                check_status(self.client.get()
                             .OpenReadable(c_path, &rd_handle))

            out.set_random_access_file(
                <shared_ptr[CRandomAccessFile]> rd_handle)
            out.is_readable = True

        assert not out.closed

        if c_buffer_size == 0:
            c_buffer_size = 2 ** 16

        out.mode = mode
        out.buffer_size = c_buffer_size
        out.parent = _HdfsFileNanny(self, out)
        out.own_file = True

        return out

    def download(self, path, stream, buffer_size=None):
        with self.open(path, 'rb') as f:
            f.download(stream, buffer_size=buffer_size)

    def upload(self, path, stream, buffer_size=None):
        """
        Upload file-like object to HDFS path
        """
        with self.open(path, 'wb') as f:
            f.upload(stream, buffer_size=buffer_size)


# ARROW-404: Helper class to ensure that files are closed before the
# client. During deallocation of the extension class, the attributes are
# decref'd which can cause the client to get closed first if the file has
# the last remaining reference
cdef class _HdfsFileNanny(_Weakrefable):
    cdef:
        object client
        object file_handle_ref

    def __cinit__(self, client, file_handle):
        import weakref
        self.client = client
        self.file_handle_ref = weakref.ref(file_handle)

    def __dealloc__(self):
        fh = self.file_handle_ref()
        if fh:
            fh.close()
        # avoid cyclic GC
        self.file_handle_ref = None
        self.client = None


cdef class HdfsFile(NativeFile):
    cdef readonly:
        int32_t buffer_size
        object mode
        object parent

    def __dealloc__(self):
        self.parent = None
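# Example round trip through the client API above (a sketch, not a test:
# it assumes a reachable HDFS cluster, a connected client ``fs`` obtained
# as in the sketches earlier in this file, and illustrative paths):
#
#   fs.mkdir('/tmp/pyarrow-demo')
#   with fs.open('/tmp/pyarrow-demo/hello.txt', 'wb') as f:
#       f.write(b'hello hdfs')
#   with fs.open('/tmp/pyarrow-demo/hello.txt', 'rb') as f:
#       assert f.read() == b'hello hdfs'
#   fs.ls('/tmp/pyarrow-demo', full_info=False)
#   fs.delete('/tmp/pyarrow-demo', recursive=True)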