From 30947af01a064c13fd12c8faf8f13e3d9fd8087f Mon Sep 17 00:00:00 2001 From: Simon Marchi Date: Wed, 28 Aug 2019 16:31:12 -0400 Subject: [PATCH] ctf: remove `intersection-range-ns` from `babeltrace.trace-info` query The `intersection-range-ns` field of the `src.ctf.fs` component class response to the `babeltrace.trace-info` query is not essential, since it can easily be computed from the stream ranges also in the response. Remove it, and adjust users to compute the intersection themselves. One user is the CLI and the other is the TraceCollectionMessageIterator class in the Python bindings. There is a small implementation detail change in TraceCollectionMessageIterator. Previously, we would do one `babeltrace.trace-info` query per port. With this patch, we pre-compute the intersection range for each source component port and store it in a port name to intersection mapping. When we create the trimmers, we just look up that value. It results in less queries and avoids computing the intersections many times. It is also in line with how the CLI does. Change-Id: Ic5ebded4d810ef992594941dde3316613909c98c Signed-off-by: Simon Marchi Reviewed-on: https://review.lttng.org/c/babeltrace/+/1994 Tested-by: jenkins Reviewed-by: Philippe Proulx --- .../bt2/trace_collection_message_iterator.py | 90 ++++----- src/cli/babeltrace2.c | 176 +++++++++++++----- src/plugins/ctf/fs-src/query.c | 21 --- .../src.ctf.fs/query/test_query_trace_info.py | 7 - 4 files changed, 174 insertions(+), 120 deletions(-) diff --git a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py index 74417776..4e7347a9 100644 --- a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py +++ b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py @@ -339,6 +339,47 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._build_graph() + def _compute_stream_intersections(self): + # Pre-compute the trimmer range to use for each port in the graph, when + # stream intersection mode is enabled. + self._stream_inter_port_to_range = {} + + for src_comp_and_spec in self._src_comps_and_specs: + try: + inputs = src_comp_and_spec.spec.params['inputs'] + except KeyError as e: + raise ValueError( + 'all source components must be created with an "inputs" parameter in stream intersection mode' + ) from e + + params = {'inputs': inputs} + + # query the port's component for the `babeltrace.trace-info` + # object which contains the range for each stream, from which we can + # compute the intersection of the streams in each trace. + query_exec = bt2.QueryExecutor( + src_comp_and_spec.spec.component_class, 'babeltrace.trace-info', params + ) + trace_infos = query_exec.query() + + for trace_info in trace_infos: + begin = max( + [stream['range-ns']['begin'] for stream in trace_info['streams']] + ) + end = min( + [stream['range-ns']['end'] for stream in trace_info['streams']] + ) + + # Each port associated to this trace will have this computed + # range. + for stream in trace_info['streams']: + # A port name is unique within a component, but not + # necessarily across all components. Use a component + # and port name pair to make it unique across the graph. + port_name = str(stream['port-name']) + key = (src_comp_and_spec.comp.addr, port_name) + self._stream_inter_port_to_range[key] = (begin, end) + def _validate_source_component_specs(self, comp_specs): for comp_spec in comp_specs: if ( @@ -367,49 +408,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): return msg def _create_stream_intersection_trimmer(self, component, port): - # find the original parameters specified by the user to create - # this port's component to get the `inputs` parameter - for src_comp_and_spec in self._src_comps_and_specs: - if component == src_comp_and_spec.comp: - break - - try: - inputs = src_comp_and_spec.spec.params['inputs'] - except Exception as e: - raise ValueError( - 'all source components must be created with an "inputs" parameter in stream intersection mode' - ) from e - - params = {'inputs': inputs} - - # query the port's component for the `babeltrace.trace-info` - # object which contains the stream intersection range for each - # exposed trace - query_exec = bt2.QueryExecutor( - src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params - ) - trace_info_res = query_exec.query() - begin = None - end = None - - # find the trace info for this port's trace - try: - for trace_info in trace_info_res: - for stream in trace_info['streams']: - if stream['port-name'] == port.name: - range_ns = trace_info['intersection-range-ns'] - begin = range_ns['begin'] - end = range_ns['end'] - break - except Exception: - pass - - if begin is None or end is None: - raise RuntimeError( - 'cannot find stream intersection range for port "{}"'.format(port.name) - ) - - name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name) + key = (component.addr, port.name) + begin, end = self._stream_inter_port_to_range[key] + name = 'trimmer-{}-{}'.format(component.name, port.name) return self._create_trimmer(begin, end, name) def _create_muxer(self): @@ -574,6 +575,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): comp = self._create_comp(comp_spec) self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) + if self._stream_intersection_mode: + self._compute_stream_intersections() + # Now we connect the ports which exist at this point. We allow # self._graph_port_added() to automatically connect _new_ ports. self._connect_ports = True diff --git a/src/cli/babeltrace2.c b/src/cli/babeltrace2.c index 0b6f2341..82406a0b 100644 --- a/src/cli/babeltrace2.c +++ b/src/cli/babeltrace2.c @@ -1919,6 +1919,118 @@ end: return ret; } +/* + * Compute the intersection of all streams in the array `streams`, write it + * in `range`. + */ + +static +int compute_stream_intersection(const bt_value *streams, + struct trace_range *range) +{ + unsigned int i; + unsigned int stream_count; + int ret; + + BT_ASSERT(bt_value_get_type(streams) == BT_VALUE_TYPE_ARRAY); + + stream_count = bt_value_array_get_length(streams); + + BT_ASSERT(stream_count > 0); + + range->intersection_range_begin_ns = 0; + range->intersection_range_end_ns = UINT64_MAX; + + for (i = 0; i < stream_count; i++) { + int64_t begin_ns, end_ns; + uint64_t begin_ns_u, end_ns_u; + const bt_value *stream_value; + const bt_value *range_ns_value; + const bt_value *begin_value; + const bt_value *end_value; + + stream_value = bt_value_array_borrow_element_by_index_const(streams, i); + if (bt_value_get_type(stream_value) != BT_VALUE_TYPE_MAP) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "expected streams array element to be a map, got %s.", + bt_common_value_type_string(bt_value_get_type(stream_value))); + goto error; + } + + range_ns_value = bt_value_map_borrow_entry_value_const( + stream_value, "range-ns"); + if (!range_ns_value) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "missing expected `range-ns` key in stream map."); + goto error; + } + + if (bt_value_get_type(range_ns_value) != BT_VALUE_TYPE_MAP) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "expected `range-ns` entry value of stream map to be a map, got %s.", + bt_common_value_type_string(bt_value_get_type(range_ns_value))); + goto error; + } + + begin_value = bt_value_map_borrow_entry_value_const(range_ns_value, "begin"); + if (!begin_value) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "missing expected `begin` key in range-ns map."); + goto error; + } + + if (bt_value_get_type(begin_value) != BT_VALUE_TYPE_SIGNED_INTEGER) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "expected `begin` entry value of range-ns map to be a signed integer, got %s.", + bt_common_value_type_string(bt_value_get_type(range_ns_value))); + goto error; + } + + end_value = bt_value_map_borrow_entry_value_const(range_ns_value, "end"); + if (!end_value) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "missing expected `end` key in range-ns map."); + goto error; + } + + if (bt_value_get_type(end_value) != BT_VALUE_TYPE_SIGNED_INTEGER) { + BT_CLI_LOGE_APPEND_CAUSE("Unexpected format of `babeltrace.trace-info` query result: " + "expected `end` entry value of range-ns map to be a signed integer, got %s.", + bt_common_value_type_string(bt_value_get_type(range_ns_value))); + goto error; + } + + begin_ns = bt_value_integer_signed_get(begin_value); + end_ns = bt_value_integer_signed_get(end_value); + + if (begin_ns < 0 || end_ns < 0 || end_ns < begin_ns) { + BT_CLI_LOGE_APPEND_CAUSE( + "Invalid stream range values: " + "range-ns:begin=%" PRId64 ", " + "range-ns:end=%" PRId64, + begin_ns, end_ns); + ret = -1; + goto error; + } + + begin_ns_u = begin_ns; + end_ns_u = end_ns; + + range->intersection_range_begin_ns = + MAX(range->intersection_range_begin_ns, begin_ns_u); + range->intersection_range_end_ns = + MIN(range->intersection_range_end_ns, end_ns_u); + } + + ret = 0; + goto end; +error: + ret = -1; + +end: + return ret; +} + static int set_stream_intersections(struct cmd_run_ctx *ctx, struct bt_config_component *cfg_comp, @@ -1929,13 +2041,10 @@ int set_stream_intersections(struct cmd_run_ctx *ctx, int64_t trace_count; const bt_value *query_result = NULL; const bt_value *trace_info = NULL; - const bt_value *intersection_range = NULL; - const bt_value *intersection_begin = NULL; - const bt_value *intersection_end = NULL; const bt_value *stream_infos = NULL; const bt_value *stream_info = NULL; struct port_id *port_id = NULL; - struct trace_range *trace_range = NULL; + struct trace_range *stream_intersection = NULL; const char *fail_reason = NULL; const bt_component_class *comp_cls = bt_component_class_source_as_component_class_const(src_comp_cls); @@ -1968,9 +2077,9 @@ int set_stream_intersections(struct cmd_run_ctx *ctx, } for (trace_idx = 0; trace_idx < trace_count; trace_idx++) { - int64_t begin, end; uint64_t stream_idx; int64_t stream_count; + struct trace_range trace_intersection; trace_info = bt_value_array_borrow_element_by_index_const( query_result, trace_idx); @@ -1980,43 +2089,6 @@ int set_stream_intersections(struct cmd_run_ctx *ctx, goto error; } - intersection_range = bt_value_map_borrow_entry_value_const( - trace_info, "intersection-range-ns"); - if (!intersection_range) { - ret = -1; - BT_LOGD_STR("Cannot retrieve \'intersetion-range-ns\' field from query result."); - goto error; - } - - intersection_begin = bt_value_map_borrow_entry_value_const(intersection_range, - "begin"); - if (!intersection_begin) { - ret = -1; - BT_LOGD_STR("Cannot retrieve intersection-range-ns \'begin\' field from query result."); - goto error; - } - - intersection_end = bt_value_map_borrow_entry_value_const(intersection_range, - "end"); - if (!intersection_end) { - ret = -1; - BT_LOGD_STR("Cannot retrieve intersection-range-ns \'end\' field from query result."); - goto error; - } - - begin = bt_value_integer_signed_get(intersection_begin); - end = bt_value_integer_signed_get(intersection_end); - - if (begin < 0 || end < 0 || end < begin) { - BT_CLI_LOGE_APPEND_CAUSE( - "Invalid trace stream intersection values: " - "intersection-range-ns:begin=%" PRId64 - ", intersection-range-ns:end=%" PRId64, - begin, end); - ret = -1; - goto error; - } - stream_infos = bt_value_map_borrow_entry_value_const(trace_info, "streams"); if (!stream_infos || !bt_value_is_array(stream_infos)) { @@ -2031,6 +2103,12 @@ int set_stream_intersections(struct cmd_run_ctx *ctx, goto error; } + ret = compute_stream_intersection(stream_infos, &trace_intersection); + if (ret != 0) { + BT_CLI_LOGE_APPEND_CAUSE("Failed to compute trace streams intersection."); + goto error; + } + for (stream_idx = 0; stream_idx < stream_count; stream_idx++) { const bt_value *port_name; @@ -2049,15 +2127,15 @@ int set_stream_intersections(struct cmd_run_ctx *ctx, goto error; } - trace_range = g_new0(struct trace_range, 1); - if (!trace_range) { + stream_intersection = g_new0(struct trace_range, 1); + if (!stream_intersection) { ret = -1; BT_CLI_LOGE_APPEND_CAUSE( "Cannot allocate memory for trace_range structure."); goto error; } - trace_range->intersection_range_begin_ns = begin; - trace_range->intersection_range_end_ns = end; + + *stream_intersection = trace_intersection; stream_info = bt_value_array_borrow_element_by_index_const( stream_infos, stream_idx); @@ -2086,10 +2164,10 @@ int set_stream_intersections(struct cmd_run_ctx *ctx, BT_LOGD("Inserting stream intersection "); - g_hash_table_insert(ctx->intersections, port_id, trace_range); + g_hash_table_insert(ctx->intersections, port_id, stream_intersection); port_id = NULL; - trace_range = NULL; + stream_intersection = NULL; } } @@ -2102,7 +2180,7 @@ error: end: bt_value_put_ref(query_result); g_free(port_id); - g_free(trace_range); + g_free(stream_intersection); return ret; } diff --git a/src/plugins/ctf/fs-src/query.c b/src/plugins/ctf/fs-src/query.c index 35e4ec97..cc58872c 100644 --- a/src/plugins/ctf/fs-src/query.c +++ b/src/plugins/ctf/fs-src/query.c @@ -285,11 +285,6 @@ int populate_trace_info(const struct ctf_fs_trace *trace, bt_value *trace_info) bt_value_map_insert_entry_status insert_status; bt_value_array_append_element_status append_status; bt_value *file_groups = NULL; - struct range trace_intersection = { - .begin_ns = 0, - .end_ns = INT64_MAX, - .set = false, - }; BT_ASSERT(trace->ds_file_groups); /* Add trace range info only if it contains streams. */ @@ -324,22 +319,6 @@ int populate_trace_info(const struct ctf_fs_trace *trace, bt_value *trace_info) if (ret) { goto end; } - - if (group_range.set) { - trace_intersection.begin_ns = MAX(trace_intersection.begin_ns, - group_range.begin_ns); - trace_intersection.end_ns = MIN(trace_intersection.end_ns, - group_range.end_ns); - trace_intersection.set = true; - } - } - - if (trace_intersection.begin_ns < trace_intersection.end_ns) { - ret = add_range(trace_info, &trace_intersection, - "intersection-range-ns"); - if (ret) { - goto end; - } } end: diff --git a/tests/plugins/src.ctf.fs/query/test_query_trace_info.py b/tests/plugins/src.ctf.fs/query/test_query_trace_info.py index 1c532bb5..52ce764c 100644 --- a/tests/plugins/src.ctf.fs/query/test_query_trace_info.py +++ b/tests/plugins/src.ctf.fs/query/test_query_trace_info.py @@ -39,13 +39,6 @@ class QueryTraceInfoClockOffsetTestCase(unittest.TestCase): ] def _check(self, trace, offset): - self.assertEqual( - trace['intersection-range-ns']['begin'], 13515309000000070 + offset - ) - self.assertEqual( - trace['intersection-range-ns']['end'], 13515309000000100 + offset - ) - streams = sorted(trace['streams'], key=sort_predictably) self.assertEqual(streams[0]['range-ns']['begin'], 13515309000000000 + offset) self.assertEqual(streams[0]['range-ns']['end'], 13515309000000100 + offset) -- 2.34.1