# -*- coding: utf-8 -*-
#
# This file is part of MARV Robotics
#
# Copyright 2016-2017 Ternaris
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import os
import re
import sys
from collections import defaultdict, namedtuple
from itertools import groupby
from logging import getLogger
import capnp
import genpy
import rosbag
import marv
import marv_nodes
from marv.scanner import DatasetInfo
from .bag_capnp import Bagmeta, Header, Message
_Baginfo = namedtuple('Baginfo', ('filename', 'name', 'timestamp', 'idx'))


class Baginfo(_Baginfo):
    """Parsed components of a bag filename.

    Fields:
        filename: The bag's filename as it was passed in.
        name: Name part of the filename.
        timestamp: Timestamp part of the filename, or None if absent.
        idx: Index of the bag as an int, or None if absent.
    """

    # Keep instances as lean as plain tuples: without this every
    # instance of the subclass would carry its own ``__dict__``.
    __slots__ = ()

    def __new__(cls, filename, name, timestamp=None, idx=None):
        """Create a Baginfo, coercing a given ``idx`` to int."""
        idx = None if idx is None else int(idx)
        return super(Baginfo, cls).__new__(cls, filename, name, timestamp, idx)
def scan(dirpath, dirnames, filenames):
    """Default scanner for ROS bag files.

    The file naming convention employed by ``rosbag record`` is
    understood to create sets of split bags. Bags that do not fit that
    naming convention will be treated as individual datasets.

    Bags of a split set are only combined into a dataset if their
    indices run consecutively down to index 0; bags that cannot be
    connected that way are logged as orphaned and are not part of any
    returned dataset. Files ending in ``.bag`` whose name cannot be
    parsed at all (e.g. a file called just ``.bag``) are logged and
    ignored.

    For more information on scanners see :any:`marv.scanner`.

    Args:
        dirpath (str): The path to the directory currently being
            scanned.
        dirnames (list of str): Sorted list of subdirectories of the
            directory currently being scanned. Change this in-place to
            control further traversal.
        filenames (list of str): Sorted list of files within the
            directory currently being scanned.

    Returns:
        A list of :class:`marv.scanner.DatasetInfo` instances mapping
        set of files to dataset names. Absolute filenames must
        start with :paramref:`.dirpath`, relative filenames are
        automatically prefixed with it.

    See :ref:`cfg_c_scanner` config key.
    """
    log = getLogger('{}.{}'.format(__name__, 'scan'))
    # <name>[_<YYYY-MM-DD-HH-MM-SS>_<idx>].bag
    regex = re.compile(r'^(.+?)(?:_(\d{4}(?:-\d{2}){5})_(\d+))?\.bag$')

    # Walk the sorted filenames backwards, so that within a split set
    # the highest index is seen first and index 0 completes the set.
    baginfos = []
    for filename in reversed(filenames):
        if not filename.endswith('.bag'):
            continue
        match = regex.match(filename)
        if match is None:
            log.warning("Ignoring bag with unparsable name '%s'",
                        os.path.join(dirpath, filename))
            continue
        baginfos.append(Baginfo(filename, *match.groups()))

    orphans = []
    datasets = []
    for name, group in groupby(baginfos, lambda x: x.name):
        bags = []
        prev_idx = None
        for bag in group:
            idx = bag.idx
            expected_idx = idx if prev_idx is None else prev_idx - 1
            if idx != expected_idx:
                # Gap in the index sequence: whatever was collected so
                # far can never reach index 0 anymore.
                orphans.extend(bags)
                bags = []
            bags.insert(0, os.path.join(dirpath, bag.filename))
            prev_idx = idx
            if not idx:
                # Index 0 or a bag without index completes a dataset.
                # Prepending undoes the reversed iteration order.
                datasets.insert(0, DatasetInfo(name, bags))
                bags = []
        # Leftovers of this name never reached index 0.
        orphans.extend(bags)

    for orphan in sorted(orphans):
        log.warning("Orphaned bag '%s'", orphan)
    return datasets
def read_messages(paths, topics=None, start_time=None, end_time=None):
    """Iterate chronologically raw BagMessage for topic from paths.

    All bags are opened up front and their message streams are merged
    by timestamp. Each bag is closed as soon as it is exhausted; bags
    still open when iteration stops early or fails are closed as well.

    Args:
        paths: Iterable of paths to bag files.
        topics: Passed on to ``rosbag.Bag.read_messages``.
        start_time: Passed on to ``rosbag.Bag.read_messages``.
        end_time: Passed on to ``rosbag.Bag.read_messages``.

    Yields:
        The items produced by ``rosbag.Bag.read_messages(raw=True)``
        of the individual bags, ordered by their ``timestamp``.
    """
    bags = {}
    try:
        for path in paths:
            if path not in bags:
                bags[path] = rosbag.Bag(path)
        gens = {path: bag.read_messages(topics=topics, start_time=start_time,
                                        end_time=end_time, raw=True)
                for path, bag in bags.items()}
        # Holds at most one pending message per bag.
        msgs = {}
        prev_timestamp = None
        while True:
            # Refill the slot of every bag that has no pending message.
            for key in set(gens) - set(msgs):
                try:
                    msgs[key] = next(gens[key])
                except StopIteration:
                    bags.pop(key).close()
                    del gens[key]
            if not msgs:
                break
            next_key = min(msgs.items(), key=lambda item: item[1].timestamp)[0]
            next_msg = msgs.pop(next_key)
            # Sanity check that the merged stream is monotonic.
            assert prev_timestamp is None or next_msg.timestamp >= prev_timestamp
            yield next_msg
            prev_timestamp = next_msg.timestamp
    finally:
        # Reached on exhaustion, on errors and when the consumer closes
        # or drops the generator; exhausted bags were removed already.
        for bag in bags.values():
            bag.close()
@marv.node(Message, Header, group='ondemand')
@marv.input('dataset', marv_nodes.dataset)
@marv.input('bagmeta', bagmeta)
def raw_messages(dataset, bagmeta):
    """Stream messages from a set of bag files.

    Requested stream names are interpreted in one of two ways:

    - ``<topic>``: a single stream for that topic is created if the
      topic is listed in the bag metadata.
    - ``<topic>:<msg_type>``: a group named like the request is
      created, holding one stream per topic of the bag metadata that
      matches both parts; ``*`` matches anything for either part.

    Every message read from the dataset's ``.bag`` files is sent to
    each stream created for its topic as a dict with the keys ``data``
    and ``timestamp`` (nanoseconds as returned by ``to_nsec()``).
    """
    bagmeta, dataset = yield marv.pull_all(bagmeta, dataset)
    bagtopics = {x.name: x for x in bagmeta.topics}
    paths = [x.path for x in dataset.files if x.path.endswith('.bag')]
    requested = yield marv.get_requested()

    alltopics = set()
    bytopic = defaultdict(list)
    # NOTE(review): only written to within this function; presumably
    # keeps the group handles referenced - confirm before removing.
    groups = {}
    names = [x.name for x in requested]
    for name in names:
        if ':' in name:
            # Split on the first colon only, so that a malformed
            # request with further colons simply matches nothing
            # instead of raising on unpacking.
            reqtop, reqtype = name.split(':', 1)
            topics = [topic.name for topic in bagmeta.topics
                      if ((reqtop == '*' or reqtop == topic.name) and
                          (reqtype == '*' or reqtype == topic.msg_type))]
            group = groups[name] = yield marv.create_group(name)
            create_stream = group.create_stream
        else:
            topics = [name] if name in bagtopics else []
            group = None
            create_stream = marv.create_stream

        for topic in topics:
            info = bagtopics[topic]
            # TODO: start/end_time per topic?
            header = {'start_time': bagmeta.start_time,
                      'end_time': bagmeta.end_time,
                      'msg_count': info.msg_count,
                      'msg_type': info.msg_type,
                      'topic': topic}
            stream = yield create_stream(topic, **header)
            # A topic requested under several names gets several streams.
            bytopic[topic].append(stream)
        alltopics.update(topics)
        if group:
            yield group.finish()

    if not alltopics:
        # Nothing requested matches the bags, no need to open them.
        return

    for topic, raw, t in read_messages(paths, topics=list(alltopics)):
        # The second element of the raw message is used as payload.
        dct = {'data': raw[1], 'timestamp': t.to_nsec()}
        for stream in bytopic[topic]:
            yield stream.msg(dct)
# Expose the raw_messages node under the additional name ``messages``.
messages = raw_messages