Source code for xoutil.fs

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#

'''File system utilities.

This module contains file-system utilities that could have side-effects. For
path-handling functions that have no side-effects look at
`xoutil.fs.path`:mod:.

'''


from __future__ import (division as _py3_division,
                        print_function as _py3_print,
                        absolute_import as _py3_abs_import)

import sys
import os
from re import compile as _rcompile
from xoutil.fs.path import normalize_path


re_magic = _rcompile('[*?[]')
has_magic = lambda s: re_magic.search(s) is not None


def _get_regex(pattern=None, regex_pattern=None, shell_pattern=None):
    from functools import reduce
    import fnmatch
    from xoutil.params import check_count
    arg_count = reduce(lambda count, p: count + (1 if p is not None else 0),
                       (pattern, regex_pattern, shell_pattern), 0)
    check_count(arg_count, 0, 1, caller='_get_regex')    # XXX: WTF?!
    if arg_count == 1:
        if pattern is not None:
            if pattern.startswith('(?') or pattern.startswith('^(?'):
                regex_pattern = pattern
            else:
                shell_pattern = pattern
        return _rcompile(regex_pattern or fnmatch.translate(shell_pattern))
    elif arg_count == 0:
        return None


[docs]def iter_files(top='.', pattern=None, regex_pattern=None, shell_pattern=None, followlinks=False, maxdepth=None): '''Iterate filenames recursively. :param top: The top directory for recurse into. :param pattern: A pattern of the files you want to get from the iterator. It should be a string. If it starts with "(?" it will be regarded as a regular expression, otherwise a shell pattern. :param regex_pattern: An *alternative* to `pattern`. This will always be regarded as a regular expression. :param shell_pattern: An *alternative* to `pattern`. This should be a shell pattern. :param followlinks: The same meaning that in `os.walk`. .. versionadded:: 1.2.1 :param maxdepth: Only files above this level will be yielded. If None, no limit is placed. .. versionadded:: 1.2.1 .. warning:: It's an error to pass more than pattern argument. ''' regex = _get_regex(pattern, regex_pattern, shell_pattern) depth = 0 for dirpath, _dirs, filenames in os.walk(normalize_path(top), topdown=True, followlinks=followlinks): for filename in filenames: path = os.path.join(dirpath, filename) if (regex is None) or regex.search(path): yield path if maxdepth is not None: depth += 1 if depth >= maxdepth: _dirs[:] = []
# ------------------------------ iter_dict_files ------------------------------ _REGEX_PYTHON_PACKAGE = _rcompile(r'^(?P<dir>.+(?=/)/)?' r'(?P<packagename>[^/_-]+?)' r'([-_][Vv]?(?P<version>\d+([.-_]\w+)*))?' r'(?P<ext>[.](tar[.](gz|bz2)|zip|egg|tgz))$') _REGEX_DEFAULT_ALLFILES = _rcompile(r'^(?P<dir>.+(?=/)/)?' r'(?P<filename>[^/]+?)' r'([.](?P<ext>[^.]+))?$') def iter_dict_files(top='.', regex=None, wrong=None, followlinks=False): ''' Iterate filenames recursively. :param top: The top directory for recurse into. :param regex: Regular expression with group definitions to match. :param wrong: A key to store full name of not matching files. :param followlinks: The same meaning that in `os.walk`. .. versionadded:: 1.2.1 .. versionadded:: 1.2.0 ''' if regex: from xoutil.eight import string_types if isinstance(regex, string_types): regex = _rcompile(regex) else: regex = _REGEX_DEFAULT_ALLFILES for dirpath, _dirs, filenames in os.walk(normalize_path(top), followlinks=followlinks): for filename in filenames: path = os.path.join(dirpath, filename) match = regex.match(path) if match: yield match.groupdict() elif wrong is not None: yield {wrong: path}
[docs]def iter_dirs(top='.', pattern=None, regex_pattern=None, shell_pattern=None): ''' Iterate directories recursively. The params have analagous meaning that in `iter_files`:func: and the same restrictions. ''' regex = _get_regex(pattern, regex_pattern, shell_pattern) for path, _dirs, _files in os.walk(normalize_path(top)): if (regex is None) or regex.search(path): yield path
[docs]def rmdirs(top='.', pattern=None, regex_pattern=None, shell_pattern=None, exclude=None, confirm=None): '''Removes all empty dirs at `top`. :param top: The top directory to recurse into. :param pattern: A pattern of the dirs you want to remove. It should be a string. If it starts with "(?" it will be regarded as a regular expression, otherwise a shell pattern. :param exclude: A pattern of the dirs you DON'T want to remove. It should be a string. If it starts with "(?" it will be regarded as a regular expression, otherwise a shell pattern. This is a simple commodity to have you not to negate complex patterns. :param regex_pattern: An *alternative* to `pattern`. This will always be regarded as a regular expression. :param shell_pattern: An *alternative* to `pattern`. This should be a shell pattern. :param confirm: A callable that accepts a single argument, which is the path of the directory to be deleted. `confirm` should return True to allow the directory to be deleted. If `confirm` is None, then all matched dirs are deleted. .. note:: In order to avoid common mistakes we won't attempt to remove mount points. .. versionadded:: 1.1.3 ''' regex = _get_regex(pattern, regex_pattern, shell_pattern) exclude = _get_regex(exclude) if confirm is None: confirm = lambda _: True for path, _dirs, _files in os.walk(normalize_path(top)): # XXX: Make clearest next condition if ((regex is None or regex.search(path)) and (exclude is None or not exclude.search(path)) and not _dirs and not _files and confirm(path) and not os.path.ismount(path)): os.rmdir(path)
def regex_rename(top, pattern, repl, maxdepth=None): '''Rename files recursively using regular expressions substitution. :param top: The top directory to start walking. :param pattern: A regular expression pattern. Files whose fullname (including the path) match the expression will be renamed. :param repl: String to use as replacement. You may use backreferences as documented in python's ``re.sub`` function. :param maxdepth: Only walk files up to this level. If None, walk all files. .. versionadded:: 1.2.1 ''' from re import subn as _re_subn from xoutil.eight import string_types if isinstance(pattern, string_types): pattern = _rcompile(pattern) depth = 0 for path, _dirs, files in os.walk(top): for item in files: new_file, count = _re_subn(pattern, repl, item) if count > 0: old = os.path.join(path, item) new = os.path.join(path, new_file) os.rename(old, new) if maxdepth is not None: depth += 1 if depth >= maxdepth: _dirs[:] = [] filter_not_hidden = lambda path, _st: (path[0] != '.') and ('/.' not in path) filter_false = lambda path, stat_info: False def get_regex_filter(regex): '''Return a filter for "walk" based on a regular expression.''' from xoutil.eight import string_types if isinstance(regex, string_types): regex = _rcompile(regex) def _filter(path, stat_info): return regex.match(os.path.basename(path)) is not None return _filter def get_wildcard_filter(pattern): '''Return a filter for "walk" based on a wildcard pattern a la fnmatch.''' regex = _get_regex(pattern) def _filter(path, stat_info): return regex.match(os.path.basename(path)) is not None return _filter def get_mime_filter(mime_start): import mimetypes def _filter(path, stat_info): t = mimetypes.guess_type(path)[0] return t.startswith(mime_start) if t else False return _filter def nice_size(size): '''Formats `size` to a nice human-friendly format by appending one of `Kilo`, `Mega`, `Giga`, `Tera`, `Peta`, or `Eta` suffix. ''' tails = ' KMGTPE' order, highest = 0, len(tails) - 1 while (size >= 1024) and (order < highest): size /= 1024 order += 1 res = ('%.2f' % size).rstrip('0').rstrip('.') return '%s%s' % (res, tails[order])
[docs]def stat(path): ''' Return file or file system status. This is the same as the function ``os.stat`` but raises no error. ''' try: return os.stat(path) except os.error: return None
def lstat(path): '''Same as `os.lstat`, but raises no error.''' try: return os.lstat(path) except os.error: return None def set_stat(fname, stat_info): os.chmod(fname, stat_info.st_mode) os.chown(fname, stat_info.st_uid, stat_info.st_gid) os.utime(fname, (stat_info.st_atime, stat_info.st_mtime)) def read_file(path): '''Read a full file content and return an string.''' try: with open(path, 'r') as f: return f.read() except OSError: return ''
[docs]def listdir(path): '''Same as ``os.listdir`` but normalizes `path` and raises no error.''' try: return os.listdir(normalize_path(path)) except os.error: return []
def _list_magic(dirname, pattern): re = _get_regex(pattern) for name in listdir(dirname or os.curdir): if re.match(name): full = os.path.join(dirname, name) yield full, lstat(full) def _list_one(fname): st = lstat(fname) if st: yield fname, st def _list(pattern): from stat import S_ISDIR as _ISDIR if has_magic(pattern): head, tail = os.path.split(pattern) for dirname, st in _list(head): if _ISDIR(st.st_mode): if has_magic(tail): items = _list_magic(dirname, tail) elif tail: items = _list_one(os.path.join(dirname, tail)) else: items = ((dirname, st),) for item in items: yield item elif pattern: for item in _list_one(pattern): yield item else: yield ('', lstat(os.curdir))
[docs]def imap(func, pattern): r'''Yields `func(file_0, stat_0)`, `func(file_1, stat_1)`, ... for each dir path. The `pattern` may contain: - Simple shell-style wild-cards à la `fnmatch`. - Regex if pattern starts with '(?'. Expressions must be valid, as in "(?:[^.].*)$" or "(?i).*\.jpe?g$". Remember to add the end mark '$' if needed. ''' for item, st in _list(pattern): res = func(item, st) if res is not None: yield res
[docs]def walk_up(start, sentinel): '''Given a `start` directory walk-up the file system tree until either the FS root is reached or the `sentinel` is found. The `sentinel` must be a string containing the file name to be found. .. warning:: If `sentinel` is an absolute path that exists this will return `start`, no matter what `start` is (in windows this could be even different drives). If `start` path exists but is not a directory an OSError is raised. ''' from os.path import abspath, exists, isdir, join, dirname current = abspath(start) if not exists(current) or not isdir(current): raise OSError('Invalid directory "%s"' % current) previouspath = None found = False while not found and current is not previouspath: clue = join(current, sentinel) if exists(clue): found = True else: previouspath = current current = dirname(current) return current if found else None
if sys.version_info < (3, 4, 1): def makedirs(name, mode=0o777, exist_ok=False): """makedirs(path [, mode=0o777][, exist_ok=False]) Super-mkdir; create a leaf directory and all intermediate ones. Works like mkdir, except that any intermediate path segment (not just the rightmost) will be created if it does not exist. If the target directory with the same mode as we specified already exists, raises an OSError if exist_ok is False, otherwise no exception is raised. This is recursive. """ from errno import EEXIST from xoutil.future.codecs import safe_encode head, tail = os.path.split(name) if not tail: head, tail = os.path.split(head) if head and tail and not os.path.exists(head): try: makedirs(head, mode, exist_ok) except OSError as e: # be happy if someone already created the path if e.errno != EEXIST: raise cdir = os.path.curdir if isinstance(tail, bytes): cdir = safe_encode(os.path.curdir, 'ASCII') if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists return try: os.mkdir(name, mode) except OSError as e: if not exist_ok or e.errno != EEXIST or not os.path.isdir(name): raise else: from os import makedirs
[docs]def ensure_filename(filename, yields=False): '''Ensures the existence of a file with a given filename. If the filename is taken and is not pointing to a file (or a link to a file) an OSError is raised. If `exist_ok` is False the filename must not be taken; an OSError is raised otherwise. The function creates all directories if needed. See `makedirs`:func: for restrictions. If `yields` is True, returns the file object. This way you may open a file for writing like this:: with ensure_filename('/tmp/good-name-87.txt', yields=True) as fh: fh.write('Do it!') The file is open in mode 'w+b'. .. versionadded:: 1.6.1 Added parameter `yield`. ''' if not os.path.exists(filename): filename = normalize_path(filename) dirname = os.path.dirname(filename) makedirs(dirname, exist_ok=True) # TODO: Better hanlding of mode for reading/writing. fh = open(filename, 'w+b') if not yields: fh.close() else: return fh else: if not os.path.isfile(filename): raise OSError('Expected a file but another thing is found \'%s\'' % filename)
[docs]def concatfiles(*files): '''Concat several files to a single one. Each positional argument must be either: - a file-like object (ready to be passed to `shutil.copyfileobj`:func:) - a string, the file path. The last positional argument is the target. If it's file-like object it must be open for writing, and the caller is the responsible for closing it. Alternatively if there are only two positional arguments and the first is a collection, the sources will be the members of the first argument. ''' import shutil from xoutil.eight import string_types from xoutil.values.simple import force_iterable_coerce from xoutil.params import check_count check_count(files, 2, caller='concatfiles') if len(files) == 2: files, target = force_iterable_coerce(files[0]), files[1] else: files, target = files[:-1], files[-1] if isinstance(target, string_types): target, opened = open(target, 'wb'), True else: opened = False try: for f in files: if isinstance(f, string_types): fh = open(f, 'rb') closefh = True else: fh = f closefh = False try: shutil.copyfileobj(fh, target) finally: if closefh: fh.close() finally: if opened: target.close()
del sys