UST periodical metadata flush
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 5d9dc96c7925fd61d1efa44f43637e31df79b9e1..431b94626736d042d92db89db0774e7e941cc3e7 100644 (file)
 #include <inttypes.h>
 #include <unistd.h>
 #include <urcu/list.h>
+#include <signal.h>
 
 #include <common/common.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/relayd/relayd.h>
 #include <common/compat/fcntl.h>
+#include <common/consumer-metadata-cache.h>
+#include <common/consumer-timer.h>
 
 #include "ust-consumer.h"
 
@@ -88,14 +91,14 @@ static int add_channel(struct lttng_consumer_channel *channel,
        if (ctx->on_recv_channel != NULL) {
                ret = ctx->on_recv_channel(channel);
                if (ret == 0) {
-                       ret = consumer_add_channel(channel);
+                       ret = consumer_add_channel(channel, ctx);
                } else if (ret < 0) {
                        /* Most likely an ENOMEM. */
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto error;
                }
        } else {
-               ret = consumer_add_channel(channel);
+               ret = consumer_add_channel(channel, ctx);
        }
 
        DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
@@ -368,11 +371,6 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
                goto error;
        }
 
-       ret = ustctl_stream_close_wakeup_fd(stream->ustream);
-       if (ret < 0) {
-               goto error;
-       }
-
 error:
        return ret;
 }
@@ -401,6 +399,11 @@ static int send_sessiond_channel(int sock,
                goto error;
        }
 
+       ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+       if (ret < 0) {
+               goto error;
+       }
+
        /* The channel was sent successfully to the sessiond at this point. */
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Try to send the stream to the relayd if one is available. */
@@ -476,6 +479,12 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
                goto error;
        }
 
+       channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
+
+       if (ret < 0) {
+               goto error;
+       }
+
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
@@ -524,10 +533,12 @@ error:
 /*
  * Write metadata to the given channel using ustctl to convert the string to
  * the ringbuffer.
+ * Called only from consumer_metadata_cache_write.
+ * The metadata cache lock MUST be acquired to write in the cache.
  *
  * Return 0 on success else a negative value.
  */
-static int push_metadata(struct lttng_consumer_channel *metadata,
+int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata,
                const char *metadata_str, uint64_t target_offset, uint64_t len)
 {
        int ret;
@@ -537,13 +548,13 @@ static int push_metadata(struct lttng_consumer_channel *metadata,
 
        DBG("UST consumer writing metadata to channel %s", metadata->name);
 
-       assert(target_offset == metadata->contig_metadata_written);
-       ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+       assert(target_offset <= metadata->metadata_cache->max_offset);
+       ret = ustctl_write_metadata_to_channel(metadata->uchan,
+                       metadata_str + target_offset, len);
        if (ret < 0) {
                ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
                goto error;
        }
-       metadata->contig_metadata_written += len;
 
        ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
 
@@ -551,6 +562,43 @@ error:
        return ret;
 }
 
+/*
+ * Flush channel's streams using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int flush_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer flush channel key %lu", chan_key);
+
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer flush channel %lu not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, flush it. */
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+                       ustctl_flush_buffer(stream->ustream, 1);
+       }
+       rcu_read_unlock();
+
+error:
+       return ret;
+}
+
 /*
  * Close metadata stream wakeup_fd using the given key to retrieve the channel.
  *
@@ -576,6 +624,11 @@ static int close_metadata(uint64_t chan_key)
                ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                goto error;
        }
+       if (channel->switch_timer_enabled == 1) {
+               DBG("Deleting timer on metadata channel");
+               consumer_timer_switch_stop(channel);
+       }
+       consumer_metadata_cache_destroy(channel);
 
 error:
        return ret;
@@ -635,6 +688,51 @@ error:
        return ret;
 }
 
+/*
+ * Receive the metadata updates from the sessiond.
+ */
+int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
+               uint64_t len, struct lttng_consumer_channel *channel)
+{
+       int ret, ret_code = LTTNG_OK;
+       char *metadata_str;
+
+       DBG("UST consumer push metadata key %lu of len %lu", key, len);
+
+       metadata_str = zmalloc(len * sizeof(char));
+       if (!metadata_str) {
+               PERROR("zmalloc metadata string");
+               ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+               goto end;
+       }
+
+       /* Receive metadata string. */
+       ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+       if (ret < 0) {
+               /* Session daemon is dead so return gracefully. */
+               ret_code = ret;
+               goto end_free;
+       }
+
+       pthread_mutex_lock(&channel->metadata_cache->lock);
+       ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
+       if (ret < 0) {
+               /* Unable to handle metadata. Notify session daemon. */
+               ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+       }
+       pthread_mutex_unlock(&channel->metadata_cache->lock);
+
+       while (consumer_metadata_cache_flushed(channel, offset + len)) {
+               DBG("Waiting for metadata to be flushed");
+               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
+       }
+
+end_free:
+       free(metadata_str);
+end:
+       return ret_code;
+}
+
 /*
  * Receive command from session daemon and process it.
  *
@@ -761,6 +859,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.overwrite = msg.u.ask_channel.overwrite;
                attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
+               attr.chan_id = msg.u.ask_channel.chan_id;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
 
                /* Translate the event output type to UST. */
@@ -803,6 +902,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_channel_error;
                }
 
+
                /*
                 * Channel and streams are now created. Inform the session daemon that
                 * everything went well and should wait to receive the channel and
@@ -817,6 +917,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+                       ret = consumer_metadata_cache_allocate(channel);
+                       if (ret < 0) {
+                               ERR("Allocating metadata cache");
+                               goto end_channel_error;
+                       }
+                       consumer_timer_switch_start(channel, attr.switch_timer_interval);
+                       attr.switch_timer_interval = 0;
+               }
+
                break;
        }
        case LTTNG_CONSUMER_GET_CHANNEL:
@@ -898,14 +1008,24 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_FLUSH_CHANNEL:
+       {
+               int ret;
+
+               ret = flush_channel(msg.u.flush_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
                uint64_t len = msg.u.push_metadata.len;
-               uint64_t target_offset = msg.u.push_metadata.target_offset;
                uint64_t key = msg.u.push_metadata.key;
+               uint64_t offset = msg.u.push_metadata.target_offset;
                struct lttng_consumer_channel *channel;
-               char *metadata_str;
 
                DBG("UST consumer push metadata key %lu of len %lu", key, len);
 
@@ -915,13 +1035,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
                }
 
-               metadata_str = zmalloc(len * sizeof(char));
-               if (!metadata_str) {
-                       PERROR("zmalloc metadata string");
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto end_msg_sessiond;
-               }
-
                /* Tell session daemon we are ready to receive the metadata. */
                ret = consumer_send_status_msg(sock, LTTNG_OK);
                if (ret < 0) {
@@ -934,22 +1047,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               /* Receive metadata string. */
-               ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+               ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
+                               len, channel);
                if (ret < 0) {
-                       /* Session daemon is dead so return gracefully. */
+                       /* error receiving from sessiond */
                        goto end_nosignal;
-               }
-
-               ret = push_metadata(channel, metadata_str, target_offset, len);
-               free(metadata_str);
-               if (ret < 0) {
-                       /* Unable to handle metadata. Notify session daemon. */
-                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               } else {
+                       ret_code = ret;
                        goto end_msg_sessiond;
                }
-
-               goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_SETUP_METADATA:
        {
@@ -1167,6 +1273,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
+
 end:
        return ret;
 }
@@ -1277,3 +1384,106 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
        }
        rcu_read_unlock();
 }
+
+void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+       if (ret < 0) {
+               ERR("Unable to close wakeup fd");
+       }
+}
+
+int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel *channel)
+{
+       struct lttcomm_metadata_request_msg request;
+       struct lttcomm_consumer_msg msg;
+       enum lttng_error_code ret_code = LTTNG_OK;
+       uint64_t len, key, offset;
+       int ret;
+
+       assert(channel);
+       assert(channel->metadata_cache);
+
+       /* send the metadata request to sessiond */
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER64_UST:
+               request.bits_per_long = 64;
+               break;
+       case LTTNG_CONSUMER32_UST:
+               request.bits_per_long = 32;
+               break;
+       default:
+               request.bits_per_long = 0;
+               break;
+       }
+
+       request.session_id = channel->session_id;
+       request.uid = channel->uid;
+       request.key = channel->key;
+       DBG("Sending metadata request to sessiond, session %" PRIu64,
+                       channel->session_id);
+
+       ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
+                       sizeof(request));
+       if (ret < 0) {
+               ERR("Asking metadata to sessiond");
+               goto end;
+       }
+
+       /* Receive the metadata from sessiond */
+       ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
+                       sizeof(msg));
+       if (ret != sizeof(msg)) {
+               DBG("Consumer received unexpected message size %d (expects %lu)",
+                       ret, sizeof(msg));
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+               /*
+                * The ret value might 0 meaning an orderly shutdown but this is ok
+                * since the caller handles this.
+                */
+               goto end;
+       }
+
+       if (msg.cmd_type == LTTNG_ERR_UND) {
+               /* No registry found */
+               (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
+                               ret_code);
+               ret = 0;
+               goto end;
+       } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
+               ERR("Unexpected cmd_type received %d", msg.cmd_type);
+               ret = -1;
+               goto end;
+       }
+
+       len = msg.u.push_metadata.len;
+       key = msg.u.push_metadata.key;
+       offset = msg.u.push_metadata.target_offset;
+
+       assert(key == channel->key);
+       if (len == 0) {
+               DBG("No new metadata to receive for key %" PRIu64, key);
+       }
+
+       /* Tell session daemon we are ready to receive the metadata. */
+       ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
+                       LTTNG_OK);
+       if (ret < 0 || len == 0) {
+               /*
+                * Somehow, the session daemon is not responding anymore or there is
+                * nothing to receive.
+                */
+               goto end;
+       }
+
+       ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+                       key, offset, len, channel);
+       (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+       ret = 0;
+
+end:
+       return ret;
+}
This page took 0.029706 seconds and 5 git commands to generate.