X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=0fd41f1a804f61c2e0582e4bb5b6118bdac831a4;hp=61acafec2098ecd70a5b1889309992e858a0e3cb;hb=7e0a437914bdad867b9cdfa61d7cba163dfc4c8b;hpb=797bc362b6845f7e8f50922f53fc4683c573fc55 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 61acafec2..0fd41f1a8 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -452,10 +452,11 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri) if (ret < 0) { goto error; } - DBG("Listening on sock %d for live", sock->fd); + DBG("Listening on sock %d for lttng-live", sock->fd); ret = sock->ops->bind(sock); if (ret < 0) { + PERROR("Failed to bind lttng-live socket"); goto error; } @@ -654,12 +655,16 @@ void *thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&viewer_conn_queue.futex); + if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -830,6 +835,11 @@ int viewer_list_sessions(struct relay_connection *conn) health_code_update(); + if (session->connection_closed) { + /* Skip closed session */ + continue; + } + if (count >= buf_count) { struct lttng_viewer_session *newbuf; uint32_t new_buf_count = buf_count << 1; @@ -1034,12 +1044,12 @@ int viewer_attach_session(struct relay_connection *conn) session = session_get_by_id(be64toh(request.session_id)); if (!session) { DBG("Relay session %" PRIu64 " not found", - be64toh(request.session_id)); + (uint64_t) be64toh(request.session_id)); response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); goto send_reply; } DBG("Attach session ID %" PRIu64 " received", - be64toh(request.session_id)); + (uint64_t) be64toh(request.session_id)); if (session->live_timer == 0) { DBG("Not live session"); @@ -1084,7 +1094,7 @@ int viewer_attach_session(struct relay_connection *conn) if (closed) { send_streams = 0; response.streams_count = 0; - response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); + response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); goto send_reply; } @@ -1296,7 +1306,7 @@ int viewer_get_next_index(struct relay_connection *conn) vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id)); if (!vstream) { DBG("Client requested index of unknown stream id %" PRIu64, - be64toh(request_index.stream_id)); + (uint64_t) be64toh(request_index.stream_id)); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } @@ -1410,7 +1420,7 @@ int viewer_get_next_index(struct relay_connection *conn) */ DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64, rstream->stream_handle, - be64toh(packet_index.offset)); + (uint64_t) be64toh(packet_index.offset)); viewer_index.offset = packet_index.offset; viewer_index.packet_size = packet_index.packet_size; viewer_index.content_size = packet_index.content_size; @@ -1478,13 +1488,15 @@ error_put: static int viewer_get_packet(struct relay_connection *conn) { - int ret, send_data = 0; - char *data = NULL; - uint32_t len = 0; - ssize_t read_len; + int ret; + off_t lseek_ret; + char *reply = NULL; struct lttng_viewer_get_packet get_packet_info; - struct lttng_viewer_trace_packet reply; + struct lttng_viewer_trace_packet reply_header; struct relay_viewer_stream *vstream = NULL; + uint32_t reply_size = sizeof(reply_header); + uint32_t packet_data_len = 0; + ssize_t read_len; DBG2("Relay get data packet"); @@ -1498,76 +1510,78 @@ int viewer_get_packet(struct relay_connection *conn) health_code_update(); /* From this point on, the error label can be reached. */ - memset(&reply, 0, sizeof(reply)); + memset(&reply_header, 0, sizeof(reply_header)); vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id)); if (!vstream) { DBG("Client requested packet of unknown stream id %" PRIu64, - be64toh(get_packet_info.stream_id)); - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + (uint64_t) be64toh(get_packet_info.stream_id)); + reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); goto send_reply_nolock; + } else { + packet_data_len = be32toh(get_packet_info.len); + reply_size += packet_data_len; } - pthread_mutex_lock(&vstream->stream->lock); - - len = be32toh(get_packet_info.len); - data = zmalloc(len); - if (!data) { - PERROR("relay data zmalloc"); + reply = zmalloc(reply_size); + if (!reply) { + PERROR("packet reply zmalloc"); + reply_size = sizeof(reply_header); goto error; } - ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), + pthread_mutex_lock(&vstream->stream->lock); + lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), SEEK_SET); - if (ret < 0) { + if (lseek_ret < 0) { PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd, - be64toh(get_packet_info.offset)); + (uint64_t) be64toh(get_packet_info.offset)); goto error; } - read_len = lttng_read(vstream->stream_fd->fd, data, len); - if (read_len < len) { + read_len = lttng_read(vstream->stream_fd->fd, + reply + sizeof(reply_header), + packet_data_len); + if (read_len < packet_data_len) { PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, vstream->stream_fd->fd, - be64toh(get_packet_info.offset)); + (uint64_t) be64toh(get_packet_info.offset)); goto error; } - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); - reply.len = htobe32(len); - send_data = 1; + reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); + reply_header.len = htobe32(packet_data_len); goto send_reply; error: - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: if (vstream) { pthread_mutex_unlock(&vstream->stream->lock); } send_reply_nolock: - reply.flags = htobe32(reply.flags); health_code_update(); - ret = send_response(conn->sock, &reply, sizeof(reply)); - if (ret < 0) { - goto end_free; + if (reply) { + memcpy(reply, &reply_header, sizeof(reply_header)); + ret = send_response(conn->sock, reply, reply_size); + } else { + /* No reply to send. */ + ret = send_response(conn->sock, &reply_header, + reply_size); } - health_code_update(); - if (send_data) { - health_code_update(); - ret = send_response(conn->sock, data, len); - if (ret < 0) { - goto end_free; - } - health_code_update(); + health_code_update(); + if (ret < 0) { + PERROR("sendmsg of packet data failed"); + goto end_free; } - DBG("Sent %u bytes for stream %" PRIu64, len, - be64toh(get_packet_info.stream_id)); + DBG("Sent %u bytes for stream %" PRIu64, reply_size, + (uint64_t) be64toh(get_packet_info.stream_id)); end_free: - free(data); + free(reply); end: if (vstream) { viewer_stream_put(vstream); @@ -1616,7 +1630,7 @@ int viewer_get_metadata(struct relay_connection *conn) * find it. */ DBG("Client requested metadata of unknown stream id %" PRIu64, - be64toh(request.stream_id)); + (uint64_t) be64toh(request.stream_id)); reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } @@ -1702,7 +1716,7 @@ send_reply: } DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len, - be64toh(request.stream_id)); + (uint64_t) be64toh(request.stream_id)); DBG("Metadata sent"); @@ -1791,7 +1805,7 @@ int viewer_detach_session(struct relay_connection *conn) session = session_get_by_id(be64toh(request.session_id)); if (!session) { DBG("Relay session %" PRIu64 " not found", - be64toh(request.session_id)); + (uint64_t) be64toh(request.session_id)); response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK); goto send_reply; } @@ -2066,7 +2080,7 @@ exit: error: lttng_poll_clean(&events); - /* Cleanup reamaining connection object. */ + /* Cleanup remaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter, destroy_conn,