deploy

Pipeline packaging.

This module contains classes and functionality for creating and validating pipeline configuration packages.

Component

Base class for pipeline components, with name, description, and a list of inputs and outputs.

A new component is created with the given name and an empty input and output list.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
class Component:
    """Base class for pipeline components, with name, description, and a list of inputs and outputs.

    A new component is created with the given name and an empty input and output list.
    """
    reserved_names = ["timestamp"]

    @dataclass
    class BatchInfo:
        """
        Batch information for the component.

        This attribute specifies whether the component can handle batch input or output data.
        When set to True, the component will receive data in the form of a list of dictionaries instead of a single dictionary.
        It is important to note that the input and output variables on the component should still be defined as if they are single variables.

        If the input of the pipeline is configured for batch processing, it is recommended not to configure timeshifting, as the list will have the same timestamp for all elements, potentially resulting in data loss.
        """
        inputBatch: bool = False
        outputBatch: bool = False

        def dict(self):
            return {
                'inputBatch': 'Yes' if self.inputBatch is True else 'No',
                'outputBatch': 'Yes' if self.outputBatch is True else 'No'
            }

    def __init__(self, name: str, desc: str = ""):
        """
        Creates a new component with the given name and an empty input and output list.

        Args:
            name (str): Name of the component.
            desc (str): Optional description of the component
        """

        if _allowed_characters_in_names.match(name) is None:
            raise AssertionError("Component name contains invalid character. The allowed characters are [-a-zA-Z0-9_].")
        self.name = name
        self.desc = desc
        self.inputs = {}
        self.outputs = {}

        self.batch = self.BatchInfo(False, False)

    def __repr__(self) -> str:
        text = f"[{self.__class__.__name__}] {self.name}\n"
        if self.desc != "":
            text += f"{self.desc}\n"
        if len(self.inputs) > 0:
            text += "\nComponent Inputs:\n"
            for name, input in self.inputs.items():
                text += f"> {name} ({input['type']}){': ' + input['desc'] if input.get('desc') is not None else ''}\n"
        if len(self.outputs) > 0:
            text += "\nComponent Outputs:\n"
            for name, output in self.outputs.items():
                text += f"< {name} ({output['type']}){': ' + output['desc'] if output.get('desc') is not None else ''}\n"
        return text

    def add_input(self, name: str, _type: str, desc: Optional[str] = None):
        """
        Adds a new input to the component with its type.
        Name of the variables cannot be reserved name like 'timestamp'.
        Input variable 'timestamp' is a prebuilt key in the payload and its value contains the timestamp when the payload is created by AI Inference Server.

        Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
        In case the type is not known by the AI SDK, a warning message will be printed.
        The most frequently used types are
        - String:
            Typically used for data received from Databus
        - Object:
            Object type variables are designed to receive from Vision Connect or transfer images between components
        - Numeric scalar types:
            Typically used for data received from S7 Connector

        The example payload below shows the format of image received from VCA Connector
        ```python
            payload = { "image":
                {
                    "resolutionWidth": image.width,
                    "resolutionHeight": image.height,
                    "mimeType": ["image/raw"],
                    "dataType": "uint8",
                    "channelsPerPixel": 3,
                    "image": _swap_bytes(image.tobytes())
                }
            }
        ```
        Between components the format is the same format as the format of Object as an output.
        ```python
            "processedImage": {
                "metadata": json.dumps( {
                                "resolutionWidth": image.width,
                                "resolutionHeight": image.height
                                }
                            ),
                "bytes": image.tobytes()
            }
        ```

        Args:
            name (str): Name of the new input.
            _type (str): Type of the new input.
            desc (str): Description of the input. (optional)
        """
        if self.inputs is None:
            self.inputs = {}
        if name in self.inputs:
            raise AssertionError(f"Input '{name}' already exists.")
        if name.lower() in self.reserved_names:
            raise AssertionError(f"Input '{name}' is a reserved keyword.")

        flags = type_validation_flags(_type)
        if flags:
            flags_str = ", ".join([f.value for f in flags])
            _logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")

        self.inputs[name] = {
            "type": _type,
        }
        if desc is not None:
            self.inputs[name]['desc'] = desc

    def change_input(self, name: str, _type: str, desc: Optional[str] = None):
        """
        Changes one of the inputs of the component.

        Args:
            name (str): Name of the input to be changed.
            _type (str): New type of the input.
            desc (str): Description of the input. (optional)
        """
        if name not in self.inputs:
            raise AssertionError(f"There is no input with name '{name}'")
        flags = type_validation_flags(_type)
        if flags:
            flags_str = ", ".join([f.value for f in flags])
            _logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
        self.inputs[name]['type'] = _type
        if desc is not None:
            self.inputs[name]['desc'] = desc

    def delete_input(self, name: str):
        """
        Deletes an input from the component by name.
        Once the package has been created with the given component, it is recommended not to change the component directly.
        Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
        It is recommended to use `package.delete_input_wire(...)` with default parameter `with_input=True`.

        Args:
            name (str): Name of the input to be deleted.
        """
        if name not in self.inputs:
            raise AssertionError(f"Component '{self.name}' has no input '{name}'")
        self.inputs.pop(name)

    def add_output(self, name: str, _type: str, desc: Optional[str] = None):
        """
        Adds a new output to the component.

        Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
        In case the type is not known by the AI SDK, a warning message will be printed.
        The most frequently used types are
        - String:
            Typically used for data to be sent to Databus
        - Object:
            Typically used for images to be sent to ZMQ Connector
        - Numeric scalar types:
            Typically used for data sent to S7 Connector

        For outputs of type `Object` the entrypoint must return with a `dictionary` containing two fields, where one field has type `str` and the other field has type `bytes`.
        The example below shows the required format, assuming that 'image' is a PIL Image.
        ```python
            "processedImage": {
                "metadata": json.dumps( {
                                "resolutionWidth": image.width,
                                "resolutionHeight": image.height
                                }
                            ),
                "bytes": image.tobytes()
            }
        ```

        Args:
            name (str): Name of the new output.
            _type (str): Type of the new output.
            desc (str): Description of the output. (optional)
        """
        if self.outputs is None:
            self.outputs = {}
        if name in self.outputs:
            raise AssertionError(f"Output '{name}' already exists")
        if name.lower() in self.reserved_names:
            raise AssertionError(f"Output '{name}' is a reserved keyword.")
        flags = type_validation_flags(_type)
        if flags:
            flags_str = ", ".join([f.value for f in flags])
            _logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
        self.outputs[name] = {
            "type": _type,
        }
        if desc is not None:
            self.outputs[name]['desc'] = desc

    def change_output(self, name: str, _type: str, desc: Optional[str] = None):
        """
        Changes one of the outputs of the component.

        Args:
            name (str): Name of the output to be changed.
            _type (str): The new type of the output.
            desc (str): Description of the output. (optional)
        """
        if name not in self.outputs:
            raise AssertionError(f"There is no output with name '{name}'")
        flags = type_validation_flags(_type)
        if flags:
            flags_str = ", ".join([f.value for f in flags])
            _logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
        self.outputs[name]['type'] = _type
        if desc is not None:
            self.outputs[name]['desc'] = desc

    def delete_output(self, name: str):
        """
        Deletes an output from the component by name.
        Once the package has been created with the given component, it is recommended not to change the component directly.
        Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
        Deleting an output which is represented in any wire will cause package inconsistency.

        Args:
            name (str): Name of the output to be deleted.
        """
        if name not in self.outputs:
            raise AssertionError(f"Component '{self.name}' has no output '{name}'")
        self.outputs.pop(name)

    def _to_dict(self):
        inputs = []
        inputs += [{
            'name': name,
            'type': self.inputs[name]['type'],
        } for name in self.inputs]

        outputs = []
        outputs += [{
            'name': name,
            'type': self.outputs[name]['type'],
            'metric': False,
        } for name in self.outputs]

        return {
            'name': self.name,
            'description': self.desc,
            'batch': self.batch.dict(),
            'inputType': inputs,
            'outputType': outputs,
        }

    def validate(self):
        """
        Empty method for child classes to implement.
        """
        pass

    def save(self, destination, validate):
        """
        Empty method for child classes to implement.
        """
        pass

BatchInfo dataclass

Batch information for the component.

This attribute specifies whether the component can handle batch input or output data. When set to True, the component will receive data in the form of a list of dictionaries instead of a single dictionary. It is important to note that the input and output variables on the component should still be defined as if they are single variables.

If the input of the pipeline is configured for batch processing, it is recommended not to configure timeshifting, as the list will have the same timestamp for all elements, potentially resulting in data loss.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
@dataclass
class BatchInfo:
    """
    Batch information for the component.

    This attribute specifies whether the component can handle batch input or output data.
    When set to True, the component will receive data in the form of a list of dictionaries instead of a single dictionary.
    It is important to note that the input and output variables on the component should still be defined as if they are single variables.

    If the input of the pipeline is configured for batch processing, it is recommended not to configure timeshifting, as the list will have the same timestamp for all elements, potentially resulting in data loss.
    """
    inputBatch: bool = False
    outputBatch: bool = False

    def dict(self):
        return {
            'inputBatch': 'Yes' if self.inputBatch is True else 'No',
            'outputBatch': 'Yes' if self.outputBatch is True else 'No'
        }
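As a minimal sketch of what the batch flags imply for an entrypoint, the snippet below normalizes an incoming payload to a list so that the same processing code handles both the single-dictionary case and the list-of-dictionaries case. The names `as_batch` and `process_payload` and the `value` key are hypothetical and not part of the SDK.

```python
from typing import Any, Dict, List, Union

Payload = Dict[str, Any]


def as_batch(data: Union[Payload, List[Payload]]) -> List[Payload]:
    """Wrap a single payload in a list; pass a batch through unchanged."""
    return data if isinstance(data, list) else [data]


def process_payload(data: Union[Payload, List[Payload]]) -> List[Payload]:
    """Hypothetical entrypoint body: doubles 'value' for every element."""
    return [{"doubled": item["value"] * 2} for item in as_batch(data)]


# Without input batching the component receives one dictionary ...
single = process_payload({"value": 1.5})
# ... with input batching it receives a list of dictionaries.
batch = process_payload([{"value": 1}, {"value": 2}])
```

The input variable itself is still declared once, as a single variable, in both cases.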

__init__(name, desc='')

Creates a new component with the given name and an empty input and output list.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the component. | required |
| desc | str | Optional description of the component | '' |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def __init__(self, name: str, desc: str = ""):
    """
    Creates a new component with the given name and an empty input and output list.

    Args:
        name (str): Name of the component.
        desc (str): Optional description of the component
    """

    if _allowed_characters_in_names.match(name) is None:
        raise AssertionError("Component name contains invalid character. The allowed characters are [-a-zA-Z0-9_].")
    self.name = name
    self.desc = desc
    self.inputs = {}
    self.outputs = {}

    self.batch = self.BatchInfo(False, False)
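The name check can be tried out in isolation with the sketch below. The definition of `_allowed_characters_in_names` is not shown on this page, so the pattern here is an assumption reconstructed from the error message (`[-a-zA-Z0-9_]`); the helper name `is_valid_component_name` is hypothetical.

```python
import re

# Assumed pattern, reconstructed from the error message; the SDK's actual
# `_allowed_characters_in_names` may be defined differently.
ALLOWED_NAME = re.compile(r"[-a-zA-Z0-9_]+")


def is_valid_component_name(name: str) -> bool:
    """Return True if the name consists only of letters, digits, '-' and '_'."""
    return ALLOWED_NAME.fullmatch(name) is not None


candidates = ["preprocess", "my-model_v2", "bad name", "model.onnx", ""]
results = {name: is_valid_component_name(name) for name in candidates}
```

Under this assumption, names containing spaces or dots are rejected, as is the empty string.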

add_input(name, _type, desc=None)

Adds a new input to the component with its type. The name of an input must not be a reserved name such as 'timestamp'. The input variable 'timestamp' is a prebuilt key in the payload; its value is the timestamp at which AI Inference Server created the payload.

Types supported by AI Inference Server version 1.6 are contained in the type_dictionary. Newer AI Inference Server versions may support additional types. If the type is not known by the AI SDK, a warning message is printed. The most frequently used types are:

- String: Typically used for data received from Databus.
- Object: Designed to receive images from Vision Connect or to transfer images between components.
- Numeric scalar types: Typically used for data received from S7 Connector.

The example payload below shows the format of an image received from VCA Connector:

    payload = { "image":
        {
            "resolutionWidth": image.width,
            "resolutionHeight": image.height,
            "mimeType": ["image/raw"],
            "dataType": "uint8",
            "channelsPerPixel": 3,
            "image": _swap_bytes(image.tobytes())
        }
    }

Between components, the format is the same as the format of an Object output:

    "processedImage": {
        "metadata": json.dumps( {
                        "resolutionWidth": image.width,
                        "resolutionHeight": image.height
                        }
                    ),
        "bytes": image.tobytes()
    }

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the new input. | required |
| _type | str | Type of the new input. | required |
| desc | str | Description of the input. (optional) | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def add_input(self, name: str, _type: str, desc: Optional[str] = None):
    """
    Adds a new input to the component with its type.
    Name of the variables cannot be reserved name like 'timestamp'.
    Input variable 'timestamp' is a prebuilt key in the payload and its value contains the timestamp when the payload is created by AI Inference Server.

    Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
    In case the type is not known by the AI SDK, a warning message will be printed.
    The most frequently used types are
    - String:
        Typically used for data received from Databus
    - Object:
        Object type variables are designed to receive from Vision Connect or transfer images between components
    - Numeric scalar types:
        Typically used for data received from S7 Connector

    The example payload below shows the format of image received from VCA Connector
    ```python
        payload = { "image":
            {
                "resolutionWidth": image.width,
                "resolutionHeight": image.height,
                "mimeType": ["image/raw"],
                "dataType": "uint8",
                "channelsPerPixel": 3,
                "image": _swap_bytes(image.tobytes())
            }
        }
    ```
    Between components the format is the same format as the format of Object as an output.
    ```python
        "processedImage": {
            "metadata": json.dumps( {
                            "resolutionWidth": image.width,
                            "resolutionHeight": image.height
                            }
                        ),
            "bytes": image.tobytes()
        }
    ```

    Args:
        name (str): Name of the new input.
        _type (str): Type of the new input.
        desc (str): Description of the input. (optional)
    """
    if self.inputs is None:
        self.inputs = {}
    if name in self.inputs:
        raise AssertionError(f"Input '{name}' already exists.")
    if name.lower() in self.reserved_names:
        raise AssertionError(f"Input '{name}' is a reserved keyword.")

    flags = type_validation_flags(_type)
    if flags:
        flags_str = ", ".join([f.value for f in flags])
        _logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")

    self.inputs[name] = {
        "type": _type,
    }
    if desc is not None:
        self.inputs[name]['desc'] = desc
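The two name checks behave slightly differently: the duplicate check is case-sensitive, while the reserved-name check lowercases the name first. The sketch below reproduces only those checks on a stand-in class (`InputRegistry` is hypothetical, and type validation is left out) so the behavior can be observed without the SDK installed.

```python
from typing import Dict, Optional

RESERVED_NAMES = ["timestamp"]  # same reserved list as in the class source


class InputRegistry:
    """Stand-in that mirrors only the name checks of `Component.add_input`."""

    def __init__(self) -> None:
        self.inputs: Dict[str, Dict[str, str]] = {}

    def add_input(self, name: str, _type: str, desc: Optional[str] = None) -> None:
        if name in self.inputs:
            raise AssertionError(f"Input '{name}' already exists.")
        if name.lower() in RESERVED_NAMES:
            raise AssertionError(f"Input '{name}' is a reserved keyword.")
        self.inputs[name] = {"type": _type}
        if desc is not None:
            self.inputs[name]["desc"] = desc


registry = InputRegistry()
registry.add_input("message", "String", desc="Data received from Databus")
registry.add_input("image", "Object")

errors = []
for bad_name in ["image", "Timestamp"]:  # a duplicate, then a reserved name
    try:
        registry.add_input(bad_name, "String")
    except AssertionError as exc:
        errors.append(str(exc))
```

Note that `"Timestamp"` is rejected even though its casing differs from the reserved entry.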

change_input(name, _type, desc=None)

Changes one of the inputs of the component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the input to be changed. | required |
| _type | str | New type of the input. | required |
| desc | str | Description of the input. (optional) | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def change_input(self, name: str, _type: str, desc: Optional[str] = None):
    """
    Changes one of the inputs of the component.

    Args:
        name (str): Name of the input to be changed.
        _type (str): New type of the input.
        desc (str): Description of the input. (optional)
    """
    if name not in self.inputs:
        raise AssertionError(f"There is no input with name '{name}'")
    flags = type_validation_flags(_type)
    if flags:
        flags_str = ", ".join([f.value for f in flags])
        _logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
    self.inputs[name]['type'] = _type
    if desc is not None:
        self.inputs[name]['desc'] = desc
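One consequence of the code above is that omitting `desc` keeps the existing description rather than clearing it. The following sketch applies the same update logic to a plain dictionary; `change_entry` is a hypothetical helper, and the type validation warning is omitted.

```python
from typing import Dict, Optional


def change_entry(entries: Dict[str, Dict[str, str]], name: str,
                 _type: str, desc: Optional[str] = None) -> None:
    """Mirror of the update logic in `change_input`, on a plain dictionary."""
    if name not in entries:
        raise AssertionError(f"There is no input with name '{name}'")
    entries[name]["type"] = _type
    if desc is not None:  # an omitted description leaves the old one in place
        entries[name]["desc"] = desc


inputs = {"message": {"type": "String", "desc": "Raw message"}}

change_entry(inputs, "message", "Object")  # type changes, description stays
after_type_change = dict(inputs["message"])

change_entry(inputs, "message", "Object", desc="Image payload")
```

The same pattern applies to `change_output`, which uses identical logic on the outputs dictionary.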

delete_input(name)

Deletes an input from the component by name. Once the package has been created with the given component, it is recommended not to change the component directly. Instead, all necessary methods to change it are available through the package to avoid component inconsistencies. It is recommended to use package.delete_input_wire(...) with default parameter with_input=True.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the input to be deleted. | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def delete_input(self, name: str):
    """
    Deletes an input from the component by name.
    Once the package has been created with the given component, it is recommended not to change the component directly.
    Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
    It is recommended to use `package.delete_input_wire(...)` with default parameter `with_input=True`.

    Args:
        name (str): Name of the input to be deleted.
    """
    if name not in self.inputs:
        raise AssertionError(f"Component '{self.name}' has no input '{name}'")
    self.inputs.pop(name)

add_output(name, _type, desc=None)

Adds a new output to the component.

Types supported by AI Inference Server version 1.6 are contained in the type_dictionary. Newer AI Inference Server versions may support additional types. If the type is not known by the AI SDK, a warning message is printed. The most frequently used types are:

- String: Typically used for data to be sent to Databus.
- Object: Typically used for images to be sent to ZMQ Connector.
- Numeric scalar types: Typically used for data sent to S7 Connector.

For outputs of type Object, the entrypoint must return a dictionary containing two fields, where one field has type str and the other field has type bytes. The example below shows the required format, assuming that 'image' is a PIL Image.

    "processedImage": {
        "metadata": json.dumps( {
                        "resolutionWidth": image.width,
                        "resolutionHeight": image.height
                        }
                    ),
        "bytes": image.tobytes()
    }

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the new output. | required |
| _type | str | Type of the new output. | required |
| desc | str | Description of the output. (optional) | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def add_output(self, name: str, _type: str, desc: Optional[str] = None):
    """
    Adds a new output to the component.

    Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
    In case the type is not known by the AI SDK, a warning message will be printed.
    The most frequently used types are
    - String:
        Typically used for data to be sent to Databus
    - Object:
        Typically used for images to be sent to ZMQ Connector
    - Numeric scalar types:
        Typically used for data sent to S7 Connector

    For outputs of type `Object` the entrypoint must return with a `dictionary` containing two fields, where one field has type `str` and the other field has type `bytes`.
    The example below shows the required format, assuming that 'image' is a PIL Image.
    ```python
        "processedImage": {
            "metadata": json.dumps( {
                            "resolutionWidth": image.width,
                            "resolutionHeight": image.height
                            }
                        ),
            "bytes": image.tobytes()
        }
    ```

    Args:
        name (str): Name of the new output.
        _type (str): Type of the new output.
        desc (str): Description of the output. (optional)
    """
    if self.outputs is None:
        self.outputs = {}
    if name in self.outputs:
        raise AssertionError(f"Output '{name}' already exists")
    if name.lower() in self.reserved_names:
        raise AssertionError(f"Output '{name}' is a reserved keyword.")
    flags = type_validation_flags(_type)
    if flags:
        flags_str = ", ".join([f.value for f in flags])
        _logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
    self.outputs[name] = {
        "type": _type,
    }
    if desc is not None:
        self.outputs[name]['desc'] = desc
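To make the Object output rule concrete, the sketch below builds such a value without depending on PIL and checks its shape. A zero-filled buffer stands in for `image.tobytes()`; the helpers `make_object_output` and `is_object_shaped` are hypothetical and only illustrate the "one `str` field, one `bytes` field" requirement described above.

```python
import json
from typing import Any, Dict


def make_object_output(width: int, height: int, raw: bytes) -> Dict[str, Any]:
    """Build an Object-style value: one `str` field and one `bytes` field."""
    return {
        "metadata": json.dumps({"resolutionWidth": width, "resolutionHeight": height}),
        "bytes": raw,
    }


def is_object_shaped(value: Any) -> bool:
    """Hypothetical check: exactly two fields, one `str` and one `bytes`."""
    if not isinstance(value, dict) or len(value) != 2:
        return False
    return sorted(type(v).__name__ for v in value.values()) == ["bytes", "str"]


# A 2x2 RGB buffer stands in for `image.tobytes()` of a PIL Image.
pixels = bytes(2 * 2 * 3)
output = {"processedImage": make_object_output(2, 2, pixels)}

ok = is_object_shaped(output["processedImage"])
meta = json.loads(output["processedImage"]["metadata"])
```

Serializing the metadata with `json.dumps` is what turns the first field into a `str`; passing the raw dictionary instead would not satisfy the rule.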

change_output(name, _type, desc=None)

Changes one of the outputs of the component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the output to be changed. | required |
| _type | str | The new type of the output. | required |
| desc | str | Description of the output. (optional) | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def change_output(self, name: str, _type: str, desc: Optional[str] = None):
    """
    Changes one of the outputs of the component.

    Args:
        name (str): Name of the output to be changed.
        _type (str): The new type of the output.
        desc (str): Description of the output. (optional)
    """
    if name not in self.outputs:
        raise AssertionError(f"There is no output with name '{name}'")
    flags = type_validation_flags(_type)
    if flags:
        flags_str = ", ".join([f.value for f in flags])
        _logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
    self.outputs[name]['type'] = _type
    if desc is not None:
        self.outputs[name]['desc'] = desc

delete_output(name)

Deletes an output from the component by name. Once the package has been created with the given component, it is recommended not to change the component directly. Instead, all necessary methods to change it are available through the package to avoid component inconsistencies. Deleting an output which is represented in any wire will cause package inconsistency.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the output to be deleted. | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def delete_output(self, name: str):
    """
    Deletes an output from the component by name.
    Once the package has been created with the given component, it is recommended not to change the component directly.
    Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
    Deleting an output which is represented in any wire will cause package inconsistency.

    Args:
        name (str): Name of the output to be deleted.
    """
    if name not in self.outputs:
        raise AssertionError(f"Component '{self.name}' has no output '{name}'")
    self.outputs.pop(name)
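The inconsistency warning can be illustrated with a small check for wires that point at an output that no longer exists. The wire representation used here (a 4-tuple) is purely hypothetical and is not the SDK's internal data model; `dangling_wires` is likewise an illustrative helper.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical wire model:
# (source component, output name, target component, input name)
Wire = Tuple[str, str, str, str]


def dangling_wires(outputs: Dict[str, Set[str]], wires: List[Wire]) -> List[Wire]:
    """Return the wires whose source output is missing from its component."""
    return [w for w in wires if w[1] not in outputs.get(w[0], set())]


outputs = {"preprocess": {"features", "debug"}}
wires = [("preprocess", "features", "inference", "input_1")]

before = dangling_wires(outputs, wires)

# Comparable to calling delete_output("features") directly on the component.
outputs["preprocess"].discard("features")
after = dangling_wires(outputs, wires)
```

Before the deletion no wire is dangling; afterwards the only wire refers to a missing output, which is the kind of inconsistency the package-level methods are meant to prevent.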

validate()

Empty method for child classes to implement.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def validate(self):
    """
    Empty method for child classes to implement.
    """
    pass

save(destination, validate)

Empty method for child classes to implement.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def save(self, destination, validate):
    """
    Empty method for child classes to implement.
    """
    pass
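Since `validate` and `save` are empty hooks, each child class supplies its own behavior. The sketch below shows one possible shape using a stand-in base class; `BaseComponent`, `JsonComponent`, the validation rule, and the JSON file layout are all hypothetical and do not reflect how the SDK's own components store themselves.

```python
import json
import tempfile
from pathlib import Path


class BaseComponent:
    """Stand-in for the documented base class: both hooks do nothing."""

    def __init__(self, name: str):
        self.name = name
        self.inputs = {}
        self.outputs = {}

    def validate(self):
        pass

    def save(self, destination, validate):
        pass


class JsonComponent(BaseComponent):
    """Hypothetical child class that fills in both hooks."""

    def validate(self):
        # Illustrative rule only: require at least one output.
        if not self.outputs:
            raise AssertionError(f"Component '{self.name}' has no outputs.")

    def save(self, destination, validate=True):
        if validate:
            self.validate()
        target = Path(destination) / f"{self.name}.json"
        target.write_text(json.dumps({"name": self.name, "outputs": self.outputs}))
        return target


component = JsonComponent("postprocess")
component.outputs["result"] = {"type": "String"}

with tempfile.TemporaryDirectory() as tmp:
    saved = json.loads(component.save(tmp).read_text())
```

Calling `validate` from inside `save` keeps an invalid component from being written to disk when the flag is set.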

GPURuntimeComponent

Bases: Component

The GPURuntimeComponent is used to define a component that runs on a GPU device. The component works only with ONNX models and can be used in an Inference Pipeline.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | str | Component name. |
| version | str | Component version. |
| desc | str | Component description. |


Methods:

- use_model(self, path: Path | str, max_batch_size: int, optimization: Optional[model_config.TensorRTOptimization] = None, warmup: model_config.Warmup = None): Add an ONNX model file for the component.
- use_config(self, path: Path | str): Use a custom config.pbtxt file instead of the autogenerated one.
- save(self, destination: Path | str, validate = False): Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
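
The `use_model` source on this page accepts `warmup` as a `model_config.Warmup` value, a `bool`, or `None`, and reduces it to a boolean component flag plus a warmup setting for the model configuration. The sketch below mirrors that branch on a stand-in enum. Only the member names `DISABLED` and `ZERO_DATA` appear in the source; the enum values and the helper name `normalize_warmup` are assumptions.

```python
from enum import Enum
from typing import Tuple, Union


class Warmup(Enum):
    """Stand-in for `model_config.Warmup`; the values here are placeholders."""
    DISABLED = "disabled"
    ZERO_DATA = "zero_data"


def normalize_warmup(warmup: Union[Warmup, bool, None]) -> Tuple[bool, Warmup]:
    """Return (component flag, model warmup setting) for any accepted input."""
    if isinstance(warmup, bool):
        # True selects zero-data warmup, False disables warmup.
        return warmup, (Warmup.ZERO_DATA if warmup else Warmup.DISABLED)
    if isinstance(warmup, Warmup):
        return warmup != Warmup.DISABLED, warmup
    # None (or any other value) falls back to disabled warmup.
    return False, Warmup.DISABLED


cases = [normalize_warmup(v)
         for v in (True, False, None, Warmup.ZERO_DATA, Warmup.DISABLED)]
```

Passing `True` is therefore shorthand for zero-data warmup, while `None` and `False` both disable it.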

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
class GPURuntimeComponent(Component):
    """
    The GPURuntimeComponent is used to define a component that runs on a GPU device.
    The component works only with ONNX models and can be used in an Inference Pipeline.

    Attributes:
        name (str): Component name.
        version (str): Component version.
        desc (str): Component description.

    Methods:
        use_model(self, path: Path | str, max_batch_size: int, optimization: Optional[model_config.TensorRTOptimization] = None, warmup: model_config.Warmup = None): Add an ONNX model file for the component.
        use_config(self, path: Path | str): Use a custom config.pbtxt file instead of the autogenerated one.
        save(self, destination: Path | str, validate = False): Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
    """

    def __init__(self, name: str = "inference", version: str = "1", desc: str = ""):
        """
        Creates a new, empty GPU Runtime component.

        Args:
            name (str): Component name. (default: inference)
            version (str): Component version. (default: 1)
            desc (str): Component description (optional)
        """
        super().__init__(name=name, desc=desc)
        self.version: str = version
        self.entrypoint: Path | None = None
        self.model_path: Path | None = None
        self.model_version: str = "1"
        self.config: Path | None = None
        self.auto_config = None
        self.warmup: bool = False
        self.optimization: bool = False

    def _to_dict(self):
        return {
            **super()._to_dict(),
            'version': self.version,
            'entrypoint': f"{self.model_version}/{self.entrypoint.name}",
            'hwType': 'GPU',
            'warmup': self.warmup,
            'optimization': self.optimization,
            'runtime': {
                'type': 'gpuruntime',
                'version': '0.1.0',
            }
        }

    def use_model(self, path: Path | str,
                  max_batch_size: int,
                  optimization: model_config.TensorRTOptimization | None = None,
                  warmup: model_config.Warmup | bool | None = None):
        """
        Add the ONNX model file for the component.

        Args:
            path (Path | str): The path to the ONNX model file.
            max_batch_size (int): The maximum batch size for the model.
            optimization (model_config.TensorRTOptimization, optional): The optimization configuration for the model. Defaults to None.
            warmup (model_config.Warmup, optional): The warmup configuration for the model. Defaults to None.

        Raises:
            AssertionError: If the specified model file is not found, has an invalid extension, or if max_batch_size is less than 0.

        """
        path = Path(path)

        if not path.is_file():
            raise AssertionError(f"specified model file not found: '{path}'")

        if path.suffix != ".onnx":
            raise AssertionError(f"model file extension is not '.onnx': '{path}'")

        if max_batch_size < 0:
            raise AssertionError("max_batch_size must be greater or equal to 0")

        self.entrypoint = Path("model.onnx")
        self.model_path = path
        if self.config is not None:
            _logger.warning("Previously added configuration was removed. Component will use the default configuration unless you specify your own.")
        self.config = None

        # Removing old automatic variables
        if self.auto_config is not None:
            for var in self.auto_config.inputs:
                self.delete_input(var["name"])
            for var in self.auto_config.outputs:
                self.delete_output(var["name"])

        model_warmup = warmup
        if isinstance(warmup, bool):
            self.warmup = warmup
            model_warmup = model_config.Warmup.ZERO_DATA if warmup else model_config.Warmup.DISABLED
        elif isinstance(warmup, model_config.Warmup):
            self.warmup = warmup != model_config.Warmup.DISABLED
        else:
            self.warmup = False
            model_warmup = model_config.Warmup.DISABLED

        self.optimization = isinstance(optimization, model_config.TensorRTOptimization)

        self.auto_config = model_config.ModelConfig(onnx_path=path,
                                                    max_batch_size=max_batch_size,
                                                    warmup=model_warmup,
                                                    optimization=optimization)
        for var in self.auto_config.inputs:
            self.add_input(var["name"], var["type"])
        for var in self.auto_config.outputs:
            self.add_output(var["name"], var["type"])

    def use_config(self, path: Path | str):
        """
        Sets the configuration file to be used for inference.
        Intended usage is to use a custom configuration file instead of the autogenerated one.
        This way extra configurations can be added to the component, such as the execution accelerator.

        Args:
            path (Path | str): The path to the configuration file.

        Raises:
            AssertionError: If the specified config file is not found or has an invalid extension.

        """
        path = Path(path)

        if not path.is_file():
            raise AssertionError(f"specified config file not found: '{path}'")

        if path.suffix != ".pbtxt":
            raise AssertionError(f"config file extension is not '.pbtxt': '{path}'")

        config_data = _validate_and_load_gpuruntime_config(path)

        self.warmup = (0 < len(config_data.model_warmup))
        self.optimization = (0 < len(f"{config_data.optimization}"))

        self.config = path

    def save(self, destination: Path | str, validate = False):
        """
        Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.

        The component folder contains the following:

        - An `.onnx` model file
        - A `.pbtxt` configuration file

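        Args:
            destination (path-like): Target directory to which the component will be saved.
            validate (bool, optional): Not used by this implementation. Defaults to False.

        Raises:
            AssertionError: If no ONNX model file has been specified with `use_model()`.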
        """
        if self.entrypoint is None:
            raise AssertionError("An ONNX model file must be specified before the component can be saved.")

        component_dir = Path(destination) / self.name
        component_dir.mkdir(parents = True, exist_ok = True)

        model_dir = component_dir / self.model_version
        model_dir.mkdir(exist_ok = True)

        shutil.copy(str(self.model_path), str(model_dir / "model.onnx"))

        if self.config is None:
            _logger.warning("Configuration was not specified. Model will be saved with default configuration.")
            (component_dir / "config.pbtxt").write_text(f"{self.auto_config}")
        else:
            shutil.copy(self.config, component_dir)

__init__(name='inference', version='1', desc='')

Creates a new, empty GPU Runtime component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Component name. | 'inference' |
| version | str | Component version. | '1' |
| desc | str | Component description (optional). | '' |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def __init__(self, name: str = "inference", version: str = "1", desc: str = ""):
    """
    Creates a new, empty GPU Runtime component.

    Args:
        name (str): Component name. (default: inference)
        version (str): Component version. (default: 1)
        desc (str): Component description (optional)
    """
    super().__init__(name=name, desc=desc)
    self.version: str = version
    self.entrypoint: Path | None = None
    self.model_path: Path | None = None
    self.model_version: str = "1"
    self.config: Path | None = None
    self.auto_config = None
    self.warmup: bool = False
    self.optimization: bool = False

use_model(path, max_batch_size, optimization=None, warmup=None)

Add the ONNX model file for the component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path \| str | The path to the ONNX model file. | required |
| max_batch_size | int | The maximum batch size for the model. | required |
| optimization | TensorRTOptimization | The optimization configuration for the model. | None |
| warmup | Warmup \| bool | The warmup configuration for the model. True selects Warmup.ZERO_DATA; False or None disables warmup. | None |

Raises:

| Type | Description |
| --- | --- |
| AssertionError | If the specified model file is not found, has an invalid extension, or if max_batch_size is less than 0. |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def use_model(self, path: Path | str,
              max_batch_size: int,
              optimization: model_config.TensorRTOptimization | None = None,
              warmup: model_config.Warmup | bool | None = None):
    """
    Add the ONNX model file for the component.

    Args:
        path (Path | str): The path to the ONNX model file.
        max_batch_size (int): The maximum batch size for the model.
        optimization (model_config.TensorRTOptimization, optional): The optimization configuration for the model. Defaults to None.
        warmup (model_config.Warmup | bool, optional): The warmup configuration for the model. `True` selects `Warmup.ZERO_DATA`; `False` or `None` disables warmup. Defaults to None.

    Raises:
        AssertionError: If the specified model file is not found, has an invalid extension, or if max_batch_size is less than 0.

    """
    path = Path(path)

    if not path.is_file():
        raise AssertionError(f"specified model file not found: '{path}'")

    if path.suffix != ".onnx":
        raise AssertionError(f"model file extension is not '.onnx': '{path}'")

    if max_batch_size < 0:
        raise AssertionError("max_batch_size must be greater or equal to 0")

    self.entrypoint = Path("model.onnx")
    self.model_path = path
    if self.config is not None:
        _logger.warning("Previously added configuration was removed. Component will use the default configuration unless you specify your own.")
    self.config = None

    # Removing old automatic variables
    if self.auto_config is not None:
        for var in self.auto_config.inputs:
            self.delete_input(var["name"])
        for var in self.auto_config.outputs:
            self.delete_output(var["name"])

    model_warmup = warmup
    if isinstance(warmup, bool):
        self.warmup = warmup
        model_warmup = model_config.Warmup.ZERO_DATA if warmup else model_config.Warmup.DISABLED
    elif isinstance(warmup, model_config.Warmup):
        self.warmup = warmup != model_config.Warmup.DISABLED
    else:
        self.warmup = False
        model_warmup = model_config.Warmup.DISABLED

    self.optimization = isinstance(optimization, model_config.TensorRTOptimization)

    self.auto_config = model_config.ModelConfig(onnx_path=path,
                                                max_batch_size=max_batch_size,
                                                warmup=model_warmup,
                                                optimization=optimization)
    for var in self.auto_config.inputs:
        self.add_input(var["name"], var["type"])
    for var in self.auto_config.outputs:
        self.add_output(var["name"], var["type"])
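
The warmup handling in `use_model` amounts to a small normalization step. The sketch below is a standalone illustration rather than the SDK's code: the `Warmup` enum is a hypothetical stand-in for `model_config.Warmup` that models only the two members referenced in the source (`ZERO_DATA` and `DISABLED`), and `normalize_warmup` is an illustrative helper name.

```python
from enum import Enum


class Warmup(Enum):
    """Hypothetical stand-in for model_config.Warmup (values are placeholders)."""
    DISABLED = "disabled"
    ZERO_DATA = "zero_data"


def normalize_warmup(warmup):
    """Return (component_flag, model_warmup) following the branches in use_model."""
    if isinstance(warmup, bool):
        # A plain bool toggles zero-data warmup on or off.
        return warmup, (Warmup.ZERO_DATA if warmup else Warmup.DISABLED)
    if isinstance(warmup, Warmup):
        # An explicit enum value is passed through; the flag reflects whether it is enabled.
        return warmup != Warmup.DISABLED, warmup
    # None (or any other type) disables warmup.
    return False, Warmup.DISABLED


flag_for_true = normalize_warmup(True)
flag_for_none = normalize_warmup(None)
```

Under these assumptions, passing `True` is shorthand for zero-data warmup, while `None` and `False` both leave warmup disabled.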

use_config(path)

Sets the configuration file to be used for inference. Intended usage is to use a custom configuration file instead of the autogenerated one. This way extra configurations can be added to the component, such as the execution accelerator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path \| str | The path to the configuration file. | required |

Raises:

| Type | Description |
| --- | --- |
| AssertionError | If the specified config file is not found or has an invalid extension. |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def use_config(self, path: Path | str):
    """
    Sets the configuration file to be used for inference.
    Intended usage is to use a custom configuration file instead of the autogenerated one.
    This way extra configurations can be added to the component, such as the execution accelerator.

    Args:
        path (Path | str): The path to the configuration file.

    Raises:
        AssertionError: If the specified config file is not found or has an invalid extension.

    """
    path = Path(path)

    if not path.is_file():
        raise AssertionError(f"specified config file not found: '{path}'")

    if path.suffix != ".pbtxt":
        raise AssertionError(f"config file extension is not '.pbtxt': '{path}'")

    config_data = _validate_and_load_gpuruntime_config(path)

    self.warmup = (0 < len(config_data.model_warmup))
    self.optimization = (0 < len(f"{config_data.optimization}"))

    self.config = path
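
Because `use_config` reports problems through `AssertionError`, callers may want to check a path before handing it over. The following is a minimal sketch of the two path checks only, assuming nothing about the SDK beyond the source shown above; `check_config_path` is a hypothetical helper, and the content validation performed by `_validate_and_load_gpuruntime_config` is not reproduced.

```python
from pathlib import Path
import tempfile


def check_config_path(path):
    """Apply the file-existence and extension checks that use_config performs first."""
    path = Path(path)
    if not path.is_file():
        raise AssertionError(f"specified config file not found: '{path}'")
    if path.suffix != ".pbtxt":
        raise AssertionError(f"config file extension is not '.pbtxt': '{path}'")
    return path


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "config.pbtxt"
    good.write_text("placeholder content, not a valid configuration")
    bad = Path(tmp) / "config.txt"
    bad.write_text("")

    accepted_name = check_config_path(good).name
    try:
        check_config_path(bad)
        wrong_suffix_rejected = False
    except AssertionError:
        wrong_suffix_rejected = True
```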

save(destination, validate=False)

Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.

The component folder contains the following:

  • An .onnx model file
  • A .pbtxt configuration file

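Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| destination | path-like | Target directory to which the component will be saved. | required |
| validate | bool | Not used by this implementation. | False |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py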
def save(self, destination: Path | str, validate = False):
    """
    Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.

    The component folder contains the following:

    - An `.onnx` model file
    - A `.pbtxt` configuration file

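    Args:
        destination (path-like): Target directory to which the component will be saved.
        validate (bool, optional): Not used by this implementation. Defaults to False.

    Raises:
        AssertionError: If no ONNX model file has been specified with `use_model()`.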
    """
    if self.entrypoint is None:
        raise AssertionError("An ONNX model file must be specified before the component can be saved.")

    component_dir = Path(destination) / self.name
    component_dir.mkdir(parents = True, exist_ok = True)

    model_dir = component_dir / self.model_version
    model_dir.mkdir(exist_ok = True)

    shutil.copy(str(self.model_path), str(model_dir / "model.onnx"))

    if self.config is None:
        _logger.warning("Configuration was not specified. Model will be saved with default configuration.")
        (component_dir / "config.pbtxt").write_text(f"{self.auto_config}")
    else:
        shutil.copy(self.config, component_dir)
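
The folder structure produced by `save` can be reproduced with plain file operations. This is a sketch under stated assumptions, not the SDK's implementation: `save_layout` is a hypothetical helper, the model file is a placeholder rather than a real ONNX model, and only the default-configuration branch (writing `config.pbtxt`) is shown.

```python
from pathlib import Path
import shutil
import tempfile


def save_layout(destination, name, model_path, config_text, model_version="1"):
    """Write <destination>/<name>/<model_version>/model.onnx and <destination>/<name>/config.pbtxt."""
    component_dir = Path(destination) / name
    component_dir.mkdir(parents=True, exist_ok=True)

    model_dir = component_dir / model_version
    model_dir.mkdir(exist_ok=True)

    # The model is always stored as "model.onnx", whatever the source file was called.
    shutil.copy(str(model_path), str(model_dir / "model.onnx"))
    (component_dir / "config.pbtxt").write_text(config_text)
    return component_dir


with tempfile.TemporaryDirectory() as tmp:
    source_model = Path(tmp) / "my_model.onnx"
    source_model.write_bytes(b"placeholder, not a real ONNX model")

    root = save_layout(Path(tmp) / "package", "inference", source_model, "# placeholder config\n")
    layout = sorted(p.relative_to(root).as_posix() for p in root.rglob("*"))
```

Note that in the source above a custom configuration is copied into the component folder with `shutil.copy(self.config, component_dir)`, which keeps the original file name of the configuration file.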

PipelineData

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline_data.py
class PipelineData:

    def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
        """
        A newly initialized `Pipeline` contains no `Component` or wire; only its name and version are set.
        Together, the name and version define the name of the zip file when the package is saved.

        Args:
            name (str): Name of the package
            version (str, optional): Version of the package. Defaults to None.
            desc (str): Package description (optional)
        """

        self.name = name
        self.desc = desc

        self.version = version
        self.init_version = version  # initial version; used when version is not set in Pipeline.export() and save()
        self.save_version = version  # contains the version determined at exporting/saving
        self.package_id: Optional[uuid.UUID] = None

        self.author = 'AI SDK'

        self.components = {}
        self.wiring = {}
        self.parameters: list[PipelineParameter] = []

        self.periodicity = None
        self.timeshift_reference = []

        self.inputs: list[PipelineVariable] = []
        self.outputs: list[PipelineVariable] = []
        self.log_level = logging.INFO

        self.cycle_time = None

    # Helper method returns index or None if the pipeline inputs/outputs contain the given variable
    def _get_io_index(self, io_list: list[PipelineVariable], variable: str) -> Optional[int]:
        for i, io in enumerate(io_list):
            if io.variableName == variable:
                return i
        return None

    def _get_io_variable(self, variable_name: str) -> Optional[PipelineVariable]:
        for io in self.inputs + self.outputs:
            if io.variableName == variable_name:
                return io
        return None

    def _get_parameter(self, parameter_name: str) -> Optional[PipelineParameter]:
        for param in self.parameters:
            if param.name == parameter_name:
                return param
        return None

    def _get_parameter_index(self, parameter_name: str) -> Optional[int]:
        for i, param in enumerate(self.parameters):
            if param.name == parameter_name:
                return i
        return None
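
The lookup helpers above return either a value or `None`, and the index variants can legitimately return `0`. The sketch below illustrates why callers compare against `None` instead of relying on truthiness. It is a standalone illustration: `PipelineVariable` here is a hypothetical stand-in that models only the two fields the source accesses, and `get_io_index` mirrors `_get_io_index` as a free function.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineVariable:
    """Hypothetical stand-in with the two fields used by the lookup helpers."""
    componentName: str
    variableName: str


def get_io_index(io_list: list, variable: str) -> Optional[int]:
    """Return the position of the variable in io_list, or None if it is absent."""
    for i, io in enumerate(io_list):
        if io.variableName == variable:
            return i
    return None


# Component and variable names are placeholders.
inputs = [PipelineVariable("preprocess", "raw"), PipelineVariable("preprocess", "threshold")]

first = get_io_index(inputs, "raw")        # found at position 0, which is falsy
missing = get_io_index(inputs, "unknown")  # not found

first_exists = first is not None
missing_exists = missing is not None
```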

__init__(name, version=None, desc='')

A newly initialized Pipeline contains no Component or wire; only its name and version are set. Together, the name and version define the name of the zip file when the package is saved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the package | required |
| version | str | Version of the package | None |
| desc | str | Package description (optional) | '' |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline_data.py
def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
    """
    A newly initialized `Pipeline` contains no `Component` or wire; only its name and version are set.
    Together, the name and version define the name of the zip file when the package is saved.

    Args:
        name (str): Name of the package
        version (str, optional): Version of the package. Defaults to None.
        desc (str): Package description (optional)
    """

    self.name = name
    self.desc = desc

    self.version = version
    self.init_version = version  # initial version; used when version is not set in Pipeline.export() and save()
    self.save_version = version  # contains the version determined at exporting/saving
    self.package_id: Optional[uuid.UUID] = None

    self.author = 'AI SDK'

    self.components = {}
    self.wiring = {}
    self.parameters: list[PipelineParameter] = []

    self.periodicity = None
    self.timeshift_reference = []

    self.inputs: list[PipelineVariable] = []
    self.outputs: list[PipelineVariable] = []
    self.log_level = logging.INFO

    self.cycle_time = None

save_readme_html(pipeline, destination)

Saves a README.html in the destination folder that describes the pipeline.

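Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pipeline | PipelineData | The pipeline to describe. | required |
| destination | path-like | Path of the destination folder. | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline_page.py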
def save_readme_html(pipeline: PipelineData, destination: Union[str, os.PathLike]):
    """
    Saves a `README.html` in the `destination` folder that describes the pipeline.

    Args:
        pipeline (PipelineData): The pipeline to describe.
        destination (path-like): Path of the destination folder.
    """
    pipelinePage = _PipelinePage(pipeline)
    readme_html_path = Path(destination) / README_HTML
    readme_html_path.write_text(str(pipelinePage))

Pipeline

Bases: PipelineData

Pipeline represents a pipeline configuration package with Components and wires to provide a data flow on the AI Inference Server. The Components have inputs and outputs to transfer data to each other and the wires describe this data flow between them. The package also contains configuration files required to deploy a pipeline on an Industrial Edge device.

A newly initialized Pipeline does not contain any Component or wire, only its name and version will be set. The name and version together will define the name of the zip file when the package is saved.
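
The relationship between components and wires can be sketched with plain dictionaries. The example below is an illustration under stated assumptions, not the SDK's implementation: components are modeled as nested dicts instead of `Component` objects, the free function `add_wiring` stands in for the method of the same name, and the component names, variable names, and type labels are placeholders. The key format follows `_wire_hash_string` from the source listing.

```python
WIRE_KEY = "{}.{} -> {}.{}"  # same key format as Pipeline._wire_hash_string


def add_wiring(components, wiring, from_component, from_output, to_component, to_input):
    """Validate and record one wire in the plain-dict model used by this sketch."""
    if from_component not in components:
        raise AssertionError(f"No component named '{from_component}'")
    if to_component not in components:
        raise AssertionError(f"No component named '{to_component}'")
    outputs = components[from_component]["outputs"]
    inputs = components[to_component]["inputs"]
    if from_output not in outputs:
        raise AssertionError(f"Component '{from_component}' has no output named '{from_output}'")
    if to_input not in inputs:
        raise AssertionError(f"Component '{to_component}' has no input named '{to_input}'")
    # An input accepts at most one wire.
    if any(w["toComponent"] == to_component and w["toInput"] == to_input for w in wiring.values()):
        raise AssertionError(f"Input '{to_input}' of component '{to_component}' is already wired")
    # The connected output and input must declare the same type.
    if outputs[from_output]["type"] != inputs[to_input]["type"]:
        raise AssertionError("Output and input types do not match")
    key = WIRE_KEY.format(from_component, from_output, to_component, to_input)
    wiring[key] = {"fromComponent": from_component, "fromOutput": from_output,
                   "toComponent": to_component, "toInput": to_input}


components = {
    "preprocess": {"inputs": {"raw": {"type": "String"}},
                   "outputs": {"features": {"type": "Object"}}},
    "inference": {"inputs": {"features": {"type": "Object"}},
                  "outputs": {"prediction": {"type": "String"}}},
}
wiring = {}
add_wiring(components, wiring, "preprocess", "features", "inference", "features")

try:
    # Wiring the same input a second time is rejected.
    add_wiring(components, wiring, "preprocess", "features", "inference", "features")
    duplicate_rejected = False
except AssertionError:
    duplicate_rejected = True
```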

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the package | required |
| version | str | Version of the package | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
class Pipeline(PipelineData):
    """
    `Pipeline` represents a pipeline configuration package with `Components` and wires to provide a data flow on the AI Inference Server.
    The `Components` have inputs and outputs to transfer data to each other and the wires describe this data flow between them.
    The package also contains configuration files required to deploy a pipeline on an Industrial Edge device.

    A newly initialized `Pipeline` does not contain any `Component` or wire, only its name and version will be set.
    The name and version together will define the name of the zip file when the package is saved.

    Args:
        name (str): Name of the package
        version (str): Version of the package
    """
    _wire_hash_string = "{}.{} -> {}.{}"

    def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
        """
        A newly initialized `Pipeline` contains no `Component` or wire; only its name and version are set.
        Together, the name and version define the name of the zip file when the package is saved.

        Args:
            name (str): Name of the package
            version (str, optional): Version of the package. Defaults to None.
            desc (str): Package description (optional)
        """

        super().__init__(name, version, desc)

        self.report_writer = PipelineReportWriter()
        report_writer_handler = ReportWriterHandler(self.report_writer)
        _logger.addHandler(report_writer_handler)
        _python_dependencies_logger.addHandler(report_writer_handler)
        _wheelhouse_logger.addHandler(report_writer_handler)

    def _set_log_level(self, log_level: int):
        self.log_level = log_level
        _logger.setLevel(self.log_level)

    @staticmethod
    def from_components(components: list, name: str, version: Optional[str] = None, desc: str = "") -> "Pipeline":
        """
        Creates a pipeline configuration from the given components.
        The components are linked in a linear sequence with inputs and outputs auto-wired based on the name of the inputs and outputs of the components.
        The inputs of the first component will be wired as the pipeline inputs and the outputs of the last component will be wired as the pipeline outputs.
        The components must have unique names. Two or more versions of the same component cannot be packaged simultaneously without renaming them.

        Args:
            components (list): List of PythonComponents
            name (str): Name of the pipeline
            version (str): Version information of the pipeline. (Optional)
            desc (str): Description of the pipeline. (Optional)
        Returns:
            Pipeline: Pipeline object with the auto-wired components
        """
        pipeline = Pipeline(name, version, desc=desc)

        first_component = components[0]
        pipeline.add_component(first_component)
        pipeline.inputs = [PipelineVariable(first_component.name, component_input) for component_input in first_component.inputs]
        pipeline.outputs = [PipelineVariable(first_component.name, output) for output in first_component.outputs]

        for component in components[1:]:
            pipeline.add_component(component)
            for t_output in pipeline.outputs:
                try:
                    pipeline.add_wiring(t_output.componentName, t_output.variableName, component.name, t_output.variableName)
                except Exception as e:
                    _logger.warning(f"Output variable {t_output.componentName}.{t_output.variableName} couldn't be auto-wired.\nCause: {e}")

            unwired_variables = [f'{component.name}.{x}' for x in component.inputs if not any(s.endswith(f'{component.name}.{x}') for s in pipeline.wiring)]
            if len(unwired_variables) > 0:
                for variable in unwired_variables:
                    _logger.warning(f"Input variable {variable} couldn't be auto-wired.\n")
            pipeline.outputs = [PipelineVariable(component.name, output) for output in component.outputs]

        # ImageSet is not allowed in pipeline outputs. Build a filtered list instead of
        # popping while iterating, which would skip the element after each removed one.
        allowed_outputs = []
        for o in pipeline.outputs:
            if pipeline.components[o.componentName].outputs[o.variableName]["type"].lower() == 'imageset':
                _logger.warning(f"Output variable {o.componentName}.{o.variableName} of type 'ImageSet' can not be added as pipeline output.")
            else:
                allowed_outputs.append(o)
        pipeline.outputs = allowed_outputs

        return pipeline

    def __repr__(self) -> str:
        """
        Textual representation of the configured package.
        The method shows the `Components` with their inputs, outputs and parameters as well as the wiring between these `Components`.

        Returns:
            str: Textual representation of the package
        """
        version = self.save_version if self.save_version is not None else self.init_version

        text = f"[{self.__class__.__name__}] {self.name} ({version})\n"
        if self.desc != "":
            text += f"{self.desc}\n"

        if len(self.parameters) > 0:
            text += "\nPipeline Parameters:\n"
            for parameter in self.parameters:
                text += f"- {parameter.name} ({parameter.dtype}, default: '{parameter.defaultValue}'){(': ' + parameter.description) if parameter.description is not None else ''}\n"

        if len(self.inputs) > 0:
            text += "\nPipeline Inputs:\n"
            for p_input in self.inputs:
                c_input = self.components[p_input.componentName].inputs[p_input.variableName]
                text += f"> {p_input.variableName} ({c_input['type']}){': ' + c_input['desc'] if c_input.get('desc') is not None else ''}\n"

        if len(self.outputs) > 0:
            text += "\nPipeline Outputs:\n"
            for p_output in self.outputs:
                c_output = self.components[p_output.componentName].outputs[p_output.variableName]
                text += f"< {p_output.variableName} ({c_output['type']}){': ' + c_output['desc'] if c_output.get('desc') is not None else ''}\n"

        metrics = [(name, metric, component_name) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for name, metric in component.metrics.items()]
        if len(metrics) > 0:
            text += "\nMetrics:\n"
            for name, metric, _ in metrics:
                text += f"< {name}{': ' + metric['desc'] if metric.get('desc') is not None else ''}\n"

        if len(self.wiring) > 0:
            text += "\nI/O Wiring:\n"
            for p_input in self.inputs:
                text += f"  {p_input.variableName} -> {p_input.componentName}.{p_input.variableName}\n"
            for wire_hash in self.wiring:
                text += f"  {wire_hash}\n"
            for p_output in self.outputs:
                text += f"  {p_output.componentName}.{p_output.variableName} -> {p_output.variableName}\n"
            for name, metric, component_name in metrics:
                text += f"  {component_name}.{name} -> {name}\n"

        if self.periodicity is not None:
            text += "\nTimeshifting:\n"
            text += f"  Periodicity: {self.periodicity} ms\n"
            if len(self.timeshift_reference) > 0:
                text += "  References:\n"
                for ref in self.timeshift_reference:
                    text += f"  - {ref}\n"

        if self.cycle_time is not None:
            text += f"\nCycle Time: {self.cycle_time} ms\n"

        for component in self.components.values():
            text += "\n" + component.__repr__()

        return text

    def add_input(self, component, variable):
        """
        Defines an input variable on the given component as a pipeline input.

        Args:
            component (str): Name of the component
            variable (str): Name of the input variable
        """
        try:
            _ = self.components[component].inputs[variable]
        except KeyError:
            raise AssertionError("The component with input variable must exist in the pipeline.")

        if self.inputs is None:
            self.inputs = []

        if self._get_io_index(self.inputs, variable) is not None:
            raise AssertionError("The pipeline input already exists.")

        self.inputs.append(PipelineVariable(componentName=component, variableName=variable))

    def delete_input(self, component: str, variable: str):
        """
        Deletes a pipeline input.

        Args:
            component (str): Name of the component
            variable (str): Name of the input variable
        """

        if (idx := self._get_io_index(self.inputs, variable)) is None:
            raise AssertionError("The pipeline input does not exist.")

        self.inputs.pop(idx)

    def add_output(self, component, variable):
        """
        Defines an output variable on the given component as a pipeline output.

        Args:
            component (str): Name of the component
            variable (str): Name of the output variable

        """
        try:
            output = self.components[component].outputs[variable]
        except KeyError:
            raise AssertionError("The component with output variable must exist in the pipeline.")

        if self.outputs is None:
            self.outputs = []

        if self._get_io_index(self.outputs, variable) is not None:
            raise AssertionError("The pipeline output already exists.")

        if output["type"].lower() == 'imageset':
            _logger.warning("Output variables of type 'ImageSet' can not be added as pipeline outputs.")

        self.outputs.append(PipelineVariable(componentName=component, variableName=variable))

    def delete_output(self, component: str, variable: str):
        """
        Deletes a pipeline output.

        Args:
            component (str): Name of the component
            variable (str): Name of the output variable

        """

        if (idx := self._get_io_index(self.outputs, variable)) is None:
            raise AssertionError("The pipeline output does not exist.")

        self.outputs.pop(idx)

    def add_component(self, component: Component):
        """
        Adds a `Component` to the pipeline configuration without any connection.
        The component must have a unique name. Two or more versions of the same component can not be added to the same pipeline with the same component name.

        Args:
            component (Component): `Component` to be added
        """

        if component.name in self.components:
            raise AssertionError(f"Component with name {component.name} already exists. Please rename the component.")
        self.components[component.name] = component

    def add_wiring(self, from_component: str, from_output: str, to_component: str, to_input: str):
        """
        Creates a one-to-one connection between the input and output of two components.
        The method checks if the connection is allowed with the following requirements:

        - The components exist with the given inputs/outputs
        - The given input is not already connected to a wire
        - The types of the connected input and output are identical

        Args:
            from_component (str): Name of the component which provides data to the `to_component`
            from_output (str): Name of the output variable of the `from_component`
            to_component (str): Name of the component which consumes data from the `from_component`
            to_input (str): Name of the input variable of the `to_component`

        Raises:
            AssertionError: If any of the requirements above is not met.
        """
        if from_component not in self.components:
            raise AssertionError(f"No component named '{from_component}'")
        if to_component not in self.components:
            raise AssertionError(f"No component named '{to_component}'")
        if from_output not in self.components[from_component].outputs:
            raise AssertionError(f"Component '{from_component}' has no output named '{from_output}'")
        if to_input not in self.components[to_component].inputs:
            raise AssertionError(f"Component '{to_component}' has no input named '{to_input}'")
        if self.get_wire_for_input(to_component, to_input) is not None:
            raise AssertionError(f"Input '{to_input}' of component '{to_component}' is already wired")

        _output_type = self.components[from_component].outputs[from_output]["type"]
        _input_type = self.components[to_component].inputs[to_input]["type"]
        if _output_type != _input_type:
            raise AssertionError("Output and input types do not match")

        wire_hash = self._wire_hash_string.format(from_component, from_output, to_component, to_input)
        self.wiring[wire_hash] = {
            "fromComponent": from_component,
            "fromOutput": from_output,
            "toComponent": to_component,
            "toInput": to_input,
        }

    def get_wire_for_output(self, component_name: str, output_name: str) -> Optional[dict]:
        """
        Searches for the wire which connects a component with `component_name` as data provider through its output with name `output_name`.

        Args:
            component_name (str): Name of the data provider component.
            output_name (str): Name of the output variable of `component_name`.

        Returns:
            Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables, or None if no such wire exists.
        """
        wires = [x for x in self.wiring.values() if x["fromComponent"] == component_name and x["fromOutput"] == output_name]
        return wires[0] if wires else None

    def get_wire_for_input(self, component_name: str, input_name: str) -> Optional[dict]:
        """
        Searches for the wire which connects a component with `component_name` as data consumer through its input with name `input_name`.

        Args:
            component_name (str): Name of the data consumer component.
            input_name (str): Name of the input variable of `component_name`.

        Returns:
            Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables, or None if no such wire exists.
        """
        wires = [x for x in self.wiring.values() if x["toComponent"] == component_name and x["toInput"] == input_name]
        return wires[0] if wires else None

    def delete_input_wire(self, component: str, variable: str, with_input: bool = True) -> None:
        """
        Deletes an existing connection between two components.
        The connection must be given with the name of the consumer component and its input variable.
        If the variable is an inter-signal alignment reference, the connection cannot be deleted.
        By default, the input variable is also deleted from the component.

        Args:
            component (str): Name of the component that owns the input variable
            variable (str): Name of the input variable on the component that is connected by the wire
            with_input (bool, optional): If set, the input variable is also deleted from the component. Defaults to True.

        Raises:
            AssertionError: If the input has no wiring, or if the variable acts as an inter-signal alignment reference.
        """
        wire = self.get_wire_for_input(component, variable)
        if wire is None:
            raise AssertionError(f"There is no wiring for input '{variable}' of component '{component}'")
        if variable in self.timeshift_reference:
            raise AssertionError("Inter signal alignment reference variables can not be deleted.")
        wire_hash = self._wire_hash_string.format(wire['fromComponent'], wire['fromOutput'], wire['toComponent'], wire['toInput'])
        self.wiring.pop(wire_hash)

        if with_input:
            self.components[component].delete_input(variable)

    def add_dependencies(self, packages: list) -> None:
        """
        @Deprecated, reason: components can have different Python versions and/or platform, therefore it's better to specify dependencies on a case-by-case basis.
        Collects the given Python packages with their versions from the executing Python environment and add them to all components of type `PythonComponent`.
        This step is necessary in order to execute the pipeline configuration on the Edge side.
        The method can be called multiple times but each time the previously-collected dependencies are cleared.
        The reason for this is to ensure a consistent dependency list for the `requirements.txt` file when the package is saved.

        Args:
            packages (list): List of the necessary python packages to execute the script defined by self.entrypoint
        """
        python_components = [self.components[name] for name in self.components if type(self.components[name]) is PythonComponent]
        for component in python_components:
            component.add_dependencies(packages)

    def set_timeshifting_periodicity(self, periodicity: int) -> None:
        """
        Enables inter-signal alignment with the given sampling period.

        With inter-signal alignment enabled, the AI Inference Server collects data for different input variables before it triggers the model.
        By default, `startingPoint` property is set to `First timestamp`, which means that inter-signal alignment is started at the
        first incoming value for any input variable.

        This property can be changed to `Signal reference` by adding inter-signal alignment reference variables
        via the `add_timeshifting_reference(..)` method. In this case, inter-signal alignment is started when the first value arrives
        for the defined input variables.

        Args:
            periodicity (int): Periodicity time in milliseconds for the AI Inference Server to perform inter-signal alignment. Valid range is [10, 2^31).
        """

        periodicity = int(periodicity)
        if periodicity not in range(10, int(math.pow(2, 31))):
            raise AssertionError("Inter signal alignment periodicity must be an integer and in range [10, 2^31)")

        self.periodicity = periodicity
        _logger.info(f"Inter signal alignment periodicity has been set to {self.periodicity}.")

    def add_timeshifting_reference(self, reference: str) -> None:
        """
        Enables signal alignment mode `Signal reference` by declaring input variables as reference variables.

        Args:
            reference (str): Variable name to be added to `self.timeshift_reference` list.
        """
        if reference not in [i.variableName for i in self.inputs]:
            raise AssertionError(f"There is no input variable defined with name '{reference}'")
        if reference in self.timeshift_reference:
            _logger.warning(f"Reference variable with name '{reference}' has been already added.")
            return
        self.timeshift_reference.append(reference)

    def remove_timeshifting_reference(self, reference: str) -> None:
        """
        Removes previously-defined inter-signal alignment reference variables.
        If no reference variables remain, the `startingPoint` will be `First timestamp`.

        Args:
            reference (str): Variable name to be removed from `self.timeshift_reference` list.
        """
        if reference not in self.timeshift_reference:
            raise AssertionError(f"Reference variable with name '{reference}' does not exist.")
        self.timeshift_reference.remove(reference)

    def set_cycle_time(self, cycle_time_usec: int | None):
        """
        Sets the cycle time for the pipeline.

        Args:
            cycle_time_usec (int | None): Cycle time in microseconds. Must be a positive integer between 1_000 and 1_000_000.
        """
        self.cycle_time = cycle_time_usec
        self._check_cycle_time()

    def get_pipeline_config(self) -> dict:
        """
        Builds the content of the pipeline configuration as a dictionary.

        The dictionary describes the components and the data flow between them for the AI Inference Server.
        It is written to the `pipeline_config.yml` file by the `save_pipeline_config(..)` method.

        Returns:
            dict: Content of the pipeline configuration.
        """
        version = self.save_version if self.save_version is not None else self.init_version

        metric_fields = [(component_name, field) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for field in component.metrics.keys()]

        pipeline_inputs = [
            i.__dict__(self.components[i.componentName].inputs)
            for i in self.inputs
        ]

        pipeline_outputs = [
            {**o.__dict__(self.components[o.componentName].outputs), 'metric': False}
            for o in self.outputs
        ]

        pipeline_outputs += [{
            'name': field,
            'type': 'String',
            'metric': True,
            'topic': f"/siemens/edge/aiinference/{self.name}/{version}/metrics/{component_name}/{field}",
        } for component_name, field in metric_fields]

        pipeline_dag = [{
            'source': f'Databus.{i.variableName}',
            'target': f'{i.componentName}.{i.variableName}',
        } for i in self.inputs]

        pipeline_dag += [{
            'source': f"{wire['fromComponent']}.{wire['fromOutput']}",
            'target': f"{wire['toComponent']}.{wire['toInput']}",
        } for wire in self.wiring.values()]

        pipeline_dag += [{
            'source': f'{o.componentName}.{o.variableName}',
            'target': f'Databus.{o.variableName}',
        } for o in self.outputs]

        pipeline_dag += [{
            'source': f'{component_name}.{field}',
            'target': f'Databus.{field}',
        } for component_name, field in metric_fields]

        config_yml_content = {
            'fileFormatVersion': '1.2.0',
            'dataFlowPipelineInfo': {
                'author': self.author,
                'createdOn': datetime.now(),
                'dataFlowPipelineVersion': version,
                'description': self.desc if self.desc else 'Created by AI SDK',
                'projectName': self.name,
                'packageId': str(self.package_id)
            },
            'dataFlowPipeline': {
                'components': [component._to_dict() for component in self.components.values()],
                'pipelineDag': pipeline_dag,
                'pipelineInputs': pipeline_inputs,
                'pipelineOutputs': pipeline_outputs,
            },
            'packageType': 'full'
        }
        if len(self.parameters) > 0:
            config_yml_content["dataFlowPipeline"]["pipelineParameters"] = [
                param.__param_dict__()
                for param in self.parameters
            ]
        if self.cycle_time is not None:
            config_yml_content["dataFlowPipeline"]["cycle_time"] = self.cycle_time

        return config_yml_content

    def save_pipeline_config(self, destination):
        """
        Saves the information about the composed pipeline configuration package into a YAML file.

        This YAML file describes the components and the data flow between them for AI Inference Server.
        The file will be created in the `destination` folder with name `pipeline_config.yml`

        Args:
            destination (path-like): Path of the `destination` directory.
        """

        with open(Path(destination) / PIPELINE_CONFIG, "w") as f:
            yaml.dump(self.get_pipeline_config(), f)

    def get_datalink_metadata(self) -> dict:
        """
        Generates the datalink metadata for the pipeline inputs, including the inter-signal alignment settings.

        Returns:
            dict: Dictionary with the necessary information for the AI Inference Server.
        """

        timeshifting = {
            "id": None,
            "enabled": False,
            "periodicity": self.periodicity,
            "startingPoint": None,
        }

        if self.periodicity is not None:
            timeshifting["enabled"] = True
            timeshifting["startingPoint"] = 'First timestamp'

        if len(self.timeshift_reference) > 0:
            timeshifting["startingPoint"] = 'Signal reference'

        exported_metadata = {
            "fileFormatVersion": "1.0.0",
            "id": None,
            "version": None,
            "createdOn": datetime.now(),
            "updatedOn": datetime.now(),
            "timeShifting": timeshifting,
            "inputs": [
                {
                    'name': input.variableName,
                    'mapping': None,
                    'timeShiftingReference': input.variableName in self.timeshift_reference,
                    'type': self.components[input.componentName].inputs[input.variableName]['type']
                } for input in self.inputs
            ]
        }
        return exported_metadata

    def save_datalink_metadata(self, destination):
        """
        Saves metadata for pipeline input variables.
        This method saves metadata for the AI Inference Server into a YAML file.
        This metadata determines how the AI Inference Server feeds input to the pipeline, especially inter-signal alignment.
        The file is created in the `destination` folder with the name `datalink_metadata.yml`

        Args:
            destination (path-like): Path of the destination directory.
        """
        with open(Path(destination) / DATALINK_METADATA, "w") as f:
            yaml.dump(self.get_datalink_metadata(), f)

    def save_telemetry_data_if_consented(self, destination: Path):
        try:
            Path(destination / TELEMETRY_YAML).unlink(missing_ok=True)
            # checking whether the user has made a statement about telemetry collection
            if is_telemetry_allowed():
                self.save_telemetry_data(destination)
        except RuntimeError as consent_is_undecided:
            raise consent_is_undecided

    def save_telemetry_data(self, destination: Path):
        """
        Save telemetry data to a specified destination.

        Args:
            destination (Path): The path where the telemetry data should be saved.
        """

        telemetry_path = destination / TELEMETRY_YAML
        telemetry_data = {}

        try:
            consent_allowed = is_telemetry_allowed()
        except RuntimeError:
            consent_allowed = "undecided"

        try:
            consent_data, _ = read_telemetry_consent_file()
        except BaseException:
            consent_data = {}

        telemetry_data["telemetry_consent"] = {}
        telemetry_data["telemetry_consent"][CONSENT_KEY] = consent_allowed
        telemetry_data["telemetry_consent"][CONSENT_KEY_SDK] = consent_data.get(CONSENT_KEY_SDK, 'unknown')
        telemetry_data["telemetry_consent"][CONSENT_KEY_TS] = consent_data.get(CONSENT_KEY_TS, 'unknown')
        telemetry_data["telemetry_consent"][CONSENT_KEY_METHOD] = consent_data.get(CONSENT_KEY_METHOD, get_consent_source())

        telemetry_data["platform"] = {}
        telemetry_data["platform"]["os"] = platform.system()
        telemetry_data["platform"]["release"] = platform.release()
        telemetry_data["platform"]["python_version"] = platform.python_version()

        _logger.info(f"locals: {locals()}")
        telemetry_data["environment"] = {}
        env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
        env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
        env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
        env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
        telemetry_data["environment"]["info"] = env_info

        telemetry_data["industrial_ai"] = {}
        try:
            telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
        except importlib_metadata.PackageNotFoundError:
            telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
            _logger.debug("simaticai package not found")

        telemetry_data["pipeline"] = {}
        telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component].python_version for component in self.components
                                                                 if isinstance(self.components[component], PythonComponent)))
        telemetry_data["pipeline"]["file_extensions"] = list(set(f.suffix for f in Path(destination).rglob("*")
                                                                 if f.suffix not in ["", ".zip", ".yml", ".yaml", ".html"]))
        with open(telemetry_path, 'w') as telemetry_file:
            yaml.dump(telemetry_data, telemetry_file)

    def validate(self, destination: str | Path = "."):
        """
        Validates whether the package configuration is compatible with the expected runtime environment.

        The method verifies:

        - If the package has at least one component
        - If all wires create connections between existing components and their variables
        - If metadata is defined and valid.
        - If a package with the same name already exists in the `destination` folder. In this case a warning message appears and the `save(..)` method overwrites the existing package.
        - If the package has multiple components and if they are using the same Python version

        Args:
            destination (str, optional): Path of the expected destination folder. Defaults to ".".
        """
        version = self.save_version if self.save_version is not None else self.init_version

        if len(self.components) < 1:
            raise AssertionError("The package must have at least one component.")

        for p_output in self.outputs:
            if self.components[p_output.componentName].batch.outputBatch:
                raise AssertionError(f"The component '{p_output.componentName}' has a pipeline output defined with variable name '{p_output.variableName}'. "
                                     "Components with a pipeline output are not allowed to provide batch output.")

        for wire_hash in self.wiring.copy():
            wire = self.wiring[wire_hash]
            self._check_wiring(wire, wire_hash)

        pipeline_inputs = [p_input.variableName for p_input in self.inputs]
        pipeline_outputs = [p_output.variableName for p_output in self.outputs]
        if any(variable in pipeline_outputs for variable in pipeline_inputs):
            conflicts = set(pipeline_inputs).intersection(set(pipeline_outputs))
            raise AssertionError(f"Pipeline input and output variables must be unique. Conflicting variables: {conflicts}")

        self._check_timeshifting()
        self._check_cycle_time()
        self._check_supported_types_for_connections()

        package_path = Path(destination) / f"{self.name}_{version}".replace(" ", "-")
        if package_path.is_dir():
            _logger.warning(f"Target folder ({package_path}) already exists! Unless you change the package name, the package could be invalid and your files will be overwritten!")

        python_versions = set()

        for component in self.components:
            self.components[component].validate()

            if isinstance(self.components[component], PythonComponent):
                python_versions.add(self.components[component].python_version)

        if (1 < len(python_versions)):
            _logger.warning("The use of multiple Python versions in a single pipeline is not recommended. We recommend using only one of the supported versions, which are Python 3.11 or 3.12.")

        _logger.info(f"Package '{self.name}' is valid and ready to save.")

    def _check_timeshifting(self):
        if len(self.timeshift_reference) > 0 and self.periodicity is None:
            raise AssertionError("When using inter signal alignment reference variables, the periodicity must be set.")

    def _check_supported_types_for_connections(self):
        for p_input in self.inputs:
            variable_type = self.components[p_input.componentName].inputs[p_input.variableName]['type']
            warnings, errors = p_input.check_connection_type_compatibility(variable_type)
            warnings_str = ", ".join(warnings)
            errors_str = ", ".join(errors)
            if warnings:
                _logger.warning(f"Input variable `{p_input.variableName}` of component `{p_input.componentName}` is of type "
                                f"`{variable_type}` which is not supported for connection type `{p_input.connection.cptype.name}`. "
                                f"Issues: {warnings_str}.")
            if errors:
                _logger.error(f"Input variable `{p_input.variableName}` of component `{p_input.componentName}` is of type "
                              f"`{variable_type}` which is not supported for connection type `{p_input.connection.cptype.name}`. "
                              f"Issues: {errors_str}.")

        for p_output in self.outputs:
            variable_type = self.components[p_output.componentName].outputs[p_output.variableName]['type']
            warnings, errors = p_output.check_connection_type_compatibility(variable_type)
            warnings_str = ", ".join(warnings)
            errors_str = ", ".join(errors)
            if warnings:
                _logger.warning(f"Output variable `{p_output.variableName}` of component `{p_output.componentName}` is of type "
                                f"`{variable_type}` which is not supported for connection type `{p_output.connection.cptype.name}`. "
                                f"Issues: {warnings_str}.")
            if errors:
                _logger.error(f"Output variable `{p_output.variableName}` of component `{p_output.componentName}` is of type "
                              f"`{variable_type}` which is not supported for connection type `{p_output.connection.cptype.name}`. "
                              f"Issues: {errors_str}.")
        for p_param in self.parameters:
            if isinstance(p_param, SecretPipelineParameter):
                continue
            if not p_param.topicBased:
                continue
            if not p_param.has_valid_connection():
                _logger.error(f"Pipeline parameter `{p_param.name}` is of type `{p_param.dtype}` which is not supported for connection type `{p_param.connection.cptype.name}`.")

    def _check_wiring(self, wire, wire_hash):
        error_messages = []
        # Variables are only looked up on components that exist, otherwise the lookup itself would raise a KeyError.
        if wire['fromComponent'] not in self.components:
            error_messages.append(f"From component {wire['fromComponent']} does not exist")
        elif wire['fromOutput'] not in self.components[wire['fromComponent']].outputs:
            error_messages.append(f"Output variable {wire['fromOutput']} does not exist on component {wire['fromComponent']}")
        if wire['toComponent'] not in self.components:
            error_messages.append(f"To component {wire['toComponent']} does not exist")
        elif wire['toInput'] not in self.components[wire['toComponent']].inputs:
            error_messages.append(f"Input variable {wire['toInput']} does not exist on component {wire['toComponent']}")
        if len(error_messages) == 0:
            from_type_ = self.components[wire['fromComponent']].outputs[wire['fromOutput']]['type']
            to_type_ = self.components[wire['toComponent']].inputs[wire['toInput']]['type']
            if from_type_ != to_type_:
                error_messages.append(f"The types of input and output variables do not match for wiring {wire_hash}.")
        if len(error_messages) > 0:
            self.wiring.pop(wire_hash)
            error_messages.append("The wire has been deleted, please check the variables and re-create the connection.")
            raise AssertionError(str(error_messages))

    def _check_cycle_time(self) -> bool:
        if self.cycle_time is None:
            return True
        if not isinstance(self.cycle_time, int) or self.cycle_time < 1:
            _logger.error(cycle_time_error_message)
            return False
        if self.cycle_time < 1_000 or 1_000_000 < self.cycle_time:
            _logger.warning(cycle_time_error_message)
            return False
        return True

    def save(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
        """
        @Deprecated, reason: only edge package generation will be supported in the future. Use export instead.

        Saves the assembled package in a zip format.
        The name of the file is defined as `{package_name}_{package_version}.zip`.
        If a file with such a name already exists in the `destination` folder, it gets overwritten and a warning message appears.

        The package is also available as a subfolder on the destination path with the name `{package_name}_{package_version}`.
        If the assembled content does not meet the expected one, this content can be changed and simply packed into a zip file.

        The package contains files and folders in the following structure:

        - Package folder with name `{package_name}_{package_version}`
            - `datalink_metadata.yml`
            - `pipeline_config.yml`
            - Component folder with name `{component_name}`

            When the component is a `PythonComponent`, this folder contains:

            - `requirements.txt`
            - Entrypoint script defined by the entrypoint of the component
            - Extra files as added to the specified folders
            - Source folder with name `src` with necessary python scripts

        If a package ID is specified, and a package with the same ID and version is already present in the `destination` folder,
        an error is raised.

        Args:
            destination (str, optional): Target directory for saving the package. Defaults to ".".
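            package_id (UUID): The optional package ID. If None, a new UUID is generated.
            version (str, optional): Version of the package. If None, the version set at init time is used; if that is also None, a version number is generated automatically.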
        """

        try:
            # checking whether the user has made a statement about telemetry collection
            is_telemetry_allowed()
        except RuntimeError as consent_is_undecided:
            raise consent_is_undecided

        self._set_save_version_and_package_id(Path(destination), package_id, version)

        destination = Path(destination)
        self.validate(destination)
        name = self.name.replace(" ", "-")
        package_name = f"{name}_{self.save_version}"

        destination = destination / package_name
        destination.mkdir(parents=True, exist_ok=True)

        for component in self.components:
            self.components[component].save(destination, False)
            if isinstance(self.components[component], PythonComponent):
                self.report_writer.add_direct_dependencies(self.components[component].name, self.components[component].python_dependencies.dependencies)

        self.save_datalink_metadata(destination)
        self.save_pipeline_config(destination)
        p_page.save_readme_html(self, destination)
        self.save_telemetry_data_if_consented(destination)

        zip_destination = shutil.make_archive(
            base_name=str(destination.parent / package_name), format='zip',
            root_dir=destination.parent, base_dir=package_name,
            verbose=True, logger=_logger)

        return Path(zip_destination)

    def _set_save_version_and_package_id(self, destination: Path, package_id: Optional[uuid.UUID], version: Optional[str]):
        previous_versions_and_ids = self._get_versions_and_package_ids_of_existing_packages(destination)
        previous_versions, previous_package_ids = zip(*previous_versions_and_ids) if previous_versions_and_ids else ([], [])

        # if package id is provided, we use that
        if package_id is not None:
            self.package_id = package_id
        # if not, we auto-generate a package id
        else:
            previous_package_ids_set = {pkg_id for pkg_id in previous_package_ids if pkg_id is not None}
            if self.package_id is not None:
                previous_package_ids_set.add(self.package_id)

            if len(previous_package_ids_set) == 0:
                self.package_id = uuid.uuid4()
            elif len(previous_package_ids_set) == 1:
                self.package_id = previous_package_ids_set.pop()
            else:
                _logger.error(f"Multiple package IDs found in the destination folder: {previous_package_ids_set}. Set a package ID.")
                raise RuntimeError(f"Multiple package IDs found in the destination folder: {previous_package_ids_set}. Set a package ID.")

        # Preference #1: use the provided version
        if version is not None:
            self.save_version = version
        # Preference #2: use the version set at init time
        elif self.init_version is not None:
            self.save_version = self.init_version
        # Preference #3: auto-generate version
        else:
            previous_decimal_versions_set = {int(v) for v in previous_versions if v is not None and v.isdecimal()}
            if len(previous_decimal_versions_set) == 0:
                self.save_version = "1"
            else:
                self.save_version = str(max(previous_decimal_versions_set) + 1)

        # checking if the package zip already exists
        name = self.name.replace(" ", "-")
        package_name = f"{name}_{self.save_version}"
        package_file = destination / f"{package_name}.zip"
        if package_file.exists():
            _logger.warning(f"Target package with version '{self.save_version}' already exists: '{package_file}'. The package will be overwritten.")
        edge_package_file = destination / f"{name}-edge_{self.save_version}.zip"
        if edge_package_file.exists():
            _logger.warning(f"Target package with version '{self.save_version}' already exists: '{edge_package_file}'. The package will be overwritten.")

        self.version = self.save_version

    def _get_versions_and_package_ids_of_existing_packages(self, destination: Path) -> List[Tuple[str, Optional[uuid.UUID]]]:
        package_versions_and_ids = []
        for file in destination.glob(f"{self.name.replace(' ', '-')}*.zip"):
            with zipfile.ZipFile(file) as zip_file:
                config_path = next(f for f in zip_file.namelist() if f.endswith("pipeline_config.yml"))
                with zip_file.open(config_path) as config_file:
                    config = yaml.load(config_file, Loader=yaml.SafeLoader)
                    pipeline_info = config.get("dataFlowPipelineInfo", {})
                    name = pipeline_info.get("projectName", None)
                    if name is None or name != self.name:
                        continue
                    version = pipeline_info.get("dataFlowPipelineVersion", None)
                    package_id = pipeline_info.get("packageId", None)
                    package_id = uuid.UUID(package_id) if package_id is not None else None
                    package_versions_and_ids.append((version, package_id))
        return package_versions_and_ids

    def export(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
        """
        Export a runnable pipeline package.

        Args:
            destination (str): optional target directory for saving the package. Defaults to ".".
            package_id (UUID): optional package ID. If None, a new UUID is generated.
            version (str): optional version. If None, an automatic version number is generated.
        """

        try:
            # checking whether the user has made a statement about telemetry collection
            is_telemetry_allowed()
        except RuntimeError as consent_is_undecided:
            raise consent_is_undecided

        destination = Path(destination)

        config_package = None
        try:
            config_package = self.save(destination, package_id, version)
            runtime_package = convert_package(config_package, self.report_writer)
            return runtime_package
        finally:
            if config_package is not None:
                Path(config_package).unlink(missing_ok=True)

    def _add_parameter(self, parameter: PipelineParameter | SecretPipelineParameter):
        # The None check comes first; otherwise accessing `parameter.name` would fail before it is reached.
        if parameter is None:
            raise AssertionError("Parameter cannot be None.")
        if any(param.name == parameter.name for param in self.parameters):
            raise AssertionError(f"Parameter with name '{parameter.name}' already exists in the pipeline.")
        self.parameters.append(parameter)

    def add_secret_parameter(self, name: str, desc: str | None = None):
        """
        Adds a secret parameter to the pipeline configuration.

        Args:
            name (str): Name of the secret parameter. Must contain only letters, digits, underscores, and hyphens. Cannot start with `__AI_IS_` prefix.
            desc (str, optional): Description of the secret parameter. Must be a string between 3 and 60 characters if provided.

        Raises:
            ValueError: If the conditions for name or description are not met.
        """
        self._add_parameter(SecretPipelineParameter(name=name, description=desc))

    def add_parameter(self, name: str, default_value, type_name: str = "String", topic_based: bool = False, desc: str | None = None):
        """
        Adds a parameter to the pipeline configuration, which alters the behavior of the pipeline.
        The parameter's default value and its properties are saved in the pipeline configuration
        and the value of the parameter can later be changed on AI Inference Server.

        Args:
            name (str): Name of the parameter
            desc (str): Description of the parameter (optional)
            type_name (str, optional): Data type of the parameter. Defaults to "String".
            default_value (str): Default value of the parameter
            topic_based (bool, optional): If true, the parameter can be updated from a message queue.

        Raises:
            ValueError:
                When:
                - the default value of the parameter is not of the specified data type (`type_name`), or
                - the specified data type itself is not an allowed data type (not a part of `parameter_types` dict), or
                - the specified data type is not given in the right format, or
                - the type of the given `topic_based` parameter is not `bool`, or
                - the name of the parameter starts with the `__AI_IS_` prefix, which is reserved for parameters of AI Inference Server.
        """
        self._add_parameter(PipelineParameter(
            name=name,
            defaultValue=default_value,
            dtype=type_name,
            topicBased=topic_based,
            description=desc))
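
The version preference order applied in `_set_save_version_and_package_id` above can be read in isolation. The following is a minimal standalone sketch, not part of the SDK: the function name `resolve_save_version` and its arguments are hypothetical and only mirror the three preferences shown in the source.

```python
from typing import Iterable, Optional


def resolve_save_version(
    provided: Optional[str],
    init_version: Optional[str],
    previous_versions: Iterable[Optional[str]],
) -> str:
    """Hypothetical helper mirroring the three version preferences."""
    # Preference #1: an explicitly provided version wins.
    if provided is not None:
        return provided
    # Preference #2: fall back to the version set at init time.
    if init_version is not None:
        return init_version
    # Preference #3: one more than the highest purely decimal previous version.
    decimals = {int(v) for v in previous_versions if v is not None and v.isdecimal()}
    return str(max(decimals) + 1) if decimals else "1"


if __name__ == "__main__":
    # "1.0.3" is not a decimal string, so it is ignored by the auto-increment.
    print(resolve_save_version(None, None, ["1", "2", "1.0.3", None]))
```

Note that under this logic only versions consisting solely of digits take part in the auto-increment; semantic-style versions such as `1.0.3` are skipped.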

__init__(name, version=None, desc='')

A newly initialized Pipeline contains no Component or wire; only its name and version are set. Together, the name and version define the name of the zip file when the package is saved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the package | required |
| desc | str | Package description (optional) | '' |
| version | str | Version of the package | None |
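
How the name and version combine into the zip file name can be illustrated with a small sketch. It assumes the rule visible in the `save` source above (spaces in the name are replaced by hyphens, then `_{version}.zip` is appended); the helper name `package_zip_name` is hypothetical and not part of the SDK.

```python
def package_zip_name(name: str, version: str) -> str:
    """Hypothetical helper: build the zip file name from package name and version."""
    # Spaces in the package name are replaced by hyphens before the version is appended.
    return f"{name.replace(' ', '-')}_{version}.zip"


if __name__ == "__main__":
    print(package_zip_name("My Pipeline", "1"))
```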
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
    """
    A newly initialized `Pipeline` contains no `Component` or wire; only its name and version are set.
    Together, the name and version define the name of the zip file when the package is saved.

    Args:
        name (str): Name of the package
        desc (str): Package description (optional)
        version (str): Version of the package
    """

    super().__init__(name, version, desc)

    self.report_writer = PipelineReportWriter()
    report_writer_handler = ReportWriterHandler(self.report_writer)
    _logger.addHandler(report_writer_handler)
    _python_dependencies_logger.addHandler(report_writer_handler)
    _wheelhouse_logger.addHandler(report_writer_handler)

from_components(components, name, version=None, desc='') staticmethod

Creates a pipeline configuration from the given components. The components are linked in a linear sequence with inputs and outputs auto-wired based on the name of the inputs and outputs of the components. The inputs of the first component will be wired as the pipeline inputs and the outputs of the last component will be wired as the pipeline outputs. The components must have unique names. Two or more versions of the same component can not be packaged simultaneously without renaming them.

Parameters:

Name | Type | Description | Default
components | list | List of PythonComponents | required
name | str | Name of the pipeline | required
version | str | Version information of the pipeline (optional) | None
desc | str | Description of the pipeline (optional) | ''

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
@staticmethod
def from_components(components: list, name: str, version: Optional[str] = None, desc: str = "") -> "Pipeline":
    """
    Creates a pipeline configuration from the given components.
    The components are linked in a linear sequence with inputs and outputs auto-wired based on the name of the inputs and outputs of the components.
    The inputs of the first component will be wired as the pipeline inputs and the outputs of the last component will be wired as the pipeline outputs.
    The components must have unique names. Two or more versions of the same component can not be packaged simultaneously without renaming them.

    Args:
        components (list): List of PythonComponents
        name (str): Name of the pipeline
        version (str): Version information of the pipeline. (Optional)
        desc (str): Description of the pipeline. (Optional)
    Returns:
        Pipeline: Pipeline object with the auto-wired components
    """
    pipeline = Pipeline(name, version, desc=desc)

    first_component = components[0]
    pipeline.add_component(first_component)
    pipeline.inputs = [PipelineVariable(first_component.name, component_input) for component_input in first_component.inputs]
    pipeline.outputs = [PipelineVariable(first_component.name, output) for output in first_component.outputs]

    for component in components[1:]:
        pipeline.add_component(component)
        for t_output in pipeline.outputs:
            try:
                pipeline.add_wiring(t_output.componentName, t_output.variableName, component.name, t_output.variableName)
            except Exception as e:
                _logger.warning(f"Output variable {t_output.componentName}.{t_output.variableName} couldn't be auto-wired.\nCause: {e}")

        unwired_variables = [f'{component.name}.{x}' for x in component.inputs if not any(s.endswith(f'{component.name}.{x}') for s in pipeline.wiring)]
        if len(unwired_variables) > 0:
            for variable in unwired_variables:
                _logger.warning(f"Input variable {variable} couldn't be auto-wired.\n")
        pipeline.outputs = [PipelineVariable(component.name, output) for output in component.outputs]

    # ImageSet is not allowed in pipeline outputs.
    # Build a filtered list instead of popping while iterating, which would skip elements.
    allowed_outputs = []
    for o in pipeline.outputs:
        if pipeline.components[o.componentName].outputs[o.variableName]["type"].lower() == 'imageset':
            _logger.warning(f"Output variable {o.componentName}.{o.variableName} of type 'ImageSet' can not be added as pipeline output.")
        else:
            allowed_outputs.append(o)
    pipeline.outputs = allowed_outputs

    return pipeline
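
The linear auto-wiring described for `from_components` can be illustrated without the SDK. The sketch below is a simplified stand-in, not the library code: components are plain dicts, `auto_wire` is a hypothetical function name, and type compatibility checks are ignored. It only shows how outputs of one component are matched by name to inputs of the next, and how unmatched inputs are reported.

```python
def auto_wire(components):
    """Link components linearly by matching output names to input names.

    `components` is a list of dicts: {"name": str, "inputs": [...], "outputs": [...]}.
    Returns (pipeline_inputs, wires, pipeline_outputs, unwired_inputs).
    """
    first = components[0]
    pipeline_inputs = [(first["name"], v) for v in first["inputs"]]
    outputs = [(first["name"], v) for v in first["outputs"]]
    wires, unwired = [], []

    for comp in components[1:]:
        wired_inputs = set()
        for src_name, var in outputs:
            if var in comp["inputs"]:
                wires.append((src_name, var, comp["name"], var))
                wired_inputs.add(var)
        unwired += [(comp["name"], v) for v in comp["inputs"] if v not in wired_inputs]
        # The outputs of the most recent component become the candidate pipeline outputs.
        outputs = [(comp["name"], v) for v in comp["outputs"]]

    return pipeline_inputs, wires, outputs, unwired


preprocess = {"name": "preprocess", "inputs": ["raw"], "outputs": ["features"]}
model = {"name": "model", "inputs": ["features", "threshold"], "outputs": ["prediction"]}
inputs, wires, outputs, unwired = auto_wire([preprocess, model])
```

In this example `features` is wired from `preprocess` to `model`, while `model.threshold` has no matching upstream output and ends up in `unwired`. This corresponds to the "couldn't be auto-wired" warning in the listing above.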

__repr__()

Textual representation of the configured package. The method shows the Components with their inputs, outputs and parameters as well as the wiring between these Components.

Returns:

Type | Description
str | Textual representation of the package

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def __repr__(self) -> str:
    """
    Textual representation of the configured package.
    The method shows the `Components` with their inputs, outputs and parameters as well as the wiring between these `Components`.

    Returns:
        [str]: Textual representation of the package
    """
    version = self.save_version if self.save_version is not None else self.init_version

    text = f"[{self.__class__.__name__}] {self.name} ({version})\n"
    if self.desc != "":
        text += f"{self.desc}\n"

    if len(self.parameters) > 0:
        text += "\nPipeline Parameters:\n"
        for parameter in self.parameters:
            text += f"- {parameter.name} ({parameter.dtype}, default: '{parameter.defaultValue}'){(': ' + parameter.description) if parameter.description is not None else ''}\n"

    if len(self.inputs) > 0:
        text += "\nPipeline Inputs:\n"
        for p_input in self.inputs:
            c_input = self.components[p_input.componentName].inputs[p_input.variableName]
            text += f"> {p_input.variableName} ({c_input['type']}){': ' + c_input['desc'] if c_input.get('desc') is not None else ''}\n"

    if len(self.outputs) > 0:
        text += "\nPipeline Outputs:\n"
        for p_output in self.outputs:
            c_output = self.components[p_output.componentName].outputs[p_output.variableName]
            text += f"< {p_output.variableName} ({c_output['type']}){': ' + c_output['desc'] if c_output.get('desc') is not None else ''}\n"

    metrics = [(name, metric, component_name) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for name, metric in component.metrics.items()]
    if len(metrics) > 0:
        text += "\nMetrics:\n"
        for name, metric, _ in metrics:
            text += f"< {name}{': ' + metric['desc'] if metric.get('desc') is not None else ''}\n"

    if len(self.wiring) > 0:
        text += "\nI/O Wiring:\n"
        for p_input in self.inputs:
            text += f"  {p_input.variableName} -> {p_input.componentName}.{p_input.variableName}\n"
        for wire_hash in self.wiring:
            text += f"  {wire_hash}\n"
        for p_output in self.outputs:
            text += f"  {p_output.componentName}.{p_output.variableName} -> {p_output.variableName}\n"
        for name, metric, component_name in metrics:
            text += f"  {component_name}.{name} -> {name}\n"

    if self.periodicity is not None:
        text += "\nTimeshifting:\n"
        text += f"  Periodicity: {self.periodicity} ms\n"
        if len(self.timeshift_reference) > 0:
            text += "  References:\n"
            for ref in self.timeshift_reference:
                text += f"  - {ref}\n"

    if self.cycle_time is not None:
        text += f"\nCycle Time: {self.cycle_time} ms\n"

    for component in self.components.values():
        text += "\n" + component.__repr__()

    return text

add_input(component, variable)

Defines an input variable on the given component as a pipeline input.

Parameters:

Name | Type | Description | Default
component | str | Name of the component | required
variable | str | Name of the input variable | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_input(self, component, variable):
    """
    Defines an input variable on the given component as a pipeline input.

    Args:
        component (str): Name of the component
        variable (str): Name of the input variable
    """
    try:
        _ = self.components[component].inputs[variable]
    except KeyError:
        raise AssertionError("The component with input variable must exist in the pipeline.")

    if self.inputs is None:
        self.inputs = []

    if self._get_io_index(self.inputs, variable) is not None:
        raise AssertionError("The pipeline input already exists.")

    self.inputs.append(PipelineVariable(componentName=component, variableName=variable))

delete_input(component, variable)

Deletes a pipeline input.

Parameters:

Name | Type | Description | Default
component | str | Name of the component | required
variable | str | Name of the input variable | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def delete_input(self, component: str, variable: str):
    """
    Deletes a pipeline input.

    Args:
        component (str): Name of the component
        variable (str): Name of the input variable
    """

    if (idx := self._get_io_index(self.inputs, variable)) is None:
        raise AssertionError("The pipeline input does not exist.")

    self.inputs.pop(idx)

add_output(component, variable)

Defines an output variable on the given component as a pipeline output.

Parameters:

Name | Type | Description | Default
component | str | Name of the component | required
variable | str | Name of the output variable | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_output(self, component, variable):
    """
    Defines an output variable on the given component as a pipeline output.

    Args:
        component (str): Name of the component
        variable (str): Name of the output variable

    """
    try:
        output = self.components[component].outputs[variable]
    except KeyError:
        raise AssertionError("The component with output variable must exist in the pipeline.")

    if self.outputs is None:
        self.outputs = []

    if self._get_io_index(self.outputs, variable) is not None:
        raise AssertionError("The pipeline output already exists.")

    if output["type"].lower() == 'imageset':
        _logger.warning("Output variables of type 'ImageSet' can not be added as pipeline outputs.")

    self.outputs.append(PipelineVariable(componentName=component, variableName=variable))

delete_output(component, variable)

Deletes a pipeline output.

Parameters:

Name | Type | Description | Default
component | str | Name of the component | required
variable | str | Name of the output variable | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def delete_output(self, component: str, variable: str):
    """
    Deletes a pipeline output.

    Args:
        component (str): Name of the component
        variable (str): Name of the output variable

    """

    if (idx := self._get_io_index(self.outputs, variable)) is None:
        raise AssertionError("The pipeline output does not exist.")

    self.outputs.pop(idx)

add_component(component)

Adds a Component to the pipeline configuration without any connection. The component must have a unique name. Two or more versions of the same component can not be added to the same pipeline with the same component name.

Parameters:

Name | Type | Description | Default
component | Component | Component to be added | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_component(self, component: Component):
    """
    Adds a `Component` to the pipeline configuration without any connection.
    The component must have a unique name. Two or more versions of the same component can not be added to the same pipeline with the same component name.

    Args:
        component (Component): `Component` to be added
    """

    if component.name in self.components:
        raise AssertionError(f"Component with name {component.name} already exists. Please rename the component.")
    self.components[component.name] = component

add_wiring(from_component, from_output, to_component, to_input)

Creates a one-to-one connection between the input and output of two components. The method checks if the connection is allowed with the following requirements:

  • The components exist with the given inputs/outputs
  • The given inputs and outputs are not connected to any wire
  • The types of the connected input and output are compatible

Parameters:

Name | Type | Description | Default
from_component | str | Name of the component which provides data to the to_component | required
from_output | str | Name of the output variable of the from_component | required
to_component | str | Name of the component which consumes data from the from_component | required
to_input | str | Name of the input variable of the to_component | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_wiring(self, from_component: str, from_output: str, to_component: str, to_input: str):
    """
    Creates a one-to-one connection between the input and output of two components.
    The method checks if the connection is allowed with the following requirements:

    - The components exist with the given inputs/outputs
    - The given inputs and outputs are not connected to any wire
    - The types of the connected input and output are compatible

    Args:
        from_component (str): Name of the component which provides data to the `to_component`
        from_output (str): Name of the output variable of the `from_component`
        to_component (str): Name of the component which consumes data from the `from_component`
        to_input (str): Name of the input variable of the `to_component`
    """
    if from_component not in self.components:
        raise AssertionError(f"No component named '{from_component}'")
    if to_component not in self.components:
        raise AssertionError(f"No component named '{to_component}'")
    if from_output not in self.components[from_component].outputs:
        raise AssertionError(f"Component '{from_component}' has no output named '{from_output}'")
    if to_input not in self.components[to_component].inputs:
        raise AssertionError(f"Component '{to_component}' has no input named '{to_input}'")
    if self.get_wire_for_input(to_component, to_input) is not None:
        raise AssertionError(f"Input '{to_input}' of component '{to_component}' is already wired")

    _output_type = self.components[from_component].outputs[from_output]["type"]
    _input_type = self.components[to_component].inputs[to_input]["type"]
    if _output_type != _input_type:
        raise AssertionError("Output and input types do not match")

    wire_hash = self._wire_hash_string.format(from_component, from_output, to_component, to_input)
    self.wiring[wire_hash] = {
        "fromComponent": from_component,
        "fromOutput": from_output,
        "toComponent": to_component,
        "toInput": to_input,
    }
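
The order of checks in `add_wiring` can be tried out with a stand-alone sketch. Everything here is a simplified assumption rather than the SDK's code: `add_wire` is a hypothetical function, components are plain dicts that map variable names directly to type labels, the type labels are arbitrary, and the key format in `WIRE_KEY` is a guess, since `_wire_hash_string` is not shown on this page.

```python
WIRE_KEY = "{}.{} -> {}.{}"  # assumed key format; the SDK's _wire_hash_string is not shown


def add_wire(components, wiring, from_component, from_output, to_component, to_input):
    """Validate and register a wire in the `wiring` dict (hypothetical stand-in)."""
    for name in (from_component, to_component):
        if name not in components:
            raise AssertionError(f"No component named '{name}'")
    outputs = components[from_component]["outputs"]
    inputs = components[to_component]["inputs"]
    if from_output not in outputs:
        raise AssertionError(f"Component '{from_component}' has no output named '{from_output}'")
    if to_input not in inputs:
        raise AssertionError(f"Component '{to_component}' has no input named '{to_input}'")
    # An input can be fed by one wire only.
    if any(w["toComponent"] == to_component and w["toInput"] == to_input for w in wiring.values()):
        raise AssertionError(f"Input '{to_input}' of component '{to_component}' is already wired")
    if outputs[from_output] != inputs[to_input]:
        raise AssertionError("Output and input types do not match")
    key = WIRE_KEY.format(from_component, from_output, to_component, to_input)
    wiring[key] = {"fromComponent": from_component, "fromOutput": from_output,
                   "toComponent": to_component, "toInput": to_input}
    return key


components = {
    "prep": {"inputs": {"raw": "String"}, "outputs": {"features": "Object"}},
    "model": {"inputs": {"features": "Object"}, "outputs": {"score": "Double"}},
}
wiring = {}
key = add_wire(components, wiring, "prep", "features", "model", "features")
```

Calling `add_wire` a second time with the same target input raises an `AssertionError`, because the "already wired" check runs before the type comparison.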

get_wire_for_output(component_name, output_name)

Searches for the wire which connects a component with component_name as data provider through its output with name output_name.

Parameters:

Name | Type | Description | Default
component_name | str | Name of the data provider component. | required
output_name | str | Name of the output variable of component_name. | required

Returns:

Type | Description
Optional[dict] | Wire which contains the data provider and receiver with their names and the names of their variables.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_wire_for_output(self, component_name: str, output_name: str) -> Optional[dict]:
    """
    Searches for the wire which connects a component with `component_name` as data provider through its output with name output_name.

    Args:
        component_name (str): Name of the data provider component.
        output_name (str): Name of the output variable of `component_name`.

    Returns:
        Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables.
    """
    wires = [x for x in self.wiring.values() if x["fromComponent"] == component_name and x["fromOutput"] == output_name]
    return wires[0] if wires else None

get_wire_for_input(component_name, input_name)

Searches for the wire which connects a component with component_name as data consumer through its input with name input_name.

Parameters:

Name | Type | Description | Default
component_name | str | Name of the data consumer component. | required
input_name | str | Name of the input variable of component_name. | required

Returns:

Type | Description
Optional[dict] | Wire which contains the data provider and receiver with their names and the names of their variables.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_wire_for_input(self, component_name: str, input_name: str) -> Optional[dict]:
    """
    Searches for the wire which connects a component with `component_name` as data consumer through its input with name `input_name`.

    Args:
        component_name (str): Name of the data consumer component.
        input_name (str): Name of the input variable of `component_name`.

    Returns:
        Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables.
    """
    wires = [x for x in self.wiring.values() if x["toComponent"] == component_name and x["toInput"] == input_name]
    return wires[0] if wires else None

delete_input_wire(component, variable, with_input=True)

Deletes an existing connection between two components. The connection is identified by the name of the consumer component and its input variable. A wire that feeds an inter-signal alignment reference variable cannot be deleted. By default, the input variable is also deleted from the component.

Parameters:

Name | Type | Description | Default
component | str | Name of the consumer component that owns the input variable | required
variable | str | Name of the input variable on the component that is connected by the wire | required
with_input | bool | If set, the input variable is also deleted from the component. Defaults to True. | True

Raises:

Type | Description
AssertionError | When the variable acts as an inter-signal alignment reference, it cannot be deleted, and an AssertionError is raised.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def delete_input_wire(self, component: str, variable: str, with_input: bool = True) -> None:
    """
    Deletes an existing connection between two components.
    The connection is identified by the name of the consumer component and its input variable.
    A wire that feeds an inter-signal alignment reference variable cannot be deleted.
    By default, the input variable is also deleted from the component.

    Args:
        component (str): Name of the consumer component that owns the input `variable`
        variable (str): Name of the input variable on the component that is connected by the wire
        with_input (bool, optional): If set, the input variable is also deleted from the component. Defaults to True.

    Raises:
        AssertionError: When the variable acts as inter signal alignment reference, it cannot be deleted, and an `AssertionError` will be raised.
    """
    wire = self.get_wire_for_input(component, variable)
    if wire is None:
        raise AssertionError(f"There is no wiring for input '{variable}' of component '{component}'")
    if variable in self.timeshift_reference:
        raise AssertionError("Inter signal alignment reference variables can not be deleted.")
    wire_hash = self._wire_hash_string.format(wire['fromComponent'], wire['fromOutput'], wire['toComponent'], wire['toInput'])
    self.wiring.pop(wire_hash)

    if with_input:
        self.components[component].delete_input(variable)

add_dependencies(packages)

Deprecated. Reason: components can have different Python versions and/or platforms, so it is better to specify dependencies per component. Collects the given Python packages with their versions from the executing Python environment and adds them to all components of type PythonComponent. This step is necessary in order to execute the pipeline configuration on the Edge side. The method can be called multiple times, but each call clears the previously collected dependencies. This ensures a consistent dependency list for the requirements.txt file when the package is saved.

Parameters:

Name | Type | Description | Default
packages | list | List of the necessary python packages to execute the script defined by self.entrypoint | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_dependencies(self, packages: list) -> None:
    """
    Deprecated. Reason: components can have different Python versions and/or platforms, so it is better to specify dependencies per component.
    Collects the given Python packages with their versions from the executing Python environment and adds them to all components of type `PythonComponent`.
    This step is necessary in order to execute the pipeline configuration on the Edge side.
    The method can be called multiple times but each time the previously-collected dependencies are cleared.
    The reason for this is to ensure a consistent dependency list for the `requirements.txt` file when the package is saved.

    Args:
        packages (list): List of the necessary python packages to execute the script defined by self.entrypoint
    """
    python_components = [self.components[name] for name in self.components if type(self.components[name]) is PythonComponent]
    for component in python_components:
        component.add_dependencies(packages)

set_timeshifting_periodicity(periodicity)

Enables inter-signal alignment with the given sampling period.

With inter-signal alignment enabled, the AI Inference Server collects data for different input variables before it triggers the model. By default, startingPoint property is set to First timestamp, which means that inter-signal alignment is started at the first incoming value for any input variable.

This property can be changed to Signal reference by adding inter-signal alignment reference variables via the add_timeshifting_reference(..) method. In this case, inter-signal alignment is started when the first value arrives for the defined input variables.

Parameters:

Name | Type | Description | Default
periodicity | int | Periodicity time in milliseconds for the AI Inference Server to perform inter-signal alignment. Valid range is [10, 2^31). | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def set_timeshifting_periodicity(self, periodicity: int) -> None:
    """
    Enables inter-signal alignment with the given sampling period.

    With inter-signal alignment enabled, the AI Inference Server collects data for different input variables before it triggers the model.
    By default, `startingPoint` property is set to `First timestamp`, which means that inter-signal alignment is started at the
    first incoming value for any input variable.

    This property can be changed to `Signal reference` by adding inter-signal alignment reference variables
    via the `add_timeshifting_reference(..)` method. In this case, inter-signal alignment is started when the first value arrives
    for the defined input variables.

    Args:
        periodicity (int): Periodicity time in milliseconds for the AI Inference Server to perform inter-signal alignment. Valid range is [10, 2^31).
    """

    periodicity = int(periodicity)
    if periodicity not in range(10, int(math.pow(2, 31))):
        raise AssertionError("Inter signal alignment periodicity must be an integer and in range [10, 2^31)")

    self.periodicity = periodicity
    _logger.info(f"Inter signal alignment periodicity has been set to {self.periodicity}.")
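
The range check can be reproduced in isolation. The sketch below is a stand-in under stated assumptions: `normalize_periodicity` and `accepts` are hypothetical names, and the logic mirrors the `int(...)` coercion and the `[10, 2^31)` bound from the listing above. One consequence worth noting is that `int()` truncates floats, so a value such as 10.9 is accepted as 10.

```python
def normalize_periodicity(periodicity) -> int:
    """Coerce to int and enforce the documented range [10, 2^31) in milliseconds."""
    value = int(periodicity)  # int() truncates floats, so 10.9 becomes 10
    if not 10 <= value < 2**31:
        raise AssertionError("Inter signal alignment periodicity must be an integer and in range [10, 2^31)")
    return value


def accepts(periodicity) -> bool:
    """Return True when the value passes the range check."""
    try:
        normalize_periodicity(periodicity)
        return True
    except AssertionError:
        return False
```

The lower bound is inclusive and the upper bound exclusive, so 10 and 2^31 - 1 pass, while 9 and 2^31 are rejected.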

add_timeshifting_reference(reference)

Enables signal alignment mode Signal reference by declaring input variables as reference variables.

Parameters:

Name | Type | Description | Default
reference | str | Variable name to be added to self.timeshift_reference list. | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_timeshifting_reference(self, reference: str) -> None:
    """
    Enables signal alignment mode `Signal reference` by declaring input variables as reference variables.

    Args:
        reference (str): Variable name to be added to `self.timeshift_reference` list.
    """
    if reference not in [i.variableName for i in self.inputs]:
        raise AssertionError(f"There is no input variable defined with name '{reference}'")
    if reference in self.timeshift_reference:
        _logger.warning(f"Reference variable with name '{reference}' has been already added.")
        return
    self.timeshift_reference.append(reference)

remove_timeshifting_reference(reference)

Removes previously-defined inter-signal alignment reference variables. If no reference variables remain, the startingPoint will be First timestamp.

Parameters:

Name | Type | Description | Default
reference | str | Variable name to be removed from self.timeshift_reference list. | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def remove_timeshifting_reference(self, reference: str) -> None:
    """
    Removes previously-defined inter-signal alignment reference variables.
    If no reference variables remain, the `startingPoint` will be `First timestamp`.

    Args:
        reference (str): Variable name to be removed from `self.timeshift_reference` list.
    """
    if reference not in self.timeshift_reference:
        raise AssertionError(f"Reference variable with name '{reference}' does not exist.")
    self.timeshift_reference.remove(reference)

set_cycle_time(cycle_time_usec)

Sets the cycle time for the pipeline.

Parameters:

Name | Type | Description | Default
cycle_time_usec | `int | None` | Cycle time in microseconds. Must be a positive integer between 1_000 and 1_000_000. | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def set_cycle_time(self, cycle_time_usec: int | None):
    """
    Sets the cycle time for the pipeline.

    Args:
        cycle_time_usec (int | None): Cycle time in microseconds. Must be a positive integer between 1_000 and 1_000_000.
    """
    self.cycle_time = cycle_time_usec
    self._check_cycle_time()

get_pipeline_config()

Returns the information on the composed pipeline configuration package as a dictionary.

The dictionary describes the components and the data flow between them for the AI Inference Server. save_pipeline_config writes it to the pipeline_config.yml file in the destination folder.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_pipeline_config(self) -> dict:
    """
    Returns the information on the composed pipeline configuration package as a dictionary.

    The dictionary describes the components and the data flow between them for the AI Inference Server.
    `save_pipeline_config` writes it to the `pipeline_config.yml` file in the `destination` folder.
    """
    version = self.save_version if self.save_version is not None else self.init_version

    metric_fields = [(component_name, field) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for field in component.metrics.keys()]

    pipeline_inputs = [
        i.__dict__(self.components[i.componentName].inputs)
        for i in self.inputs
    ]

    pipeline_outputs = [
        {**o.__dict__(self.components[o.componentName].outputs), 'metric': False}
        for o in self.outputs
    ]

    pipeline_outputs += [{
        'name': field,
        'type': 'String',
        'metric': True,
        'topic': f"/siemens/edge/aiinference/{self.name}/{version}/metrics/{component_name}/{field}",
    } for component_name, field in metric_fields]

    pipeline_dag = [{
        'source': f'Databus.{i.variableName}',
        'target': f'{i.componentName}.{i.variableName}',
    } for i in self.inputs]

    pipeline_dag += [{
        'source': f"{wire['fromComponent']}.{wire['fromOutput']}",
        'target': f"{wire['toComponent']}.{wire['toInput']}",
    } for wire in self.wiring.values()]

    pipeline_dag += [{
        'source': f'{o.componentName}.{o.variableName}',
        'target': f'Databus.{o.variableName}',
    } for o in self.outputs]

    pipeline_dag += [{
        'source': f'{component_name}.{field}',
        'target': f'Databus.{field}',
    } for component_name, field in metric_fields]

    config_yml_content = {
        'fileFormatVersion': '1.2.0',
        'dataFlowPipelineInfo': {
            'author': self.author,
            'createdOn': datetime.now(),
            'dataFlowPipelineVersion': version,
            'description': self.desc if self.desc else 'Created by AI SDK',
            'projectName': self.name,
            'packageId': str(self.package_id)
        },
        'dataFlowPipeline': {
            'components': [component._to_dict() for component in self.components.values()],
            'pipelineDag': pipeline_dag,
            'pipelineInputs': pipeline_inputs,
            'pipelineOutputs': pipeline_outputs,
        },
        'packageType': 'full'
    }
    if len(self.parameters) > 0:
        config_yml_content["dataFlowPipeline"]["pipelineParameters"] = [
            param.__param_dict__()
            for param in self.parameters
        ]
    if self.cycle_time is not None:
        config_yml_content["dataFlowPipeline"]["cycle_time"] = self.cycle_time

    return config_yml_content
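
The way `pipelineDag` is assembled can be sketched without the SDK. The functions below are hypothetical stand-ins (`build_dag`, `metric_topic`) that take plain tuples and dicts instead of `PipelineVariable` objects; the edge order and the topic string follow the listing above, and the component and variable names are made up for illustration.

```python
def build_dag(inputs, wires, outputs, metrics):
    """Assemble source/target edges in the order inputs, wires, outputs, metrics.

    inputs/outputs: lists of (component, variable); wires: list of dicts with the
    fromComponent/fromOutput/toComponent/toInput keys; metrics: (component, field).
    """
    dag = [{"source": f"Databus.{v}", "target": f"{c}.{v}"} for c, v in inputs]
    dag += [{"source": f"{w['fromComponent']}.{w['fromOutput']}",
             "target": f"{w['toComponent']}.{w['toInput']}"} for w in wires]
    dag += [{"source": f"{c}.{v}", "target": f"Databus.{v}"} for c, v in outputs]
    dag += [{"source": f"{c}.{f}", "target": f"Databus.{f}"} for c, f in metrics]
    return dag


def metric_topic(pipeline_name, version, component, field):
    """Topic string used for metric outputs in the listing above."""
    return f"/siemens/edge/aiinference/{pipeline_name}/{version}/metrics/{component}/{field}"


dag = build_dag(
    inputs=[("prep", "raw")],
    wires=[{"fromComponent": "prep", "fromOutput": "features",
            "toComponent": "model", "toInput": "features"}],
    outputs=[("model", "score")],
    metrics=[("model", "latency")],
)
```

Pipeline inputs and outputs are modelled as edges from and to a `Databus` node, so the resulting list describes the full data flow, including metrics.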

save_pipeline_config(destination)

Saves the information about the composed pipeline configuration package into a YAML file.

This YAML file describes the components and the data flow between them for AI Inference Server. The file will be created in the destination folder with name pipeline_config.yml

Parameters:

Name | Type | Description | Default
destination | path-like | Path of the destination directory. | required

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save_pipeline_config(self, destination):
    """
    Saves the information about the composed pipeline configuration package into a YAML file.

    This YAML file describes the components and the data flow between them for AI Inference Server.
    The file will be created in the `destination` folder with name `pipeline_config.yml`

    Args:
        destination (path-like): Path of the `destination` directory.
    """

    with open(Path(destination) / PIPELINE_CONFIG, "w") as f:
        yaml.dump(self.get_pipeline_config(), f)

get_datalink_metadata()

Generates the datalink metadata from the pipeline inputs and the inter-signal alignment settings.

Returns:

Name | Type | Description
dict | dict | Dictionary with the necessary information for the AI Inference Server.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_datalink_metadata(self) -> dict:
    """
    The method generates metadata information based on available information.

    Returns:
        dict: Dictionary with the necessary information for the AI Inference Server.
    """

    timeshifting = {
        "id": None,
        "enabled": False,
        "periodicity": self.periodicity,
        "startingPoint": None,
    }

    if self.periodicity is not None:
        timeshifting["enabled"] = True
        timeshifting["startingPoint"] = 'First timestamp'

    if len(self.timeshift_reference) > 0:
        timeshifting["startingPoint"] = 'Signal reference'

    exported_metadata = {
        "fileFormatVersion": "1.0.0",
        "id": None,
        "version": None,
        "createdOn": datetime.now(),
        "updatedOn": datetime.now(),
        "timeShifting": timeshifting,
        "inputs": [
            {
                'name': input.variableName,
                'mapping': None,
                'timeShiftingReference': input.variableName in self.timeshift_reference,
                'type': self.components[input.componentName].inputs[input.variableName]['type']
            } for input in self.inputs
        ]
    }
    return exported_metadata
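
How `enabled` and `startingPoint` follow from the periodicity and the reference list can be isolated in a few lines. `timeshifting_block` is a hypothetical name; the logic is copied from the listing above.

```python
def timeshifting_block(periodicity, references):
    """Mirror how the listing above derives `enabled` and `startingPoint`."""
    block = {"id": None, "enabled": False, "periodicity": periodicity, "startingPoint": None}
    if periodicity is not None:
        block["enabled"] = True
        block["startingPoint"] = "First timestamp"
    if len(references) > 0:
        block["startingPoint"] = "Signal reference"
    return block
```

Note that the two conditions are independent: if reference variables are defined but no periodicity is set, `startingPoint` becomes `Signal reference` while `enabled` stays `False`.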

save_datalink_metadata(destination)

Saves metadata for pipeline input variables. This method saves metadata for the AI Inference Server into a YAML file. This metadata determines how the AI Inference Server feeds input to the pipeline, especially inter-signal alignment. The file is created in the destination folder with the name datalink_metadata.yml.

Parameters:

NameTypeDescriptionDefault
destinationpath - like

Path of the destination directory.

required
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save_datalink_metadata(self, destination):
    """
    Saves metadata for pipeline input variables.
    This method saves metadata for the AI Inference Server into a YAML file.
    This metadata determines how the AI Inference Server feeds input to the pipeline, especially inter-signal alignment.
    The file is created in the `destination` folder with the name `datalink_metadata.yml`

    Args:
        destination (path-like): Path of the destination directory.
    """
    with open(Path(destination) / DATALINK_METADATA, "w") as f:
        yaml.dump(self.get_datalink_metadata(), f)

save_telemetry_data(destination)

Save telemetry data to a specified destination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| destination | Path | The path where the telemetry data should be saved. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save_telemetry_data(self, destination: Path):
    """
    Save telemetry data to a specified destination.

    Args:
        destination (Path): The path where the telemetry data should be saved.
    """

    telemetry_path = Path(destination) / TELEMETRY_YAML
    telemetry_data = {}

    try:
        consent_allowed = is_telemetry_allowed()
    except RuntimeError:
        consent_allowed = "undecided"

    try:
        consent_data, _ = read_telemetry_consent_file()
    except Exception:
        consent_data = {}

    telemetry_data["telemetry_consent"] = {}
    telemetry_data["telemetry_consent"][CONSENT_KEY] = consent_allowed
    telemetry_data["telemetry_consent"][CONSENT_KEY_SDK] = consent_data.get(CONSENT_KEY_SDK, 'unknown')
    telemetry_data["telemetry_consent"][CONSENT_KEY_TS] = consent_data.get(CONSENT_KEY_TS, 'unknown')
    telemetry_data["telemetry_consent"][CONSENT_KEY_METHOD] = consent_data.get(CONSENT_KEY_METHOD, get_consent_source())

    telemetry_data["platform"] = {}
    telemetry_data["platform"]["os"] = platform.system()
    telemetry_data["platform"]["release"] = platform.release()
    telemetry_data["platform"]["python_version"] = platform.python_version()

    _logger.info(f"locals: {locals()}")
    telemetry_data["environment"] = {}
    env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
    env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
    env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
    env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
    telemetry_data["environment"]["info"] = env_info

    telemetry_data["industrial_ai"] = {}
    try:
        telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
    except importlib_metadata.PackageNotFoundError:
        telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
        _logger.debug("simaticai package not found")

    telemetry_data["pipeline"] = {}
    telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component].python_version for component in self.components
                                                             if isinstance(self.components[component], PythonComponent)))
    telemetry_data["pipeline"]["file_extensions"] = list(set(f.suffix for f in Path(destination).rglob("*")
                                                             if f.suffix not in ["", ".zip", ".yml", ".yaml", ".html"]))
    with open(telemetry_path, 'w') as telemetry_file:
        yaml.dump(telemetry_data, telemetry_file)
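
The CI detection in the listing above relies on the order of the checks: each later match overwrites the earlier one. A minimal sketch of that precedence follows. `detect_ci_environment` is a hypothetical helper, and the `"not found"` default is a placeholder because the actual value of `MSG_NOT_FOUND` is not shown here.

```python
from typing import Mapping


def detect_ci_environment(environ: Mapping[str, str], default: str = "not found") -> str:
    """Hypothetical helper: return the name of the CI system found in `environ`."""
    env_info = default
    # Later checks overwrite earlier ones, so GitHub Actions wins over
    # Azure DevOps Pipelines, which wins over GitLab CI/CD.
    if "GITLAB_CI" in environ:
        env_info = "GitLab CI/CD"
    if "TF_BUILD" in environ:
        env_info = "Azure DevOps Pipelines"
    if "GITHUB_ACTIONS" in environ:
        env_info = "GitHub Actions"
    return env_info


local_run = detect_ci_environment({})
gitlab_run = detect_ci_environment({"GITLAB_CI": "true"})
mixed_run = detect_ci_environment({"GITLAB_CI": "true", "GITHUB_ACTIONS": "true"})
```

In real use the mapping would be `os.environ`. Passing it as an argument keeps the function easy to test.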

validate(destination='.')

Validates whether the package configuration is compatible with the expected runtime environment.

The method verifies:

  • If the package has at least one component
  • If all wires create connections between existing components and their variables
  • If metadata is defined and valid.
  • If a package with the same name already exists in the destination folder. In this case a warning message appears and the save(..) method overwrites the existing package.
  • If the package has multiple components and if they are using the same Python version

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| destination | str | Path of the expected destination folder. Defaults to ".". | '.' |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def validate(self, destination: str | Path = "."):
    """
    Validates whether the package configuration is compatible with the expected runtime environment.

    The method verifies:

    - If the package has at least one component
    - If all wires create connections between existing components and their variables
    - If metadata is defined and valid.
    - If a package with the same name already exists in the `destination` folder. In this case a warning message appears and the `save(..)` method overwrites the existing package.
    - If the package has multiple components and if they are using the same Python version

    Args:
        destination (str, optional): Path of the expected destination folder. Defaults to ".".
    """
    version = self.save_version if self.save_version is not None else self.init_version

    if len(self.components) < 1:
        raise AssertionError("The package must have at least one component.")

    for p_output in self.outputs:
        if self.components[p_output.componentName].batch.outputBatch:
            raise AssertionError(
                f"The component '{p_output.componentName}' defines the pipeline output '{p_output.variableName}'. "
                "Components that provide a pipeline output must not produce batch output.")

    for wire_hash in self.wiring.copy():
        wire = self.wiring[wire_hash]
        self._check_wiring(wire, wire_hash)

    pipeline_inputs = [p_input.variableName for p_input in self.inputs]
    pipeline_outputs = [p_output.variableName for p_output in self.outputs]
    if any(variable in pipeline_outputs for variable in pipeline_inputs):
        conflicts = set(pipeline_inputs).intersection(set(pipeline_outputs))
        raise AssertionError(f"Pipeline input and output variables must be unique. Conflicting variables: {conflicts}")

    self._check_timeshifting()
    self._check_cycle_time()
    self._check_supported_types_for_connections()

    package_path = Path(destination) / f"{self.name}_{version}".replace(" ", "-")
    if package_path.is_dir():
        _logger.warning(f"Target folder ({package_path}) already exists! Unless you change the package name, existing files will be overwritten and the package could become invalid.")

    python_versions = set()

    for component in self.components:
        self.components[component].validate()

        if isinstance(self.components[component], PythonComponent):
            python_versions.add(self.components[component].python_version)

    if len(python_versions) > 1:
        _logger.warning("Using multiple Python versions in a single pipeline is not recommended. We recommend using only one of the supported versions, Python 3.11 or 3.12.")

    _logger.info(f"Package '{self.name}' is valid and ready to save.")
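
Two of the checks in `validate` are easy to reproduce in isolation: the uniqueness check for pipeline input and output names, and the derivation of the target folder name. The sketch below uses hypothetical helper names and sample values. It only mirrors the expressions visible in the listing above.

```python
from typing import Iterable, Set


def find_io_conflicts(pipeline_inputs: Iterable[str], pipeline_outputs: Iterable[str]) -> Set[str]:
    """Hypothetical helper: variable names used both as pipeline input and output."""
    return set(pipeline_inputs) & set(pipeline_outputs)


def package_folder_name(name: str, version: str) -> str:
    """Hypothetical helper: folder name checked by `validate` (spaces become hyphens)."""
    return f"{name}_{version}".replace(" ", "-")


conflicts = find_io_conflicts(["temperature", "pressure"], ["pressure", "prediction"])
folder = package_folder_name("My Pipeline", "3")
```

A non-empty `conflicts` set corresponds to the case in which `validate` raises an `AssertionError`.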

save(destination='.', package_id=None, version=None)

@Deprecated, reason: only edge package generation will be supported in the future. Use export instead.

Saves the assembled package in a zip format. The name of the file is defined as {package_name}_{package_version}.zip. If a file with such a name already exists in the destination folder, it gets overwritten and a warning message appears.

The package is also available as a subfolder on the destination path with the name {package_name}_{package_version}. If the assembled content is not what you expect, you can modify this folder and pack it into a zip file yourself.

The package contains files and folders in the following structure:

  • Package folder with name {package_name}_{package_version}

    • datalink_metadata.yml
    • pipeline_config.yml
    • Component folder with name {component_name}

    When the component is a PythonComponent, this folder contains:

    • requirements.txt
    • Entrypoint script defined by the entrypoint of the component
    • Extra files as added to the specified folders
    • Source folder with name src with necessary Python scripts

If a package ID is specified, and a package with the same ID and version is already present in the destination folder, an error is raised.

Parameters:

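| Name | Type | Description | Default |
| --- | --- | --- | --- |
| destination | str | Target directory for saving the package. Defaults to ".". | '.' |
| package_id | UUID | The optional package ID. If None, a new UUID is generated. | None |
| version | str | The optional version. If None, an automatic version number is generated. | None |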
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
    """
    @Deprecated, reason: only edge package generation will be supported in the future. Use export instead.

    Saves the assembled package in a zip format.
    The name of the file is defined as `{package_name}_{package_version}.zip`.
    If a file with such a name already exists in the `destination` folder, it gets overwritten and a warning message appears.

    The package is also available as a subfolder on the destination path with the name `{package_name}_{package_version}`.
    If the assembled content is not what you expect, you can modify this folder and pack it into a zip file yourself.

    The package contains files and folders in the following structure:

    - Package folder with name `{package_name}_{package_version}`
        - `datalink_metadata.yml`
        - `pipeline_config.yml`
        - Component folder with name `{component_name}`

        When the component is a `PythonComponent`, this folder contains:

        - `requirements.txt`
        - Entrypoint script defined by the entrypoint of the component
        - Extra files as added to the specified folders
        - Source folder with name `src` with necessary python scripts

    If a package ID is specified, and a package with the same ID and version is already present in the `destination` folder,
    an error is raised.

    Args:
        destination (str, optional): Target directory for saving the package. Defaults to ".".
        package_id (UUID): The optional package ID. If None, a new UUID is generated.
    """

    # Raises RuntimeError if the user has not yet made a statement about telemetry collection.
    is_telemetry_allowed()

    self._set_save_version_and_package_id(Path(destination), package_id, version)

    destination = Path(destination)
    self.validate(destination)
    name = self.name.replace(" ", "-")
    package_name = f"{name}_{self.save_version}"

    destination = destination / package_name
    destination.mkdir(parents=True, exist_ok=True)

    for component in self.components:
        self.components[component].save(destination, False)
        if isinstance(self.components[component], PythonComponent):
            self.report_writer.add_direct_dependencies(self.components[component].name, self.components[component].python_dependencies.dependencies)

    self.save_datalink_metadata(destination)
    self.save_pipeline_config(destination)
    p_page.save_readme_html(self, destination)
    self.save_telemetry_data_if_consented(destination)

    zip_destination = shutil.make_archive(
        base_name=str(destination.parent / package_name), format='zip',
        root_dir=destination.parent, base_dir=package_name,
        verbose=True, logger=_logger)

    return Path(zip_destination)
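
The archive step relies on `shutil.make_archive` with both `root_dir` and `base_dir`, which yields a zip file whose entries all sit under one top-level folder. The following sketch demonstrates that layout on a temporary directory. The helper name, the package name `demo_1`, and the file content are assumptions made for the example.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path


def zip_package_folder(destination: Path, package_name: str) -> Path:
    """Hypothetical helper: zip `destination/package_name` next to the folder itself."""
    archive = shutil.make_archive(
        base_name=str(destination / package_name),  # ".zip" is appended automatically
        format="zip",
        root_dir=destination,   # archive paths are relative to this directory
        base_dir=package_name,  # only this folder is added, keeping its name as prefix
    )
    return Path(archive)


with tempfile.TemporaryDirectory() as tmp:
    destination = Path(tmp)
    package_dir = destination / "demo_1"
    package_dir.mkdir()
    (package_dir / "pipeline_config.yml").write_text("placeholder: true\n")

    zip_path = zip_package_folder(destination, "demo_1")
    with zipfile.ZipFile(zip_path) as archive:
        archived_names = archive.namelist()
```

Because every entry starts with the package folder name, the resulting archive contains a single top-level directory.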

export(destination='.', package_id=None, version=None)

Export a runnable pipeline package.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| destination | str | Optional target directory for saving the package. Defaults to ".". | '.' |
| package_id | UUID | Optional package ID. If None, a new UUID is generated. | None |
| version | str | Optional version. If None, an automatic version number is generated. | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def export(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
    """
    Export a runnable pipeline package.

    Args:
        destination (str): optional target directory for saving the package. Defaults to ".".
        package_id (UUID): optional package ID. If None, a new UUID is generated.
        version (str): optional version. If None, an automatic version number is generated.
    """

    # Raises RuntimeError if the user has not yet made a statement about telemetry collection.
    is_telemetry_allowed()

    destination = Path(destination)

    config_package = None
    try:
        config_package = self.save(destination, package_id, version)
        runtime_package = convert_package(config_package, self.report_writer)
        return runtime_package
    finally:
        if config_package is not None:
            Path(config_package).unlink(missing_ok=True)
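
The `try`/`finally` structure in `export` removes the intermediate configuration package whether or not the conversion succeeds. A minimal sketch of the same pattern follows, with hypothetical `build` and `convert` callables standing in for `save` and `convert_package`.

```python
import tempfile
from pathlib import Path
from typing import Callable


def export_like(build: Callable[[], Path], convert: Callable[[Path], Path]) -> Path:
    """Hypothetical helper: build an intermediate file, convert it, always clean up."""
    intermediate = None
    try:
        intermediate = build()
        return convert(intermediate)
    finally:
        # Runs on success and on failure; nothing to delete if `build` itself failed.
        if intermediate is not None:
            Path(intermediate).unlink(missing_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    config_zip = Path(tmp) / "demo_1.zip"

    def build() -> Path:
        config_zip.write_bytes(b"intermediate package")
        return config_zip

    def failing_convert(path: Path) -> Path:
        raise RuntimeError("conversion failed")

    try:
        export_like(build, failing_convert)
    except RuntimeError:
        conversion_failed = True
    else:
        conversion_failed = False
    intermediate_left_behind = config_zip.exists()
```

The exception still propagates to the caller, but the intermediate file is gone afterwards.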

add_secret_parameter(name, desc=None)

Adds a secret parameter to the pipeline configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the secret parameter. Must contain only letters, digits, underscores, and hyphens. Cannot start with __AI_IS_ prefix. | required |
| desc | str | Description of the secret parameter. Must be a string between 3 and 60 characters if provided. | None |

Raises ValueError if the conditions for name or description are not met.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_secret_parameter(self, name: str, desc: str | None = None):
    """
    Adds a secret parameter to the pipeline configuration.

    Args:
        name (str): Name of the secret parameter. Must contain only letters, digits, underscores, and hyphens. Cannot start with `__AI_IS_` prefix.
        desc (str, optional): Description of the secret parameter. Must be a string between 3 and 60 characters if provided.

    Raises ValueError if the conditions for name or description are not met.
    """
    self._add_parameter(SecretPipelineParameter(name=name, description=desc))
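
The constraints on `name` and `desc` can be expressed as a short validator. This is a sketch under stated assumptions, not the SDK's implementation: it assumes that "letters" means ASCII letters and that the length bounds 3 and 60 are inclusive. The function names are hypothetical.

```python
import re
from typing import Optional

RESERVED_PREFIX = "__AI_IS_"
# Assumption: only ASCII letters, digits, underscores and hyphens are allowed.
_NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def check_secret_parameter(name: str, desc: Optional[str] = None) -> None:
    """Hypothetical validator: raise ValueError if name or description is not acceptable."""
    if not _NAME_PATTERN.fullmatch(name):
        raise ValueError(f"Invalid characters in parameter name: {name!r}")
    if name.startswith(RESERVED_PREFIX):
        raise ValueError(f"Parameter names must not start with {RESERVED_PREFIX!r}")
    if desc is not None:
        # Assumption: both length bounds are inclusive.
        if not isinstance(desc, str) or not 3 <= len(desc) <= 60:
            raise ValueError("Description must be a string of 3 to 60 characters")


def is_valid_secret_parameter(name: str, desc: Optional[str] = None) -> bool:
    """Boolean wrapper around `check_secret_parameter`."""
    try:
        check_secret_parameter(name, desc)
    except ValueError:
        return False
    return True
```

For example, `is_valid_secret_parameter("api-token_1")` passes these checks, while a name with a space or with the reserved prefix does not.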

add_parameter(name, default_value, type_name='String', topic_based=False, desc=None)

Adds a parameter to the pipeline configuration, which alters the behavior of the pipeline. The parameter's default value and its properties are saved in the pipeline configuration and the value of the parameter can later be changed on AI Inference Server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the parameter | required |
| desc | str | Description of the parameter (optional) | None |
| type_name | str | Data type of the parameter. Defaults to "String". | 'String' |
| default_value | str | Default value of the parameter | required |
| topic_based | bool | If true, the parameter can be updated from a message queue. | False |

Raises:

ValueError when:

  • the default value of the parameter is not of the specified data type (type_name) or
  • the specified data type itself is not an allowed data type (not a part of parameter_types dict) or
  • the specified data type is not given in the right format or
  • the type of the given topic_based parameter is not bool or
  • the name of the parameter starts with the __AI_IS_ prefix. These names are reserved by AI Inference Server.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_parameter(self, name: str, default_value, type_name: str = "String", topic_based: bool = False, desc: str | None = None):
    """
    Adds a parameter to the pipeline configuration, which alters the behavior of the pipeline.
    The parameter's default value and its properties are saved in the pipeline configuration
    and the value of the parameter can later be changed on AI Inference Server.

    Args:
        name (str): Name of the parameter
        desc (str): Description of the parameter (optional)
        type_name (str, optional): Data type of the parameter. Defaults to "String".
        default_value (str): Default value of the parameter
        topic_based (bool, optional): If true, the parameter can be updated from a message queue.

    Raises:

        ValueError:
            When:
            - the default value of the parameter is not of the specified data type (`type_name`) or
            - the specified data type itself is not an allowed data type (not a part of `parameter_types` dict) or
            - the specified data type is not given in the right format or
            - the type of the given `topic_based` parameter is not `bool`.
            - the name of the parameter starts with `__AI_IS_` prefix. These are reserved parameters by AI Inference Server
    """
    self._add_parameter(PipelineParameter(
        name=name,
        defaultValue=default_value,
        dtype=type_name,
        topicBased=topic_based,
        description=desc))

convert_package(zip_path, report_writer=None)

@Deprecated, reason: only edge package generation will be supported in the future. Use Pipeline.export(...) instead.

Create an Edge Configuration Package from a given Pipeline Configuration Package.

If the input zip file is {path}/{name}_{version}.zip, the output file will be created as {path}/{name}-edge_{version}.zip. Please make sure that the given zip file comes from a trusted source!

If a file with such a name already exists, it is overwritten.
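
The output name is derived by replacing the last underscore of the file stem with `-edge_`. A small sketch of that rule follows. `edge_package_path` is a hypothetical helper and the sample file names are assumptions.

```python
from pathlib import Path


def edge_package_path(zip_path) -> Path:
    """Hypothetical helper: derive the edge package path from the input zip path."""
    zip_path = Path(zip_path)
    if "_" not in zip_path.stem:
        raise AssertionError("The input zip file name must contain an underscore character.")
    # Split once from the right so only the last underscore is replaced.
    edge_stem = "-edge_".join(zip_path.stem.rsplit("_", 1))
    return zip_path.parent / f"{edge_stem}.zip"


simple = edge_package_path("out/My-Pipeline_1.zip")
with_underscores = edge_package_path("out/my_pipeline_1.0.3.zip")
```

Splitting from the right keeps underscores inside the package name intact.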

First, this method verifies that the requirements identified by name and version are either included in PythonPackages.zip or available on pypi.org for the target platform.

Currently, the supported edge devices run Linux on 64-bit x86 architecture, so the accepted Python libraries are restricted to the platform independent ones and packages built for 'x86_64' platforms. AI Inference Server also provides a Python 3.11 and 3.12 runtime environment, so the supported Python libraries are restricted to Python 3.11 and 3.12 compatible packages.

If a required dependency is neither available on pypi.org for the target platform nor present in PythonPackages.zip, the method logs the problem at ERROR level. It then downloads all dependencies (direct and transitive) and creates a new zip file, which is validated against the AI Inference Server's schema. This functionality requires pip version 21.3.1 or later.

This method can be used from the command line too. Example usage:

python -m simaticai convert_package <path_to_pipeline_configuration_package.zip>

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| zip_path | path-like | Path to the pipeline configuration package zip file. | required |
| report_writer | ReportWriter | A ReportWriter object to write the report for a pipeline. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| Path | The path of the created zip file. |

Raises:

| Type | Description |
| --- | --- |
| PipelineValidationError | If the validation fails. See the logger output for details. |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def convert_package(zip_path: Union[str, os.PathLike], report_writer: Optional[PipelineReportWriter] = None) -> Path:
    """
    @Deprecated, reason: only edge package generation will be supported in the future. Use Pipeline.export(...) instead.

    Create an Edge Configuration Package from a given Pipeline Configuration Package.

    If the input zip file is `{path}/{name}_{version}.zip`, the output file will be created as `{path}/{name}-edge_{version}.zip`.
    Please make sure that the given zip file comes from a trusted source!

    If a file with such a name already exists, it is overwritten.

    First, this method verifies that the requirements identified by name and version are either included
    in `PythonPackages.zip` or available on pypi.org for the target platform.

    Currently, the supported edge devices run Linux on 64-bit x86 architecture, so the accepted Python libraries are restricted to the platform independent ones and packages built for 'x86_64' platforms.
    AI Inference Server also provides a Python 3.11 and 3.12 runtime environment, so the supported Python libraries are restricted to Python 3.11 and 3.12 compatible packages.

    If for the target platform the required dependency is not available on pypi.org
    and not present in `PythonPackages.zip`,  it will log the problem at ERROR level.
    Then it downloads all dependencies (either direct or transitive), and creates a new zip
    file, which is validated against the AI Inference Server's schema.
    This functionality requires pip with version of 21.3.1 or greater.

    This method can be used from the command line too.
    Example usage:
    ```
    python -m simaticai convert_package <path_to_pipeline_configuration_package.zip>
    ```

    Args:
        zip_path (path-like): path to the pipeline configuration package zip file.
        report_writer (ReportWriter, optional): a ReportWriter object to write the report for a pipeline. Defaults to None.

    Returns:
        os.PathLike: The path of the created zip file.

    Exceptions:
        PipelineValidationError: If the validation fails. See the logger output for details.
    """

    # Raises RuntimeError if the user has not yet made a statement about telemetry collection.
    is_telemetry_allowed()

    zip_path = Path(zip_path)
    if '_' not in zip_path.stem:
        raise AssertionError("The input zip file name must contain an underscore character.")
    with tempfiles.OpenZipInTemp(zip_path) as zip_dir:
        top_level_items = list(zip_dir.iterdir())
        if len(top_level_items) != 1:
            raise AssertionError("The Pipeline Configuration Package must contain a single top level directory.")
        package_dir = zip_dir / top_level_items[0]
        runtime_dir = zip_dir / "edge_config_package"
        runtime_dir.mkdir(parents=True, exist_ok=True)
        config = yaml_helper.read_yaml(package_dir / PIPELINE_CONFIG)
        _validate_with_schema("input pipeline_config.yml", config, "pipeline.schema.json")
        runtime_config = _generate_runtime_config(config)
        if report_writer is not None:
            report_writer.set_path(Path(zip_path.parent / f"{zip_path.stem}_package_report.md"))
            report_writer.set_pipeline_config(config)

        for component in config['dataFlowPipeline']['components']:
            source_dir = package_dir / component["name"]

            if component["runtime"]["type"] == "python":
                python_version = component['runtime']['version']
                try:
                    python_version_validator(python_version)
                except ValueError as error:
                    raise AssertionError(error)

                pip_option_no_deps = component["runtime"].get("installTransitiveDependencies", True) is False
                if pip_option_no_deps:
                    _logger.warning(f"Component `{component['name']}` is requested to be installed without transitive dependencies. "
                                    "This may lead to runtime errors if the dependencies are not handled manually.")
                dependency_set = _package_component_dependencies(source_dir, python_version, pip_option_no_deps)
                if report_writer is not None:
                    report_writer.add_full_dependency_set(component_name=component["name"], dependency_set=dependency_set)
                runtime_config["runtimeConfiguration"]["components"].append({
                    "name": component["name"],
                    "device": "IED1",
                    "targetRuntime": "Python",
                })

            if component["runtime"]["type"] == "gpuruntime":
                runtime_config["runtimeConfiguration"]["components"].append({
                    "name": component["name"],
                    "device": "IED1",
                    "targetRuntime": "gpuruntime",
                })

            _package_component(source_dir, runtime_dir / 'components' / f"{component['name']}_{component['version']}")

        if report_writer is not None:
            report_writer.write_report()
            _logger.info(f"Report on {zip_path.stem} is saved to {zip_path.parent}.")
        shutil.copy(str(package_dir / PIPELINE_CONFIG), str(runtime_dir / PIPELINE_CONFIG))
        datalink_metadata_yaml = package_dir / DATALINK_METADATA
        if datalink_metadata_yaml.is_file():
            shutil.copy(str(datalink_metadata_yaml), runtime_dir / DATALINK_METADATA)

        _validate_with_schema(f"generated {RUNTIME_CONFIG}", runtime_config, "runtime.schema.json")
        with open(runtime_dir / RUNTIME_CONFIG, "w", encoding="utf8") as file:
            yaml.dump(runtime_config, file)

        readme_html = package_dir / README_HTML
        if readme_html.exists():
            (runtime_dir / README_HTML).write_text(readme_html.read_text())

        telemetry_yaml = package_dir / TELEMETRY_YAML
        if telemetry_yaml.exists():
            (runtime_dir / TELEMETRY_YAML).write_text(telemetry_yaml.read_text())

        edge_package_path = Path(shutil.make_archive(
            # replacing the last occurrence of "_" with "-edge_".
            base_name=str(PurePath(zip_path).parent / "-edge_".join(zip_path.stem.rsplit("_", 1))),
            format='zip',
            root_dir=runtime_dir,
            verbose=True,
            logger=_logger))

        package_size = edge_package_path.stat().st_size  # zipped package size in bytes
        if package_size > PACKAGE_SIZE_LIMIT:
            package_size_GB = "{:.2f}".format(package_size / 1000 / 1000 / 1000)
            package_size_limit_GB = "{:.2f}".format(PACKAGE_SIZE_LIMIT / 1000 / 1000 / 1000)
            error_msg = f"Pipeline package size {package_size} bytes ({package_size_GB} GB) exceeds the limit of " \
                        f"{PACKAGE_SIZE_LIMIT} bytes ({package_size_limit_GB} GB). " \
                        "Please remove unnecessary files and dependencies and try again."

            _logger.error(error_msg)
            raise RuntimeError(error_msg)

        sha256_hash = calc_sha(edge_package_path)
        sha_format = f"{sha256_hash}  {edge_package_path.name}"
        edge_package_path.with_suffix('.sha256').write_text(sha_format)

        return edge_package_path
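
The last step writes a checksum file next to the edge package, in the `<digest>  <file name>` layout. The sketch below reproduces that step with `hashlib`. It assumes that the SDK helper `calc_sha` returns the hexadecimal SHA-256 digest of the file, which the listing suggests but does not show. `write_sha256_sidecar` is a hypothetical name.

```python
import hashlib
import tempfile
from pathlib import Path


def write_sha256_sidecar(package_path: Path) -> Path:
    """Hypothetical helper: write `<hex digest>  <file name>` to a .sha256 file."""
    digest = hashlib.sha256()
    with open(package_path, "rb") as package_file:
        # Read in chunks so that large packages are not loaded into memory at once.
        for chunk in iter(lambda: package_file.read(1024 * 1024), b""):
            digest.update(chunk)
    sidecar = package_path.with_suffix(".sha256")
    sidecar.write_text(f"{digest.hexdigest()}  {package_path.name}")
    return sidecar


with tempfile.TemporaryDirectory() as tmp:
    package = Path(tmp) / "demo-edge_1.zip"
    package.write_bytes(b"edge package")
    sidecar = write_sha256_sidecar(package)
    sidecar_name = sidecar.name
    sidecar_text = sidecar.read_text()
```

Tools such as `sha256sum -c` accept this two-space layout, so the sidecar can be used to verify the package after transfer.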

PythonComponent

Bases: Component

A pipeline component implemented using Python scripts and libraries.

A PythonComponent wraps Python code resource files such as saved models into a structured folder, which can be added to a pipeline configuration package.

For a comprehensive overview on how to wrap ML models into Python components, we recommend you refer to the AI SDK User Manual, especially the guideline for writing pipeline components. We also recommend you study the example Python components in the E2E Tutorials for the AI SDK.

A new PythonComponent is empty.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Component name. (default: inference) | 'inference' |
| desc | str | Component description (optional) | '' |
| version | str | Component version. (default: 0.0.1) | '0.0.1' |
| python_version | str | Python version on the target AI Inference Server. At the moment of writing, the current version supports Python 3.11 and 3.12. | '3.12' |
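
The `add_resources` method in the source below accepts only relative resource paths without `..` elements. A minimal sketch of such a check follows. It compares whole path components, so a file name that merely contains two dots is still accepted. The helper name is hypothetical, and `PurePosixPath` is used only to make the example behave identically on every platform.

```python
from pathlib import PurePosixPath


def is_acceptable_resource_path(resource) -> bool:
    """Hypothetical helper: True for relative paths that never step to a parent folder."""
    path = PurePosixPath(resource)
    # `parts` splits the path into components, so "archive..v2" is not mistaken for "..".
    return not path.is_absolute() and ".." not in path.parts


relative_ok = is_acceptable_resource_path("src/model.pkl")
absolute_rejected = is_acceptable_resource_path("/etc/passwd")
parent_rejected = is_acceptable_resource_path("src/../secret.txt")
dotted_name_ok = is_acceptable_resource_path("data/archive..v2/file.csv")
```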
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
class PythonComponent(Component):
    """
    A pipeline component implemented using Python scripts and libraries.

    A `PythonComponent` wraps Python code resource files such as saved models into a structured folder, which can be added to a pipeline
    configuration package.

    For a comprehensive overview on how to wrap ML models into Python components, we recommend you refer to
    the AI SDK User Manual, especially the guideline for writing pipeline components. We also recommend you
    study the example Python components in the E2E Tutorials for the AI SDK.

    A new `PythonComponent` is empty.

    Args:
        name (str): Component name. (default: inference)
        desc (str): Component description (optional)
        version (str): Component version. (default: 0.0.1)
        python_version (str): Python version on the target AI Inference Server.
            At the moment of writing, the current version supports Python 3.11 and 3.12.
    """

    def __init__(self, name: str = "inference", version: str = "0.0.1", python_version: str = '3.12', desc: str = ""):
        """
        Creates a new, empty Python component.

        Args:
            name (str): Component name. (default: inference)
            desc (str): Component description (optional)
            version (str): Component version. (default: 0.0.1)
            python_version (str): Python version on the target AI Inference Server. At the moment of writing, AI Inference Server supports Python 3.11 and 3.12.
        """

        super().__init__(name=name, desc=desc)

        try:
            python_version_validator(python_version)
        except ValueError as error:
            raise AssertionError(error)

        self.python_version = str(python_version)
        self.version = version
        self.metrics = {}
        self.entrypoint: Path | None = None
        self.resources = {}
        self.python_dependencies = PythonDependencies(python_version)
        self._replicas = 1
        self.is_valid = False
        self.no_deps = False

    def __repr__(self) -> str:
        text = super().__repr__()

        if len(self.metrics) > 0:
            text += "\nMetrics:\n"
            for name, metric in self.metrics.items():
                text += f"< {name}{': ' + metric['desc'] if metric.get('desc') is not None else ''}\n"

        if len(self.resources):
            text += "\nResources:\n"
            for path, base in self.resources.items():
                text += f"  {base}/{path.name}\n".replace('./', '')

        if self.entrypoint is not None:
            text += f"Entrypoint: {self.entrypoint}\n"

        return text

    def set_entrypoint(self, entrypoint: str):
        """
        Sets the entrypoint module for the component.

        The entrypoint is the Python code which is responsible for receiving the input data and producing a structured response with the output for the AI Inference Server.
        The script should consume a JSON string and produce another. See the short example below.

        The file will be copied into the root directory of the component on the AI Inference Server, so every file reference should be aligned.

        The example code below shows a basic structure of the entrypoint Python code.
        ```python
        import json
        import sys
        from pathlib import Path

        # adding the parent folder of your modules to the system path makes them importable
        sys.path.insert(0, str(Path('./src').resolve()))
        from my_module import processor  # then the processor module can be imported

        def run(data: str):
            input_data = json.loads(data)  # incoming JSON string is loaded as a dictionary

            result = processor.process_data(input_data)  # the process_data can be called to process the incoming data

            # the code below creates the formatted output for the AI Inference Server
            if result is None:
                answer = {"ready": False, "output": None}
            else:
                answer = {"ready": True, "output": json.dumps(result)}

            return answer
        ```

        Args:
            entrypoint (str): Name of the new entrypoint script to be copied

        """
        self.is_valid = False

        if not any(key.name for key, value in self.resources.items() if key.name == entrypoint and value == '.'):
            raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")

        self.entrypoint = Path(entrypoint)

    def add_resources(self, base_dir: os.PathLike, resources: os.PathLike | list):
        """
        Adds files to a component.

        To make your file resources available on the AI Inference Server you need to add them to the package resources.
        These resources can be Python or config files, serialized ML models or reference data.
        They are then available on path {component_root}/{resources} in the runtime environment.
        When saving the package they will be copied from {base_dir}/{resources} into the package.
        Files in '__pycache__' folders will be excluded.
        Until version 2.3.0 of the AI SDK, hidden files and folders (names starting with '.') were also excluded.

        Args:
            base_dir (path-like): Root folder of your code from which the resources are referred
            resources (os.PathLike or List): A single path or list of relative paths to resource files

        """
        self.is_valid = False

        base_dir = Path(base_dir).resolve().absolute()
        if not base_dir.is_dir():
            raise AssertionError(f"Parameter 'base_dir' must be a directory and available in path {base_dir}.")
        resources = resources if type(resources) is list else [resources]

        for resource in resources:
            self._add_resource(base_dir, resource)

    def _add_resource(self, base_dir: Path, resource: os.PathLike):
        self.is_valid = False
        if Path(resource).is_absolute() or '..' in resource:
            raise AssertionError("The resource path must be relative and cannot contain '/../' elements.")

        resource_path = base_dir / resource

        if resource_path.is_file():
            self._add_resource_file(base_dir, resource_path)
            return

        if resource_path.is_dir():
            for glob_path in resource_path.rglob("*"):
                if glob_path.is_file():
                    self._add_resource_file(base_dir, glob_path)
            return

        raise AssertionError(f"Specified resource is not a file or directory: '{resource}'")

    def _add_resource_file(self, base_dir: Path, resource_path: Path):
        self.is_valid = False
        for parent in resource_path.parents:
            if parent.name == '__pycache__':
                return

        if resource_path in self.resources.keys():
            _logger.warning(f"Resource '{resource_path}' is already added to target directory '{self.resources[resource_path]}'")
            return

        self.resources[resource_path] = f"{resource_path.parent.relative_to(base_dir)}"

    def add_dependencies(self, packages: list):
        """
        Adds required dependencies for the Python code.

        The list must contain the name of the Python packages or tuples in the form of (name, version) which are required to execute the component on AI Inference Server.
        The method will search for the packages for the target platform and collect their transitive dependencies as well.
        Packages that are distributed only in source format can be added too, but only if they are pure Python packages.

        Args:
            packages (list): Can be a list of strings (name) or a list of tuples (name, version) of the required packages for component execution
        """
        self.is_valid = False
        self.python_dependencies.add_dependencies(packages)

    def set_requirements(self, requirements_path: os.PathLike, no_deps: bool = False):
        """
        Reads the defined dependencies from the given `requirements.txt` file and creates a new dependency list. Previously added dependencies will be cleared.

        The file must follow pip's requirements file format, with each requirement written as a PEP 508 dependency specifier.
        It can contain URLs to additional repositories in the form of `--extra-index-url=my.repo.example.com`.

        Args:
            requirements_path (str): Path of the given `requirements.txt` file
            no_deps (bool): If set to True, the listed packages will be downloaded/installed without their transitive dependencies. Defaults to False.
        """
        self.is_valid = False
        self.python_dependencies.set_requirements(requirements_path)
        if no_deps:
            self.no_deps = True
            _logger.warning("WARNING! Requiring packages without their transitive dependencies is only supported on AI Inference Server version >=2.6.0")
            _logger.warning("WARNING! Requiring packages without their transitive dependencies may lead to missing dependencies and runtime errors. Use with caution!")

    def set_pyproject_toml(self, pyproject_path: os.PathLike):
        """
        Reads the defined dependencies from the given `pyproject.toml` file and adds them to the requirements list.

        Only the dependencies defined in the `[project]` section will be added to the component.

        Args:
            pyproject_path (str): Path of the given `pyproject.toml` file
        """
        self.is_valid = False
        self.python_dependencies.set_pyproject_toml(pyproject_path)

    def add_python_packages(self, path: str) -> None:
        """
        Adds Python package(s) to the `PythonPackages.zip` file of the component.

        The `path` parameter can refer to either a `whl`, a `zip` or a `tar.gz` file.
        Zip files can be either a source distribution package or a collection of Python packages. Only pure Python source distributions are allowed.
        The dependency list of the component will be extended with the files added here, so that they will also get installed on the AI Inference Server.
        The method uses the `tempfile.tempdir` folder, so make sure that the folder is writeable.

        The wheel files must fulfill the requirements of the targeted device environment
        (e.g., the Python version must match the supported Python version of the targeted AI Inference Server, and the platform should be one of the supported ones too).

        Args:
            path (str): Path of the distribution file

        Examples:
            `component.add_python_packages('../resources/my_package-0.0.1-py3-none-any.whl')`
                adds the wheel file to `PythonPackages.zip` and adds dictionary item `component.dependencies['my_package'] = '0.0.1'`

            `component.add_python_packages('../resources/inference-wheels.zip')`
                adds all the wheel files in the zip to `PythonPackages.zip` and `component.dependencies`
        """
        self.is_valid = False
        self.python_dependencies.add_python_packages(path)

    def set_parallel_steps(self, replicas):
        """
        Sets the number of parallel executors.

        This method configures how many instances of the component can be
        executed at the same time.
        The component must be suitable for parallel execution. The inputs arriving
        to the component will be processed by different instances in parallel,
        and these instances do not share their state (e.g. variables). Every
        instance is initialized separately and receives only a fraction of the inputs.
        AI Inference Server supports at most 8 parallel instances.

        Args:
            replicas (int): Number of parallel executors. Default is 1.

        Raises:
            ValueError: if the given argument is not a positive integer.
        """
        self.is_valid = False
        if (not isinstance(replicas, int)) or replicas < 1:
            raise ValueError("Replica count must be a positive integer.")
        if 8 < replicas:
            _logger.warning("The current maximum of parallel executors is 8.")
        self._replicas = replicas

    def add_metric(self, name: str, desc: str | None = None):
        """
        Adds a metric that will be automatically used as a pipeline output.

        Args:
            name (str): Name of the metric.
            desc (str): Description of the metric. (optional)
        """
        if "_" not in name:
            raise AssertionError("The metric name must contain at least one underscore")
        if self.metrics is None:
            self.metrics = {}
        if name in self.metrics:
            raise AssertionError(f"Metric '{name}' already exists")
        self.metrics[name] = {}
        if desc is not None:
            self.metrics[name]['desc'] = desc

    def delete_metric(self, name: str):
        """
        Remove a previously added metric.

        Args:
            name (str): Name of the metric to be deleted.
        """
        if name not in self.metrics:
            raise AssertionError(f"Component '{self.name}' has no metric '{name}'")
        self.metrics.pop(name)

    def _to_dict(self):
        runtime = {
            'type': 'python',
            'version': self.python_version,
        }
        if self.no_deps:
            runtime['installTransitiveDependencies'] = False

        component_dict = {
            **super()._to_dict(),
            'version': self.version,
            'entrypoint': f"./{self.entrypoint.name}",
            'hwType': 'CPU',
            'runtime': runtime,
            'replicas': self._replicas
        }

        component_dict["outputType"] += [{
            'name': name,
            'type': 'String',
            'metric': True,
        } for name in self.metrics.keys()]

        return component_dict

    def enable_dependency_optimization(self):
        """
        Allows changing repository URLs to optimize the package size.

        Allows the replacement of the `--index-url` argument during `pip download` so that only
        CPU runtime optimized dependencies are downloaded. When this optimization is enabled, the current
        `--index-url` is prepended to the `--extra-index-url` list, and the PyTorch CPU-only repository
        is set as the `--index-url`.
        A warning message will be printed if the repository URL modification was necessary.
        Some dependencies, PyTorch for example, have both CPU and GPU runtime
        versions, but a `PythonComponent` can only run on CPU, so
        packaging the additional GPU runtime dependencies just enlarges the package size.
        If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
        `GPURuntimeComponent`.
        """
        self.python_dependencies.enable_dependency_optimization()

    def disable_dependency_optimization(self):
        """
        Disables any modification to repository URLs

        Disables the replacement of the `--index-url` argument during `pip download`.
        This way all `--index-url` or `--extra-index-url` arguments will be preserved if they
        were present in the requirements.txt file.
        Some dependencies, PyTorch for example, have both CPU and GPU runtime
        versions, but a `PythonComponent` can only run on CPU, so
        packaging the additional GPU runtime dependencies just enlarges the package size.
        A warning message will be printed about the package size if this optimization is disabled and
        the dependency list contains GPU optimized dependencies.
        Disabling this optimization will not allow the component to run on GPU.
        If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
        `GPURuntimeComponent`.
        """
        self.python_dependencies.disable_dependency_optimization()

    def validate(self):
        """
        Validates that the component is ready to be serialized and packaged as part of a pipeline.
        """
        if not self.is_valid:
            if self.entrypoint is None:
                raise AssertionError("Entrypoint must be defined")
            if not any(key.name for key, value in self.resources.items() if key.name == self.entrypoint.name and value == '.'):
                raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")

            if len(self.python_dependencies.dependencies) < 1:
                _logger.warning(f"WARNING! There are no dependencies defined for component '{self.name}'. Please make sure that all necessary dependencies have been added.")

            self.python_dependencies.validate()

            self.is_valid = True

        _logger.info(f"Component '{self.name}' is valid and ready to use.")

    def save(self, destination, validate=True):
        """
        Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
        Validation can be skipped by setting parameter `validate` to False.
        This is useful when the component is already validated and only intended to be saved.

        The component folder contains the following:

        - `requirements.txt` with a list of Python dependencies
        - Entry point script defined by the `entrypoint` attribute of the component
        - Extra files as added to the specified folders
        - `PythonPackages.zip` with the wheel binaries for the environment to be installed

        Args:
            destination (path-like): Target directory to which the component will be saved.
            validate (bool): With value True, triggers component validation. Defaults to True.
        """
        if validate:
            self.validate()

        folder_path = Path(destination) / self.name
        folder_path.mkdir(parents=True, exist_ok=True)

        for file_path in self.resources:
            dir_path = folder_path / self.resources[file_path]
            os.makedirs(dir_path, exist_ok=True)
            shutil.copy(file_path, dir_path / file_path.name)

        self.python_dependencies.save(folder_path)

__init__(name='inference', version='0.0.1', python_version='3.12', desc='')

Creates a new, empty Python component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Component name. (default: inference) | `'inference'` |
| `desc` | `str` | Component description (optional) | `''` |
| `version` | `str` | Component version. (default: 0.0.1) | `'0.0.1'` |
| `python_version` | `str` | Python version on the target AI Inference Server. At the moment of writing, AI Inference Server supports Python 3.11 and 3.12. | `'3.12'` |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def __init__(self, name: str = "inference", version: str = "0.0.1", python_version: str = '3.12', desc: str = ""):
    """
    Creates a new, empty Python component.

    Args:
        name (str): Component name. (default: inference)
        desc (str): Component description (optional)
        version (str): Component version. (default: 0.0.1)
        python_version (str): Python version on the target AI Inference Server. At the moment of writing, AI Inference Server supports Python 3.11 and 3.12.
    """

    super().__init__(name=name, desc=desc)

    try:
        python_version_validator(python_version)
    except ValueError as error:
        raise AssertionError(error)

    self.python_version = str(python_version)
    self.version = version
    self.metrics = {}
    self.entrypoint: Path | None = None
    self.resources = {}
    self.python_dependencies = PythonDependencies(python_version)
    self._replicas = 1
    self.is_valid = False
    self.no_deps = False

set_entrypoint(entrypoint)

Sets the entrypoint module for the component.

The entrypoint is the Python code which is responsible for receiving the input data and producing a structured response with the output for the AI Inference Server. The script should consume a JSON string and produce another. See the short example below.

The file will be copied into the root directory of the component on the AI Inference Server, so every file reference should be aligned.

The example code below shows a basic structure of the entrypoint Python code.

import json
import sys
from pathlib import Path

# adding the parent folder of your modules to the system path makes them importable
sys.path.insert(0, str(Path('./src').resolve()))
from my_module import processor  # then the processor module can be imported

def run(data: str):
    input_data = json.loads(data)  # incoming JSON string is loaded as a dictionary

    result = processor.process_data(input_data)  # the process_data can be called to process the incoming data

    # the code below creates the formatted output for the AI Inference Server
    if result is None:
        answer = {"ready": False, "output": None}
    else:
        answer = {"ready": True, "output": json.dumps(result)}

    return answer

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `entrypoint` | `str` | Name of the new entrypoint script to be copied | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_entrypoint(self, entrypoint: str):
    """
    Sets the entrypoint module for the component.

    The entrypoint is the Python code which is responsible for receiving the input data and producing a structured response with the output for the AI Inference Server.
    The script should consume a JSON string and produce another. See the short example below.

    The file will be copied into the root directory of the component on the AI Inference Server, so every file reference should be aligned.

    The example code below shows a basic structure of the entrypoint Python code.
    ```python
    import json
    import sys
    from pathlib import Path

    # adding the parent folder of your modules to the system path makes them importable
    sys.path.insert(0, str(Path('./src').resolve()))
    from my_module import processor  # then the processor module can be imported

    def run(data: str):
        input_data = json.loads(data)  # incoming JSON string is loaded as a dictionary

        result = processor.process_data(input_data)  # the process_data can be called to process the incoming data

        # the code below creates the formatted output for the AI Inference Server
        if result is None:
            answer = {"ready": False, "output": None}
        else:
            answer = {"ready": True, "output": json.dumps(result)}

        return answer
    ```

    Args:
        entrypoint (str): Name of the new entrypoint script to be copied

    """
    self.is_valid = False

    if not any(key.name for key, value in self.resources.items() if key.name == entrypoint and value == '.'):
        raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")

    self.entrypoint = Path(entrypoint)
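
Before packaging, the `run` contract described above can be exercised locally. The following is a minimal sketch under stated assumptions: `process_data` is a hypothetical stand-in for `processor.process_data`, and the `"values"` key is invented for illustration. Only the shape of the returned dictionary (`ready` and `output`) comes from the example in the docstring.

```python
import json


def process_data(input_data: dict):
    """Hypothetical stand-in for processor.process_data: returns the mean, or None if there is no data."""
    values = input_data.get("values", [])
    if not values:
        return None  # nothing to report for this input
    return {"mean": sum(values) / len(values)}


def run(data: str):
    input_data = json.loads(data)  # incoming JSON string is loaded as a dictionary
    result = process_data(input_data)
    if result is None:
        return {"ready": False, "output": None}
    return {"ready": True, "output": json.dumps(result)}


# Call run() the way a caller would: with a JSON string as input.
ready_answer = run(json.dumps({"values": [1.0, 2.0, 3.0]}))
empty_answer = run(json.dumps({"values": []}))
```

Checking both branches locally helps catch serialization problems before the component is deployed.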

add_resources(base_dir, resources)

Adds files to a component.

To make your file resources available on the AI Inference Server you need to add them to the package resources. These resources can be Python or config files, serialized ML models or reference data. They are then available on path {component_root}/{resources} in the runtime environment. When saving the package they will be copied from {base_dir}/{resources} into the package. Files in '__pycache__' folders will be excluded. Until version 2.3.0 of the AI SDK, hidden files and folders (names starting with '.') were also excluded.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `base_dir` | `path-like` | Root folder of your code from which the resources are referred | required |
| `resources` | `os.PathLike or List` | A single path or list of relative paths to resource files | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_resources(self, base_dir: os.PathLike, resources: os.PathLike | list):
    """
    Adds files to a component.

    To make your file resources available on the AI Inference Server you need to add them to the package resources.
    These resources can be Python or config files, serialized ML models or reference data.
    They are then available on path {component_root}/{resources} in the runtime environment.
    When saving the package they will be copied from {base_dir}/{resources} into the package.
    Files in '__pycache__' folders will be excluded.
    Until version 2.3.0 of the AI SDK, hidden files and folders (names starting with '.') were also excluded.

    Args:
        base_dir (path-like): Root folder of your code from which the resources are referred
        resources (os.PathLike or List): A single path or list of relative paths to resource files

    """
    self.is_valid = False

    base_dir = Path(base_dir).resolve().absolute()
    if not base_dir.is_dir():
        raise AssertionError(f"Parameter 'base_dir' must be a directory and available in path {base_dir}.")
    resources = resources if type(resources) is list else [resources]

    for resource in resources:
        self._add_resource(base_dir, resource)
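
The mapping from `{base_dir}/{resources}` to `{component_root}/{resources}` can be illustrated with a standalone sketch. `map_resources` is a hypothetical helper written for this example, not part of the SDK. It follows the rules stated above: files keep their directory relative to `base_dir`, and anything under `__pycache__` is skipped.

```python
import tempfile
from pathlib import Path


def map_resources(base_dir, resource: str) -> dict:
    """Hypothetical helper: maps each packaged file to its target directory in the component."""
    base_dir = Path(base_dir).resolve()
    resource_path = base_dir / resource
    candidates = [resource_path] if resource_path.is_file() else sorted(resource_path.rglob("*"))
    mapping = {}
    for path in candidates:
        if not path.is_file():
            continue
        if any(parent.name == "__pycache__" for parent in path.parents):
            continue  # compiled caches are not packaged
        source = path.relative_to(base_dir).as_posix()
        mapping[source] = path.parent.relative_to(base_dir).as_posix()
    return mapping


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "src" / "__pycache__").mkdir(parents=True)
    (root / "entrypoint.py").write_text("# entrypoint\n")
    (root / "src" / "processor.py").write_text("# module\n")
    (root / "src" / "__pycache__" / "processor.cpython-312.pyc").write_bytes(b"")
    root_files = map_resources(root, "entrypoint.py")  # a single file in the root
    src_files = map_resources(root, "src")  # a whole directory
```

A file directly in `base_dir` maps to `.`, which is the target directory that `set_entrypoint` requires for the entrypoint script.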

add_dependencies(packages)

Adds required dependencies for the Python code.

The list must contain the name of the Python packages or tuples in the form of (name, version) which are required to execute the component on AI Inference Server. The method will search for the packages for the target platform and collect their transitive dependencies as well. Packages that are distributed only in source format can be added too, but only if they are pure Python packages.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `packages` | `list` | Can be a list of strings (name) or a list of tuples (name, version) of the required packages for component execution | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_dependencies(self, packages: list):
    """
    Adds required dependencies for the Python code.

    The list must contain the name of the Python packages or tuples in the form of (name, version) which are required to execute the component on AI Inference Server.
    The method will search for the packages for the target platform and collect their transitive dependencies as well.
    Packages that are distributed only in source format can be added too, but only if they are pure Python packages.

    Args:
        packages (list): Can be a list of strings (name) or a list of tuples (name, version) of the required packages for component execution
    """
    self.is_valid = False
    self.python_dependencies.add_dependencies(packages)
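
The two accepted input shapes, plain names and `(name, version)` tuples, can be pictured as requirement lines. This is a sketch only: `to_requirement_lines` is a hypothetical helper, and pinning with `==` is an assumption about how a version tuple would be interpreted. The SDK's internal representation is not shown in this document.

```python
def to_requirement_lines(packages: list) -> list:
    """Hypothetical helper: turns names and (name, version) tuples into requirement strings."""
    lines = []
    for package in packages:
        if isinstance(package, str):
            lines.append(package)  # unpinned: any available version
        elif isinstance(package, tuple) and len(package) == 2:
            name, version = package
            lines.append(f"{name}=={version}")  # assumed: the tuple pins an exact version
        else:
            raise ValueError(f"Unsupported dependency specification: {package!r}")
    return lines


requirement_lines = to_requirement_lines(["numpy", ("scikit-learn", "1.4.2")])
```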

set_requirements(requirements_path, no_deps=False)

Reads the defined dependencies from the given requirements.txt file and creates a new dependency list. Previously added dependencies will be cleared.

The file must follow pip's requirements file format, with each requirement written as a PEP 508 dependency specifier. It can contain URLs to additional repositories in the form of --extra-index-url=my.repo.example.com.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `requirements_path` | `str` | Path of the given requirements.txt file | required |
| `no_deps` | `bool` | If set to True, the listed packages will be downloaded/installed without their transitive dependencies. Defaults to False. | `False` |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_requirements(self, requirements_path: os.PathLike, no_deps: bool = False):
    """
    Reads the defined dependencies from the given `requirements.txt` file and creates a new dependency list. Previously added dependencies will be cleared.

    The file must follow pip's requirements file format, with each requirement written as a PEP 508 dependency specifier.
    It can contain URLs to additional repositories in the form of `--extra-index-url=my.repo.example.com`.

    Args:
        requirements_path (str): Path of the given `requirements.txt` file
        no_deps (bool): If set to True, the listed packages will be downloaded/installed without their transitive dependencies. Defaults to False.
    """
    self.is_valid = False
    self.python_dependencies.set_requirements(requirements_path)
    if no_deps:
        self.no_deps = True
        _logger.warning("WARNING! Requiring packages without their transitive dependencies is only supported on AI Inference Server version >=2.6.0")
        _logger.warning("WARNING! Requiring packages without their transitive dependencies may lead to missing dependencies and runtime errors. Use with caution!")
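
To show what such a file may contain, here is a simplified reader that separates `--extra-index-url` options from requirement lines. It is a sketch, not pip's parser and not the SDK's implementation: `split_requirements` is a hypothetical helper, and it ignores every other pip option.

```python
def split_requirements(text: str):
    """Simplified reader: returns (extra index URLs, requirement lines)."""
    index_urls, requirements = [], []
    for raw_line in text.splitlines():
        line = raw_line.split(" #", 1)[0].strip()  # drop trailing comments
        if not line or line.startswith("#"):
            continue
        if line.startswith("--extra-index-url"):
            # accepts both "--extra-index-url=URL" and "--extra-index-url URL"
            index_urls.append(line[len("--extra-index-url"):].lstrip("= ").strip())
        elif line.startswith("-"):
            continue  # other pip options are ignored in this sketch
        else:
            requirements.append(line)
    return index_urls, requirements


sample = """\
# runtime dependencies
--extra-index-url=https://my.repo.example.com/simple
numpy==1.26.4
scikit-learn>=1.4  # any 1.4+ release
"""
index_urls, requirements = split_requirements(sample)
```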

set_pyproject_toml(pyproject_path)

Reads the defined dependencies from the given pyproject.toml file and adds them to the requirements list.

Only the dependencies defined in the [project] section will be added to the component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pyproject_path` | `str` | Path of the given pyproject.toml file | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_pyproject_toml(self, pyproject_path: os.PathLike):
    """
    Reads the defined dependencies from the given `pyproject.toml` file and adds them to the requirements list.

    Only the dependencies defined in the `[project]` section will be added to the component.

    Args:
        pyproject_path (str): Path of the given `pyproject.toml` file
    """
    self.is_valid = False
    self.python_dependencies.set_pyproject_toml(pyproject_path)

add_python_packages(path)

Adds Python package(s) to the PythonPackages.zip file of the component.

The path parameter can refer to either a whl, a zip or a tar.gz file. Zip files can be either a source distribution package or a collection of Python packages. Only pure Python source distributions are allowed. The dependency list of the component will be extended with the files added here, so that they will also get installed on the AI Inference Server. The method uses the tempfile.tempdir folder, so make sure that the folder is writeable.

The wheel files must fulfill the requirements of the targeted device environment (e.g., the Python version must match the supported Python version of the targeted AI Inference Server, and the platform should be one of the supported ones too).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Path of the distribution file | required |

Examples:

`component.add_python_packages('../resources/my_package-0.0.1-py3-none-any.whl')` adds the wheel file to `PythonPackages.zip` and adds dictionary item `component.dependencies['my_package'] = '0.0.1'`

`component.add_python_packages('../resources/inference-wheels.zip')` adds all the wheel files in the zip to `PythonPackages.zip` and `component.dependencies`

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_python_packages(self, path: str) -> None:
    """
    Adds Python package(s) to the `PythonPackages.zip` file of the component.

    The `path` parameter can refer to either a `whl`, a `zip` or a `tar.gz` file.
    Zip files can be either a source distribution package or a collection of Python packages. Only pure Python source distributions are allowed.
    The dependency list of the component will be extended with the files added here, so that they will also get installed on the AI Inference Server.
    The method uses the `tempfile.tempdir` folder, so make sure that the folder is writeable.

    The wheel files must fulfill the requirements of the targeted device environment
    (e.g., the Python version must match the supported Python version of the targeted AI Inference Server, and the platform should be one of the supported ones too).

    Args:
        path (str): Path of the distribution file

    Examples:
        `component.add_python_packages('../resources/my_package-0.0.1-py3-none-any.whl')`
            adds the wheel file to `PythonPackages.zip` and adds dictionary item `component.dependencies['my_package'] = '0.0.1'`

        `component.add_python_packages('../resources/inference-wheels.zip')`
            adds all the wheel files in the zip to `PythonPackages.zip` and `component.dependencies`
    """
    self.is_valid = False
    self.python_dependencies.add_python_packages(path)
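
The dependency entry in the first example (`'my_package'` with version `'0.0.1'`) corresponds to the first two fields of the wheel file name. The sketch below shows how those fields can be read under the standard wheel naming convention. `wheel_name_and_version` is a hypothetical helper and is not the SDK's own parsing code.

```python
from pathlib import Path


def wheel_name_and_version(path: str) -> tuple:
    """Hypothetical helper: reads distribution name and version from a wheel file name."""
    file_name = Path(path).name
    if not file_name.endswith(".whl"):
        raise ValueError(f"Not a wheel file: {file_name}")
    # name-version[-build]-python_tag-abi_tag-platform_tag.whl
    parts = file_name[: -len(".whl")].split("-")
    if len(parts) not in (5, 6):  # an optional build tag adds a sixth field
        raise ValueError(f"Unexpected wheel file name: {file_name}")
    return parts[0], parts[1]


name, version = wheel_name_and_version("../resources/my_package-0.0.1-py3-none-any.whl")
```

The remaining tags (`py3-none-any` here) describe the Python version, ABI and platform, which is where a mismatch with the target device would show up.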

set_parallel_steps(replicas)

Sets the number of parallel executors.

This method configures how many instances of the component can be executed at the same time. The component must be suitable for parallel execution. The inputs arriving to the component will be processed by different instances in parallel, and these instances do not share their state (e.g. variables). Every instance is initialized separately and receives only a fraction of the inputs. AI Inference Server supports at most 8 parallel instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `replicas` | `int` | Number of parallel executors. Default is 1. | required |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the given argument is not a positive integer. |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_parallel_steps(self, replicas):
    """
    Sets the number of parallel executors.

    This method configures how many instances of the component can be
    executed at the same time.
    The component must be suitable for parallel execution. The inputs arriving
    to the component will be processed by different instances in parallel,
    and these instances do not share their state (e.g. variables). Every
    instance is initialized separately and receives only a fraction of the inputs.
    AI Inference Server supports at most 8 parallel instances.

    Args:
        replicas (int): Number of parallel executors. Default is 1.

    Raises:
        ValueError: if the given argument is not a positive integer.
    """
    self.is_valid = False
    if (not isinstance(replicas, int)) or replicas < 1:
        raise ValueError("Replica count must be a positive integer.")
    if 8 < replicas:
        _logger.warning("The current maximum of parallel executors is 8.")
    self._replicas = replicas
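
Why a component must be "suitable for parallel execution" can be seen in a small simulation. The sketch below is not how AI Inference Server schedules work: the round-robin distribution and the `CountingStep` class are assumptions made only for illustration. It shows that per-instance state, such as a counter, reflects only the inputs that one instance received.

```python
class CountingStep:
    """Stand-in for a component instance that keeps per-instance state."""

    def __init__(self):
        self.seen = 0  # not shared with the other replicas

    def run(self, payload):
        self.seen += 1
        return {"payload": payload, "seen_by_this_replica": self.seen}


def dispatch(inputs, replicas: int):
    """Distributes inputs over separately initialized instances (round-robin is assumed)."""
    instances = [CountingStep() for _ in range(replicas)]
    outputs = [instances[i % replicas].run(item) for i, item in enumerate(inputs)]
    return outputs, [instance.seen for instance in instances]


outputs, seen_per_replica = dispatch(range(6), replicas=3)
```

With six inputs and three replicas, no instance ever counts past two, so logic that relies on having seen every input (running totals, sliding windows) would give different results than with a single instance.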

add_metric(name, desc=None)

Adds a metric that will be automatically used as a pipeline output.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the metric. | required |
| `desc` | `str` | Description of the metric. (optional) | `None` |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_metric(self, name: str, desc: str | None = None):
    """
    Adds a metric that will be automatically used as a pipeline output.

    Args:
        name (str): Name of the metric.
        desc (str): Description of the metric. (optional)
    """
    if "_" not in name:
        raise AssertionError("The metric name must contain at least one underscore")
    if self.metrics is None:
        self.metrics = {}
    if name in self.metrics:
        raise AssertionError(f"Metric '{name}' already exists")
    self.metrics[name] = {}
    if desc is not None:
        self.metrics[name]['desc'] = desc
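
How a registered metric ends up as a pipeline output can be sketched by mirroring the list comprehension in `_to_dict` from the class listing. `metric_output_entries` and the metric names are hypothetical. The underscore rule and the output entry shape are taken from the source shown in this document.

```python
def metric_output_entries(metrics: dict) -> list:
    """Mirrors the output entries that `_to_dict` appends for each metric."""
    return [{"name": name, "type": "String", "metric": True} for name in metrics]


metrics = {}
for name, desc in [("cycle_time", "Processing time per input"), ("error_count", None)]:
    if "_" not in name:
        raise AssertionError("The metric name must contain at least one underscore")
    # the description is stored only when one is given
    metrics[name] = {} if desc is None else {"desc": desc}

entries = metric_output_entries(metrics)
```

Note that every metric is emitted with type `String`, regardless of the value it carries.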

delete_metric(name)

Remove a previously added metric.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the metric to be deleted. | required |

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def delete_metric(self, name: str):
    """
    Remove a previously added metric.

    Args:
        name (str): Name of the metric to be deleted.
    """
    if name not in self.metrics:
        raise AssertionError(f"Component '{self.name}' has no metric '{name}'")
    self.metrics.pop(name)

enable_dependency_optimization()

Allows changing repository URLs to optimize the package size.

Allows the replacement of the --index-url argument during pip download so that only CPU runtime optimized dependencies are downloaded. When this optimization is enabled, the current --index-url is prepended to the --extra-index-url list, and the PyTorch CPU-only repository is set as the --index-url. A warning message will be printed if the repository URL modification was necessary. Some dependencies, PyTorch for example, have both CPU and GPU runtime versions, but a PythonComponent can only run on CPU, so packaging the additional GPU runtime dependencies just enlarges the package size. If you want to run your model on GPU, convert it to an ONNX model and use it within a GPURuntimeComponent.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def enable_dependency_optimization(self):
    """
    Allows changing repository URLs to optimize the package size.

    Allows the replacement of the `--index-url` argument during `pip download` so that only
    CPU runtime optimized dependencies are downloaded. When this optimization is enabled, the current
    `--index-url` is prepended to the `--extra-index-url` list, and the PyTorch CPU-only repository
    is set as the `--index-url`.
    A warning message will be printed if the repository URL modification was necessary.
    Some dependencies, PyTorch for example, have both CPU and GPU runtime
    versions, but a `PythonComponent` can only run on CPU, so
    packaging the additional GPU runtime dependencies just enlarges the package size.
    If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
    `GPURuntimeComponent`.
    """
    self.python_dependencies.enable_dependency_optimization()
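
The URL swap described for this optimization can be illustrated with a small standalone function. This is a sketch under stated assumptions, not the SDK's implementation: `optimize_index_urls` and `PYTORCH_CPU_INDEX` are hypothetical names, and the handling of a missing or already CPU-only `--index-url` is assumed rather than taken from the source.

```python
# Public PyTorch CPU-only wheel index.
PYTORCH_CPU_INDEX = "https://download.pytorch.org/whl/cpu"


def optimize_index_urls(index_url, extra_index_urls):
    """Return (index_url, extra_index_urls) after the swap described above.

    Hypothetical helper: the current index URL moves to the front of the
    extra index list and the PyTorch CPU-only index takes its place.
    """
    extras = list(extra_index_urls)
    if index_url == PYTORCH_CPU_INDEX:
        # Assumption: nothing to change if the CPU index is already primary.
        return index_url, extras
    if index_url is not None:
        extras.insert(0, index_url)
    return PYTORCH_CPU_INDEX, extras


new_index, new_extras = optimize_index_urls(
    "https://pypi.org/simple",
    ["https://example.com/simple"],
)
```

In this sketch the original index stays reachable through the extra index list, so packages that exist only there can still be resolved.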

disable_dependency_optimization()

Disables any modification to repository URLs.

Disables the replacement of the --index-url argument during pip download, so all --index-url and --extra-index-url arguments present in the requirements.txt file are preserved.

Some dependencies, PyTorch for example, ship both CPU and GPU runtime versions. A PythonComponent can only run on CPU, so packaging the GPU runtime dependencies only enlarges the package. If this optimization is disabled and the dependency list contains GPU-optimized dependencies, a warning about the package size is printed. Disabling this optimization does not allow the component to run on GPU. To run your model on GPU, convert it to an ONNX model and use it within a GPURuntimeComponent.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def disable_dependency_optimization(self):
    """
    Disables any modification to repository URLs

    Disables the replacement of the `--index-url` argument during `pip download`.
    This way all `--index-url` or `--extra-index-url` arguments will be preserved if they
    were present in the requirements.txt file.
    Some dependencies have both CPU and GPU runtime
    versions, pytorch for example, but a `PythonComponent` can only run on CPU, so
    packaging the additional GPU runtime dependencies just enlarges the package size.
    A warning message will be printed about the package size if this optimization is disabled and
    the dependency list contains GPU optimized dependencies.
    Disabling this optimization will not allow the component to run on GPU.
    If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
    `GPURuntimeComponent`.
    """
    self.python_dependencies.disable_dependency_optimization()

validate()

Validates that the component is ready to be serialized and packaged as part of a pipeline.

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def validate(self):
    """
    Validates that the component is ready to be serialized and packaged as part of a pipeline.
    """
    if not self.is_valid:
        if self.entrypoint is None:
            raise AssertionError("Entrypoint must be defined")
        if not any(key.name for key, value in self.resources.items() if key.name == self.entrypoint.name and value == '.'):
            raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")

        if len(self.python_dependencies.dependencies) < 1:
            _logger.warning(f"WARNING! There are no dependencies defined for component '{self.name}'. Please make sure that all necessary dependencies have been added.")

        self.python_dependencies.validate()

        self.is_valid = True

    _logger.info(f"Component '{self.name}' is valid and ready to use.")
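
The entrypoint check in `validate` depends on how resources are stored: a mapping from source file path to target directory inside the component, with '.' denoting the component root. The sketch below isolates that condition in a slightly simplified but equivalent form. `entrypoint_is_in_root` is a hypothetical helper name, and the file names are illustrative only.

```python
from pathlib import Path


def entrypoint_is_in_root(resources, entrypoint):
    """True if a resource with the entrypoint's file name targets the root ('.')."""
    return any(
        key.name == entrypoint.name and value == '.'
        for key, value in resources.items()
    )


# Resource map: source file path -> target directory inside the component.
resources = {
    Path("src/entrypoint.py"): ".",
    Path("src/model.onnx"): "models",
}
in_root = entrypoint_is_in_root(resources, Path("entrypoint.py"))

# The same script placed in a subfolder does not satisfy the check.
nested = entrypoint_is_in_root({Path("src/entrypoint.py"): "src"}, Path("entrypoint.py"))
```

Only the file name is compared, so the source directory of the script does not matter, but its target directory must be the component root.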

save(destination, validate=True)

Saves the component to a folder structure, so it can be used as part of a pipeline configuration package. Validation can be skipped by setting the validate parameter to False. This is useful when the component has already been validated and only needs to be saved.

The component folder contains the following:

  • requirements.txt with a list of Python dependencies
  • Entry point script defined by the entrypoint attribute of the component
  • Extra files as added to the specified folders
  • PythonPackages.zip with the wheel binaries for the environment to be installed

Parameters:

Name | Type | Description | Default
destination | path-like | Target directory to which the component will be saved. | required
validate | bool | With value True, triggers component validation. Defaults to True. | True

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def save(self, destination, validate=True):
    """
    Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
    Validation can be skipped by setting parameter `validate` to False.
    This is useful when the component is already validated and only intended to be saved.

    The component folder contains the following:

    - `requirements.txt` with a list of Python dependencies
    - Entry point script defined by the `entrypoint` attribute of the component
    - Extra files as added to the specified folders
    - `PythonPackages.zip` with the wheel binaries for the environment to be installed

    Args:
        destination (path-like): Target directory to which the component will be saved.
        validate (bool): With value True, triggers component validation. Defaults to True.
    """
    if validate:
        self.validate()

    folder_path = Path(destination) / self.name
    folder_path.mkdir(parents=True, exist_ok=True)

    for file_path in self.resources:
        dir_path = folder_path / self.resources[file_path]
        os.makedirs(dir_path, exist_ok=True)
        shutil.copy(file_path, dir_path / file_path.name)

    self.python_dependencies.save(folder_path)
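
The folder layout produced by the resource-copying loop can be reproduced without the SDK. The following is a minimal sketch that assumes the same resource mapping (source path to target directory). `copy_resources` is a hypothetical helper, and the dependency packaging step (`requirements.txt`, `PythonPackages.zip`) is deliberately left out.

```python
import shutil
import tempfile
from pathlib import Path


def copy_resources(resources, destination, component_name):
    """Copy each resource into <destination>/<component_name>/<target_dir>/."""
    folder_path = Path(destination) / component_name
    folder_path.mkdir(parents=True, exist_ok=True)
    for file_path, target_dir in resources.items():
        dir_path = folder_path / target_dir  # '.' resolves to the component root
        dir_path.mkdir(parents=True, exist_ok=True)
        shutil.copy(file_path, dir_path / file_path.name)
    return folder_path


with tempfile.TemporaryDirectory() as tmp:
    # Create two illustrative source files.
    source = Path(tmp) / "src"
    source.mkdir()
    (source / "entrypoint.py").write_text("print('hello')\n")
    (source / "labels.txt").write_text("ok\nnok\n")

    resources = {source / "entrypoint.py": ".", source / "labels.txt": "data"}
    saved = copy_resources(resources, Path(tmp) / "out", "inference")

    # Relative paths of all files written below the component folder.
    layout = sorted(
        p.relative_to(saved).as_posix() for p in saved.rglob("*") if p.is_file()
    )
```

Note that the component name becomes a subfolder of the destination, so saving several components to the same destination keeps them separate.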

python_version_validator(version)

Checks whether the Python version string is valid and refers to a supported version.

Only versions 3.11 and 3.12 are supported. A patch version is optional and accepted, but it logs a warning.

Accepted syntaxes are:

- {major}.{minor}
- {major}.{minor}.{patch}

Parameters:

Name | Type | Description | Default
version | str | Python version string | required

Raises:

Type | Description
ValueError | if the provided version is not supported

Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def python_version_validator(version: str):
    """
    Checks whether the Python version string is valid and refers to a supported version.

    Only versions 3.11 and 3.12 are supported. A patch version is optional and accepted, but it logs a warning.

    Accepted syntaxes are:

        - {major}.{minor}
        - {major}.{minor}.{patch}

    Args:
        version (str): Python version string

    Raises:
        ValueError: if the provided version is not supported
    """

    error_message = "The defined python version is not supported. Currently supported Python versions are 3.11 and 3.12. Python version must be specified only with major and minor version, e.g. '3.11'."

    warning_message = """Required Python version was specified with patch version.
                Please note that the patch digit of the required Python version is often not taken into account by the Python ecosystem,
                so there is no guarantee it has the desired effect."""

    match str(version).split('.'):
        case ('3', '11' | '12'):
            return  # no patch version, so no warning
        case ('3', '11' | '12', _):
            _logger.warning(warning_message)
        case _:
            raise ValueError(error_message)
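
To see which inputs fall into each branch, the rules can be restated as a classifier that returns a label instead of logging or raising. This is a sketch for illustration only: `classify_python_version` is a hypothetical function, and it uses plain comparisons rather than the `match` statement of the original.

```python
def classify_python_version(version):
    """Label a version string according to the rules described above."""
    parts = str(version).split('.')
    supported = parts[:2] in (['3', '11'], ['3', '12'])
    if supported and len(parts) == 2:
        return "accepted"                # major.minor only
    if supported and len(parts) == 3:
        return "accepted with warning"   # patch version given
    return "rejected"                    # the validator raises ValueError here


results = {
    v: classify_python_version(v)
    for v in ("3.11", "3.12.4", "3.10", "3", "3.11.2.1")
}
```

Since the input is converted with `str()` before splitting, passing a string such as '3.11' is the safest choice. A float would lose trailing zeros in its text form.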