Python ROS 2 Package Template
The Python ROS 2 Package template generates flexible, prototype-friendly ROS 2 nodes using modern Python practices and the rclpy
library.
Overview
This template creates Python-based ROS 2 nodes with: - Modern Python: Python 3.12+ with type hints and async support - Async/Await: Native asyncio integration for concurrent operations - Rich Ecosystem: Easy integration with scientific computing, ML, and web frameworks - Testing: Comprehensive pytest integration - Documentation: Auto-generated docstrings and type annotations
Generated Structure
your_package/
├── package.xml # ROS 2 package manifest
├── setup.py # Python package configuration
├── setup.cfg # Python packaging metadata
├── README.md # Package documentation
├── CONTRIBUTING.md # Development guidelines
├── Agents.md # AI interaction guide
├── your_package/
│ ├── __init__.py # Package initialization
│ ├── your_package_node.py # Main node implementation
│ └── utils.py # Utility functions
├── launch/
│ └── your_package_node.launch.py # Launch configuration
├── resource/
│ └── your_package # Ament index resource
├── test/
│ ├── __init__.py # Test package
│ └── test_your_package_node.py # Unit tests
└── docs/
└── index.md # Package documentation
Key Features
Async/Await Support
Native asyncio integration for concurrent operations:
import asyncio
import rclpy
from rclpy.node import Node
class AsyncNode(Node):
def __init__(self):
super().__init__('async_node')
async def run_async_operation(self):
"""Example async method"""
await asyncio.sleep(1.0)
self.get_logger().info('Async operation completed')
def timer_callback(self):
"""Timer callback that can spawn async tasks"""
asyncio.create_task(self.run_async_operation())
Type Hints and Documentation
Comprehensive type annotations and docstrings:
from typing import Optional, List
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class PublisherNode(Node):
"""A ROS 2 node that publishes string messages.
This node demonstrates basic publishing functionality with
configurable message content and publishing rate.
"""
def __init__(self, topic_name: str = 'chatter',
publish_rate: float = 1.0) -> None:
"""Initialize the publisher node.
Args:
topic_name: Name of the topic to publish to
publish_rate: Publishing frequency in Hz
"""
super().__init__('publisher_node')
self.publisher: rclpy.publisher.Publisher[String] = self.create_publisher(
String, topic_name, 10)
self.timer: rclpy.timer.Timer = self.create_timer(
1.0 / publish_rate, self.timer_callback)
self.message_count: int = 0
def timer_callback(self) -> None:
"""Publish a message on timer trigger."""
msg = String()
msg.data = f'Hello ROS 2! Message #{self.message_count}'
self.publisher.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.message_count += 1
pytest Integration
Comprehensive testing with ROS 2 fixtures:
import pytest
import rclpy
from rclpy.node import Node
from your_package.your_package_node import PublisherNode
class TestPublisherNode:
"""Test cases for PublisherNode."""
@pytest.fixture
def node(self):
"""Fixture providing a PublisherNode instance."""
rclpy.init()
node = PublisherNode()
yield node
node.destroy_node()
rclpy.shutdown()
def test_node_creation(self, node):
"""Test that node is created successfully."""
assert node.get_name() == 'publisher_node'
assert node.publisher is not None
def test_parameter_declaration(self, node):
"""Test parameter handling."""
# Test parameter access and modification
pass
Usage Examples
Basic Publisher
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Service Server
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(
AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(
f'Incoming request: {request.a} + {request.b} = {response.sum}')
return response
def main(args=None):
rclpy.init(args=args)
minimal_service = MinimalService()
rclpy.spin(minimal_service)
minimal_service.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Async Operations
#!/usr/bin/env python3
import asyncio
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class AsyncPublisher(Node):
def __init__(self):
super().__init__('async_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
"""Timer callback that demonstrates async operations."""
asyncio.create_task(self.publish_async())
async def publish_async(self):
"""Async publishing method."""
# Simulate async I/O operation
await asyncio.sleep(0.1)
msg = String()
msg.data = 'Async message'
self.publisher_.publish(msg)
self.get_logger().info('Published async message')
def main(args=None):
rclpy.init(args=args)
node = AsyncPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Configuration Options
Communication Patterns
- Publishers: Data broadcasting with configurable QoS
- Subscribers: Data reception with callback functions
- Services: Synchronous request-response communication
- Actions: Long-running task coordination
- Parameters: Dynamic configuration management
Quality of Service (QoS)
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
# Best effort, volatile QoS for sensor data
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
depth=10
)
# Reliable, transient local QoS for state data
state_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
depth=1
)
Building and Testing
Installation
# Install the package
pip install -e .
# Or using colcon
colcon build --packages-select your_package
source install/setup.bash
Running Tests
# Using pytest directly
pytest
# Using colcon
colcon test --packages-select your_package
colcon test-result --verbose
Running the Node
# Direct execution
python -m your_package.your_package_node
# Using ROS 2 launch
ros2 launch your_package your_package_node.launch.py
# Using ros2 run (if executable entry point is configured)
ros2 run your_package your_package_node
Best Practices
Performance Optimization
- Minimize global interpreter lock (GIL) impact
- Use numpy for numerical computations
- Consider multiprocessing for CPU-intensive tasks
- Use async operations for I/O-bound tasks
Error Handling
try:
# ROS 2 operations
pass
except rclpy.exceptions.ROSInterruptException:
pass # Expected during shutdown
except Exception as e:
self.get_logger().error(f'Unexpected error: {e}')
raise
Logging
# Use appropriate log levels
self.get_logger().debug('Detailed debugging information')
self.get_logger().info('General information messages')
self.get_logger().warn('Warning conditions')
self.get_logger().error('Error conditions')
self.get_logger().fatal('Fatal error conditions')
Parameter Management
# Declare parameters with defaults
self.declare_parameter('rate', 1.0)
self.declare_parameter('topic_name', 'default_topic')
# Access parameters
rate = self.get_parameter('rate').value
topic = self.get_parameter('topic_name').value
# Parameter callbacks for dynamic reconfiguration
def parameter_callback(self, params):
for param in params:
if param.name == 'rate':
# Update rate logic
pass
return SetParametersResult(successful=True)
self.add_on_set_parameters_callback(self.parameter_callback)
Integration Examples
NumPy Integration
import numpy as np
from sensor_msgs.msg import PointCloud2
class PointCloudProcessor(Node):
def __init__(self):
super().__init__('pointcloud_processor')
self.subscription = self.create_subscription(
PointCloud2, 'pointcloud', self.process_pointcloud, 10)
def process_pointcloud(self, msg):
"""Process point cloud data using NumPy."""
# Convert ROS message to numpy array
points = self.pointcloud2_to_numpy(msg)
# Perform computations
centroids = np.mean(points, axis=0)
self.get_logger().info(f'Centroid: {centroids}')
Web Framework Integration
from flask import Flask
import threading
class WebEnabledNode(Node):
def __init__(self):
super().__init__('web_node')
self.app = Flask(__name__)
@self.app.route('/status')
def status():
return {'node_name': self.get_name(), 'status': 'running'}
# Run Flask in separate thread
self.web_thread = threading.Thread(target=self.app.run)
self.web_thread.daemon = True
self.web_thread.start()
Troubleshooting
Common Issues
Import Errors
Symptoms: ModuleNotFoundError
when running
Solution: Ensure package is installed and Python path is correct
Async Operation Issues
Symptoms: Async operations not working as expected Solution: Ensure proper event loop management and avoid blocking operations
Parameter Access Fails
Symptoms: Parameter values not updating Solution: Check parameter declaration and callback registration
Debug Tips
- Use
rclpy.logging
for detailed logging configuration - Enable debug logging:
ros2 run your_package your_node --ros-args --log-level debug
- Use Python debugger:
import pdb; pdb.set_trace()
- Profile performance with
cProfile
Advanced Usage
Multi-Threaded Executors
import rclpy
from rclpy.executors import MultiThreadedExecutor
def main(args=None):
rclpy.init(args=args)
# Create nodes
node1 = Node1()
node2 = Node2()
# Use multi-threaded executor
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node1)
executor.add_node(node2)
try:
executor.spin()
finally:
executor.shutdown()
node1.destroy_node()
node2.destroy_node()
rclpy.shutdown()
Custom Message Types
# In setup.py, add message dependencies
# In package.xml, add <depend>your_custom_msgs</depend>
from your_custom_msgs.msg import CustomMessage
class CustomMessageNode(Node):
def __init__(self):
super().__init__('custom_node')
self.publisher_ = self.create_publisher(CustomMessage, 'custom_topic', 10)
Lifecycle Management
from rclpy.lifecycle import LifecycleNode, LifecycleState, TransitionCallbackReturn
class LifecycleExampleNode(LifecycleNode):
def __init__(self):
super().__init__('lifecycle_node')
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Configuring...')
# Initialize resources
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Activating...')
# Start operations
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Deactivating...')
# Stop operations
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Cleaning up...')
# Release resources
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Shutting down...')
# Final cleanup
return TransitionCallbackReturn.SUCCESS
Migration from ROS 1
Key differences when migrating from ROS 1 Python nodes:
- Replace rospy
with rclpy
- Use Node
instead of node handles
- Update message/service type imports
- Use rclpy.spin()
instead of rospy.spin()
- Update parameter access methods
- Use modern Python features and async patterns
Contributing
To improve the Python template:
- Add more async operation examples
- Include additional testing patterns
- Enhance type hint coverage
- Add performance benchmarking
- Include more integration examples
See the contributing guide for details on modifying templates.