deploy¶
Pipeline packaging.
This module contains classes and functionality for creating and validating pipeline configuration packages.
Component ¶
Base class for pipeline components, with name, description, and a list of inputs and outputs.
A new component is created with the given name and an empty input and output list.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/component.pyclass Component:
"""Base class for pipeline components, with name, description, and a list of inputs and outputs.
A new component is created with the given name and an empty input and output list.
"""
reserved_names = ["timestamp"]
@dataclass
class BatchInfo:
"""
Batch information for the component.
This attribute specifies whether the component can handle batch input or output data.
When set to True, the component will receive data in the form of a list of dictionaries instead of a single dictionary.
It is important to note that the input and output variables on the component should still be defined as if they are single variables.
If the input of the pipeline is configured for batch processing, it is recommended not to configure timeshifting, as the list will have the same timestamp for all elements, potentially resulting in data loss.
"""
inputBatch: bool = False
outputBatch: bool = False
def dict(self):
return {
'inputBatch': 'Yes' if self.inputBatch is True else 'No',
'outputBatch': 'Yes' if self.outputBatch is True else 'No'
}
def __init__(self, name: str, desc: str = ""):
"""
Creates a new component with the given name and an empty input and output list.
Args:
name (str): Name of the component.
desc (str): Optional description of the component
"""
if _allowed_characters_in_names.match(name) is None:
raise AssertionError("Component name contains invalid character. The allowed characters are [-a-zA-Z0-9_].")
self.name = name
self.desc = desc
self.inputs = {}
self.outputs = {}
self.batch = self.BatchInfo(False, False)
def __repr__(self) -> str:
text = f"[{self.__class__.__name__}] {self.name}\n"
if self.desc != "":
text += f"{self.desc}\n"
if len(self.inputs) > 0:
text += "\nComponent Inputs:\n"
for name, input in self.inputs.items():
text += f"> {name} ({input['type']}){': ' + input['desc'] if input.get('desc') is not None else ''}\n"
if len(self.outputs) > 0:
text += "\nComponent Outputs:\n"
for name, output in self.outputs.items():
text += f"< {name} ({output['type']}){': ' + output['desc'] if output.get('desc') is not None else ''}\n"
return text
def add_input(self, name: str, _type: str, desc: Optional[str] = None):
"""
Adds a new input to the component with its type.
Name of the variables cannot be reserved name like 'timestamp'.
Input variable 'timestamp' is a prebuilt key in the payload and its value contains the timestamp when the payload is created by AI Inference Server.
Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
In case the type is not known by the AI SDK, a warning message will be printed.
The most frequently used types are
- String:
Typically used for data received from Databus
- Object:
Object type variables are designed to receive from Vision Connect or transfer images between components
- Numeric scalar types:
Typically used for data received from S7 Connector
The example payload below shows the format of image received from VCA Connector
```python
payload = { "image":
{
"resolutionWidth": image.width,
"resolutionHeight": image.height,
"mimeType": ["image/raw"],
"dataType": "uint8",
"channelsPerPixel": 3,
"image": _swap_bytes(image.tobytes())
}
}
```
Between components the format is the same format as the format of Object as an output.
```python
"processedImage": {
"metadata": json.dumps( {
"resolutionWidth": image.width,
"resolutionHeight": image.height
}
),
"bytes": image.tobytes()
}
```
Args:
name (str): Name of the new input.
_type (str): Type of the new input.
desc (str): Description of the input. (optional)
"""
if self.inputs is None:
self.inputs = {}
if name in self.inputs:
raise AssertionError(f"Input '{name}' already exists.")
if name.lower() in self.reserved_names:
raise AssertionError(f"Input '{name}' is a reserved keyword.")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.inputs[name] = {
"type": _type,
}
if desc is not None:
self.inputs[name]['desc'] = desc
def change_input(self, name: str, _type: str, desc: Optional[str] = None):
"""
Changes one of the inputs of the component.
Args:
name (str): Name of the input to be changed.
_type (str): New type of the input.
desc (str): Description of the input. (optional)
"""
if name not in self.inputs:
raise AssertionError(f"There is no input with name '{name}'")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.inputs[name]['type'] = _type
if desc is not None:
self.inputs[name]['desc'] = desc
def delete_input(self, name: str):
"""
Deletes an input from the component by name.
Once the package has been created with the given component, it is recommended not to change the component directly.
Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
It is recommended to use `package.delete_input_wire(...)` with default parameter `with_input=True`.
Args:
name (str): Name of the input to be deleted.
"""
if name not in self.inputs:
raise AssertionError(f"Component '{self.name}' has no input '{name}'")
self.inputs.pop(name)
def add_output(self, name: str, _type: str, desc: Optional[str] = None):
"""
Adds a new output to the component.
Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
In case the type is not known by the AI SDK, a warning message will be printed.
The most frequently used types are
- String:
Typically used for data to be sent to Databus
- Object:
Typically used for images to be sent to ZMQ Connector
- Numeric scalar types:
Typically used for data sent to S7 Connector
For outputs of type `Object` the entrypoint must return with a `dictionary` containing two fields, where one field has type `str` and the other field has type `bytes`.
The example below shows the required format, assuming that 'image' is a PIL Image.
```python
"processedImage": {
"metadata": json.dumps( {
"resolutionWidth": image.width,
"resolutionHeight": image.height
}
),
"bytes": image.tobytes()
}
```
Args:
name (str): Name of the new output.
_type (str): Type of the new output.
desc (str): Description of the output. (optional)
"""
if self.outputs is None:
self.outputs = {}
if name in self.outputs:
raise AssertionError(f"Output '{name}' already exists")
if name.lower() in self.reserved_names:
raise AssertionError(f"Output '{name}' is a reserved keyword.")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.outputs[name] = {
"type": _type,
}
if desc is not None:
self.outputs[name]['desc'] = desc
def change_output(self, name: str, _type: str, desc: Optional[str] = None):
"""
Changes one of the outputs of the component.
Args:
name (str): Name of the output to be changed.
_type (str): The new type of the output.
desc (str): Description of the output. (optional)
"""
if name not in self.outputs:
raise AssertionError(f"There is no output with name '{name}'")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.outputs[name]['type'] = _type
if desc is not None:
self.outputs[name]['desc'] = desc
def delete_output(self, name: str):
"""
Deletes an output from the component by name.
Once the package has been created with the given component, it is recommended not to change the component directly.
Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
Deleting an output which is represented in any wire will cause package inconsistency.
Args:
name (str): Name of the output to be deleted.
"""
if name not in self.outputs:
raise AssertionError(f"Component '{self.name}' has no output '{name}'")
self.outputs.pop(name)
def _to_dict(self):
inputs = []
inputs += [{
'name': name,
'type': self.inputs[name]['type'],
} for name in self.inputs]
outputs = []
outputs += [{
'name': name,
'type': self.outputs[name]['type'],
'metric': False,
} for name in self.outputs]
return {
'name': self.name,
'description': self.desc,
'batch': self.batch.dict(),
'inputType': inputs,
'outputType': outputs,
}
def validate(self):
"""
Empty method for child classess to implement.
"""
pass
def save(self, destination, validate):
"""
Empty method for child classess to implement.
"""
pass
BatchInfo dataclass ¶
Batch information for the component.
This attribute specifies whether the component can handle batch input or output data. When set to True, the component will receive data in the form of a list of dictionaries instead of a single dictionary. It is important to note that the input and output variables on the component should still be defined as if they are single variables.
If the input of the pipeline is configured for batch processing, it is recommended not to configure timeshifting, as the list will have the same timestamp for all elements, potentially resulting in data loss.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/component.py@dataclass
class BatchInfo:
"""
Batch information for the component.
This attribute specifies whether the component can handle batch input or output data.
When set to True, the component will receive data in the form of a list of dictionaries instead of a single dictionary.
It is important to note that the input and output variables on the component should still be defined as if they are single variables.
If the input of the pipeline is configured for batch processing, it is recommended not to configure timeshifting, as the list will have the same timestamp for all elements, potentially resulting in data loss.
"""
inputBatch: bool = False
outputBatch: bool = False
def dict(self):
return {
'inputBatch': 'Yes' if self.inputBatch is True else 'No',
'outputBatch': 'Yes' if self.outputBatch is True else 'No'
}
__init__(name, desc='') ¶
Creates a new component with the given name and an empty input and output list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the component. | required |
desc | str | Optional description of the component | '' |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def __init__(self, name: str, desc: str = ""):
"""
Creates a new component with the given name and an empty input and output list.
Args:
name (str): Name of the component.
desc (str): Optional description of the component
"""
if _allowed_characters_in_names.match(name) is None:
raise AssertionError("Component name contains invalid character. The allowed characters are [-a-zA-Z0-9_].")
self.name = name
self.desc = desc
self.inputs = {}
self.outputs = {}
self.batch = self.BatchInfo(False, False)
add_input(name, _type, desc=None) ¶
Adds a new input to the component with its type. Name of the variables cannot be reserved name like 'timestamp'. Input variable 'timestamp' is a prebuilt key in the payload and its value contains the timestamp when the payload is created by AI Inference Server.
Types supported by AI Inference Server version 1.6 are contained in the type_dictionary. Newer AI Inference server version may support additional types. In case the type is not known by the AI SDK, a warning message will be printed. The most frequently used types are - String: Typically used for data received from Databus - Object: Object type variables are designed to receive from Vision Connect or transfer images between components - Numeric scalar types: Typically used for data received from S7 Connector
The example payload below shows the format of image received from VCA Connector
payload = { "image":
{
"resolutionWidth": image.width,
"resolutionHeight": image.height,
"mimeType": ["image/raw"],
"dataType": "uint8",
"channelsPerPixel": 3,
"image": _swap_bytes(image.tobytes())
}
}
"processedImage": {
"metadata": json.dumps( {
"resolutionWidth": image.width,
"resolutionHeight": image.height
}
),
"bytes": image.tobytes()
}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the new input. | required |
_type | str | Type of the new input. | required |
desc | str | Description of the input. (optional) | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def add_input(self, name: str, _type: str, desc: Optional[str] = None):
"""
Adds a new input to the component with its type.
Name of the variables cannot be reserved name like 'timestamp'.
Input variable 'timestamp' is a prebuilt key in the payload and its value contains the timestamp when the payload is created by AI Inference Server.
Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
In case the type is not known by the AI SDK, a warning message will be printed.
The most frequently used types are
- String:
Typically used for data received from Databus
- Object:
Object type variables are designed to receive from Vision Connect or transfer images between components
- Numeric scalar types:
Typically used for data received from S7 Connector
The example payload below shows the format of image received from VCA Connector
```python
payload = { "image":
{
"resolutionWidth": image.width,
"resolutionHeight": image.height,
"mimeType": ["image/raw"],
"dataType": "uint8",
"channelsPerPixel": 3,
"image": _swap_bytes(image.tobytes())
}
}
```
Between components the format is the same format as the format of Object as an output.
```python
"processedImage": {
"metadata": json.dumps( {
"resolutionWidth": image.width,
"resolutionHeight": image.height
}
),
"bytes": image.tobytes()
}
```
Args:
name (str): Name of the new input.
_type (str): Type of the new input.
desc (str): Description of the input. (optional)
"""
if self.inputs is None:
self.inputs = {}
if name in self.inputs:
raise AssertionError(f"Input '{name}' already exists.")
if name.lower() in self.reserved_names:
raise AssertionError(f"Input '{name}' is a reserved keyword.")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.inputs[name] = {
"type": _type,
}
if desc is not None:
self.inputs[name]['desc'] = desc
change_input(name, _type, desc=None) ¶
Changes one of the inputs of the component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the input to be changed. | required |
_type | str | New type of the input. | required |
desc | str | Description of the input. (optional) | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def change_input(self, name: str, _type: str, desc: Optional[str] = None):
"""
Changes one of the inputs of the component.
Args:
name (str): Name of the input to be changed.
_type (str): New type of the input.
desc (str): Description of the input. (optional)
"""
if name not in self.inputs:
raise AssertionError(f"There is no input with name '{name}'")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for input variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.inputs[name]['type'] = _type
if desc is not None:
self.inputs[name]['desc'] = desc
delete_input(name) ¶
Deletes an input from the component by name. Once the package has been created with the given component, it is recommended not to change the component directly. Instead, all necessary methods to change it are available through the package to avoid component inconsistencies. It is recommended to use package.delete_input_wire(...) with default parameter with_input=True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the input to be deleted. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def delete_input(self, name: str):
"""
Deletes an input from the component by name.
Once the package has been created with the given component, it is recommended not to change the component directly.
Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
It is recommended to use `package.delete_input_wire(...)` with default parameter `with_input=True`.
Args:
name (str): Name of the input to be deleted.
"""
if name not in self.inputs:
raise AssertionError(f"Component '{self.name}' has no input '{name}'")
self.inputs.pop(name)
add_output(name, _type, desc=None) ¶
Adds a new output to the component.
Types supported by AI Inference Server version 1.6 are contained in the type_dictionary. Newer AI Inference server version may support additional types. In case the type is not known by the AI SDK, a warning message will be printed. The most frequently used types are - String: Typically used for data to be sent to Databus - Object: Typically used for images to be sent to ZMQ Connector - Numeric scalar types: Typically used for data sent to S7 Connector
For outputs of type Object the entrypoint must return with a dictionary containing two fields, where one field has type str and the other field has type bytes. The example below shows the required format, assuming that 'image' is a PIL Image.
"processedImage": {
"metadata": json.dumps( {
"resolutionWidth": image.width,
"resolutionHeight": image.height
}
),
"bytes": image.tobytes()
}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the new output. | required |
_type | str | Type of the new output. | required |
desc | str | Description of the output. (optional) | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def add_output(self, name: str, _type: str, desc: Optional[str] = None):
"""
Adds a new output to the component.
Types supported by AI Inference Server version 1.6 are contained in the `type_dictionary`. Newer AI Inference server version may support additional types.
In case the type is not known by the AI SDK, a warning message will be printed.
The most frequently used types are
- String:
Typically used for data to be sent to Databus
- Object:
Typically used for images to be sent to ZMQ Connector
- Numeric scalar types:
Typically used for data sent to S7 Connector
For outputs of type `Object` the entrypoint must return with a `dictionary` containing two fields, where one field has type `str` and the other field has type `bytes`.
The example below shows the required format, assuming that 'image' is a PIL Image.
```python
"processedImage": {
"metadata": json.dumps( {
"resolutionWidth": image.width,
"resolutionHeight": image.height
}
),
"bytes": image.tobytes()
}
```
Args:
name (str): Name of the new output.
_type (str): Type of the new output.
desc (str): Description of the output. (optional)
"""
if self.outputs is None:
self.outputs = {}
if name in self.outputs:
raise AssertionError(f"Output '{name}' already exists")
if name.lower() in self.reserved_names:
raise AssertionError(f"Output '{name}' is a reserved keyword.")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.outputs[name] = {
"type": _type,
}
if desc is not None:
self.outputs[name]['desc'] = desc
change_output(name, _type, desc=None) ¶
Changes one of the outputs of the component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the output to be changed. | required |
_type | str | The new type of the output. | required |
desc | str | Description of the output. (optional) | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def change_output(self, name: str, _type: str, desc: Optional[str] = None):
"""
Changes one of the outputs of the component.
Args:
name (str): Name of the output to be changed.
_type (str): The new type of the output.
desc (str): Description of the output. (optional)
"""
if name not in self.outputs:
raise AssertionError(f"There is no output with name '{name}'")
flags = type_validation_flags(_type)
if flags:
flags_str = ", ".join([f.value for f in flags])
_logger.warning(f"WARNING! Type `{_type}` for output variable `{name}` has the following flags: {flags_str}. Please check if the target Inference Server supports this type.")
self.outputs[name]['type'] = _type
if desc is not None:
self.outputs[name]['desc'] = desc
delete_output(name) ¶
Deletes an output from the component by name. Once the package has been created with the given component, it is recommended not to change the component directly. Instead, all necessary methods to change it are available through the package to avoid component inconsistencies. Deleting an output which is represented in any wire will cause package inconsistency.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the output to be deleted. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/component.py
def delete_output(self, name: str):
"""
Deletes an output from the component by name.
Once the package has been created with the given component, it is recommended not to change the component directly.
Instead, all necessary methods to change it are available through the package to avoid component inconsistencies.
Deleting an output which is represented in any wire will cause package inconsistency.
Args:
name (str): Name of the output to be deleted.
"""
if name not in self.outputs:
raise AssertionError(f"Component '{self.name}' has no output '{name}'")
self.outputs.pop(name)
validate() ¶
Empty method for child classess to implement.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/component.pydef validate(self):
"""
Empty method for child classess to implement.
"""
pass
save(destination, validate) ¶
Empty method for child classess to implement.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/component.pydef save(self, destination, validate):
"""
Empty method for child classess to implement.
"""
pass
GPURuntimeComponent ¶
Bases: Component
The GPURuntimeComponent is used to define a component that runs on a GPU device. The component works only with ONNX models and can be used in an Inference Pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
name | str | Component name. |
version | str | Component version. |
desc | str | Component description. |
Methods:
| Name | Description |
|---|---|
use_model | Path | str, max_batch_size: int, optimization: Optional[model_config.TensorRTOptimization] = None, warmup: model_config.Warmup = None): Add an ONNX model file for the component. |
use_config | Path | str): Use a custom config.pbtxt file instead of the autogenerated one. |
save | Path | str, validate = False): Saves the component to a folder structure, so it can be used as part of a pipeline configuration package. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
class GPURuntimeComponent(Component):
"""
The GPURuntimeComponent is used to define a component that runs on a GPU device.
The component works only with ONNX models and can be used in an Inference Pipeline.
Attributes:
name (str): Component name.
version (str): Component version.
desc (str): Component description.
Methods:
use_model(self, path: Path | str, max_batch_size: int, optimization: Optional[model_config.TensorRTOptimization] = None, warmup: model_config.Warmup = None): Add an ONNX model file for the component.
use_config(self, path: Path | str): Use a custom config.pbtxt file instead of the autogenerated one.
save(self, destination: Path | str, validate = False): Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
"""
def __init__(self, name: str = "inference", version: str = "1", desc: str = ""):
"""
Creates a new, empty GPU Runtime component.
Args:
name (str): Component name. (default: inference)
version (str): Component version. (default: 1)
desc (str): Component description (optional)
"""
super().__init__(name=name, desc=desc)
self.version: str = version
self.entrypoint: Path | None = None
self.model_path: Path | None = None
self.model_version: str = "1"
self.config: Path | None = None
self.auto_config = None
self.warmup: bool = False
self.optimization: bool = False
def _to_dict(self):
return {
**super()._to_dict(),
'version': self.version,
'entrypoint': f"{self.model_version}/{self.entrypoint.name}",
'hwType': 'GPU',
'warmup': self.warmup,
'optimization': self.optimization,
'runtime': {
'type': 'gpuruntime',
'version': '0.1.0',
}
}
def use_model(self, path: Path | str,
max_batch_size: int,
optimization: model_config.TensorRTOptimization | None = None,
warmup: model_config.Warmup | bool | None = None):
"""
Add the ONNX model file for the component.
Args:
path (Path | str): The path to the ONNX model file.
max_batch_size (int): The maximum batch size for the model.
optimization (model_config.TensorRTOptimization, optional): The optimization configuration for the model. Defaults to None.
warmup (model_config.Warmup, optional): The warmup configuration for the model. Defaults to None.
Raises:
AssertionError: If the specified model file is not found, has an invalid extension, or if max_batch_size is less than 0.
"""
path = Path(path)
if not path.is_file():
raise AssertionError(f"specified model file not found: '{path}'")
if path.suffix != ".onnx":
raise AssertionError(f"model file extension is not '.onnx': '{path}'")
if max_batch_size < 0:
raise AssertionError("max_batch_size must be greater or equal to 0")
self.entrypoint = Path("model.onnx")
self.model_path = path
if self.config is not None:
_logger.warning("Previously added configuration was removed. Component will use the default configuration unless you specify your own.")
self.config = None
# Removing old automatic variables
if self.auto_config is not None:
for var in self.auto_config.inputs:
self.delete_input(var["name"])
for var in self.auto_config.outputs:
self.delete_output(var["name"])
model_warmup = warmup
if isinstance(warmup, bool):
self.warmup = warmup
model_warmup = model_config.Warmup.ZERO_DATA if warmup else model_config.Warmup.DISABLED
elif isinstance(warmup, model_config.Warmup):
self.warmup = warmup != model_config.Warmup.DISABLED
else:
self.warmup = False
model_warmup = model_config.Warmup.DISABLED
self.optimization = isinstance(optimization, model_config.TensorRTOptimization)
self.auto_config = model_config.ModelConfig(onnx_path=path,
max_batch_size=max_batch_size,
warmup=model_warmup,
optimization=optimization)
for var in self.auto_config.inputs:
self.add_input(var["name"], var["type"])
for var in self.auto_config.outputs:
self.add_output(var["name"], var["type"])
def use_config(self, path: Path | str):
"""
Sets the configuration file to be used for inference.
Intended usage is to use a custom configuration file instead of the autogenerated one.
This way extra configurations can be added to the component, such as the execution accelerator.
Args:
path (Path | str): The path to the configuration file.
Raises:
AssertionError: If the specified config file is not found or has an invalid extension.
"""
path = Path(path)
if not path.is_file():
raise AssertionError(f"specified config file not found: '{path}'")
if path.suffix != ".pbtxt":
raise AssertionError(f"config file extension is not '.pbtxt': '{path}'")
config_data = _validate_and_load_gpuruntime_config(path)
self.warmup = (0 < len(config_data.model_warmup))
self.optimization = (0 < len(f"{config_data.optimization}"))
self.config = path
def save(self, destination: Path | str, validate = False):
"""
Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
The component folder contains the following:
- An `.onnx` model file
- A `.pbtxt` configuration file
Args:
destination (path-like): Target directory to which the component will be saved.
"""
if self.entrypoint is None:
raise AssertionError("An ONNX model file must be specified before the component can be saved.")
component_dir = Path(destination) / self.name
component_dir.mkdir(parents = True, exist_ok = True)
model_dir = component_dir / self.model_version
model_dir.mkdir(exist_ok = True)
shutil.copy(str(self.model_path), str(model_dir / "model.onnx"))
if self.config is None:
_logger.warning("Configuration was not specified. Model will be saved with default configuration.")
(component_dir / "config.pbtxt").write_text(f"{self.auto_config}")
else:
shutil.copy(self.config, component_dir)
__init__(name='inference', version='1', desc='') ¶
Creates a new, empty GPU Runtime component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Component name. (default: inference) | 'inference' |
version | str | Component version. (default: 1) | '1' |
desc | str | Component description (optional) | '' |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def __init__(self, name: str = "inference", version: str = "1", desc: str = ""):
"""
Creates a new, empty GPU Runtime component.
Args:
name (str): Component name. (default: inference)
version (str): Component version. (default: 1)
desc (str): Component description (optional)
"""
super().__init__(name=name, desc=desc)
self.version: str = version
self.entrypoint: Path | None = None
self.model_path: Path | None = None
self.model_version: str = "1"
self.config: Path | None = None
self.auto_config = None
self.warmup: bool = False
self.optimization: bool = False
use_model(path, max_batch_size, optimization=None, warmup=None) ¶
Add the ONNX model file for the component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path | Path | str | The path to the ONNX model file. | required |
max_batch_size | int | The maximum batch size for the model. | required |
optimization | TensorRTOptimization | The optimization configuration for the model. Defaults to None. | None |
warmup | Warmup | The warmup configuration for the model. Defaults to None. | None |
Raises:
| Type | Description |
|---|---|
AssertionError | If the specified model file is not found, has an invalid extension, or if max_batch_size is less than 0. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def use_model(self, path: Path | str,
max_batch_size: int,
optimization: model_config.TensorRTOptimization | None = None,
warmup: model_config.Warmup | bool | None = None):
"""
Add the ONNX model file for the component.
Args:
path (Path | str): The path to the ONNX model file.
max_batch_size (int): The maximum batch size for the model.
optimization (model_config.TensorRTOptimization, optional): The optimization configuration for the model. Defaults to None.
warmup (model_config.Warmup, optional): The warmup configuration for the model. Defaults to None.
Raises:
AssertionError: If the specified model file is not found, has an invalid extension, or if max_batch_size is less than 0.
"""
path = Path(path)
if not path.is_file():
raise AssertionError(f"specified model file not found: '{path}'")
if path.suffix != ".onnx":
raise AssertionError(f"model file extension is not '.onnx': '{path}'")
if max_batch_size < 0:
raise AssertionError("max_batch_size must be greater or equal to 0")
self.entrypoint = Path("model.onnx")
self.model_path = path
if self.config is not None:
_logger.warning("Previously added configuration was removed. Component will use the default configuration unless you specify your own.")
self.config = None
# Removing old automatic variables
if self.auto_config is not None:
for var in self.auto_config.inputs:
self.delete_input(var["name"])
for var in self.auto_config.outputs:
self.delete_output(var["name"])
model_warmup = warmup
if isinstance(warmup, bool):
self.warmup = warmup
model_warmup = model_config.Warmup.ZERO_DATA if warmup else model_config.Warmup.DISABLED
elif isinstance(warmup, model_config.Warmup):
self.warmup = warmup != model_config.Warmup.DISABLED
else:
self.warmup = False
model_warmup = model_config.Warmup.DISABLED
self.optimization = isinstance(optimization, model_config.TensorRTOptimization)
self.auto_config = model_config.ModelConfig(onnx_path=path,
max_batch_size=max_batch_size,
warmup=model_warmup,
optimization=optimization)
for var in self.auto_config.inputs:
self.add_input(var["name"], var["type"])
for var in self.auto_config.outputs:
self.add_output(var["name"], var["type"])
use_config(path) ¶
Sets the configuration file to be used for inference. Intended usage is to use a custom configuration file instead of the autogenerated one. This way extra configurations can be added to the component, such as the execution accelerator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path | Path | str | The path to the configuration file. | required |
Raises:
| Type | Description |
|---|---|
AssertionError | If the specified config file is not found or has an invalid extension. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def use_config(self, path: Path | str):
"""
Sets the configuration file to be used for inference.
Intended usage is to use a custom configuration file instead of the autogenerated one.
This way extra configurations can be added to the component, such as the execution accelerator.
Args:
path (Path | str): The path to the configuration file.
Raises:
AssertionError: If the specified config file is not found or has an invalid extension.
"""
path = Path(path)
if not path.is_file():
raise AssertionError(f"specified config file not found: '{path}'")
if path.suffix != ".pbtxt":
raise AssertionError(f"config file extension is not '.pbtxt': '{path}'")
config_data = _validate_and_load_gpuruntime_config(path)
self.warmup = (0 < len(config_data.model_warmup))
self.optimization = (0 < len(f"{config_data.optimization}"))
self.config = path
save(destination, validate=False) ¶
Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
The component folder contains the following:
- An
.onnxmodel file - A
.pbtxtconfiguration file
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | path - like | Target directory to which the component will be saved. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/gpuruntime_component.py
def save(self, destination: Path | str, validate = False):
"""
Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
The component folder contains the following:
- An `.onnx` model file
- A `.pbtxt` configuration file
Args:
destination (path-like): Target directory to which the component will be saved.
"""
if self.entrypoint is None:
raise AssertionError("An ONNX model file must be specified before the component can be saved.")
component_dir = Path(destination) / self.name
component_dir.mkdir(parents = True, exist_ok = True)
model_dir = component_dir / self.model_version
model_dir.mkdir(exist_ok = True)
shutil.copy(str(self.model_path), str(model_dir / "model.onnx"))
if self.config is None:
_logger.warning("Configuration was not specified. Model will be saved with default configuration.")
(component_dir / "config.pbtxt").write_text(f"{self.auto_config}")
else:
shutil.copy(self.config, component_dir)
PipelineData ¶
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline_data.py
class PipelineData:
def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
"""
A newly initialized `Pipeline` will contain no `Component` or wire, just its name and version will be set.
The name and version will define together the name of the zip file when the package is saved.
Args:
name (str): Name of the package
desc (str): Package description (optional)
version (str): Version of the package
"""
self.name = name
self.desc = desc
self.version = version
self.init_version = version # initial version; used when version is not set in Pipeline.export() and save()
self.save_version = version # contains the version determined at exporting/saving
self.package_id: Optional[uuid.UUID] = None
self.author = 'AI SDK'
self.components = {}
self.wiring = {}
self.parameters: list[PipelineParameter] = []
self.periodicity = None
self.timeshift_reference = []
self.inputs: list[PipelineVariable] = []
self.outputs: list[PipelineVariable] = []
self.log_level = logging.INFO
self.cycle_time = None
# Helper method returns index or None if the pipeline inputs/outputs contain the given variable
def _get_io_index(self, io_list: list[PipelineVariable], variable: str) -> Optional[int]:
for i, io in enumerate(io_list):
if io.variableName == variable:
return i
return None
def _get_io_variable(self, variable_name: str) -> Optional[PipelineVariable]:
for io in self.inputs + self.outputs:
if io.variableName == variable_name:
return io
return None
def _get_parameter(self, parameter_name: str) -> Optional[PipelineParameter]:
for param in self.parameters:
if param.name == parameter_name:
return param
return None
def _get_parameter_index(self, parameter_name: str) -> Optional[int]:
for i, param in enumerate(self.parameters):
if param.name == parameter_name:
return i
return None
__init__(name, version=None, desc='') ¶
A newly initialized Pipeline will contain no Component or wire, just its name and version will be set. The name and version will define together the name of the zip file when the package is saved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the package | required |
desc | str | Package description (optional) | '' |
version | str | Version of the package | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline_data.py
def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
"""
A newly initialized `Pipeline` will contain no `Component` or wire, just its name and version will be set.
The name and version will define together the name of the zip file when the package is saved.
Args:
name (str): Name of the package
desc (str): Package description (optional)
version (str): Version of the package
"""
self.name = name
self.desc = desc
self.version = version
self.init_version = version # initial version; used when version is not set in Pipeline.export() and save()
self.save_version = version # contains the version determined at exporting/saving
self.package_id: Optional[uuid.UUID] = None
self.author = 'AI SDK'
self.components = {}
self.wiring = {}
self.parameters: list[PipelineParameter] = []
self.periodicity = None
self.timeshift_reference = []
self.inputs: list[PipelineVariable] = []
self.outputs: list[PipelineVariable] = []
self.log_level = logging.INFO
self.cycle_time = None
save_readme_html(pipeline, destination) ¶
Saves a README.html in the destination folder that describes the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | path - like | Path of the destination folder. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline_page.py
def save_readme_html(pipeline: PipelineData, destination: Union[str, os.PathLike]):
"""
Saves a `README.html` in the `destination` folder that describes the pipeline.
Args:
destination (path-like): Path of the destination folder.
"""
pipelinePage = _PipelinePage(pipeline)
readme_html_path = Path(destination) / README_HTML
readme_html_path.write_text(pipelinePage.__str__())
Pipeline ¶
Bases: PipelineData
Pipeline represents a pipeline configuration package with Components and wires to provide a data flow on the AI Inference Server. The Components have inputs and outputs to transfer data to each other and the wires describe this data flow between them. The package also contains configuration files required to deploy a pipeline on an Industrial Edge device.
A newly initialized Pipeline does not contain any Component or wire, only its name and version will be set. The name and version together will define the name of the zip file when the package is saved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the package | required |
version | str | Version of the package | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
class Pipeline(PipelineData):
"""
`Pipeline` represents a pipeline configuration package with `Components` and wires to provide a data flow on the AI Inference Server.
The `Components` have inputs and outputs to transfer data to each other and the wires describe this data flow between them.
The package also contains configuration files required to deploy a pipeline on an Industrial Edge device.
A newly initialized `Pipeline` does not contain any `Component` or wire, only its name and version will be set.
The name and version together will define the name of the zip file when the package is saved.
Args:
name (str): Name of the package
version (str): Version of the package
"""
_wire_hash_string = "{}.{} -> {}.{}"
def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
"""
A newly initialized `Pipeline` will contain no `Component` or wire, just its name and version will be set.
The name and version will define together the name of the zip file when the package is saved.
Args:
name (str): Name of the package
desc (str): Package description (optional)
version (str): Version of the package
"""
super().__init__(name, version, desc)
self.report_writer = PipelineReportWriter()
report_writer_handler = ReportWriterHandler(self.report_writer)
_logger.addHandler(report_writer_handler)
_python_dependencies_logger.addHandler(report_writer_handler)
_wheelhouse_logger.addHandler(report_writer_handler)
def _set_log_level(self, log_level: int):
self.log_level = log_level
_logger.setLevel(self.log_level)
@staticmethod
def from_components(components: list, name: str, version: Optional[str] = None, desc: str = "") -> "Pipeline":
"""
Creates a pipeline configuration from the given components.
The components are linked in a linear sequence with inputs and outputs auto-wired based on the name of the inputs and outputs of the components.
The inputs of the first component will be wired as the pipeline inputs and the outputs of the last component will be wired as the pipeline outputs.
The components must have unique names. Two or more versions of the same component can not be packaged simultaneously without renaming them.
Args:
components (list): List of PythonComponents
name (str): Name of the pipeline
version (str): Version information of the pipeline. (Optional)
Returns:
Pipeline: Pipeline object with the auto-wired components
"""
pipeline = Pipeline(name, version, desc=desc)
first_component = components[0]
pipeline.add_component(first_component)
pipeline.inputs = [PipelineVariable(first_component.name, component_input) for component_input in first_component.inputs]
pipeline.outputs = [PipelineVariable(first_component.name, output) for output in first_component.outputs]
for component in components[1:]:
pipeline.add_component(component)
for t_output in pipeline.outputs:
try:
pipeline.add_wiring(t_output.componentName, t_output.variableName, component.name, t_output.variableName)
except Exception as e:
_logger.warning(f"Output variable {t_output.componentName}.{t_output.variableName} couldn't be auto-wired.\nCause: {e}")
unwired_variables = [f'{component.name}.{x}' for x in component.inputs if not any(s.endswith(f'{component.name}.{x}') for s in pipeline.wiring)]
if len(unwired_variables) > 0:
for variable in unwired_variables:
_logger.warning(f"Input variable {variable} couldn't be auto-wired.\n")
pipeline.outputs = [PipelineVariable(component.name, output) for output in component.outputs]
# ImageSet is not allowed in pipeline outputs
for i, o in enumerate(pipeline.outputs):
if pipeline.components[o.componentName].outputs[o.variableName]["type"].lower() == 'imageset':
_logger.warning(f"Output variable {o.componentName}.{o.variableName} of type 'ImageSet' can not be added as pipeline output.")
pipeline.outputs.pop(i)
return pipeline
def __repr__(self) -> str:
"""
Textual representation of the configured package.
The method shows the `Components` with their inputs, outputs and parameters as well as the wiring between these `Components`.
Returns:
[str]: Textual representation of the package
"""
version = self.save_version if self.save_version is not None else self.init_version
text = f"[{self.__class__.__name__}] {self.name} ({version})\n"
if self.desc != "":
text += f"{self.desc}\n"
if len(self.parameters) > 0:
text += "\nPipeline Parameters:\n"
for parameter in self.parameters:
text += f"- {parameter.name} ({parameter.dtype}, default: '{parameter.defaultValue}'){(': ' + parameter.description) if parameter.description is not None else ''}\n"
if len(self.inputs) > 0:
text += "\nPipeline Inputs:\n"
for p_input in self.inputs:
c_input = self.components[p_input.componentName].inputs[p_input.variableName]
text += f"> {p_input.variableName} ({c_input['type']}){': ' + c_input['desc'] if c_input.get('desc') is not None else ''}\n"
if len(self.outputs) > 0:
text += "\nPipeline Outputs:\n"
for p_output in self.outputs:
c_output = self.components[p_output.componentName].outputs[p_output.variableName]
text += f"< {p_output.variableName} ({c_output['type']}){': ' + c_output['desc'] if c_output.get('desc') is not None else ''}\n"
metrics = [(name, metric, component_name) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for name, metric in component.metrics.items()]
if len(metrics) > 0:
text += "\nMetrics:\n"
for name, metric, _ in metrics:
text += f"< {name}{': ' + metric['desc'] if metric.get('desc') is not None else ''}\n"
if len(self.wiring) > 0:
text += "\nI/O Wiring:\n"
for p_input in self.inputs:
text += f" {p_input.variableName} -> {p_input.componentName}.{p_input.variableName}\n"
for wire_hash in self.wiring:
text += f" {wire_hash}\n"
for p_output in self.outputs:
text += f" {p_output.componentName}.{p_output.variableName} -> {p_output.variableName}\n"
for name, metric, component_name in metrics:
text += f" {component_name}.{name} -> {name}\n"
if self.periodicity is not None:
text += "\nTimeshifting:\n"
text += f" Periodicity: {self.periodicity} ms\n"
if len(self.timeshift_reference) > 0:
text += " References:\n"
for ref in self.timeshift_reference:
text += f" - {ref}\n"
if self.cycle_time is not None:
text += f"\nCycle Time: {self.cycle_time} ms\n"
for component in self.components.values():
text += "\n" + component.__repr__()
return text
def add_input(self, component, variable):
"""
Defines an input variable on the given component as a pipeline input.
Args:
component (str): Name of the component
variable (str): Name of the input variable
"""
try:
_ = self.components[component].inputs[variable]
except KeyError:
raise AssertionError("The component with input variable must exist in the pipeline.")
if self.inputs is None:
self.inputs = []
if self._get_io_index(self.inputs, variable) is not None:
raise AssertionError("The pipeline input already exists.")
self.inputs.append(PipelineVariable(componentName=component, variableName=variable))
def delete_input(self, component: str, variable: str):
"""
Deletes a pipeline input.
Args:
component (str): Name of the component
variable (str): Name of the input variable
"""
if (idx := self._get_io_index(self.inputs, variable)) is None:
raise AssertionError("The pipeline input does not exist.")
self.inputs.pop(idx)
def add_output(self, component, variable):
"""
Defines an output variable on the given component as a pipeline output.
Args:
component (str): Name of the component
variable (str): Name of the output variable
"""
try:
output = self.components[component].outputs[variable]
except KeyError:
raise AssertionError("The component with output variable must exist in the pipeline.")
if self.outputs is None:
self.outputs = []
if self._get_io_index(self.outputs, variable) is not None:
raise AssertionError("The pipeline output already exists.")
if output["type"].lower() == 'imageset':
_logger.warning("Output variables of type 'ImageSet' can not be added as pipeline outputs.")
self.outputs.append(PipelineVariable(componentName=component, variableName=variable))
def delete_output(self, component: str, variable: str):
"""
Deletes a pipeline output.
Args:
component (str): Name of the component
variable (str): Name of the output variable
"""
if (idx := self._get_io_index(self.outputs, variable)) is None:
raise AssertionError("The pipeline output does not exist.")
self.outputs.pop(idx)
def add_component(self, component: Component):
"""
Adds a `Component` to the pipeline configuration without any connection.
The component must have a unique name. Two or more versions of the same component can not be added to the same pipeline with the same component name.
Args:
component (Component): `Component` to be added
"""
if component.name in self.components:
raise AssertionError(f"Component with name {component.name} already exists. Please rename the component.")
self.components[component.name] = component
def add_wiring(self, from_component: str, from_output: str, to_component: str, to_input: str):
"""
Creates a one-to-one connection between the input and output of two components.
The method checks if the connection is allowed with the following requirements:
- The components exist with the given inputs/outputs
- The given inputs and outputs are not connected to any wire
- The types of the connected input and output are compatible
Args:
from_component (str): Name of the component which provides data to the `to_component`
from_output (str): Name of the output variable of the `from_component`
to_component (str): Name of the component which consumes data from the `from_component`
to_input (str): Name of the input variable of the `to_component`
"""
if from_component not in self.components:
raise AssertionError(f"No component named '{from_component}'")
if to_component not in self.components:
raise AssertionError(f"No component named '{to_component}'")
if from_output not in self.components[from_component].outputs:
raise AssertionError(f"Component '{from_component}' has no output named '{from_output}'")
if to_input not in self.components[to_component].inputs:
raise AssertionError(f"Component '{to_component}' has no input named '{to_input}'")
if self.get_wire_for_input(to_component, to_input) is not None:
raise AssertionError(f"Input '{to_input}' of component '{to_component}' is already wired")
_output_type = self.components[from_component].outputs[from_output]["type"]
_input_type = self.components[to_component].inputs[to_input]["type"]
if _output_type != _input_type:
raise AssertionError("Output and input types do not match")
wire_hash = self._wire_hash_string.format(from_component, from_output, to_component, to_input)
self.wiring[wire_hash] = {
"fromComponent": from_component,
"fromOutput": from_output,
"toComponent": to_component,
"toInput": to_input,
}
def get_wire_for_output(self, component_name: str, output_name: str) -> Optional[dict]:
"""
Searches for the wire which connects a component with `component_name` as data provider through its output with name output_name.
Args:
component_name (str): Name of the data provider component.
output_name (str): Name of the output variable of `component_name`.
Returns:
Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables.
"""
wires = [x for x in self.wiring.values() if x["fromComponent"] == component_name and x["fromOutput"] == output_name]
return wires[0] if wires else None
def get_wire_for_input(self, component_name: str, input_name: str) -> Optional[dict]:
"""
Searches for the wire which connects a component with `component_name` as data consumer through its input with name `input_name`.
Args:
component_name (str): Name of the data consumer component.
input_name (str): Name of the input variable of `component_name`.
Returns:
Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables.
"""
wires = [x for x in self.wiring.values() if x["toComponent"] == component_name and x["toInput"] == input_name]
return wires[0] if wires else None
def delete_input_wire(self, component: str, variable: str, with_input: bool = True) -> None:
"""
Deletes an existing connection between two components.
The connection must be given with the name of the consumer component and its input variable.
If an inter signal alignment reference variable is affected it cannot be deleted.
By default, the input variable will be also deleted.
Args:
component (str): Name of the component which has the input given the name variable
variable (str): Name of the input variable on the component which connected by the wire
with_input (bool, optional): If set, the input variable will be also deleted from the component. Defaults to True.
Raises:
AssertionError: When the variable acts as inter signal alignment reference, it cannot be deleted, and an `AssertionError` will be raised.
"""
wire = self.get_wire_for_input(component, variable)
if wire is None:
raise AssertionError(f"There is no wiring for input '{variable}' of component '{component}'")
if variable in self.timeshift_reference:
raise AssertionError("Inter signal alignment reference variables can not be deleted.")
wire_hash = self._wire_hash_string.format(wire['fromComponent'], wire['fromOutput'], wire['toComponent'], wire['toInput'])
self.wiring.pop(wire_hash)
if with_input:
self.components[component].delete_input(variable)
def add_dependencies(self, packages: list) -> None:
"""
@Deprecated, reason: components can have different Python versions and/or platform, therefore it's better to specify dependencies on a case-by-case basis.
Collects the given Python packages with their versions from the executing Python environment and add them to all components of type `PythonComponent`.
This step is necessary in order to execute the pipeline configuration on the Edge side.
The method can be called multiple times but each time the previously-collected dependencies are cleared.
The reason for this is to ensure a consistent dependency list for the `requirements.txt` file when the package is saved.
Args:
packages (list): List of the necessary python packages to execute the script defined by self.entrypoint
"""
python_components = [self.components[name] for name in self.components if type(self.components[name]) is PythonComponent]
for component in python_components:
component.add_dependencies(packages)
def set_timeshifting_periodicity(self, periodicity: int) -> None:
"""
Enables inter-signal alignment with the given sampling period.
With inter-signal alignment enabled, the AI Inference Server collects data for different input variables before it triggers the model.
By default, `startingPoint` property is set to `First timestamp`, which means that inter-signal alignment is started at the
first incoming value for any input variable.
This property can be changed to `Signal reference` by adding inter-signal alignment reference variables
via the `add_timeshifting_reference(..)` method. In this case, inter-signal alignment is started when the first value arrives
for the defined input variables.
Args:
periodicity (int): Periodicity time in milliseconds for the AI Inference Server to perform inter-signal alignment. Valid range is [10, 2^31).
"""
periodicity = int(periodicity)
if periodicity not in range(10, int(math.pow(2, 31))):
raise AssertionError("Inter signal alignment periodicity must be an integer and in range [10, 2^31)")
self.periodicity = periodicity
_logger.info(f"Inter signal alignment periodicity has been set to {self.periodicity}.")
def add_timeshifting_reference(self, reference: str) -> None:
"""
Enables signal alignment mode `Signal reference` by declaring input variables as reference variables.
Args:
reference (str): Variable name to be added to `self.timeshift_reference` list.
"""
if reference not in [i.variableName for i in self.inputs]:
raise AssertionError(f"There is no input variable defined with name '{reference}'")
if reference in self.timeshift_reference:
_logger.warning(f"Reference variable with name '{reference}' has been already added.")
return
self.timeshift_reference.append(reference)
def remove_timeshifting_reference(self, reference: str) -> None:
"""
Removes previously-defined inter-signal alignment reference variables.
If no reference variables remain, the `startingPoint` will be `First timestamp`.
Args:
reference (str): Variable name to be removed from `self.timeshift_reference` list.
"""
if reference not in self.timeshift_reference:
raise AssertionError(f"Reference variable with name {'reference'} does not exist.")
self.timeshift_reference.remove(reference)
def set_cycle_time(self, cycle_time_usec: int | None):
"""
Sets the cycle time for the pipeline.
Args:
cycle_time_usec (int | None): Cycle time in microseconds. Must be a positive integer between 1_000 and 1_000_000.
"""
self.cycle_time = cycle_time_usec
self._check_cycle_time()
def get_pipeline_config(self) -> dict:
"""
Saves the information on the composed pipeline configuration package into a YAML file.
This YAML file describes the components and the data flow between them for the AI Inference Server.
The file is created in the `destination` folder with name `pipeline_config.yml`
"""
version = self.save_version if self.save_version is not None else self.init_version
metric_fields = [(component_name, field) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for field in component.metrics.keys()]
pipeline_inputs = [
i.__dict__(self.components[i.componentName].inputs)
for i in self.inputs
]
pipeline_outputs = [
{**o.__dict__(self.components[o.componentName].outputs), 'metric': False}
for o in self.outputs
]
pipeline_outputs += [{
'name': field,
'type': 'String',
'metric': True,
'topic': f"/siemens/edge/aiinference/{self.name}/{version}/metrics/{component_name}/{field}",
} for component_name, field in metric_fields]
pipeline_dag = [{
'source': f'Databus.{i.variableName}',
'target': f'{i.componentName}.{i.variableName}',
} for i in self.inputs]
pipeline_dag += [{
'source': f"{wire['fromComponent']}.{wire['fromOutput']}",
'target': f"{wire['toComponent']}.{wire['toInput']}",
} for wire in self.wiring.values()]
pipeline_dag += [{
'source': f'{o.componentName}.{o.variableName}',
'target': f'Databus.{o.variableName}',
} for o in self.outputs]
pipeline_dag += [{
'source': f'{component_name}.{field}',
'target': f'Databus.{field}',
} for component_name, field in metric_fields]
config_yml_content = {
'fileFormatVersion': '1.2.0',
'dataFlowPipelineInfo': {
'author': self.author,
'createdOn': datetime.now(),
'dataFlowPipelineVersion': version,
'description': self.desc if self.desc else 'Created by AI SDK',
'projectName': self.name,
'packageId': str(self.package_id)
},
'dataFlowPipeline': {
'components': [component._to_dict() for component in self.components.values()],
'pipelineDag': pipeline_dag,
'pipelineInputs': pipeline_inputs,
'pipelineOutputs': pipeline_outputs,
},
'packageType': 'full'
}
if len(self.parameters) > 0:
config_yml_content["dataFlowPipeline"]["pipelineParameters"] = [
param.__param_dict__()
for param in self.parameters
]
if self.cycle_time is not None:
config_yml_content["dataFlowPipeline"]["cycle_time"] = self.cycle_time
return config_yml_content
def save_pipeline_config(self, destination):
"""
Saves the information about the composed pipeline configuration package into a YAML file.
This YAML file describes the components and the data flow between them for AI Inference Server.
The file will be created in the `destination` folder with name `pipeline_config.yml`
Args:
destination (path-like): Path of the `destination` directory.
"""
with open(Path(destination) / PIPELINE_CONFIG, "w") as f:
yaml.dump(self.get_pipeline_config(), f)
def get_datalink_metadata(self) -> dict:
"""
The method generates metadata information based on available information.
Returns:
dict: Dictionary with the necessary information for the AI Inference Server.
"""
timeshifting = {
"id": None,
"enabled": False,
"periodicity": self.periodicity,
"startingPoint": None,
}
if self.periodicity is not None:
timeshifting["enabled"] = True
timeshifting["startingPoint"] = 'First timestamp'
if len(self.timeshift_reference) > 0:
timeshifting["startingPoint"] = 'Signal reference'
exported_metadata = {
"fileFormatVersion": "1.0.0",
"id": None,
"version": None,
"createdOn": datetime.now(),
"updatedOn": datetime.now(),
"timeShifting": timeshifting,
"inputs": [
{
'name': input.variableName,
'mapping': None,
'timeShiftingReference': input.variableName in self.timeshift_reference,
'type': self.components[input.componentName].inputs[input.variableName]['type']
} for input in self.inputs
]
}
return exported_metadata
def save_datalink_metadata(self, destination):
"""
Saves metadata for pipeline input variables.
This method saves metadata for the AI Inference Server into a YAML file.
This metadata determines how the AI Inference Server feeds input to the pipeline, especially inter-signal alignment.
The file is created in the `destination` folder with the name `datalink_metadata.yml`
Args:
destination (path-like): Path of the destination directory.
"""
with open(Path(destination) / DATALINK_METADATA, "w") as f:
yaml.dump(self.get_datalink_metadata(), f)
def save_telemetry_data_if_consented(self, destination: Path):
try:
Path(destination / TELEMETRY_YAML).unlink(missing_ok=True)
# checking whether the user has made a statement about telemetry collection
if is_telemetry_allowed():
self.save_telemetry_data(destination)
except RuntimeError as consent_is_undecided:
raise consent_is_undecided
def save_telemetry_data(self, destination: Path):
"""
Save telemetry data to a specified destination.
Args:
destination (Path): The path where the telemetry data should be saved.
"""
telemetry_path = destination / TELEMETRY_YAML
telemetry_data = {}
try:
consent_allowed = is_telemetry_allowed()
except RuntimeError:
consent_allowed = "undecided"
try:
consent_data, _ = read_telemetry_consent_file()
except BaseException:
consent_data = {}
telemetry_data["telemetry_consent"] = {}
telemetry_data["telemetry_consent"][CONSENT_KEY] = consent_allowed
telemetry_data["telemetry_consent"][CONSENT_KEY_SDK] = consent_data.get(CONSENT_KEY_SDK, 'unknown')
telemetry_data["telemetry_consent"][CONSENT_KEY_TS] = consent_data.get(CONSENT_KEY_TS, 'unknown')
telemetry_data["telemetry_consent"][CONSENT_KEY_METHOD] = consent_data.get(CONSENT_KEY_METHOD, get_consent_source())
telemetry_data["platform"] = {}
telemetry_data["platform"]["os"] = platform.system()
telemetry_data["platform"]["release"] = platform.release()
telemetry_data["platform"]["python_version"] = platform.python_version()
_logger.info(f"locals: {locals()}")
telemetry_data["environment"] = {}
env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
telemetry_data["environment"]["info"] = env_info
telemetry_data["industrial_ai"] = {}
try:
telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
except importlib_metadata.PackageNotFoundError:
telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
_logger.debug("simaticai package not found")
telemetry_data["pipeline"] = {}
telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component].python_version for component in self.components
if isinstance(self.components[component], PythonComponent)))
telemetry_data["pipeline"]["file_extensions"] = list(set(f.suffix for f in Path(destination).rglob("*")
if f.suffix not in ["", ".zip", ".yml", ".yaml", ".html"]))
with open(telemetry_path, 'w') as telemetry_file:
yaml.dump(telemetry_data, telemetry_file)
def validate(self, destination: str | Path = "."):
"""
Validates whether the package configuration is compatible with the expected runtime environment.
The method verifies:
- If the package has at least one component
- If all wires create connections between existing components and their variables
- If metadata is defined and valid.
- If a package with the same name already exists in the `destination` folder. In this case a warning message appears and the `save(..)` method overwrites the existing package.
- If the package has multiple components and if they are using the same Python version
Args:
destination (str, optional): Path of the expected destination folder. Defaults to ".".
"""
version = self.save_version if self.save_version is not None else self.init_version
if len(self.components) < 1:
raise AssertionError("The package must have at least one component.")
for p_output in self.outputs:
if self.components[p_output.componentName].batch.outputBatch:
raise AssertionError(f"The component '{p_output.componentName}' has pipeline output defined with variable name '{p_output.variableName}'. \
None of component with pipeline output is allowed to provide batch output.")
for wire_hash in self.wiring.copy():
wire = self.wiring[wire_hash]
self._check_wiring(wire, wire_hash)
pipeline_inputs = [p_input.variableName for p_input in self.inputs]
pipeline_outputs = [p_output.variableName for p_output in self.outputs]
if any(variable in pipeline_outputs for variable in pipeline_inputs):
conflicts = set(pipeline_inputs).intersection(set(pipeline_outputs))
raise AssertionError(f"Pipeline input and output variables must be unique. Conflicting variables: {conflicts}")
self._check_timeshifting()
self._check_cycle_time()
self._check_supported_types_for_connections()
package_path = Path(destination) / f"{self.name}_{version}".replace(" ", "-")
if package_path.is_dir():
_logger.warning(f"Target folder ({package_path}) already exists! Unless changing the package name the package could be invalid and your files will be overwritten!")
python_versions = set()
for component in self.components:
self.components[component].validate()
if isinstance(self.components[component], PythonComponent):
python_versions.add(self.components[component].python_version)
if (1 < len(python_versions)):
_logger.warning("The use of multiple python version in a single pipeline is not recommended. We recommend using only one of the supported versions, which are Python 3.11 or 3.12.")
_logger.info(f"Package '{self.name}' is valid and ready to save.")
def _check_timeshifting(self):
if len(self.timeshift_reference) > 0 and self.periodicity is None:
raise AssertionError("When using inter signal alignment reference variables, the periodicity must be set.")
def _check_supported_types_for_connections(self):
for p_input in self.inputs:
variable_type = self.components[p_input.componentName].inputs[p_input.variableName]['type']
warnings, errors = p_input.check_connection_type_compatibility(variable_type)
warnings_str = ", ".join(warnings)
errors_str = ", ".join(errors)
if warnings:
_logger.warning(f"Input variable `{p_input.variableName}` of component `{p_input.componentName}` is of type "
f"`{variable_type}` which is not supported for connection type `{p_input.connection.cptype.name}`. "
f"Issues: {warnings_str}.")
if errors:
_logger.error(f"Input variable `{p_input.variableName}` of component `{p_input.componentName}` is of type "
f"`{variable_type}` which is not supported for connection type `{p_input.connection.cptype.name}`. "
f"Issues: {errors_str}.")
for p_output in self.outputs:
variable_type = self.components[p_output.componentName].outputs[p_output.variableName]['type']
warnings, errors = p_output.check_connection_type_compatibility(variable_type)
warnings_str = ", ".join(warnings)
errors_str = ", ".join(errors)
if warnings:
_logger.warning(f"Output variable `{p_output.variableName}` of component `{p_output.componentName}` is of type "
f"`{variable_type}` which is not supported for connection type `{p_output.connection.cptype.name}`. "
f"Issues: {warnings_str}.")
if errors:
_logger.error(f"Output variable `{p_output.variableName}` of component `{p_output.componentName}` is of type "
f"`{variable_type}` which is not supported for connection type `{p_output.connection.cptype.name}`. "
f"Issues: {errors_str}.")
for p_param in self.parameters:
if isinstance(p_param, SecretPipelineParameter):
continue
if not p_param.topicBased:
continue
if not p_param.has_valid_connection():
_logger.error(f"Pipeline parameter `{p_param.name}` is of type `{p_param.dtype}` which is not supported for connection type `{p_param.connection.cptype.name}`.")
def _check_wiring(self, wire, wire_hash):
error_messages = []
if wire['fromComponent'] not in self.components:
error_messages.append(f"From component {wire['fromComponent']} does not exist")
if wire['toComponent'] not in self.components:
error_messages.append(f"To component {wire['toComponent']} does not exist")
if wire['fromOutput'] not in self.components[wire['fromComponent']].outputs:
error_messages.append(f"Output variable {wire['fromOutput']} does not exist on component {wire['fromComponent']}")
if wire['toInput'] not in self.components[wire['toComponent']].inputs:
error_messages.append(f"Input variable {wire['toInput']} does not exist on component {wire['toComponent']}")
if len(error_messages) == 0:
from_type_ = self.components[wire['fromComponent']].outputs[wire['fromOutput']]['type']
to_type_ = self.components[wire['toComponent']].inputs[wire['toInput']]['type']
if from_type_ != to_type_:
error_messages.append(f"The types of input and output variables does not match for wiring {wire_hash}.")
if len(error_messages) > 0:
self.wiring.pop(wire_hash)
error_messages.append("The wire has been deleted, please check the variables and re-create the connection.")
raise AssertionError(error_messages.__str__())
def _check_cycle_time(self) -> bool:
if self.cycle_time is None:
return True
if not isinstance(self.cycle_time, int) or self.cycle_time < 1:
_logger.error(cycle_time_error_message)
return False
if self.cycle_time < 1_000 or 1_000_000 < self.cycle_time:
_logger.warning(cycle_time_error_message)
return False
return True
def save(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
"""
@Deprecated, reason: only edge package generation will be supported in the future. Use export instead.
Saves the assembled package in a zip format.
The name of the file is defined as `{package_name}_{package_version}.zip`.
If a file with such a name already exists in the `destination` folder, it gets overwritten and a warning message appears.
The package is also available as a subfolder on the destination path with the name `{package_name}_{package_version}`.
If the assembled content does not meet the expected one, this content can be changed and simply packed into a zip file.
The package contains files and folders in the following structure:
- Package folder with name `{package_name}_{package_version}`
- `datalink-metadata.yml`
- `pipeline-config.yml`
- Component folder with name `{component_name}`
When the component is a `PythonComponent`, this folder contains:
- `requirements.txt`
- Entrypoint script defined by the entrypoint of the component
- Extra files as added to the specified folders
- Source folder with name `src` with necessary python scripts
If a package ID is specified, and a package with the same ID and version is already present in the `destination` folder,
an error is raised.
Args:
destination (str, optional): Target directory for saving the package. Defaults to ".".
package_id (UUID): The optional package ID. If None, a new UUID is generated.
"""
try:
# checking whether the user has made a statement about telemetry collection
is_telemetry_allowed()
except RuntimeError as consent_is_undecided:
raise consent_is_undecided
self._set_save_version_and_package_id(Path(destination), package_id, version)
destination = Path(destination)
self.validate(destination)
name = self.name.replace(" ", "-")
package_name = f"{name}_{self.save_version}"
destination = destination / package_name
destination.mkdir(parents=True, exist_ok=True)
for component in self.components:
self.components[component].save(destination, False)
if isinstance(self.components[component], PythonComponent):
self.report_writer.add_direct_dependencies(self.components[component].name, self.components[component].python_dependencies.dependencies)
self.save_datalink_metadata(destination)
self.save_pipeline_config(destination)
p_page.save_readme_html(self, destination)
self.save_telemetry_data_if_consented(destination)
zip_destination = shutil.make_archive(
base_name=str(destination.parent / package_name), format='zip',
root_dir=destination.parent, base_dir=package_name,
verbose=True, logger=_logger)
return Path(zip_destination)
def _set_save_version_and_package_id(self, destination: Path, package_id: Optional[uuid.UUID], version: Optional[str]):
previous_versions_and_ids = self._get_versions_and_package_ids_of_existing_packages(destination)
previous_versions, previous_package_ids = zip(*previous_versions_and_ids) if previous_versions_and_ids else ([], [])
# if package id is provided, we use that
if package_id is not None:
self.package_id = package_id
# if not, we auto-generate a package id
else:
previous_package_ids_set = {pkg_id for pkg_id in previous_package_ids if pkg_id is not None}
if self.package_id is not None:
previous_package_ids_set.add(self.package_id)
if len(previous_package_ids_set) == 0:
self.package_id = uuid.uuid4()
elif len(previous_package_ids_set) == 1:
self.package_id = previous_package_ids_set.pop()
else:
_logger.error(f"Multiple package IDs found in the destination folder: {previous_package_ids_set}. Set a package ID.")
raise RuntimeError(f"Multiple package IDs found in the destination folder: {previous_package_ids_set}. Set a package ID.")
# Preference #1: use the provided version
if version is not None:
self.save_version = version
# Preference #2: use the version set at init time
elif self.init_version is not None:
self.save_version = self.init_version
# Preference #3: auto-generate version
else:
previous_decimal_versions_set = {int(v) for v in previous_versions if v is not None and v.isdecimal()}
if len(previous_decimal_versions_set) == 0:
self.save_version = "1"
else:
self.save_version = str(max(previous_decimal_versions_set) + 1)
# checking if the package zip already exists
name = self.name.replace(" ", "-")
package_name = f"{name}_{self.save_version}"
package_file = destination / f"{package_name}.zip"
if package_file.exists():
_logger.warning(f"Target package with version '{self.save_version}' already exists: '{package_file}. The package will be overwritten.")
edge_package_file = destination / f"{name}-edge_{self.save_version}.zip"
if edge_package_file.exists():
_logger.warning(f"Target package with version '{self.save_version}' already exists: '{edge_package_file}. The package will be overwritten.")
self.version = self.save_version
def _get_versions_and_package_ids_of_existing_packages(self, destination: Path) -> List[Tuple[str, Optional[uuid.UUID]]]:
package_versions_and_ids = []
for file in destination.glob(f"{self.name.replace(' ', '-')}*.zip"):
with zipfile.ZipFile(file) as zip_file:
config_path = next(f for f in zip_file.namelist() if f.endswith("pipeline_config.yml"))
with zip_file.open(config_path) as config_file:
config = yaml.load(config_file, Loader=yaml.SafeLoader)
pipeline_info = config.get("dataFlowPipelineInfo", {})
name = pipeline_info.get("projectName", None)
if name is None or name != self.name:
continue
version = pipeline_info.get("dataFlowPipelineVersion", None)
package_id = pipeline_info.get("packageId", None)
package_id = uuid.UUID(package_id) if package_id is not None else None
package_versions_and_ids.append((version, package_id))
return package_versions_and_ids
def export(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
"""
Export a runnable pipeline package.
Args:
destination (str): optional target directory for saving the package. Defaults to ".".
package_id (UUID): optional package ID. If None, a new UUID is generated.
version (str): optional version. If None, an automatic version number is generated.
"""
try:
# checking whether the user has made a statement about telemetry collection
is_telemetry_allowed()
except RuntimeError as consent_is_undecided:
raise consent_is_undecided
destination = Path(destination)
config_package = None
try:
config_package = self.save(destination, package_id, version)
runtime_package = convert_package(config_package, self.report_writer)
return runtime_package
finally:
if config_package is not None:
Path(config_package).unlink(missing_ok=True)
def _add_parameter(self, parameter: PipelineParameter | SecretPipelineParameter):
if any(param.name == parameter.name for param in self.parameters):
raise AssertionError(f"Parameter with name '{parameter.name}' already exists in the pipeline.")
if parameter is None:
raise AssertionError("Parameter cannot be None.")
self.parameters.append(parameter)
def add_secret_parameter(self, name: str, desc: str | None = None):
"""
Adds a secret parameter to the pipeline configuration.
Args:
name (str): Name of the secret parameter. Must contain only letters, digits, underscores, and hyphens. Cannot start with `__AI_IS_` prefix.
desc (str, optional): Description of the secret parameter. Must be a string between 3 and 60 characters if provided.
Raises ValueError if the conditions for name or description are not met.
"""
self._add_parameter(SecretPipelineParameter(name=name, description=desc))
def add_parameter(self, name: str, default_value, type_name: str = "String", topic_based: bool = False, desc: str = None):
"""
Adds a parameter to the pipeline configuration, which alters the behavior of the pipeline.
The parameter's default value and its properties are saved in the pipeline configuration
and the value of the parameter can later be changed on AI Inference Server.
Args:
name (str): Name of the parameter
desc (str): Description of the parameter (optional)
type_name (str, optional): Data type of the parameter. Defaults to "String".
default_value (str): Default value of the parameter
topic_based (bool, optional): If true, the parameter can be updated from a message queue.
Raises:
ValueError:
When:
- the default value of the parameter is not of the specified data type (`type_name`) or
- the specified data type itself is not an allowed data type (not a part of `parameter_types` dict) or
- the specified data type is not given in the right format or
- the type of the given `topic_based` parameter is not `bool`.
- the name of the parameter starts with `__AI_IS_` prefix. These are reserved parameters by AI Inference Server
"""
self._add_parameter(PipelineParameter(
name=name,
defaultValue=default_value,
dtype=type_name,
topicBased=topic_based,
description=desc))
__init__(name, version=None, desc='') ¶
A newly initialized Pipeline will contain no Component or wire, just its name and version will be set. The name and version will define together the name of the zip file when the package is saved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the package | required |
desc | str | Package description (optional) | '' |
version | str | Version of the package | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def __init__(self, name: str, version: Optional[str] = None, desc: str = ""):
"""
A newly initialized `Pipeline` will contain no `Component` or wire, just its name and version will be set.
The name and version will define together the name of the zip file when the package is saved.
Args:
name (str): Name of the package
desc (str): Package description (optional)
version (str): Version of the package
"""
super().__init__(name, version, desc)
self.report_writer = PipelineReportWriter()
report_writer_handler = ReportWriterHandler(self.report_writer)
_logger.addHandler(report_writer_handler)
_python_dependencies_logger.addHandler(report_writer_handler)
_wheelhouse_logger.addHandler(report_writer_handler)
from_components(components, name, version=None, desc='') staticmethod ¶
Creates a pipeline configuration from the given components. The components are linked in a linear sequence with inputs and outputs auto-wired based on the name of the inputs and outputs of the components. The inputs of the first component will be wired as the pipeline inputs and the outputs of the last component will be wired as the pipeline outputs. The components must have unique names. Two or more versions of the same component can not be packaged simultaneously without renaming them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
components | list | List of PythonComponents | required |
name | str | Name of the pipeline | required |
version | str | Version information of the pipeline. (Optional) | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
@staticmethod
def from_components(components: list, name: str, version: Optional[str] = None, desc: str = "") -> "Pipeline":
"""
Creates a pipeline configuration from the given components.
The components are linked in a linear sequence with inputs and outputs auto-wired based on the name of the inputs and outputs of the components.
The inputs of the first component will be wired as the pipeline inputs and the outputs of the last component will be wired as the pipeline outputs.
The components must have unique names. Two or more versions of the same component can not be packaged simultaneously without renaming them.
Args:
components (list): List of PythonComponents
name (str): Name of the pipeline
version (str): Version information of the pipeline. (Optional)
Returns:
Pipeline: Pipeline object with the auto-wired components
"""
pipeline = Pipeline(name, version, desc=desc)
first_component = components[0]
pipeline.add_component(first_component)
pipeline.inputs = [PipelineVariable(first_component.name, component_input) for component_input in first_component.inputs]
pipeline.outputs = [PipelineVariable(first_component.name, output) for output in first_component.outputs]
for component in components[1:]:
pipeline.add_component(component)
for t_output in pipeline.outputs:
try:
pipeline.add_wiring(t_output.componentName, t_output.variableName, component.name, t_output.variableName)
except Exception as e:
_logger.warning(f"Output variable {t_output.componentName}.{t_output.variableName} couldn't be auto-wired.\nCause: {e}")
unwired_variables = [f'{component.name}.{x}' for x in component.inputs if not any(s.endswith(f'{component.name}.{x}') for s in pipeline.wiring)]
if len(unwired_variables) > 0:
for variable in unwired_variables:
_logger.warning(f"Input variable {variable} couldn't be auto-wired.\n")
pipeline.outputs = [PipelineVariable(component.name, output) for output in component.outputs]
# ImageSet is not allowed in pipeline outputs
for i, o in enumerate(pipeline.outputs):
if pipeline.components[o.componentName].outputs[o.variableName]["type"].lower() == 'imageset':
_logger.warning(f"Output variable {o.componentName}.{o.variableName} of type 'ImageSet' can not be added as pipeline output.")
pipeline.outputs.pop(i)
return pipeline
__repr__() ¶
Textual representation of the configured package. The method shows the Components with their inputs, outputs and parameters as well as the wiring between these Components.
Returns:
| Type | Description |
|---|---|
str | [str]: Textual representation of the package |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def __repr__(self) -> str:
"""
Textual representation of the configured package.
The method shows the `Components` with their inputs, outputs and parameters as well as the wiring between these `Components`.
Returns:
[str]: Textual representation of the package
"""
version = self.save_version if self.save_version is not None else self.init_version
text = f"[{self.__class__.__name__}] {self.name} ({version})\n"
if self.desc != "":
text += f"{self.desc}\n"
if len(self.parameters) > 0:
text += "\nPipeline Parameters:\n"
for parameter in self.parameters:
text += f"- {parameter.name} ({parameter.dtype}, default: '{parameter.defaultValue}'){(': ' + parameter.description) if parameter.description is not None else ''}\n"
if len(self.inputs) > 0:
text += "\nPipeline Inputs:\n"
for p_input in self.inputs:
c_input = self.components[p_input.componentName].inputs[p_input.variableName]
text += f"> {p_input.variableName} ({c_input['type']}){': ' + c_input['desc'] if c_input.get('desc') is not None else ''}\n"
if len(self.outputs) > 0:
text += "\nPipeline Outputs:\n"
for p_output in self.outputs:
c_output = self.components[p_output.componentName].outputs[p_output.variableName]
text += f"< {p_output.variableName} ({c_output['type']}){': ' + c_output['desc'] if c_output.get('desc') is not None else ''}\n"
metrics = [(name, metric, component_name) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for name, metric in component.metrics.items()]
if len(metrics) > 0:
text += "\nMetrics:\n"
for name, metric, _ in metrics:
text += f"< {name}{': ' + metric['desc'] if metric.get('desc') is not None else ''}\n"
if len(self.wiring) > 0:
text += "\nI/O Wiring:\n"
for p_input in self.inputs:
text += f" {p_input.variableName} -> {p_input.componentName}.{p_input.variableName}\n"
for wire_hash in self.wiring:
text += f" {wire_hash}\n"
for p_output in self.outputs:
text += f" {p_output.componentName}.{p_output.variableName} -> {p_output.variableName}\n"
for name, metric, component_name in metrics:
text += f" {component_name}.{name} -> {name}\n"
if self.periodicity is not None:
text += "\nTimeshifting:\n"
text += f" Periodicity: {self.periodicity} ms\n"
if len(self.timeshift_reference) > 0:
text += " References:\n"
for ref in self.timeshift_reference:
text += f" - {ref}\n"
if self.cycle_time is not None:
text += f"\nCycle Time: {self.cycle_time} ms\n"
for component in self.components.values():
text += "\n" + component.__repr__()
return text
add_input(component, variable) ¶
Defines an input variable on the given component as a pipeline input.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component | str | Name of the component | required |
variable | str | Name of the input variable | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_input(self, component, variable):
"""
Defines an input variable on the given component as a pipeline input.
Args:
component (str): Name of the component
variable (str): Name of the input variable
"""
try:
_ = self.components[component].inputs[variable]
except KeyError:
raise AssertionError("The component with input variable must exist in the pipeline.")
if self.inputs is None:
self.inputs = []
if self._get_io_index(self.inputs, variable) is not None:
raise AssertionError("The pipeline input already exists.")
self.inputs.append(PipelineVariable(componentName=component, variableName=variable))
delete_input(component, variable) ¶
Deletes a pipeline input.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component | str | Name of the component | required |
variable | str | Name of the input variable | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def delete_input(self, component: str, variable: str):
"""
Deletes a pipeline input.
Args:
component (str): Name of the component
variable (str): Name of the input variable
"""
if (idx := self._get_io_index(self.inputs, variable)) is None:
raise AssertionError("The pipeline input does not exist.")
self.inputs.pop(idx)
add_output(component, variable) ¶
Defines an output variable on the given component as a pipeline output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component | str | Name of the component | required |
variable | str | Name of the output variable | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_output(self, component, variable):
"""
Defines an output variable on the given component as a pipeline output.
Args:
component (str): Name of the component
variable (str): Name of the output variable
"""
try:
output = self.components[component].outputs[variable]
except KeyError:
raise AssertionError("The component with output variable must exist in the pipeline.")
if self.outputs is None:
self.outputs = []
if self._get_io_index(self.outputs, variable) is not None:
raise AssertionError("The pipeline output already exists.")
if output["type"].lower() == 'imageset':
_logger.warning("Output variables of type 'ImageSet' can not be added as pipeline outputs.")
self.outputs.append(PipelineVariable(componentName=component, variableName=variable))
delete_output(component, variable) ¶
Deletes a pipeline output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component | str | Name of the component | required |
variable | str | Name of the output variable | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def delete_output(self, component: str, variable: str):
"""
Deletes a pipeline output.
Args:
component (str): Name of the component
variable (str): Name of the output variable
"""
if (idx := self._get_io_index(self.outputs, variable)) is None:
raise AssertionError("The pipeline output does not exist.")
self.outputs.pop(idx)
add_component(component) ¶
Adds a Component to the pipeline configuration without any connection. The component must have a unique name. Two or more versions of the same component can not be added to the same pipeline with the same component name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component | Component |
| required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_component(self, component: Component):
"""
Adds a `Component` to the pipeline configuration without any connection.
The component must have a unique name. Two or more versions of the same component can not be added to the same pipeline with the same component name.
Args:
component (Component): `Component` to be added
"""
if component.name in self.components:
raise AssertionError(f"Component with name {component.name} already exists. Please rename the component.")
self.components[component.name] = component
add_wiring(from_component, from_output, to_component, to_input) ¶
Creates a one-to-one connection between the input and output of two components. The method checks if the connection is allowed with the following requirements:
- The components exist with the given inputs/outputs
- The given inputs and outputs are not connected to any wire
- The types of the connected input and output are compatible
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
from_component | str | Name of the component which provides data to the | required |
from_output | str | Name of the output variable of the | required |
to_component | str | Name of the component which consumes data from the | required |
to_input | str | Name of the input variable of the | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_wiring(self, from_component: str, from_output: str, to_component: str, to_input: str):
"""
Creates a one-to-one connection between the input and output of two components.
The method checks if the connection is allowed with the following requirements:
- The components exist with the given inputs/outputs
- The given inputs and outputs are not connected to any wire
- The types of the connected input and output are compatible
Args:
from_component (str): Name of the component which provides data to the `to_component`
from_output (str): Name of the output variable of the `from_component`
to_component (str): Name of the component which consumes data from the `from_component`
to_input (str): Name of the input variable of the `to_component`
"""
if from_component not in self.components:
raise AssertionError(f"No component named '{from_component}'")
if to_component not in self.components:
raise AssertionError(f"No component named '{to_component}'")
if from_output not in self.components[from_component].outputs:
raise AssertionError(f"Component '{from_component}' has no output named '{from_output}'")
if to_input not in self.components[to_component].inputs:
raise AssertionError(f"Component '{to_component}' has no input named '{to_input}'")
if self.get_wire_for_input(to_component, to_input) is not None:
raise AssertionError(f"Input '{to_input}' of component '{to_component}' is already wired")
_output_type = self.components[from_component].outputs[from_output]["type"]
_input_type = self.components[to_component].inputs[to_input]["type"]
if _output_type != _input_type:
raise AssertionError("Output and input types do not match")
wire_hash = self._wire_hash_string.format(from_component, from_output, to_component, to_input)
self.wiring[wire_hash] = {
"fromComponent": from_component,
"fromOutput": from_output,
"toComponent": to_component,
"toInput": to_input,
}
get_wire_for_output(component_name, output_name) ¶
Searches for the wire which connects a component with component_name as data provider through its output with name output_name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component_name | str | Name of the data provider component. | required |
output_name | str | Name of the output variable of | required |
Returns:
| Type | Description |
|---|---|
Optional[dict] | Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_wire_for_output(self, component_name: str, output_name: str) -> Optional[dict]:
"""
Searches for the wire which connects a component with `component_name` as data provider through its output with name output_name.
Args:
component_name (str): Name of the data provider component.
output_name (str): Name of the output variable of `component_name`.
Returns:
Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables.
"""
wires = [x for x in self.wiring.values() if x["fromComponent"] == component_name and x["fromOutput"] == output_name]
return wires[0] if wires else None
get_wire_for_input(component_name, input_name) ¶
Searches for the wire which connects a component with component_name as data consumer through its input with name input_name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component_name | str | Name of the data consumer component. | required |
input_name | str | Name of the input variable of | required |
Returns:
| Type | Description |
|---|---|
Optional[dict] | Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_wire_for_input(self, component_name: str, input_name: str) -> Optional[dict]:
"""
Searches for the wire which connects a component with `component_name` as data consumer through its input with name `input_name`.
Args:
component_name (str): Name of the data consumer component.
input_name (str): Name of the input variable of `component_name`.
Returns:
Optional[dict]: Wire which contains the data provider and receiver with their names and the names of their variables.
"""
wires = [x for x in self.wiring.values() if x["toComponent"] == component_name and x["toInput"] == input_name]
return wires[0] if wires else None
delete_input_wire(component, variable, with_input=True) ¶
Deletes an existing connection between two components. The connection must be given with the name of the consumer component and its input variable. If an inter signal alignment reference variable is affected it cannot be deleted. By default, the input variable will be also deleted.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
component | str | Name of the component which has the input given the name variable | required |
variable | str | Name of the input variable on the component which connected by the wire | required |
with_input | bool | If set, the input variable will be also deleted from the component. Defaults to True. | True |
Raises:
| Type | Description |
|---|---|
AssertionError | When the variable acts as inter signal alignment reference, it cannot be deleted, and an |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def delete_input_wire(self, component: str, variable: str, with_input: bool = True) -> None:
"""
Deletes an existing connection between two components.
The connection must be given with the name of the consumer component and its input variable.
If an inter signal alignment reference variable is affected it cannot be deleted.
By default, the input variable will be also deleted.
Args:
component (str): Name of the component which has the input given the name variable
variable (str): Name of the input variable on the component which connected by the wire
with_input (bool, optional): If set, the input variable will be also deleted from the component. Defaults to True.
Raises:
AssertionError: When the variable acts as inter signal alignment reference, it cannot be deleted, and an `AssertionError` will be raised.
"""
wire = self.get_wire_for_input(component, variable)
if wire is None:
raise AssertionError(f"There is no wiring for input '{variable}' of component '{component}'")
if variable in self.timeshift_reference:
raise AssertionError("Inter signal alignment reference variables can not be deleted.")
wire_hash = self._wire_hash_string.format(wire['fromComponent'], wire['fromOutput'], wire['toComponent'], wire['toInput'])
self.wiring.pop(wire_hash)
if with_input:
self.components[component].delete_input(variable)
add_dependencies(packages) ¶
@Deprecated, reason: components can have different Python versions and/or platform, therefore it's better to specify dependencies on a case-by-case basis. Collects the given Python packages with their versions from the executing Python environment and add them to all components of type PythonComponent. This step is necessary in order to execute the pipeline configuration on the Edge side. The method can be called multiple times but each time the previously-collected dependencies are cleared. The reason for this is to ensure a consistent dependency list for the requirements.txt file when the package is saved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
packages | list | List of the necessary python packages to execute the script defined by self.entrypoint | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_dependencies(self, packages: list) -> None:
"""
@Deprecated, reason: components can have different Python versions and/or platform, therefore it's better to specify dependencies on a case-by-case basis.
Collects the given Python packages with their versions from the executing Python environment and add them to all components of type `PythonComponent`.
This step is necessary in order to execute the pipeline configuration on the Edge side.
The method can be called multiple times but each time the previously-collected dependencies are cleared.
The reason for this is to ensure a consistent dependency list for the `requirements.txt` file when the package is saved.
Args:
packages (list): List of the necessary python packages to execute the script defined by self.entrypoint
"""
python_components = [self.components[name] for name in self.components if type(self.components[name]) is PythonComponent]
for component in python_components:
component.add_dependencies(packages)
set_timeshifting_periodicity(periodicity) ¶
Enables inter-signal alignment with the given sampling period.
With inter-signal alignment enabled, the AI Inference Server collects data for different input variables before it triggers the model. By default, startingPoint property is set to First timestamp, which means that inter-signal alignment is started at the first incoming value for any input variable.
This property can be changed to Signal reference by adding inter-signal alignment reference variables via the add_timeshifting_reference(..) method. In this case, inter-signal alignment is started when the first value arrives for the defined input variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
periodicity | int | Periodicity time in milliseconds for the AI Inference Server to perform inter-signal alignment. Valid range is [10, 2^31). | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def set_timeshifting_periodicity(self, periodicity: int) -> None:
"""
Enables inter-signal alignment with the given sampling period.
With inter-signal alignment enabled, the AI Inference Server collects data for different input variables before it triggers the model.
By default, `startingPoint` property is set to `First timestamp`, which means that inter-signal alignment is started at the
first incoming value for any input variable.
This property can be changed to `Signal reference` by adding inter-signal alignment reference variables
via the `add_timeshifting_reference(..)` method. In this case, inter-signal alignment is started when the first value arrives
for the defined input variables.
Args:
periodicity (int): Periodicity time in milliseconds for the AI Inference Server to perform inter-signal alignment. Valid range is [10, 2^31).
"""
periodicity = int(periodicity)
if periodicity not in range(10, int(math.pow(2, 31))):
raise AssertionError("Inter signal alignment periodicity must be an integer and in range [10, 2^31)")
self.periodicity = periodicity
_logger.info(f"Inter signal alignment periodicity has been set to {self.periodicity}.")
add_timeshifting_reference(reference) ¶
Enables signal alignment mode Signal reference by declaring input variables as reference variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
reference | str | Variable name to be added to | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_timeshifting_reference(self, reference: str) -> None:
"""
Enables signal alignment mode `Signal reference` by declaring input variables as reference variables.
Args:
reference (str): Variable name to be added to `self.timeshift_reference` list.
"""
if reference not in [i.variableName for i in self.inputs]:
raise AssertionError(f"There is no input variable defined with name '{reference}'")
if reference in self.timeshift_reference:
_logger.warning(f"Reference variable with name '{reference}' has been already added.")
return
self.timeshift_reference.append(reference)
remove_timeshifting_reference(reference) ¶
Removes previously-defined inter-signal alignment reference variables. If no reference variables remain, the startingPoint will be First timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
reference | str | Variable name to be removed from | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def remove_timeshifting_reference(self, reference: str) -> None:
"""
Removes previously-defined inter-signal alignment reference variables.
If no reference variables remain, the `startingPoint` will be `First timestamp`.
Args:
reference (str): Variable name to be removed from `self.timeshift_reference` list.
"""
if reference not in self.timeshift_reference:
raise AssertionError(f"Reference variable with name {'reference'} does not exist.")
self.timeshift_reference.remove(reference)
set_cycle_time(cycle_time_usec) ¶
Sets the cycle time for the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cycle_time_usec | int | None | Cycle time in microseconds. Must be a positive integer between 1_000 and 1_000_000. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def set_cycle_time(self, cycle_time_usec: int | None):
"""
Sets the cycle time for the pipeline.
Args:
cycle_time_usec (int | None): Cycle time in microseconds. Must be a positive integer between 1_000 and 1_000_000.
"""
self.cycle_time = cycle_time_usec
self._check_cycle_time()
get_pipeline_config() ¶
Saves the information on the composed pipeline configuration package into a YAML file.
This YAML file describes the components and the data flow between them for the AI Inference Server. The file is created in the destination folder with name pipeline_config.ymlSource code in
docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.pydef get_pipeline_config(self) -> dict:
"""
Saves the information on the composed pipeline configuration package into a YAML file.
This YAML file describes the components and the data flow between them for the AI Inference Server.
The file is created in the `destination` folder with name `pipeline_config.yml`
"""
version = self.save_version if self.save_version is not None else self.init_version
metric_fields = [(component_name, field) for component_name, component in self.components.items() if isinstance(component, PythonComponent) for field in component.metrics.keys()]
pipeline_inputs = [
i.__dict__(self.components[i.componentName].inputs)
for i in self.inputs
]
pipeline_outputs = [
{**o.__dict__(self.components[o.componentName].outputs), 'metric': False}
for o in self.outputs
]
pipeline_outputs += [{
'name': field,
'type': 'String',
'metric': True,
'topic': f"/siemens/edge/aiinference/{self.name}/{version}/metrics/{component_name}/{field}",
} for component_name, field in metric_fields]
pipeline_dag = [{
'source': f'Databus.{i.variableName}',
'target': f'{i.componentName}.{i.variableName}',
} for i in self.inputs]
pipeline_dag += [{
'source': f"{wire['fromComponent']}.{wire['fromOutput']}",
'target': f"{wire['toComponent']}.{wire['toInput']}",
} for wire in self.wiring.values()]
pipeline_dag += [{
'source': f'{o.componentName}.{o.variableName}',
'target': f'Databus.{o.variableName}',
} for o in self.outputs]
pipeline_dag += [{
'source': f'{component_name}.{field}',
'target': f'Databus.{field}',
} for component_name, field in metric_fields]
config_yml_content = {
'fileFormatVersion': '1.2.0',
'dataFlowPipelineInfo': {
'author': self.author,
'createdOn': datetime.now(),
'dataFlowPipelineVersion': version,
'description': self.desc if self.desc else 'Created by AI SDK',
'projectName': self.name,
'packageId': str(self.package_id)
},
'dataFlowPipeline': {
'components': [component._to_dict() for component in self.components.values()],
'pipelineDag': pipeline_dag,
'pipelineInputs': pipeline_inputs,
'pipelineOutputs': pipeline_outputs,
},
'packageType': 'full'
}
if len(self.parameters) > 0:
config_yml_content["dataFlowPipeline"]["pipelineParameters"] = [
param.__param_dict__()
for param in self.parameters
]
if self.cycle_time is not None:
config_yml_content["dataFlowPipeline"]["cycle_time"] = self.cycle_time
return config_yml_content
save_pipeline_config(destination) ¶
Saves the information about the composed pipeline configuration package into a YAML file.
This YAML file describes the components and the data flow between them for AI Inference Server. The file will be created in the destination folder with name pipeline_config.yml
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | path - like | Path of the | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save_pipeline_config(self, destination):
"""
Saves the information about the composed pipeline configuration package into a YAML file.
This YAML file describes the components and the data flow between them for AI Inference Server.
The file will be created in the `destination` folder with name `pipeline_config.yml`
Args:
destination (path-like): Path of the `destination` directory.
"""
with open(Path(destination) / PIPELINE_CONFIG, "w") as f:
yaml.dump(self.get_pipeline_config(), f)
get_datalink_metadata() ¶
The method generates metadata information based on available information.
Returns:
| Name | Type | Description |
|---|---|---|
dict | dict | Dictionary with the necessary information for the AI Inference Server. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def get_datalink_metadata(self) -> dict:
"""
The method generates metadata information based on available information.
Returns:
dict: Dictionary with the necessary information for the AI Inference Server.
"""
timeshifting = {
"id": None,
"enabled": False,
"periodicity": self.periodicity,
"startingPoint": None,
}
if self.periodicity is not None:
timeshifting["enabled"] = True
timeshifting["startingPoint"] = 'First timestamp'
if len(self.timeshift_reference) > 0:
timeshifting["startingPoint"] = 'Signal reference'
exported_metadata = {
"fileFormatVersion": "1.0.0",
"id": None,
"version": None,
"createdOn": datetime.now(),
"updatedOn": datetime.now(),
"timeShifting": timeshifting,
"inputs": [
{
'name': input.variableName,
'mapping': None,
'timeShiftingReference': input.variableName in self.timeshift_reference,
'type': self.components[input.componentName].inputs[input.variableName]['type']
} for input in self.inputs
]
}
return exported_metadata
save_datalink_metadata(destination) ¶
Saves metadata for pipeline input variables. This method saves metadata for the AI Inference Server into a YAML file. This metadata determines how the AI Inference Server feeds input to the pipeline, especially inter-signal alignment. The file is created in the destination folder with the name datalink_metadata.yml
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | path - like | Path of the destination directory. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save_datalink_metadata(self, destination):
"""
Saves metadata for pipeline input variables.
This method saves metadata for the AI Inference Server into a YAML file.
This metadata determines how the AI Inference Server feeds input to the pipeline, especially inter-signal alignment.
The file is created in the `destination` folder with the name `datalink_metadata.yml`
Args:
destination (path-like): Path of the destination directory.
"""
with open(Path(destination) / DATALINK_METADATA, "w") as f:
yaml.dump(self.get_datalink_metadata(), f)
save_telemetry_data(destination) ¶
Save telemetry data to a specified destination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | Path | The path where the telemetry data should be saved. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save_telemetry_data(self, destination: Path):
"""
Save telemetry data to a specified destination.
Args:
destination (Path): The path where the telemetry data should be saved.
"""
telemetry_path = destination / TELEMETRY_YAML
telemetry_data = {}
try:
consent_allowed = is_telemetry_allowed()
except RuntimeError:
consent_allowed = "undecided"
try:
consent_data, _ = read_telemetry_consent_file()
except BaseException:
consent_data = {}
telemetry_data["telemetry_consent"] = {}
telemetry_data["telemetry_consent"][CONSENT_KEY] = consent_allowed
telemetry_data["telemetry_consent"][CONSENT_KEY_SDK] = consent_data.get(CONSENT_KEY_SDK, 'unknown')
telemetry_data["telemetry_consent"][CONSENT_KEY_TS] = consent_data.get(CONSENT_KEY_TS, 'unknown')
telemetry_data["telemetry_consent"][CONSENT_KEY_METHOD] = consent_data.get(CONSENT_KEY_METHOD, get_consent_source())
telemetry_data["platform"] = {}
telemetry_data["platform"]["os"] = platform.system()
telemetry_data["platform"]["release"] = platform.release()
telemetry_data["platform"]["python_version"] = platform.python_version()
_logger.info(f"locals: {locals()}")
telemetry_data["environment"] = {}
env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
telemetry_data["environment"]["info"] = env_info
telemetry_data["industrial_ai"] = {}
try:
telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
except importlib_metadata.PackageNotFoundError:
telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
_logger.debug("simaticai package not found")
telemetry_data["pipeline"] = {}
telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component].python_version for component in self.components
if isinstance(self.components[component], PythonComponent)))
telemetry_data["pipeline"]["file_extensions"] = list(set(f.suffix for f in Path(destination).rglob("*")
if f.suffix not in ["", ".zip", ".yml", ".yaml", ".html"]))
with open(telemetry_path, 'w') as telemetry_file:
yaml.dump(telemetry_data, telemetry_file)
validate(destination='.') ¶
Validates whether the package configuration is compatible with the expected runtime environment.
The method verifies:
- If the package has at least one component
- If all wires create connections between existing components and their variables
- If metadata is defined and valid.
- If a package with the same name already exists in the
destinationfolder. In this case a warning message appears and thesave(..)method overwrites the existing package. - If the package has multiple components and if they are using the same Python version
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | str | Path of the expected destination folder. Defaults to ".". | '.' |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def validate(self, destination: str | Path = "."):
"""
Validates whether the package configuration is compatible with the expected runtime environment.
The method verifies:
- If the package has at least one component
- If all wires create connections between existing components and their variables
- If metadata is defined and valid.
- If a package with the same name already exists in the `destination` folder. In this case a warning message appears and the `save(..)` method overwrites the existing package.
- If the package has multiple components and if they are using the same Python version
Args:
destination (str, optional): Path of the expected destination folder. Defaults to ".".
"""
version = self.save_version if self.save_version is not None else self.init_version
if len(self.components) < 1:
raise AssertionError("The package must have at least one component.")
for p_output in self.outputs:
if self.components[p_output.componentName].batch.outputBatch:
raise AssertionError(f"The component '{p_output.componentName}' has pipeline output defined with variable name '{p_output.variableName}'. \
None of component with pipeline output is allowed to provide batch output.")
for wire_hash in self.wiring.copy():
wire = self.wiring[wire_hash]
self._check_wiring(wire, wire_hash)
pipeline_inputs = [p_input.variableName for p_input in self.inputs]
pipeline_outputs = [p_output.variableName for p_output in self.outputs]
if any(variable in pipeline_outputs for variable in pipeline_inputs):
conflicts = set(pipeline_inputs).intersection(set(pipeline_outputs))
raise AssertionError(f"Pipeline input and output variables must be unique. Conflicting variables: {conflicts}")
self._check_timeshifting()
self._check_cycle_time()
self._check_supported_types_for_connections()
package_path = Path(destination) / f"{self.name}_{version}".replace(" ", "-")
if package_path.is_dir():
_logger.warning(f"Target folder ({package_path}) already exists! Unless changing the package name the package could be invalid and your files will be overwritten!")
python_versions = set()
for component in self.components:
self.components[component].validate()
if isinstance(self.components[component], PythonComponent):
python_versions.add(self.components[component].python_version)
if (1 < len(python_versions)):
_logger.warning("The use of multiple python version in a single pipeline is not recommended. We recommend using only one of the supported versions, which are Python 3.11 or 3.12.")
_logger.info(f"Package '{self.name}' is valid and ready to save.")
save(destination='.', package_id=None, version=None) ¶
@Deprecated, reason: only edge package generation will be supported in the future. Use export instead.
Saves the assembled package in a zip format. The name of the file is defined as {package_name}_{package_version}.zip. If a file with such a name already exists in the destination folder, it gets overwritten and a warning message appears.
The package is also available as a subfolder on the destination path with the name {package_name}_{package_version}. If the assembled content does not meet the expected one, this content can be changed and simply packed into a zip file.
The package contains files and folders in the following structure:
Package folder with name
{package_name}_{package_version}datalink-metadata.ymlpipeline-config.yml- Component folder with name
{component_name}
When the component is a
PythonComponent, this folder contains:requirements.txt- Entrypoint script defined by the entrypoint of the component
- Extra files as added to the specified folders
- Source folder with name
srcwith necessary python scripts
If a package ID is specified, and a package with the same ID and version is already present in the destination folder, an error is raised.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | str | Target directory for saving the package. Defaults to ".". | '.' |
package_id | UUID | The optional package ID. If None, a new UUID is generated. | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def save(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
"""
@Deprecated, reason: only edge package generation will be supported in the future. Use export instead.
Saves the assembled package in a zip format.
The name of the file is defined as `{package_name}_{package_version}.zip`.
If a file with such a name already exists in the `destination` folder, it gets overwritten and a warning message appears.
The package is also available as a subfolder on the destination path with the name `{package_name}_{package_version}`.
If the assembled content does not meet the expected one, this content can be changed and simply packed into a zip file.
The package contains files and folders in the following structure:
- Package folder with name `{package_name}_{package_version}`
- `datalink-metadata.yml`
- `pipeline-config.yml`
- Component folder with name `{component_name}`
When the component is a `PythonComponent`, this folder contains:
- `requirements.txt`
- Entrypoint script defined by the entrypoint of the component
- Extra files as added to the specified folders
- Source folder with name `src` with necessary python scripts
If a package ID is specified, and a package with the same ID and version is already present in the `destination` folder,
an error is raised.
Args:
destination (str, optional): Target directory for saving the package. Defaults to ".".
package_id (UUID): The optional package ID. If None, a new UUID is generated.
"""
try:
# checking whether the user has made a statement about telemetry collection
is_telemetry_allowed()
except RuntimeError as consent_is_undecided:
raise consent_is_undecided
self._set_save_version_and_package_id(Path(destination), package_id, version)
destination = Path(destination)
self.validate(destination)
name = self.name.replace(" ", "-")
package_name = f"{name}_{self.save_version}"
destination = destination / package_name
destination.mkdir(parents=True, exist_ok=True)
for component in self.components:
self.components[component].save(destination, False)
if isinstance(self.components[component], PythonComponent):
self.report_writer.add_direct_dependencies(self.components[component].name, self.components[component].python_dependencies.dependencies)
self.save_datalink_metadata(destination)
self.save_pipeline_config(destination)
p_page.save_readme_html(self, destination)
self.save_telemetry_data_if_consented(destination)
zip_destination = shutil.make_archive(
base_name=str(destination.parent / package_name), format='zip',
root_dir=destination.parent, base_dir=package_name,
verbose=True, logger=_logger)
return Path(zip_destination)
export(destination='.', package_id=None, version=None) ¶
Export a runnable pipeline package.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | str | optional target directory for saving the package. Defaults to ".". | '.' |
package_id | UUID | optional package ID. If None, a new UUID is generated. | None |
version | str | optional version. If None, an automatic version number is generated. | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def export(self, destination: str | Path = ".", package_id: Optional[uuid.UUID] = None, version: Optional[str] = None) -> Path:
"""
Export a runnable pipeline package.
Args:
destination (str): optional target directory for saving the package. Defaults to ".".
package_id (UUID): optional package ID. If None, a new UUID is generated.
version (str): optional version. If None, an automatic version number is generated.
"""
try:
# checking whether the user has made a statement about telemetry collection
is_telemetry_allowed()
except RuntimeError as consent_is_undecided:
raise consent_is_undecided
destination = Path(destination)
config_package = None
try:
config_package = self.save(destination, package_id, version)
runtime_package = convert_package(config_package, self.report_writer)
return runtime_package
finally:
if config_package is not None:
Path(config_package).unlink(missing_ok=True)
add_secret_parameter(name, desc=None) ¶
Adds a secret parameter to the pipeline configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the secret parameter. Must contain only letters, digits, underscores, and hyphens. Cannot start with | required |
desc | str | Description of the secret parameter. Must be a string between 3 and 60 characters if provided. | None |
Raises ValueError if the conditions for name or description are not met.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.pydef add_secret_parameter(self, name: str, desc: str | None = None):
"""
Adds a secret parameter to the pipeline configuration.
Args:
name (str): Name of the secret parameter. Must contain only letters, digits, underscores, and hyphens. Cannot start with `__AI_IS_` prefix.
desc (str, optional): Description of the secret parameter. Must be a string between 3 and 60 characters if provided.
Raises ValueError if the conditions for name or description are not met.
"""
self._add_parameter(SecretPipelineParameter(name=name, description=desc))
add_parameter(name, default_value, type_name='String', topic_based=False, desc=None) ¶
Adds a parameter to the pipeline configuration, which alters the behavior of the pipeline. The parameter's default value and its properties are saved in the pipeline configuration and the value of the parameter can later be changed on AI Inference Server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the parameter | required |
desc | str | Description of the parameter (optional) | None |
type_name | str | Data type of the parameter. Defaults to "String". | 'String' |
default_value | str | Default value of the parameter | required |
topic_based | bool | If true, the parameter can be updated from a message queue. | False |
ValueError:
When:
- the default value of the parameter is not of the specified data type (`type_name`) or
- the specified data type itself is not an allowed data type (not a part of `parameter_types` dict) or
- the specified data type is not given in the right format or
- the type of the given `topic_based` parameter is not `bool`.
- the name of the parameter starts with `__AI_IS_` prefix. These are reserved parameters by AI Inference Server
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def add_parameter(self, name: str, default_value, type_name: str = "String", topic_based: bool = False, desc: str = None):
"""
Adds a parameter to the pipeline configuration, which alters the behavior of the pipeline.
The parameter's default value and its properties are saved in the pipeline configuration
and the value of the parameter can later be changed on AI Inference Server.
Args:
name (str): Name of the parameter
desc (str): Description of the parameter (optional)
type_name (str, optional): Data type of the parameter. Defaults to "String".
default_value (str): Default value of the parameter
topic_based (bool, optional): If true, the parameter can be updated from a message queue.
Raises:
ValueError:
When:
- the default value of the parameter is not of the specified data type (`type_name`) or
- the specified data type itself is not an allowed data type (not a part of `parameter_types` dict) or
- the specified data type is not given in the right format or
- the type of the given `topic_based` parameter is not `bool`.
- the name of the parameter starts with `__AI_IS_` prefix. These are reserved parameters by AI Inference Server
"""
self._add_parameter(PipelineParameter(
name=name,
defaultValue=default_value,
dtype=type_name,
topicBased=topic_based,
description=desc))
convert_package(zip_path, report_writer=None) ¶
@Deprecated, reason: only edge package generation will be supported in the future. Use Pipeline.export(...) instead.
Create an Edge Configuration Package from a given Pipeline Configuration Package.
If the input zip file is {path}/{name}_{version}.zip, the output file will be created as {path}/{name}-edge_{version}.zip. Please make sure that the given zip file comes from a trusted source!
If a file with such a name already exists, it is overwritten.
First, this method verifies that the requirements identified by name and version are either included in PythonPackages.zip or available on pypi.org for the target platform.
Currently, the supported edge devices run Linux on 64-bit x86 architecture, so the accepted Python libraries are restricted to the platform independent ones and packages built for 'x86_64' platforms. AI Inference Server also provides a Python 3.11 and 3.12 runtime environment, so the supported Python libraries are restricted to Python 3.11 and 3.12 compatible packages.
If for the target platform the required dependency is not available on pypi.org and not present in PythonPackages.zip, it will log the problem at ERROR level. Then it downloads all dependencies (either direct or transitive), and creates a new zip file, which is validated against the AI Inference Server's schema. This functionality requires pip with version of 21.3.1 or greater.
This method can be used from the command line too. Example usage:
python -m simaticai convert_package <path_to_pipeline_configuration_package.zip>
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
zip_path | path - like | path to the pipeline configuration package zip file. | required |
report_writer | ReportWriter | a ReportWriter object to write the report for a pipeline. Defaults to None. | None |
Returns:
| Type | Description |
|---|---|
Path | os.PathLike: The path of the created zip file. |
Raises:
| Type | Description |
|---|---|
PipelineValidationError | If the validation fails. See the logger output for details. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/pipeline.py
def convert_package(zip_path: Union[str, os.PathLike], report_writer: Optional[PipelineReportWriter] = None) -> Path:
"""
@Deprecated, reason: only edge package generation will be supported in the future. Use Pipeline.export(...) instead.
Create an Edge Configuration Package from a given Pipeline Configuration Package.
If the input zip file is `{path}/{name}_{version}.zip`, the output file will be created as `{path}/{name}-edge_{version}.zip`.
Please make sure that the given zip file comes from a trusted source!
If a file with such a name already exists, it is overwritten.
First, this method verifies that the requirements identified by name and version are either included
in `PythonPackages.zip` or available on pypi.org for the target platform.
Currently, the supported edge devices run Linux on 64-bit x86 architecture, so the accepted Python libraries are restricted to the platform independent ones and packages built for 'x86_64' platforms.
AI Inference Server also provides a Python 3.11 and 3.12 runtime environment, so the supported Python libraries are restricted to Python 3.11 and 3.12 compatible packages.
If for the target platform the required dependency is not available on pypi.org
and not present in `PythonPackages.zip`, it will log the problem at ERROR level.
Then it downloads all dependencies (either direct or transitive), and creates a new zip
file, which is validated against the AI Inference Server's schema.
This functionality requires pip with version of 21.3.1 or greater.
This method can be used from the command line too.
Example usage:
```
python -m simaticai convert_package <path_to_pipeline_configuration_package.zip>
```
Args:
zip_path (path-like): path to the pipeline configuration package zip file.
report_writer (ReportWriter, optional): a ReportWriter object to write the report for a pipeline. Defaults to None.
Returns:
os.PathLike: The path of the created zip file.
Exceptions:
PipelineValidationError: If the validation fails. See the logger output for details.
"""
try:
# checking whether the user has made a statement about telemetry collection
is_telemetry_allowed()
except RuntimeError as consent_is_undecided:
raise consent_is_undecided
zip_path = Path(zip_path)
if zip_path.stem.find('_') < 0:
raise AssertionError("The input zip file name must contain an underscore character.")
with tempfiles.OpenZipInTemp(zip_path) as zip_dir:
top_level_items = list(zip_dir.iterdir())
if len(top_level_items) != 1:
raise AssertionError("The Pipeline Configuration Package must contain a single top level directory.")
package_dir = zip_dir / top_level_items[0]
runtime_dir = zip_dir / "edge_config_package"
runtime_dir.mkdir(parents=True, exist_ok=True)
config = yaml_helper.read_yaml(package_dir / PIPELINE_CONFIG)
_validate_with_schema("input pipeline_config.yml", config, "pipeline.schema.json")
runtime_config = _generate_runtime_config(config)
if report_writer is not None:
report_writer.set_path(Path(zip_path.parent / f"{zip_path.stem}_package_report.md"))
report_writer.set_pipeline_config(config)
for component in config['dataFlowPipeline']['components']:
source_dir = package_dir / component["name"]
if component["runtime"]["type"] == "python":
python_version = component['runtime']['version']
try:
python_version_validator(python_version)
except ValueError as error:
raise AssertionError(error)
pip_option_no_deps = component["runtime"].get("installTransitiveDependencies", True) is False
if pip_option_no_deps:
_logger.warning(f"Component `{component['name']}` is requested to be installed without transitive dependencies. "
"This may lead to runtime errors if the dependencies are not handled manually.")
dependency_set = _package_component_dependencies(source_dir, python_version, pip_option_no_deps)
if report_writer is not None:
report_writer.add_full_dependency_set(component_name=component["name"], dependency_set=dependency_set)
runtime_config["runtimeConfiguration"]["components"].append({
"name": component["name"],
"device": "IED1",
"targetRuntime": "Python",
})
if component["runtime"]["type"] == "gpuruntime":
runtime_config["runtimeConfiguration"]["components"].append({
"name": component["name"],
"device": "IED1",
"targetRuntime": "gpuruntime",
})
_package_component(source_dir, runtime_dir / 'components' / f"{component['name']}_{component['version']}")
if report_writer is not None:
report_writer.write_report()
_logger.info(f"Report on {zip_path.stem} is saved to {zip_path.parent}.")
shutil.copy(str(package_dir / PIPELINE_CONFIG), str(runtime_dir / PIPELINE_CONFIG))
datalink_metadata_yaml = package_dir / DATALINK_METADATA
if datalink_metadata_yaml.is_file():
shutil.copy(str(datalink_metadata_yaml), runtime_dir / DATALINK_METADATA)
_validate_with_schema(f"generated {RUNTIME_CONFIG}", runtime_config, "runtime.schema.json")
with open(runtime_dir / RUNTIME_CONFIG, "w", encoding="utf8") as file:
yaml.dump(runtime_config, file)
readme_html = package_dir / README_HTML
if readme_html.exists():
(runtime_dir / README_HTML).write_text(readme_html.read_text())
telemetry_yaml = package_dir / TELEMETRY_YAML
if telemetry_yaml.exists():
(runtime_dir / TELEMETRY_YAML).write_text(telemetry_yaml.read_text())
edge_package_path = Path(shutil.make_archive(
# replacing the last occurrence of "_" with "-edge_".
base_name=str(PurePath(zip_path).parent / "-edge_".join(zip_path.stem.rsplit("_", 1))),
format='zip',
root_dir=runtime_dir,
verbose=True,
logger=_logger))
package_size = edge_package_path.stat().st_size # zipped package size in bytes
if package_size > PACKAGE_SIZE_LIMIT:
package_size_GB = "{:.2f}".format(package_size / 1000 / 1000 / 1000)
package_size_limit_GB = "{:.2f}".format(PACKAGE_SIZE_LIMIT / 1000 / 1000 / 1000)
error_msg = f"Pipeline package size {package_size} bytes ({package_size_GB} GB) exceeds the limit of " \
f"{PACKAGE_SIZE_LIMIT} bytes ({package_size_limit_GB} GB). " \
"Please remove unnecessary files and dependencies and try again."
_logger.error(error_msg)
raise RuntimeError(error_msg)
sha256_hash = calc_sha(edge_package_path)
sha_format = f"{sha256_hash} {edge_package_path.name}"
edge_package_path.with_suffix('.sha256').write_text(sha_format)
return edge_package_path
PythonComponent ¶
Bases: Component
A pipeline component implemented using Python scripts and libraries.
A PythonComponent wraps Python code resource files such as saved models into a structured folder, which can be added to a pipeline configuration package.
For a comprehensive overview on how to wrap ML models into Python components, we recommend you refer to the AI SDK User Manual, especially the guideline for writing pipeline components. We also recommend you study the example Python components in the E2E Tutorials for the AI SDK.
A new PythonComponent is empty.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Component name. (default: inference) | 'inference' |
desc | str | Component description (optional) | '' |
version | str | Component version. (default: 0.0.1) | '0.0.1' |
python_version | str | Python version on the target AI Inference Server. At the moment of writing, the current version supports Python 3.11 and 3.12. | '3.12' |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
class PythonComponent(Component):
"""
A pipeline component implemented using Python scripts and libraries.
A `PythonComponent` wraps Python code resource files such as saved models into a structured folder, which can be added to a pipeline
configuration package.
For a comprehensive overview on how to wrap ML models into Python components, we recommend you refer to
the AI SDK User Manual, especially the guideline for writing pipeline components. We also recommend you
study the example Python components in the E2E Tutorials for the AI SDK.
A new `PythonComponent` is empty.
Args:
name (str): Component name. (default: inference)
desc (str): Component description (optional)
version (str): Component version. (default: 0.0.1)
python_version (str): Python version on the target AI Inference Server.
At the moment of writing, the current version supports Python 3.11 and 3.12.
"""
def __init__(self, name: str = "inference", version: str = "0.0.1", python_version: str = '3.12', desc: str = ""):
"""
Creates a new, empty Python component.
Args:
name (str): Component name. (default: inference)
desc (str): Component description (optional)
version (str): Component version. (default: 0.0.1)
python_version (str): Python version on the target AI Inference Server. At the moment of writing, AI Inference Server supports Python 3.11 and 3.12.
"""
super().__init__(name=name, desc=desc)
try:
python_version_validator(python_version)
except ValueError as error:
raise AssertionError(error)
self.python_version = str(python_version)
self.version = version
self.metrics = {}
self.entrypoint: Path | None = None
self.resources = {}
self.python_dependencies = PythonDependencies(python_version)
self._replicas = 1
self.is_valid = False
self.no_deps = False
def __repr__(self) -> str:
text = super().__repr__()
if len(self.metrics) > 0:
text += "\nMetrics:\n"
for name, metric in self.metrics.items():
text += f"< {name}{': ' + metric['desc'] if metric.get('desc') is not None else ''}\n"
if len(self.resources):
text += "\nResources:\n"
for path, base in self.resources.items():
text += f" {base}/{path.name}\n".replace('./', '')
if self.entrypoint is not None:
text += f"Entrypoint: {self.entrypoint}\n"
return text
def set_entrypoint(self, entrypoint: str):
"""
Sets the entrypoint module for the component.
The entrypoint is the Python code which is responsible for receiving the input data and producing a structured response with the output for the AI Inference Server.
The script should consume a JSON string and produce another. See the short example below.
The file will be copied into the root directory of the component on the AI Inference Server, so every file reference should be aligned.
The example code below shows a basic structure of the entrypoint Python code.
```python
import json
import sys
from pathlib import Path
# by adding the parent folder of your modules to system path makes them available for relative import
sys.path.insert(0, str(Path('./src').resolve()))
from my_module import processor # then the processor module can be imported
def run(data: str):
input_data = json.loads(data) # incoming JSON string is loaded as a dictionary
result = processor.process_data(input_data) # the process_data can be called to process the incoming data
# the code below creates the formatted output for the AI Inference Server
if result is None:
answer = {"ready": False, "output": None}
else:
answer = {"ready": True, "output": json.dumps(result)}
return answer
```
Args:
entrypoint (str): Name of the new entrypoint script to be copied
"""
self.is_valid = False
if not any(key.name for key, value in self.resources.items() if key.name == entrypoint and value == '.'):
raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")
self.entrypoint = Path(entrypoint)
def add_resources(self, base_dir: os.PathLike, resources: os.PathLike | list):
"""
Adds files to a component.
To make your file resources available on the AI Inference Server you need to add them to the package resources.
These resources can be Python or config files, serialized ML models or reference data.
They are then available on path {component_root}/{resources} in the runtime environment.
When saving the package they will be copied from {base_dir}/{resources} into the package.
Files in '__pycache__' folders will be excluded.
Until version 2.3.0 of AI SDK hidden files and folders (starting with '.') are also excluded.
Args:
base_dir (path-like): Root folder of your code from which the resources are referred
resources (os.PathLike or List): A single path or list of relative paths to resource files
"""
self.is_valid = False
base_dir = Path(base_dir).resolve().absolute()
if not base_dir.is_dir():
raise AssertionError(f"Parameter 'base_dir' must be a directory and available in path {base_dir}.")
resources = resources if type(resources) is list else [resources]
for resource in resources:
self._add_resource(base_dir, resource)
def _add_resource(self, base_dir: Path, resource: os.PathLike):
self.is_valid = False
if Path(resource).is_absolute() or '..' in resource:
raise AssertionError("The resource path must be relative and cannot contain '/../' elements.")
resource_path = base_dir / resource
if resource_path.is_file():
self._add_resource_file(base_dir, resource_path)
return
if resource_path.is_dir():
for glob_path in resource_path.rglob("*"):
if glob_path.is_file():
self._add_resource_file(base_dir, glob_path)
return
raise AssertionError(f"Specified resource is not a file or directory: '{resource}'")
def _add_resource_file(self, base_dir: Path, resource_path: Path):
self.is_valid = False
for parent in resource_path.parents:
if parent.name == '__pycache__':
return
if resource_path in self.resources.keys():
_logger.warning(f"Resource '{resource_path}' is already added to target directory '{self.resources[resource_path]}'")
return
self.resources[resource_path] = f"{resource_path.parent.relative_to(base_dir)}"
def add_dependencies(self, packages: list):
"""
Adds required dependencies for the Python code.
The list must contain the name of the Python packages or tuples in the form of (name, version) which are required to execute the component on AI Inference Server.
The method will search for the packages for the target platform and collect their transitive dependencies as well.
Packages that are distributed only in source format can be added too, but only if they are pure Python packages.
Args:
packages (list): Can be a list of strings (name) or a list of tuples (name, version) of the required packages for component execution
"""
self.is_valid = False
self.python_dependencies.add_dependencies(packages)
def set_requirements(self, requirements_path: os.PathLike, no_deps: bool = False):
"""
Reads the defined dependencies from the given `requirements.txt` file and creates a new dependency list. Previously added dependencies will be cleared.
The file format must follow Python's requirements file format defined in PEP 508.
It can contain URLs to additional repositories in the form of `--extra-index-url=my.repo.example.com`.
Args:
requirements_path (str): Path of the given `requirements.txt` file
no_deps (bool): If set to True, dependencies will be downloaded/installed without dependencies. Default False.
"""
self.is_valid = False
self.python_dependencies.set_requirements(requirements_path)
if no_deps:
self.no_deps = True
_logger.warning("WARNING! Requiring packages without their transitive dependencies is only supported on AI Inference Server version >=2.6.0")
_logger.warning("WARNING! Requiring packages without their transitive dependencies may lead to missing dependencies and runtime errors. Use with caution!")
def set_pyproject_toml(self, pyproject_path: os.PathLike):
"""
Reads the defined dependencies from the given `pyproject.toml` file and adds it to the requirements list.
Only the dependencies defined in the `[project]` section will be added to the component.
Args:
pyproject_path (str): Path of the given `pyproject.toml` file
"""
self.is_valid = False
self.python_dependencies.set_pyproject_toml(pyproject_path)
def add_python_packages(self, path: str) -> None:
"""
Adds Python package(s) to the `PythonPackages.zip` file of the component.
The `path` parameter can refer to either a `whl`, a `zip` or a `tar.gz` file.
Zip files can be either a source distribution package or a collection of Python packages. Only pure Python source distributions are allowed.
The dependency list of the component will be extended with the files added here, so that they will also get installed on the AI Inference Server.
The method uses the `tempfile.tempdir` folder, so make sure that the folder is writeable.
The wheel files must fulfill the requirements of the targeted device environment
(e.g., the Python version must match the supported Python version of the targeted AI Inference Server, and the platform should be one of the supported ones too).
Args:
path (str): Path of the distribution file
Examples:
`component.add_python_packages('../resources/my_package-0.0.1-py3-none-any.wheel')`
adds the wheel file to `PythonPackages.zip` and adds dictionary item `component.dependencies['my_package'] = '0.0.1'`
`component.add_python_packages('../resources/inference-wheels.zip')`
adds all the wheel files in the zip to `PythonPackages.zip` and `component.dependencies`
"""
self.is_valid = False
self.python_dependencies.add_python_packages(path)
def set_parallel_steps(self, replicas):
"""
Sets the number of parallel executors.
This method configures how many instances of the component can be
executed at the same time.
The component must be suitable for parallel execution. The inputs arriving
to the component will be processed by different instances in parallel,
and these instances do not share their state (e.g. variables). Every
instance is initialized separately and receives only a fraction of the inputs.
AI Inference Server supports at most 8 parallel instances.```
Args:
replicas (int): Number of parallel executors. Default is 1.
Raises:
ValueError: if the given argument is not a positive integer.
"""
self.is_valid = False
if (not isinstance(replicas, int)) or replicas < 1:
raise ValueError("Replica count must be a positive integer.")
if 8 < replicas:
_logger.warning("The current maximum of parallel executors is 8.")
self._replicas = replicas
def add_metric(self, name: str, desc: str | None = None):
"""
Adds a metric that will be automatically used as a pipeline output.
Args:
name (str): Name of the metric.
desc (str): Description of the metric. (optional)
"""
if "_" not in name:
raise AssertionError("The metric name must contain at least one underscore")
if self.metrics is None:
self.metrics = {}
if name in self.metrics:
raise AssertionError(f"Metric '{name}' already exists")
self.metrics[name] = {}
if desc is not None:
self.metrics[name]['desc'] = desc
def delete_metric(self, name: str):
"""
Remove a previously added metric.
Args:
name (str): Name of the metric to be deleted.
"""
if name not in self.metrics:
raise AssertionError(f"Component '{self.name}' has no metric '{name}'")
self.metrics.pop(name)
def _to_dict(self):
runtime = {
'type': 'python',
'version': self.python_version,
}
if self.no_deps:
runtime['installTransitiveDependencies'] = False
component_dict = {
**super()._to_dict(),
'version': self.version,
'entrypoint': f"./{self.entrypoint.name}",
'hwType': 'CPU',
'runtime': runtime,
'replicas': self._replicas
}
component_dict["outputType"] += [{
'name': name,
'type': 'String',
'metric': True,
} for name in self.metrics.keys()]
return component_dict
def enable_dependency_optimization(self):
"""
Allows changing repository URLs to optimize the package size
Allows the replacement of the `--index-url` argument during `pip download` to download
CPU runtime optimized dependencies only. Enabling this optimization, the present
`--index-url` will be prepended to the `--extra-index-url` list, and the Pytorch CPU only repository
will be set as the `--index-url`.
A warning message will be printed if the repository URL modification was necessary.
Some dependencies have both CPU and GPU runtime
versions, pytorch for example, but a `PythonComponent` can only run on CPU, so
packaging the additional GPU runtime dependencies just enlarges the package size.
If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
`GPURuntimeComponent`.
"""
self.python_dependencies.enable_dependency_optimization()
def disable_dependency_optimization(self):
"""
Disables any modification to repository URLs
Disables the replacement of the `--index-url` argument during `pip download`.
This way all `--index-url` or `--extra-index-url` arguments will be preserved if they
were present in the requirements.txt file.
Some dependencies have both CPU and GPU runtime
versions, pytorch for example, but a `PythonComponent` can only run on CPU, so
packaging the additional GPU runtime dependencies just enlarges the package size.
A warning message will be printed about the package size if this optimization is disabled and
the dependency list contains GPU optimized dependencies.
Disabling this optimization will not allow the component to run on GPU.
If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
`GPURuntimeComponent`.
"""
self.python_dependencies.disable_dependency_optimization()
def validate(self):
"""
Validates that the component is ready to be serialized and packaged as part of a pipeline.
"""
if not self.is_valid:
if self.entrypoint is None:
raise AssertionError("Entrypoint must be defined")
if not any(key.name for key, value in self.resources.items() if key.name == self.entrypoint.name and value == '.'):
raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")
if len(self.python_dependencies.dependencies) < 1:
_logger.warning(f"WARNING! There are no dependencies defined for component '{self.name}'. Please make sure that all necessary dependencies have been added.")
self.python_dependencies.validate()
self.is_valid = True
_logger.info(f"Component '{self.name}' is valid and ready to use.")
def save(self, destination, validate=True):
"""
Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
Validation can be skipped by setting parameter `validate` to False.
This is useful when the component is already validated and only intended to be saved.
The component folder contains the following:
- `requirements.txt` with a list of Python dependencies
- Entry point script defined by the `entrypoint` attribute of the component
- Extra files as added to the specified folders
- `PythonPackages.zip` with the wheel binaries for the environment to be installed
Args:
destination (path-like): Target directory to which the component will be saved.
validate (bool): With value True, triggers component validation. Defaults to True.
"""
if validate:
self.validate()
folder_path = Path(destination) / self.name
folder_path.mkdir(parents=True, exist_ok=True)
for file_path in self.resources:
dir_path = folder_path / self.resources[file_path]
os.makedirs(dir_path, exist_ok=True)
shutil.copy(file_path, dir_path / file_path.name)
self.python_dependencies.save(folder_path)
__init__(name='inference', version='0.0.1', python_version='3.12', desc='') ¶
Creates a new, empty Python component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Component name. (default: inference) | 'inference' |
desc | str | Component description (optional) | '' |
version | str | Component version. (default: 0.0.1) | '0.0.1' |
python_version | str | Python version on the target AI Inference Server. At the moment of writing, AI Inference Server supports Python 3.11 and 3.12. | '3.12' |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def __init__(self, name: str = "inference", version: str = "0.0.1", python_version: str = '3.12', desc: str = ""):
"""
Creates a new, empty Python component.
Args:
name (str): Component name. (default: inference)
desc (str): Component description (optional)
version (str): Component version. (default: 0.0.1)
python_version (str): Python version on the target AI Inference Server. At the moment of writing, AI Inference Server supports Python 3.11 and 3.12.
"""
super().__init__(name=name, desc=desc)
try:
python_version_validator(python_version)
except ValueError as error:
raise AssertionError(error)
self.python_version = str(python_version)
self.version = version
self.metrics = {}
self.entrypoint: Path | None = None
self.resources = {}
self.python_dependencies = PythonDependencies(python_version)
self._replicas = 1
self.is_valid = False
self.no_deps = False
set_entrypoint(entrypoint) ¶
Sets the entrypoint module for the component.
The entrypoint is the Python code which is responsible for receiving the input data and producing a structured response with the output for the AI Inference Server. The script should consume a JSON string and produce another. See the short example below.
The file will be copied into the root directory of the component on the AI Inference Server, so every file reference should be aligned.
The example code below shows a basic structure of the entrypoint Python code.
import json
import sys
from pathlib import Path
# by adding the parent folder of your modules to system path makes them available for relative import
sys.path.insert(0, str(Path('./src').resolve()))
from my_module import processor # then the processor module can be imported
def run(data: str):
input_data = json.loads(data) # incoming JSON string is loaded as a dictionary
result = processor.process_data(input_data) # the process_data can be called to process the incoming data
# the code below creates the formatted output for the AI Inference Server
if result is None:
answer = {"ready": False, "output": None}
else:
answer = {"ready": True, "output": json.dumps(result)}
return answer
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entrypoint | str | Name of the new entrypoint script to be copied | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_entrypoint(self, entrypoint: str):
"""
Sets the entrypoint module for the component.
The entrypoint is the Python code which is responsible for receiving the input data and producing a structured response with the output for the AI Inference Server.
The script should consume a JSON string and produce another. See the short example below.
The file will be copied into the root directory of the component on the AI Inference Server, so every file reference should be aligned.
The example code below shows a basic structure of the entrypoint Python code.
```python
import json
import sys
from pathlib import Path
# by adding the parent folder of your modules to system path makes them available for relative import
sys.path.insert(0, str(Path('./src').resolve()))
from my_module import processor # then the processor module can be imported
def run(data: str):
input_data = json.loads(data) # incoming JSON string is loaded as a dictionary
result = processor.process_data(input_data) # the process_data can be called to process the incoming data
# the code below creates the formatted output for the AI Inference Server
if result is None:
answer = {"ready": False, "output": None}
else:
answer = {"ready": True, "output": json.dumps(result)}
return answer
```
Args:
entrypoint (str): Name of the new entrypoint script to be copied
"""
self.is_valid = False
if not any(key.name for key, value in self.resources.items() if key.name == entrypoint and value == '.'):
raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")
self.entrypoint = Path(entrypoint)
add_resources(base_dir, resources) ¶
Adds files to a component.
To make your file resources available on the AI Inference Server you need to add them to the package resources. These resources can be Python or config files, serialized ML models or reference data. They are then available on path {component_root}/{resources} in the runtime environment. When saving the package they will be copied from {base_dir}/{resources} into the package. Files in 'pycache' folders will be excluded. Until version 2.3.0 of AI SDK hidden files and folders (starting with '.') are also excluded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_dir | path - like | Root folder of your code from which the resources are referred | required |
resources | PathLike or List | A single path or list of relative paths to resource files | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_resources(self, base_dir: os.PathLike, resources: os.PathLike | list):
"""
Adds files to a component.
To make your file resources available on the AI Inference Server you need to add them to the package resources.
These resources can be Python or config files, serialized ML models or reference data.
They are then available on path {component_root}/{resources} in the runtime environment.
When saving the package they will be copied from {base_dir}/{resources} into the package.
Files in '__pycache__' folders will be excluded.
Until version 2.3.0 of AI SDK hidden files and folders (starting with '.') are also excluded.
Args:
base_dir (path-like): Root folder of your code from which the resources are referred
resources (os.PathLike or List): A single path or list of relative paths to resource files
"""
self.is_valid = False
base_dir = Path(base_dir).resolve().absolute()
if not base_dir.is_dir():
raise AssertionError(f"Parameter 'base_dir' must be a directory and available in path {base_dir}.")
resources = resources if type(resources) is list else [resources]
for resource in resources:
self._add_resource(base_dir, resource)
add_dependencies(packages) ¶
Adds required dependencies for the Python code.
The list must contain the name of the Python packages or tuples in the form of (name, version) which are required to execute the component on AI Inference Server. The method will search for the packages for the target platform and collect their transitive dependencies as well. Packages that are distributed only in source format can be added too, but only if they are pure Python packages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
packages | list | Can be a list of strings (name) or a list of tuples (name, version) of the required packages for component execution | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_dependencies(self, packages: list):
"""
Adds required dependencies for the Python code.
The list must contain the name of the Python packages or tuples in the form of (name, version) which are required to execute the component on AI Inference Server.
The method will search for the packages for the target platform and collect their transitive dependencies as well.
Packages that are distributed only in source format can be added too, but only if they are pure Python packages.
Args:
packages (list): Can be a list of strings (name) or a list of tuples (name, version) of the required packages for component execution
"""
self.is_valid = False
self.python_dependencies.add_dependencies(packages)
set_requirements(requirements_path, no_deps=False) ¶
Reads the defined dependencies from the given requirements.txt file and creates a new dependency list. Previously added dependencies will be cleared.
The file format must follow Python's requirements file format defined in PEP 508. It can contain URLs to additional repositories in the form of --extra-index-url=my.repo.example.com.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
requirements_path | str | Path of the given | required |
no_deps | bool | If set to True, dependencies will be downloaded/installed without dependencies. Default False. | False |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_requirements(self, requirements_path: os.PathLike, no_deps: bool = False):
"""
Reads the defined dependencies from the given `requirements.txt` file and creates a new dependency list. Previously added dependencies will be cleared.
The file format must follow Python's requirements file format defined in PEP 508.
It can contain URLs to additional repositories in the form of `--extra-index-url=my.repo.example.com`.
Args:
requirements_path (str): Path of the given `requirements.txt` file
no_deps (bool): If set to True, dependencies will be downloaded/installed without dependencies. Default False.
"""
self.is_valid = False
self.python_dependencies.set_requirements(requirements_path)
if no_deps:
self.no_deps = True
_logger.warning("WARNING! Requiring packages without their transitive dependencies is only supported on AI Inference Server version >=2.6.0")
_logger.warning("WARNING! Requiring packages without their transitive dependencies may lead to missing dependencies and runtime errors. Use with caution!")
set_pyproject_toml(pyproject_path) ¶
Reads the defined dependencies from the given pyproject.toml file and adds it to the requirements list.
Only the dependencies defined in the [project] section will be added to the component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pyproject_path | str | Path of the given | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_pyproject_toml(self, pyproject_path: os.PathLike):
"""
Reads the defined dependencies from the given `pyproject.toml` file and adds it to the requirements list.
Only the dependencies defined in the `[project]` section will be added to the component.
Args:
pyproject_path (str): Path of the given `pyproject.toml` file
"""
self.is_valid = False
self.python_dependencies.set_pyproject_toml(pyproject_path)
add_python_packages(path) ¶
Adds Python package(s) to the PythonPackages.zip file of the component.
The path parameter can refer to either a whl, a zip or a tar.gz file. Zip files can be either a source distribution package or a collection of Python packages. Only pure Python source distributions are allowed. The dependency list of the component will be extended with the files added here, so that they will also get installed on the AI Inference Server. The method uses the tempfile.tempdir folder, so make sure that the folder is writeable.
The wheel files must fulfill the requirements of the targeted device environment (e.g., the Python version must match the supported Python version of the targeted AI Inference Server, and the platform should be one of the supported ones too).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path | str | Path of the distribution file | required |
Examples:
component.add_python_packages('../resources/my_package-0.0.1-py3-none-any.wheel') adds the wheel file to PythonPackages.zip and adds dictionary item component.dependencies['my_package'] = '0.0.1'
component.add_python_packages('../resources/inference-wheels.zip') adds all the wheel files in the zip to PythonPackages.zip and component.dependenciesSource code in
docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.pydef add_python_packages(self, path: str) -> None:
"""
Adds Python package(s) to the `PythonPackages.zip` file of the component.
The `path` parameter can refer to either a `whl`, a `zip` or a `tar.gz` file.
Zip files can be either a source distribution package or a collection of Python packages. Only pure Python source distributions are allowed.
The dependency list of the component will be extended with the files added here, so that they will also get installed on the AI Inference Server.
The method uses the `tempfile.tempdir` folder, so make sure that the folder is writeable.
The wheel files must fulfill the requirements of the targeted device environment
(e.g., the Python version must match the supported Python version of the targeted AI Inference Server, and the platform should be one of the supported ones too).
Args:
path (str): Path of the distribution file
Examples:
`component.add_python_packages('../resources/my_package-0.0.1-py3-none-any.wheel')`
adds the wheel file to `PythonPackages.zip` and adds dictionary item `component.dependencies['my_package'] = '0.0.1'`
`component.add_python_packages('../resources/inference-wheels.zip')`
adds all the wheel files in the zip to `PythonPackages.zip` and `component.dependencies`
"""
self.is_valid = False
self.python_dependencies.add_python_packages(path)
set_parallel_steps(replicas) ¶
Sets the number of parallel executors.
This method configures how many instances of the component can be executed at the same time. The component must be suitable for parallel execution. The inputs arriving to the component will be processed by different instances in parallel, and these instances do not share their state (e.g. variables). Every instance is initialized separately and receives only a fraction of the inputs. AI Inference Server supports at most 8 parallel instances.```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
replicas | int | Number of parallel executors. Default is 1. | required |
Raises:
| Type | Description |
|---|---|
ValueError | if the given argument is not a positive integer. |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def set_parallel_steps(self, replicas):
"""
Sets the number of parallel executors.
This method configures how many instances of the component can be
executed at the same time.
The component must be suitable for parallel execution. The inputs arriving
to the component will be processed by different instances in parallel,
and these instances do not share their state (e.g. variables). Every
instance is initialized separately and receives only a fraction of the inputs.
AI Inference Server supports at most 8 parallel instances.```
Args:
replicas (int): Number of parallel executors. Default is 1.
Raises:
ValueError: if the given argument is not a positive integer.
"""
self.is_valid = False
if (not isinstance(replicas, int)) or replicas < 1:
raise ValueError("Replica count must be a positive integer.")
if 8 < replicas:
_logger.warning("The current maximum of parallel executors is 8.")
self._replicas = replicas
add_metric(name, desc=None) ¶
Adds a metric that will be automatically used as a pipeline output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the metric. | required |
desc | str | Description of the metric. (optional) | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def add_metric(self, name: str, desc: str | None = None):
"""
Adds a metric that will be automatically used as a pipeline output.
Args:
name (str): Name of the metric.
desc (str): Description of the metric. (optional)
"""
if "_" not in name:
raise AssertionError("The metric name must contain at least one underscore")
if self.metrics is None:
self.metrics = {}
if name in self.metrics:
raise AssertionError(f"Metric '{name}' already exists")
self.metrics[name] = {}
if desc is not None:
self.metrics[name]['desc'] = desc
delete_metric(name) ¶
Remove a previously added metric.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Name of the metric to be deleted. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def delete_metric(self, name: str):
"""
Remove a previously added metric.
Args:
name (str): Name of the metric to be deleted.
"""
if name not in self.metrics:
raise AssertionError(f"Component '{self.name}' has no metric '{name}'")
self.metrics.pop(name)
enable_dependency_optimization() ¶
Allows changing repository URLs to optimize the package size
Allows the replacement of the --index-url argument during pip download to download CPU runtime optimized dependencies only. Enabling this optimization, the present --index-url will be prepended to the --extra-index-url list, and the Pytorch CPU only repository will be set as the --index-url. A warning message will be printed if the repository URL modification was necessary. Some dependencies have both CPU and GPU runtime versions, pytorch for example, but a PythonComponent can only run on CPU, so packaging the additional GPU runtime dependencies just enlarges the package size. If you want to run your model on GPU, convert it to an ONNX model and use it within a GPURuntimeComponent.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.pydef enable_dependency_optimization(self):
"""
Allows changing repository URLs to optimize the package size
Allows the replacement of the `--index-url` argument during `pip download` to download
CPU runtime optimized dependencies only. Enabling this optimization, the present
`--index-url` will be prepended to the `--extra-index-url` list, and the Pytorch CPU only repository
will be set as the `--index-url`.
A warning message will be printed if the repository URL modification was necessary.
Some dependencies have both CPU and GPU runtime
versions, pytorch for example, but a `PythonComponent` can only run on CPU, so
packaging the additional GPU runtime dependencies just enlarges the package size.
If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
`GPURuntimeComponent`.
"""
self.python_dependencies.enable_dependency_optimization()
disable_dependency_optimization() ¶
Disables any modification to repository URLs
Disables the replacement of the --index-url argument during pip download. This way all --index-url or --extra-index-url arguments will be preserved if they were present in the requirements.txt file. Some dependencies have both CPU and GPU runtime versions, pytorch for example, but a PythonComponent can only run on CPU, so packaging the additional GPU runtime dependencies just enlarges the package size. A warning message will be printed about the package size if this optimization is disabled and the dependency list contains GPU optimized dependencies. Disabling this optimization will not allow the component to run on GPU. If you want to run your model on GPU, convert it to an ONNX model and use it within a GPURuntimeComponent.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.pydef disable_dependency_optimization(self):
"""
Disables any modification to repository URLs
Disables the replacement of the `--index-url` argument during `pip download`.
This way all `--index-url` or `--extra-index-url` arguments will be preserved if they
were present in the requirements.txt file.
Some dependencies have both CPU and GPU runtime
versions, pytorch for example, but a `PythonComponent` can only run on CPU, so
packaging the additional GPU runtime dependencies just enlarges the package size.
A warning message will be printed about the package size if this optimization is disabled and
the dependency list contains GPU optimized dependencies.
Disabling this optimization will not allow the component to run on GPU.
If you want to run your model on GPU, convert it to an `ONNX` model and use it within a
`GPURuntimeComponent`.
"""
self.python_dependencies.disable_dependency_optimization()
validate() ¶
Validates that the component is ready to be serialized and packaged as part of a pipeline.Source code in
docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.pydef validate(self):
"""
Validates that the component is ready to be serialized and packaged as part of a pipeline.
"""
if not self.is_valid:
if self.entrypoint is None:
raise AssertionError("Entrypoint must be defined")
if not any(key.name for key, value in self.resources.items() if key.name == self.entrypoint.name and value == '.'):
raise AssertionError("Entrypoint must be added as resource to the root directory before setting up as entrypoint.")
if len(self.python_dependencies.dependencies) < 1:
_logger.warning(f"WARNING! There are no dependencies defined for component '{self.name}'. Please make sure that all necessary dependencies have been added.")
self.python_dependencies.validate()
self.is_valid = True
_logger.info(f"Component '{self.name}' is valid and ready to use.")
save(destination, validate=True) ¶
Saves the component to a folder structure, so it can be used as part of a pipeline configuration package. Validation can be skipped by setting parameter validate to False. This is useful when the component is already validated and only intended to be saved.
The component folder contains the following:
requirements.txtwith a list of Python dependencies- Entry point script defined by the
entrypointattribute of the component - Extra files as added to the specified folders
PythonPackages.zipwith the wheel binaries for the environment to be installed
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
destination | path - like | Target directory to which the component will be saved. | required |
validate | bool | With value True, triggers component validation. Defaults to True. | True |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def save(self, destination, validate=True):
"""
Saves the component to a folder structure, so it can be used as part of a pipeline configuration package.
Validation can be skipped by setting parameter `validate` to False.
This is useful when the component is already validated and only intended to be saved.
The component folder contains the following:
- `requirements.txt` with a list of Python dependencies
- Entry point script defined by the `entrypoint` attribute of the component
- Extra files as added to the specified folders
- `PythonPackages.zip` with the wheel binaries for the environment to be installed
Args:
destination (path-like): Target directory to which the component will be saved.
validate (bool): With value True, triggers component validation. Defaults to True.
"""
if validate:
self.validate()
folder_path = Path(destination) / self.name
folder_path.mkdir(parents=True, exist_ok=True)
for file_path in self.resources:
dir_path = folder_path / self.resources[file_path]
os.makedirs(dir_path, exist_ok=True)
shutil.copy(file_path, dir_path / file_path.name)
self.python_dependencies.save(folder_path)
python_version_validator(version) ¶
Checks if Python version string is valid and describes supported version.
Only version 3.11 and 3.12 is supported. A patch version is optional and accepted but logs a warning.
Accepted syntaxes are:
- {major}.{minor}
- {major}.{minor}.{patch}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
version | str | Python version string | required |
Raises:
| Type | Description |
|---|---|
ValueError | if the provided version is not supported |
Source code in docs/industrial-ai-suite/sdk/simaticai/deploy/python_component.py
def python_version_validator(version: str):
"""
Checks if Python version string is valid and describes supported version.
Only version 3.11 and 3.12 is supported. A patch version is optional and accepted but logs a warning.
Accepted syntaxes are:
- {major}.{minor}
- {major}.{minor}.{patch}
Args:
version (str): Python version string
Raises:
ValueError: if the provided version is not supported
"""
error_message = "The defined python version is not supported. Currently supported Python versions are 3.11 and 3.12. Python version must be specified only with major and minor version, e.g. '3.11'."
warning_message = """Required Python version was specified with patch version.
Please note that the patch digit of the required Python version is often not taken into account by the Python ecosystem,
so there is no guarantee it has the desired effect."""
match str(version).split('.'):
case ('3', '11' | '12'):
return # no patch version, so no warning
case ('3', '11' | '12', _):
_logger.warning(warning_message)
case _:
raise ValueError(error_message)