__author__ = "Johannes Köster"
__copyright__ = "Copyright 2021, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
import os
import traceback
import textwrap
from tokenize import TokenError
from snakemake.logging import logger
[docs]def get_exception_origin(ex, linemaps):
for file, lineno, _, _ in reversed(traceback.extract_tb(ex.__traceback__)):
if file in linemaps:
return lineno, file
[docs]def cut_traceback(ex):
snakemake_path = os.path.dirname(__file__)
for line in traceback.extract_tb(ex.__traceback__):
dir = os.path.dirname(line[0])
if not dir:
dir = "."
is_snakemake_dir = lambda path: os.path.realpath(path).startswith(
os.path.realpath(snakemake_path)
)
if not os.path.isdir(dir) or not is_snakemake_dir(dir):
yield line
[docs]def log_verbose_traceback(ex):
tb = "Full " + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
logger.debug(tb)
[docs]def print_exception(ex, linemaps):
"""
Print an error message for a given exception.
Arguments
ex -- the exception
linemaps -- a dict of a dict that maps for each snakefile
the compiled lines to source code lines in the snakefile.
"""
log_verbose_traceback(ex)
if isinstance(ex, SyntaxError) or isinstance(ex, IndentationError):
logger.error(
format_error(
ex,
ex.lineno,
linemaps=linemaps,
snakefile=ex.filename,
show_traceback=True,
)
)
return
origin = get_exception_origin(ex, linemaps)
if origin is not None:
lineno, file = origin
logger.error(
format_error(
ex, lineno, linemaps=linemaps, snakefile=file, show_traceback=True
)
)
return
elif isinstance(ex, TokenError):
logger.error(format_error(ex, None, show_traceback=False))
elif isinstance(ex, MissingRuleException):
logger.error(
format_error(
ex, None, linemaps=linemaps, snakefile=ex.filename, show_traceback=False
)
)
elif isinstance(ex, RuleException):
for e in ex._include + [ex]:
if not e.omit:
logger.error(
format_error(
e,
e.lineno,
linemaps=linemaps,
snakefile=e.filename,
show_traceback=True,
)
)
elif isinstance(ex, WorkflowError):
logger.error(
format_error(
ex,
ex.lineno,
linemaps=linemaps,
snakefile=ex.snakefile,
show_traceback=True,
)
)
elif isinstance(ex, KeyboardInterrupt):
logger.info("Cancelling snakemake on user request.")
else:
traceback.print_exception(type(ex), ex, ex.__traceback__)
[docs]class WorkflowError(Exception):
def __init__(self, *args, lineno=None, snakefile=None, rule=None):
super().__init__("\n".join(self.format_arg(arg) for arg in args))
if rule is not None:
self.lineno = rule.lineno
self.snakefile = rule.snakefile
else:
self.lineno = lineno
self.snakefile = snakefile
self.rule = rule
[docs]class SourceFileError(WorkflowError):
def __init__(self, msg):
super().__init__("Error in source file definition: {}".format(msg))
[docs]class WildcardError(WorkflowError):
pass
[docs]class RuleException(Exception):
"""
Base class for exception occurring within the
execution or definition of rules.
"""
def __init__(
self, message=None, include=None, lineno=None, snakefile=None, rule=None
):
"""
Creates a new instance of RuleException.
Arguments
message -- the exception message
include -- iterable of other exceptions to be included
lineno -- the line the exception originates
snakefile -- the file the exception originates
"""
super(RuleException, self).__init__(message)
self._include = set()
if include:
for ex in include:
self._include.add(ex)
self._include.update(ex._include)
if rule is not None:
if lineno is None:
lineno = rule.lineno
if snakefile is None:
snakefile = rule.snakefile
self._include = list(self._include)
self.lineno = lineno
self.filename = snakefile
self.omit = not message
@property
def messages(self):
return map(str, (ex for ex in self._include + [self] if not ex.omit))
[docs]class ChildIOException(WorkflowError):
def __init__(
self,
parent=None,
child=None,
wildcards=None,
lineno=None,
snakefile=None,
rule=None,
):
msg = "File/directory is a child to another output:\n" + "{}\n{}".format(
parent, child
)
super().__init__(msg, lineno=lineno, snakefile=snakefile, rule=rule)
[docs]class IOException(RuleException):
def __init__(self, prefix, rule, files, include=None, lineno=None, snakefile=None):
message = (
"{} for rule {}:\n{}".format(prefix, rule, "\n".join(files))
if files
else ""
)
super().__init__(
message=message,
include=include,
lineno=lineno,
snakefile=snakefile,
rule=rule,
)
[docs]class MissingOutputException(RuleException):
def __init__(
self,
message=None,
include=None,
lineno=None,
snakefile=None,
rule=None,
jobid="",
):
message = "Job {} completed successfully, but some output files are missing. {}".format(
message, jobid
)
super().__init__(message, include, lineno, snakefile, rule)
[docs]class PeriodicWildcardError(RuleException):
pass
[docs]class ProtectedOutputException(IOException):
def __init__(self, rule, files, include=None, lineno=None, snakefile=None):
super().__init__(
"Write-protected output files",
rule,
files,
include,
lineno=lineno,
snakefile=snakefile,
)
[docs]class ImproperOutputException(IOException):
def __init__(self, rule, files, include=None, lineno=None, snakefile=None):
super().__init__(
"Outputs of incorrect type (directories when expecting files or vice versa). "
"Output directories must be flagged with directory().",
rule,
files,
include,
lineno=lineno,
snakefile=snakefile,
)
[docs]class UnexpectedOutputException(IOException):
def __init__(self, rule, files, include=None, lineno=None, snakefile=None):
super().__init__(
"Unexpectedly present output files "
"(accidentally created by other rule?)",
rule,
files,
include,
lineno=lineno,
snakefile=snakefile,
)
[docs]class ImproperShadowException(RuleException):
def __init__(self, rule, lineno=None, snakefile=None):
super().__init__(
"Rule cannot shadow if using ThreadPoolExecutor",
rule=rule,
lineno=lineno,
snakefile=snakefile,
)
[docs]class AmbiguousRuleException(RuleException):
def __init__(self, filename, job_a, job_b, lineno=None, snakefile=None):
from snakemake import utils
wildcards_a = utils.format("{}", job_a._format_wildcards)
wildcards_b = utils.format("{}", job_b._format_wildcards)
super().__init__(
"Rules {job_a} and {job_b} are ambiguous for the file {f}.\n"
"Consider starting rule output with a unique prefix, constrain "
"your wildcards, or use the ruleorder directive.\n"
"Wildcards:\n"
"\t{job_a}: {wildcards_a}\n"
"\t{job_b}: {wildcards_b}\n"
"Expected input files:\n"
"\t{job_a}: {job_a.input}\n"
"\t{job_b}: {job_b.input}"
"Expected output files:\n"
"\t{job_a}: {job_a.output}\n"
"\t{job_b}: {job_b.output}".format(
job_a=job_a,
job_b=job_b,
f=filename,
wildcards_a=wildcards_a,
wildcards_b=wildcards_b,
),
lineno=lineno,
snakefile=snakefile,
)
self.rule1, self.rule2 = job_a.rule, job_b.rule
[docs]class CyclicGraphException(RuleException):
def __init__(self, repeatedrule, file, rule=None):
super().__init__(
"Cyclic dependency on rule {}.".format(repeatedrule), rule=rule
)
self.file = file
[docs]class MissingRuleException(RuleException):
def __init__(self, file, lineno=None, snakefile=None):
super().__init__(
"No rule to produce {} (if you use input functions make sure that they don't raise unexpected exceptions).".format(
file
),
lineno=lineno,
snakefile=snakefile,
)
[docs]class UnknownRuleException(RuleException):
def __init__(self, name, prefix="", lineno=None, snakefile=None):
msg = "There is no rule named {}.".format(name)
if prefix:
msg = "{} {}".format(prefix, msg)
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class NoRulesException(RuleException):
def __init__(self, lineno=None, snakefile=None):
super().__init__(
"There has to be at least one rule.", lineno=lineno, snakefile=snakefile
)
[docs]class IncompleteFilesException(RuleException):
def __init__(self, files):
super().__init__(
"The files below seem to be incomplete. "
"If you are sure that certain files are not incomplete, "
"mark them as complete with\n\n"
" snakemake --cleanup-metadata <filenames>\n\n"
"To re-generate the files rerun your command with the "
"--rerun-incomplete flag.\nIncomplete files:\n{}".format("\n".join(files))
)
[docs]class IOFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class RemoteFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class HTTPFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class FTPFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class S3FileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class AzureFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class SFTPFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class DropboxFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class XRootDFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class NCBIFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class WebDAVFileException(RuleException):
def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)
[docs]class ClusterJobException(RuleException):
def __init__(self, job_info, jobid):
super().__init__(
"Error executing rule {} on cluster (jobid: {}, external: {}, jobscript: {}). "
"For detailed error see the cluster log.".format(
job_info.job.rule.name, jobid, job_info.jobid, job_info.jobscript
),
lineno=job_info.job.rule.lineno,
snakefile=job_info.job.rule.snakefile,
)
[docs]class CreateRuleException(RuleException):
pass
[docs]class TerminatedException(Exception):
pass
[docs]class CreateCondaEnvironmentException(WorkflowError):
pass
[docs]class SpawnedJobError(Exception):
pass
[docs]class CheckSumMismatchException(WorkflowError):
""" "should be called to indicate that checksum of a file compared to known
hash does not match, typically done with large downloads, etc.
"""
pass
[docs]class IncompleteCheckpointException(Exception):
def __init__(self, rule, targetfile):
super().__init__(
"The requested checkpoint output is not yet created."
"If you see this error, you have likely tried to use "
"checkpoint output outside of an input function, or "
"you have tried to call an input function directly "
"via <function_name>(). Please check the docs at "
"https://snakemake.readthedocs.io/en/stable/"
"snakefiles/rules.html#data-dependent-conditional-execution "
"and note that the input function in the example rule "
"'aggregate' is NOT called, but passed to the rule "
"by name, such that Snakemake can call it internally "
"once the checkpoint is finished."
)
self.rule = rule
from snakemake.io import checkpoint_target
self.targetfile = checkpoint_target(targetfile)
[docs]class CacheMissException(Exception):
pass