Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • fhennig/devel
  • master
  • rangersbach/c-interfacing
  • v0.1a1
  • v0.1a2
  • v0.1a3
  • v0.1a4
7 results

Target

Select target project
  • ob28imeq/pystencils-sfg
  • brendan-waters/pystencils-sfg
  • pycodegen/pystencils-sfg
3 results
Select Git revision
  • frontend-cleanup
  • lbwelding-features
  • master
  • refactor-indexing-params
  • unit_tests
  • v0.1a1
  • v0.1a2
  • v0.1a3
  • v0.1a4
9 results
Show changes
Showing
with 2635 additions and 918 deletions
from __future__ import annotations
from typing import overload
from pystencils.codegen import GpuKernel, Target
from pystencils.codegen.gpu_indexing import (
ManualLaunchConfiguration,
AutomaticLaunchConfiguration,
DynamicBlockSizeLaunchConfiguration,
)
from .mixin import SfgComposerMixIn
from .basic_composer import make_statements, make_sequence
from ..context import SfgContext
from ..ir import (
SfgKernelHandle,
SfgCallTreeNode,
SfgGpuKernelInvocation,
SfgBlock,
SfgSequence,
)
from ..lang import ExprLike, AugExpr
from ..lang.gpu import CudaAPI, HipAPI, ProvidesGpuRuntimeAPI
class SfgGpuComposer(SfgComposerMixIn):
"""Composer mix-in providing methods to generate GPU kernel invocations.
.. function:: gpu_invoke(kernel_handle: SfgKernelHandle, **kwargs)
Invoke a GPU kernel with launch configuration parameters depending on its code generator configuration.
The overloads of this method are listed below.
They all (partially) mirror the CUDA and HIP ``kernel<<< Gs, Bs, Sm, St >>>()`` syntax;
for details on the launch configuration arguments,
refer to `Launch Configurations in CUDA`_
or `Launch Configurations in HIP`_.
.. function:: gpu_invoke(kernel_handle: SfgKernelHandle, *, grid_size: ExprLike, block_size: ExprLike, shared_memory_bytes: ExprLike = "0", stream: ExprLike | None = None, ) -> SfgCallTreeNode
:noindex:
Invoke a GPU kernel with a manual launch grid.
Requires that the kernel was generated
with `manual_launch_grid <pystencils.codegen.config.GpuOptions.manual_launch_grid>`
set to `True`.
.. function:: gpu_invoke(self, kernel_handle: SfgKernelHandle, *, shared_memory_bytes: ExprLike = "0", stream: ExprLike | None = None, ) -> SfgCallTreeNode
:noindex:
Invoke a GPU kernel with an automatic launch grid.
This signature accepts kernels generated with an indexing scheme that
causes the launch grid to be determined automatically,
such as `Blockwise4D <pystencils.codegen.config.GpuIndexingScheme.Blockwise4D>`.
.. function:: gpu_invoke(self, kernel_handle: SfgKernelHandle, *, block_size: ExprLike | None = None, shared_memory_bytes: ExprLike = "0", stream: ExprLike | None = None, ) -> SfgCallTreeNode
:noindex:
Invoke a GPU kernel with a dynamic launch grid.
This signature accepts kernels generated with an indexing scheme that permits a user-defined
blocks size, such as `Linear3D <pystencils.codegen.config.GpuIndexingScheme.Linear3D>`.
The grid size is calculated automatically by dividing the number of work items in each
dimension by the block size, rounding up.
.. _Launch Configurations in CUDA: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#execution-configuration
.. _Launch Configurations in HIP: https://rocmdocs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html#calling-global-functions
""" # NOQA: E501
@overload
def gpu_invoke(
self,
kernel_handle: SfgKernelHandle,
*,
grid_size: ExprLike,
block_size: ExprLike,
shared_memory_bytes: ExprLike = "0",
stream: ExprLike | None = None,
) -> SfgCallTreeNode: ...
@overload
def gpu_invoke(
self,
kernel_handle: SfgKernelHandle,
*,
shared_memory_bytes: ExprLike = "0",
stream: ExprLike | None = None,
) -> SfgCallTreeNode: ...
@overload
def gpu_invoke(
self,
kernel_handle: SfgKernelHandle,
*,
block_size: ExprLike | None = None,
shared_memory_bytes: ExprLike = "0",
stream: ExprLike | None = None,
) -> SfgCallTreeNode: ...
def gpu_invoke(
self,
kernel_handle: SfgKernelHandle,
shared_memory_bytes: ExprLike = "0",
stream: ExprLike | None = None,
**kwargs,
) -> SfgCallTreeNode:
builder = GpuInvocationBuilder(self._ctx, kernel_handle)
builder.shared_memory_bytes = shared_memory_bytes
builder.stream = stream
return builder(**kwargs)
def cuda_invoke(
self,
kernel_handle: SfgKernelHandle,
num_blocks: ExprLike,
threads_per_block: ExprLike,
stream: ExprLike | None,
):
from warnings import warn
warn(
"cuda_invoke is deprecated and will be removed before version 0.1. "
"Use `gpu_invoke` instead.",
FutureWarning,
)
return self.gpu_invoke(
kernel_handle,
grid_size=num_blocks,
block_size=threads_per_block,
stream=stream,
)
class GpuInvocationBuilder:
def __init__(
self,
ctx: SfgContext,
kernel_handle: SfgKernelHandle,
):
self._ctx = ctx
self._kernel_handle = kernel_handle
ker = kernel_handle.kernel
if not isinstance(ker, GpuKernel):
raise ValueError(f"Non-GPU kernel was passed to `gpu_invoke`: {ker}")
launch_config = ker.get_launch_configuration()
self._launch_config = launch_config
gpu_api: type[ProvidesGpuRuntimeAPI]
match ker.target:
case Target.CUDA:
gpu_api = CudaAPI
case Target.HIP:
gpu_api = HipAPI
case _:
assert False, "unexpected GPU target"
self._gpu_api = gpu_api
self._dim3 = gpu_api.dim3
self._shared_memory_bytes: ExprLike = "0"
self._stream: ExprLike | None = None
@property
def shared_memory_bytes(self) -> ExprLike:
return self._shared_memory_bytes
@shared_memory_bytes.setter
def shared_memory_bytes(self, bs: ExprLike):
self._shared_memory_bytes = bs
@property
def stream(self) -> ExprLike | None:
return self._stream
@stream.setter
def stream(self, s: ExprLike | None):
self._stream = s
def _render_invocation(
self, grid_size: ExprLike, block_size: ExprLike
) -> SfgSequence:
stmt_grid_size = make_statements(grid_size)
stmt_block_size = make_statements(block_size)
stmt_smem = make_statements(self._shared_memory_bytes)
stmt_stream = (
make_statements(self._stream) if self._stream is not None else None
)
return make_sequence(
"/* clang-format off */",
"/* [pystencils-sfg] Formatting may add illegal spaces between angular brackets in `<<< >>>` */",
SfgGpuKernelInvocation(
self._kernel_handle,
stmt_grid_size,
stmt_block_size,
shared_memory_bytes=stmt_smem,
stream=stmt_stream,
),
"/* clang-format on */",
)
def __call__(self, **kwargs: ExprLike) -> SfgCallTreeNode:
match self._launch_config:
case ManualLaunchConfiguration():
return self._invoke_manual(**kwargs)
case AutomaticLaunchConfiguration():
return self._invoke_automatic(**kwargs)
case DynamicBlockSizeLaunchConfiguration():
return self._invoke_dynamic(**kwargs)
case _:
raise ValueError(
f"Unexpected launch configuration: {self._launch_config}"
)
def _invoke_manual(self, grid_size: ExprLike, block_size: ExprLike):
assert isinstance(self._launch_config, ManualLaunchConfiguration)
return self._render_invocation(grid_size, block_size)
def _invoke_automatic(self):
assert isinstance(self._launch_config, AutomaticLaunchConfiguration)
from .composer import SfgComposer
sfg = SfgComposer(self._ctx)
grid_size_entries = [
self._to_uint32_t(sfg.expr_from_lambda(gs))
for gs in self._launch_config._grid_size
]
grid_size_var = self._dim3(const=True).var("__grid_size")
block_size_entries = [
self._to_uint32_t(sfg.expr_from_lambda(bs))
for bs in self._launch_config._block_size
]
block_size_var = self._dim3(const=True).var("__block_size")
nodes = [
sfg.init(grid_size_var)(*grid_size_entries),
sfg.init(block_size_var)(*block_size_entries),
self._render_invocation(grid_size_var, block_size_var),
]
return SfgBlock(SfgSequence(nodes))
def _invoke_dynamic(self, block_size: ExprLike | None = None):
assert isinstance(self._launch_config, DynamicBlockSizeLaunchConfiguration)
from .composer import SfgComposer
sfg = SfgComposer(self._ctx)
block_size_init_args: tuple[ExprLike, ...]
if block_size is None:
block_size_init_args = tuple(
str(bs) for bs in self._launch_config.default_block_size
)
else:
block_size_init_args = (block_size,)
block_size_var = self._dim3(const=True).var("__block_size")
from ..lang.cpp import std
work_items_entries = [
sfg.expr_from_lambda(wit) for wit in self._launch_config.num_work_items
]
work_items_var = std.tuple("uint32_t", "uint32_t", "uint32_t", const=True).var(
"__work_items"
)
def _div_ceil(a: ExprLike, b: ExprLike):
return AugExpr.format("({a} + {b} - 1) / {b}", a=a, b=b)
grid_size_entries = [
_div_ceil(work_items_var.get(i), bs)
for i, bs in enumerate(
[
block_size_var.x,
block_size_var.y,
block_size_var.z,
]
)
]
grid_size_var = self._dim3(const=True).var("__grid_size")
nodes = [
sfg.init(block_size_var)(*block_size_init_args),
sfg.init(work_items_var)(*work_items_entries),
sfg.init(grid_size_var)(*grid_size_entries),
self._render_invocation(grid_size_var, block_size_var),
]
return SfgBlock(SfgSequence(nodes))
@staticmethod
def _to_uint32_t(expr: AugExpr) -> AugExpr:
return AugExpr("uint32_t").format("uint32_t({})", expr)
from __future__ import annotations
from ..context import SfgContext, SfgCursor
from .basic_composer import SfgBasicComposer
class SfgComposerMixIn:
# type: ignore
def __new__(cls, *args, **kwargs):
if not issubclass(cls, SfgBasicComposer):
raise Exception(f"{cls} must be mixed-in with SfgBasicComposer.")
else:
return super().__new__(cls)
def __init__(self) -> None:
self._ctx: SfgContext
self._cursor: SfgCursor
@property
def _composer(self) -> SfgBasicComposer:
assert isinstance(self, SfgBasicComposer)
return self
from __future__ import annotations
from argparse import ArgumentParser, BooleanOptionalAction
from types import ModuleType
from typing import Any, Sequence, Callable
from dataclasses import dataclass
from os import path
from importlib import util as iutil
from pathlib import Path
from pystencils.codegen.config import ConfigBase, Option, BasicOption, Category
from .lang import HeaderFile
class SfgConfigException(Exception): ... # noqa: E701
@dataclass
class FileExtensions(ConfigBase):
"""BasicOption category containing output file extensions."""
header: BasicOption[str] = BasicOption("hpp")
"""File extension for generated header file."""
impl: BasicOption[str] = BasicOption("cpp")
"""File extension for generated implementation file."""
@header.validate
@impl.validate
def _validate_extension(self, ext: str | None) -> str | None:
if ext is not None and ext[0] == ".":
return ext[1:]
return ext
@dataclass
class CodeStyle(ConfigBase):
"""Options affecting the code style used by the source file generator."""
indent_width: BasicOption[int] = BasicOption(2)
"""The number of spaces successively nested blocks should be indented with"""
includes_sorting_key: BasicOption[Callable[[HeaderFile], Any]] = BasicOption()
"""Key function that will be used to sort ``#include`` statements in generated files.
Pystencils-sfg will instruct clang-tidy to forego include sorting if this option is set.
"""
# TODO possible future options:
# - newline before opening {
# - trailing return types
def indent(self, s: str):
from textwrap import indent
prefix = " " * self.get_option("indent_width")
return indent(s, prefix)
@dataclass
class ClangFormatOptions(ConfigBase):
"""Options affecting the invocation of ``clang-format`` for automatic code formatting."""
code_style: BasicOption[str] = BasicOption("file")
"""Code style to be used by clang-format. Passed verbatim to ``--style`` argument of the clang-format CLI.
Similar to clang-format itself, the default value is ``file``, such that a ``.clang-format`` file found in the build
tree will automatically be used.
"""
force: BasicOption[bool] = BasicOption(False)
"""If set to ``True``, abort code generation if ``clang-format`` binary cannot be found."""
skip: BasicOption[bool] = BasicOption(False)
"""If set to ``True``, skip formatting using ``clang-format``."""
binary: BasicOption[str] = BasicOption("clang-format")
"""Path to the clang-format executable"""
@force.validate
def _validate_force(self, val: bool) -> bool:
if val and self.skip:
raise SfgConfigException(
"Cannot set both `clang_format.force` and `clang_format.skip` at the same time"
)
return val
@skip.validate
def _validate_skip(self, val: bool) -> bool:
if val and self.force:
raise SfgConfigException(
"Cannot set both `clang_format.force` and `clang_format.skip` at the same time"
)
return val
class _GlobalNamespace: ... # noqa: E701
GLOBAL_NAMESPACE = _GlobalNamespace()
"""Indicates the C++ global namespace."""
@dataclass
class SfgConfig(ConfigBase):
"""Configuration options for the `SourceFileGenerator`."""
extensions: Category[FileExtensions] = Category(FileExtensions())
"""File extensions of the generated files
Options in this category:
.. autosummary::
FileExtensions.header
FileExtensions.impl
"""
header_only: BasicOption[bool] = BasicOption(False)
"""If set to `True`, generate only a header file.
This will cause all definitions to be generated ``inline``.
"""
outer_namespace: BasicOption[str | _GlobalNamespace] = BasicOption(GLOBAL_NAMESPACE)
"""The outermost namespace in the generated file. May be a valid C++ nested namespace qualifier
(like ``a::b::c``) or `GLOBAL_NAMESPACE` if no outer namespace should be generated.
.. autosummary::
GLOBAL_NAMESPACE
"""
codestyle: Category[CodeStyle] = Category(CodeStyle())
"""Options affecting the code style emitted by pystencils-sfg.
Options in this category:
.. autosummary::
CodeStyle.indent_width
"""
clang_format: Category[ClangFormatOptions] = Category(ClangFormatOptions())
"""Options governing the code style used by the code generator
Options in this category:
.. autosummary::
ClangFormatOptions.code_style
ClangFormatOptions.force
ClangFormatOptions.skip
ClangFormatOptions.binary
"""
output_directory: Option[Path, str | Path] = Option(Path("."))
"""Directory to which the generated files should be written."""
@output_directory.validate
def _validate_output_directory(self, pth: str | Path) -> Path:
return Path(pth)
def _get_output_files(self, basename: str):
output_dir: Path = self.get_option("output_directory")
header_ext = self.extensions.get_option("header")
impl_ext = self.extensions.get_option("impl")
output_files = [output_dir / f"{basename}.{header_ext}"]
header_only = self.get_option("header_only")
if not header_only:
assert impl_ext is not None
output_files.append(output_dir / f"{basename}.{impl_ext}")
return tuple(output_files)
class CommandLineParameters:
@staticmethod
def add_args_to_parser(parser: ArgumentParser):
config_group = parser.add_argument_group("Configuration")
config_group.add_argument(
"--sfg-output-dir", type=str, default=None, dest="output_directory"
)
config_group.add_argument(
"--sfg-file-extensions",
type=str,
default=None,
dest="file_extensions",
help="Comma-separated list of file extensions",
)
config_group.add_argument(
"--sfg-header-only",
action=BooleanOptionalAction,
dest="header_only",
help="Generate only a header file.",
)
config_group.add_argument(
"--sfg-config-module", type=str, default=None, dest="config_module_path"
)
return parser
def __init__(self, args) -> None:
self._cl_config_module_path: str | None = args.config_module_path
self._cl_header_only: bool | None = args.header_only
self._cl_output_dir: str | None = args.output_directory
if args.file_extensions is not None:
file_extentions = list(args.file_extensions.split(","))
h_ext, impl_ext = self._get_file_extensions(file_extentions)
self._cl_header_ext = h_ext
self._cl_impl_ext = impl_ext
else:
self._cl_header_ext = None
self._cl_impl_ext = None
self._config_module: ModuleType | None
if self._cl_config_module_path is not None:
self._config_module = self._import_config_module(
self._cl_config_module_path
)
else:
self._config_module = None
@property
def configuration_module(self) -> ModuleType | None:
return self._config_module
def get_config(self) -> SfgConfig:
cfg = SfgConfig()
if self._config_module is not None and hasattr(
self._config_module, "configure_sfg"
):
self._config_module.configure_sfg(cfg)
if self._cl_header_only is not None:
cfg.header_only = self._cl_header_only
if self._cl_header_ext is not None:
cfg.extensions.header = self._cl_header_ext
if self._cl_impl_ext is not None:
cfg.extensions.impl = self._cl_impl_ext
if self._cl_output_dir is not None:
cfg.output_directory = self._cl_output_dir
return cfg
def find_conflicts(self, cfg: SfgConfig):
for name, mine, theirs in (
("header_only", self._cl_header_only, cfg.header_only),
("extensions.header", self._cl_header_ext, cfg.extensions.header),
("extensions.impl", self._cl_impl_ext, cfg.extensions.impl),
("output_directory", self._cl_output_dir, cfg.output_directory),
):
if mine is not None and theirs is not None and mine != theirs:
raise SfgConfigException(
f"Conflicting values given for option {name} on command line and inside generator script.\n"
f" Value on command-line: {name}",
f" Value in script: {name}",
)
def get_project_info(self) -> Any:
if self._config_module is not None and hasattr(
self._config_module, "project_info"
):
return self._config_module.project_info()
else:
return None
def _get_file_extensions(self, extensions: Sequence[str]):
h_ext = None
src_ext = None
extensions = tuple(ext.strip() for ext in extensions)
extensions = tuple((ext[1:] if ext[0] == "." else ext) for ext in extensions)
HEADER_FILE_EXTENSIONS = {"h", "hpp", "hxx", "h++", "cuh"}
IMPL_FILE_EXTENSIONS = {"c", "cpp", "cxx", "c++", "cu", "hip"}
for ext in extensions:
if ext in HEADER_FILE_EXTENSIONS:
if h_ext is not None:
raise SfgConfigException(
"Multiple header file extensions specified."
)
h_ext = ext
elif ext in IMPL_FILE_EXTENSIONS:
if src_ext is not None:
raise SfgConfigException(
"Multiple source file extensions specified."
)
src_ext = ext
else:
raise SfgConfigException(
f"Invalid file extension: Don't know what to do with '.{ext}'"
)
return h_ext, src_ext
def _import_config_module(self, module_path: str) -> ModuleType:
cfg_modulename = path.splitext(path.split(module_path)[1])[0]
cfg_spec = iutil.spec_from_file_location(cfg_modulename, module_path)
if cfg_spec is None:
raise SfgConfigException(
f"Unable to import configuration module {module_path}",
)
config_module = iutil.module_from_spec(cfg_spec)
cfg_spec.loader.exec_module(config_module) # type: ignore
return config_module
# mypy: strict_optional=False
from __future__ import annotations
from typing import Sequence, Any
from os import path
from enum import Enum, auto
from dataclasses import dataclass, replace, fields, InitVar
from argparse import ArgumentParser
from textwrap import indent
from importlib import util as iutil
from .exceptions import SfgException
HEADER_FILE_EXTENSIONS = {"h", "hpp"}
IMPL_FILE_EXTENSIONS = {"c", "cpp", ".impl.h"}
class SfgConfigSource(Enum):
DEFAULT = auto()
PROJECT = auto()
COMMANDLINE = auto()
SCRIPT = auto()
class SfgConfigException(Exception):
def __init__(self, cfg_src: SfgConfigSource | None, message: str):
super().__init__(cfg_src, message)
self.message = message
self.config_source = cfg_src
@dataclass
class SfgCodeStyle:
indent_width: int = 2
code_style: str = "file"
"""Code style to be used by clang-format. Passed verbatim to `--style` argument of the clang-format CLI.
Similar to clang-format itself, the default value is `file`, such that a `.clang-format` file found in the build
tree will automatically be used.
"""
force_clang_format: bool = False
"""If set to True, abort code generation if `clang-format` binary cannot be found."""
clang_format_binary: str = "clang-format"
"""Path to the clang-format executable"""
def indent(self, s: str):
prefix = " " * self.indent_width
return indent(s, prefix)
@dataclass
class SfgOutputSpec:
"""Name and path specification for files output by the code generator.
Filenames are constructed as `<output_directory>/<basename>.<extension>`."""
output_directory: str
"""Directory to which the generated files should be written."""
basename: str
"""Base name for output files."""
header_extension: str
"""File extension for generated header file."""
impl_extension: str
"""File extension for generated implementation file."""
def get_header_filename(self):
return f"{self.basename}.{self.header_extension}"
def get_impl_filename(self):
return f"{self.basename}.{self.impl_extension}"
def get_header_filepath(self):
return path.join(self.output_directory, self.get_header_filename())
def get_impl_filepath(self):
return path.join(self.output_directory, self.get_impl_filename())
@dataclass
class SfgConfiguration:
"""
Configuration for the [SfgSourceFileGenerator][pystencilssfg.SourceFileGenerator].
The source file generator draws configuration from a total of four sources:
- The [default configuration][pystencilssfg.configuration.DEFAULT_CONFIG];
- The project configuration;
- Command-line arguments;
- The user configuration passed to the constructor of `SourceFileGenerator`.
They take precedence in the following way:
- Project configuration overrides the default configuration
- Command line arguments override the project configuration
- User configuration overrides default and project configuration,
and must not conflict with command-line arguments; otherwise, an error is thrown.
### Project Configuration via Configurator Script
Currently, the only way to define the project configuration is via a configuration module.
A configurator module is a Python file defining the following function at the top-level:
```Python
from pystencilssfg import SfgConfiguration
def sfg_config() -> SfgConfiguration:
# ...
return SfgConfiguration(
# ...
)
```
The configuration module is passed to the code generation script via the command-line argument
`--sfg-config-module`.
"""
config_source: InitVar[SfgConfigSource | None] = None
header_extension: str | None = None
"""File extension for generated header file."""
impl_extension: str | None = None
"""File extension for generated implementation file."""
header_only: bool | None = None
"""If set to `True`, generate only a header file without accompaning source file."""
outer_namespace: str | None = None
"""The outermost namespace in the generated file. May be a valid C++ nested namespace qualifier
(like `a::b::c`) or `None` if no outer namespace should be generated."""
codestyle: SfgCodeStyle | None = None
"""Code style that should be used by the code generator."""
output_directory: str | None = None
"""Directory to which the generated files should be written."""
project_info: Any = None
"""Object for managing project-specific information. To be set by the configurator script."""
def __post_init__(self, cfg_src: SfgConfigSource | None = None):
if self.header_only:
raise SfgConfigException(
cfg_src, "Header-only code generation is not implemented yet."
)
if self.header_extension and self.header_extension[0] == ".":
self.header_extension = self.header_extension[1:]
if self.impl_extension and self.impl_extension[0] == ".":
self.impl_extension = self.impl_extension[1:]
def override(self, other: SfgConfiguration):
other_dict: dict[str, Any] = {
k: v for k, v in _shallow_dict(other).items() if v is not None
}
return replace(self, **other_dict)
def get_output_spec(self, basename: str) -> SfgOutputSpec:
assert self.header_extension is not None
assert self.impl_extension is not None
assert self.output_directory is not None
return SfgOutputSpec(
self.output_directory, basename, self.header_extension, self.impl_extension
)
DEFAULT_CONFIG = SfgConfiguration(
config_source=SfgConfigSource.DEFAULT,
header_extension="h",
impl_extension="cpp",
header_only=False,
outer_namespace=None,
codestyle=SfgCodeStyle(),
output_directory=".",
)
"""Default configuration for the [SourceFileGenerator][pystencilssfg.SourceFileGenerator]."""
def run_configurator(configurator_script: str):
cfg_modulename = path.splitext(path.split(configurator_script)[1])[0]
cfg_spec = iutil.spec_from_file_location(cfg_modulename, configurator_script)
if cfg_spec is None:
raise SfgConfigException(
SfgConfigSource.PROJECT,
f"Unable to load configurator script {configurator_script}",
)
configurator = iutil.module_from_spec(cfg_spec)
cfg_spec.loader.exec_module(configurator)
if not hasattr(configurator, "sfg_config"):
raise SfgConfigException(
SfgConfigSource.PROJECT,
"Project configurator does not define function `sfg_config`.",
)
project_config = configurator.sfg_config()
if not isinstance(project_config, SfgConfiguration):
raise SfgConfigException(
SfgConfigSource.PROJECT,
"sfg_config did not return a SfgConfiguration object.",
)
return project_config
def add_config_args_to_parser(parser: ArgumentParser):
config_group = parser.add_argument_group("Configuration")
config_group.add_argument(
"--sfg-output-dir", type=str, default=None, dest="output_directory"
)
config_group.add_argument(
"--sfg-file-extensions",
type=str,
default=None,
dest="file_extensions",
help="Comma-separated list of file extensions",
)
config_group.add_argument(
"--sfg-header-only", default=None, action="store_true", dest="header_only"
)
config_group.add_argument(
"--sfg-config-module", type=str, default=None, dest="configurator_script"
)
return parser
def config_from_parser_args(args):
if args.configurator_script is not None:
project_config = run_configurator(args.configurator_script)
else:
project_config = None
if args.file_extensions is not None:
file_extentions = list(args.file_extensions.split(","))
h_ext, src_ext = _get_file_extensions(
SfgConfigSource.COMMANDLINE, file_extentions
)
else:
h_ext, src_ext = None, None
cmdline_config = SfgConfiguration(
config_source=SfgConfigSource.COMMANDLINE,
header_extension=h_ext,
impl_extension=src_ext,
header_only=args.header_only,
output_directory=args.output_directory,
)
return project_config, cmdline_config
def config_from_commandline(argv: list[str]):
parser = ArgumentParser(
"pystencilssfg",
description="pystencils Source File Generator",
allow_abbrev=False,
)
add_config_args_to_parser(parser)
args, script_args = parser.parse_known_args(argv)
project_config, cmdline_config = config_from_parser_args(args)
return project_config, cmdline_config, script_args
def merge_configurations(
project_config: SfgConfiguration | None,
cmdline_config: SfgConfiguration | None,
script_config: SfgConfiguration | None,
):
# Project config completely overrides default config
config = DEFAULT_CONFIG
if project_config is not None:
config = config.override(project_config)
if cmdline_config is not None:
cmdline_dict = _shallow_dict(cmdline_config)
# Commandline config completely overrides project and default config
config = config.override(cmdline_config)
else:
cmdline_dict = {}
if script_config is not None:
# User config may only set values not specified on the command line
script_dict = _shallow_dict(script_config)
for key, cmdline_value in cmdline_dict.items():
if cmdline_value is not None and script_dict[key] is not None:
raise SfgException(
"Conflicting configuration:"
+ f" Parameter {key} was specified both in the script and on the command line."
)
config = config.override(script_config)
return config
def _get_file_extensions(cfgsrc: SfgConfigSource, extensions: Sequence[str]):
h_ext = None
src_ext = None
extensions = tuple((ext[1:] if ext[0] == "." else ext) for ext in extensions)
for ext in extensions:
if ext in HEADER_FILE_EXTENSIONS:
if h_ext is not None:
raise SfgConfigException(
cfgsrc, "Multiple header file extensions specified."
)
h_ext = ext
elif ext in IMPL_FILE_EXTENSIONS:
if src_ext is not None:
raise SfgConfigException(
cfgsrc, "Multiple source file extensions specified."
)
src_ext = ext
else:
raise SfgConfigException(
cfgsrc, f"Don't know how to interpret file extension '.{ext}'"
)
return h_ext, src_ext
def _shallow_dict(obj):
"""Workaround to create a shallow dict of a dataclass object, see
https://docs.python.org/3/library/dataclasses.html#dataclasses.asdict."""
return dict((field.name, getattr(obj, field.name)) for field in fields(obj))
from typing import Generator, Sequence, Any from __future__ import annotations
from typing import Sequence, Any, Generator
from .configuration import SfgCodeStyle from contextlib import contextmanager
from .source_components import (
SfgHeaderInclude, from .config import CodeStyle, ClangFormatOptions
SfgKernelNamespace, from .ir import (
SfgFunction, SfgSourceFile,
SfgClass, SfgNamespace,
SfgNamespaceBlock,
SfgCodeEntity,
SfgGlobalNamespace,
) )
from .ir.syntax import SfgNamespaceElement
from .exceptions import SfgException from .exceptions import SfgException
class SfgContext: class SfgContext:
"""Represents a header/implementation file pair in the code generator. """Manages context information during the execution of a generator script."""
## Source File Properties and Components
The SfgContext collects all properties and components of a header/implementation
file pair (or just the header file, if header-only generation is used).
These are:
- The code namespace, which is combined from the [outer_namespace][pystencilssfg.SfgContext.outer_namespace]
and the [inner_namespace][pystencilssfg.SfgContext.inner_namespace]. The outer namespace is meant to be set
externally e.g. by the project configuration, while the inner namespace is meant to be set by the generator
script.
- The [prelude comment][pystencilssfg.SfgContext.prelude_comment] is a block of text printed as a comment block
at the top of both generated files. Typically, it contains authorship and licence information.
- The set of [Included header files][pystencilssfg.SfgContext.includes].
- Custom [definitions][pystencilssfg.SfgContext.definitions], which are just arbitrary code strings.
- Any number of [kernel namespaces][pystencilssfg.SfgContext.kernel_namespaces], within which *pystencils*
kernels are managed.
- Any number of [functions][pystencilssfg.SfgContext.functions], which are meant to serve as wrappers
around kernel calls.
- Any number of [classes][pystencilssfg.SfgContext.classes], which can be used to build more extensive wrappers
around kernels.
## Order of Definitions
To honor C/C++ use-after-declare rules, the context preserves the order in which definitions, functions and classes
are added to it.
The header file printers implemented in *pystencils-sfg* will print the declarations accordingly.
The declarations can retrieved in order of definition via
[declarations_ordered][pystencilssfg.SfgContext.declarations_ordered].
"""
def __init__( def __init__(
self, self,
outer_namespace: str | None = None, header_file: SfgSourceFile,
codestyle: SfgCodeStyle = SfgCodeStyle(), impl_file: SfgSourceFile | None,
namespace: str | None = None,
codestyle: CodeStyle | None = None,
clang_format_opts: ClangFormatOptions | None = None,
argv: Sequence[str] | None = None, argv: Sequence[str] | None = None,
project_info: Any = None, project_info: Any = None,
): ):
"""
Args:
outer_namespace: Qualified name of the outer code namespace
codestyle: Code style that should be used by the code emitter
argv: The generator script's command line arguments.
Reserved for internal use by the [SourceFileGenerator][pystencilssfg.SourceFileGenerator].
project_info: Project-specific information provided by a build system.
Reserved for internal use by the [SourceFileGenerator][pystencilssfg.SourceFileGenerator].
"""
self._argv = argv self._argv = argv
self._project_info = project_info self._project_info = project_info
self._default_kernel_namespace = SfgKernelNamespace(self, "kernels")
self._outer_namespace = outer_namespace self._outer_namespace = namespace
self._inner_namespace: str | None = None self._inner_namespace: str | None = None
self._codestyle = codestyle self._codestyle = codestyle if codestyle is not None else CodeStyle()
self._clang_format: ClangFormatOptions = (
clang_format_opts if clang_format_opts is not None else ClangFormatOptions()
)
# Source Components self._header_file = header_file
self._prelude: str = "" self._impl_file = impl_file
self._includes: set[SfgHeaderInclude] = set()
self._definitions: list[str] = []
self._kernel_namespaces = {
self._default_kernel_namespace.name: self._default_kernel_namespace
}
self._functions: dict[str, SfgFunction] = dict()
self._classes: dict[str, SfgClass] = dict()
self._declarations_ordered: list[str | SfgFunction | SfgClass] = list() self._global_namespace = SfgGlobalNamespace()
# Standard stuff current_namespace: SfgNamespace
self.add_include(SfgHeaderInclude("cstdint", system_header=True)) if namespace is not None:
self.add_definition("#define RESTRICT __restrict__") current_namespace = self._global_namespace.get_child_namespace(namespace)
else:
current_namespace = self._global_namespace
self._cursor = SfgCursor(self, current_namespace)
@property @property
def argv(self) -> Sequence[str]: def argv(self) -> Sequence[str]:
...@@ -106,162 +73,101 @@ class SfgContext: ...@@ -106,162 +73,101 @@ class SfgContext:
return self._outer_namespace return self._outer_namespace
@property @property
def inner_namespace(self) -> str | None: def codestyle(self) -> CodeStyle:
"""Inner code namespace. Set by `set_namespace`."""
return self._inner_namespace
@property
def fully_qualified_namespace(self) -> str | None:
"""Combined outer and inner namespaces, as `outer_namespace::inner_namespace`."""
match (self.outer_namespace, self.inner_namespace):
case None, None:
return None
case outer, None:
return outer
case None, inner:
return inner
case outer, inner:
return f"{outer}::{inner}"
case _:
assert False
@property
def codestyle(self) -> SfgCodeStyle:
"""The code style object for this generation context.""" """The code style object for this generation context."""
return self._codestyle return self._codestyle
# ----------------------------------------------------------------------------------------------
# Prelude, Includes, Definitions, Namespace
# ----------------------------------------------------------------------------------------------
@property @property
def prelude_comment(self) -> str: def clang_format(self) -> ClangFormatOptions:
"""The prelude is a comment block printed at the top of both generated files.""" return self._clang_format
return self._prelude
def append_to_prelude(self, code_str: str):
"""Append a string to the prelude comment.
The string should not contain
C/C++ comment delimiters, since these will be added automatically during
code generation.
"""
if self._prelude:
self._prelude += "\n"
self._prelude += code_str
if not code_str.endswith("\n"):
self._prelude += "\n"
def includes(self) -> Generator[SfgHeaderInclude, None, None]:
"""Includes of headers. Public includes are added to the header file, private includes
are added to the implementation file."""
yield from self._includes
def add_include(self, include: SfgHeaderInclude):
self._includes.add(include)
def definitions(self) -> Generator[str, None, None]: @property
"""Definitions are arbitrary custom lines of code.""" def header_file(self) -> SfgSourceFile:
yield from self._definitions return self._header_file
def add_definition(self, definition: str):
"""Add a custom code string to the header file."""
self._definitions.append(definition)
self._declarations_ordered.append(definition)
def set_namespace(self, namespace: str):
"""Set the inner code namespace.
Throws an exception if the namespace was already set.
"""
if self._inner_namespace is not None:
raise SfgException("The code namespace was already set.")
self._inner_namespace = namespace
# ----------------------------------------------------------------------------------------------
# Kernel Namespaces
# ----------------------------------------------------------------------------------------------
@property @property
def default_kernel_namespace(self) -> SfgKernelNamespace: def impl_file(self) -> SfgSourceFile | None:
"""The default kernel namespace.""" return self._impl_file
return self._default_kernel_namespace
def kernel_namespaces(self) -> Generator[SfgKernelNamespace, None, None]: @property
"""Iterator over all registered kernel namespaces.""" def cursor(self) -> SfgCursor:
yield from self._kernel_namespaces.values() return self._cursor
def get_kernel_namespace(self, str) -> SfgKernelNamespace | None: @property
"""Retrieve a kernel namespace by name, or `None` if it does not exist.""" def files(self) -> Generator[SfgSourceFile, None, None]:
return self._kernel_namespaces.get(str) yield self._header_file
if self._impl_file is not None:
yield self._impl_file
def add_kernel_namespace(self, namespace: SfgKernelNamespace): @property
"""Adds a new kernel namespace. def global_namespace(self) -> SfgNamespace:
return self._global_namespace
If a kernel namespace of the same name already exists, throws an exception.
"""
if namespace.name in self._kernel_namespaces:
raise ValueError(f"Duplicate kernel namespace: {namespace.name}")
self._kernel_namespaces[namespace.name] = namespace class SfgCursor:
"""Cursor that tracks the current location in the source file(s) during execution of the generator script."""
# ---------------------------------------------------------------------------------------------- def __init__(self, ctx: SfgContext, namespace: SfgNamespace) -> None:
# Functions self._ctx = ctx
# ----------------------------------------------------------------------------------------------
def functions(self) -> Generator[SfgFunction, None, None]: self._cur_namespace: SfgNamespace = namespace
"""Iterator over all registered functions."""
yield from self._functions.values()
def get_function(self, name: str) -> SfgFunction | None: self._loc: dict[SfgSourceFile, list[SfgNamespaceElement]] = dict()
"""Retrieve a function by name. Returns `None` if no function of the given name exists.""" for f in self._ctx.files:
return self._functions.get(name, None) if not isinstance(namespace, SfgGlobalNamespace):
block = SfgNamespaceBlock(
self._cur_namespace, self._cur_namespace.fqname
)
f.elements.append(block)
self._loc[f] = block.elements
else:
self._loc[f] = f.elements
def add_function(self, func: SfgFunction): @property
"""Adds a new function. def context(self) -> SfgContext:
return self._ctx
If a function or class with the same name exists already, throws an exception. @property
""" def current_namespace(self) -> SfgNamespace:
if func.name in self._functions or func.name in self._classes: return self._cur_namespace
raise SfgException(f"Duplicate function: {func.name}")
self._functions[func.name] = func def get_entity(self, name: str) -> SfgCodeEntity | None:
self._declarations_ordered.append(func) return self._cur_namespace.get_entity(name)
# ---------------------------------------------------------------------------------------------- def add_entity(self, entity: SfgCodeEntity):
# Classes self._cur_namespace.add_entity(entity)
# ----------------------------------------------------------------------------------------------
def classes(self) -> Generator[SfgClass, None, None]: def write_header(self, elem: SfgNamespaceElement) -> None:
"""Iterator over all registered classes.""" self._loc[self._ctx.header_file].append(elem)
yield from self._classes.values()
def get_class(self, name: str) -> SfgClass | None: def write_impl(self, elem: SfgNamespaceElement) -> None:
"""Retrieve a class by name, or `None` if the class does not exist.""" impl_file = self._ctx.impl_file
return self._classes.get(name, None) if impl_file is None:
raise SfgException(
f"Cannot write element {elem} to implemenation file since no implementation file is being generated."
)
self._loc[impl_file].append(elem)
def add_class(self, cls: SfgClass): def enter_namespace(self, qual_name: str):
"""Add a class. namespace = self._cur_namespace.get_child_namespace(qual_name)
Throws an exception if a class or function of the same name exists already. outer_locs = self._loc.copy()
"""
if cls.class_name in self._classes or cls.class_name in self._functions:
raise SfgException(f"Duplicate class: {cls.class_name}")
self._classes[cls.class_name] = cls for f in self._ctx.files:
self._declarations_ordered.append(cls) block = SfgNamespaceBlock(namespace, qual_name)
self._loc[f].append(block)
self._loc[f] = block.elements
# ---------------------------------------------------------------------------------------------- outer_namespace = self._cur_namespace
# Declarations in order of addition self._cur_namespace = namespace
# ----------------------------------------------------------------------------------------------
def declarations_ordered( @contextmanager
self, def ctxmgr():
) -> Generator[str | SfgFunction | SfgClass, None, None]: try:
"""All declared definitions, classes and functions in the order they were added. yield None
finally:
# Have the cursor step back out of the nested namespace blocks
self._loc = outer_locs
self._cur_namespace = outer_namespace
Awareness about order is necessary due to the C++ declare-before-use rules.""" return ctxmgr()
yield from self._declarations_ordered
from .header_impl_pair import HeaderImplPairEmitter from .emitter import SfgCodeEmitter
from .file_printer import SfgFilePrinter
__all__ = [ __all__ = ["SfgCodeEmitter", "SfgFilePrinter"]
"HeaderImplPairEmitter"
]
import subprocess import subprocess
import shutil import shutil
from ..configuration import SfgCodeStyle from ..config import ClangFormatOptions
from ..exceptions import SfgException from ..exceptions import SfgException
def invoke_clang_format(code: str, codestyle: SfgCodeStyle) -> str: def invoke_clang_format(
code: str, options: ClangFormatOptions, sort_includes: str | None = None
) -> str:
"""Call the `clang-format` command-line tool to format the given code string """Call the `clang-format` command-line tool to format the given code string
according to the given style arguments. according to the given style arguments.
Args: Args:
code: Code string to format code: Code string to format
codestyle: [SfgCodeStyle][pystencilssfg.configuration.SfgCodeStyle] object options: Options controlling the clang-format invocation
defining the `clang-format` binary and the desired code style. sort_includes: Option to be passed on to clang-format's ``--sort-includes`` argument
Returns: Returns:
The formatted code, if `clang-format` was run sucessfully. The formatted code, if `clang-format` was run sucessfully.
...@@ -24,10 +26,19 @@ def invoke_clang_format(code: str, codestyle: SfgCodeStyle) -> str: ...@@ -24,10 +26,19 @@ def invoke_clang_format(code: str, codestyle: SfgCodeStyle) -> str:
be executed (binary not found, or error during exection), the function will be executed (binary not found, or error during exection), the function will
throw an exception. throw an exception.
""" """
args = [codestyle.clang_format_binary, f"--style={codestyle.code_style}"] if options.get_option("skip"):
return code
if not shutil.which(codestyle.clang_format_binary): binary = options.get_option("binary")
if codestyle.force_clang_format: force = options.get_option("force")
style = options.get_option("code_style")
args = [binary, f"--style={style}"]
if sort_includes is not None:
args += ["--sort-includes", sort_includes]
if not shutil.which(binary):
if force:
raise SfgException( raise SfgException(
"`force_clang_format` was set to true in code style, " "`force_clang_format` was set to true in code style, "
"but clang-format binary could not be found." "but clang-format binary could not be found."
...@@ -38,7 +49,7 @@ def invoke_clang_format(code: str, codestyle: SfgCodeStyle) -> str: ...@@ -38,7 +49,7 @@ def invoke_clang_format(code: str, codestyle: SfgCodeStyle) -> str:
result = subprocess.run(args, input=code, capture_output=True, text=True) result = subprocess.run(args, input=code, capture_output=True, text=True)
if result.returncode != 0: if result.returncode != 0:
if codestyle.force_clang_format: if force:
raise SfgException(f"Call to clang-format failed: \n{result.stderr}") raise SfgException(f"Call to clang-format failed: \n{result.stderr}")
else: else:
return code return code
......
from __future__ import annotations
from pathlib import Path
from ..config import CodeStyle, ClangFormatOptions
from ..ir import SfgSourceFile
from .file_printer import SfgFilePrinter
from .clang_format import invoke_clang_format
class SfgCodeEmitter:
def __init__(
self,
output_directory: Path,
code_style: CodeStyle = CodeStyle(),
clang_format: ClangFormatOptions = ClangFormatOptions(),
):
self._output_dir = output_directory
self._code_style = code_style
self._clang_format_opts = clang_format
self._printer = SfgFilePrinter(code_style)
def dumps(self, file: SfgSourceFile) -> str:
code = self._printer(file)
if self._code_style.get_option("includes_sorting_key") is not None:
sort_includes = "Never"
else:
sort_includes = None
code = invoke_clang_format(
code, self._clang_format_opts, sort_includes=sort_includes
)
return code
def emit(self, file: SfgSourceFile):
code = self.dumps(file)
self._output_dir.mkdir(parents=True, exist_ok=True)
fpath = self._output_dir / file.name
fpath.write_text(code)
from __future__ import annotations
from textwrap import indent
from pystencils.backend.emission import CAstPrinter
from ..ir import (
SfgSourceFile,
SfgSourceFileType,
SfgNamespaceBlock,
SfgEntityDecl,
SfgEntityDef,
SfgKernelHandle,
SfgFunction,
SfgClassMember,
SfgMethod,
SfgMemberVariable,
SfgConstructor,
SfgClass,
SfgClassBody,
SfgVisibilityBlock,
SfgVisibility,
)
from ..ir.syntax import SfgNamespaceElement, SfgClassBodyElement
from ..config import CodeStyle
class SfgFilePrinter:
def __init__(self, code_style: CodeStyle) -> None:
self._code_style = code_style
self._indent_width = code_style.get_option("indent_width")
def __call__(self, file: SfgSourceFile) -> str:
code = ""
if file.prelude:
comment = "/**\n"
comment += indent(file.prelude, " * ", predicate=lambda _: True)
comment += " */\n\n"
code += comment
if file.file_type == SfgSourceFileType.HEADER:
code += "#pragma once\n\n"
for header in file.includes:
incl = str(header) if header.system_header else f'"{str(header)}"'
code += f"#include {incl}\n"
if file.includes:
code += "\n"
# Here begins the actual code
code += "\n\n".join(self.visit(elem) for elem in file.elements)
code += "\n"
return code
def visit(
self, elem: SfgNamespaceElement | SfgClassBodyElement, inclass: bool = False
) -> str:
match elem:
case str():
return elem
case SfgNamespaceBlock(_, elements, label):
code = f"namespace {label} {{\n"
code += self._code_style.indent(
"\n\n".join(self.visit(e) for e in elements)
)
code += f"\n}} // namespace {label}"
return code
case SfgEntityDecl(entity):
return self.visit_decl(entity, inclass)
case SfgEntityDef(entity):
return self.visit_defin(entity, inclass)
case SfgClassBody():
return self.visit_defin(elem, inclass)
case _:
assert False, "illegal code element"
def visit_decl(
self,
declared_entity: SfgKernelHandle | SfgFunction | SfgClassMember | SfgClass,
inclass: bool = False,
) -> str:
match declared_entity:
case SfgKernelHandle(kernel):
kernel_printer = CAstPrinter(
indent_width=self._indent_width,
func_prefix="inline" if declared_entity.inline else None,
)
return kernel_printer.print_signature(kernel) + ";"
case SfgFunction(name, _, params) | SfgMethod(name, _, params):
return self._func_signature(declared_entity, inclass) + ";"
case SfgConstructor(cls, params):
params_str = ", ".join(
f"{param.dtype.c_string()} {param.name}" for param in params
)
return f"{cls.name}({params_str});"
case SfgMemberVariable(name, dtype):
return f"{dtype.c_string()} {name};"
case SfgClass(kwd, name):
return f"{str(kwd)} {name};"
case _:
assert False, f"unsupported declared entity: {declared_entity}"
def visit_defin(
self,
defined_entity: SfgKernelHandle | SfgFunction | SfgClassMember | SfgClassBody,
inclass: bool = False,
) -> str:
match defined_entity:
case SfgKernelHandle(kernel):
kernel_printer = CAstPrinter(
indent_width=self._indent_width,
func_prefix="inline" if defined_entity.inline else None,
)
return kernel_printer(kernel)
case SfgFunction(name, tree, params) | SfgMethod(name, tree, params):
sig = self._func_signature(defined_entity, inclass)
body = tree.get_code(self._code_style)
body = "\n{\n" + self._code_style.indent(body) + "\n}"
return sig + body
case SfgConstructor(cls, params):
params_str = ", ".join(
f"{param.dtype.c_string()} {param.name}" for param in params
)
code = ""
if not inclass:
code += f"{cls.name}::"
code += f"{cls.name} ({params_str})"
inits: list[str] = []
for var, args in defined_entity.initializers:
args_str = ", ".join(str(arg) for arg in args)
inits.append(f"{str(var)}({args_str})")
if inits:
code += "\n:" + ",\n".join(inits)
code += "\n{\n" + self._code_style.indent(defined_entity.body) + "\n}"
return code
case SfgMemberVariable(name, dtype):
code = dtype.c_string()
if not inclass:
code += f" {defined_entity.owning_class.name}::"
code += f" {name}"
if defined_entity.default_init is not None:
args_str = ", ".join(
str(expr) for expr in defined_entity.default_init
)
code += "{" + args_str + "}"
code += ";"
return code
case SfgClassBody(cls, vblocks):
code = f"{cls.class_keyword} {cls.name}"
if cls.base_classes:
code += " : " + ", ".join(cls.base_classes)
code += " {\n"
vblocks_str = [self._visibility_block(b) for b in vblocks]
code += "\n\n".join(vblocks_str)
code += "\n};\n"
return code
case _:
assert False, f"unsupported defined entity: {defined_entity}"
def _visibility_block(self, vblock: SfgVisibilityBlock):
prefix = (
f"{vblock.visibility}:\n"
if vblock.visibility != SfgVisibility.DEFAULT
else ""
)
elements = [self.visit(elem, inclass=True) for elem in vblock.elements]
return prefix + self._code_style.indent("\n".join(elements))
def _func_signature(self, func: SfgFunction | SfgMethod, inclass: bool):
code = ""
if func.attributes:
code += "[[" + ", ".join(func.attributes) + "]]"
if func.inline and not inclass:
code += "inline "
if isinstance(func, SfgMethod) and inclass:
if func.static:
code += "static "
if func.virtual:
code += "virtual "
if func.constexpr:
code += "constexpr "
code += func.return_type.c_string() + " "
params_str = ", ".join(
f"{param.dtype.c_string()} {param.name}" for param in func.parameters
)
if isinstance(func, SfgMethod) and not inclass:
code += f"{func.owning_class.name}::"
code += f"{func.name}({params_str})"
if isinstance(func, SfgMethod):
if func.const:
code += " const"
if func.override and inclass:
code += " override"
return code
from os import path, makedirs
from ..configuration import SfgOutputSpec
from ..context import SfgContext
from .prepare import prepare_context
from .printers import SfgHeaderPrinter, SfgImplPrinter
from .clang_format import invoke_clang_format
class HeaderImplPairEmitter:
"""Emits a header-implementation file pair."""
def __init__(self, output_spec: SfgOutputSpec):
"""Create a `HeaderImplPairEmitter` from an [SfgOutputSpec][pystencilssfg.configuration.SfgOutputSpec]."""
self._basename = output_spec.basename
self._output_directory = output_spec.output_directory
self._header_filename = output_spec.get_header_filename()
self._impl_filename = output_spec.get_impl_filename()
self._ospec = output_spec
@property
def output_files(self) -> tuple[str, str]:
"""The files that will be written by `write_files`."""
return (
path.join(self._output_directory, self._header_filename),
path.join(self._output_directory, self._impl_filename),
)
def write_files(self, ctx: SfgContext):
"""Write the code represented by the given [SfgContext][pystencilssfg.SfgContext] to the files
specified by the output specification."""
ctx = prepare_context(ctx)
header_printer = SfgHeaderPrinter(ctx, self._ospec)
impl_printer = SfgImplPrinter(ctx, self._ospec)
header = header_printer.get_code()
impl = impl_printer.get_code()
header = invoke_clang_format(header, ctx.codestyle)
impl = invoke_clang_format(impl, ctx.codestyle)
makedirs(self._output_directory, exist_ok=True)
with open(self._ospec.get_header_filepath(), "w") as headerfile:
headerfile.write(header)
with open(self._ospec.get_impl_filepath(), "w") as cppfile:
cppfile.write(impl)
from ..context import SfgContext
from ..visitors import CollectIncludes
def prepare_context(ctx: SfgContext):
"""Prepares a populated context for printing. Make sure to run this function on the
[SfgContext][pystencilssfg.SfgContext] before passing it to a printer.
Steps:
- Collection of includes: All defined functions and classes are traversed to collect all required
header includes
"""
# Collect all includes
required_includes = CollectIncludes().visit(ctx)
for incl in required_includes:
ctx.add_include(incl)
return ctx
from __future__ import annotations
from textwrap import indent
from itertools import chain, repeat, cycle
from pystencils.astnodes import KernelFunction
from pystencils import Backend
from pystencils.backends import generate_c
from ..context import SfgContext
from ..configuration import SfgOutputSpec
from ..visitors import visitor
from ..exceptions import SfgException
from ..source_components import (
SfgEmptyLines,
SfgHeaderInclude,
SfgKernelNamespace,
SfgFunction,
SfgClass,
SfgInClassDefinition,
SfgConstructor,
SfgMemberVariable,
SfgMethod,
SfgVisibility,
SfgVisibilityBlock
)
def interleave(*iters):
try:
for iter in cycle(iters):
yield next(iter)
except StopIteration:
pass
class SfgGeneralPrinter:
@visitor
def visit(self, obj: object) -> str:
raise SfgException(f"Can't print object of type {type(obj)}")
@visit.case(SfgEmptyLines)
def emptylines(self, el: SfgEmptyLines) -> str:
return "\n" * el.lines
@visit.case(str)
def string(self, s: str) -> str:
return s
@visit.case(SfgHeaderInclude)
def include(self, incl: SfgHeaderInclude) -> str:
if incl.system_header:
return f"#include <{incl.file}>"
else:
return f'#include "{incl.file}"'
def prelude(self, ctx: SfgContext) -> str:
if ctx.prelude_comment:
return (
"/*\n"
+ indent(ctx.prelude_comment, "* ", predicate=lambda _: True)
+ "*/\n"
)
else:
return ""
def param_list(self, func: SfgFunction) -> str:
params = sorted(list(func.parameters), key=lambda p: p.name)
return ", ".join(f"{param.dtype} {param.name}" for param in params)
class SfgHeaderPrinter(SfgGeneralPrinter):
def __init__(self, ctx: SfgContext, output_spec: SfgOutputSpec):
self._output_spec = output_spec
self._ctx = ctx
def get_code(self) -> str:
return self.visit(self._ctx)
@visitor
def visit(self, obj: object) -> str:
return super().visit(obj)
@visit.case(SfgContext)
def frame(self, ctx: SfgContext) -> str:
code = super().prelude(ctx)
code += "\n#pragma once\n\n"
includes = filter(lambda incl: not incl.private, ctx.includes())
code += "\n".join(self.visit(incl) for incl in includes)
code += "\n\n"
fq_namespace = ctx.fully_qualified_namespace
if fq_namespace is not None:
code += f"namespace {fq_namespace} {{\n\n"
parts = interleave(ctx.declarations_ordered(), repeat(SfgEmptyLines(1)))
code += "\n".join(self.visit(p) for p in parts)
if fq_namespace is not None:
code += f"}} // namespace {fq_namespace}\n"
return code
@visit.case(SfgFunction)
def function(self, func: SfgFunction):
params = sorted(list(func.parameters), key=lambda p: p.name)
param_list = ", ".join(f"{param.dtype} {param.name}" for param in params)
return f"{func.return_type} {func.name} ( {param_list} );"
@visit.case(SfgClass)
def sfg_class(self, cls: SfgClass):
code = f"{cls.class_keyword} {cls.class_name} \n"
if cls.base_classes:
code += f" : {','.join(cls.base_classes)}\n"
code += "{\n"
for block in cls.visibility_blocks():
code += self.visit(block) + "\n"
code += "};\n"
return code
@visit.case(SfgVisibilityBlock)
def vis_block(self, block: SfgVisibilityBlock) -> str:
code = ""
if block.visibility != SfgVisibility.DEFAULT:
code += f"{block.visibility}:\n"
code += self._ctx.codestyle.indent(
"\n".join(self.visit(m) for m in block.members())
)
return code
@visit.case(SfgInClassDefinition)
def sfg_inclassdef(self, definition: SfgInClassDefinition):
return definition.text
@visit.case(SfgConstructor)
def sfg_constructor(self, constr: SfgConstructor):
code = f"{constr.owning_class.class_name} ("
code += ", ".join(f"{param.dtype} {param.name}" for param in constr.parameters)
code += ")\n"
if constr.initializers:
code += " : " + ", ".join(constr.initializers) + "\n"
if constr.body:
code += "{\n" + self._ctx.codestyle.indent(constr.body) + "\n}\n"
else:
code += "{ }\n"
return code
@visit.case(SfgMemberVariable)
def sfg_member_var(self, var: SfgMemberVariable):
return f"{var.dtype} {var.name};"
@visit.case(SfgMethod)
def sfg_method(self, method: SfgMethod):
code = f"{method.return_type} {method.name} ({self.param_list(method)})"
code += "const" if method.const else ""
if method.inline:
code += (
" {\n"
+ self._ctx.codestyle.indent(method.tree.get_code(self._ctx))
+ "}\n"
)
else:
code += ";"
return code
def delimiter(content):
return f"""\
/*************************************************************************************
* {content}
*************************************************************************************/
"""
class SfgImplPrinter(SfgGeneralPrinter):
def __init__(self, ctx: SfgContext, output_spec: SfgOutputSpec):
self._output_spec = output_spec
self._ctx = ctx
def get_code(self) -> str:
return self.visit(self._ctx)
@visitor
def visit(self, obj: object) -> str:
return super().visit(obj)
@visit.case(SfgContext)
def frame(self, ctx: SfgContext) -> str:
code = super().prelude(ctx)
code += f'\n#include "{self._output_spec.get_header_filename()}"\n\n'
includes = filter(lambda incl: incl.private, ctx.includes())
code += "\n".join(self.visit(incl) for incl in includes)
code += "\n\n#define FUNC_PREFIX inline\n\n"
fq_namespace = ctx.fully_qualified_namespace
if fq_namespace is not None:
code += f"namespace {fq_namespace} {{\n\n"
parts = interleave(
chain(
[delimiter("Kernels")],
ctx.kernel_namespaces(),
[delimiter("Functions")],
ctx.functions(),
[delimiter("Class Methods")],
ctx.classes(),
),
repeat(SfgEmptyLines(1)),
)
code += "\n".join(self.visit(p) for p in parts)
if fq_namespace is not None:
code += f"}} // namespace {fq_namespace}\n"
return code
@visit.case(SfgKernelNamespace)
def kernel_namespace(self, kns: SfgKernelNamespace) -> str:
code = f"namespace {kns.name} {{\n\n"
code += "\n\n".join(self.visit(ast) for ast in kns.asts)
code += f"\n}} // namespace {kns.name}\n"
return code
@visit.case(KernelFunction)
def kernel(self, kfunc: KernelFunction) -> str:
return generate_c(kfunc, dialect=Backend.C)
@visit.case(SfgFunction)
def function(self, func: SfgFunction) -> str:
code = f"{func.return_type} {func.name} ({self.param_list(func)})"
code += (
"{\n" + self._ctx.codestyle.indent(func.tree.get_code(self._ctx)) + "}\n"
)
return code
@visit.case(SfgClass)
def sfg_class(self, cls: SfgClass) -> str:
methods = filter(lambda m: not m.inline, cls.methods())
return "\n".join(self.visit(m) for m in methods)
@visit.case(SfgMethod)
def sfg_method(self, method: SfgMethod) -> str:
const_qual = "const" if method.const else ""
code = f"{method.return_type} {method.owning_class.class_name}::{method.name}"
code += f"({self.param_list(method)}) {const_qual}"
code += (
" {\n" + self._ctx.codestyle.indent(method.tree.get_code(self._ctx)) + "}\n"
)
return code
class SfgException(Exception): class SfgException(Exception):
pass pass
from .sycl import SyclComposer
__all__ = ["SyclComposer"]
from __future__ import annotations
from typing import Sequence
from enum import Enum
import re
from pystencils.types import UserTypeSpec, PsType, PsCustomType, create_type
from pystencils import Target
from pystencilssfg.composer.basic_composer import SequencerArg
from ..config import CodeStyle
from ..exceptions import SfgException
from ..context import SfgContext
from ..composer import (
SfgBasicComposer,
SfgClassComposer,
SfgComposer,
SfgComposerMixIn,
make_sequence,
)
from ..ir import (
SfgKernelHandle,
SfgCallTreeNode,
SfgCallTreeLeaf,
SfgKernelCallNode,
)
from ..lang import SfgVar, AugExpr, cpptype, Ref, VarLike, _VarLike, asvar
from ..lang.cpp.sycl_accessor import SyclAccessor
accessor = SyclAccessor
class SyclComposerMixIn(SfgComposerMixIn):
"""Composer mix-in for SYCL code generation"""
def sycl_handler(self, name: str) -> SyclHandler:
"""Obtain a `SyclHandler`, which represents a ``sycl::handler`` object."""
return SyclHandler(self._ctx).var(name)
def sycl_group(self, dims: int, name: str) -> SyclGroup:
"""Obtain a `SyclHandler`, which represents a ``sycl::handler`` object."""
return SyclGroup(dims, self._ctx).var(name)
def sycl_range(self, dims: int, name: str, ref: bool = False) -> SyclRange:
return SyclRange(dims, ref=ref).var(name)
class SyclComposer(SfgBasicComposer, SfgClassComposer, SyclComposerMixIn):
"""Composer extension providing SYCL code generation capabilities"""
def __init__(self, sfg: SfgContext | SfgComposer):
super().__init__(sfg)
class SyclRange(AugExpr):
_template = cpptype("sycl::range< {dims} >", "<sycl/sycl.hpp>")
def __init__(self, dims: int, const: bool = False, ref: bool = False):
dtype = self._template(dims=dims, const=const, ref=ref)
super().__init__(dtype)
class SyclHandler(AugExpr):
"""Represents a SYCL command group handler (``sycl::handler``)."""
_type = cpptype("sycl::handler", "<sycl/sycl.hpp>")
def __init__(self, ctx: SfgContext):
dtype = Ref(self._type())
super().__init__(dtype)
self._ctx = ctx
def parallel_for(
self,
range: VarLike | Sequence[int],
):
"""Generate a ``parallel_for`` kernel invocation using this command group handler.
The syntax of this uses a chain of two calls to mimic C++ syntax:
.. code-block:: Python
sfg.parallel_for(range)(
# Body
)
The body is constructed via sequencing (see `make_sequence`).
Args:
range: Object, or tuple of integers, indicating the kernel's iteration range
"""
if isinstance(range, _VarLike):
range = asvar(range)
def check_kernel(khandle: SfgKernelHandle):
kfunc = khandle.kernel
if kfunc.target != Target.SYCL:
raise SfgException(
f"Kernel given to `parallel_for` is no SYCL kernel: {khandle.fqname}"
)
id_regex = re.compile(r"sycl::(id|item|nd_item)<\s*[0-9]\s*>")
def filter_id(param: SfgVar) -> bool:
return (
isinstance(param.dtype, PsCustomType)
and id_regex.search(param.dtype.c_string()) is not None
)
def sequencer(*args: SequencerArg):
id_param = []
for arg in args:
if isinstance(arg, SfgKernelCallNode):
check_kernel(arg._kernel_handle)
id_param.append(
list(filter(filter_id, arg._kernel_handle.scalar_parameters))[0]
)
if not all(item == id_param[0] for item in id_param):
raise ValueError(
"id_param should be the same for all kernels in parallel_for"
)
tree = make_sequence(*args)
kernel_lambda = SfgLambda(("=",), (id_param[0],), tree, None)
return SyclKernelInvoke(
self, SyclInvokeType.ParallelFor, range, kernel_lambda
)
return sequencer
class SyclGroup(AugExpr):
"""Represents a SYCL group (``sycl::group``)."""
_template = cpptype("sycl::group< {dims} >", "<sycl/sycl.hpp>")
def __init__(self, dimensions: int, ctx: SfgContext):
dtype = Ref(self._template(dims=dimensions))
super().__init__(dtype)
self._dimensions = dimensions
self._ctx = ctx
def parallel_for_work_item(
self, range: VarLike | Sequence[int], khandle: SfgKernelHandle
):
"""Generate a ``parallel_for_work_item` kernel invocation on this group.`
Args:
range: Object, or tuple of integers, indicating the kernel's iteration range
kernel: Handle to the pystencils-kernel to be executed
"""
if isinstance(range, _VarLike):
range = asvar(range)
kfunc = khandle.kernel
if kfunc.target != Target.SYCL:
raise SfgException(
f"Kernel given to `parallel_for` is no SYCL kernel: {khandle.fqname}"
)
id_regex = re.compile(r"sycl::id<\s*[0-9]\s*>")
def filter_id(param: SfgVar) -> bool:
return (
isinstance(param.dtype, PsCustomType)
and id_regex.search(param.dtype.c_string()) is not None
)
id_param = list(filter(filter_id, khandle.scalar_parameters))[0]
h_item = SfgVar("item", PsCustomType("sycl::h_item< 3 >"))
comp = SfgComposer(self._ctx)
tree = comp.seq(
comp.set_param(id_param, AugExpr.format("{}.get_local_id()", h_item)),
SfgKernelCallNode(khandle),
)
kernel_lambda = SfgLambda(("=",), (h_item,), tree, None)
invoke = SyclKernelInvoke(
self, SyclInvokeType.ParallelForWorkItem, range, kernel_lambda
)
return invoke
class SfgLambda:
"""Models a C++ lambda expression"""
def __init__(
self,
captures: Sequence[str],
params: Sequence[SfgVar],
tree: SfgCallTreeNode,
return_type: UserTypeSpec | None = None,
) -> None:
self._captures = tuple(captures)
self._params = tuple(params)
self._tree = tree
self._return_type: PsType | None = (
create_type(return_type) if return_type is not None else None
)
from ..ir.postprocessing import CallTreePostProcessing
postprocess = CallTreePostProcessing()
self._required_params = postprocess(self._tree).function_params - set(
self._params
)
@property
def captures(self) -> tuple[str, ...]:
return self._captures
@property
def parameters(self) -> tuple[SfgVar, ...]:
return self._params
@property
def body(self) -> SfgCallTreeNode:
return self._tree
@property
def return_type(self) -> PsType | None:
return self._return_type
@property
def required_parameters(self) -> set[SfgVar]:
return self._required_params
def get_code(self, cstyle: CodeStyle):
captures = ", ".join(self._captures)
params = ", ".join(f"{p.dtype.c_string()} {p.name}" for p in self._params)
body = self._tree.get_code(cstyle)
body = cstyle.indent(body)
rtype = (
f"-> {self._return_type.c_string()} "
if self._return_type is not None
else ""
)
return f"[{captures}] ({params}) {rtype}{{\n{body}\n}}"
class SyclInvokeType(Enum):
ParallelFor = ("parallel_for", SyclHandler)
ParallelForWorkItem = ("parallel_for_work_item", SyclGroup)
@property
def method(self) -> str:
return self.value[0]
@property
def invoker_class(self) -> type:
return self.value[1]
class SyclKernelInvoke(SfgCallTreeLeaf):
"""A SYCL kernel invocation on a given handler or group"""
def __init__(
self,
invoker: SyclHandler | SyclGroup,
invoke_type: SyclInvokeType,
range: SfgVar | Sequence[int],
lamb: SfgLambda,
):
if not isinstance(invoker, invoke_type.invoker_class):
raise SfgException(
f"Cannot invoke kernel via `{invoke_type.method}` on a {type(invoker)}"
)
super().__init__()
self._invoker = invoker
self._invoke_type = invoke_type
self._range: SfgVar | tuple[int, ...] = (
range if isinstance(range, SfgVar) else tuple(range)
)
self._lambda = lamb
self._required_params = set(invoker.depends | lamb.required_parameters)
if isinstance(range, SfgVar):
self._required_params.add(range)
@property
def invoker(self) -> SyclHandler | SyclGroup:
return self._invoker
@property
def range(self) -> SfgVar | tuple[int, ...]:
return self._range
@property
def kernel(self) -> SfgLambda:
return self._lambda
@property
def depends(self) -> set[SfgVar]:
return self._required_params
def get_code(self, cstyle: CodeStyle) -> str:
if isinstance(self._range, SfgVar):
range_code = self._range.name
else:
range_code = "{ " + ", ".join(str(r) for r in self._range) + " }"
kernel_code = self._lambda.get_code(cstyle)
invoker = str(self._invoker)
method = self._invoke_type.method
return f"{invoker}.{method}({range_code}, {kernel_code});"
# TODO from pathlib import Path
# mypy strict_optional=False
import sys from typing import Callable, Any
import os from .config import (
from os import path SfgConfig,
CommandLineParameters,
from .configuration import ( _GlobalNamespace,
SfgConfiguration,
config_from_commandline,
merge_configurations,
) )
from .context import SfgContext from .context import SfgContext
from .composer import SfgComposer
from .emission import SfgCodeEmitter
from .exceptions import SfgException
from .lang import HeaderFile
class SourceFileGenerator: class SourceFileGenerator:
"""Context manager that controls the code generation process in generator scripts.""" """Context manager that controls the code generation process in generator scripts.
The `SourceFileGenerator` must be used as a context manager by calling it within
a ``with`` statement in the top-level code of a generator script.
Upon entry to its context, it creates an `SfgComposer` which can be used to populate the generated files.
When the managed region finishes, the code files are generated and written to disk at the locations
defined by the configuration.
Existing copies of the target files are deleted on entry to the managed region,
and if an exception occurs within the managed region, no files are exported.
Args:
sfg_config: Inline configuration for the code generator
keep_unknown_argv: If `True`, any command line arguments given to the generator script
that the `SourceFileGenerator` does not understand are stored in
`sfg.context.argv <SfgContext.argv>`.
"""
def _scriptname(self) -> str:
import __main__
def __init__(self, sfg_config: SfgConfiguration | None = None): if not hasattr(__main__, "__file__"):
if sfg_config and not isinstance(sfg_config, SfgConfiguration): raise SfgException(
"Invalid execution environment: "
"It seems that you are trying to run the `SourceFileGenerator` in an environment "
"without a valid entry point, such as a REPL or a multiprocessing fork."
)
scriptpath = Path(__main__.__file__)
return scriptpath.name
def __init__(
self,
sfg_config: SfgConfig | None = None,
keep_unknown_argv: bool = False,
):
if sfg_config and not isinstance(sfg_config, SfgConfig):
raise TypeError("sfg_config is not an SfgConfiguration.") raise TypeError("sfg_config is not an SfgConfiguration.")
import __main__ scriptname = self._scriptname()
basename = scriptname.rsplit(".")[0]
from argparse import ArgumentParser
parser = ArgumentParser(
scriptname,
description="Generator script using pystencils-sfg",
allow_abbrev=False,
)
CommandLineParameters.add_args_to_parser(parser)
if keep_unknown_argv:
sfg_args, script_args = parser.parse_known_args()
else:
sfg_args = parser.parse_args()
script_args = []
cli_params = CommandLineParameters(sfg_args)
config = cli_params.get_config()
if sfg_config is not None:
cli_params.find_conflicts(sfg_config)
config.override(sfg_config)
self._header_only: bool = config.get_option("header_only")
self._output_dir: Path = config.get_option("output_directory")
output_files = config._get_output_files(basename)
scriptpath = __main__.__file__ from .ir import SfgSourceFile, SfgSourceFileType
scriptname = path.split(scriptpath)[1]
basename = path.splitext(scriptname)[0]
project_config, cmdline_config, script_args = config_from_commandline(sys.argv) self._header_file = SfgSourceFile(
output_files[0].name, SfgSourceFileType.HEADER
)
self._impl_file: SfgSourceFile | None
if self._header_only:
self._impl_file = None
else:
self._impl_file = SfgSourceFile(
output_files[1].name, SfgSourceFileType.TRANSLATION_UNIT
)
self._impl_file.includes.append(HeaderFile.parse(self._header_file.name))
# TODO: Find a way to not hard-code the restrict qualifier in pystencils
self._header_file.elements.append("#define RESTRICT __restrict__")
config = merge_configurations(project_config, cmdline_config, sfg_config) outer_namespace: str | _GlobalNamespace = config.get_option("outer_namespace")
assert config.codestyle is not None namespace: str | None
if isinstance(outer_namespace, _GlobalNamespace):
namespace = None
else:
namespace = outer_namespace
self._context = SfgContext( self._context = SfgContext(
config.outer_namespace, self._header_file,
self._impl_file,
namespace,
config.codestyle, config.codestyle,
config.clang_format,
argv=script_args, argv=script_args,
project_info=config.project_info, project_info=cli_params.get_project_info(),
) )
from .emission import HeaderImplPairEmitter sort_key = config.codestyle.get_option("includes_sorting_key")
if sort_key is None:
self._emitter = HeaderImplPairEmitter(config.get_output_spec(basename)) def default_key(h: HeaderFile):
return str(h)
sort_key = default_key
self._include_sort_key: Callable[[HeaderFile], Any] = sort_key
def clean_files(self): def clean_files(self):
for file in self._emitter.output_files: header_path = self._output_dir / self._header_file.name
if path.exists(file): if header_path.exists():
os.remove(file) header_path.unlink()
if self._impl_file is not None:
impl_path = self._output_dir / self._impl_file.name
if impl_path.exists():
impl_path.unlink()
def __enter__(self) -> SfgContext: def _finish_files(self) -> None:
from .ir import collect_includes
header_includes = collect_includes(self._header_file)
self._header_file.includes = list(
set(self._header_file.includes) | header_includes
)
self._header_file.includes.sort(key=self._include_sort_key)
if self._impl_file is not None:
impl_includes = collect_includes(self._impl_file)
# If some header is already included by the generated header file, do not duplicate that inclusion
impl_includes -= header_includes
self._impl_file.includes = list(
set(self._impl_file.includes) | impl_includes
)
self._impl_file.includes.sort(key=self._include_sort_key)
def _get_emitter(self):
return SfgCodeEmitter(
self._output_dir,
self._context.codestyle,
self._context.clang_format,
)
def __enter__(self) -> SfgComposer:
self.clean_files() self.clean_files()
return self._context return SfgComposer(self._context)
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None: if exc_type is None:
self._emitter.write_files(self._context) self._finish_files()
emitter = self._get_emitter()
emitter.emit(self._header_file)
if self._impl_file is not None:
emitter.emit(self._impl_file)
from .call_tree import (
SfgCallTreeNode,
SfgCallTreeLeaf,
SfgEmptyNode,
SfgKernelCallNode,
SfgGpuKernelInvocation,
SfgBlock,
SfgSequence,
SfgStatements,
SfgFunctionParams,
SfgRequireIncludes,
SfgBranch,
SfgSwitchCase,
SfgSwitch,
)
from .entities import (
SfgCodeEntity,
SfgNamespace,
SfgGlobalNamespace,
SfgKernelNamespace,
SfgKernelHandle,
SfgFunction,
SfgVisibility,
SfgClassKeyword,
SfgClassMember,
SfgMemberVariable,
SfgMethod,
SfgConstructor,
SfgClass,
)
from .syntax import (
SfgEntityDecl,
SfgEntityDef,
SfgVisibilityBlock,
SfgNamespaceBlock,
SfgClassBody,
SfgSourceFileType,
SfgSourceFile,
)
from .analysis import collect_includes
__all__ = [
"SfgCallTreeNode",
"SfgCallTreeLeaf",
"SfgEmptyNode",
"SfgKernelCallNode",
"SfgGpuKernelInvocation",
"SfgSequence",
"SfgBlock",
"SfgStatements",
"SfgFunctionParams",
"SfgRequireIncludes",
"SfgBranch",
"SfgSwitchCase",
"SfgSwitch",
"SfgCodeEntity",
"SfgNamespace",
"SfgGlobalNamespace",
"SfgKernelNamespace",
"SfgKernelHandle",
"SfgFunction",
"SfgVisibility",
"SfgClassKeyword",
"SfgClassMember",
"SfgMemberVariable",
"SfgMethod",
"SfgConstructor",
"SfgClass",
"SfgEntityDecl",
"SfgEntityDef",
"SfgVisibilityBlock",
"SfgNamespaceBlock",
"SfgClassBody",
"SfgSourceFileType",
"SfgSourceFile",
"collect_includes",
]
from __future__ import annotations
from ..lang import HeaderFile, includes
from .syntax import (
SfgSourceFile,
SfgNamespaceElement,
SfgClassBodyElement,
SfgNamespaceBlock,
SfgEntityDecl,
SfgEntityDef,
SfgClassBody,
SfgVisibilityBlock,
)
def collect_includes(file: SfgSourceFile) -> set[HeaderFile]:
from .call_tree import SfgCallTreeNode
from .entities import (
SfgCodeEntity,
SfgKernelHandle,
SfgFunction,
SfgMethod,
SfgClassMember,
SfgConstructor,
SfgMemberVariable,
)
def visit_decl(entity: SfgCodeEntity | SfgClassMember) -> set[HeaderFile]:
match entity:
case (
SfgKernelHandle(_, parameters)
| SfgFunction(_, _, parameters)
| SfgMethod(_, _, parameters)
| SfgConstructor(_, parameters, _, _)
):
incls: set[HeaderFile] = set().union(*(includes(p) for p in parameters))
if isinstance(entity, (SfgFunction, SfgMethod)):
incls |= includes(entity.return_type)
return incls
case SfgMemberVariable():
return includes(entity)
case _:
assert False, "unexpected entity"
def walk_syntax(
obj: (
SfgNamespaceElement
| SfgClassBodyElement
| SfgVisibilityBlock
| SfgCallTreeNode
),
) -> set[HeaderFile]:
match obj:
case str():
return set()
case SfgCallTreeNode():
return obj.required_includes.union(
*(walk_syntax(child) for child in obj.children),
)
case SfgEntityDecl(entity):
return visit_decl(entity)
case SfgEntityDef(entity):
match entity:
case SfgKernelHandle(kernel, _):
return (
set(HeaderFile.parse(h) for h in kernel.required_headers)
| {HeaderFile.parse("<cstdint>")}
| visit_decl(entity)
)
case SfgFunction(_, tree, _) | SfgMethod(_, tree, _):
return walk_syntax(tree) | visit_decl(entity)
case SfgConstructor():
return visit_decl(entity)
case SfgMemberVariable():
return includes(entity)
case _:
assert False, "unexpected entity"
case SfgNamespaceBlock(_, elements) | SfgVisibilityBlock(_, elements):
return set().union(*(walk_syntax(elem) for elem in elements)) # type: ignore
case SfgClassBody(_, vblocks):
return set().union(*(walk_syntax(vb) for vb in vblocks))
case _:
assert False, "unexpected syntax element"
return set().union(*(walk_syntax(elem) for elem in file.elements))
from __future__ import annotations
from typing import TYPE_CHECKING, Sequence, Iterable, NewType
from abc import ABC, abstractmethod
from .entities import SfgKernelHandle
from ..lang import SfgVar, HeaderFile
if TYPE_CHECKING:
from ..config import CodeStyle
class SfgCallTreeNode(ABC):
"""Base class for all nodes comprising SFG call trees.
## Code Printing
For extensibility, code printing is implemented inside the call tree.
Therefore, every instantiable call tree node must implement the method `get_code`.
By convention, the string returned by `get_code` should not contain a trailing newline.
"""
def __init__(self) -> None:
self._includes: set[HeaderFile] = set()
@property
@abstractmethod
def children(self) -> Sequence[SfgCallTreeNode]:
"""This node's children"""
@abstractmethod
def get_code(self, cstyle: CodeStyle) -> str:
"""Returns the code of this node.
By convention, the code block emitted by this function should not contain a trailing newline.
"""
@property
def depends(self) -> set[SfgVar]:
"""Set of objects this leaf depends on"""
return set()
@property
def required_includes(self) -> set[HeaderFile]:
"""Return a set of header includes required by this node"""
return self._includes
class SfgCallTreeLeaf(SfgCallTreeNode, ABC):
"""A leaf node of the call tree.
Leaf nodes must implement ``depends`` for automatic parameter collection.
"""
def __init__(self):
super().__init__()
@property
def children(self) -> Sequence[SfgCallTreeNode]:
return ()
class SfgEmptyNode(SfgCallTreeLeaf):
"""A leaf node that does not emit any code.
Empty nodes must still implement ``depends``.
"""
def __init__(self):
super().__init__()
def get_code(self, cstyle: CodeStyle) -> str:
return ""
class SfgStatements(SfgCallTreeLeaf):
"""Represents (a sequence of) statements in the source language.
This class groups together arbitrary code strings
(e.g. sequences of C++ statements, cf. https://en.cppreference.com/w/cpp/language/statements),
and annotates them with the set of symbols read and written by these statements.
It is the user's responsibility to ensure that the code string is valid code in the output language,
and that the lists of required and defined objects are correct and complete.
Args:
code_string: Code to be printed out.
defined_params: Variables that will be newly defined and visible to code in sequence after these statements.
required_params: Variables that are required as input to these statements.
"""
def __init__(
self,
code_string: str,
defines: Iterable[SfgVar],
depends: Iterable[SfgVar],
includes: Iterable[HeaderFile] = (),
):
super().__init__()
self._code_string = code_string
self._defines = set(defines)
self._depends = set(depends)
self._includes = set(includes)
@property
def depends(self) -> set[SfgVar]:
return self._depends
@property
def defines(self) -> set[SfgVar]:
return self._defines
@property
def code_string(self) -> str:
return self._code_string
def get_code(self, cstyle: CodeStyle) -> str:
return self._code_string
class SfgFunctionParams(SfgEmptyNode):
def __init__(self, parameters: Sequence[SfgVar]):
super().__init__()
self._params = set(parameters)
@property
def depends(self) -> set[SfgVar]:
return self._params
class SfgRequireIncludes(SfgEmptyNode):
def __init__(self, includes: Iterable[HeaderFile]):
super().__init__()
self._includes = set(includes)
@property
def depends(self) -> set[SfgVar]:
return set()
class SfgSequence(SfgCallTreeNode):
__match_args__ = ("children",)
def __init__(self, children: Sequence[SfgCallTreeNode]):
super().__init__()
self._children = list(children)
@property
def children(self) -> Sequence[SfgCallTreeNode]:
return self._children
@children.setter
def children(self, cs: Sequence[SfgCallTreeNode]):
self._children = list(cs)
def __getitem__(self, idx: int) -> SfgCallTreeNode:
return self._children[idx]
def __setitem__(self, idx: int, c: SfgCallTreeNode):
self._children[idx] = c
def get_code(self, cstyle: CodeStyle) -> str:
return "\n".join(c.get_code(cstyle) for c in self._children)
class SfgBlock(SfgCallTreeNode):
def __init__(self, seq: SfgSequence):
super().__init__()
self._seq = seq
@property
def sequence(self) -> SfgSequence:
return self._seq
@property
def children(self) -> Sequence[SfgCallTreeNode]:
return (self._seq,)
def get_code(self, cstyle: CodeStyle) -> str:
seq_code = cstyle.indent(self._seq.get_code(cstyle))
return "{\n" + seq_code + "\n}"
# class SfgForLoop(SfgCallTreeNode):
# def __init__(self, control_line: SfgStatements, body: SfgCallTreeNode):
# super().__init__(control_line, body)
# @property
# def body(self) -> SfgStatements:
# return cast(SfgStatements)
class SfgKernelCallNode(SfgCallTreeLeaf):
def __init__(self, kernel_handle: SfgKernelHandle):
super().__init__()
self._kernel_handle = kernel_handle
@property
def depends(self) -> set[SfgVar]:
return set(self._kernel_handle.parameters)
def get_code(self, cstyle: CodeStyle) -> str:
kparams = self._kernel_handle.parameters
fnc_name = self._kernel_handle.fqname
call_parameters = ", ".join([p.name for p in kparams])
return f"{fnc_name}({call_parameters});"
class SfgGpuKernelInvocation(SfgCallTreeNode):
"""A CUDA or HIP kernel invocation.
See https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#execution-configuration
or https://rocmdocs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html#calling-global-functions
for the syntax.
"""
def __init__(
self,
kernel_handle: SfgKernelHandle,
grid_size: SfgStatements,
block_size: SfgStatements,
shared_memory_bytes: SfgStatements | None,
stream: SfgStatements | None,
):
from pystencils.codegen import GpuKernel
kernel = kernel_handle.kernel
if not isinstance(kernel, GpuKernel):
raise ValueError(
"An `SfgGpuKernelInvocation` node can only call GPU kernels."
)
super().__init__()
self._kernel_handle = kernel_handle
self._grid_size = grid_size
self._block_size = block_size
self._shared_memory_bytes = shared_memory_bytes
self._stream = stream
@property
def children(self) -> Sequence[SfgCallTreeNode]:
return (
(
self._grid_size,
self._block_size,
)
+ (
(self._shared_memory_bytes,)
if self._shared_memory_bytes is not None
else ()
)
+ ((self._stream,) if self._stream is not None else ())
)
@property
def depends(self) -> set[SfgVar]:
return set(self._kernel_handle.parameters)
def get_code(self, cstyle: CodeStyle) -> str:
kparams = self._kernel_handle.parameters
fnc_name = self._kernel_handle.fqname
call_parameters = ", ".join([p.name for p in kparams])
grid_args = [self._grid_size, self._block_size]
if self._shared_memory_bytes is not None:
grid_args += [self._shared_memory_bytes]
if self._stream is not None:
grid_args += [self._stream]
grid = "<<< " + ", ".join(arg.get_code(cstyle) for arg in grid_args) + " >>>"
return f"{fnc_name}{grid}({call_parameters});"
class SfgBranch(SfgCallTreeNode):
def __init__(
self,
cond: SfgStatements,
branch_true: SfgSequence,
branch_false: SfgSequence | None = None,
):
super().__init__()
self._cond = cond
self._branch_true = branch_true
self._branch_false = branch_false
@property
def condition(self) -> SfgStatements:
return self._cond
@property
def branch_true(self) -> SfgSequence:
return self._branch_true
@property
def branch_false(self) -> SfgSequence | None:
return self._branch_false
@property
def children(self) -> Sequence[SfgCallTreeNode]:
return (
self._cond,
self._branch_true,
) + ((self.branch_false,) if self.branch_false is not None else ())
def get_code(self, cstyle: CodeStyle) -> str:
code = f"if({self.condition.get_code(cstyle)}) {{\n"
code += cstyle.indent(self.branch_true.get_code(cstyle))
code += "\n}"
if self.branch_false is not None:
code += "else {\n"
code += cstyle.indent(self.branch_false.get_code(cstyle))
code += "\n}"
return code
class SfgSwitchCase(SfgCallTreeNode):
DefaultCaseType = NewType("DefaultCaseType", object)
"""Sentinel type representing the ``default`` case."""
Default = DefaultCaseType(object())
def __init__(self, label: str | SfgSwitchCase.DefaultCaseType, body: SfgSequence):
super().__init__()
self._label = label
self._body = body
@property
def label(self) -> str | DefaultCaseType:
return self._label
@property
def body(self) -> SfgSequence:
return self._body
@property
def children(self) -> Sequence[SfgCallTreeNode]:
return (self._body,)
@property
def is_default(self) -> bool:
return self._label == SfgSwitchCase.Default
def get_code(self, cstyle: CodeStyle) -> str:
code = ""
if self._label == SfgSwitchCase.Default:
code += "default: {\n"
else:
code += f"case {self._label}: {{\n"
code += cstyle.indent(self.body.get_code(cstyle))
code += "\n}"
return code
class SfgSwitch(SfgCallTreeNode):
def __init__(
self,
switch_arg: SfgStatements,
cases_dict: dict[str, SfgSequence],
default: SfgSequence | None = None,
):
super().__init__()
self._cases = [SfgSwitchCase(label, body) for label, body in cases_dict.items()]
if default is not None:
# invariant: the default case is always the last child
self._cases += [SfgSwitchCase(SfgSwitchCase.Default, default)]
self._switch_arg = switch_arg
self._default = (
SfgSwitchCase(SfgSwitchCase.Default, default)
if default is not None
else None
)
@property
def switch_arg(self) -> str | SfgStatements:
return self._switch_arg
@property
def default(self) -> SfgCallTreeNode | None:
return self._default
@property
def children(self) -> tuple[SfgCallTreeNode, ...]:
return (self._switch_arg,) + tuple(self._cases)
@property
def cases(self) -> tuple[SfgCallTreeNode, ...]:
if self._default is not None:
return tuple(self._cases[:-1])
else:
return tuple(self._cases)
@cases.setter
def cases(self, cs: Sequence[SfgSwitchCase]) -> None:
if len(cs) != len(self._cases):
raise ValueError("The number of child nodes must remain the same!")
self._default = None
for i, c in enumerate(cs):
if c.is_default:
if i != len(cs) - 1:
raise ValueError("Default case must be listed last.")
else:
self._default = c
self._children = list(cs)
def set_case(self, idx: int, c: SfgSwitchCase):
if c.is_default:
if idx != len(self._children) - 1:
raise ValueError("Default case must be the last child.")
elif self._default is None:
raise ValueError("Cannot replace normal case with default case.")
else:
self._default = c
self._children[-1] = c
else:
self._children[idx] = c
def get_code(self, cstyle: CodeStyle) -> str:
code = f"switch({self._switch_arg.get_code(cstyle)}) {{\n"
code += "\n".join(c.get_code(cstyle) for c in self._cases)
code += "}"
return code
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
from abc import ABC from abc import ABC
from enum import Enum, auto from enum import Enum, auto
from typing import TYPE_CHECKING, Sequence, Generator from typing import (
from dataclasses import replace TYPE_CHECKING,
Sequence,
Generator,
)
from itertools import chain from itertools import chain
from pystencils import CreateKernelConfig, create_kernel from pystencils import Field
from pystencils.astnodes import KernelFunction from pystencils.codegen import Kernel
from pystencils.types import PsType, PsCustomType
from .types import SrcType from ..lang import SfgVar, SfgKernelParamVar, void, ExprLike
from .source_concepts import SrcObject from ..exceptions import SfgException
from .exceptions import SfgException
if TYPE_CHECKING: if TYPE_CHECKING:
from .tree import SfgCallTreeNode from . import SfgCallTreeNode
from .context import SfgContext
class SfgEmptyLines: # =========================================================================================================
def __init__(self, lines: int): #
self._lines = lines # SEMANTICAL ENTITIES
#
# These classes model *code entities*, which represent *semantic components* of the generated files.
#
# =========================================================================================================
@property
def lines(self) -> int:
return self._lines
class SfgCodeEntity:
"""Base class for code entities.
class SfgHeaderInclude: Each code entity has a name and an optional enclosing namespace.
def __init__( """
self, header_file: str, system_header: bool = False, private: bool = False
): def __init__(self, name: str, parent_namespace: SfgNamespace) -> None:
self._header_file = header_file self._name = name
self._system_header = system_header self._namespace: SfgNamespace = parent_namespace
self._private = private
@property @property
def file(self) -> str: def name(self) -> str:
return self._header_file """Name of this entity"""
return self._name
@property @property
def system_header(self): def fqname(self) -> str:
return self._system_header """Fully qualified name of this entity"""
if not isinstance(self._namespace, SfgGlobalNamespace):
return self._namespace.fqname + "::" + self._name
else:
return self._name
@property @property
def private(self): def parent_namespace(self) -> SfgNamespace | None:
return self._private """Parent namespace of this entity"""
return self._namespace
def __hash__(self) -> int:
return hash((self._header_file, self._system_header, self._private))
def __eq__(self, other: object) -> bool:
return (
isinstance(other, SfgHeaderInclude)
and self._header_file == other._header_file
and self._system_header == other._system_header
and self._private == other._private
)
class SfgKernelNamespace: class SfgNamespace(SfgCodeEntity):
def __init__(self, ctx, name: str): """A C++ namespace.
self._ctx = ctx
self._name = name
self._asts: dict[str, KernelFunction] = dict()
@property Each namespace has a name and a parent; its fully qualified name is given as
def name(self): ``<parent.name>::<name>``.
return self._name
@property Args:
def asts(self): name: Local name of this namespace
yield from self._asts.values() parent: Parent namespace enclosing this namespace
"""
def add(self, ast: KernelFunction, name: str | None = None):
"""Adds an existing pystencils AST to this namespace. def __init__(self, name: str, parent_namespace: SfgNamespace) -> None:
If a name is specified, the AST's function name is changed.""" super().__init__(name, parent_namespace)
if name is not None:
astname = name
else:
astname = ast.function_name
if astname in self._asts: self._entities: dict[str, SfgCodeEntity] = dict()
def get_entity(self, qual_name: str) -> SfgCodeEntity | None:
"""Find an entity with the given qualified name within this namespace.
If ``qual_name`` contains any qualifying delimiters ``::``,
each component but the last is interpreted as a namespace.
"""
tokens = qual_name.split("::", 1)
match tokens:
case [entity_name]:
return self._entities.get(entity_name, None)
case [nspace, remaining_qualname]:
sub_nspace = self._entities.get(nspace, None)
if sub_nspace is not None:
if not isinstance(sub_nspace, SfgNamespace):
raise KeyError(
f"Unable to find entity {qual_name} in namespace {self._name}: "
f"Entity {nspace} is not a namespace."
)
return sub_nspace.get_entity(remaining_qualname)
else:
return None
case _:
assert False, "unreachable code"
def add_entity(self, entity: SfgCodeEntity):
if entity.name in self._entities:
raise ValueError( raise ValueError(
f"Duplicate ASTs: An AST with name {astname} already exists in namespace {self._name}" f"Another entity with the name {entity.fqname} already exists"
) )
self._entities[entity.name] = entity
if name is not None: def get_child_namespace(self, qual_name: str):
ast.function_name = name if not qual_name:
raise ValueError("Anonymous namespaces are not supported")
self._asts[astname] = ast # Find the namespace by qualified lookup ...
namespace = self.get_entity(qual_name)
if namespace is not None:
if not type(namespace) is SfgNamespace:
raise ValueError(f"Entity {qual_name} exists, but is not a namespace")
else:
# ... or create it
tokens = qual_name.split("::")
namespace = self
for tok in tokens:
namespace = SfgNamespace(tok, namespace)
return SfgKernelHandle(self._ctx, astname, self, ast.get_parameters()) return namespace
def create(
self,
assignments,
name: str | None = None,
config: CreateKernelConfig | None = None,
):
"""Creates a new pystencils kernel from a list of assignments and a configuration.
This is a wrapper around
[`pystencils.create_kernel`](
https://pycodegen.pages.i10git.cs.fau.de/pystencils/
sphinx/kernel_compile_and_call.html#pystencils.create_kernel
)
with a subsequent call to [`add`][pystencilssfg.source_components.SfgKernelNamespace.add].
"""
if config is None:
config = CreateKernelConfig()
if name is not None: class SfgGlobalNamespace(SfgNamespace):
if name in self._asts: """The C++ global namespace."""
raise ValueError(
f"Duplicate ASTs: An AST with name {name} already exists in namespace {self._name}"
)
config = replace(config, function_name=name)
# type: ignore def __init__(self) -> None:
ast = create_kernel(assignments, config=config) super().__init__("", self)
return self.add(ast)
@property
def fqname(self) -> str:
return ""
class SfgKernelHandle(SfgCodeEntity):
"""Handle to a pystencils kernel."""
__match_args__ = ("kernel", "parameters")
class SfgKernelHandle:
def __init__( def __init__(
self, self,
ctx: SfgContext,
name: str, name: str,
namespace: SfgKernelNamespace, namespace: SfgKernelNamespace,
parameters: Sequence[KernelFunction.Parameter], kernel: Kernel,
inline: bool = False,
): ):
self._ctx = ctx super().__init__(name, namespace)
self._name = name
self._namespace = namespace
self._parameters = parameters
self._scalar_params = set() self._kernel = kernel
self._fields = set() self._parameters = [SfgKernelParamVar(p) for p in kernel.parameters]
for param in self._parameters: self._inline: bool = inline
if param.is_field_parameter:
self._fields |= set(param.fields)
else:
self._scalar_params.add(param.symbol)
@property self._scalar_params: set[SfgVar] = set()
def kernel_name(self): self._fields: set[Field] = set()
return self._name
@property
def kernel_namespace(self):
return self._namespace
@property for param in self._parameters:
def fully_qualified_name(self): if param.wrapped.is_field_parameter:
match self._ctx.fully_qualified_namespace: self._fields |= set(param.wrapped.fields)
case None: else:
return f"{self.kernel_namespace.name}::{self.kernel_name}" self._scalar_params.add(param)
case fqn:
return f"{fqn}::{self.kernel_namespace.name}::{self.kernel_name}"
@property @property
def parameters(self): def parameters(self) -> Sequence[SfgKernelParamVar]:
"""Parameters to this kernel"""
return self._parameters return self._parameters
@property @property
def scalar_parameters(self): def scalar_parameters(self) -> set[SfgVar]:
"""Scalar parameters to this kernel"""
return self._scalar_params return self._scalar_params
@property @property
def fields(self): def fields(self):
return self.fields """Fields accessed by this kernel"""
return self._fields
@property
def kernel(self) -> Kernel:
"""Underlying pystencils kernel object"""
return self._kernel
@property
def inline(self) -> bool:
return self._inline
class SfgFunction:
def __init__(
self, name: str, tree: SfgCallTreeNode, return_type: SrcType = SrcType("void")
):
self._name = name
self._tree = tree
self._return_type = return_type
from .visitors.tree_visitors import ExpandingParameterCollector class SfgKernelNamespace(SfgNamespace):
"""A namespace grouping together a number of kernels."""
param_collector = ExpandingParameterCollector() def __init__(self, name: str, parent: SfgNamespace):
self._parameters = param_collector.visit(self._tree) super().__init__(name, parent)
self._kernels: dict[str, SfgKernelHandle] = dict()
@property @property
def name(self): def name(self):
return self._name return self._name
@property @property
def parameters(self): def kernels(self) -> tuple[SfgKernelHandle, ...]:
return self._parameters return tuple(self._kernels.values())
@property def find_kernel(self, name: str) -> SfgKernelHandle | None:
def tree(self): return self._kernels.get(name, None)
return self._tree
@property def add_kernel(self, kernel: SfgKernelHandle):
def return_type(self) -> SrcType: if kernel.name in self._kernels:
return self._return_type raise ValueError(
f"Duplicate kernels: A kernel called {kernel.name} already exists "
f"in namespace {self.fqname}"
)
self._kernels[kernel.name] = kernel
@dataclass(frozen=True, match_args=False)
class CommonFunctionProperties:
tree: SfgCallTreeNode
parameters: tuple[SfgVar, ...]
return_type: PsType
inline: bool
constexpr: bool
attributes: Sequence[str]
@staticmethod
def collect_params(tree: SfgCallTreeNode, required_params: Sequence[SfgVar] | None):
from .postprocessing import CallTreePostProcessing
param_collector = CallTreePostProcessing()
params_set = param_collector(tree).function_params
if required_params is not None:
if not (params_set <= set(required_params)):
extras = params_set - set(required_params)
raise SfgException(
"Extraenous function parameters: "
f"Found free variables {extras} that were not listed in manually specified function parameters."
)
parameters = tuple(required_params)
else:
parameters = tuple(sorted(params_set, key=lambda p: p.name))
return parameters
class SfgFunction(SfgCodeEntity, CommonFunctionProperties):
"""A free function."""
__match_args__ = ("name", "tree", "parameters", "return_type") # type: ignore
def __init__(
self,
name: str,
namespace: SfgNamespace,
tree: SfgCallTreeNode,
return_type: PsType = void,
inline: bool = False,
constexpr: bool = False,
attributes: Sequence[str] = (),
required_params: Sequence[SfgVar] | None = None,
):
super().__init__(name, namespace)
parameters = self.collect_params(tree, required_params)
CommonFunctionProperties.__init__(
self,
tree,
parameters,
return_type,
inline,
constexpr,
attributes,
)
class SfgVisibility(Enum): class SfgVisibility(Enum):
"""Visibility qualifiers of C++"""
DEFAULT = auto() DEFAULT = auto()
PRIVATE = auto() PRIVATE = auto()
PROTECTED = auto() PROTECTED = auto()
...@@ -222,6 +298,8 @@ class SfgVisibility(Enum): ...@@ -222,6 +298,8 @@ class SfgVisibility(Enum):
class SfgClassKeyword(Enum): class SfgClassKeyword(Enum):
"""Class keywords of C++"""
STRUCT = auto() STRUCT = auto()
CLASS = auto() CLASS = auto()
...@@ -234,8 +312,10 @@ class SfgClassKeyword(Enum): ...@@ -234,8 +312,10 @@ class SfgClassKeyword(Enum):
class SfgClassMember(ABC): class SfgClassMember(ABC):
def __init__(self) -> None: """Base class for class member entities"""
self._cls: SfgClass | None = None
def __init__(self, cls: SfgClass) -> None:
self._cls: SfgClass = cls
self._visibility: SfgVisibility | None = None self._visibility: SfgVisibility | None = None
@property @property
...@@ -247,119 +327,115 @@ class SfgClassMember(ABC): ...@@ -247,119 +327,115 @@ class SfgClassMember(ABC):
@property @property
def visibility(self) -> SfgVisibility: def visibility(self) -> SfgVisibility:
if self._visibility is None: if self._visibility is None:
raise SfgException(f"{self} is not bound to a class and therefore has no visibility.")
return self._visibility
@property
def is_bound(self) -> bool:
return self._cls is not None
def _bind(self, cls: SfgClass, vis: SfgVisibility):
if self.is_bound:
raise SfgException(
f"Binding {self} to class {cls.class_name} failed: "
f"{self} was already bound to {self.owning_class.class_name}"
)
self._cls = cls
self._vis = vis
class SfgVisibilityBlock:
def __init__(self, visibility: SfgVisibility) -> None:
self._vis = visibility
self._members: list[SfgClassMember] = []
self._cls: SfgClass | None = None
@property
def visibility(self) -> SfgVisibility:
return self._vis
def append_member(self, member: SfgClassMember):
if self._cls is not None:
self._cls._add_member(member, self._vis)
self._members.append(member)
def members(self) -> Generator[SfgClassMember, None, None]:
yield from self._members
@property
def is_bound(self) -> bool:
return self._cls is not None
def _bind(self, cls: SfgClass):
if self._cls is not None:
raise SfgException( raise SfgException(
f"Binding visibility block to class {cls.class_name} failed: " f"{self} is not bound to a class and therefore has no visibility."
f"was already bound to {self._cls.class_name}"
) )
self._cls = cls return self._visibility
class SfgInClassDefinition(SfgClassMember):
def __init__(self, text: str):
SfgClassMember.__init__(self)
self._text = text
@property
def text(self) -> str:
return self._text
def __str__(self) -> str:
return self._text
class SfgMemberVariable(SfgVar, SfgClassMember):
"""Variable that is a field of a class"""
class SfgMemberVariable(SrcObject, SfgClassMember):
def __init__( def __init__(
self, self,
name: str, name: str,
dtype: SrcType dtype: PsType,
cls: SfgClass,
default_init: tuple[ExprLike, ...] | None = None,
): ):
SrcObject.__init__(self, name, dtype) SfgVar.__init__(self, name, dtype)
SfgClassMember.__init__(self) SfgClassMember.__init__(self, cls)
self._default_init = default_init
@property
def default_init(self) -> tuple[ExprLike, ...] | None:
return self._default_init
class SfgMethod(SfgClassMember, CommonFunctionProperties):
"""Instance method of a class"""
__match_args__ = ("name", "tree", "parameters", "return_type") # type: ignore
class SfgMethod(SfgFunction, SfgClassMember):
def __init__( def __init__(
self, self,
name: str, name: str,
cls: SfgClass,
tree: SfgCallTreeNode, tree: SfgCallTreeNode,
return_type: SrcType = SrcType("void"), return_type: PsType = void,
inline: bool = False, inline: bool = False,
const: bool = False, const: bool = False,
static: bool = False,
constexpr: bool = False,
virtual: bool = False,
override: bool = False,
attributes: Sequence[str] = (),
required_params: Sequence[SfgVar] | None = None,
): ):
SfgFunction.__init__(self, name, tree, return_type=return_type) super().__init__(cls)
SfgClassMember.__init__(self)
self._inline = inline self._name = name
self._static = static
self._const = const self._const = const
self._virtual = virtual
self._override = override
parameters = self.collect_params(tree, required_params)
CommonFunctionProperties.__init__(
self,
tree,
parameters,
return_type,
inline,
constexpr,
attributes,
)
@property @property
def inline(self) -> bool: def name(self) -> str:
return self._inline return self._name
@property
def static(self) -> bool:
return self._static
@property @property
def const(self) -> bool: def const(self) -> bool:
return self._const return self._const
@property
def virtual(self) -> bool:
return self._virtual
@property
def override(self) -> bool:
return self._override
class SfgConstructor(SfgClassMember): class SfgConstructor(SfgClassMember):
"""Constructor of a class"""
__match_args__ = ("owning_class", "parameters", "initializers", "body")
def __init__( def __init__(
self, self,
parameters: Sequence[SrcObject] = (), cls: SfgClass,
initializers: Sequence[str] = (), parameters: Sequence[SfgVar] = (),
body: str = "" initializers: Sequence[tuple[SfgVar | str, tuple[ExprLike, ...]]] = (),
body: str = "",
): ):
SfgClassMember.__init__(self) super().__init__(cls)
self._parameters = tuple(parameters) self._parameters = tuple(parameters)
self._initializers = tuple(initializers) self._initializers = tuple(initializers)
self._body = body self._body = body
@property @property
def parameters(self) -> tuple[SrcObject, ...]: def parameters(self) -> tuple[SfgVar, ...]:
return self._parameters return self._parameters
@property @property
def initializers(self) -> tuple[str, ...]: def initializers(self) -> tuple[tuple[SfgVar | str, tuple[ExprLike, ...]], ...]:
return self._initializers return self._initializers
@property @property
...@@ -367,51 +443,34 @@ class SfgConstructor(SfgClassMember): ...@@ -367,51 +443,34 @@ class SfgConstructor(SfgClassMember):
return self._body return self._body
class SfgClass: class SfgClass(SfgCodeEntity):
"""Models a C++ class. """A C++ class."""
### Adding members to classes __match_args__ = ("class_keyword", "name")
Members are never added directly to a class. Instead, they are added to
an [SfgVisibilityBlock][pystencilssfg.source_components.SfgVisibilityBlock]
which defines their syntactic position and visibility modifier in the code.
At the top of every class, there is a default visibility block
accessible through the `default` property.
To add members with custom visibility, create a new SfgVisibilityBlock,
add members to the block, and add the block using `append_visibility_block`.
A more succinct interface for constructing classes is available through the
[SfgClassComposer][pystencilssfg.composer.SfgClassComposer].
"""
def __init__( def __init__(
self, self,
class_name: str, name: str,
namespace: SfgNamespace,
class_keyword: SfgClassKeyword = SfgClassKeyword.CLASS, class_keyword: SfgClassKeyword = SfgClassKeyword.CLASS,
bases: Sequence[str] = (), bases: Sequence[str] = (),
): ):
if isinstance(bases, str): if isinstance(bases, str):
raise ValueError("Base classes must be given as a sequence.") raise ValueError("Base classes must be given as a sequence.")
self._class_name = class_name super().__init__(name, namespace)
self._class_keyword = class_keyword self._class_keyword = class_keyword
self._bases_classes = tuple(bases) self._bases_classes = tuple(bases)
self._default_block = SfgVisibilityBlock(SfgVisibility.DEFAULT)
self._default_block._bind(self)
self._blocks = [self._default_block]
self._definitions: list[SfgInClassDefinition] = []
self._constructors: list[SfgConstructor] = [] self._constructors: list[SfgConstructor] = []
self._methods: dict[str, SfgMethod] = dict() self._methods: list[SfgMethod] = []
self._member_vars: dict[str, SfgMemberVariable] = dict() self._member_vars: dict[str, SfgMemberVariable] = dict()
@property @property
def class_name(self) -> str: def src_type(self) -> PsType:
return self._class_name # TODO: Use CppTypeFactory instead
return PsCustomType(self._name)
@property
def src_type(self) -> SrcType:
return SrcType(self._class_name)
@property @property
def base_classes(self) -> tuple[str, ...]: def base_classes(self) -> tuple[str, ...]:
...@@ -421,42 +480,15 @@ class SfgClass: ...@@ -421,42 +480,15 @@ class SfgClass:
def class_keyword(self) -> SfgClassKeyword: def class_keyword(self) -> SfgClassKeyword:
return self._class_keyword return self._class_keyword
@property
def default(self) -> SfgVisibilityBlock:
return self._default_block
def append_visibility_block(self, block: SfgVisibilityBlock):
if block.visibility == SfgVisibility.DEFAULT:
raise SfgException(
"Can't add another block with DEFAULT visibility to a class. Use `.default` instead.")
block._bind(self)
for m in block.members():
self._add_member(m, block.visibility)
self._blocks.append(block)
def visibility_blocks(self) -> Generator[SfgVisibilityBlock, None, None]:
yield from self._blocks
def members( def members(
self, visibility: SfgVisibility | None = None self, visibility: SfgVisibility | None = None
) -> Generator[SfgClassMember, None, None]: ) -> Generator[SfgClassMember, None, None]:
if visibility is None: if visibility is None:
yield from chain.from_iterable( yield from chain(
b.members() for b in self._blocks self._constructors, self._methods, self._member_vars.values()
) )
else: else:
yield from chain.from_iterable( yield from filter(lambda m: m.visibility == visibility, self.members())
b.members() for b in filter(lambda b: b.visibility == visibility, self._blocks)
)
def definitions(
self, visibility: SfgVisibility | None = None
) -> Generator[SfgInClassDefinition, None, None]:
if visibility is not None:
yield from filter(lambda m: m.visibility == visibility, self._definitions)
else:
yield from self._definitions
def member_variables( def member_variables(
self, visibility: SfgVisibility | None = None self, visibility: SfgVisibility | None = None
...@@ -480,47 +512,24 @@ class SfgClass: ...@@ -480,47 +512,24 @@ class SfgClass:
self, visibility: SfgVisibility | None = None self, visibility: SfgVisibility | None = None
) -> Generator[SfgMethod, None, None]: ) -> Generator[SfgMethod, None, None]:
if visibility is not None: if visibility is not None:
yield from filter( yield from filter(lambda m: m.visibility == visibility, self._methods)
lambda m: m.visibility == visibility, self._methods.values()
)
else: else:
yield from self._methods.values() yield from self._methods
# PRIVATE def add_member(self, member: SfgClassMember, vis: SfgVisibility):
def _add_member(self, member: SfgClassMember, vis: SfgVisibility):
if isinstance(member, SfgConstructor): if isinstance(member, SfgConstructor):
self._add_constructor(member) self._constructors.append(member)
elif isinstance(member, SfgMemberVariable): elif isinstance(member, SfgMemberVariable):
self._add_member_variable(member) self._add_member_variable(member)
elif isinstance(member, SfgMethod): elif isinstance(member, SfgMethod):
self._add_method(member) self._methods.append(member)
elif isinstance(member, SfgInClassDefinition):
self._add_definition(member)
else: else:
raise SfgException(f"{member} is not a valid class member.") raise SfgException(f"{member} is not a valid class member.")
member._bind(self, vis)
def _add_definition(self, definition: SfgInClassDefinition):
self._definitions.append(definition)
def _add_constructor(self, constr: SfgConstructor):
# TODO: Check for signature conflicts?
self._constructors.append(constr)
def _add_method(self, method: SfgMethod):
if method.name in self._methods:
raise SfgException(
f"Duplicate method name {method.name} in class {self._class_name}"
)
self._methods[method.name] = method
def _add_member_variable(self, variable: SfgMemberVariable): def _add_member_variable(self, variable: SfgMemberVariable):
if variable.name in self._member_vars: if variable.name in self._member_vars:
raise SfgException( raise SfgException(
f"Duplicate field name {variable.name} in class {self._class_name}" f"Duplicate field name {variable.name} in class {self._name}"
) )
self._member_vars[variable.name] = variable self._member_vars[variable.name] = variable