X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=8c15c7e135016098f3cb90a94923ec9eb3fc9d93;hb=a500c2570f64a51f0f22d7eb2c0555dd58786521;hp=5581dce191dc760e324a966c331e14b88bc7fc45;hpb=0f907de1f20c91a2bd1a08ee4d50332d1958754b;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 5581dce19..8c15c7e13 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -556,7 +556,7 @@ int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata, ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str + target_offset, len); if (ret < 0) { - ERR("ustctl write metadata fail with ret %d, len %ld", ret, len); + ERR("ustctl write metadata fail with ret %d, len %" PRIu64, ret, len); goto error; } @@ -579,11 +579,12 @@ static int flush_channel(uint64_t chan_key) struct lttng_ht *ht; struct lttng_ht_iter iter; - DBG("UST consumer flush channel key %lu", chan_key); + DBG("UST consumer flush channel key %" PRIu64, chan_key); + rcu_read_lock(); channel = consumer_find_channel(chan_key); if (!channel) { - ERR("UST consumer flush channel %lu not found", chan_key); + ERR("UST consumer flush channel %" PRIu64 " not found", chan_key); ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; goto error; } @@ -591,20 +592,19 @@ static int flush_channel(uint64_t chan_key) ht = consumer_data.stream_per_chan_id_ht; /* For each stream of the channel id, flush it. */ - rcu_read_lock(); cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { ustctl_flush_buffer(stream->ustream, 1); } - rcu_read_unlock(); - error: + rcu_read_unlock(); return ret; } /* * Close metadata stream wakeup_fd using the given key to retrieve the channel. + * RCU read side lock MUST be acquired before calling this function. * * Return 0 on success else an LTTng error code. */ @@ -613,11 +613,11 @@ static int close_metadata(uint64_t chan_key) int ret; struct lttng_consumer_channel *channel; - DBG("UST consumer close metadata key %lu", chan_key); + DBG("UST consumer close metadata key %" PRIu64, chan_key); channel = consumer_find_channel(chan_key); if (!channel) { - ERR("UST consumer close metadata %lu not found", chan_key); + ERR("UST consumer close metadata %" PRIu64 " not found", chan_key); ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; goto error; } @@ -648,7 +648,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) int ret; struct lttng_consumer_channel *metadata; - DBG("UST consumer setup metadata key %lu", key); + DBG("UST consumer setup metadata key %" PRIu64, key); metadata = consumer_find_channel(key); if (!metadata) { @@ -701,7 +701,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, int ret, ret_code = LTTNG_OK; char *metadata_str; - DBG("UST consumer push metadata key %lu of len %lu", key, len); + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); metadata_str = zmalloc(len * sizeof(char)); if (!metadata_str) { @@ -883,6 +883,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_UST_CHAN_PER_CPU: channel->type = CONSUMER_CHANNEL_TYPE_DATA; attr.type = LTTNG_UST_CHAN_PER_CPU; + /* + * Set refcount to 1 for owner. Below, we will + * pass ownership to the + * consumer_thread_channel_poll() thread. + */ + channel->refcount = 1; break; case LTTNG_UST_CHAN_METADATA: channel->type = CONSUMER_CHANNEL_TYPE_METADATA; @@ -898,10 +904,22 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_channel_error; } + if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { + ret = consumer_metadata_cache_allocate(channel); + if (ret < 0) { + ERR("Allocating metadata cache"); + goto end_channel_error; + } + consumer_timer_switch_start(channel, attr.switch_timer_interval); + attr.switch_timer_interval = 0; + } + /* * Add the channel to the internal state AFTER all streams were created * and successfully sent to session daemon. This way, all streams must * be ready before this channel is visible to the threads. + * If add_channel succeeds, ownership of the channel is + * passed to consumer_thread_channel_poll(). */ ret = add_channel(channel, ctx); if (ret < 0) { @@ -923,16 +941,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { - ret = consumer_metadata_cache_allocate(channel); - if (ret < 0) { - ERR("Allocating metadata cache"); - goto end_channel_error; - } - consumer_timer_switch_start(channel, attr.switch_timer_interval); - attr.switch_timer_interval = 0; - } - break; } case LTTNG_CONSUMER_GET_CHANNEL: @@ -943,7 +951,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer get channel key %lu not found", key); + ERR("UST consumer get channel key %" PRIu64 " not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; goto end_msg_sessiond; } @@ -994,7 +1002,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer get channel key %lu not found", key); + ERR("UST consumer get channel key %" PRIu64 " not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; goto end_msg_sessiond; } @@ -1033,12 +1041,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, uint64_t offset = msg.u.push_metadata.target_offset; struct lttng_consumer_channel *channel; - DBG("UST consumer push metadata key %lu of len %lu", key, len); + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, + len); channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer push metadata %lu not found", key); + ERR("UST consumer push metadata %" PRIu64 " not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto end_msg_sessiond; } /* Tell session daemon we are ready to receive the metadata. */ @@ -1274,7 +1284,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * happen and it is OK with the code flow. */ DBG("Error writing to tracefile " - "(ret: %zd != len: %lu != subbuf_size: %lu)", + "(ret: %ld != len: %lu != subbuf_size: %lu)", ret, len, subbuf_size); } err = ustctl_put_next_subbuf(ustream); @@ -1433,7 +1443,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - DBG("Consumer received unexpected message size %d (expects %lu)", + DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg)); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); /*