Source code for snakemake.shell

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2021, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import _io
import sys
import os
import subprocess as sp
import inspect
import shutil
import stat
import tempfile
import threading

from snakemake.utils import format, argvquote, cmd_exe_quote, find_bash_on_windows
from snakemake.common import ON_WINDOWS, RULEFUNC_CONTEXT_MARKER
from snakemake.logging import logger
from snakemake.deployment import singularity
from snakemake.deployment.conda import Conda
from snakemake.exceptions import WorkflowError


# Stream handed to Popen for the child's stdout when output is not captured.
STDOUT = sys.stdout
if not isinstance(sys.stdout, _io.TextIOWrapper):
    # workaround for nosetest since it overwrites sys.stdout
    # in a strange way that does not work with Popen
    STDOUT = None


# There is a max length for a command executed as well as a maximum
# length for each argument passed to a command. The latter impacts us
# especially when doing `sh -c 'long script from user'`. On Linux, it's
# hardcoded in the kernel as 32 pages, or 128kB. On OSX it appears to be
# close to `getconf ARG_MAX`, about 253kb.
# The value used here is 16 pages minus one byte (65535), i.e. half of the
# Linux per-argument limit quoted above — presumably a deliberate safety
# margin. Commands whose quoted length exceeds it are run from a temporary
# script file instead of being passed inline.
MAX_ARG_LEN = 16 * 4096 - 1


class shell:
    """Run shell commands with snakemake's formatting, environments and limits.

    Calling ``shell(cmd, ...)`` formats and executes *cmd* (see ``__new__``);
    the classmethods configure the executable, prefix and suffix globally.
    """

    # Extra keyword arguments for Popen; holds "executable" once one is set.
    _process_args = {}
    # Strings joined before/after every command.
    _process_prefix = ""
    _process_suffix = ""
    # Guards _processes, which maps jobid -> running Popen object.
    _lock = threading.Lock()
    _processes = {}
    # Flag put between executable and command on Windows (e.g. "-c" for bash).
    _win_command_prefix = ""
    # Whether R_LIBS/PYTHONPATH/PERLLIB/PERL5LIB are dropped inside conda envs.
    conda_block_conflicting_envvars = True

    @classmethod
    def get_executable(cls):
        """Return the configured shell executable, or None if none was set."""
        return cls._process_args.get("executable", None)

    @classmethod
    def check_output(cls, cmd, **kwargs):
        """Run *cmd* in the configured shell and return its output.

        Thin wrapper around ``subprocess.check_output``; on Windows with an
        explicit executable the command is passed as an argument to it.
        """
        executable = cls.get_executable()
        if ON_WINDOWS and executable:
            cmd = '"{}" {} {}'.format(
                executable, cls._win_command_prefix, argvquote(cmd)
            )
            return sp.check_output(cmd, shell=False, executable=executable, **kwargs)
        else:
            return sp.check_output(cmd, shell=True, executable=executable, **kwargs)

    @classmethod
    def executable(cls, cmd):
        """Set the shell executable used for all subsequent commands.

        A relative name is resolved to an absolute path via ``PATH``. ``None``
        clears the explicit executable (on Windows it also clears the command
        prefixes). Selecting bash installs ``set -euo pipefail; `` as prefix.

        Raises:
            WorkflowError: if *cmd* is not found in ``PATH``, or if it is the
                WSL ``bash.exe`` on Windows.
        """
        if cmd and not os.path.isabs(cmd):
            # always enforce absolute path
            resolved = shutil.which(cmd)
            if not resolved:
                # Report the requested name, not the failed lookup result (None).
                raise WorkflowError(
                    "Cannot set default shell {} because it "
                    "is not available in your "
                    "PATH.".format(cmd)
                )
            cmd = resolved
        # None-safe basename: shell.executable(None) must not crash on POSIX.
        name = os.path.basename(cmd).lower() if cmd else None
        if ON_WINDOWS:
            if cmd is None:
                cls._process_prefix = ""
                cls._win_command_prefix = ""
            elif name in ("bash", "bash.exe"):
                # Windows paths are case-insensitive; shutil.which may return
                # e.g. C:\WINDOWS\system32\bash.exe.
                if os.path.normcase(cmd) == os.path.normcase(
                    r"C:\Windows\System32\bash.exe"
                ):
                    raise WorkflowError(
                        "Cannot use WSL bash.exe on Windows. Ensure that you have "
                        "a usable bash.exe available on your path."
                    )
                cls._process_prefix = "set -euo pipefail; "
                cls._win_command_prefix = "-c"
        elif name == "bash":
            cls._process_prefix = "set -euo pipefail; "
        cls._process_args["executable"] = cmd

    @classmethod
    def prefix(cls, prefix):
        """Set the string prepended to every command.

        It is passed through snakemake's ``format`` with ``stepout=2``
        (presumably resolving placeholders in the caller's frame — confirm).
        """
        cls._process_prefix = format(prefix, stepout=2)

    @classmethod
    def suffix(cls, suffix):
        """Set the string appended to every command (formatted like ``prefix``)."""
        cls._process_suffix = format(suffix, stepout=2)

    @classmethod
    def win_command_prefix(cls, cmd):
        """The command prefix used on Windows when specifying an explicit
        shell executable. This would be "-c" for bash.
        Note: if no explicit executable is set, commands are executed with
        Popen(..., shell=True), which uses COMSPEC on Windows where this is
        not needed.
        """
        cls._win_command_prefix = cmd

    @classmethod
    def kill(cls, jobid):
        """Kill the process registered under *jobid*, if any, and unregister it."""
        with cls._lock:
            if jobid in cls._processes:
                cls._processes[jobid].kill()
                del cls._processes[jobid]

    @classmethod
    def cleanup(cls):
        """Forget all registered processes (without killing them)."""
        with cls._lock:
            cls._processes.clear()

    @classmethod
    def _forget_process(cls, jobid):
        """Unregister *jobid*; tolerates kill()/cleanup() having removed it."""
        if jobid is not None:
            with cls._lock:
                cls._processes.pop(jobid, None)

    @staticmethod
    def _remove_tmpdir(tmpdir):
        """Best-effort removal of a temporary script directory (None is a no-op)."""
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)

    @classmethod
    def _write_script(cls, cmd):
        """Write *cmd* to a fresh script file; return ``(tmpdir, invoking_cmd)``."""
        # mkdtemp requires the parent to exist (shell() may run outside a workflow).
        os.makedirs(".snakemake", exist_ok=True)
        tmpdir = tempfile.mkdtemp(dir=".snakemake", prefix="shell_tmp.")
        try:
            script = os.path.join(os.path.abspath(tmpdir), "script.sh")
            with open(script, "w") as script_fd:
                print(cmd, file=script_fd)
            os.chmod(script, os.stat(script).st_mode | stat.S_IXUSR | stat.S_IRUSR)
        except BaseException:
            shutil.rmtree(tmpdir, ignore_errors=True)
            raise
        return tmpdir, '"{}" "{}"'.format(cls.get_executable() or "/bin/sh", script)

    @classmethod
    def _build_envvars(cls, threads, tmpdir_resource, kwargs, conda_env):
        """Return the environment for the child process.

        Copies ``os.environ``, pins common thread-count variables to *threads*,
        points temp-dir variables at *tmpdir_resource*, merges
        ``kwargs["additional_envvars"]`` and drops conda-conflicting variables.

        Raises:
            WorkflowError: if ``additional_envvars`` is not a dict of strings.
        """
        # environment variable lists for linear algebra libraries taken from:
        # https://stackoverflow.com/a/53224849/2352071
        # https://github.com/xianyi/OpenBLAS/tree/59243d49ab8e958bb3872f16a7c0ef8c04067c0a#setting-the-number-of-threads-using-environment-variables
        envvars = dict(os.environ)
        threads = str(threads)
        for var in (
            "OMP_NUM_THREADS",
            "GOTO_NUM_THREADS",
            "OPENBLAS_NUM_THREADS",
            "MKL_NUM_THREADS",
            "VECLIB_MAXIMUM_THREADS",
            "NUMEXPR_NUM_THREADS",
        ):
            envvars[var] = threads
        if tmpdir_resource:
            for var in ("TMPDIR", "TMP", "TEMPDIR", "TEMP"):
                envvars[var] = tmpdir_resource

        if "additional_envvars" in kwargs:
            env = kwargs["additional_envvars"]
            if not isinstance(env, dict) or not all(
                isinstance(v, str) for v in env.values()
            ):
                raise WorkflowError(
                    "Given environment variables for shell command have to be a dict of strings, "
                    "but the following was provided instead:\n{}".format(env)
                )
            envvars.update(env)

        if conda_env and cls.conda_block_conflicting_envvars:
            # remove envvars that conflict with conda
            for var in ("R_LIBS", "PYTHONPATH", "PERLLIB", "PERL5LIB"):
                envvars.pop(var, None)
        return envvars

    def __new__(
        cls, cmd, *args, iterable=False, read=False, bench_record=None, **kwargs
    ):
        """Format *cmd* and run it in the configured shell.

        Returns a generator of output lines (trailing newline removed) if
        *iterable*, the captured stdout text if *read*, and ``None`` otherwise.
        Extra keyword arguments format *cmd* and override values taken from a
        calling rule function's locals (``conda_env``, ``threads``, ...).

        Raises:
            KeyError: if ``stepout`` is passed.
            WorkflowError: if ``additional_envvars`` is not a dict of strings.
            subprocess.CalledProcessError: if the command exits non-zero.
        """
        if "stepout" in kwargs:
            raise KeyError("Argument stepout is not allowed in shell command.")
        if ON_WINDOWS and not cls.get_executable():
            # If bash is not used on Windows quoting must be handled in a special way
            kwargs["quote_func"] = cmd_exe_quote

        cmd = format(cmd, *args, stepout=2, **kwargs)
        stdout = sp.PIPE if iterable or read else STDOUT

        close_fds = sys.platform != "win32"

        func_context = inspect.currentframe().f_back.f_locals

        if func_context.get(RULEFUNC_CONTEXT_MARKER):
            # If this comes from a rule, we expect certain information to be passed
            # implicitly via the rule func context, which is added here.
            # Copy it: f_locals may be a cached dict reused by later calls or (3.13+)
            # a write-through proxy, so updating it in place would leak kwargs into
            # subsequent calls or overwrite the caller's variables.
            context = dict(func_context)
        else:
            # Otherwise, context is just filled via kwargs.
            context = dict()
        # add kwargs to context (taking precedence over the caller's locals)
        context.update(kwargs)

        jobid = context.get("jobid")
        if not context.get("is_shell"):
            logger.shellcmd(cmd)

        conda_env = context.get("conda_env", None)
        conda_base_path = context.get("conda_base_path", None)
        container_img = context.get("container_img", None)
        env_modules = context.get("env_modules", None)
        shadow_dir = context.get("shadow_dir", None)
        resources = context.get("resources", {})
        singularity_args = context.get("singularity_args", "")
        threads = context.get("threads", 1)

        cmd = " ".join((cls._process_prefix, cmd, cls._process_suffix)).strip()

        if env_modules:
            cmd = env_modules.shellcmd(cmd)
            logger.info("Activating environment modules: {}".format(env_modules))

        if conda_env:
            if ON_WINDOWS and not cls.get_executable():
                # If we use cmd.exe directly on windows we need to prepend batch activation script.
                cmd = Conda(container_img, prefix_path=conda_base_path).shellcmd_win(
                    conda_env, cmd
                )
            else:
                cmd = Conda(container_img, prefix_path=conda_base_path).shellcmd(
                    conda_env, cmd
                )

        # Validate/build the environment before any temporary files exist.
        envvars = cls._build_envvars(
            threads, resources.get("tmpdir", None), kwargs, conda_env
        )

        tmpdir = None
        # Length once single quotes are escaped for `sh -c '...'`, plus the quotes.
        if len(cmd.replace("'", r"'\''")) + 2 > MAX_ARG_LEN:
            tmpdir, cmd = cls._write_script(cmd)

        try:
            if container_img:
                cmd = singularity.shellcmd(
                    container_img,
                    cmd,
                    singularity_args,
                    envvars=None,
                    # get_executable() instead of indexing: no KeyError when unset.
                    shell_executable=cls.get_executable(),
                    container_workdir=shadow_dir,
                    is_python_script=context.get("is_python_script", False),
                )
                logger.info("Activating singularity image {}".format(container_img))
            if conda_env:
                logger.info("Activating conda environment: {}".format(conda_env))

            use_shell = True
            if ON_WINDOWS and cls.get_executable():
                # If executable is set on Windows shell mode can not be used
                # and the executable should be prepended the command together
                # with a command prefix (e.g. -c for bash).
                use_shell = False
                cmd = '"{}" {} {}'.format(
                    cls.get_executable(), cls._win_command_prefix, argvquote(cmd)
                )

            proc = sp.Popen(
                cmd,
                bufsize=-1,
                shell=use_shell,
                stdout=stdout,
                universal_newlines=iterable or read or None,
                close_fds=close_fds,
                **cls._process_args,
                env=envvars,
            )
        except BaseException:
            # Never leave the temporary script behind if the command never started.
            cls._remove_tmpdir(tmpdir)
            raise

        if jobid is not None:
            with cls._lock:
                cls._processes[jobid] = proc

        if iterable:
            # The generator takes over cleanup of tmpdir and the job registration.
            return cls._iter_tracked(proc, cmd, tmpdir, jobid)

        ret = None
        try:
            if read:
                ret = proc.stdout.read()
                proc.stdout.close()
            if bench_record is not None:
                from snakemake.benchmark import benchmarked

                with benchmarked(proc.pid, bench_record):
                    retcode = proc.wait()
            else:
                retcode = proc.wait()
        finally:
            cls._remove_tmpdir(tmpdir)
            # pop, not del: kill()/cleanup() may already have removed the job.
            cls._forget_process(jobid)

        if retcode:
            raise sp.CalledProcessError(retcode, cmd)
        return ret

    @classmethod
    def _iter_tracked(cls, proc, cmd, tmpdir, jobid):
        """Yield from iter_stdout and unregister *jobid* once iteration ends."""
        try:
            yield from cls.iter_stdout(proc, cmd, tmpdir)
        finally:
            cls._forget_process(jobid)

    @staticmethod
    def iter_stdout(proc, cmd, tmpdir):
        """Yield *proc*'s stdout line by line without the trailing newline.

        After the stream is exhausted, waits for *proc*, removes *tmpdir* (if
        given) and raises ``CalledProcessError`` on a non-zero exit code.
        """
        try:
            for line in proc.stdout:
                # Strip only a real trailing newline: the final line may lack one.
                yield line[:-1] if line[-1:] in ("\n", b"\n") else line
            retcode = proc.wait()
        finally:
            if tmpdir:
                shutil.rmtree(tmpdir, ignore_errors=True)
        if retcode:
            raise sp.CalledProcessError(retcode, cmd)
# Pick the default shell at import time: bash when available on POSIX, else
# sh, else leave Popen's default. On Windows, clear any explicit executable so
# commands go through Popen(..., shell=True).
if os.name == "posix":
    if shutil.which("bash"):
        shell.executable("bash")
    else:
        logger.warning(
            "Cannot set bash as default shell because it is not "
            "available in your PATH. Falling back to sh."
        )
        if shutil.which("sh"):
            shell.executable("sh")
        else:
            logger.warning(
                "Cannot fall back to sh since it seems to be not "
                "available on this system. Using whatever is "
                "defined as default."
            )
elif ON_WINDOWS:
    shell.executable(None)