Sinks own their input iterators
[babeltrace.git] / lib / plugin-system / sink.c
index 7067e06edc395baaf465eb957b17c9dd9e3c825a..d9b33616528ffd0d89551f997cc1023e69cb23ae 100644 (file)
@@ -27,6 +27,7 @@
  */
 
 #include <babeltrace/compiler.h>
+#include <babeltrace/values.h>
 #include <babeltrace/plugin/sink-internal.h>
 #include <babeltrace/plugin/component-internal.h>
 #include <babeltrace/plugin/notification/notification.h>
@@ -39,17 +40,14 @@ enum bt_component_status bt_component_sink_validate(
        struct bt_component_sink *sink;
 
        sink = container_of(component, struct bt_component_sink, parent);
-       if (sink->registered_notifications_mask == 0) {
-               /*
-                * A sink must be registered to at least one notification type.
-                */
-               printf_error("Invalid sink component; not registered to any notification");
+       if (!sink->consume) {
+               printf_error("Invalid sink component; no notification consumption callback defined.");
                ret = BT_COMPONENT_STATUS_INVALID;
                goto end;
        }
 
-       if (!sink->handle_notification) {
-               printf_error("Invalid sink component; no notification handling callback defined.");
+       if (sink->min_input_count > sink->max_input_count) {
+               printf_error("Invalid sink component; minimum input count > maximum input count.");
                ret = BT_COMPONENT_STATUS_INVALID;
                goto end;
        }
@@ -57,6 +55,15 @@ end:
        return ret;
 }
 
+static
+void bt_component_sink_destroy(struct bt_component *component)
+{
+       struct bt_component_sink *sink = container_of(component,
+                       struct bt_component_sink, parent);
+
+       g_ptr_array_free(sink->inputs, TRUE);
+}
+
 BT_HIDDEN
 struct bt_component *bt_component_sink_create(
                struct bt_component_class *class, struct bt_value *params)
@@ -70,16 +77,24 @@ struct bt_component *bt_component_sink_create(
        }
 
        sink->parent.class = bt_get(class);
-       ret = bt_component_init(&sink->parent, NULL);
+       ret = bt_component_init(&sink->parent, bt_component_sink_destroy);
        if (ret != BT_COMPONENT_STATUS_OK) {
                goto error;
        }
 
+       sink->min_input_count = 1;
+       sink->max_input_count = 1;
+/*
        ret = bt_component_sink_register_notification_type(&sink->parent,
                BT_NOTIFICATION_TYPE_EVENT);
        if (ret != BT_COMPONENT_STATUS_OK) {
                goto error;
        }
+*/
+       sink->inputs = g_ptr_array_new_with_free_func(bt_put);
+       if (!sink->inputs) {
+               goto error;
+       }
 end:
        return sink ? &sink->parent : NULL;
 error:
@@ -87,14 +102,26 @@ error:
        return NULL;
 }
 
-enum bt_component_status bt_component_sink_handle_notification(
-               struct bt_component *component,
-               struct bt_notification *notification)
+static
+enum bt_component_status validate_inputs(struct bt_component_sink *sink)
+{
+       size_t array_size = sink->inputs->len;
+
+       if (array_size < sink->min_input_count ||
+                       array_size > sink->max_input_count) {
+               return BT_COMPONENT_STATUS_INVALID;
+       }
+
+       return BT_COMPONENT_STATUS_OK;
+}
+
+enum bt_component_status bt_component_sink_consume(
+               struct bt_component *component)
 {
        enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
        struct bt_component_sink *sink = NULL;
 
-       if (!component || !notification) {
+       if (!component) {
                ret = BT_COMPONENT_STATUS_INVALID;
                goto end;
        }
@@ -105,12 +132,21 @@ enum bt_component_status bt_component_sink_handle_notification(
        }
 
        sink = container_of(component, struct bt_component_sink, parent);
-       assert(sink->handle_notification);
-       ret = sink->handle_notification(component, notification);
+       if (!sink->validated_inputs) {
+               ret = validate_inputs(sink);
+               if (ret != BT_COMPONENT_STATUS_OK) {
+                       goto end;
+               }
+               sink->validated_inputs = true;
+       }
+
+       assert(sink->consume);
+       ret = sink->consume(component);
 end:
        return ret;
 }
-
+/*
+static
 enum bt_component_status bt_component_sink_register_notification_type(
                struct bt_component *component, enum bt_notification_type type)
 {
@@ -142,13 +178,41 @@ enum bt_component_status bt_component_sink_register_notification_type(
 end:
        return ret;
 }
+*/
+enum bt_component_status bt_component_sink_set_consume_cb(
+               struct bt_component *component,
+               bt_component_sink_consume_cb consume)
+{
+       struct bt_component_sink *sink;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
 
-enum bt_component_status bt_component_sink_set_handle_notification_cb(
+       if (!component) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+               ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+               goto end;
+       }
+
+       if (!component->initializing) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       sink = container_of(component, struct bt_component_sink, parent);
+       sink->consume = consume;
+end:
+       return ret;
+}
+
+enum bt_component_status bt_component_sink_set_minimum_input_count(
                struct bt_component *component,
-               bt_component_sink_handle_notification_cb handle_notification)
+               unsigned int minimum)
 {
+       struct bt_component_sink *sink;
        enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
-       struct bt_component_sink *sink = NULL;
 
        if (!component) {
                ret = BT_COMPONENT_STATUS_INVALID;
@@ -160,8 +224,127 @@ enum bt_component_status bt_component_sink_set_handle_notification_cb(
                goto end;
        }
 
+       if (!component->initializing) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
        sink = container_of(component, struct bt_component_sink, parent);
-       sink->handle_notification = handle_notification;
+       sink->min_input_count = minimum;
+end:
+       return ret;
+}
+
+enum bt_component_status bt_component_sink_set_maximum_input_count(
+               struct bt_component *component,
+               unsigned int maximum)
+{
+       struct bt_component_sink *sink;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       if (!component) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+               ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+               goto end;
+       }
+
+       if (!component->initializing) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       sink = container_of(component, struct bt_component_sink, parent);
+       sink->max_input_count = maximum;
+end:
+       return ret;
+}
+
+enum bt_component_status
+bt_component_sink_get_input_count(struct bt_component *component,
+               unsigned int *count)
+{
+       struct bt_component_sink *sink;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       if (!component || !count) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+               ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+               goto end;
+       }
+
+       sink = container_of(component, struct bt_component_sink, parent);
+       *count = (unsigned int) sink->inputs->len;
+end:
+       return ret;
+}
+
+enum bt_component_status
+bt_component_sink_get_input_iterator(struct bt_component *component,
+               unsigned int input, struct bt_notification_iterator **iterator)
+{
+       struct bt_component_sink *sink;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       if (!component || !iterator) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+               ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+               goto end;
+       }
+
+       sink = container_of(component, struct bt_component_sink, parent);
+       if (input >= (unsigned int) sink->inputs->len) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       *iterator = bt_get(g_ptr_array_index(sink->inputs, input));
+end:
+       return ret;
+}
+
+enum bt_component_status
+bt_component_sink_add_iterator(struct bt_component *component,
+               struct bt_notification_iterator *iterator)
+{
+       struct bt_component_sink *sink;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       if (!component || !iterator) {
+               ret = BT_COMPONENT_STATUS_INVALID;
+               goto end;
+       }
+
+       if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+               ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+               goto end;
+       }
+
+       sink = container_of(component, struct bt_component_sink, parent);
+       if (sink->inputs->len == sink->max_input_count) {
+               ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+               goto end;
+       }
+
+       if (sink->add_iterator) {
+               ret = sink->add_iterator(component, iterator);
+               if (ret != BT_COMPONENT_STATUS_OK) {
+                       goto end;
+               }
+       }
+
+       g_ptr_array_add(sink->inputs, bt_get(iterator));
 end:
        return ret;
 }
This page took 0.025991 seconds and 4 git commands to generate.