testing
Test pipeline configuration package locally
Once you have created your inference pipeline package, you could deploy it to the AI Inference Server right away. However, we strongly recommend that you test the package locally before you deploy it. Local testing has the following benefits:
- You can identify many potential problems faster, because you don't have to go through a deployment cycle.
- You can diagnose and troubleshoot issues more easily, because you can inspect artifacts in your development environment.
- You can validate your fixes faster and move on to issues that earlier issues had kept from surfacing.
- You can easily include the local pipeline tests in the test automation of your build process.
In general, we encourage you to apply state-of-the-art software engineering practices, such as unit testing and test-driven development. Ideally, this means that you already have automated unit or even integration tests in place that ensure the Python code and the saved models work as expected in isolation. Such tests help you localize errors when you put these pieces together and integrate them into a pipeline configuration package.
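As an illustration of testing a component in isolation, the following sketch unit-tests an entrypoint with the standard unittest module. The process_input function here is a hypothetical stand-in for a component's entrypoint (it only follows the process_input(data: dict) signature mentioned later in this section); the field names value and scaled are invented for the example.

```python
import unittest


# Hypothetical component entrypoint; in a real package this would live in the
# component's entrypoint module and follow the process_input(data: dict) signature.
def process_input(data: dict) -> dict:
    """Doubles the hypothetical input field 'value' and returns it as 'scaled'."""
    if "value" not in data:
        raise KeyError("input record must contain 'value'")
    return {"scaled": data["value"] * 2}


class ProcessInputTest(unittest.TestCase):
    def test_scales_value(self):
        self.assertEqual(process_input({"value": 21}), {"scaled": 42})

    def test_rejects_missing_field(self):
        with self.assertRaises(KeyError):
            process_input({})


def run_tests() -> unittest.TestResult:
    # Run the test case programmatically so that a build script can inspect the result
    suite = unittest.TestLoader().loadTestsFromTestCase(ProcessInputTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == "__main__":
    run_tests()
```

In a build pipeline you would typically let a test runner such as pytest or python -m unittest discover these tests instead of calling run_tests() directly.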
The AI SDK package simaticai.testing provides two tools for local testing:
- A pipeline validator, which statically checks the package for the availability of the required Python packages.
- A pipeline runner, which lets you simulate the execution of your pipeline in your Python environment.
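The validator's implementation is not shown in this section, so the following is only a simplified sketch of the kind of static check it describes: it compares the project names in requirements.txt lines against a list of wheel file names, assuming dependencies are delivered as wheels. The functions normalize, requirement_names, and missing_wheels are hypothetical helpers, not part of simaticai; names are compared using the PEP 503 normalization rule.

```python
import re
from typing import Iterable, List


def normalize(name: str) -> str:
    # PEP 503 normalization: runs of '-', '_' and '.' collapse to '-', then lowercase
    return re.sub(r"[-_.]+", "-", name).lower()


def requirement_names(lines: Iterable[str]) -> List[str]:
    """Extracts project names from simple requirements.txt lines (no URLs or file paths)."""
    names = []
    for line in lines:
        line = line.split("#", 1)[0].strip()
        if not line or line.startswith("-"):
            continue  # skip blank lines, comments and pip options such as --extra-index-url
        # the project name ends at the first version specifier, extra, marker or whitespace
        names.append(normalize(re.split(r"[\s<>=!~\[;]", line, maxsplit=1)[0]))
    return names


def missing_wheels(requirements: Iterable[str], wheel_files: Iterable[str]) -> List[str]:
    """Returns the required projects that have no matching wheel among wheel_files."""
    # a wheel file name starts with '<distribution>-<version>-...'
    available = {normalize(f.split("-", 1)[0]) for f in wheel_files if f.endswith(".whl")}
    return [name for name in requirement_names(requirements) if name not in available]
```

A real validator also has to consider versions, platform tags, and transitive dependencies, which this sketch deliberately ignores.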
Please note that all this functionality applies to pipeline configuration packages, not to edge configuration packages. In other words, you must use these tools before you convert your pipeline configuration package to an edge configuration package with the convert_package function. Since the conversion itself is automated, most potential problems are already present in the package before the conversion, so verifying the package after conversion would only delay finding them.
For more comprehensive guidance on how to test pipelines before deployment, we recommend you refer to the AI SDK User Manual, especially the chapter concerning local testing of pipeline configuration packages. We also recommend you study the project templates for the AI SDK, which provide concrete code examples that show how to feed a pipeline with different kinds of inputs in a local test.
LocalPipelineRunner
Simulates the execution of a pipeline in a local Python environment.
Restriction: only linear pipelines are supported, in which the pipeline input variables are used by only one component, each component uses only the outputs of the previous component, and the pipeline output consists only of variables from the last component.
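To make the linearity restriction concrete, the sketch below checks it on a deliberately simplified, hypothetical model of a pipeline in which each component is a tuple of its name, input variables, and output variables. This is not how the pipeline configuration file is structured, and is_linear is not a simaticai function.

```python
from typing import Sequence, Set, Tuple

# Hypothetical, simplified model: (component name, input variables, output variables)
Component = Tuple[str, Set[str], Set[str]]


def is_linear(pipeline_inputs: Set[str], components: Sequence[Component], pipeline_outputs: Set[str]) -> bool:
    """Checks the linearity restriction on the simplified pipeline model."""
    if not components:
        return False
    available = pipeline_inputs
    for _, inputs, outputs in components:
        # the first component may only read pipeline inputs, every later one only its predecessor's outputs
        if not inputs <= available:
            return False
        available = outputs
    # the pipeline output may only consist of variables of the last component
    return pipeline_outputs <= available
```

For example, a preprocessing component that turns image into tensor followed by a model that turns tensor into score is linear; the same pipeline is not linear if the model also reads image directly.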
If the caller specifies no path, the working directory is temporary and is removed unless an error occurs. If the caller specifies a working directory with the path argument, the working directory is kept. This behavior can be overridden with the boolean parameter cleanup: True removes the working directory, False keeps it.
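The cleanup rules, as implemented in __enter__ and in the error handling of the class source code, can be condensed into a small decision function. resolve_cleanup is a hypothetical helper written for illustration only; it is not part of simaticai.

```python
from typing import Optional


def resolve_cleanup(path: Optional[str], cleanup: Optional[bool], error_occurred: bool = False) -> bool:
    """Returns True if the working directory would be removed at the end of the run."""
    if error_occurred:
        return False  # after a component error the runner always keeps the directory
    if cleanup is not None:
        return cleanup  # an explicit value overrides the default
    return path is None  # temporary directories are removed, explicit ones are kept
```

In other words, the only way to remove an explicitly given working directory is to pass cleanup=True, and no setting removes the directory after a component has failed.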
Currently, the pipeline runner supports both the current process_input(data: dict) entrypoint signature and the legacy run(data: str) signature. If both entrypoints are present, process_input() takes precedence. Please note however that run() is deprecated, and support for it will be removed in future versions of the pipeline runner.
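The precedence rule can be illustrated with a small dispatcher. call_entrypoint is a hypothetical helper and not the runner's actual implementation (which is in run_component.py, not shown here); it assumes that the legacy run() receives the input record serialized as a JSON string.

```python
import json
from types import SimpleNamespace
from typing import Any, Optional


def call_entrypoint(module: Any, record: dict) -> Optional[dict]:
    """Calls process_input(dict) if the module defines it, otherwise the legacy run(str)."""
    if hasattr(module, "process_input"):
        return module.process_input(record)
    if hasattr(module, "run"):
        # assumption: the deprecated run() receives the record as a JSON string
        return module.run(json.dumps(record))
    raise AttributeError("entrypoint defines neither process_input() nor run()")


if __name__ == "__main__":
    # SimpleNamespace stands in for an imported entrypoint module
    legacy_only = SimpleNamespace(run=lambda text: {"ready": True, "output": text})
    print(call_entrypoint(legacy_only, {"value": 1}))
```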
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory is created. | None |
| cleanup | bool | If set, the working directory is deleted when True, and kept when False. If unset, a temporary working directory is removed, and an explicit working directory is kept. When an error occurs in a component, the working directory is kept regardless of this value. | None |
| loglevel | int | Log level passed to the component processes, for example logging.INFO or logging.DEBUG. | logging.INFO |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
class LocalPipelineRunner:
    """
    Simulates the execution of a pipeline in a local Python environment.
    Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component,
    each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.
    If the caller specifies no `path`, the working directory is temporary and is removed unless an error occurs.
    If the caller specifies a working directory with a `path` argument, the working directory is kept.
    This behavior can be overridden using boolean parameter `cleanup`.
    Currently, the pipeline runner supports both the current `process_input(data: dict)` entrypoint signature and the legacy
    `run(data: str)` signature. If both entrypoints are present, `process_input()` takes precedence. Please note however that
    `run()` is deprecated, and support for it will be removed in future versions of the pipeline runner.
    Args:
        packageZip (path-like): Path to the pipeline configuration package.
        path (path-like): Path to the working directory. If unset, a temporary directory is created.
        cleanup (bool): If set, the working directory is deleted when True, and kept when False. \
            If unset, a temporary working directory is removed, and an explicit working directory is kept. \
            When an error occurs in a component, the working directory is kept regardless of this value.
        loglevel (int): Log level passed to the component processes. Defaults to logging.INFO.
    """

    def __init__(self, packageZip: os.PathLike, path: Optional[os.PathLike] = None, cleanup: Optional[bool] = None, loglevel = logging.INFO):
        """
        Creates a new LocalPipelineRunner for the provided pipeline configuration package.
        Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.
        Args:
            packageZip (path-like): Path to the pipeline configuration package.
            path (path-like): Path to the working directory. If unset, a temporary directory will be created.
            cleanup (bool): If set, the working directory will be deleted when True, and kept when False. \
                If unset, a temporary working directory will be removed, and an explicit working directory will be kept. \
                When an error occurs in a component, the working directory will be kept regardless of this value.
            loglevel (int): Log level passed to the component processes. Defaults to logging.INFO.
        """
        self.package_zip: Path = Path(packageZip)
        self.path = Path(path).resolve().absolute() if path is not None else None
        self.components = {}
        self.parameters = {}
        self.cleanup = cleanup
        self.log_level = loglevel
        self.workdir: Path
        self.docker = VesselBaseDocker()
        self.report_writer = PipelineRunnerReportWriter()
        report_writer_handler = ReportWriterHandler(self.report_writer)
        _logger.addHandler(report_writer_handler)

    def __enter__(self):
        self.report_writer.set_package_zip_path(self.package_zip)
        timestamp = re.sub(r"[-:]", "", datetime.now(timezone.utc).isoformat(sep="_", timespec="seconds"))
        if self.path is not None:
            self.workdir = Path(self.path)
            self.workdir.mkdir(parents=True, exist_ok=True)
            self.cleanup = self.cleanup if self.cleanup is not None else False
        else:
            self.workdir = Path(tempfile.mkdtemp(prefix=f"LocalPipelineRunner_{timestamp}_"))
            self.cleanup = self.cleanup if self.cleanup is not None else True
        unzip_components = False
        with zipfile.ZipFile(self.package_zip) as zf:
            if 'runtime_config.yml' in zf.namelist():
                self.workdir = self.workdir / self.package_zip.stem
                zf.extractall(path=self.workdir)
                unzip_components = True
            else:
                zf.extractall(path=self.workdir)
                self.workdir = self.workdir / zf.namelist()[0]
        try:
            with open(self.workdir / "pipeline_config.yml") as cf:
                config = yaml.load(cf, Loader=yaml.FullLoader)
            components = config["dataFlowPipeline"].get("components", [])
            for component in components:
                component["context"] = None
                component["env_dir"] = self.workdir / component['name']
                self.components[component["name"]] = component
            if unzip_components:
                for component in components:
                    component_zip = f"{component['name']}_{component['version']}.zip"
                    with zipfile.ZipFile(self.workdir / 'components' / component_zip) as zf:
                        zf.extractall(path=self.workdir / component["name"])
            for parameter in config["dataFlowPipeline"].get("pipelineParameters", {}):
                self.parameters[parameter["name"]] = parameter
        except Exception:
            raise RuntimeError(INVALID_PIPELINE_PACKAGE_MESSAGE)
        return self

    def __exit__(self, exception_type, value, traceback):
        self.update_telemetry_data()
        self.report_writer.write_report()
        if self.cleanup:
            _logger.info("Removing local pipeline runner environment...")
            shutil.rmtree(self.workdir.parent)
        else:
            _logger.info(f"Leaving local pipeline runner environment in its final state at '{self.workdir}'")

    def collect_telemetry_data(self) -> dict:
        """
        Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.
        Returns:
            dict: A dictionary containing the telemetry data.
        """
        import builtins  # used to detect IPython/Jupyter sessions

        telemetry_data = {}
        telemetry_data["platform"] = {}
        telemetry_data["platform"]["os"] = platform.system()
        telemetry_data["platform"]["release"] = platform.release()
        telemetry_data["platform"]["python_version"] = platform.python_version()
        telemetry_data["environment"] = {}
        # IPython (and thus Jupyter) registers these names in builtins; a method's locals() never contains them
        env_info = "Jupyter" if any(hasattr(builtins, k) for k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
        env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
        env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
        env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
        telemetry_data["environment"]["info"] = env_info
        telemetry_data["industrial_ai"] = {}
        try:
            telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
        except importlib_metadata.PackageNotFoundError:
            telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
        telemetry_data["pipeline"] = {}
        telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                                 if self.components[component]['runtime']['type'] == 'python'))
        telemetry_data["pipeline"]["file_extensions"] = []
        for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
            excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
            suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                                if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
            for suffix in suffixes:
                if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                    telemetry_data["pipeline"]["file_extensions"].append(suffix)
        return telemetry_data

    def update_telemetry_data(self):
        """
        Update the telemetry data and the package.
        This method updates the telemetry data by loading the existing data from a YAML file,
        or collecting new telemetry data if the file doesn't exist. It then updates the
        "last_test_run" field of the telemetry data with the current timestamp. The updated
        telemetry data is then written back to the YAML file.
        If the package contains a different version of the telemetry data file, a new package
        is created with the updated telemetry data. Otherwise, the existing package is
        overwritten with the new package containing the updated telemetry data.
        """
        _logger.info("Updating telemetry data and the package")
        telemetry_path = self.workdir / "telemetry_data.yml"
        if telemetry_path.is_file():
            telemetry_data = yaml.safe_load(telemetry_path.read_text())
        else:
            telemetry_data = self.collect_telemetry_data()
        telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
        telemetry_path.write_text(yaml.dump(telemetry_data))

        config_package = False
        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            for file in zip_read.namelist():
                if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                    config_package = True
                    break

        new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"
        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
                for file in zip_read.namelist():
                    if TELEMETRY_YAML not in file:
                        filepath = zip_read.extract(file, path=self.workdir)
                        zip_write.write(filepath, arcname=file)
                    else:
                        zip_write.write(telemetry_path, arcname=file)
        if config_package:
            shutil.copy(new_zip_path, self.package_zip)
        else:
            new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
            new_sha_path.write_text(calc_sha(new_zip_path))

    def _install_requirements(self, component, package_dir, no_index: bool = True):
        _logger.info("Installing requirements...")
        pip_report_file = component["env_dir"] / "pip_report.json"
        package_dir_path = _relative_to(package_dir, component["env_dir"])
        pip_report_file_path = _relative_to(pip_report_file, component["env_dir"])
        cmd = [
            str(component['python_path']),
            "-m", "pip", "install",
            "--no-warn-script-location",
            "-f", f"{package_dir_path}",
            "-r", REQUIREMENTS_TXT,
        ]
        if no_index:
            cmd += ["--no-index"]
        use_no_deps = component.get("runtime", {}).get("installTransitiveDependencies", True) is False
        if use_no_deps:
            cmd += ["--no-deps"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if 0 == result.returncode and use_no_deps:
            _logger.warning(f"Component `{component['name']}` is requested to be installed without transitive dependencies. "
                            "This may lead to runtime errors if the dependencies are not handled manually.")
        # generate pip report with a dry run to get the list of installed packages (in case of CPU Python packages)
        if 0 == result.returncode and component.get("hwType", None) == "CPU":
            cmd += ["--dry-run", "--ignore-installed", "--report", f"{pip_report_file_path}"]
            subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
            self.report_writer.add_installed_packages(component["name"], pip_report_file)
        return result

    def _install_from_packages_zip(self, component, package_dir):
        result = self._install_requirements(component, package_dir, True)
        return 0 == result.returncode

    def _install_from_pypi_org(self, component, package_dir):
        return self._install_requirements(component, package_dir, False)

    def _init_component_venv(self, component: dict):
        """
        Creates a virtual environment in which the given component can run.
        Args:
            component (dict): configuration of the selected component.
        """
        _logger.info(f"Creating virtual environment for component '{component['name']}'...")
        context_dir = component["env_dir"] / ".venv"
        if self.docker.is_vessel:
            _logger.info("Creating virtual environment in docker...")
            component["context"], component["python_path"] = self.docker._create_venv(component["env_dir"], component["runtime"]["version"])
            component["python_path"] = Path(component["python_path"]).resolve()
            _logger.info(f"Component context (docker): {component['context']}")
        else:
            builder = venv.EnvBuilder(with_pip=True, symlinks=False)
            builder.create(str(context_dir))
            component["context"] = builder.ensure_directories(context_dir)
            component['python_path'] = Path(component["context"].env_exe).resolve()
        _logger.debug(f"Component python_path: {component['python_path']}")
        _logger.info("Upgrading pip...")
        cmd = [str(component['python_path']), "-m", "pip", "install", "pip", "--upgrade"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error upgrading pip:\n{result.stderr}")
        cmd = [str(component['python_path']), "-m", "pip", "install", "--upgrade", "setuptools"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error installing setuptools:\n{result.stderr}")
        try:
            result = self._install_logmodule(component["python_path"], component["env_dir"])
        except Exception as err:
            _logger.error(err)
            self.cleanup = False
            raise RuntimeError("The 'simaticai' Python package is either not installed or does not contain package 'log_module'.") from None
        if result.returncode != 0:
            self.cleanup = False
            raise RuntimeError(f"Error installing log_module:\n{result.stderr}")
        req_list = Path(component["env_dir"] / "requirements.list")
        req_list.touch(exist_ok=True)
        if Path(component["env_dir"] / REQUIREMENTS_TXT).is_file():
            dependencies, extra_index, index_url = parse_requirements(component["env_dir"] / REQUIREMENTS_TXT)
            requirements = "#".join(dependencies.keys())
            req_list.write_text(requirements)
        else:
            _logger.info(f"'{REQUIREMENTS_TXT}' was not found. No additional dependencies were installed.")
            return
        package_dir = component["env_dir"] / PYTHON_PACKAGES
        package_zip = component["env_dir"] / PYTHON_PACKAGES_ZIP
        _logger.info(f"Extracting {PYTHON_PACKAGES_ZIP}")
        if package_zip.is_file():
            with zipfile.ZipFile(package_zip) as zf:
                zf.extractall(path=package_dir.absolute())
        else:
            _logger.info(f"There is no {PYTHON_PACKAGES_ZIP} to extract.")
            package_dir.mkdir(parents=True, exist_ok=True)
        success = self._install_from_packages_zip(component, package_dir)
        if not success:
            msg = f"Warning! Could not install dependencies from {PYTHON_PACKAGES_ZIP}. "
            msg += "Trying to install them from pypi.org. The resulting Python environment "
            msg += "may be significantly different than the targeted Python environment on the Edge Device!"
            _logger.warning(msg)
            if self.docker.is_vessel and component["runtime"]["type"] == "python":
                raise RuntimeError("The component is running in a docker container. The installation of dependencies from pypi.org is not supported.")
            else:
                second_install_result = self._install_from_pypi_org(component, package_dir)
                if 0 != second_install_result.returncode:
                    self.cleanup = False
                    raise RuntimeError(f"Error installing requirements:\n{second_install_result.stderr}")

    @staticmethod
    def _install_logmodule(python_path, env_dir):
        _logger.info("Installing LogModule...")
        try:
            package_paths = importlib_metadata.files("simaticai")
            assert package_paths is not None
            logger_wheel = [p for p in package_paths if 'log_module' in str(p)][0].locate()
        except Exception:
            # fall back to the source location of an editable or local install
            from importlib.metadata import Distribution
            direct_url = Distribution.from_name("simaticai").read_text("direct_url.json")
            assert direct_url is not None
            direct_url = json.loads(direct_url)['url']
            direct_url = direct_url.replace('file://', '')
            direct_url = Path(direct_url) / 'simaticai' / 'data'
            paths = list(direct_url.rglob('*.whl'))
            logger_wheel = [p for p in paths if 'log_module' in str(p)][0].resolve()
        cmd = [
            str(python_path), "-m", "pip",
            "install",
            "--no-warn-script-location", str(logger_wheel), "joblib"
        ]
        return subprocess.run(cmd, cwd=env_dir, text=True, stderr=subprocess.PIPE)

    def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
        """
        Runs the component in its virtual environment with the given input.
        This environment is created according to `requirements.txt` in the package.
        Additionally, 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries,
        or a DataStream object which will produce the appropriate input data.
        The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.
        Args:
            name (str): The name of the component to be executed.
            data (dict, list or DataStream): One or more input records for the component.
        Returns:
            dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
            If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.
        """
        assert name in self.components, f"Invalid component name: {name}"
        component = self.components[name]
        assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is neither 'python' nor 'gpuruntime'"
        input_payload_path: Path = component["env_dir"] / "input.joblib"
        output_payload_path: Path = component["env_dir"] / "output.joblib"
        batch_input: bool = component["batch"]["inputBatch"] == "Yes" if component.get("batch") is not None else False
        batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False

        # Validate and serialize input payload
        assert data is not None, f"Can not run component '{name}' without input."
        result_is_list = True
        if isinstance(data, list):
            input_payload = data
        elif isinstance(data, DataStream):
            input_payload = [item for item in data]
        else:
            result_is_list = False
            input_payload = [data]
        validate_payload(input_payload, component["inputType"], batch_input)
        joblib.dump(input_payload, input_payload_path)
        self.report_writer.set_input_payload_length(name, len(input_payload))
        _logger.info(f"Input payload saved as '{input_payload_path}'")

        # Assemble command for running component
        if component['runtime']['type'] == 'python':
            # Version check for Python
            e_major, e_minor, _, _, _ = sys.version_info
            c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))
            if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
                msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
                msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
                msg += " Pipeline behavior on AI Inference Server might be different."
                _logger.warning(msg)
            if component["context"] is None:
                self._init_component_venv(component)
            shutil.copy(_runner_path / 'run_component.py', component["env_dir"])
            req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
            json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
            args = [
                "-m", 'run_component',
                "-m", Path(component["entrypoint"]).stem,
                "-p", f"{json_params}",
                "-r", f"{req_list_path}",
            ]
        else:
            # gpuruntime step requires Python environment with onnxruntime installed
            if component["context"] is None:
                shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
                self._init_component_venv(component)
            shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
            shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])
            args = [
                "-m", 'run_gpuruntime_component',
                "-m", component["entrypoint"],
                "-c", "config.pbtxt",
            ]
        args += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self.log_level)
        ]

        # Run the component in the created Python environment
        _logger.info(f"Running component '{name}'...")
        _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")
        cmd = [str(component['python_path'])] + args
        _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
        p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        if p.stdout is not None:
            for line in p.stdout:
                _logger.info(line.strip())
        returncode = p.wait()
        if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
            self.cleanup = False
            raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
        else:
            response_wrapped = returncode == RETURN_CODE_DEPRECATED

        # Deserialize and validate output payload
        _logger.info(f"Loading output payload from '{output_payload_path}'")
        output_payload = joblib.load(output_payload_path)
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        self.report_writer.set_output_payload_length(name, len(output_payload))
        if response_wrapped:
            output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
        else:
            output_payload = [output for output in output_payload if output is not None]
        validate_payload(output_payload, component["outputType"], batch_output)
        if result_is_list:
            return output_payload
        if 0 < len(output_payload):
            return output_payload[0]
        return None

    def update_parameters(self, parameters: dict):
        """
        Validates and updates pipeline parameters.
        The elements of the dictionary must match the parameters specified in the pipeline configuration package.
        If any of the names or types does not match, all parameters will remain untouched.
        Args:
            parameters (dict): names and values of parameters to update
        Raises:
            AssertionError:
                When:
                - either `name` is not in the configured parameters,
                - or `defaultValue` type is different from the configured one
        """
        for key, value in parameters.items():
            if key not in self.parameters.keys():
                if key.upper().startswith("__AI_IS"):
                    self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
                else:
                    raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
            elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
        for key, value in parameters.items():
            self.parameters[key]["defaultValue"] = value

    def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
        """
        Runs all the components sequentially, assuming the output of a component is only consumed by the next.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
        For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
        and the output is saved as `output.joblib`.
        Args:
            payload (dict, list or DataStream): One or more input records for the pipeline.
        Returns:
            The output of the last component.
        """
        for name in self.components.keys():
            payload = self.run_component(name, payload)
        return payload
```
__init__(packageZip, path=None, cleanup=None, loglevel=logging.INFO)
Creates a new LocalPipelineRunner for the provided pipeline configuration package.
Only works with a pipeline configuration package. Does not work with, for example, an edge configuration package.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory will be created. | None |
| cleanup | bool | If set, the working directory will be deleted when True, and kept when False. If unset, a temporary working directory will be removed, and an explicit working directory will be kept. When an error occurs in a component, the working directory will be kept regardless of this value. | None |
| loglevel | int | Log level passed to the component processes, for example logging.INFO or logging.DEBUG. | logging.INFO |
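When no path is given, the temporary working directory is created in __enter__ with tempfile.mkdtemp and a timestamped prefix, which helps you find it afterwards (for example when it is kept after an error). The sketch below reproduces only the prefix construction from the source code; workdir_prefix is a hypothetical helper, and mkdtemp appends a random suffix after the prefix.

```python
import re
from datetime import datetime, timezone


def workdir_prefix(now: datetime) -> str:
    """Builds the temporary directory prefix in the same way as LocalPipelineRunner.__enter__."""
    # the ISO timestamp with '_' as separator loses its '-' and ':' characters
    timestamp = re.sub(r"[-:]", "", now.isoformat(sep="_", timespec="seconds"))
    return f"LocalPipelineRunner_{timestamp}_"


if __name__ == "__main__":
    print(workdir_prefix(datetime.now(timezone.utc)))
```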
collect_telemetry_data()
Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.
Returns:
| Type | Description |
|---|---|
| dict | A dictionary containing the telemetry data. |
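The environment entry of the telemetry data is derived from well-known CI environment variables, where later checks override earlier ones. The sketch below mirrors that order with an injectable environment mapping so it can be tested. detect_environment is hypothetical; the "not found" string stands in for the SDK's MSG_NOT_FOUND constant, whose value is not shown, and detecting IPython via builtins.__IPYTHON__ is an assumption about how the Jupyter check is meant to work.

```python
import builtins
import os
from typing import Mapping, Optional

# CI variables in the order they are checked; a later match overrides an earlier one
CI_VARIABLES = [
    ("GITLAB_CI", "GitLab CI/CD"),
    ("TF_BUILD", "Azure DevOps Pipelines"),
    ("GITHUB_ACTIONS", "GitHub Actions"),
]


def detect_environment(env: Optional[Mapping[str, str]] = None) -> str:
    """Returns a short description of the environment the tests run in."""
    env = os.environ if env is None else env
    # assumption: IPython/Jupyter sessions expose __IPYTHON__ in builtins
    info = "Jupyter" if hasattr(builtins, "__IPYTHON__") else "not found"
    for variable, name in CI_VARIABLES:
        if variable in env:
            info = name
    return info
```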
update_telemetry_data()
Update the telemetry data and the package.
This method updates the telemetry data by loading the existing data from a YAML file, or collecting new telemetry data if the file doesn't exist. It then updates the "last_test_run" field of the telemetry data with the current timestamp. The updated telemetry data is then written back to the YAML file.
If the package contains a different version of the telemetry data file, a new package is created with the updated telemetry data. Otherwise, the existing package is overwritten with the new package containing the updated telemetry data.
docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.pydef update_telemetry_data(self):
    """
    Update the telemetry data and the package.
    This method updates the telemetry data by loading the existing data from a YAML file,
    or collecting new telemetry data if the file doesn't exist. It then updates the
    "last_test_run" field of the telemetry data with the current timestamp. The updated
    telemetry data is then written back to the YAML file.
    If the package contains a different version of the telemetry data file, a new package
    is created with the updated telemetry data. Otherwise, the existing package is
    overwritten with the new package containing the updated telemetry data.
    """
    _logger.info("Updating telemetry data and the package")
    telemetry_path = self.workdir / "telemetry_data.yml"
    if telemetry_path.is_file():
        telemetry_data = yaml.safe_load(telemetry_path.read_text())
    else:
        telemetry_data = self.collect_telemetry_data()
    telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
    telemetry_path.write_text(yaml.dump(telemetry_data))
    config_package = False
    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        for file in zip_read.namelist():
            if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                config_package = True
                break
    new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"
    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
            for file in zip_read.namelist():
                if TELEMETRY_YAML not in file:
                    filepath = zip_read.extract(file, path=self.workdir)
                    zip_write.write(filepath, arcname=file)
                else:
                    zip_write.write(telemetry_path, arcname=file)
    if config_package:
        shutil.copy(new_zip_path, self.package_zip)
    else:
        new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
        new_sha_path.write_text(calc_sha(new_zip_path))
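The repackaging step above can be sketched with the standard library alone. This is a minimal sketch, not the SDK's implementation: the function names repackage_with_telemetry and sha256_of are hypothetical, the telemetry content is passed in as a ready-made YAML string (so PyYAML is not needed), and entries are copied in memory instead of being extracted to the working directory as in the listing. The package and file names in the usage section are placeholders.

```python
import hashlib
import tempfile
import zipfile
from pathlib import Path


def sha256_of(path: Path) -> str:
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def repackage_with_telemetry(package_zip: Path, telemetry_name: str, telemetry_yaml: str) -> Path:
    """Write <stem>_tested.zip next to package_zip, replacing every entry whose
    path contains telemetry_name with telemetry_yaml; other entries are copied as-is."""
    new_zip = package_zip.parent / f"{package_zip.stem}_tested.zip"
    with zipfile.ZipFile(package_zip, "r") as src, zipfile.ZipFile(new_zip, "w") as dst:
        for info in src.infolist():
            if telemetry_name in info.filename:
                dst.writestr(info.filename, telemetry_yaml)
            else:
                # Copy the entry in memory, keeping its original metadata
                dst.writestr(info, src.read(info.filename))
    return new_zip


# Usage: build a tiny placeholder package, repackage it, and write a checksum file
if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        pkg = Path(tmp) / "my_pipeline.zip"
        with zipfile.ZipFile(pkg, "w") as z:
            z.writestr("my_pipeline/telemetry_data.yml", "pipeline: {}\n")
            z.writestr("my_pipeline/entrypoint.py", "print('hi')\n")
        tested = repackage_with_telemetry(pkg, "telemetry_data.yml", "pipeline:\n  last_test_run: now\n")
        sha_file = tested.with_suffix(".sha256")
        sha_file.write_text(sha256_of(tested))
        print(tested.name, sha_file.read_text()[:12])
```

Copying entries in memory avoids leaving extracted files behind; the listing instead extracts each entry into the working directory before writing it to the new archive.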
run_component(name, data) ¶
Runs the component in its virtual environment with the given input. Only components with runtime type python or gpuruntime can be run. For Python components, the environment is created according to requirements.txt in the package; for GPU runtime components, a bundled requirements file with onnxruntime is used instead. Additionally, joblib and the mock log_module are installed automatically in this virtual environment. The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries, or a DataStream object that produces the input records. The supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib. If the component fails, a RuntimeError is raised and the runtime directory is kept for inspection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the component to be executed. | required |
| data | dict, list or DataStream | One or more input records for the component. | required |
Returns:
| Type | Description |
|---|---|
| Optional[Union[dict, list]] | If the input was a list or a DataStream: a list of output dictionaries, if there were no errors and field ready is true. If the input was a single dict: a single dict (the first item of the output list), or None if there is no output. |
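The input and return conventions in the tables above can be illustrated with a small, self-contained sketch. The helper names normalize_input and shape_result are hypothetical, and a plain generator stands in for a DataStream, which is assumed here to be any iterable of records.

```python
from typing import Any, Iterable, List, Optional, Tuple, Union


def normalize_input(data: Union[dict, list, Iterable[dict], None]) -> Tuple[List[Any], bool]:
    """Return the input as a list, plus a flag telling whether the caller
    should receive a list back (True for lists and streams, False for a dict)."""
    if data is None:
        raise ValueError("Can not run a component without input.")
    if isinstance(data, list):
        return data, True
    if isinstance(data, dict):
        # A single record is wrapped so the component always sees a list
        return [data], False
    # Any other iterable stands in for a DataStream and is materialized here
    return list(data), True


def shape_result(outputs: List[Any], result_is_list: bool) -> Optional[Union[dict, list]]:
    """Drop empty outputs, then return a list or a single record (or None)."""
    outputs = [o for o in outputs if o is not None]
    if result_is_list:
        return outputs
    return outputs[0] if outputs else None
```

The flag is what makes a single-dict call return a single dict: the component itself always works on a list, and only the final result is unwrapped.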
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
    """
    Runs the component in its virtual environment with the given input.
    This environment is created according to `requirements.txt` in the package.
    Additionally 'joblib' and the mock `log_module` is automatically installed in this virtual environment.
    The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries,
    or a DataStream object which will produce the appropriate input data.
    The supplied input data is saved as `inputs.joblib` in the component runtime directory, and the output is saved as `output.joblib`.
    Args:
        name (str): The name of the component to be executed.
        data (dict or list): One or more input records for the component.
    Returns:
        dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
        If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.
    """
    assert name in self.components, f"Invalid component name: {name}"
    component = self.components[name]
    assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is nor 'python' or 'gpuruntime'"
    input_payload_path: Path = component["env_dir"] / "input.joblib"
    output_payload_path: Path = component["env_dir"] / "output.joblib"
    batch_input: bool = component["batch"]["inputBatch"] == "Yes" if component.get("batch") is not None else False
    batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False
    # Validate and serialize input payload
    assert data is not None, f"Can not run component '{name}' without input."
    result_is_list = True
    if isinstance(data, list):
        input_payload = data
    elif isinstance(data, DataStream):
        input_payload = [item for item in data]
    else:
        result_is_list = False
        input_payload = [data]
    validate_payload(input_payload, component["inputType"], batch_input)
    joblib.dump(input_payload, input_payload_path)
    self.report_writer.set_input_payload_length(name, len(input_payload))
    _logger.info(f"Input payload saved as '{input_payload_path}'")
    # Assemble command for runnig component
    if component['runtime']['type'] == 'python':
        # Version check for Python
        e_major, e_minor, _, _, _ = sys.version_info
        c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))
        if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
            msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
            msg += " Testing will be done using dependencies that corresponds to the python version of your development environment."
            msg += " Pipeline behavior on AI Inference Server might be different."
            _logger.warning(msg)
        if component["context"] is None:
            self._init_component_venv(component)
        shutil.copy(_runner_path / 'run_component.py', component["env_dir"])
        req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
        json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
        args = [
            "-m", 'run_component',
            "-m", Path(component["entrypoint"]).stem,
            "-p", f"{json_params}",
            "-r", f"{req_list_path}",
        ]
    else:
        # gpuruntime step requires Python environment with onnxruntime installed
        if component["context"] is None:
            shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
            self._init_component_venv(component)
        shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
        shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])
        args = [
            "-m", 'run_gpuruntime_component',
            "-m", component["entrypoint"],
            "-c", "config.pbtxt",
        ]
    args += [
        "-i", input_payload_path.name,
        "-o", output_payload_path.name,
        "-ll", logging.getLevelName(self.log_level)
    ]
    # Run the component in the created Python environment
    _logger.info(f"Running component '{name}'...")
    _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")
    cmd = [str(component['python_path'])] + args
    _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
    p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if p.stdout is not None:
        for line in p.stdout:
            _logger.info(line.strip())
    returncode = p.wait()
    if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
        self.cleanup = False
        raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
    else:
        response_wrapped = True if returncode == RETURN_CODE_DEPRECATED else False
    # Deserialize and validate output payload
    _logger.info(f"Loading output payload from '{output_payload_path}'")
    output_payload = joblib.load(output_payload_path)
    output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
    self.report_writer.set_output_payload_length(name, len(output_payload))
    if response_wrapped:
        output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
    else:
        output_payload = [output for output in output_payload if output is not None]
    validate_payload(output_payload, component["outputType"], batch_output)
    if result_is_list:
        return output_payload
    if 0 < len(output_payload):
        return output_payload[0]
    return None
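When the component process exits with RETURN_CODE_DEPRECATED, run_component treats each output item as a wrapped response and keeps only the decoded output of items whose ready flag is true. The following sketch mirrors that post-processing; the function name unwrap_outputs is hypothetical, and the item layout (a ready flag plus a JSON string in output) is inferred from the listing above.

```python
import json
from typing import Any, List


def unwrap_outputs(raw: Any, response_wrapped: bool) -> List[dict]:
    """Normalize raw component outputs the way run_component post-processes them."""
    # A single output object is treated as a one-element list
    items = raw if isinstance(raw, list) else [raw]
    if response_wrapped:
        # Wrapped style: {'ready': bool, 'output': '<JSON string>'}
        return [json.loads(item["output"]) for item in items if item["ready"]]
    # Plain style: drop None entries
    return [item for item in items if item is not None]
```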
update_parameters(parameters) ¶
Validates and updates pipeline parameters. The keys of the dictionary must match the parameters specified in the pipeline configuration package, and each value must have the configured type. If any name or type does not match, an AssertionError is raised and no parameter value is changed. Names starting with __AI_IS (case-insensitive) that are not configured yet are accepted and registered as Boolean parameters.
Args:
parameters (dict): Names and values of the parameters to update.
Raises:
AssertionError: If a name is not among the configured parameters, or if the type of a value differs from the configured type.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def update_parameters(self, parameters: dict):
    """
    Validates and updates pipeline parameters.
    The elements of the dictionary must match the parameters specified in the pipeline configuration package.
    If any of the names or types does not match, all parameters will remain untouched.
    Args:
        parameters (dict): names and values of parameters to update
    Raises:
        AssertionError:
            When:
            - either `name` is not in the configured parameters,
            - or `defaultValue` type is different from the configured one
    """
    for key,value in parameters.items():
        if key not in self.parameters.keys():
            if key.upper().startswith("__AI_IS"):
                self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
            else:
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
        elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
    for key, value in parameters.items():
        self.parameters[key]["defaultValue"] = value
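The validation rules of update_parameters can be sketched as a standalone function over a plain dictionary of parameter definitions. This is an illustrative sketch: TYPE_MAP below is a hypothetical stand-in for the SDK's type_map, which is not shown here. Unlike the listing, the sketch registers new __AI_IS* entries only after every key has passed validation, so a failed call leaves the dictionary completely unchanged.

```python
from typing import Dict

# Hypothetical mapping from Python type names to pipeline parameter types;
# the SDK's own `type_map` is not part of this listing.
TYPE_MAP = {"str": "String", "bool": "Boolean", "int": "Integer", "float": "Double"}


def update_parameters(configured: Dict[str, dict], updates: Dict[str, object]) -> None:
    """Validate all updates first, then apply them, so a bad entry leaves
    `configured` untouched."""
    new_entries = {}
    for key, value in updates.items():
        value_type = TYPE_MAP.get(type(value).__name__)
        if key not in configured:
            if key.upper().startswith("__AI_IS"):
                # Internal switches are registered on the fly as Boolean parameters
                new_entries[key] = {"defaultValue": False, "name": key, "type": "Boolean"}
                continue
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{value_type}'")
        if configured[key]["type"] != value_type:
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{value_type}'")
    # Only reached when every key is valid
    configured.update(new_entries)
    for key, value in updates.items():
        configured[key]["defaultValue"] = value
```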
run_pipeline(payload={}) ¶
Runs all the components sequentially, assuming the output of a component is only consumed by the next. The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries, or a DataStream object. For each component, the supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict, list or DataStream | One or more input records for the pipeline. | {} |
Returns:
| Type | Description |
|---|---|
| Optional[Union[dict, list]] | The output of the last component. |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
    """
    Runs all the components sequentially, assuming the output of a component is only consumed by the next.
    The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
    For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
    and the output is saved as `output.joblib`.
    Args:
        payload (dict or list): One or more input records for the pipeline.
    Returns:
        The output of the last component.
    """
    for name in self.components.keys():
        payload = self.run_component(name, payload)
    return payload
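Because run_pipeline hands only the output of one component to the next, it behaves like a left fold over the components in their configured order. The sketch below uses plain Python callables (scale and total are hypothetical) as stand-ins for components.

```python
from typing import Any, Callable, Dict, List

Component = Callable[[Any], Any]


def run_pipeline(components: Dict[str, Component], payload: Any) -> Any:
    """Run the components in insertion order; each one receives only the
    output of its predecessor."""
    for component in components.values():
        payload = component(payload)
    return payload


# Hypothetical components working on lists of records
def scale(records: List[dict]) -> List[dict]:
    return [{"value": r["value"] * 2} for r in records]


def total(records: List[dict]) -> List[dict]:
    return [{"total": sum(r["value"] for r in records)}]
```

Outputs of earlier components are not visible to later ones. In the listing, a component that receives None (for example, after a single-record input produced no output) would also fail the `assert data is not None` check in run_component.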
ComponentRunner ¶
Class to run a pipeline component in a virtual environment. Creating an instance creates the working directory, copies the component resources into it, creates a virtual environment and installs the requirements. Supported component types:
- PythonComponent
- GPURuntimeComponent
Args:
component: PythonComponent or GPURuntimeComponent.
workdir: Parent directory in which the component's working directory (named after the component) is created. If None, the current working directory is used.
cleanup: If True, the working directory is deleted when the context manager exits without an exception.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
class ComponentRunner():
    """
    Class to run a Pipeline Component in a virtual environment.
    Supported Component Types:
    - PythonComponent
    - GPURuntimeComponent
    Args:
        component: PythonComponent or GPURuntimeComponent
        workdir: Path to the directory where the component should be run. If None, the current working directory is used.
        cleanup: If True, the workdir is deleted after the context manager exits.
    """
    def __init__(self, component, workdir=None, cleanup=False):
        self.component = component
        self.parameters = { '__AI_IS_IMAGE_SET_VISUALIZATION': False }
        self.cleanup = cleanup
        self._logger = self._set_logger()
        self._create_workdir(component, workdir)
        self._copy_resources()
        self._create_venv()
        self._install_requirements()
    def __enter__(self):
        return self
    def __exit__(self, exception_type, exception_value, traceback):
        if exception_type is not None:
            self._logger.error(f"Exception occurred: {exception_type.__name__}: {exception_value}")
            return False
        if self.cleanup:
            shutil.rmtree(self.workdir)
    def _set_logger(self):
        """
        Set the logger for the ComponentRunner.
        """
        logger = logging.getLogger(__name__)
        log_level = os.environ.get("loglevel", "INFO").upper()
        logger.setLevel(log_level)
        handler = logging.StreamHandler()
        handler.setLevel(log_level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
    def _create_workdir(self, component, workdir):
        """
        Create a workdir for the component.
        """
        if workdir is None:
            self.workdir = Path.cwd() / component.name
        else:
            self.workdir = Path(workdir).resolve().absolute() / component.name
        self.workdir.mkdir(parents=True, exist_ok=True)
        self._logger.info(f"Created workdir: {self.workdir}")
    def _copy_resources(self):
        """
        Copy the required resources of the component to the workdir.
        In case of a PythonComponent, also creates the dependencies into requirements.txt.
        In case of a GPURuntimeComponent, also creates a requirements.txt with the required onnx and onnxruntime dependencies.
        Also adds the runner script to the workdir.
        In case of the source folder same as the workdir, the resources are not copied,
        so any changes in source will affect the workdir.
        """
        if isinstance(self.component, PythonComponent):
            same_resources = []
            for from_path in self.component.resources.keys():
                try:
                    to_path = self.workdir / self.component.resources[from_path]
                    to_path.mkdir(parents=True, exist_ok=True)
                    shutil.copy(from_path, to_path)
                except shutil.SameFileError:
                    same_resources.append(from_path.name)
            if same_resources:
                self._logger.info("Resources are already in workdir, which are not copied:")
                for resource in same_resources:
                    self._logger.debug(f" - {resource}")
            # download manually added Python packages and requirements.txt
            self.component.python_dependencies.save(self.workdir)
            shutil.copy(PYTHON_RUNNER_PATH, self.workdir)
        else:
            model_dir = Path(self.workdir / "1")
            model_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(self.component.model_path, model_dir / "model.onnx")
            Path(self.workdir / "config.pbtxt").write_text(f"{self.component.auto_config}")
            shutil.copy(GPU_RUNNER_PATH, self.workdir / "run_component.py")
            shutil.copy(GPU_REQUIREMENTS_PATH, self.workdir / "requirements.txt")
            shutil.copy(GPU_CONFIG_PATH, self.workdir)
        self._logger.info("Resources are copied into the workdir.")
    def _create_venv(self):
        """
        Create a Python virtual environment in the workdir.
        The created virtual environment is stored in the context_dir and
        has the same version as the current Python interpreter.
        """
        context_dir = self.workdir / ".venv"
        builder = venv.EnvBuilder(with_pip=True, symlinks=False)
        builder.create(context_dir)
        self.context = builder.ensure_directories(context_dir)
        self.python_path = Path(self.context.env_exe).resolve()
        self._logger.info(f"Python virtualenv created in folder '{self.context.env_dir}'")
    def _install_requirements(self):
        """
        Installs the required Python dependencies.
        """
        cmd = [str(self.python_path), '-m', 'pip', 'install', 'joblib', '-r', 'requirements.txt']
        result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())
        result_lines = result.stdout.decode().split("\n")
        installed_message = " ".join([line for line in result_lines if "Successfully installed" in line])
        self._logger.info(installed_message)
    def set_parameters(self, parameter_name, parameter_value):
        """
        Set the parameters for the component.
        """
        self.parameters[parameter_name] = parameter_value
    def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
        """
        Run the component with the input payload.
        Parameters:
            input_payload: Input payload for the component.
        Returns:
            Output payload from the component.
        Side Effects:
            - The input payload is saved in the workdir.
            - The output payload is loaded from the workdir.
        """
        input_payload_path: Path = self.workdir / "input.joblib"
        output_payload_path: Path = self.workdir / "output.joblib"
        batch_input: bool = self.component.batch.inputBatch
        if isinstance(input_payload, DataStream):
            input_payload = [item for item in input_payload]
        elif isinstance(input_payload, dict):
            input_payload = [input_payload]
        input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
        self._validate_payload(input_payload, input_variables, batch_input)
        joblib.dump(input_payload, input_payload_path)
        cmd = self._create_command(input_payload_path, output_payload_path)
        self._logger.info("Running command: " + " ".join(cmd))
        result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())
        self._logger.debug(result.stderr.decode())
        output_payload = joblib.load(output_payload_path)
        batch_output: bool = self.component.batch.outputBatch
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        output_payload = [output for output in output_payload if output is not None]
        output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
        if isinstance(self.component, PythonComponent):
            output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
        self._validate_payload(output_payload, output_variables, batch_output)
        return output_payload
    def _check_instance(self, element, element_name: str, instance):
        if not isinstance(element, instance):
            self._logger.error(f"{element_name} must be an instance of {instance.__name__}")
            raise ValueError(f"{element_name} must be an instance of {instance.__name__}")
    def _check_variable_types(self, variable: dict, value):
        if variable["type"] == "String":
            self._check_instance(value, "String value", str)
        if variable["type"] == "StringArray":
            self._check_instance(value, "StringArray value", list)
            for i in value:
                self._check_instance(i, "StringArray item", str)
        if variable["type"] == "Object":
            self._check_instance(value, "Object value", dict)
            values = list(value.values())
            if len(values) != 2:
                self._logger.error("Object value must have exactly 2 items")
                raise ValueError("Object value must have exactly 2 items")
            ok = isinstance(values[0], str) and isinstance(values[1], bytes) or isinstance(values[1], str) and isinstance(values[0], bytes)
            if not ok:
                self._logger.error("Object value must have exactly one 'str' and one 'bytes' field")
                raise ValueError("Object value must have exactly one 'str' and one 'bytes' field")
    def _validate_payload(self, payload: list, variables: list, batch: bool):
        self._check_instance(payload, "Payload", list)
        for payload_element in payload:
            if batch:
                self._check_instance(payload_element, "Batch payload element", list)
            else:
                payload_element = [payload_element]
            for item in payload_element:
                self._validate_payload_item(item, variables)
    def _validate_payload_item(self, item: dict, variables: list):
        self._check_instance(item, "Payload item", dict)
        for variable in variables:
            name = variable["name"]
            value = item.get(name, None)
            if value is None:
                self._logger.warning(f"WARNING! Variable '{name}' is missing from input, output or metric")
                continue
            self._check_variable_types(variable, value)
        payload_names = set(item.keys())
        variable_names = { variable["name"] for variable in variables }
        variable_names.add('timestamp')
        extra_variables = payload_names - variable_names
        if len(extra_variables):
            self._logger.warning(f"WARNING! These variables are not declared but are part of the payload: {extra_variables}")
    def _create_command(self, input_payload_path, output_payload_path):
        """
        Create the command to run the component.
        """
        cmd = [str(self.python_path), "-m", 'run_component']
        if isinstance(self.component, PythonComponent):
            cmd += [
                "-m", Path(self.component.entrypoint).stem,
                "-p", json.dumps(self.parameters)
            ]
        if isinstance(self.component, GPURuntimeComponent):
            model_path = Path(self.workdir / "1" / "model.onnx").absolute().resolve()
            config_path = Path(self.workdir / "config.pbtxt").absolute().resolve()
            cmd += [
                "-m", str(model_path),
                "-c", str(config_path),
            ]
        cmd += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self._logger.getEffectiveLevel())
        ]
        return cmd
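The payload checks performed by _validate_payload and _check_variable_types can be reproduced in a small standalone validator, which is handy for checking test data before a virtual environment is built. This is a sketch under simplifying assumptions: variable declarations are reduced to a name-to-type dictionary, missing and undeclared variables are collected as warning strings instead of being logged, and the field names mimeType and content in the test data are hypothetical examples of an Object value.

```python
from typing import Dict, List


def check_value(var_type: str, value: object) -> None:
    """Raise ValueError if value does not fit the declared variable type."""
    if var_type == "String" and not isinstance(value, str):
        raise ValueError("String value must be an instance of str")
    if var_type == "StringArray":
        if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
            raise ValueError("StringArray value must be a list of str")
    if var_type == "Object":
        if not isinstance(value, dict):
            raise ValueError("Object value must be an instance of dict")
        values = list(value.values())
        if len(values) != 2:
            raise ValueError("Object value must have exactly 2 items")
        a, b = values
        if not ((isinstance(a, str) and isinstance(b, bytes)) or (isinstance(a, bytes) and isinstance(b, str))):
            raise ValueError("Object value must have exactly one 'str' and one 'bytes' field")


def validate_payload(payload: list, variables: Dict[str, str], batch: bool) -> List[str]:
    """Validate records (or batches of records) and return warnings for
    missing or undeclared variables; type errors raise ValueError."""
    if not isinstance(payload, list):
        raise ValueError("Payload must be an instance of list")
    warnings: List[str] = []
    for element in payload:
        if batch:
            if not isinstance(element, list):
                raise ValueError("Batch payload element must be an instance of list")
            items = element
        else:
            items = [element]
        for item in items:
            if not isinstance(item, dict):
                raise ValueError("Payload item must be an instance of dict")
            for name, var_type in variables.items():
                value = item.get(name)
                if value is None:
                    warnings.append(f"Variable '{name}' is missing")
                    continue
                check_value(var_type, value)
            # 'timestamp' is always allowed in addition to declared variables
            extra = set(item) - set(variables) - {"timestamp"}
            if extra:
                warnings.append(f"Undeclared variables: {sorted(extra)}")
    return warnings
```

As in the listing, types other than String, StringArray and Object are not checked, and a missing variable only produces a warning.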
set_parameters(parameter_name, parameter_value) ¶
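Sets a single parameter of the component. For Python components, all parameters are passed to the component as one JSON argument when run() is called; no name or type validation is performed.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def set_parameters(self, parameter_name, parameter_value):
    """
    Set the parameters for the component.
    """
    self.parameters[parameter_name] = parameter_value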
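For Python components, the values stored by set_parameters reach the component as a single JSON argument on the command line built by _create_command. The sketch below reproduces that argument vector for illustration only; build_python_command is a hypothetical helper, and the interpreter path and entrypoint name used in the test are placeholders.

```python
import json
import logging
from pathlib import Path
from typing import Dict, List


def build_python_command(python_path: str, entrypoint: str, parameters: Dict[str, object],
                         log_level: int = logging.INFO) -> List[str]:
    """Assemble an argument vector shaped like the one _create_command
    builds for a PythonComponent."""
    return [
        python_path, "-m", "run_component",
        "-m", Path(entrypoint).stem,      # module name of the entrypoint script
        "-p", json.dumps(parameters),     # all parameters travel as one JSON argument
        "-i", "input.joblib",
        "-o", "output.joblib",
        "-ll", logging.getLevelName(log_level),
    ]
```

Because the parameters are serialized with json.dumps, every value passed to set_parameters must be JSON-serializable.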
run(input_payload) ¶
Run the component with the input payload.
Parameters:
input_payload: Input payload for the component: a single dict, a list of dicts, or a DataStream.
Returns:
Output payload from the component, as a list with None outputs removed.
Side effects:
- The input payload is saved in the workdir as input.joblib.
- The output payload is loaded from the workdir (output.joblib).
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
    """
    Run the component with the input payload.
    Parameters:
        input_payload: Input payload for the component.
    Returns:
        Output payload from the component.
    Side Effects:
        - The input payload is saved in the workdir.
        - The output payload is loaded from the workdir.
    """
    input_payload_path: Path = self.workdir / "input.joblib"
    output_payload_path: Path = self.workdir / "output.joblib"
    batch_input: bool = self.component.batch.inputBatch
    if isinstance(input_payload, DataStream):
        input_payload = [item for item in input_payload]
    elif isinstance(input_payload, dict):
        input_payload = [input_payload]
    input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
    self._validate_payload(input_payload, input_variables, batch_input)
    joblib.dump(input_payload, input_payload_path)
    cmd = self._create_command(input_payload_path, output_payload_path)
    self._logger.info("Running command: " + " ".join(cmd))
    result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode())
    self._logger.debug(result.stderr.decode())
    output_payload = joblib.load(output_payload_path)
    batch_output: bool = self.component.batch.outputBatch
    output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
    output_payload = [output for output in output_payload if output is not None]
    output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
    if isinstance(self.component, PythonComponent):
        output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
    self._validate_payload(output_payload, output_variables, batch_output)
    return output_payload
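The core of run is a file-based exchange with a child Python process: the input is serialized into the working directory, the runner script is started there, and the output file is read back. The sketch below shows the same pattern under two stated assumptions: the standard-library pickle module replaces joblib, and a short inline script passed with -c (CHILD_SCRIPT, hypothetical) stands in for run_component.py.

```python
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the runner script: reads the input file,
# processes each record, and writes the output file.
CHILD_SCRIPT = """
import pickle, sys
with open(sys.argv[1], "rb") as f:
    records = pickle.load(f)
outputs = [{"doubled": r["value"] * 2} for r in records]
with open(sys.argv[2], "wb") as f:
    pickle.dump(outputs, f)
"""


def run_in_subprocess(records: list, workdir: Path) -> list:
    """Serialize records, run the child process in workdir, load its output."""
    input_path = workdir / "input.pkl"
    output_path = workdir / "output.pkl"
    with open(input_path, "wb") as f:
        pickle.dump(records, f)
    # File names are relative because the child runs with cwd=workdir
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT, input_path.name, output_path.name],
        cwd=workdir, capture_output=True,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode())
    with open(output_path, "rb") as f:
        outputs = pickle.load(f)
    outputs = outputs if isinstance(outputs, list) else [outputs]
    return [o for o in outputs if o is not None]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_in_subprocess([{"value": 1}, {"value": 4}], Path(tmp)))
```

Running the component in a separate interpreter keeps its dependencies isolated in the virtual environment, at the cost of serializing every payload to disk.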
ComponentRunner ¶
Class to run a Pipeline Component in a virtual environment. Supported Component Types: - PythonComponent - GPURuntimeComponent
Args: component: PythonComponent or GPURuntimeComponent workdir: Path to the directory where the component should be run. If None, the current working directory is used. cleanup: If True, the workdir is deleted after the context manager exits.Source code in
docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.pyclass ComponentRunner():
"""
Class to run a Pipeline Component in a virtual environment.
Supported Component Types:
- PythonComponent
- GPURuntimeComponent
Args:
component: PythonComponent or GPURuntimeComponent
workdir: Path to the directory where the component should be run. If None, the current working directory is used.
cleanup: If True, the workdir is deleted after the context manager exits.
"""
def __init__(self, component, workdir=None, cleanup=False):
self.component = component
self.parameters = { '__AI_IS_IMAGE_SET_VISUALIZATION': False }
self.cleanup = cleanup
self._logger = self._set_logger()
self._create_workdir(component, workdir)
self._copy_resources()
self._create_venv()
self._install_requirements()
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
if exception_type is not None:
self._logger.error(f"Exception occurred: {exception_type.__name__}: {exception_value}")
return False
if self.cleanup:
shutil.rmtree(self.workdir)
def _set_logger(self):
"""
Set the logger for the ComponentRunner.
"""
logger = logging.getLogger(__name__)
log_level = os.environ.get("loglevel", "INFO").upper()
logger.setLevel(log_level)
handler = logging.StreamHandler()
handler.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _create_workdir(self, component, workdir):
"""
Create a workdir for the component.
"""
if workdir is None:
self.workdir = Path.cwd() / component.name
else:
self.workdir = Path(workdir).resolve().absolute() / component.name
self.workdir.mkdir(parents=True, exist_ok=True)
self._logger.info(f"Created workdir: {self.workdir}")
def _copy_resources(self):
"""
Copy the required resources of the component to the workdir.
In case of a PythonComponent, also creates the dependencies into requirements.txt.
In case of a GPURuntimeComponent, also creates a requirements.txt with the required onnx and onnxruntime dependencies.
Also adds the runner script to the workdir.
In case of the source folder same as the workdir, the resources are not copied,
so any changes in source will affect the workdir.
"""
if isinstance(self.component, PythonComponent):
same_resources = []
for from_path in self.component.resources.keys():
try:
to_path = self.workdir / self.component.resources[from_path]
to_path.mkdir(parents=True, exist_ok=True)
shutil.copy(from_path, to_path)
except shutil.SameFileError:
same_resources.append(from_path.name)
if same_resources:
self._logger.info("Resources are already in workdir, which are not copied:")
for resource in same_resources:
self._logger.debug(f" - {resource}")
# download manually added Python packages and requirements.txt
self.component.python_dependencies.save(self.workdir)
shutil.copy(PYTHON_RUNNER_PATH, self.workdir)
else:
model_dir = Path(self.workdir / "1")
model_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(self.component.model_path, model_dir / "model.onnx")
Path(self.workdir / "config.pbtxt").write_text(f"{self.component.auto_config}")
shutil.copy(GPU_RUNNER_PATH, self.workdir / "run_component.py")
shutil.copy(GPU_REQUIREMENTS_PATH, self.workdir / "requirements.txt")
shutil.copy(GPU_CONFIG_PATH, self.workdir)
self._logger.info("Resources are copied into the workdir.")
def _create_venv(self):
"""
Create a Python virtual environment in the workdir.
The created virtual environment is stored in the context_dir and
has the same version as the current Python interpreter.
"""
context_dir = self.workdir / ".venv"
builder = venv.EnvBuilder(with_pip=True, symlinks=False)
builder.create(context_dir)
self.context = builder.ensure_directories(context_dir)
self.python_path = Path(self.context.env_exe).resolve()
self._logger.info(f"Python virtualenv created in folder '{self.context.env_dir}'")
def _install_requirements(self):
"""
Installs the required Python dependencies.
"""
cmd = [str(self.python_path), '-m', 'pip', 'install', 'joblib', '-r', 'requirements.txt']
result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.decode())
result_lines = result.stdout.decode().split("\n")
installed_message = " ".join([line for line in result_lines if "Successfully installed" in line])
self._logger.info(installed_message)
def set_parameters(self, parameter_name, parameter_value):
"""
Set the parameters for the component.
"""
self.parameters[parameter_name] = parameter_value
def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
"""
Run the component with the input payload.
Parameters:
input_payload: Input payload for the component.
Returns:
Output payload from the component.
Side Effects:
- The input payload is saved in the workdir.
- The output payload is loaded from the workdir.
"""
input_payload_path: Path = self.workdir / "input.joblib"
output_payload_path: Path = self.workdir / "output.joblib"
batch_input: bool = self.component.batch.inputBatch
if isinstance(input_payload, DataStream):
input_payload = [item for item in input_payload]
elif isinstance(input_payload, dict):
input_payload = [input_payload]
input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
self._validate_payload(input_payload, input_variables, batch_input)
joblib.dump(input_payload, input_payload_path)
cmd = self._create_command(input_payload_path, output_payload_path)
self._logger.info("Running command: " + " ".join(cmd))
result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.decode())
self._logger.debug(result.stderr.decode())
output_payload = joblib.load(output_payload_path)
batch_output: bool = self.component.batch.outputBatch
output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
output_payload = [output for output in output_payload if output is not None]
output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
if isinstance(self.component, PythonComponent):
output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
self._validate_payload(output_payload, output_variables, batch_output)
return output_payload
def _check_instance(self, element, element_name: str, instance):
if not isinstance(element, instance):
self._logger.error(f"{element_name} must be an instance of {instance.__name__}")
raise ValueError(f"{element_name} must be an instance of {instance.__name__}")
def _check_variable_types(self, variable: dict, value):
if variable["type"] == "String":
self._check_instance(value, "String value", str)
if variable["type"] == "StringArray":
self._check_instance(value, "StringArray value", list)
for i in value:
self._check_instance(i, "StringArray item", str)
if variable["type"] == "Object":
self._check_instance(value, "Object value", dict)
values = list(value.values())
if len(values) != 2:
self._logger.error("Object value must have exactly 2 items")
raise ValueError("Object value must have exactly 2 items")
ok = isinstance(values[0], str) and isinstance(values[1], bytes) or isinstance(values[1], str) and isinstance(values[0], bytes)
if not ok:
self._logger.error("Object value must have exactly one 'str' and one 'bytes' field")
raise ValueError("Object value must have exactly one 'str' and one 'bytes' field")
    def _validate_payload(self, payload: list, variables: list, batch: bool):
        self._check_instance(payload, "Payload", list)
        for payload_element in payload:
            if batch:
                self._check_instance(payload_element, "Batch payload element", list)
            else:
                payload_element = [payload_element]
            for item in payload_element:
                self._validate_payload_item(item, variables)
    def _validate_payload_item(self, item: dict, variables: list):
        self._check_instance(item, "Payload item", dict)
        for variable in variables:
            name = variable["name"]
            value = item.get(name, None)
            if value is None:
                self._logger.warning(f"WARNING! Variable '{name}' is missing from input, output or metric")
                continue
            self._check_variable_types(variable, value)
        payload_names = set(item.keys())
        variable_names = { variable["name"] for variable in variables }
        variable_names.add('timestamp')
        extra_variables = payload_names - variable_names
        if len(extra_variables):
            self._logger.warning(f"WARNING! These variables are not declared but are part of the payload: {extra_variables}")
    def _create_command(self, input_payload_path, output_payload_path):
        """
        Create the command to run the component.
        """
        cmd = [str(self.python_path), "-m", 'run_component']
        if isinstance(self.component, PythonComponent):
            cmd += [
                "-m", Path(self.component.entrypoint).stem,
                "-p", json.dumps(self.parameters)
            ]
        if isinstance(self.component, GPURuntimeComponent):
            model_path = Path(self.workdir / "1" / "model.onnx").absolute().resolve()
            config_path = Path(self.workdir / "config.pbtxt").absolute().resolve()
            cmd += [
                "-m", str(model_path),
                "-c", str(config_path),
            ]
        cmd += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self._logger.getEffectiveLevel())
        ]
        return cmd
set_parameters(parameter_name, parameter_value) ¶
Set the parameters for the component.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def set_parameters(self, parameter_name, parameter_value):
    """
    Set the parameters for the component.
    """
    self.parameters[parameter_name] = parameter_value
run(input_payload) ¶
Run the component with the input payload.
Parameters:
- input_payload: Input payload for the component.
Returns:
- Output payload from the component.
Side Effects:
- The input payload is saved in the workdir.
- The output payload is loaded from the workdir.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/component_runner.py
def run(self, input_payload: Optional[Union[dict, list]]) -> Optional[Union[dict, list]]:
    """
    Run the component with the input payload.
    Parameters:
        input_payload: Input payload for the component.
    Returns:
        Output payload from the component.
    Side Effects:
        - The input payload is saved in the workdir.
        - The output payload is loaded from the workdir.
    """
    input_payload_path: Path = self.workdir / "input.joblib"
    output_payload_path: Path = self.workdir / "output.joblib"
    batch_input: bool = self.component.batch.inputBatch
    if isinstance(input_payload, DataStream):
        input_payload = [item for item in input_payload]
    elif isinstance(input_payload, dict):
        input_payload = [input_payload]
    input_variables = [{'name': name, 'type': c_input['type']} for name, c_input in self.component.inputs.items()]
    self._validate_payload(input_payload, input_variables, batch_input)
    joblib.dump(input_payload, input_payload_path)
    cmd = self._create_command(input_payload_path, output_payload_path)
    self._logger.info("Running command: " + " ".join(cmd))
    result = subprocess.run(cmd, cwd=self.workdir, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode())
    self._logger.debug(result.stderr.decode())
    output_payload = joblib.load(output_payload_path)
    batch_output: bool = self.component.batch.outputBatch
    output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
    output_payload = [output for output in output_payload if output is not None]
    output_variables = [{'name': name, 'type': c_output['type']} for name, c_output in self.component.outputs.items()]
    if isinstance(self.component, PythonComponent):
        output_variables += [{'name': name, 'type': 'String'} for name in self.component.metrics]
    self._validate_payload(output_payload, output_variables, batch_output)
    return output_payload
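The payload handling in `run()` follows a simple rule: a `DataStream` is materialized into a list, a single `dict` is wrapped in a list, and `None` outputs are dropped after the component has run. The sketch below restates just that normalization as two standalone functions so it can be tried without the SDK. `normalize_input`, `normalize_output`, and `TwoRecords` are hypothetical names, the local `DataStream` class is only a stand-in for the SDK base class, and payload validation is left out.

```python
class DataStream:
    """Stand-in for the SDK's DataStream base class (illustration only)."""
    def __iter__(self):
        raise NotImplementedError('Child classes must implement this method.')


def normalize_input(payload):
    """Hypothetical helper mirroring run(): materialize streams, wrap single records."""
    if isinstance(payload, DataStream):
        return [item for item in payload]
    if isinstance(payload, dict):
        return [payload]
    return payload  # already a list of records (or a list of batches)


def normalize_output(output):
    """Hypothetical helper mirroring run(): wrap non-list results and drop None entries."""
    output = output if isinstance(output, list) else [output]
    return [item for item in output if item is not None]


class TwoRecords(DataStream):
    """Hypothetical stream yielding two input records."""
    def __iter__(self):
        yield {"x": 1}
        yield {"x": 2}


print(normalize_input({"x": 0}))           # [{'x': 0}]
print(normalize_input(TwoRecords()))       # [{'x': 1}, {'x': 2}]
print(normalize_output([{"y": 1}, None]))  # [{'y': 1}]
```

Because outputs are always returned as a list, a component that produces nothing for a record simply contributes no entry.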
DataStream ¶
Base class for datastream generators
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/data_stream.py
class DataStream:
    """
    Base class for datastream generators
    """
    def __iter__(self):
        """
        Empty generator method for child classes to implement.
        """
        raise NotImplementedError('Child classes must implement this method.')
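A concrete stream only has to implement `__iter__` as a generator that yields one input record (a `dict`) at a time. The sketch below assumes the base class can be imported from `simaticai.testing.data_stream`, inferred from the source file path shown above; to stay self-contained it falls back to a local copy with the same contract when the SDK is not installed. `RecordListStream` and the `temperature` variable name are hypothetical.

```python
try:
    from simaticai.testing.data_stream import DataStream  # assumed import path
except ImportError:
    class DataStream:  # local stand-in with the same contract as the SDK base class
        def __iter__(self):
            raise NotImplementedError('Child classes must implement this method.')


class RecordListStream(DataStream):
    """Hypothetical stream that replays a fixed list of values as input records."""

    def __init__(self, values, variable_name="temperature"):
        self.values = list(values)
        self.variable_name = variable_name

    def __iter__(self):
        # One record per value; the key must match an input variable of the component.
        for value in self.values:
            yield {self.variable_name: value}


stream = RecordListStream([20.5, 21.0])
print(list(stream))  # [{'temperature': 20.5}, {'temperature': 21.0}]
```

Since every call to `__iter__` creates a fresh generator, the same stream object can be replayed in several test runs.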
__iter__() ¶
Empty generator method for child classes to implement.
This module provides a base class for creating and managing Docker containers with Python virtual environments. It is designed to be used in a context where Docker is available and the user has the necessary permissions to create and manage containers.
VesselBaseDocker ¶
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/docker_venv.py
class VesselBaseDocker:
    is_vessel: bool = False  # True when running in a Docker container
    python_3_11: str = 'python'
    python_3_12: str = 'python'
    def __init__(self):
        if Path("/.ai_sdk_docker").is_file():
            self.is_vessel = True
            self.python_default = 'python3.11'
            self.python_3_11 = 'python3.11'
            self.python_3_12 = 'python3.12'
    def _create_venv(self, path: str, version: str):
        """
        Creates a virtual environment in which the given component can run.
        Args:
            path (str): Path to the virtual environment.
            version (str): Python version to use for the virtual environment.
        """
        context_dir = Path(path) / ".venv"
        python_path = context_dir / "bin" / "python"
        match version:
            case "3.11":
                python = self.python_3_11
            case "3.12":
                python = self.python_3_12
            case _:
                python = self.python_default
        _logger.info(f"Creating virtual environment for Python '{version}' in '{path}/.venv' with Python {python}...")
        command = [
            python,
            "-m", "venv", str(context_dir),
            "--copies"
        ]
        result = subprocess.run(command, capture_output=True, text=True)
        if result.returncode != 0:
            _logger.error(f"Error creating virtual environment: {result.stderr}")
            raise RuntimeError(f"Error creating virtual environment: {result.stderr}")
        return context_dir.resolve(), python_path.resolve()
A pipeline runner that lets you simulate the execution of a pipeline in a local Python environment.
It can be used to locally mimic the behavior of the AI Inference Server concerning loading and running inference pipelines. This is a quick and easy way to find programming or configuration errors before deploying the package. The local pipeline runner also lets you exercise your pipeline component by component. In other words, you can feed single components with inputs and verify the output produced.
LocalPipelineRunner ¶
Simulates the execution of a pipeline in a local Python environment.
Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component, each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.
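Under this restriction, running a pipeline amounts to passing the payload through the components in declaration order, which is what `run_pipeline()` does with `run_component()` in the source below. The sketch models components as plain Python callables; `run_linear_pipeline` and the `scale`/`classify` steps are hypothetical stand-ins for separately executed components. Where the runner uses an `assert` to reject a missing input, the sketch raises `ValueError`.

```python
from typing import Callable, Dict, Optional

Component = Callable[[dict], Optional[dict]]


def run_linear_pipeline(components: Dict[str, Component], payload: dict) -> Optional[dict]:
    """Feed each component's output into the next one, in declaration order."""
    for name, component in components.items():
        # A component never receives None as input.
        if payload is None:
            raise ValueError(f"Can not run component '{name}' without input.")
        payload = component(payload)
    return payload


components = {
    "scale": lambda rec: {"scaled": rec["raw"] * 2},                               # hypothetical step
    "classify": lambda rec: {"label": "high" if rec["scaled"] > 10 else "low"},  # hypothetical step
}
print(run_linear_pipeline(components, {"raw": 7}))  # {'label': 'high'}
```

This also shows why non-linear pipelines are out of scope: each step only sees the output of its immediate predecessor.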
If the caller specifies no path, the working directory is temporary and is removed unless an error occurs. If the caller specifies a working directory with a path argument, the working directory is kept. This behavior can be overridden using boolean parameter cleanup.
Currently, the pipeline runner supports both the current process_input(data: dict) entrypoint signature and the legacy run(data: str) signature. If both entrypoints are present, process_input() takes precedence. Please note however that run() is deprecated, and support for it will be removed in future versions of the pipeline runner.
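For orientation, a minimal entrypoint exposing the current signature could look like the sketch below. Only the `process_input(data: dict)` name and signature come from the text above; the variable names `temperature` and `alarm` and the threshold are hypothetical, and a real component must read and return the inputs and outputs declared in its pipeline configuration. Returning `None` means "no output for this record"; the runner drops `None` outputs when it collects results.

```python
# entrypoint.py: hypothetical file name; the runner passes the stem of the
# component's configured entrypoint file to its helper script.
def process_input(data: dict):
    """Handle one input record and return an output record, or None for no output."""
    temperature = data.get("temperature")
    if temperature is None:
        return None  # nothing to report for this record
    return {"alarm": temperature > 80.0}


print(process_input({"temperature": 85.0}))  # {'alarm': True}
print(process_input({}))                     # None
```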
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory is created. | None |
| cleanup | bool | If set, the working directory is deleted when True, and kept when False. If unset, a temporary working directory is removed, and an explicit working directory is kept. When an error occurs in a component, the working directory is kept regardless of this value. | None |
| loglevel | int | Log level used when running the components. | logging.INFO |
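The defaulting rules for `cleanup` reduce to a few lines. As implemented in `__enter__()` and `__exit__()` below, the working directory is removed on exit exactly when the resolved value is `True`. `resolve_cleanup` is a hypothetical helper that mirrors those branches; it does not model the extra rule that a component error forces the directory to be kept (the runner sets `self.cleanup = False` in that case).

```python
from typing import Optional


def resolve_cleanup(path: Optional[str], cleanup: Optional[bool]) -> bool:
    """Return True if the working directory would be removed on exit."""
    if cleanup is not None:
        return cleanup       # an explicit value always wins
    return path is None      # temporary directories are removed, explicit ones are kept


for path in (None, "./runner_workdir"):
    for cleanup in (None, True, False):
        print(path, cleanup, "->", "removed" if resolve_cleanup(path, cleanup) else "kept")
```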
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
class LocalPipelineRunner:
    """
    Simulates the execution of a pipeline in a local Python environment.
    Restriction: only linear pipelines are supported where the pipeline input variables are only used by one component,
    each component uses only the outputs of the previous components, and the pipeline output only consists of variables from the last component.
    If the caller specifies no `path`, the working directory is temporary and is removed unless an error occurs.
    If the caller specifies a working directory with a `path` argument, the working directory is kept.
    This behavior can be overridden using boolean parameter `cleanup`.
    Currently, the pipeline runner supports both the current `process_input(data: dict)` entrypoint signature and the legacy
    `run(data: str)` signature. If both entrypoints are present, `process_input()` takes precedence. Please note however that
    `run()` is deprecated, and support for it will be removed in future versions of the pipeline runner.
    Args:
        packageZip (path-like): Path to the pipeline configuration package.
        path (path-like): Path to the working directory. If unset, a temporary directory is created.
        cleanup (bool): If set, the working directory is deleted when True, and kept when False. \
            If unset, a temporary working directory is removed, and an explicit working directory is kept. \
            When an error occurs in a component, the working directory is kept regardless of this value.
    """
    def __init__(self, packageZip: os.PathLike, path: Optional[os.PathLike] = None, cleanup: Optional[bool] = None, loglevel = logging.INFO):
        """
        Creates a new LocalPipelineRunner for the provided pipeline configuration package.
        Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.
        Args:
            packageZip (path-like): Path to the pipeline configuration package.
            path (path-like): Path to the working directory. If unset, a temporary directory will be created.
            cleanup (bool): If set, the working directory will be deleted when True, and kept when False. \
                If unset, a temporary working directory will be removed, and an explicit working directory will be kept. \
                When an error occurs in a component, the working directory will be kept regardless of this value.
        """
        self.package_zip: Path = Path(packageZip)
        self.path = Path(path).resolve().absolute() if path is not None else None
        self.components = {}
        self.parameters = {}
        self.cleanup = cleanup
        self.log_level = loglevel
        self.workdir: Path
        self.docker = VesselBaseDocker()
        self.report_writer = PipelineRunnerReportWriter()
        report_writer_handler = ReportWriterHandler(self.report_writer)
        _logger.addHandler(report_writer_handler)
    def __enter__(self):
        self.report_writer.set_package_zip_path(self.package_zip)
        timestamp = re.sub(r"[-:]", "", datetime.now(timezone.utc).isoformat(sep="_", timespec="seconds"))
        if self.path is not None:
            self.workdir = Path(self.path)
            self.workdir.mkdir(parents=True, exist_ok=True)
            self.cleanup = self.cleanup if self.cleanup is not None else False
        else:
            self.workdir = Path(tempfile.mkdtemp(prefix=f"LocalPipelineRunner_{timestamp}_"))
            self.cleanup = self.cleanup if self.cleanup is not None else True
        unzip_components = False
        with zipfile.ZipFile(self.package_zip) as zf:
            if 'runtime_config.yml' in zf.namelist():
                self.workdir = self.workdir / self.package_zip.stem
                zf.extractall(path=self.workdir)
                unzip_components = True
            else:
                zf.extractall(path=self.workdir)
                self.workdir = self.workdir / zf.namelist()[0]
        try:
            with open(self.workdir / "pipeline_config.yml") as cf:
                config = yaml.load(cf, Loader=yaml.FullLoader)
            components = config["dataFlowPipeline"].get("components", [])
            for component in components:
                component["context"] = None
                component["env_dir"] = self.workdir / component['name']
                self.components[component["name"]] = component
            if unzip_components:
                for component in components:
                    component_zip = f"{component['name']}_{component['version']}.zip"
                    with zipfile.ZipFile(self.workdir / 'components' / component_zip) as zf:
                        zf.extractall(path=self.workdir / component["name"])
            for parameter in config["dataFlowPipeline"].get("pipelineParameters", {}):
                self.parameters[parameter["name"]] = parameter
        except Exception:
            raise RuntimeError(INVALID_PIPELINE_PACKAGE_MESSAGE)
        return self
    def __exit__(self, exception_type, value, traceback):
        self.update_telemetry_data()
        self.report_writer.write_report()
        if self.cleanup:
            _logger.info("Removing local pipeline runner environment...")
            shutil.rmtree(self.workdir.parent)
        else:
            _logger.info(f"Leaving local pipeline runner environment in its final state at '{self.workdir}'")
    def collect_telemetry_data(self) -> dict:
        """
        Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.
        Returns:
            dict: A dictionary containing the telemetry data.
        """
        telemetry_data = {}
        telemetry_data["platform"] = {}
        telemetry_data["platform"]["os"] = platform.system()
        telemetry_data["platform"]["release"] = platform.release()
        telemetry_data["platform"]["python_version"] = platform.python_version()
        _logger.info(f"locals: {locals()}")
        telemetry_data["environment"] = {}
        env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
        env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
        env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
        env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
        telemetry_data["environment"]["info"] = env_info
        telemetry_data["industrial_ai"] = {}
        try:
            telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
        except importlib_metadata.PackageNotFoundError:
            telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
        telemetry_data["pipeline"] = {}
        telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                                 if self.components[component]['runtime']['type'] == 'python'))
        telemetry_data["pipeline"]["file_extensions"] = []
        for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
            excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
            suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                                if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
            for suffix in suffixes:
                if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                    telemetry_data["pipeline"]["file_extensions"].append(suffix)
        return telemetry_data
    def update_telemetry_data(self):
        """
        Update the telemetry data and the package.
        This method updates the telemetry data by loading the existing data from a YAML file,
        or collecting new telemetry data if the file doesn't exist. It then updates the
        "last_test_run" field of the telemetry data with the current timestamp. The updated
        telemetry data is then written back to the YAML file.
        If the package contains a different version of the telemetry data file, a new package
        is created with the updated telemetry data. Otherwise, the existing package is
        overwritten with the new package containing the updated telemetry data.
        """
        _logger.info("Updating telemetry data and the package")
        telemetry_path = self.workdir / "telemetry_data.yml"
        if telemetry_path.is_file():
            telemetry_data = yaml.safe_load(telemetry_path.read_text())
        else:
            telemetry_data = self.collect_telemetry_data()
        telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
        telemetry_path.write_text(yaml.dump(telemetry_data))
        config_package = False
        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            for file in zip_read.namelist():
                if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                    config_package = True
                    break
        new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"
        with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
            with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
                for file in zip_read.namelist():
                    if TELEMETRY_YAML not in file:
                        filepath = zip_read.extract(file, path=self.workdir)
                        zip_write.write(filepath, arcname=file)
                    else:
                        zip_write.write(telemetry_path, arcname=file)
        if config_package:
            shutil.copy(new_zip_path, self.package_zip)
        else:
            new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
            new_sha_path.write_text(calc_sha(new_zip_path))
    def _install_requirements(self, component, package_dir, no_index: bool = True):
        _logger.info("Installing requirements...")
        pip_report_file = component["env_dir"] / "pip_report.json"
        package_dir_path = _relative_to(package_dir, component["env_dir"])
        pip_report_file_path = _relative_to(pip_report_file, component["env_dir"])
        cmd = [
            str(component['python_path']),
            "-m", "pip", "install",
            "--no-warn-script-location",
            "-f", f"{package_dir_path}",
            "-r", REQUIREMENTS_TXT,
        ]
        if no_index:
            cmd += [ "--no-index" ]
        use_no_deps = component.get("runtime", {}).get("installTransitiveDependencies", True) is False
        if use_no_deps:
            cmd += [ "--no-deps" ]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if 0 == result.returncode and use_no_deps:
            _logger.warning(f"Component `{component['name']}` is requested to be installed without transitive dependencies. "
                            "This may lead to runtime errors if the dependencies are not handled manually.")
        # generate pip report with a dry run to get the list of installed packages (in case of CPU Python packages)
        if 0 == result.returncode and component.get("hwType", None) == "CPU":
            cmd += ["--dry-run", "--ignore-installed", "--report", f"{pip_report_file_path}"]
            subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
            self.report_writer.add_installed_packages(component["name"], pip_report_file)
        return result
    def _install_from_packages_zip(self, component, package_dir):
        result = self._install_requirements(component, package_dir, True)
        return 0 == result.returncode
    def _install_from_pypi_org(self, component, package_dir):
        return self._install_requirements(component, package_dir, False)
    def _init_component_venv(self, component: dict):
        """
        Creates a virtual environment in which the given component can run.
        Args:
            component (dict): configuration of the selected component.
        """
        _logger.info(f"Creating virtual environment for component '{component['name']}'...")
        context_dir = component["env_dir"] / ".venv"
        if self.docker.is_vessel:
            _logger.info("Creating virtual environment in docker...")
            component["context"], component["python_path"] = self.docker._create_venv(component["env_dir"], component["runtime"]["version"])
            component["python_path"] = Path(component["python_path"]).resolve()
            _logger.info(f"Component context (docker): {component['context']}")
        else:
            builder = venv.EnvBuilder(with_pip=True, symlinks=False)
            builder.create(str(context_dir))
            component["context"] = builder.ensure_directories(context_dir)
            component['python_path'] = Path(component["context"].env_exe).resolve()
        _logger.debug(f"Component python_path: {component['python_path']}")
        _logger.info("Upgrading pip...")
        cmd = [str(component['python_path']), "-m", "pip", "install", "pip", "--upgrade"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error upgrading pip:\n{result.stderr}")
        cmd = [str(component['python_path']), "-m", "pip", "install", "--upgrade", "setuptools"]
        result = subprocess.run(cmd, cwd=component["env_dir"], text=True, stderr=subprocess.PIPE)
        if result.returncode != 0:
            self.cleanup = False
            _logger.warning(f"Error installing setuptools:\n{result.stderr}")
        try:
            result = self._install_logmodule(component["python_path"], component["env_dir"])
        except Exception as err:
            _logger.error(err)
            self.cleanup = False
            raise RuntimeError("The 'simaticai' Python package is either not installed or does not contain package 'log_module'.") from None
        if result.returncode != 0:
            self.cleanup = False
            raise RuntimeError(f"Error installing log_module:\n{result.stderr}")
        req_list = Path(component["env_dir"] / "requirements.list")
        req_list.touch(exist_ok=True)
        if Path(component["env_dir"] / REQUIREMENTS_TXT).is_file():
            dependencies, extra_index, index_url = parse_requirements(component["env_dir"] / REQUIREMENTS_TXT)
            requirements = "#".join(dependencies.keys())
            req_list.write_text(requirements)
        else:
            _logger.info(f"'{REQUIREMENTS_TXT}' was not found. No additional dependencies were installed.")
            return
        package_dir = component["env_dir"] / PYTHON_PACKAGES
        package_zip = component["env_dir"] / PYTHON_PACKAGES_ZIP
        _logger.info(f"Extracting {PYTHON_PACKAGES_ZIP}")
        if package_zip.is_file():
            with zipfile.ZipFile(package_zip) as zf:
                zf.extractall(path=package_dir.absolute())
        else:
            _logger.info(f"There is no {PYTHON_PACKAGES_ZIP} to extract.")
            package_dir.mkdir(parents=True, exist_ok=True)
        success = self._install_from_packages_zip(component, package_dir)
        if not success:
            msg = f"Warning! Could not install dependencies from {PYTHON_PACKAGES_ZIP}. "
            msg += "Trying to install them from pypi.org. The resulting Python environment "
            msg += "may be significantly different than the targeted Python environment on the Edge Device!"
            _logger.warning(msg)
            if self.docker.is_vessel and component["runtime"]["type"] == "python":
                raise RuntimeError("The component is running in a docker container. The installation of dependencies from pypi.org is not supported.")
            else:
                second_install_result = self._install_from_pypi_org(component, package_dir)
                if 0 != second_install_result.returncode:
                    self.cleanup = False
                    raise RuntimeError(f"Error installing requirements:\n{second_install_result.stderr}")
    @staticmethod
    def _install_logmodule(python_path, env_dir):
        _logger.info("Installing LogModule...")
        try:
            package_paths = importlib_metadata.files("simaticai")
            assert package_paths is not None
            logger_wheel = [p for p in package_paths if 'log_module' in str(p)][0].locate()
        except Exception:
            from importlib.metadata import Distribution
            direct_url = Distribution.from_name("simaticai").read_text("direct_url.json")
            assert direct_url is not None
            direct_url = json.loads(direct_url)['url']
            direct_url = direct_url.replace('file://','')
            direct_url = Path(direct_url) / 'simaticai' / 'data'
            paths = list(direct_url.rglob('*.whl'))
            logger_wheel = [p for p in paths if 'log_module' in str(p)][0].resolve()
        cmd = [
            str(python_path), "-m", "pip",
            "install",
            "--no-warn-script-location", logger_wheel, "joblib"
        ]
        return subprocess.run(cmd, cwd=env_dir, text=True, stderr=subprocess.PIPE)
    def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
        """
        Runs the component in its virtual environment with the given input.
        This environment is created according to `requirements.txt` in the package.
        Additionally, 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
        The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries,
        or a DataStream object which will produce the appropriate input data.
        The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.
        Args:
            name (str): The name of the component to be executed.
            data (dict, list or DataStream): One or more input records for the component.
        Returns:
            dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
            If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.
        """
        assert name in self.components, f"Invalid component name: {name}"
        component = self.components[name]
        assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is neither 'python' nor 'gpuruntime'"
        input_payload_path: Path = component["env_dir"] / "input.joblib"
        output_payload_path: Path = component["env_dir"] / "output.joblib"
        batch_input: bool = component["batch"]["inputBatch"] == "Yes" if component.get("batch") is not None else False
        batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False
        # Validate and serialize input payload
        assert data is not None, f"Can not run component '{name}' without input."
        result_is_list = True
        if isinstance(data, list):
            input_payload = data
        elif isinstance(data, DataStream):
            input_payload = [item for item in data]
        else:
            result_is_list = False
            input_payload = [data]
        validate_payload(input_payload, component["inputType"], batch_input)
        joblib.dump(input_payload, input_payload_path)
        self.report_writer.set_input_payload_length(name, len(input_payload))
        _logger.info(f"Input payload saved as '{input_payload_path}'")
        # Assemble command for running component
        if component['runtime']['type'] == 'python':
            # Version check for Python
            e_major, e_minor, _, _, _ = sys.version_info
            c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))
            if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
                msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
                msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
                msg += " Pipeline behavior on AI Inference Server might be different."
                _logger.warning(msg)
            if component["context"] is None:
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_component.py', component["env_dir"])
            req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
            json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
            args = [
                "-m", 'run_component',
                "-m", Path(component["entrypoint"]).stem,
                "-p", f"{json_params}",
                "-r", f"{req_list_path}",
            ]
        else:
            # gpuruntime step requires Python environment with onnxruntime installed
            if component["context"] is None:
                shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
                self._init_component_venv(component)
                shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
                shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])
            args = [
                "-m", 'run_gpuruntime_component',
                "-m", component["entrypoint"],
                "-c", "config.pbtxt",
            ]
        args += [
            "-i", input_payload_path.name,
            "-o", output_payload_path.name,
            "-ll", logging.getLevelName(self.log_level)
        ]
        # Run the component in the created Python environment
        _logger.info(f"Running component '{name}'...")
        _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")
        cmd = [str(component['python_path'])] + args
        _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
        p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        if p.stdout is not None:
            for line in p.stdout:
                _logger.info(line.strip())
        returncode = p.wait()
        if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
            self.cleanup = False
            raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
        else:
            response_wrapped = True if returncode == RETURN_CODE_DEPRECATED else False
        # Deserialize and validate output payload
        _logger.info(f"Loading output payload from '{output_payload_path}'")
        output_payload = joblib.load(output_payload_path)
        output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
        self.report_writer.set_output_payload_length(name, len(output_payload))
        if response_wrapped:
            output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
        else:
            output_payload = [output for output in output_payload if output is not None]
        validate_payload(output_payload, component["outputType"], batch_output)
        if result_is_list:
            return output_payload
        if 0 < len(output_payload):
            return output_payload[0]
        return None
    def update_parameters(self, parameters: dict):
        """
        Validates and updates pipeline parameters.
        The elements of the dictionary must match the parameters specified in the pipeline configuration package.
        If any of the names or types does not match, all parameters will remain untouched.
        Args:
            parameters (dict): names and values of parameters to update
        Raises:
            AssertionError:
                When:
                - either `name` is not in the configured parameters,
                - or `defaultValue` type is different from the configured one
        """
        for key,value in parameters.items():
            if key not in self.parameters.keys():
                if key.upper().startswith("__AI_IS"):
                    self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
                else:
                    raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
            elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
        for key, value in parameters.items():
            self.parameters[key]["defaultValue"] = value
    def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
        """
        Runs all the components sequentially, assuming the output of a component is only consumed by the next.
        The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries,
        or a DataStream object which will produce the appropriate input data.
        For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
        and the output is saved as `output.joblib`.
        Args:
            payload (dict, list or DataStream): One or more input records for the pipeline.
        Returns:
            The output of the last component.
        """
        for name in self.components.keys():
            payload = self.run_component(name, payload)
        return payload
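A typical local test, sketched below, opens the runner as a context manager so that `__enter__()` extracts the package and `__exit__()` writes the report and cleans up, optionally overrides pipeline parameters, and then exercises a single component or the whole pipeline. The import path is assumed from the source file location, and the package path, component name, parameter name, and input variable are hypothetical placeholders for values from your own pipeline configuration package. The function is only defined here, not called, because it needs the AI SDK and a real package.

```python
import logging
from pathlib import Path


def smoke_test_pipeline(package_zip: Path, workdir: Path) -> None:
    """Hypothetical smoke test for a pipeline configuration package."""
    # Assumed import path, mirroring simaticai/testing/pipeline_runner.py.
    from simaticai.testing.pipeline_runner import LocalPipelineRunner

    # An explicit workdir is kept after the run unless cleanup=True is passed.
    with LocalPipelineRunner(package_zip, path=workdir, loglevel=logging.DEBUG) as runner:
        # Name and type must match an entry in pipelineParameters, otherwise AssertionError.
        runner.update_parameters({"threshold": 0.5})  # hypothetical parameter

        # A single dict in gives a single dict (or None) out.
        first = runner.run_component("preprocessing", {"temperature": 21.0})  # hypothetical names
        print(first)

        # A list of records in gives the list of output records of the last component.
        outputs = runner.run_pipeline([{"temperature": 21.0}, {"temperature": 85.0}])
        assert isinstance(outputs, list)
```

In a test suite, the same function body can live in a test case that receives the package path from the build.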
__init__(packageZip, path=None, cleanup=None, loglevel=logging.INFO) ¶
Creates a new LocalPipelineRunner for the provided pipeline configuration package.
Only works with a pipeline configuration package. Does not work with e.g. an edge configuration package.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packageZip | path-like | Path to the pipeline configuration package. | required |
| path | path-like | Path to the working directory. If unset, a temporary directory will be created. | None |
| cleanup | bool | If set, the working directory will be deleted when True, and kept when False. If unset, a temporary working directory will be removed, and an explicit working directory will be kept. When an error occurs in a component, the working directory will be kept regardless of this value. | None |
| loglevel | int | Log level used when running the components. | logging.INFO |
collect_telemetry_data() ¶
Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.
Returns:
| Type | Description |
|---|---|
| dict | A dictionary containing the telemetry data. |
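The CI/CD detection in `collect_telemetry_data()` checks a few well-known environment variables in sequence, so a later match overrides an earlier one: GitHub Actions wins over Azure DevOps Pipelines, which wins over GitLab CI/CD. The sketch restates that precedence as a pure function over an environment mapping, which makes it easy to test. `detect_ci` is a hypothetical name, the Jupyter check is left out, and `NOT_FOUND` is a placeholder for the SDK's `MSG_NOT_FOUND` constant, whose value is not shown here.

```python
from typing import Mapping

NOT_FOUND = "not found"  # placeholder; the SDK uses its own MSG_NOT_FOUND constant


def detect_ci(environ: Mapping[str, str]) -> str:
    """Return the CI system name using the same override order as collect_telemetry_data()."""
    env_info = NOT_FOUND
    env_info = "GitLab CI/CD" if "GITLAB_CI" in environ else env_info
    env_info = "Azure DevOps Pipelines" if "TF_BUILD" in environ else env_info
    env_info = "GitHub Actions" if "GITHUB_ACTIONS" in environ else env_info
    return env_info


print(detect_ci({"GITLAB_CI": "true"}))                      # GitLab CI/CD
print(detect_ci({"GITLAB_CI": "true", "TF_BUILD": "True"}))  # Azure DevOps Pipelines
```

Passing `os.environ` gives the same result as the runner for the current process.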
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def collect_telemetry_data(self) -> dict:
    """
    Collects telemetry data about the platform, environment, industrial AI packages, and pipeline.
    Returns:
        dict: A dictionary containing the telemetry data.
    """
    telemetry_data = {}
    telemetry_data["platform"] = {}
    telemetry_data["platform"]["os"] = platform.system()
    telemetry_data["platform"]["release"] = platform.release()
    telemetry_data["platform"]["python_version"] = platform.python_version()
    _logger.info(f"locals: {locals()}")
    telemetry_data["environment"] = {}
    env_info = "Jupyter" if any(k for k in locals() if k in ["__IPYTHON__", "get_ipython"]) else MSG_NOT_FOUND
    env_info = "GitLab CI/CD" if "GITLAB_CI" in os.environ else env_info
    env_info = "Azure DevOps Pipelines" if "TF_BUILD" in os.environ else env_info
    env_info = "GitHub Actions" if "GITHUB_ACTIONS" in os.environ else env_info
    telemetry_data["environment"]["info"] = env_info
    telemetry_data["industrial_ai"] = {}
    try:
        telemetry_data["industrial_ai"]["simaticai"] = importlib_metadata.version("simaticai")
    except importlib_metadata.PackageNotFoundError:
        telemetry_data["industrial_ai"]["simaticai"] = MSG_NOT_FOUND
    telemetry_data["pipeline"] = {}
    telemetry_data["pipeline"]["python_versions"] = list(set(self.components[component]['runtime']['version'] for component in self.components
                                                             if self.components[component]['runtime']['type'] == 'python'))
    telemetry_data["pipeline"]["file_extensions"] = []
    for component_dir in [Path(self.workdir) / c for c in Path(self.workdir).rglob("*") if c.name in self.components.keys()]:
        # Skip the component's virtual environment and bytecode cache
        excluded_dirs = set([component_dir / '.venv', component_dir / '__pycache__'])
        suffixes = list(set(f.suffix for f in component_dir.rglob("*")
                            if not (any(excluded_dirs.intersection(f.parents)) or f.suffix in ["", ".zip", ".yml", ".yaml", ".html"])))
        for suffix in suffixes:
            if suffix not in telemetry_data["pipeline"]["file_extensions"]:
                telemetry_data["pipeline"]["file_extensions"].append(suffix)
    return telemetry_data
```
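The environment detection in collect_telemetry_data relies on a chain of conditional reassignments, so the last matching check wins. The sketch below isolates that precedence so it can be tested without a real CI runner. `detect_environment` and its `in_jupyter` flag are hypothetical names standing in for the SDK's inline logic, and the value of `MSG_NOT_FOUND` is a placeholder.

```python
import os
from typing import Mapping

# Hypothetical placeholder: the SDK defines MSG_NOT_FOUND, but its value is not shown here.
MSG_NOT_FOUND = "not found"


def detect_environment(environ: Mapping[str, str], in_jupyter: bool = False) -> str:
    """Reproduce the precedence of the chained env_info assignments.

    Each later check overrides the earlier result, so GitHub Actions wins over
    Azure DevOps Pipelines, which wins over GitLab CI/CD, which wins over Jupyter.
    `in_jupyter` is a hypothetical stand-in for the SDK's own Jupyter check.
    """
    env_info = "Jupyter" if in_jupyter else MSG_NOT_FOUND
    env_info = "GitLab CI/CD" if "GITLAB_CI" in environ else env_info
    env_info = "Azure DevOps Pipelines" if "TF_BUILD" in environ else env_info
    env_info = "GitHub Actions" if "GITHUB_ACTIONS" in environ else env_info
    return env_info


# Report what the current process would be classified as (depends on where it runs).
print(detect_environment(os.environ))
```

Passing the environment as a mapping, rather than reading `os.environ` inside the function, makes the precedence easy to check with hand-written dictionaries.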
update_telemetry_data()
Update the telemetry data and the package.
This method updates the telemetry data by loading the existing data from a YAML file, or collecting new telemetry data if the file doesn't exist. It then updates the "last_test_run" field of the telemetry data with the current timestamp. The updated telemetry data is then written back to the YAML file.
Next, the method always writes a copy of the package next to the original, named after it with a _tested suffix (for example, my_pipeline_tested.zip for my_pipeline.zip), in which the telemetry data file is replaced with the updated one. If the archive stores the telemetry data file in a subdirectory rather than at its root, as a pipeline configuration package does, this copy then overwrites the original package. Otherwise, the original package is left unchanged, and the SHA-256 checksum of the copy is written to a matching file (for example, my_pipeline_tested.sha256).
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def update_telemetry_data(self):
    """
    Update the telemetry data and the package.
    This method updates the telemetry data by loading the existing data from a YAML file,
    or collecting new telemetry data if the file doesn't exist. It then updates the
    "last_test_run" field of the telemetry data with the current timestamp. The updated
    telemetry data is then written back to the YAML file.
    If the package contains a different version of the telemetry data file, a new package
    is created with the updated telemetry data. Otherwise, the existing package is
    overwritten with the new package containing the updated telemetry data.
    """
    _logger.info("Updating telemetry data and the package")
    telemetry_path = self.workdir / "telemetry_data.yml"
    if telemetry_path.is_file():
        telemetry_data = yaml.safe_load(telemetry_path.read_text())
    else:
        telemetry_data = self.collect_telemetry_data()
    telemetry_data["pipeline"]["last_test_run"] = datetime.now().isoformat()
    telemetry_path.write_text(yaml.dump(telemetry_data))
    # A telemetry file below the archive root marks a pipeline configuration package
    config_package = False
    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        for file in zip_read.namelist():
            if TELEMETRY_YAML in file and file != TELEMETRY_YAML:
                config_package = True
                break
    new_zip_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.zip"
    with zipfile.ZipFile(self.package_zip, 'r') as zip_read:
        with zipfile.ZipFile(new_zip_path, 'w') as zip_write:
            for file in zip_read.namelist():
                if TELEMETRY_YAML not in file:
                    filepath = zip_read.extract(file, path=self.workdir)
                    zip_write.write(filepath, arcname=file)
                else:
                    zip_write.write(telemetry_path, arcname=file)
    if config_package:
        shutil.copy(new_zip_path, self.package_zip)
    else:
        new_sha_path = Path(self.package_zip).parent / f"{Path(self.package_zip).stem}_tested.sha256"
        new_sha_path.write_text(calc_sha(new_zip_path))
```
run_component(name, data)
Runs the component in its virtual environment with the given input. This environment is created according to requirements.txt in the package. Additionally, 'joblib' and the mock log_module are automatically installed in this virtual environment. The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries, or a DataStream object that produces the appropriate input data. The supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the component to be executed. | required |
| data | dict, list or DataStream | One or more input records for the component. | required |
Returns:
| Type | Description |
|---|---|
| Optional[Union[dict, list]] | A list of output dictionaries if there were no errors and the ready field is true. If the input was a single dict, a single dict (the first item of the list), or None if there is no output. |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def run_component(self, name: str, data: Optional[Union[dict, list, DataStream]]) -> Optional[Union[dict, list]]:
    """
    Runs the component in its virtual environment with the given input.
    This environment is created according to `requirements.txt` in the package.
    Additionally, 'joblib' and the mock `log_module` are automatically installed in this virtual environment.
    The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries,
    or a DataStream object which will produce the appropriate input data.
    The supplied input data is saved as `input.joblib` in the component runtime directory, and the output is saved as `output.joblib`.
    Args:
        name (str): The name of the component to be executed.
        data (dict or list): One or more input records for the component.
    Returns:
        dict / list: A list of dictionaries for outputs if there were no errors and field `ready` is true.
        If the input was a single dict, then a single dict (the first item of the list) or None if there is no output.
    """
    assert name in self.components, f"Invalid component name: {name}"
    component = self.components[name]
    assert component["runtime"]["type"] in ["python", "gpuruntime"], f"Can not run component '{name}': Runtime type is neither 'python' nor 'gpuruntime'"
    input_payload_path: Path = component["env_dir"] / "input.joblib"
    output_payload_path: Path = component["env_dir"] / "output.joblib"
    batch_input: bool = component["batch"]["inputBatch"] == "Yes" if component.get("batch") is not None else False
    batch_output: bool = component["batch"]["outputBatch"] == "Yes" if component.get("batch") is not None else False
    # Validate and serialize input payload
    assert data is not None, f"Can not run component '{name}' without input."
    result_is_list = True
    if isinstance(data, list):
        input_payload = data
    elif isinstance(data, DataStream):
        input_payload = [item for item in data]
    else:
        result_is_list = False
        input_payload = [data]
    validate_payload(input_payload, component["inputType"], batch_input)
    joblib.dump(input_payload, input_payload_path)
    self.report_writer.set_input_payload_length(name, len(input_payload))
    _logger.info(f"Input payload saved as '{input_payload_path}'")
    # Assemble command for running component
    if component['runtime']['type'] == 'python':
        # Version check for Python
        e_major, e_minor, _, _, _ = sys.version_info
        c_major, c_minor, *_ = tuple(str(component["runtime"]["version"]).split('.'))
        if not self.docker.is_vessel and (f"{e_major}.{e_minor}" != f"{c_major}.{c_minor}"):
            msg = f"The local python version ({e_major}.{e_minor}) and the python version defined for the component ({c_major}.{c_minor}) are different."
            msg += " Testing will be done using dependencies that correspond to the python version of your development environment."
            msg += " Pipeline behavior on AI Inference Server might be different."
            _logger.warning(msg)
        if component["context"] is None:
            self._init_component_venv(component)
            shutil.copy(_runner_path / 'run_component.py', component["env_dir"])
        req_list_path = _relative_to(component["env_dir"] / "requirements.list", component["env_dir"])
        json_params = json.dumps({param["name"]: param["defaultValue"] for param in self.parameters.values()})
        args = [
            "-m", 'run_component',
            "-m", Path(component["entrypoint"]).stem,
            "-p", f"{json_params}",
            "-r", f"{req_list_path}",
        ]
    else:
        # gpuruntime step requires Python environment with onnxruntime installed
        if component["context"] is None:
            shutil.copy(_runner_path / 'gpuruntime_requirements.txt', component["env_dir"] / REQUIREMENTS_TXT)
            self._init_component_venv(component)
            shutil.copy(_runner_path / 'run_gpuruntime_component.py', component["env_dir"])
            shutil.copy(_runner_path.parent / 'model_config_pb2.py', component["env_dir"])
        args = [
            "-m", 'run_gpuruntime_component',
            "-m", component["entrypoint"],
            "-c", "config.pbtxt",
        ]
    args += [
        "-i", input_payload_path.name,
        "-o", output_payload_path.name,
        "-ll", logging.getLevelName(self.log_level)
    ]
    # Run the component in the created Python environment
    _logger.info(f"Running component '{name}'...")
    _logger.info(f"{component['python_path'] =} to {component['env_dir'] =}")
    cmd = [str(component['python_path'])] + args
    _logger.info(f"Running command: {subprocess.list2cmdline(cmd)}")
    p = subprocess.Popen(cmd, cwd=component["env_dir"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if p.stdout is not None:
        for line in p.stdout:
            _logger.info(line.strip())
    returncode = p.wait()
    if returncode not in [RETURN_CODE_OK, RETURN_CODE_DEPRECATED]:
        self.cleanup = False
        raise RuntimeError(f"""\
There was an error while running component '{name}'.
You can check the test results in directory '{component['env_dir']}'
""")
    else:
        response_wrapped = True if returncode == RETURN_CODE_DEPRECATED else False
    # Deserialize and validate output payload
    _logger.info(f"Loading output payload from '{output_payload_path}'")
    output_payload = joblib.load(output_payload_path)
    output_payload = output_payload if isinstance(output_payload, list) else [output_payload]
    self.report_writer.set_output_payload_length(name, len(output_payload))
    if response_wrapped:
        output_payload = [json.loads(item['output']) for item in output_payload if item['ready']]
    else:
        output_payload = [output for output in output_payload if output is not None]
    validate_payload(output_payload, component["outputType"], batch_output)
    if result_is_list:
        return output_payload
    if 0 < len(output_payload):
        return output_payload[0]
    return None
```
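The single-record versus list handling in run_component can be hard to follow among the subprocess plumbing. The following sketch isolates it under stated assumptions: `normalize_input` and `shape_output` are hypothetical helper names, the DataStream branch is omitted, and only the non-deprecated path (where `None` outputs are dropped) is modeled, not the `ready`-flag unwrapping used for the deprecated `run` entrypoint.

```python
from typing import List, Optional, Tuple, Union

Payload = Union[dict, list]


def normalize_input(data: Payload) -> Tuple[list, bool]:
    """Return the records as a list plus a flag telling whether the caller passed a list."""
    if isinstance(data, list):
        return data, True
    return [data], False


def shape_output(outputs: List[Optional[dict]], result_is_list: bool) -> Optional[Payload]:
    """Drop None outputs, then return a list for list input, else the first item or None."""
    outputs = [output for output in outputs if output is not None]
    if result_is_list:
        return outputs
    return outputs[0] if outputs else None


records, as_list = normalize_input({"temperature": 21.5})
outputs = [{"prediction": record["temperature"] * 2} for record in records]  # stand-in for the component
print(shape_output(outputs, as_list))  # {'prediction': 43.0}
```

Because the component itself always receives a list, the caller's original shape has to be remembered separately; that is the role `result_is_list` plays in the method above.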
update_parameters(parameters)
Validates and updates pipeline parameters. The elements of the dictionary must match the parameters specified in the pipeline configuration package. If any of the names or types do not match, all parameters remain untouched.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parameters | dict | Names and values of the parameters to update. | required |
Raises:
| Type | Description |
|---|---|
| AssertionError | If a name is not among the configured parameters, or if the type of its value differs from the configured type. |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def update_parameters(self, parameters: dict):
    """
    Validates and updates pipeline parameters.
    The elements of the dictionary must match the parameters specified in the pipeline configuration package.
    If any of the names or types does not match, all parameters will remain untouched.
    Args:
        parameters (dict): names and values of parameters to update
    Raises:
        AssertionError:
            When:
                - either `name` is not in the configured parameters,
                - or `defaultValue` type is different from the configured one
    """
    for key, value in parameters.items():
        if key not in self.parameters.keys():
            if key.upper().startswith("__AI_IS"):
                self.parameters[key] = {'defaultValue': False, 'name': key, 'type': 'Boolean'}
            else:
                raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
        elif self.parameters[key]["type"] != type_map.get(type(value).__name__):
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{type_map.get(type(value).__name__)}'")
    for key, value in parameters.items():
        self.parameters[key]["defaultValue"] = value
```
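The "all parameters remain untouched" guarantee comes from validating every entry in one loop before assigning anything in a second loop. A minimal sketch of that two-phase pattern follows. `TYPE_MAP` is a hypothetical stand-in for the SDK's `type_map` (only the `"Boolean"` type name is confirmed by the source above), and the special handling of `__AI_IS`-prefixed keys is left out.

```python
# Hypothetical mapping from Python type names to configured parameter types;
# only "Boolean" is confirmed by the source above, the other names are assumptions.
TYPE_MAP = {"bool": "Boolean", "int": "Integer", "float": "Double", "str": "String"}


def update_parameters(configured: dict, updates: dict) -> None:
    """Validate every update first, then apply them all (all-or-nothing)."""
    # Phase 1: reject the whole update if any name or type does not match.
    for key, value in updates.items():
        value_type = TYPE_MAP.get(type(value).__name__)
        if key not in configured or configured[key]["type"] != value_type:
            raise AssertionError(f"Pipeline has no parameters with the name '{key}' and type '{value_type}'")
    # Phase 2: every entry passed validation, so it is safe to mutate.
    for key, value in updates.items():
        configured[key]["defaultValue"] = value


params = {
    "threshold": {"name": "threshold", "type": "Double", "defaultValue": 0.5},
    "enabled": {"name": "enabled", "type": "Boolean", "defaultValue": True},
}
try:
    update_parameters(params, {"threshold": 0.8, "enabled": "yes"})  # "yes" is a str, not a bool
except AssertionError as error:
    print(error)
print(params["threshold"]["defaultValue"])  # 0.5: the valid "threshold" update was not applied either
```

Mutating inside the first loop would leave the pipeline half-updated whenever a later key failed, which is what the split into two loops avoids.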
run_pipeline(payload={})
Runs all the components sequentially, assuming the output of a component is only consumed by the next. The input data can be a single input record in a dictionary, a batch of input records in a list of dictionaries, or a DataStream object. For each component, the supplied input data is saved as input.joblib in the component runtime directory, and the output is saved as output.joblib.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict, list or DataStream | One or more input records for the pipeline. | {} |
Returns:
| Type | Description |
|---|---|
| Optional[Union[dict, list]] | The output of the last component. |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def run_pipeline(self, payload: Optional[Union[dict, list, DataStream]] = {}) -> Optional[Union[dict, list]]:
    """
    Runs all the components sequentially, assuming the output of a component is only consumed by the next.
    The input data can be a single input record in a dictionary or a batch of input records in a list of dictionaries.
    For each component the supplied input data is saved as `input.joblib` in the component runtime directory,
    and the output is saved as `output.joblib`.
    Args:
        payload (dict or list): One or more input records for the pipeline.
    Returns:
        The output of the last component.
    """
    for name in self.components.keys():
        payload = self.run_component(name, payload)
    return payload
```
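Conceptually, run_pipeline is a left fold over the components in their configured order. The sketch below shows that chaining with plain Python functions standing in for components. The component names and lambdas are hypothetical; the real method runs each component in its own virtual environment through run_component.

```python
from typing import Callable, Dict

Component = Callable[[dict], dict]


def run_pipeline(components: Dict[str, Component], payload: dict) -> dict:
    """Feed each component's output to the next one, in insertion order."""
    for component in components.values():
        payload = component(payload)
    return payload


# Two stand-in components: a scaler followed by a threshold classifier.
components = {
    "preprocess": lambda record: {"x": record["raw"] / 10},
    "classify": lambda record: {"label": "high" if record["x"] > 1 else "low"},
}
print(run_pipeline(components, {"raw": 25}))  # {'label': 'high'}
```

Because each output replaces the payload, a pipeline in which a later component also needs an earlier component's output (a non-linear data flow) cannot be tested this way. This matches the assumption stated in the method description.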
validate_payload(data, variables, batch, logger=_logger)
Validates that data is a valid list of input or output payload items. Variables lists the variables each payload item has. Batch indicates whether the payload items are themselves batches of items.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def validate_payload(data: list, variables: list, batch: bool, logger=_logger):
    """
    Validates that data is a valid list of input or output payload items.
    Variables list what variables each payload item has.
    Batch indicates if the payload items are themselves batches of items or not.
    """
    global extra_variables_to_log
    extra_variables_to_log = set()
    assert isinstance(data, list), "payload data must be a 'list'"
    for i in data:
        if batch:
            assert isinstance(i, list), "batch payload items must be 'list' instances"
        else:
            i = [i]
        for j in i:
            validate_payload_item(j, variables, logger)
    if len(extra_variables_to_log):
        logger.warning(f"WARNING! These variables are not declared but are part of the payload: {extra_variables_to_log}")
```
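The batch flag changes the expected nesting of the payload. The sketch below checks only that list/dict structure, under the assumption that per-variable type checks are handled separately, as validate_payload_item does. `check_payload_shape` is a hypothetical name.

```python
def check_payload_shape(data: list, batch: bool) -> None:
    """Check only the list/dict nesting that validate_payload expects."""
    assert isinstance(data, list), "payload data must be a 'list'"
    for item in data:
        if batch:
            assert isinstance(item, list), "batch payload items must be 'list' instances"
            records = item
        else:
            records = [item]
        for record in records:
            assert isinstance(record, dict), "payload items must be 'dict' instances"


# batch=False: every list element is one record.
check_payload_shape([{"x": 1}, {"x": 2}], batch=False)
# batch=True: every list element is itself a list of records.
check_payload_shape([[{"x": 1}, {"x": 2}], [{"x": 3}]], batch=True)
```

A flat list of records passed with batch=True fails the 'list' assertion. This is a common symptom of a mismatch between the component's inputBatch setting and the test data.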
validate_payload_item(data, variables, logger)
Validates that data is a valid payload item. Each listed variable is expected to have a corresponding field in data (a missing field is logged as a warning), and the values of String, StringArray and Object variables must match their declared type. Fields that are neither declared nor named timestamp are collected so that validate_payload can report them.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_runner.py
```python
def validate_payload_item(data: dict, variables: list, logger):
    """
    Validates that data is a valid payload item.
    Variables listed must have a corresponding field in data.
    The types of the values must match their declared type.
    """
    assert isinstance(data, dict), "payload items must be 'dict' instances"
    for variable in variables:
        name = variable["name"]
        value = data.get(name, None)
        if value is None:
            logger.warning(f"WARNING! Variable '{name}' is missing from input, output or metric")
            continue
        if variable["type"] == "String":
            assert isinstance(value, str), "'String' value must be an 'str'"
        if variable["type"] == "StringArray":
            assert isinstance(value, list), "'StringArray' value must be a 'list'"
            assert all(isinstance(i, str) for i in value), "'StringArray' items must be 'str' instances"
        if variable["type"] == "Object":
            assert isinstance(value, dict), "'Object' value must be a 'dict'"
            values = list(value.values())
            assert len(values) == 2, "'Object' value must have exactly 2 items"
            ok = (isinstance(values[0], str) and isinstance(values[1], bytes)) or (isinstance(values[1], str) and isinstance(values[0], bytes))
            assert ok, "'Object' value must have exactly one 'str' and one 'bytes' field"
    payload_names = set(data.keys())
    variable_names = {variable["name"] for variable in variables}
    variable_names.add('timestamp')
    extra_variables = payload_names - variable_names
    extra_variables_to_log.update(extra_variables)
```
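The type rules above are easiest to see against concrete values. The sketch below restates them as boolean checks instead of assertions, and adds the undeclared-field computation. `value_matches` and `extra_fields` are hypothetical helper names, and the keys in the example Object value are illustrative only: the source checks nothing but the exactly-one-str, exactly-one-bytes pairing.

```python
def value_matches(var_type: str, value) -> bool:
    """Mirror the per-type checks of validate_payload_item for one non-None value."""
    if var_type == "String":
        return isinstance(value, str)
    if var_type == "StringArray":
        return isinstance(value, list) and all(isinstance(i, str) for i in value)
    if var_type == "Object":
        if not isinstance(value, dict) or len(value) != 2:
            return False
        first, second = value.values()
        return (isinstance(first, str) and isinstance(second, bytes)) or (
            isinstance(first, bytes) and isinstance(second, str)
        )
    return True  # other declared types are not type-checked here


def extra_fields(record: dict, variables: list) -> set:
    """Fields present in the record but not declared; 'timestamp' is always allowed."""
    declared = {variable["name"] for variable in variables} | {"timestamp"}
    return set(record) - declared


# Hypothetical Object value: the key names are illustrative.
image = {"metadata": '{"width": 2, "height": 1}', "bytes": b"\x00\x01"}
print(value_matches("Object", image))  # True
print(extra_fields({"image": image, "timestamp": "2024-01-01T00:00:00Z", "debug": 1},
                   [{"name": "image", "type": "Object"}]))  # {'debug'}
```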
Static validation of pipeline packages.
Executes static checks on a pipeline configuration package, including:
- Verifying that the Python version required in the package is supported by a known version of the AI Inference Server.
- Verifying that all the required Python packages are either included in the pipeline package itself or available on pypi.org for the target platform.
PipelineValidationError
Bases: Exception
Represents a problem with the pipeline configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| description | str | Description of the error. Mandatory argument. | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
```python
class PipelineValidationError(Exception):
    """
    Represents a problem with the pipeline configuration.
    Args:
        description (str): Description of the error. Mandatory argument.
    """
    def __init__(self, description: str) -> None:
        assert description is not None
        self.value = description

    def __str__(self):
        return self.value
```
validate_pipeline_dependencies(zip_path)
@Deprecated, reason: In the future, only the edge package will be generated, and the same validation is performed during edge package creation.
Validates an already built pipeline configuration package to check whether it is compatible with the AI Inference Server.
This method verifies that each requirement, identified by name and version, is either included in PythonPackages.zip or available on pypi.org for the target platform. If a required dependency is neither available on pypi.org for the target platform nor present in PythonPackages.zip, the problem is logged at the ERROR level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| zip_path | path-like | Path to the pipeline configuration package zip file. | required |
Raises:
| Type | Description |
|---|---|
| PipelineValidationError | If the validation fails. See the logger output for details. |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
```python
def validate_pipeline_dependencies(zip_path):
    """
    @Deprecated, reason: In the future only the edge package will be generated and the same validation is performed during edge package creation.
    Validates an already built pipeline configuration package to check if it is compatible with the AI Inference Server.
    This method verifies that the requirements identified by name and version are either included
    in `PythonPackages.zip` or available on pypi.org for the target platform.
    If the required dependency for the target platform is not available on pypi.org
    and not present in `PythonPackages.zip` it will log the problem at the ERROR level.
    Args:
        zip_path (path-like): Path to the pipeline configuration package zip file
    Raises:
        PipelineValidationError: if the validation fails. See the logger output for details.
    """
    with tempfiles.OpenZipInTemp(zip_path) as package_dir:
        # The package is expected to contain a single top-level directory
        package_dir = next(package_dir.iterdir())
        error = read_config_and_download_deps(package_dir)
        if error:
            raise PipelineValidationError("Requirements of one or more components can not be satisfied.")
    _logger.info(f"Validating pipeline package '{zip_path}' was successful.")
```
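download_component_dependencies(component, package_dir)
Downloads the dependencies of a pipeline component. The Python version of the component is validated first; an unsupported version raises PipelineValidationError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | dict | A single component from the parsed pipeline_config.yml. | required |
| package_dir | Path | The directory where the component was extracted. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the requirements of the component cannot be satisfied, False otherwise. |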
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
```python
def download_component_dependencies(component: dict, package_dir: Path):
    """
    Download the dependencies of a pipeline component.
    Args:
        component (dict): A single component from the parsed `pipeline_config.yml`.
        package_dir (Path): The directory where the component was extracted
    """
    _logger.info(f"Validating requirements of component: {component['name']}")
    try:
        deployment.python_version_validator(component['runtime']['version'])
    except ValueError as error:
        # Pass a string: PipelineValidationError.__str__ returns its stored value unchanged
        raise PipelineValidationError(str(error)) from error
    component_dir = package_dir / component['name']
    requirements_file_path = component_dir / REQUIREMENTS_TXT
    python_packages_folder = _build_python_packages_folder(component_dir)
    return not _are_dependencies_available(requirements_file_path, component['runtime']['version'], python_packages_folder)
```
read_config_and_download_deps(package_dir)
Reads the pipeline configuration from the package directory and downloads its dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| package_dir | Path | The directory where the pipeline configuration package was extracted. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if there was an error during the download of the components, False otherwise. |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/pipeline_validator.py
```python
def read_config_and_download_deps(package_dir: Path) -> bool:
    """
    Reads the pipeline configuration from the package directory and downloads its dependencies
    Args:
        package_dir (Path): The directory where the pipeline configuration package was extracted.
    Returns:
        bool: True if there was an error during the download of the components, False otherwise
    """
    try:
        config = yaml_helper.read_yaml(package_dir / 'pipeline_config.yml')
        error = False
        for component in config['dataFlowPipeline']['components']:
            error = download_component_dependencies(component, package_dir) or error
        return error
    except Exception:
        raise PipelineValidationError(INVALID_PIPELINE_PACKAGE_MESSAGE)
```
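The expression `error = download_component_dependencies(...) or error` is ordered deliberately. Evaluating the call first means every component is checked and reported even after an earlier failure. Writing `error or download_component_dependencies(...)` would short-circuit and silently skip the remaining components. The sketch below demonstrates this with a hypothetical `fake_check` function in place of the real download step.

```python
from typing import Callable, Iterable


def any_failed(components: Iterable[str], check: Callable[[str], bool]) -> bool:
    """Run check() on every component and report whether any of them failed."""
    error = False
    for component in components:
        # check() is evaluated first, so it runs even after an earlier failure;
        # writing `error or check(component)` would skip the remaining checks.
        error = check(component) or error
    return error


checked = []


def fake_check(name: str) -> bool:
    """Hypothetical stand-in for download_component_dependencies: fails only for 'a'."""
    checked.append(name)
    return name == "a"


print(any_failed(["a", "b", "c"], fake_check), checked)  # True ['a', 'b', 'c']
```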
Utility script for running an entrypoint Python script in a given virtual Python environment. It is designed to be executed by the simaticai.testing.PipelineRunner class. It consumes input data from a joblib file and writes output data to a joblib file.
warn_about_unused_dependencies(requirements_list)
Logs a warning if some declared dependencies were not used during test execution.
This method compares the modules imported after test execution with the dependency list in the package's requirements.txt file. If the requirements contain more than what is required for execution, it logs a warning.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/run_component.py
```python
def warn_about_unused_dependencies(requirements_list):
    """
    Raises a warning if some declared dependencies were not used during test execution
    This method compares the imported modules after test execution with the
    dependency list in the package's requirements.txt file. If the requirements
    contain more than what is required for execution, it raises a warning.
    """
    imported_packages = []
    packages = importlib.metadata.packages_distributions()
    for module_name in sys.modules:
        try:
            if module_name and '.' not in module_name:  # Only check top-level modules
                dist = importlib.metadata.distribution(module_name)
                imported_packages.append(dist.name)
        except Exception:
            # Fall back to the module-to-distribution mapping (e.g. 'sklearn' -> 'scikit-learn')
            if module_name in packages:
                pkgs = packages.get(module_name)
                for pkg in pkgs:
                    imported_packages.append(pkg)
    imported_packages = set([pkg.replace('-', '_').lower() for pkg in imported_packages])
    requirements_list = set([pkg.replace('-', '_').lower() for pkg in requirements_list])
    diff = requirements_list.difference(imported_packages)
    if 0 < len(diff):
        _logger.warning(f"WARNING! The following dependencies were not used during execution: {', '.join(diff)}. Consider removing them from the pipeline package.")
```
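The comparison at the end of warn_about_unused_dependencies treats distribution names case-insensitively and considers '-' and '_' equivalent before taking a set difference. A minimal sketch of just that step follows. It assumes the imported names have already been resolved to distribution names, which the method above does via importlib.metadata. `normalize` and `unused_dependencies` are hypothetical names.

```python
from typing import Iterable, Set


def normalize(names: Iterable[str]) -> Set[str]:
    """Compare distribution names case-insensitively and treat '-' and '_' alike."""
    return {name.replace("-", "_").lower() for name in names}


def unused_dependencies(requirements: Iterable[str], imported: Iterable[str]) -> Set[str]:
    """Declared requirements that never showed up among the imported distributions."""
    return normalize(requirements) - normalize(imported)


print(sorted(unused_dependencies(["scikit-learn", "NumPy", "pandas"], ["numpy", "scikit_learn"])))  # ['pandas']
```

A dependency that is only imported on a code path the test data never exercises will also be reported, so the warning is a hint to review the requirements rather than proof that a package is unnecessary.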
main(args)
Feeds input to the entrypoint and captures output.
Imports the entrypoint module given by its name. For each input record in the input file, it triggers the module's process_input(data: dict) function or, if the entrypoint does not define one, the deprecated run(data: str) function with the record serialized as JSON. Beforehand, if the entrypoint implements update_parameters(..), that method is called with the pipeline parameters dictionary (an empty dictionary if no parameters were given). The input file must be a joblib dump of a list of dictionaries; one dictionary represents one input for the component, with the required variable names and values. The output file is a joblib dump of a list containing the outputs of the component, in the structure returned by the triggered function. The function returns 0 if process_input was used, and 0b10001 if the deprecated run method was used.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | Namespace | Command line arguments | required |
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/run_component.py
```python
def main(args: argparse.Namespace) -> int:
    """
    Feeds input to the entrypoint and captures output.
    Imports entrypoint module given with its name, and triggers its `run(...)` function with the prepared data in the input file.
    If pipeline_parameters dictionary is not empty, before triggering `run(..)` method, the `update_parameters(..)` method of entrypoint will be called with the dictionary.
    The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries.
    One dictionary represents one input for the component with the required variable names and values, which is directly passed to `run()`.
    The output file is a dumped joblib result which is a list containing outputs of the component, in the structure returned from `run()`.
    Args:
        args (argparse.Namespace): Command line arguments<br/>
            module_name (str): Name of the entrypoint Python script<br/>
            input_file (os.Pathlike): Path of the joblib file containing the input payloads<br/>
            output_file (os.Pathlike): Path of the joblib file where the outputs will be stored<br/>
            pipeline_parameters (json-string): json formatted dictionary defining configurable parameters with their names as key and their values
    """
    entrypoint = importlib.import_module(args.module_name)
    # Prefer process_input(dict); fall back to the deprecated run(str)
    trigger_method = None
    try:
        inspect.signature(entrypoint.process_input)
        trigger_method = "process_input"
    except AttributeError:
        try:
            inspect.signature(entrypoint.run)
            trigger_method = "run"
        except AttributeError:
            _logger.warning("Method run not found")
    if trigger_method is None:
        raise RuntimeError("Neither 'run(data: str)' nor 'process_input(data: dict)' entrypoint method can be found.")
    if args.pipeline_parameters is not None and args.pipeline_parameters != "{}":
        pipeline_parameters = json.loads(args.pipeline_parameters)
    else:
        pipeline_parameters = {}
    try:
        _logger.debug(f"Calling `update_parameters(..)` with: {args.pipeline_parameters}")
        entrypoint.update_parameters(pipeline_parameters)
    except AttributeError:
        _logger.warning("Entrypoint does not implement `update_parameters()` method. Skipping pipeline parameter update.")
    input_list = joblib.load(args.input_file)
    if not isinstance(input_list, list):
        raise ValueError("Component input must be supplied as a list.")
    if trigger_method == "process_input":
        _logger.debug("Calling `process_input(..)`")
    else:
        _logger.debug("Calling `run(..)`")
    output_list = []
    for input_data in input_list:
        if trigger_method == "process_input":
            output_list.append(entrypoint.process_input(input_data))
        else:
            output_list.append(entrypoint.run(json.dumps(input_data)))
    if args.requirements_file is not None:
        requirements_list = Path(args.requirements_file).read_text().split('#')
        warn_about_unused_dependencies(requirements_list)
    joblib.dump(output_list, args.output_file)
    if trigger_method == "process_input":
        return 0
    else:
        _logger.warning("Trigger method `run(data: str)` is deprecated and will be removed in the future. Please refer to the user manual.")
        return 0b10001  # return code 0b10001 means deprecated `run` method was triggered
```
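The dispatch rules of main() can be exercised without a virtual environment or joblib files. The sketch below builds a hypothetical entrypoint module in memory and reproduces the order of operations: pick the trigger method, call update_parameters if present, then call the trigger once per record. It uses `hasattr` instead of `inspect.signature`, and `call_entrypoint` and `my_entrypoint` are illustrative names, not SDK APIs.

```python
import json
import types

# A hypothetical entrypoint module built in memory; a real one is the component's entrypoint .py file.
entrypoint = types.ModuleType("my_entrypoint")
_params = {"factor": 1}


def update_parameters(params: dict) -> None:
    _params.update(params)


def process_input(data: dict) -> dict:
    return {"result": data["value"] * _params["factor"]}


entrypoint.update_parameters = update_parameters
entrypoint.process_input = process_input


def call_entrypoint(module, inputs: list, parameters: dict) -> tuple:
    """Sketch of main(): prefer process_input(dict), fall back to the deprecated run(str)."""
    if hasattr(module, "process_input"):
        trigger, code = module.process_input, 0
    elif hasattr(module, "run"):
        trigger, code = (lambda record: module.run(json.dumps(record))), 0b10001
    else:
        raise RuntimeError("Neither 'run(data: str)' nor 'process_input(data: dict)' entrypoint method can be found.")
    if hasattr(module, "update_parameters"):
        module.update_parameters(parameters)
    return [trigger(record) for record in inputs], code


outputs, code = call_entrypoint(entrypoint, [{"value": 2}, {"value": 5}], {"factor": 10})
print(outputs, code)  # [{'result': 20}, {'result': 50}] 0
```

Unit-testing an entrypoint's process_input and update_parameters this way, before packaging, catches many errors that would otherwise only surface inside the PipelineRunner subprocess log.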
Utility script for running an ONNX model in a given virtual Python environment. It is designed to be executed by the simaticai.testing.PipelineRunner class. It consumes input data from a joblib file and writes output data to a joblib file.
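main(model_path, config_path, input_file, output_file)
Feeds input to the ML model saved in ONNX format and captures output.
Reads the given model and creates an onnxruntime InferenceSession to run it on the inputs. The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries. If the model configuration sets maxBatchSize to a value greater than 0, each input tensor is reshaped according to the input definitions in the configuration, and a warning is logged when the batch size exceeds maxBatchSize; otherwise, each input tensor is reshaped to the input shape declared by the model. The output file is a dumped joblib result list that contains, for each input dictionary, its fields that are not model inputs, extended with the generated predictions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model_path | str | File path for the stored ML model in ONNX format | required |
| config_path | str | Path of the model configuration file (config.pbtxt when called from PipelineRunner) | required |
| input_file | Pathlike | Path of the joblib file containing the input payloads | required |
| output_file | Pathlike | Path of the joblib file where the outputs will be stored | required |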
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/run_gpuruntime_component.py
```python
def main(model_path, config_path, input_file, output_file):
    """Feeds input to the ML Model saved in ONNX format and captures output.
    Reads the given model and creates an onnxruntime Session to run it on the inputs.
    The input file must be a joblib dump, and the joblib must be a dictionary or list of dictionaries.
    The output file is a dumped joblib result list containing the input dictionary extended with the generated predictions.
    Args:
        model_path (str): File path for the stored ML Model in ONNX format
        config_path (str): Path of the model configuration file
        input_file (os.Pathlike): Path of the joblib file containing the input payloads
        output_file (os.Pathlike): Path of the joblib file where the outputs will be stored
    """
    input_list = joblib.load(input_file)
    input_list = input_list if type(input_list) is list else [input_list]
    output_list = []
    session = InferenceSession(model_path)
    model_config = _get_proto_config(config_path)
    max_batch_size = model_config.get("maxBatchSize", 0)
    inputs = [input for input in session.get_inputs()]
    input_names = [input.name for input in inputs]
    outputs = [output for output in session.get_outputs()]
    output_names = [output.name for output in outputs]
    for _input in input_list:
        _input_tensor = {}
        # Carry over every field that is not a model input
        output = {k: _input.get(k) for k in _input.keys() if k not in input_names}
        for _input_info in inputs:
            if max_batch_size > 0:
                input_shape = _get_new_shape(model_config["input"], _input[_input_info.name], _input_info.name)
                if input_shape[0] > max_batch_size:
                    _logger.warning(f"Received input batch size ({input_shape[0]}) is greater than max_batch_size ({max_batch_size})!")
                _input_tensor[_input_info.name] = _input[_input_info.name].reshape(input_shape)
            else:
                _input_tensor[_input_info.name] = _input[_input_info.name].reshape(_input_info.shape)
        model_outputs = session.run(output_names, _input_tensor)
        output |= dict(zip(output_names, model_outputs))
        output_list.append(output)
    joblib.dump(output_list, output_file)
    return 0
```
The RunnerConfig module and class are responsible for handling the JSON configuration.
An instance of the RunnerConfig class is used to set up a LocalPipelineRunner with the required parameters.
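The loading behavior documented for RunnerConfig.from_json can be sketched with standard dataclasses: read the JSON file, build the nested stream configuration, and fall back to a default instance with a logged warning when the file is missing, malformed, or unreadable. This is a simplified, hypothetical re-implementation, not the SDK's code. It assumes the JSON keys match the attribute names one-to-one, and it omits the VCA stream settings and the dict() method.

```python
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Union

_logger = logging.getLogger(__name__)


@dataclass
class TimeSeriesStreamConfig:
    # Hypothetical re-implementation; all fields default to None.
    fields: Optional[list] = None
    count: Optional[int] = None
    offset: Optional[int] = None
    batch_size: Optional[int] = None


@dataclass
class RunnerConfig:
    # Hypothetical re-implementation: the VCA stream settings are omitted for brevity.
    data: Optional[Path] = None
    test_dir: Optional[Path] = None
    cleanup: bool = True
    time_series_stream: TimeSeriesStreamConfig = field(default_factory=TimeSeriesStreamConfig)

    def __post_init__(self):
        # Convert path-like values to Path objects, leaving None untouched.
        self.data = Path(self.data) if self.data is not None else None
        self.test_dir = Path(self.test_dir) if self.test_dir is not None else None

    @classmethod
    def from_json(cls, config_path: Union[str, Path]) -> "RunnerConfig":
        # Fall back to a default configuration, with a warning, whenever reading fails.
        try:
            raw = json.loads(Path(config_path).read_text())
        except FileNotFoundError:
            _logger.warning("Configuration file '%s' not found, using defaults.", config_path)
            return cls()
        except json.JSONDecodeError:
            _logger.warning("Configuration file '%s' is not valid JSON, using defaults.", config_path)
            return cls()
        except Exception as error:
            _logger.warning("Could not read '%s' (%s), using defaults.", config_path, error)
            return cls()
        # Assumption: the JSON keys match the attribute names one-to-one.
        stream = TimeSeriesStreamConfig(**raw.pop("time_series_stream", {}))
        return cls(time_series_stream=stream, **raw)
```

Returning defaults instead of raising keeps a test run going when the optional configuration file is absent. The trade-off is that a typo in the file name only shows up as a logged warning.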
TimeSeriesStreamConfig dataclass
TimeSeriesStreamConfig is a configuration class used to manage settings for the time series stream.
Attributes:
- fields (list): List of fields to be included in the time series stream. Defaults to None.
- count (int): Number of records to be included in the time series stream. Defaults to None.
- offset (int): Offset to be applied in the time series stream. Defaults to None.
- batch_size (int): Size of the batch to be used in the time series stream. Defaults to None.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/runner_config.py
```python
@dataclass
class TimeSeriesStreamConfig:
    """
    TimeSeriesStreamConfig is a configuration class used to manage settings for the time series stream.
    Attributes:
        fields (list): List of fields to be included in the time series stream. Defaults to None.
        count (int): Number of records to be included in the time series stream. Defaults to None.
        offset (int): Offset to be applied in the time series stream. Defaults to None.
        batch_size (int): Size of the batch to be used in the time series stream. Defaults to None.
    """
    fields: list = field(default=None)
    count: int = field(default=None)
    offset: int = field(default=None)
    batch_size: int = field(default=None)
```
VCAStreamConfig dataclass ¶
VCAStreamConfig is a configuration class used to manage settings for the VCA stream.
Attributes:
- variable_name (str): Name of the variable to be used in the VCA stream. Defaults to None.
- image_format (str): Format of the images to be used in the VCA stream. Defaults to None.
- filter (str): Filter to be applied in the VCA stream. Defaults to None.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/runner_config.py
@dataclass
class VCAStreamConfig:
    """
    VCAStreamConfig is a configuration class used to manage settings for the VCA stream.
    Attributes:
        variable_name (str): Name of the variable to be used in the VCA stream. Defaults to None.
        image_format (str): Format of the images to be used in the VCA stream. Defaults to None.
        filter (str): Filter to be applied in the VCA stream. Defaults to None.
    """
    variable_name: str = field(default=None)
    image_format: str = field(default=None)
    filter: str = field(default=None)
RunnerConfig dataclass
RunnerConfig is a configuration class used to manage settings for running tests and streams.
Attributes:
- data (PathLike): Path to the data file or directory. Defaults to None.
- test_dir (PathLike): Path to the test directory. Defaults to None.
- cleanup (bool): Indicates whether to clean up resources after execution. Defaults to True.
- time_series_stream (TimeSeriesStreamConfig): Configuration for the time series stream.
- vca_stream (VCAStreamConfig): Configuration for the VCA stream.
Methods:
- __post_init__(): Converts the data and test_dir attributes to Path objects if they are not None.
- __dict__ (property): Returns a dictionary representation of the configuration, including the nested stream configurations under the keys "TimeSeriesStream" and "VCAStream".
- from_json(config_path: PathLike | str) -> RunnerConfig: Creates a RunnerConfig instance from a JSON configuration file.
  - Args: config_path (PathLike | str): Path to the JSON configuration file.
  - Returns: RunnerConfig: An instance of the RunnerConfig class populated with values from the JSON file.
  - Error handling: instead of raising an exception, from_json logs a warning and returns a default RunnerConfig instance if the file is not found, if it is not valid JSON, or if any other error occurs while reading it.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/runner_config.py
@dataclass
class RunnerConfig:
    """
    RunnerConfig is a configuration class used to manage settings for running tests and streams.
    Attributes:
        data (PathLike): Path to the data file or directory. Defaults to None.
        test_dir (PathLike): Path to the test directory. Defaults to None.
        cleanup (bool): Indicates whether to clean up resources after execution. Defaults to True.
        time_series_stream (TimeSeriesStreamConfig): Configuration for the time series stream.
        vca_stream (VCAStreamConfig): Configuration for the VCA stream.
    Methods:
        __post_init__():
            Converts `data` and `test_dir` attributes to `Path` objects if they are not None.
        __dict__():
            Returns a dictionary representation of the configuration, including nested stream configurations.
        from_json(config_path: PathLike | str) -> RunnerConfig:
            Creates a `RunnerConfig` instance from a JSON configuration file.
            Args:
                config_path (PathLike | str): Path to the JSON configuration file.
            Returns:
                RunnerConfig: An instance of the `RunnerConfig` class populated with values from the JSON file.
            Exceptions:
                - Logs a warning and returns a default `RunnerConfig` instance if the file is not found.
                - Logs a warning and returns a default `RunnerConfig` instance if the file is not a valid JSON.
                - Logs a warning and returns a default `RunnerConfig` instance if any other error occurs during file reading.
    """
    data: PathLike = field(default=None)
    test_dir: PathLike = field(default=None)
    cleanup: bool = field(default=True)
    time_series_stream: TimeSeriesStreamConfig = field(default_factory=TimeSeriesStreamConfig)
    vca_stream: VCAStreamConfig = field(default_factory=VCAStreamConfig)

    def __post_init__(self):
        if self.data is not None:
            self.data = Path(self.data)
        if self.test_dir is not None:
            self.test_dir = Path(self.test_dir)

    @property
    def __dict__(self):
        return {
            "data": self.data,
            "test_dir": self.test_dir,
            "cleanup": self.cleanup,
            "TimeSeriesStream": {
                "fields": self.time_series_stream.fields,
                "count": self.time_series_stream.count,
                "offset": self.time_series_stream.offset,
                "batch_size": self.time_series_stream.batch_size
            },
            "VCAStream": {
                "variable_name": self.vca_stream.variable_name,
                "image_format": self.vca_stream.image_format,
                "filter": self.vca_stream.filter
            }
        }

    @classmethod
    def from_json(cls, config_path: PathLike | str):
        import json
        try:
            config_path = Path(config_path)
            with open(config_path, 'r') as file:
                json_config = json.load(file)
        except FileNotFoundError:
            logger.warning(f"Configuration file {config_path} does not exist.")
            return cls()
        except json.JSONDecodeError:
            logger.warning(f"Configuration file {config_path} is not a valid JSON.")
            return cls()
        except Exception as e:
            logger.warning(f"An error occurred while reading the configuration file {config_path}: {e}")
            return cls()
        data = json_config.get('data')
        test_dir = json_config.get('test_dir')
        cleanup = json_config.get('cleanup', True)
        time_series_stream = TimeSeriesStreamConfig(
            fields=json_config.get('TimeSeriesStream', {}).get('fields'),
            count=json_config.get('TimeSeriesStream', {}).get('count'),
            offset=json_config.get('TimeSeriesStream', {}).get('offset'),
            batch_size=json_config.get('TimeSeriesStream', {}).get('batch_size')
        )
        vca_stream = VCAStreamConfig(
            variable_name=json_config.get('VCAStream', {}).get('variable_name'),
            image_format=json_config.get('VCAStream', {}).get('image_format'),
            filter=json_config.get('VCAStream', {}).get('filter')
        )
        return cls(
            data=data,
            test_dir=test_dir,
            cleanup=cleanup,
            time_series_stream=time_series_stream,
            vca_stream=vca_stream,
        )
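For illustration, the sketch below writes a runner configuration file in the layout that from_json reads: the top-level keys `data`, `test_dir` and `cleanup`, plus the nested `TimeSeriesStream` and `VCAStream` objects. The example values and the file name `runner_config.json` are hypothetical, and the SDK import shown in the comment assumes a module path `simaticai.testing.runner_config` that matches the source file location; the block itself only uses the standard library.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical example values; only the key names are taken from RunnerConfig.from_json.
runner_settings = {
    "data": "data/test_input.csv",
    "test_dir": "local_test_run",
    "cleanup": False,
    "TimeSeriesStream": {
        "fields": ["temperature", "pressure"],
        "count": 100,
        "offset": 10,
        "batch_size": 5,
    },
    "VCAStream": {
        "variable_name": "vision_payload",
        "image_format": "BayerRG8",
        "filter": None,  # written to the file as null, read back as None
    },
}

with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = Path(tmp_dir) / "runner_config.json"
    config_path.write_text(json.dumps(runner_settings, indent=2), encoding="utf-8")

    # Read the file back with json.load, as from_json does.
    with open(config_path, "r") as file:
        loaded = json.load(file)

    # With the AI SDK installed, the same file could be loaded with
    # (module path assumed from the source file location):
    #   from simaticai.testing.runner_config import RunnerConfig
    #   runner_config = RunnerConfig.from_json(config_path)

# from_json reads nested sections with .get(..., {}), so a missing section yields None values.
ts_settings = loaded.get("TimeSeriesStream", {})
print(ts_settings.get("count"), ts_settings.get("offset"), ts_settings.get("batch_size"))
```

Keys that are left out fall back to the defaults listed above, so a file that contains only a `data` entry is still accepted. Because from_json only logs a warning for a missing or malformed file, a test that depends on the configuration may want to check the returned values explicitly.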
TimeSeriesStream
Bases: DataStream
This class creates a generator from a CSV file.
Iterating over a TimeSeriesStream object returns a generator that reads the CSV file line by line and converts each line to an input dictionary, as if it were received from AI Inference Server. When batch_size is positive, the generator yields lists of such dictionaries instead.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/timeseries_stream.py
class TimeSeriesStream(DataStream):
    """
    This class creates a generator from a csv file.
    The generate function returns a generator that reads the csv file line by line and
    converts each line to an input dictionary, as if it were received from AI Inference Server.
    """

    def __init__(self, csv_path: os.PathLike, *, fields: Optional[list] = None, count: Optional[int] = None, offset: Optional[int] = None, batch_size: Optional[int] = None):
        """
        Creates a new TimeSeriesStream object
        Args:
            csv_path (os.Pathlike): Path to the csv file.
            fields (Optional[list[str]]): List of required column headers. Will use all columns when None.
        """
        self.csv_path = Path(csv_path)
        if fields is None:
            self.fields = []
        else:
            self.fields = fields
        with open(self.csv_path, "r") as file:
            self.header = file.readline().strip().split(",")
        if any(key for key in self.fields if key not in self.header):
            raise KeyError("The CSV file must contain variable names in the first row.")
        if self.fields == []:
            self.fields = self.header
        fields_len = len(self.fields)
        filtered_len = len(list(k for k in self.fields if re.match("[a-zA-Z]+", k)))
        if filtered_len != fields_len:
            raise KeyError("Column headers should start with a letter.")
        if (not isinstance(count, int)) or count < 0:
            self.count = 0
        else:
            self.count = count
        if (not isinstance(offset, int)) or offset < 0:
            self.offset = 0
        else:
            self.offset = offset
        if (not isinstance(batch_size, int)) or batch_size < 0:
            self.batch_size = 0
        else:
            self.batch_size = batch_size

    def _read_value(self, value):
        try:
            return int(value)
        except ValueError:
            try:
                return float(value)
            except ValueError:
                return value

    def _read_csv(self):
        with open(self.csv_path, 'r', encoding='UTF-8') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for line in csv_reader:
                if 0 < len(self.fields):
                    key_holder = self.fields
                else:
                    key_holder = line.keys()
                result = {}
                for k in key_holder:
                    result[k] = self._read_value(line[k])
                yield result

    def _limit(self):
        _max = float('inf')
        if 0 < self.count:
            _max = self.offset + self.count
        counter = 0
        for line in self._read_csv():
            if _max <= counter:
                break
            counter += 1
            if counter <= self.offset:
                continue
            yield line

    def _batch(self):
        aggregate = []
        for line in self._limit():
            if len(aggregate) < self.batch_size:
                aggregate += [line]
                continue
            yield aggregate
            aggregate = [line]
        if self.batch_size == len(aggregate):
            yield aggregate
        else:
            _logger.warning(f"WARNING! The length of the given dataset is not divisible by {self.batch_size}. There are {len(aggregate)} inputs remaining in the buffer.")

    def __iter__(self):
        """
        Creates the input data generator.
        Returns: a generator
        """
        if 0 < self.batch_size:
            return self._batch()
        else:
            return self._limit()
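Each CSV cell is converted by _read_value (int first, then float, otherwise the original string), and the constructor rejects requested fields that are missing from the header row as well as column names that do not start with a letter. The standard-library sketch below mirrors those two rules on an in-memory CSV; `read_value` and `check_headers` are hypothetical names chosen for this example, not SDK functions.

```python
import csv
import io
import re
from typing import List, Optional


def read_value(value: str):
    """Convert a CSV cell to int, then float; otherwise keep the original string."""
    try:
        return int(value)
    except ValueError:
        try:
            return float(value)
        except ValueError:
            return value


def check_headers(header: List[str], fields: Optional[List[str]] = None) -> List[str]:
    """Validate requested fields against the header row and return the fields to use."""
    fields = list(fields) if fields else []
    # Every requested field must appear in the header row.
    if any(key not in header for key in fields):
        raise KeyError("The CSV file must contain variable names in the first row.")
    # An empty selection means "use all columns".
    if not fields:
        fields = list(header)
    # Every selected column name must start with a letter.
    if any(re.match("[a-zA-Z]+", key) is None for key in fields):
        raise KeyError("Column headers should start with a letter.")
    return fields


csv_text = "timestamp,temperature,status\n1,20.5,ok\n2,21.0,warn\n"
reader = csv.DictReader(io.StringIO(csv_text))
selected_fields = check_headers(reader.fieldnames, ["temperature", "status"])
rows = [{key: read_value(row[key]) for key in selected_fields} for row in reader]
print(rows)
```

Note that numeric-looking strings such as "007" become integers, so a column that must stay textual needs at least one non-numeric character or post-processing in the pipeline.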
__init__(csv_path, *, fields=None, count=None, offset=None, batch_size=None)
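Creates a new TimeSeriesStream object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| csv_path | os.PathLike | Path to the CSV file. | required |
| fields | Optional[list[str]] | List of required column headers. All columns are used when None. | None |
| count | Optional[int] | Maximum number of records to yield after skipping offset records. None, 0, negative, or non-integer values mean no limit. | None |
| offset | Optional[int] | Number of records to skip at the start of the file. None, negative, or non-integer values are treated as 0. | None |
| batch_size | Optional[int] | When positive, records are yielded as lists of batch_size dictionaries; if the number of records is not divisible by batch_size, the remaining records are dropped and a warning is logged. None, 0, negative, or non-integer values disable batching. | None |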
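The interplay of offset, count and batch_size can be checked without a CSV file. The sketch below is a standalone re-implementation of the logic in _limit and _batch, written for illustration only; `limit` and `batch` are hypothetical helper names, not SDK functions. As in TimeSeriesStream, an incomplete last batch is dropped and a warning is logged.

```python
import logging
from typing import Dict, Iterable, Iterator, List

logger = logging.getLogger(__name__)


def limit(records: Iterable[Dict], count: int = 0, offset: int = 0) -> Iterator[Dict]:
    """Skip the first `offset` records, then yield at most `count` records (0 means no limit)."""
    max_index = offset + count if count > 0 else float("inf")
    counter = 0
    for record in records:
        if max_index <= counter:
            break
        counter += 1
        if counter <= offset:
            continue
        yield record


def batch(records: Iterable[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Yield lists of exactly `batch_size` records; an incomplete last batch is dropped."""
    aggregate: List[Dict] = []
    for record in records:
        aggregate.append(record)
        if len(aggregate) == batch_size:
            yield aggregate
            aggregate = []
    if aggregate:
        logger.warning("%d records remain that do not fill a batch of %d.", len(aggregate), batch_size)


records = [{"t": i} for i in range(1, 11)]        # ten CSV rows, t = 1..10
limited = list(limit(records, count=5, offset=2))  # rows t = 3..7
batches = list(batch(limited, batch_size=2))       # [[3, 4], [5, 6]]; row 7 is dropped
print([[r["t"] for r in b] for b in batches])
```

Choosing a count that is a multiple of batch_size avoids losing the tail of the selected window.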
__iter__()
Creates the input data generator.
Returns: a generator
This module contains the VCAStream class, which generates image data streams from a specified folder of images. The images are converted into the ImageSet format, which makes them suitable for testing purposes.
The VCAStream class inherits from DataStream and provides an iterator that yields dictionaries, each holding one ImageSet (converted with to_dict()) under the configured variable name, with the image converted to the specified format.
Example usage:
    vca_stream = VCAStream(data='/path/to/image/folder', variable_name='vision_payload', image_format='BayerRG8')
    with LocalPipelineRunner("MyPipeline-edge_1.zip") as runner:
        runner.run_pipeline(vca_stream)
VCAStream
Bases: DataStream
This class creates a generator from a folder of images.
Iterating over a VCAStream object walks over the image folder and converts each image into the specified format, BayerRG8 by default. Each resulting object is in the ImageSet format, as if it were received from AI Inference Server.
Source code in docs/industrial-ai-suite/sdk/simaticai/testing/vca_stream.py
class VCAStream(DataStream):
    """
    This class creates a generator from a folder of images.
    The generate function returns a generator that walks over the image folder and converts
    each image into the specified format, BayerRG8 by default. The resulting object is in the
    ImageSet format, as if it were received from AI Inference Server.
    """

    def __init__(self, data: os.PathLike, variable_name: str = 'vision_payload', image_format: str | ImageFormat = 'BayerRG8', filter: str | None = None):
        """
        Creates a new VCAStream object
        Args:
            data (os.Pathlike): Path to the directory of images
            variable_name (str): Name of the variable to store the images (default: 'vision_payload')
            image_format (str): Supported image formats: 'BGR' (equivalently: 'BGR8'), 'RGB' (equivalently: 'RGB8'), 'BayerRG8' (default),
                'BayerGR8', 'BayerBG8', 'BayerGB8', 'Mono8', 'YUV422Packed', 'YUV422_YUYV_Packed'
            filter (rglob_pattern): Pattern to filter the images (see also: pathlib.rglob())
        """
        self.seq = 0
        self.data = data
        if filter is None or "" == filter.strip():
            self.filter = "**/*.[jJpP][pPnN][gGeE]*"
        else:
            self.filter = filter
        if variable_name is None or "" == variable_name.strip():
            self.variable_name = 'vision_payload'
        else:
            self.variable_name = variable_name
        if image_format is None:
            self.image_format = ImageFormat.BayerRG8
        else:
            try:
                self.image_format = ImageDetails._parse_image_format(image_format)
            except ValueError as err:
                raise AssertionError(str(err))
        self.camera_id = uuid.uuid4()
        self._iter_image_paths: list[Path] = []
        self._iter_current_index: int = 0

    def __iter__(self):
        """
        Initializes the iterator by collecting all image paths and resetting the index.
        Returns: self (the iterator object)
        """
        self._iter_image_paths = list(Path(self.data).rglob(self.filter))
        self._iter_current_index = 0
        return self

    def __next__(self):
        """
        Returns the next ImageSet from the image folder.
        Raises:
            StopIteration: When there are no more images to process.
        Returns: A dictionary containing the ImageSet variable.
        """
        if len(self._iter_image_paths) == 0:
            self.__iter__()
        if self._iter_current_index >= len(self._iter_image_paths):
            raise StopIteration
        image_path = self._iter_image_paths[self._iter_current_index]
        self._iter_current_index += 1
        return self._create_imageset(image_path)

    def _create_imageset(self, image_path):
        image_detail = ImageDetails.from_image(image_path=image_path, format=self.image_format)
        image_detail.id = f'VCA Stream : {image_detail.id}'
        image_detail.metadata = {"ptpstatus": "Disabled", "ptptimestamp": "0"}
        image_detail.seq = self.seq
        self.seq += 1
        imageset = ImageSet(
            version='1',
            cameraid=str(self.camera_id),
            timestamp=image_detail.timestamp,
        )
        imageset.add_image(image_detail)
        result = {self.variable_name: imageset.to_dict()}
        return result
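When filter is None or blank, VCAStream collects images with the rglob pattern `**/*.[jJpP][pPnN][gGeE]*`, which covers `.jpg`, `.jpeg` and `.png` files in any letter case, in all subfolders. The sketch below uses only pathlib and hypothetical file names to show what this pattern selects: BMP and TIFF files are skipped, while the trailing `*` also admits names such as `g.jpg.bak`. The results are collected into a set and sorted only to keep the output deterministic; `matched_images` is a hypothetical helper, not part of the SDK.

```python
import tempfile
from pathlib import Path

# Default pattern that VCAStream uses when `filter` is None or blank.
DEFAULT_FILTER = "**/*.[jJpP][pPnN][gGeE]*"


def matched_images(folder: Path, pattern: str = DEFAULT_FILTER) -> list:
    """Return the relative paths selected by Path.rglob(pattern), de-duplicated and sorted."""
    return sorted({p.relative_to(folder).as_posix() for p in folder.rglob(pattern)})


with tempfile.TemporaryDirectory() as tmp_dir:
    root = Path(tmp_dir)
    # Hypothetical file names; the files are empty because only the names matter here.
    for name in ["a.jpg", "sub/b.PNG", "sub/deep/c.jpeg", "d.bmp", "e.tiff", "f.txt", "g.jpg.bak"]:
        path = root / name
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch()
    selected = matched_images(root)

print(selected)  # ['a.jpg', 'g.jpg.bak', 'sub/b.PNG', 'sub/deep/c.jpeg']
```

To restrict the selection, for example to PNG files only, an explicit pattern such as `filter="**/*.png"` can be passed to the constructor.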
__init__(data, variable_name='vision_payload', image_format='BayerRG8', filter=None)
Creates a new VCAStream object
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | os.PathLike | Path to the directory of images. | required |
| variable_name | str | Name of the variable to store the images. None or a blank string falls back to 'vision_payload'. | 'vision_payload' |
| image_format | str or ImageFormat | Supported image formats: 'BGR' (equivalently: 'BGR8'), 'RGB' (equivalently: 'RGB8'), 'BayerRG8' (default), 'BayerGR8', 'BayerBG8', 'BayerGB8', 'Mono8', 'YUV422Packed', 'YUV422_YUYV_Packed'. If the format cannot be parsed, an AssertionError is raised. | 'BayerRG8' |
| filter | str or None | rglob pattern used to select the images (see pathlib.Path.rglob()). None or a blank string falls back to `**/*.[jJpP][pPnN][gGeE]*`. | None |
__iter__()
Initializes the iterator by collecting all image paths and resetting the index.
Returns: self (the iterator object)
__next__()
Returns the next ImageSet from the image folder.
Raises:
| Type | Description |
|---|---|
| StopIteration | When there are no more images to process. |
Returns: A dictionary containing the ImageSet variable.
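VCAStream is its own iterator: __iter__ rescans the folder and resets the position, and __next__ performs the scan lazily if it has not happened yet. As a consequence, every new `for` loop or `list()` call over the same object starts again from the first image, even after a partial read with `next()`. The seq counter set in _create_imageset is not reset by __iter__, so sequence numbers keep increasing across passes. The hypothetical `FolderIterator` class below mirrors only this bookkeeping, returning file names instead of ImageSet dictionaries, and sorts the paths (which VCAStream does not do) to keep the output deterministic.

```python
import tempfile
from pathlib import Path


class FolderIterator:
    """Hypothetical stand-in that mirrors VCAStream's __iter__/__next__ bookkeeping,
    returning file names instead of ImageSet dictionaries."""

    def __init__(self, data, pattern="*.png"):
        self.data = data
        self.pattern = pattern
        self._paths = []  # filled by __iter__
        self._index = 0   # position of the next path to return

    def __iter__(self):
        # Rescan the folder and rewind on every new iteration (sorted here for determinism).
        self._paths = sorted(Path(self.data).rglob(self.pattern))
        self._index = 0
        return self

    def __next__(self):
        # Scan lazily if next() is called before iter().
        if len(self._paths) == 0:
            self.__iter__()
        if self._index >= len(self._paths):
            raise StopIteration
        path = self._paths[self._index]
        self._index += 1
        return path.name


with tempfile.TemporaryDirectory() as tmp_dir:
    for name in ["x.png", "y.png"]:
        (Path(tmp_dir) / name).touch()
    stream = FolderIterator(tmp_dir)
    first_item = next(stream)   # triggers the lazy scan
    first_pass = list(stream)   # iter() rewinds, so this starts at x.png again
    second_pass = list(stream)  # and so does every further pass

print(first_item, first_pass, second_pass)
```

If a test needs to consume the stream in several parts, collecting the items into a list once is more predictable than iterating over the same stream object repeatedly.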