Source code for snakemake.persistence

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2021, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import os
import shutil
import signal
import marshal
import pickle
import json
import time
from base64 import urlsafe_b64encode, b64encode
from functools import lru_cache, partial
from itertools import filterfalse, count
from pathlib import Path

from snakemake.logging import logger
from snakemake.jobs import jobfiles
from snakemake.utils import listfiles


class Persistence:
    def __init__(
        self,
        nolock=False,
        dag=None,
        conda_prefix=None,
        singularity_prefix=None,
        shadow_prefix=None,
        warn_only=False,
    ):
        self.path = os.path.abspath(".snakemake")
        if not os.path.exists(self.path):
            os.mkdir(self.path)
        self._lockdir = os.path.join(self.path, "locks")
        if not os.path.exists(self._lockdir):
            os.mkdir(self._lockdir)

        self.dag = dag
        self._lockfile = dict()

        self._metadata_path = os.path.join(self.path, "metadata")
        self._incomplete_path = os.path.join(self.path, "incomplete")

        self.conda_env_archive_path = os.path.join(self.path, "conda-archive")
        self.benchmark_path = os.path.join(self.path, "benchmarks")
        self.source_cache = os.path.join(self.path, "source_cache")

        if conda_prefix is None:
            self.conda_env_path = os.path.join(self.path, "conda")
        else:
            self.conda_env_path = os.path.abspath(os.path.expanduser(conda_prefix))
        if singularity_prefix is None:
            self.container_img_path = os.path.join(self.path, "singularity")
        else:
            self.container_img_path = os.path.abspath(
                os.path.expanduser(singularity_prefix)
            )
        if shadow_prefix is None:
            self.shadow_path = os.path.join(self.path, "shadow")
        else:
            self.shadow_path = os.path.join(shadow_prefix, "shadow")

        # place to store any auxiliary information needed during a run (e.g. source tarballs)
        self.aux_path = os.path.join(self.path, "auxiliary")

        # migration of .snakemake folder structure
        migration_indicator = Path(
            os.path.join(self._incomplete_path, "migration_underway")
        )
        if (
            os.path.exists(self._metadata_path)
            and not os.path.exists(self._incomplete_path)
        ) or migration_indicator.exists():
            os.makedirs(self._incomplete_path, exist_ok=True)
            migration_indicator.touch()

            self.migrate_v1_to_v2()

            migration_indicator.unlink()

        self._incomplete_cache = None

        for d in (
            self._metadata_path,
            self._incomplete_path,
            self.shadow_path,
            self.conda_env_archive_path,
            self.conda_env_path,
            self.container_img_path,
            self.aux_path,
        ):
            os.makedirs(d, exist_ok=True)

        if nolock:
            self.lock = self.noop
            self.unlock = self.noop
        if warn_only:
            self.lock = self.lock_warn_only
            self.unlock = self.noop

        self._read_record = self._read_record_cached

    def migrate_v1_to_v2(self):
        logger.info("Migrating .snakemake folder to new format...")
        i = 0
        for path, _, filenames in os.walk(self._metadata_path):
            path = Path(path)
            for filename in filenames:
                with open(path / filename, "r") as f:
                    try:
                        record = json.load(f)
                    except json.JSONDecodeError:
                        continue  # not a properly formatted JSON file
                    if record.get("incomplete", False):
                        target_path = Path(self._incomplete_path) / path.relative_to(
                            self._metadata_path
                        )
                        os.makedirs(target_path, exist_ok=True)
                        shutil.copyfile(
                            path / filename,
                            target_path / filename,
                        )
                    i += 1
                    # this can take a while for large folders...
                    if (i % 10000) == 0 and i > 0:
                        logger.info("{} files migrated".format(i))
        logger.info("Migration complete")
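
    # In the old (v1) layout, incompleteness was recorded as an "incomplete" flag
    # inside each per-file metadata record; the v2 layout keeps a dedicated record
    # under .snakemake/incomplete/ instead, which is where migrate_v1_to_v2 copies
    # the flagged records.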

    @property
    def files(self):
        if self._files is None:
            self._files = set(self.dag.output_files)
        return self._files

    @property
    def locked(self):
        inputfiles = set(self.all_inputfiles())
        outputfiles = set(self.all_outputfiles())
        if os.path.exists(self._lockdir):
            for lockfile in self._locks("input"):
                with open(lockfile) as lock:
                    for f in lock:
                        f = f.strip()
                        if f in outputfiles:
                            return True
            for lockfile in self._locks("output"):
                with open(lockfile) as lock:
                    for f in lock:
                        f = f.strip()
                        if f in outputfiles or f in inputfiles:
                            return True
        return False

    def lock_warn_only(self):
        if self.locked:
            logger.info(
                "Error: Directory cannot be locked. This usually "
                "means that another Snakemake instance is running on this directory. "
                "Another possibility is that a previous run exited unexpectedly."
            )

    def lock(self):
        if self.locked:
            raise IOError("Another snakemake process has locked this directory.")
        self._lock(self.all_inputfiles(), "input")
        self._lock(self.all_outputfiles(), "output")

    def unlock(self, *args):
        logger.debug("unlocking")
        for lockfile in self._lockfile.values():
            try:
                logger.debug("removing lock")
                os.remove(lockfile)
            except OSError as e:
                if e.errno != 2:
                    # missing file
                    raise e
        logger.debug("removed all locks")

    def cleanup_locks(self):
        shutil.rmtree(self._lockdir)

    def cleanup_metadata(self, path):
        self._delete_record(self._metadata_path, path)

    def cleanup_shadow(self):
        if os.path.exists(self.shadow_path):
            shutil.rmtree(self.shadow_path)
            os.mkdir(self.shadow_path)

    def conda_cleanup_envs(self):
        # cleanup envs
        in_use = set(env.hash[:8] for env in self.dag.conda_envs.values())
        for d in os.listdir(self.conda_env_path):
            if len(d) >= 8 and d[:8] not in in_use:
                if os.path.isdir(os.path.join(self.conda_env_path, d)):
                    shutil.rmtree(os.path.join(self.conda_env_path, d))
                else:
                    os.remove(os.path.join(self.conda_env_path, d))

        # cleanup env archives
        in_use = set(env.content_hash for env in self.dag.conda_envs.values())
        for d in os.listdir(self.conda_env_archive_path):
            if d not in in_use:
                shutil.rmtree(os.path.join(self.conda_env_archive_path, d))

    def started(self, job, external_jobid=None):
        for f in job.output:
            self._record(
                self._incomplete_path,
                {"external_jobid": external_jobid},
                f,
            )

    def finished(self, job, keep_metadata=True):
        if not keep_metadata:
            for f in job.expanded_output:
                self._delete_record(self._incomplete_path, f)
            return

        version = str(job.rule.version) if job.rule.version is not None else None
        code = self._code(job.rule)
        input = self._input(job)
        log = self._log(job)
        params = self._params(job)
        shellcmd = job.shellcmd
        conda_env = self._conda_env(job)
        fallback_time = time.time()
        for f in job.expanded_output:
            rec_path = self._record_path(self._incomplete_path, f)
            starttime = (
                os.path.getmtime(rec_path) if os.path.exists(rec_path) else None
            )
            # Sometimes finished is called twice, if so, lookup the previous starttime
            if not os.path.exists(rec_path):
                starttime = self._read_record(self._metadata_path, f).get(
                    "starttime", None
                )
            endtime = f.mtime.local_or_remote() if f.exists else fallback_time
            self._record(
                self._metadata_path,
                {
                    "version": version,
                    "code": code,
                    "rule": job.rule.name,
                    "input": input,
                    "log": log,
                    "params": params,
                    "shellcmd": shellcmd,
                    "incomplete": False,
                    "starttime": starttime,
                    "endtime": endtime,
                    "job_hash": hash(job),
                    "conda_env": conda_env,
                    "container_img_url": job.container_img_url,
                },
                f,
            )
            self._delete_record(self._incomplete_path, f)

    def cleanup(self, job):
        for f in job.expanded_output:
            self._delete_record(self._incomplete_path, f)
            self._delete_record(self._metadata_path, f)

    def incomplete(self, job):
        if self._incomplete_cache is None:
            self._cache_incomplete_folder()

        if self._incomplete_cache is False:  # cache deactivated

            def marked_incomplete(f):
                return self._exists_record(self._incomplete_path, f)

        else:

            def marked_incomplete(f):
                rec_path = self._record_path(self._incomplete_path, f)
                return rec_path in self._incomplete_cache

        return any(map(lambda f: f.exists and marked_incomplete(f), job.output))

    def _cache_incomplete_folder(self):
        self._incomplete_cache = {
            os.path.join(path, f)
            for path, dirnames, filenames in os.walk(self._incomplete_path)
            for f in filenames
        }

    def external_jobids(self, job):
        return list(
            set(
                self._read_record(self._incomplete_path, f).get("external_jobid", None)
                for f in job.output
            )
        )

    def metadata(self, path):
        return self._read_record(self._metadata_path, path)

    def version(self, path):
        return self.metadata(path).get("version")

    def rule(self, path):
        return self.metadata(path).get("rule")

    def input(self, path):
        return self.metadata(path).get("input")

    def log(self, path):
        return self.metadata(path).get("log")

    def shellcmd(self, path):
        return self.metadata(path).get("shellcmd")

    def params(self, path):
        return self.metadata(path).get("params")

    def code(self, path):
        return self.metadata(path).get("code")

    def version_changed(self, job, file=None):
        """Yields output files with changed versions, or a bool if file is given."""
        return _bool_or_gen(self._version_changed, job, file=file)

    def code_changed(self, job, file=None):
        """Yields output files with changed code, or a bool if file is given."""
        return _bool_or_gen(self._code_changed, job, file=file)

    def input_changed(self, job, file=None):
        """Yields output files with changed input, or a bool if file is given."""
        return _bool_or_gen(self._input_changed, job, file=file)

    def params_changed(self, job, file=None):
        """Yields output files with changed params, or a bool if file is given."""
        return _bool_or_gen(self._params_changed, job, file=file)

    def _version_changed(self, job, file=None):
        assert file is not None
        return self.version(file) != job.rule.version

    def _code_changed(self, job, file=None):
        assert file is not None
        return self.code(file) != self._code(job.rule)

    def _input_changed(self, job, file=None):
        assert file is not None
        return self.input(file) != self._input(job)

    def _params_changed(self, job, file=None):
        assert file is not None
        return self.params(file) != self._params(job)

    def noop(self, *args):
        pass

    def _b64id(self, s):
        return urlsafe_b64encode(str(s).encode()).decode()

    @lru_cache()
    def _code(self, rule):
        code = rule.run_func.__code__
        return b64encode(pickle_code(code)).decode()

    @lru_cache()
    def _conda_env(self, job):
        if job.conda_env:
            return b64encode(job.conda_env.content).decode()

    @lru_cache()
    def _input(self, job):
        return sorted(job.input)

    @lru_cache()
    def _log(self, job):
        return sorted(job.log)

    @lru_cache()
    def _params(self, job):
        return sorted(map(repr, job.params))

    @lru_cache()
    def _output(self, job):
        return sorted(job.output)

    def _record(self, subject, json_value, id):
        recpath = self._record_path(subject, id)
        os.makedirs(os.path.dirname(recpath), exist_ok=True)
        with open(recpath, "w") as f:
            json.dump(json_value, f)

    def _delete_record(self, subject, id):
        try:
            recpath = self._record_path(subject, id)
            os.remove(recpath)
            recdirs = os.path.relpath(os.path.dirname(recpath), start=subject)
            if recdirs != ".":
                os.removedirs(recdirs)
        except OSError as e:
            if e.errno != 2:
                # not missing
                raise e

    @lru_cache()
    def _read_record_cached(self, subject, id):
        return self._read_record_uncached(subject, id)

    def _read_record_uncached(self, subject, id):
        if not self._exists_record(subject, id):
            return dict()
        with open(self._record_path(subject, id), "r") as f:
            return json.load(f)

    def _exists_record(self, subject, id):
        return os.path.exists(self._record_path(subject, id))

    def _locks(self, type):
        return (
            f
            for f, _ in listfiles(
                os.path.join(self._lockdir, "{{n,[0-9]+}}.{}.lock".format(type))
            )
            if not os.path.isdir(f)
        )

    def _lock(self, files, type):
        for i in count(0):
            lockfile = os.path.join(self._lockdir, "{}.{}.lock".format(i, type))
            if not os.path.exists(lockfile):
                self._lockfile[type] = lockfile
                with open(lockfile, "w") as lock:
                    print(*files, sep="\n", file=lock)
                return

    def _record_path(self, subject, id):
        max_len = (
            os.pathconf(subject, "PC_NAME_MAX") if os.name == "posix" else 255
        )  # maximum NTFS and FAT32 filename length
        if max_len == 0:
            max_len = 255

        b64id = self._b64id(id)
        # split into chunks of proper length
        b64id = [b64id[i : i + max_len - 1] for i in range(0, len(b64id), max_len - 1)]
        # prepend dirs with @ (does not occur in b64) to avoid conflict with b64-named files in the same dir
        b64id = ["@" + s for s in b64id[:-1]] + [b64id[-1]]
        path = os.path.join(subject, *b64id)
        return path
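
    # Illustration of the record layout (hypothetical output file, shown only as
    # an example, assuming a POSIX filesystem where PC_NAME_MAX is 255): for an
    # output "results/sample1.bam",
    #
    #     >>> urlsafe_b64encode(b"results/sample1.bam").decode()
    #     'cmVzdWx0cy9zYW1wbGUxLmJhbQ=='
    #
    # so its metadata record lives at
    # ".snakemake/metadata/cmVzdWx0cy9zYW1wbGUxLmJhbQ==". Only when the encoded id
    # exceeds max_len is it split into chunks, with every chunk except the last
    # becoming an "@"-prefixed directory, e.g. <subject>/@<chunk1>/<chunk2>.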

    def all_outputfiles(self):
        # we only look at output files that will be updated
        return jobfiles(self.dag.needrun_jobs, "output")

    def all_inputfiles(self):
        # we consider all input files, including those of jobs that will not run
        return jobfiles(self.dag.jobs, "input")

    def deactivate_cache(self):
        self._read_record_cached.cache_clear()
        self._read_record = self._read_record_uncached
        self._incomplete_cache = False


def _bool_or_gen(func, job, file=None):
    if file is None:
        return (f for f in job.expanded_output if func(job, file=f))
    else:
        return func(job, file=file)


def pickle_code(code):
    consts = [
        (pickle_code(const) if type(const) == type(code) else const)
        for const in code.co_consts
    ]
    return pickle.dumps((code.co_code, code.co_varnames, consts, code.co_names))
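

# Usage sketch (hypothetical rule function, shown only for illustration):
# pickle_code is what _code() feeds into b64encode() so that _code_changed() can
# compare a rule's current run function against the serialized form stored in the
# metadata record, e.g.
#
#     >>> def run_func():
#     ...     return "touch output.txt"
#     >>> fingerprint = b64encode(pickle_code(run_func.__code__)).decode()
#
# On the same Python version, the fingerprint stays stable as long as the
# function's bytecode, variable names, constants and referenced names are
# unchanged; editing the rule body changes co_code and therefore the stored
# "code" entry.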