__author__ = "Johannes Köster"
__copyright__ = "Copyright 2021, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
import tokenize
import textwrap
import os
from urllib.error import HTTPError, URLError, ContentTooShortError
import urllib.request
from io import TextIOWrapper
from snakemake.exceptions import WorkflowError
from snakemake import common
dd = textwrap.dedent
INDENT = "\t"
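# This module translates a Snakefile into plain Python source code.
# The file is tokenized with the stdlib `tokenize` module, and a set of
# cooperating token automata (TokenAutomaton subclasses, roughly one per
# Snakemake keyword) re-emit the tokens as calls into the workflow API
# (e.g. `workflow.rule(...)`, `@workflow.input(...)`).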
def is_newline(token, newline_tokens=set((tokenize.NEWLINE, tokenize.NL))):
    return token.type in newline_tokens
def is_indent(token):
    return token.type == tokenize.INDENT
def is_dedent(token):
    return token.type == tokenize.DEDENT
def is_op(token):
    return token.type == tokenize.OP
def is_greater(token):
    return is_op(token) and token.string == ">"
def is_comma(token):
    return is_op(token) and token.string == ","
def is_name(token):
    return token.type == tokenize.NAME
def is_colon(token):
    return is_op(token) and token.string == ":"
def is_comment(token):
    return token.type == tokenize.COMMENT
def is_string(token):
    return token.type == tokenize.STRING
def is_eof(token):
    return token.type == tokenize.ENDMARKER
def lineno(token):
    return token.start[0]
class StopAutomaton(Exception):
def __init__(self, token):
self.token = token
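# Base automaton: `state` points at the current state method. Each state
# method receives one token and yields (text, original_token) pairs; the
# original token is kept so that compiled lines can be mapped back to
# Snakefile lines (see parse() below). Nested keyword blocks are delegated
# to the automata registered in `subautomata`.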
class TokenAutomaton:
subautomata = dict()
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
self.root = root
self.snakefile = snakefile
self.state = None
self.base_indent = base_indent
self.line = 0
self.indent = 0
self.was_indented = False
self.lasttoken = None
self._dedent = dedent
@property
def dedent(self):
return self._dedent
@property
def effective_indent(self):
return self.base_indent + self.indent - self.dedent
    def indentation(self, token):
if is_indent(token) or is_dedent(token):
self.indent = token.end[1] - self.base_indent
self.was_indented |= self.indent > 0
    def consume(self):
for token in self.snakefile:
self.indentation(token)
try:
for t, orig in self.state(token):
if self.lasttoken == "\n" and not t.isspace():
yield INDENT * self.effective_indent, orig
yield t, orig
self.lasttoken = t
except tokenize.TokenError as e:
self.error(
str(e).split(",")[0].strip("()''"), token
) # TODO the inferred line number seems to be wrong sometimes
    def error(self, msg, token):
raise SyntaxError(msg, (self.snakefile.path, lineno(token), None, None))
    def subautomaton(self, automaton, *args, **kwargs):
return self.subautomata[automaton](
self.snakefile,
*args,
base_indent=self.base_indent + self.indent,
dedent=self.dedent,
root=False,
**kwargs,
)
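# Generic handling of a `keyword: ...` block: `colon` expects the colon after
# the keyword, `start`/`end` emit the surrounding Python code, and `block`
# consumes the indented block until the indentation drops back to the
# keyword's own level.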
class KeywordState(TokenAutomaton):
prefix = ""
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.line = 0
self.state = self.colon
@property
def keyword(self):
return self.__class__.__name__.lower()[len(self.prefix) :]
    def end(self):
        yield ")"
    def decorate_end(self, token):
for t in self.end():
if isinstance(t, tuple):
yield t
else:
yield t, token
    def colon(self, token):
if is_colon(token):
self.state = self.block
for t in self.start():
yield t, token
else:
self.error("Colon expected after keyword {}.".format(self.keyword), token)
    def is_block_end(self, token):
        return (self.line and self.indent <= 0) or is_eof(token)
    def block(self, token):
if self.lasttoken == "\n" and is_comment(token):
# ignore lines containing only comments
self.line -= 1
if self.is_block_end(token):
yield from self.decorate_end(token)
yield "\n", token
raise StopAutomaton(token)
if is_newline(token):
self.line += 1
yield token.string, token
elif not (is_indent(token) or is_dedent(token)):
if is_comment(token):
yield token.string, token
else:
yield from self.block_content(token)
    def yield_indent(self, token):
        return token.string, token
    def block_content(self, token):
        yield token.string, token
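# The three KeywordState flavors differ only in the code they emit:
# global keywords become `workflow.<keyword>(...)` calls, rule keywords
# become `@workflow.<keyword>(...)` decorators, and section keywords become
# additional `, <keyword>=...` arguments of the enclosing call.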
class GlobalKeywordState(KeywordState):
    def start(self):
yield "workflow.{keyword}(".format(keyword=self.keyword)
class DecoratorKeywordState(KeywordState):
decorator = None
args = list()
    def start(self):
yield "@workflow.{}".format(self.decorator)
yield "\n"
yield "def __{}({}):".format(self.decorator, ", ".join(self.args))
    def end(self):
        yield ""
class RuleKeywordState(KeywordState):
def __init__(self, snakefile, base_indent=0, dedent=0, root=True, rulename=None):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.rulename = rulename
    def start(self):
yield "\n"
yield "@workflow.{keyword}(".format(keyword=self.keyword)
class SectionKeywordState(KeywordState):
    def start(self):
        yield ", {keyword}=".format(keyword=self.keyword)
    def end(self):
        # no end needed
        return list()
# Global keyword states
class Envvars(GlobalKeywordState):
@property
def keyword(self):
return "register_envvars"
class Include(GlobalKeywordState):
    pass
class Workdir(GlobalKeywordState):
    pass
class Configfile(GlobalKeywordState):
    pass
# PEPs
class Pepfile(GlobalKeywordState):
    pass
class Pepschema(GlobalKeywordState):
    pass
class Report(GlobalKeywordState):
    pass
class Scattergather(GlobalKeywordState):
    pass
class Ruleorder(GlobalKeywordState):
    def block_content(self, token):
if is_greater(token):
yield ",", token
elif is_name(token):
yield repr(token.string), token
else:
self.error(
"Expected a descending order of rule names, "
"e.g. rule1 > rule2 > rule3 ...",
token,
)
class GlobalWildcardConstraints(GlobalKeywordState):
@property
def keyword(self):
return "global_wildcard_constraints"
class GlobalSingularity(GlobalKeywordState):
@property
def keyword(self):
return "global_container"
class GlobalContainer(GlobalKeywordState):
@property
def keyword(self):
return "global_container"
class GlobalContainerized(GlobalKeywordState):
@property
def keyword(self):
return "global_containerized"
# subworkflows
class SubworkflowKeywordState(SectionKeywordState):
    prefix = "Subworkflow"
class SubworkflowSnakefile(SubworkflowKeywordState):
    pass
class SubworkflowWorkdir(SubworkflowKeywordState):
    pass
class SubworkflowConfigfile(SubworkflowKeywordState):
    pass
class Subworkflow(GlobalKeywordState):
subautomata = dict(
snakefile=SubworkflowSnakefile,
workdir=SubworkflowWorkdir,
configfile=SubworkflowConfigfile,
)
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.state = self.name
self.has_snakefile = False
self.has_workdir = False
self.has_name = False
self.primary_token = None
    def end(self):
if not (self.has_snakefile or self.has_workdir):
self.error(
"A subworkflow needs either a path to a Snakefile or to a workdir.",
self.primary_token,
)
yield ")"
    def name(self, token):
if is_name(token):
yield "workflow.subworkflow({name!r}".format(name=token.string), token
self.has_name = True
elif is_colon(token) and self.has_name:
self.primary_token = token
self.state = self.block
else:
self.error("Expected name after subworkflow keyword.", token)
    def block_content(self, token):
if is_name(token):
try:
if token.string == "snakefile":
self.has_snakefile = True
if token.string == "workdir":
self.has_workdir = True
for t in self.subautomaton(token.string).consume():
yield t
except KeyError:
self.error(
"Unexpected keyword {} in "
"subworkflow definition".format(token.string),
token,
)
except StopAutomaton as e:
self.indentation(e.token)
for t in self.block(e.token):
yield t
elif is_comment(token):
yield "\n", token
yield token.string, token
elif is_string(token):
# ignore docstring
pass
else:
self.error(
"Expecting subworkflow keyword, comment or docstrings "
"inside a subworkflow definition.",
token,
)
class Localrules(GlobalKeywordState):
    def block_content(self, token):
if is_comma(token):
yield ",", token
elif is_name(token):
yield repr(token.string), token
else:
self.error(
"Expected a comma separated list of rules that shall "
"not be executed by the cluster command.",
token,
)
# Rule keyword states
class Name(RuleKeywordState):
    pass
class Input(RuleKeywordState):
    pass
class Output(RuleKeywordState):
    pass
class Params(RuleKeywordState):
    pass
class Threads(RuleKeywordState):
    pass
class Shadow(RuleKeywordState):
    pass
class Resources(RuleKeywordState):
    pass
class Priority(RuleKeywordState):
    pass
class Version(RuleKeywordState):
    pass
class Log(RuleKeywordState):
    pass
class Message(RuleKeywordState):
    pass
class Benchmark(RuleKeywordState):
    pass
class Conda(RuleKeywordState):
    pass
class Singularity(RuleKeywordState):
    @property
    def keyword(self):
        return "container"
class Container(RuleKeywordState):
    pass
class Containerized(RuleKeywordState):
    pass
class EnvModules(RuleKeywordState):
    pass
class Group(RuleKeywordState):
    pass
class Cache(RuleKeywordState):
    @property
    def keyword(self):
        return "cache_rule"
class DefaultTarget(RuleKeywordState):
    @property
    def keyword(self):
        return "default_target_rule"
class Handover(RuleKeywordState):
    pass
class WildcardConstraints(RuleKeywordState):
    @property
    def keyword(self):
        return "wildcard_constraints"
class Run(RuleKeywordState):
def __init__(self, snakefile, rulename, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.rulename = rulename
self.content = 0
    def start(self):
yield "@workflow.run"
yield "\n"
yield (
"def __rule_{rulename}(input, output, params, wildcards, threads, "
"resources, log, version, rule, conda_env, container_img, "
"singularity_args, use_singularity, env_modules, bench_record, jobid, "
"is_shell, bench_iteration, cleanup_scripts, shadow_dir, edit_notebook, "
"conda_base_path, basedir, runtime_sourcecache_path, {rule_func_marker}=True):".format(
rulename=self.rulename
if self.rulename is not None
else self.snakefile.rulecount,
rule_func_marker=common.RULEFUNC_CONTEXT_MARKER,
)
)
    def end(self):
        yield ""
    def block_content(self, token):
        self.content += 1
        yield token.string, token
    def is_block_end(self, token):
        return (self.content and self.line and self.indent <= 0) or is_eof(token)
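# AbstractCmd rewrites a `shell:`/`script:`/`notebook:`/`wrapper:`/`cwl:` block
# into a generated run function whose body calls the corresponding helper
# (`end_func`) with the collected command string and the runtime context
# listed in `args()`.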
class AbstractCmd(Run):
overwrite_cmd = None
start_func = None
end_func = None
def __init__(self, snakefile, rulename, base_indent=0, dedent=0, root=True):
super().__init__(
snakefile, rulename, base_indent=base_indent, dedent=dedent, root=root
)
self.cmd = list()
self.token = None
if self.overwrite_cmd is not None:
self.block_content = self.overwrite_block_content
    def is_block_end(self, token):
        return (self.line and self.indent <= 0) or is_eof(token)
    def start(self):
        if self.start_func is not None:
            yield self.start_func
        yield "("
    def args(self):
        yield from []
    def end(self):
        # the end has been detected, so we can safely reset the indent to zero here
        self.indent = 0
yield "\n"
yield ")"
yield "\n"
for t in super().start():
yield t
yield "\n"
yield INDENT * (self.effective_indent + 1)
yield self.end_func
yield "("
yield "\n".join(self.cmd)
yield from self.args()
yield "\n"
yield ")"
for t in super().end():
yield t
    def decorate_end(self, token):
if self.token is None:
# no block after shell keyword
self.error(
"Command must be given as string after the shell keyword.", token
)
for t in self.end():
yield t, self.token
    def block_content(self, token):
        self.token = token
        self.cmd.append(token.string)
        yield token.string, token
    def overwrite_block_content(self, token):
if self.token is None:
self.token = token
cmd = repr(self.overwrite_cmd)
self.cmd.append(cmd)
yield cmd, token
class Shell(AbstractCmd):
    start_func = "@workflow.shellcmd"
    end_func = "shell"
    def args(self):
        yield ", bench_record=bench_record, bench_iteration=bench_iteration"
class Script(AbstractCmd):
    start_func = "@workflow.script"
    end_func = "script"
    def args(self):
        yield (
            ", basedir, input, output, params, wildcards, threads, resources, log, "
            "config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
            "bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, runtime_sourcecache_path"
        )
class Notebook(Script):
    start_func = "@workflow.notebook"
    end_func = "notebook"
    def args(self):
        yield (
            ", basedir, input, output, params, wildcards, threads, resources, log, "
            "config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
            "bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, "
            "edit_notebook, runtime_sourcecache_path"
        )
class Wrapper(Script):
    start_func = "@workflow.wrapper"
    end_func = "wrapper"
    def args(self):
        yield (
            ", input, output, params, wildcards, threads, resources, log, "
            "config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
            "bench_record, workflow.wrapper_prefix, jobid, bench_iteration, "
            "cleanup_scripts, shadow_dir, runtime_sourcecache_path"
        )
class CWL(Script):
    start_func = "@workflow.cwl"
    end_func = "cwl"
    def args(self):
        yield (
            ", basedir, input, output, params, wildcards, threads, resources, log, "
            "config, rule, use_singularity, bench_record, jobid, runtime_sourcecache_path"
        )
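# Section keywords shared between plain `rule` definitions and
# `use rule ... with:` blocks.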
rule_property_subautomata = dict(
name=Name,
input=Input,
output=Output,
params=Params,
threads=Threads,
resources=Resources,
priority=Priority,
version=Version,
log=Log,
message=Message,
benchmark=Benchmark,
conda=Conda,
singularity=Singularity,
container=Container,
containerized=Containerized,
envmodules=EnvModules,
wildcard_constraints=WildcardConstraints,
shadow=Shadow,
group=Group,
cache=Cache,
handover=Handover,
default_target=DefaultTarget,
)
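# A `rule:` block is compiled into a stack of decorators
# (@workflow.rule, @workflow.input, ..., @workflow.run) applied to a
# generated function that holds the rule's run/shell/script body.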
class Rule(GlobalKeywordState):
subautomata = dict(
run=Run,
shell=Shell,
script=Script,
notebook=Notebook,
wrapper=Wrapper,
cwl=CWL,
**rule_property_subautomata,
)
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.state = self.name
self.lineno = None
self.rulename = None
self.run = False
self.snakefile.rulecount += 1
    def start(self, aux=""):
yield (
"@workflow.rule(name={rulename!r}, lineno={lineno}, "
"snakefile={snakefile!r}{aux})".format(
rulename=self.rulename,
lineno=self.lineno,
snakefile=self.snakefile.path,
aux=aux,
)
)
    def end(self):
        if not self.run:
            yield "@workflow.norun()"
            yield "\n"
            for t in self.subautomaton("run", rulename=self.rulename).start():
                yield t
            # the end has been detected,
            # so we can safely reset the indent to zero here
            self.indent = 0
            yield "\n"
            yield INDENT * (self.effective_indent + 1)
            yield "pass"
    def name(self, token):
if is_name(token):
self.rulename = token.string
elif is_colon(token):
self.lineno = self.snakefile.lines + 1
self.state = self.block
for t in self.start():
yield t, token
else:
            self.error(
                "Expected name or colon after rule or checkpoint keyword.", token
            )
    def block_content(self, token):
if is_name(token):
try:
if (
token.string == "run"
or token.string == "shell"
or token.string == "script"
or token.string == "wrapper"
or token.string == "cwl"
):
if self.run:
raise self.error(
"Multiple run or shell keywords in rule {}.".format(
self.rulename
),
token,
)
self.run = True
elif self.run:
raise self.error(
"No rule keywords allowed after "
"run/shell/script/wrapper/cwl in "
"rule {}.".format(self.rulename),
token,
)
for t in self.subautomaton(
token.string, rulename=self.rulename
).consume():
yield t
except KeyError:
self.error(
"Unexpected keyword {} in rule definition".format(token.string),
token,
)
except StopAutomaton as e:
self.indentation(e.token)
for t in self.block(e.token):
yield t
elif is_comment(token):
yield "\n", token
yield token.string, token
elif is_string(token):
yield "\n", token
yield "@workflow.docstring({})".format(token.string), token
else:
self.error(
"Expecting rule keyword, comment or docstrings "
"inside a rule definition.",
token,
)
@property
def dedent(self):
return self.indent
class Checkpoint(Rule):
    def start(self):
        yield from super().start(aux=", checkpoint=True")
class OnSuccess(DecoratorKeywordState):
    decorator = "onsuccess"
    args = ["log"]
class OnError(DecoratorKeywordState):
    decorator = "onerror"
    args = ["log"]
class OnStart(DecoratorKeywordState):
    decorator = "onstart"
    args = ["log"]
# modules
class ModuleKeywordState(SectionKeywordState):
    prefix = "Module"
class ModuleSnakefile(ModuleKeywordState):
    pass
class ModuleMetaWrapper(ModuleKeywordState):
    @property
    def keyword(self):
        return "meta_wrapper"
class ModulePrefix(ModuleKeywordState):
    pass
class ModuleConfig(ModuleKeywordState):
    pass
class ModuleSkipValidation(ModuleKeywordState):
    @property
    def keyword(self):
        return "skip_validation"
class ModuleReplacePrefix(ModuleKeywordState):
    @property
    def keyword(self):
        return "replace_prefix"
class Module(GlobalKeywordState):
subautomata = dict(
snakefile=ModuleSnakefile,
meta_wrapper=ModuleMetaWrapper,
config=ModuleConfig,
skip_validation=ModuleSkipValidation,
replace_prefix=ModuleReplacePrefix,
prefix=ModulePrefix,
)
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.state = self.name
self.has_snakefile = False
self.has_meta_wrapper = False
self.has_name = False
self.primary_token = None
    def end(self):
if not (self.has_snakefile or self.has_meta_wrapper):
self.error(
"A module needs either a path to a Snakefile or a meta wrapper URL.",
self.primary_token,
)
yield ")"
    def name(self, token):
if is_name(token):
yield "workflow.module({name!r}".format(name=token.string), token
self.has_name = True
elif is_colon(token) and self.has_name:
self.primary_token = token
self.state = self.block
else:
self.error("Expected name after module keyword.", token)
    def block_content(self, token):
if is_name(token):
try:
if token.string == "snakefile":
self.has_snakefile = True
if token.string == "meta_wrapper":
self.has_meta_wrapper = True
for t in self.subautomaton(token.string).consume():
yield t
except KeyError:
self.error(
"Unexpected keyword {} in "
"module definition".format(token.string),
token,
)
except StopAutomaton as e:
self.indentation(e.token)
for t in self.block(e.token):
yield t
elif is_comment(token):
yield "\n", token
yield token.string, token
elif is_string(token):
# ignore docstring
pass
else:
self.error(
"Expecting module keyword, comment or docstrings "
"inside a module definition.",
token,
)
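# `use rule ... from ... [as ...] [with: ...]` statements are parsed with a
# small hand-written state machine (one state_* method per syntactic
# position) and compiled into a `@workflow.userule(...)` declaration; any
# `with:` block is buffered in `_with_block` and emitted at the end.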
class UseRule(GlobalKeywordState):
subautomata = rule_property_subautomata
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.state = self.state_keyword_rule
self.rules = []
self.has_with = False
self.name_modifier = []
self.from_module = None
self._with_block = []
self.lineno = self.snakefile.lines + 1
    def end(self):
name_modifier = "".join(self.name_modifier) if self.name_modifier else None
yield "@workflow.userule(rules={!r}, from_module={!r}, name_modifier={!r}, lineno={})".format(
self.rules, self.from_module, name_modifier, self.lineno
)
yield "\n"
# yield with block
yield from self._with_block
yield "@workflow.run"
yield "\n"
rulename = self.rules[0]
if rulename == "*":
rulename = "__allrules__"
yield "def __userule_{}_{}():".format(self.from_module, rulename)
        # the end has been detected,
        # so we can safely reset the indent to zero here
self.indent = 0
yield "\n"
yield INDENT * (self.effective_indent + 1)
yield "pass"
    def state_keyword_rule(self, token):
if is_name(token) and token.string == "rule":
self.state = self.state_rules_rule
yield from ()
else:
self.error("Expecting keyword 'rule' after keyword 'use'", token)
    def state_rules_rule(self, token):
if is_name(token):
if token.string == "from" or token.string == "as" and not self.rules:
self.error("Expecting rule names after 'use rule' statement.", token)
self.rules.append(token.string)
self.state = self.state_rules_comma_or_end
yield from ()
elif is_op(token):
if token.string == "*":
self.rules.append(token.string)
self.state = self.state_rules_end
yield from ()
else:
self.error(
"Expecting rule name or '*' after 'use rule' statement.", token
)
else:
self.error(
"Expecting rule listing (comma separated) after 'use rule' statement.",
token,
)
# TODO newline and parentheses handling
    def state_rules_end(self, token):
if is_name(token) and token.string == "from":
self.state = self.state_from
yield from ()
else:
self.error(
"Expecting list of rules in 'use rule' statement to end with keyword 'from'.",
token,
)
    def state_rules_comma_or_end(self, token):
if is_name(token):
if token.string == "from" or token.string == "as":
if not self.rules:
self.error(
"Expecting rule names after 'use rule' statement.", token
)
if token.string == "from":
self.state = self.state_from
else:
self.state = self.state_as
yield from ()
else:
self.error(
"Expecting list of rules in 'use rule' statement to end with keyword 'from'.",
token,
)
elif is_comma(token):
self.state = self.state_rules_rule
yield from ()
else:
self.error(
"Unexpected token in list of rules within 'use rule' statement.", token
)
    def state_from(self, token):
if is_name(token):
self.state = self.state_modifier
self.from_module = token.string
yield from ()
else:
self.error(
"Expecting module name after 'from' keyword in 'use rule' statement.",
token,
)
    def state_modifier(self, token):
if is_name(token):
if token.string == "as" and not self.name_modifier:
self.state = self.state_as
yield from ()
elif token.string == "with":
yield from self.handle_with(token)
else:
self.error(
"Expecting at most one 'as' or 'with' statement, or the end of the line.",
token,
)
elif is_newline(token) or is_comment(token) or is_eof(token):
# end of the statement, close block manually
yield from self.block(token)
else:
self.error(
"Expecting either 'as', 'with' or end of line in 'use rule' statement.",
token,
)
    def handle_with(self, token):
if "*" in self.rules:
self.error(
"Keyword 'with' in 'use rule' statement is not allowed in combination with rule pattern '*'.",
token,
)
self.has_with = True
self.state = self.state_with
yield from ()
    def state_as(self, token):
if is_name(token):
if token.string != "with":
self.name_modifier.append(token.string)
yield from ()
else:
yield from self.handle_with(token)
elif is_op(token) and token.string == "*":
self.name_modifier.append(token.string)
yield from ()
elif is_newline(token) or is_comment(token) or is_eof(token):
# end of the statement, close block manually
yield from self.block(token)
else:
self.error(
"Expecting rulename modifying pattern (e.g. modulename_*) after 'as' keyword.",
token,
)
    def state_with(self, token):
if is_colon(token):
self.state = self.block
yield from ()
else:
self.error(
"Expecting colon after 'with' keyword in 'use rule' statement.", token
)
    def block_content(self, token):
if is_comment(token):
yield "\n", token
yield token.string, token
elif is_name(token):
try:
self._with_block.extend(self.subautomaton(token.string).consume())
yield from ()
except KeyError:
self.error(
"Unexpected keyword {} in rule definition".format(token.string),
token,
)
            except StopAutomaton as e:
                self.indentation(e.token)
                # consume the block generator so that the end of the with-block
                # is actually detected (mirrors the other block_content methods)
                yield from self.block(e.token)
else:
self.error(
"Expecting a keyword or comment "
"inside a 'use rule ... with:' statement.",
token,
)
@property
def dedent(self):
return self.indent
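# Top-level dispatcher: a NAME token at the start of a line that matches a
# Snakemake keyword is delegated to the corresponding automaton; everything
# else is passed through untouched as ordinary Python code.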
class Python(TokenAutomaton):
subautomata = dict(
envvars=Envvars,
include=Include,
workdir=Workdir,
configfile=Configfile,
pepfile=Pepfile,
pepschema=Pepschema,
report=Report,
ruleorder=Ruleorder,
rule=Rule,
checkpoint=Checkpoint,
subworkflow=Subworkflow,
localrules=Localrules,
onsuccess=OnSuccess,
onerror=OnError,
onstart=OnStart,
wildcard_constraints=GlobalWildcardConstraints,
singularity=GlobalSingularity,
container=GlobalContainer,
containerized=GlobalContainerized,
scattergather=Scattergather,
module=Module,
use=UseRule,
)
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
self.state = self.python
    def python(self, token):
if not (is_indent(token) or is_dedent(token)):
if self.lasttoken is None or self.lasttoken.isspace():
try:
for t in self.subautomaton(token.string).consume():
yield t
except KeyError:
yield token.string, token
except StopAutomaton as e:
self.indentation(e.token)
for t in self.python(e.token):
yield t
else:
yield token.string, token
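# Thin wrapper around the Snakefile source: iterating over it yields the
# token stream, while `lines` tracks how many compiled lines have been
# emitted so far (used for the line mapping built in parse()).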
class Snakefile:
def __init__(self, path, workflow, rulecount=0):
self.path = path.get_path_or_uri()
self.file = workflow.sourcecache.open(path)
self.tokens = tokenize.generate_tokens(self.file.readline)
self.rulecount = rulecount
self.lines = 0
def __next__(self):
return next(self.tokens)
def __iter__(self):
return self
def __enter__(self):
return self
def __exit__(self, *args):
self.file.close()
def format_tokens(tokens):
    # insert a separating space between adjacent non-whitespace tokens
    t_ = None
    for t in tokens:
        if t_ and not t.isspace() and not t_.isspace():
            yield " "
        yield t
        t_ = t
def parse(path, workflow, overwrite_shellcmd=None, rulecount=0):
Shell.overwrite_cmd = overwrite_shellcmd
with Snakefile(path, workflow, rulecount=rulecount) as snakefile:
automaton = Python(snakefile)
linemap = dict()
compilation = list()
for t, orig_token in automaton.consume():
l = lineno(orig_token)
linemap.update(
dict(
(i, l)
for i in range(
snakefile.lines + 1, snakefile.lines + t.count("\n") + 1
)
)
)
snakefile.lines += t.count("\n")
compilation.append(t)
compilation = "".join(format_tokens(compilation))
if linemap:
last = max(linemap)
linemap[last + 1] = linemap[last]
return compilation, linemap, snakefile.rulecount
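# Illustrative sketch (not part of the module): for a Snakefile such as
#
#     rule all:
#         input: "results/done.txt"
#
# parse() returns roughly the following compiled source, together with a
# mapping from compiled line numbers back to Snakefile line numbers:
#
#     @workflow.rule(name='all', lineno=1, snakefile='/path/to/Snakefile')
#     @workflow.input("results/done.txt")
#     @workflow.norun()
#     @workflow.run
#     def __rule_all(input, output, params, wildcards, threads, resources,
#                    log, version, rule, conda_env, container_img, ...):
#         pass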