Shortcuts

Source code for grutopia.core.runner

from typing import List

# import numpy as np
from omni.isaac.core import World
from omni.isaac.core.prims.xform_prim import XFormPrim
from omni.isaac.core.utils.stage import add_reference_to_stage  # noqa F401
from omni.physx.scripts import utils
from pxr import Usd  # noqa

# Init
from grutopia.core.config import SimulatorConfig, TaskUserConfig
from grutopia.core.register import import_all_modules_for_register
from grutopia.core.scene import delete_prim_in_stage  # noqa F401
from grutopia.core.scene import create_object, create_scene  # noqa F401
from grutopia.core.task.task import BaseTask, create_task
from grutopia.core.util import log
from grutopia.npc import NPC


[docs]class SimulatorRunner: def __init__(self, config: SimulatorConfig): import_all_modules_for_register() self._simulator_config = config.config physics_dt = self._simulator_config.simulator.physics_dt if self._simulator_config.simulator.physics_dt is not None else None rendering_dt = self._simulator_config.simulator.rendering_dt if self._simulator_config.simulator.rendering_dt is not None else None physics_dt = eval(physics_dt) if isinstance(physics_dt, str) else physics_dt rendering_dt = eval(rendering_dt) if isinstance(rendering_dt, str) else rendering_dt self.dt = physics_dt log.debug(f'Simulator physics dt: {self.dt}') self._world = World(physics_dt=self.dt, rendering_dt=rendering_dt, stage_units_in_meters=1.0) self._scene = self._world.scene self._stage = self._world.stage # setup scene prim_path = '/' if self._simulator_config.env_set.bg_type is None: self._scene.add_default_ground_plane() elif self._simulator_config.env_set.bg_type != 'default': source, prim_path = create_scene(self._simulator_config.env_set.bg_path, prim_path_root='background') add_reference_to_stage(source, prim_path) self.npc: List[NPC] = [] for npc_config in config.config.npc: self.npc.append(NPC(npc_config)) self.render_interval = self._simulator_config.simulator.rendering_interval if self._simulator_config.simulator.rendering_interval is not None else 5 log.info(f'rendering interval: {self.render_interval}') self.render_trigger = 0 @property def current_tasks(self) -> dict[str, BaseTask]: return self._world._current_tasks def _warm_up(self, steps=10, render=True): for _ in range(steps): self._world.step(render=render) def add_tasks(self, configs: List[TaskUserConfig]): for config in configs: task = create_task(config, self._scene) self._world.add_task(task) self._world.reset() self._warm_up() def step(self, actions: dict, render: bool = True): for task_name, action_dict in actions.items(): task = self.current_tasks.get(task_name) for name, action in action_dict.items(): if name in task.robots: task.robots[name].apply_action(action) self.render_trigger += 1 render = render and self.render_trigger > self.render_interval if self.render_trigger > self.render_interval: self.render_trigger = 0 self._world.step(render=render) obs = self.get_obs() for npc in self.npc: try: npc.feed(obs) except Exception as e: log.error(f'fail to feed npc {npc.name} with obs: {e}') if render: return obs def get_obs(self): obs = {} for task_name, task in self.current_tasks.items(): obs[task_name] = task.get_observations() return obs def get_current_time_step_index(self) -> int: return self._world.current_time_step_index def reset(self, tasks: List[str] = None): if tasks is None: self._world.reset() return for task in tasks: self.current_tasks[task].individual_reset() def get_obj(self, name: str) -> XFormPrim: return self._world.scene.get_object(name) def remove_collider(self, prim_path: str): build = self._world.stage.GetPrimAtPath(prim_path) if build.IsValid(): utils.removeCollider(build) def add_collider(self, prim_path: str): build = self._world.stage.GetPrimAtPath(prim_path) if build.IsValid(): utils.setCollider(build, approximationShape=None)