Source code for grutopia.core.datahub.isaac_data
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from grutopia.core.util import log
class MetaActionData(BaseModel):
"""
action status in grutopia
"""
controller: str
data: Any
class ActionData(BaseModel):
"""
action status in grutopia
"""
robot: str
controllers: List[MetaActionData]
class _IsaacData(BaseModel):
"""
isaac status in grutopia
"""
actions: Optional[Dict[str, Dict[str, Any]]] = {}
obs: Optional[Dict[str, Dict[str, Any]]] = {}
task_idx_counter: Optional[int] = 0
finished_tasks: Optional[List[str]] = []
[docs]class IsaacData:
"""
isaac status in grutopia
There are three types of isaac status:
* Action
* Observation
* Episode
structure of isaac status like this::
{
actions: {
"task_0":{
robot_1: {
cap: param,
controller: param,
}
}
},
observations: {
"task_0":{
robot_1: {
obs_1: data,
obs_2: data
}
}
},
task_idx_counter: 0
finished_tasks: []
}
"""
data: _IsaacData = _IsaacData(actions={}, obs={})
def __init__(self) -> None:
pass
@classmethod
def get_all(cls) -> _IsaacData:
return cls.data
# Observation
[docs] @classmethod
def set_obs_data(cls, obs: Dict[str, Dict[str, Any]]) -> None:
"""
Set isaac observations data
Args:
obs (Dict[str, Dict[str, Any]]): obs data with task_name key.
"""
for task_id, obs_data in obs.items():
cls.data.obs[task_id] = obs_data
[docs] @classmethod
def set_obs_data_by_task_name(cls, task_name: str, obs: Dict[str, Dict[str, Any]]) -> None:
"""
Set isaac observations data by task name
Args:
task_name: isaac task name
obs (Dict[str, Dict[str, Any]]): obs data with robot_name key.
"""
for robot_name, obs_data in obs.items():
cls.data.obs[task_name][robot_name] = obs_data
[docs] @classmethod
def set_obs_by_task_name_and_robot_name(
cls, task_name: str, robot_name: str, obs: Dict[str, Dict[str, Any]]
) -> None:
"""
Set isaac observation by task name and robot name
Args:
task_name: isaac task name
robot_name: isaac robot name
obs (Dict[str, Dict[str, Any]]): obs data with task_name key.
"""
cls.data.obs[task_name][robot_name] = obs
[docs] @classmethod
def get_obs(cls) -> Dict[str, Dict]:
"""
Get isaac observation by task name
Args:
task_name: isaac task name
Returns:
isaac observation data
"""
return cls.data.obs
[docs] @classmethod
def get_obs_by_task_name(cls, task_name: str) -> Dict[str, Dict]:
"""
Get isaac observation by task name
Args:
task_name: isaac task name
Returns:
isaac observation data
"""
if task_name in cls.data.obs:
return cls.data.obs[task_name]
return {}
[docs] @classmethod
def get_obs_by_task_name_and_robot_name(cls, task_name: str, robot_name: str) -> Dict[str, Dict]:
"""
Get isaac observation by task name and robot name
Args:
task_name: isaac task name
robot_name: isaac robot name
Returns:
isaac observation data
"""
if task_name in cls.data.obs:
if robot_name in cls.data.obs[task_name]:
return cls.data.obs[task_name][robot_name]
return {}
# Action
[docs] @classmethod
def set_actions(cls, actions: Dict[str, ActionData]) -> None:
"""
Set action by task_name
Args:
actions (Dict[str, ActionData]): action data with task_name key
"""
for task_name, action in actions.items():
if cls.data.actions.get(task_name) is None:
cls.data.actions[task_name] = {}
cls.data.actions[task_name].update(action.model_dump())
[docs] @classmethod
def get_action_by_task_name(cls, task_name: str) -> Dict[str, Dict]:
"""
Get action by task name
Returns:
action(dict like {robot_name: {controller_name: param}})
"""
if task_name in cls.data.actions:
return cls.data.actions[task_name]
return {}
[docs] @classmethod
def get_action_by_task_name_and_robot_name(cls, task_name: str, robot_name: str) -> Dict[str, Dict]:
"""
Get action by task name and robot name
Returns:
action(dict like {robot_name: {controller_name: param}})
"""
if task_name in cls.data.actions:
if robot_name in cls.data.actions[task_name]:
return cls.data.actions[task_name][robot_name]
return {}
[docs] @classmethod
def gen_task_idx(cls) -> str:
"""
Generate a id for isaac task
Returns:
task id
"""
task_idx = cls.data.task_idx_counter
cls.data.task_idx_counter += 1
return str(task_idx)
[docs] @classmethod
def set_episode_finished(cls, task_name: str):
"""
Set isaac task {task_name} finished
Args:
task_name (str): isaac task_name
"""
if task_name not in cls.data.finished_tasks:
cls.data.finished_tasks.append(task_name)
log.info(f'Set isaac task {task_name} finished')
return
@classmethod
def get_episode_finished(cls, task_name: str) -> bool:
return task_name in cls.data.finished_tasks