Fix: Relay daemon ownership and reference counting
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #define _LGPL_SOURCE
22 #include <assert.h>
23
24 #include <common/common.h>
25 #include <common/utils.h>
26
27 #include "lttng-relayd.h"
28 #include "stream.h"
29 #include "index.h"
30
31 /*
32 * Allocate a new relay index object. Pass the stream in which it is
33 * contained as parameter. The sequence number will be used as the hash
34 * table key.
35 *
36 * Called with stream mutex held.
37 * Return allocated object or else NULL on error.
38 */
39 static struct relay_index *relay_index_create(struct relay_stream *stream,
40 uint64_t net_seq_num)
41 {
42 struct relay_index *index;
43
44 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
45 stream->stream_handle, net_seq_num);
46
47 index = zmalloc(sizeof(*index));
48 if (!index) {
49 PERROR("Relay index zmalloc");
50 goto end;
51 }
52 if (!stream_get(stream)) {
53 ERR("Cannot get stream");
54 free(index);
55 index = NULL;
56 goto end;
57 }
58 index->stream = stream;
59
60 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
61 pthread_mutex_init(&index->lock, NULL);
62 pthread_mutex_init(&index->reflock, NULL);
63 urcu_ref_init(&index->ref);
64
65 end:
66 return index;
67 }
68
69 /*
70 * Add unique relay index to the given hash table. In case of a collision, the
71 * already existing object is put in the given _index variable.
72 *
73 * RCU read side lock MUST be acquired.
74 */
75 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
76 struct relay_index *index)
77 {
78 struct cds_lfht_node *node_ptr;
79 struct relay_index *_index;
80
81 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
82 stream->stream_handle, index->index_n.key);
83
84 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
85 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
86 stream->indexes_ht->match_fct, &index->index_n,
87 &index->index_n.node);
88 if (node_ptr != &index->index_n.node) {
89 _index = caa_container_of(node_ptr, struct relay_index,
90 index_n.node);
91 } else {
92 _index = NULL;
93 }
94 return _index;
95 }
96
97 /*
98 * Should be called with RCU read-side lock held.
99 */
100 static bool relay_index_get(struct relay_index *index)
101 {
102 bool has_ref = false;
103
104 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
105 index->stream->stream_handle, index->index_n.key,
106 (int) index->ref.refcount);
107
108 /* Confirm that the index refcount has not reached 0. */
109 pthread_mutex_lock(&index->reflock);
110 if (index->ref.refcount != 0) {
111 has_ref = true;
112 urcu_ref_get(&index->ref);
113 }
114 pthread_mutex_unlock(&index->reflock);
115
116 return has_ref;
117 }
118
119 /*
120 * Get a relayd index in within the given stream, or create it if not
121 * present.
122 *
123 * Called with stream mutex held.
124 * Return index object or else NULL on error.
125 */
126 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
127 uint64_t net_seq_num)
128 {
129 struct lttng_ht_node_u64 *node;
130 struct lttng_ht_iter iter;
131 struct relay_index *index = NULL;
132
133 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
134 stream->stream_handle, net_seq_num);
135
136 rcu_read_lock();
137 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
138 node = lttng_ht_iter_get_node_u64(&iter);
139 if (node) {
140 index = caa_container_of(node, struct relay_index, index_n);
141 } else {
142 struct relay_index *oldindex;
143
144 index = relay_index_create(stream, net_seq_num);
145 if (!index) {
146 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
147 index->stream->stream_handle, net_seq_num);
148 goto end;
149 }
150 oldindex = relay_index_add_unique(stream, index);
151 if (oldindex) {
152 /* Added concurrently, keep old. */
153 relay_index_put(index);
154 index = oldindex;
155 if (!relay_index_get(index)) {
156 index = NULL;
157 }
158 } else {
159 stream->indexes_in_flight++;
160 index->in_hash_table = true;
161 }
162 }
163 end:
164 rcu_read_unlock();
165 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
166 (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num);
167 return index;
168 }
169
170 int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
171 uint64_t data_offset)
172 {
173 int ret = 0;
174
175 pthread_mutex_lock(&index->lock);
176 if (index->index_fd) {
177 ret = -1;
178 goto end;
179 }
180 stream_fd_get(index_fd);
181 index->index_fd = index_fd;
182 index->index_data.offset = data_offset;
183 end:
184 pthread_mutex_unlock(&index->lock);
185 return ret;
186 }
187
188 int relay_index_set_data(struct relay_index *index,
189 const struct ctf_packet_index *data)
190 {
191 int ret = 0;
192
193 pthread_mutex_lock(&index->lock);
194 if (index->has_index_data) {
195 ret = -1;
196 goto end;
197 }
198 /* Set everything except data_offset. */
199 index->index_data.packet_size = data->packet_size;
200 index->index_data.content_size = data->content_size;
201 index->index_data.timestamp_begin = data->timestamp_begin;
202 index->index_data.timestamp_end = data->timestamp_end;
203 index->index_data.events_discarded = data->events_discarded;
204 index->index_data.stream_id = data->stream_id;
205 index->has_index_data = true;
206 end:
207 pthread_mutex_unlock(&index->lock);
208 return ret;
209 }
210
/*
 * Free the memory backing `index`. Nothing else is released here: the
 * file descriptor and stream references are dropped by index_release()
 * before destruction is scheduled.
 */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
215
216 static void index_destroy_rcu(struct rcu_head *rcu_head)
217 {
218 struct relay_index *index =
219 caa_container_of(rcu_head, struct relay_index, rcu_node);
220
221 index_destroy(index);
222 }
223
224 /* Stream lock must be held by the caller. */
225 static void index_release(struct urcu_ref *ref)
226 {
227 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
228 struct relay_stream *stream = index->stream;
229 int ret;
230 struct lttng_ht_iter iter;
231
232 if (index->index_fd) {
233 stream_fd_put(index->index_fd);
234 index->index_fd = NULL;
235 }
236 if (index->in_hash_table) {
237 /* Delete index from hash table. */
238 iter.iter.node = &index->index_n.node;
239 ret = lttng_ht_del(stream->indexes_ht, &iter);
240 assert(!ret);
241 stream->indexes_in_flight--;
242 }
243
244 stream_put(index->stream);
245 index->stream = NULL;
246
247 call_rcu(&index->rcu_node, index_destroy_rcu);
248 }
249
250 /*
251 * Called with stream mutex held.
252 *
253 * Stream lock must be held by the caller.
254 */
255 void relay_index_put(struct relay_index *index)
256 {
257 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
258 index->stream->stream_handle, index->index_n.key,
259 (int) index->ref.refcount);
260 /*
261 * Ensure existance of index->lock for index unlock.
262 */
263 rcu_read_lock();
264 /*
265 * Index lock ensures that concurrent test and update of stream
266 * ref is atomic.
267 */
268 pthread_mutex_lock(&index->reflock);
269 assert(index->ref.refcount != 0);
270 urcu_ref_put(&index->ref, index_release);
271 pthread_mutex_unlock(&index->reflock);
272 rcu_read_unlock();
273 }
274
275 /*
276 * Try to flush index to disk. Releases self-reference to index once
277 * flush succeeds.
278 *
279 * Stream lock must be held by the caller.
280 * Return 0 on successful flush, a negative value on error, or positive
281 * value if no flush was performed.
282 */
283 int relay_index_try_flush(struct relay_index *index)
284 {
285 int ret = 1;
286 bool flushed = false;
287 int fd;
288
289 pthread_mutex_lock(&index->lock);
290 if (index->flushed) {
291 goto skip;
292 }
293 /* Check if we are ready to flush. */
294 if (!index->has_index_data || !index->index_fd) {
295 goto skip;
296 }
297 fd = index->index_fd->fd;
298 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
299 " on fd %d", index->stream->stream_handle,
300 index->index_n.key, fd);
301 flushed = true;
302 index->flushed = true;
303 ret = index_write(fd, &index->index_data, sizeof(index->index_data));
304 if (ret == sizeof(index->index_data)) {
305 ret = 0;
306 } else {
307 ret = -1;
308 }
309 skip:
310 pthread_mutex_unlock(&index->lock);
311
312 if (flushed) {
313 /* Put self-ref from index now that it has been flushed. */
314 relay_index_put(index);
315 }
316 return ret;
317 }
318
319 /*
320 * Close every relay index within a given stream, without flushing
321 * them.
322 */
323 void relay_index_close_all(struct relay_stream *stream)
324 {
325 struct lttng_ht_iter iter;
326 struct relay_index *index;
327
328 rcu_read_lock();
329 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
330 index, index_n.node) {
331 /* Put self-ref from index. */
332 relay_index_put(index);
333 }
334 rcu_read_unlock();
335 }
This page took 0.036216 seconds and 5 git commands to generate.