

testing

Test pipeline configuration package locally

Once you have created your inference pipeline package, you could deploy it directly to AI Inference Server. However, we strongly recommend that you test the package locally first. Local testing has the following benefits:

  • You can find many potential problems more quickly, as you don't have to go through a deployment cycle.
  • You can diagnose and troubleshoot issues more easily, as you can inspect artifacts in your development environment.
  • You can validate your fixes more quickly and move on to further issues that earlier issues had masked.
  • You can easily include the local pipeline tests in the test automation of your build process.

In general, we encourage you to apply state-of-the-art software engineering practices, such as unit testing and test-driven development. Ideally, you already have automated unit tests or even integration tests in place that verify that the Python code and the saved models work as expected in isolation. This helps you localize errors when you put these pieces together and integrate them as a pipeline configuration package.

The AI SDK package simaticai.testing provides two tools for local testing:

  • A pipeline validator that statically validates the package, checking that the required Python packages are available.
  • A pipeline runner that lets you simulate the execution of your pipeline in your Python environment.

Please note that all this functionality applies to pipeline configuration packages, not edge configuration packages. In other words, you must use them before you convert your pipeline configuration package to an edge configuration package using the convert_package function. As the conversion itself is done in an automated way, most potential problems are already present in the package before the conversion, so a verification after conversion would only delay identifying these problems.

For more comprehensive guidance on how to test pipelines before deployment, we recommend you refer to the AI SDK User Manual, especially the chapter concerning local testing of pipeline configuration packages. We also recommend you study the project templates for the AI SDK, which provide concrete code examples that show how to feed a pipeline with different kinds of inputs in a local test.

LocalPipelineRunner

Simulates the execution of a pipeline in a local Python environment.

Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component, each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.
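
The linear restriction means that a run is simply a chain in which each component receives what the previous one returned. The following is a minimal sketch of that chaining, with plain Python functions standing in for components. The `run_linear` helper, the `scale` and `offset` functions, and the record keys are hypothetical and not part of the AI SDK.

```python
from typing import Callable, Dict

Record = Dict[str, float]
Component = Callable[[Record], Record]


def run_linear(components: Dict[str, Component], payload: Record) -> Record:
    """Feed the payload through the components in declaration order."""
    current = payload
    for _name, component in components.items():
        # Each component only sees the output of the previous one.
        current = component(current)
    return current


def scale(record: Record) -> Record:
    # Hypothetical first component: consumes the pipeline input.
    return {"scaled": record["raw"] * 2.0}


def offset(record: Record) -> Record:
    # Hypothetical last component: its output is the pipeline output.
    return {"result": record["scaled"] + 1.0}


pipeline = {"scale": scale, "offset": offset}
output = run_linear(pipeline, {"raw": 3.0})
```

A pipeline in which `offset` also needed the original `raw` value would not fit this shape, which is the kind of topology the restriction rules out.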

If the caller specifies no path, the working directory is temporary and is removed unless an error occurs. If the caller specifies a working directory with the path argument, the working directory is kept. This behavior can be overridden with the boolean parameter cleanup.
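
The defaulting rule can be written down as a small decision function. This is a sketch based on the source code listed on this page, where a true `cleanup` value leads to the directory being removed on exit. The helper name `resolve_cleanup` is hypothetical, and the sketch does not model the error case, in which the runner keeps the directory regardless.

```python
from typing import Optional


def resolve_cleanup(path: Optional[str], cleanup: Optional[bool]) -> bool:
    """Return True if the working directory would be removed after a clean run."""
    if cleanup is not None:
        # An explicit choice always wins over the default.
        return cleanup
    # Default: remove a temporary directory, keep an explicitly given one.
    return path is None
```

For example, `resolve_cleanup("work", None)` is `False` (an explicit directory is kept), while `resolve_cleanup(None, None)` is `True` (a temporary directory is removed).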

Currently, the pipeline runner supports both the current process_input(data: dict) entrypoint signature and the legacy run(data: str) signature. If both entrypoints are present, process_input() takes precedence. Please note however that run() is deprecated, and support for it will be removed in future versions of the pipeline runner.
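
The precedence rule can be illustrated with a small dispatcher. This is a sketch, not the runner's actual implementation: the `select_entrypoint` helper and the stand-in modules are hypothetical. The wrapped return shape of the legacy `run()` (a dict with `ready` and `output` keys) is inferred from how the runner source on this page unwraps legacy results, so treat it as an assumption.

```python
import json
from types import SimpleNamespace
from typing import Any, Callable


def select_entrypoint(module: Any) -> Callable:
    """Prefer process_input(data: dict); fall back to the legacy run(data: str)."""
    if hasattr(module, "process_input"):
        return module.process_input
    if hasattr(module, "run"):
        return module.run
    raise AttributeError("module defines neither process_input() nor run()")


def process_input(data: dict) -> dict:
    # Current signature: dictionary in, dictionary out.
    return {"doubled": data["value"] * 2}


def run(data: str) -> dict:
    # Legacy signature: JSON string in, wrapped JSON string out (assumed shape).
    value = json.loads(data)["value"]
    return {"ready": True, "output": json.dumps({"doubled": value * 2})}


# Stand-ins for entrypoint modules (hypothetical).
both = SimpleNamespace(process_input=process_input, run=run)
legacy_only = SimpleNamespace(run=run)

chosen = select_entrypoint(both)
legacy = select_entrypoint(legacy_only)
```

New entrypoints should only define `process_input()`, since `run()` is deprecated.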

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory is created. | None |
| cleanup | bool | If set, the working directory is removed when True, and kept when False. If unset, a temporary working directory is removed, and an explicit working directory is kept. When an error occurs in a component, the working directory is kept regardless of this value. | None |
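
A typical local test wraps the runner in a `with` block, because the working directory is prepared on entry and the report is written on exit. The following is a minimal sketch under a few assumptions: the import path is taken from the source file location shown on this page, and the `run_local_test` helper is hypothetical. The import is deferred so that the module can be loaded without the AI SDK installed.

```python
from pathlib import Path
from typing import Optional, Union


def run_local_test(
    package_zip: Union[str, Path],
    payload: Union[dict, list],
    workdir: Optional[Path] = None,
) -> Optional[Union[dict, list]]:
    """Run a pipeline configuration package locally and return its output."""
    # Deferred import: requires the AI SDK (simaticai) to be installed.
    from simaticai.testing.pipeline_runner import LocalPipelineRunner

    # Without a workdir the runner creates a temporary working directory.
    with LocalPipelineRunner(package_zip, workdir) as runner:
        return runner.run_pipeline(payload)
```

A call such as `run_local_test("MyPipeline_1.zip", {"temperature": 21.5})` would return the output of the last component. The package name and the input variable are placeholders; the payload must match the input variables defined in your own pipeline.
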
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
class LocalPipelineRunner:
    """
    Simulates the execution of a pipeline in a local Python environment.

    Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component,
    each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.

    If the caller specifies no `path`, the working directory is temporary and is removed unless an error occurs.
    If the caller specifies a working directory with a `path` argument, the working directory is kept.
    This behavior can be overridden using boolean parameter `cleanup`.

    Currently, the pipeline runner supports both the current `process_input(data: dict)` entrypoint signature and the legacy
    `run(data: str)` signature. If both entrypoints are present, `process_input()` takes precedence. Please note however that
    `run()` is deprecated, and support for it will be removed in future versions of the pipeline runner.

    Args:
        packageZip (path-like): Path to the pipeline configuration package.
        path (path-like): Path to the working directory. If unset, a temporary directory is created.
        cleanup (bool): If set, the working directory is removed when True, and kept when False. \
            If unset, a temporary working directory is removed, and an explicit working directory is kept. \
            When an error occurs in a component, the working directory is kept regardless of this value.
    """

    def __init__(self, packageZip: os.PathLike, path: Optional[os.PathLike] = None, cleanup: Optional[bool] = None, loglevel = logging.INFO):
        """
        Creates a new component LocalPipelineRunner for the provided pipeline configuration package.

        Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.

        Args:
            packageZip (path-like): Path to the pipeline configuration package.
            path (path-like): Path to the working directory. If unset, a temporary directory will be created.
            cleanup (bool): If set, the working directory will be removed when True, and kept when False. \
                If unset, a temporary working directory will be removed, and an explicit working directory will be kept. \
                When an error occurs in a component, the working directory will be kept regardless of this value.
        """
        self.package_zip: Path = Path(packageZip)
        self.path = Path(path).resolve().absolute() if path is not None else None
        self.components = {}
        self.parameters = {}
        self.cleanup = cleanup
        self.log_level = loglevel
        self.workdir: Path
        self.docker = VesselBaseDocker()

        self.report_writer = PipelineRunnerReportWriter()
        report_writer_handler = ReportWriterHandler(self.report_writer)
        _logger.addHandler(report_writer_handler)

    def __enter__(self):
        self.report_writer.set_package_zip_path(self.package_zip)
        timestamp = re.sub(r"[-:]", "", datetime.now(timezone.utc).isoformat(sep="_", timespec="seconds"))
        if self.path is not None:
            self.workdir = Path(self.path)
            self.workdir.mkdir(parents=True, exist_ok=True)
            self.cleanup = self.cleanup if self.cleanup is not None else False
        else:
            self.workdir = Path(tempfile.mkdtemp(prefix=f"LocalPipelineRunner_{timestamp}_"))
            self.cleanup = self.cleanup if self.cleanup is not None else True

        unzip_components = False
        with zipfile.ZipFile(self.package_zip) as zf:
            if 'runtime_config.yml' in zf.namelist():
                self.workdir = self.workdir / self.package_zip.stem
                zf.extractall(path=self.workdir)
                unzip_components = True
            else:
                zf.extractall(path=self.workdir)
                self.workdir = self.workdir / zf.namelist()[0]

        try:
            with open(self.workdir / "pipeline_config.yml") as cf:
                config = yaml.load(cf, Loader=yaml.FullLoader)

            components = config["dataFlowPipeline"].get("components", [])
            for component in components:
                component["context"] = None
                component["env_dir"] = self.workdir / component['name']
                self.components[component["name"]] = component

            if unzip_components:
                for component in components:
                    component_zip = f"{component['name']}_{component['version']}.zip"
                    with zipfile.ZipFile(self.workdir / 'components' / component_zip) as zf:
                        zf.extractall(path=self.workdir / component["name"])

            for parameter in config["dataFlowPipeline"].get("pipelineParameters", {}):
                self.parameters[parameter["name"]] = parameter

        except Exception:
            raise RuntimeError(INVALID_PIPELINE_PACKAGE_MESSAGE)

        return self

    def __exit__(self, exception_type, value, traceback):

        self.update_telemetry_data()
        self.report_writer.write_report()

        if self.cleanup:
            _logger.info("Removing local pipeline runner environment...")
            shutil.rmtree(self.workdir.parent)
        else:
            _logger.info(f"Leaving local pipeline runner environment in its final state at '{self.workdir}'")

    def collect_telemetry_data(self) -> dict:
        """
        Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.

        Returns:
            dict: A dictionary containing the telemetry data.
        """
        telemetry_data = {}

        telemetry_data["platform"] = {}
        telemetry_data["platform"]["os"] = platform.system()
        telemetry_data["platform"]["release"] = platform.release()
        telemetry_data["platform"]["python_version"] = platform.python_version()

        _logger.info(f"locals: {locals()}")
        telemetry_data["environment"] = {}
        env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
        env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
        env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
        env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
        telemetry_data["environment"]["info"] = env_info

        telemetry_data["industrial_ai"] = {}
        try:
            telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
        except importlib_metadata.PackageNotFoundError:
            telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND

        telemetry_data["pipeline"] = {}
        telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                                 if self.components[component]['runtime']['type'] == 'python'))
        telemetry_data["pipeline"]["file_extensions"] = []
        for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
            excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
            suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                                if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
            for suffix in suffixes:
                if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                    telemetry_data["pipeline"]["file_extensions"].append(suffix)

        return telemetry_data

    def update_telemetry_data(self):
        """
        Update the telemetry data and the package.

        This method updates the telemetry data by loading the existing data from a YAML file,
        or collecting new telemetry data if the file doesn't exist. It then updates the
        "last_test_run" field of the telemetry data with the current timestamp. The updated
        telemetry data is then written back to the YAML file.

        If the package contains a different version of the telemetry data file, a new package
        is created with the updated telemetry data. Otherwise, the existing package is
        overwritten with the new package containing the updated telemetry data.
        """
        _logger.info("Updating telemetry data and the package")
        telemetry_path = self.workdir / "telemetry_data.yml"
        if telemetry_path.is_file():
            telemetry_data = yaml.safe_load(telemetry_path.read_text())
        else:
            telemetry_data = self.collect_telemetry_data()

        telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
        telemetry_path.write_text(yaml.dump(telemetry_data))

        config_package = False
        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            for file in zip_read.namelist():
                if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                    config_package = True
                    break

        new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"

        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
                for file in zip_read.namelist():
                    if TELEMETRY_YAML not in file:
                        filepath = zip_read.extract(file, path=self.workdir)
                        zip_write.write(filepath, arcname=file)
                    else:
                        zip_write.write(telemetry_path, arcname=file)

        if config_package:
            shutil.copy(new_zip_path, self.package_zip)
        else:
            new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
            new_sha_path.write_text(calc_sha(new_zip_path))

    def _install_requirements(self, component, package_dir, no_index: bool = True):
        _logger.info("Installing requirements...")
        pip_report_file = component["env_dir"] / "pip_report.json"
        package_dir_path = _relative_to(package_dir, component["env_dir"])
        pip_report_file_path = _relative_to(pip_report_file, component["env_dir"])
        cmd = [
            str(component['python_path']),
            "-m", "pip", "install",
            "--no-warn-script-location",
            "-f", f"{package_dir_path}",
            "-r", REQUIREMENTS_TXT,
        ]

        if no_index:
            cmd += [ "--no-index" ]

        use_no_deps = component.get("runtime", {}).get("installTransitiveDependencies", True) is False
        if use_no_deps:
            cmd += [ "--no-deps" ]

        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)

        if 0 == result.returncode and use_no_deps:
            _logger.warning(f"Component `{component['name']}` is requested to be installed without transitive dependencies. "
                            "This may lead to runtime errors if the dependencies are not handled manually.")

        # generate pip report with a dry run to get the list of installed packages (in case of CPU Python packages)
        if 0 == result.returncode and component.get("hwType", None) == "CPU":
            cmd += ["--dry-run", "--ignore-installed", "--report", f"{pip_report_file_path}"]
            subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
            self.report_writer.add_installed_packages(component["name"], pip_report_file)
        return result

    def _install_from_packages_zip(self, component, package_dir):
        result = self._install_requirements(component, package_dir, True)
        return 0 == result.returncode

    def _install_from_pypi_org(self, component, package_dir):
        return self._install_requirements(component, package_dir, False)

    def _init_component_venv(self, component: dict):
        """
        Creates a virtual environment in which the given component can run.

        Args:
            component (dict): Configuration entry of the selected component.
        """
        _logger.info(f"Creating virtual environment for component '{component['name']}'...")
        context_dir = component["env_dir"] / ".venv"

        if self.docker.is_vessel:
            _logger.info("Creating virtual environment in docker...")
            component["context"], component["python_path"] = self.docker._create_venv(component["env_dir"], component["runtime"]["version"])
            component["python_path"] = Path(component["python_path"]).resolve()
            _logger.info(f"Component context (docker): {component['context']}")
        else:
            builder = venv.EnvBuilder(with_pip=True, symlinks=False)
            builder.create(str(context_dir))
            component["context"] = builder.ensure_directories(context_dir)
            component['python_path'] = Path(component["context"].env_exe).resolve()
        _logger.debug(f"Component python_path: {component['python_path']}")

        _logger.info("Upgrading pip...")
        cmd = [str(component['python_path']), "-m", "pip", "install", "pip", "--upgrade"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error upgrading pip:\n{result.stderr}")

        cmd = [str(component['python_path']), "-m", "pip", "install", "--upgrade", "setuptools"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error installing setuptools:\n{result.stderr}")

        try:
            result = self._install_logmodule(component["python_path"], component["env_dir"])
        except Exception as err:
            _logger.error(err)
            self.cleanup = False
            raise RuntimeError("The 'simaticai' Python package is either not installed or does not contain package 'log_module'.") from None
        if result.returncode != 0:
            self.cleanup = False
            raise RuntimeError(f"Error installing log_module:\n{result.stderr}")

        req_list = Path(component["env_dir"] / "requirements.list")
        req_list.touch(exist_ok=True)
        if Path(component["env_dir"] / REQUIREMENTS_TXT).is_file():
            dependencies, extra_index, index_url = parse_requirements(component["env_dir"] / REQUIREMENTS_TXT)
            requirements = "#".join(dependencies.keys())
            req_list.write_text(requirements)
        else:
            _logger.info(f"'{REQUIREMENTS_TXT}' was not found. No additional dependencies were installed.")
            return

        package_dir = component["env_dir"] / PYTHON_PACKAGES
        package_zip = component["env_dir"] / PYTHON_PACKAGES_ZIP

        _logger.info(f"Extracting {PYTHON_PACKAGES_ZIP}")
        if package_zip.is_file():
            with zipfile.ZipFile(package_zip) as zf:
                zf.extractall(path=package_dir.absolute())
        else:
            _logger.info(f"There is no {PYTHON_PACKAGES_ZIP} to extract.")
            package_dir.mkdir(parents=True, exist_ok=True)

        success = self._install_from_packages_zip(component, package_dir)
        if not success:
            msg = f"Warning! Could not install dependencies from {PYTHON_PACKAGES_ZIP}. "
            msg += "Trying to install them from pypi.org. The resulting Python environment "
            msg += "may be significantly different than the targeted Python environment on the Edge Device!"
            _logger.warning(msg)

            if self.docker.is_vessel and component["runtime"]["type"] == "python":
                raise RuntimeError("The component is running in a docker container. The installation of dependencies from pypi.org is not supported.")
            else:
                second_install_result = self._install_from_pypi_org(component, package_dir)
                if 0 != second_install_result.returncode:
                    self.cleanup = False
                    raise RuntimeError(f"Error installing requirements:\n{second_install_result.stderr}")

    @staticmethod
    def _install_logmodule(python_path, env_dir):
        _logger.info("Installing LogModule...")
        try:
            package_paths = importlib_metadata.files("simaticai")
            assert package_paths is not None
            logger_wheel = [p for p in package_paths if 'log_module' in str(p)][0].locate()
        except Exception:
            from importlib.metadata import Distribution
            direct_url = Distribution.from_name("simaticai").read_text("direct_url.json")
            assert direct_url is not None
            direct_url = json.loads(direct_url)['url']
            direct_url = direct_url.replace('file://','')
            direct_url = Path(direct_url) / 'simaticai' / 'data'
            paths = list(direct_url.rglob('*.whl'))
            logger_wheel = [p for p in paths if 'log_module' in str(p)][0].resolve()

        cmd = [
            str(python_path), "-m", "pip",
            "install",
            "--no-warn-script-location", logger_wheel, "joblib"
        ]
        return subprocess.run(cmd, cwd=env_dir, text=True, stderr=subprocess.PIPE)

    def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
        """
        Runs the component in its virtual environment with the given input.
        This environment is created according to `requirements.txt` in the package.
        Additionally, 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
        The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries,
        or a DataStream object which will produce the appropriate input data.
        The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.

        Args:
            name (str): The name of the component to be executed.
            data (dict, list or DataStream): One or more input records for the component.

        Returns:
            dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
            If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.

        """
        assert name in self.components, f"Invalid component name: {name}"
        component = self.components[name]

        assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Cannot run component '{name}': runtime type is neither 'python' nor 'gpuruntime'"

        input_payload_path: Path  = component["env_dir"] / "input.joblib"
        output_payload_path: Path = component["env_dir"] / "output.joblib"

        batch_input: bool  = component["batch"]["inputBatch"]  == "Yes" if component.get("batch") is not None else False
        batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False

        # Validate and serialize input payload
        assert data is not None, f"Can not run component '{name}' without input."
        result_is_list = True
        if isinstance(data, list):
            input_payload = data
        elif isinstance(data, DataStream):
            input_payload = [item for item in data]
        else:
            result_is_list = False
            input_payload = [data]
        validate_payload(input_payload, component["inputType"], batch_input)
        joblib.dump(input_payload, input_payload_path)
        self.report_writer.set_input_payload_length(name, len(input_payload))
        _logger.info(f"Input payload saved as '{input_payload_path}'")

        # Assemble command for running component
        if component['runtime']['type'] == 'python':
            # Version check for Python
            e_major, e_minor, _, _, _ = sys.version_info
            c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))

            if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
                msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
                msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
                msg += " Pipeline behavior on AI Inference Server might be different."
                _logger.warning(msg)

            if component["context"] is None:
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_component.py', component["env_dir"])

            req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
            json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
            args = [
                "-m", 'run_component',
                "-m", Path(component["entrypoint"]).stem,
                "-p", f"{json_params}",
                "-r", f"{req_list_path}",
            ]

        else:
            # gpuruntime step requires Python environment with onnxruntime installed
            if component["context"] is None:
                shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
                shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])

            args = [
                "-m", 'run_gpuruntime_component',
                "-m", component["entrypoint"],
                "-c", "config.pbtxt",
            ]

        args += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self.log_level)
        ]

        # Run the component in the created Python environment
        _logger.info(f"Running component '{name}'...")
        _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")

        cmd = [str(component['python_path'])] + args
        _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
        p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        if p.stdout is not None:
            for line in p.stdout:
                _logger.info(line.strip())
        returncode = p.wait()
        if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
            self.cleanup = False
            raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
        else:
            response_wrapped = returncode == RETURN_CODE_DEPRECATED

        # Deserialize and validate output payload
        _logger.info(f"Loading output payload from '{output_payload_path}'")
        output_payload = joblib.load(output_payload_path)
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        self.report_writer.set_output_payload_length(name, len(output_payload))
        if response_wrapped:
            output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
        else:
            output_payload = [output for output in output_payload if output is not None]
        validate_payload(output_payload, component["outputType"], batch_output)

        if result_is_list:
            return output_payload
        if 0 < len(output_payload):
            return output_payload[0]
        return None

    def update_parameters(self, parameters: dict):
        """
        Validates and updates pipeline parameters.
        The elements of the dictionary must match the parameters specified in the pipeline configuration package.
        If any of the names or types does not match, all parameters will remain untouched.

        Args:

            parameters (dict): names and values of parameters to update

        Raises:

            AssertionError:
                When:
                - either `name` is not in the configured parameters,
                - or `defaultValue` type is different from the configured one
        """
        for key,value in parameters.items():
            if key not in self.parameters.keys():
                if key.upper().startswith("__AI_IS"):
                    self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
                else:
                    raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
            elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")

        for key, value in parameters.items():
            self.parameters[key]["defaultValue"] = value

    def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
        """
        Runs all the components sequentially, assuming the output of a component is only consumed by the next.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
        For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
        and the output is saved as `output.joblib`.

        Args:
            payload (dict or list): One or more input records for the pipeline.

        Returns:
            The output of the last component.
        """

        for name in self.components.keys():
            payload = self.run_component(name, payload)

        return payload
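
The way `run_component` treats single records and batches can be summarized in two small helpers. This is a simplified sketch of the logic in the listing above. The names `to_batch` and `from_batch` are hypothetical, and `DataStream` inputs, payload validation, and the legacy wrapped responses are left out.

```python
from typing import List, Optional, Tuple, Union

Record = dict


def to_batch(data: Union[Record, List[Record]]) -> Tuple[List[Record], bool]:
    """Wrap a single record in a list and remember whether the caller passed a list."""
    if isinstance(data, list):
        return data, True
    return [data], False


def from_batch(
    outputs: List[Optional[Record]], was_list: bool
) -> Optional[Union[Record, List[Record]]]:
    """Drop empty outputs, then mirror the shape of the original input."""
    records = [output for output in outputs if output is not None]
    if was_list:
        return records
    # Single-record input: return the first output, or None if there is none.
    return records[0] if records else None
```

A single dictionary therefore yields a single dictionary or `None`, while a list always yields a list, possibly an empty one.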

__init__(packageZip, path=None, cleanup=None, loglevel=logging.INFO)

Creates a new component LocalPipelineRunner for the provided pipeline configuration package.

Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory will be created. | None |
| cleanup | bool | If set, the working directory will be removed when True, and kept when False. If unset, a temporary working directory will be removed, and an explicit working directory will be kept. When an error occurs in a component, the working directory will be kept regardless of this value. | None |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def __init__(self, packageZip: os.PathLike, path: Optional[os.PathLike] = None, cleanup: Optional[bool] = None, loglevel = logging.INFO):
    """
    Creates a new component LocalPipelineRunner for the provided pipeline configuration package.

    Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.

    Args:
        packageZip (path-like): Path to the pipeline configuration package.
        path (path-like): Path to the working directory. If unset a temporary directory will be created.
        cleanup (bool): If set, the working directory will be deleted when True, and kept when False. \
            If unset, a temporary working directory will be removed, and an explicit working directory will be kept. \
            When an error occurs in a component, the working directory will be kept regardless of this value.
    """
    self.package_zip: Path = Path(packageZip)
    self.path = Path(path).resolve().absolute() if path is not None else None
    self.components = {}
    self.parameters = {}
    self.cleanup = cleanup
    self.log_level = loglevel
    self.workdir: Path
    self.docker = VesselBaseDocker()

    self.report_writer = PipelineRunnerReportWriter()
    report_writer_handler = ReportWriterHandler(self.report_writer)
    _logger.addHandler(report_writer_handler)
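
The interplay between `path`, `cleanup`, and component errors can be easier to follow as a small decision function. The sketch below is not part of the SDK: `should_delete_workdir` is a hypothetical helper, and it assumes that `cleanup=True` means deletion, which is consistent with `run_component` setting `self.cleanup = False` to keep the directory after an error.

```python
from typing import Optional


def should_delete_workdir(cleanup: Optional[bool], explicit_path: bool, error_occurred: bool) -> bool:
    """Hypothetical helper: decide whether the working directory is deleted."""
    if error_occurred:
        # Keep the directory so the failing run can be inspected.
        return False
    if cleanup is not None:
        # An explicit setting wins (assumption: True means delete).
        return cleanup
    # Unset: remove a temporary directory, keep an explicitly given one.
    return not explicit_path
```

For example, `should_delete_workdir(None, explicit_path=True, error_occurred=False)` evaluates to `False`, so an explicitly chosen working directory survives the run by default.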

collect_telemetry_data()

Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.

Returns:

| Name | Type | Description |
|---|---|---|
| dict | dict | A dictionary containing the telemetry data. |

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def collect_telemetry_data(self) -> dict:
    """
    Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.

    Returns:
        dict: A dictionary containing the telemetry data.
    """
    telemetry_data = {}

    telemetry_data["platform"] = {}
    telemetry_data["platform"]["os"] = platform.system()
    telemetry_data["platform"]["release"] = platform.release()
    telemetry_data["platform"]["python_version"] = platform.python_version()

    _logger.info(f"locals: {locals()}")
    telemetry_data["environment"] = {}
    env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
    env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
    env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
    env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
    telemetry_data["environment"]["info"] = env_info

    telemetry_data["industrial_ai"] = {}
    try:
        telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
    except importlib_metadata.PackageNotFoundError:
        telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND

    telemetry_data["pipeline"] = {}
    telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                             if self.components[component]['runtime']['type'] == 'python'))
    telemetry_data["pipeline"]["file_extensions"] = []
    for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
        excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
        suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                            if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
        for suffix in suffixes:
            if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                telemetry_data["pipeline"]["file_extensions"].append(suffix)

    return telemetry_data
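
The environment detection above relies on marker variables that the CI systems set, with later checks overriding earlier ones. A minimal sketch of that precedence follows. The names `detect_ci`, `CI_MARKERS`, and the `NOT_FOUND` placeholder are assumptions for illustration, and the actual value of `MSG_NOT_FOUND` is not shown in this section.

```python
from typing import Mapping

NOT_FOUND = "not found"  # hypothetical placeholder; the real MSG_NOT_FOUND value is not shown here

# Checked in order; a later match overrides an earlier one.
CI_MARKERS = [
    ("GITLAB_CI", "GitLab CI/CD"),
    ("TF_BUILD", "Azure DevOps Pipelines"),
    ("GITHUB_ACTIONS", "GitHub Actions"),
]


def detect_ci(environ: Mapping[str, str]) -> str:
    """Return a label for the CI system whose marker variable is present."""
    info = NOT_FOUND
    for variable, label in CI_MARKERS:
        if variable in environ:
            info = label
    return info
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to test. With both `GITLAB_CI` and `GITHUB_ACTIONS` present, the last matching entry is reported.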

update_telemetry_data()

Update the telemetry data and the package.

This method updates the telemetry data by loading the existing data from a YAML file, or collecting new telemetry data if the file doesn't exist. It then updates the "last_test_run" field of the telemetry data with the current timestamp. The updated telemetry data is then written back to the YAML file.

If the package contains a different version of the telemetry data file, a new package is created with the updated telemetry data. Otherwise, the existing package is overwritten with the new package containing the updated telemetry data.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def update_telemetry_data(self):
    """
    Update the telemetry data and the package.

    This method updates the telemetry data by loading the existing data from a YAML file,
    or collecting new telemetry data if the file doesn't exist. It then updates the
    "last_test_run" field of the telemetry data with the current timestamp. The updated
    telemetry data is then written back to the YAML file.

    If the package contains a different version of the telemetry data file, a new package
    is created with the updated telemetry data. Otherwise, the existing package is
    overwritten with the new package containing the updated telemetry data.
    """
    _logger.info("Updating telemetry data and the package")
    telemetry_path = self.workdir / "telemetry_data.yml"
    if telemetry_path.is_file():
        telemetry_data = yaml.safe_load(telemetry_path.read_text())
    else:
        telemetry_data = self.collect_telemetry_data()

    telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
    telemetry_path.write_text(yaml.dump(telemetry_data))

    config_package = False
    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        for file in zip_read.namelist():
            if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                config_package = True
                break

    new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"

    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
            for file in zip_read.namelist():
                if TELEMETRY_YAML not in file:
                    filepath = zip_read.extract(file, path=self.workdir)
                    zip_write.write(filepath, arcname=file)
                else:
                    zip_write.write(telemetry_path, arcname=file)

    if config_package:
        shutil.copy(new_zip_path, self.package_zip)
    else:
        new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
        new_sha_path.write_text(calc_sha(new_zip_path))
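
The core of this method is rewriting a zip archive with one member swapped out, then hashing the result. The sketch below illustrates that idea with the standard library only. `replace_member` is a hypothetical helper, it matches the member by exact name rather than by substring as the method above does, and `hashlib.sha256` stands in for `calc_sha`, whose implementation is not shown here.

```python
import hashlib
import zipfile
from pathlib import Path


def replace_member(src_zip: Path, dst_zip: Path, member: str, new_content: bytes) -> str:
    """Copy src_zip to dst_zip, swapping one member, and return the SHA-256 of dst_zip."""
    with zipfile.ZipFile(src_zip, "r") as zip_read, zipfile.ZipFile(dst_zip, "w") as zip_write:
        for name in zip_read.namelist():
            if name == member:
                # Write the replacement content under the original archive name.
                zip_write.writestr(name, new_content)
            else:
                # Copy every other member unchanged, without extracting to disk.
                zip_write.writestr(name, zip_read.read(name))
    return hashlib.sha256(dst_zip.read_bytes()).hexdigest()
```

Copying members in memory avoids the intermediate extraction step. For very large packages, extracting to a working directory as the method above does may be preferable.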

run_component(name, data)

Runs the component in its virtual environment with the given input. This environment is created according to requirements.txt in the package. Additionally, joblib and the mock log_module are automatically installed in this virtual environment. The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries, or a DataStream object that produces the appropriate input data. The supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the component to be executed. | required |
| data | dict or list | One or more input records for the component. | required |

Returns:

| Type | Description |
|---|---|
| Optional[Union[dict, list]] | dict / list: A list of dictionaries for outputs if there were no errors and field ready is true. If the input was a single dict, then a single dict (the first item of the list), or None if there is no output. |

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
    def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
        """
        Runs the component in its virtual environment with the given input.
        This environment is created according to `requirements.txt` in the package.
        Additionally, 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
        The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries,
        or a DataStream object that produces the appropriate input data.
        The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.

        Args:
            name (str): The name of the component to be executed.
            data (dict or list): One or more input records for the component.

        Returns:
            dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
            If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.

        """
        assert name in self.components, f"Invalid component name: {name}"
        component = self.components[name]

        assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is neither 'python' nor 'gpuruntime'"

        input_payload_path: Path  = component["env_dir"] / "input.joblib"
        output_payload_path: Path = component["env_dir"] / "output.joblib"

        batch_input: bool  = component["batch"]["inputBatch"]  == "Yes" if component.get("batch") is not None else False
        batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False

        # Validate and serialize input payload
        assert data is not None, f"Can not run component '{name}' without input."
        result_is_list = True
        if isinstance(data, list):
            input_payload = data
        elif isinstance(data, DataStream):
            input_payload = [item for item in data]
        else:
            result_is_list = False
            input_payload = [data]
        validate_payload(input_payload, component["inputType"], batch_input)
        joblib.dump(input_payload, input_payload_path)
        self.report_writer.set_input_payload_length(name, len(input_payload))
        _logger.info(f"Input payload saved as '{input_payload_path}'")

        # Assemble command for running component
        if component['runtime']['type'] == 'python':
            # Version check for Python
            e_major, e_minor, _, _, _ = sys.version_info
            c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))

            if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
                msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
                msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
                msg += " Pipeline behavior on AI Inference Server might be different."
                _logger.warning(msg)

            if component["context"] is None:
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_component.py', component["env_dir"])

            req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
            json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
            args = [
                "-m", 'run_component',
                "-m", Path(component["entrypoint"]).stem,
                "-p", f"{json_params}",
                "-r", f"{req_list_path}",
            ]

        else:
            # gpuruntime step requires Python environment with onnxruntime installed
            if component["context"] is None:
                shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
                shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])

            args = [
                "-m", 'run_gpuruntime_component',
                "-m", component["entrypoint"],
                "-c", "config.pbtxt",
            ]

        args += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self.log_level)
        ]

        # Run the component in the created Python environment
        _logger.info(f"Running component '{name}'...")
        _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")

        cmd = [str(component['python_path'])] + args
        _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
        p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        if p.stdout is not None:
            for line in p.stdout:
                _logger.info(line.strip())
        returncode = p.wait()
        if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
            self.cleanup = False
            raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
        response_wrapped = returncode == RETURN_CODE_DEPRECATED

        # Deserialize and validate output payload
        _logger.info(f"Loading output payload from '{output_payload_path}'")
        output_payload = joblib.load(output_payload_path)
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        self.report_writer.set_output_payload_length(name, len(output_payload))
        if response_wrapped:
            output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
        else:
            output_payload = [output for output in output_payload if output is not None]
        validate_payload(output_payload, component["outputType"], batch_output)

        if result_is_list:
            return output_payload
        if 0 < len(output_payload):
            return output_payload[0]
        return None
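
The way `run_component` accepts either a single record or a batch, and mirrors that shape in its return value, can be sketched in isolation. The helper names `wrap_input` and `unwrap_output` are hypothetical, and a plain iterator stands in for a `DataStream`, whose interface is only assumed here to be iterable.

```python
from typing import Any, Iterable, List, Optional, Tuple, Union


def wrap_input(data: Union[dict, list, Iterable[dict]]) -> Tuple[List[Any], bool]:
    """Return the payload as a list and whether the caller should get a list back."""
    if isinstance(data, dict):
        # A single record is wrapped, and the result will be unwrapped again.
        return [data], False
    # Lists and other iterables (standing in for DataStream) become a list.
    return list(data), True


def unwrap_output(outputs: List[Any], result_is_list: bool) -> Optional[Union[dict, list]]:
    """Drop empty outputs and restore the shape the caller passed in."""
    outputs = [item for item in outputs if item is not None]
    if result_is_list:
        return outputs
    return outputs[0] if outputs else None
```

With a single dictionary as input, a component that produces no output yields `None` rather than an empty list, which is the case a test should check for explicitly.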

update_parameters(parameters)

Validates and updates pipeline parameters. The elements of the dictionary must match the parameters specified in the pipeline configuration package. If any of the names or types does not match, all parameters will remain untouched.

Args:

parameters (dict): names and values of parameters to update

Raises:

AssertionError:
    When:
    - either `name` is not in the configured parameters,
    - or `defaultValue` type is different from the configured one
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def update_parameters(self, parameters: dict):
    """
    Validates and updates pipeline parameters.
    The elements of the dictionary must match the parameters specified in the pipeline configuration package.
    If any of the names or types does not match, all parameters will remain untouched.

    Args:

        parameters (dict): names and values of parameters to update

    Raises:

        AssertionError:
            When:
            - either `name` is not in the configured parameters,
            - or `defaultValue` type is different from the configured one
    """
    for key,value in parameters.items():
        if key not in self.parameters.keys():
            if key.upper().startswith("__AI_IS"):
                self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
            else:
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
        elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")

    for key, value in parameters.items():
        self.parameters[key]["defaultValue"] = value
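
The "all parameters remain untouched" guarantee comes from validating every entry before applying any of them. A minimal sketch of that two-pass pattern follows. `update_all_or_nothing` is a hypothetical function, the `TYPE_MAP` contents are an assumption because the real `type_map` is not shown in this section, and the special handling of `__AI_IS` keys is left out.

```python
# Assumed mapping from Python type names to pipeline parameter types.
TYPE_MAP = {"str": "String", "int": "Integer", "float": "Double", "bool": "Boolean"}


def update_all_or_nothing(configured: dict, updates: dict) -> None:
    """Apply updates only if every name and type matches the configuration."""
    # Pass 1: validate every entry before touching anything.
    for key, value in updates.items():
        if key not in configured:
            raise AssertionError(f"Unknown parameter '{key}'")
        expected = configured[key]["type"]
        actual = TYPE_MAP.get(type(value).__name__)
        if expected != actual:
            raise AssertionError(f"Parameter '{key}' expects {expected}, got {actual}")
    # Pass 2: apply, now that all entries are known to be valid.
    for key, value in updates.items():
        configured[key]["defaultValue"] = value
```

If one entry in `updates` has the wrong type, the function raises before the second loop runs, so valid entries in the same call are not applied either.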

run_pipeline(payload={})

Runs all the components sequentially, assuming the output of a component is only consumed by the next. The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries. For each component the supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict or list | One or more input records for the pipeline. | {} |

Returns:

| Type | Description |
|---|---|
| Optional[Union[dict, list]] | The output of the last component. |

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
    """
    Runs all the components sequentially, assuming the output of a component is only consumed by the next.
    The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
    For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
    and the output is saved as `output.joblib`.

    Args:
        payload (dict or list): One or more input records for the pipeline.

    Returns:
        The output of the last component.
    """

    for name in self.components.keys():
        payload = self.run_component(name, payload)

    return payload
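
The sequential chaining that `run_pipeline` performs can be illustrated without any packaged components. In this sketch plain callables stand in for components; `run_chain` and the `Payload` alias are hypothetical names used only for illustration.

```python
from typing import Callable, Dict, Optional, Union

Payload = Optional[Union[dict, list]]


def run_chain(components: Dict[str, Callable[[Payload], Payload]], payload: Payload) -> Payload:
    """Feed the payload through each component in insertion order."""
    # Dictionaries preserve insertion order, so components run in the order they were added.
    for name, component in components.items():
        payload = component(payload)
    return payload
```

Because each component only sees the output of its predecessor, this pattern does not cover pipelines in which one output is consumed by several later components.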

ComponentRunner

Class to run a Pipeline Component in a virtual environment.

Supported Component Types:

  • PythonComponent
  • GPURuntimeComponent

Args:

  • component: PythonComponent or GPURuntimeComponent
  • workdir: Path to the directory where the component should be run. If None, the current working directory is used.
  • cleanup: If True, the workdir is deleted after the context manager exits.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
class ComponentRunner():
    """
    Class to run a Pipeline Component in a virtual environment.
    Supported Component Types:
    - PythonComponent
    - GPURuntimeComponent

    Args:
    component: PythonComponent or GPURuntimeComponent
    workdir: Path to the directory where the component should be run. If None, the current working directory is used.
    cleanup: If True, the workdir is deleted after the context manager exits.

    """
    def __init__(self, component, workdir=None, cleanup=False):
        self.component = component
        self.parameters = { '__AI_IS_IMAGE_SET_VISUALIZATION': False }
        self.cleanup = cleanup

        self._logger = self._set_logger()

        self._create_workdir(component, workdir)
        self._copy_resources()
        self._create_venv()
        self._install_requirements()

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        if exception_type is not None:
            self._logger.error(f"Exception occurred: {exception_type.__name__}: {exception_value}")
            return False
        if self.cleanup:
            shutil.rmtree(self.workdir)

    def _set_logger(self):
        """
        Set the logger for the ComponentRunner.
        """
        logger = logging.getLogger(__name__)
        log_level = os.environ.get("loglevel", "INFO").upper()
        logger.setLevel(log_level)
        handler = logging.StreamHandler()
        handler.setLevel(log_level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def _create_workdir(self, component, workdir):
        """
        Create a workdir for the component.
        """
        if workdir is None:
            self.workdir = Path.cwd() / component.name
        else:
            self.workdir = Path(workdir).resolve().absolute() / component.name

        self.workdir.mkdir(parents=True, exist_ok=True)
        self._logger.info(f"Created workdir: {self.workdir}")

    def _copy_resources(self):
        """
        Copy the required resources of the component to the workdir.
        For a PythonComponent, also writes the dependencies into requirements.txt.
        For a GPURuntimeComponent, also creates a requirements.txt with the required onnx and onnxruntime dependencies.
        Also adds the runner script to the workdir.
        If the source folder is the same as the workdir, the resources are not copied,
        so any changes in the source will affect the workdir.
        """
        if isinstance(self.component, PythonComponent):
            same_resources = []
            for from_path in self.component.resources.keys():
                try:
                    to_path = self.workdir / self.component.resources[from_path]
                    to_path.mkdir(parents=True, exist_ok=True)
                    shutil.copy(from_path, to_path)
                except shutil.SameFileError:
                    same_resources.append(from_path.name)
            if same_resources:
                self._logger.info("These resources are already in the workdir and were not copied:")
                for resource in same_resources:
                    self._logger.debug(f"  - {resource}")

            # download manually added Python packages and requirements.txt
            self.component.python_dependencies.save(self.workdir)
            shutil.copy(PYTHON_RUNNER_PATH, self.workdir)
        else:
            model_dir = Path(self.workdir / "1")
            model_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(self.component.model_path, model_dir / "model.onnx")
            Path(self.workdir / "config.pbtxt").write_text(f"{self.component.auto_config}")

            shutil.copy(GPU_RUNNER_PATH, self.workdir / "run_component.py")
            shutil.copy(GPU_REQUIREMENTS_PATH, self.workdir / "requirements.txt")
            shutil.copy(GPU_CONFIG_PATH, self.workdir)

        self._logger.info("Resources are copied into the workdir.")

    def _create_venv(self):
        """
        Create a Python virtual environment in the workdir.
        The created virtual environment is stored in the context_dir and
        has the same version as the current Python interpreter.
        """
        context_dir = self.workdir / ".venv"
        builder = venv.EnvBuilder(with_pip=True, symlinks=False)
        builder.create(context_dir)

        self.context = builder.ensure_directories(context_dir)
        self.python_path = Path(self.context.env_exe).resolve()

        self._logger.info(f"Python virtualenv created in folder '{self.context.env_dir}'")

    def _install_requirements(self):
        """
        Installs the required Python dependencies.
        """
        cmd = [str(self.python_path), '-m', 'pip', 'install', 'joblib', '-r', 'requirements.txt']
        result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())

        result_lines = result.stdout.decode().split("\n")
        installed_message = " ".join([line for line in result_lines if "Successfully installed" in line])
        self._logger.info(installed_message)

    def set_parameters(self, parameter_name, parameter_value):
        """
        Set the parameters for the component.
        """
        self.parameters[parameter_name] = parameter_value

    def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
        """
        Run the component with the input payload.

        Parameters:
        input_payload: Input payload for the component.

        Returns:
        Output payload from the component.

        Side Effects:
        - The input payload is saved in the workdir.
        - The output payload is loaded from the workdir.
        """
        input_payload_path: Path  = self.workdir / "input.joblib"
        output_payload_path: Path = self.workdir / "output.joblib"

        batch_input: bool  = self.component.batch.inputBatch
        if isinstance(input_payload, DataStream):
            input_payload = [item for item in input_payload]
        elif isinstance(input_payload, dict):
            input_payload = [input_payload]

        input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
        self._validate_payload(input_payload, input_variables, batch_input)

        joblib.dump(input_payload, input_payload_path)

        cmd = self._create_command(input_payload_path, output_payload_path)

        self._logger.info("Running command: " + " ".join(cmd))

        result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())

        self._logger.debug(result.stderr.decode())
        output_payload = joblib.load(output_payload_path)

        batch_output: bool = self.component.batch.outputBatch
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        output_payload = [output for output in output_payload if output is not None]
        output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
        if isinstance(self.component, PythonComponent):
            output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
        self._validate_payload(output_payload, output_variables, batch_output)

        return output_payload

    def _check_instance(self, element, element_name: str, instance):
        if not isinstance(element, instance):
            self._logger.error(f"{element_name} must be an instance of {instance.__name__}")
            raise ValueError(f"{element_name} must be an instance of {instance.__name__}")

    def _check_variable_types(self, variable: dict, value):
        if variable["type"] == "String":
            self._check_instance(value, "String value", str)

        if variable["type"] == "StringArray":
            self._check_instance(value, "StringArray value", list)
            for i in value:
                self._check_instance(i, "StringArray item", str)

        if variable["type"] == "Object":
            self._check_instance(value, "Object value", dict)
            values = list(value.values())
            if len(values) != 2:
                self._logger.error("Object value must have exactly 2 items")
                raise ValueError("Object value must have exactly 2 items")
            ok = (isinstance(values[0], str) and isinstance(values[1], bytes)) or \
                 (isinstance(values[0], bytes) and isinstance(values[1], str))
            if not ok:
                self._logger.error("Object value must have exactly one 'str' and one 'bytes' field")
                raise ValueError("Object value must have exactly one 'str' and one 'bytes' field")

    def _validate_payload(self, payload: list, variables: list, batch: bool):
        self._check_instance(payload, "Payload", list)
        for payload_element in payload:
            if batch:
                self._check_instance(payload_element, "Batch payload element", list)
            else:
                payload_element = [payload_element]

            for item in payload_element:
                self._validate_payload_item(item, variables)

    def _validate_payload_item(self, item: dict, variables: list):
        self._check_instance(item, "Payload item", dict)
        for variable in variables:
            name = variable["name"]
            value = item.get(name, None)
            if value is None:
                self._logger.warning(f"WARNING! Variable '{name}' is missing from input, output or metric")
                continue

            self._check_variable_types(variable, value)

        payload_names = set(item.keys())
        variable_names = { variable["name"] for variable in variables }
        variable_names.add('timestamp')
        extra_variables = payload_names - variable_names
        if len(extra_variables):
            self._logger.warning(f"WARNING! These variables are not declared but are part of the payload: {extra_variables}")

    def _create_command(self, input_payload_path, output_payload_path):
        """
        Create the command to run the component.

        """
        cmd = [str(self.python_path), "-m", 'run_component']
        if isinstance(self.component, PythonComponent):
            cmd += [
                "-m", Path(self.component.entrypoint).stem,
                "-p", json.dumps(self.parameters)
            ]
        if isinstance(self.component, GPURuntimeComponent):
            model_path = Path(self.workdir / "1" / "model.onnx").absolute().resolve()
            config_path = Path(self.workdir / "config.pbtxt").absolute().resolve()
            cmd += [
                "-m", str(model_path),
                "-c", str(config_path),
            ]

        cmd += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self._logger.getEffectiveLevel())
        ]
        return cmd
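
The `Object` type check in `_check_variable_types` requires a dictionary with exactly two fields, one `str` and one `bytes`, in either order. A minimal standalone sketch of that rule follows. `is_valid_object` is a hypothetical function that returns a boolean instead of logging and raising, and the field names in the usage note are made up.

```python
def is_valid_object(value) -> bool:
    """Return True if value is a dict with exactly one str field and one bytes field."""
    if not isinstance(value, dict) or len(value) != 2:
        return False
    first, second = value.values()
    return (isinstance(first, str) and isinstance(second, bytes)) or \
           (isinstance(first, bytes) and isinstance(second, str))
```

For instance, `is_valid_object({"resolution": "640x480", "image": b"\x00"})` is `True`, while a dictionary with two string fields is rejected.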

set_parameters(parameter_name, parameter_value)

Set the parameters for the component.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def set_parameters(self, parameter_name, parameter_value):
    """
    Set the parameters for the component.
    """
    self.parameters[parameter_name] = parameter_value

run(input_payload)

Run the component with the input payload.

Parameters:

  • input_payload: Input payload for the component.

Returns:

  • Output payload from the component.

Side Effects:

  • The input payload is saved in the workdir.
  • The output payload is loaded from the workdir.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
    """
    Run the component with the input payload.

    Parameters:
    input_payload: Input payload for the component.

    Returns:
    Output payload from the component.

    Side Effects:
    - The input payload is saved in the workdir.
    - The output payload is loaded from the workdir.
    """
    input_payload_path: Path  = self.workdir / "input.joblib"
    output_payload_path: Path = self.workdir / "output.joblib"

    batch_input: bool  = self.component.batch.inputBatch
    if isinstance(input_payload, DataStream):
        input_payload = [item for item in input_payload]
    elif isinstance(input_payload, dict):
        input_payload = [input_payload]

    input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
    self._validate_payload(input_payload, input_variables, batch_input)

    joblib.dump(input_payload, input_payload_path)

    cmd = self._create_command(input_payload_path, output_payload_path)

    self._logger.info("Running command: " + " ".join(cmd))

    result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode())

    self._logger.debug(result.stderr.decode())
    output_payload = joblib.load(output_payload_path)

    batch_output: bool = self.component.batch.outputBatch
    output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
    output_payload = [output for output in output_payload if output is not None]
    output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
    if isinstance(self.component, PythonComponent):
        output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
    self._validate_payload(output_payload, output_variables, batch_output)

    return output_payload
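
The error handling in `run` follows a common pattern: start a child Python process, capture its output, and raise with the child's stderr when the return code is non-zero. The sketch below shows that pattern on its own. `run_python` is a hypothetical helper, and it uses `sys.executable` rather than the interpreter of a component's virtual environment.

```python
import subprocess
import sys


def run_python(code: str) -> str:
    """Run `code` in a child interpreter and return its standard output."""
    result = subprocess.run([sys.executable, "-c", code], capture_output=True)
    if result.returncode != 0:
        # Surface the child's stderr in the exception, as `run` does for a failing component.
        raise RuntimeError(result.stderr.decode())
    return result.stdout.decode()
```

Capturing the output means nothing from the child appears on the console unless it is logged explicitly, which is why `run` writes the captured stderr to the debug log after a successful run.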

ComponentRunner

Class to run a Pipeline Component in a virtual environment. Supported Component Types: - PythonComponent - GPURuntimeComponent

Args:

  • component: PythonComponent or GPURuntimeComponent.
  • workdir: Path to the directory where the component should be run. The component runs in a subdirectory named after the component. If None, the subdirectory is created in the current working directory.
  • cleanup: If True, the workdir is deleted when the context manager exits without an exception.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
class ComponentRunner():
    """
    Class to run a Pipeline Component in a virtual environment.
    Supported Component Types:
    - PythonComponent
    - GPURuntimeComponent

    Args:
    component: PythonComponent or GPURuntimeComponent
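    workdir: Path to the directory where the component should be run. The component runs in a subdirectory named after the component. If None, the subdirectory is created in the current working directory.
    cleanup: If True, the workdir is deleted when the context manager exits without an exception.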

    """
    def __init__(self, component, workdir=None, cleanup=False):
        self.component = component
        self.parameters = { '__AI_IS_IMAGE_SET_VISUALIZATION': False }
        self.cleanup = cleanup

        self._logger = self._set_logger()

        self._create_workdir(component, workdir)
        self._copy_resources()
        self._create_venv()
        self._install_requirements()

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        if exception_type is not None:
            self._logger.error(f"Exception occurred: {exception_type.__name__}: {exception_value}")
            return False
        if self.cleanup:
            shutil.rmtree(self.workdir)

    def _set_logger(self):
        """
        Set the logger for the ComponentRunner.
        """
        logger = logging.getLogger(__name__)
        log_level = os.environ.get("loglevel", "INFO").upper()
        logger.setLevel(log_level)
        if not logger.handlers:  # avoid duplicate log lines when several runners are created
            handler = logging.StreamHandler()
            handler.setLevel(log_level)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def _create_workdir(self, component, workdir):
        """
        Create a workdir for the component.
        """
        if workdir is None:
            self.workdir = Path.cwd() / component.name
        else:
            self.workdir = Path(workdir).resolve().absolute() / component.name

        self.workdir.mkdir(parents=True, exist_ok=True)
        self._logger.info(f"Created workdir: {self.workdir}")

    def _copy_resources(self):
        """
        Copy the required resources of the component to the workdir.
        For a PythonComponent, the Python dependencies are also saved into the workdir, including requirements.txt.
        For a GPURuntimeComponent, a requirements.txt with the required onnx and onnxruntime dependencies is also created.
        The runner script is added to the workdir as well.
        If a resource is already located in the workdir, it is not copied,
        so any change to the source also affects the workdir.
        """
        if isinstance(self.component, PythonComponent):
            same_resources = []
            for from_path in self.component.resources.keys():
                try:
                    to_path = self.workdir / self.component.resources[from_path]
                    to_path.mkdir(parents=True, exist_ok=True)
                    shutil.copy(from_path, to_path)
                except shutil.SameFileError:
                    same_resources.append(from_path.name)
            if same_resources:
                self._logger.info("Resources are already in workdir, which are not copied:")
                for resource in same_resources:
                    self._logger.debug(f"  - {resource}")

            # download manually added Python packages and requirements.txt
            self.component.python_dependencies.save(self.workdir)
            shutil.copy(PYTHON_RUNNER_PATH, self.workdir)
        else:
            model_dir = Path(self.workdir / "1")
            model_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(self.component.model_path, model_dir / "model.onnx")
            Path(self.workdir / "config.pbtxt").write_text(f"{self.component.auto_config}")

            shutil.copy(GPU_RUNNER_PATH, self.workdir / "run_component.py")
            shutil.copy(GPU_REQUIREMENTS_PATH, self.workdir / "requirements.txt")
            shutil.copy(GPU_CONFIG_PATH, self.workdir)

        self._logger.info("Resources are copied into the workdir.")

    def _create_venv(self):
        """
        Create a Python virtual environment in the workdir.
        The created virtual environment is stored in the context_dir and
        has the same version as the current Python interpreter.
        """
        context_dir = self.workdir / ".venv"
        builder = venv.EnvBuilder(with_pip=True, symlinks=False)
        builder.create(context_dir)

        self.context = builder.ensure_directories(context_dir)
        self.python_path = Path(self.context.env_exe).resolve()

        self._logger.info(f"Python virtualenv created in folder '{self.context.env_dir}'")

    def _install_requirements(self):
        """
        Installs the required Python dependencies.
        """
        cmd = [str(self.python_path), '-m', 'pip', 'install', 'joblib', '-r', 'requirements.txt']
        result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())

        result_lines = result.stdout.decode().split("\n")
        installed_message = " ".join([line for line in result_lines if "Successfully installed" in line])
        self._logger.info(installed_message)

    def set_parameters(self, parameter_name, parameter_value):
        """
        Set a single parameter for the component.
        """
        self.parameters[parameter_name] = parameter_value

    def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
        """
        Run the component with the input payload.

        Parameters:
        input_payload: Input payload for the component.

        Returns:
        Output payload from the component.

        Side Effects:
        - The input payload is saved in the workdir.
        - The output payload is loaded from the workdir.
        """
        input_payload_path: Path  = self.workdir / "input.joblib"
        output_payload_path: Path = self.workdir / "output.joblib"

        batch_input: bool  = self.component.batch.inputBatch
        if isinstance(input_payload, DataStream):
            input_payload = [item for item in input_payload]
        elif isinstance(input_payload, dict):
            input_payload = [input_payload]

        input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
        self._validate_payload(input_payload, input_variables, batch_input)

        joblib.dump(input_payload, input_payload_path)

        cmd = self._create_command(input_payload_path, output_payload_path)

        self._logger.info("Running command: " + " ".join(cmd))

        result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())

        self._logger.debug(result.stderr.decode())
        output_payload = joblib.load(output_payload_path)

        batch_output: bool = self.component.batch.outputBatch
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        output_payload = [output for output in output_payload if output is not None]
        output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
        if isinstance(self.component, PythonComponent):
            output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
        self._validate_payload(output_payload, output_variables, batch_output)

        return output_payload

    def _check_instance(self, element, element_name: str, instance):
        if not isinstance(element, instance):
            self._logger.error(f"{element_name} must be an instance of {instance.__name__}")
            raise ValueError(f"{element_name} must be an instance of {instance.__name__}")

    def _check_variable_types(self, variable: dict, value):
        if variable["type"] == "String":
            self._check_instance(value, "String value", str)

        if variable["type"] == "StringArray":
            self._check_instance(value, "StringArray value", list)
            for i in value:
                self._check_instance(i, "StringArray item", str)

        if variable["type"] == "Object":
            self._check_instance(value, "Object value", dict)
            values = list(value.values())
            if len(values) != 2:
                self._logger.error("Object value must have exactly 2 items")
                raise ValueError("Object value must have exactly 2 items")
            ok = isinstance(values[0], str) and isinstance(values[1], bytes) or isinstance(values[1], str) and isinstance(values[0], bytes)
            if not ok:
                self._logger.error("Object value must have exactly one 'str' and one 'bytes' field")
                raise ValueError("Object value must have exactly one 'str' and one 'bytes' field")

    def _validate_payload(self, payload: list, variables: list, batch: bool):
        self._check_instance(payload, "Payload", list)
        for payload_element in payload:
            if batch:
                self._check_instance(payload_element, "Batch payload element", list)
            else:
                payload_element = [payload_element]

            for item in payload_element:
                self._validate_payload_item(item, variables)

    def _validate_payload_item(self, item: dict, variables: list):
        self._check_instance(item, "Payload item", dict)
        for variable in variables:
            name = variable["name"]
            value = item.get(name, None)
            if value is None:
                self._logger.warning(f"WARNING! Variable '{name}' is missing from input, output or metric")
                continue

            self._check_variable_types(variable, value)

        payload_names = set(item.keys())
        variable_names = { variable["name"] for variable in variables }
        variable_names.add('timestamp')
        extra_variables = payload_names - variable_names
        if len(extra_variables):
            self._logger.warning(f"WARNING! These variables are not declared but are part of the payload: {extra_variables}")

    def _create_command(self, input_payload_path, output_payload_path):
        """
        Create the command to run the component.

        """
        cmd = [str(self.python_path), "-m", 'run_component']
        if isinstance(self.component, PythonComponent):
            cmd += [
                "-m", Path(self.component.entrypoint).stem,
                "-p", json.dumps(self.parameters)
            ]
        if isinstance(self.component, GPURuntimeComponent):
            model_path = Path(self.workdir / "1" / "model.onnx").absolute().resolve()
            config_path = Path(self.workdir / "config.pbtxt").absolute().resolve()
            cmd += [
                "-m", str(model_path),
                "-c", str(config_path),
            ]

        cmd += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self._logger.getEffectiveLevel())
        ]
        return cmd

set_parameters(parameter_name, parameter_value)

Set a single parameter for the component.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def set_parameters(self, parameter_name, parameter_value):
    """
    Set a single parameter for the component.
    """
    self.parameters[parameter_name] = parameter_value

run(input_payload)

Run the component with the input payload.

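Parameters: input_payload: Input payload for the component. A dict, a list, or a DataStream is accepted; a single dict is wrapped in a list.

Returns: Output payload from the component, as a list with None entries removed.

Side Effects:

  • The input payload is saved in the workdir.
  • The output payload is loaded from the workdir.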
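
The payload shapes that run() validates can be illustrated with a small standalone check. The sketch below restates the type rules visible in the `_validate_payload` and `_check_variable_types` listings. It is not part of the SDK: the helper name `check_value`, the variable names, and the keys of the sample Object value are hypothetical.

```python
def check_value(var_type: str, value) -> bool:
    """Hypothetical helper: mirror the type rules for String, StringArray and Object."""
    if var_type == "String":
        return isinstance(value, str)
    if var_type == "StringArray":
        return isinstance(value, list) and all(isinstance(i, str) for i in value)
    if var_type == "Object":
        if not isinstance(value, dict) or len(value) != 2:
            return False
        first, second = value.values()
        # Exactly one 'str' field and one 'bytes' field, in either order.
        return (isinstance(first, str) and isinstance(second, bytes)) or (
            isinstance(first, bytes) and isinstance(second, str)
        )
    # The listing does not check other types, so they pass here as well.
    return True


# A non-batch payload is a list of dicts.
payload = [{"temperature": "21.5", "tags": ["a", "b"]}]
# A batch payload is a list of lists of dicts.
batch_payload = [[{"temperature": "21.5"}, {"temperature": "22.0"}]]
# An Object value pairs one str field with one bytes field (key names are assumptions).
image_object = {"metadata": "{}", "bytes": b"\x00\x01"}
```

Unlike this sketch, the runner raises a ValueError on a type mismatch and only logs a warning for missing or undeclared variables.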

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
    """
    Run the component with the input payload.

    Parameters:
    input_payload: Input payload for the component.

    Returns:
    Output payload from the component.

    Side Effects:
    - The input payload is saved in the workdir.
    - The output payload is loaded from the workdir.
    """
    input_payload_path: Path  = self.workdir / "input.joblib"
    output_payload_path: Path = self.workdir / "output.joblib"

    batch_input: bool  = self.component.batch.inputBatch
    if isinstance(input_payload, DataStream):
        input_payload = [item for item in input_payload]
    elif isinstance(input_payload, dict):
        input_payload = [input_payload]

    input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
    self._validate_payload(input_payload, input_variables, batch_input)

    joblib.dump(input_payload, input_payload_path)

    cmd = self._create_command(input_payload_path, output_payload_path)

    self._logger.info("Running command: " + " ".join(cmd))

    result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode())

    self._logger.debug(result.stderr.decode())
    output_payload = joblib.load(output_payload_path)

    batch_output: bool = self.component.batch.outputBatch
    output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
    output_payload = [output for output in output_payload if output is not None]
    output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
    if isinstance(self.component, PythonComponent):
        output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
    self._validate_payload(output_payload, output_variables, batch_output)

    return output_payload

DataStream

Base class for datastream generators

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/data_stream.py
class DataStream:
    """
    Base class for datastream generators
    """

    def __iter__(self):
        """
        Empty generator method for child classes to implement.
        """
        raise NotImplementedError('Child classes must implement this method.')

__iter__()

Empty generator method for child classes to implement.
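
A child class only needs to provide `__iter__()`. The following is a minimal sketch under stated assumptions: `DataStream` is redefined locally as a stand-in so that the example runs without the SDK, and `ListStream` and the variable name `temperature` are hypothetical.

```python
class DataStream:
    """Stand-in for the base class shown in the listing (assumption: same interface)."""

    def __iter__(self):
        raise NotImplementedError('Child classes must implement this method.')


class ListStream(DataStream):
    """Hypothetical child class that yields one payload dict per stored value."""

    def __init__(self, variable_name, values):
        self.variable_name = variable_name
        self.values = list(values)

    def __iter__(self):
        # Implemented as a generator, so every iteration starts from the first value.
        for value in self.values:
            yield {self.variable_name: value}


stream = ListStream("temperature", ["21.5", "22.0"])
# ComponentRunner.run() expands a DataStream into a list in the same way.
payloads = [item for item in stream]
```

Because `ComponentRunner.run()` expands a DataStream into a list before validation, each yielded item should have the shape the component expects for a single input.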

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/data_stream.py
def __iter__(self):
    """
    Empty generator method for child classes to implement.
    """
    raise NotImplementedError('Child classes must implement this method.')

This module provides a base class for creating and managing Docker containers with Python virtual environments. It is designed to be used in a context where Docker is available and the user has the necessary permissions to create and manage containers.

VesselBaseDocker

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/docker_venv.py
class VesselBaseDocker:
    is_vessel: bool = False  # True when running in a Docker container
    python_default: str = 'python'  # fallback used by _create_venv for other versions
    python_3_11: str = 'python'
    python_3_12: str = 'python'

    def __init__(self):
        if Path("/.ai_sdk_docker").is_file():
            self.is_vessel = True
            self.python_default = 'python3.11'
            self.python_3_11 = 'python3.11'
            self.python_3_12 = 'python3.12'

    def _create_venv(self, path: str, version: str):
        """
        Creates a virtual environment in which the given component can run.

        Args:
            path (str): Path to the virtual environment.
            version (str): Python version to use for the virtual environment.
        """

        context_dir = Path(path) / ".venv"
        python_path = context_dir / "bin" / "python"

        match version:
            case "3.11":
                python = self.python_3_11
            case "3.12":
                python = self.python_3_12
            case _:
                python = self.python_default
        _logger.info(f"Creating virtual environment for Python '{version}' in '{path}/.venv' with Python {python}...")

        command = [
            python,
            "-m", "venv", str(context_dir),
            "--copies"
        ]
        result = subprocess.run(command, capture_output=True, text=True)
        if result.returncode != 0:
            _logger.error(f"Error creating virtual environment: {result.stderr}")
            raise RuntimeError(f"Error creating virtual environment: {result.stderr}")

        return context_dir.resolve(), python_path.resolve()

A pipeline runner that lets you simulate the execution of a pipeline in a local Python environment.

It can be used to locally mimic the behavior of the AI Inference Server concerning loading and running inference pipelines. This is a quick and easy way to find programming or configuration errors before deploying the package. The local pipeline runner also lets you exercise your pipeline component by component. In other words, you can feed single components with inputs and verify the output produced.

LocalPipelineRunner

Simulates the execution of a pipeline in a local Python environment.

Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component, each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.
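
The restriction can be read as three rules on variable names. The sketch below is one possible interpretation, not the runner's own check: the function `is_linear_pipeline` and the dictionary shape used to describe components are hypothetical, and "previous components" is taken to mean any earlier component in the list.

```python
from typing import Dict, List, Set

# Hypothetical shape: {"inputs": {...}, "outputs": {...}}
Component = Dict[str, Set[str]]


def is_linear_pipeline(
    pipeline_inputs: Set[str],
    pipeline_outputs: Set[str],
    components: List[Component],
) -> bool:
    """Return True if the described pipeline satisfies the three rules."""
    if not components:
        return False

    # Rule 1: the pipeline input variables are used by one component only.
    consumers = [c for c in components if c["inputs"] & pipeline_inputs]
    if len(consumers) > 1:
        return False

    # Rule 2: apart from pipeline inputs, a component reads only outputs
    # of components that come before it.
    available: Set[str] = set()
    for component in components:
        needed = component["inputs"] - pipeline_inputs
        if not (needed <= available):
            return False
        available |= component["outputs"]

    # Rule 3: the pipeline output consists of variables from the last component.
    return pipeline_outputs <= components[-1]["outputs"]


linear = [
    {"inputs": {"image"}, "outputs": {"features"}},
    {"inputs": {"features"}, "outputs": {"label"}},
]
```

With this description, `is_linear_pipeline({"image"}, {"label"}, linear)` holds, while exposing `features` as a pipeline output would violate the third rule.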

If the caller specifies no path, the working directory is temporary and is removed unless an error occurs. If the caller specifies a working directory with a path argument, the working directory is kept. This behavior can be overridden using the boolean parameter cleanup.
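
The default can be summarized as a small decision function. This is a sketch derived from the `__enter__` and `__exit__` listings, where a true `cleanup` value means the working directory is removed on exit. The helper name `resolve_cleanup` is hypothetical, and the error case, in which the directory is always kept, is left out.

```python
from typing import Optional


def resolve_cleanup(path: Optional[str], cleanup: Optional[bool]) -> bool:
    """Hypothetical helper: return True if the working directory is removed on exit."""
    if cleanup is not None:
        # An explicit value overrides the default in both directions.
        return cleanup
    # Default: temporary directories are removed, explicit ones are kept.
    return path is None
```

For example, `resolve_cleanup(None, None)` is true for a temporary directory, and `resolve_cleanup("work", None)` is false for an explicit one.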

Currently, the pipeline runner supports both the current process_input(data: dict) entrypoint signature and the legacy run(data: str) signature. If both entrypoints are present, process_input() takes precedence. Please note however that run() is deprecated, and support for it will be removed in future versions of the pipeline runner.
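
The precedence rule can be sketched as follows. This only illustrates the selection order described above and is not the runner's actual lookup code: `select_entrypoint` is a hypothetical helper, the two sample entrypoints and their return values are assumptions, and `SimpleNamespace` objects stand in for entrypoint modules.

```python
from types import SimpleNamespace


def select_entrypoint(module):
    """Hypothetical helper: prefer process_input() and fall back to the legacy run()."""
    if hasattr(module, "process_input"):
        return module.process_input
    if hasattr(module, "run"):
        return module.run
    raise AttributeError("No supported entrypoint found.")


def process_input(data: dict):
    # Current signature: the payload arrives as a dict.
    return {"length": str(len(data["text"]))}


def run(data: str):
    # Legacy signature: the payload arrives as a string.
    return {"length": str(len(data))}


both = SimpleNamespace(process_input=process_input, run=run)
legacy_only = SimpleNamespace(run=run)

chosen = select_entrypoint(both)               # process_input takes precedence
legacy_chosen = select_entrypoint(legacy_only)  # falls back to run
```

New components should define only `process_input()`, since `run()` is deprecated.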

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory is created. | None |
| cleanup | bool | If set, the working directory is deleted when True, and kept when False. If unset, a temporary working directory is removed, and an explicit working directory is kept. When an error occurs in a component, the working directory is kept regardless of this value. | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
class LocalPipelineRunner:
    """
    Simulates the execution of a pipeline in a local Python environment.

    Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component,
    each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.

    If the caller specifies no `path`, the working directory is temporary and is removed unless an error occurs.
    If the caller specifies a working directory with a `path` argument, the working directory is kept.
    This behavior can be overridden using the boolean parameter `cleanup`.

    Currently, the pipeline runner supports both the current `process_input(data: dict)` entrypoint signature and the legacy
    `run(data: str)` signature. If both entrypoints are present, `process_input()` takes precedence. Please note however that
    `run()` is deprecated, and support for it will be removed in future versions of the pipeline runner.

    Args:
        packageZip (path-like): Path to the pipeline configuration package.
        path (path-like): Path to the working directory. If unset, a temporary directory is created.
        cleanup (bool): If set, the working directory is deleted when True, and kept when False. \
            If unset, a temporary working directory is removed, and an explicit working directory is kept. \
            When an error occurs in a component, the working directory is kept regardless of this value.
    """

    def __init__(self, packageZip: os.PathLike, path: Optional[os.PathLike] = None, cleanup: Optional[bool] = None, loglevel = logging.INFO):
        """
        Creates a new LocalPipelineRunner for the provided pipeline configuration package.

        Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.

        Args:
            packageZip (path-like): Path to the pipeline configuration package.
            path (path-like): Path to the working directory. If unset, a temporary directory will be created.
            cleanup (bool): If set, the working directory will be deleted when True, and kept when False. \
                If unset, a temporary working directory will be removed, and an explicit working directory will be kept. \
                When an error occurs in a component, the working directory will be kept regardless of this value.
            loglevel (int): Log level stored for the runner. Defaults to `logging.INFO`.
        """
        self.package_zip: Path = Path(packageZip)
        self.path = Path(path).resolve().absolute() if path is not None else None
        self.components = {}
        self.parameters = {}
        self.cleanup = cleanup
        self.log_level = loglevel
        self.workdir: Path
        self.docker = VesselBaseDocker()

        self.report_writer = PipelineRunnerReportWriter()
        report_writer_handler = ReportWriterHandler(self.report_writer)
        _logger.addHandler(report_writer_handler)

    def __enter__(self):
        self.report_writer.set_package_zip_path(self.package_zip)
        timestamp = re.sub(r"[-:]", "", datetime.now(timezone.utc).isoformat(sep="_", timespec="seconds"))
        if self.path is not None:
            self.workdir = Path(self.path)
            self.workdir.mkdir(parents=True, exist_ok=True)
            self.cleanup = self.cleanup if self.cleanup is not None else False
        else:
            self.workdir = Path(tempfile.mkdtemp(prefix=f"LocalPipelineRunner_{timestamp}_"))
            self.cleanup = self.cleanup if self.cleanup is not None else True

        unzip_components = False
        with zipfile.ZipFile(self.package_zip) as zf:
            if 'runtime_config.yml' in zf.namelist():
                self.workdir = self.workdir / self.package_zip.stem
                zf.extractall(path=self.workdir)
                unzip_components = True
            else:
                zf.extractall(path=self.workdir)
                self.workdir = self.workdir / zf.namelist()[0]

        try:
            with open(self.workdir / "pipeline_config.yml") as cf:
                config = yaml.load(cf, Loader=yaml.FullLoader)

            components = config["dataFlowPipeline"].get("components", [])
            for component in components:
                component["context"] = None
                component["env_dir"] = self.workdir / component['name']
                self.components[component["name"]] = component

            if unzip_components:
                for component in components:
                    component_zip = f"{component['name']}_{component['version']}.zip"
                    with zipfile.ZipFile(self.workdir / 'components' / component_zip) as zf:
                        zf.extractall(path=self.workdir / component["name"])

            for parameter in config["dataFlowPipeline"].get("pipelineParameters", {}):
                self.parameters[parameter["name"]] = parameter

        except Exception:
            raise RuntimeError(INVALID_PIPELINE_PACKAGE_MESSAGE)

        return self

    def __exit__(self, exception_type, value, traceback):

        self.update_telemetry_data()
        self.report_writer.write_report()

        if self.cleanup:
            _logger.info("Removing local pipeline runner environment...")
            shutil.rmtree(self.workdir.parent)
        else:
            _logger.info(f"Leaving local pipeline runner environment in its final state at '{self.workdir}'")

    def collect_telemetry_data(self) -> dict:
        """
        Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.

        Returns:
            dict: A dictionary containing the telemetry data.
        """
        telemetry_data = {}

        telemetry_data["platform"] = {}
        telemetry_data["platform"]["os"] = platform.system()
        telemetry_data["platform"]["release"] = platform.release()
        telemetry_data["platform"]["python_version"] = platform.python_version()

        _logger.info(f"locals: {locals()}")
        telemetry_data["environment"] = {}
        env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
        env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
        env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
        env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
        telemetry_data["environment"]["info"] = env_info

        telemetry_data["industrial_ai"] = {}
        try:
            telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
        except importlib_metadata.PackageNotFoundError:
            telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND

        telemetry_data["pipeline"] = {}
        telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                                 if self.components[component]['runtime']['type'] == 'python'))
        telemetry_data["pipeline"]["file_extensions"] = []
        for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
            excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
            suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                                if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
            for suffix in suffixes:
                if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                    telemetry_data["pipeline"]["file_extensions"].append(suffix)

        return telemetry_data

    def update_telemetry_data(self):
        """
        Update the telemetry data and the package.

        This method updates the telemetry data by loading the existing data from a YAML file,
        or collecting new telemetry data if the file doesn't exist. It then updates the
        "last_test_run" field of the telemetry data with the current timestamp. The updated
        telemetry data is then written back to the YAML file.

        A new package named `<package>_tested.zip` is always written with the updated telemetry data.
        If the original package stores the telemetry data file at a path other than its root,
        the original package is then overwritten with the new one. Otherwise, a
        `<package>_tested.sha256` checksum file is written next to the new package.
        """
        _logger.info("Updating telemetry data and the package")
        telemetry_path = self.workdir / "telemetry_data.yml"
        if telemetry_path.is_file():
            telemetry_data = yaml.safe_load(telemetry_path.read_text())
        else:
            telemetry_data = self.collect_telemetry_data()

        telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
        telemetry_path.write_text(yaml.dump(telemetry_data))

        config_package = False
        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            for file in zip_read.namelist():
                if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                    config_package = True
                    break

        new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"

        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
                for file in zip_read.namelist():
                    if TELEMETRY_YAML not in file:
                        filepath = zip_read.extract(file, path=self.workdir)
                        zip_write.write(filepath, arcname=file)
                    else:
                        zip_write.write(telemetry_path, arcname=file)

        if config_package:
            shutil.copy(new_zip_path, self.package_zip)
        else:
            new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
            new_sha_path.write_text(calc_sha(new_zip_path))

    def _install_requirements(self, component, package_dir, no_index: bool = True):
        _logger.info("Installing requirements...")
        pip_report_file = component["env_dir"] / "pip_report.json"
        package_dir_path = _relative_to(package_dir, component["env_dir"])
        pip_report_file_path = _relative_to(pip_report_file, component["env_dir"])
        cmd = [
            str(component['python_path']),
            "-m", "pip", "install",
            "--no-warn-script-location",
            "-f", f"{package_dir_path}",
            "-r", REQUIREMENTS_TXT,
        ]

        if no_index:
            cmd += [ "--no-index" ]

        use_no_deps = component.get("runtime", {}).get("installTransitiveDependencies", True) is False
        if use_no_deps:
            cmd += [ "--no-deps" ]

        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)

        if 0 == result.returncode and use_no_deps:
            _logger.warning(f"Component `{component['name']}` is requested to be installed without transitive dependencies. "
                            "This may lead to runtime errors if the dependencies are not handled manually.")

        # generate pip report with a dry run to get the list of installed packages (in case of CPU Python packages)
        if 0 == result.returncode and component.get("hwType", None) == "CPU":
            cmd += ["--dry-run", "--ignore-installed", "--report", f"{pip_report_file_path}"]
            subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
            self.report_writer.add_installed_packages(component["name"], pip_report_file)
        return result

    def _install_from_packages_zip(self, component, package_dir):
        result = self._install_requirements(component, package_dir, True)
        return 0 == result.returncode

    def _install_from_pypi_org(self, component, package_dir):
        return self._install_requirements(component, package_dir, False)

    def _init_component_venv(self, component: dict):
        """
        Creates a virtual environment in which the given component can run.

        Args:
            component (dict): Configuration of the selected component.
        """
        _logger.info(f"Creating virtual environment for component '{component['name']}'...")
        context_dir = component["env_dir"] / ".venv"

        if self.docker.is_vessel:
            _logger.info("Creating virtual environment in docker...")
            component["context"], component["python_path"] = self.docker._create_venv(component["env_dir"], component["runtime"]["version"])
            component["python_path"] = Path(component["python_path"]).resolve()
            _logger.info(f"Component context (docker): {component['context']}")
        else:
            builder = venv.EnvBuilder(with_pip=True, symlinks=False)
            builder.create(str(context_dir))
            component["context"] = builder.ensure_directories(context_dir)
            component['python_path'] = Path(component["context"].env_exe).resolve()
        _logger.debug(f"Component python_path: {component['python_path']}")

        _logger.info("Upgrading pip...")
        cmd = [str(component['python_path']), "-m", "pip", "install", "pip", "--upgrade"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error upgrading pip:\n{result.stderr}")

        cmd = [str(component['python_path']), "-m", "pip", "install", "--upgrade", "setuptools"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error installing setuptools:\n{result.stderr}")

        try:
            result = self._install_logmodule(component["python_path"], component["env_dir"])
        except Exception as err:
            _logger.error(err)
            self.cleanup = False
            raise RuntimeError("The 'simaticai' Python package is either not installed or does not contain package 'log_module'.") from None
        if result.returncode != 0:
            self.cleanup = False
            raise RuntimeError(f"Error installing log_module:\n{result.stderr}")

        req_list = Path(component["env_dir"] / "requirements.list")
        req_list.touch(exist_ok=True)
        if Path(component["env_dir"] / REQUIREMENTS_TXT).is_file():
            dependencies, extra_index, index_url = parse_requirements(component["env_dir"] / REQUIREMENTS_TXT)
            requirements = "#".join(dependencies.keys())
            req_list.write_text(requirements)
        else:
            _logger.info(f"'{REQUIREMENTS_TXT}' was not found. No additional dependencies were installed.")
            return

        package_dir = component["env_dir"] / PYTHON_PACKAGES
        package_zip = component["env_dir"] / PYTHON_PACKAGES_ZIP

        _logger.info(f"Extracting {PYTHON_PACKAGES_ZIP}")
        if package_zip.is_file():
            with zipfile.ZipFile(package_zip) as zf:
                zf.extractall(path=package_dir.absolute())
        else:
            _logger.info(f"There is no {PYTHON_PACKAGES_ZIP} to extract.")
            package_dir.mkdir(parents=True, exist_ok=True)

        success = self._install_from_packages_zip(component, package_dir)
        if not success:
            msg = f"Warning! Could not install dependencies from {PYTHON_PACKAGES_ZIP}. "
            msg += "Trying to install them from pypi.org. The resulting Python environment "
            msg += "may be significantly different than the targeted Python environment on the Edge Device!"
            _logger.warning(msg)

            if self.docker.is_vessel and component["runtime"]["type"] == "python":
                raise RuntimeError("The component is running in a docker container. The installation of dependencies from pypi.org is not supported.")
            else:
                second_install_result = self._install_from_pypi_org(component, package_dir)
                if 0 != second_install_result.returncode:
                    self.cleanup = False
                    raise RuntimeError(f"Error installing requirements:\n{second_install_result.stderr}")

    @staticmethod
    def _install_logmodule(python_path, env_dir):
        _logger.info("Installing LogModule...")
        try:
            package_paths = importlib_metadata.files("simaticai")
            assert package_paths is not None
            logger_wheel = [p for p in package_paths if 'log_module' in str(p)][0].locate()
        except Exception:
            from importlib.metadata import Distribution
            direct_url = Distribution.from_name("simaticai").read_text("direct_url.json")
            assert direct_url is not None
            direct_url = json.loads(direct_url)['url']
            direct_url = direct_url.replace('file://','')
            direct_url = Path(direct_url) / 'simaticai' / 'data'
            paths = list(direct_url.rglob('*.whl'))
            logger_wheel = [p for p in paths if 'log_module' in str(p)][0].resolve()

        cmd = [
            str(python_path), "-m", "pip",
            "install",
            "--no-warn-script-location", logger_wheel, "joblib"
        ]
        return subprocess.run(cmd, cwd=env_dir, text=True, stderr=subprocess.PIPE)

    def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
        """
        Runs the component in its virtual environment with the given input.
        This environment is created according to `requirements.txt` in the package.
        Additionally 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries,
        or a DataStream object which will produce the appropriate input data.
        The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.

        Args:
            name (str): The name of the component to be executed.
            data (dict or list): One or more input records for the component.

        Returns:
            dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
            If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.

        """
        assert name in self.components, f"Invalid component name: {name}"
        component = self.components[name]

        assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is neither 'python' nor 'gpuruntime'"

        input_payload_path: Path  = component["env_dir"] / "input.joblib"
        output_payload_path: Path = component["env_dir"] / "output.joblib"

        batch_input: bool  = component["batch"]["inputBatch"]  == "Yes" if component.get("batch") is not None else False
        batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False

        # Validate and serialize input payload
        assert data is not None, f"Can not run component '{name}' without input."
        result_is_list = True
        if isinstance(data, list):
            input_payload = data
        elif isinstance(data, DataStream):
            input_payload = [item for item in data]
        else:
            result_is_list = False
            input_payload = [data]
        validate_payload(input_payload, component["inputType"], batch_input)
        joblib.dump(input_payload, input_payload_path)
        self.report_writer.set_input_payload_length(name, len(input_payload))
        _logger.info(f"Input payload saved as '{input_payload_path}'")

        # Assemble command for running component
        if component['runtime']['type'] == 'python':
            # Version check for Python
            e_major, e_minor, _, _, _ = sys.version_info
            c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))

            if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
                msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
                msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
                msg += " Pipeline behavior on AI Inference Server might be different."
                _logger.warning(msg)

            if component["context"] is None:
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_component.py', component["env_dir"])

            req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
            json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
            args = [
                "-m", 'run_component',
                "-m", Path(component["entrypoint"]).stem,
                "-p", f"{json_params}",
                "-r", f"{req_list_path}",
            ]

        else:
            # gpuruntime step requires Python environment with onnxruntime installed
            if component["context"] is None:
                shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
                shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])

            args = [
                "-m", 'run_gpuruntime_component',
                "-m", component["entrypoint"],
                "-c", "config.pbtxt",
            ]

        args += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self.log_level)
        ]

        # Run the component in the created Python environment
        _logger.info(f"Running component '{name}'...")
        _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")

        cmd = [str(component['python_path'])] + args
        _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
        p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        if p.stdout is not None:
            for line in p.stdout:
                _logger.info(line.strip())
        returncode = p.wait()
        if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
            self.cleanup = False
            raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
        else:
            response_wrapped = True if returncode == RETURN_CODE_DEPRECATED else False

        # Deserialize and validate output payload
        _logger.info(f"Loading output payload from '{output_payload_path}'")
        output_payload = joblib.load(output_payload_path)
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        self.report_writer.set_output_payload_length(name, len(output_payload))
        if response_wrapped:
            output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
        else:
            output_payload = [output for output in output_payload if output is not None]
        validate_payload(output_payload, component["outputType"], batch_output)

        if result_is_list:
            return output_payload
        if 0 < len(output_payload):
            return output_payload[0]
        return None

    def update_parameters(self, parameters: dict):
        """
        Validates and updates pipeline parameters.
        The elements of the dictionary must match the parameters specified in the pipeline configuration package.
        If any of the names or types does not match, all parameters will remain untouched.

        Args:

            parameters (dict): names and values of parameters to update

        Raises:

            AssertionError:
                When:
                - either `name` is not in the configured parameters,
                - or `defaultValue` type is different from the configured one
        """
        for key,value in parameters.items():
            if key not in self.parameters.keys():
                if key.upper().startswith("__AI_IS"):
                    self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
                else:
                    raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
            elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")

        for key, value in parameters.items():
            self.parameters[key]["defaultValue"] = value

    def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
        """
        Runs all the components sequentially, assuming the output of a component is only consumed by the next.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
        For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
        and the output is saved as `output.joblib`.

        Args:
            payload (dict or list): One or more input records for the pipeline.

        Returns:
            The output of the last component.
        """

        for name in self.components.keys():
            payload = self.run_component(name, payload)

        return payload
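
The dependency installation in `_init_component_venv` in the listing above tries the bundled packages first and only falls back to pypi.org when that fails and the component is not running in Docker. A minimal sketch of that control flow follows. The function and its callables (`install_local`, `install_remote`) are hypothetical stand-ins for the real pip invocations, which are not reproduced here.

```python
from typing import Callable


def install_with_fallback(
    install_local: Callable[[], bool],
    install_remote: Callable[[], bool],
    allow_remote: bool = True,
) -> str:
    """Return which source satisfied the install, or raise if none did.

    Hypothetical helper: each callable returns True on success.
    """
    if install_local():
        return "local"
    if not allow_remote:
        # Mirrors the Docker case, where the remote fallback is refused.
        raise RuntimeError("Local install failed and remote install is not allowed.")
    if install_remote():
        return "remote"
    raise RuntimeError("Both local and remote installs failed.")
```

Passing the installers as callables keeps the fallback order testable without creating a virtual environment.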

__init__(packageZip, path=None, cleanup=None, loglevel=logging.INFO)

Creates a new LocalPipelineRunner for the provided pipeline configuration package.

Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.

Parameters:

packageZip (path-like, required): Path to the pipeline configuration package.

path (path-like, default None): Path to the working directory. If unset, a temporary directory will be created.

cleanup (bool, default None): If set, the working directory will be deleted when True, and kept when False. If unset, a temporary working directory will be removed, and an explicit working directory will be kept. When an error occurs in a component, the working directory will be kept regardless of this value.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def __init__(self, packageZip: os.PathLike, path: Optional[os.PathLike] = None, cleanup: Optional[bool] = None, loglevel = logging.INFO):
    """
    Creates a new LocalPipelineRunner for the provided pipeline configuration package.

    Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.

    Args:
        packageZip (path-like): Path to the pipeline configuration package.
        path (path-like): Path to the working directory. If unset a temporary directory will be created.
        cleanup (bool): If set, the working directory will be deleted when True, and kept when False. \
            If unset, a temporary working directory will be removed, and an explicit working directory will be kept. \
            When an error occurs in a component, the working directory will be kept regardless of this value.
    """
    self.package_zip: Path = Path(packageZip)
    self.path = Path(path).resolve().absolute() if path is not None else None
    self.components = {}
    self.parameters = {}
    self.cleanup = cleanup
    self.log_level = loglevel
    self.workdir: Path
    self.docker = VesselBaseDocker()

    self.report_writer = PipelineRunnerReportWriter()
    report_writer_handler = ReportWriterHandler(self.report_writer)
    _logger.addHandler(report_writer_handler)
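
The interaction between `path` and `cleanup` can be easier to follow as code. The sketch below is only an illustration of the documented defaults, not the runner's implementation. It follows the convention visible in the source that `cleanup = False` keeps the working directory (the value the runner sets when a component fails). The helper names `resolve_cleanup` and `effective_cleanup` are hypothetical.

```python
from pathlib import Path
from typing import Optional, Union


def resolve_cleanup(cleanup: Optional[bool], path: Optional[Union[str, Path]]) -> bool:
    """Return True if the working directory should be removed on exit."""
    if cleanup is not None:
        return cleanup  # an explicit choice wins
    # Unset: remove a temporary directory, keep an explicitly given one.
    return path is None


def effective_cleanup(cleanup: bool, component_failed: bool) -> bool:
    """A failed component always keeps the working directory for inspection."""
    return cleanup and not component_failed
```

With these rules, a run without `path` and without `cleanup` leaves nothing behind unless a component fails.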

collect_telemetry_data()

Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.

Returns:

dict: A dictionary containing the telemetry data.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def collect_telemetry_data(self) -> dict:
    """
    Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.

    Returns:
        dict: A dictionary containing the telemetry data.
    """
    telemetry_data = {}

    telemetry_data["platform"] = {}
    telemetry_data["platform"]["os"] = platform.system()
    telemetry_data["platform"]["release"] = platform.release()
    telemetry_data["platform"]["python_version"] = platform.python_version()

    _logger.info(f"locals: {locals()}")
    telemetry_data["environment"] = {}
    env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
    env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
    env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
    env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
    telemetry_data["environment"]["info"] = env_info

    telemetry_data["industrial_ai"] = {}
    try:
        telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
    except importlib_metadata.PackageNotFoundError:
        telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND

    telemetry_data["pipeline"] = {}
    telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                             if self.components[component]['runtime']['type'] == 'python'))
    telemetry_data["pipeline"]["file_extensions"] = []
    for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
        excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
        suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                            if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
        for suffix in suffixes:
            if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                telemetry_data["pipeline"]["file_extensions"].append(suffix)

    return telemetry_data
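
The CI detection in `collect_telemetry_data` is a chain of overrides, so the last matching environment variable decides the result. A minimal sketch of that part alone is shown below. It leaves out the Jupyter check, and `NOT_FOUND` is a hypothetical placeholder because the real value of `MSG_NOT_FOUND` is not shown in the listing.

```python
from typing import Mapping

NOT_FOUND = "not found"  # hypothetical placeholder for MSG_NOT_FOUND


def detect_ci(environ: Mapping[str, str]) -> str:
    """Return a label for the CI system indicated by the environment."""
    info = NOT_FOUND
    # Later entries override earlier ones, like the chained assignments.
    for variable, label in (
        ("GITLAB_CI", "GitLab CI/CD"),
        ("TF_BUILD", "Azure DevOps Pipelines"),
        ("GITHUB_ACTIONS", "GitHub Actions"),
    ):
        if variable in environ:
            info = label
    return info
```

Calling `detect_ci(os.environ)` would apply the same checks to the current process. Taking the mapping as an argument makes the function easy to test.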

update_telemetry_data()

Update the telemetry data and the package.

This method updates the telemetry data by loading the existing data from a YAML file, or collecting new telemetry data if the file doesn't exist. It then updates the "last_test_run" field of the telemetry data with the current timestamp. The updated telemetry data is then written back to the YAML file.

A new package named `<original name>_tested.zip` is always written next to the original. If the original archive contains the telemetry data file under a path other than the archive root, the original package is then overwritten with the new one. Otherwise, a `<original name>_tested.sha256` checksum file is written next to the new package.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def update_telemetry_data(self):
    """
    Update the telemetry data and the package.

    This method updates the telemetry data by loading the existing data from a YAML file,
    or collecting new telemetry data if the file doesn't exist. It then updates the
    "last_test_run" field of the telemetry data with the current timestamp. The updated
    telemetry data is then written back to the YAML file.

    A new package named `<stem>_tested.zip` is always written next to the original.
    If the original archive contains the telemetry data file under a path other than
    the archive root, the original package is overwritten with the new one. Otherwise,
    a `<stem>_tested.sha256` checksum file is written next to the new package.
    """
    _logger.info("Updating telemetry data and the package")
    telemetry_path = self.workdir / "telemetry_data.yml"
    if telemetry_path.is_file():
        telemetry_data = yaml.safe_load(telemetry_path.read_text())
    else:
        telemetry_data = self.collect_telemetry_data()

    telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
    telemetry_path.write_text(yaml.dump(telemetry_data))

    config_package = False
    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        for file in zip_read.namelist():
            if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                config_package = True
                break

    new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"

    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
            for file in zip_read.namelist():
                if TELEMETRY_YAML not in file:
                    filepath = zip_read.extract(file, path=self.workdir)
                    zip_write.write(filepath, arcname=file)
                else:
                    zip_write.write(telemetry_path, arcname=file)

    if config_package:
        shutil.copy(new_zip_path, self.package_zip)
    else:
        new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
        new_sha_path.write_text(calc_sha(new_zip_path))
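
ZIP archives cannot be edited in place, which is why the method writes a second archive and swaps the telemetry entry while copying. The sketch below illustrates that copy-and-replace pattern with the standard `zipfile` module. Unlike the source, it copies entries in memory instead of extracting them to the working directory, and `rewrite_member` is a hypothetical helper name.

```python
import zipfile
from pathlib import Path


def rewrite_member(src_zip: Path, dst_zip: Path, member_name: str, new_content: str) -> None:
    """Copy src_zip to dst_zip, replacing every entry whose path contains member_name."""
    with zipfile.ZipFile(src_zip, "r") as zip_read, zipfile.ZipFile(dst_zip, "w") as zip_write:
        for name in zip_read.namelist():
            if member_name in name:
                # Replace the matching entry with the new content.
                zip_write.writestr(name, new_content)
            else:
                # Copy every other entry unchanged.
                zip_write.writestr(name, zip_read.read(name))
```

As in the source, the match is a substring test on the entry path, so a telemetry file in a subdirectory is replaced as well.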

run_component(name, data)

Runs the component in its virtual environment with the given input. This environment is created according to requirements.txt in the package. Additionally, 'joblib' and the mock log_module are automatically installed in this virtual environment. The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries, or a DataStream object which will produce the appropriate input data. The supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.

Parameters:

name (str, required): The name of the component to be executed.

data (dict or list, required): One or more input records for the component.

Returns:

Optional[Union[dict, list]]: A list of dictionaries for outputs if there were no errors and field ready is true. If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
    def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
        """
        Runs the component in its virtual environment with the given input.
        This environment is created according to `requirements.txt` in the package.
        Additionally 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries,
        or a DataStream object which will produce the appropriate input data.
        The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.

        Args:
            name (str): The name of the component to be executed.
            data (dict or list): One or more input records for the component.

        Returns:
            dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
            If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.

        """
        assert name in self.components, f"Invalid component name: {name}"
        component = self.components[name]

        assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is neither 'python' nor 'gpuruntime'"

        input_payload_path: Path  = component["env_dir"] / "input.joblib"
        output_payload_path: Path = component["env_dir"] / "output.joblib"

        batch_input: bool  = component["batch"]["inputBatch"]  == "Yes" if component.get("batch") is not None else False
        batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False

        # Validate and serialize input payload
        assert data is not None, f"Can not run component '{name}' without input."
        result_is_list = True
        if isinstance(data, list):
            input_payload = data
        elif isinstance(data, DataStream):
            input_payload = [item for item in data]
        else:
            result_is_list = False
            input_payload = [data]
        validate_payload(input_payload, component["inputType"], batch_input)
        joblib.dump(input_payload, input_payload_path)
        self.report_writer.set_input_payload_length(name, len(input_payload))
        _logger.info(f"Input payload saved as '{input_payload_path}'")

        # Assemble command for running component
        if component['runtime']['type'] == 'python':
            # Version check for Python
            e_major, e_minor, _, _, _ = sys.version_info
            c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))

            if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
                msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
                msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
                msg += " Pipeline behavior on AI Inference Server might be different."
                _logger.warning(msg)

            if component["context"] is None:
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_component.py', component["env_dir"])

            req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
            json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
            args = [
                "-m", 'run_component',
                "-m", Path(component["entrypoint"]).stem,
                "-p", f"{json_params}",
                "-r", f"{req_list_path}",
            ]

        else:
            # gpuruntime step requires Python environment with onnxruntime installed
            if component["context"] is None:
                shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
                shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])

            args = [
                "-m", 'run_gpuruntime_component',
                "-m", component["entrypoint"],
                "-c", "config.pbtxt",
            ]

        args += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self.log_level)
        ]

        # Run the component in the created Python environment
        _logger.info(f"Running component '{name}'...")
        _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")

        cmd = [str(component['python_path'])] + args
        _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
        p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        if p.stdout is not None:
            for line in p.stdout:
                _logger.info(line.strip())
        returncode = p.wait()
        if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
            self.cleanup = False
            raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
        else:
            response_wrapped = True if returncode == RETURN_CODE_DEPRECATED else False

        # Deserialize and validate output payload
        _logger.info(f"Loading output payload from '{output_payload_path}'")
        output_payload = joblib.load(output_payload_path)
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        self.report_writer.set_output_payload_length(name, len(output_payload))
        if response_wrapped:
            output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
        else:
            output_payload = [output for output in output_payload if output is not None]
        validate_payload(output_payload, component["outputType"], batch_output)

        if result_is_list:
            return output_payload
        if 0 < len(output_payload):
            return output_payload[0]
        return None
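
The last part of `run_component` does two things: it normalizes the raw output (unwrapping the deprecated `ready`/`output` response format when needed) and it returns a result shaped like the input. A minimal sketch of these two steps in isolation is given below. The helper names `unwrap_outputs` and `shape_result` are hypothetical. The logic follows the listing above.

```python
import json
from typing import Optional, Union


def unwrap_outputs(raw: list, response_wrapped: bool) -> list:
    """Normalize raw component outputs into a list of dictionaries."""
    if response_wrapped:
        # Deprecated format: keep only ready items and decode their JSON output.
        return [json.loads(item["output"]) for item in raw if item["ready"]]
    # Current format: drop items for which the component produced no output.
    return [item for item in raw if item is not None]


def shape_result(outputs: list, result_is_list: bool) -> Optional[Union[dict, list]]:
    """Return a list for list-like input, otherwise the first output or None."""
    if result_is_list:
        return outputs
    return outputs[0] if outputs else None
```

A caller that passes a single dict therefore has to handle `None`, while a caller that passes a list always gets a list back, possibly empty.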

update_parameters(parameters)

Validates and updates pipeline parameters. The elements of the dictionary must match the parameters specified in the pipeline configuration package. If any of the names or types does not match, all parameters will remain untouched.

Args:

parameters (dict): names and values of parameters to update

Raises:

AssertionError:
    When:
    - either `name` is not in the configured parameters,
    - or `defaultValue` type is different from the configured one
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def update_parameters(self, parameters: dict):
    """
    Validates and updates pipeline parameters.
    The elements of the dictionary must match the parameters specified in the pipeline configuration package.
    If any of the names or types does not match, all parameters will remain untouched.

    Args:

        parameters (dict): names and values of parameters to update

    Raises:

        AssertionError:
            When:
            - either `name` is not in the configured parameters,
            - or `defaultValue` type is different from the configured one
    """
    for key,value in parameters.items():
        if key not in self.parameters.keys():
            if key.upper().startswith("__AI_IS"):
                self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
            else:
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
        elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")

    for key, value in parameters.items():
        self.parameters[key]["defaultValue"] = value
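
The method validates every entry before it changes anything, which is what keeps parameters untouched when one entry is rejected. The sketch below shows that two-pass pattern on plain dictionaries. `TYPE_MAP` is an assumed mapping, since the module's real `type_map` is not shown here and may differ. The special handling of `__AI_IS` keys is left out.

```python
# Assumed mapping from Python type names to declared parameter types;
# the real `type_map` is defined elsewhere in the module and may differ.
TYPE_MAP = {"int": "Integer", "float": "Double", "bool": "Boolean", "str": "String"}


def update_all_or_nothing(configured: dict, updates: dict) -> None:
    """Apply updates only if every name and type matches the configuration."""
    # First pass: validate everything without mutating `configured`.
    for key, value in updates.items():
        if key not in configured:
            raise AssertionError(f"No parameter named '{key}'")
        if configured[key]["type"] != TYPE_MAP.get(type(value).__name__):
            raise AssertionError(f"Type mismatch for parameter '{key}'")
    # Second pass: apply only after every entry passed validation.
    for key, value in updates.items():
        configured[key]["defaultValue"] = value
```

For example, updating `{"label": "b", "threshold": "high"}` against a `Double` threshold raises and leaves `label` at its previous value.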

run_pipeline(payload={})

Runs all the components sequentially, assuming the output of a component is only consumed by the next. The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries. For each component the supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.

Parameters:

payload (dict or list, default {}): One or more input records for the pipeline.

Returns:

Optional[Union[dict, list]]: The output of the last component.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
    """
    Runs all the components sequentially, assuming the output of a component is only consumed by the next.
    The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
    For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
    and the output is saved as `output.joblib`.

    Args:
        payload (dict or list): One or more input records for the pipeline.

    Returns:
        The output of the last component.
    """

    for name in self.components.keys():
        payload = self.run_component(name, payload)

    return payload
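
The sequential model means each component receives exactly what the previous one returned. A small sketch with plain callables standing in for components illustrates the chaining. `run_sequentially` and the step names are hypothetical, and no virtual environments are involved.

```python
from typing import Callable, Dict, Optional, Union

Payload = Optional[Union[dict, list]]


def run_sequentially(steps: Dict[str, Callable[[Payload], Payload]], payload: Payload) -> Payload:
    """Feed the payload through the steps in insertion order."""
    for step in steps.values():
        payload = step(payload)
    return payload


# Two stand-in components: the second consumes the output of the first.
steps = {
    "preprocess": lambda p: {"x": p["x"] * 2},
    "inference": lambda p: {"y": p["x"] + 1},
}
result = run_sequentially(steps, {"x": 3})
```

In the real runner, an intermediate component that yields no output for a single-dict input passes `None` on, and the next `run_component` call then fails its `data is not None` assertion.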

validate_payload(data, variables, batch, logger=_logger)

Validates that data is a valid list of input or output payload items. Variables list what variables each payload item has. Batch indicates if the payload items are themselves batches of items or not.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def validate_payload(data: list, variables: list, batch: bool, logger = _logger):
    """
    Validates that data is a valid list of input or output payload items.
    Variables list what variables each payload item has.
    Batch indicates if the payload items are themselves batches of items or not.
    """
    global extra_variables_to_log
    extra_variables_to_log = set()

    assert isinstance(data, list), "payload data must be a 'list'"
    for i in data:
        if batch:
            assert isinstance(i, list), "batch payload items must be 'list' instances"
        else:
            i = [i]
        for j in i:
            validate_payload_item(j, variables, logger)
    if len(extra_variables_to_log):
        logger.warning(f"WARNING! These variables are not declared but are part of the payload: {extra_variables_to_log}")
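
To make the role of the `batch` flag concrete, the following sketch shows the two payload shapes the loop above distinguishes. The helper `flatten_payload` is a hypothetical illustration, not an SDK function; it only reproduces how batched and non-batched entries are unwrapped before each item is checked.

```python
def flatten_payload(data: list, batch: bool) -> list:
    """Return the individual payload items, whether or not `data` is batched."""
    items = []
    for entry in data:
        # With batch=True every entry is itself a list of items;
        # otherwise each entry is a single item.
        group = entry if batch else [entry]
        items.extend(group)
    return items

single = [{"x": 1}, {"x": 2}]                   # batch=False: list of dicts
batched = [[{"x": 1}, {"x": 2}], [{"x": 3}]]    # batch=True: list of lists of dicts

print(flatten_payload(single, batch=False))
print(flatten_payload(batched, batch=True))
```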

validate_payload_item(data, variables, logger)

Validates that data is a valid payload item. Variables listed must have a corresponding field in data. The types of the values must match their declared type.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def validate_payload_item(data: dict, variables: list, logger):
    """
    Validates that data is a valid payload item.
    Variables listed must have a corresponding field in data.
    The types of the values must match their declared type.
    """
    assert isinstance(data, dict), "payload items must be 'dict' instances"
    for variable in variables:
        name = variable["name"]

        value = data.get(name, None)
        if value is None:
            logger.warning(f"WARNING! Variable '{name}' is missing from input, output or metric")
            continue

        if variable["type"] == "String":
            assert isinstance(value, str), "'String' value must be an 'str'"

        if variable["type"] == "StringArray":
            assert isinstance(value, list), "'StringArray' value must be a 'list'"
            assert all(isinstance(i, str) for i in value), "'StringArray' items must be 'str' instances"

        if variable["type"] == "Object":
            assert isinstance(value, dict), "'Object' value must be a 'dict'"
            values = list(value.values())
            assert len(values) == 2, "'Object' value must have exactly 2 items"
            ok = isinstance(values[0], str) and isinstance(values[1], bytes) or isinstance(values[1], str) and isinstance(values[0], bytes)
            assert ok, "'Object' value must have exactly one 'str' and one 'bytes' field"

    payload_names = set(data.keys())
    variable_names = { variable["name"] for variable in variables }
    variable_names.add('timestamp')
    extra_variables = payload_names - variable_names
    extra_variables_to_log.update(extra_variables)
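
As an illustration of the rules above, here is a small sketch of a payload item that would satisfy them. The variable names, the keys inside the `Object` value, and the helper `object_value_ok` are hypothetical; the helper only mirrors the "exactly one `str` and one `bytes` field" rule from the source.

```python
# Hypothetical variable declarations in the shape read by the validator.
variables = [
    {"name": "message", "type": "String"},
    {"name": "tags", "type": "StringArray"},
    {"name": "image", "type": "Object"},
]

def object_value_ok(value) -> bool:
    """Mirror of the 'Object' rule: a dict with one str field and one bytes field."""
    if not isinstance(value, dict) or len(value) != 2:
        return False
    first, second = value.values()
    return (isinstance(first, str) and isinstance(second, bytes)) or (
        isinstance(first, bytes) and isinstance(second, str)
    )

item = {
    "message": "ok",
    "tags": ["a", "b"],
    "image": {"metadata": "{}", "bytes": b"\x00\x01"},  # hypothetical key names
    "timestamp": "2024-01-01T00:00:00Z",                 # always accepted
    "debug": 1,                                          # undeclared, reported as extra
}

declared = {variable["name"] for variable in variables} | {"timestamp"}
extra = set(item) - declared
print(object_value_ok(item["image"]), extra)
```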

Static validation of pipeline packages.

Executes static checks on a pipeline configuration package including:

  • Verifying that the Python version required in the package is supported by a known version of the AI Inference Server.
  • Verifying that all the required Python packages are either included in the pipeline package itself or available on pypi.org for the target platform.

PipelineValidationError

Bases: Exception

Represents a problem with the pipeline configuration.

Parameters:

Name | Type | Description | Default
description | str | Description of the error. Mandatory argument. | required

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
class PipelineValidationError(Exception):
    """
    Represents a problem with the pipeline configuration.

    Args:
        description (str): Description of the error. Mandatory argument.
    """
    def __init__(self, description: str) -> None:
        assert description is not None
        self.value = description

    def __str__(self):
        return self.value

validate_pipeline_dependencies(zip_path)

@Deprecated, reason: In the future only the edge package will be generated and the same validation is performed during edge package creation.

Validates an already built pipeline configuration package to check if it is compatible with the AI Inference Server.

This method verifies that the requirements identified by name and version are either included in PythonPackages.zip or available on pypi.org for the target platform. If the required dependency for the target platform is not available on pypi.org and not present in PythonPackages.zip it will log the problem at the ERROR level.

Parameters:

Name | Type | Description | Default
zip_path | path-like | Path to the pipeline configuration package zip file | required

Raises:

Type | Description
PipelineValidationError | if the validation fails. See the logger output for details.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
def validate_pipeline_dependencies(zip_path):
    """
    @Deprecated, reason: In the future only the edge package will be generated and the same validation is performed during edge package creation.

    Validates an already built pipeline configuration package to check if it is compatible with the AI Inference Server.

    This method verifies that the requirements identified by name and version are either included
    in `PythonPackages.zip` or available on pypi.org for the target platform.
    If the required dependency for the target platform is not available on pypi.org
    and not present in `PythonPackages.zip` it will log the problem at the ERROR level.

    Args:
        zip_path (path-like): Path to the pipeline configuration package zip file

    Raises:
        PipelineValidationError: if the validation fails. See the logger output for details.
    """
    with tempfiles.OpenZipInTemp(zip_path) as package_dir:
        package_dir = next(package_dir.iterdir())
        error = read_config_and_download_deps(package_dir)
        if error:
            raise PipelineValidationError("Requirements of one or more components can not be satisfied.")
    _logger.info(f"Validating pipeline package '{zip_path}' was successful.")

download_component_dependencies(component, package_dir)

Download the dependencies of a pipeline component.

Parameters:

Name | Type | Description | Default
component | dict | A single component from the parsed pipeline_config.yml. | required
package_dir | Path | The directory where the component was extracted | required

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
def download_component_dependencies(component: dict, package_dir: Path):
    """
    Download the dependencies of a pipeline component.

    Args:
        component (dict): A single component from the parsed `pipeline_config.yml`.
        package_dir (Path): The directory where the component was extracted
    """
    _logger.info(f"Validating requirements of component: {component['name']}")
    try:
        deployment.python_version_validator(component['runtime']['version'])
    except ValueError as error:
        raise PipelineValidationError(error)

    component_dir = package_dir / component['name']
    requirements_file_path = component_dir / REQUIREMENTS_TXT

    python_packages_folder = _build_python_packages_folder(component_dir)

    return not _are_dependencies_available(requirements_file_path, component['runtime']['version'], python_packages_folder)

read_config_and_download_deps(package_dir)

Reads the pipeline configuration from the package directory and downloads its dependencies

Parameters:

Name | Type | Description | Default
package_dir | Path | The directory where the pipeline configuration package was extracted. | required

Returns:

Name | Type | Description
bool | bool | True if there was an error during the download of the components, False otherwise

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
def read_config_and_download_deps(package_dir: Path) -> bool:
    """
    Reads the pipeline configuration from the package directory and downloads its dependencies

    Args:
        package_dir (Path): The directory where the pipeline configuration package was extracted.

    Returns:
        bool: True if there was an error during the download of the components, False otherwise
    """
    try:
        config = yaml_helper.read_yaml(package_dir / 'pipeline_config.yml')
        error = False
        for component in config['dataFlowPipeline']['components']:
            error = download_component_dependencies(component, package_dir) or error
        return error
    except Exception:
        raise PipelineValidationError(INVALID_PIPELINE_PACKAGE_MESSAGE)
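
The expression `error = download_component_dependencies(...) or error` is worth a short illustration: because the call appears on the left of `or`, every component is checked even after an earlier one has failed. The sketch below uses hypothetical names (`any_failed`, `fake_check`) and is not SDK code.

```python
from typing import Callable, Iterable, List

def any_failed(components: Iterable[str], check: Callable[[str], bool]) -> bool:
    """Run `check` on every component and report whether any of them failed."""
    error = False
    for component in components:
        # `check(...) or error` evaluates the check first, so later components
        # are still examined after an earlier one has failed.
        error = check(component) or error
    return error

visited: List[str] = []

def fake_check(name: str) -> bool:
    visited.append(name)
    return name == "preprocessing"  # pretend only this component fails

failed = any_failed(["preprocessing", "inference"], fake_check)
print(failed, visited)
```

Writing the operands the other way round (`error or check(...)`) would short-circuit and skip the remaining components once one had failed.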

Utility script for running an entrypoint Python script in a given virtual Python environment. It is designed to be executed from simaticai.testing.PipelineRunner class. It consumes input data from a joblib file and produces output data into a joblib file.

warn_about_unused_dependencies(requirements_list)

Logs a warning if some declared dependencies were not used during test execution.

This method compares the imported modules after test execution with the dependency list in the package's requirements.txt file. If the requirements contain more than what is required for execution, it logs a warning.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/run_component.py
def warn_about_unused_dependencies(requirements_list):
    """
    Logs a warning if some declared dependencies were not used during test execution.

    This method compares the imported modules after test execution with the
    dependency list in the package's requirements.txt file. If the requirements
    contain more than what is required for execution, it logs a warning.
    """
    imported_packages = []
    packages = importlib.metadata.packages_distributions()

    for module_name in sys.modules:
        try:
            if module_name and '.' not in module_name:  # Only check top-level modules
                dist = importlib.metadata.distribution(module_name)
                imported_packages.append(dist.name)
        except Exception:
            if module_name in packages:
                pkgs = packages.get(module_name)
                for pkg in pkgs:
                    imported_packages.append(pkg)

    imported_packages = set([pkg.replace('-','_').lower() for pkg in imported_packages])
    requirements_list = set([pkg.replace('-','_').lower() for pkg in requirements_list])

    diff = requirements_list.difference(imported_packages)

    if 0 < len(diff):
        _logger.warning(f"WARNING! The following dependencies were not used during execution: {', '.join(diff)}. Consider removing them from the pipeline package.")
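
The comparison at the end of the function boils down to name normalization followed by a set difference. A minimal sketch of that step, with a hypothetical helper name and made-up package lists, could look like this:

```python
def normalize(name: str) -> str:
    # Same normalization as in the source: dashes become underscores, lower case.
    return name.replace("-", "_").lower()

def unused_dependencies(requirements, imported) -> set:
    """Return the declared requirements that have no matching imported package."""
    return {normalize(r) for r in requirements} - {normalize(i) for i in imported}

unused = unused_dependencies(
    ["scikit-learn", "NumPy", "joblib"],   # declared in requirements.txt (example values)
    ["numpy", "scikit_learn"],             # distributions seen among imported modules
)
print(sorted(unused))
```

Note that this sketch assumes the requirement entries are bare package names; entries with version specifiers would need to be stripped first.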

main(args)

Feeds input to the entrypoint and captures output.

Imports entrypoint module given with its name, and triggers its run(...) function with the prepared data in the input file. If pipeline_parameters dictionary is not empty, before triggering run(..) method, the update_parameters(..) method of entrypoint will be called with the dictionary. The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries. One dictionary represents one input for the component with the required variable names and values, which is directly passed to run(). The output file is a dumped joblib result which is a list containing outputs of the component, in the structure returned from run().

Parameters:

Name | Type | Description | Default
args | Namespace | Command line arguments, with the fields listed below | required

  • module_name (str): Name of the entrypoint Python script
  • input_file (os.Pathlike): Path of the joblib file containing the input payloads
  • output_file (os.Pathlike): Path of the joblib file where the outputs will be stored
  • pipeline_parameters (json-string): json formatted dictionary defining configurable parameters with their names as key and their values

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/run_component.py
def main(args: argparse.Namespace) -> int:
    """
    Feeds input to the entrypoint and captures output.

    Imports entrypoint module given with its name, and triggers its `run(...)` function with the prepared data in the input file.
    If pipeline_parameters dictionary is not empty, before triggering `run(..)` method, the `update_parameters(..)` method of entrypoint will be called with the dictionary.
    The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries.
    One dictionary represents one input for the component with the required variable names and values, which is directly passed to `run()`.
    The output file is a dumped joblib result which is a list containing outputs of the component, in the structure returned from `run()`.

    Args:
        args (argparse.Namespace): Command line arguments<br/>
            module_name (str): Name of the entrypoint Python script<br/>
            input_file (os.Pathlike): Path of the joblib file containing the input payloads<br/>
            output_file (os.Pathlike): Path of the joblib file where the outputs will be stored<br/>
            pipeline_parameters (json-string): json formatted dictionary defining configurable parameters with their names as key and their values
    """

    entrypoint = importlib.import_module(args.module_name)
    trigger_method = None

    try:
        inspect.signature(entrypoint.process_input)
        trigger_method = "process_input"
    except AttributeError:
        try:
            inspect.signature(entrypoint.run)
            trigger_method = "run"
        except AttributeError:
            _logger.warning("Method run not found")

    if trigger_method is None:
        raise RuntimeError("Neither 'run(data: str)' nor 'process_input(data: dict)' entrypoint method can be found.")

    if args.pipeline_parameters is not None and args.pipeline_parameters != "{}":
        pipeline_parameters = json.loads(args.pipeline_parameters)
    else:
        pipeline_parameters = {}

    try:
        _logger.debug(f"Calling `update_parameters(..)` with: {args.pipeline_parameters}")
        entrypoint.update_parameters(pipeline_parameters)
    except AttributeError:
        _logger.warning("Entrypoint does not implement `update_parameters()` method. Skipping pipeline parameter update.")

    input_list = joblib.load(args.input_file)
    if not isinstance(input_list, list):
        raise ValueError("Component input must be supplied as a list.")

    if trigger_method == "process_input":
        _logger.debug("Calling `process_input(..)`")
    else:
        _logger.debug("Calling `run(..)`")

    output_list = []
    for input_data in input_list:
        if trigger_method == "process_input":
            output_list.append(entrypoint.process_input(input_data))
        else:
            output_list.append(entrypoint.run(json.dumps(input_data)))

    if args.requirements_file is not None:
        requirements_list = Path(args.requirements_file).read_text().split('#')
        warn_about_unused_dependencies(requirements_list)

    joblib.dump(output_list, args.output_file)

    if trigger_method == "process_input":
        return 0
    else:
        _logger.warning("Trigger method `run(data: str)` is deprecated and will be removed in the future. Please refer the user manual.")
        return 0b10001  # return code 0b10001 means deprecated `run` method was triggered
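
The following sketch shows what an entrypoint compatible with this script might look like and how the trigger method is selected. The in-memory module, the `threshold` parameter and the helper `pick_trigger` are hypothetical; the source uses `inspect.signature` with `AttributeError` handling, whereas this sketch uses `getattr` for brevity.

```python
import json
import types

# Hypothetical entrypoint module, built in memory for illustration only.
entrypoint = types.ModuleType("entrypoint")
_state = {"threshold": 0.5}

def update_parameters(params: dict) -> None:
    _state.update(params)

def process_input(data: dict) -> dict:
    return {"alarm": data["value"] > _state["threshold"]}

entrypoint.update_parameters = update_parameters
entrypoint.process_input = process_input

def pick_trigger(module) -> str:
    # Prefer process_input(data: dict); fall back to the deprecated run(data: str).
    for name in ("process_input", "run"):
        if callable(getattr(module, name, None)):
            return name
    raise RuntimeError("No entrypoint method found.")

trigger = pick_trigger(entrypoint)
entrypoint.update_parameters(json.loads('{"threshold": 0.8}'))
outputs = [getattr(entrypoint, trigger)(record) for record in [{"value": 0.6}, {"value": 0.9}]]
print(trigger, outputs)
```

The return code `0b10001` used by the script for the deprecated `run` path equals 17 in decimal.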

Utility script for running an ONNX model in a given virtual Python environment. It is designed to be executed from simaticai.testing.PipelineRunner class. It consumes input data from a joblib file and produces output data into a joblib file.

main(model_path, config_path, input_file, output_file)

Feeds input to the ML Model saved in ONNX format and captures output.

Reads the given model and creates an onnxruntime session to run inference on the inputs. The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries. The output file is a dumped joblib result list containing the input dictionary extended with the generated predictions.

Parameters:

Name | Type | Description | Default
model_path | str | File path for the stored ML Model in ONNX format | required
input_file | Pathlike | Path of the joblib file containing the input payloads | required
output_file | Pathlike | Path of the joblib file where the outputs will be stored | required

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/run_gpuruntime_component.py
def main(model_path, config_path, input_file, output_file):
    """Feeds input to the ML Model saved in ONNX format and captures output.

    Reads the given model and creates an onnxruntime Session to run inference on the inputs.
    The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries.
    The output file is a dumped joblib result list containing the input dictionary extended with the generated predictions.

    Args:
        model_path (str): File path for the stored ML Model in ONNX format
        input_file (os.Pathlike): Path of the joblib file containing the input payloads
        output_file (os.Pathlike): Path of the joblib file where the outputs will be stored
    """
    input_list = joblib.load(input_file)
    input_list = input_list if type(input_list) is list else [input_list]

    output_list = []
    session = InferenceSession(model_path)

    model_config = _get_proto_config(config_path)
    max_batch_size = model_config.get("maxBatchSize", 0)

    inputs = [input for input in session.get_inputs()]
    input_names = [input.name for input in inputs]

    outputs = [output for output in session.get_outputs()]
    output_names = [output.name for output in outputs]

    for _input in input_list:
        _input_tensor = {}
        output = {k: _input.get(k) for k in _input.keys() if k not in input_names}
        for _input_info in inputs:
            if max_batch_size > 0:
                input_shape = _get_new_shape(model_config["input"], _input[_input_info.name], _input_info.name)
                if input_shape[0] > max_batch_size:
                    _logger.warning(f"Received input batch size ({input_shape[0]}) is greater than max_batch_size ({max_batch_size})!")
                _input_tensor[_input_info.name] = _input[_input_info.name].reshape(input_shape)
            else:
                _input_tensor[_input_info.name] = _input[_input_info.name].reshape(_input_info.shape)

        model_outputs = session.run(output_names, _input_tensor)

        output |= dict(zip(output_names, model_outputs))
        output_list.append(output)

    joblib.dump(output_list, output_file)

    return 0
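
How each output record is assembled can be shown without onnxruntime. In this sketch the input and output names and the `model_outputs` value are made-up stand-ins for what `session.get_inputs()`, `session.get_outputs()` and `session.run(...)` would provide; only the dictionary handling mirrors the source.

```python
input_names = ["sensor"]        # hypothetical model input name
output_names = ["prediction"]   # hypothetical model output name

record = {"sensor": [1.0, 2.0], "timestamp": "2024-01-01T00:00:00Z"}

# Pass through every field that is not a model input ...
output = {key: value for key, value in record.items() if key not in input_names}

# ... then add the model outputs, paired with their names.
# The in-place dict union operator requires Python 3.9 or newer.
model_outputs = [[0.25]]        # stand-in for the result of session.run(...)
output |= dict(zip(output_names, model_outputs))
print(output)
```

As the sketch shows, fields consumed as model inputs are not copied into the output record; only the remaining fields and the predictions are kept.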

The RunnerConfig module and class are responsible for handling the configuration JSON.
An instance of the RunnerConfig class is used to set up a LocalPipelineRunner with the required parameters.

TimeSeriesStreamConfig dataclass

TimeSeriesStreamConfig is a configuration class used to manage settings for the time series stream.

Attributes:

  • fields (list): List of fields to be included in the time series stream. Defaults to None.
  • count (int): Number of records to be included in the time series stream. Defaults to None.
  • offset (int): Offset to be applied in the time series stream. Defaults to None.
  • batch_size (int): Size of the batch to be used in the time series stream. Defaults to None.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/runner_config.py
@dataclass
class TimeSeriesStreamConfig:
    """
    TimeSeriesStreamConfig is a configuration class used to manage settings for the time series stream.
    Attributes:
        fields (list): List of fields to be included in the time series stream. Defaults to None.
        count (int): Number of records to be included in the time series stream. Defaults to None.
        offset (int): Offset to be applied in the time series stream. Defaults to None.
        batch_size (int): Size of the batch to be used in the time series stream. Defaults to None.
    """
    fields: list = field(default=None)
    count: int = field(default=None)
    offset: int = field(default=None)
    batch_size: int = field(default=None)

VCAStreamConfig dataclass

VCAStreamConfig is a configuration class used to manage settings for the VCA stream.

Attributes:

  • variable_name (str): Name of the variable to be used in the VCA stream. Defaults to None.
  • image_format (str): Format of the images to be used in the VCA stream. Defaults to None.
  • filter (str): Filter to be applied in the VCA stream. Defaults to None.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/runner_config.py
@dataclass
class VCAStreamConfig:
    """
    VCAStreamConfig is a configuration class used to manage settings for the VCA stream.
    Attributes:
        variable_name (str): Name of the variable to be used in the VCA stream. Defaults to None.
        image_format (str): Format of the images to be used in the VCA stream. Defaults to None.
        filter (str): Filter to be applied in the VCA stream. Defaults to None.
    """
    variable_name: str = field(default=None)
    image_format: str = field(default=None)
    filter: str = field(default=None)

RunnerConfig dataclass

RunnerConfig is a configuration class used to manage settings for running tests and streams.

Attributes:

  • data (PathLike): Path to the data file or directory. Defaults to None.
  • test_dir (PathLike): Path to the test directory. Defaults to None.
  • cleanup (bool): Indicates whether to clean up resources after execution. Defaults to True.
  • time_series_stream (TimeSeriesStreamConfig): Configuration for the time series stream.
  • vca_stream (VCAStreamConfig): Configuration for the VCA stream.

Methods:

  • __post_init__(): Converts data and test_dir attributes to Path objects if they are not None.
  • __dict__(): Returns a dictionary representation of the configuration, including nested stream configurations.
  • from_json(config_path: PathLike | str) -> RunnerConfig: Creates a RunnerConfig instance from a JSON configuration file.

from_json arguments: config_path (PathLike | str): Path to the JSON configuration file.

from_json returns: RunnerConfig: An instance of the RunnerConfig class populated with values from the JSON file.

from_json error handling:

  • Logs a warning and returns a default RunnerConfig instance if the file is not found.
  • Logs a warning and returns a default RunnerConfig instance if the file is not a valid JSON.
  • Logs a warning and returns a default RunnerConfig instance if any other error occurs during file reading.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/runner_config.py
@dataclass
class RunnerConfig:
    """
    RunnerConfig is a configuration class used to manage settings for running tests and streams.
    Attributes:
        data (PathLike): Path to the data file or directory. Defaults to None.
        test_dir (PathLike): Path to the test directory. Defaults to None.
        cleanup (bool): Indicates whether to clean up resources after execution. Defaults to True.
        time_series_stream (TimeSeriesStreamConfig): Configuration for the time series stream.
        vca_stream (VCAStreamConfig): Configuration for the VCA stream.
    Methods:
        __post_init__():
            Converts `data` and `test_dir` attributes to `Path` objects if they are not None.
        __dict__():
            Returns a dictionary representation of the configuration, including nested stream configurations.
        from_json(config_path: PathLike | str) -> RunnerConfig:
            Creates a `RunnerConfig` instance from a JSON configuration file.
            Args:
                config_path (PathLike | str): Path to the JSON configuration file.
            Returns:
                RunnerConfig: An instance of the `RunnerConfig` class populated with values from the JSON file.
            Exceptions:
                - Logs a warning and returns a default `RunnerConfig` instance if the file is not found.
                - Logs a warning and returns a default `RunnerConfig` instance if the file is not a valid JSON.
                - Logs a warning and returns a default `RunnerConfig` instance if any other error occurs during file reading.

    """
    data: PathLike = field(default=None)
    test_dir: PathLike = field(default=None)
    cleanup: bool = field(default=True)
    time_series_stream: TimeSeriesStreamConfig = field(default_factory=TimeSeriesStreamConfig)
    vca_stream: VCAStreamConfig = field(default_factory=VCAStreamConfig)

    def __post_init__(self):
        if self.data is not None:
            self.data = Path(self.data)
        if self.test_dir is not None:
            self.test_dir = Path(self.test_dir)

    @property
    def __dict__(self):
        return {
            "data": self.data,
            "test_dir": self.test_dir,
            "cleanup": self.cleanup,
            "TimeSeriesStream": {
                "fields": self.time_series_stream.fields,
                "count": self.time_series_stream.count,
                "offset": self.time_series_stream.offset,
                "batch_size": self.time_series_stream.batch_size
            },
            "VCAStream": {
                "variable_name": self.vca_stream.variable_name,
                "image_format": self.vca_stream.image_format,
                "filter": self.vca_stream.filter
            }
        }

    @classmethod
    def from_json(cls, config_path: PathLike | str):
        import json

        try:
            config_path = Path(config_path)
            with open(config_path, 'r') as file:
                json_config = json.load(file)

        except FileNotFoundError:
            logger.warning(f"Configuration file {config_path} does not exist.")
            return cls()
        except json.JSONDecodeError:
            logger.warning(f"Configuration file {config_path} is not a valid JSON.")
            return cls()
        except Exception as e:
            logger.warning(f"An error occurred while reading the configuration file {config_path}: {e}")
            return cls()

        data = json_config.get('data')
        test_dir = json_config.get('test_dir')
        cleanup = json_config.get('cleanup', True)

        time_series_stream = TimeSeriesStreamConfig(
            fields=json_config.get('TimeSeriesStream', {}).get('fields'),
            count=json_config.get('TimeSeriesStream', {}).get('count'),
            offset=json_config.get('TimeSeriesStream', {}).get('offset'),
            batch_size=json_config.get('TimeSeriesStream', {}).get('batch_size')
        )

        vca_stream = VCAStreamConfig(
            variable_name=json_config.get('VCAStream', {}).get('variable_name'),
            image_format=json_config.get('VCAStream', {}).get('image_format'),
            filter=json_config.get('VCAStream', {}).get('filter')
        )
        return cls(
            data=data,
            test_dir=test_dir,
            cleanup=cleanup,
            time_series_stream=time_series_stream,
            vca_stream=vca_stream,
        )
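
For orientation, this sketch shows a JSON file in the layout that `from_json` reads, based on the keys visible in the source above. All values are made-up examples, and the file is read back with the standard `json` module rather than with the SDK, so that the snippet stays self-contained.

```python
import json
import tempfile
from pathlib import Path

# Key names follow from_json above; the values are illustrative only.
config = {
    "data": "data/input.csv",
    "test_dir": "test_runs",
    "cleanup": False,
    "TimeSeriesStream": {"fields": ["temperature", "pressure"], "count": 100, "offset": 10, "batch_size": 5},
    "VCAStream": {"variable_name": "vision_payload", "image_format": "BayerRG8"},
}

with tempfile.TemporaryDirectory() as tmp:
    config_path = Path(tmp) / "runner_config.json"
    config_path.write_text(json.dumps(config, indent=2))
    loaded = json.loads(config_path.read_text())

# Missing sections or keys fall back to None, mirroring the chained .get() calls.
batch_size = loaded.get("TimeSeriesStream", {}).get("batch_size")
vca_filter = loaded.get("VCAStream", {}).get("filter")
print(batch_size, vca_filter)
```

Note that the stream sections use the keys `TimeSeriesStream` and `VCAStream` in the JSON file, while the corresponding dataclass attributes are named `time_series_stream` and `vca_stream`.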

TimeSeriesStream

Bases: DataStream

This class creates a generator from a csv file.

The generate function returns a generator that reads the csv file line by line and converts each line to an input dictionary, as if it were received from AI Inference Server.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/timeseries_stream.py
class TimeSeriesStream(DataStream):
    """
    This class creates a generator from a csv file.

    The generate function returns a generator that reads the csv file line by line and
    converts each line to an input dictionary, as if it were received from AI Inference Server.
    """

    def __init__(self, csv_path: os.PathLike, *, fields: Optional[list] = None, count: Optional[int] = None, offset: Optional[int] = None, batch_size: Optional[int] = None):
        """
        Creates a new TimeSeriesStream object

        Args:
            csv_path (os.Pathlike): Path to the csv file.
            fields (Optional[list[str]]): List of required column headers. Will use all columns when None.

        """
        self.csv_path = Path(csv_path)
        if fields is None:
            self.fields = []
        else:
            self.fields = fields

        with open(self.csv_path, "r") as file:
            self.header = file.readline().strip().split(",")

        if any(key for key in self.fields if key not in self.header):
            raise KeyError("The CSV file must contain variable names in the first row.")

        if self.fields == []:
            self.fields = self.header
            fields_len = len(self.fields)
            filtered_len = len(list(k for k in self.fields if re.match("[a-zA-Z]+", k)))
            if filtered_len != fields_len:
                raise KeyError("Column headers should start with a letter.")

        if (not isinstance(count, int)) or count < 0:
            self.count = 0
        else:
            self.count = count
        if (not isinstance(offset, int)) or offset < 0:
            self.offset = 0
        else:
            self.offset = offset
        if (not isinstance(batch_size, int)) or batch_size < 0:
            self.batch_size = 0
        else:
            self.batch_size = batch_size

    def _read_value(self, value):
        try:
            return int(value)
        except ValueError:
            try:
                return float(value)
            except ValueError:
                return value

    def _read_csv(self):
        with open(self.csv_path, 'r', encoding='UTF-8') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for line in csv_reader:
                if 0 < len(self.fields):
                    key_holder = self.fields
                else:
                    key_holder = line.keys()
                result = {}
                for k in key_holder:
                    result[k] = self._read_value(line[k])
                yield result

    def _limit(self):
        _max = float('inf')
        if 0 < self.count:
            _max = self.offset + self.count
        counter = 0
        for line in self._read_csv():
            if _max <= counter:
                break
            counter += 1
            if counter <= self.offset:
                continue
            yield line

    def _batch(self):
        aggregate = []
        for line in self._limit():
            if len(aggregate) < self.batch_size:
                aggregate += [line]
                continue
            yield aggregate
            aggregate = [line]
        if self.batch_size == len(aggregate):
            yield aggregate
        else:
            _logger.warning(f"WARNING! The length of the given dataset is not divisible by {self.batch_size}. There are {len(aggregate)} inputs remaining in the buffer.")

    def __iter__(self):
        """
        Creates the input data generator.

        Returns: a generator
        """
        if 0 < self.batch_size:
            return self._batch()
        else:
            return self._limit()
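
The way a CSV line becomes an input dictionary can be sketched with the standard library alone. The helper `read_value` follows the same int, then float, then string fallback as `_read_value` above; the CSV content and column names are made-up examples.

```python
import csv
import io

def read_value(text: str):
    """Try int, then float, and fall back to the raw string."""
    for cast in (int, float):
        try:
            return cast(text)
        except ValueError:
            continue
    return text

csv_text = "temperature,status,count\n21.5,ok,3\n"

rows = [
    {key: read_value(value) for key, value in line.items()}
    for line in csv.DictReader(io.StringIO(csv_text))
]
print(rows)
```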

__init__(csv_path, *, fields=None, count=None, offset=None, batch_size=None)

Creates a new TimeSeriesStream object

Parameters:

Name | Type | Description | Default
csv_path | Pathlike | Path to the csv file. | required
fields | Optional[list[str]] | List of required column headers. Will use all columns when None. | None

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/timeseries_stream.py
def __init__(self, csv_path: os.PathLike, *, fields: Optional[list] = None, count: Optional[int] = None, offset: Optional[int] = None, batch_size: Optional[int] = None):
    """
    Creates a new TimeSeriesStream object

    Args:
        csv_path (os.Pathlike): Path to the csv file.
        fields (Optional[list[str]]): List of required column headers. Will use all columns when None.

    """
    self.csv_path = Path(csv_path)
    if fields is None:
        self.fields = []
    else:
        self.fields = fields

    with open(self.csv_path, "r") as file:
        self.header = file.readline().strip().split(",")

    if any(key for key in self.fields if key not in self.header):
        raise KeyError("The CSV file must contain variable names in the first row.")

    if self.fields == []:
        self.fields = self.header
        fields_len = len(self.fields)
        filtered_len = len(list(k for k in self.fields if re.match("[a-zA-Z]+", k)))
        if filtered_len != fields_len:
            raise KeyError("Column headers should start with a letter.")

    if (not isinstance(count, int)) or count < 0:
        self.count = 0
    else:
        self.count = count
    if (not isinstance(offset, int)) or offset < 0:
        self.offset = 0
    else:
        self.offset = offset
    if (not isinstance(batch_size, int)) or batch_size < 0:
        self.batch_size = 0
    else:
        self.batch_size = batch_size
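
The header checks in the constructor can be easier to follow in isolation. The following is a minimal sketch that mirrors the logic of the source above on an in-memory header list instead of a CSV file. `check_header` and its error messages are hypothetical, written only for illustration, and are not part of the SDK.

```python
import re
from typing import List, Optional


def check_header(header: List[str], fields: Optional[List[str]] = None) -> List[str]:
    """Hypothetical helper mirroring the header checks shown above."""
    selected = list(fields) if fields else []

    # Every explicitly requested field must appear in the first CSV row.
    missing = [name for name in selected if name not in header]
    if missing:
        raise KeyError(f"Fields not found in the CSV header: {missing}")

    # Without an explicit selection all columns are used, and each
    # column header must then start with a letter.
    if not selected:
        selected = list(header)
        invalid = [name for name in selected if not re.match("[a-zA-Z]+", name)]
        if invalid:
            raise KeyError(f"Column headers should start with a letter: {invalid}")

    return selected


print(check_header(["timestamp", "temperature"], fields=["temperature"]))
print(check_header(["timestamp", "temperature"]))
```

As in the source above, the starts-with-a-letter check is applied only when no `fields` are given.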

__iter__()

Creates the input data generator.

Returns: a generator

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/timeseries_stream.py
def __iter__(self):
    """
    Creates the input data generator.

    Returns: a generator
    """
    if 0 < self.batch_size:
        return self._batch()
    else:
        return self._limit()
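
When `batch_size` is positive, the iterator yields lists of `batch_size` rows, and a trailing incomplete batch is not yielded (the source above only logs a warning for it). The following is a minimal standalone sketch of that behavior, assuming a positive batch size. `full_batches` is a hypothetical function written for illustration and is not part of the SDK.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def full_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of exactly `batch_size` items.

    A trailing incomplete batch is dropped, which matches the observable
    behavior of the `_batch` method shown above for a positive batch size.
    """
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Any items still in `batch` here do not fill a whole batch and are dropped.


# Seven rows with a batch size of three: the seventh row is not emitted.
print(list(full_batches(range(7), 3)))  # prints [[0, 1, 2], [3, 4, 5]]
```

If every row matters for a test, one option is to choose `count` and `batch_size` so that the number of rows is divisible by the batch size.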

This module contains the VCAStream class, which generates image data streams from a specified folder of images. The images are converted into the ImageSet format, which makes them suitable for testing purposes.

The VCAStream class inherits from DataStream and provides an iterator that yields ImageSet objects containing images in the specified format.

Example usage

vca_stream = VCAStream(data='/path/to/image/folder', variable_name='vision_payload', image_format='BayerRG8')
with LocalPipelineRunner("MyPipeline-edge_1.zip") as runner:
    runner.run_pipeline(vca_stream)

VCAStream

Bases: DataStream

This class creates a generator from a folder of images.

Iterating over a VCAStream object walks over the image folder and converts each image into the specified format, BayerRG8 by default. Each resulting object is in the ImageSet format, as if it were received from AI Inference Server.

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/vca_stream.py
class VCAStream(DataStream):
    """
    This class creates a generator from a folder of images.

    Iterating over a VCAStream object walks over the image folder and converts
    each image into the specified format, BayerRG8 by default. Each resulting object is in the
    ImageSet format, as if it were received from AI Inference Server.
    """

    def __init__(self, data: os.PathLike, variable_name: str = 'vision_payload', image_format: str | ImageFormat = 'BayerRG8', filter: str | None = None):
        """
        Creates a new VCAStream object

        Args:
            data (os.PathLike): Path to the directory of images
            variable_name (str): Name of the variable to store the images (default: 'vision_payload')
            image_format (str): Supported image formats: 'BGR' (equivalently: 'BGR8'), 'RGB' (equivalently: 'RGB8'), 'BayerRG8' (default),
                'BayerGR8', 'BayerBG8', 'BayerGB8', 'Mono8', 'YUV422Packed', 'YUV422_YUYV_Packed'
            filter (rglob_pattern): Pattern to filter the images (see also: pathlib.rglob())
        """
        self.seq = 0
        self.data = data
        if filter is None or "" == filter.strip():
            self.filter = "**/*.[jJpP][pPnN][gGeE]*"
        else:
            self.filter = filter
        if variable_name is None or "" == variable_name.strip():
            self.variable_name = 'vision_payload'
        else:
            self.variable_name = variable_name

        if image_format is None:
            self.image_format = ImageFormat.BayerRG8
        else:
            try:
                self.image_format = ImageDetails._parse_image_format(image_format)
            except ValueError as err:
                raise AssertionError(str(err))

        self.camera_id = uuid.uuid4()
        self._iter_image_paths: list[Path] = []
        self._iter_current_index: int = 0

    def __iter__(self):
        """
        Initializes the iterator by collecting all image paths and resetting the index.

        Returns: self (the iterator object)
        """
        self._iter_image_paths = list(Path(self.data).rglob(self.filter))
        self._iter_current_index = 0
        return self

    def __next__(self):
        """
        Returns the next ImageSet from the image folder.

        Raises:
            StopIteration: When there are no more images to process.

        Returns: A dictionary containing the ImageSet variable.
        """
        if len(self._iter_image_paths) == 0:
            self.__iter__()

        if self._iter_current_index >= len(self._iter_image_paths):
            raise StopIteration

        image_path = self._iter_image_paths[self._iter_current_index]
        self._iter_current_index += 1
        return self._create_imageset(image_path)

    def _create_imageset(self, image_path):

        image_detail = ImageDetails.from_image(image_path=image_path, format=self.image_format)
        image_detail.id = f'VCA Stream : {image_detail.id}'
        image_detail.metadata = {"ptpstatus": "Disabled", "ptptimestamp": "0"}
        image_detail.seq = self.seq
        self.seq += 1

        imageset = ImageSet(
            version='1',
            cameraid=str(self.camera_id),
            timestamp=image_detail.timestamp,
        )
        imageset.add_image(image_detail)

        result = {self.variable_name: imageset.to_dict()}
        return result

__init__(data, variable_name='vision_payload', image_format='BayerRG8', filter=None)

Creates a new VCAStream object

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | os.PathLike | Path to the directory of images | required |
| variable_name | str | Name of the variable to store the images | 'vision_payload' |
| image_format | str \| ImageFormat | Supported image formats: 'BGR' (equivalently: 'BGR8'), 'RGB' (equivalently: 'RGB8'), 'BayerRG8', 'BayerGR8', 'BayerBG8', 'BayerGB8', 'Mono8', 'YUV422Packed', 'YUV422_YUYV_Packed' | 'BayerRG8' |
| filter | rglob_pattern | Pattern to filter the images (see also: pathlib.Path.rglob()) | None |

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/vca_stream.py
def __init__(self, data: os.PathLike, variable_name: str = 'vision_payload', image_format: str | ImageFormat = 'BayerRG8', filter: str | None = None):
    """
    Creates a new VCAStream object

    Args:
        data (os.PathLike): Path to the directory of images
        variable_name (str): Name of the variable to store the images (default: 'vision_payload')
        image_format (str): Supported image formats: 'BGR' (equivalently: 'BGR8'), 'RGB' (equivalently: 'RGB8'), 'BayerRG8' (default),
            'BayerGR8', 'BayerBG8', 'BayerGB8', 'Mono8', 'YUV422Packed', 'YUV422_YUYV_Packed'
        filter (rglob_pattern): Pattern to filter the images (see also: pathlib.rglob())
    """
    self.seq = 0
    self.data = data
    if filter is None or "" == filter.strip():
        self.filter = "**/*.[jJpP][pPnN][gGeE]*"
    else:
        self.filter = filter
    if variable_name is None or "" == variable_name.strip():
        self.variable_name = 'vision_payload'
    else:
        self.variable_name = variable_name

    if image_format is None:
        self.image_format = ImageFormat.BayerRG8
    else:
        try:
            self.image_format = ImageDetails._parse_image_format(image_format)
        except ValueError as err:
            raise AssertionError(str(err))

    self.camera_id = uuid.uuid4()
    self._iter_image_paths: list[Path] = []
    self._iter_current_index: int = 0
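
When no `filter` is given, the constructor falls back to the pattern `**/*.[jJpP][pPnN][gGeE]*`. The following is a minimal sketch that applies this pattern with `pathlib.Path.rglob()` to a temporary folder to show which file names it selects. `matching_names` and the file names are hypothetical and only serve the illustration.

```python
import tempfile
from pathlib import Path

DEFAULT_FILTER = "**/*.[jJpP][pPnN][gGeE]*"  # copied from the constructor above


def matching_names(folder: Path, pattern: str = DEFAULT_FILTER) -> list:
    """Return the sorted, de-duplicated relative paths selected by the pattern."""
    return sorted({p.relative_to(folder).as_posix() for p in folder.rglob(pattern)})


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    # Create empty placeholder files; only their names matter here.
    for name in ["a.jpg", "b.PNG", "nested/c.jpeg", "notes.txt", "d.bmp"]:
        (root / name).touch()
    selected = matching_names(root)

print(selected)  # prints ['a.jpg', 'b.PNG', 'nested/c.jpeg']
```

The pattern is searched recursively, so images in subfolders are included. Files with other extensions, such as `.bmp`, need an explicit `filter` argument.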

__iter__()

Initializes the iterator by collecting all image paths and resetting the index.

Returns: self (the iterator object)

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/vca_stream.py
def __iter__(self):
    """
    Initializes the iterator by collecting all image paths and resetting the index.

    Returns: self (the iterator object)
    """
    self._iter_image_paths = list(Path(self.data).rglob(self.filter))
    self._iter_current_index = 0
    return self

__next__()

Returns the next ImageSet from the image folder.

Raises:

| Type | Description |
| --- | --- |
| StopIteration | When there are no more images to process. |

Source code in docs/industrial-ai-suite/sdk/simaticai/testing/vca_stream.py
def __next__(self):
    """
    Returns the next ImageSet from the image folder.

    Raises:
        StopIteration: When there are no more images to process.

    Returns: A dictionary containing the ImageSet variable.
    """
    if len(self._iter_image_paths) == 0:
        self.__iter__()

    if self._iter_current_index >= len(self._iter_image_paths):
        raise StopIteration

    image_path = self._iter_image_paths[self._iter_current_index]
    self._iter_current_index += 1
    return self._create_imageset(image_path)
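
Because `__iter__` re-collects the paths and resets the index, the same object can be iterated more than once, while the `seq` counter set in the constructor keeps increasing across passes. The following is a minimal sketch of this iterator pattern on an in-memory list. `RestartableStream` is a hypothetical stand-in written for illustration. It does not read images or build ImageSet objects.

```python
class RestartableStream:
    """Hypothetical stand-in that mirrors the iterator protocol shown above."""

    def __init__(self, source):
        self._source = list(source)
        self._items = []
        self._index = 0
        self.seq = 0  # running sequence number, never reset by __iter__

    def __iter__(self):
        # Re-collect the items and start again from the first one.
        self._items = list(self._source)
        self._index = 0
        return self

    def __next__(self):
        # Allow next() to be called without a preceding iter() call.
        if len(self._items) == 0:
            self.__iter__()

        if self._index >= len(self._items):
            raise StopIteration

        item = self._items[self._index]
        self._index += 1
        result = (self.seq, item)
        self.seq += 1
        return result


stream = RestartableStream(["img_a", "img_b"])
print(list(stream))  # prints [(0, 'img_a'), (1, 'img_b')]
print(list(stream))  # prints [(2, 'img_a'), (3, 'img_b')]
```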