[예제]MSC API 및 ROS로 시뮬레이터 제어하기
본 섹션에서는 MSC API와 ROS 통신으로 MORAI SIM을 제어하는 방법에 대해서 설명한다.
MSC API를 사용하여 MORAI Simulator를 실행 후, ROS 통신을 적용한 예제 코드를 통해 차량 주행 테스트까지 진행한다.
사전 요구 사항
ROS 통신을 적용한 MSC API를 사용하기 위하여 필요한 설치 프로그램 정보는 아래와 같다.
Python 3.7.5
MORAI SIM(버전 상관 없음)
예제 진행 절차
본 섹션에서 다루는 예제를 아래의 절차대로 진행한다.
Step 1: MORAI Launcher 실행
MORAI Simulator Control API | Step-1:-MORAI-Launcher-실행 방법과 동일하다.
Step 2: 예제 코드 다운로드
아래 경로의 예제 코드를 다운로드한다.
다운로드한 예제 코드의 주요 파일 구성은 아래와 같다.
K-City: K-City Scenraio 파일 포함, ‘Morai_Launcher_Data / SaveFile / Scenario’ 경로에 복사
msc_ros: msc_ros_example 파일 포함, workspace/src 하위에 복사
params.txt: 예제 코드를 실행하는 사용자 환경에서 MORAI SIM와 통신하기 위해 필요한 파라미터 정보를 포함
Step 3: 예제 코드로 MORAI Simulator 실행
1] 파라미터 정보 수정
아래의 사항을 고려하여 다운로드한 예제 코드의 파라미터를 수정한다.
시뮬레이터 및 Code 실행이 다른 환경(IP 가 다른 환경)에서 진행 되면 params.txt , params.json 의 ip를 수정해야 한다.
시뮬레이터 실행과 예제 코드를 실행하는 사용자 환경이 다른 경우(IP가 다른 환경), params.txt와 params.json 파일 내 관련 IP 정보를 수정해야 한다.
params.txt 내 id, pw, version , map, vehicle 과 같은 파라미터를 사용자 환경에 맞게 기입한다(대소문자 구분).
예:
receive_user_ip: 127.0.0.1
receive_user_port: 10329
request_dst_ip: 127.0.0.1
request_dst_port: 10509
user_id: id
user_pw: pw
version: v.3.9.210226.3
map/vehicle: K-City/Niro
2] Simulator Network 설정
Simulator의 Edit > Network Setting 메뉴에서 해당 테스트 Scenario를 저장할 때 필요한 네트워크 정보를 아래와 같이 입력한다.
예:
Bridge IP: 127.0.0.1
Bridge PORT: 9090
3] 예제 코드 실행
unit_test_ros.py를 실행하면 parmas.txt 내 id, pw ,version, map/vehicle 정보에 맞추어 MORAI 시뮬레이터를 실행 및 주행 테스트를 진행할 수 있다.
예제 코드 설명네네트
ROS 통신을 적용한 unit_test_ros.py 예제 코드에 관한 설명은 아래와 같다.
class sim_ctrl
parmas.txt에 입력한 정보를 읽어 MSC를 통해 시뮬레이션을 시작해주는 classclass gen_planner
unit test 알고리즘
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy,rospkg
from math import cos,sin,sqrt,pow,atan2,pi
import sys,os,time
import json
from morai_msgs.msg import EgoVehicleStatus,ObjectInfo,CtrlCmd,ScenarioLoad
from lib.ros_utils import pathReader,findLocalPath,purePursuit,Point,cruiseControl,vaildObject,velocityPlanning,pidController
from lib.udp_parser import udp_parser,udp_sender
#read params.txt
file = open('params.txt', 'r')
line=file.read()
params=line.split('\n')
for i in range(0,len(params)):
params[i]=(params[i].split(':')[1].replace(" ","")).replace('\r','')
recive_user_ip = params[0]
recive_user_port = int(params[1])
request_dst_ip = params[2]
request_dst_port = int(params[3])
class sim_ctrl :
def __init__(self):
self.get_status = udp_parser(recive_user_ip, recive_user_port,'get_sim_status')
self.set_status = udp_sender(request_dst_ip, request_dst_port,'set_sim_status')
self.main_loop()
def main_loop(self):
while True:
self.status_data=self.get_status.get_data()
print("data : ",self.status_data)
if not len(self.status_data)==0:
self.data_platform = self.status_data[0]
self.data_stage = self.status_data[1]
self.data_status = self.status_data[2]
self.command_platform = self.status_data[3]
self.command_cmd = self.status_data[4]
self.command_option = self.status_data[5]
self.command_result = self.status_data[6]
if self.data_platform == "0x01" and self.data_stage=="0x01": #Launcher platform에서 로그인 전 상태
cmd_platform="0x01"
cmd_command="0x0001"
cmd_option=params[4]+"/"+params[5]#(ID/PW)
self.send_data([cmd_platform,cmd_command,cmd_option])#Login명령
if self.data_platform=="0x01" and self.data_stage=="0x02": #Launcher platform에서 로그인 후 상태
cmd_platform = "0x01"
cmd_command = "0x0002"
cmd_option = params[6]#simulator version
self.send_data([cmd_platform,cmd_command,cmd_option]) #version 선택 명령
if self.data_status=="0x0001": #Result = 성공
cmd_platform="0x01"
cmd_command="0x0004"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 실행 명령
time.sleep(3)
elif self.data_status=="0x0012": #Result = Simulator 설치 안됨
cmd_platform="0x01"
cmd_command="0x0003"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 설치 명령
if self.data_platform=="0x02" and self.data_stage=="0x01" and self.data_status=="0x0001": #Simulator platform에서 로비 Stage의 대기상태
cmd_platform="0x02"
cmd_command="0x0001"
cmd_option=params[7]#(MAP/Vehicle)
self.send_data([cmd_platform,cmd_command,cmd_option]) #시뮬레이션/옵션 변경 실행 명령
time.sleep(3)
while True: #로딩이 정상적으로 완료될때까지 대기.
_,self.data_stage,_,_,_,_,_=self.get_status.get_data()
print(self.data_stage)
if self.data_stage== "0x02":
if self.data_platform=="0x02" and self.data_stage=="0x02" and self.data_status=="0x0001": #Simulator platform에서 Play상태
break
time.sleep(1)
break
time.sleep(1)
else :
print("[NO Simulator Control Data]")
time.sleep(0.5)
gen_planner() ##test알고리즘
def send_data(self, data):
try:
print("send",data)
cmd_platform=int(data[0],0)
cmd_command=int(data[1],0)
cmd_option=data[2]
self.set_status.send_data([cmd_platform,cmd_command,cmd_option])
except ValueError:
print("Invalid input")
class gen_planner():
def __init__(self):
rospy.init_node('gen_planner', anonymous=True)
#publisher
ctrl_pub = rospy.Publisher('/ctrl_cmd',CtrlCmd, queue_size=1) ## Vehicl Control
scenario_pub = rospy.Publisher('/ScenarioLoad',ScenarioLoad, queue_size=1) ## Vehicl Control
ctrl_msg= CtrlCmd()
scenarioLoad_msgs = ScenarioLoad()
scenarioLoad_msgs.file_name = "scenario_example_ros"
scenarioLoad_msgs.load_ego_vehicle_data=True
scenarioLoad_msgs.load_pedestrian_data=True
scenarioLoad_msgs.load_surrounding_vehicle_data=True
scenarioLoad_msgs.load_object_data=True
#subscriber
rospy.Subscriber("/Ego_topic", EgoVehicleStatus, self.statusCB) ## Vehicl Status Subscriber
rospy.Subscriber("/Object_topic", ObjectInfo, self.objectInfoCB) ## Object information Subscriber
#def
self.is_status=False ## 차량 상태 점검
self.is_obj=False ## 장애물 상태 점검
#class
path_reader=pathReader('gen_ros') ## 경로 파일의 위치
pure_pursuit=purePursuit() ## purePursuit import
self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
self.vo=vaildObject()
pid=pidController() ## pidController import
#read path
self.global_path=path_reader.read_txt("scenario_example.txt") ## 출력할 경로의 이름
vel_planner=velocityPlanning(40/3.6,1.5) ## 속도 계획
vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)
#time var
count=0
rate = rospy.Rate(30) # 30hz
while not rospy.is_shutdown():
if self.is_status==True and self.is_obj==True: ## 차량의 상태, 장애물 상태 점검
if count==0:
time.sleep(5)
scenario_pub.publish(scenarioLoad_msgs)
## global_path와 차량의 status_msg를 이용해 현제 waypoint와 local_path를 생성
local_path,self.current_waypoint=findLocalPath(self.global_path,self.status_msg,0)
## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
self.vo.get_object(self.object_info_msg.num_of_objects,self.object_info_msg.object_type,self.object_info_msg.pose_x,self.object_info_msg.pose_y,self.object_info_msg.velocity)
global_obj,local_obj=self.vo.calc_vaild_obj([self.status_msg.pose_x,self.status_msg.pose_y,(self.status_msg.heading+90)/180*pi])
self.cc.checkObject(local_path,global_obj,local_obj)
pure_pursuit.getPath(local_path) ## pure_pursuit 알고리즘에 Local path 적용
pure_pursuit.getEgoStatus(self.status_msg) ## pure_pursuit 알고리즘에 차량의 status 적용
ctrl_msg.steering=-pure_pursuit.steering_angle()/180*pi
cc_vel = self.cc.acc(local_obj,self.status_msg.velocity,vel_profile[self.current_waypoint],self.status_msg) ## advanced cruise control 적용한 속도 계획
target_velocity = cc_vel
control_input=pid.pid(target_velocity,self.status_msg.velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
if control_input > 0 :
ctrl_msg.accel= control_input
ctrl_msg.brake= 0
else :
ctrl_msg.accel= 0
ctrl_msg.brake= -control_input
if self.status_msg.velocity < 3.0 and target_velocity<=0.0:
ctrl_msg.accel=0
ctrl_msg.brake=1
print(self.current_waypoint,len(self.global_path.poses)-1)
print(ctrl_msg)
ctrl_pub.publish(ctrl_msg) ## Vehicl Control 출력
if self.current_waypoint == (len(self.global_path.poses)-1) :
print("??????????")
count=0
self.current_waypoint=0
scenario_pub.publish(scenarioLoad_msgs)
time.sleep(3)
count+=1
rate.sleep()
def statusCB(self,data): ## Vehicl Status Subscriber
self.status_msg=data
self.is_status=True
def objectInfoCB(self,data): ## Object information Subscriber
self.object_info_msg=data
self.is_obj=True
if __name__ == "__main__":
ctrl=sim_ctrl()