Skip to main content
Skip table of contents

[예제]MSC API 및 UDP로 시뮬레이터 제어하기

본 섹션에서는 MSC API와 UDP 통신으로 MORAI Simulator를 제어하는 방법에 대해서 설명한다.

MSC API를 사용하여 MORAI Simulator를 실행 후, UDP 통신을 적용한 예제 코드를 통해 차량 주행 테스트까지 진행한다.


사전 요구 사항

UDP 통신을 적용한 MSC API를 사용하기 위하여 필요한 설치 프로그램 정보는 아래와 같다.

  • Python 3.7.5

  • MORAI SIM(버전 상관 없음)

예제 진행 절차

본 섹션에서 다루는 예제를 아래의 절차대로 진행한다.

Step 1: MORAI Launcher 실행

MORAI Simulator Control API | Step-1:-MORAI-Launcher-실행 방법과 동일하다.

Step 2: 예제 코드 다운로드

아래 경로의 예제 코드를 다운로드한다.

다운로드한 예제 코드의 주요 파일 구성은 다음과 같다.

  • msc_udp : msc_udp_exmaple 파일

Step 3: 예제 코드로 MORAI Simulator 실행

1] 파라미터 정보 수정

아래의 사항을 고려하여 다운로드한 예제 코드의 파라미터를 수정한다.

  • 시뮬레이터 실행과 예제 코드를 실행하는 사용자 환경이 다른 경우(IP가 다른 환경), params.txt와 params.json 파일 내 관련 IP 정보를 수정해야 한다.

  • params.txt 내 id, pw, version , map, vehicle 과 같은 파라미터를 사용자 환경에 맞게 기입한다(대소문자 구분).
    예:
    receive_user_ip: 127.0.0.1
    receive_user_port: 10329
    request_dst_ip: 127.0.0.1
    request_dst_port: 10509
    user_id: id
    user_pw: pw
    version: v.3.9.210226.3
    map/vehicle: K-City/Niro

2] Simulator Network setting

  • Network Setting은 Scenario를 저장할 때의 네트워크 정보를 적용.

  • Simulator의 Edit > Network Setting 메뉴에서 해당 테스트 Scenario를 저장할 때 필요한 네트워크 정보를 아래와 같이 입력한다.

    예:

    • Host IP: 127.0.0.1

    • Destination IP: 127.0.0.1

    • Ego Ctrl Cmd

      • Host PORT: 7601

      • Destination PORT: 7600

    • Ego Vehicle Status

      • Host PORT: 8001

      • Destination PORT: 8000

    • Object Info

      • Host PORT: 8101

      • Destination PORT: 8100

IP / Ego Ctrl Cmd

Ego Vehicle Status / Object Info

3] 예제 코드 실행

api.py를 실행하면 parmas.txt 내 id, pw ,version, map/vehicle 정보에 맞추어 MORAI 시뮬레이터를 실행 및 주행 테스트를 진행할 수 있다.

예제 코드 설명

UDP 통신을 적용한 unit_test_ros.py 예제 코드에 관한 설명은 아래와 같다.

  • class launcher_start: parmas.txt 내 파라미터 정보를 읽어 MSC API로 시뮬레이션을 시작하는 class

  • class gen_planner: unit test 수행하는 class. 해당 class 에서 msc를 사용하고자 할 때 아래와 같은 메소드를 실행.

    • commander( ): define.py의 Command_list에 정의된 기능을 사용 하고자 할 때

    • custom_command( ): gen_planner class안에서 사용자가 option을 변경하며 MSC 명령을 사용하고자 할 때

PY
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time,os,json,threading
from lib.utils import *

from lib.define import *


from lib.controller import *


class launcher_start(controller):
        
    def __init__(self):                
        self.controller = controller()

    def launcher_start(self):
        
        while True:           
            
            if self.controller.update():             
                self.controller.is_waitting() #status_wait 
                self.controller.is_downloading() #check_is_download_status                                      

                if self.controller.is_befor_login():
                                        
                    self.controller.commander(Command.LOGIN)#Login명령
                    
                
                if self.controller.is_after_login() or self.controller.is_after_sim_quit_to_launcher():  # is_after_sim_quit_to_launcher : Simulator에서 quit 명령 후 Launcher 복귀 상태 확인

                    self.controller.commander(Command.SELECT_VER)#version select명령

                if self.controller.is_not_find_version(): #version_error         
                    break
                    
                if self.controller.is_can_execute_sim(): 
                                                                    
                    self.controller.commander(Command.EXECUTE_SIM) #Simulator 실행 명령                        

                    self.controller.watting_execute()
                                            
                if self.controller.is_sim_not_install():                                
                    self.controller.commander(Command.INSTALL_SIM) #Simulator 설치 명령                     


                if self.controller.is_sim_lobby():                    

                    self.controller.commander(Command.MAP_VEHICLE_SELECT)#시뮬레이션/옵션 변경 실행 명령

                    self.controller.watting_loading()

                if self.controller.is_sim_playing():          

                    self.controller.commander(Command.NET_SETTING) #Network setting
                    
                    self.controller.commander(Command.SEN_SETTING) #Sensor setting

                    self.controller.commander(Command.SCEN_SETTING) #Scenraio setting
                    planner(self.controller)
                    break

            else :
                print("[NO Simulator Control Data]")
                time.sleep(1)            



class planner:

    def __init__(self,controller):
        path = os.path.dirname( os.path.abspath( __file__ ) )      

        with open(os.path.join(path,("params.json")),'r') as fp :
            params = json.load(fp)

        params=params["params"]
        user_ip = params["user_ip"]
        host_ip = params["host_ip"]


        status_port =params["vehicle_status_dst_port"]
        object_port =params["object_info_dst_port"]
        get_traffic_port=params["get_traffic_dst_port"]

        set_traffic_port=params["set_traffic_host_port"]
        ctrl_cmd_port = params["ctrl_cmd_host_port"]

        planner_path_file_name = params["planner_path_file_name"]

        traffic_greenlight_setting= params["traffic_greenlight_setting"]
        
        
        self.status=udp_parser(user_ip, status_port,'status')
        self.obj=udp_parser(user_ip, object_port,'obj')
        self.traffic=udp_parser(user_ip, get_traffic_port,'get_traffic')


        self.ctrl_cmd=udp_sender(host_ip,ctrl_cmd_port,'ctrl_cmd')
        self.set_traffic=udp_sender(host_ip,set_traffic_port,'set_traffic')
        

        self.txt_reader=pathReader()
        self.global_path=self.txt_reader.read(planner_path_file_name)


        vel_planner=velocityPlanning(60,2) ## 경로기반 속도 계획
        self.vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)        

        self.pure_pursuit=purePursuit()
        self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
        
        self.vo=vaildObject()## 장애물 유무 확인

        self.pid=pidController() ## pidController import        
        self.current_waypoint=0
  

        self._is_status=False
        while not self._is_status :
            # if not self.status.get_data() :
            controller.commander(Command.SCEN_SETTING)
            print('No Status Data Cannot run main_loop')
            time.sleep(0.5)
        # else :
            self._is_status=True                

        self.main_loop()

    
    def main_loop(self):        
        
        self.timer=threading.Timer(0.1,self.main_loop)
        self.timer.start()
        
        status_data=self.status.get_data()
        obj_data=self.obj.get_data()
        traffic_data = self.traffic.get_data()
        position_x=status_data[4]
        position_y=status_data[5]
        position_z=status_data[6]
        heading=status_data[9]# degree
        velocity=status_data[10]

        #set trafficlight (green)
        if not len(traffic_data) == 0 and traffic_greenlight_setting == "True": #set trafficlight (green)                        
            self.set_traffic.send_data([False,traffic_data[1],16])
            traffic_data[3]=16


        #fine_local_path, waypoint
        local_path,current_waypoint =findLocalPath(self.global_path,position_x,position_y)
        
        
        ## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
        self.vo.get_object(obj_data)
        global_obj,local_obj=self.vo.calc_vaild_obj([position_x,position_y,(heading)/180*pi])

        if not len(traffic_data) == 0:
            self.cc.checkObject(local_path,global_obj,local_obj,traffic_data[1],traffic_data[3])
        else:
            self.cc.checkObject(local_path,global_obj,local_obj,[],[])         

        #pure_pursuit. get_steering_angle_value                
        self.pure_pursuit.getPath(local_path) 
        self.pure_pursuit.getEgoStatus(position_x,position_y,position_z,velocity,heading)
            
        steering_angle=self.pure_pursuit.steering_angle()


        #ACC                 
        cc_vel = self.cc.acc(local_obj,velocity,self.vel_profile[current_waypoint]) ## advanced cruise control 적용한 속도 계획        
        print(self.vel_profile[current_waypoint])
        target_velocity = cc_vel
              

        control_input=self.pid.pid(target_velocity,velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
        if control_input > 0 :
            accel= control_input
            brake= 0
        else :
            accel= 0
            brake= -control_input

        ctrl_mode = 2 # 2 = AutoMode / 1 = KeyBoard
        Gear = 4 # 4 1 : (P / parking ) 2 (R / reverse) 3 (N / Neutral)  4 : (D / Drive) 5 : (L)


        self.ctrl_cmd.send_data([ctrl_mode,Gear,accel,brake,steering_angle])     
        
        print('current_waypoint:{0}\tglobal_path_lenght:{1}'.format(self.current_waypoint,len(self.global_path)))
        print("sterring_angle:{0}\taccel:{1}\tbrake:{2}".format(steering_angle,accel,brake))
        
        
        if self.current_waypoint == len(self.global_path)-1: #Scenario끝나면 다시 Scenraio load           
            self.current_waypoint=0
            self.controller.commander(Command.SCEN_SETTING)

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.