#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Programmatically control a ROS bag file.
This module implements the base class, and the various functions.
Currently implemented are:
* ``rosbag play``
"""
import logging
import subprocess as sp
import time
try:
from types import StringTypes
except ImportError:
StringTypes = str
try:
input = raw_input
except NameError:
pass
logger = logging.getLogger("bag_player")
[docs]class BagError(Exception):
"""
Catch bag player exceptions.
"""
[docs]class MissingBagError(BagError):
"""
Bag file was not specified.
"""
msg = "No Bag files were specified."
[docs]class BagNotRunningError(BagError):
"""
Raised when interaction is attempted with a bag file which is not running.
"""
def __init__(self, action="talk to"):
message = u"Cannot {} process while bag is not running.".format(action)
super(BagNotRunningError, self).__init__(message)
[docs]class Bag(object):
"""
Open and manipulate a bag file programmatically.
Parameters
----------
filenames : StringTypes | List[StringTypes]
The location of the bag files.
Attributes
----------
filenames : List[StringTypes]
The location of the bag files.
process : subprocess.Popen
The process containing the running bag file.
"""
def __init__(self, filenames):
if filenames in ("", u"", []):
raise MissingBagError
if isinstance(filenames, StringTypes):
filenames = [filenames]
self.filenames = filenames
self.process = None
[docs] def send(self, string):
"""
Write something to process stdin.
Parameters
----------
string : str
The string to write.
Raises
------
BagNotRunningError
If interaction is attempted when the bag file is not running.
"""
try:
self.process.stdin.write(string)
except AttributeError:
raise BagNotRunningError()
[docs] def stop(self):
"""
Stop a running bag file.
Raises
------
BagNotRunningError
If the bag file is not running.
"""
try:
self.process.terminate()
self.process.kill()
except AttributeError:
raise BagNotRunningError("stop")
[docs] def wait(self):
"""
Block until process is complete.
Raises
------
BagNotRunningError
If the bag file is not running.
"""
try:
self.process.wait()
except AttributeError:
raise BagNotRunningError("wait for")
@property
def is_running(self):
"""
Check whether the bag file is running.
Returns
-------
bool
The bag file is running.
"""
try:
return self.process.poll() is None
except AttributeError:
return False
[docs] def __enter__(self):
"""
Context manager entry point.
"""
return self
# noinspection PyUnusedLocal
[docs] def __exit__(self, exc_type, exc_value, traceback):
"""
Context manager exit point.
"""
time.sleep(1) # For pretty output.
if self.is_running:
if exc_type is None:
logger.warning("Exited while process is still running.")
logger.info("Hint: Use Bag.wait() or Bag.play(wait=True) "
"to wait until completion.")
else:
self.stop()
if exc_type == KeyboardInterrupt:
logger.info("User exit.")
return True
elif exc_type is not None:
logger.critical("An error occurred. Exiting.")
else:
logger.info("Goodbye!")
def __repr__(self):
return "<Bag({})>".format(self.filenames)
[docs]class BagPlayer(Bag):
"""
Play Bag files.
"""
[docs] def play(self, wait=False, stdin=sp.PIPE, stdout=None, stderr=None,
quiet=None, immediate=None, start_paused=None, queue_size=None,
publish_clock=None, clock_publish_freq=None, delay=None,
publish_rate_multiplier=None, start_time=None, duration=None,
loop=None, keep_alive=None):
"""
Play the bag file.
Parameters
----------
wait : Optional[Bool]
Wait until completion.
stdin : Optional[file]
The stdin buffer. Default is subprocess.PIPE.
stdout : Optional[file]
The stdout buffer.
stderr : Optional[file]
The stderr buffer.
quiet : Optional[Bool]
Suppress console output.
immediate : Optional[Bool]
Play back all messages without waiting.
start_paused : Optional[Bool]
Start in paused mode.
queue_size : Optional[int]
Set outgoing queue size. Default is 100.
publish_clock : Optional[Bool]
Publish the clock time.
clock_publish_freq : Optional[float]
The frequency, in Hz, at which to publish the clock time. Default is
100.
delay : Optional[float]
The number of seconds to sleep afer every advertise call (e.g., to
allow subscribers to connect).
publish_rate_multiplier : Optional[float]
The factor by which to multiply the publish rate.
start_time : Optional[float]
The number of seconds into the bag file at which to start.
duration : Optional[float]
The number of seconds from the start to play.
loop : Optional[Bool]
Loop playback.
keep_alive : Optional[Bool]
Keep alive past end of bag (e.g. for publishing latched topics).
"""
arguments = ["rosbag", "play"]
arguments.extend(self.filenames)
if quiet:
arguments.append("-q")
if immediate:
arguments.append("-i")
if start_paused:
arguments.append("--pause")
if queue_size is not None:
arguments.append("--queue={}".format(queue_size))
if publish_clock:
arguments.append("--clock")
if clock_publish_freq is not None:
arguments.append("--hz={}".format(clock_publish_freq))
if delay is not None:
arguments.append("--delay={}".format(delay))
if publish_rate_multiplier is not None:
arguments.append("--rate={}".format(publish_rate_multiplier))
if start_time is not None:
arguments.append("--start={}".format(start_time))
if duration is not None:
arguments.append("--duration={}".format(duration))
if loop:
arguments.append("-l")
if keep_alive:
arguments.append("-k")
self.process = sp.Popen(arguments,
stdin=stdin, stdout=stdout, stderr=stderr)
if wait:
self.wait()
[docs] def pause(self):
"""
Pause the bag file.
"""
self.send(" ")
[docs] def resume(self):
"""
Resume the bag file.
"""
self.send(" ")
[docs] def step(self):
"""
Step through a paused bag file.
"""
self.send("s")