gRPC API 사용하기
본 섹션에서는 gRPC API를 사용하여 MORAI Simulator를 제어하는 방법에 대하여 설명한다.
사용자는 gRPC API를 통해 UI를 사용하지 않아도, 시뮬레이터에서 맵 로드, NPC 차량 생성 및 제어 등을 수행할 수 있고, 사용자가 원하는 테스트 시나리오 환경을 구성할 수 있다.
사전 요구 사항
Example code 다운로드
24.R2 / 26.R1 버전과 호환되는 예제 코드
https://github.com/MORAI-Autonomous/MORAI-DriveExample_GRPC/tree/grpc-py
Protocol Documentation
https://github.com/MORAI-Autonomous/grpc-docs
Python 환경 설정
python 버전
example code는 python 3.7.3에서 작성 및 테스트 진행
설치 필요 python 모듈
grpcio : 1.44.0
grpcio-tools : 1.44.0
gRPC API 사용 순서
1] Morai Launcher 실행 후 로그인
2] Morai Launcher에서 특정 시뮬레이터 버전 설치 및 실행
3] 시뮬레이터의 맵 선택 및 차량 선택 화면에 위치한 상태에서 example.py 스크립트 실행

4] 결과
맵이 로드되고 Ego 차량이 Built-In 모드로 주행하는 것을 확인
NPC 1, 2 차량이 생성되어 원을 그리며 주행하는 것을 확인
장애물이 2개 생성된 것을 확인
예제 코드 설명
파일 구조
example 코드
example.py
gRPC 통신에 사용되는 proto 파일
lib/grpc/src/define.py
lib/grpc/src/api
lib/grpc/src/proto
예제 코드 동작 방식
main 함수에서 GRPCClient 클래스를 생성하여 해당 클래스를 통해 gRPC API를 호출하여 시뮬레이터와 gRPC 통신을 수행
현재 시뮬레이터와 gRPC 통신을 하기 위해서는 아래와 같은 IP, Port를 사용
localhost:7789
프로그램은 바로 종료되지 않기 위해 무한 루프를 돌다가 ‘Ctrl + C’를 누르면 프로그램 종료
예제 코드의 동작 순서는 아래와 같다.
client 생성 후 connect 하여 gRPC 통신 시작
start simulation 함수를 호출하여 사용자가 원하는 맵과 차량 로드, ego 차량을 원하는 위치로 이동
spawn vehicle 함수를 호출하여 npc 차량 생성
spawn obstacle 함수를 호출하여 장애물 생성
multi ego ctrl 함수를 thread로 생성하여 npc 제어 명령 주기적으로 송신
get ego status 함수를 호출하여 차량 상태 정보 수신
import os, sys
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(current_path)
sys.path.append(os.path.normpath(os.path.join(current_path, 'lib\grpc\src')))
sys.path.append(os.path.normpath(os.path.join(current_path, 'lib\grpc\src\proto')))
import time
import threading
from lib.grpc.src.defines import CLIENT_KEY, MORAI_SIM_ADDRESS, MORAI_SIM_PORT, TEST_VEHICLE_MODEL, map_and_vehicle, vehicle_transform_001, vehicle_transform_002, vehicle_transform_003, cruise_on_link, sync_mode_realtime
from lib.grpc.src.api.morai_sim_client import MoraiSimClient
from lib.grpc.src.proto.morai.common.type_pb2 import Transform
from proto.morai.environment.environment_enum_pb2 import WeatherType
class GRPCClient():
def __init__(self):
self.grpc_client = MoraiSimClient(CLIENT_KEY)
self.grpc_client.connect(MORAI_SIM_ADDRESS, MORAI_SIM_PORT)
self.grpc_client.start_simulation(map_and_vehicle, vehicle_transform_001, cruise_on_link, sync_mode_realtime)
world = self.grpc_client.get_simulation_world()
world.set_time(13)
world.set_weather(WeatherType.WEATHER_TYPE_SUNNY)
self.ego = world.get_ego()
self.vehicle1 = world.spawn_vehicle(vehicle_transform_002, TEST_VEHICLE_MODEL, 'vehicle1', velocity=30, multi_ego=True)
self.vehicle2 = world.spawn_vehicle(vehicle_transform_003, TEST_VEHICLE_MODEL, 'vehicle2', velocity=30, multi_ego=True)
transform1 = Transform()
transform1.location.x = 208.68639761359265
transform1.location.y = 1661.4038564971388
transform1.location.z = 0.0
transform1.rotation.x = 0.0
transform1.rotation.y = 0.0
transform1.rotation.z = 0.0
world.spawn_obstacle(transform1, 'CargoBox', 'obstacle_1')
transform2 = Transform()
transform2.location.x = 212.4813728324434
transform2.location.y = 1605.9207673354679
transform2.location.z = 0.0
transform2.rotation.x = 0.0
transform2.rotation.x = 0.0
transform2.rotation.x = 0.0
world.spawn_obstacle(transform2, 'WoodBox', 'obstacle_2')
world.pause()
world.resume()
def send_multi_ego_ctrl_cmd(self):
while True:
self.vehicle1.control(1, 0.9, 0.0, -1.0, 0.0, 0.0, 0)
self.vehicle2.control(2, 0.0, 0.0, -1.0, 30.0, 0.0, 0)
def get_ego_status(self):
return self.ego.get_actor_state()
def finalize(self):
self.grpc_client.finalize()
if __name__ == "__main__":
print('start example.')
print('press ctrl + c for exit.')
client = GRPCClient()
ctrl_npc = threading.Thread(target=client.send_multi_ego_ctrl_cmd)
ctrl_npc.start()
try:
while True:
ego_status = client.get_ego_status()
print(ego_status)
time.sleep(1.0)
except KeyboardInterrupt:
client.finalize()