Source code for airsim.types

from __future__ import print_function
import msgpackrpc  # install as admin: pip install msgpack-rpc-python
import numpy as np  # pip install numpy
import math


[docs]class MsgpackMixin: attribute_order = [] def __repr__(self): from pprint import pformat return "<" + type(self).__name__ + "> " + pformat(vars(self), indent=4, width=1)
[docs] def to_msgpack(self, *args, **kwargs): encoded = [] for attr_name, attr_type in self.attribute_order: value = getattr(self, attr_name) if isinstance(value, list) and value and isinstance(value[0], MsgpackMixin): encoded.append([v.to_msgpack() for v in value]) elif isinstance(value, MsgpackMixin): encoded.append(value.to_msgpack()) else: encoded.append(value) return encoded
[docs] @classmethod def from_msgpack(cls, encoded): obj = cls() if len(encoded) != len(cls.attribute_order): raise ValueError("Length of encoded data does not match number of attributes") for index, (attr_name, attr_type) in enumerate(cls.attribute_order): value = encoded[index] if issubclass(attr_type, MsgpackMixin): value = attr_type.from_msgpack(value) setattr(obj, attr_name, value) return obj
class _ImageType(type): @property def Scene(cls): return 0 def DepthPlanar(cls): return 1 def DepthPerspective(cls): return 2 def DepthVis(cls): return 3 def DisparityNormalized(cls): return 4 def Segmentation(cls): return 5 def SurfaceNormals(cls): return 6 def Infrared(cls): return 7 def OpticalFlow(cls): return 8 def OpticalFlowVis(cls): return 9 def __getattr__(self, key): if key == 'DepthPlanner': print( '\033[31m' + "DepthPlanner has been (correctly) renamed to DepthPlanar. Please use ImageType.DepthPlanar instead." + '\033[0m') raise AttributeError
[docs]class ImageType(metaclass=_ImageType): Scene = 0 DepthPlanar = 1 DepthPerspective = 2 DepthVis = 3 DisparityNormalized = 4 Segmentation = 5 SurfaceNormals = 6 Infrared = 7 OpticalFlow = 8 OpticalFlowVis = 9
[docs]class DrivetrainType: MaxDegreeOfFreedom = 0 ForwardOnly = 1
[docs]class LandedState: Landed = 0 Flying = 1
[docs]class WeatherParameter: Rain = 0 Roadwetness = 1 Snow = 2 RoadSnow = 3 MapleLeaf = 4 RoadLeaf = 5 Dust = 6 Fog = 7 Enabled = 8
[docs]class Vector2r(MsgpackMixin): x_val = 0.0 y_val = 0.0 attribute_order = [ ('x_val', float), ('y_val', float) ] def __init__(self, x_val=0.0, y_val=0.0): self.x_val = x_val self.y_val = y_val
[docs]class Vector3r(MsgpackMixin): x_val = 0.0 y_val = 0.0 z_val = 0.0 attribute_order = [ ('x_val', float), ('y_val', float), ('z_val', float) ] def __init__(self, x_val=0.0, y_val=0.0, z_val=0.0): self.x_val = x_val self.y_val = y_val self.z_val = z_val
[docs] @staticmethod def nanVector3r(): return Vector3r(np.nan, np.nan, np.nan)
[docs] def containsNan(self): return (math.isnan(self.x_val) or math.isnan(self.y_val) or math.isnan(self.z_val))
def __add__(self, other): return Vector3r(self.x_val + other.x_val, self.y_val + other.y_val, self.z_val + other.z_val) def __sub__(self, other): return Vector3r(self.x_val - other.x_val, self.y_val - other.y_val, self.z_val - other.z_val) def __truediv__(self, other): if type(other) in [int, float] + np.sctypes['int'] + np.sctypes['uint'] + np.sctypes['float']: return Vector3r(self.x_val / other, self.y_val / other, self.z_val / other) else: raise TypeError('unsupported operand type(s) for /: %s and %s' % (str(type(self)), str(type(other)))) def __mul__(self, other): if type(other) in [int, float] + np.sctypes['int'] + np.sctypes['uint'] + np.sctypes['float']: return Vector3r(self.x_val * other, self.y_val * other, self.z_val * other) else: raise TypeError('unsupported operand type(s) for *: %s and %s' % (str(type(self)), str(type(other))))
[docs] def dot(self, other): if type(self) == type(other): return self.x_val * other.x_val + self.y_val * other.y_val + self.z_val * other.z_val else: raise TypeError('unsupported operand type(s) for \'dot\': %s and %s' % (str(type(self)), str(type(other))))
[docs] def cross(self, other): if type(self) == type(other): cross_product = np.cross(self.to_numpy_array(), other.to_numpy_array()) return Vector3r(cross_product[0], cross_product[1], cross_product[2]) else: raise TypeError( 'unsupported operand type(s) for \'cross\': %s and %s' % (str(type(self)), str(type(other))))
[docs] def get_length(self): return (self.x_val ** 2 + self.y_val ** 2 + self.z_val ** 2) ** 0.5
[docs] def distance_to(self, other): return ((self.x_val - other.x_val) ** 2 + (self.y_val - other.y_val) ** 2 + ( self.z_val - other.z_val) ** 2) ** 0.5
[docs] def to_Quaternionr(self): return Quaternionr(self.x_val, self.y_val, self.z_val, 0)
[docs] def to_numpy_array(self): return np.array([self.x_val, self.y_val, self.z_val], dtype=np.float32)
def __iter__(self): return iter((self.x_val, self.y_val, self.z_val))
[docs]class Quaternionr(MsgpackMixin): w_val = 0.0 x_val = 0.0 y_val = 0.0 z_val = 0.0 attribute_order = [ ('w_val', float), ('x_val', float), ('y_val', float), ('z_val', float) ] def __init__(self, x_val=0.0, y_val=0.0, z_val=0.0, w_val=1.0): self.x_val = x_val self.y_val = y_val self.z_val = z_val self.w_val = w_val
[docs] @staticmethod def nanQuaternionr(): return Quaternionr(np.nan, np.nan, np.nan, np.nan)
[docs] def containsNan(self): return (math.isnan(self.w_val) or math.isnan(self.x_val) or math.isnan(self.y_val) or math.isnan(self.z_val))
def __add__(self, other): if type(self) == type(other): return Quaternionr(self.x_val + other.x_val, self.y_val + other.y_val, self.z_val + other.z_val, self.w_val + other.w_val) else: raise TypeError('unsupported operand type(s) for +: %s and %s' % (str(type(self)), str(type(other)))) def __mul__(self, other): if type(self) == type(other): t, x, y, z = self.w_val, self.x_val, self.y_val, self.z_val a, b, c, d = other.w_val, other.x_val, other.y_val, other.z_val return Quaternionr(w_val=a * t - b * x - c * y - d * z, x_val=b * t + a * x + d * y - c * z, y_val=c * t + a * y + b * z - d * x, z_val=d * t + z * a + c * x - b * y) else: raise TypeError('unsupported operand type(s) for *: %s and %s' % (str(type(self)), str(type(other)))) def __truediv__(self, other): if type(other) == type(self): return self * other.inverse() elif type(other) in [int, float] + np.sctypes['int'] + np.sctypes['uint'] + np.sctypes['float']: return Quaternionr(self.x_val / other, self.y_val / other, self.z_val / other, self.w_val / other) else: raise TypeError('unsupported operand type(s) for /: %s and %s' % (str(type(self)), str(type(other))))
[docs] def dot(self, other): if type(self) == type(other): return self.x_val * other.x_val + self.y_val * other.y_val + self.z_val * other.z_val + self.w_val * other.w_val else: raise TypeError('unsupported operand type(s) for \'dot\': %s and %s' % (str(type(self)), str(type(other))))
[docs] def cross(self, other): if type(self) == type(other): return (self * other - other * self) / 2 else: raise TypeError( 'unsupported operand type(s) for \'cross\': %s and %s' % (str(type(self)), str(type(other))))
[docs] def outer_product(self, other): if type(self) == type(other): return (self.inverse() * other - other.inverse() * self) / 2 else: raise TypeError( 'unsupported operand type(s) for \'outer_product\': %s and %s' % (str(type(self)), str(type(other))))
[docs] def rotate(self, other): if type(self) == type(other): if other.get_length() == 1: return other * self * other.inverse() else: raise ValueError('length of the other Quaternionr must be 1') else: raise TypeError( 'unsupported operand type(s) for \'rotate\': %s and %s' % (str(type(self)), str(type(other))))
[docs] def conjugate(self): return Quaternionr(-self.x_val, -self.y_val, -self.z_val, self.w_val)
[docs] def star(self): return self.conjugate()
[docs] def inverse(self): return self.star() / self.dot(self)
[docs] def sgn(self): return self / self.get_length()
[docs] def get_length(self): return (self.x_val ** 2 + self.y_val ** 2 + self.z_val ** 2 + self.w_val ** 2) ** 0.5
[docs] def to_numpy_array(self): return np.array([self.x_val, self.y_val, self.z_val, self.w_val], dtype=np.float32)
def __iter__(self): return iter((self.x_val, self.y_val, self.z_val, self.w_val))
[docs]class Pose(MsgpackMixin): position = Vector3r() orientation = Quaternionr() attribute_order = [ ('position', Vector3r), ('orientation', Quaternionr) ] def __init__(self, position_val=None, orientation_val=None): position_val = position_val if position_val is not None else Vector3r() orientation_val = orientation_val if orientation_val is not None else Quaternionr() self.position = position_val self.orientation = orientation_val
[docs] @staticmethod def nanPose(): return Pose(Vector3r.nanVector3r(), Quaternionr.nanQuaternionr())
[docs] def containsNan(self): return (self.position.containsNan() or self.orientation.containsNan())
def __iter__(self): return iter((self.position, self.orientation))
[docs]class CollisionInfo(MsgpackMixin): has_collided = False normal = Vector3r() impact_point = Vector3r() position = Vector3r() penetration_depth = 0.0 time_stamp = 0.0 object_name = "" object_id = -1 attribute_order = [ ('has_collided', bool), ('normal', Vector3r), ('impact_point', Vector3r), ('position', Vector3r), ('penetration_depth', float), ('time_stamp', np.uint64), ('object_name', str), ('object_id', int) ]
[docs]class GeoPoint(MsgpackMixin): latitude = 0.0 longitude = 0.0 altitude = 0.0 attribute_order = [ ('latitude', float), ('longitude', float), ('altitude', float) ]
[docs]class YawMode(MsgpackMixin): is_rate = True yaw_or_rate = 0.0 attribute_order = [ ('is_rate', bool), ('yaw_or_rate', float) ] def __init__(self, is_rate=True, yaw_or_rate=0.0): self.is_rate = is_rate self.yaw_or_rate = yaw_or_rate
[docs]class RCData(MsgpackMixin): timestamp = 0 pitch, roll, throttle, yaw = (0.0,) * 4 # init 4 variable to 0.0 switch1, switch2, switch3, switch4 = (0,) * 4 switch5, switch6, switch7, switch8 = (0,) * 4 is_initialized = False is_valid = False attribute_order = [ ('timestamp', np.uint64), ('pitch', float), ('roll', float), ('throttle', float), ('yaw', float), ('switch1', int), ('switch2', int), ('switch3', int), ('switch4', int), ('switch5', int), ('switch6', int), ('switch7', int), ('switch8', int), ('is_initialized', bool), ('is_valid', bool) ] def __init__(self, timestamp=0, pitch=0.0, roll=0.0, throttle=0.0, yaw=0.0, switch1=0, switch2=0, switch3=0, switch4=0, switch5=0, switch6=0, switch7=0, switch8=0, is_initialized=False, is_valid=False): self.timestamp = timestamp self.pitch = pitch self.roll = roll self.throttle = throttle self.yaw = yaw self.switch1 = switch1 self.switch2 = switch2 self.switch3 = switch3 self.switch4 = switch4 self.switch5 = switch5 self.switch6 = switch6 self.switch7 = switch7 self.switch8 = switch8 self.is_initialized = is_initialized self.is_valid = is_valid
[docs]class ImageRequest(MsgpackMixin): camera_name = '0' image_type = ImageType.Scene pixels_as_float = False compress = False attribute_order = [ ('camera_name', str), ('image_type', int), ('pixels_as_float', bool), ('compress', bool) ] def __init__(self, camera_name, image_type, pixels_as_float=False, compress=True): # todo: in future remove str(), it's only for compatibility to pre v1.2 self.camera_name = str(camera_name) self.image_type = image_type self.pixels_as_float = pixels_as_float self.compress = compress
[docs]class ImageResponse(MsgpackMixin): image_data_uint8 = np.array([], dtype=np.uint8) image_data_float = np.array([], dtype=float) camera_name = '' camera_position = Vector3r() camera_orientation = Quaternionr() time_stamp = np.uint64(0) message = '' pixels_as_float = 0.0 compress = True width = 0 height = 0 image_type = ImageType.Scene attribute_order = [ ('image_data_uint8', np.ndarray), ('image_data_float', np.ndarray), ('camera_position', Vector3r), ('camera_name', str), ('camera_orientation', Quaternionr), ('time_stamp', np.uint64), ('message', str), ('pixels_as_float', bool), ('compress', bool), ('width', int), ('height', int), ('image_type', int) ]
[docs]class CarControls(MsgpackMixin): throttle = 0.0 steering = 0.0 brake = 0.0 handbrake = False is_manual_gear = False manual_gear = 0 gear_immediate = True attribute_order = [ ('throttle', float), ('steering', float), ('brake', float), ('handbrake', bool), ('is_manual_gear', bool), ('manual_gear', int), ('gear_immediate', bool) ] def __init__(self, throttle=0, steering=0, brake=0, handbrake=False, is_manual_gear=False, manual_gear=0, gear_immediate=True): self.throttle = throttle self.steering = steering self.brake = brake self.handbrake = handbrake self.is_manual_gear = is_manual_gear self.manual_gear = manual_gear self.gear_immediate = gear_immediate
[docs] def set_throttle(self, throttle_val, forward): if (forward): self.is_manual_gear = False self.manual_gear = 0 self.throttle = abs(throttle_val) else: self.is_manual_gear = False self.manual_gear = -1 self.throttle = - abs(throttle_val)
[docs]class KinematicsState(MsgpackMixin): position = Vector3r() orientation = Quaternionr() linear_velocity = Vector3r() angular_velocity = Vector3r() linear_acceleration = Vector3r() angular_acceleration = Vector3r() attribute_order = [ ('position', Vector3r), ('orientation', Quaternionr), ('linear_velocity', Vector3r), ('angular_velocity', Vector3r), ('linear_acceleration', Vector3r), ('angular_acceleration', Vector3r) ]
[docs]class EnvironmentState(MsgpackMixin): position = Vector3r() geo_point = GeoPoint() gravity = Vector3r() air_pressure = 0.0 temperature = 0.0 air_density = 0.0 attribute_order = [ ('position', Vector3r), ('geo_point', GeoPoint), ('gravity', Vector3r), ('air_pressure', float), ('temperature', float), ('air_density', float) ]
[docs]class CarState(MsgpackMixin): speed = 0.0 gear = 0 rpm = 0.0 maxrpm = 0.0 handbrake = False collision = CollisionInfo() kinematics_estimated = KinematicsState() timestamp = np.uint64(0) attribute_order = [ ('speed', float), ('gear', int), ('rpm', float), ('maxrpm', float), ('handbrake', bool), ('collision', CollisionInfo), ('kinematics_estimated', KinematicsState), ('timestamp', np.uint64) ]
[docs]class MultirotorState(MsgpackMixin): collision = CollisionInfo() kinematics_estimated = KinematicsState() gps_location = GeoPoint() timestamp = np.uint64(0) landed_state = LandedState.Landed rc_data = RCData() ready = False ready_message = "" can_arm = False attribute_order = [ ('collision', CollisionInfo), ('kinematics_estimated', KinematicsState), ('gps_location', GeoPoint), ('timestamp', np.uint64), ('landed_state', int), ('rc_data', RCData), ('ready', bool), ('ready_message', str), ('can_arm', bool) ]
[docs]class RotorStates(MsgpackMixin): timestamp = np.uint64(0) rotors = [] attribute_order = [ ('timestamp', np.uint64), ('rotors', list) ]
[docs]class ProjectionMatrix(MsgpackMixin): matrix = [] attribute_order = [ ('matrix', list) ]
[docs]class CameraInfo(MsgpackMixin): pose = Pose() fov = -1 proj_mat = ProjectionMatrix() attribute_order = [ ('pose', Pose), ('fov', float), # Field of View as Float ('proj_mat', ProjectionMatrix) ]
[docs]class LidarData(MsgpackMixin): time_stamp = np.uint64(0) point_cloud = 0.0 pose = Pose() segmentation = 0 attribute_order = [ ('time_stamp', np.uint64), ('point_cloud', float), ('pose', Pose), ('segmentation', int) ]
[docs]class ImuData(MsgpackMixin): time_stamp = np.uint64(0) orientation = Quaternionr() angular_velocity = Vector3r() linear_acceleration = Vector3r() attribute_order = [ ('time_stamp', np.uint64), ('orientation', Quaternionr), ('angular_velocity', Vector3r), ('linear_acceleration', Vector3r) ]
[docs]class BarometerData(MsgpackMixin): time_stamp = np.uint64(0) altitude = Quaternionr() pressure = Vector3r() qnh = Vector3r() attribute_order = [ ('time_stamp', np.uint64), ('altitude', float), ('pressure', float), ('qnh', float) ]
[docs]class MagnetometerData(MsgpackMixin): time_stamp = np.uint64(0) magnetic_field_body = Vector3r() magnetic_field_covariance = 0.0 attribute_order = [ ('time_stamp', np.uint64), ('magnetic_field_body', Vector3r), ('magnetic_field_covariance', float) ]
[docs]class GnssFixType(MsgpackMixin): GNSS_FIX_NO_FIX = 0 GNSS_FIX_TIME_ONLY = 1 GNSS_FIX_2D_FIX = 2 GNSS_FIX_3D_FIX = 3
[docs]class GnssReport(MsgpackMixin): geo_point = GeoPoint() eph = 0.0 epv = 0.0 velocity = Vector3r() fix_type = GnssFixType() time_utc = np.uint64(0) attribute_order = [ ('geo_point', GeoPoint), ('eph', float), ('epv', float), ('velocity', Vector3r), ('fix_type', int), ('time_utc', np.uint64) ]
[docs]class GpsData(MsgpackMixin): time_stamp = np.uint64(0) gnss = GnssReport() is_valid = False attribute_order = [ ('time_stamp', np.uint64), ('gnss', GnssReport), ('is_valid', bool) ]
[docs]class DistanceSensorData(MsgpackMixin): time_stamp = np.uint64(0) distance = 0.0 min_distance = 0.0 max_distance = 0.0 relative_pose = Pose() attribute_order = [ ('time_stamp', np.uint64), ('distance', float), ('min_distance', float), ('max_distance', float), ('relative_pose', Pose) ]
[docs]class Box2D(MsgpackMixin): min = Vector2r() max = Vector2r() attribute_order = [ ('min', Vector2r), ('max', Vector2r) ]
[docs]class Box3D(MsgpackMixin): min = Vector3r() max = Vector3r() attribute_order = [ ('min', Vector3r), ('max', Vector3r) ]
[docs]class DetectionInfo(MsgpackMixin): name = '' geo_point = GeoPoint() box2D = Box2D() box3D = Box3D() relative_pose = Pose() attribute_order = [ ('name', str), ('geo_point', GeoPoint), ('box2D', Box2D), ('box3D', Box3D), ('relative_pose', Pose) ]
[docs]class PIDGains(): """ Struct to store values of PID gains. Used to transmit controller gain values while instantiating AngleLevel/AngleRate/Velocity/PositionControllerGains objects. Attributes: kP (float): Proportional gain kI (float): Integrator gain kD (float): Derivative gain """ def __init__(self, kp, ki, kd): self.kp = kp self.ki = ki self.kd = kd
[docs] def to_list(self): return [self.kp, self.ki, self.kd]
[docs]class AngleRateControllerGains(): """ Struct to contain controller gains used by angle level PID controller Attributes: roll_gains (PIDGains): kP, kI, kD for roll axis pitch_gains (PIDGains): kP, kI, kD for pitch axis yaw_gains (PIDGains): kP, kI, kD for yaw axis """ def __init__(self, roll_gains=PIDGains(0.25, 0, 0), pitch_gains=PIDGains(0.25, 0, 0), yaw_gains=PIDGains(0.25, 0, 0)): self.roll_gains = roll_gains self.pitch_gains = pitch_gains self.yaw_gains = yaw_gains
[docs] def to_lists(self): return [self.roll_gains.kp, self.pitch_gains.kp, self.yaw_gains.kp], [self.roll_gains.ki, self.pitch_gains.ki, self.yaw_gains.ki], [self.roll_gains.kd, self.pitch_gains.kd, self.yaw_gains.kd]
[docs]class AngleLevelControllerGains(): """ Struct to contain controller gains used by angle rate PID controller Attributes: roll_gains (PIDGains): kP, kI, kD for roll axis pitch_gains (PIDGains): kP, kI, kD for pitch axis yaw_gains (PIDGains): kP, kI, kD for yaw axis """ def __init__(self, roll_gains=PIDGains(2.5, 0, 0), pitch_gains=PIDGains(2.5, 0, 0), yaw_gains=PIDGains(2.5, 0, 0)): self.roll_gains = roll_gains self.pitch_gains = pitch_gains self.yaw_gains = yaw_gains
[docs] def to_lists(self): return [self.roll_gains.kp, self.pitch_gains.kp, self.yaw_gains.kp], [self.roll_gains.ki, self.pitch_gains.ki, self.yaw_gains.ki], [self.roll_gains.kd, self.pitch_gains.kd, self.yaw_gains.kd]
[docs]class VelocityControllerGains(): """ Struct to contain controller gains used by velocity PID controller Attributes: x_gains (PIDGains): kP, kI, kD for X axis y_gains (PIDGains): kP, kI, kD for Y axis z_gains (PIDGains): kP, kI, kD for Z axis """ def __init__(self, x_gains=PIDGains(0.2, 0, 0), y_gains=PIDGains(0.2, 0, 0), z_gains=PIDGains(2.0, 2.0, 0)): self.x_gains = x_gains self.y_gains = y_gains self.z_gains = z_gains
[docs] def to_lists(self): return [self.x_gains.kp, self.y_gains.kp, self.z_gains.kp], [self.x_gains.ki, self.y_gains.ki, self.z_gains.ki], [self.x_gains.kd, self.y_gains.kd, self.z_gains.kd]
[docs]class PositionControllerGains(): """ Struct to contain controller gains used by position PID controller Attributes: x_gains (PIDGains): kP, kI, kD for X axis y_gains (PIDGains): kP, kI, kD for Y axis z_gains (PIDGains): kP, kI, kD for Z axis """ def __init__(self, x_gains=PIDGains(0.25, 0, 0), y_gains=PIDGains(0.25, 0, 0), z_gains=PIDGains(0.25, 0, 0)): self.x_gains = x_gains self.y_gains = y_gains self.z_gains = z_gains
[docs] def to_lists(self): return [self.x_gains.kp, self.y_gains.kp, self.z_gains.kp], [self.x_gains.ki, self.y_gains.ki, self.z_gains.ki], [self.x_gains.kd, self.y_gains.kd, self.z_gains.kd]
[docs]class MeshPositionVertexBuffersResponse(MsgpackMixin): position = Vector3r() orientation = Quaternionr() vertices = 0.0 indices = 0.0 name = '' attribute_order = [ ('position', Vector3r), ('orientation', Quaternionr), ('vertices', float), ('indices', float), ('name', str) ]