X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=93d458b1f0f24e813f68b7f1ac312fa6ffdc335c;hb=72f11bd7fc4530cd73cd26ee4aa1196d2b2774bd;hp=d29804cd624f03795e78b706e11af58f7b3b8ebc;hpb=878c34cf4708a7b7d52d4e2ea1bdda853ba6a790;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index d29804cd6..93d458b1f 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -557,11 +559,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); } @@ -600,7 +603,7 @@ void *thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *conn = NULL; DBG("[thread] Live viewer relay dispatcher started"); @@ -623,7 +626,8 @@ void *thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head, + &viewer_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the live-viewer " "relay command queue"); @@ -800,14 +804,9 @@ int viewer_list_sessions(struct relay_connection *conn) } health_code_update(); - rcu_read_unlock(); ret = 0; - goto end; - end_unlock: rcu_read_unlock(); - -end: return ret; } @@ -931,6 +930,8 @@ int viewer_get_new_streams(struct relay_connection *conn) health_code_update(); + memset(&response, 0, sizeof(response)); + rcu_read_lock(); session = session_find_by_id(conn->sessions_ht, session_id); if (!session) { @@ -1032,6 +1033,8 @@ int viewer_attach_session(struct relay_connection *conn) health_code_update(); + memset(&response, 0, sizeof(response)); + if (!conn->viewer_session) { DBG("Client trying to attach before creating a live viewer session"); response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION); @@ -1201,6 +1204,7 @@ static int check_index_status(struct relay_viewer_stream *vstream, */ index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); index->timestamp_end = htobe64(rstream->beacon_ts_end); + index->stream_id = htobe64(rstream->ctf_stream_id); goto index_ready; } else if (rstream->total_index_received <= vstream->last_sent_index && !vstream->close_write_flag) { @@ -1314,7 +1318,7 @@ int viewer_get_next_index(struct relay_connection *conn) ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (ret < 0) { - goto end; + goto end_unlock; } else if (ret == 1) { /* * This means the viewer index data structure has been populated by the @@ -1337,11 +1341,13 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); pthread_mutex_lock(&vstream->overwrite_lock); if (vstream->abort_flag) { /* The file is being overwritten by the writer, we cannot use it. */ pthread_mutex_unlock(&vstream->overwrite_lock); ret = viewer_stream_rotate(vstream, rstream); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (ret < 0) { goto end_unlock; } else if (ret == 1) { @@ -1357,6 +1363,7 @@ int viewer_get_next_index(struct relay_connection *conn) read_ret = lttng_read(vstream->index_read_fd, &packet_index, sizeof(packet_index)); pthread_mutex_unlock(&vstream->overwrite_lock); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (read_ret < 0) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); @@ -1615,6 +1622,8 @@ int viewer_get_metadata(struct relay_connection *conn) } health_code_update(); + memset(&reply, 0, sizeof(reply)); + rcu_read_lock(); stream = viewer_stream_find_by_id(be64toh(request.stream_id)); if (!stream || !stream->metadata_flag) { @@ -1715,6 +1724,7 @@ int viewer_create_session(struct relay_connection *conn) DBG("Viewer create session received"); + memset(&resp, 0, sizeof(resp)); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); conn->viewer_session = zmalloc(sizeof(*conn->viewer_session)); if (!conn->viewer_session) { @@ -1746,6 +1756,7 @@ void live_relay_unknown_command(struct relay_connection *conn) { struct lttcomm_relayd_generic_reply reply; + memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_ERR_UNK); (void) send_response(conn->sock, &reply, sizeof(reply)); } @@ -2100,7 +2111,7 @@ int live_start_threads(struct lttng_uri *uri, } /* Init relay command queue. */ - cds_wfq_init(&viewer_conn_queue.queue); + cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail); /* Set up max poll set size */ lttng_poll_set_max_size();