__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
from tempfile import TemporaryFile
import tokenize
import textwrap
import os
from urllib.error import HTTPError, URLError, ContentTooShortError
import urllib.request
from io import TextIOWrapper
from snakemake.exceptions import WorkflowError
from snakemake import common
# shorthand for dedenting multi-line string literals
dd = textwrap.dedent

# one unit of indentation in the generated Python code
INDENT = "\t"
def is_newline(token, newline_tokens=set((tokenize.NEWLINE, tokenize.NL))):
    """Return True if *token* ends a line (logical NEWLINE or cosmetic NL)."""
    token_type = token.type
    return token_type in newline_tokens
def is_indent(token):
    """Return True if *token* increases the indentation level."""
    return tokenize.INDENT == token.type
def is_dedent(token):
    """Return True if *token* decreases the indentation level."""
    return tokenize.DEDENT == token.type
def is_op(token):
    """Return True if *token* is an operator token."""
    return tokenize.OP == token.type
def is_greater(token):
    """Return True if *token* is the ``>`` operator."""
    return is_op(token) and ">" == token.string
def is_comma(token):
    """Return True if *token* is the ``,`` operator."""
    return is_op(token) and "," == token.string
def is_name(token):
    """Return True if *token* is an identifier/keyword NAME token."""
    return tokenize.NAME == token.type
def is_colon(token):
    """Return True if *token* is the ``:`` operator."""
    return is_op(token) and ":" == token.string
def is_string(token):
    """Return True if *token* is a string literal token."""
    return tokenize.STRING == token.type
def is_eof(token):
    """Return True if *token* is the end-of-file marker."""
    return token.type == tokenize.ENDMARKER


def is_comment(token):
    """Return True if *token* is a comment token.

    This predicate is used throughout the keyword automata (e.g. in
    ``KeywordState.block``) but its definition was missing from the module,
    which would raise a NameError the first time a comment is encountered.
    """
    return token.type == tokenize.COMMENT
def lineno(token):
    """Return the 1-based line number on which *token* starts."""
    row, _col = token.start
    return row
class StopAutomaton(Exception):
    """Raised by an automaton when its block has ended, handing the
    terminating token back to the parent automaton for re-processing."""

    def __init__(self, token):
        # the token that terminated the block
        self.token = token
class TokenAutomaton:
    """Base state machine that consumes Snakefile tokens and emits pairs of
    (translated Python text, originating token); the originating token is
    kept for building the generated-line-to-Snakefile-line map."""

    # mapping of keyword name -> automaton class handling that keyword's block
    subautomata = dict()

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        self.root = root
        self.snakefile = snakefile
        self.state = None  # current state: a method taking a token, yielding (text, token)
        self.base_indent = base_indent
        self.line = 0
        self.indent = 0
        self.was_indented = False
        self.lasttoken = None
        self._dedent = dedent

    @property
    def dedent(self):
        return self._dedent

    @property
    def effective_indent(self):
        # indentation applied to emitted code, relative to the enclosing block
        return self.base_indent + self.indent - self.dedent

    def indentation(self, token):
        """Track the current indentation level from INDENT/DEDENT tokens."""
        if is_indent(token) or is_dedent(token):
            self.indent = token.end[1] - self.base_indent
            self.was_indented |= self.indent > 0

    def consume(self):
        """Feed all tokens through the current state, yielding (text, token)."""
        for token in self.snakefile:
            self.indentation(token)
            try:
                for t, orig in self.state(token):
                    if self.lasttoken == "\n" and not t.isspace():
                        # first token of a new emitted line: re-indent it
                        yield INDENT * self.effective_indent, orig
                    yield t, orig
                    self.lasttoken = t
            except tokenize.TokenError as e:
                self.error(
                    str(e).split(",")[0].strip("()''"), token
                )  # TODO the inferred line number seems to be wrong sometimes

    def error(self, msg, token):
        """Raise a SyntaxError pointing at *token*'s Snakefile location."""
        raise SyntaxError(msg, (self.snakefile.path, lineno(token), None, None))

    def subautomaton(self, automaton, *args, **kwargs):
        """Instantiate the subautomaton registered for keyword *automaton*."""
        return self.subautomata[automaton](
            self.snakefile,
            *args,
            base_indent=self.base_indent + self.indent,
            dedent=self.dedent,
            root=False,
            **kwargs,
        )
class KeywordState(TokenAutomaton):
    """Automaton for a keyword followed by a colon and a block.

    Subclasses customize start()/end() to wrap the block content in the
    appropriate generated Python code.
    """

    # class-name prefix stripped when deriving the keyword (see keyword below)
    prefix = ""

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        self.line = 0
        self.state = self.colon

    @property
    def keyword(self):
        # derive the keyword from the class name, e.g. SubworkflowSnakefile -> snakefile
        return self.__class__.__name__.lower()[len(self.prefix) :]

    def end(self):
        yield ")"

    def decorate_end(self, token):
        # end() items may already be (text, token) pairs; pair up plain strings
        for t in self.end():
            if isinstance(t, tuple):
                yield t
            else:
                yield t, token

    def colon(self, token):
        """Expect the colon that opens this keyword's block."""
        if is_colon(token):
            self.state = self.block
            for t in self.start():
                yield t, token
        else:
            self.error("Colon expected after keyword {}.".format(self.keyword), token)

    def is_block_end(self, token):
        return (self.line and self.indent <= 0) or is_eof(token)

    def block(self, token):
        """Translate the tokens of the keyword's block."""
        if self.lasttoken == "\n" and is_comment(token):
            # ignore lines containing only comments
            self.line -= 1
        if self.is_block_end(token):
            yield from self.decorate_end(token)
            yield "\n", token
            raise StopAutomaton(token)
        if is_newline(token):
            self.line += 1
            yield token.string, token
        elif not (is_indent(token) or is_dedent(token)):
            if is_comment(token):
                yield token.string, token
            else:
                yield from self.block_content(token)

    def yield_indent(self, token):
        return token.string, token

    def block_content(self, token):
        # default: pass block tokens through unchanged
        yield token.string, token
class GlobalKeywordState(KeywordState):
    """A top-level Snakefile keyword, emitted as ``workflow.<keyword>(...)``."""

    def start(self):
        # open the call to the corresponding workflow method
        keyword = self.keyword
        yield "workflow.{keyword}(".format(keyword=keyword)
class DecoratorKeywordState(KeywordState):
    """A keyword translated into a decorated handler function definition."""

    decorator = None  # name used for both the decorator and the function
    args = list()  # parameter names of the generated handler

    def start(self):
        arglist = ", ".join(self.args)
        yield "@workflow.{}".format(self.decorator)
        yield "\n"
        yield "def __{}({}):".format(self.decorator, arglist)

    def end(self):
        # the generated function body is the block itself; nothing to close
        yield ""
class RuleKeywordState(KeywordState):
    """A keyword inside a rule, emitted as a ``@workflow.<keyword>(...)``
    decorator on the generated rule function."""

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True, rulename=None):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        self.rulename = rulename

    def start(self):
        yield "\n"
        yield "@workflow.{keyword}(".format(keyword=self.keyword)
class SectionKeywordState(KeywordState):
    """A keyword that becomes one more keyword argument of an enclosing call."""

    def start(self):
        yield ", {keyword}=".format(keyword=self.keyword)

    def end(self):
        # no end needed; the enclosing automaton closes the call
        return []
# Global keyword states


class Envvars(GlobalKeywordState):
    """``envvars:`` — emits ``workflow.register_envvars(...)``."""

    @property
    def keyword(self):
        return "register_envvars"


class Include(GlobalKeywordState):
    """``include:`` — emits ``workflow.include(...)``."""

    pass


class Workdir(GlobalKeywordState):
    """``workdir:`` — emits ``workflow.workdir(...)``."""

    pass


class Configfile(GlobalKeywordState):
    """``configfile:`` — emits ``workflow.configfile(...)``."""

    pass


# PEPs


class Pepfile(GlobalKeywordState):
    """``pepfile:`` — emits ``workflow.set_pepfile(...)``."""

    @property
    def keyword(self):
        return "set_pepfile"


class Pepschema(GlobalKeywordState):
    """``pepschema:`` — emits ``workflow.pepschema(...)``."""

    pass


class Report(GlobalKeywordState):
    """``report:`` — emits ``workflow.report(...)``."""

    pass


class Scattergather(GlobalKeywordState):
    """``scattergather:`` — emits ``workflow.scattergather(...)``."""

    pass
class Ruleorder(GlobalKeywordState):
    """``ruleorder:`` — emits ``workflow.ruleorder(...)`` with the rule names
    passed as strings; ``>`` separators become argument commas."""

    def block_content(self, token):
        if is_greater(token):
            yield ",", token
        elif is_name(token):
            # quote the rule name so it is passed as a string argument
            yield repr(token.string), token
        else:
            self.error(
                "Expected a descending order of rule names, "
                "e.g. rule1 > rule2 > rule3 ...",
                token,
            )
class GlobalWildcardConstraints(GlobalKeywordState):
    """Global ``wildcard_constraints:`` — emits ``workflow.global_wildcard_constraints(...)``."""

    @property
    def keyword(self):
        return "global_wildcard_constraints"


class GlobalSingularity(GlobalKeywordState):
    """Global ``singularity:`` — emits ``workflow.global_container(...)``
    (same target as the ``container`` keyword)."""

    @property
    def keyword(self):
        return "global_container"


class GlobalContainer(GlobalKeywordState):
    """Global ``container:`` — emits ``workflow.global_container(...)``."""

    @property
    def keyword(self):
        return "global_container"


class GlobalContainerized(GlobalKeywordState):
    """Global ``containerized:`` — emits ``workflow.global_containerized(...)``."""

    @property
    def keyword(self):
        return "global_containerized"
# subworkflows


class SubworkflowKeywordState(SectionKeywordState):
    """Base for section keywords inside a ``subworkflow`` block."""

    # strip the "Subworkflow" class-name prefix when deriving the keyword
    prefix = "Subworkflow"


class SubworkflowSnakefile(SubworkflowKeywordState):
    """``snakefile`` section of a subworkflow."""

    pass


class SubworkflowWorkdir(SubworkflowKeywordState):
    """``workdir`` section of a subworkflow."""

    pass


class SubworkflowConfigfile(SubworkflowKeywordState):
    """``configfile`` section of a subworkflow."""

    pass
class Subworkflow(GlobalKeywordState):
    """``subworkflow <name>:`` block — emits ``workflow.subworkflow(name, ...)``
    with snakefile/workdir/configfile sections as keyword arguments."""

    subautomata = dict(
        snakefile=SubworkflowSnakefile,
        workdir=SubworkflowWorkdir,
        configfile=SubworkflowConfigfile,
    )

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        # a subworkflow starts with its name rather than a colon
        self.state = self.name
        self.has_snakefile = False
        self.has_workdir = False
        self.has_name = False
        self.primary_token = None

    def end(self):
        # at least one source for the subworkflow must have been given
        if not (self.has_snakefile or self.has_workdir):
            self.error(
                "A subworkflow needs either a path to a Snakefile or to a workdir.",
                self.primary_token,
            )
        yield ")"

    def name(self, token):
        """Expect the subworkflow name, then the colon opening its block."""
        if is_name(token):
            yield "workflow.subworkflow({name!r}".format(name=token.string), token
            self.has_name = True
        elif is_colon(token) and self.has_name:
            self.primary_token = token
            self.state = self.block
        else:
            self.error("Expected name after subworkflow keyword.", token)

    def block_content(self, token):
        if is_name(token):
            try:
                if token.string == "snakefile":
                    self.has_snakefile = True
                if token.string == "workdir":
                    self.has_workdir = True
                for t in self.subautomaton(token.string).consume():
                    yield t
            except KeyError:
                # no subautomaton registered for this keyword
                self.error(
                    "Unexpected keyword {} in "
                    "subworkflow definition".format(token.string),
                    token,
                )
            except StopAutomaton as e:
                # subautomaton finished; re-feed its terminating token
                self.indentation(e.token)
                for t in self.block(e.token):
                    yield t
        elif is_comment(token):
            yield "\n", token
            yield token.string, token
        elif is_string(token):
            # ignore docstring
            pass
        else:
            self.error(
                "Expecting subworkflow keyword, comment or docstrings "
                "inside a subworkflow definition.",
                token,
            )
class Localrules(GlobalKeywordState):
    """``localrules:`` — emits ``workflow.localrules(...)`` with the listed
    rule names passed as strings."""

    def block_content(self, token):
        if is_comma(token):
            yield ",", token
        elif is_name(token):
            # quote the rule name so it is passed as a string argument
            yield repr(token.string), token
        else:
            self.error(
                "Expected a comma separated list of rules that shall "
                "not be executed by the cluster command.",
                token,
            )
# Rule keyword states


class Name(RuleKeywordState):
    """``name`` section of a rule."""


class Input(RuleKeywordState):
    """``input`` section of a rule.

    This class is referenced by ``rule_property_subautomata`` below but its
    definition was missing from the module, which would raise a NameError at
    import time.
    """


class Output(RuleKeywordState):
    """``output`` section of a rule."""


class Params(RuleKeywordState):
    """``params`` section of a rule."""


class Threads(RuleKeywordState):
    """``threads`` section of a rule."""


class Shadow(RuleKeywordState):
    """``shadow`` section of a rule."""


class Resources(RuleKeywordState):
    """``resources`` section of a rule."""


class Priority(RuleKeywordState):
    """``priority`` section of a rule."""


class Version(RuleKeywordState):
    """``version`` section of a rule."""


class Log(RuleKeywordState):
    """``log`` section of a rule."""


class Message(RuleKeywordState):
    """``message`` section of a rule."""


class Benchmark(RuleKeywordState):
    """``benchmark`` section of a rule."""


class Conda(RuleKeywordState):
    """``conda`` section of a rule."""


class Singularity(RuleKeywordState):
    """``singularity`` section of a rule (alias emitting ``@workflow.container``)."""

    @property
    def keyword(self):
        return "container"


class Container(RuleKeywordState):
    """``container`` section of a rule."""


class Containerized(RuleKeywordState):
    """``containerized`` section of a rule."""


class EnvModules(RuleKeywordState):
    """``envmodules`` section of a rule."""


class Group(RuleKeywordState):
    """``group`` section of a rule."""


class Cache(RuleKeywordState):
    """``cache`` section of a rule; emits ``@workflow.cache_rule``."""

    @property
    def keyword(self):
        return "cache_rule"


class DefaultTarget(RuleKeywordState):
    """``default_target`` section of a rule; emits ``@workflow.default_target_rule``."""

    @property
    def keyword(self):
        return "default_target_rule"


class Handover(RuleKeywordState):
    """``handover`` section of a rule."""


class WildcardConstraints(RuleKeywordState):
    """``wildcard_constraints`` section of a rule."""

    @property
    def keyword(self):
        return "wildcard_constraints"
class Run(RuleKeywordState):
    """``run:`` — wraps the block into a ``@workflow.run``-decorated function
    carrying the rule's execution context as parameters."""

    def __init__(self, snakefile, rulename, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        self.rulename = rulename
        # number of content tokens seen; an empty run block must not end early
        self.content = 0

    def start(self):
        yield "@workflow.run"
        yield "\n"
        yield (
            "def __rule_{rulename}(input, output, params, wildcards, threads, "
            "resources, log, version, rule, conda_env, container_img, "
            "singularity_args, use_singularity, env_modules, bench_record, jobid, "
            "is_shell, bench_iteration, cleanup_scripts, shadow_dir, edit_notebook, "
            "conda_base_path, basedir, runtime_sourcecache_path, {rule_func_marker}=True):".format(
                rulename=self.rulename
                if self.rulename is not None
                else self.snakefile.rulecount,
                rule_func_marker=common.RULEFUNC_CONTEXT_MARKER,
            )
        )

    def end(self):
        yield ""

    def block_content(self, token):
        self.content += 1
        yield token.string, token

    def is_block_end(self, token):
        # unlike other keywords, require some content before ending the block
        return (self.content and self.line and self.indent <= 0) or is_eof(token)
class AbstractCmd(Run):
    """Base for keywords whose block is a command source (shell, script, ...).

    The block content is collected in self.cmd and translated into a call of
    ``end_func`` inside a generated ``@workflow.run`` function.
    """

    # if set (e.g. by parse(overwrite_shellcmd=...)), replaces the block content
    overwrite_cmd = None
    # decorator emitted before the block, e.g. "@workflow.shellcmd"
    start_func = None
    # function called inside the generated rule body, e.g. "shell"
    end_func = None

    def __init__(self, snakefile, rulename, base_indent=0, dedent=0, root=True):
        super().__init__(
            snakefile, rulename, base_indent=base_indent, dedent=dedent, root=root
        )
        self.cmd = list()
        self.token = None
        if self.overwrite_cmd is not None:
            # swap in the handler that emits the overwriting command instead
            self.block_content = self.overwrite_block_content

    def is_block_end(self, token):
        return (self.line and self.indent <= 0) or is_eof(token)

    def start(self):
        if self.start_func is not None:
            yield self.start_func
            yield "("

    def args(self):
        # extra arguments appended to the end_func call; none by default
        yield from []

    def end(self):
        # the end is detected. So we can savely reset the indent to zero here
        self.indent = 0
        yield "\n"
        yield ")"
        yield "\n"
        for t in super().start():
            yield t
        yield "\n"
        yield INDENT * (self.effective_indent + 1)
        yield self.end_func
        yield "("
        yield "\n".join(self.cmd)
        yield from self.args()
        yield "\n"
        yield ")"
        for t in super().end():
            yield t

    def decorate_end(self, token):
        if self.token is None:
            # no block after shell keyword
            self.error(
                "Command must be given as string after the shell keyword.", token
            )
        for t in self.end():
            yield t, self.token

    def block_content(self, token):
        self.token = token
        self.cmd.append(token.string)
        yield token.string, token

    def overwrite_block_content(self, token):
        # emit the overwriting command once, ignoring the Snakefile's own block
        if self.token is None:
            self.token = token
            cmd = repr(self.overwrite_cmd)
            self.cmd.append(cmd)
            yield cmd, token
class Shell(AbstractCmd):
    """``shell:`` — emits ``@workflow.shellcmd`` and a ``shell(...)`` call."""

    start_func = "@workflow.shellcmd"
    end_func = "shell"

    def args(self):
        yield ", bench_record=bench_record, bench_iteration=bench_iteration"


class Script(AbstractCmd):
    """``script:`` — emits ``@workflow.script`` and a ``script(...)`` call."""

    start_func = "@workflow.script"
    end_func = "script"

    def args(self):
        # pass the rule's full execution context to the script machinery
        yield (
            ", basedir, input, output, params, wildcards, threads, resources, log, "
            "config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
            "bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, runtime_sourcecache_path"
        )


class Notebook(Script):
    """``notebook:`` — like ``script``, additionally passing ``edit_notebook``."""

    start_func = "@workflow.notebook"
    end_func = "notebook"

    def args(self):
        yield (
            ", basedir, input, output, params, wildcards, threads, resources, log, "
            "config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
            "bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, "
            "edit_notebook, runtime_sourcecache_path"
        )


class Wrapper(Script):
    """``wrapper:`` — emits ``@workflow.wrapper`` and a ``wrapper(...)`` call."""

    start_func = "@workflow.wrapper"
    end_func = "wrapper"

    def args(self):
        yield (
            ", input, output, params, wildcards, threads, resources, log, "
            "config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
            "bench_record, workflow.wrapper_prefix, jobid, bench_iteration, "
            "cleanup_scripts, shadow_dir, runtime_sourcecache_path"
        )


class TemplateEngine(Script):
    """``template_engine:`` — emits ``@workflow.template_engine`` and a
    ``render_template(...)`` call."""

    start_func = "@workflow.template_engine"
    end_func = "render_template"

    def args(self):
        yield (", input, output, params, wildcards, config")


class CWL(Script):
    """``cwl:`` — emits ``@workflow.cwl`` and a ``cwl(...)`` call."""

    start_func = "@workflow.cwl"
    end_func = "cwl"

    def args(self):
        yield (
            ", basedir, input, output, params, wildcards, threads, resources, log, "
            "config, rule, use_singularity, bench_record, jobid, runtime_sourcecache_path"
        )
# Section keywords allowed in a rule (and in 'use rule ... with:'), mapped to
# the automata that translate them.
rule_property_subautomata = dict(
    name=Name,
    input=Input,
    output=Output,
    params=Params,
    threads=Threads,
    resources=Resources,
    priority=Priority,
    version=Version,
    log=Log,
    message=Message,
    benchmark=Benchmark,
    conda=Conda,
    singularity=Singularity,
    container=Container,
    containerized=Containerized,
    envmodules=EnvModules,
    wildcard_constraints=WildcardConstraints,
    shadow=Shadow,
    group=Group,
    cache=Cache,
    handover=Handover,
    default_target=DefaultTarget,
)
class Rule(GlobalKeywordState):
    """``rule:`` block — translates a rule definition into a
    ``@workflow.rule(...)`` decorator, per-section decorators, and the
    generated rule function."""

    subautomata = dict(
        run=Run,
        shell=Shell,
        script=Script,
        notebook=Notebook,
        wrapper=Wrapper,
        template_engine=TemplateEngine,
        cwl=CWL,
        **rule_property_subautomata,
    )

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        # a rule may start with an optional name before the colon
        self.state = self.name
        self.lineno = None
        self.rulename = None
        # whether a run/shell/script/... execution section was seen
        self.run = False
        self.snakefile.rulecount += 1

    def start(self, aux=""):
        yield (
            "@workflow.rule(name={rulename!r}, lineno={lineno}, "
            "snakefile={snakefile!r}{aux})".format(
                rulename=self.rulename,
                lineno=self.lineno,
                snakefile=self.snakefile.path,
                aux=aux,
            )
        )

    def end(self):
        if not self.run:
            # rules without an execution section get a generated no-op body
            yield "@workflow.norun()"
            yield "\n"
            for t in self.subautomaton("run", rulename=self.rulename).start():
                yield t
            # the end is detected.
            # So we can savely reset the indent to zero here
            self.indent = 0
            yield "\n"
            yield INDENT * (self.effective_indent + 1)
            yield "pass"

    def name(self, token):
        """Expect the optional rule name, then the colon opening the block."""
        if is_name(token):
            self.rulename = token.string
        elif is_colon(token):
            self.lineno = self.snakefile.lines + 1
            self.state = self.block
            for t in self.start():
                yield t, token
        else:
            self.error(
                "Expected name or colon after " "rule or checkpoint keyword.", token
            )

    def block_content(self, token):
        if is_name(token):
            try:
                if (
                    token.string == "run"
                    or token.string == "shell"
                    or token.string == "script"
                    or token.string == "wrapper"
                    or token.string == "notebook"
                    or token.string == "template_engine"
                    or token.string == "cwl"
                ):
                    if self.run:
                        # NOTE(review): self.error already raises, so the
                        # surrounding `raise` is never actually reached.
                        raise self.error(
                            "Multiple run or shell keywords in rule {}.".format(
                                self.rulename
                            ),
                            token,
                        )
                    self.run = True
                elif self.run:
                    raise self.error(
                        "No rule keywords allowed after "
                        "run/shell/script/notebook/wrapper/template_engine/cwl in "
                        "rule {}.".format(self.rulename),
                        token,
                    )
                for t in self.subautomaton(
                    token.string, rulename=self.rulename
                ).consume():
                    yield t
            except KeyError:
                self.error(
                    "Unexpected keyword {} in rule definition".format(token.string),
                    token,
                )
            except StopAutomaton as e:
                # subautomaton finished; re-feed its terminating token
                self.indentation(e.token)
                for t in self.block(e.token):
                    yield t
        elif is_comment(token):
            yield "\n", token
            yield token.string, token
        elif is_string(token):
            # a bare string at rule level becomes the rule's docstring
            yield "\n", token
            yield "@workflow.docstring({})".format(token.string), token
        else:
            self.error(
                "Expecting rule keyword, comment or docstrings "
                "inside a rule definition.",
                token,
            )

    @property
    def dedent(self):
        return self.indent
class Checkpoint(Rule):
    """``checkpoint:`` — identical to ``rule`` but passes ``checkpoint=True``
    to the generated ``@workflow.rule(...)`` decorator."""

    def start(self):
        for fragment in super().start(aux=", checkpoint=True"):
            yield fragment
class OnSuccess(DecoratorKeywordState):
    """``onsuccess:`` — emits an ``@workflow.onsuccess`` handler taking ``log``."""

    decorator = "onsuccess"
    args = ["log"]


class OnError(DecoratorKeywordState):
    """``onerror:`` — emits an ``@workflow.onerror`` handler taking ``log``."""

    decorator = "onerror"
    args = ["log"]


class OnStart(DecoratorKeywordState):
    """``onstart:`` — emits an ``@workflow.onstart`` handler taking ``log``."""

    decorator = "onstart"
    args = ["log"]
# modules


class ModuleKeywordState(SectionKeywordState):
    """Base for section keywords inside a ``module`` block."""

    # strip the "Module" class-name prefix when deriving the keyword
    prefix = "Module"


class ModuleSnakefile(ModuleKeywordState):
    """``snakefile`` section of a module."""


class ModuleMetaWrapper(ModuleKeywordState):
    """``meta_wrapper`` section of a module.

    This class is referenced by ``Module.subautomata`` below but its
    definition was missing from the module, which would raise a NameError at
    import time.
    """

    @property
    def keyword(self):
        return "meta_wrapper"


class ModulePrefix(ModuleKeywordState):
    """``prefix`` section of a module."""


class ModuleConfig(ModuleKeywordState):
    """``config`` section of a module."""


class ModuleSkipValidation(ModuleKeywordState):
    """``skip_validation`` section of a module."""

    @property
    def keyword(self):
        return "skip_validation"


class ModuleReplacePrefix(ModuleKeywordState):
    """``replace_prefix`` section of a module."""

    @property
    def keyword(self):
        return "replace_prefix"
class Module(GlobalKeywordState):
    """``module <name>:`` block — emits ``workflow.module(name, ...)`` with
    the section keywords as keyword arguments."""

    subautomata = dict(
        snakefile=ModuleSnakefile,
        meta_wrapper=ModuleMetaWrapper,
        config=ModuleConfig,
        skip_validation=ModuleSkipValidation,
        replace_prefix=ModuleReplacePrefix,
        prefix=ModulePrefix,
    )

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        # a module starts with its name rather than a colon
        self.state = self.name
        self.has_snakefile = False
        self.has_meta_wrapper = False
        self.has_name = False
        self.primary_token = None

    def end(self):
        # a module must declare where its rules come from
        if not (self.has_snakefile or self.has_meta_wrapper):
            self.error(
                "A module needs either a path to a Snakefile or a meta wrapper URL.",
                self.primary_token,
            )
        yield ")"

    def name(self, token):
        """Expect the module name, then the colon opening its block."""
        if is_name(token):
            yield "workflow.module({name!r}".format(name=token.string), token
            self.has_name = True
        elif is_colon(token) and self.has_name:
            self.primary_token = token
            self.state = self.block
        else:
            self.error("Expected name after module keyword.", token)

    def block_content(self, token):
        if is_name(token):
            try:
                if token.string == "snakefile":
                    self.has_snakefile = True
                if token.string == "meta_wrapper":
                    self.has_meta_wrapper = True
                for t in self.subautomaton(token.string).consume():
                    yield t
            except KeyError:
                # no subautomaton registered for this keyword
                self.error(
                    "Unexpected keyword {} in "
                    "module definition".format(token.string),
                    token,
                )
            except StopAutomaton as e:
                # subautomaton finished; re-feed its terminating token
                self.indentation(e.token)
                for t in self.block(e.token):
                    yield t
        elif is_comment(token):
            yield "\n", token
            yield token.string, token
        elif is_string(token):
            # ignore docstring
            pass
        else:
            self.error(
                "Expecting module keyword, comment or docstrings "
                "inside a module definition.",
                token,
            )
class UseRule(GlobalKeywordState):
    """``use rule ... from ... [as ...] [with:]`` statement — emits
    ``@workflow.userule(...)`` plus a generated no-op rule function, with
    section overrides from the optional ``with:`` block."""

    subautomata = rule_property_subautomata

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        self.state = self.state_keyword_rule
        self.rules = []
        self.has_with = False
        self.name_modifier = []
        self.from_module = None
        # buffered output of the optional 'with:' block, emitted in end()
        self._with_block = []
        self.lineno = self.snakefile.lines + 1

    def end(self):
        name_modifier = "".join(self.name_modifier) if self.name_modifier else None
        yield "@workflow.userule(rules={!r}, from_module={!r}, name_modifier={!r}, lineno={})".format(
            self.rules, self.from_module, name_modifier, self.lineno
        )
        yield "\n"
        # yield with block
        yield from self._with_block
        yield "@workflow.run"
        yield "\n"
        rulename = self.rules[0]
        if rulename == "*":
            rulename = "__allrules__"
        yield "def __userule_{}_{}():".format(self.from_module, rulename)
        # the end is detected.
        # So we can savely reset the indent to zero here
        self.indent = 0
        yield "\n"
        yield INDENT * (self.effective_indent + 1)
        yield "pass"

    def state_keyword_rule(self, token):
        """Expect the keyword 'rule' right after 'use'."""
        if is_name(token) and token.string == "rule":
            self.state = self.state_rules_rule
            yield from ()
        else:
            self.error("Expecting keyword 'rule' after keyword 'use'", token)

    def state_rules_rule(self, token):
        """Expect a rule name or the wildcard '*'."""
        if is_name(token):
            # NOTE(review): `and` binds tighter than `or`, so this errors on
            # 'from' always, but on 'as' only when no rules were given yet —
            # confirm the intended precedence.
            if token.string == "from" or token.string == "as" and not self.rules:
                self.error("Expecting rule names after 'use rule' statement.", token)
            self.rules.append(token.string)
            self.state = self.state_rules_comma_or_end
            yield from ()
        elif is_op(token):
            if token.string == "*":
                self.rules.append(token.string)
                self.state = self.state_rules_end
                yield from ()
            else:
                self.error(
                    "Expecting rule name or '*' after 'use rule' statement.", token
                )
        else:
            self.error(
                "Expecting rule listing (comma separated) after 'use rule' statement.",
                token,
            )

    # TODO newline and parentheses handling

    def state_rules_end(self, token):
        """After '*', only 'from' may follow."""
        if is_name(token) and token.string == "from":
            self.state = self.state_from
            yield from ()
        else:
            self.error(
                "Expecting list of rules in 'use rule' statement to end with keyword 'from'.",
                token,
            )

    def state_rules_comma_or_end(self, token):
        """Expect ',' (more rule names) or the keywords 'from'/'as'."""
        if is_name(token):
            if token.string == "from" or token.string == "as":
                if not self.rules:
                    self.error(
                        "Expecting rule names after 'use rule' statement.", token
                    )
                if token.string == "from":
                    self.state = self.state_from
                else:
                    self.state = self.state_as
                yield from ()
            else:
                self.error(
                    "Expecting list of rules in 'use rule' statement to end with keyword 'from'.",
                    token,
                )
        elif is_comma(token):
            self.state = self.state_rules_rule
            yield from ()
        else:
            self.error(
                "Unexpected token in list of rules within 'use rule' statement.", token
            )

    def state_from(self, token):
        """Expect the module name after 'from'."""
        if is_name(token):
            self.state = self.state_modifier
            self.from_module = token.string
            yield from ()
        else:
            self.error(
                "Expecting module name after 'from' keyword in 'use rule' statement.",
                token,
            )

    def state_modifier(self, token):
        """After the module name: 'as', 'with', or end of statement."""
        if is_name(token):
            if token.string == "as" and not self.name_modifier:
                self.state = self.state_as
                yield from ()
            elif token.string == "with":
                yield from self.handle_with(token)
            else:
                self.error(
                    "Expecting at most one 'as' or 'with' statement, or the end of the line.",
                    token,
                )
        elif is_newline(token) or is_comment(token) or is_eof(token):
            # end of the statement, close block manually
            yield from self.block(token)
        else:
            self.error(
                "Expecting either 'as', 'with' or end of line in 'use rule' statement.",
                token,
            )

    def handle_with(self, token):
        """Enter the 'with:' part; forbidden when the rule pattern is '*'."""
        if "*" in self.rules:
            self.error(
                "Keyword 'with' in 'use rule' statement is not allowed in combination with rule pattern '*'.",
                token,
            )
        self.has_with = True
        self.state = self.state_with
        yield from ()

    def state_as(self, token):
        """Collect the renaming pattern given after 'as'."""
        if is_name(token):
            if token.string != "with":
                self.name_modifier.append(token.string)
                yield from ()
            else:
                yield from self.handle_with(token)
        elif is_op(token) and token.string == "*":
            self.name_modifier.append(token.string)
            yield from ()
        elif is_newline(token) or is_comment(token) or is_eof(token):
            # end of the statement, close block manually
            yield from self.block(token)
        else:
            self.error(
                "Expecting rulename modifying pattern (e.g. modulename_*) after 'as' keyword.",
                token,
            )

    def state_with(self, token):
        """Expect the colon that opens the 'with:' block."""
        if is_colon(token):
            self.state = self.block
            yield from ()
        else:
            self.error(
                "Expecting colon after 'with' keyword in 'use rule' statement.", token
            )

    def block_content(self, token):
        if is_comment(token):
            yield "\n", token
            yield token.string, token
        elif is_name(token):
            try:
                # buffer the section's output; it is emitted later by end()
                self._with_block.extend(self.subautomaton(token.string).consume())
                yield from ()
            except KeyError:
                self.error(
                    "Unexpected keyword {} in rule definition".format(token.string),
                    token,
                )
            except StopAutomaton as e:
                # NOTE(review): unlike Rule.block_content, the generator
                # returned by self.block() is not iterated here — confirm
                # this is intended.
                self.indentation(e.token)
                self.block(e.token)
        else:
            self.error(
                "Expecting a keyword or comment "
                "inside a 'use rule ... with:' statement.",
                token,
            )

    @property
    def dedent(self):
        return self.indent
class Python(TokenAutomaton):
    """Root automaton: passes plain Python tokens through unchanged and
    dispatches Snakemake keywords at the start of a statement to their
    registered subautomata."""

    subautomata = dict(
        envvars=Envvars,
        include=Include,
        workdir=Workdir,
        configfile=Configfile,
        pepfile=Pepfile,
        pepschema=Pepschema,
        report=Report,
        ruleorder=Ruleorder,
        rule=Rule,
        checkpoint=Checkpoint,
        subworkflow=Subworkflow,
        localrules=Localrules,
        onsuccess=OnSuccess,
        onerror=OnError,
        onstart=OnStart,
        wildcard_constraints=GlobalWildcardConstraints,
        singularity=GlobalSingularity,
        container=GlobalContainer,
        containerized=GlobalContainerized,
        scattergather=Scattergather,
        module=Module,
        use=UseRule,
    )

    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
        self.state = self.python

    def python(self, token):
        if not (is_indent(token) or is_dedent(token)):
            if self.lasttoken is None or self.lasttoken.isspace():
                try:
                    # token starts a statement: try dispatching it as a keyword
                    for t in self.subautomaton(token.string).consume():
                        yield t
                except KeyError:
                    # not a Snakemake keyword: plain Python, pass through
                    yield token.string, token
                except StopAutomaton as e:
                    # keyword block ended; re-feed its terminating token
                    self.indentation(e.token)
                    for t in self.python(e.token):
                        yield t
            else:
                yield token.string, token
class Snakefile:
    """Token source for the parser; iterates over the tokens of a Snakefile
    opened through the workflow's source cache."""

    def __init__(self, path, workflow, rulecount=0):
        self.path = path.get_path_or_uri()
        self.file = workflow.sourcecache.open(path)
        self.tokens = tokenize.generate_tokens(self.file.readline)
        self.rulecount = rulecount
        # number of generated lines emitted so far; maintained by parse()
        self.lines = 0

    def __next__(self):
        return next(self.tokens)

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.file.close()
def format_tokens(tokens):
    """Join-format generated tokens, inserting a single space between two
    consecutive non-whitespace tokens.

    This helper is used by parse() below but its definition was missing from
    the module, which would raise a NameError when parse() is called.
    """
    t_ = None
    for t in tokens:
        if t_ and not t.isspace() and not t_.isspace():
            yield " "
        yield t
        t_ = t


def parse(path, workflow, overwrite_shellcmd=None, rulecount=0):
    """Parse a Snakefile into plain Python source.

    Arguments:
        path -- source file object with get_path_or_uri() (opened via the
            workflow's source cache)
        workflow -- the workflow the Snakefile belongs to
        overwrite_shellcmd -- if given, replaces every shell command
        rulecount -- number of rules seen in previously parsed files

    Returns a tuple (compilation, linemap, rulecount): the generated Python
    code, a map from generated line numbers to Snakefile line numbers, and
    the updated rule count.
    """
    Shell.overwrite_cmd = overwrite_shellcmd
    with Snakefile(path, workflow, rulecount=rulecount) as snakefile:
        automaton = Python(snakefile)
        linemap = dict()
        compilation = list()
        for t, orig_token in automaton.consume():
            l = lineno(orig_token)
            # map every generated line covered by this fragment to the
            # Snakefile line of its originating token
            linemap.update(
                dict(
                    (i, l)
                    for i in range(
                        snakefile.lines + 1, snakefile.lines + t.count("\n") + 1
                    )
                )
            )
            snakefile.lines += t.count("\n")
            compilation.append(t)
        compilation = "".join(format_tokens(compilation))
        if linemap:
            # extend the map by one line for a possible trailing statement
            last = max(linemap)
            linemap[last + 1] = linemap[last]
        return compilation, linemap, snakefile.rulecount