Source code for grutopia.core.task.task
# import random
import traceback
from abc import ABC, abstractmethod
from functools import wraps
from typing import Any, Dict
from omni.isaac.core.scenes.scene import Scene
from omni.isaac.core.tasks import BaseTask as OmniBaseTask
from omni.isaac.core.utils.prims import create_prim
from grutopia.core.config import TaskUserConfig
from grutopia.core.robot import init_robots
from grutopia.core.scene import create_object, create_scene
from grutopia.core.task.metric import BaseMetric, create_metric
from grutopia.core.util import log
[docs]class BaseTask(OmniBaseTask, ABC):
"""
wrap of omniverse isaac sim's base task
* enable register for auto register task
* contains robots
"""
tasks = {}
def __init__(self, config: TaskUserConfig, scene: Scene):
self.objects = None
self.robots = None
name = config.name
offset = config.offset
super().__init__(name=name, offset=offset)
self._scene = scene
self.config = config
self.metrics: dict[str, BaseMetric] = {}
self.steps = 0
self.work = True
for metric_config in config.metrics:
self.metrics[metric_config.name] = create_metric(metric_config)
def load(self):
if self.config.scene_asset_path is not None:
source, prim_path = create_scene(self.config.scene_asset_path,
prim_path_root=f'World/env_{self.config.env_id}/scene')
create_prim(prim_path,
usd_path=source,
scale=self.config.scene_scale,
translation=[self.config.offset[idx] + i for idx, i in enumerate(self.config.scene_position)])
self.robots = init_robots(self.config, self._scene)
self.objects = {}
for obj in self.config.objects:
_object = create_object(obj)
_object.set_up_scene(self._scene)
self.objects[obj.name] = _object
log.info(self.robots)
log.info(self.objects)
[docs] def get_observations(self) -> Dict[str, Any]:
"""
Returns current observations from the objects needed for the behavioral layer.
Return:
Dict[str, Any]: observation of robots in this task
"""
if not self.work:
return {}
obs = {}
for robot_name, robot in self.robots.items():
try:
obs[robot_name] = robot.get_obs()
except Exception as e:
log.error(self.name)
log.error(e)
traceback.print_exc()
return {}
return obs
def update_metrics(self):
for _, metric in self.metrics.items():
metric.update()
[docs] def calculate_metrics(self) -> dict:
metrics_res = {}
for name, metric in self.metrics.items():
metrics_res[name] = metric.calc()
return metrics_res
[docs] @abstractmethod
def is_done(self) -> bool:
"""
Returns True of the task is done.
Raises:
NotImplementedError: this must be overridden.
"""
raise NotImplementedError
[docs] def individual_reset(self):
"""
reload this task individually without reloading whole world.
"""
raise NotImplementedError
[docs] def pre_step(self, time_step_index: int, simulation_time: float) -> None:
"""called before stepping the physics simulation.
Args:
time_step_index (int): [description]
simulation_time (float): [description]
"""
self.steps += 1
return
[docs] def post_reset(self) -> None:
"""Calls while doing a .reset() on the world."""
self.steps = 0
for robot in self.robots.values():
robot.post_reset()
return
[docs] @classmethod
def register(cls, name: str):
"""
Register a task with its name(decorator).
Args:
name(str): name of the task
"""
def decorator(tasks_class):
cls.tasks[name] = tasks_class
@wraps(tasks_class)
def wrapped_function(*args, **kwargs):
return tasks_class(*args, **kwargs)
return wrapped_function
return decorator
def create_task(config: TaskUserConfig, scene: Scene):
task_cls = BaseTask.tasks[config.type]
return task_cls(config, scene)