• Docs >
  • How to Add Custom Task
Shortcuts

How to Add Custom Task

This tutorial will show you how to add a custom task

1. Defining a New Task

Before adding a new task, we need to clarify the following points:

  • Task Naming: What will the task be called?

  • Task Objective: What specific Objective will the task achieve?

  • Task Termination: Will the task end, and how will we determine that?

  • Metrics Calculation: What metrics need to be calculated for the task?

Here’s how we define our SocialNavigationTask based on the above points:

  • Name: FiniteStepTask

  • Objective: Demo

  • Termination:

    • The Task will end.

    • End Criteria: The task will conclude either after finite steps.

  • Metrics Calculation:

    • Record the total distance a robot moves from start to finish.

    • (Additional metrics as needed.)

To add a custom task, you need to:

  • Create a config class for task config, inheriting from the grutopia.core.config.task.TaskCfg.

  • Create a class for task, inheriting from the grutopia.core.task.BaseTask.

To add a custom metric, you need to:

  • Create a config class for metric config, inheriting from the grutopia.core.config.metric.MetricUserConfig.

  • Create a class for metric, inheriting from the grutopia.core.task.metric.BaseMetric.

2. Create Task Config Class

Here’s an example of a config class for a task:

from typing import List, Optional

from grutopia.core.config import EpisodeCfg
from grutopia.core.config.task import TaskCfg


class FiniteStepTaskEpisodeCfg(EpisodeCfg):
  """
  Configuration for a finite step task episode.

  If necessary, you can add what you want to add to the episode as needed.
  """
  pass


class FiniteStepTaskCfg(TaskCfg):
  type: Optional[str] = 'FiniteStepTask'
  episodes: List[FiniteStepTaskEpisodeCfg]

3. Create Task Class

from omni.isaac.core.scenes import Scene

from grutopia.core.runtime.task_runtime import TaskRuntime
from grutopia.core.task import BaseTask


@BaseTask.register('FiniteStepTask')
class FiniteStepTask(BaseTask):
    def __init__(self, runtime: TaskRuntime, scene: Scene):
        super().__init__(runtime, scene)
        self.stop_count = 0
        self.max_step = 500  # default max_step
        # ======= Validate task setting =======
        if not runtime.task_settings:
            pass
        elif not isinstance(runtime.task_settings, dict):
            raise ValueError('task_settings of FiniteStepTask must be a dict')
        if 'max_step' in runtime.task_settings:
            self.max_step = runtime.task_settings['max_step']
        # ======= Validate task setting =======

    def is_done(self) -> bool:
        self.stop_count += 1
        return self.stop_count > self.max_step
  • The is_done method has been overridden based on the End Criteria defined above.

  • This task is ready for use, but it will not output any results by itself. To make it work in practice, you must define Metrics and configure them.

4. Create Metrics Config Class

Here’s an example of a config class for a metric:

# This is also the simplest configuration.
from typing import Optional

from grutopia.core.config.metric import MetricUserConfig


class SimpleMetricCfg(MetricUserConfig):
    type: Optional[str] = 'SimpleMetric'

5. Create Metrics Class

In this doc, we demonstrate a simple metrics used to track the total distance a robot moves.

import numpy as np
from pydantic import BaseModel

from grutopia.core.config.metric import MetricCfg
from grutopia.core.runtime.task_runtime import TaskRuntime
from grutopia.core.task.metric import BaseMetric
from grutopia.core.util import log


class SimpleMetricParam(BaseModel):
    robot_name: str


@BaseMetric.register('SimpleMetric')
class SimpleMetric(BaseMetric):
    """
    Calculate the total distance a robot moves
    """

    def __init__(self, config: MetricCfg, task_runtime: TaskRuntime):
        super().__init__(config, task_runtime)
        self.distance: float = 0.0
        self.position = None
        self.param = SimpleMetricParam(**config.metric_config)
        _robot_name = self.param.robot_name
        self.robot_name = (
            _robot_name + '_' + str(self.task_runtime.env.env_id)
        )  # real robot name in isaac sim: {robot_name}_{env_id}

    def reset(self):
        self.distance = 0.0

    def update(self, task_obs: dict):
        """
        This function is called at each world step.
        """
        if self.position is None:
            self.position = task_obs[self.robot_name]['position']
            return
        self.distance += np.linalg.norm(self.position - task_obs[self.robot_name]['position'])
        self.position = task_obs[self.robot_name]['position']
        return

    def calc(self):
        """
        This function is called to calculate the metrics when the episode is terminated.
        """
        log.info('SimpleMetric calc() called.')
        return self.distance
  • The update method will be invoked after every step.

  • The calc method will be invoked at the end of an episode.

  • The reset method be invoked when an episode is reset. Reset within one episode is not supported yet, so this method won’t be used at the time.

6. Task Usage Preview

To use the custom task and metrics, you can simply include them in the configuration settings as follows

...
config = Config(
    simulator=SimConfig(physics_dt=1 / 240, rendering_dt=1 / 240, use_fabric=False),
    task_config=FiniteStepTaskCfg(
        task_settings={'max_step': 300},
        metrics=[
            SimpleMetricCfg(metric_config={'robot_name': 'h1'}),
            # Add more metrics here.
        ],
        metrics_save_path='./h1_simple_metric.jsonl',
        episodes=[
            ...
        ],
    ),
)

sim_runtime = SimulatorRuntime(config_class=config)

import_extensions()
# import custom extensions here.

env = Env(sim_runtime)
...

You can also run the h1_traveled_distance.py in the demo directly.

And you can check result in ./h1_simple_metric.jsonl

{"SimpleMetric": 0.7508679775492055, "normally_end": true}