grutopia.core.env

env

class grutopia.core.gym_env.Env(simulator_runtime: SimulatorRuntime)[source]

Gym Env for a single environment with a single learning agent.

close()[source]

Close the environment.

finished() bool[source]

Check whether all tasks are finished.

get_dt()[source]

Get the time step (dt) of the simulation environment.

Returns:

dt.

get_observations() dict[Any, Any] | Any[source]

Get observations from the Isaac Sim environment.

Returns:

observation

Return type:

observation (gym.Space)
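
A short sketch of inspecting the observation structure (assuming env is an already-constructed Env; the concrete keys depend on the robots and sensors configured in the task):

```python
# Sketch: `env` is an instantiated Env (see the episode-loop example
# under step() below); the observation keys depend on the task config.
obs = env.get_observations()
if isinstance(obs, dict):
    print(list(obs.keys()))  # e.g. per-robot or per-sensor entries
```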

render(mode='human')[source]

Compute the render frames as specified by render_mode during the initialization of the environment.

The environment’s metadata render modes (env.metadata[“render_modes”]) should contain the possible ways to implement the render modes. In addition, list versions of most render modes are achieved through gymnasium.make, which automatically applies a wrapper to collect rendered frames.

Note

As the render_mode is known during __init__, the objects used to render the environment state should be initialised in __init__.

By convention, if the render_mode is:

  • None (default): no render is computed.

  • “human”: The environment is continuously rendered in the current display or terminal, usually for human consumption. This rendering should occur during step() and render() doesn’t need to be called. Returns None.

  • “rgb_array”: Return a single frame representing the current state of the environment. A frame is a np.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image.

  • “ansi”: Return a string (str) or StringIO.StringIO containing a terminal-style text representation for each time step. The text can include newlines and ANSI escape sequences (e.g. for colors).

  • “rgb_array_list” and “ansi_list”: List-based versions of the render modes above (except “human”) are possible through the wrapper gymnasium.wrappers.RenderCollection, which is automatically applied during gymnasium.make(..., render_mode="rgb_array_list"). The collected frames are popped after render() or reset() is called.

Note

Make sure that your class’s metadata "render_modes" key includes the list of supported modes.

Changed in version 0.25.0: The render function was changed to no longer accept parameters; rather, these parameters should be specified when the environment is initialised, i.e., gymnasium.make("CartPole-v1", render_mode="human").
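
For reference, the render-mode convention described above follows upstream gymnasium. A minimal, generic gymnasium sketch (not specific to this Env, which is constructed from a SimulatorRuntime rather than via gymnasium.make):

```python
import gymnasium

# Generic gymnasium example of the render_mode convention above.
cartpole = gymnasium.make("CartPole-v1", render_mode="rgb_array")
cartpole.reset(seed=0)
frame = cartpole.render()  # np.ndarray of shape (x, y, 3) in "rgb_array" mode
print(frame.shape)
cartpole.close()
```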

reset(*, seed=None, options=None) tuple[gymnasium.spaces.space.Space, dict[str, Any]][source]

Resets the environment to an initial internal state, returning an initial observation and info.

Parameters:
  • seed (optional int) – The seed that is used to initialize the environment’s PRNG (np_random).

  • options (optional dict) – Additional information to specify how the environment is reset (optional, depending on the specific environment)

Returns:

Observation of the initial state. info (dict): Contains the key task_runtime if there is an unfinished task.

Return type:

observation (ObsType)
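
A minimal reset sketch (assuming env is an instantiated Env; see the episode-loop example under step()):

```python
# info carries 'task_runtime' only while an unfinished task remains.
obs, info = env.reset(seed=42)
task_runtime = info.get('task_runtime')
if task_runtime is None:
    env.close()  # nothing left to run
```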

property simulation_app

The simulation app instance.

property simulation_runtime

Config of the simulation environment.

step(action: Any) tuple[Any, float, bool, bool, dict[str, Any]][source]

Run a step with the given action (with an Isaac Sim step).

TODO: Implement the conversion between dict and action space/obs space

Parameters:

action (Any) – an action provided by the agent to update the environment state.

Returns:

  • observation (Any): An element of the environment’s observation_space as the next observation due to the agent’s actions.

  • reward (float): The reward as a result of taking the action.

  • terminated (bool): Whether the agent reaches the terminal state. If true, the user needs to call reset().

  • truncated (bool): Whether the truncation condition outside the scope of the MDP is satisfied. Typically this is a time limit, but it can also be used to indicate an agent physically going out of bounds, or to end the episode prematurely before a terminal state is reached. If true, the user needs to call reset().

  • info (dict): Contains auxiliary diagnostic information (helpful for debugging, learning, and logging). Currently, it contains nothing.

Return type:

tuple[Any, float, bool, bool, dict[str, Any]]
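
Putting the pieces together, a minimal episode loop might look like the sketch below. The SimulatorRuntime import path and constructor arguments, the config path, and the action contents are all assumptions, not confirmed by this page:

```python
from grutopia.core.gym_env import Env
# Assumed import path for SimulatorRuntime.
from grutopia.core.runtime import SimulatorRuntime

# Hypothetical config path; constructor arguments are an assumption.
sim_runtime = SimulatorRuntime(config_path='./config.yaml')

env = Env(sim_runtime)
obs, info = env.reset()

while not env.finished():
    action = {}  # action contents are task-specific; empty dict is a placeholder
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        obs, info = env.reset()

env.close()
```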

runner

class grutopia.core.runner.SimulatorRunner(simulator_runtime: SimulatorRuntime)[source]

clear_single_task(task_name: str)[source]

Clear the single task identified by task_name.

Parameters:

task_name (str) – Task name to clear.

get_obs() Dict[source]

Get observations.

Returns:

Observations from Isaac Sim.

Return type:

Dict

next_episode(task_name: str | None = None) TaskRuntime | None[source]

Switch to the next episode.

This method cleans up the finished task specified by task_name and then switches to the next task, if one exists.

Parameters:

task_name (Optional[str]) – The task name of the finished task.

Returns:

The new task runtime, or None if no further task exists.

Return type:

TaskRuntime | None

Raises:

RuntimeError – If the specified task_name is not found in the current tasks.
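
A sketch of switching episodes (assuming runner is an instantiated SimulatorRunner and finished_task names a task that has just terminated):

```python
# next_episode cleans up the finished task and returns the new
# TaskRuntime, or None when no further task exists.
new_runtime = runner.next_episode(finished_task)
if new_runtime is None:
    runner.stop()  # no tasks left
else:
    # `.name` is an assumed attribute of TaskRuntime.
    print(f'Switched to task: {new_runtime.name}')
```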

reset(task: str | None = None) Tuple[Dict, TaskRuntime | None][source]

Reset the task.

Parameters:
  • task (str) – A task name to reset. If task is None, it means the reset is being invoked for the first time, before agents invoke step().

Returns:

A tuple of two values. The first is a dict of observations. The second is a TaskRuntime object representing the new task runtime.

Return type:

Tuple[Dict, TaskRuntime | None]

step(actions: Dict | None = None, render: bool = True) Tuple[Dict, Dict[str, bool], Dict[str, float]][source]

Step function to advance the simulation environment by one time step.

This method processes actions for active tasks, steps the simulation world, collects observations, updates metrics, and determines task terminations. It also handles rendering based on specified intervals.

Parameters:
  • actions (Union[Dict, None], optional) – A dictionary mapping task names to another dictionary of robot names and their respective actions. If None, no actions are applied. Defaults to None.

  • render (bool, optional) – Flag indicating whether to render the simulation at this step. True triggers rendering if the render interval is met. Defaults to True.

Returns:

  • obs (Dict): A dictionary containing observations for each task, further divided by robot names and their observation data.

  • terminated_status (Dict[str, bool]): A dictionary mapping task names to boolean values indicating whether the task has terminated.

  • reward (Dict[str, float]): A dictionary intended to hold rewards for each task or robot; the actual computation and return of rewards is not shown in the code here.

Return type:

Tuple[Dict, Dict[str, bool], Dict[str, float]]

Raises:

Exception – If an error occurs when applying an action to a robot, the exception is logged and re-raised, providing context about the task, robot, and current tasks state.

Notes

  • The _world.step() method advances the simulation, optionally rendering the environment based on the render flag and the render interval.

  • get_obs() is a method to collect observations from the simulation world, though its implementation details are not shown.

  • Metrics for each task are updated, and upon task completion, results are saved to a JSON file, including a flag ‘normally_end’ set to True to indicate normal termination of the task.

  • The function also manages a mechanism to prevent further action application and metric updates for tasks that have been marked as finished.

Caution

The code contains a TODO comment about a “Key optimization interval,” which isn’t yet addressed in the docstring or the code shown.
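
A sketch of the nested actions structure step() expects (all task, robot, and controller names below are hypothetical placeholders):

```python
# Actions are nested: task name -> robot name -> action payload.
actions = {
    'task_0': {
        'h1': {'move_by_speed': [0.5, 0.0, 0.0]},  # hypothetical controller
    },
}
obs, terminated_status, rewards = runner.step(actions=actions, render=True)
for task_name, done in terminated_status.items():
    if done:
        runner.next_episode(task_name)  # switch finished task to its next episode
```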

stop()[source]

Stop all current operations and clean up the World.