Output a warning if packets are lost
[babeltrace.git] / formats / ctf / ctf.c
index 40da01da2d57619d1257fc42a4666923afaaeaa2..a617497e05b719d544e5d662137632d4cb3d3718 100644 (file)
@@ -53,6 +53,7 @@
 #include "metadata/ctf-ast.h"
 #include "events-private.h"
 #include <babeltrace/compat/memstream.h>
+#include <babeltrace/compat/fcntl.h>
 
 #define LOG2_CHAR_BIT  3
 
@@ -420,16 +421,26 @@ void print_uuid(FILE *fp, unsigned char *uuid)
  * Given we have discarded counters of those two types merged into the
  * events_discarded counter, we need to use the union of those ranges:
  *   [ prev_timestamp_end, timestamp_end ]
+ *
+ * Lost packets occur if the tracer overwrote some subbuffer(s) before the
+ * consumer had time to extract them. We keep track of those gaps with the
+ * packet sequence number in each packet.
  */
 static
-void ctf_print_discarded(FILE *fp, struct ctf_stream_definition *stream)
+void ctf_print_discarded_lost(FILE *fp, struct ctf_stream_definition *stream)
 {
-       if (!stream->events_discarded || !babeltrace_ctf_console_output) {
+       if ((!stream->events_discarded && !stream->packets_lost) ||
+                       !babeltrace_ctf_console_output) {
                return;
        }
        fflush(stdout);
-       fprintf(fp, "[warning] Tracer discarded %" PRIu64 " events between [",
-               stream->events_discarded);
+       if (stream->events_discarded) {
+               fprintf(fp, "[warning] Tracer discarded %" PRIu64 " events between [",
+                               stream->events_discarded);
+       } else if (stream->packets_lost) {
+               fprintf(fp, "[warning] Tracer lost %" PRIu64 " trace packets between [",
+                               stream->packets_lost);
+       }
        if (opt_clock_cycles) {
                ctf_print_timestamp(fp, stream,
                                stream->prev.cycles.end);
@@ -768,8 +779,6 @@ int ctf_init_pos(struct ctf_stream_pos *pos, struct bt_trace_descriptor *trace,
                pos->parent.rw_table = write_dispatch_table;
                pos->parent.event_cb = ctf_write_event;
                pos->parent.trace = trace;
-               if (fd >= 0)
-                       ctf_packet_seek(&pos->parent, 0, SEEK_SET);     /* position for write */
                break;
        default:
                assert(0);
@@ -802,6 +811,7 @@ void ctf_update_current_packet_index(struct ctf_stream_definition *stream,
                struct packet_index *cur_index)
 {
        uint64_t events_discarded_diff;
+       uint64_t packets_lost_diff = 0;
 
        /* Update packet index time information */
 
@@ -829,6 +839,11 @@ void ctf_update_current_packet_index(struct ctf_stream_definition *stream,
                        prev_index->ts_real.timestamp_end;
 
                events_discarded_diff -= prev_index->events_discarded;
+               /* packet_seq_num stays at 0 if not produced by the tracer */
+               if (cur_index->packet_seq_num) {
+                       packets_lost_diff = cur_index->packet_seq_num -
+                               prev_index->packet_seq_num - 1;
+               }
                /*
                 * Deal with 32-bit wrap-around if the tracer provided a
                 * 32-bit field.
@@ -849,6 +864,7 @@ void ctf_update_current_packet_index(struct ctf_stream_definition *stream,
                                stream->current.real.begin;
        }
        stream->events_discarded = events_discarded_diff;
+       stream->packets_lost = packets_lost_diff;
 }
 
 /*
@@ -862,7 +878,6 @@ void ctf_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence)
        struct ctf_file_stream *file_stream =
                container_of(pos, struct ctf_file_stream, pos);
        int ret;
-       off_t off;
        struct packet_index *packet_index, *prev_index;
 
        switch (whence) {
@@ -906,9 +921,11 @@ void ctf_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence)
                }
                pos->content_size = -1U;        /* Unknown at this point */
                pos->packet_size = WRITE_PACKET_LEN;
-               off = posix_fallocate(pos->fd, pos->mmap_offset,
-                                     pos->packet_size / CHAR_BIT);
-               assert(off >= 0);
+               do {
+                       ret = bt_posix_fallocate(pos->fd, pos->mmap_offset,
+                                             pos->packet_size / CHAR_BIT);
+               } while (ret == EINTR);
+               assert(ret == 0);
                pos->offset = 0;
        } else {
        read_next_packet:
@@ -958,7 +975,7 @@ void ctf_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence)
                 * timestamps.
                 */
                if ((&file_stream->parent)->stream_class->trace->parent.collection) {
-                       ctf_print_discarded(stderr, &file_stream->parent);
+                       ctf_print_discarded_lost(stderr, &file_stream->parent);
                }
 
                packet_index = &g_array_index(pos->packet_index,
@@ -1109,7 +1126,7 @@ int ctf_trace_metadata_packet_read(struct ctf_trace *td, FILE *in,
                memcpy(td->uuid, header.uuid, sizeof(header.uuid));
                CTF_TRACE_SET_FIELD(td, uuid);
        } else {
-               if (babeltrace_uuid_compare(header.uuid, td->uuid))
+               if (bt_uuid_compare(header.uuid, td->uuid))
                        return -EINVAL;
        }
 
@@ -1510,6 +1527,7 @@ int create_stream_one_packet_index(struct ctf_stream_pos *pos,
        int ret;
 
 begin:
+       memset(&packet_index, 0, sizeof(packet_index));
        if (!pos->mmap_offset) {
                first_packet = 1;
        }
@@ -1541,14 +1559,6 @@ begin:
        pos->offset = 0;        /* Position of the packet header */
 
        packet_index.offset = pos->mmap_offset;
-       packet_index.content_size = 0;
-       packet_index.packet_size = 0;
-       packet_index.ts_real.timestamp_begin = 0;
-       packet_index.ts_real.timestamp_end = 0;
-       packet_index.ts_cycles.timestamp_begin = 0;
-       packet_index.ts_cycles.timestamp_end = 0;
-       packet_index.events_discarded = 0;
-       packet_index.events_discarded_len = 0;
 
        /* read and check header, set stream id (and check) */
        if (file_stream->parent.trace_packet_header) {
@@ -1595,7 +1605,7 @@ begin:
                                elem = bt_array_index(defarray, i);
                                uuidval[i] = bt_get_unsigned_int(elem);
                        }
-                       ret = babeltrace_uuid_compare(td->uuid, uuidval);
+                       ret = bt_uuid_compare(td->uuid, uuidval);
                        if (ret) {
                                fprintf(stderr, "[error] Unique Universal Identifiers do not match.\n");
                                return -EINVAL;
@@ -1695,6 +1705,19 @@ begin:
                        packet_index.events_discarded = bt_get_unsigned_int(field);
                        packet_index.events_discarded_len = bt_get_int_len(field);
                }
+
+               /* read packet_seq_num from header */
+               len_index = bt_struct_declaration_lookup_field_index(
+                               file_stream->parent.stream_packet_context->declaration,
+                               g_quark_from_static_string("packet_seq_num"));
+               if (len_index >= 0) {
+                       struct bt_definition *field;
+
+                       field = bt_struct_definition_get_field_from_index(
+                                       file_stream->parent.stream_packet_context,
+                                       len_index);
+                       packet_index.packet_seq_num = bt_get_unsigned_int(field);
+               }
        } else {
                /* Use file size for packet size */
                packet_index.packet_size = filesize * CHAR_BIT;
@@ -1835,7 +1858,7 @@ int import_stream_packet_index(struct ctf_trace *td,
        struct ctf_packet_index *ctf_index = NULL;
        struct ctf_packet_index_file_hdr index_hdr;
        struct packet_index index;
-       uint32_t packet_index_len;
+       uint32_t packet_index_len, index_minor;
        int ret = 0;
        int first_packet = 1;
        size_t len;
@@ -1855,14 +1878,16 @@ int import_stream_packet_index(struct ctf_trace *td,
                goto error;
        }
        if (be32toh(index_hdr.index_major) != CTF_INDEX_MAJOR) {
-               fprintf(stderr, "[error] Incompatible index file %" PRIu64
-                               ".%" PRIu64 ", supported %d.%d\n",
-                               be64toh(index_hdr.index_major),
-                               be64toh(index_hdr.index_minor), CTF_INDEX_MAJOR,
+               fprintf(stderr, "[error] Incompatible index file %" PRIu32
+                               ".%" PRIu32 ", supported %d.%d\n",
+                               be32toh(index_hdr.index_major),
+                               be32toh(index_hdr.index_minor), CTF_INDEX_MAJOR,
                                CTF_INDEX_MINOR);
                ret = -1;
                goto error;
        }
+       index_minor = be32toh(index_hdr.index_minor);
+
        packet_index_len = be32toh(index_hdr.packet_index_len);
        if (packet_index_len == 0) {
                fprintf(stderr, "[error] Packet index length cannot be 0.\n");
@@ -1889,6 +1914,10 @@ int import_stream_packet_index(struct ctf_trace *td,
                index.events_discarded_len = 64;
                index.data_offset = -1;
                stream_id = be64toh(ctf_index->stream_id);
+               if (index_minor >= 1) {
+                       index.stream_instance_id = be64toh(ctf_index->stream_instance_id);
+                       index.packet_seq_num = be64toh(ctf_index->packet_seq_num);
+               }
 
                if (!first_packet) {
                        /* add index to packet array */
@@ -2009,7 +2038,7 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
        snprintf(index_name, strlen(path) + sizeof(INDEX_PATH),
                        INDEX_PATH, path);
 
-       if (faccessat(td->dirfd, index_name, O_RDONLY, flags) < 0) {
+       if (bt_faccessat(td->dirfd, td->parent.path, index_name, O_RDONLY, 0) < 0) {
                ret = create_stream_packet_index(td, file_stream);
                if (ret) {
                        fprintf(stderr, "[error] Stream index creation error.\n");
This page took 0.025927 seconds and 4 git commands to generate.