[Robocup@Home 2026] 2026.01.26 (Grounded-SAM2, cyclonedds.xml)
Thor startup procedure
1. RPC
2. UPC
3. Power on
Grounded-SAM2
Grounded-SAM2 is one of the hottest techniques in computer vision right now, especially in robotics. It is a pipeline that chains Grounding DINO and SAM 2 together.
1. Grounding DINO (Detector)
- Role: understands the text prompt and localizes the object with a bounding box.
- Key property (open-set detection): a conventional detector like YOLO can only find the classes it was trained on (e.g., 80 classes), whereas DINO can find objects it never saw in training, guided by language.
2. SAM 2 (Segment Anything Model 2)
- Role: given a box, extracts the object's exact mask at the pixel level.
- Key property: built by Meta, it excels at tracing object boundaries. SAM 2 is faster than the original SAM and is much better at tracking objects through video without losing them.
Step 1: Input:
- Image: the current camera frame
- Text prompt: "bottle"
Step 2: Detection (Grounding DINO):
- Scans the entire image and draws a bounding box wherever it believes a 'bottle' appears.
- Output: a list of [x1, y1, x2, y2] coordinates
Step 3: Segmentation (SAM 2):
- Takes the box coordinates found by DINO as a prompt (hint).
- Inside each box, keeps only the true object region (mask), excluding the background.
Step 4: Output:
- A clean object mask with the background removed
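The whole flow above fits in a few lines of Python. Below is a condensed sketch (the checkpoint paths and thresholds mirror the node further down; load_image is the GroundingDINO demo helper, and the input filename is illustrative):

# Minimal Grounding DINO -> SAM 2 pipeline sketch (paths/thresholds illustrative)
import torch
from torchvision.ops import box_convert
from grounding_dino.groundingdino.util.inference import load_model, load_image, predict
from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor

device = "cuda" if torch.cuda.is_available() else "cpu"
gdino = load_model("grounding_dino/groundingdino/config/GroundingDINO_SwinT_OGC.py",
                   "gdino_checkpoints/groundingdino_swint_ogc.pth", device=device)
sam2 = SAM2ImagePredictor(build_sam2("configs/sam2.1/sam2.1_hiera_t.yaml",
                                     "checkpoints/sam2.1_hiera_tiny.pt", device=device))

image_source, image = load_image("frame.jpg")   # RGB ndarray + normalized tensor
boxes, scores, phrases = predict(gdino, image, "bottle.", 0.55, 0.60, device)

# Grounding DINO returns normalized cxcywh boxes; convert to pixel xyxy
H, W = image_source.shape[:2]
boxes_xyxy = box_convert(boxes * torch.tensor([W, H, W, H]),
                         in_fmt="cxcywh", out_fmt="xyxy").numpy()

sam2.set_image(image_source)                    # SAM 2 takes the boxes as prompts
masks, _, _ = sam2.predict(box=boxes_xyxy, multimask_output=False)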
[Extra]: Mask + Depth Image → 3D Point Cloud:
Using the pinhole camera model, the masked depth pixels can be converted into a point cloud by back-projection.
Step 1: Overlay (Masking)
- A RealSense camera delivers an RGB image and a depth image (range map) at the same time.
- Overlay the mask from SAM2 on the depth image.
- Result: only the depth values (Z) of the pixels belonging to the bottle remain; the background is discarded.
Step 2: 2D → 3D conversion
- Compute the 3D coordinates (X, Y, Z) using the camera intrinsics.
- Known values:
- u, v: pixel coordinates in the image (column, row)
- Z: depth (distance) of that pixel, taken from the depth image
- f_x, f_y: focal length of the camera lens
- c_x, c_y: principal point (image center)
- Conversion formulas (pinhole back-projection):
X = (u - c_x) · Z / f_x
Y = (v - c_y) · Z / f_y
Z = Z (read directly from the depth image)
# 1. Resize the SAM2 mask to the depth image resolution
mask_resized = cv2.resize(mask...)
# 2. Take the depth (Z) values only where the mask is set
z = depth_m[v, u]
# 3. Plug into the formulas to get the 3D coordinates (x, y, z)
# inside _back_project_scaled
x = (u - cx) * z / fx
y = (v - cy) * z / fy
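Putting these steps together, here is a self-contained NumPy sketch of the back-projection (the function name and default depth limits are illustrative; the node's actual implementation is _back_project_scaled below):

import numpy as np

def back_project(depth_m, mask, fx, fy, cx, cy, min_depth=0.1, max_depth=2.5):
    """Back-project masked depth pixels into 3D camera coordinates (pinhole model)."""
    v, u = np.where(mask)                  # pixel coordinates covered by the mask
    z = depth_m[v, u]                      # depth in meters at those pixels
    valid = (z > min_depth) & (z < max_depth)
    u, v, z = u[valid], v[valid], z[valid]
    x = (u - cx) * z / fx                  # X = (u - c_x) * Z / f_x
    y = (v - cy) * z / fy                  # Y = (v - c_y) * Z / f_y
    return np.stack([x, y, z], axis=1)     # (N, 3) point cloud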
Topic
- Publishers
| Topic | Type | Description |
|---|---|---|
| /grounded_sam/pointcloud/objects | PointCloud2 | object PointCloud (2048 points) |
| /grounded_sam/pointcloud/background | PointCloud2 | background PointCloud (downsampled to roughly 1/16 of the pixels) |
| /grounded_sam/debug_image | Image | visualization image with masks/bounding boxes |
| /grounded_sam/detections | Detection2DArray | 2D detection results |
| /grounded_sam/object_info | String | object info (JSON) |
| /grounded_sam/status | String | current state |
- Subscribers
| Topic | Type | Description |
|---|---|---|
| /camera/camera/color/image_raw | Image | RGB image |
| /camera/camera/aligned_depth_to_color/image_raw | Image | depth image |
| /camera/camera/aligned_depth_to_color/camera_info | CameraInfo | camera intrinsics |
| /grounded_sam/search | String | name of the object to search for |
| /grounded_sam/stop | Bool | stop-detection command |
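With the node running, this topic interface can be exercised straight from the command line, for example:

# Start searching for a "bottle" via the topic interface
ros2 topic pub --once /grounded_sam/search std_msgs/msg/String "data: 'bottle'"
# Stop detection
ros2 topic pub --once /grounded_sam/stop std_msgs/msg/Bool "data: true"
# Watch the node's state transitions (IDLE/SEARCHING/TRACKING/REPLAY)
ros2 topic echo /grounded_sam/status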
Node (/home/thor/vision_ws/src/dino_detection/grounded_dino_ros/grounded_sam_node.py)
#!/home/nvidia/vision_ws/src/detection_venv/bin/python
"""
GroundedSAM2 Detection & Tracking Node for ROS2 Humble (NumPy Optimized)
기능:
- Action Server로 객체 탐색 및 트래킹
- SAM2 마스크 기반 3D PointCloud 생성
- 객체: Voxel Grid + FPS로 균일한 2048개 점 생성 (토픽: /objects)
- 배경: 객체 제외 영역을 다운샘플링하여 별도 퍼블리시 (토픽: /background)
- [NEW] Detection 완료 후 저장된 PointCloud를 반복 퍼블리시
- [NEW] Service로 퍼블리시 중지
"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup
from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2, PointField
from std_msgs.msg import String, Bool, Header
from std_srvs.srv import SetBool, Trigger
from vision_msgs.msg import Detection2D, Detection2DArray, ObjectHypothesisWithPose
from inha_interfaces.action import Detection
from inha_interfaces.srv import SetEnable, SavePointCloud, GetDetectionStatus, StartReplay, StopReplay
from inha_interfaces.msg import ObjectInfo
import cv2
import numpy as np
import torch
from PIL import Image as PILImage
import time
import os
import sys
import struct
import json
import threading
from rcl_interfaces.msg import SetParametersResult
# ============ Offline mode settings ============
os.environ["HF_HUB_OFFLINE"] = "1"
os.environ["TRANSFORMERS_OFFLINE"] = "1"
os.environ["HF_DATASETS_OFFLINE"] = "1"
# Add the Grounded-SAM-2 package to the import path
GROUNDED_SAM2_PATH = os.path.expanduser("/home/ubuntu/vision_ws/src/Grounded-SAM-2")
if GROUNDED_SAM2_PATH not in sys.path:
sys.path.insert(0, GROUNDED_SAM2_PATH)
# GroundingDINO (local package)
try:
from grounding_dino.groundingdino.util.inference import load_model as load_gdino_model, predict as gdino_predict
import grounding_dino.groundingdino.datasets.transforms as T
GDINO_AVAILABLE = True
except ImportError as e:
GDINO_AVAILABLE = False
print(f"[WARN] GroundingDINO not available: {e}")
# SAM2
try:
from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor
SAM2_AVAILABLE = True
except ImportError as e:
SAM2_AVAILABLE = False
print(f"[WARN] SAM2 not available: {e}")
class GroundedSAM2Node(Node):
STATE_IDLE = "IDLE"
STATE_SEARCHING = "SEARCHING"
STATE_TRACKING = "TRACKING"
STATE_REPLAY = "REPLAY" # mode that re-publishes the saved point cloud
def __init__(self):
super().__init__('grounded_sam2_node')
self.callback_group = ReentrantCallbackGroup()
self.service_callback_group = MutuallyExclusiveCallbackGroup()
# ==================== Parameters ====================
self.declare_parameter('image_topic', '/camera/camera/color/image_raw')
self.declare_parameter('depth_topic', '/camera/camera/aligned_depth_to_color/image_raw')
self.declare_parameter('camera_info_topic', '/camera/camera/aligned_depth_to_color/camera_info')
self.declare_parameter('image_type', 'raw')
self.declare_parameter('search_timeout', 20.0)
self.declare_parameter('box_threshold', 0.55)
self.declare_parameter('text_threshold', 0.60)
# PointCloud parameters
self.declare_parameter('enable_pointcloud', True)
self.declare_parameter('min_depth', 0.1)
self.declare_parameter('max_depth', 2.5)
self.declare_parameter('depth_scale', 0.001)
self.declare_parameter('object_voxel_size', 0.015)
self.declare_parameter('background_voxel_size', 0.10)
self.declare_parameter('target_points', 2048)
# Replay parameters
self.declare_parameter('replay_rate', 1.0) # Hz
# Auto-replay parameters
self.declare_parameter('auto_replay_on_success', True)
self.declare_parameter('auto_replay_rate', 0.5) # 0.5 Hz (every 2 s)
self.declare_parameter('auto_replay_background', True)
# Model paths
self.declare_parameter('gdino_config', 'grounding_dino/groundingdino/config/GroundingDINO_SwinT_OGC.py')
self.declare_parameter('gdino_checkpoint', 'gdino_checkpoints/groundingdino_swint_ogc.pth')
self.declare_parameter('sam2_config', 'configs/sam2.1/sam2.1_hiera_t.yaml')
self.declare_parameter('sam2_checkpoint', 'checkpoints/sam2.1_hiera_tiny.pt')
# Load parameters
self.image_topic = self.get_parameter('image_topic').value
self.depth_topic = self.get_parameter('depth_topic').value
self.camera_info_topic = self.get_parameter('camera_info_topic').value
self.image_type = self.get_parameter('image_type').value
self.search_timeout = self.get_parameter('search_timeout').value
self.box_threshold = self.get_parameter('box_threshold').value
self.text_threshold = self.get_parameter('text_threshold').value
self.enable_pointcloud = self.get_parameter('enable_pointcloud').value
self.min_depth = self.get_parameter('min_depth').value
self.max_depth = self.get_parameter('max_depth').value
self.depth_scale = self.get_parameter('depth_scale').value
self.object_voxel_size = self.get_parameter('object_voxel_size').value
self.background_voxel_size = self.get_parameter('background_voxel_size').value
self.target_points = self.get_parameter('target_points').value
self.replay_rate = self.get_parameter('replay_rate').value
self.auto_replay_on_success = self.get_parameter('auto_replay_on_success').value
self.auto_replay_rate = self.get_parameter('auto_replay_rate').value
self.auto_replay_background = self.get_parameter('auto_replay_background').value
self.gdino_config = os.path.join(GROUNDED_SAM2_PATH, self.get_parameter('gdino_config').value)
self.gdino_checkpoint = os.path.join(GROUNDED_SAM2_PATH, self.get_parameter('gdino_checkpoint').value)
self.sam2_config = self.get_parameter('sam2_config').value
self.sam2_checkpoint = os.path.join(GROUNDED_SAM2_PATH, self.get_parameter('sam2_checkpoint').value)
# ==================== Camera Configuration (updated) ====================
# [NEW] topic mapping per camera_id
# 1. The front camera uses image_rect_raw (rectified images)
# 2. If Left/Right also expose image_rect_raw once connected, update accordingly
self.camera_configs = {
'head': {
'image': '/camera/camera_head/color/image_rect_raw',
'depth': '/camera/camera_head/aligned_depth_to_color/image_raw',
'info': '/camera/camera_head/aligned_depth_to_color/camera_info'
},
'right': {
'image': '/camera/camera_right/color/image_rect_raw',
'depth': '/camera/camera_right/aligned_depth_to_color/image_raw',
'info': '/camera/camera_right/aligned_depth_to_color/camera_info'
},
'left': {
'image': '/camera/camera_left/color/image_rect_raw',
'depth': '/camera/camera_left/aligned_depth_to_color/image_raw',
'info': '/camera/camera_left/aligned_depth_to_color/camera_info'
},
}
# Currently active camera ID
self.current_camera_id = 'head'
# ==================== State ====================
self.state = self.STATE_IDLE
self.text_prompt = ""
self.search_start_time = None
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.action_goal_handle = None
self.action_lock = threading.Lock()
self.tracking_masks = None
self.tracking_boxes = None
self.tracking_labels = []
self.tracking_scores = []
self.current_segmentation_masks = []
self.gdino_transform = None
# Camera Data
self.latest_rgb = None
self.latest_rgb_header = None
self.latest_depth = None
self.latest_depth_header = None
self.fx = self.fy = self.cx = self.cy = None
self.intrinsic_width = 0
self.intrinsic_height = 0
self.camera_frame = None
# Saved PointCloud data
self.saved_object_points = None
self.saved_object_labels = None
self.saved_background_points = None
self.saved_background_labels = None
self.saved_object_info = None
self.saved_header_frame = "camera_color_optical_frame"
# Replay timer
self.replay_timer = None
# ==================== Publishers ====================
self.status_pub = self.create_publisher(String, '/grounded_sam/status', 10)
self.debug_image_pub = self.create_publisher(Image, '/grounded_sam/debug_image', 10)
self.detection_pub = self.create_publisher(Detection2DArray, '/grounded_sam/detections', 10)
self.object_info_pub = self.create_publisher(String, '/grounded_sam/object_info', 10)
# PointCloud Publishers
self.pointcloud_pub = self.create_publisher(PointCloud2, '/grounded_sam/pointcloud/objects', 10)
self.background_pub = self.create_publisher(PointCloud2, '/grounded_sam/pointcloud/background', 10)
# ==================== Subscribers ====================
# Initial subscriptions start from the 'head' configuration
initial_config = self.camera_configs['head']
self.image_topic = initial_config['image']
self.depth_topic = initial_config['depth']
self.camera_info_topic = initial_config['info']
self._create_camera_subscribers(self.image_topic, self.depth_topic, self.camera_info_topic)
self.search_sub = self.create_subscription(String, '/grounded_sam/search', self.search_callback, 10)
self.stop_sub = self.create_subscription(Bool, '/grounded_sam/stop', self.stop_callback, 10)
# ==================== Action Server ====================
self._action_server = ActionServer(
self, Detection, 'grounded_sam_detection',
execute_callback=self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=self.callback_group
)
# ==================== Services (inha_interfaces) ====================
self.save_pc_srv = self.create_service(
SavePointCloud, '/grounded_sam/save_pointcloud',
self._save_pointcloud_callback,
callback_group=self.service_callback_group
)
self.get_status_srv = self.create_service(
GetDetectionStatus, '/grounded_sam/get_status',
self._get_status_callback,
callback_group=self.service_callback_group
)
self.start_replay_srv = self.create_service(
StartReplay, '/grounded_sam/start_replay',
self._start_replay_callback,
callback_group=self.service_callback_group
)
self.stop_replay_srv = self.create_service(
StopReplay, '/grounded_sam/stop_replay',
self._stop_replay_srv_callback,
callback_group=self.service_callback_group
)
self.set_enable_srv = self.create_service(
SetEnable, '/grounded_sam/set_enable',
self._set_enable_callback,
callback_group=self.service_callback_group
)
self._load_models()
self._cb_handle = self.add_on_set_parameters_callback(self.on_param_change)
self.timeout_timer = self.create_timer(0.1, self.check_timeout)
self._print_init_info()
def _print_init_info(self):
self.get_logger().info("=" * 60)
self.get_logger().info("GroundedSAM2 Node with PointCloud Replay")
self.get_logger().info(f" Device: {self.device}")
self.get_logger().info(f" Replay Rate: {self.replay_rate} Hz")
self.get_logger().info("")
self.get_logger().info(" Services:")
self.get_logger().info(" /grounded_sam/save_and_replay (Trigger)")
self.get_logger().info(" - Detection 후 PC 저장 & Replay 시작")
self.get_logger().info(" /grounded_sam/stop_replay (Trigger)")
self.get_logger().info(" - Replay 중지")
self.get_logger().info(" /grounded_sam/replay_control (SetBool)")
self.get_logger().info(" - true: Replay 시작, false: Replay 중지")
self.get_logger().info("=" * 60)
# [NEW] Helper that creates the camera subscribers
def _create_camera_subscribers(self, rgb_topic, depth_topic, info_topic):
self.get_logger().info(f"Subscribing to camera: \n RGB: {rgb_topic}\n Depth: {depth_topic}\n Info: {info_topic}")
if self.image_type == 'raw':
self.rgb_sub = self.create_subscription(Image, rgb_topic, self._image_callback, 10)
else:
self.rgb_sub = self.create_subscription(CompressedImage, rgb_topic, self._image_compressed_callback, 10)
self.depth_sub = self.create_subscription(Image, depth_topic, self._depth_callback, 10)
self.info_sub = self.create_subscription(CameraInfo, info_topic, self._camera_info_callback, 10)
# [NEW] 카메라 전환 함수
def _switch_camera(self, camera_id):
# Skip switching if we already use this camera (switch only if the topic differs)
target_config = self.camera_configs.get(camera_id)
if target_config is None:
self.get_logger().warn(f"Unknown camera_id '{camera_id}'. Using current configuration.")
return
target_image_topic = target_config['image']
# Recreate the subscribers if the target topic differs from the current one
if self.image_topic != target_image_topic:
self.get_logger().info(f"Switching camera source to '{camera_id}'...")
# 1. Destroy the existing subscribers
if self.rgb_sub: self.destroy_subscription(self.rgb_sub)
if self.depth_sub: self.destroy_subscription(self.depth_sub)
if self.info_sub: self.destroy_subscription(self.info_sub)
# 2. Reset cached data (avoid stale frames from the previous camera)
self.latest_rgb = None
self.latest_depth = None
self.fx = None # reset intrinsics
# 3. Update the topic bookkeeping
self.image_topic = target_image_topic
self.depth_topic = target_config['depth']
self.camera_info_topic = target_config['info']
self.current_camera_id = camera_id
# 4. Subscribe to the new topics
self._create_camera_subscribers(self.image_topic, self.depth_topic, self.camera_info_topic)
# 5. Wait briefly for data to arrive (settling time)
time.sleep(0.5)
def _load_models(self):
self.gdino_model = None
self.sam2_predictor = None
original_cwd = os.getcwd()
os.chdir(GROUNDED_SAM2_PATH)
try:
if GDINO_AVAILABLE:
self.get_logger().info("Loading GroundingDINO...")
self.gdino_model = load_gdino_model(self.gdino_config, self.gdino_checkpoint, device=self.device)
self.gdino_transform = T.Compose([
T.RandomResize([800], max_size=1333),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
self.get_logger().info("GroundingDINO loaded successfully")
if SAM2_AVAILABLE:
self.get_logger().info("Loading SAM2...")
sam2_model = build_sam2(self.sam2_config, self.sam2_checkpoint, device=self.device)
self.sam2_predictor = SAM2ImagePredictor(sam2_model)
self.get_logger().info("SAM2 loaded successfully")
except Exception as e:
self.get_logger().error(f"Model Load Error: {e}")
import traceback
traceback.print_exc()
finally:
os.chdir(original_cwd)
# ==================== Camera Callbacks ====================
def _image_callback(self, msg: Image):
try:
cv_image = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, -1)
if msg.encoding == 'rgb8':
self.latest_rgb = cv_image
elif msg.encoding == 'bgr8':
self.latest_rgb = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
else:
self.latest_rgb = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
self.latest_rgb_header = msg.header
# Process only in SEARCHING or TRACKING (not in REPLAY)
if self.state in [self.STATE_SEARCHING, self.STATE_TRACKING]:
self.process(self.latest_rgb, msg.header)
except Exception as e:
self.get_logger().warn(f"Image Error: {e}")
def _image_compressed_callback(self, msg: CompressedImage):
try:
np_arr = np.frombuffer(msg.data, dtype=np.uint8)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
self.latest_rgb = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
self.latest_rgb_header = msg.header
if self.state in [self.STATE_SEARCHING, self.STATE_TRACKING]:
self.process(self.latest_rgb, msg.header)
except Exception as e:
self.get_logger().warn(f"Compressed Image Error: {e}")
def _depth_callback(self, msg: Image):
try:
self.latest_depth = np.frombuffer(msg.data, dtype=np.uint16).reshape(msg.height, msg.width)
self.latest_depth_header = msg.header
except Exception as e:
self.get_logger().warn(f"Depth Error: {e}")
def _camera_info_callback(self, msg: CameraInfo):
# Update when the camera changed or on first reception
if self.fx is None or self.camera_frame != msg.header.frame_id:
K = np.array(msg.k).reshape(3, 3)
self.fx, self.fy = K[0, 0], K[1, 1]
self.cx, self.cy = K[0, 2], K[1, 2]
self.camera_frame = msg.header.frame_id
self.intrinsic_width, self.intrinsic_height = msg.width, msg.height
self.saved_header_frame = self.camera_frame
self.get_logger().info(f"Camera Info Updated: Frame={self.camera_frame}, Resolution={msg.width}x{msg.height}")
# ==================== Action Callbacks ====================
def goal_callback(self, goal_request):
with self.action_lock:
# A new Detection goal is allowed even in REPLAY (replay gets stopped first)
if self.state in [self.STATE_SEARCHING, self.STATE_TRACKING]:
return GoalResponse.REJECT
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
return CancelResponse.ACCEPT
def execute_callback(self, goal_handle):
# If replaying, stop and reload the models
if self.state == self.STATE_REPLAY:
self._stop_replay()
self._reload_models()
direction = goal_handle.request.camera_id.lower()
target = goal_handle.request.target_object.strip()
self.get_logger().info(f"Action Goal: direction={direction}, target={target}")
# [NEW] Dynamic camera switch (driven by the goal's camera_id)
self._switch_camera(direction)
with self.action_lock:
self.action_goal_handle = goal_handle
self.text_prompt = self._preprocess_prompt(target)
self.state = self.STATE_SEARCHING
self.search_start_time = time.time()
self._reset_tracking_state()
feedback = Detection.Feedback()
feedback.current_state = "SEARCHING"
goal_handle.publish_feedback(feedback)
start = time.time()
found = False
while time.time() - start < self.search_timeout:
if goal_handle.is_cancel_requested:
self._reset_state()
goal_handle.canceled()
return Detection.Result(success=False, error_message="Canceled")
if self.state == self.STATE_TRACKING:
found = True
break
time.sleep(0.05)
result = Detection.Result()
if found:
self.get_logger().info(f"Object '{target}' found! Starting auto-replay...")
result.success = True
# Start auto-replay (switch right after one inference pass)
if self.auto_replay_on_success:
# Wait briefly so the last point cloud gets saved
time.sleep(0.1)
self.replay_rate = self.auto_replay_rate
self.publish_background_in_replay = self.auto_replay_background
self._stop_tracking()
self._unload_models()
self._start_replay()
feedback.current_state = "REPLAY"
goal_handle.publish_feedback(feedback)
self.get_logger().info(f"Auto-replay started at {self.replay_rate} Hz (models unloaded)")
goal_handle.succeed()
else:
self.get_logger().warn(f"Object '{target}' not found (timeout)")
result.success = False
result.error_message = f"Object not found within {self.search_timeout}s"
self._reset_state()
goal_handle.abort()
self.action_goal_handle = None
return result
# ==================== Service Callbacks ====================
def _save_pointcloud_callback(self, request, response):
"""PointCloud 저장"""
if self.saved_object_points is None or len(self.saved_object_points) == 0:
response.success = False
response.message = "No pointcloud data. Run detection first."
response.object_points = 0
response.background_points = 0
return response
# Save to a file (optional)
if request.save_to_file:
import datetime
# [CHANGED] save location moved to vision_ws/data/pointclouds/
save_dir = os.path.expanduser('/home/ubuntu/vision_ws/data/pointclouds')
os.makedirs(save_dir, exist_ok=True)
filename = request.filename if request.filename else f"pc_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.npz"
filepath = os.path.join(save_dir, filename)
np.savez(filepath,
object_points=self.saved_object_points,
object_labels=self.saved_object_labels,
background_points=self.saved_background_points,
background_labels=self.saved_background_labels,
object_info=json.dumps(self.saved_object_info),
frame_id=self.saved_header_frame)
self.get_logger().info(f"PointCloud saved to {filepath}")
response.message = f"PointCloud saved to {filepath}"
else:
response.message = "PointCloud in memory (not saved to file)"
response.success = True
response.object_points = len(self.saved_object_points) if self.saved_object_points is not None else 0
response.background_points = len(self.saved_background_points) if self.saved_background_points is not None else 0
return response
def _get_status_callback(self, request, response):
"""현재 상태 반환"""
response.state = self.state
response.has_saved_data = self.saved_object_points is not None and len(self.saved_object_points) > 0
response.object_points = len(self.saved_object_points) if self.saved_object_points is not None else 0
response.background_points = len(self.saved_background_points) if self.saved_background_points is not None else 0
response.target_object = self.text_prompt.rstrip('.')
response.replay_rate = float(self.replay_rate)
return response
def _start_replay_callback(self, request, response):
"""Replay 시작"""
if self.saved_object_points is None or len(self.saved_object_points) == 0:
response.success = False
response.message = "No saved pointcloud. Run detection first."
return response
# Set the rate
if request.rate > 0:
self.replay_rate = request.rate
self.publish_background_in_replay = request.publish_background
# Stop tracking if active
self._stop_tracking()
# [NEW] Unload the models (free GPU memory)
self._unload_models()
self._start_replay()
response.success = True
response.message = f"Replay started at {self.replay_rate} Hz (models unloaded)"
return response
def _stop_replay_srv_callback(self, request, response):
"""Replay 중지 서비스 콜백"""
if self.state != self.STATE_REPLAY:
response.success = False
response.message = "Not in replay mode"
return response
self._stop_replay()
response.success = True
response.message = "Replay stopped"
self.get_logger().info(response.message)
return response
# [NEW] Model unloading
def _unload_models(self):
"""Unload the models to free GPU memory"""
self.get_logger().info("Unloading models to free GPU memory...")
# Unload SAM2
if self.sam2_predictor is not None:
del self.sam2_predictor
self.sam2_predictor = None
# Unload GroundingDINO
if self.gdino_model is not None:
del self.gdino_model
self.gdino_model = None
# Clear the GPU cache
if torch.cuda.is_available():
torch.cuda.empty_cache()
self.get_logger().info("Models unloaded, GPU memory freed")
# [NEW] Reload the models (when needed)
def _reload_models(self):
"""Reload the models"""
if self.gdino_model is None or self.sam2_predictor is None:
self.get_logger().info("Reloading models...")
self._load_models()
def _set_enable_callback(self, request, response):
"""Detection 활성화/비활성화"""
if request.enable:
# If replaying, stop and return to IDLE
self._stop_replay()
# [NEW] Reload the models
self._reload_models()
response.success = True
response.message = "Detection enabled (models reloaded)"
else:
# Stop all processing
self._stop_replay()
self._reset_state()
# [NEW] Unload the models
self._unload_models()
response.success = True
response.message = "Detection disabled (models unloaded)"
self.get_logger().info(response.message)
return response
# ==================== Replay Logic ====================
def _start_replay(self):
"""Replay 모드 시작"""
with self.action_lock:
self.state = self.STATE_REPLAY
# Remove any existing timer
if self.replay_timer is not None:
self.replay_timer.cancel()
self.replay_timer = None
# Create a new timer
period = 1.0 / self.replay_rate
self.replay_timer = self.create_timer(period, self._replay_callback)
self._publish_status("REPLAY_STARTED")
self.get_logger().info(f"Replay mode started: {self.replay_rate} Hz")
def _stop_replay(self):
"""Replay 모드 중지"""
if self.replay_timer is not None:
self.replay_timer.cancel()
self.replay_timer = None
with self.action_lock:
if self.state == self.STATE_REPLAY:
self.state = self.STATE_IDLE
self._publish_status("REPLAY_STOPPED")
def _stop_tracking(self):
"""Tracking 중지 (Detection은 유지, Replay 전환용)"""
with self.action_lock:
self.state = self.STATE_IDLE
self._reset_tracking_state()
def _replay_callback(self):
"""저장된 PointCloud 퍼블리시 (Timer callback)"""
if self.state != self.STATE_REPLAY:
return
header = Header()
header.stamp = self.get_clock().now().to_msg()
header.frame_id = self.saved_header_frame
# Publish the object PointCloud
if self.saved_object_points is not None and len(self.saved_object_points) > 0:
pc_msg = self._create_pointcloud2(
self.saved_object_points,
self.saved_object_labels,
header
)
self.pointcloud_pub.publish(pc_msg)
if self.saved_object_info:
self.object_info_pub.publish(String(data=json.dumps(self.saved_object_info)))
# Publish the background PointCloud (optional)
if getattr(self, 'publish_background_in_replay', True):
if self.saved_background_points is not None and len(self.saved_background_points) > 0:
bg_msg = self._create_pointcloud2(
self.saved_background_points,
self.saved_background_labels,
header
)
self.background_pub.publish(bg_msg)
# ==================== Topic Callbacks ====================
def search_callback(self, msg: String):
raw = msg.data.strip()
if not raw: return
# Stop replay if running
self._stop_replay()
with self.action_lock:
if self.state in [self.STATE_SEARCHING, self.STATE_TRACKING]: return
self.text_prompt = self._preprocess_prompt(raw)
self.state = self.STATE_SEARCHING
self.search_start_time = time.time()
self._reset_tracking_state()
def stop_callback(self, msg: Bool):
if msg.data:
self._stop_replay()
self._reset_state()
self._publish_status("STOPPED")
def check_timeout(self):
if self.state == self.STATE_SEARCHING and self.search_start_time and not self.action_goal_handle:
if time.time() - self.search_start_time > self.search_timeout:
self._publish_status("NOT_FOUND")
self._reset_state()
def on_param_change(self, params):
for p in params:
if p.name == 'object_voxel_size': self.object_voxel_size = p.value
elif p.name == 'target_points': self.target_points = p.value
elif p.name == 'replay_rate':
self.replay_rate = p.value
# If replaying, restart the timer with the new rate
if self.state == self.STATE_REPLAY:
self._start_replay()
return SetParametersResult(successful=True)
def _preprocess_prompt(self, text):
t = text.lower().strip()
return t if t.endswith(".") else t + "."
def _reset_state(self):
with self.action_lock:
self.state = self.STATE_IDLE
self.text_prompt = ""
self.search_start_time = None
self._reset_tracking_state()
def _reset_tracking_state(self):
self.tracking_masks = None
self.tracking_boxes = None
self.tracking_labels = []
self.tracking_scores = []
self.current_segmentation_masks = []
def _publish_status(self, status):
self.status_pub.publish(String(data=status))
# ==================== Processing Core ====================
def process(self, img_rgb, header):
if self.state == self.STATE_SEARCHING:
self._process_search(img_rgb, header)
elif self.state == self.STATE_TRACKING:
self._process_tracking(img_rgb, header)
def _process_search(self, img_rgb, header):
if not self.gdino_model or not self.text_prompt:
return
H, W = img_rgb.shape[:2]
try:
pil_img = PILImage.fromarray(img_rgb)
img_t, _ = self.gdino_transform(pil_img, None)
boxes, scores, phrases = gdino_predict(
self.gdino_model, img_t, self.text_prompt,
self.box_threshold, self.text_threshold, self.device
)
if len(boxes) > 0:
boxes_scaled = boxes * torch.Tensor([W, H, W, H])
from torchvision.ops import box_convert
boxes_xyxy = box_convert(boxes=boxes_scaled, in_fmt="cxcywh", out_fmt="xyxy").numpy()
scores_np = scores.numpy()
self.get_logger().info(f"Found {len(boxes)} objects: {phrases}")
if self.sam2_predictor:
self._init_tracking_with_sam2(img_rgb, boxes_xyxy, phrases, scores_np)
else:
self._init_tracking_bbox_only(boxes_xyxy, phrases, scores_np)
self.state = self.STATE_TRACKING
self._publish_status("FOUND")
self._visualize_and_publish(img_rgb.copy(), header)
else:
self._publish_empty_detection(img_rgb.copy(), header)
except Exception as e:
self.get_logger().error(f"Search Error: {e}")
def _init_tracking_with_sam2(self, img_rgb, boxes, labels, scores):
self.sam2_predictor.set_image(img_rgb)
masks, _, _ = self.sam2_predictor.predict(box=boxes, multimask_output=False)
if masks.ndim == 4: masks = masks.squeeze(1)
elif masks.ndim == 3 and len(boxes) == 1: masks = masks[None]
self.tracking_masks = masks
self.tracking_boxes = boxes
self.tracking_labels = list(labels)
self.tracking_scores = scores.tolist() if hasattr(scores, 'tolist') else list(scores)
def _init_tracking_bbox_only(self, boxes, labels, scores):
self.tracking_masks = None
self.tracking_boxes = boxes
self.tracking_labels = list(labels)
self.tracking_scores = scores.tolist() if hasattr(scores, 'tolist') else list(scores)
def _process_tracking(self, img_rgb, header):
if self.tracking_boxes is None or len(self.tracking_boxes) == 0:
self._reset_state()
self._publish_status("LOST")
return
if self.sam2_predictor and self.tracking_masks is not None:
try:
self.sam2_predictor.set_image(img_rgb)
masks, _, _ = self.sam2_predictor.predict(box=self.tracking_boxes, multimask_output=False)
if masks.ndim == 4: masks = masks.squeeze(1)
elif masks.ndim == 3 and len(self.tracking_boxes) == 1: masks = masks[None]
self.tracking_masks = masks
new_boxes = []
for i, mask in enumerate(masks):
if mask.ndim == 3: mask = mask.squeeze()
ys, xs = np.where(mask)
if len(xs) > 0: new_boxes.append([xs.min(), ys.min(), xs.max(), ys.max()])
elif i < len(self.tracking_boxes): new_boxes.append(self.tracking_boxes[i])
if new_boxes: self.tracking_boxes = np.array(new_boxes)
except Exception:
pass
self._visualize_and_publish(img_rgb.copy(), header)
self._publish_status("TRACKING")
def _visualize_and_publish(self, img_copy, header):
H, W = img_copy.shape[:2]
detections_list = []
self.current_segmentation_masks = []
color_palette = [(0,255,0), (255,0,0), (0,0,255), (255,255,0), (255,0,255)]
if self.tracking_boxes is not None:
for i, box in enumerate(self.tracking_boxes):
x1, y1, x2, y2 = [int(v) for v in box]
label = self.tracking_labels[i] if i < len(self.tracking_labels) else "obj"
score = self.tracking_scores[i] if i < len(self.tracking_scores) else 0.0
color = color_palette[i % len(color_palette)]
mask_2d = None
if self.tracking_masks is not None and i < len(self.tracking_masks):
mask = self.tracking_masks[i]
if mask.ndim == 3: mask = mask.squeeze()
if mask.shape[:2] != (H, W):
mask = cv2.resize(mask.astype(np.uint8), (W, H), interpolation=cv2.INTER_NEAREST)
mask_2d = mask.astype(bool)
if mask_2d.any():
overlay = np.array(color, dtype=np.uint8)
img_copy[mask_2d] = (img_copy[mask_2d]*0.5 + overlay*0.5).astype(np.uint8)
contours, _ = cv2.findContours(mask_2d.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(img_copy, contours, -1, color, 2)
self.current_segmentation_masks.append(mask_2d)
cv2.rectangle(img_copy, (x1, y1), (x2, y2), color, 2)
cv2.putText(img_copy, f"{label} {score:.2f}", (x1, max(0, y1-5)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
det = Detection2D()
det.header = header
det.bbox.center.position.x = float((x1+x2)/2)
det.bbox.center.position.y = float((y1+y2)/2)
det.bbox.size_x, det.bbox.size_y = float(x2-x1), float(y2-y1)
hyp = ObjectHypothesisWithPose()
hyp.hypothesis.class_id = label
hyp.hypothesis.score = float(score)
det.results.append(hyp)
detections_list.append(det)
img_bgr = cv2.cvtColor(img_copy, cv2.COLOR_RGB2BGR)
debug_msg = Image(header=header, height=H, width=W, encoding='bgr8', is_bigendian=0, step=W*3, data=img_bgr.tobytes())
self.debug_image_pub.publish(debug_msg)
det_array = Detection2DArray(header=header, detections=detections_list)
self.detection_pub.publish(det_array)
if self.enable_pointcloud:
self._generate_and_publish_pointcloud(detections_list, header)
def _publish_empty_detection(self, img_copy, header):
img_bgr = cv2.cvtColor(img_copy, cv2.COLOR_RGB2BGR)
H, W = img_bgr.shape[:2]
debug_msg = Image(header=header, height=H, width=W, encoding='bgr8', is_bigendian=0, step=W*3, data=img_bgr.tobytes())
self.debug_image_pub.publish(debug_msg)
self.detection_pub.publish(Detection2DArray(header=header))
# ==================== PointCloud Generation ====================
def _generate_and_publish_pointcloud(self, detections, header):
if self.fx is None or self.latest_depth is None: return
try:
depth_m = self.latest_depth.astype(np.float32) * self.depth_scale
depth_H, depth_W = depth_m.shape[:2]
full_object_mask = np.zeros((depth_H, depth_W), dtype=bool)
all_obj_points = []
all_obj_labels = []
object_info_list = []
# --- 1. Object PointCloud generation ---
for i, det in enumerate(detections):
if i >= len(self.current_segmentation_masks): continue
mask = self.current_segmentation_masks[i]
if mask is None: continue
if mask.shape[:2] != (depth_H, depth_W):
mask_resized = cv2.resize(mask.astype(np.uint8), (depth_W, depth_H), interpolation=cv2.INTER_NEAREST).astype(bool)
else:
mask_resized = mask.astype(bool)
full_object_mask = np.logical_or(full_object_mask, mask_resized)
points_3d = self._back_project_scaled(depth_m, mask_resized, depth_W, depth_H)
if len(points_3d) > 0:
# [CHANGED] smaller voxel size to keep more points
points_voxel = self._voxel_grid_downsample_numpy(points_3d, self.object_voxel_size)
# FPS down to exactly target_points points
points_fps = self._fps_sampling_numpy(points_voxel, self.target_points)
object_id = i + 1
all_obj_points.append(points_fps)
all_obj_labels.extend([object_id] * len(points_fps))
centroid = np.mean(points_fps, axis=0)
object_info_list.append({
'id': object_id,
'class': det.results[0].hypothesis.class_id,
'score': det.results[0].hypothesis.score,
'num_points': len(points_fps),
'centroid': {'x': float(centroid[0]), 'y': float(centroid[1]), 'z': float(centroid[2])}
})
# Publish & Save Object PC
pc_header = Header(stamp=self.latest_depth_header.stamp, frame_id=self.camera_frame)
if all_obj_points:
combined_points = np.vstack(all_obj_points)
combined_labels = np.array(all_obj_labels, dtype=np.int32)
# [NEW] save
self.saved_object_points = combined_points.copy()
self.saved_object_labels = combined_labels.copy()
self.saved_object_info = object_info_list.copy()
self.saved_header_frame = self.camera_frame
pc_msg = self._create_pointcloud2(combined_points, combined_labels, pc_header)
self.pointcloud_pub.publish(pc_msg)
self.object_info_pub.publish(String(data=json.dumps(object_info_list)))
# --- 2. Background PointCloud generation ---
bg_mask = np.logical_and(~full_object_mask, (depth_m > self.min_depth) & (depth_m < self.max_depth))
# [CHANGED] smaller stride to sample more points
stride = 2 # was 4
bg_y, bg_x = np.where(bg_mask[::stride, ::stride])
bg_y *= stride
bg_x *= stride
if len(bg_y) > 0:
bg_z = depth_m[bg_y, bg_x]
if self.intrinsic_width > 0:
sx, sy = depth_W / self.intrinsic_width, depth_H / self.intrinsic_height
fx, fy, cx, cy = self.fx*sx, self.fy*sy, self.cx*sx, self.cy*sy
else:
fx, fy, cx, cy = self.fx, self.fy, self.cx, self.cy
bg_px = (bg_x - cx) * bg_z / fx
bg_py = (bg_y - cy) * bg_z / fy
bg_points_3d = np.stack([bg_px, bg_py, bg_z], axis=1)
# [CHANGED] smaller background voxel size (0.10 -> 0.05)
bg_voxel_size = self.background_voxel_size # from the parameter
bg_points_down = self._voxel_grid_downsample_numpy(bg_points_3d, bg_voxel_size)
bg_labels = np.zeros(len(bg_points_down), dtype=np.int32)
# [NEW] save
self.saved_background_points = bg_points_down.copy()
self.saved_background_labels = bg_labels.copy()
bg_msg = self._create_pointcloud2(bg_points_down, bg_labels, pc_header)
self.background_pub.publish(bg_msg)
except Exception as e:
self.get_logger().error(f"PC Gen Error: {e}")
def _back_project_scaled(self, depth_m, mask, w, h):
if self.intrinsic_width > 0:
sx, sy = w / self.intrinsic_width, h / self.intrinsic_height
fx, fy, cx, cy = self.fx*sx, self.fy*sy, self.cx*sx, self.cy*sy
else:
fx, fy, cx, cy = self.fx, self.fy, self.cx, self.cy
v, u = np.where(mask)
if len(v) == 0: return np.zeros((0, 3))
z = depth_m[v, u]
valid = (z > self.min_depth) & (z < self.max_depth)
u, v, z = u[valid], v[valid], z[valid]
if len(z) == 0: return np.zeros((0, 3))
x = (u - cx) * z / fx
y = (v - cy) * z / fy
return np.stack([x, y, z], axis=1)
def _voxel_grid_downsample_numpy(self, points, voxel_size):
if len(points) == 0: return points
voxel_indices = np.floor(points / voxel_size).astype(np.int32)
_, unique_indices = np.unique(voxel_indices, axis=0, return_index=True)
return points[unique_indices]
def _fps_sampling_numpy(self, points, n_samples):
"""FPS 샘플링 - 포인트가 부족하면 복제하여 정확히 n_samples개 반환"""
N = len(points)
if N == 0: return np.zeros((n_samples, 3)) # filled with zeros instead of an empty array
# [CHANGED] if there are fewer than n_samples points, duplicate to reach n_samples
if N < n_samples:
# repeat the existing points and add a little noise
repeats = (n_samples // N) + 1
points_repeated = np.tile(points, (repeats, 1))[:n_samples]
# add small noise (to distinguish the duplicated points)
noise = np.random.normal(0, 0.001, points_repeated.shape)
return points_repeated + noise
# enough points: run FPS
PRE_SAMPLE = min(n_samples * 3, N)
if N > PRE_SAMPLE:
indices = np.random.choice(N, PRE_SAMPLE, replace=False)
points = points[indices]
N = len(points)
farthest_pts = np.zeros((n_samples, 3))
farthest_pts[0] = points[np.random.randint(N)]
distances = np.full(N, np.inf)
for i in range(1, n_samples):
dist_sq = np.sum((points - farthest_pts[i-1])**2, axis=1)
distances = np.minimum(distances, dist_sq)
farthest_pts[i] = points[np.argmax(distances)]
return farthest_pts
def _create_pointcloud2(self, points, labels, header):
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='label', offset=12, datatype=PointField.INT32, count=1),
]
point_step = 16
data = []
for i in range(len(points)):
x, y, z = points[i]
l = labels[i]
data.append(struct.pack('fffi', x, y, z, l))
return PointCloud2(
header=header, height=1, width=len(points), is_dense=True,
is_bigendian=False, fields=fields, point_step=point_step,
row_step=point_step*len(points), data=b''.join(data)
)
def main(args=None):
rclpy.init(args=args)
node = GroundedSAM2Node()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py (/home/thor/vision_ws/src/dino_detection/setup.py)
...
entry_points={
'console_scripts': [
'dino_node = grounded_dino_ros.dino_node:main',
'sam_node = grounded_dino_ros.sam_node:main',
'grounded_sam_node = grounded_dino_ros.grounded_sam_node:main',
'grounded_sam_node_gh = grounded_dino_ros.grounded_sam_node_gh:main',
],
},
The full procedure: starting the node and sending an action goal
Step 0: cyclonedds.xml
# [Thor]
# /home/thor/temp_cyclone/cyclonedds.xml
# [Docker]
# /home/ubuntu/temp_cyclone/cyclonedds.xml
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/master/etc/cyclonedds.xsd">
<Domain Id="any">
<General>
<Interfaces>
<!--<NetworkInterface name="wlx78205115>-->
<NetworkInterface name="enP2p1s0" priority="default"/>
</Interfaces>
<AllowMulticast>false</AllowMulticast>
<MaxMessageSize>65500B</MaxMessageSize>
</General>
<Discovery>
<EnableTopicDiscoveryEndpoints>true</EnableTopicDiscoveryEndpoints>
<ParticipantIndex>auto</ParticipantIndex>
<MaxAutoParticipantIndex>100</MaxAutoParticipantIndex>
<Peers>
<Peer Address="127.0.0.1"/>
<Peer Address="192.168.78.20"/>
</Peers>
</Discovery>
<Internal>
<Watermarks>
<WhcHigh>32MB</WhcHigh>
</Watermarks>
</Internal>
</Domain>
</CycloneDDS>
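This file only takes effect if each shell selects CycloneDDS as the RMW and points CYCLONEDDS_URI at its own copy; both sides export the same values that the steps below reuse:

# [Thor]
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI=file:///home/thor/temp_cyclone/cyclonedds.xml
export ROS_DOMAIN_ID=101
# [Docker]
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI=file:///home/ubuntu/temp_cyclone/cyclonedds.xml
export ROS_DOMAIN_ID=101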
To verify bidirectional communication between the host and Docker, send a test message across.
Host (talker) ➡ Docker (listener)
- Run on the Docker side (listener):
# [Terminal: Docker]
ros2 topic echo /test_ping
- Run on the Host side (talker):
# [Terminal: Host]
ros2 topic pub /test_ping std_msgs/msg/String "data: 'Hello from Host'"
Step 1: Launch the left camera (terminal 1)
First, start the camera driver so its topics are published. To match the paths defined in the code (/camera/camera_left/...), namespace and camera_name must be entered exactly.
Each camera also has its own unique serial number, so passing the matching serial_no brings up that specific camera.
# Terminal 1
# [Thor]
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI=file:///home/thor/temp_cyclone/cyclonedds.xml
export ROS_DOMAIN_ID=101
ros2 launch realsense2_camera rs_launch.py \
camera_name:=camera_left \
namespace:=camera \
align_depth.enable:=true
# Example: bringing up one specific camera when several are attached
# Left
ros2 launch realsense2_camera rs_launch.py \
camera_name:=camera_left \
namespace:="" \
align_depth.enable:=true \
serial_no:="819612071111"
# Right
ros2 launch realsense2_camera rs_launch.py \
camera_name:=camera_right \
namespace:="" \
align_depth.enable:=true \
serial_no:="819612072222"
# Head
ros2 launch realsense2_camera rs_launch.py \
camera_name:=camera_head \
namespace:="" \
align_depth.enable:=true \
serial_no:="819612073333"
Step 2: Build and run the node (terminal 2)
# Terminal 2
# [Host terminal] start the container (if not already running)
xhost +
docker run --rm -it \
-v /home/thor/vision_ws:/home/ubuntu/vision_ws \
-v /tmp/.X11-unix:/tmp/.X11-unix \
-v $HOME/.Xauthority:/root/.Xauthority:rw \
-v ~/temp_cyclone:/home/ubuntu/temp_cyclone \
-e DISPLAY=$DISPLAY \
-e QT_X11_NO_MITSHM=1 \
--network=host \
--gpus all \
--ipc=host \
--ulimit memlock=-1 \
--ulimit stack=67108864 \
--cap-add SYS_TIME \
--name vision_sam3 \
detect_jh_6
# [Docker]
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI=file:///home/ubuntu/temp_cyclone/cyclonedds.xml
export ROS_DOMAIN_ID=101
# [Inside Docker]
cd /home/ubuntu/vision_ws
# 1. Build (required since the code was modified)
colcon build --packages-select grounded_dino_ros
# 2. Source the environment
source install/setup.bash
# 3. Run the node (with network-tuning environment variables)
UCX_TLS=tcp UCX_NET_DEVICES=all ros2 run grounded_dino_ros grounded_sam_node_gh
Step 3: Status monitoring (terminal 3)
Keep a monitor running to confirm the robot is actually searching for the object and publishing point clouds.
# Terminal 3
# [Thor]
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI=file:///home/thor/temp_cyclone/cyclonedds.xml
export ROS_DOMAIN_ID=101
# Monitor the topic rate (Hz)
ros2 topic hz /grounded_sam/pointcloud/objects
Step 4: Send the Action goal (terminal 4)
An action is just one of the capabilities a node exposes: the node must stay running at all times, and the action is invoked only when needed.
Specifying camera_id: 'left' makes the node internally switch its camera subscriptions to the left camera and start detection.
# Terminal 4
# [Thor]
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI=file:///home/thor/temp_cyclone/cyclonedds.xml
export ROS_DOMAIN_ID=101
# Send the Action goal (search for a bottle with the left camera)
ros2 action send_goal /grounded_sam_detection inha_interfaces/action/Detection \
"{camera_id: 'left', target_object: 'bottle'}"
- ros2 action send_goal: runs a ROS 2 action client and sends a goal to the server
- /grounded_sam_detection: the action server's name; the running node must serve under this name.
- inha_interfaces/action/Detection: the action type; communication follows the Detection action definition in the inha_interfaces package.
- "{camera_id: 'left', target_object: 'bottle'}": the goal data handed to the server (a Python client sketch follows the action definition below).
** inha_interfaces/action/Detection **
# Goal
string camera_id
string target_object
---
# Result
bool success
string error_message
---
# Feedback
string current_state
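The same goal can also be sent from Python instead of the CLI. A minimal rclpy action-client sketch (the client node name is illustrative):

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from inha_interfaces.action import Detection

def main():
    rclpy.init()
    node = Node('detection_client')  # illustrative node name
    client = ActionClient(node, Detection, 'grounded_sam_detection')
    client.wait_for_server()
    goal = Detection.Goal(camera_id='left', target_object='bottle')
    send_future = client.send_goal_async(goal)
    rclpy.spin_until_future_complete(node, send_future)
    goal_handle = send_future.result()
    result_future = goal_handle.get_result_async()
    rclpy.spin_until_future_complete(node, result_future)
    result = result_future.result().result
    print(f"success={result.success}, error='{result.error_message}'")
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()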
Step 5: Check the results (log flow)
Confirm that the following logs appear, in order, in terminal 2 (the node's window).
1. Action goal received:
[INFO]: Action Goal: direction=left, target=bottle
2. Camera switch:
[INFO]: Switching camera source to 'left'...
[INFO]: Subscribing to camera: ... /camera/camera_left/color/image_rect_raw
3. Search starts:
[INFO]: Found 1 objects: bottle ...
4. Success and replay:
Waiting for an action server to become available...
Sending goal:
camera_id: left
target_object: bottle
Goal accepted with ID: 6ebb626de29448009a214959e28c9789
Result:
success: true
error_message: ''
Goal finished with status: SUCCEEDED
[INFO]: Object 'bottle' found! Starting auto-replay...
[INFO]: Auto-replay started at 0.5 Hz (models unloaded)
5. [Terminal 3 (monitoring window)]: success once values close to the following start appearing:
average rate: 0.500
6. RQT visualization:
rqt
7. Stop the job
# Stop after the task is complete
ros2 service call /grounded_sam/stop_replay inha_interfaces/srv/StopReplay