Skip to content
Snippets Groups Projects
Select Git revision
  • 05b9f9384b03a8873d487b6e3a312ff279a24e8f
  • main default protected
  • Neubauer/Vektoroperationen
  • burk/PhaseTransitionsHOG
  • luetjering/SPDE
  • mohr/citing
  • burk/MC
  • ponsuganth/extra-forms
  • dechant/manifolds
  • bauerd/bump-numpy
  • gurtner/nonlinear-diffusion
  • bauerd/backend-rework
  • ponsuganth/weak-rigid-body-mode
  • mohr/doc
  • boehm/isoP1-space
  • boehm/precomputation
  • ponsuganth/stokes-viscoplastic
  • ponsuganth/p0-coefficients
  • ponsuganth/std-functions
  • valian/linear_elasticity
  • ponsuganth/gradient-operator
21 results

generate_all_operators.py

Blame
  • generate_all_operators.py 24.19 KiB
    # HyTeG Operator Generator
    # Copyright (C) 2024  HyTeG Team
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <https://www.gnu.org/licenses/>.
    
    import argparse
    import sys
    from argparse import ArgumentParser
    from dataclasses import dataclass, field
    from functools import partial
    import logging
    import os
    import re
    from typing import Callable, List, Optional, Sequence, Set, Tuple
    
    import numpy as np
    import sympy as sp
    from sympy.core.cache import clear_cache
    from tabulate import tabulate
    
    from hog.blending import GeometryMap, IdentityMap, AnnulusMap, IcosahedralShellMap
    from hog.cse import CseImplementation
    from hog.element_geometry import ElementGeometry, TriangleElement, TetrahedronElement
    from hog.exception import HOGException
    from hog.forms import (
        diffusion,
        divergence,
        div_k_grad,
        shear_heating,
        epsilon,
        full_stokes,
        nonlinear_diffusion,
        nonlinear_diffusion_newton_galerkin,
    )
    from hog.forms_vectorial import curl_curl, curl_curl_plus_mass, mass_n1e1
    from hog.function_space import (
        FunctionSpace,
        LagrangianFunctionSpace,
        N1E1Space,
        TensorialVectorFunctionSpace,
    )
    from hog.logger import get_logger, TimedLogger
    from hog.operator_generation.kernel_types import (
        Apply,
        Assemble,
        AssembleDiagonal,
        KernelType,
    )
    from hog.operator_generation.loop_strategies import (
        LoopStrategy,
        SAWTOOTH,
        CUBES,
        FUSEDROWS,
    )
    from hog.operator_generation.operators import (
        HyTeGElementwiseOperator,
        MacroIntegrationDomain,
    )
    
    from operator import itemgetter
    from hog.operator_generation.optimizer import (
        Opts,
        opts_arg_mapping,
        ordered_opts_suffix,
    )
    from hog.quadrature import Quadrature
    from hog.symbolizer import Symbolizer
    from generate_all_hyteg_forms import valid_base_dir
    from hog.operator_generation.types import parse_argument_type, HOGType, HOGPrecision
    import quadpy
    from hog.quadrature.quadrature import select_quadrule
    
    ALL_GEOMETRY_MAPS = [
        IdentityMap(),
        AnnulusMap(),
        IcosahedralShellMap(),
    ]
    
    
    def parse_arguments():
        parser = ArgumentParser(
            description="Generates a collection of elementwise operators."
        )
    
        list_or_gen = parser.add_mutually_exclusive_group(required=True)
        list_or_gen.add_argument(
            "-l",
            "--list-operators",
            action="store_true",
            help="List all available operators.",
        )
        list_or_gen.add_argument(
            "--list-quadratures",
            nargs=1,
            help="List all available quadrature rules for a given geometry (t1: Line, t2: Triangle, t3: Tetrahedron)",
        )
    
        list_or_gen.add_argument(
            "--hyteg",
            help=(
                "Path to the HyTeG repository. The generated code is written to the correct directory in the source tree."
            ),
        )
        list_or_gen.add_argument(
            "--output",
            help=("Path to output directory."),
        )
    
        parser.add_argument(
            "-s",
            "--space-mapping",
            default=".*?",
            help="Filter by function space, e.g. 'P1' or 'P1toP2' (regex).",
            # TODO add choices list and type=string.lower
        )
    
        parser.add_argument(
            "-f",
            "--form",
            default=".*?",
            help="Filter by form, e.g. 'CurlCurl' or 'Diffusion' (regex).",
            # TODO add choices list and type=string.lower
        )
    
        parser.add_argument(
            "-o",
            "--opts",
            default="",
            nargs="+",
            help=f"""
        Enter a space separated list of optimizations. 
        Not all combinations are currently supported. Available optimizations are: {Opts.__members__.keys()}""",
            # TODO add choices list and type=string.lower
        )
        # TODO explanations for optimizations e.g. CUTLOOPS is not straight-forward
    
        parser.add_argument(
            "--loop-strategy",
            default="SAWTOOTH",
            help="Enter a loop strategy for the operator. Available strategies are SAWTOOTH, CUBES.",
            # TODO add choices list and type=string.lower
        )
    
        quad_rule_or_degree = parser.add_mutually_exclusive_group(required=False)
        quad_rule_or_degree.add_argument(
            "--quad-rule",
            type=str,
            nargs=2,
            default=None,
            help="""Supply two rules by their identifier (leftmost column when calling with --list-quadratures),
                    the first will be used for triangles and the second for tetrahedra.""",
        )
        quad_rule_or_degree.add_argument(
            "--quad-degree",
            type=int,
            nargs=2,
            default=[-1, -1],
            help="""Supply two polynomial degrees (as integers) up to which the quadrature should be exact. 
                    The generator will chose the rule with minimal points that integrates the specified 
                    degree exactly. The first integer will be used for triangles and the second for tetrahedra.""",
        )
    
        prec_or_mpfr = parser.add_mutually_exclusive_group(required=False)
        prec_or_mpfr.add_argument(
            "-p",
            "--precision",
            type=str.lower,
            help=(
                "Defines the precision in which the kernels are calculated; "
                "If not specified we choose fp64(double) precision"
            ),
            choices=list(choice.value for choice in HOGPrecision),
            default="fp64",
        )
    
        parser.add_argument(
            "-b",
            "--blending",
            default=str(IdentityMap()),
            help=f"Enter a blending type for the operator. Note some geometry maps only work in 2D or 3D respectively. "
            f"Available geometry maps: {[str(m) for m in ALL_GEOMETRY_MAPS]}",
            # TODO add choices list and type=string.lower
        )
    
        parser.add_argument(
            "--name",
            help=f"Specify the name of the generated C++ class.",
        )
    
        logging_group = parser.add_mutually_exclusive_group(required=False)
        logging_group.add_argument(
            "-v",
            "--verbose",
            type=int,
            help="specify verbosity level (0 = silent, 1 = info, 2 = debug)",
            choices=[0, 1, 2],
        )
        logging_group.add_argument(
            "--loglevel",
            type=str.lower,
            help=(
                "Defines which messages are shown and which not; "
                "If not specified 'warning' is chosen."
            ),
            choices=["debug", "info", "warning", "error", "critical"],
        )
    
        clang_format_group = parser.add_mutually_exclusive_group(required=False)
        clang_format_group.add_argument(
            "--no-clang-format",
            dest="clang_format",
            action="store_false",
            help="Do not apply clang-format on generated files.",
        )
    
        clang_format_group.add_argument(
            "--clang-format-binary",
            default="clang-format",
            help=f"Allows to specify the name of the clang-format binary and/or optionally its full path."
            f" By default 'clang-format' will be used.",
        )
    
        args = parser.parse_args()
        return parser, args
    
    
    strategy_args_mapping = {
        "CUBES": CUBES(),
        "SAWTOOTH": SAWTOOTH(),
        "FUSEDROWS": FUSEDROWS(),
    }
    
    
    def main():
        clear_cache()
    
        parser, args = parse_arguments()
    
        # Adapt clang_format_binary according to the value of clang_format
        if "clang_format" in vars(args):
            if not args.clang_format:
                args.clang_format_binary = None
    
        if args.verbose:
            if args.verbose == 0:
                log_level = logging.CRITICAL
            elif args.verbose == 1:
                log_level = logging.INFO
            elif args.verbose == 2:
                log_level = logging.DEBUG
            else:
                raise HOGException(f"Invalid verbosity level {args.verbose}.")
        elif args.loglevel:
            if args.loglevel == "debug":
                log_level = logging.DEBUG
            elif args.loglevel == "info":
                log_level = logging.INFO
            elif args.loglevel == "warning":
                log_level = logging.WARNING
            elif args.loglevel == "error":
                log_level = logging.ERROR
            elif args.loglevel == "critical":
                log_level = logging.CRITICAL
            else:
                raise HOGException(f"Invalid log level {args.loglevel}.")
        else:
            # The default log level is chosen here
            log_level = logging.DEBUG
    
        logger = get_logger(log_level)
        TimedLogger.set_log_level(log_level)
    
        logger.info(f"Running {__file__} with arguments: {' '.join(sys.argv[1:])}")
    
        symbolizer = Symbolizer()
    
        if args.list_quadratures:
            geometry = args.list_quadratures[0]
            logger.debug(f"Listing quadrature rules for geometry: {geometry}.")
            if geometry == "t3":
                schemes = quadpy.t3.schemes
            elif geometry == "t2":
                schemes = quadpy.t2.schemes
            else:
                raise HOGException(f"Unexpected geometry: {geometry}")
    
            all_schemes = []
            for key, s in schemes.items():
                try:
                    scheme = s()
                    entry = {
                        "key": key,
                        "name": scheme.name,
                        "points": scheme.points.shape[1],
                        "degree": scheme.degree,
                        "test_tolerance": scheme.test_tolerance,
                    }
                    all_schemes.append(entry)
                except TypeError as e:
                    # Some schemes seem to require parameters, we simply exclude these with this litte try-except hack
                    pass
    
            all_schemes = sorted(all_schemes, key=itemgetter("degree", "points"))
            table = tabulate(all_schemes, headers="keys", tablefmt="grid")
            print(table)
            exit()
    
        blending = None
        for m in ALL_GEOMETRY_MAPS:
            if str(m) == args.blending:
                blending = m
        if blending is None:
            raise HOGException("Invalid geometry map.")
        logger.debug(f"Blending: {blending}")
    
        type_descriptor = parse_argument_type(args)
    
        logger.debug(f"Data type name is {type_descriptor.pystencils_type}.")
    
        opts = {opts_arg_mapping[opt] for opt in args.opts}
        loop_strategy = strategy_args_mapping[args.loop_strategy]
        re_form = re.compile(args.form)
    
        if (
            not args.list_operators
            and args.quad_degree == [-1, -1]
            and args.quad_rule is None
        ):
            parser.error("Either --quad-degree or --quad-rule are required.")
        quad = {
            geometry: info
            for geometry, info in zip(
                [TriangleElement(), TetrahedronElement()],
                args.quad_degree if args.quad_rule is None else args.quad_rule,
            )
        }
        operators = all_operators(
            symbolizer,
            [(opts, loop_strategy, ordered_opts_suffix(loop_strategy, opts))],
            type_descriptor,
            blending,
        )
    
        re_mapping = re.compile(args.space_mapping)
        re_form = re.compile(args.form)
    
        filtered_operators = list(
            filter(
                lambda o: re.fullmatch(args.space_mapping, o.mapping)
                and re.fullmatch(args.form, o.name),
                operators,
            )
        )
    
        if len(filtered_operators) == 0:
            raise HOGException(
                f"Form {args.form} does not match any available forms. Run --list for a list."
            )
    
        if args.list_operators:
            print(
                tabulate(
                    sorted(
                        [
                            (
                                o.mapping,
                                o.name,
                                ", ".join(
                                    str(geo.dimensions) + "D" for geo in o.geometries
                                ),
                                len(o.opts),
                            )
                            for o in filtered_operators
                        ]
                    ),
                    headers=(
                        "Space mapping",
                        "Form",
                        "Dimensions",
                        "Optimizations",
                    ),
                )
            )
            exit()
    
        if args.hyteg:
            args.output = os.path.join(args.hyteg, "src/hyteg/operatorgeneration/generated")
            if not valid_base_dir(args.hyteg):
                raise HOGException(
                    "The specified directory does not seem to be the HyTeG directory."
                )
    
        for operator in filtered_operators:
            for opt, loop, opt_name in operator.opts:
                blending_str = (
                    "_" + str(blending) if not isinstance(blending, IdentityMap) else ""
                )
                if args.name:
                    name = args.name
                else:
                    name = (
                        f"{operator.mapping}Elementwise{operator.name}{blending_str}{opt_name}"
                        f"{'_' + str(type_descriptor) if type_descriptor.add_file_suffix else ''}"
                    )
                with TimedLogger(f"Generating {name}", level=logging.INFO):
                    generate_elementwise_op(
                        args,
                        symbolizer,
                        operator,
                        name,
                        opt,
                        loop,
                        blending,
                        quad,
                        type_descriptor=type_descriptor,
                    )
    
    
    def all_opts_sympy_cse() -> List[Tuple[Set[Opts], LoopStrategy, str]]:
        return [
            (
                set(
                    {
                        Opts.MOVECONSTANTS,
                        Opts.REORDER,
                        Opts.ELIMTMPS,
                        Opts.QUADLOOPS,
                        Opts.VECTORIZE,
                    }
                ),
                SAWTOOTH(),
                "_const_vect_fused_quadloops_reordered_elimtmps",
            ),
        ]
    
    
    def all_opts_poly_cse() -> List[Tuple[Set[Opts], LoopStrategy, str]]:
        return [
            (o | {Opts.POLYCSE}, l, n + "_polycse") for (o, l, n) in all_opts_sympy_cse()
        ]
    
    
    def all_opts_both_cses() -> List[Tuple[Set[Opts], LoopStrategy, str]]:
        return all_opts_sympy_cse() + all_opts_poly_cse()
    
    
    @dataclass
    class OperatorInfo:
        mapping: str
        name: str
        trial_space: FunctionSpace
        test_space: FunctionSpace
        form: Callable[
            [
                FunctionSpace,
                FunctionSpace,
                ElementGeometry,
                Symbolizer,
                GeometryMap,
            ],
            sp.Matrix,
        ]
        type_descriptor: HOGType
        kernel_types: List[KernelType] = None  # type: ignore[assignment] # will definitely be initialized in __post_init__
        geometries: Sequence[ElementGeometry] = field(
            default_factory=lambda: [TriangleElement(), TetrahedronElement()]
        )
        opts: List[Tuple[Set[Opts], LoopStrategy, str]] = field(
            default_factory=all_opts_sympy_cse
        )
        blending: GeometryMap = field(default_factory=lambda: IdentityMap())
    
        def __post_init__(self):
            # Removing geometries that are not supported by blending.
            # I don't like this, but it looks like the collection of operators has to be refactored anyway.
            self.geometries = list(
                set(self.geometries) & set(self.blending.supported_geometries())
            )
    
            if self.kernel_types is None:
                dims = [g.dimensions for g in self.geometries]
                self.kernel_types = [
                    Apply(
                        self.test_space,
                        self.trial_space,
                        type_descriptor=self.type_descriptor,
                        dims=dims,
                    )
                ]
    
                all_opts = set().union(*[o for (o, _, _) in self.opts])
                if not ({Opts.VECTORIZE, Opts.VECTORIZE512}.intersection(all_opts)):
                    self.kernel_types.append(
                        Assemble(
                            self.test_space,
                            self.trial_space,
                            type_descriptor=self.type_descriptor,
                            dims=dims,
                        )
                    )
    
                if self.test_space == self.trial_space:
                    self.kernel_types.append(
                        AssembleDiagonal(
                            self.test_space, type_descriptor=self.type_descriptor, dims=dims
                        )
                    )
    
    
    def all_operators(
        symbolizer: Symbolizer,
        opts: List[Tuple[Set[Opts], LoopStrategy, str]],
        type_descriptor: HOGType,
        blending: GeometryMap,
    ) -> List[OperatorInfo]:
        P1 = LagrangianFunctionSpace(1, symbolizer)
        P1Vector = TensorialVectorFunctionSpace(P1)
        P2 = LagrangianFunctionSpace(2, symbolizer)
        N1E1 = N1E1Space(symbolizer)
    
        two_d = [TriangleElement()]
        three_d = [TetrahedronElement()]
    
        ops: List[OperatorInfo] = []
    
        # fmt: off
        # TODO switch to manual specification of opts for now/developement, later use default factory
        ops.append(OperatorInfo(mapping="N1E1", name="CurlCurl", trial_space=N1E1, test_space=N1E1, form=curl_curl,
                                type_descriptor=type_descriptor, geometries=three_d, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="N1E1", name="Mass", trial_space=N1E1, test_space=N1E1, form=mass_n1e1,
                                type_descriptor=type_descriptor, geometries=three_d, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="N1E1", name="CurlCurlPlusMass", trial_space=N1E1, test_space=N1E1,
                                form=partial(curl_curl_plus_mass, alpha_fem_space=P1, beta_fem_space=P1),
                                type_descriptor=type_descriptor, geometries=three_d, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="P1", name="Diffusion", trial_space=P1, test_space=P1, form=diffusion,
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="P1", name="DivKGrad", trial_space=P1, test_space=P1,
                                form=partial(div_k_grad, coefficient_function_space=P1),
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
    
        ops.append(OperatorInfo(mapping="P2", name="Diffusion", trial_space=P2, test_space=P2, form=diffusion,
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="P2", name="DivKGrad", trial_space=P2, test_space=P2,
                                form=partial(div_k_grad, coefficient_function_space=P2),
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="P2", name="ShearHeating", trial_space=P2, test_space=P2,
                                form=partial(shear_heating, viscosity_function_space=P2, velocity_function_space=P2),
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
    
        ops.append(OperatorInfo(mapping="P1", name="NonlinearDiffusion", trial_space=P1, test_space=P1,
                                form=partial(nonlinear_diffusion, coefficient_function_space=P1),
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
        ops.append(OperatorInfo(mapping="P1", name="NonlinearDiffusionNewtonGalerkin", trial_space=P1,
                                test_space=P1, form=partial(nonlinear_diffusion_newton_galerkin,
                                coefficient_function_space=P1, onlyNewtonGalerkinPartOfForm=False),
                                type_descriptor=type_descriptor, opts=opts, blending=blending))
    
        ops.append(OperatorInfo(mapping="P1Vector", name="Diffusion", trial_space=P1Vector, test_space=P1Vector,
                                form=diffusion, type_descriptor=type_descriptor, opts=opts, blending=blending))
    
        # fmt: on
    
        for c in [0, 1, 2]:
            # fmt: off
            div_geometries = [TriangleElement(), TetrahedronElement()]
            if c == 2:
                div_geometries = [TetrahedronElement()]
            ops.append(OperatorInfo(mapping=f"P2ToP1", name=f"Div_{c}", trial_space=P1, test_space=P2,
                                    form=partial(divergence, transpose=False, component_index=c),
                                    type_descriptor=type_descriptor, opts=opts, geometries=div_geometries,
                                    blending=blending))
            ops.append(OperatorInfo(mapping=f"P1ToP2", name=f"DivT_{c}", trial_space=P2, test_space=P1,
                                    form=partial(divergence, transpose=True, component_index=c),
                                    type_descriptor=type_descriptor, opts=opts, geometries=div_geometries,
                                    blending=blending))
            # fmt: on
    
        for c in [0, 1]:
            for r in [0, 1]:
                p2_epsilon = partial(
                    epsilon,
                    variable_viscosity=True,
                    coefficient_function_space=P2,
                    component_trial=c,
                    component_test=r,
                )
    
                p2_full_stokes = partial(
                    full_stokes,
                    variable_viscosity=True,
                    coefficient_function_space=P2,
                    component_trial=c,
                    component_test=r,
                )
                # fmt: off
                ops.append(
                    OperatorInfo(mapping=f"P2", name=f"Epsilon_{r}_{c}", trial_space=P2, test_space=P2, form=p2_epsilon,
                                  type_descriptor=type_descriptor, opts=opts, blending=blending))
                ops.append(OperatorInfo(mapping=f"P2", name=f"FullStokes_{r}_{c}", trial_space=P2, test_space=P2,
                                        form=p2_full_stokes, type_descriptor=type_descriptor, opts=opts,
                                        blending=blending))
                # fmt: on
        for c, r in [(0, 2), (1, 2), (2, 2), (2, 1), (2, 0)]:
            p2_epsilon = partial(
                epsilon,
                variable_viscosity=True,
                coefficient_function_space=P2,
                component_trial=c,
                component_test=r,
            )
    
            p2_full_stokes = partial(
                full_stokes,
                variable_viscosity=True,
                coefficient_function_space=P2,
                component_trial=c,
                component_test=r,
            )
            # fmt: off
            ops.append(
                OperatorInfo(mapping=f"P2", name=f"Epsilon_{r}_{c}", trial_space=P2, test_space=P2, form=p2_epsilon,
                             type_descriptor=type_descriptor, geometries=three_d, opts=opts, blending=blending))
            ops.append(
                OperatorInfo(mapping=f"P2", name=f"FullStokes_{r}_{c}", trial_space=P2, test_space=P2, form=p2_full_stokes,
                              type_descriptor=type_descriptor, geometries=three_d, opts=opts, blending=blending))
            # fmt: on
    
        # Removing all operators without viable element types (e.g.: some ops only support 2D, but a blending map maybe only
        # supports 3D).
        ops = [o for o in ops if o.geometries]
    
        return ops
    
    
    def generate_elementwise_op(
        args: argparse.Namespace,
        symbolizer: Symbolizer,
        op_info: OperatorInfo,
        name: str,
        optimizations: Set[Opts],
        loop_strategy: LoopStrategy,
        blending: GeometryMap,
        quad_info: dict[ElementGeometry, int | str],
        type_descriptor: HOGType,
    ) -> None:
        """Generates a single operator and writes it to the HyTeG directory."""
    
        operator = HyTeGElementwiseOperator(
            name,
            symbolizer,
            opts=optimizations,
            kernel_types=op_info.kernel_types,
            type_descriptor=type_descriptor,
        )
    
        for geometry in op_info.geometries:
            # Is this necessary? Currently, the decision of the geometry is all over the place.
            if geometry not in blending.supported_geometries():
                continue
    
            quad = Quadrature(
                select_quadrule(quad_info[geometry], geometry),
                geometry,
            )
    
            form = op_info.form(
                op_info.test_space,
                op_info.trial_space,
                geometry,
                symbolizer,
                blending=blending,  # type: ignore[call-arg] # kw-args are not supported by Callable
            )
    
            operator.set_element_matrix(
                dim=geometry.dimensions,
                geometry=geometry,
                integration_domain=MacroIntegrationDomain.VOLUME,
                quad=quad,
                blending=blending,
                form=form,
            )
    
        dir_path = os.path.join(args.output, op_info.name.split("_")[0])
        operator.generate_class_code(
            dir_path,
            loop_strategy=loop_strategy,
            clang_format_binary=args.clang_format_binary,
        )
    
    
    if __name__ == "__main__":
        main()