b7507a022076e0e6be789523111c08f4fcad2d66
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #define _LGPL_SOURCE
21 #include <assert.h>
22
23 #include <common/common.h>
24 #include <common/utils.h>
25
26 #include "lttng-relayd.h"
27 #include "index.h"
28
29 /*
30 * Deferred free of a relay index object. MUST only be called by a call RCU.
31 */
32 static void deferred_free_relay_index(struct rcu_head *head)
33 {
34 struct relay_index *index =
35 caa_container_of(head, struct relay_index, rcu_node);
36
37 if (index->to_close_fd >= 0) {
38 int ret;
39
40 ret = close(index->to_close_fd);
41 if (ret < 0) {
42 PERROR("Relay index to close fd %d", index->to_close_fd);
43 }
44 }
45
46 relay_index_free(index);
47 }
48
49 /*
50 * Allocate a new relay index object using the given stream ID and sequence
51 * number as the hash table key.
52 *
53 * Return allocated object or else NULL on error.
54 */
55 struct relay_index *relay_index_create(uint64_t stream_id,
56 uint64_t net_seq_num)
57 {
58 struct relay_index *index;
59
60 DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
61 stream_id, net_seq_num);
62
63 index = zmalloc(sizeof(*index));
64 if (index == NULL) {
65 PERROR("Relay index zmalloc");
66 goto error;
67 }
68
69 index->to_close_fd = -1;
70 lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
71
72 error:
73 return index;
74 }
75
76 /*
77 * Find a relayd index in the given hash table.
78 *
79 * Return index object or else NULL on error.
80 */
81 struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
82 {
83 struct lttng_ht_node_two_u64 *node;
84 struct lttng_ht_iter iter;
85 struct lttng_ht_two_u64 key;
86 struct relay_index *index = NULL;
87
88 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
89 stream_id, net_seq_num);
90
91 key.key1 = stream_id;
92 key.key2 = net_seq_num;
93
94 lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
95 node = lttng_ht_iter_get_node_two_u64(&iter);
96 if (node == NULL) {
97 goto end;
98 }
99 index = caa_container_of(node, struct relay_index, index_n);
100
101 end:
102 DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
103 (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
104 return index;
105 }
106
107 /*
108 * Add unique relay index to the given hash table. In case of a collision, the
109 * already existing object is put in the given _index variable.
110 *
111 * RCU read side lock MUST be acquired.
112 */
113 void relay_index_add(struct relay_index *index, struct relay_index **_index)
114 {
115 struct cds_lfht_node *node_ptr;
116
117 assert(index);
118
119 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
120 index->index_n.key.key1, index->index_n.key.key2);
121
122 node_ptr = cds_lfht_add_unique(indexes_ht->ht,
123 indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
124 indexes_ht->match_fct, (void *) &index->index_n.key,
125 &index->index_n.node);
126 if (node_ptr != &index->index_n.node) {
127 *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
128 }
129 }
130
131 /*
132 * Write index on disk to the given fd. Once done error or not, it is removed
133 * from the hash table and destroy the object.
134 *
135 * MUST be called with a RCU read side lock held.
136 *
137 * Return 0 on success else a negative value.
138 */
139 int relay_index_write(int fd, struct relay_index *index)
140 {
141 int ret;
142 struct lttng_ht_iter iter;
143
144 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
145 " on fd %d", index->index_n.key.key1,
146 index->index_n.key.key2, fd);
147
148 /* Delete index from hash table. */
149 iter.iter.node = &index->index_n.node;
150 ret = lttng_ht_del(indexes_ht, &iter);
151 assert(!ret);
152 call_rcu(&index->rcu_node, deferred_free_relay_index);
153
154 return index_write(fd, &index->index_data, sizeof(index->index_data));
155 }
156
157 /*
158 * Free the given index.
159 */
160 void relay_index_free(struct relay_index *index)
161 {
162 free(index);
163 }
164
165 /*
166 * Safely free the given index using a call RCU.
167 */
168 void relay_index_free_safe(struct relay_index *index)
169 {
170 if (!index) {
171 return;
172 }
173
174 call_rcu(&index->rcu_node, deferred_free_relay_index);
175 }
176
177 /*
178 * Delete index from the given hash table.
179 *
180 * RCU read side lock MUST be acquired.
181 */
182 void relay_index_delete(struct relay_index *index)
183 {
184 int ret;
185 struct lttng_ht_iter iter;
186
187 DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
188 " deleted.", index->index_n.key.key1,
189 index->index_n.key.key2);
190
191 /* Delete index from hash table. */
192 iter.iter.node = &index->index_n.node;
193 ret = lttng_ht_del(indexes_ht, &iter);
194 assert(!ret);
195 }
196
197 /*
198 * Destroy every relay index with the given stream id as part of the key.
199 */
200 void relay_index_destroy_by_stream_id(uint64_t stream_id)
201 {
202 struct lttng_ht_iter iter;
203 struct relay_index *index;
204
205 rcu_read_lock();
206 cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
207 if (index->index_n.key.key1 == stream_id) {
208 relay_index_delete(index);
209 relay_index_free_safe(index);
210 }
211 }
212 rcu_read_unlock();
213 }
This page took 0.033654 seconds and 4 git commands to generate.