Source code for IceAdvect.utilities

#!/usr/bin/env python
"""
utilities.py
Written by Tyler Sutterley (05/2026)
Download and management utilities for syncing files

PYTHON DEPENDENCIES:
    lxml: processing XML and HTML in Python
        https://pypi.python.org/pypi/lxml
    platformdirs: Python module for determining platform-specific directories
        https://pypi.org/project/platformdirs/

UPDATE HISTORY:
    Updated 05/2026: add exists to URL class to check if URL is valid
        added function to get the github url of an item in the project repo
    Updated 04/2026: add query and path functions to URL class
        added include_algorithm option to get_hash function
    Updated 01/2026: raise original exceptions in cases of HTTPError/URLError
    Written 01/2026
"""

from __future__ import print_function, division, annotations

import sys
import os
import re
import io
import ssl
import json
import time
import shutil
import hashlib
import inspect
import logging
import pathlib
import calendar
import warnings
import importlib
import posixpath
import subprocess
import lxml.etree
import platformdirs

if sys.version_info[0] == 2:
    from urllib import quote_plus
    from cookielib import CookieJar
    from urlparse import urlparse
    import urllib2
else:
    from urllib.parse import quote_plus, urlparse
    from http.cookiejar import CookieJar
    import urllib.request as urllib2

# names exported by ``from IceAdvect.utilities import *``
# NOTE: the underscore-prefixed SSL helpers are listed explicitly,
# so they are exported as well despite their private naming
__all__ = [
    "reify",
    "get_data_path",
    "get_cache_path",
    "get_github_url",
    "import_dependency",
    "dependency_available",
    "is_valid_url",
    "Path",
    "URL",
    "compressuser",
    "get_hash",
    "get_git_revision_hash",
    "get_git_status",
    "url_split",
    "convert_arg_line_to_args",
    "get_unix_time",
    "copy",
    "symlink",
    "_create_default_ssl_context",
    "_create_ssl_context_no_verify",
    "_set_ssl_context_options",
    "check_connection",
    "http_list",
    "from_http",
    "from_json",
]


class reify:
    """
    Descriptor that stores the result of the decorated method on the instance

    The wrapped method is called on first access through an instance and its
    return value is saved on that instance under the method name. As this
    class defines no ``__set__``, the saved instance attribute is found first
    on later lookups and the method is not called again.
    """

    def __init__(self, wrapped):
        # mirror the name and docstring of the decorated method
        self.__doc__ = wrapped.__doc__
        self.__name__ = wrapped.__name__
        self.wrapped = wrapped

    def __get__(self, inst, objtype=None):
        if inst is not None:
            # compute once and cache the result on the instance
            result = self.wrapped(inst)
            setattr(inst, self.wrapped.__name__, result)
            return result
        # accessed through the class: return the descriptor itself
        return self
# PURPOSE: get absolute path within a package from a relative path
def get_data_path(relpath: list | str | pathlib.Path):
    """
    Get the absolute path within a package from a relative path

    Parameters
    ----------
    relpath: list, str or pathlib.Path
        Relative path

    Returns
    -------
    filepath: pathlib.Path or None
        Path joined to the directory containing this file
        (``None`` if ``relpath`` is not a list, str or pathlib.Path)
    """
    # directory containing the current file
    frame_info = inspect.getframeinfo(inspect.currentframe())
    package_dir = pathlib.Path(frame_info.filename).absolute().parent
    # a list of components is unpacked into separate path segments
    if isinstance(relpath, list):
        return package_dir.joinpath(*relpath)
    if isinstance(relpath, (str, pathlib.Path)):
        return package_dir.joinpath(relpath)
    # unsupported input types fall through
    return None
# PURPOSE: get the path to the user cache directory
def get_cache_path(
    relpath: list | str | pathlib.Path | None = None, appname="advect"
):
    """
    Get the path to the user cache directory for an application

    Parameters
    ----------
    relpath: list, str, pathlib.Path or None
        Relative path
    appname: str, default 'advect'
        Application name

    Returns
    -------
    filepath: pathlib.Path
        Path within the platform-specific user cache directory
    """
    # platform-specific cache directory (requested with ensure_exists=True)
    cache_dir = platformdirs.user_cache_path(appname=appname, ensure_exists=True)
    # normalize the relative path into a sequence of path segments
    if isinstance(relpath, list):
        segments = relpath
    elif isinstance(relpath, (str, pathlib.Path)):
        segments = [relpath]
    else:
        # no (or unsupported) relative path: use the cache directory itself
        segments = []
    return pathlib.Path(cache_dir.joinpath(*segments))
def get_github_url(
    relpath: list | str,
    username: str = "tsutterley",
    repository: str = "IceAdvect",
    branch: str = "main",
):
    """
    Get the ``URL`` of an item's raw content from a GitHub repository

    Parameters
    ----------
    relpath: list or str
        Relative path
    username: str, default 'tsutterley'
        GitHub username for project repository
    repository: str, default 'IceAdvect'
        GitHub project name
    branch: str, default 'main'
        GitHub branch name

    Returns
    -------
    raw_url: str
        item ``URL``
    """
    # a single string is treated as one path component
    segments = [relpath] if isinstance(relpath, str) else relpath
    # root of the raw content for the requested branch
    root = URL("https://raw.githubusercontent.com")
    branch_url = root.joinpath(username, repository, "refs", "heads", branch)
    # append the relative path components and return the URL as a string
    return branch_url.joinpath(*segments).urlname
def import_dependency(
    name: str,
    extra: str = "",
    raise_exception: bool = False,
):
    """
    Import an optional dependency

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named ``module``
        if the import fails and ``raise_exception`` is not set
    """
    # the module name has to be a string
    msg = f"Invalid module name: '{name}'; must be a string"
    assert isinstance(name, str), msg
    # message used if the module cannot be imported
    err = f"Missing optional dependency '{name}'. {extra}"
    try:
        return importlib.import_module(name)
    except (ImportError, ModuleNotFoundError) as exc:
        if raise_exception:
            raise ImportError(err) from exc
        logging.debug(err)
    # import failed quietly: return an empty stand-in
    return type("module", (), {})
def _parse_release(version):
    """Parse the leading numeric release segment of a version string"""
    match = re.match(r"\s*v?(\d+(?:\.\d+)*)", str(version))
    if match is None:
        return None
    return tuple(int(part) for part in match.group(1).split("."))


def dependency_available(
    name: str,
    minversion: str | None = None,
):
    """
    Checks whether a module is installed without importing it

    Adapted from ``xarray.namedarray.utils.module_available``

    Parameters
    ----------
    name: str
        Module name
    minversion : str, optional
        Minimum version of the module

    Returns
    -------
    available : bool
        Whether the module is installed (and, if ``minversion`` is given,
        whether the installed version is at least ``minversion``)

    Notes
    -----
    Only the numeric release segment of each version is compared
    (``1.10.0`` is newer than ``1.9.0``). Pre-release, post-release and
    development suffixes are ignored. If the installed version cannot be
    determined from the package metadata, the module is reported as not
    satisfying ``minversion``.
    """
    # import the submodules explicitly: ``import importlib`` alone does not
    # guarantee that ``importlib.util`` and ``importlib.metadata`` are loaded
    import importlib.metadata
    import importlib.util

    # check if module is available
    try:
        spec = importlib.util.find_spec(name)
    except ImportError:
        # raised for dotted names when a parent package is missing
        spec = None
    if spec is None:
        return False
    # no version requirement: the module only needs to be installed
    if minversion is None:
        return True
    # check if the version is greater than the minimum required
    try:
        version = importlib.metadata.version(name)
    except importlib.metadata.PackageNotFoundError:
        logging.debug(f"No package metadata found for '{name}'")
        return False
    installed = _parse_release(version)
    required = _parse_release(minversion)
    # fall back to a plain string comparison for unparsable versions
    if installed is None or required is None:
        return version >= minversion
    # pad with zeros so that "1.2" and "1.2.0" compare as equal
    width = max(len(installed), len(required))
    installed += (0,) * (width - len(installed))
    required += (0,) * (width - len(required))
    return installed >= required
def is_valid_url(url: str) -> bool:
    """
    Checks if a string is a valid URL

    A string is considered a URL if it parses with both a scheme
    and a network location. No connection is attempted.

    Parameters
    ----------
    url: str
        URL to check

    Returns
    -------
    valid: bool
        ``True`` if ``url`` has both a scheme and a network location
    """
    try:
        result = urlparse(str(url))
    except (AttributeError, ValueError):
        # urlparse raises ValueError for malformed input
        # (e.g. an unbalanced IPv6 bracket in the network location)
        return False
    return bool(result.scheme and result.netloc)
def Path(filename: str | pathlib.Path, *args, **kwargs):
    """
    Create a ``URL`` or ``pathlib.Path`` object

    Parameters
    ----------
    filename: str or pathlib.Path
        File path or URL

    Returns
    -------
    path: URL or pathlib.Path
        ``URL`` if ``filename`` is a valid URL, otherwise a
        ``pathlib.Path`` with the user home directory expanded
    """
    # anything that is not a URL is handled as a local path
    if not is_valid_url(filename):
        return pathlib.Path(filename, *args, **kwargs).expanduser()
    return URL(filename, *args, **kwargs)
class URL:
    """Handles URLs similar to ``pathlib.Path`` objects"""

    def __init__(self, urlname: str | pathlib.Path, *args, **kwargs):
        """
        Initialize a ``URL`` object

        Parameters
        ----------
        urlname: str or pathlib.Path
            Full URL
        """
        # additional arguments are accepted but not used
        self.urlname = str(urlname)
        # URL components (network location followed by path segments)
        self._raw_paths = list(url_split(self.urlname))
        # response headers collected from requests (lower-case keys)
        self._headers = {}

    @classmethod
    def from_parts(cls, parts: str | list | tuple):
        """
        Return a ``URL`` object from components

        Parameters
        ----------
        parts: str, list or tuple
            URL components
        """
        # a single string is used as-is
        if isinstance(parts, str):
            return cls(parts)
        else:
            return cls("/".join([*parts]))

    def joinpath(self, *pathsegments: list[str]):
        """Append URL components to existing

        Parameters
        ----------
        pathsegments: list[str]
            URL components to append
        """
        # convert segments to strings so that non-string components
        # (such as ``pathlib`` paths) can also be appended
        segments = [str(segment) for segment in pathsegments]
        return URL("/".join([*self._raw_paths, *segments]))

    def resolve(self):
        """Resolve the URL"""
        return URL("/".join([*self._raw_paths]))

    def is_file(self):
        """Boolean flag if path is a local file"""
        return False

    def is_dir(self):
        """Boolean flag if path is a local directory"""
        return False

    def exists(self):
        """Boolean flag if ``URL`` is valid"""
        # checks the form of the URL using ``is_valid_url``
        return is_valid_url(self.urlname)

    def geturl(self):
        """String representation of the ``URL`` object"""
        return self._components.geturl()

    def get(self, *args, **kwargs):
        """Get contents from URL"""
        return from_http(self.urlname, *args, headers=self._headers, **kwargs)

    def headers(self, *args, **kwargs):
        """Get headers from URL"""
        # open the URL only to collect the response headers,
        # then close the response to release the connection
        with self.urlopen(*args, **kwargs):
            pass
        return self._headers

    def load(self, *args, **kwargs):
        """Load ``JSON`` response from URL"""
        return from_json(self.urlname, *args, headers=self._headers, **kwargs)

    def ping(self, *args, **kwargs) -> bool:
        """Ping URL to check connection"""
        return check_connection(self.urlname, *args, **kwargs)

    def query(self, *args, **kwargs):
        """List contents from URL"""
        return http_list(self.urlname, *args, headers=self._headers, **kwargs)

    def read(self, *args, **kwargs):
        """Open URL and read response"""
        # close the response once the contents have been read
        with self.urlopen(*args, **kwargs) as response:
            return response.read()

    def request(self, *args, **kwargs):
        """Make URL request"""
        # NOTE: additional arguments are accepted but not passed on
        return urllib2.Request(self.urlname)

    def urlopen(self, *args, **kwargs):
        """
        Open URL and return response

        The caller is responsible for closing the returned response
        """
        request = urllib2.Request(self.urlname)
        response = urllib2.urlopen(request, *args, **kwargs)
        # store the response headers using lower-case keys
        self._headers.update(
            {k.lower(): v for k, v in response.headers.items()}
        )
        return response

    @property
    def name(self):
        """URL basename"""
        return pathlib.PurePosixPath(self.urlname).name

    @property
    def netloc(self):
        """URL network location"""
        return self._components.netloc

    @property
    def parent(self):
        """URL parent path as a ``URL`` object"""
        paths = url_split(self.urlname)[:-1]
        return URL.from_parts(paths)

    @property
    def parents(self):
        """URL parents as a list of ``URL`` objects"""
        paths = url_split(self.urlname)
        # ordered from the closest parent to the furthest
        return [URL.from_parts(paths[:i]) for i in range(len(paths) - 1, 0, -1)]

    @property
    def parts(self):
        """URL parts as a tuple"""
        paths = url_split(self._components.path)
        return (self.scheme, self.netloc, *paths)

    @property
    def path(self):
        """URL path"""
        return self._components.path

    @property
    def scheme(self):
        """URL scheme"""
        return self._components.scheme + "://"

    @property
    def stem(self):
        """URL stem"""
        return pathlib.PurePosixPath(self.urlname).stem

    @property
    def _components(self):
        """
        URL parsed into six components using ``urlparse``
        """
        return urlparse(self.urlname)

    def __repr__(self):
        """Representation of the ``URL`` object"""
        return str(self.urlname)

    def __str__(self):
        """String representation of the ``URL`` object"""
        return str(self.urlname)

    def __add__(self, other):
        """Concatenate URL components using the addition operator"""
        return URL(self.urlname + str(other))

    def __div__(self, other):
        """Join URL components using the division operator"""
        return self.joinpath(other)

    def __truediv__(self, other):
        """Join URL components using the division operator"""
        return self.joinpath(other)
def compressuser(filename: str | pathlib.Path):
    """
    Tilde-compress a file to be relative to the home directory

    Parameters
    ----------
    filename: str or pathlib.Path
        Input filename to tilde-compress

    Returns
    -------
    filename: pathlib.Path
        Path starting with ``~`` if the file is within the home directory,
        otherwise the absolute path of the file
    """
    absolute = pathlib.Path(filename).expanduser().absolute()
    try:
        # part of the path below the home directory
        tail = absolute.relative_to(pathlib.Path().home())
    except (ValueError, AttributeError):
        # file is not within the home directory
        return absolute
    return pathlib.Path("~").joinpath(tail)


# PURPOSE: get the hash value of a file
def get_hash(
    local: str | io.IOBase | pathlib.Path,
    algorithm: str = "md5",
    include_algorithm: bool = False,
):
    """
    Get the hash value from a local file or file-like object

    Parameters
    ----------
    local: obj, str or pathlib.Path
        ``BytesIO`` object, open binary file object or path to file
    algorithm: str, default 'md5'
        Hashing algorithm for checksum validation
    include_algorithm: bool, default False
        Include the algorithm name in the returned hash

    Returns
    -------
    value: str
        Hexadecimal hash value (prefixed with ``algorithm:`` if
        ``include_algorithm`` is set). An empty string is returned if
        the file does not exist or the input type is not supported.

    Raises
    ------
    ValueError
        If ``algorithm`` is not available in ``hashlib``
    """
    # number of bytes read at a time when hashing files
    blocksize = 1048576

    def _new_hasher():
        # generate checksum hash object for a given type
        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f"Invalid hashing algorithm: {algorithm}")
        return hashlib.new(algorithm)

    def _update_from(stream, hasher):
        # read in blocks to avoid holding the complete file in memory
        while True:
            block = stream.read(blocksize)
            if not block:
                break
            hasher.update(block)

    # check if open file object or if local file exists
    if isinstance(local, io.IOBase):
        hasher = _new_hasher()
        if hasattr(local, "getvalue"):
            # in-memory buffers: hash the complete contents
            hasher.update(local.getvalue())
        elif local.seekable():
            # hash the complete file and restore the stream position
            position = local.tell()
            local.seek(0)
            _update_from(local, hasher)
            local.seek(position)
        else:
            # non-seekable streams can only be read from their current position
            _update_from(local, hasher)
    elif isinstance(local, (str, pathlib.Path)):
        local = pathlib.Path(local).expanduser()
        # if file currently doesn't exist, return empty string
        if not local.exists():
            return ""
        hasher = _new_hasher()
        # open the local file in binary read mode
        with local.open(mode="rb") as local_buffer:
            _update_from(local_buffer, hasher)
    else:
        return ""
    value = hasher.hexdigest()
    return f"{algorithm}:{value}" if include_algorithm else value
# PURPOSE: get the git hash value
def get_git_revision_hash(
    refname: str = "HEAD",
    short: bool = False,
):
    """
    Get the ``git`` hash value for a particular reference

    Parameters
    ----------
    refname: str, default HEAD
        Symbolic reference name
    short: bool, default False
        Return the shorted hash value

    Returns
    -------
    revision: str
        Output of ``git rev-parse`` with surrounding whitespace removed
    """
    # the .git directory is looked up one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    git_dir = pathlib.Path(this_file).absolute().parent.parent.joinpath(".git")
    # build command
    options = ["--short"] if short else []
    command = ["git", f"--git-dir={git_dir}", "rev-parse", *options, refname]
    # get output
    with warnings.catch_warnings():
        output = subprocess.check_output(command)
        return str(output, encoding="utf8").strip()
# PURPOSE: get the current git status
def get_git_status():
    """
    Get the status of a ``git`` repository as a boolean value

    Returns
    -------
    status: bool
        ``True`` if ``git status --porcelain`` produces any output
    """
    # the .git directory is looked up one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    git_dir = pathlib.Path(this_file).absolute().parent.parent.joinpath(".git")
    # build command
    command = ["git", f"--git-dir={git_dir}", "status", "--porcelain"]
    with warnings.catch_warnings():
        output = subprocess.check_output(command)
        # any output is converted to a True status
        return bool(output)
# PURPOSE: recursively split a url path
def url_split(s: str):
    """
    Split a URL path into a tuple of components

    The scheme and network location (``scheme://netloc``) are kept
    together as the first component for any URL scheme.

    Parameters
    ----------
    s: str
        URL string

    Returns
    -------
    parts: tuple
        URL components
    """
    # schemes always treated as the root of a URL
    known_schemes = ("http:", "https:", "ftp:", "s3:")
    # components are collected from the last to the first
    parts = []
    current = s
    # iterate rather than recurse so that deep paths cannot exceed
    # the interpreter recursion limit
    while True:
        text = str(current)
        head, tail = posixpath.split(text)
        # check if the remaining string is a "scheme://netloc" root
        # (the double slash keeps drive-like prefixes such as "C:/" separate)
        is_root = head in known_schemes or (
            re.fullmatch(r"[A-Za-z][A-Za-z0-9+.\-]*:", head) is not None
            and text.startswith(head + "//")
        )
        if is_root:
            parts.append(current)
            break
        parts.append(tail)
        # stop at the start of the path: an empty head or a head of only
        # separators (which could otherwise never be reduced further)
        if not head.strip(posixpath.sep):
            break
        current = head
    parts.reverse()
    return tuple(parts)
# PURPOSE: convert file lines to arguments
def convert_arg_line_to_args(arg_line):
    """
    Convert file lines to arguments

    Parameters
    ----------
    arg_line: str
        Line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        Whitespace-separated argument from the line
    """
    # remove commented lines and after argument comments
    uncommented = re.sub(r"\#(.*?)$", r"", arg_line)
    # yield each of the remaining non-empty arguments
    yield from (arg for arg in uncommented.split() if arg.strip())


# PURPOSE: returns the Unix timestamp value for a formatted date string
def get_unix_time(
    time_string: str,
    format: str = "%Y-%m-%d %H:%M:%S",
):
    """
    Get the Unix timestamp value for a formatted date string

    Parameters
    ----------
    time_string: str
        Formatted time string to parse
    format: str, default '%Y-%m-%d %H:%M:%S'
        Format for input time string

    Returns
    -------
    timestamp: int or None
        Unix timestamp from ``calendar.timegm``
        (``None`` if the string cannot be parsed with ``format``)
    """
    try:
        # trailing whitespace is removed before parsing
        parsed = time.strptime(time_string.rstrip(), format)
    except (TypeError, ValueError):
        return None
    return calendar.timegm(parsed)
# PURPOSE: make a copy of a file with all system information
def copy(
    source: str | pathlib.Path,
    destination: str | pathlib.Path,
    move: bool = False,
    **kwargs,
):
    """
    Copy or move a file with all system information

    Parameters
    ----------
    source: str
        Source file
    destination: str
        Copied destination file
    move: bool, default False
        Remove the source file
    """
    src = pathlib.Path(source).expanduser().absolute()
    dst = pathlib.Path(destination).expanduser().absolute()
    # log source and destination
    logging.info(f"{str(src)} -->\n\t{str(dst)}")
    # copy the file contents followed by the file metadata
    shutil.copyfile(src, dst)
    shutil.copystat(src, dst)
    # remove the original file if moving
    if move:
        src.unlink()


# PURPOSE: make a symbolic link to a file
def symlink(
    source: str | pathlib.Path,
    destination: str | pathlib.Path,
):
    """
    Create a symbolic link to a file

    Parameters
    ----------
    source: str or pathlib.Path
        Source file
    destination: str or pathlib.Path
        Symbolic link file

    Raises
    ------
    FileExistsError
        If ``destination`` exists and is not a symbolic link
    """
    # use absolute paths for both the target and the link
    target = pathlib.Path(source).expanduser().absolute()
    link = pathlib.Path(destination).expanduser().absolute()
    # skip if symlink has the same path as source file
    if target == link:
        logging.debug(f"Symbolic link {link} matches source {target}")
        return
    # file exists and is not a symbolic link
    if link.exists() and not link.is_symlink():
        raise FileExistsError(f"Existing file: {link}")
    if link.is_symlink():
        # skip if symlink already points to the source file
        if link.resolve() == target.resolve():
            logging.debug(f"Symbolic link already exists: {link}")
            return
        # remove existing symbolic link if it points to a different file
        logging.debug(f"Removing existing symbolic link: {link}")
        link.unlink()
    # make source relative to destination parent if possible
    if target.is_relative_to(link.parent):
        target = target.relative_to(link.parent)
    # create new symbolic link
    logging.info(f"\t--> {link} (symlink)")
    link.symlink_to(target)
def _create_default_ssl_context() -> ssl.SSLContext:
    """
    Creates the default ``SSL`` context

    Returns
    -------
    context: ssl.SSLContext
        Client-side ``TLS`` context with the default options applied
        and the default certificate authorities loaded
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # a context created directly from the SSLContext constructor has no
    # trusted certificates: load the system defaults so that verification
    # of server certificates is able to succeed
    context.load_default_certs()
    _set_ssl_context_options(context)
    context.options |= ssl.OP_NO_COMPRESSION
    return context
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """
    Creates an ``SSL`` context for unverified connections

    Returns
    -------
    context: ssl.SSLContext
        Default context with hostname checking and
        certificate verification turned off
    """
    unverified = _create_default_ssl_context()
    # hostname checking is disabled first, then certificate verification
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
[docs] def _set_ssl_context_options(context: ssl.SSLContext) -> None: """Sets the default options for the ``SSL`` context""" if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7): context.minimum_version = ssl.TLSVersion.TLSv1_2 else: context.options |= ssl.OP_NO_SSLv2 context.options |= ssl.OP_NO_SSLv3 context.options |= ssl.OP_NO_TLSv1 context.options |= ssl.OP_NO_TLSv1_1
# default ssl context _default_ssl_context = _create_ssl_context_no_verify() # PURPOSE: check connection with http host
def check_connection(
    HOST: str,
    context: ssl.SSLContext = _default_ssl_context,
    timeout: int = 20,
):
    """
    Check internet connection with ``http`` host

    Parameters
    ----------
    HOST: str
        Remote ``http`` host
    context: obj, default IceAdvect.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    timeout: int, default 20
        Timeout in seconds for blocking operations

    Returns
    -------
    connected: bool
        ``True`` if the host could be opened

    Raises
    ------
    urllib.error.HTTPError
        If the host returns an ``HTTP`` error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # attempt to connect to http host
    try:
        response = urllib2.urlopen(HOST, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    else:
        # close the response so that the connection is released
        response.close()
        return True
# PURPOSE: list a directory on an Apache http Server
def http_list(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    parser=lxml.etree.HTMLParser(),
    format: str = "%Y-%m-%d %H:%M",
    pattern: str = "",
    sort: bool = False,
    **kwargs,
):
    """
    List a directory on an Apache ``http`` Server

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default IceAdvect.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        Format for input time string
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list
    **kwargs: dict
        Keyword arguments passed to ``urllib.request.Request``

    Returns
    -------
    colnames: list
        Column names in a directory
    collastmod: list
        Last modification times for items in the directory

    Raises
    ------
    urllib.error.HTTPError
        If the host returns an ``HTTP`` error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST), **kwargs)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # read and parse request for files (column names and modified times)
    # closing the response once the listing has been parsed
    with response:
        tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath("//tr/td[not(@*)]//a/@href")
    # get the Unix timestamp value for a modification time
    collastmod = [
        get_unix_time(text, format=format)
        for text in tree.xpath('//tr/td[@align="right"][1]/text()')
    ]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, name in enumerate(colnames) if re.search(pattern, name)]
        # reduce list of column names and last modified times
        colnames = [colnames[i] for i in keep]
        collastmod = [collastmod[i] for i in keep]
    # sort the list
    if sort:
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        # sort list of column names and last modified times
        colnames = [colnames[i] for i in order]
        collastmod = [collastmod[i] for i in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)
# PURPOSE: download a file from a http host
def from_http(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    local: str | pathlib.Path | None = None,
    hash: str = "",
    chunk: int = 16384,
    headers: dict | None = None,
    verbose: bool = False,
    fid: object = sys.stdout,
    label: str | None = None,
    mode: int = 0o775,
    **kwargs,
):
    """
    Download a file from a ``http`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path split as list
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default IceAdvect.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    local: str, pathlib.Path or NoneType, default None
        Path to local file
    hash: str, default ''
        ``MD5`` hash of local file
    chunk: int, default 16384
        Chunk size for transfer encoding
    headers: dict or NoneType, default None
        Dictionary updated in place with the headers from the response
    verbose: bool, default False
        Print file transfer information
    fid: object, default sys.stdout
        Open file object for logging file transfers if verbose
    label: str or None, default None
        Label for logging file transfer information if verbose
    mode: oct, default 0o775
        Permissions mode of output local file
    **kwargs: dict
        Keyword arguments passed to ``urllib.request.Request``

    Returns
    -------
    remote_buffer: obj
        ``BytesIO`` representation of file

    Raises
    ------
    urllib.error.HTTPError
        If the host returns an ``HTTP`` error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # use a new dictionary for each call if none is provided
    # (a dictionary default would be shared between calls)
    if headers is None:
        headers = {}
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # set default label for logging
    if label is None:
        label = f"{posixpath.join(*HOST)} -->\n\t{local}"
    # try downloading from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST), **kwargs)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # copy remote file contents to bytesIO object
    remote_buffer = io.BytesIO()
    # close the response once the contents and headers are copied
    with response:
        shutil.copyfileobj(response, remote_buffer, chunk)
        # copy headers from response
        headers.update({k.lower(): v for k, v in response.headers.items()})
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        logging.info(label)
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode="wb") as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: load a JSON response from a http host
def from_json(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    headers: dict | None = None,
) -> dict:
    """
    Load a ``JSON`` response from a ``http`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path split as list
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default IceAdvect.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    headers: dict or NoneType, default None
        Dictionary updated in place with the headers from the response

    Returns
    -------
    json_response: dict
        ``JSON`` response

    Raises
    ------
    urllib.error.HTTPError
        If the host returns an ``HTTP`` error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # use a new dictionary for each call if none is provided
    # (a dictionary default would be shared between calls)
    if headers is None:
        headers = {}
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try loading JSON from http
    try:
        # Create and submit request for JSON response
        request = urllib2.Request(posixpath.join(*HOST))
        request.add_header("Accept", "application/json")
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # close the response once the headers and contents are read
    with response:
        # copy headers from response
        headers.update({k.lower(): v for k, v in response.headers.items()})
        # load JSON response
        json_response = json.loads(response.read())
    return json_response