"""Generic text parsers for conversion of lattices in different formats to AT"""
from __future__ import annotations
# Public names of this module: the descriptor classes, the two parsers and the
# dot-insensitive dictionary they are built on.
__all__ = [
"AnyDescr",
"ElementDescr",
"SequenceDescr",
"BaseParser",
"UnorderedParser",
"DictNoDot",
]
from os import getcwd
from os.path import join, normpath, dirname
import re
import abc
from itertools import repeat
from collections.abc import Callable, Iterable, Generator, Mapping
from .utils import split_ignoring_parentheses, protect, restore
from ..lattice import Element, Lattice, params_filter
# Identifier-like token: an optional leading dot, one lower-case letter, then any
# run of word characters or dots. DictNoDot applies it to lower-cased text and
# replaces the dots of each match with underscores.
_dot = re.compile(r"\.?[a-z][\w.]*") # An identifier: starts with a letter
# A (possibly empty) run of word characters or dots between single quotes; group 1
# is the text without the quotes. Used to pick a name out of an exception message.
_singlequoted = re.compile(r"'([\w.]*)'")
# "name=<text>", <text> being a run of word characters or dots captured as group 1.
# Searched in the last argument of a TypeError when looking for the failing element.
_named = re.compile(r"name=([\w.]*)")
# A complete string wrapped in double quotes; group 1 is the text between them.
# Used to strip the quotes of string-valued command arguments.
_doublequoted = re.compile(r'^"(.*)"$')
def _default_arg_parser(parser: BaseParser, argstr: str):
    """Decode one command argument into a ``(key, value)`` pair.

    Args:
        parser: parser requesting the conversion. Its ``_str_arguments`` set
            and its ``evaluate`` method are used for keyword arguments
        argstr: text of a single argument

    Returns:
        The pair ``(key, value)``:

        * when *argstr* does not split into exactly two parts around ``=``, it
          is a boolean flag: ``flag`` gives ``("flag", True)`` and ``-flag``
          gives ``("flag", False)``, the key being lower-cased,
        * otherwise the key is lower-cased, ``from`` is replaced by ``frm``
          (presumably because ``from`` is a reserved word and cannot be used
          as a keyword argument name) and the value is either stripped of its
          enclosing double quotes, for keys listed in
          ``parser._str_arguments``, or evaluated by ``parser.evaluate``.
    """
    try:
        name, expr = split_ignoring_parentheses(argstr, delimiter="=")
    except ValueError:
        # Not a "name=value" pair: positional argument -> boolean flag
        flag = argstr.lower()
        negated = flag.startswith("-")
        return (flag[1:] if negated else flag), not negated

    # Keyword argument. The evaluation is deliberately kept outside the "try"
    # so that a ValueError raised while evaluating is not mistaken for a flag
    name = name.lower().replace("from", "frm")
    # noinspection PyProtectedMember
    if name in parser._str_arguments:
        return name, _doublequoted.sub(r"\1", expr)
    return name, parser.evaluate(expr)
class DictNoDot(dict):
    """Dictionary with case-insensitive keys where dots count as underscores

    Every key is converted to lower case and the dots of its identifier-like
    parts are replaced by underscores, so that ``d["Q1.A"]``, ``d["q1.a"]`` and
    ``d["q1_a"]`` address the same item.

    The conversion is applied on all access paths (construction, item
    assignment, lookup and deletion, ``in``, :py:meth:`get`, :py:meth:`pop`,
    :py:meth:`setdefault` and :py:meth:`update`), so that an item can always be
    retrieved whatever the method used for storing it.
    """

    def __init__(self, *args, **kwargs):
        """Same arguments as :py:class:`dict`"""
        super().__init__()
        # dict.__init__ would store the keys without conversion
        self.update(*args, **kwargs)

    @staticmethod
    def _no_dot(expr):
        """Return the lower-cased *expr* with the dots of identifiers replaced
        by underscores"""

        def repl(match):
            return match.group().replace(".", "_")

        return _dot.sub(repl, expr.lower())

    def __setitem__(self, key, value):
        super().__setitem__(self._no_dot(key), value)

    def __getitem__(self, key):
        return super().__getitem__(self._no_dot(key))

    def __delitem__(self, key):
        super().__delitem__(self._no_dot(key))

    def __contains__(self, key):
        # Only strings can be converted; anything else is looked up unchanged
        if isinstance(key, str):
            key = self._no_dot(key)
        return super().__contains__(key)

    def get(self, key, *args):
        """Return the value for *key* if present, else the optional default"""
        return super().get(self._no_dot(key), *args)

    def pop(self, key, *args):
        """Remove *key* and return its value, or the optional default"""
        return super().pop(self._no_dot(key), *args)

    def setdefault(self, key, default=None):
        """Return the value for *key*, inserting *default* if it is missing"""
        return super().setdefault(self._no_dot(key), default)

    def update(self, *args, **kwargs):
        """Update the dictionary, converting the keys. Same arguments as
        :py:meth:`dict.update`"""
        for key, value in dict(*args, **kwargs).items():
            self[key] = value
class AnyDescr(abc.ABC):
    """Base class for source object descriptors

    A descriptor carries two attributes, ``name`` and ``inverse``, on top of the
    contents provided by the container class it is combined with (the
    remaining constructor arguments are forwarded to the next class in the
    method resolution order).
    """

    def __init__(self, *args, **kwargs):
        """
        Args:
            *args: forwarded to the next class initializer
            name: Name of the object. Default: the class name
            inverse: If :py:obj:`True`, the object is reversed. Default:
              :py:obj:`False`
            **kwargs: forwarded to the next class initializer
        """
        self.name = kwargs.pop("name", self.__class__.__name__)
        self.inverse = kwargs.pop("inverse", False)
        super().__init__(*args, **kwargs)

    def __neg__(self):
        """``-descr`` returns a reversed copy of the object"""
        return self.inverted(copy=True)

    def __call__(self, *args, copy: bool = True, **kwargs) -> AnyDescr | None:
        """Create a copy of the element with updated fields

        Args:
            *args: forwarded to the constructor (copy) or to :py:meth:`update`
            copy: If :py:obj:`True`, return a modified copy and leave the
              object untouched. Otherwise modify the object in place
            **kwargs: new attribute or field values

        Returns:
            The new object if *copy* is :py:obj:`True`, otherwise
            :py:obj:`None`
        """
        if copy:
            # Current attributes, overridden by the matching keywords
            b = {key: kwargs.pop(key, value) for key, value in vars(self).items()}
            b.update(kwargs)
            return type(self)(self, *args, **b)
        else:
            self.update(*args, **kwargs)
            return None

    def update(self, *args, **kwargs):
        """Update the object in place

        Keywords matching an existing attribute update that attribute. The
        remaining arguments update the mapping if the object is a
        :py:class:`~collections.abc.Mapping`; otherwise the remaining keywords
        are stored as new attributes.
        """
        # Update the existing attributes. The list() protects the iteration
        # against attribute changes
        for key, value in list(vars(self).items()):
            setattr(self, key, kwargs.pop(key, value))
        if isinstance(self, Mapping):
            # Update mapping
            super().update(*args, **kwargs)
        else:
            # Add new attributes. Iterating on the dict itself would give the
            # keys only, and fail when unpacking them
            for key, value in kwargs.items():
                setattr(self, key, value)

    def inverted(self, copy=False):
        """Return a reversed element or line

        Args:
            copy: If :py:obj:`True`, return a reversed copy. Otherwise reverse
              the object in place and return it
        """
        instance = self() if copy else self
        instance.inverse = not self.inverse
        return instance

    @abc.abstractmethod
    def expand(self, parser: BaseParser) -> Generator[Element, None, None]:
        """Iterator on the possibly inverted sequence of elements"""
        pass
class ElementDescr(AnyDescr, dict):
    """Simple representation of an element as a dict

    The element parameters are the dictionary items; they are also readable as
    attributes. The ``madtype`` item defaults to the name of the class.
    """

    def __init__(self, *args, **kwargs):
        """
        Args:
            *args: dict initializer
            name: Name of the element
            inverse: If :py:obj:`True`, the element is reversed
            copy: accepted and ignored
            **kwargs: element parameters
        """
        # "copy" may be set by the command processing, it is not a parameter
        kwargs.pop("copy", False)
        kwargs.setdefault("madtype", self.__class__.__name__)
        super().__init__(*args, **kwargs)

    def __getattr__(self, item):
        # Allows accessing items using the attribute access syntax.
        # Called only when the normal attribute lookup fails
        if item.startswith("__") and item.endswith("__"):
            # Special methods probed by the standard library (copy.deepcopy
            # looks for __deepcopy__, unpickling for __setstate__, ...) must be
            # reported missing with AttributeError, otherwise the probe fails
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        # A missing parameter raises KeyError on purpose: this is the exception
        # recognised by the parsers for an undefined element attribute
        return self[item]

    def __rmul__(self, other):
        """Element repetition: ``n * element`` gives a list of *n* references
        to the element"""
        return list(repeat(self, other))

    def __repr__(self):
        descr = super().copy()
        cls = descr.pop("madtype")
        keywords = [f"name={self.name}"]
        keywords += [f"{k}={v!r}" for k, v in descr.items()]
        return f"{cls}({', '.join(keywords)})"

    @staticmethod
    def convert(name: str, *args, **params) -> list[Element]:
        """Generate the AT elements. Must be overridden for each specific
        element

        Args:
            name: Name of the element
            **params: Element parameters

        Returns:
            A list of AT elements. This default implementation returns an
            empty list
        """
        return []

    # noinspection PyUnusedLocal
    def expand(self, parser: BaseParser) -> Generator[Element, None, None]:
        """Iterator on the possibly inverted sequence of elements

        The elements are given by :py:meth:`convert`. For a reversed
        descriptor, they are generated in reverse order, each one with its
        faces swapped.

        Raises:
            Exception: any exception raised by :py:meth:`convert`, with the
              description of the element appended to its arguments
        """
        try:
            elems = self.convert(self.name, **self)
        except Exception as exc:
            # Identify the failing element for the error report
            exc.args += (f"{self}",)
            raise

        if self.inverse:
            for elem in reversed(elems):
                yield elem.swap_faces(copy=True)
        else:
            yield from elems

    @property
    def length(self) -> float:
        """Element length: value of the ``l`` item, 0.0 if absent"""
        return self.get("l", 0.0)
class SequenceDescr(AnyDescr, list, abc.ABC):
    """List-based description of a sequence of elements

    The class stays abstract: the ``expand`` method must be provided by the
    derived classes.
    """

    def __repr__(self):
        # Standard representation of the list, wrapped in the class name
        return "{}({})".format(self.__class__.__name__, super().__repr__())

    @property
    def length(self) -> float:
        """Sequence length: value of the ``l`` attribute, 0.0 if it is not
        set"""
        return getattr(self, "l", 0.0)
class BaseParser(DictNoDot):
    """Generic file parser

    Analyses files with the following MAD-like format:

    ``variable = value``

    ``label : command [,attribute=value] [,attribute=value]...``

    The parser builds a database of all the defined objects
    """

    # Names of the keyword arguments taken as strings instead of being evaluated
    _str_arguments = set()
    # Specific argument parsers, keyed by the lower-case command name. The other
    # commands use _default_arg_parser
    _argument_parser = {}

    def __init__(
        self,
        env: dict,
        *args,
        delimiter: str | None = None,
        continuation: str | None = "\\",
        linecomment: str | tuple[str, ...] | None = "#",
        blockcomment: tuple[str, str] | None = None,
        endfile: str | None = None,
        verbose: bool = False,
        **kwargs,
    ):
        """
        Args:
            env: global namespace used for evaluating commands
            delimiter: command delimiter
            continuation: command continuation character
            linecomment: Line comment character, or tuple of such characters
            blockcomment: Block comment delimiter: (begin, end) markers
            endfile: "End of input" marker
            verbose: If True, print detail on the processing
            *args: dict initializer
            **kwargs: dict initializer
        """
        # line_comment(line) returns the line without its trailing comment
        if isinstance(linecomment, tuple):

            def line_comment(line):
                for linecom in linecomment:
                    line, *_ = line.split(sep=linecom, maxsplit=1)
                return line

        elif linecomment is None:

            def line_comment(line):
                return line

        else:

            def line_comment(line):
                line, *_ = line.split(sep=linecomment, maxsplit=1)
                return line

        # handle_comments(buffer, line, in_comment) appends the useful part of
        # "line" to "buffer" and returns the new "in block comment" state and
        # the part of the line still to be processed
        if blockcomment is None:

            # noinspection PyUnusedLocal
            def handle_comments(buffer, line, in_comment):
                buffer.append(line_comment(line))
                return False, ""

        else:
            begcomment, endcomment = blockcomment

            def handle_comments(buffer, line, in_comment):
                if in_comment:
                    # Look for the end of the block comment
                    *rest, line = line.split(sep=endcomment, maxsplit=1)
                    in_comment = len(rest) <= 0
                    return in_comment, "" if in_comment else line
                else:
                    # Look for the beginning of a block comment
                    line = line_comment(line)
                    contents, *rest = line.split(sep=begcomment, maxsplit=1)
                    buffer.append(contents)
                    in_comment = len(rest) > 0
                    return in_comment, rest[0] if in_comment else ""

        # Stored on the instance: a plain function, called without "self"
        self.skip_comments = handle_comments
        self.delimiter = delimiter
        self.continuation = continuation
        self.endfile = endfile
        self._verbose = verbose
        self.env = env
        # Stack of directories used to resolve relative file names
        self.bases = [getcwd()]
        # Initial definitions, restored by clear()
        self.kwargs = kwargs
        super().__init__(*args, **kwargs)

    def _print(self, *args, **kwargs):
        """Print only if the parser is verbose"""
        if self._verbose:
            print(*args, **kwargs)

    def clear(self):
        """Clean the database"""
        super().clear()
        self.update(self.kwargs)

    def evaluate(self, expr):
        """Evaluate an expression using *self* as local namespace"""
        # NOTE(security): eval() runs arbitrary code. Only trusted input files
        # should be processed
        return eval(expr, self.env, self)

    def _eval_cmd(self, cmdname: str, no_global: bool = False) -> Callable:
        """Evaluate a command

        The command is looked for in the database first, then in the global
        namespace *env*.

        Raises:
            KeyError: the command is not defined
            TypeError: the command is found in the global namespace only and
              *no_global* is :py:obj:`True`
        """
        cmd: Callable | None = self.get(cmdname, None)
        if cmd is not None:
            return cmd

        cmdname = cmdname.lower()
        cmd = self.env.get(cmdname, None)
        if cmd is None:
            raise KeyError(cmdname)
        if no_global:
            raise TypeError(f"{cmdname!r} is not allowed in this context")
        return cmd

    @staticmethod
    def _reason(exc: Exception) -> str:
        """Extract the element name from the exception

        When no name can be found in the exception arguments, the name of the
        exception type is returned rather than failing: this method is called
        while an error is being handled and must not hide it behind another one.
        """

        def first_group(pattern, text):
            """Group 1 of the first match of *pattern* in *text*, or None"""
            found = pattern.search(text) if isinstance(text, str) else None
            return None if found is None else found[1]

        fallback = type(exc).__name__
        first = exc.args[0] if exc.args else ""
        if isinstance(exc, KeyError):  # Undefined element, attribute
            return exc.args[0] if exc.args else fallback
        elif isinstance(exc, NameError):  # refpos missing
            name = first_group(_singlequoted, first)
            return fallback if name is None else name
        elif isinstance(exc, TypeError):
            last = exc.args[-1] if exc.args else ""
            name = first_group(_named, last)  # Missing pos. arg.
            if name is None:
                name = first_group(_singlequoted, first)  # Not allowed in seq.
            return "TypeError" if name is None else name
        elif isinstance(exc, ValueError):  # overlap
            print(first)
            name = first_group(_singlequoted, first)
            return fallback if name is None else name
        else:
            return fallback

    def _argparser(self, command: str, *args: str):
        """Return an iterator over the (key, value) pairs decoded from the
        arguments of *command*"""
        argparser = self._argument_parser.get(command.lower(), _default_arg_parser)
        return (argparser(self, arg) for arg in args)

    def _assign(self, label: str, key: str, value: str):
        """Variable assignment: return the name and the evaluated value"""
        return key, self.evaluate(value)

    def _raw_command(
        self,
        label: str | None,
        cmdname: str,
        *argnames: str,
        no_global: bool = False,
        **kwargs,
    ):
        """Command execution

        The command is called with the keywords decoded from *argnames*, plus
        ``name=label`` for a labelled statement or ``copy=False`` for an
        unlabelled one, unless these keywords are already present.
        """
        func = self._eval_cmd(cmdname, no_global=no_global)
        kwargs.update(self._argparser(cmdname, *argnames))
        if label is None:
            kwargs.setdefault("copy", False)
        else:
            kwargs.setdefault("name", label)
        return func(**kwargs)

    def _command(self, *args, **kwargs):
        """Command execution. Overload this method to customise it"""
        return self._raw_command(*args, **kwargs)

    def _format_statement(self, line: str) -> str:
        """Reformat the input line

        Overload this method for specific languages"""
        line, matches = protect(line, fence=('"', '"'))  # protect the quoted parts
        line = "".join(line.split())  # Remove all spaces
        (line,) = restore(matches, line)
        return line

    def _statement(self, line: str) -> bool:
        """Split a statement in 'label: command'

        Returns:
            :py:obj:`False` if the statement is the "end of input" marker,
            :py:obj:`True` otherwise
        """
        fmtline = self._format_statement(line)
        if self.endfile is not None and fmtline.startswith(self.endfile):
            return False
        label, *cmd = fmtline.split(":", maxsplit=1)
        if cmd:  # label
            fmtline = cmd[0]
        else:  # no label
            label = None
        arguments = split_ignoring_parentheses(fmtline)
        self._decode(label, *arguments)
        return True

    def _decode(self, label: str | None, cmdname: str, *argnames: str) -> None:
        """Execute the split statement

        A first item containing ``=`` is an assignment, anything else is a
        command. A result which is not :py:obj:`None` is stored in the database
        under the label (commands) or the variable name (assignments).
        """
        # Split on the first "=" only: the expression may itself contain "="
        # ("==", ">=", a quoted string...)
        left, *right = cmdname.split("=", maxsplit=1)
        if right:
            label, result = self._assign(label, left, right[0])
        else:
            result = self._command(label, cmdname, *argnames)
        if not (label is None or result is None):
            self[label] = result

    def _finalise(self, final: bool = True) -> None:
        """Called at the end of processing"""
        pass

    @staticmethod
    def _command_str(_, label, cmdname, *argnames):
        """Rebuild the text of a statement. The first argument is ignored"""
        string = ", ".join((cmdname, *argnames))
        if label is not None:
            string = " : ".join((label, string))
        return string

    def _analyse(self, key: str) -> None:
        """Print info on failed statements"""
        print(f"\n{key!r} is not defined\n")

    def expand(self, key: str) -> Generator[Element, None, None]:
        """iterator over AT objects generated by a source object

        In case of failure, the error is reported on the standard output before
        the exception is raised again.
        """
        try:
            v = self[key]
            if isinstance(v, AnyDescr):
                yield from v.expand(self)
            else:
                yield v
        except Exception as exc:
            # The exception may have no argument at all
            summary = exc.args[0] if exc.args else ""
            print(f"{type(exc).__name__}: {summary}")
            for arg in exc.args[1:]:
                print(arg)
            self._analyse(self._reason(exc))
            raise

    def _generator(self, params):
        """Generate AT elements for the Lattice constructor"""
        use = params.setdefault("use", "ring")
        params.setdefault("name", use)

        # Iterate from the elements
        yield from self.expand(use)

    def lattice(self, use="ring", **kwargs):
        """Create a lattice from the selected sequence

        Parameters:
            use:                Name of the MADX sequence or line containing the desired
              lattice. Default: ``ring``

        Keyword Args:
            name (str):         Name of the lattice. Default: MADX sequence name.
            particle(Particle): Circulating particle. Default: from MADX
            energy (float):     Energy of the lattice [eV], Default: from MADX
            periodicity(int):   Number of periods. Default: 1
            *:                  All other keywords will be set as Lattice attributes
        """
        return Lattice(self._generator, iterator=params_filter, use=use, **kwargs)

    def parse_lines(
        self,
        lines: Iterable[str],
        final: bool = True,
        **kwargs,
    ) -> None:
        """Process input lines and fill the database

        Args:
            lines: Iterable of input lines
            final: If :py:obj:`True`, signals that the undefined variables may be set
              to the default value
            **kwargs: Initial variable definitions

        Raises:
            Exception: a failing statement raises an exception of the same type
              as the original one, with a message giving the line number
              (starting from 1) and the text of the statement
        """
        self.update(**kwargs)
        buffer = []
        in_comment: bool = False
        ok: bool = True
        for line_number, contents in enumerate(lines, start=1):
            # Handle comments
            while contents:
                in_comment, contents = self.skip_comments(buffer, contents, in_comment)

            if buffer:
                contents = "".join(buffer).strip()
                buffer = []
                if not contents:
                    continue
            else:
                continue

            # Handle delimiters
            if self.delimiter is None:
                statements = []
                last = contents
            else:
                *statements, last = contents.split(sep=self.delimiter)

            # Handle continuation: an unfinished statement goes back to the
            # buffer and is completed by the following lines
            if self.continuation is None:
                buffer.append(last)
            else:
                idc = last.find(self.continuation)
                if idc >= 0:
                    buffer.append(last[:idc])
                else:
                    statements.append(last)

            # Process statements
            for stmnt in statements:
                stmnt = stmnt.strip()
                if not stmnt:
                    continue
                try:
                    ok = self._statement(stmnt)
                except Exception as exc:
                    message = f"Line {line_number} {stmnt!r}, {exc}"
                    raise type(exc)(message) from exc
                if not ok:  # "end of input" marker
                    break
            if not ok:
                break

        self._finalise(final=final)

    def parse_files(
        self,
        *filenames: str,
        final: bool = True,
        prolog: None | int | Callable[..., None] = None,
        epilog: Callable[..., None] | None = None,
        **kwargs,
    ) -> None:
        """Process files and fill the database

        Args:
            *filenames: Files to process. A relative name refers to the
              directory of the file being processed, or to the directory which
              was current when the parser was created
            final: If :py:obj:`True`, signals that the undefined variables may be set
              to the default value
            prolog: If callable, it is called with the open file as argument
              before the processing of each file. If it is an integer, it is
              the number of lines skipped at the beginning of each file
            epilog: If callable, it is called with the open file as argument
              after the processing of each file
            **kwargs: Initial variable definitions
        """
        self.update(**kwargs)
        last = len(filenames) - 1
        for nf, fn in enumerate(filenames):
            fn = normpath(join(self.bases[-1], fn))
            self.bases.append(dirname(fn))
            print("Processing", fn)
            try:
                with open(fn) as f:
                    if callable(prolog):
                        prolog(f)
                    elif isinstance(prolog, int):
                        for _i in range(prolog):
                            next(f)

                    # Only the last file may finalise the processing
                    self.parse_lines(f, final=final and (nf == last))

                    if callable(epilog):
                        epilog(f)
            finally:
                print("End", fn)
                self.bases.pop()
class UnorderedParser(BaseParser):
    """parser allowing definitions in any order

    This is done by storing the failed statements in a queue and iteratively trying
    to execute them after all input statements have been processed, until the number
    of failures is constant (hopefully zero)
    """

    def __init__(self, env: dict, *args, **kwargs):
        """
        Args:
            env: global namespace
            delimiter: command delimiter
            continuation: command continuation character
            linecomment: Line comment character
            blockcomment: Block comment delimiter
            endfile: End of input marker
            verbose: If True, print detail on the processing
            *args: dict initializer
            **kwargs: dict initializer
        """
        super().__init__(env, *args, **kwargs)
        # Pending statements, stored as (reason, label, cmdname, *argnames)
        self.delayed = []

    def clear(self):
        """Clean the database and forget the pending statements"""
        super().clear()
        self.delayed = []

    def _postpone(self, reason, label, cmdname, *argnames):
        """Store failing commands in self.delayed for later evaluation"""
        self.delayed.append((reason, label, cmdname, *argnames))

    def _decode(self, label: str | None, cmdname: str, *argnames: str) -> None:
        """Postpone failing commands"""
        try:
            super()._decode(label, cmdname, *argnames)
        except (KeyError, NameError) as exc:  # store the failing assignment
            self._postpone(self._reason(exc), label, cmdname, *argnames)

    def _finalise(self, final: bool = True) -> None:
        """Loop on evaluation of the pending statements"""
        nend = len(self.delayed)
        if nend > 0:
            self._print(f"\nDelayed evaluation of {nend} statements\n")
        while nend > 0:
            statements = self.delayed
            self.delayed = []
            nstart = nend
            # The statements failing again are put back in self.delayed
            for _reason, *args in statements:
                self._decode(*args)
            nend = len(self.delayed)
            if nend == nstart:  # No progress: give up
                break

    def _lookup(self, item: str):
        """Search for an object in the pending statements

        Returns:
            The first pending statement whose lower-cased label is *item*,
            :py:obj:`None` if there is none
        """
        for reason, label, *args in self.delayed:
            if label is not None and label.lower() == item:
                return (reason, label, *args)
        return None

    def _missing(self, verbose: bool = False):
        """Return the set of names at the origin of the pending statements

        Each pending statement is followed along its chain of dependencies up
        to a name which is not itself pending. The statements failing because
        of their own command name are ignored.

        Args:
            verbose: If :py:obj:`True`, report the ignored statements
        """
        miss = set()
        for cmd in self.delayed:
            reason = cmd[0]
            if reason == cmd[2].lower():
                if verbose:
                    print(
                        f"Unknown command {cmd[2]!r} ignored: "
                        f"{self._command_str(*cmd)!r}"
                    )
                continue
            # Circular definitions (a needs b, b needs a) would loop forever:
            # stop as soon as a name shows up again in the chain
            visited = set()
            while cmd is not None:
                reason = cmd[0]
                if reason in visited:
                    break
                visited.add(reason)
                cmd = self._lookup(reason)
            miss.add(reason)
        return miss

    def _analyse(self, key: str) -> None:
        """Print the chain of failing commands"""
        cmd = self._lookup(key.lower())
        reason = key
        # Names already met in the chain, to stop on circular definitions
        visited = {key.lower()}
        while cmd is not None:
            reason = cmd[0]
            print(f"\n{key!r} depends on {reason!r}: {self._command_str(*cmd)!r}")
            if reason in visited:
                print(f"\nCircular definition of {reason!r}")
                break
            visited.add(reason)
            key = reason
            cmd = self._lookup(reason)
        print(f"\n{reason!r} is not defined\n")

    missing = property(_missing, doc="Set of missing definitions")