__author__ = "Johannes Köster"
__contributors__ = ["Soohyun Lee"]
__copyright__ = "Copyright 2015, Johannes Köster"
__email__ = "koester@jimmy.harvard.edu"
__license__ = "MIT"
import os
import subprocess
import glob
from argparse import ArgumentError
import logging as _logging
import re
import sys
import inspect
import threading
import webbrowser
from functools import partial
import importlib
import shutil
from snakemake.workflow import Workflow
from snakemake.exceptions import print_exception, WorkflowError
from snakemake.logging import setup_logger, logger
from snakemake.io import load_configfile
from snakemake.shell import shell
from snakemake.utils import update_config, available_cpu_count
from snakemake.common import Mode, __version__
from snakemake.resources import parse_resources, DefaultResources
SNAKEFILE_CHOICES = [
"Snakefile",
"snakefile",
"workflow/Snakefile",
"workflow/snakefile",
]
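# Default locations that are searched for the workflow definition, in this order
# (see the --snakefile help below).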
def snakemake(
snakefile,
report=None,
listrules=False,
list_target_rules=False,
cores=1,
nodes=1,
local_cores=1,
resources=dict(),
default_resources=None,
config=dict(),
configfiles=None,
config_args=None,
workdir=None,
targets=None,
dryrun=False,
touch=False,
forcetargets=False,
forceall=False,
forcerun=[],
until=[],
omit_from=[],
prioritytargets=[],
stats=None,
printreason=False,
printshellcmds=False,
debug_dag=False,
printdag=False,
printrulegraph=False,
printfilegraph=False,
printd3dag=False,
nocolor=False,
quiet=False,
keepgoing=False,
cluster=None,
cluster_config=None,
cluster_sync=None,
drmaa=None,
drmaa_log_dir=None,
jobname="snakejob.{rulename}.{jobid}.sh",
immediate_submit=False,
standalone=False,
ignore_ambiguity=False,
snakemakepath=None,
lock=True,
unlock=False,
cleanup_metadata=None,
cleanup_conda=False,
cleanup_shadow=False,
force_incomplete=False,
ignore_incomplete=False,
list_version_changes=False,
list_code_changes=False,
list_input_changes=False,
list_params_changes=False,
list_untracked=False,
list_resources=False,
summary=False,
archive=None,
delete_all_output=False,
delete_temp_output=False,
detailed_summary=False,
latency_wait=3,
wait_for_files=None,
print_compilation=False,
debug=False,
notemp=False,
keep_remote_local=False,
nodeps=False,
keep_target_files=False,
allowed_rules=None,
jobscript=None,
greediness=None,
no_hooks=False,
overwrite_shellcmd=None,
updated_files=None,
log_handler=None,
keep_logger=False,
max_jobs_per_second=None,
max_status_checks_per_second=100,
restart_times=0,
attempt=1,
verbose=False,
force_use_threads=False,
use_conda=False,
use_singularity=False,
singularity_args="",
conda_prefix=None,
list_conda_envs=False,
singularity_prefix=None,
shadow_prefix=None,
create_envs_only=False,
mode=Mode.default,
wrapper_prefix=None,
kubernetes=None,
kubernetes_envvars=None,
container_image=None,
tibanna=False,
tibanna_sfn=None,
precommand="",
default_remote_provider=None,
default_remote_prefix="",
assume_shared_fs=True,
cluster_status=None,
export_cwl=None,
):
"""Run snakemake on a given snakefile.
This function provides access to the whole snakemake functionality. It is not thread-safe.
Args:
snakefile (str): the path to the snakefile
report (str): create an HTML report for a previous run at the given path
listrules (bool): list rules (default False)
list_target_rules (bool): list target rules (default False)
cores (int): the number of provided cores (ignored when using cluster support) (default 1)
nodes (int): the number of provided cluster nodes (ignored without cluster support) (default 1)
local_cores (int): the number of provided local cores if in cluster mode (ignored without cluster support) (default 1)
resources (dict): provided resources, a dictionary assigning integers to resource names, e.g. {"gpu": 1, "io": 5} (default {})
default_resources (DefaultResources): default values for resources not defined in rules (default None)
config (dict): override values for workflow config
workdir (str): path to working directory (default None)
targets (list): list of targets, e.g. rule or file names (default None)
dryrun (bool): only dry-run the workflow (default False)
touch (bool): only touch all output files if present (default False)
forcetargets (bool): force given targets to be re-created (default False)
forceall (bool): force all output files to be re-created (default False)
forcerun (list): list of files and rules that shall be re-created/re-executed (default [])
prioritytargets (list): list of targets that shall be run with maximum priority (default [])
stats (str): path to file that shall contain stats about the workflow execution (default None)
printreason (bool): print the reason for the execution of each job (default False)
printshellcmds (bool): print the shell command of each job (default False)
printdag (bool): print the dag in the graphviz dot language (default False)
printrulegraph (bool): print the graph of rules in the graphviz dot language (default False)
printfilegraph (bool): print the graph of rules with their input and output files in the graphviz dot language (default False)
printd3dag (bool): print a D3.js compatible JSON representation of the DAG (default False)
nocolor (bool): do not print colored output (default False)
quiet (bool): do not print any default job information (default False)
keepgoing (bool): keep going upon errors (default False)
cluster (str): submission command of a cluster or batch system to use, e.g. qsub (default None)
cluster_config (str,list): configuration file for cluster options, or list thereof (default None)
cluster_sync (str): blocking cluster submission command (like SGE 'qsub -sync y') (default None)
drmaa (str): if not None use DRMAA for cluster support, str specifies native args passed to the cluster when submitting a job
drmaa_log_dir (str): the path to stdout and stderr output of DRMAA jobs (default None)
jobname (str): naming scheme for cluster job scripts (default "snakejob.{rulename}.{jobid}.sh")
immediate_submit (bool): immediately submit all cluster jobs, regardless of dependencies (default False)
standalone (bool): kill all processes very rudely in case of failure (do not use this if you use this API) (default False) (deprecated)
ignore_ambiguity (bool): ignore ambiguous rules and always take the first possible one (default False)
snakemakepath (str): deprecated parameter whose value is ignored. Do not use.
lock (bool): lock the working directory when executing the workflow (default True)
unlock (bool): just unlock the working directory (default False)
cleanup_metadata (list): just cleanup metadata of given list of output files (default None)
cleanup_conda (bool): just cleanup unused conda environments (default False)
cleanup_shadow (bool): just cleanup old shadow directories (default False)
force_incomplete (bool): force the re-creation of incomplete files (default False)
ignore_incomplete (bool): ignore incomplete files (default False)
list_version_changes (bool): list output files with changed rule version (default False)
list_code_changes (bool): list output files with changed rule code (default False)
list_input_changes (bool): list output files with changed input files (default False)
list_params_changes (bool): list output files with changed params (default False)
list_untracked (bool): list files in the workdir that are not used in the workflow (default False)
summary (bool): list summary of all output files and their status (default False)
archive (str): archive workflow into the given tarball
delete_all_output (bool): remove all files generated by the workflow (default False)
delete_temp_output (bool): remove all temporary files generated by the workflow (default False)
latency_wait (int): how many seconds to wait for an output file to appear after the execution of a job, e.g. to handle filesystem latency (default 3)
wait_for_files (list): wait for given files to be present before executing the workflow
list_resources (bool): list resources used in the workflow (default False)
detailed_summary (bool): list summary of all input and output files and their status (default False)
print_compilation (bool): print the compilation of the snakefile (default False)
debug (bool): allow use of the debugger within rules
notemp (bool): ignore temp file flags, e.g. do not delete output files marked as temp after use (default False)
keep_remote_local (bool): keep local copies of remote files (default False)
nodeps (bool): ignore dependencies (default False)
keep_target_files (bool): do not adjust the paths of given target files relative to the working directory.
allowed_rules (set): restrict allowed rules to the given set. If None or empty, all rules are used.
jobscript (str): path to a custom shell script template for cluster jobs (default None)
greediness (float): set the greediness of scheduling. This value between 0 and 1 determines how carefully jobs are selected for execution. The default value (0.5 if prioritytargets are used, 1.0 else) provides the best speed and still acceptable scheduling quality.
overwrite_shellcmd (str): a shell command that shall be executed instead of those given in the workflow. This is for debugging purposes only.
updated_files (list): a list that will be filled with the files that are updated or created during the workflow execution
verbose (bool): show additional debug output (default False)
max_jobs_per_second (int): maximal number of cluster/drmaa jobs per second, None to impose no limit (default None)
restart_times (int): number of times to restart failing jobs (default 0)
attempt (int): initial value of Job.attempt. This is intended for internal use only (default 1).
force_use_threads (bool): whether to force use of threads over processes; helpful if shared memory is full or unavailable (default False)
use_conda (bool): create conda environments for each job (defined with conda directive of rules)
use_singularity (bool): run jobs in singularity containers (if defined with singularity directive)
singularity_args (str): additional arguments to pass to singularity
conda_prefix (str): the directory in which conda environments will be created (default None)
singularity_prefix (str): the directory to which singularity images will be pulled (default None)
shadow_prefix (str): prefix for shadow directories. The job-specific shadow directories will be created in $SHADOW_PREFIX/shadow/ (default None)
create_envs_only (bool): if specified, only builds the conda environments specified for each job, then exits.
list_conda_envs (bool): list conda environments and their location on disk.
mode (snakemake.common.Mode): execution mode
wrapper_prefix (str): prefix for wrapper script URLs (default None)
kubernetes (str): submit jobs to kubernetes, using the given namespace.
kubernetes_envvars (list): environment variables that shall be passed to kubernetes jobs.
container_image (str): Docker image to use, e.g., for kubernetes.
default_remote_provider (str): default remote provider to use instead of local files (e.g. S3, GS)
default_remote_prefix (str): prefix for default remote provider (e.g. name of the bucket).
tibanna (str): submit jobs to AWS cloud using Tibanna.
tibanna_sfn (str): Step function (Unicorn) name of Tibanna (e.g. tibanna_unicorn_monty). This must be deployed first using tibanna cli.
precommand (str): commands to run on AWS cloud before the snakemake command (e.g. wget, git clone, unzip, etc). Use with --tibanna.
assume_shared_fs (bool): assume that cluster nodes share a common filesystem (default true).
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failed" or "running" when executed with a cluster jobid as its single argument.
export_cwl (str): Compile workflow to CWL and save to given file
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has the following entries:
:level:
the log level ("info", "error", "debug", "progress", "job_info")
:level="info", "error" or "debug":
:msg:
the log message
:level="progress":
:done:
number of already executed jobs
:total:
number of total jobs
:level="job_info":
:input:
list of input files of a job
:output:
list of output files of a job
:log:
path to log file of a job
:local:
whether a job is executed locally (i.e. ignoring cluster)
:msg:
the job message
:reason:
the job reason
:priority:
the job priority
:threads:
the threads of the job
Returns:
bool: True if workflow execution was successful.
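Example:
    A minimal, hedged sketch of programmatic use (the Snakefile path and
    target name below are hypothetical)::

        from snakemake import snakemake

        def handler(msg):
            # msg is a dict with the entries described above,
            # e.g. {"level": "progress", "done": 1, "total": 3}
            if msg["level"] == "progress":
                print("{done}/{total} jobs done".format(**msg))

        success = snakemake(
            "path/to/Snakefile",
            targets=["all"],
            cores=4,
            dryrun=True,
            log_handler=handler,
        )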
"""
assert not immediate_submit or (
immediate_submit and notemp
), "immediate_submit has to be combined with notemp (it does not support temp file handling)"
if tibanna:
assume_shared_fs = False
default_remote_provider = "S3"
default_remote_prefix = default_remote_prefix.rstrip("/")
assert (
default_remote_prefix
), "default_remote_prefix needed if tibanna is specified"
assert tibanna_sfn, "tibanna_sfn needed if tibanna is specified"
if updated_files is None:
updated_files = list()
if cluster or cluster_sync or drmaa or tibanna:
cores = sys.maxsize
else:
nodes = sys.maxsize
if isinstance(cluster_config, str):
# Loading configuration from one file is still supported for
# backward compatibility
cluster_config = [cluster_config]
if cluster_config:
# Load all configuration files
configs = [load_configfile(f) for f in cluster_config]
# Merge in the order as specified, overriding earlier values with
# later ones
cluster_config_content = configs[0]
for other in configs[1:]:
update_config(cluster_config_content, other)
else:
cluster_config_content = dict()
run_local = not (cluster or cluster_sync or drmaa or kubernetes or tibanna)
if run_local and not dryrun:
# clean up all previously recorded jobids.
shell.cleanup()
# force thread use for any kind of cluster
use_threads = (
force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
)
if not keep_logger:
stdout = (
(
dryrun
and not (printdag or printd3dag or printrulegraph or printfilegraph)
)
or listrules
or list_target_rules
or list_resources
)
setup_logger(
handler=log_handler,
quiet=quiet,
printreason=printreason,
printshellcmds=printshellcmds,
debug_dag=debug_dag,
nocolor=nocolor,
stdout=stdout,
debug=verbose,
use_threads=use_threads,
mode=mode,
)
if greediness is None:
greediness = 0.5 if prioritytargets else 1.0
else:
if not (0 <= greediness <= 1.0):
logger.error("Error: greediness must be a float between 0 and 1.")
return False
if not os.path.exists(snakefile):
logger.error('Error: Snakefile "{}" not found.'.format(snakefile))
return False
snakefile = os.path.abspath(snakefile)
cluster_mode = (
(cluster is not None) + (cluster_sync is not None) + (drmaa is not None)
)
if cluster_mode > 1:
logger.error("Error: cluster and drmaa args are mutually exclusive")
return False
if debug and (cores > 1 or cluster_mode):
logger.error(
"Error: debug mode cannot be used with more than one core or cluster execution."
)
return False
overwrite_config = dict()
if configfiles is None:
configfiles = []
for f in configfiles:
# get values to override. Later configfiles override earlier ones.
overwrite_config.update(load_configfile(f))
# convert provided paths to absolute paths
configfiles = list(map(os.path.abspath, configfiles))
# directly specified elements override any configfiles
if config:
overwrite_config.update(config)
if config_args is None:
config_args = unparse_config(config)
if workdir:
olddir = os.getcwd()
if not os.path.exists(workdir):
logger.info("Creating specified working directory {}.".format(workdir))
os.makedirs(workdir)
workdir = os.path.abspath(workdir)
os.chdir(workdir)
logger.setup_logfile()
try:
# handle default remote provider
_default_remote_provider = None
if default_remote_provider is not None:
try:
rmt = importlib.import_module(
"snakemake.remote." + default_remote_provider
)
except ImportError as e:
raise WorkflowError("Unknown default remote provider.")
if rmt.RemoteProvider.supports_default:
_default_remote_provider = rmt.RemoteProvider(
keep_local=True, is_default=True
)
else:
raise WorkflowError(
"Remote provider {} does not (yet) support being "
"used as default provider.".format(default_remote_provider)
)
workflow = Workflow(
snakefile=snakefile,
jobscript=jobscript,
overwrite_shellcmd=overwrite_shellcmd,
overwrite_config=overwrite_config,
overwrite_workdir=workdir,
overwrite_configfiles=configfiles,
overwrite_clusterconfig=cluster_config_content,
config_args=config_args,
debug=debug,
use_conda=use_conda or list_conda_envs or cleanup_conda,
use_singularity=use_singularity,
conda_prefix=conda_prefix,
singularity_prefix=singularity_prefix,
shadow_prefix=shadow_prefix,
singularity_args=singularity_args,
mode=mode,
wrapper_prefix=wrapper_prefix,
printshellcmds=printshellcmds,
restart_times=restart_times,
attempt=attempt,
default_remote_provider=_default_remote_provider,
default_remote_prefix=default_remote_prefix,
run_local=run_local,
default_resources=default_resources,
)
success = True
workflow.include(
snakefile, overwrite_first_rule=True, print_compilation=print_compilation
)
workflow.check()
if not print_compilation:
if listrules:
workflow.list_rules()
elif list_target_rules:
workflow.list_rules(only_targets=True)
elif list_resources:
workflow.list_resources()
else:
# handle subworkflows
subsnakemake = partial(
snakemake,
cores=cores,
nodes=nodes,
local_cores=local_cores,
resources=resources,
default_resources=default_resources,
dryrun=dryrun,
touch=touch,
printreason=printreason,
printshellcmds=printshellcmds,
debug_dag=debug_dag,
nocolor=nocolor,
quiet=quiet,
keepgoing=keepgoing,
cluster=cluster,
cluster_sync=cluster_sync,
drmaa=drmaa,
drmaa_log_dir=drmaa_log_dir,
jobname=jobname,
immediate_submit=immediate_submit,
standalone=standalone,
ignore_ambiguity=ignore_ambiguity,
restart_times=restart_times,
attempt=attempt,
lock=lock,
unlock=unlock,
cleanup_metadata=cleanup_metadata,
cleanup_conda=cleanup_conda,
cleanup_shadow=cleanup_shadow,
force_incomplete=force_incomplete,
ignore_incomplete=ignore_incomplete,
latency_wait=latency_wait,
verbose=verbose,
notemp=notemp,
keep_remote_local=keep_remote_local,
nodeps=nodeps,
jobscript=jobscript,
greediness=greediness,
no_hooks=no_hooks,
overwrite_shellcmd=overwrite_shellcmd,
config=config,
config_args=config_args,
cluster_config=cluster_config,
keep_logger=True,
force_use_threads=use_threads,
use_conda=use_conda,
use_singularity=use_singularity,
conda_prefix=conda_prefix,
singularity_prefix=singularity_prefix,
shadow_prefix=shadow_prefix,
singularity_args=singularity_args,
list_conda_envs=list_conda_envs,
kubernetes=kubernetes,
kubernetes_envvars=kubernetes_envvars,
container_image=container_image,
create_envs_only=create_envs_only,
default_remote_provider=default_remote_provider,
default_remote_prefix=default_remote_prefix,
tibanna=tibanna,
tibanna_sfn=tibanna_sfn,
precommand=precommand,
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
)
success = workflow.execute(
targets=targets,
dryrun=dryrun,
touch=touch,
cores=cores,
nodes=nodes,
local_cores=local_cores,
forcetargets=forcetargets,
forceall=forceall,
forcerun=forcerun,
prioritytargets=prioritytargets,
until=until,
omit_from=omit_from,
quiet=quiet,
keepgoing=keepgoing,
printshellcmds=printshellcmds,
printreason=printreason,
printrulegraph=printrulegraph,
printfilegraph=printfilegraph,
printdag=printdag,
cluster=cluster,
cluster_sync=cluster_sync,
jobname=jobname,
drmaa=drmaa,
drmaa_log_dir=drmaa_log_dir,
kubernetes=kubernetes,
kubernetes_envvars=kubernetes_envvars,
container_image=container_image,
tibanna=tibanna,
tibanna_sfn=tibanna_sfn,
precommand=precommand,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
printd3dag=printd3dag,
immediate_submit=immediate_submit,
ignore_ambiguity=ignore_ambiguity,
stats=stats,
force_incomplete=force_incomplete,
ignore_incomplete=ignore_incomplete,
list_version_changes=list_version_changes,
list_code_changes=list_code_changes,
list_input_changes=list_input_changes,
list_params_changes=list_params_changes,
list_untracked=list_untracked,
list_conda_envs=list_conda_envs,
summary=summary,
archive=archive,
delete_all_output=delete_all_output,
delete_temp_output=delete_temp_output,
latency_wait=latency_wait,
wait_for_files=wait_for_files,
detailed_summary=detailed_summary,
nolock=not lock,
unlock=unlock,
resources=resources,
notemp=notemp,
keep_remote_local=keep_remote_local,
nodeps=nodeps,
keep_target_files=keep_target_files,
cleanup_metadata=cleanup_metadata,
cleanup_conda=cleanup_conda,
cleanup_shadow=cleanup_shadow,
subsnakemake=subsnakemake,
updated_files=updated_files,
allowed_rules=allowed_rules,
greediness=greediness,
no_hooks=no_hooks,
force_use_threads=use_threads,
create_envs_only=create_envs_only,
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
report=report,
export_cwl=export_cwl,
)
except BrokenPipeError:
# ignore this exception and stop. It occurs if snakemake output is piped into less and less quits before reading the whole output.
# in such a case, snakemake shall stop scheduling and quit with error 1
success = False
except (Exception, BaseException) as ex:
if "workflow" in locals():
print_exception(ex, workflow.linemaps)
else:
print_exception(ex, dict())
success = False
if workdir:
os.chdir(olddir)
if "workflow" in locals() and workflow.persistence:
workflow.persistence.unlock()
if not keep_logger:
logger.cleanup()
return success
def parse_config(args):
"""Parse config from args."""
parsers = [int, float, eval, str]
config = dict()
if args.config is not None:
valid = re.compile(r"[a-zA-Z_]\w*$")
for entry in args.config:
try:
key, val = entry.split("=", 1)
except ValueError:
raise ValueError(
"Config entries have to be defined as name=value pairs."
)
if not valid.match(key):
raise ValueError("Config entry must start with a valid identifier.")
v = None
for parser in parsers:
try:
v = parser(val)
# avoid accidental interpretation as function
if not callable(v):
break
except:
pass
assert v is not None
config[key] = v
return config
def unparse_config(config):
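"""Turn a flat config dict back into a list of KEY=VALUE strings.

For example, {"species": "mouse", "threads": 4} becomes
["species='mouse'", "threads=4"] (the rough inverse of parse_config).
"""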
if not isinstance(config, dict):
raise ValueError("config is not a dict")
items = []
for key, value in config.items():
if isinstance(value, dict):
raise ValueError("config may only be a flat dict")
encoded = "'{}'".format(value) if isinstance(value, str) else value
items.append("{}={}".format(key, encoded))
return items
APPDIRS = None
def get_appdirs():
global APPDIRS
if APPDIRS is None:
from appdirs import AppDirs
APPDIRS = AppDirs("snakemake", "snakemake")
return APPDIRS
def get_profile_file(profile, file, return_default=False):
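"""Locate the given file within a profile directory.

Absolute profile paths are used directly; otherwise the current working
directory, the user config directory and the site config directory are
searched, in that order.
"""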
dirs = get_appdirs()
if os.path.isabs(profile):
search_dirs = [os.path.dirname(profile)]
profile = os.path.basename(profile)
else:
search_dirs = [os.getcwd(), dirs.user_config_dir, dirs.site_config_dir]
get_path = lambda d: os.path.join(d, profile, file)
for d in search_dirs:
p = get_path(d)
if os.path.exists(p):
return p
if return_default:
return file
return None
def get_argument_parser(profile=None):
"""Generate and return argument parser."""
import configargparse
from configargparse import YAMLConfigFileParser
dirs = get_appdirs()
config_files = []
if profile:
if profile == "":
print("Error: invalid profile name.", file=sys.stderr)
exit(1)
config_file = get_profile_file(profile, "config.yaml")
if config_file is None:
print(
"Error: profile given but no config.yaml found. "
"Profile has to be given as either absolute path, relative "
"path or name of a directory available in either "
"{site} or {user}.".format(
site=dirs.site_config_dir, user=dirs.user_config_dir
),
file=sys.stderr,
)
exit(1)
config_files = [config_file]
parser = configargparse.ArgumentParser(
description="Snakemake is a Python based language and execution "
"environment for GNU Make-like workflows.",
default_config_files=config_files,
config_file_parser_class=YAMLConfigFileParser,
)
group_exec = parser.add_argument_group("EXECUTION")
group_exec.add_argument(
"target",
nargs="*",
default=None,
help="Targets to build. May be rules or files.",
)
group_exec.add_argument(
"--dry-run",
"--dryrun",
"-n",
dest="dryrun",
action="store_true",
help="Do not execute anything, and display what would be done. "
"If you have a very large workflow, use --dry-run --quiet to just "
"print a summary of the DAG of jobs.",
)
group_exec.add_argument(
"--profile",
help="""
Name of profile to use for configuring
Snakemake. Snakemake will search for a corresponding
folder in {} and {}. Alternatively, this can be an
absolute or relative path.
The profile folder has to contain a file 'config.yaml'.
This file can be used to set default values for command
line options in YAML format. For example,
'--cluster qsub' becomes 'cluster: qsub' in the YAML
file. Profiles can be obtained from
https://github.com/snakemake-profiles.
""".format(
dirs.site_config_dir, dirs.user_config_dir
),
)
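# A minimal profile 'config.yaml' might look like this (illustrative values only;
# any long-form command line option can be used as a key, as described above):
#   cluster: qsub
#   jobs: 100
#   latency-wait: 60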
group_exec.add_argument(
"--snakefile",
"-s",
metavar="FILE",
help=(
"The workflow definition in form of a snakefile."
"Usually, you should not need to specify this. "
"By default, Snakemake will search for {} "
"beneath the current working "
"directory, in this order. "
"Only if you definitely want a different layout, "
"you need to use this parameter."
).format(", ".join(map("'{}'".format, SNAKEFILE_CHOICES))),
)
group_exec.add_argument(
"--cores",
"--jobs",
"-j",
action="store",
const=available_cpu_count(),
nargs="?",
metavar="N",
type=int,
help=(
"Use at most N cores in parallel (default: 1). "
"If N is omitted, the limit is set to the number of "
"available cores."
),
)
group_exec.add_argument(
"--local-cores",
action="store",
default=available_cpu_count(),
metavar="N",
type=int,
help=(
"In cluster mode, use at most N cores of the host machine in parallel "
" (default: number of CPU cores of the host). The cores are used to execute "
"local rules. This option is ignored when not in cluster mode."
),
)
group_exec.add_argument(
"--resources",
"--res",
nargs="*",
metavar="NAME=INT",
help=(
"Define additional resources that shall constrain the scheduling "
"analogously to threads (see above). A resource is defined as "
"a name and an integer value. E.g. --resources gpu=1. Rules can "
"use resources by defining the resource keyword, e.g. "
"resources: gpu=1. If now two rules require 1 of the resource "
"'gpu' they won't be run in parallel by the scheduler."
),
)
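# For reference, a rule consuming such a resource could be declared like this
# (hedged sketch; rule and file names are made up):
#   rule gpu_task:
#       input: "data.txt"
#       output: "result.txt"
#       resources: gpu=1
#       shell: "compute --gpu {input} > {output}"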
group_exec.add_argument(
"--default-resources",
"--default-res",
nargs="*",
metavar="NAME=INT",
help=(
"Define default values of resources for rules that do not define their own values. "
"In addition to plain integers, python expressions over inputsize are allowed (e.g. '2*input.size')."
"When specifying this without any arguments (--default-resources), it defines 'mem_mb=max(2*input.size, 1000)' "
"'disk_mb=max(2*input.size, 1000)', i.e., default disk and mem usage is twice the input file size but at least 1GB."
),
)
group_exec.add_argument(
"--config",
"-C",
nargs="*",
metavar="KEY=VALUE",
help=(
"Set or overwrite values in the workflow config object. "
"The workflow config object is accessible as variable config inside "
"the workflow. Default values can be set by providing a JSON file "
"(see Documentation)."
),
)
group_exec.add_argument(
"--configfile",
"--configfiles",
nargs="+",
metavar="FILE",
help=(
"Specify or overwrite the config file of the workflow (see the docs). "
"Values specified in JSON or YAML format are available in the global config "
"dictionary inside the workflow. Multiple files overwrite each other in "
"the given order."
),
)
group_exec.add_argument(
"--directory",
"-d",
metavar="DIR",
action="store",
help=(
"Specify working directory (relative paths in "
"the snakefile will use this as their origin)."
),
)
group_exec.add_argument(
"--touch",
"-t",
action="store_true",
help=(
"Touch output files (mark them up to date without really "
"changing them) instead of running their commands. This is "
"used to pretend that the rules were executed, in order to "
"fool future invocations of snakemake. Fails if a file does "
"not yet exist."
),
)
group_exec.add_argument(
"--keep-going",
"-k",
action="store_true",
help="Go on with independent jobs if a job fails.",
)
group_exec.add_argument(
"--force",
"-f",
action="store_true",
help=(
"Force the execution of the selected target or the first rule "
"regardless of already created output."
),
)
group_exec.add_argument(
"--forceall",
"-F",
action="store_true",
help=(
"Force the execution of the selected (or the first) rule and "
"all rules it is dependent on regardless of already created "
"output."
),
)
group_exec.add_argument(
"--forcerun",
"-R",
nargs="*",
metavar="TARGET",
help=(
"Force the re-execution or creation of the given rules or files."
" Use this option if you changed a rule and want to have all its "
"output in your workflow updated."
),
)
group_exec.add_argument(
"--prioritize",
"-P",
nargs="+",
metavar="TARGET",
help=(
"Tell the scheduler to assign creation of given targets "
"(and all their dependencies) highest priority. (EXPERIMENTAL)"
),
)
group_exec.add_argument(
"--until",
"-U",
nargs="+",
metavar="TARGET",
help=(
"Runs the pipeline until it reaches the specified rules or "
"files. Only runs jobs that are dependencies of the specified "
"rule or files, does not run sibling DAGs. "
),
)
group_exec.add_argument(
"--omit-from",
"-O",
nargs="+",
metavar="TARGET",
help=(
"Prevent the execution or creation of the given rules or files "
"as well as any rules or files that are downstream of these targets "
"in the DAG. Also runs jobs in sibling DAGs that are independent of the "
"rules or files specified here."
),
)
group_exec.add_argument(
"--rerun-incomplete",
"--ri",
action="store_true",
help=("Re-run all " "jobs the output of which is recognized as incomplete."),
)
group_exec.add_argument(
"--shadow-prefix",
metavar="DIR",
help=(
"Specify a directory in which the 'shadow' directory is created. "
"If not supplied, the value is set to the '.snakemake' directory relative "
"to the working directory."
),
)
group_utils = parser.add_argument_group("UTILITIES")
group_utils.add_argument(
"--report",
nargs="?",
const="report.html",
metavar="HTMLFILE",
help="Create an HTML report with results and statistics. "
"If no filename is given, report.html is the default.",
)
group_utils.add_argument(
"--export-cwl",
action="store",
metavar="FILE",
help="Compile workflow to CWL and store it in given FILE.",
)
group_utils.add_argument(
"--list",
"-l",
action="store_true",
help="Show available rules in given Snakefile.",
)
group_utils.add_argument(
"--list-target-rules",
"--lt",
action="store_true",
help="Show available target rules in given Snakefile.",
)
group_utils.add_argument(
"--dag",
action="store_true",
help="Do not execute anything and print the directed "
"acyclic graph of jobs in the dot language. Recommended "
"use on Unix systems: snakemake --dag | dot | display",
)
group_utils.add_argument(
"--rulegraph",
action="store_true",
help="Do not execute anything and print the dependency graph "
"of rules in the dot language. This will be less "
"crowded than above DAG of jobs, but also show less information. "
"Note that each rule is displayed once, hence the displayed graph will be "
"cyclic if a rule appears in several steps of the workflow. "
"Use this if above option leads to a DAG that is too large. "
"Recommended use on Unix systems: snakemake --rulegraph | dot | display",
)
group_utils.add_argument(
"--filegraph",
action="store_true",
help="Do not execute anything and print the dependency graph "
"of rules with their input and output files in the dot language. "
"This is an intermadiate solution between above DAG of jobs and the rule graph. "
"Note that each rule is displayed once, hence the displayed graph will be "
"cyclic if a rule appears in several steps of the workflow. "
"Use this if above option leads to a DAG that is too large. "
"Recommended use on Unix systems: snakemake --filegraph | dot | display",
)
group_utils.add_argument(
"--d3dag",
action="store_true",
help="Print the DAG in D3.js compatible JSON format.",
)
group_utils.add_argument(
"--summary",
"-S",
action="store_true",
help="Print a summary of all files created by the workflow. The "
"has the following columns: filename, modification time, "
"rule version, status, plan.\n"
"Thereby rule version contains the version"
"the file was created with (see the version keyword of rules), and "
"status denotes whether the file is missing, its input files are "
"newer or if version or implementation of the rule changed since "
"file creation. Finally the last column denotes whether the file "
"will be updated or created during the next workflow execution.",
)
group_utils.add_argument(
"--detailed-summary",
"-D",
action="store_true",
help="Print a summary of all files created by the workflow. The "
"has the following columns: filename, modification time, "
"rule version, input file(s), shell command, status, plan.\n"
"Thereby rule version contains the version"
"the file was created with (see the version keyword of rules), and "
"status denotes whether the file is missing, its input files are "
"newer or if version or implementation of the rule changed since "
"file creation. The input file and shell command columns are self"
"explanatory. Finally the last column denotes whether the file "
"will be updated or created during the next workflow execution.",
)
group_utils.add_argument(
"--archive",
metavar="FILE",
help="Archive the workflow into the given tar archive FILE. The archive "
"will be created such that the workflow can be re-executed on a vanilla "
"system. The function needs conda and git to be installed. "
"It will archive every file that is under git version control. "
"Note that it is best practice to have the Snakefile, config files, and "
"scripts under version control. Hence, they will be included in the archive. "
"Further, it will add input files that are not generated by "
"by the workflow itself and conda environments. Note that symlinks are "
"dereferenced. Supported "
"formats are .tar, .tar.gz, .tar.bz2 and .tar.xz.",
)
group_utils.add_argument(
"--cleanup-metadata",
"--cm",
nargs="+",
metavar="FILE",
help="Cleanup the metadata "
"of given files. That means that snakemake removes any tracked "
"version info, and any marks that files are incomplete.",
)
group_utils.add_argument(
"--cleanup-shadow",
action="store_true",
help="Cleanup old shadow directories which have not been deleted due "
"to failures or power loss.",
)
group_utils.add_argument(
"--unlock", action="store_true", help="Remove a lock on the working directory."
)
group_utils.add_argument(
"--list-version-changes",
"--lv",
action="store_true",
help="List all output files that have been created with "
"a different version (as determined by the version keyword).",
)
group_utils.add_argument(
"--list-code-changes",
"--lc",
action="store_true",
help="List all output files for which the rule body (run or shell) have "
"changed in the Snakefile.",
)
group_utils.add_argument(
"--list-input-changes",
"--li",
action="store_true",
help="List all output files for which the defined input files have changed "
"in the Snakefile (e.g. new input files were added in the rule "
"definition or files were renamed). For listing input file "
"modification in the filesystem, use --summary.",
)
group_utils.add_argument(
"--list-params-changes",
"--lp",
action="store_true",
help="List all output files for which the defined params have changed "
"in the Snakefile.",
)
group_utils.add_argument(
"--list-untracked",
"--lu",
action="store_true",
help="List all files in the working directory that are not used in the "
"workflow. This can be used e.g. for identifying leftover files. Hidden files "
"and directories are ignored.",
)
group_utils.add_argument(
"--delete-all-output",
action="store_true",
help="Remove all files generated by the workflow. Use together with --dry-run "
"to list files without actually deleting anything. Note that this will "
"not recurse into subworkflows. Write-protected files are not removed. "
"Nevertheless, use with care!",
)
group_utils.add_argument(
"--delete-temp-output",
action="store_true",
help="Remove all temporary files generated by the workflow. Use together "
"with --dry-run to list files without actually deleting anything. Note "
"that this will not recurse into subworkflows.",
)
group_utils.add_argument(
"--bash-completion",
action="store_true",
help="Output code to register bash completion for snakemake. Put the "
"following in your .bashrc (including the accents): "
"`snakemake --bash-completion` or issue it in an open terminal "
"session.",
)
group_utils.add_argument("--version", "-v", action="version", version=__version__)
group_output = parser.add_argument_group("OUTPUT")
group_output.add_argument(
"--reason",
"-r",
action="store_true",
help="Print the reason for each executed rule.",
)
group_output.add_argument(
"--gui",
nargs="?",
const="8000",
metavar="PORT",
type=str,
help="Serve an HTML based user interface to the given network and "
"port e.g. 168.129.10.15:8000. By default Snakemake is only "
"available in the local network (default port: 8000). To make "
"Snakemake listen to all ip addresses add the special host address "
"0.0.0.0 to the url (0.0.0.0:8000). This is important if Snakemake "
"is used in a virtualised environment like Docker. If possible, a "
"browser window is opened.",
)
group_output.add_argument(
"--printshellcmds",
"-p",
action="store_true",
help="Print out the shell commands that will be executed.",
)
group_output.add_argument(
"--debug-dag",
action="store_true",
help="Print candidate and selected jobs (including their wildcards) while "
"inferring DAG. This can help to debug unexpected DAG topology or errors.",
)
group_output.add_argument(
"--stats",
metavar="FILE",
help="Write stats about Snakefile execution in JSON format to the given file.",
)
group_output.add_argument(
"--nocolor", action="store_true", help="Do not use a colored output."
)
group_output.add_argument(
"--quiet",
"-q",
action="store_true",
help="Do not output any progress or rule information.",
)
group_output.add_argument(
"--print-compilation",
action="store_true",
help="Print the python representation of the workflow.",
)
group_output.add_argument(
"--verbose", action="store_true", help="Print debugging output."
)
group_behavior = parser.add_argument_group("BEHAVIOR")
group_behavior.add_argument(
"--force-use-threads",
dest="force_use_threads",
action="store_true",
help="Force threads rather than processes. Helpful if shared memory (/dev/shm) is full or unavailable.",
)
group_behavior.add_argument(
"--allow-ambiguity",
"-a",
action="store_true",
help=(
"Don't check for ambiguous rules and simply use the first if "
"several can produce the same file. This allows the user to "
"prioritize rules by their order in the snakefile."
),
)
group_behavior.add_argument(
"--nolock", action="store_true", help="Do not lock the working directory"
)
group_behavior.add_argument(
"--ignore-incomplete",
"--ii",
action="store_true",
help="Do not check for incomplete output files.",
)
group_behavior.add_argument(
"--latency-wait",
"--output-wait",
"-w",
type=int,
default=5,
metavar="SECONDS",
help="Wait given seconds if an output file of a job is not present after "
"the job finished. This helps if your filesystem "
"suffers from latency (default 5).",
)
group_behavior.add_argument(
"--wait-for-files",
nargs="*",
metavar="FILE",
help="Wait --latency-wait seconds for these "
"files to be present before executing the workflow. "
"This option is used internally to handle filesystem latency in cluster "
"environments.",
)
group_behavior.add_argument(
"--notemp",
"--nt",
action="store_true",
help="Ignore temp() declarations. This is useful when running only "
"a part of the workflow, since temp() would lead to deletion of "
"probably needed files by other parts of the workflow.",
)
group_behavior.add_argument(
"--keep-remote",
action="store_true",
help="Keep local copies of remote input files.",
)
group_behavior.add_argument(
"--keep-target-files",
action="store_true",
help="Do not adjust the paths of given target files relative to the working directory.",
)
group_behavior.add_argument(
"--allowed-rules",
nargs="+",
help="Only consider given rules. If omitted, all rules in Snakefile are "
"used. Note that this is intended primarily for internal use and may "
"lead to unexpected results otherwise.",
)
group_behavior.add_argument(
"--max-jobs-per-second",
default=10,
type=float,
help="Maximal number of cluster/drmaa jobs per second, default is 10, "
"fractions allowed.",
)
group_behavior.add_argument(
"--max-status-checks-per-second",
default=10,
type=float,
help="Maximal number of job status checks per second, default is 10, "
"fractions allowed.",
)
group_behavior.add_argument(
"--restart-times",
default=0,
type=int,
help="Number of times to restart failing jobs (defaults to 0).",
)
group_behavior.add_argument(
"--attempt",
default=1,
type=int,
help="Internal use only: define the initial value of the attempt "
"parameter (default: 1).",
)
group_behavior.add_argument(
"--wrapper-prefix",
default="https://github.com/snakemake/snakemake-wrappers/raw/",
help="Prefix for URL created from wrapper directive (default: "
"https://github.com/snakemake/snakemake-wrappers/raw/). Set this to "
"a different URL to use your fork or a local clone of the repository, "
"e.g., use a git URL like 'git+file://path/to/your/local/clone@'.",
)
group_behavior.add_argument(
"--default-remote-provider",
choices=["S3", "GS", "FTP", "SFTP", "S3Mocked", "gfal", "gridftp", "iRODS"],
help="Specify default remote provider to be used for "
"all input and output files that don't yet specify "
"one.",
)
group_behavior.add_argument(
"--default-remote-prefix",
default="",
help="Specify prefix for default remote provider. E.g. " "a bucket name.",
)
group_behavior.add_argument(
"--no-shared-fs",
action="store_true",
help="Do not assume that jobs share a common file "
"system. When this flag is activated, Snakemake will "
"assume that the filesystem on a cluster node is not "
"shared with other nodes. For example, this will lead "
"to downloading remote files on each cluster node "
"separately. Further, it won't take special measures "
"to deal with filesystem latency issues. This option "
"will in most cases only make sense in combination with "
"--default-remote-provider. Further, when using --cluster "
"you will have to also provide --cluster-status. "
"Only activate this if you "
"know what you are doing.",
)
group_behavior.add_argument(
"--greediness",
type=float,
default=None,
help="Set the greediness of scheduling. This value between 0 and 1 "
"determines how careful jobs are selected for execution. The default "
"value (1.0) provides the best speed and still acceptable scheduling "
"quality.",
)
group_behavior.add_argument(
"--no-hooks",
action="store_true",
help="Do not invoke onstart, onsuccess or onerror hooks after execution.",
)
group_behavior.add_argument(
"--overwrite-shellcmd",
help="Provide a shell command that shall be executed instead of those "
"given in the workflow. "
"This is for debugging purposes only.",
)
group_behavior.add_argument(
"--debug",
action="store_true",
help="Allow to debug rules with e.g. PDB. This flag "
"allows to set breakpoints in run blocks.",
)
group_behavior.add_argument(
"--runtime-profile",
metavar="FILE",
help="Profile Snakemake and write the output to FILE. This requires yappi "
"to be installed.",
)
group_behavior.add_argument(
"--mode",
choices=[Mode.default, Mode.subprocess, Mode.cluster],
default=Mode.default,
type=int,
help="Set execution mode of Snakemake (internal use only).",
)
group_cluster = parser.add_argument_group("CLUSTER")
# TODO extend below description to explain the wildcards that can be used
cluster_mode_group = group_cluster.add_mutually_exclusive_group()
cluster_mode_group.add_argument(
"--cluster",
"-c",
metavar="CMD",
help=(
"Execute snakemake rules with the given submit command, "
"e.g. qsub. Snakemake compiles jobs into scripts that are "
"submitted to the cluster with the given command, once all input "
"files for a particular job are present.\n"
"The submit command can be decorated to make it aware of certain "
"job properties (name, rulename, input, output, params, wildcards, log, threads "
"and dependencies (see the argument below)), e.g.:\n"
"$ snakemake --cluster 'qsub -pe threaded {threads}'."
),
)
cluster_mode_group.add_argument(
"--cluster-sync",
metavar="CMD",
help=(
"cluster submission command will block, returning the remote exit"
"status upon remote termination (for example, this should be used"
"if the cluster command is 'qsub -sync y' (SGE)"
),
),
cluster_mode_group.add_argument(
"--drmaa",
nargs="?",
const="",
metavar="ARGS",
help="Execute snakemake on a cluster accessed via DRMAA, "
"Snakemake compiles jobs into scripts that are "
"submitted to the cluster with the given command, once all input "
"files for a particular job are present. ARGS can be used to "
"specify options of the underlying cluster system, "
"thereby using the job properties name, rulename, input, output, params, wildcards, log, "
"threads and dependencies, e.g.: "
"--drmaa ' -pe threaded {threads}'. Note that ARGS must be given in quotes and "
"with a leading whitespace.",
)
group_cluster.add_argument(
"--cluster-config",
"-u",
metavar="FILE",
default=[],
action="append",
help=(
"A JSON or YAML file that defines the wildcards used in 'cluster'"
"for specific rules, instead of having them specified in the Snakefile. "
"For example, for rule 'job' you may define: "
"{ 'job' : { 'time' : '24:00:00' } } to specify the time for rule 'job'. "
"You can specify more than one file. The configuration files are merged "
"with later values overriding earlier ones."
),
)
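# An equivalent cluster config in YAML might be (hedged sketch; the optional
# '__default__' section provides fallback values for all rules):
#   __default__:
#     time: "01:00:00"
#   job:
#     time: "24:00:00"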
group_cluster.add_argument(
"--immediate-submit",
"--is",
action="store_true",
help="Immediately submit all jobs to the cluster instead of waiting "
"for present input files. This will fail, unless you make "
"the cluster aware of job dependencies, e.g. via:\n"
"$ snakemake --cluster 'sbatch --dependency {dependencies}.\n"
"Assuming that your submit script (here sbatch) outputs the "
"generated job id to the first stdout line, {dependencies} will "
"be filled with space separated job ids this job depends on.",
)
group_cluster.add_argument(
"--jobscript",
"--js",
metavar="SCRIPT",
help="Provide a custom job script for submission to the cluster. "
"The default script resides as 'jobscript.sh' in the "
"installation directory.",
)
group_cluster.add_argument(
"--jobname",
"--jn",
default="snakejob.{name}.{jobid}.sh",
metavar="NAME",
help="Provide a custom name for the jobscript that is submitted to the "
'cluster (see --cluster). NAME is "snakejob.{name}.{jobid}.sh" '
"per default. The wildcard {jobid} has to be present in the name.",
)
group_cluster.add_argument(
"--cluster-status",
help="Status command for cluster execution. This is only considered "
"in combination with the --cluster flag. If provided, Snakemake will "
"use the status command to determine if a job has finished successfully "
"or failed. For this it is necessary that the submit command provided "
"to --cluster returns the cluster job id. Then, the status command "
"will be invoked with the job id. Snakemake expects it to return "
"'success' if the job was successfull, 'failed' if the job failed and "
"'running' if the job still runs.",
)
group_cluster.add_argument(
"--drmaa-log-dir",
metavar="DIR",
help="Specify a directory in which stdout and stderr files of DRMAA"
" jobs will be written. The value may be given as a relative path,"
" in which case Snakemake will use the current invocation directory"
" as the origin. If given, this will override any given '-o' and/or"
" '-e' native specification. If not given, all DRMAA stdout and"
" stderr files are written to the current working directory.",
)
group_cloud = parser.add_argument_group("CLOUD")
group_kubernetes = parser.add_argument_group("KUBERNETES")
group_tibanna = parser.add_argument_group("TIBANNA")
group_kubernetes.add_argument(
"--kubernetes",
metavar="NAMESPACE",
nargs="?",
const="default",
help="Execute workflow in a kubernetes cluster (in the cloud). "
"NAMESPACE is the namespace you want to use for your job (if nothing "
"specified: 'default'). "
"Usually, this requires --default-remote-provider and "
"--default-remote-prefix to be set to a S3 or GS bucket where your . "
"data shall be stored. It is further advisable to activate conda "
"integration via --use-conda.",
)
group_kubernetes.add_argument(
"--kubernetes-env",
nargs="+",
metavar="ENVVAR",
default=[],
help="Specify environment variables to pass to the kubernetes job.",
)
group_kubernetes.add_argument(
"--container-image",
metavar="IMAGE",
help="Docker image to use, e.g., when submitting jobs to kubernetes. "
"By default, this is 'https://hub.docker.com/r/snakemake/snakemake', tagged with "
"the same version as the currently running Snakemake instance. "
"Note that overwriting this value is up to your responsibility. "
"Any used image has to contain a working snakemake installation "
"that is compatible with (or ideally the same as) the currently "
"running version.",
)
group_tibanna.add_argument(
"--tibanna",
action="store_true",
help="Execute workflow on AWS cloud using Tibanna. This requires "
"--default-remote-prefix to be set to S3 bucket name and prefix"
" (e.g. 'bucketname/subdirectory') where input is already stored"
" and output will be sent to. Using --tibanna implies --defaut-resources"
" is set as default. Optionally, use --precommand to"
" specify any preparation command to run before snakemake command"
" on the cloud (inside snakemake container on Tibanna VM)."
" Also, --use-conda, --use-singularity, --config, --configfile are"
" supported and will be carried over.",
)
group_tibanna.add_argument(
"--tibanna-sfn",
help="Name of Tibanna Unicorn step function (e.g. tibanna_unicorn_monty)."
"This works as serverless scheduler/resource allocator and must be "
"deployed first using tibanna cli. (e.g. tibanna deploy_unicorn --usergroup="
"monty --buckets=bucketname",
)
group_tibanna.add_argument(
"--precommand",
help="Any command to execute before snakemake command on AWS cloud "
"such as wget, git clone, unzip, etc. This is used with --tibanna."
"Do not include input/output download/upload commands - file transfer"
" between S3 bucket and the run environment (container) is automatically"
" handled by Tibanna.",
)
group_conda = parser.add_argument_group("CONDA")
group_conda.add_argument(
"--use-conda",
action="store_true",
help="If defined in the rule, run job in a conda environment. "
"If this flag is not set, the conda directive is ignored.",
)
group_conda.add_argument(
"--list-conda-envs",
action="store_true",
help="List all conda environments and their location on " "disk.",
)
group_conda.add_argument(
"--cleanup-conda",
action="store_true",
help="Cleanup unused conda environments.",
)
group_conda.add_argument(
"--conda-prefix",
metavar="DIR",
help="Specify a directory in which the 'conda' and 'conda-archive' "
"directories are created. These are used to store conda environments "
"and their archives, respectively. If not supplied, the value is set "
"to the '.snakemake' directory relative to the invocation directory. "
"If supplied, the `--use-conda` flag must also be set. The value may "
"be given as a relative path, which will be extrapolated to the "
"invocation directory, or as an absolute path.",
)
group_conda.add_argument(
"--create-envs-only",
action="store_true",
help="If specified, only creates the job-specific "
"conda environments then exits. The `--use-conda` "
"flag must also be set.",
)
group_singularity = parser.add_argument_group("SINGULARITY")
group_singularity.add_argument(
"--use-singularity",
action="store_true",
help="If defined in the rule, run job within a singularity container. "
"If this flag is not set, the singularity directive is ignored.",
)
group_singularity.add_argument(
"--singularity-prefix",
metavar="DIR",
help="Specify a directory in which singularity images will be stored."
"If not supplied, the value is set "
"to the '.snakemake' directory relative to the invocation directory. "
"If supplied, the `--use-singularity` flag must also be set. The value "
"may be given as a relative path, which will be extrapolated to the "
"invocation directory, or as an absolute path.",
)
group_singularity.add_argument(
"--singularity-args",
default="",
metavar="ARGS",
help="Pass additional args to singularity.",
)
return parser
def main(argv=None):
"""Main entry point."""
parser = get_argument_parser()
args = parser.parse_args(argv)
if args.profile:
# reparse args while inferring config file from profile
parser = get_argument_parser(args.profile)
args = parser.parse_args(argv)
def adjust_path(f):
if os.path.exists(f) or os.path.isabs(f):
return f
else:
return get_profile_file(args.profile, f, return_default=True)
# update file paths to be relative to the profile
# (if they do not exist relative to CWD)
if args.jobscript:
args.jobscript = adjust_path(args.jobscript)
if args.cluster:
args.cluster = adjust_path(args.cluster)
if args.cluster_sync:
args.cluster_sync = adjust_path(args.cluster_sync)
if args.cluster_status:
args.cluster_status = adjust_path(args.cluster_status)
if args.bash_completion:
cmd = b"complete -o bashdefault -C snakemake-bash-completion snakemake"
sys.stdout.buffer.write(cmd)
sys.exit(0)
try:
resources = parse_resources(args.resources)
config = parse_config(args)
if (args.default_resources is not None and not args.default_resources) or (
args.tibanna and not args.default_resources
):
args.default_resources = [
"mem_mb=max(2*input.size, 1000)",
"disk_mb=max(2*input.size, 1000)",
]
default_resources = DefaultResources(args.default_resources)
except ValueError as e:
print(e, file=sys.stderr)
print("", file=sys.stderr)
sys.exit(1)
if args.cluster or args.cluster_sync or args.drmaa:
if args.cores is None:
if args.dryrun:
args.cores = 1
else:
print(
"Error: you need to specify the maximum number of jobs to "
"be queued or executed at the same time with --jobs.",
file=sys.stderr,
)
sys.exit(1)
elif args.cores is None:
args.cores = 1
if args.drmaa_log_dir is not None:
if not os.path.isabs(args.drmaa_log_dir):
args.drmaa_log_dir = os.path.abspath(os.path.expanduser(args.drmaa_log_dir))
if args.runtime_profile:
import yappi
yappi.start()
if args.immediate_submit and not args.notemp:
print(
"Error: --immediate-submit has to be combined with --notemp, "
"because temp file handling is not supported in this mode.",
file=sys.stderr,
)
sys.exit(1)
if (args.conda_prefix or args.create_envs_only) and not args.use_conda:
print(
"Error: --use-conda must be set if --conda-prefix or "
"--create-envs-only is set.",
file=sys.stderr,
)
sys.exit(1)
if args.singularity_prefix and not args.use_singularity:
print(
"Error: --use_singularity must be set if --singularity-prefix " "is set.",
file=sys.stderr,
)
sys.exit(1)
if args.kubernetes and (
not args.default_remote_provider or not args.default_remote_prefix
):
print(
"Error: --kubernetes must be combined with "
"--default-remote-provider and --default-remote-prefix, see "
"https://snakemake.readthedocs.io/en/stable/executable.html"
"#executing-a-snakemake-workflow-via-kubernetes",
file=sys.stderr,
)
sys.exit(1)
if args.tibanna:
if not args.default_remote_prefix:
print(
"Error: --tibanna must be combined with --default-remote-prefix "
"to provide bucket name and subdirectory (prefix) "
"(e.g. 'bucketname/projectname'",
file=sys.stderr,
)
sys.exit(1)
args.default_remote_prefix = args.default_remote_prefix.rstrip("/")
if not args.tibanna_sfn:
args.tibanna_sfn = os.environ.get("TIBANNA_DEFAULT_STEP_FUNCTION_NAME", "")
if not args.tibanna_sfn:
print(
"Error: to use --tibanna, either --tibanna-sfn or environment variable "
"TIBANNA_DEFAULT_STEP_FUNCTION_NAME must be set and exported "
"to provide name of the tibanna unicorn step function "
"(e.g. 'tibanna_unicorn_monty'). The step function must be deployed first "
"using tibanna cli (e.g. tibanna deploy_unicorn --usergroup=monty "
"--buckets=bucketname)",
file=sys.stderr,
)
sys.exit(1)
if args.delete_all_output and args.delete_temp_output:
print(
"Error: --delete-all-output and --delete-temp-output are mutually exclusive.",
file=sys.stderr,
)
sys.exit(1)
if args.snakefile is None:
for p in SNAKEFILE_CHOICES:
if os.path.exists(p):
args.snakefile = p
break
if args.snakefile is None:
print(
"Error: no Snakefile found, tried {}.".format(", ".join(SNAKEFILE_CHOICES)),
file=sys.stderr,
)
sys.exit(1)
if args.gui is not None:
try:
import snakemake.gui as gui
except ImportError:
print(
"Error: GUI needs Flask to be installed. Install "
"with easy_install or contact your administrator.",
file=sys.stderr,
)
sys.exit(1)
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)
_snakemake = partial(snakemake, os.path.abspath(args.snakefile))
gui.register(_snakemake, args)
if ":" in args.gui:
host, port = args.gui.split(":")
else:
port = args.gui
host = "127.0.0.1"
url = "http://{}:{}".format(host, port)
print("Listening on {}.".format(url), file=sys.stderr)
def open_browser():
try:
webbrowser.open(url)
except:
pass
print("Open this address in your browser to access the GUI.", file=sys.stderr)
threading.Timer(0.5, open_browser).start()
success = True
try:
gui.app.run(debug=False, threaded=True, port=int(port), host=host)
except (KeyboardInterrupt, SystemExit):
# silently close
pass
else:
success = snakemake(
args.snakefile,
report=args.report,
listrules=args.list,
list_target_rules=args.list_target_rules,
cores=args.cores,
local_cores=args.local_cores,
nodes=args.cores,
resources=resources,
default_resources=default_resources,
config=config,
configfiles=args.configfile,
config_args=args.config,
workdir=args.directory,
targets=args.target,
dryrun=args.dryrun,
printshellcmds=args.printshellcmds,
printreason=args.reason,
debug_dag=args.debug_dag,
printdag=args.dag,
printrulegraph=args.rulegraph,
printfilegraph=args.filegraph,
printd3dag=args.d3dag,
touch=args.touch,
forcetargets=args.force,
forceall=args.forceall,
forcerun=args.forcerun,
prioritytargets=args.prioritize,
until=args.until,
omit_from=args.omit_from,
stats=args.stats,
nocolor=args.nocolor,
quiet=args.quiet,
keepgoing=args.keep_going,
cluster=args.cluster,
cluster_config=args.cluster_config,
cluster_sync=args.cluster_sync,
drmaa=args.drmaa,
drmaa_log_dir=args.drmaa_log_dir,
kubernetes=args.kubernetes,
kubernetes_envvars=args.kubernetes_env,
container_image=args.container_image,
tibanna=args.tibanna,
tibanna_sfn=args.tibanna_sfn,
precommand=args.precommand,
jobname=args.jobname,
immediate_submit=args.immediate_submit,
standalone=True,
ignore_ambiguity=args.allow_ambiguity,
lock=not args.nolock,
unlock=args.unlock,
cleanup_metadata=args.cleanup_metadata,
cleanup_conda=args.cleanup_conda,
cleanup_shadow=args.cleanup_shadow,
force_incomplete=args.rerun_incomplete,
ignore_incomplete=args.ignore_incomplete,
list_version_changes=args.list_version_changes,
list_code_changes=args.list_code_changes,
list_input_changes=args.list_input_changes,
list_params_changes=args.list_params_changes,
list_untracked=args.list_untracked,
summary=args.summary,
detailed_summary=args.detailed_summary,
archive=args.archive,
delete_all_output=args.delete_all_output,
delete_temp_output=args.delete_temp_output,
print_compilation=args.print_compilation,
verbose=args.verbose,
debug=args.debug,
jobscript=args.jobscript,
notemp=args.notemp,
keep_remote_local=args.keep_remote,
greediness=args.greediness,
no_hooks=args.no_hooks,
overwrite_shellcmd=args.overwrite_shellcmd,
latency_wait=args.latency_wait,
wait_for_files=args.wait_for_files,
keep_target_files=args.keep_target_files,
allowed_rules=args.allowed_rules,
max_jobs_per_second=args.max_jobs_per_second,
max_status_checks_per_second=args.max_status_checks_per_second,
restart_times=args.restart_times,
attempt=args.attempt,
force_use_threads=args.force_use_threads,
use_conda=args.use_conda,
conda_prefix=args.conda_prefix,
list_conda_envs=args.list_conda_envs,
use_singularity=args.use_singularity,
singularity_prefix=args.singularity_prefix,
shadow_prefix=args.shadow_prefix,
singularity_args=args.singularity_args,
create_envs_only=args.create_envs_only,
mode=args.mode,
wrapper_prefix=args.wrapper_prefix,
default_remote_provider=args.default_remote_provider,
default_remote_prefix=args.default_remote_prefix,
assume_shared_fs=not args.no_shared_fs,
cluster_status=args.cluster_status,
export_cwl=args.export_cwl,
)
if args.runtime_profile:
with open(args.runtime_profile, "w") as out:
profile = yappi.get_func_stats()
profile.sort("totaltime")
profile.print_all(out=out)
sys.exit(0 if success else 1)
def bash_completion(snakefile="Snakefile"):
"""Entry point for bash completion."""
if not len(sys.argv) >= 2:
print(
"Calculate bash completion for snakemake. This tool shall not be invoked by hand."
)
sys.exit(1)
def print_candidates(candidates):
if candidates:
candidates = sorted(set(candidates))
# Use bytes to avoid '^M' under Windows.
sys.stdout.buffer.write(b"\n".join(s.encode() for s in candidates))
prefix = sys.argv[2]
if prefix.startswith("-"):
print_candidates(
action.option_strings[0]
for action in get_argument_parser()._actions
if action.option_strings and action.option_strings[0].startswith(prefix)
)
else:
candidates = []
files = glob.glob("{}*".format(prefix))
if files:
candidates.extend(files)
if os.path.exists(snakefile):
workflow = Workflow(snakefile=snakefile)
workflow.include(snakefile)
candidates.extend(
[file for file in workflow.concrete_files if file.startswith(prefix)]
+ [rule.name for rule in workflow.rules if rule.name.startswith(prefix)]
)
if len(candidates) > 0:
print_candidates(candidates)
sys.exit(0)