Source code for pyxdsm.XDSM

from __future__ import print_function
import os
import numpy as np
import json
import subprocess
from collections import namedtuple

from pyxdsm import __version__ as pyxdsm_version

OPT = "Optimization"
SUBOPT = "SubOptimization"
SOLVER = "MDA"
DOE = "DOE"
IFUNC = "ImplicitFunction"
FUNC = "Function"
GROUP = "Group"
IGROUP = "ImplicitGroup"
METAMODEL = "Metamodel"
LEFT = "left"
RIGHT = "right"

tikzpicture_template = r"""
%%% Preamble Requirements %%%
% \usepackage{{geometry}}
% \usepackage{{amsfonts}}
% \usepackage{{amsmath}}
% \usepackage{{amssymb}}
% \usepackage{{tikz}}

% Optional packages such as sfmath set through python interface
% \usepackage{{{optional_packages}}}

% \usetikzlibrary{{arrows,chains,positioning,scopes,shapes.geometric,shapes.misc,shadows}}

%%% End Preamble Requirements %%%

\input{{"{diagram_styles_path}"}}
\begin{{tikzpicture}}

\matrix[MatrixSetup]{{
{nodes}}};

% XDSM process chains
{process}

\begin{{pgfonlayer}}{{data}}
\path
{edges}
\end{{pgfonlayer}}

\end{{tikzpicture}}
"""

tex_template = r"""
% XDSM diagram created with pyXDSM {version}.
\documentclass{{article}}
\usepackage{{geometry}}
\usepackage{{amsfonts}}
\usepackage{{amsmath}}
\usepackage{{amssymb}}
\usepackage{{tikz}}

% Optional packages such as sfmath set through python interface
\usepackage{{{optional_packages}}}

% Define the set of TikZ packages to be included in the architecture diagram document
\usetikzlibrary{{arrows,chains,positioning,scopes,shapes.geometric,shapes.misc,shadows}}


% Set the border around all of the architecture diagrams to be tight to the diagrams themselves
% (i.e. no longer need to tinker with page size parameters)
\usepackage[active,tightpage]{{preview}}
\PreviewEnvironment{{tikzpicture}}
\setlength{{\PreviewBorder}}{{5pt}}

\begin{{document}}

\input{{"{tikzpicture_path}"}}

\end{{document}}
"""


def chunk_label(label, n_chunks):
    # looping till length l
    for i in range(0, len(label), n_chunks):
        yield label[i : i + n_chunks]


def _parse_label(label, label_width=None):
    if isinstance(label, (tuple, list)):
        if label_width is None:
            return r"$\begin{array}{c}" + r" \\ ".join(label) + r"\end{array}$"
        else:
            labels = []
            for chunk in chunk_label(label, label_width):
                labels.append(", ".join(chunk))
            return r"$\begin{array}{c}" + r" \\ ".join(labels) + r"\end{array}$"
    else:
        return r"${}$".format(label)


def _label_to_spec(label, spec):
    if isinstance(label, str):
        label = [
            label,
        ]
    for var in label:
        if var:
            spec.add(var)


System = namedtuple("System", "node_name style label stack faded label_width spec_name")
Input = namedtuple("Input", "node_name label label_width style stack faded")
Output = namedtuple("Output", "node_name label label_width style stack faded side")
Connection = namedtuple("Connection", "src target label label_width style stack faded src_faded target_faded")
Process = namedtuple("Process", "systems arrow faded")


[docs] class XDSM: def __init__(self, use_sfmath=True, optional_latex_packages=None, auto_fade=None): """Initialize XDSM object Parameters ---------- use_sfmath : bool, optional Whether to use the sfmath latex package, by default True optional_latex_packages : string or list of strings, optional Additional latex packages to use when creating the pdf and tex versions of the diagram, by default None auto_fade : dictionary, optional Controls the automatic fading of inputs, outputs, connections and processes based on the fading of diagonal blocks. For each key "inputs", "outputs", "connections", and "processes", the value can be one of: - "all" : fade all blocks - "connected" : fade all components connected to faded blocks (both source and target must be faded for a conncection to be faded) - "none" : do not auto-fade anything For connections there are two additional options: - "incoming" : Fade all connections that are incoming to faded blocks. - "outgoing" : Fade all connections that are outgoing from faded blocks. """ self.systems = [] self.connections = [] self.left_outs = {} self.right_outs = {} self.ins = {} self.processes = [] self.use_sfmath = use_sfmath if optional_latex_packages is None: self.optional_packages = [] else: if isinstance(optional_latex_packages, str): self.optional_packages = [optional_latex_packages] elif isinstance(optional_latex_packages, list): self.optional_packages = optional_latex_packages else: raise ValueError("optional_latex_packages must be a string or a list of strings") self.auto_fade = {"inputs": "none", "outputs": "none", "connections": "none", "processes": "none"} fade_options = ["all", "connected", "none"] if auto_fade is not None: if any([key not in self.auto_fade for key in auto_fade.keys()]): raise ValueError( "The supplied 'auto_fade' dictionary contains keys that are not recognized. " + "valid keys are 'inputs', 'outputs', 'connections', 'processes'." ) self.auto_fade.update(auto_fade) for key in self.auto_fade.keys(): option_is_valid = self.auto_fade[key] in fade_options or ( key == "connections" and self.auto_fade[key] in ["incoming", "outgoing"] ) if not option_is_valid: raise ValueError( f"The supplied 'auto_fade' dictionary contains an invalid value: '{key}'. " + "valid values are 'all', 'connected', 'none', 'incoming', 'outgoing'." )
[docs] def add_system( self, node_name, style, label, stack=False, faded=False, label_width=None, spec_name=None, ): r""" Add a "system" block, which will be placed on the diagonal of the XDSM diagram. Parameters ---------- node_name : str The unique name given to this component style : str The type of the component label : str or list/tuple of strings The label to appear on the diagram. There are two options for this: - a single string - a list or tuple of strings, which is used for line breaking In either case, they should probably be enclosed in \text{} declarations to make sure the font is upright. stack : bool If true, the system will be displayed as several stacked rectangles, indicating the component is executed in parallel. faded : bool If true, the component will be faded, in order to highlight some other system. label_width : int or None If not None, AND if ``label`` is given as either a tuple or list, then this parameter controls how many items in the tuple/list will be displayed per line. If None, the label will be printed one item per line if given as a tuple or list, otherwise the string will be printed on a single line. spec_name : str The spec name used for the spec file. """ if spec_name is None: spec_name = node_name sys = System(node_name, style, label, stack, faded, label_width, spec_name) self.systems.append(sys)
[docs] def add_input(self, name, label, label_width=None, style="DataIO", stack=False, faded=False): r""" Add an input, which will appear in the top row of the diagram. Parameters ---------- name : str The unique name given to this component label : str or list/tuple of strings The label to appear on the diagram. There are two options for this: - a single string - a list or tuple of strings, which is used for line breaking In either case, they should probably be enclosed in \text{} declarations to make sure the font is upright. label_width : int or None If not None, AND if ``label`` is given as either a tuple or list, then this parameter controls how many items in the tuple/list will be displayed per line. If None, the label will be printed one item per line if given as a tuple or list, otherwise the string will be printed on a single line. style : str The style given to this component. Can be one of ['DataInter', 'DataIO'] stack : bool If true, the system will be displayed as several stacked rectangles, indicating the component is executed in parallel. faded : bool If true, the component will be faded, in order to highlight some other system. """ sys_faded = {} for s in self.systems: sys_faded[s.node_name] = s.faded if (self.auto_fade["inputs"] == "all") or ( self.auto_fade["inputs"] == "connected" and name in sys_faded and sys_faded[name] ): faded = True self.ins[name] = Input("output_" + name, label, label_width, style, stack, faded)
[docs] def add_output(self, name, label, label_width=None, style="DataIO", stack=False, faded=False, side="left"): r""" Add an output, which will appear in the left or right-most column of the diagram. Parameters ---------- name : str The unique name given to this component label : str or list/tuple of strings The label to appear on the diagram. There are two options for this: - a single string - a list or tuple of strings, which is used for line breaking In either case, they should probably be enclosed in \text{} declarations to make sure the font is upright. label_width : int or None If not None, AND if ``label`` is given as either a tuple or list, then this parameter controls how many items in the tuple/list will be displayed per line. If None, the label will be printed one item per line if given as a tuple or list, otherwise the string will be printed on a single line. style : str The style given to this component. Can be one of ``['DataInter', 'DataIO']`` stack : bool If true, the system will be displayed as several stacked rectangles, indicating the component is executed in parallel. faded : bool If true, the component will be faded, in order to highlight some other system. side : str Must be one of ``['left', 'right']``. This parameter controls whether the output is placed on the left-most column or the right-most column of the diagram. """ sys_faded = {} for s in self.systems: sys_faded[s.node_name] = s.faded if (self.auto_fade["outputs"] == "all") or ( self.auto_fade["outputs"] == "connected" and name in sys_faded and sys_faded[name] ): faded = True if side == "left": self.left_outs[name] = Output("left_output_" + name, label, label_width, style, stack, faded, side) elif side == "right": self.right_outs[name] = Output("right_output_" + name, label, label_width, style, stack, faded, side) else: raise ValueError("The option 'side' must be given as either 'left' or 'right'!")
[docs] def connect( self, src, target, label, label_width=None, style="DataInter", stack=False, faded=False, ): r""" Connects two components with a data line, and adds a label to indicate the data being transferred. Parameters ---------- src : str The name of the source component. target : str The name of the target component. label : str or list/tuple of strings The label to appear on the diagram. There are two options for this: - a single string - a list or tuple of strings, which is used for line breaking In either case, they should probably be enclosed in \text{} declarations to make sure the font is upright. label_width : int or None If not None, AND if ``label`` is given as either a tuple or list, then this parameter controls how many items in the tuple/list will be displayed per line. If None, the label will be printed one item per line if given as a tuple or list, otherwise the string will be printed on a single line. style : str The style given to this component. Can be one of ``['DataInter', 'DataIO']`` stack : bool If true, the system will be displayed as several stacked rectangles, indicating the component is executed in parallel. faded : bool If true, the component will be faded, in order to highlight some other system. """ if src == target: raise ValueError("Can not connect component to itself") if (not isinstance(label_width, int)) and (label_width is not None): raise ValueError("label_width argument must be an integer") sys_faded = {} for s in self.systems: sys_faded[s.node_name] = s.faded allFaded = self.auto_fade["connections"] == "all" srcFaded = src in sys_faded and sys_faded[src] targetFaded = target in sys_faded and sys_faded[target] if ( allFaded or (self.auto_fade["connections"] == "connected" and (srcFaded and targetFaded)) or (self.auto_fade["connections"] == "incoming" and targetFaded) or (self.auto_fade["connections"] == "outgoing" and srcFaded) ): faded = True self.connections.append(Connection(src, target, label, label_width, style, stack, faded, srcFaded, targetFaded))
[docs] def add_process(self, systems, arrow=True, faded=False): """ Add a process line between a list of systems, to indicate process flow. Parameters ---------- systems : list The names of the components, in the order in which they should be connected. For a complete cycle, repeat the first component as the last component. arrow : bool If true, arrows will be added to the process lines to indicate the direction of the process flow. """ sys_faded = {} for s in self.systems: sys_faded[s.node_name] = s.faded if (self.auto_fade["processes"] == "all") or ( self.auto_fade["processes"] == "connected" and any( [sys_faded[s] for s in systems if s in sys_faded.keys()] ) # sometimes a process may contain off-diagonal blocks ): faded = True self.processes.append(Process(systems, arrow, faded))
def _build_node_grid(self): size = len(self.systems) comps_rows = np.arange(size) comps_cols = np.arange(size) if self.ins: size += 1 # move all comps down one row comps_rows += 1 if self.left_outs: size += 1 # shift all comps to the right by one, to make room for inputs comps_cols += 1 if self.right_outs: size += 1 # don't need to shift anything in this case # build a map between comp node_names and row idx for ordering calculations row_idx_map = {} col_idx_map = {} node_str = r"\node [{style}] ({node_name}) {{{node_label}}};" grid = np.empty((size, size), dtype=object) grid[:] = "" # add all the components on the diagonal for i_row, j_col, comp in zip(comps_rows, comps_cols, self.systems): style = comp.style if comp.stack: style += ",stack" if comp.faded: style += ",faded" label = _parse_label(comp.label, comp.label_width) node = node_str.format(style=style, node_name=comp.node_name, node_label=label) grid[i_row, j_col] = node row_idx_map[comp.node_name] = i_row col_idx_map[comp.node_name] = j_col # add all the off diagonal nodes from components for conn in self.connections: # src, target, style, label, stack, faded, label_width src_row = row_idx_map[conn.src] target_col = col_idx_map[conn.target] loc = (src_row, target_col) style = conn.style if conn.stack: style += ",stack" if conn.faded: style += ",faded" label = _parse_label(conn.label, conn.label_width) node_name = "{}-{}".format(conn.src, conn.target) node = node_str.format(style=style, node_name=node_name, node_label=label) grid[loc] = node # add the nodes for left outputs for comp_name, out in self.left_outs.items(): style = out.style if out.stack: style += ",stack" if out.faded: style += ",faded" i_row = row_idx_map[comp_name] loc = (i_row, 0) label = _parse_label(out.label, out.label_width) node = node_str.format(style=style, node_name=out.node_name, node_label=label) grid[loc] = node # add the nodes for right outputs for comp_name, out in self.right_outs.items(): style = out.style if out.stack: style += ",stack" if out.faded: style += ",faded" i_row = row_idx_map[comp_name] loc = (i_row, -1) label = _parse_label(out.label, out.label_width) node = node_str.format(style=style, node_name=out.node_name, node_label=label) grid[loc] = node # add the inputs to the top of the grid for comp_name, inp in self.ins.items(): # node_name, style, label, stack = in_data style = inp.style if inp.stack: style += ",stack" if inp.faded: style += ",faded" j_col = col_idx_map[comp_name] loc = (0, j_col) label = _parse_label(inp.label, label_width=inp.label_width) node = node_str.format(style=style, node_name=inp.node_name, node_label=label) grid[loc] = node # mash the grid data into a string rows_str = "" for i, row in enumerate(grid): rows_str += "%Row {}\n".format(i) + "&\n".join(row) + r"\\" + "\n" return rows_str def _build_edges(self): h_edges = [] v_edges = [] edge_format_string = "({start}) edge [{style}] ({end})" for conn in self.connections: h_edge_style = "DataLine" v_edge_style = "DataLine" if conn.src_faded or conn.faded: h_edge_style += ",faded" if conn.target_faded or conn.faded: v_edge_style += ",faded" od_node_name = "{}-{}".format(conn.src, conn.target) h_edges.append(edge_format_string.format(start=conn.src, end=od_node_name, style=h_edge_style)) v_edges.append(edge_format_string.format(start=od_node_name, end=conn.target, style=v_edge_style)) for comp_name, out in self.left_outs.items(): style = "DataLine" if out.faded: style += ",faded" node_name = out.node_name h_edges.append(edge_format_string.format(start=comp_name, end=node_name, style=style)) for comp_name, out in self.right_outs.items(): style = "DataLine" if out.faded: style += ",faded" node_name = out.node_name h_edges.append(edge_format_string.format(start=comp_name, end=node_name, style=style)) for comp_name, inp in self.ins.items(): style = "DataLine" if inp.faded: style += ",faded" node_name = inp.node_name v_edges.append(edge_format_string.format(start=comp_name, end=node_name, style=style)) h_edges = sorted(h_edges, key=lambda s: "faded" in s) v_edges = sorted(v_edges, key=lambda s: "faded" in s) paths_str = "% Horizontal edges\n" + "\n".join(h_edges) + "\n" paths_str += "% Vertical edges\n" + "\n".join(v_edges) + ";" return paths_str def _build_process_chain(self): sys_names = [s.node_name for s in self.systems] output_names = ( [data[0] for _, data in self.ins.items()] + [data[0] for _, data in self.left_outs.items()] + [data[0] for _, data in self.right_outs.items()] ) # comp_name, in_data in self.ins.items(): # node_name, style, label, stack = in_data chain_str = "" for proc in self.processes: chain_str += "{ [start chain=process]\n \\begin{pgfonlayer}{process} \n" start_tip = False for i, sys in enumerate(proc.systems): if sys not in sys_names and sys not in output_names: raise ValueError( 'process includes a system named "{}" but no system with that name exists.'.format(sys) ) if sys in output_names and i == 0: start_tip = True if i == 0: chain_str += "\\chainin ({});\n".format(sys) else: if sys in output_names or (i == 1 and start_tip): if proc.arrow: style = "ProcessTipA" else: style = "ProcessTip" else: if proc.arrow: style = "ProcessHVA" else: style = "ProcessHV" if proc.faded: style = "Faded" + style chain_str += "\\chainin ({}) [join=by {}];\n".format(sys, style) chain_str += "\\end{pgfonlayer}\n}" return chain_str def _compose_optional_package_list(self): # Check for optional LaTeX packages optional_packages_list = self.optional_packages if self.use_sfmath: optional_packages_list.append("sfmath") # Join all packages into one string separated by comma optional_packages_str = ",".join(optional_packages_list) return optional_packages_str
[docs] def write(self, file_name, build=True, cleanup=True, quiet=False, outdir="."): """ Write output files for the XDSM diagram. This produces the following: - {file_name}.tikz A file containing the TikZ definition of the XDSM diagram. - {file_name}.tex A standalone document wrapped around an include of the TikZ file which can be compiled to a pdf. - {file_name}.pdf An optional compiled version of the standalone tex file. Parameters ---------- file_name : str The prefix to be used for the output files build : bool Flag that determines whether the standalone PDF of the XDSM will be compiled. Default is True. cleanup : bool Flag that determines if pdflatex build files will be deleted after build is complete quiet : bool Set to True to suppress output from pdflatex. outdir : str Path to an existing directory in which to place output files. If a relative path is given, it is interpreted relative to the current working directory. """ nodes = self._build_node_grid() edges = self._build_edges() process = self._build_process_chain() module_path = os.path.dirname(__file__) diagram_styles_path = os.path.join(module_path, "diagram_styles") # Hack for Windows. MiKTeX needs Linux style paths. diagram_styles_path = diagram_styles_path.replace("\\", "/") optional_packages_str = self._compose_optional_package_list() tikzpicture_str = tikzpicture_template.format( nodes=nodes, edges=edges, process=process, diagram_styles_path=diagram_styles_path, optional_packages=optional_packages_str, ) base_output_fp = os.path.join(outdir, file_name) with open(base_output_fp + ".tikz", "w") as f: f.write(tikzpicture_str) tex_str = tex_template.format( nodes=nodes, edges=edges, tikzpicture_path=file_name + ".tikz", diagram_styles_path=diagram_styles_path, optional_packages=optional_packages_str, version=pyxdsm_version, ) with open(base_output_fp + ".tex", "w") as f: f.write(tex_str) if build: command = [ "pdflatex", "-halt-on-error", "-interaction=nonstopmode", "-output-directory={}".format(outdir), ] if quiet: command += ["-interaction=batchmode", "-halt-on-error"] command += [f"{file_name}.tex"] subprocess.run(command, check=True) if cleanup: for ext in ["aux", "fdb_latexmk", "fls", "log"]: f_name = "{}.{}".format(base_output_fp, ext) if os.path.exists(f_name): os.remove(f_name)
[docs] def write_sys_specs(self, folder_name): """ Write I/O spec json files for systems to specified folder An I/O spec of a system is the collection of all variables going into and out of it. That includes any variables being passed between systems, as well as all inputs and outputs. This information is useful for comparing implementations (such as components and groups in OpenMDAO) to the XDSM diagrams. The json spec files can be used to write testing utilities that compare the inputs/outputs of an implementation to the XDSM, and thus allow you to verify that your codes match the XDSM diagram precisely. This technique is especially useful when large engineering teams are collaborating on model development. It allows them to use the XDSM as a shared contract between team members so everyone can be sure that their codes will sync up. Parameters ---------- folder_name: str name of the folder, which will be created if it doesn't exist, to put spec files into """ # find un-connected to each system by looking at Inputs specs = {} for sys in self.systems: specs[sys.node_name] = {"inputs": set(), "outputs": set()} for sys_name, inp in self.ins.items(): _label_to_spec(inp.label, specs[sys_name]["inputs"]) # find connected inputs/outputs to each system by looking at Connections for conn in self.connections: _label_to_spec(conn.label, specs[conn.target]["inputs"]) _label_to_spec(conn.label, specs[conn.src]["outputs"]) # find unconnected outputs to each system by looking at Outputs for sys_name, out in self.left_outs.items(): _label_to_spec(out.label, specs[sys_name]["outputs"]) for sys_name, out in self.right_outs.items(): _label_to_spec(out.label, specs[sys_name]["outputs"]) if not os.path.isdir(folder_name): os.mkdir(folder_name) for sys in self.systems: if sys.spec_name is not False: path = os.path.join(folder_name, sys.spec_name + ".json") with open(path, "w") as f: spec = specs[sys.node_name] spec["inputs"] = list(spec["inputs"]) spec["outputs"] = list(spec["outputs"]) json_str = json.dumps(spec, indent=2) f.write(json_str)