Rename data_available to data_pending
[lttng-tools.git] / src / common / relayd / relayd.c
index cf3649397425840b9c0c0e0ae07d22073255f0fa..daaf44ef04561a40725afdc618e7b91061ef83ae 100644 (file)
@@ -67,6 +67,7 @@ static int send_command(struct lttcomm_sock *sock,
 
        ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
        if (ret < 0) {
+               ret = -errno;
                goto error;
        }
 
@@ -90,6 +91,7 @@ static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
 
        ret = sock->ops->recvmsg(sock, data, size, 0);
        if (ret < 0) {
+               ret = -errno;
                goto error;
        }
 
@@ -283,6 +285,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
        /* Only send data header. */
        ret = sock->ops->sendmsg(sock, hdr, size, 0);
        if (ret < 0) {
+               ret = -errno;
                goto error;
        }
 
@@ -341,3 +344,95 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
 error:
        return ret;
 }
+
+/*
+ * Check for data availability for a given stream id.
+ *
+ * Return 0 if NOT pending, 1 if so and a negative value on error.
+ */
+int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+               uint64_t last_net_seq_num)
+{
+       int ret;
+       struct lttcomm_relayd_data_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+
+       DBG("Relayd data pending for stream id %" PRIu64, stream_id);
+
+       msg.stream_id = htobe64(stream_id);
+       msg.last_net_seq_num = htobe64(last_net_seq_num);
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg,
+                       sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code >= LTTNG_OK) {
+               ret = -reply.ret_code;
+               ERR("Relayd data pending replied error %d", ret);
+       }
+
+       /* At this point, the ret code is either 1 or 0 */
+       ret = reply.ret_code;
+
+       DBG("Relayd data is %s pending for stream id %" PRIu64,
+                       ret == 1 ? "NOT" : "", stream_id);
+
+error:
+       return ret;
+}
+
+/*
+ * Check on the relayd side for a quiescent state on the control socket.
+ */
+int relayd_quiescent_control(struct lttcomm_sock *sock)
+{
+       int ret;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+
+       DBG("Relayd checking quiescent control state");
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -reply.ret_code;
+               ERR("Relayd quiescent control replied error %d", ret);
+               goto error;
+       }
+
+       /* Control socket is quiescent */
+       return 0;
+
+error:
+       return ret;
+}
This page took 0.029014 seconds and 5 git commands to generate.