free(session);
}
+static void close_stream(struct relay_stream *stream,
+ struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht)
+{
+ int delret;
+ struct relay_viewer_stream *vstream;
+ struct lttng_ht_iter iter;
+
+ assert(stream);
+ assert(viewer_streams_ht);
+
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+
+ if (stream->index_fd >= 0) {
+ delret = close(stream->index_fd);
+ if (delret < 0) {
+ PERROR("close stream index_fd");
+ }
+ }
+
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle,
+ viewer_streams_ht);
+ if (vstream) {
+ /*
+ * Set the last good value into the viewer stream. This is done
+ * right before the stream gets deleted from the hash table. The
+ * lookup failure on the live thread side of a stream indicates
+ * that the viewer stream index received value should be used.
+ */
+ vstream->total_index_received = stream->total_index_received;
+ }
+
+ iter.iter.node = &stream->stream_n.node;
+ delret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!delret);
+ iter.iter.node = &stream->ctf_trace_node.node;
+ delret = lttng_ht_del(ctf_traces_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node, deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+}
+
/*
* relay_delete_session: Free all memory associated with a session and
* close all the FDs
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
{
+ int ret, send_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_close_stream stream_info;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
- int ret, send_ret;
- struct lttng_ht_iter iter;
DBG("Close stream received");
stream->close_flag = 1;
if (close_stream_check(stream)) {
- int delret;
- struct relay_viewer_stream *vstream;
-
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
- }
-
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
- }
- }
-
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- vstream->total_index_received = stream->total_index_received;
- }
-
- iter.iter.node = &stream->stream_n.node;
- delret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!delret);
- iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(cmd->ctf_traces_ht, &iter);
- assert(!delret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d from close stream", stream->fd);
+ close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
}
end_unlock:
*/
static
int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+ struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht)
{
int ret = 0, rotate_index = 0, index_created = 0;
struct relay_stream *stream;
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- int cret;
- struct lttng_ht_iter iter;
-
- cret = close(stream->fd);
- if (cret < 0) {
- PERROR("close stream process data");
- }
-
- cret = close(stream->index_fd);
- if (cret < 0) {
- PERROR("close stream index_fd");
- }
- iter.iter.node = &stream->stream_n.node;
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d after recv data", stream->fd);
+ close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht);
}
end_rcu_unlock:
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection, indexes_ht,
+ relay_ctx->viewer_streams_ht);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
&iter, relay_connection, sessions_ht);
}
}
- rcu_read_unlock();
error_poll_create:
- lttng_ht_destroy(indexes_ht);
+ {
+ struct relay_index *index;
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
+ relay_index_delete(index, indexes_ht);
+ }
+ lttng_ht_destroy(indexes_ht);
+ }
+ rcu_read_unlock();
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error: