ctf: remove `intersection-range-ns` from `babeltrace.trace-info` query
authorSimon Marchi <simon.marchi@efficios.com>
Wed, 28 Aug 2019 20:31:12 +0000 (16:31 -0400)
committerSimon Marchi <simon.marchi@efficios.com>
Wed, 4 Sep 2019 15:38:52 +0000 (11:38 -0400)
The `intersection-range-ns` field of the `src.ctf.fs` component class
response to the `babeltrace.trace-info` query is not essential, since it
can easily be computed from the stream ranges also in the response.
Remove it, and adjust users to compute the intersection themselves.

One user is the CLI and the other is the TraceCollectionMessageIterator
class in the Python bindings.

There is a small implementation detail change in
TraceCollectionMessageIterator.  Previously, we would do one
`babeltrace.trace-info` query per port.  With this patch, we pre-compute
the intersection range for each source component port and store it in a
port name to intersection mapping.  When we create the trimmers, we just look
up that value.

It results in less queries and avoids computing the intersections many
times.  It is also in line with how the CLI does.

Change-Id: Ic5ebded4d810ef992594941dde3316613909c98c
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1994
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
src/bindings/python/bt2/bt2/trace_collection_message_iterator.py
src/cli/babeltrace2.c
src/plugins/ctf/fs-src/query.c
tests/plugins/src.ctf.fs/query/test_query_trace_info.py

index 74417776d07d15170258b931f075cb1516779efc..4e7347a92a01128a6265f4b2328739540d3044c5 100644 (file)
@@ -339,6 +339,47 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
 
         self._build_graph()
 
+    def _compute_stream_intersections(self):
+        # Pre-compute the trimmer range to use for each port in the graph, when
+        # stream intersection mode is enabled.
+        self._stream_inter_port_to_range = {}
+
+        for src_comp_and_spec in self._src_comps_and_specs:
+            try:
+                inputs = src_comp_and_spec.spec.params['inputs']
+            except KeyError as e:
+                raise ValueError(
+                    'all source components must be created with an "inputs" parameter in stream intersection mode'
+                ) from e
+
+            params = {'inputs': inputs}
+
+            # query the port's component for the `babeltrace.trace-info`
+            # object which contains the range for each stream, from which we can
+            # compute the intersection of the streams in each trace.
+            query_exec = bt2.QueryExecutor(
+                src_comp_and_spec.spec.component_class, 'babeltrace.trace-info', params
+            )
+            trace_infos = query_exec.query()
+
+            for trace_info in trace_infos:
+                begin = max(
+                    [stream['range-ns']['begin'] for stream in trace_info['streams']]
+                )
+                end = min(
+                    [stream['range-ns']['end'] for stream in trace_info['streams']]
+                )
+
+                # Each port associated to this trace will have this computed
+                # range.
+                for stream in trace_info['streams']:
+                    # A port name is unique within a component, but not
+                    # necessarily across all components.  Use a component
+                    # and port name pair to make it unique across the graph.
+                    port_name = str(stream['port-name'])
+                    key = (src_comp_and_spec.comp.addr, port_name)
+                    self._stream_inter_port_to_range[key] = (begin, end)
+
     def _validate_source_component_specs(self, comp_specs):
         for comp_spec in comp_specs:
             if (
@@ -367,49 +408,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
         return msg
 
     def _create_stream_intersection_trimmer(self, component, port):
-        # find the original parameters specified by the user to create
-        # this port's component to get the `inputs` parameter
-        for src_comp_and_spec in self._src_comps_and_specs:
-            if component == src_comp_and_spec.comp:
-                break
-
-        try:
-            inputs = src_comp_and_spec.spec.params['inputs']
-        except Exception as e:
-            raise ValueError(
-                'all source components must be created with an "inputs" parameter in stream intersection mode'
-            ) from e
-
-        params = {'inputs': inputs}
-
-        # query the port's component for the `babeltrace.trace-info`
-        # object which contains the stream intersection range for each
-        # exposed trace
-        query_exec = bt2.QueryExecutor(
-            src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params
-        )
-        trace_info_res = query_exec.query()
-        begin = None
-        end = None
-
-        # find the trace info for this port's trace
-        try:
-            for trace_info in trace_info_res:
-                for stream in trace_info['streams']:
-                    if stream['port-name'] == port.name:
-                        range_ns = trace_info['intersection-range-ns']
-                        begin = range_ns['begin']
-                        end = range_ns['end']
-                        break
-        except Exception:
-            pass
-
-        if begin is None or end is None:
-            raise RuntimeError(
-                'cannot find stream intersection range for port "{}"'.format(port.name)
-            )
-
-        name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name)
+        key = (component.addr, port.name)
+        begin, end = self._stream_inter_port_to_range[key]
+        name = 'trimmer-{}-{}'.format(component.name, port.name)
         return self._create_trimmer(begin, end, name)
 
     def _create_muxer(self):
@@ -574,6 +575,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
             comp = self._create_comp(comp_spec)
             self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))
 
+        if self._stream_intersection_mode:
+            self._compute_stream_intersections()
+
         # Now we connect the ports which exist at this point. We allow
         # self._graph_port_added() to automatically connect _new_ ports.
         self._connect_ports = True
index 0b6f2341b295d49b99b07ce091f750ad45846815..82406a0beeb028e77995b5a0db4d81f43fdb27c6 100644 (file)
@@ -1919,6 +1919,118 @@ end:
        return ret;
 }
 
+/*
+ * Compute the intersection of all streams in the array `streams`, write it
+ * in `range`.
+ */
+
+static
+int compute_stream_intersection(const bt_value *streams,
+               struct trace_range *range)
+{
+       unsigned int i;
+       unsigned int stream_count;
+       int ret;
+
+       BT_ASSERT(bt_value_get_type(streams) == BT_VALUE_TYPE_ARRAY);
+
+       stream_count = bt_value_array_get_length(streams);
+
+       BT_ASSERT(stream_count > 0);
+
+       range->intersection_range_begin_ns = 0;
+       range->intersection_range_end_ns = UINT64_MAX;
+
+       for (i = 0; i < stream_count; i++) {
+               int64_t begin_ns, end_ns;
+               uint64_t begin_ns_u, end_ns_u;
+               const bt_value *stream_value;
+               const bt_value *range_ns_value;
+               const bt_value *begin_value;
+               const bt_value *end_value;
+
+               stream_value = bt_value_array_borrow_element_by_index_const(streams, i);
+               if (bt_value_get_type(stream_value) != BT_VALUE_TYPE_MAP) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "expected streams array element to be a map, got %s.",
+                               bt_common_value_type_string(bt_value_get_type(stream_value)));
+                       goto error;
+               }
+
+               range_ns_value = bt_value_map_borrow_entry_value_const(
+                       stream_value, "range-ns");
+               if (!range_ns_value) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "missing expected `range-ns` key in stream map.");
+                       goto error;
+               }
+
+               if (bt_value_get_type(range_ns_value) != BT_VALUE_TYPE_MAP) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "expected `range-ns` entry value of stream map to be a map, got %s.",
+                               bt_common_value_type_string(bt_value_get_type(range_ns_value)));
+                       goto error;
+               }
+
+               begin_value = bt_value_map_borrow_entry_value_const(range_ns_value, "begin");
+               if (!begin_value) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "missing expected `begin` key in range-ns map.");
+                       goto error;
+               }
+
+               if (bt_value_get_type(begin_value) != BT_VALUE_TYPE_SIGNED_INTEGER) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "expected `begin` entry value of range-ns map to be a signed integer, got %s.",
+                               bt_common_value_type_string(bt_value_get_type(range_ns_value)));
+                       goto error;
+               }
+
+               end_value = bt_value_map_borrow_entry_value_const(range_ns_value, "end");
+               if (!end_value) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "missing expected `end` key in range-ns map.");
+                       goto error;
+               }
+
+               if (bt_value_get_type(end_value) != BT_VALUE_TYPE_SIGNED_INTEGER) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: "
+                               "expected `end` entry value of range-ns map to be a signed integer, got %s.",
+                               bt_common_value_type_string(bt_value_get_type(range_ns_value)));
+                       goto error;
+               }
+
+               begin_ns = bt_value_integer_signed_get(begin_value);
+               end_ns = bt_value_integer_signed_get(end_value);
+
+               if (begin_ns < 0 || end_ns < 0 || end_ns < begin_ns) {
+                       BT_CLI_LOGE_APPEND_CAUSE(
+                               "Invalid stream range values: "
+                               "range-ns:begin=%" PRId64 ", "
+                               "range-ns:end=%" PRId64,
+                               begin_ns, end_ns);
+                       ret = -1;
+                       goto error;
+               }
+
+               begin_ns_u = begin_ns;
+               end_ns_u = end_ns;
+
+               range->intersection_range_begin_ns =
+                       MAX(range->intersection_range_begin_ns, begin_ns_u);
+               range->intersection_range_end_ns =
+                       MIN(range->intersection_range_end_ns, end_ns_u);
+       }
+
+       ret = 0;
+       goto end;
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
 static
 int set_stream_intersections(struct cmd_run_ctx *ctx,
                struct bt_config_component *cfg_comp,
@@ -1929,13 +2041,10 @@ int set_stream_intersections(struct cmd_run_ctx *ctx,
        int64_t trace_count;
        const bt_value *query_result = NULL;
        const bt_value *trace_info = NULL;
-       const bt_value *intersection_range = NULL;
-       const bt_value *intersection_begin = NULL;
-       const bt_value *intersection_end = NULL;
        const bt_value *stream_infos = NULL;
        const bt_value *stream_info = NULL;
        struct port_id *port_id = NULL;
-       struct trace_range *trace_range = NULL;
+       struct trace_range *stream_intersection = NULL;
        const char *fail_reason = NULL;
        const bt_component_class *comp_cls =
                bt_component_class_source_as_component_class_const(src_comp_cls);
@@ -1968,9 +2077,9 @@ int set_stream_intersections(struct cmd_run_ctx *ctx,
        }
 
        for (trace_idx = 0; trace_idx < trace_count; trace_idx++) {
-               int64_t begin, end;
                uint64_t stream_idx;
                int64_t stream_count;
+               struct trace_range trace_intersection;
 
                trace_info = bt_value_array_borrow_element_by_index_const(
                        query_result, trace_idx);
@@ -1980,43 +2089,6 @@ int set_stream_intersections(struct cmd_run_ctx *ctx,
                        goto error;
                }
 
-               intersection_range = bt_value_map_borrow_entry_value_const(
-                       trace_info, "intersection-range-ns");
-               if (!intersection_range) {
-                       ret = -1;
-                       BT_LOGD_STR("Cannot retrieve \'intersetion-range-ns\' field from query result.");
-                       goto error;
-               }
-
-               intersection_begin = bt_value_map_borrow_entry_value_const(intersection_range,
-                                                                          "begin");
-               if (!intersection_begin) {
-                       ret = -1;
-                       BT_LOGD_STR("Cannot retrieve intersection-range-ns \'begin\' field from query result.");
-                       goto error;
-               }
-
-               intersection_end = bt_value_map_borrow_entry_value_const(intersection_range,
-                                                                        "end");
-               if (!intersection_end) {
-                       ret = -1;
-                       BT_LOGD_STR("Cannot retrieve intersection-range-ns \'end\' field from query result.");
-                       goto error;
-               }
-
-               begin = bt_value_integer_signed_get(intersection_begin);
-               end = bt_value_integer_signed_get(intersection_end);
-
-               if (begin < 0 || end < 0 || end < begin) {
-                       BT_CLI_LOGE_APPEND_CAUSE(
-                               "Invalid trace stream intersection values: "
-                               "intersection-range-ns:begin=%" PRId64
-                               ", intersection-range-ns:end=%" PRId64,
-                               begin, end);
-                       ret = -1;
-                       goto error;
-               }
-
                stream_infos = bt_value_map_borrow_entry_value_const(trace_info,
                                                                     "streams");
                if (!stream_infos || !bt_value_is_array(stream_infos)) {
@@ -2031,6 +2103,12 @@ int set_stream_intersections(struct cmd_run_ctx *ctx,
                        goto error;
                }
 
+               ret = compute_stream_intersection(stream_infos, &trace_intersection);
+               if (ret != 0) {
+                       BT_CLI_LOGE_APPEND_CAUSE("Failed to compute trace streams intersection.");
+                       goto error;
+               }
+
                for (stream_idx = 0; stream_idx < stream_count; stream_idx++) {
                        const bt_value *port_name;
 
@@ -2049,15 +2127,15 @@ int set_stream_intersections(struct cmd_run_ctx *ctx,
                                goto error;
                        }
 
-                       trace_range = g_new0(struct trace_range, 1);
-                       if (!trace_range) {
+                       stream_intersection = g_new0(struct trace_range, 1);
+                       if (!stream_intersection) {
                                ret = -1;
                                BT_CLI_LOGE_APPEND_CAUSE(
                                        "Cannot allocate memory for trace_range structure.");
                                goto error;
                        }
-                       trace_range->intersection_range_begin_ns = begin;
-                       trace_range->intersection_range_end_ns = end;
+
+                       *stream_intersection = trace_intersection;
 
                        stream_info = bt_value_array_borrow_element_by_index_const(
                                stream_infos, stream_idx);
@@ -2086,10 +2164,10 @@ int set_stream_intersections(struct cmd_run_ctx *ctx,
 
                        BT_LOGD("Inserting stream intersection ");
 
-                       g_hash_table_insert(ctx->intersections, port_id, trace_range);
+                       g_hash_table_insert(ctx->intersections, port_id, stream_intersection);
 
                        port_id = NULL;
-                       trace_range = NULL;
+                       stream_intersection = NULL;
                }
        }
 
@@ -2102,7 +2180,7 @@ error:
 end:
        bt_value_put_ref(query_result);
        g_free(port_id);
-       g_free(trace_range);
+       g_free(stream_intersection);
        return ret;
 }
 
index 35e4ec97eaf49c22f9fe3320e6b6311d8101eae4..cc58872c8092d480c9cb64de4293c4342bed2034 100644 (file)
@@ -285,11 +285,6 @@ int populate_trace_info(const struct ctf_fs_trace *trace, bt_value *trace_info)
        bt_value_map_insert_entry_status insert_status;
        bt_value_array_append_element_status append_status;
        bt_value *file_groups = NULL;
-       struct range trace_intersection = {
-               .begin_ns = 0,
-               .end_ns = INT64_MAX,
-               .set = false,
-       };
 
        BT_ASSERT(trace->ds_file_groups);
        /* Add trace range info only if it contains streams. */
@@ -324,22 +319,6 @@ int populate_trace_info(const struct ctf_fs_trace *trace, bt_value *trace_info)
                if (ret) {
                        goto end;
                }
-
-               if (group_range.set) {
-                       trace_intersection.begin_ns = MAX(trace_intersection.begin_ns,
-                                       group_range.begin_ns);
-                       trace_intersection.end_ns = MIN(trace_intersection.end_ns,
-                                       group_range.end_ns);
-                       trace_intersection.set = true;
-               }
-       }
-
-       if (trace_intersection.begin_ns < trace_intersection.end_ns) {
-               ret = add_range(trace_info, &trace_intersection,
-                               "intersection-range-ns");
-               if (ret) {
-                       goto end;
-               }
        }
 
 end:
index 1c532bb54f8474159ca2a8efe3bf7c7b37192452..52ce764cdcfd7d54292791fe830e47a4d861e0a9 100644 (file)
@@ -39,13 +39,6 @@ class QueryTraceInfoClockOffsetTestCase(unittest.TestCase):
         ]
 
     def _check(self, trace, offset):
-        self.assertEqual(
-            trace['intersection-range-ns']['begin'], 13515309000000070 + offset
-        )
-        self.assertEqual(
-            trace['intersection-range-ns']['end'], 13515309000000100 + offset
-        )
-
         streams = sorted(trace['streams'], key=sort_predictably)
         self.assertEqual(streams[0]['range-ns']['begin'], 13515309000000000 + offset)
         self.assertEqual(streams[0]['range-ns']['end'], 13515309000000100 + offset)
This page took 0.0320510000000001 seconds and 4 git commands to generate.