OpenSCENARIO API
본 섹션에서는 MORAI OpenSCENARIO API를 사용하는 방법에 대하여 설명한다.
OpenSCENARIO API는 사용자의 Python 코드 단에서 Scenario Runner의 OpenSCENARIO 파일 Import 및 Simulation 기능을 직접 호출하여 사용할 수 있도록 필요한 명령 및 데이터를 전달하는 인터페이스이다.
사전 요구 사항
OpenSCENARIO API를 사용하기 위하여 필요한 설치 프로그램 정보는 아래와 같다.
Python 및 추가 설치 모듈
numpy (1.19.1)
PyQt5
matplotlib
scipy
eventhandler
pyproj
grpcio(1.39.0)
grpcio-tools (1.39.0)
pyconcrete
- 22.R2와 호환되는 API를 사용하는 경우 설치 필요
- 패키지 설치 전set PYCONCRETE_PASSPHRASE={your passphrase}
명령어로 암호문 설정 필요sourcedefender
- 22.R3 부터는 pyconcrete 대신 sourcedefender 설치 필요
MORAI SIM
예제 코드
아래와 같이 MORAI SIM 버전 및 OS에 맞는 Python 버전 및 예제 코드를 사용해야 한다.
MORAI SIM 버전 | Python 버전 | 예제 코드 | 비고 |
---|---|---|---|
22.R2 | Window: python 3.7.3 | Window | pyconcrete로 만든 API로 OS마다 다른 예제 코드 제공 |
Linux(ubuntu 18.04): python 3.7.13 | Linux | ||
22.R3 | python 3.7.3 | sourcedefender를 사용한 버전으로 운영체제는 호환되면 python 버전은 3.7.3 버전을 사용해야 함 |
OpenSCENARIO API 사용 순서
아래의 순서로 사용자 측 예제 코드에서 OpenSCENARIO API를 호출하여 Scenario Runner 기능을 사용할 수 있다.
1] MORAI Launcher 실행 및 로그인
2] 특정 시뮬레이터 버전 설치 및 실행
3] 시뮬레이터의 맵 선택 및 차량 선택 화면에 위치한 상태에서 main.py 스크립트 실행
4] 5개의 시나리오가 연속적으로 실행되는 것을 확인
예제 코드 설명
파일 구조
main.py
open_scenario_client_wrapper.py
open_scenario_importer_wrapper.py
data 폴더
OpenSCENARIO 예제 파일
lib 폴더
암호화하여 제공된 MORAI OpenSCENARIO 라이브러리
예제 코드 동작 방식
main 함수에서 시나리오 파일에 대해 차례대로 OpenScenarioImporterWrapper
클래스를 사용하여 시나리오 파일을 Import 하고, OpenScenarioClientWrapper
클래스를 통해 시나리오 파일 Simulation을 시작한다.
OpenScenarioImporterWrapper
, OpenScenarioClientWrapper
클래스는 MORAI OpenSCENARIO 라이브러리 사용 예시를 보여주기 위한 클래스로 사용자가 임의로 수정 가능하다.
현재 시뮬레이터와 gRPC 통신을 하기 위해서는 아래와 같은 IP, Port를 사용
localhost:7789
main.py
import os
import sys
import numpy as np
import math
import time
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(current_path)
import sourcedefender
from open_scenario_importer_wrapper import OpenScenarioImporterWrapper
from open_scenario_client_wrapper import OpenScenarioClientWrapper
if __name__ == '__main__':
# 실행할 시나리오 리스트
scenario_list = [
'.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_1.xosc',
'.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_2.xosc',
'.\data\openscenario\R_KR_PG_K-City\Scenario_CCRB_3.xosc',
'.\data\openscenario\V_RHT_Suburb_03\Scenario_Animal_Deer.xosc',
'.\data\openscenario\V_RHT_Suburb_03\Scenario_UTurn.xosc'
]
client_wrapper = OpenScenarioClientWrapper('127.0.0.1', '7789')
for scenario_file_path in scenario_list:
# OpenSCENARIO 파일 Import
importer_wrapper = OpenScenarioImporterWrapper()
abs_scenario_file_path = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), scenario_file_path))
importer_wrapper.import_open_scenario(abs_scenario_file_path)
client_wrapper.set_open_scenario_importer(importer_wrapper.scenario_importer)
# 시나리오 시작
client_wrapper.start_scenario()
# 시나리오가 종료될 때까지 대기
# 시나리오가 종료되면 자동으로 OpenScenarioClientWrapper에서 Stop Scenario 호출
while client_wrapper.get_stop_status():
time.sleep(1)
open_scenario_client_wrapper.py
import os
import sys
import threading
from PyQt5 import QtCore, QtWidgets, QtGui, uic
from PyQt5.QtWidgets import *
from PyQt5.QtGui import QIntValidator
from PyQt5.QtGui import QIcon
from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject
from lib.openscenario.client.open_scenario_client import OpenScenarioClient
class OpenScenarioClientWrapper(QObject):
def __init__(self, ip, port):
super().__init__()
self.client = OpenScenarioClient(ip, port, self)
self.open_scenario_importer = None
def set_open_scenario_importer(self, open_scenario_importer):
""" open_scenario_importer instance 설정 """
self.client.set_open_scenario_importer(open_scenario_importer)
def start_scenario(self):
self.client.start_scenario()
def stop_scenario(self):
self.client.stop_scenario()
def get_stop_status(self):
return self.client.is_start
@pyqtSlot(bool, str)
def start_scenario_callback(self, result, desc):
"""scenario runner에서 사용 시 callback 함수를 등록해줘야 하기 때문에 에러나지 않게 하기 위해 정의"""
pass
@pyqtSlot(bool, str)
def stop_scenario_callback(self, result, desc):
"""scenario runner에서 사용 시 callback 함수를 등록해줘야 하기 때문에 에러나지 않게 하기 위해 정의"""
pass
open_scenario_importer_wrapper.py
import os
import sys
from lib.openscenario.class_defs.enumerations import *
from lib.openscenario.open_scenario_importer import OpenScenarioImporter
from lib.mgeo.class_defs.mgeo import MGeo
class OpenScenarioImporterWrapper():
""" OpenScenario 파일 Import 클래스 """
def __init__(self):
self.scenario_importer = OpenScenarioImporter()
self.scenario_data = None
self.mgeo_folder_path = None
def import_open_scenario(self, scenario_file_path):
self.scenario_importer.import_open_scenario(scenario_file_path)
self.mgeo_folder_path = self.scenario_importer.get_mgeo_folder_path()
self.mgeo_planner_map = MGeo.create_instance_from_json(self.mgeo_folder_path)
self.scenario_importer.set_mgeo(self.mgeo_planner_map)
self.scenario_importer.update_scenario_data()
def clear(self):
self.scenario_importer.clear()