summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar bigfoot547 <[email protected]>2025-11-14 22:13:45 -0600
committerLibravatar bigfoot547 <[email protected]>2025-11-15 00:21:37 -0600
commit635b33df0f7d567626fab4f1acaa91e5609e92ca (patch)
tree981e50865b73732a9ea34a4e35d8229be6db5489
parentinitial commit (diff)
update some stuff
-rw-r--r--config.h.in32
-rw-r--r--connection.c45
-rw-r--r--connection.h15
-rw-r--r--main.c80
-rw-r--r--meson.build10
-rw-r--r--pipeline.c324
-rw-r--r--pipeline.h65
7 files changed, 499 insertions, 72 deletions
diff --git a/config.h.in b/config.h.in
index 9ef3491..59956d4 100644
--- a/config.h.in
+++ b/config.h.in
@@ -3,6 +3,9 @@
#mesondefine HAS_ATTR_WUR
#mesondefine HAS_ATTR_ALWAYS_INLINE
+#mesondefine HAS_ATTR_MALLOC
+#mesondefine HAS_ATTR_FORMAT
+#mesondefine HAS_RESTRICT
#ifdef HAS_ATTR_WUR
#define ATTR_WUR __attribute__((warn_unused_result))
@@ -16,4 +19,33 @@
#define ATTR_ALWAYS_INLINE
#endif
+#ifdef HAS_ATTR_MALLOC
+#define ATTR_MALLOC(_x)
+#else
+#define ATTR_MALLOC(_x) __attribute__((malloc _x))
+#endif
+
+#ifdef HAS_ATTR_FORMAT
+#define ATTR_FORMAT(_x) __attribute__((format _x))
+#else
+#define ATTR_FORMAT(_x)
+#endif
+
+#ifdef HAS_RESTRICT
+#define PTX_RESTRICT restrict
+#else
+#define PTX_RESTRICT
+#endif
+
+/* meson doesn't support this one yet :( */
+#ifdef __has_attribute
+# if __has_attribute(access)
+# define ATTR_ACCESS(_x) __attribute__((access _x))
+# else
+# define ATTR_ACCESS(_x)
+# endif
+#else
+#define ATTR_ACCESS(_x)
+#endif
+
#endif /* include guard */
diff --git a/connection.c b/connection.c
new file mode 100644
index 0000000..ecbb7fa
--- /dev/null
+++ b/connection.c
@@ -0,0 +1,45 @@
+#include "connection.h"
+#include "pipeline.h"
+#include <stdlib.h>
+
+struct ptx_connection *ptx_connection_new(void)
+{
+ struct ptx_connection *conn = malloc(sizeof(struct ptx_connection));
+ if (!conn) return NULL;
+
+ /* initialize these in case we goto cleanup later */
+ conn->cli_read = NULL;
+ conn->cli_write = NULL;
+ conn->srv_read = NULL;
+ conn->srv_write = NULL;
+
+ /* do initialization */
+
+ if (!(conn->cli_read = ptx_pipeline_new(4))) goto cleanup;
+ if (!(conn->cli_write = ptx_pipeline_new(4))) goto cleanup;
+ if (!(conn->srv_read = ptx_pipeline_new(4))) goto cleanup;
+ if (!(conn->srv_write = ptx_pipeline_new(4))) goto cleanup;
+
+ return conn;
+
+cleanup:
+ ptx_connection_free(conn);
+
+ return NULL;
+}
+
+void ptx_connection_free(struct ptx_connection *conn)
+{
+ /* NOTE: this function might be called with a partially-constructed or inconsistent object */
+
+ if (!conn) return;
+
+ /* do cleanup */
+
+ ptx_pipeline_free(conn->cli_read);
+ ptx_pipeline_free(conn->cli_write);
+ ptx_pipeline_free(conn->srv_read);
+ ptx_pipeline_free(conn->srv_write);
+
+ free(conn);
+}
diff --git a/connection.h b/connection.h
new file mode 100644
index 0000000..416e076
--- /dev/null
+++ b/connection.h
@@ -0,0 +1,15 @@
+#ifndef PTXMC_CONNECTION_H_INCLUDED
+#define PTXMC_CONNECTION_H_INCLUDED
+
+#include "pipeline.h"
+
+struct ptx_connection
+{
+ ptx_pipeline_t *cli_read, *cli_write;
+ ptx_pipeline_t *srv_read, *srv_write;
+};
+
+struct ptx_connection *ptx_connection_new(void);
+void ptx_connection_free(struct ptx_connection *conn);
+
+#endif /* include guard */
diff --git a/main.c b/main.c
index e1e22ec..be91332 100644
--- a/main.c
+++ b/main.c
@@ -1,25 +1,61 @@
+#include <stdio.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdarg.h>
+
#include "pipeline.h"
#define UNUSED(_v) ((void)(_v))
-PIPELINE_STAGE int stage(struct pipeline_ctx_t *ctx, void *data, size_t len)
+PIPELINE_STAGE int identity_stage_handle(const ptx_pipeline_ctx_t *ctx, void *data, size_t sz)
+{
+ return ptx_pipeline_ctx_next(ctx, data, sz);
+}
+
+PIPELINE_INIT int debug_stage_init(const char *name, void **ppuser, va_list init_args)
+{
+ static const char *const messages[] = { "no continue :(", "continue :)" };
+
+ UNUSED(name);
+
+ int config = !!va_arg(init_args, int);
+ *ppuser = (void *)(uintptr_t)config;
+
+ printf("Debug stage %s (init): %s\n", name, messages[config]);
+
+ return 0;
+}
+
+PIPELINE_CLEANUP void debug_stage_cleanup(const char *name, void *puser)
{
- UNUSED(ctx);
- UNUSED(data);
- UNUSED(len);
- return 1;
+ printf("Debug stage %s (cleanup): %p\n", name, puser);
}
-PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t len) {
- return pipeline_next(ctx, data, len);
+PIPELINE_STAGE int debug_stage_handle(const ptx_pipeline_ctx_t *ctx, void *data, size_t sz)
+{
+ bool cont = !!(uintptr_t)(*ptx_pipeline_ctx_get_user(ctx));
+ printf("Debug stage %s:\n Data: %p\n Size: %zu\n", ptx_pipeline_ctx_get_name(ctx), data, sz);
+ return cont ? ptx_pipeline_ctx_next(ctx, data, sz) : 0;
}
+static const struct ptx_pipeline_stage_funcs identity_stage = {
+ .init = NULL,
+ .handler = &identity_stage_handle,
+ .cleanup = NULL
+};
+
+static const struct ptx_pipeline_stage_funcs debug_stage = {
+ .init = &debug_stage_init,
+ .handler = &debug_stage_handle,
+ .cleanup = &debug_stage_cleanup
+};
+
/* mockup:
* Pipeline signature: D -> D*1
* int simple_1to1_stage(struct pipeline_ctx *ctx, void *data, size_t len) {
* void *out = ... transform data ...;
* size_t out_len = ^^^;
- * return pipeline_next(ctx, out, out_len);
+ * return ptx_pipeline_next(ctx, out, out_len);
* }
*
* Pipeline signature: D -> D*(0,n)
@@ -30,7 +66,7 @@ PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t
* return -1; ... next_data failed somehow ...
* }
*
- * if ((err = pipeline_next(ctx, out, out_len)) < 0) {
+ * if ((err = ptx_pipeline_next(ctx, out, out_len)) < 0) {
* return err;
* }
* }
@@ -41,7 +77,7 @@ PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t
* int stateful_stage(struct pipeline_ctx *ctx, void *data, size_t len) {
* my_stage_t *stg = ctx->me;
* stg->stat_in += len;
- * return pipeline_next(ctx, data, len);
+ * return ptx_pipeline_next(ctx, data, len);
* }
*
* Pipeline signature: D -> void (i.e., never calls next)
@@ -53,5 +89,29 @@ PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t
*/
int main(void) {
+ ptx_pipeline_t *pl = ptx_pipeline_new(1);
+ if (!pl) {
+ fputs("Allocation failure: pipeline\n", stderr);
+ return 1;
+ }
+
+ if (ptx_pipeline_add_before(pl, &identity_stage, "identity1", NULL) < 0) {
+ fputs("Failed to add identity1 to pipeline\n", stderr);
+ return 1;
+ }
+
+ if (ptx_pipeline_add_after(pl, &debug_stage, "debug1", "identity1", true) < 0) {
+ fputs("Failed to add debug1 to pipeline\n", stderr);
+ return 1;
+ }
+
+ if (ptx_pipeline_add_after(pl, &debug_stage, "debug2", "debug1", false) < 0) {
+ fputs("Failed to add debug2 to pipeline\n", stderr);
+ return 1;
+ }
+
+ printf("Handle returned: %d\n", ptx_pipeline_handle(pl, "sussy", 10));
+ ptx_pipeline_free(pl);
+
return 0;
}
diff --git a/meson.build b/meson.build
index 4e2d1b6..da4fe0e 100644
--- a/meson.build
+++ b/meson.build
@@ -5,8 +5,16 @@ c_comp = meson.get_compiler('c')
conf_data = configuration_data()
conf_data.set('HAS_ATTR_WUR', c_comp.has_function_attribute('warn_unused_result'))
conf_data.set('HAS_ATTR_ALWAYS_INLINE', c_comp.has_function_attribute('always_inline'))
+conf_data.set('HAS_ATTR_MALLOC', c_comp.has_function_attribute('malloc'))
+conf_data.set('HAS_ATTR_FORMAT', c_comp.has_function_attribute('format'))
+conf_data.set('HAS_RESTRICT', c_comp.has_type('int *restrict'))
configure_file(input : 'config.h.in', output : 'config.h', configuration : conf_data)
conf_include = include_directories('.')
-executable('ptxmc', 'main.c', 'pipeline.c', include_directories : conf_include)
+executable('ptxmc',
+ 'main.c',
+ 'pipeline.c',
+ 'connection.c',
+ include_directories : conf_include,
+ c_args : [ '-D_XOPEN_SOURCE=700' ])
diff --git a/pipeline.c b/pipeline.c
index 477421f..bead598 100644
--- a/pipeline.c
+++ b/pipeline.c
@@ -1,59 +1,311 @@
#include "pipeline.h"
+
#include <string.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <assert.h>
+#include <limits.h>
-int pipeline_add_stage_after(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name)
+struct ptx__pipeline_stage_tag
{
- if (!name) {
- /* name == NULL: insert at end of pipeline */
- stage->next = NULL;
+ struct ptx_pipeline_stage_funcs impl;
+ char *name;
+ void *user;
+};
-#if 0
- if (pl->first) {
- struct pipeline_stage_t *tail = pl->first;
- for (; tail->next; tail = tail->next);
- tail->next = stage;
- } else { /* corner case: pipeline is empty */
- pl->first = stage;
- }
-#else
- /* i like this implementation because it's pretty */
- struct pipeline_stage_t **ptail = &pl->first;
- for (; *ptail; ptail = &(*ptail)->next);
- *ptail = stage;
-#endif
+struct ptx__pipeline_ctx_tag
+{
+ ptx_pipeline_t *pipeline;
+ size_t curidx;
+};
+
+struct ptx__pipeline_tag
+{
+ size_t stages_len, stages_cap;
+ struct ptx__pipeline_stage_tag *stages;
+ bool handling;
+};
+
+static int ptx__pipeline_reserve(ptx_pipeline_t *pl, size_t req)
+{
+ assert(pl);
+ assert(req < SSIZE_MAX);
+
+ if (req < pl->stages_cap) return 0;
+
+ struct ptx__pipeline_stage_tag *temp = realloc(pl->stages, req * sizeof(struct ptx__pipeline_stage_tag));
+ if (!temp) return -1;
+
+ pl->stages = temp;
+ pl->stages_cap = req;
+
+ return 1;
+}
- return 1;
+/* allocates a new pipeline, with space for `initial_size' new stages. */
+ATTR_WUR ptx_pipeline_t *ptx_pipeline_new(size_t initial_size)
+{
+ ptx_pipeline_t *pl = malloc(sizeof(ptx_pipeline_t));
+ if (!pl) return NULL;
+
+ pl->stages_len = 0;
+ pl->stages_cap = 0;
+ pl->stages = NULL;
+ pl->handling = false;
+
+ ptx__pipeline_reserve(pl, initial_size < 1 ? 1 : initial_size);
+
+ return pl;
+}
+
+static void ptx__cleanup_pipeline_stage(struct ptx__pipeline_stage_tag *stage)
+{
+ assert(stage);
+
+ if (stage->impl.cleanup) {
+ (*stage->impl.cleanup)(stage->name, stage->user);
}
- for (struct pipeline_stage_t *cur = pl->first; cur; cur = cur->next) {
- if (name == cur->name || !strcmp(name, cur->name)) {
- stage->next = cur->next;
- cur->next = stage;
- return 1;
- }
+ free(stage->name);
+
+ /* clobber the structure just cuz */
+ memset(stage, 0, sizeof(*stage));
+}
+
+/* frees an existing pipeline stage. note that this will call the cleanup function in the stages as well. */
+void ptx_pipeline_free(ptx_pipeline_t *pl)
+{
+ if (!pl) return;
+
+ for (size_t idx = 0; idx < pl->stages_len; ++idx) {
+ ptx__cleanup_pipeline_stage(pl->stages + idx);
+ }
+
+ free(pl->stages);
+ free(pl);
+}
+
+static int ptx__init_pipeline_stage(
+ struct ptx__pipeline_stage_tag *PTX_RESTRICT pstage,
+ const struct ptx_pipeline_stage_funcs *PTX_RESTRICT stage_impl,
+ const char *name,
+ va_list setup_args)
+{
+ assert(pstage);
+ assert(stage_impl);
+ assert(name);
+
+ char *namedup = NULL;
+ void *user = NULL;
+
+ /* I'd rather try to duplicate the name first. Otherwise, we'd have to clean up the
+ * stage's user data right after initializing it. */
+ namedup = strdup(name);
+ if (!namedup) goto cleanup;
+
+ if (stage_impl->init && (*stage_impl->init)(name, &user, setup_args) < 0) {
+ /* setup failed :((( */
+ goto cleanup;
}
+ /* yay it worked :) */
+ pstage->user = user;
+ pstage->name = namedup;
+ memcpy(&pstage->impl, stage_impl, sizeof(*stage_impl));
+
return 0;
+
+cleanup:
+ free(namedup);
+ return -1;
}
-int pipeline_add_stage_before(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name)
+/* this will grow a pipeline to fit a new stage.
+ * this function also shifts entries in `pl->stages' such that `to_add' is an empty stage. */
+static int ptx__pipeline_prepare_insert_stage_at(ptx_pipeline_t *pipeline, size_t to_add)
{
- if (!name || !pl->first || name == pl->first->name || !strcmp(name, pl->first->name)) {
- stage->next = pl->first;
- pl->first = stage;
- return 1;
+ assert(to_add <= pipeline->stages_len);
+ if (ptx__pipeline_reserve(pipeline, pipeline->stages_len + 1) < 0) {
+ return -1;
}
- /* note that it's okay we don't properly check if prev->first->name equals name here, since that was done above. */
- for (struct pipeline_stage_t *cur = pl->first->next, *prev = pl->first; cur; prev = cur, cur = cur->next) {
- if (name == cur->name || !strcmp(name, cur->name)) {
- prev->next = stage;
- stage->next = cur;
- return 1;
+ /* don't perform the move if we wouldn't be moving anything. */
+ if (to_add < pipeline->stages_len) {
+ memmove(pipeline->stages + to_add + 1,
+ pipeline->stages + to_add,
+ sizeof(struct ptx__pipeline_stage_tag) * (pipeline->stages_len - to_add));
+ }
+
+ /* increment the length of the array here for consistency.
+ * NOTE: this function still leaves the pipeline in an inconsistent state:
+ * there is an uninitialized stage at `to_add' :( */
+ ++pipeline->stages_len;
+
+ return 0;
+}
+
+static ssize_t ptx__pipeline_find_stage(ptx_pipeline_t *ptx, const char *name)
+{
+ assert(ptx);
+ assert(name);
+
+ for (size_t idx = 0; idx < ptx->stages_len; ++idx) {
+ if (!strcmp(ptx->stages[idx].name, name)) {
+ return idx;
}
}
+ return -1;
+}
+
+static int ptx__pipeline_add_at(
+ ptx_pipeline_t *pipeline,
+ const struct ptx_pipeline_stage_funcs *stage_impl,
+ const char *name,
+ size_t idx,
+ va_list init_args)
+{
+ struct ptx__pipeline_stage_tag temp_stage;
+ memset(&temp_stage, 0, sizeof(temp_stage));
+
+ /* first, initialize the stage itself */
+ if (ptx__init_pipeline_stage(&temp_stage, stage_impl, name, init_args) < 0) {
+ return -1;
+ }
+
+ /* next, grow the pipeline to fit the new stage */
+ if (ptx__pipeline_prepare_insert_stage_at(pipeline, idx) < 0) {
+ /* uh oh, it failed. clean up the stage we just initialized */
+ ptx__cleanup_pipeline_stage(&temp_stage);
+ return -1;
+ }
+
+ /* finally, put the stage into the array */
+ memmove(pipeline->stages + idx, &temp_stage, sizeof(temp_stage));
return 0;
}
+/* adds `stage' with `name' to `pipeline' after `after' *
+ * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */
+int ptx_pipeline_add_after(
+ ptx_pipeline_t *pipeline,
+ const struct ptx_pipeline_stage_funcs *stage,
+ const char *name,
+ const char *after,
+ ...)
+{
+ assert(pipeline);
+ assert(!pipeline->handling);
+
+ size_t idx;
+
+ if (after) {
+ ssize_t sidx = ptx__pipeline_find_stage(pipeline, after);
+ if (sidx < 0) return 0;
+
+ idx = sidx + 1; /* after */
+ } else {
+ idx = pipeline->stages_len;
+ }
+
+ va_list init_args;
+ va_start(init_args, after);
+
+ int ret = ptx__pipeline_add_at(pipeline, stage, name, idx, init_args);
+
+ va_end(init_args);
+ return ret;
+}
+
+/* adds `stage' with `name' to `pipeline' before `before'
+ * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */
+int ptx_pipeline_add_before(
+ ptx_pipeline_t *pipeline,
+ const struct ptx_pipeline_stage_funcs *stage,
+ const char *name,
+ const char *before,
+ ...)
+{
+ assert(pipeline);
+ assert(!pipeline->handling);
+
+ size_t idx;
+
+ if (before) {
+ ssize_t sidx = ptx__pipeline_find_stage(pipeline, before);
+ if (sidx < 0) return 0;
+
+ idx = sidx; /* before */
+ } else {
+ idx = 0;
+ }
+
+ va_list init_args;
+ va_start(init_args, before);
+
+ int ret = ptx__pipeline_add_at(pipeline, stage, name, idx, init_args);
+ va_end(init_args);
+ return ret;
+}
+
+/* returns: number of stages removed from pipeline */
+int ptx_pipeline_remove_stage(ptx_pipeline_t *pipeline, const char *name)
+{
+ assert(pipeline);
+ assert(!pipeline->handling);
+
+ ssize_t idx = ptx__pipeline_find_stage(pipeline, name);
+ if (idx < 0) return 0;
+
+ ptx__cleanup_pipeline_stage(pipeline->stages + idx);
+ memmove(pipeline->stages + idx, pipeline->stages + idx + 1, sizeof(pipeline->stages[0]) * (pipeline->stages_len - idx));
+
+ --pipeline->stages_len;
+ return 1;
+}
+
+void **ptx_pipeline_ctx_get_user(const ptx_pipeline_ctx_t *ctx)
+{
+ assert(ctx);
+ return &ctx->pipeline->stages[ctx->curidx].user;
+}
+
+const char *ptx_pipeline_ctx_get_name(const ptx_pipeline_ctx_t *ctx)
+{
+ assert(ctx);
+ return ctx->pipeline->stages[ctx->curidx].name;
+}
+
+int ptx_pipeline_ctx_next(const ptx_pipeline_ctx_t *ctx, void *nextdata, size_t nextsize)
+{
+ assert(ctx->curidx + 1 < ctx->pipeline->stages_len);
+
+ ptx_pipeline_ctx_t new_ctx;
+ new_ctx.pipeline = ctx->pipeline;
+ new_ctx.curidx = ctx->curidx + 1;
+
+ assert(new_ctx.pipeline->stages[new_ctx.curidx].impl.handler);
+
+ return (*new_ctx.pipeline->stages[new_ctx.curidx].impl.handler)(&new_ctx, nextdata, nextsize);
+}
+
+int ptx_pipeline_handle(ptx_pipeline_t *pipeline, void *data, size_t sz)
+{
+ assert(pipeline);
+ assert(!pipeline->handling);
+
+ if (pipeline->stages_len == 0) return 0; /* pipeline does nothing... */
+
+ pipeline->handling = true;
+
+ ptx_pipeline_ctx_t ctx;
+ ctx.pipeline = pipeline;
+ ctx.curidx = 0;
+
+ assert(pipeline->stages[0].impl.handler);
+ int ret = (*pipeline->stages[0].impl.handler)(&ctx, data, sz);
+
+ pipeline->handling = 0;
+ return ret;
+}
diff --git a/pipeline.h b/pipeline.h
index e164c58..562f1df 100644
--- a/pipeline.h
+++ b/pipeline.h
@@ -3,39 +3,54 @@
#include <assert.h>
#include <stddef.h>
+#include <stdarg.h>
+
#include "config.h"
-#define PIPELINE_STAGE ATTR_WUR
+#define PIPELINE_STAGE ATTR_WUR ATTR_ACCESS((read_only, 1))
+#define PIPELINE_INIT ATTR_ACCESS((read_only, 1))
+#define PIPELINE_CLEANUP ATTR_ACCESS((read_only, 1))
-struct pipeline_ctx_t;
-typedef int (pipeline_stage_proc)(struct pipeline_ctx_t * /*ctx*/, void * /*data*/, size_t /*len*/) PIPELINE_STAGE;
+typedef struct ptx__pipeline_ctx_tag ptx_pipeline_ctx_t;
+typedef struct ptx__pipeline_tag ptx_pipeline_t;
-struct pipeline_stage_t
-{
- const char *name;
- pipeline_stage_proc *proc;
- struct pipeline_stage_t *next;
- void *user;
-};
+typedef int (ptx_pipeline_proc_t)(const ptx_pipeline_ctx_t * /*ctx*/, void * /*data*/, size_t /*len*/) PIPELINE_STAGE;
+typedef int (ptx_pipeline_init_proc_t)(const char * /*name*/, void ** /*ppuser*/, va_list /*init_args*/) PIPELINE_INIT;
+typedef void (ptx_pipeline_cleanup_proc_t)(const char * /*name*/, void * /*puser*/) PIPELINE_CLEANUP;
-struct pipeline_t
-{
- struct pipeline_stage_t *first;
+struct ptx_pipeline_stage_funcs {
+ ptx_pipeline_proc_t *handler;
+ ptx_pipeline_init_proc_t *init;
+ ptx_pipeline_cleanup_proc_t *cleanup;
};
-struct pipeline_ctx_t
-{
- struct pipeline_stage_t *stage;
-};
+/* allocates a new pipeline, with space for `initial_size' new stages. */
+ptx_pipeline_t *ptx_pipeline_new(size_t initial_size) ATTR_WUR ATTR_MALLOC() ATTR_MALLOC((ptx_pipeline_free, 1));
+
+/* frees an existing pipeline stage. note that this will call the cleanup function in the stages as well. */
+void ptx_pipeline_free(ptx_pipeline_t *pl);
+
+/* adds `stage' with `name' to `pipeline' after `after' *
+ * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */
+int ptx_pipeline_add_after(ptx_pipeline_t *pipeline, const struct ptx_pipeline_stage_funcs *stage, const char *name, const char *after, ...);
+/* adds `stage' with `name' to `pipeline' before `before'
+ * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */
+int ptx_pipeline_add_before(ptx_pipeline_t *pipeline, const struct ptx_pipeline_stage_funcs *stage, const char *name, const char *before, ...);
+
+/* TODO: could theoretically have versions of the above functions that don't defensively copy `name' */
+
+/* returns: number of stages removed from pipeline */
+int ptx_pipeline_remove_stage(ptx_pipeline_t *pipeline, const char *name);
+
+/* this pointer is guaranteed to be valid as long as the pipeline context is active (i.e., within this handler function)
+ * If you need to access user data of a stage from outside of a handler function, TODO */
+void **ptx_pipeline_ctx_get_user(const ptx_pipeline_ctx_t *ctx);
+const char *ptx_pipeline_ctx_get_name(const ptx_pipeline_ctx_t *ctx);
+
+int ptx_pipeline_ctx_next(const ptx_pipeline_ctx_t *ctx, void *nextdata, size_t nextsize);
-int pipeline_add_stage_before(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name);
-int pipeline_add_stage_after(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name);
+void ptx_pipeline_ctx_diag(const ptx_pipeline_ctx_t *ctx, int lvl, const char *fmt, ...) ATTR_FORMAT((printf, 3, 4)); /* TODO: pipeline stages should be able to send diagnostic messages */
-ATTR_ALWAYS_INLINE inline int pipeline_next(struct pipeline_ctx_t *ctx, void *data, size_t len)
-{
- assert(ctx->stage->next);
- ctx->stage = ctx->stage->next;
- return (ctx->stage->proc)(ctx, data, len);
-}
+int ptx_pipeline_handle(ptx_pipeline_t *pipeline, void *data, size_t sz); /* TODO: diagnostic */
#endif /* include guard */