bdbd11330f638a05223c81d256da6ef2da646dfd
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22
23 #include <common/common.h>
24 #include <common/utils.h>
25 #include <common/compat/endian.h>
26
27 #include "lttng-relayd.h"
28 #include "stream.h"
29 #include "index.h"
30 #include "connection.h"
31
32 /*
33 * Allocate a new relay index object. Pass the stream in which it is
34 * contained as parameter. The sequence number will be used as the hash
35 * table key.
36 *
37 * Called with stream mutex held.
38 * Return allocated object or else NULL on error.
39 */
40 static struct relay_index *relay_index_create(struct relay_stream *stream,
41 uint64_t net_seq_num)
42 {
43 struct relay_index *index;
44
45 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
46 stream->stream_handle, net_seq_num);
47
48 index = zmalloc(sizeof(*index));
49 if (!index) {
50 PERROR("Relay index zmalloc");
51 goto end;
52 }
53 if (!stream_get(stream)) {
54 ERR("Cannot get stream");
55 free(index);
56 index = NULL;
57 goto end;
58 }
59 index->stream = stream;
60
61 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
62 pthread_mutex_init(&index->lock, NULL);
63 urcu_ref_init(&index->ref);
64
65 end:
66 return index;
67 }
68
69 /*
70 * Add unique relay index to the given hash table. In case of a collision, the
71 * already existing object is put in the given _index variable.
72 *
73 * RCU read side lock MUST be acquired.
74 */
75 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
76 struct relay_index *index)
77 {
78 struct cds_lfht_node *node_ptr;
79 struct relay_index *_index;
80
81 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
82 stream->stream_handle, index->index_n.key);
83
84 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
85 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
86 stream->indexes_ht->match_fct, &index->index_n,
87 &index->index_n.node);
88 if (node_ptr != &index->index_n.node) {
89 _index = caa_container_of(node_ptr, struct relay_index,
90 index_n.node);
91 } else {
92 _index = NULL;
93 }
94 return _index;
95 }
96
97 /*
98 * Should be called with RCU read-side lock held.
99 */
100 static bool relay_index_get(struct relay_index *index)
101 {
102 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
103 index->stream->stream_handle, index->index_n.key,
104 (int) index->ref.refcount);
105
106 return urcu_ref_get_unless_zero(&index->ref);
107 }
108
109 /*
110 * Get a relayd index in within the given stream, or create it if not
111 * present.
112 *
113 * Called with stream mutex held.
114 * Return index object or else NULL on error.
115 */
116 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
117 uint64_t net_seq_num)
118 {
119 struct lttng_ht_node_u64 *node;
120 struct lttng_ht_iter iter;
121 struct relay_index *index = NULL;
122
123 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
124 stream->stream_handle, net_seq_num);
125
126 rcu_read_lock();
127 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
128 node = lttng_ht_iter_get_node_u64(&iter);
129 if (node) {
130 index = caa_container_of(node, struct relay_index, index_n);
131 } else {
132 struct relay_index *oldindex;
133
134 index = relay_index_create(stream, net_seq_num);
135 if (!index) {
136 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
137 stream->stream_handle, net_seq_num);
138 goto end;
139 }
140 oldindex = relay_index_add_unique(stream, index);
141 if (oldindex) {
142 /* Added concurrently, keep old. */
143 relay_index_put(index);
144 index = oldindex;
145 if (!relay_index_get(index)) {
146 index = NULL;
147 }
148 } else {
149 stream->indexes_in_flight++;
150 index->in_hash_table = true;
151 }
152 }
153 end:
154 rcu_read_unlock();
155 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
156 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
157 return index;
158 }
159
160 int relay_index_set_file(struct relay_index *index,
161 struct lttng_index_file *index_file,
162 uint64_t data_offset)
163 {
164 int ret = 0;
165
166 pthread_mutex_lock(&index->lock);
167 if (index->index_file) {
168 ret = -1;
169 goto end;
170 }
171 lttng_index_file_get(index_file);
172 index->index_file = index_file;
173 index->index_data.offset = data_offset;
174 end:
175 pthread_mutex_unlock(&index->lock);
176 return ret;
177 }
178
179 int relay_index_set_data(struct relay_index *index,
180 const struct ctf_packet_index *data)
181 {
182 int ret = 0;
183
184 pthread_mutex_lock(&index->lock);
185 if (index->has_index_data) {
186 ret = -1;
187 goto end;
188 }
189 /* Set everything except data_offset. */
190 index->index_data.packet_size = data->packet_size;
191 index->index_data.content_size = data->content_size;
192 index->index_data.timestamp_begin = data->timestamp_begin;
193 index->index_data.timestamp_end = data->timestamp_end;
194 index->index_data.events_discarded = data->events_discarded;
195 index->index_data.stream_id = data->stream_id;
196 index->has_index_data = true;
197 end:
198 pthread_mutex_unlock(&index->lock);
199 return ret;
200 }
201
202 static void index_destroy(struct relay_index *index)
203 {
204 free(index);
205 }
206
207 static void index_destroy_rcu(struct rcu_head *rcu_head)
208 {
209 struct relay_index *index =
210 caa_container_of(rcu_head, struct relay_index, rcu_node);
211
212 index_destroy(index);
213 }
214
215 /* Stream lock must be held by the caller. */
216 static void index_release(struct urcu_ref *ref)
217 {
218 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
219 struct relay_stream *stream = index->stream;
220 int ret;
221 struct lttng_ht_iter iter;
222
223 if (index->index_file) {
224 lttng_index_file_put(index->index_file);
225 index->index_file = NULL;
226 }
227 if (index->in_hash_table) {
228 /* Delete index from hash table. */
229 iter.iter.node = &index->index_n.node;
230 ret = lttng_ht_del(stream->indexes_ht, &iter);
231 assert(!ret);
232 stream->indexes_in_flight--;
233 }
234
235 stream_put(index->stream);
236 index->stream = NULL;
237
238 call_rcu(&index->rcu_node, index_destroy_rcu);
239 }
240
241 /*
242 * Called with stream mutex held.
243 *
244 * Stream lock must be held by the caller.
245 */
246 void relay_index_put(struct relay_index *index)
247 {
248 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
249 index->stream->stream_handle, index->index_n.key,
250 (int) index->ref.refcount);
251 /*
252 * Ensure existance of index->lock for index unlock.
253 */
254 rcu_read_lock();
255 /*
256 * Index lock ensures that concurrent test and update of stream
257 * ref is atomic.
258 */
259 assert(index->ref.refcount != 0);
260 urcu_ref_put(&index->ref, index_release);
261 rcu_read_unlock();
262 }
263
264 /*
265 * Try to flush index to disk. Releases self-reference to index once
266 * flush succeeds.
267 *
268 * Stream lock must be held by the caller.
269 * Return 0 on successful flush, a negative value on error, or positive
270 * value if no flush was performed.
271 */
272 int relay_index_try_flush(struct relay_index *index)
273 {
274 int ret = 1;
275 bool flushed = false;
276 int fd;
277
278 pthread_mutex_lock(&index->lock);
279 if (index->flushed) {
280 goto skip;
281 }
282 /* Check if we are ready to flush. */
283 if (!index->has_index_data || !index->index_file) {
284 goto skip;
285 }
286 fd = index->index_file->fd;
287 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
288 " on fd %d", index->stream->stream_handle,
289 index->index_n.key, fd);
290 flushed = true;
291 index->flushed = true;
292 ret = lttng_index_file_write(index->index_file, &index->index_data);
293 skip:
294 pthread_mutex_unlock(&index->lock);
295
296 if (flushed) {
297 /* Put self-ref from index now that it has been flushed. */
298 relay_index_put(index);
299 }
300 return ret;
301 }
302
303 /*
304 * Close every relay index within a given stream, without flushing
305 * them.
306 */
307 void relay_index_close_all(struct relay_stream *stream)
308 {
309 struct lttng_ht_iter iter;
310 struct relay_index *index;
311
312 rcu_read_lock();
313 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
314 index, index_n.node) {
315 /* Put self-ref from index. */
316 relay_index_put(index);
317 }
318 rcu_read_unlock();
319 }
320
321 void relay_index_close_partial_fd(struct relay_stream *stream)
322 {
323 struct lttng_ht_iter iter;
324 struct relay_index *index;
325
326 rcu_read_lock();
327 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
328 index, index_n.node) {
329 if (!index->index_file) {
330 continue;
331 }
332 /*
333 * Partial index has its index_file: we have only
334 * received its info from the data socket.
335 * Put self-ref from index.
336 */
337 relay_index_put(index);
338 }
339 rcu_read_unlock();
340 }
341
342 uint64_t relay_index_find_last(struct relay_stream *stream)
343 {
344 struct lttng_ht_iter iter;
345 struct relay_index *index;
346 uint64_t net_seq_num = -1ULL;
347
348 rcu_read_lock();
349 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
350 index, index_n.node) {
351 if (net_seq_num == -1ULL ||
352 index->index_n.key > net_seq_num) {
353 net_seq_num = index->index_n.key;
354 }
355 }
356 rcu_read_unlock();
357 return net_seq_num;
358 }
359
360 /*
361 * Update the index file of an already existing relay_index.
362 * Offsets by 'removed_data_count' the offset field of an index.
363 */
364 static
365 int relay_index_switch_file(struct relay_index *index,
366 struct lttng_index_file *new_index_file,
367 uint64_t removed_data_count)
368 {
369 int ret = 0;
370 uint64_t offset;
371
372 pthread_mutex_lock(&index->lock);
373 if (!index->index_file) {
374 ERR("No index_file");
375 ret = 0;
376 goto end;
377 }
378
379 lttng_index_file_put(index->index_file);
380 lttng_index_file_get(new_index_file);
381 index->index_file = new_index_file;
382 offset = be64toh(index->index_data.offset);
383 index->index_data.offset = htobe64(offset - removed_data_count);
384
385 end:
386 pthread_mutex_unlock(&index->lock);
387 return ret;
388 }
389
390 /*
391 * Switch the index file of all pending indexes for a stream and update the
392 * data offset by substracting the last safe position.
393 * Stream lock must be held.
394 */
395 int relay_index_switch_all_files(struct relay_stream *stream)
396 {
397 struct lttng_ht_iter iter;
398 struct relay_index *index;
399 int ret = 0;
400
401 rcu_read_lock();
402 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
403 index, index_n.node) {
404 DBG("Update index to fd %d", stream->index_file->fd);
405 ret = relay_index_switch_file(index, stream->index_file,
406 stream->pos_after_last_complete_data_index);
407 if (ret) {
408 goto end;
409 }
410 }
411 end:
412 rcu_read_unlock();
413 return ret;
414 }
415
416 /*
417 * Set index data from the control port to a given index object.
418 */
419 int relay_index_set_control_data(struct relay_index *index,
420 const struct lttcomm_relayd_index *data,
421 unsigned int minor_version)
422 {
423 /* The index on disk is encoded in big endian. */
424 const struct ctf_packet_index index_data = {
425 .packet_size = htobe64(data->packet_size),
426 .content_size = htobe64(data->content_size),
427 .timestamp_begin = htobe64(data->timestamp_begin),
428 .timestamp_end = htobe64(data->timestamp_end),
429 .events_discarded = htobe64(data->events_discarded),
430 .stream_id = htobe64(data->stream_id),
431 };
432
433 if (minor_version >= 8) {
434 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
435 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
436 } else {
437 uint64_t unset_value = -1ULL;
438
439 index->index_data.stream_instance_id = htobe64(unset_value);
440 index->index_data.packet_seq_num = htobe64(unset_value);
441 }
442
443 return relay_index_set_data(index, &index_data);
444 }
This page took 0.037895 seconds and 4 git commands to generate.