Source code for snakemake.conda

import os
import re
import subprocess
import tempfile
from urllib.request import urlopen
from urllib.parse import urlparse
from urllib.error import URLError
import hashlib
import shutil
from distutils.version import StrictVersion
import json
from glob import glob
import tarfile
import uuid

from snakemake.exceptions import CreateCondaEnvironmentException, WorkflowError
from snakemake.logging import logger
from snakemake.common import strip_prefix
from snakemake import utils
from snakemake import singularity
from snakemake.io import git_content


[docs]def content(env_file): if env_file.startswith("git+file:"): return git_content(env_file).encode("utf-8") elif urlparse(env_file).scheme: try: return urlopen(env_file).read() except URLError as e: raise WorkflowError( "Failed to open environment file {}:".format(env_file), e ) else: if not os.path.exists(env_file): raise WorkflowError("Conda env file does not " "exist: {}".format(env_file)) with open(env_file, "rb") as f: return f.read()
[docs]class Env: """Conda environment from a given specification file.""" def __init__(self, env_file, dag, singularity_img=None): self.file = env_file self._env_dir = dag.workflow.persistence.conda_env_path self._env_archive_dir = dag.workflow.persistence.conda_env_archive_path self._hash = None self._content_hash = None self._content = None self._path = None self._archive_file = None self._singularity_img = singularity_img @property def singularity_img_url(self): return self._singularity_img.url if self._singularity_img else None @property def content(self): if self._content is None: self._content = content(self.file) return self._content @property def hash(self): if self._hash is None: md5hash = hashlib.md5() # Include the absolute path of the target env dir into the hash. # By this, moving the working directory around automatically # invalidates all environments. This is necessary, because binaries # in conda environments can contain hardcoded absolute RPATHs. assert os.path.isabs(self._env_dir) md5hash.update(self._env_dir.encode()) if self._singularity_img: md5hash.update(self._singularity_img.url.encode()) md5hash.update(self.content) self._hash = md5hash.hexdigest() return self._hash @property def content_hash(self): if self._content_hash is None: md5hash = hashlib.md5() md5hash.update(self.content) self._content_hash = md5hash.hexdigest() return self._content_hash @property def path(self): """Path to directory of the conda environment. First tries full hash, if it does not exist, (8-prefix) is used as default. """ hash = self.hash env_dir = self._env_dir for h in [hash, hash[:8]]: path = os.path.join(env_dir, h) if os.path.exists(path): return path return path @property def archive_file(self): """Path to archive of the conda environment, which may or may not exist.""" if self._archive_file is None: self._archive_file = os.path.join(self._env_archive_dir, self.content_hash) return self._archive_file
[docs] def create_archive(self): """Create self-contained archive of environment.""" from snakemake.shell import shell try: import yaml except ImportError: raise WorkflowError( "Error importing PyYAML. " "Please install PyYAML to archive workflows." ) # importing requests locally because it interferes with instantiating conda environments import requests env_archive = self.archive_file if os.path.exists(env_archive): return env_archive try: # Download logger.info( "Downloading packages for conda environment {}...".format(self.file) ) os.makedirs(env_archive, exist_ok=True) try: out = shell.check_output( "conda list --explicit --prefix '{}'".format(self.path), stderr=subprocess.STDOUT, ) logger.debug(out.decode()) except subprocess.CalledProcessError as e: raise WorkflowError( "Error exporting conda packages:\n" + e.output.decode() ) with open(os.path.join(env_archive, "packages.txt"), "w") as pkg_list: for l in out.decode().split("\n"): if l and not l.startswith("#") and not l.startswith("@"): pkg_url = l logger.info(pkg_url) parsed = urlparse(pkg_url) pkg_name = os.path.basename(parsed.path) # write package name to list print(pkg_name, file=pkg_list) # download package pkg_path = os.path.join(env_archive, pkg_name) with open(pkg_path, "wb") as copy: r = requests.get(pkg_url) r.raise_for_status() copy.write(r.content) try: tarfile.open(pkg_path) except: raise WorkflowError( "Package is invalid tar archive: {}".format(pkg_url) ) except ( requests.exceptions.ChunkedEncodingError, requests.exceptions.HTTPError, ) as e: shutil.rmtree(env_archive) raise WorkflowError("Error downloading conda package {}.".format(pkg_url)) except (Exception, BaseException) as e: shutil.rmtree(env_archive) raise e return env_archive
[docs] def create(self, dryrun=False): """ Create the conda enviroment.""" from snakemake.shell import shell # Read env file and create hash. env_file = self.file tmp_file = None url_scheme, *_ = urlparse(env_file) if (url_scheme and not url_scheme == "file") or ( not url_scheme and env_file.startswith("git+file:/") ): with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp: tmp.write(self.content) env_file = tmp.name tmp_file = tmp.name env_hash = self.hash env_path = self.path # Check for broken environment if os.path.exists( os.path.join(env_path, "env_setup_start") ) and not os.path.exists(os.path.join(env_path, "env_setup_done")): if dryrun: logger.info( "Incomplete Conda environment {} will be recreated.".format( utils.simplify_path(self.file) ) ) else: logger.info( "Removing incomplete Conda environment {}...".format( utils.simplify_path(self.file) ) ) shutil.rmtree(env_path, ignore_errors=True) # Create environment if not already present. if not os.path.exists(env_path): if dryrun: logger.info( "Conda environment {} will be created.".format( utils.simplify_path(self.file) ) ) return env_path conda = Conda(self._singularity_img) logger.info( "Creating conda environment {}...".format( utils.simplify_path(self.file) ) ) # Check if env archive exists. Use that if present. env_archive = self.archive_file try: # Touch "start" flag file os.makedirs(env_path, exist_ok=True) with open(os.path.join(env_path, "env_setup_start"), "a") as f: pass if os.path.exists(env_archive): logger.info("Installing archived conda packages.") pkg_list = os.path.join(env_archive, "packages.txt") if os.path.exists(pkg_list): # read pacakges in correct order # this is for newer env archives where the package list # was stored packages = [ os.path.join(env_archive, pkg.rstrip()) for pkg in open(pkg_list) ] else: # guess order packages = glob(os.path.join(env_archive, "*.tar.bz2")) # install packages manually from env archive cmd = " ".join( ["conda", "create", "--copy", "--prefix '{}'".format(env_path)] + packages ) if self._singularity_img: cmd = singularity.shellcmd( self._singularity_img.path, cmd, envvars=self.get_singularity_envvars(), ) out = shell.check_output(cmd, stderr=subprocess.STDOUT) else: # Copy env file to env_path (because they can be on # different volumes and singularity should only mount one). # In addition, this allows to immediately see what an # environment in .snakemake/conda contains. target_env_file = env_path + ".yaml" shutil.copy(env_file, target_env_file) logger.info("Downloading and installing remote packages.") cmd = " ".join( [ "conda", "env", "create", "--file '{}'".format(target_env_file), "--prefix '{}'".format(env_path), ] ) if self._singularity_img: cmd = singularity.shellcmd( self._singularity_img.path, cmd, envvars=self.get_singularity_envvars(), ) out = shell.check_output(cmd, stderr=subprocess.STDOUT) # Touch "done" flag file with open(os.path.join(env_path, "env_setup_done"), "a") as f: pass logger.debug(out.decode()) logger.info( "Environment for {} created (location: {})".format( os.path.relpath(env_file), os.path.relpath(env_path) ) ) except subprocess.CalledProcessError as e: # remove potential partially installed environment shutil.rmtree(env_path, ignore_errors=True) raise CreateCondaEnvironmentException( "Could not create conda environment from {}:\n".format(env_file) + e.output.decode() ) if tmp_file: # temporary file was created os.remove(tmp_file) return env_path
[docs] @classmethod def get_singularity_envvars(self): return {"CONDA_PKGS_DIRS": "/tmp/conda/{}".format(uuid.uuid4())}
def __hash__(self): # this hash is only for object comparison, not for env paths return hash(self.file) def __eq__(self, other): if isinstance(other, Env): return self.file == other.file return False
[docs]class Conda: instances = dict() def __new__(cls, singularity_img=None): if singularity_img not in cls.instances: inst = super().__new__(cls) inst.__init__(singularity_img=singularity_img) cls.instances[singularity_img] = inst return inst else: return cls.instances[singularity_img] def __init__(self, singularity_img=None): from snakemake.shell import shell from snakemake import singularity if isinstance(singularity_img, singularity.Image): singularity_img = singularity_img.path self.singularity_img = singularity_img self._check() self.info = json.loads(shell.check_output(self._get_cmd("conda info --json"))) def _get_cmd(self, cmd): if self.singularity_img: return singularity.shellcmd(self.singularity_img, cmd) return cmd def _check(self): from snakemake.shell import shell try: # Use type here since conda now is a function. # type allows to check for both functions and regular commands. shell.check_output(self._get_cmd("type conda"), stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: if self.singularity_img: raise CreateCondaEnvironmentException( "The 'conda' command is not " "available inside " "your singularity container " "image. Snakemake mounts " "your conda installation " "into singularity. " "Sometimes, this can fail " "because of shell restrictions. " "It has been tested to work " "with docker://ubuntu, but " "it e.g. fails with " "docker://bash " ) else: raise CreateCondaEnvironmentException( "The 'conda' command is not " "available in the " "shell {} that will be " "used by Snakemake. You have " "to ensure that it is in your " "PATH, e.g., first activating " "the conda base environment " "with `conda activate base`.".format(shell.get_executable()) ) try: version = ( shell.check_output( self._get_cmd("conda --version"), stderr=subprocess.STDOUT ) .decode() .split()[1] ) if StrictVersion(version) < StrictVersion("4.2"): raise CreateCondaEnvironmentException( "Conda must be version 4.2 or later." ) except subprocess.CalledProcessError as e: raise CreateCondaEnvironmentException( "Unable to check conda version:\n" + e.output.decode() )
[docs] def prefix_path(self): return self.info["conda_prefix"]
[docs] def bin_path(self): return os.path.join(self.prefix_path(), "bin")
[docs] def shellcmd(self, env_path, cmd): from snakemake.shell import shell # get path to activate script activate = os.path.join(self.bin_path(), "activate") return "source {} '{}'; {}".format(activate, env_path, cmd)