Fix: src.ctf.lttng-live: expect NEW_STREAM/METADATA for inactive streams
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.c
index 4139546611fd20a865cd8ba676fd8478706f003d..9b3e24003ece58e26c961179d6bb28f4c36980f6 100644 (file)
@@ -110,6 +110,24 @@ enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status(
        }
 }
 
+static inline
+void viewer_connection_close_socket(
+               struct live_viewer_connection *viewer_connection)
+{
+       bt_self_component_class *self_comp_class =
+               viewer_connection->self_comp_class;
+       bt_self_component *self_comp =
+               viewer_connection->self_comp;
+       int ret = bt_socket_close(viewer_connection->control_sock);
+       if (ret == -1) {
+               BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(
+                       self_comp, self_comp_class,
+                       "Error closing viewer connection socket: ", ".");
+       }
+
+       viewer_connection->control_sock = BT_INVALID_SOCKET;
+}
+
 /*
  * This function receives a message from the Relay daemon.
  * If it received the entire message, it returns _OK,
@@ -157,12 +175,14 @@ enum lttng_live_viewer_status lttng_live_recv(
                                }
                        } else {
                                /*
-                                * For any other types of socket error, returng
-                                * an error.
+                                * For any other types of socket error, close
+                                * the socket and return an error.
                                 */
                                LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
                                        self_comp, self_comp_class,
                                        "Error receiving from Relay", ".");
+
+                               viewer_connection_close_socket(viewer_connection);
                                status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                                goto end;
                        }
@@ -172,10 +192,12 @@ enum lttng_live_viewer_status lttng_live_recv(
                         * connection was orderly shutdown from the other peer.
                         * If that happens when we are trying to receive
                         * a message from it, it means something when wrong.
+                        * Close the socket and return an error.
                         */
-                       status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                        BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                                self_comp_class, "Remote side has closed connection");
+                       viewer_connection_close_socket(viewer_connection);
+                       status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                        goto end;
                }
 
@@ -236,11 +258,14 @@ enum lttng_live_viewer_status lttng_live_send(
                                }
                        } else {
                                /*
-                                * The send() call returned an error.
+                                * For any other types of socket error, close
+                                * the socket and return an error.
                                 */
                                LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
                                        self_comp, self_comp_class,
                                        "Error sending to Relay", ".");
+
+                               viewer_connection_close_socket(viewer_connection);
                                status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                                goto end;
                        }
@@ -387,7 +412,7 @@ end:
 
 static
 enum lttng_live_viewer_status lttng_live_connect_viewer(
-       struct live_viewer_connection *viewer_connection)
+               struct live_viewer_connection *viewer_connection)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
@@ -1032,7 +1057,7 @@ end:
 }
 
 BT_HIDDEN
-enum lttng_live_viewer_status lttng_live_attach_session(
+enum lttng_live_viewer_status lttng_live_session_attach(
                struct lttng_live_session *session,
                bt_self_message_iterator *self_msg_iter)
 {
@@ -1134,7 +1159,7 @@ end:
 }
 
 BT_HIDDEN
-enum lttng_live_viewer_status lttng_live_detach_session(
+enum lttng_live_viewer_status lttng_live_session_detach(
                struct lttng_live_session *session)
 {
        struct lttng_viewer_cmd cmd;
@@ -1150,7 +1175,12 @@ enum lttng_live_viewer_status lttng_live_detach_session(
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
 
-       if (!session->attached) {
+       /*
+        * The session might already be detached and the viewer socket might
+        * already been closed. This happens when calling this function when
+        * tearing down the graph after an error.
+        */
+       if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
                return 0;
        }
 
@@ -1217,8 +1247,8 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_metadata rq;
        struct lttng_viewer_metadata_packet rp;
-       char *data = NULL;
-       ssize_t ret_len;
+       gchar *data = NULL;
+       ssize_t writelen;
        struct lttng_live_session *session = trace->session;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                session->lttng_live_msg_iter;
@@ -1282,24 +1312,28 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
                        BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                                "Received get_metadata response: unknown");
                        status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
-                       goto error;
+                       goto end;
        }
 
        len = be64toh(rp.len);
-       BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len);
-       if (len <= 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Erroneous response length");
-               status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
-               goto error;
+       if (len == 0) {
+               /*
+                * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
+                * length of 0. This means we must try again. This scenario
+                * arises when a clear command is performed on an lttng session.
+                */
+               BT_COMP_LOGD("Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
+               goto empty_metadata_packet_retry;
        }
 
-       data = calloc(1, len);
+       BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len);
+
+       data = g_new0(gchar, len);
        if (!data) {
                BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
                        "Failed to allocate data buffer", ".");
                status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
-               goto error;
+               goto end;
        }
 
        viewer_status = lttng_live_recv(viewer_connection, data, len);
@@ -1307,29 +1341,26 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
                viewer_handle_recv_status(self_comp, NULL,
                        viewer_status, "get metadata packet");
                status = (enum lttng_live_get_one_metadata_status) viewer_status;
-               goto error;
+               goto end;
        }
 
        /*
         * Write the metadata to the file handle.
         */
-       do {
-               ret_len = fwrite(data, 1, len, fp);
-       } while (ret_len < 0 && errno == EINTR);
-       if (ret_len < 0) {
+       writelen = fwrite(data, sizeof(uint8_t), len, fp);
+       if (writelen != len) {
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Writing in the metadata file stream");
                status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
-               goto error;
+               goto end;
        }
-       BT_ASSERT(ret_len == len);
+
+empty_metadata_packet_retry:
        *reply_len = len;
        status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
 
-error:
-       free(data);
-
 end:
+       g_free(data);
        return status;
 }
 
@@ -1351,6 +1382,19 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
        pindex->events_discarded = be64toh(lindex->events_discarded);
 }
 
+static
+void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
+{
+       uint64_t session_idx;
+
+       for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+                       session_idx++) {
+               struct lttng_live_session *session =
+                       g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+               session->new_streams_needed = true;
+       }
+}
+
 BT_HIDDEN
 enum lttng_live_iterator_status lttng_live_get_next_index(
                struct lttng_live_msg_iter *lttng_live_msg_iter,
@@ -1405,6 +1449,13 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
        flags = be32toh(rp.flags);
        rp_status = be32toh(rp.status);
 
+       if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+               BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
+                       "response=%"PRId32 ", response-flag=NEW_STREAM",
+                       rp_status);
+               lttng_live_need_new_streams(lttng_live_msg_iter);
+       }
+
        switch (rp_status) {
        case LTTNG_VIEWER_INDEX_INACTIVE:
        {
@@ -1445,10 +1496,6 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
                        BT_COMP_LOGD("Received get_next_index response: new metadata needed");
                        trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
                }
-               if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
-                       BT_COMP_LOGD("Received get_next_index response: new streams needed");
-                       lttng_live_need_new_streams(lttng_live_msg_iter);
-               }
                status = LTTNG_LIVE_ITERATOR_STATUS_OK;
                break;
        }
@@ -1607,7 +1654,7 @@ end:
  * Request new streams for a session.
  */
 BT_HIDDEN
-enum lttng_live_iterator_status lttng_live_get_new_streams(
+enum lttng_live_iterator_status lttng_live_session_get_new_streams(
                struct lttng_live_session *session,
                bt_self_message_iterator *self_msg_iter)
 {
This page took 0.031917 seconds and 4 git commands to generate.