health_code_update();
- while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+ for (;;) {
health_code_update();
/* Atomically prepare the queue futex */
futex_nto1_prepare(&viewer_conn_queue.futex);
+ if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+ break;
+ }
+
do {
health_code_update();
{
int ret;
- if (trace->session->connection_closed
+ if ((trace->session->connection_closed || rstream->closed)
&& rstream->index_received_seqcount
== vstream->index_sent_seqcount) {
- /* Last index sent and session connection is closed. */
+ /*
+ * Last index sent and session connection or relay
+ * stream are closed.
+ */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
static
int viewer_get_packet(struct relay_connection *conn)
{
- int ret, send_data = 0;
- char *data = NULL;
- uint32_t len = 0;
- ssize_t read_len;
+ int ret;
+ char *reply = NULL;
struct lttng_viewer_get_packet get_packet_info;
- struct lttng_viewer_trace_packet reply;
+ struct lttng_viewer_trace_packet reply_header;
struct relay_viewer_stream *vstream = NULL;
+ uint32_t reply_size = sizeof(reply_header);
+ uint32_t packet_data_len = 0;
+ ssize_t read_len;
DBG2("Relay get data packet");
health_code_update();
/* From this point on, the error label can be reached. */
- memset(&reply, 0, sizeof(reply));
+ memset(&reply_header, 0, sizeof(reply_header));
vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
if (!vstream) {
DBG("Client requested packet of unknown stream id %" PRIu64,
be64toh(get_packet_info.stream_id));
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
goto send_reply_nolock;
+ } else {
+ packet_data_len = be32toh(get_packet_info.len);
+ reply_size += packet_data_len;
}
- pthread_mutex_lock(&vstream->stream->lock);
-
- len = be32toh(get_packet_info.len);
- data = zmalloc(len);
- if (!data) {
- PERROR("relay data zmalloc");
+ reply = zmalloc(reply_size);
+ if (!reply) {
+ PERROR("packet reply zmalloc");
+ reply_size = sizeof(reply_header);
goto error;
}
+ pthread_mutex_lock(&vstream->stream->lock);
ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
SEEK_SET);
if (ret < 0) {
be64toh(get_packet_info.offset));
goto error;
}
- read_len = lttng_read(vstream->stream_fd->fd, data, len);
- if (read_len < len) {
+ read_len = lttng_read(vstream->stream_fd->fd,
+ reply + sizeof(reply_header),
+ packet_data_len);
+ if (read_len < packet_data_len) {
PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
vstream->stream_fd->fd,
be64toh(get_packet_info.offset));
goto error;
}
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
- reply.len = htobe32(len);
- send_data = 1;
+ reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+ reply_header.len = htobe32(packet_data_len);
goto send_reply;
error:
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
send_reply:
if (vstream) {
pthread_mutex_unlock(&vstream->stream->lock);
}
send_reply_nolock:
- reply.flags = htobe32(reply.flags);
health_code_update();
- ret = send_response(conn->sock, &reply, sizeof(reply));
- if (ret < 0) {
- goto end_free;
+ if (reply) {
+ memcpy(reply, &reply_header, sizeof(reply_header));
+ ret = send_response(conn->sock, reply, reply_size);
+ } else {
+ /* No reply to send. */
+ ret = send_response(conn->sock, &reply_header,
+ reply_size);
}
- health_code_update();
- if (send_data) {
- health_code_update();
- ret = send_response(conn->sock, data, len);
- if (ret < 0) {
- goto end_free;
- }
- health_code_update();
+ health_code_update();
+ if (ret < 0) {
+ PERROR("sendmsg of packet data failed");
+ goto end_free;
}
- DBG("Sent %u bytes for stream %" PRIu64, len,
+ DBG("Sent %u bytes for stream %" PRIu64, reply_size,
be64toh(get_packet_info.stream_id));
end_free:
- free(data);
+ free(reply);
end:
if (vstream) {
viewer_stream_put(vstream);