helpers¶
Helpers.
This module contains functionality that does not belong to the main domain of the simaticai module.
TensorRTOptimization ¶
Class representing TensorRT optimization configuration.
This class provides methods to configure the optimization parameters for TensorRT.
Attributes:
| Name | Type | Description |
|---|---|---|
allowed_parameters | list | List of allowed parameter names. |
parameters | dict | Dictionary containing the optimization parameters. |
Methods:
| Name | Description |
|---|---|
add_extra_parameter | add_extra_parameter(key: str, value: str): Adds an extra parameter to the TensorRT optimization. Allowed parameters: "precision_mode", "trt_engine_cache_enable", "trt_engine_cache_path", "max_cached_engines", "minimum_segment_size", "max_workspace_size_bytes" |
Intended usage
gpu_accelerator = (
    TensorRTOptimization(precision_mode = TensorRTOptimization.PrecisionMode.FP16)
    .add_extra_parameter("minimum_segment_size", 3)
)
model_config = ModelConfig(model_config, max_batch_size = 1, optimization = gpu_accelerator)
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/model_config.py
class TensorRTOptimization:
    """
    Class representing TensorRT optimization configuration.

    This class provides methods to configure the optimization parameters for TensorRT.

    Attributes:
        allowed_parameters (list): List of allowed parameter names.
        parameters (dict): Dictionary containing the optimization parameters.

    Methods:
        add_extra_parameter(self, key: str, value: str)
            Adds an extra parameter to the TensorRT optimization.
            allowed_parameters: "precision_mode", "trt_engine_cache_enable", "trt_engine_cache_path",
            "max_cached_engines", "minimum_segment_size", "max_workspace_size_bytes"

    Intended usage:
        gpu_accelerator = (
            TensorRTOptimization(precision_mode = TensorRTOptimization.PrecisionMode.FP16)
            .add_extra_parameter("minimum_segment_size", 3)
        )
        model_config = ModelConfig(model_config, max_batch_size = 1, optimization = gpu_accelerator)
    """

    class PrecisionMode(Enum):
        """
        Enum class for different precision modes in TensorRT optimization.
        """
        FP32 = "FP32"
        FP16 = "FP16"

    allowed_parameters = ["precision_mode",
                          "max_cached_engines", "minimum_segment_size", "max_workspace_size_bytes",
                          "trt_engine_cache_enable", "trt_engine_cache_path"
                          ]

    def __init__(self,
                 precision_mode: PrecisionMode = PrecisionMode.FP32):
        """
        Initializes a new instance of the TensorRTOptimization class.

        Besides the precision mode, the engine cache is enabled by default and points to "/tmp/triton".

        Args:
            precision_mode (PrecisionMode): The precision mode for the TensorRT optimization.
        """
        self.parameters = {
            "precision_mode": precision_mode.value,
            "trt_engine_cache_enable": "true",
            "trt_engine_cache_path": "/tmp/triton"
        }

    def add_extra_parameter(self, key: str, value: str):
        """
        Add extra parameter to the TensorRT optimization.

        An already configured parameter is overwritten and a warning is logged.

        Args:
            key (str): The key of the parameter. Must be one of `allowed_parameters`.
            value (str): The value of the parameter. It is interpolated into the rendered
                configuration text, so non-string values are rendered through their string form.

        Returns:
            TensorRTOptimization: the instance itself, so that calls can be chained.

        Raises:
            AssertionError: if the key is not in `allowed_parameters`.
        """
        # Raised explicitly instead of using `assert`, so the validation is not stripped when
        # Python runs with -O; the exception type stays the same for existing callers.
        if key not in self.allowed_parameters:
            raise AssertionError(f"Parameter '{key}' is not allowed")
        if key in self.parameters:
            # `warning` is the supported logging method name; `warn` is only a deprecated alias.
            _logger.warning(f"Parameter '{key}' already exists with value {self.parameters[key]} and will be overwritten with value {value}")
        self.parameters[key] = value
        return self

    def __str__(self):
        """
        Returns a string representation of the TensorRTOptimization object.

        The text is produced by filling the configured parameters into TPL_TENSORRT_ACCELERATOR.
        """
        return TPL_TENSORRT_ACCELERATOR.format(extra_parameters=self._parameters_to_string())

    def _parameters_to_string(self):
        """
        Converts the parameters dictionary to a string representation.

        Each parameter becomes one `parameters { key: "..." value: "..." }` entry; the entries are
        joined with a newline followed by two tabs.
        """
        return "\n\t\t".join([f"parameters {{ key: \"{key}\" value: \"{value}\" }}" for key, value in self.parameters.items()])
PrecisionMode ¶
Bases: Enum
Enum class for different precision modes in TensorRT optimization.
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/model_config.py
class PrecisionMode(Enum):
"""
Enum class for different precision modes in TensorRT optimization.
"""
FP32 = "FP32"
FP16 = "FP16"
__init__(precision_mode=PrecisionMode.FP32) ¶
Initializes a new instance of the TensorRTOptimization class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
precision_mode | PrecisionMode | The precision mode for the TensorRT optimization. | FP32 |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/model_config.py
def __init__(self,
precision_mode: PrecisionMode = PrecisionMode.FP32):
"""
Initializes a new instance of the TensorRTOptimization class.
Args:
precision_mode (PrecisionMode): The precision mode for the TensorRT optimization.
"""
self.parameters = {
"precision_mode": precision_mode.value,
"trt_engine_cache_enable": "true",
"trt_engine_cache_path": "/tmp/triton"
}
add_extra_parameter(key, value) ¶
Add extra parameter to the TensorRT optimization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key | str | The key of the parameter. | required |
value | str | The value of the parameter. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/model_config.py
def add_extra_parameter(self, key: str, value: str):
"""
Add extra parameter to the TensorRT optimization.
Args:
key (str): The key of the parameter.
value (str): The value of the parameter.
"""
assert key in self.allowed_parameters, f"Parameter '{key}' is not allowed"
if key in self.parameters:
_logger.warn(f"Parameter '{key}' already exists with value {self.parameters[key]} and will be overwritten with value {value}")
self.parameters[key] = value
return self
__str__() ¶
Returns a string representation of the TensorRTOptimization object.
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/model_config.py
def __str__(self):
"""
Returns a string representation of the TensorRTOptimization object.
"""
return TPL_TENSORRT_ACCELERATOR.format(extra_parameters=self._parameters_to_string())
parse_lines(lines) ¶
Processes dependency requirement lines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
lines | list[str] | list of dependencies and (extra) index urls. | required |
Returns:
| Type | Description |
|---|---|
(dict[str, Spec], list[str], str) | dictionary that contains the dependency specifications, the list of '--extra-index-url' lines, and the '--index-url' line (None if there is none). |
Raises:
| Type | Description |
|---|---|
AssertionError | if the lines contain invalid dependency specifications. |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/pep508.py
def parse_lines(lines: list[str]) -> "(dict[str, Spec], list[str], str)":
    """
    Processes dependency requirement lines.

    Every line is stripped first; empty lines and lines starting with '#' are ignored.

    Args:
        lines (list[str]): list of dependencies and (extra) index urls.

    Returns:
        (dict[str, Spec], list[str], str): a tuple of
            - the dictionary that maps each dependency name to its specification,
            - the list of '--extra-index-url' lines,
            - the last '--index-url' line, or None if the input contains none.

    Raises:
        AssertionError: if the lines contain invalid dependency specifications. The message lists
            every invalid line together with the reason it was rejected.
    """
    dependencies = {}
    invalid_lines = []
    extra_index = []
    index_url = None

    for line in (row.strip() for row in lines):
        if line == "" or line.startswith("#"):
            continue
        if line.startswith('--index-url'):
            index_url = line
            continue
        if line.startswith('--extra-index-url'):
            extra_index.append(line)
            continue
        try:
            spec: Spec = parse_line(line)
            dependencies[spec.name] = spec
        except Exception as e:
            # Collect all invalid lines first, so that one error reports every problem at once.
            invalid_lines.append(f"{line}\n{e}")

    if invalid_lines:
        raise AssertionError('\n'.join(invalid_lines))
    return (dependencies, extra_index, index_url)
parse_requirements(requirements_path) ¶
Processes a requirements.txt file line-by-line.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
requirements_path | Union[str, PathLike] | Union[str, os.PathLike]: path to the requirements.txt file. | required |
Returns:
| Type | Description |
|---|---|
(dict[str, Spec], list[str], str) | dictionary that contains the dependency specifications, the list of '--extra-index-url' lines, and the '--index-url' line (None if there is none). |
Raises:
| Type | Description |
|---|---|
AssertionError | if the lines contain invalid dependency specifications. |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/pep508.py
def parse_requirements(requirements_path: Union[str, os.PathLike]) -> "(dict[str, Spec], list[str], str)":
    """
    Processes a requirements.txt file line-by-line.

    Args:
        requirements_path (Union[str, os.PathLike]): path to the requirements.txt file.

    Returns:
        (dict[str, Spec], list[str], str): the result of `parse_lines` for the lines of the file, i.e.
        the dependency specifications, the extra index url lines and the index url line.

    Raises:
        AssertionError: if the lines contain invalid dependency specifications.
    """
    with open(Path(requirements_path), "r") as f:
        lines = f.readlines()
    try:
        return parse_lines(lines)
    except AssertionError as err:
        # Chain explicitly, so the traceback shows the parsing error as the direct cause.
        raise AssertionError(f"Requirements file '{requirements_path}' contains invalid dependency specifications:\n{str(err)}") from err
parse_pyproject_toml(pyproject_path) ¶
Processes the dependencies listed in the [project] section of a pyproject.toml file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pyproject_path | Union[str, PathLike] | Union[str, os.PathLike]: path to the pyproject.toml file. | required |
Returns:
| Type | Description |
|---|---|
(dict[str, Spec], list[str], str) | dictionary that contains the dependency specifications, the list of '--extra-index-url' lines, and the '--index-url' line (None if there is none). |
Raises:
| Type | Description |
|---|---|
AssertionError | if the lines contain invalid dependency specifications. |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/pep508.py
def parse_pyproject_toml(pyproject_path: Union[str, os.PathLike]) -> "(dict[str, Spec], list[str], str)":
    """
    Processes the dependencies listed in the [project] section of a pyproject.toml file.

    Args:
        pyproject_path (Union[str, os.PathLike]): path to the pyproject.toml file.

    Returns:
        (dict[str, Spec], list[str], str): the result of `parse_lines` for the entries of
        `project.dependencies`, i.e. the dependency specifications, the extra index url lines and
        the index url line.

    Raises:
        AssertionError: if the file has no `dependencies` field in its [project] section, or if the
            dependencies contain invalid specifications.
    """
    with open(Path(pyproject_path), "rb") as f:
        pyproject = toml.load(f)

    if 'project' not in pyproject or 'dependencies' not in pyproject['project']:
        raise AssertionError(f"The file '{pyproject_path}' must contain a [project] section with a [dependencies] field.")
    lines = pyproject['project']['dependencies']

    try:
        return parse_lines(lines)
    except AssertionError as err:
        # Chain explicitly, so the traceback shows the parsing error as the direct cause.
        raise AssertionError(f"The file '{pyproject_path}' contains invalid dependency specifications:\n{str(err)}") from err
Classes to generate a report for a dataflow pipeline and local pipeline runner.
ReportWriter ¶
Base class for report writers.
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/reporter.py
class ReportWriter:
"""
Base class for report writers.
"""
def __init__(self):
self.report_path = None
self.warnings_text = ""
self.errors_text = ""
def set_path(self, report_path: Path):
self.report_path = report_path
def add_warning(self, name, filename, line_number, warning_msg):
self.warnings_text += WARNING_LINE.format(name=name, filename=filename, line_number=line_number, warning_msg=warning_msg)
def add_error(self, name, filename, line_number, error_msg):
self.errors_text += ERROR_LINE.format(name=name, filename=filename, line_number=line_number, error_msg=error_msg)
def write_report(self):
raise NotImplementedError("Subclasses should implement this method")
def _write_warnings(self, file):
file.write(WARNINGS_HEADLINE)
file.write(self.warnings_text)
def _write_errors(self, file):
file.write(ERRORS_HEADLINE)
file.write(self.errors_text)
ReportWriterHandler ¶
Bases: Handler
A handler that can be given to a logger, so the report writer can capture logged warning and error messages.
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/reporter.py
class ReportWriterHandler(logging.Handler):
"""
A handler that can be given to a logger, so the report writer can capture logged warning and error messages
"""
def __init__(self, report_writer: ReportWriter):
super().__init__()
self.report_writer = report_writer
def emit(self, record):
if record.levelno == logging.WARNING:
self.report_writer.add_warning(record.name, record.filename, record.lineno, record.getMessage())
elif record.levelno == logging.ERROR:
self.report_writer.add_error(record.name, record.filename, record.lineno, record.getMessage())
ZipTreeElement ¶
A class to represent a file or folder in a zip file. During the recursive traversal of the zip file, the full name and file size are stored in this class.
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/reporter.py
class ZipTreeElement:
"""
A class to represent a file or folder in a zip file. During the recursive traversal of the zip file,
the full name and file size are stored in this class.
"""
def __init__(self, full_name, file_size):
self.full_name = full_name
self.file_size = file_size
PipelineReportWriter ¶
Bases: ReportWriter
A class to generate a report for a dataflow pipeline, including pipeline structure, component dependencies, and package vulnerabilities.
Methods:
| Name | Description |
|---|---|
set_path | set_path(report_path: Path): Sets the path where the report will be saved. |
set_pipeline_config | set_pipeline_config(pipeline_config: dict): Sets the pipeline configuration and updates the pipeline info and structure. |
add_full_dependency_set | add_full_dependency_set(component_name: str, dependency_set: set[tuple]): Adds a full set of dependencies for a component and updates the vulnerability dictionary. |
add_direct_dependencies | add_direct_dependencies(component_name: str, direct_dependencies: dict): Adds direct dependencies for a component. |
add_warning | Adds a warning to the report. |
write_report | Writes the report to the specified path. |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/reporter.py
class PipelineReportWriter(ReportWriter):
"""
A class to generate a report for a dataflow pipeline, including pipeline structure, component dependencies,
and package vulnerabilities.
Methods:
set_path(report_path: Path):
Sets the path where the report will be saved.
set_pipeline_config(pipeline_config: dict):
Sets the pipeline configuration and updates the pipeline info and structure.
add_full_dependency_set(component_name: str, dependency_set: set[tuple]):
Adds a full set of dependencies for a component and updates the vulnerability dictionary.
add_direct_dependencies(component_name: str, direct_dependencies: dict):
Adds direct dependencies for a component.
add_warning(name, filename, line_number, warning_msg):
Adds a warning to the report.
write_report():
Writes the report to the specified path.
"""
def __init__(self):
super().__init__()
self.pipeline_config = {} # pipeline config json
self.pipeline_name = "Unnamed pipeline"
# dependency_names and package_names are transformed dependency names (lowercase, underscore instead of dash)
# set from outside
self.component_direct_dependency_namelist = {} # component -> list of dependency_names (with NO version)
self.component_all_dependencies = {} # component -> set of tuples of (dependency_name, dependency_version)
# collected before writing
self.component_direct_dependencies = {} # component -> set of dependency_names
self.component_transitive_dependencies = {} # component -> set of tuples of (dependency_name, dependency_version)
self.vulnerability_dict = {} # (package_name, package_version) -> vulnerabilities (None | list of dictionaries)
# report text sections to fill
self.pipeline_structure_text = ""
self.pipeline_info_text = ""
self.warnings_text = ""
def set_pipeline_config(self, pipeline_config: dict):
    """
    Stores the pipeline configuration and regenerates the pipeline info and pipeline structure sections.

    Args:
        pipeline_config (dict): the pipeline config json as a dictionary.
    """
    self.pipeline_config = pipeline_config
    for refresh_section in (self._set_pipeline_info, self._set_pipeline_structure):
        refresh_section()
@staticmethod
def _sort_pipeline_dag(pipeline_dag: list) -> list:
"""
Sorts a pipeline DAG in order to show dataflow from Pipeline Inputs to Pipeline Outputs.
Databus component is a privileged source, and it is always the first component in the report.
Args:
pipeline_dag (list): The pipeline DAG is a list of dictionaries with "source" and "target" keys.
Returns:
A sorted list of dictionaries representing the pipeline DAG.
"""
pipeline_dag.sort(key=lambda x: (x["source"], x["target"]))
sorted_dag = [edge for edge in pipeline_dag if "Databus" in edge['source']]
if sorted_dag == []:
return pipeline_dag
pipeline_dag = [edge for edge in pipeline_dag if "Databus" not in edge['source']]
# Extracts name of the target or source component from the edge
name_of_component = lambda edge, target_or_source: edge[target_or_source].rsplit(".", 1)[0]
while len(pipeline_dag) > 0:
sorted_targets = [name_of_component(edge, "target") for edge in sorted_dag]
sorted_dag.extend([
edge for edge in pipeline_dag
if name_of_component(edge, "source") in sorted_targets
])
pipeline_dag = [edge for edge in pipeline_dag if edge not in sorted_dag]
return sorted_dag
def _set_pipeline_structure(self):
    """
    Builds the pipeline structure section (a fenced plantuml block) from the pipeline configuration.

    The pipeline name is taken from `dataFlowPipelineInfo.projectName`. Every edge of the sorted
    `dataFlowPipeline.pipelineDag` becomes one PL_STRUCTURE line that shows the source component,
    the target component, the transferred variable and its type.
    """
    self.pipeline_name = self.pipeline_config.get("dataFlowPipelineInfo", {}).get("projectName", "n/a")
    dataflow_pipeline = self.pipeline_config.get("dataFlowPipeline", {})

    variables = {}  # name: (type, is_metric)
    for component in dataflow_pipeline.get("components", []):
        # `or []` tolerates components that declare no inputs or no outputs
        for _input in component.get("inputType") or []:
            variables[_input["name"]] = (_input["type"], False)
        for _output in component.get("outputType") or []:
            variables[_output["name"]] = (_output["type"], _output.get("metric", False))

    pipeline_dag = dataflow_pipeline.get("pipelineDag", [])
    sorted_pipeline_dag = PipelineReportWriter._sort_pipeline_dag(pipeline_dag)

    structure_lines = ["```plantuml\n"]
    for transition in sorted_pipeline_dag:
        source_component_name, source_variable_name = transition["source"].rsplit(".", 1)
        target_component_name, target_variable_name = transition["target"].rsplit(".", 1)

        if source_variable_name == target_variable_name:
            variable_name_to_show = source_variable_name
        else:
            variable_name_to_show = f"{source_variable_name} -> {target_variable_name}"

        source_component_name = source_component_name.replace("Databus", "AIIS")
        target_component_name = target_component_name.replace("Databus", "AIIS")

        # The source variable may not be declared by any component (e.g. when the edge renames the
        # variable); fall back to the target variable, then to an unknown type, instead of failing.
        variable_type, is_metric = (variables.get(source_variable_name)
                                    or variables.get(target_variable_name)
                                    or ("n/a", False))
        arrow = "-->" if is_metric else "->"  # metric variables are drawn with a dashed line

        structure_lines.append(PL_STRUCTURE.format(source_component=source_component_name,
                                                   arrow=arrow,
                                                   target_component=target_component_name,
                                                   variable_name=variable_name_to_show,
                                                   variable_type=variable_type))
    structure_lines.append("```\n\n")
    self.pipeline_structure_text = "".join(structure_lines)
def _set_pipeline_info(self):
    """
    Fills the pipeline info section from the `dataFlowPipelineInfo` part of the pipeline configuration.

    Every field that is missing from the configuration is shown as "n/a".
    """
    info = self.pipeline_config.get("dataFlowPipelineInfo", {})
    # PL_INFO placeholder -> key in the pipeline configuration
    config_key_of = {
        "author": "author",
        "created_on": "createdOn",
        "pipeline_version": "dataFlowPipelineVersion",
        "description": "description",
        "package_id": "packageId",
        "project_name": "projectName",
    }
    values = {placeholder: info.get(config_key, "n/a") for placeholder, config_key in config_key_of.items()}
    self.pipeline_info_text = PL_INFO.format(**values)
# Transform every dependency and package name for consistency; i.e.,
# opencv-python-headless -> opencv_python_headless; Django -> django
@staticmethod
def transform_package_name(name: str):
new_name = name.replace("-", "_")
return new_name.lower()
# A full dependency set is a set of (package_name, package_version) tuples
# and contains all the dependencies installed for a component
def add_full_dependency_set(self, component_name: str, dependency_set: set[tuple]):
    """
    Registers all installed dependencies of a component and updates the vulnerability dictionary for them.

    Args:
        component_name (str): name of the component.
        dependency_set (set[tuple]): set of (package_name, package_version) tuples.
    """
    ordered_by_name = sorted(dependency_set, key=lambda dependency: dependency[0])
    self._expand_component_all_dependencies(component_name, ordered_by_name)
    self._update_vulnerability_dict(ordered_by_name)
def _expand_component_all_dependencies(self, component_name: str, dependency_list: list[tuple]):
if component_name not in self.component_all_dependencies:
self.component_all_dependencies[component_name] = set()
for package_name, package_version in dependency_list:
transformed_package_name = PipelineReportWriter.transform_package_name(package_name)
self.component_all_dependencies[component_name].add((transformed_package_name, package_version))
def _update_vulnerability_dict(self, dependency_list: list[tuple]):
    """
    Queries the PyPI JSON API for known vulnerabilities of the given packages.

    For every (package_name, package_version) tuple an entry is stored in `self.vulnerability_dict`
    under the key (transformed package name, package version). The value is the 'vulnerabilities'
    list of the response, or None if the package could not be checked (request failed, response
    status is not 200, the body is not valid JSON, or it has no 'vulnerabilities' field).

    Args:
        dependency_list (list[tuple]): list of (package_name, package_version) tuples.
    """
    vulnerability_dict = {}
    for package_name, package_version in dependency_list:
        key = (PipelineReportWriter.transform_package_name(package_name), package_version)
        vulnerability_dict[key] = None

        url = f"https://pypi.org/pypi/{package_name}/{package_version}/json"
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                data = response.json()
                if 'vulnerabilities' in data:
                    vulnerability_dict[key] = data['vulnerabilities']
        except (requests.exceptions.RequestException, ValueError):
            # The check is best effort: any network problem (not only a timeout) or an unparsable
            # body leaves the entry as None instead of aborting the report.
            continue
    self.vulnerability_dict.update(vulnerability_dict)
def add_direct_dependencies(self, component_name: str, direct_dependencies: dict):
self.component_direct_dependency_namelist[component_name] = [PipelineReportWriter.transform_package_name(name)
for name in list(direct_dependencies.keys())]
def write_report(self):
if self.report_path is None:
return
self._set_component_dependencies()
with open(self.report_path, "w", encoding="utf-8") as file:
self._write_headline(file)
self._write_pipeline_info(file)
self._write_pipeline_structure(file)
self._write_dependencies(file)
self._write_package_vulnerabilities(file)
self._write_errors(file)
self._write_warnings(file)
def _set_component_dependencies(self):
for component in self.component_all_dependencies.keys():
# self.component_direct_dependencies should contain everything from self.component_all_dependencies
# if it is direct, i.e., the name is in self.component_direct_dependency_namelist
# self.component_transitive_dependencies should contain everything else
self.component_transitive_dependencies[component] = set()
self.component_direct_dependencies[component] = set()
all_dependencies = self.component_all_dependencies[component]
for dependency_name, dependency_version in all_dependencies:
if dependency_name in self.component_direct_dependency_namelist.get(component, []):
self.component_direct_dependencies[component].add((dependency_name, dependency_version))
else:
self.component_transitive_dependencies[component].add((dependency_name, dependency_version))
def _write_headline(self, file):
    """
    Writes the report headline, which contains the pipeline name.
    """
    headline = PL_REPORT_HEADLINE.format(pipeline_name=self.pipeline_name)
    file.write(headline)
def _write_pipeline_info(self, file):
    """
    Writes the pipeline info headline followed by the collected pipeline info text.
    """
    for section_part in (PL_INFO_HEADLINE, self.pipeline_info_text):
        file.write(section_part)
def _write_pipeline_structure(self, file):
    """
    Writes the pipeline structure headline followed by the collected pipeline structure text.
    """
    for section_part in (PL_STRUCTURE_HEADLINE, self.pipeline_structure_text):
        file.write(section_part)
def _write_dependencies(self, file):
"""
Writes the dependency section of every component that has registered dependencies.

For each component a headline is written, followed by its direct dependencies and its
transitive dependencies, both sorted by dependency name.
"""
# NOTE(review): the indentation of this listing is lost, so the exact nesting of the two
# `file.write("\n")` calls below cannot be read from here - confirm against the module source.
for component_name in self.component_all_dependencies.keys():
# both sets are filled by _set_component_dependencies(); an empty set is used if a component is missing
direct_dependencies = self.component_direct_dependencies.get(component_name, set())
transitive_dependencies = self.component_transitive_dependencies.get(component_name, set())
file.write(PL_COMPONMENT_DEPENDENCIES_HEADLINE.format(component_name=component_name))
file.write(PL_COMPONENT_DIRECT_DEPENDENCIES_HEADLINE)
# sort by dependency name
sorted_direct_dependencies = sorted(list(direct_dependencies), key=lambda x: x[0])
for dependency_name, dependency_version in sorted_direct_dependencies:
file.write(PL_COMPONENT_DIRECT_DEPENDENCY.format(dependency_name=dependency_name,
dependency_version=dependency_version))
file.write("\n")
file.write(PL_COMPONENT_TRANSITIVE_DEPENDENCIES_HEADLINE)
# sort by dependency name
sorted_transitive_dependencies = sorted(list(transitive_dependencies), key=lambda x: x[0])
for dependency_name, dependency_version in sorted_transitive_dependencies:
file.write(PL_COMPONENT_TRANSITIVE_DEPENDENCY.format(dependency_name=dependency_name,
dependency_version=dependency_version))
file.write("\n")
def _get_components_who_have_given_package(self, package_name, package_version):
components = []
for component in self.component_all_dependencies:
dependencies = self.component_all_dependencies[component]
if (package_name, package_version) in dependencies:
components.append(component)
return components
def _write_package_vulnerabilities(self, file):
"""
Writes the package vulnerabilities section.

The packages are listed sorted by package name, together with the components that use them.
A package whose vulnerabilities are None is reported as not checkable, an empty list is reported
as no known vulnerability, and otherwise one entry is written per vulnerability.
"""
# NOTE(review): the indentation of this listing is lost, so the nesting of the final
# `file.write("\n")` cannot be read from here - confirm against the module source.
file.write(PL_PACKAGE_VULNERABILITIES_HEADLINE)
# the keys are (package_name, package_version) tuples; sort by package name
sorted_vulnerability_dict_items = sorted(self.vulnerability_dict.items(), key=lambda x: x[0][0])
for (package_name, package_version), vulnerabilities in sorted_vulnerability_dict_items:
components = ', '.join(self._get_components_who_have_given_package(package_name, package_version))
if vulnerabilities is None:
# None marks a package for which no vulnerability information was stored
file.write(PL_PACKAGE_VULNERABILITY_CANNOT_BE_CHECKED.format(package_name=package_name,
package_version=package_version,
components=components))
elif vulnerabilities == []:
file.write(PL_PACKAGE_VULNERABILITY_NOT_KNOWN.format(package_name=package_name,
package_version=package_version,
components=components))
else:
for vulnerability in vulnerabilities:
# every field falls back to a placeholder text if the vulnerability entry does not contain it
vulnerability_aliases = vulnerability.get('aliases', 'Vulnerability found with no alias. Check [PyPI repository](https://pypi.org/) for more details.')
vulnerability_link = vulnerability.get('link', 'No link found')
if vulnerability_link != 'No link found':
# turn an existing link into a markdown link
vulnerability_link = f"[{vulnerability_link}]({vulnerability_link})"
vulnerability_details = vulnerability.get('details', 'No details found')
vulnerability_fixed_in = vulnerability.get('fixed_in', '')
file.write(PL_PACKAGE_VULNERABILITY.format(package_name=package_name,
package_version=package_version,
vulnerability_aliases=vulnerability_aliases,
vulnerability_link=vulnerability_link,
vulnerability_details=vulnerability_details,
vulnerability_fixed_in=vulnerability_fixed_in,
components=components))
file.write("\n")
PipelineRunnerReportWriter ¶
Bases: ReportWriter
PipelineRunnerReportWriter is responsible for generating a detailed report of a local pipeline execution. It builds folder structures from zip files, manages component payload counts, and adds installed packages information.
Methods:
| Name | Description |
|---|---|
set_path | set_path(report_path: Path): Sets the path where the report will be saved. |
set_package_zip_path | set_package_zip_path(zip_path: Path): Sets the path to the package zip file and updates the folder tree. |
set_input_payload_length | set_input_payload_length(component_name: str, length: int): Sets the input payload length for a component. |
set_output_payload_length | set_output_payload_length(component_name: str, length: int): Sets the output payload length for a component. |
add_installed_packages | add_installed_packages(component_name: str, pip_report_file: Path): Adds installed packages for a component from a pip report file. |
add_warning | Adds a warning to the report. |
write_report | Writes the report to the specified path. |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/reporter.py
class PipelineRunnerReportWriter(ReportWriter):
"""
PipelineRunnerReportWriter is responsible for generating a detailed report of a local pipeline execution.
It builds folder structures from zip files, manages component payload counts, and adds installed packages information.
Methods:
set_path(report_path: Path):
Sets the path where the report will be saved.
set_package_zip_path(zip_path: Path):
Sets the path to the package zip file and updates the folder tree.
set_input_payload_length(component_name: str, length: int):
Sets the input payload length for a component.
set_output_payload_length(component_name: str, length: int):
Sets the output payload length for a component.
add_installed_packages(component_name: str, pip_report_file: Path):
Adds installed packages for a component from a pip report file.
add_warning(name, filename, line_number, warning_msg):
Adds a warning to the report.
write_report():
Writes the report to the specified path.
"""
def __init__(self):
super().__init__()
self.package_zip_path = None
self.zip_file_name = ""
self.component_installed_packages = {} # component_name -> list[tuple(package_name, package_version, whl_name)]
self.component_payload_length = {} # component_name -> [input_payload_length, output_payload_length]
self.python_packages_zip_content = set()
# report text sections to fill
self.folder_tree_text = ""
self.warnings_text = ""
def set_package_zip_path(self, zip_path: Path):
    """
    Stores the path of the package zip file and builds the folder tree text from its content.

    Args:
        zip_path (Path): path to the package zip file.
    """
    self.package_zip_path = zip_path
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        self.zip_file_name = zipf.filename
        # item name -> full name and file size of the item
        zip_tree = {
            item_name: ZipTreeElement(full_name=item_name, file_size=zipf.getinfo(item_name).file_size)
            for item_name in zipf.namelist()
        }
        self._print_structure_recursively(zip_tree, zipf)
@staticmethod
def _get_folder_and_file_list(item_names: list) -> tuple[list, list]:
"""
Given a list of item names, each item name is a file that either starts with a folder name, or not.
This function separates the folder names and the standalone file names.
E.g., ["a/b/something.txt", "c/another.txt", "else.txt"] -> ["a/", "c/"], ["else.txt"]
"""
folder_names = set()
file_names = []
for item in item_names:
item_parts = item.split('/')
if len(item_parts) > 1:
if item_parts[0] != '':
folder_names.add(item_parts[0] + '/')
else:
if item != '':
file_names.append(item)
return sorted(list(folder_names)), sorted(file_names)
@staticmethod
def format_size(size):
"""Format file size in human-readable form."""
for unit in ['B', 'KB', 'MB']:
if size < 1000:
return f"{size} {unit}"
size //= 1000
return f"{size} GB"
def _print_structure_recursively(self, zip_tree, zipf, prefix=""):
folder_names, file_names = PipelineRunnerReportWriter._get_folder_and_file_list(zip_tree.keys())
is_file_names_empty = file_names == []
self._print_folder_structure(zip_tree, zipf, prefix, folder_names, is_file_names_empty)
self._print_file_structure(zip_tree, zipf, prefix, file_names)
def _print_folder_structure(self, zip_tree, zipf, prefix, folder_names, is_file_names_empty):
for i, folder in enumerate(folder_names):
is_last = (i == len(folder_names) - 1) and is_file_names_empty
connector = LPLR_FOLDER_STRUCTURE_LAST_CONNECTOR_SYMBOL if is_last else LPLR_FOLDER_STRUCTURE_MID_CONNECTOR_SYMBOL
self.folder_tree_text += LPLR_FOLDER_STRUCTURE_FOLDER_LINE.format(prefix=prefix,
connector=connector,
folder=folder)
# create a new tree where items start with the same folder name; but cut out the folder name
new_zip_tree_from_folder = {}
for k, v in zip_tree.items():
if k.startswith(folder):
new_file_name = k.split('/', 1)[1]
new_zip_tree_from_folder[new_file_name] = v
prefix_post = LPLR_FOLDER_STRUCTURE_LAST_PREFIX_SYMBOL if is_last else LPLR_FOLDER_STRUCTURE_MID_PREFIX_SYMBOL
new_prefix_from_folder = prefix + prefix_post
self._print_structure_recursively(new_zip_tree_from_folder, zipf, new_prefix_from_folder)
def _print_file_structure(self, zip_tree, zipf, prefix, file_names):
"""
Appends one line per file, including its formatted size, to the folder tree text.

Files whose name ends with '.zip' are additionally opened from `zipf` and their content is
printed recursively below the file. The item names of a nested "PythonPackages.zip" are also
collected in `self.python_packages_zip_content`.
"""
for i, file_name in enumerate(file_names):
is_last = (i == len(file_names) - 1)
size_str = PipelineRunnerReportWriter.format_size(zip_tree[file_name].file_size)
connector = LPLR_FOLDER_STRUCTURE_LAST_CONNECTOR_SYMBOL if is_last else LPLR_FOLDER_STRUCTURE_MID_CONNECTOR_SYMBOL
self.folder_tree_text += LPLR_FOLDER_STRUCTURE_FILE_LINE.format(prefix=prefix,
connector=connector,
file=file_name,
size=size_str)
# zip files are handled similarly to folders:
# create a new tree where items start with the same folder name; but cut out the folder name
if not file_name.endswith('.zip'):
continue
# full_name is the name of the item inside zipf; file_name is only the last part shown in the tree
full_name = zip_tree[file_name].full_name
with zipf.open(full_name) as nested_zip_file:
# the nested zip file is read into memory completely, so it can be opened as a zip file itself
nested_zip_data = io.BytesIO(nested_zip_file.read())
with zipfile.ZipFile(nested_zip_data, 'r') as nested_zipf:
if file_name == "PythonPackages.zip":
self.python_packages_zip_content.update(sorted(list(nested_zipf.namelist())))
# create a new tree where items start with the same folder name; but cut out the folder name
new_zip_tree_from_zip = {}
for nested_item_name in nested_zipf.namelist():
new_zip_tree_from_zip[nested_item_name] = ZipTreeElement(nested_item_name,
nested_zipf.getinfo(nested_item_name).file_size)
prefix_post = LPLR_FOLDER_STRUCTURE_LAST_PREFIX_SYMBOL if is_last else LPLR_FOLDER_STRUCTURE_MID_PREFIX_SYMBOL
new_prefix_from_zip = prefix + prefix_post
# the nested zip file is passed on, so zip files nested even deeper are opened from it
self._print_structure_recursively(new_zip_tree_from_zip, nested_zipf, new_prefix_from_zip)
def set_input_payload_length(self, component_name: str, length: int):
if component_name in self.component_payload_length:
self.component_payload_length[component_name][0] = length
else:
self.component_payload_length[component_name] = [length, 0]
def set_output_payload_length(self, component_name: str, length: int):
if component_name in self.component_payload_length:
self.component_payload_length[component_name][1] = length
else:
self.component_payload_length[component_name] = [0, length]
def add_installed_packages(self, component_name: str, pip_report_file: Path):
    """
    Collect the packages listed in a JSON pip report for one component.

    For every item of the report's "install" list a tuple
    `(package_name, package_version, wheel_name)` is appended to
    `self.component_installed_packages[component_name]`. Missing values are
    recorded as "n/a"; the wheel name is only taken from download URLs ending in ".whl".

    Args:
        component_name: Component the packages belong to.
        pip_report_file: Path of the JSON report; nothing happens if it is not a file.
    """
    if not pip_report_file.is_file():
        return

    with open(pip_report_file, 'r', encoding="utf-8") as report_stream:
        report_data = json.load(report_stream)

    # Create the component's list on first use, then keep appending to the same list.
    package_rows = self.component_installed_packages.setdefault(component_name, [])

    for entry in report_data.get("install", []):
        download_url = entry.get("download_info", {}).get("url", "")
        if download_url.endswith(".whl"):
            wheel_name = download_url.rsplit("/", 1)[-1]
        else:
            wheel_name = "n/a"
        package_metadata = entry.get("metadata", {})
        package_rows.append((
            package_metadata.get("name", "n/a"),
            package_metadata.get("version", "n/a"),
            wheel_name,
        ))
# check if one or more reports already exists; set the report path so a new report will have a new index
def _set_path_from_zip_path(self):
    """
    Choose the next free report file name next to the package zip file.

    Existing files named `<zip stem>_execution_report_<index>.md` in the zip file's
    folder are scanned and `self.set_path` is called with the highest index plus one
    (index 1 when none exist). Nothing happens when `self.package_zip_path` is None.
    """
    zip_path = self.package_zip_path
    if zip_path is None:
        return

    workdir, base_name = zip_path.parent, zip_path.stem
    highest_index = 0
    for existing_report in workdir.glob(f"{base_name}_execution_report_*.md"):
        index_text = existing_report.stem.rsplit('_', 1)[-1]
        try:
            highest_index = max(highest_index, int(index_text))
        except ValueError:
            # The part after the last underscore is not a number; skip this file.
            pass

    self.set_path(workdir / f"{base_name}_execution_report_{highest_index + 1}.md")
def write_report(self):
    """
    Write all report sections to the report file.

    `self._set_path_from_zip_path()` is always called first, so it can derive the
    report path from the package zip path. If `self.report_path` is still None
    afterwards, no file is written.
    """
    self._set_path_from_zip_path()
    if self.report_path is None:
        return

    with open(self.report_path, "w", encoding="utf-8") as report_file:
        # The sections are written in exactly this order.
        section_writers = (
            self._write_headline,
            self._write_folder_structure,
            self._write_python_packages_zip_content,
            self._write_component_installed_packages,
            self._write_payload_lengths,
            self._write_errors,
            self._write_warnings,
        )
        for write_section in section_writers:
            write_section(report_file)
def _write_headline(self, file):
    """
    Write the report headline.

    Args:
        file: Writable text stream of the report.
    """
    file.write(LPLR_REPORT_HEADLINE)
def _write_folder_structure(self, file):
    """
    Write the folder structure section: its headline, then the collected tree text.

    Args:
        file: Writable text stream of the report.
    """
    section_headline = LPLR_FOLDER_STRUCTURE_HEADLINE.format(file_name=self.zip_file_name)
    file.write(section_headline)

    section_body = LPLR_FOLDER_STRUCTURE.format(
        file_name=self.zip_file_name,
        folder_structure=self.folder_tree_text,
    )
    file.write(section_body)
def _write_python_packages_zip_content(self, file):
    """
    Write the section listing the collected Python packages archive content.

    The entries of `self.python_packages_zip_content` are written in sorted order,
    followed by a single newline.

    Args:
        file: Writable text stream of the report.
    """
    file.write(LPLR_PYTHON_PACKAGES_ZIP_CONTENT_HEADLINE)
    for archive_entry in sorted(self.python_packages_zip_content):
        entry_line = LPLR_PYTHON_PACKAGES_ZIP_CONTENT.format(python_package=archive_entry)
        file.write(entry_line)
    # NOTE(review): placement after the loop was reconstructed from a listing
    # without indentation - confirm against the original module.
    file.write("\n")
def _write_component_installed_packages(self, file):
    """
    Write the section listing the installed packages of every component.

    For each component its entry is written first, then one row per
    `(package_name, package_version, wheel_name)` tuple, ordered by package name.

    Args:
        file: Writable text stream of the report.
    """
    file.write(LPLR_COMPONENT_INSTALLED_PACKAGES_HEADLINE)
    for component, packages in self.component_installed_packages.items():
        file.write(LPLR_COMPONENT_INSTALLED_PACKAGES.format(component=component))
        for name, version, wheel in sorted(packages, key=lambda row: row[0]):
            row_text = LPLR_COMPONENT_INSTALLED_PACKAGES_ROW.format(
                package_name=name,
                package_version=version,
                wheel_name=wheel,
            )
            file.write(row_text)
        # NOTE(review): writing the newline once per component was reconstructed from a
        # listing without indentation - confirm against the original module.
        file.write("\n")
def _write_payload_lengths(self, file):
    """
    Write the section with the input and output payload length of every component.

    Args:
        file: Writable text stream of the report.
    """
    file.write(LPLR_PAYLOAD_LENGTHS_HEADLINE)
    for component, (input_length, output_length) in self.component_payload_length.items():
        entry_text = LPLR_PAYLOAD_LENGTHS.format(
            component=component,
            input_payload_length=input_length,
            output_payload_length=output_length,
        )
        file.write(entry_text)
format_size(size) staticmethod ¶
Format file size in human-readable form.
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/reporter.py
@staticmethod
def format_size(size):
    """
    Format file size in human-readable form.

    The size is divided by 1000 with floor division for each unit step
    (B, KB, MB), so the result is truncated rather than rounded; anything
    that does not fit into MB is reported in GB.
    """
    units = ('B', 'KB', 'MB')
    unit_index = 0
    while unit_index < len(units):
        if size < 1000:
            return f"{size} {units[unit_index]}"
        size //= 1000
        unit_index += 1
    return f"{size} GB"
Module for dealing with temporary files.
This module helps with extracting a zip file into a temporary folder.
OpenZipInTemp ¶
Unzip a zip archive into a temporary directory.
Example usage:
with OpenZipInTemp("path_to_zip_file.zip") as temp_dir:
    # do something with temp_dir
    pass
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| zip_path | path-like | Path to the archive. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/tempfiles.py
class OpenZipInTemp:
    """
    Unzip a zip archive into a temporary directory.

    Example usage:

        with OpenZipInTemp("path_to_zip_file.zip") as temp_dir:
            # do something with temp_dir
            pass

    Args:
        zip_path (path-like): path to the archive.
        clean_up (bool): when True (the default), the temporary directory is removed
            on leaving the `with` block and also when the extraction fails.

    Raises:
        ValueError: if `zip_path` does not exist or is not a zip file.
    """

    def __init__(self, zip_path: Union[str, os.PathLike], clean_up: bool = True):
        if not zipfile.is_zipfile(zip_path):
            raise ValueError(f"File does not exist or not a zip file: {zip_path}")
        self.zip_path = zip_path
        self.tmp_path = None
        self.clean_up = clean_up

    def __enter__(self) -> Path:
        """Create the temporary directory, extract the archive into it and return its path."""
        self.tmp_path = Path(tempfile.mkdtemp(prefix="unzip-"))
        try:
            with zipfile.ZipFile(self.zip_path, "r") as zip_file:
                zip_file.extractall(path=self.tmp_path)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the directory
            # created above would be leaked unless it is removed here.
            self._remove_tmp_dir()
            raise
        return self.tmp_path

    def __exit__(self, e_type, e_val, e_trace):
        """Remove the temporary directory unless `clean_up` is disabled."""
        self._remove_tmp_dir()

    def _remove_tmp_dir(self):
        """Delete the temporary directory if clean-up is enabled and a directory was created."""
        if self.clean_up and self.tmp_path is not None:
            shutil.rmtree(self.tmp_path, ignore_errors=True)
Helper module for YAML files.
Reads YAML files into a dictionary with a custom loader.
read_yaml(path) ¶
Read a YAML file into a dictionary.
Loads the YAML file specified by path. The YAML loader is configured to read datetime objects as strings, for simplifying validation with a JSON schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | path-like | Path of the YAML file. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| dict | dict | A dictionary, populated from the YAML file. |
Source code in docs/industrial-ai-suite/sdk/simaticai/helpers/yaml_helper.py
def read_yaml(path: os.PathLike | str) -> dict:
    """
    Read a YAML file into a dictionary.

    Before loading, `yaml.SafeLoader` is passed to `_remove_implicit_resolver`;
    according to the module documentation this makes datetime values load as
    plain strings, which simplifies validation with a JSON schema.

    Args:
        path (path-like): Path of the YAML file.

    Returns:
        dict: A dictionary, populated from the YAML file.
    """
    _remove_implicit_resolver(yaml.SafeLoader)
    with Path(path).open("r", encoding="utf8") as yaml_stream:
        content = yaml.load(yaml_stream, Loader=yaml.SafeLoader)
    return content