diff options
| -rw-r--r-- | config.h.in | 32 | ||||
| -rw-r--r-- | connection.c | 45 | ||||
| -rw-r--r-- | connection.h | 15 | ||||
| -rw-r--r-- | main.c | 80 | ||||
| -rw-r--r-- | meson.build | 10 | ||||
| -rw-r--r-- | pipeline.c | 324 | ||||
| -rw-r--r-- | pipeline.h | 65 |
7 files changed, 499 insertions, 72 deletions
diff --git a/config.h.in b/config.h.in index 9ef3491..59956d4 100644 --- a/config.h.in +++ b/config.h.in @@ -3,6 +3,9 @@ #mesondefine HAS_ATTR_WUR #mesondefine HAS_ATTR_ALWAYS_INLINE +#mesondefine HAS_ATTR_MALLOC +#mesondefine HAS_ATTR_FORMAT +#mesondefine HAS_RESTRICT #ifdef HAS_ATTR_WUR #define ATTR_WUR __attribute__((warn_unused_result)) @@ -16,4 +19,33 @@ #define ATTR_ALWAYS_INLINE #endif +#ifdef HAS_ATTR_MALLOC +#define ATTR_MALLOC(_x) +#else +#define ATTR_MALLOC(_x) __attribute__((malloc _x)) +#endif + +#ifdef HAS_ATTR_FORMAT +#define ATTR_FORMAT(_x) __attribute__((format _x)) +#else +#define ATTR_FORMAT(_x) +#endif + +#ifdef HAS_RESTRICT +#define PTX_RESTRICT restrict +#else +#define PTX_RESTRICT +#endif + +/* meson doesn't support this one yet :( */ +#ifdef __has_attribute +# if __has_attribute(access) +# define ATTR_ACCESS(_x) __attribute__((access _x)) +# else +# define ATTR_ACCESS(_x) +# endif +#else +#define ATTR_ACCESS(_x) +#endif + #endif /* include guard */ diff --git a/connection.c b/connection.c new file mode 100644 index 0000000..ecbb7fa --- /dev/null +++ b/connection.c @@ -0,0 +1,45 @@ +#include "connection.h" +#include "pipeline.h" +#include <stdlib.h> + +struct ptx_connection *ptx_connection_new(void) +{ + struct ptx_connection *conn = malloc(sizeof(struct ptx_connection)); + if (!conn) return NULL; + + /* initialize these in case we goto cleanup later */ + conn->cli_read = NULL; + conn->cli_write = NULL; + conn->srv_read = NULL; + conn->srv_write = NULL; + + /* do initialization */ + + if (!(conn->cli_read = ptx_pipeline_new(4))) goto cleanup; + if (!(conn->cli_write = ptx_pipeline_new(4))) goto cleanup; + if (!(conn->srv_read = ptx_pipeline_new(4))) goto cleanup; + if (!(conn->srv_write = ptx_pipeline_new(4))) goto cleanup; + + return conn; + +cleanup: + ptx_connection_free(conn); + + return NULL; +} + +void ptx_connection_free(struct ptx_connection *conn) +{ + /* NOTE: this function might be called with a partially-constructed or inconsistent object */ + + if (!conn) return; + + /* do cleanup */ + + ptx_pipeline_free(conn->cli_read); + ptx_pipeline_free(conn->cli_write); + ptx_pipeline_free(conn->srv_read); + ptx_pipeline_free(conn->srv_write); + + free(conn); +} diff --git a/connection.h b/connection.h new file mode 100644 index 0000000..416e076 --- /dev/null +++ b/connection.h @@ -0,0 +1,15 @@ +#ifndef PTXMC_CONNECTION_H_INCLUDED +#define PTXMC_CONNECTION_H_INCLUDED + +#include "pipeline.h" + +struct ptx_connection +{ + ptx_pipeline_t *cli_read, *cli_write; + ptx_pipeline_t *srv_read, *srv_write; +}; + +struct ptx_connection *ptx_connection_new(void); +void ptx_connection_free(struct ptx_connection *conn); + +#endif /* include guard */ @@ -1,25 +1,61 @@ +#include <stdio.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdarg.h> + #include "pipeline.h" #define UNUSED(_v) ((void)(_v)) -PIPELINE_STAGE int stage(struct pipeline_ctx_t *ctx, void *data, size_t len) +PIPELINE_STAGE int identity_stage_handle(const ptx_pipeline_ctx_t *ctx, void *data, size_t sz) +{ + return ptx_pipeline_ctx_next(ctx, data, sz); +} + +PIPELINE_INIT int debug_stage_init(const char *name, void **ppuser, va_list init_args) +{ + static const char *const messages[] = { "no continue :(", "continue :)" }; + + UNUSED(name); + + int config = !!va_arg(init_args, int); + *ppuser = (void *)(uintptr_t)config; + + printf("Debug stage %s (init): %s\n", name, messages[config]); + + return 0; +} + +PIPELINE_CLEANUP void debug_stage_cleanup(const char *name, void *puser) { - UNUSED(ctx); - UNUSED(data); - UNUSED(len); - return 1; + printf("Debug stage %s (cleanup): %p\n", name, puser); } -PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t len) { - return pipeline_next(ctx, data, len); +PIPELINE_STAGE int debug_stage_handle(const ptx_pipeline_ctx_t *ctx, void *data, size_t sz) +{ + bool cont = !!(uintptr_t)(*ptx_pipeline_ctx_get_user(ctx)); + printf("Debug stage %s:\n Data: %p\n Size: %zu\n", ptx_pipeline_ctx_get_name(ctx), data, sz); + return cont ? ptx_pipeline_ctx_next(ctx, data, sz) : 0; } +static const struct ptx_pipeline_stage_funcs identity_stage = { + .init = NULL, + .handler = &identity_stage_handle, + .cleanup = NULL +}; + +static const struct ptx_pipeline_stage_funcs debug_stage = { + .init = &debug_stage_init, + .handler = &debug_stage_handle, + .cleanup = &debug_stage_cleanup +}; + /* mockup: * Pipeline signature: D -> D*1 * int simple_1to1_stage(struct pipeline_ctx *ctx, void *data, size_t len) { * void *out = ... transform data ...; * size_t out_len = ^^^; - * return pipeline_next(ctx, out, out_len); + * return ptx_pipeline_next(ctx, out, out_len); * } * * Pipeline signature: D -> D*(0,n) @@ -30,7 +66,7 @@ PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t * return -1; ... next_data failed somehow ... * } * - * if ((err = pipeline_next(ctx, out, out_len)) < 0) { + * if ((err = ptx_pipeline_next(ctx, out, out_len)) < 0) { * return err; * } * } @@ -41,7 +77,7 @@ PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t * int stateful_stage(struct pipeline_ctx *ctx, void *data, size_t len) { * my_stage_t *stg = ctx->me; * stg->stat_in += len; - * return pipeline_next(ctx, data, len); + * return ptx_pipeline_next(ctx, data, len); * } * * Pipeline signature: D -> void (i.e., never calls next) @@ -53,5 +89,29 @@ PIPELINE_STAGE int identity_stage(struct pipeline_ctx_t *ctx, void *data, size_t */ int main(void) { + ptx_pipeline_t *pl = ptx_pipeline_new(1); + if (!pl) { + fputs("Allocation failure: pipeline\n", stderr); + return 1; + } + + if (ptx_pipeline_add_before(pl, &identity_stage, "identity1", NULL) < 0) { + fputs("Failed to add identity1 to pipeline\n", stderr); + return 1; + } + + if (ptx_pipeline_add_after(pl, &debug_stage, "debug1", "identity1", true) < 0) { + fputs("Failed to add debug1 to pipeline\n", stderr); + return 1; + } + + if (ptx_pipeline_add_after(pl, &debug_stage, "debug2", "debug1", false) < 0) { + fputs("Failed to add debug2 to pipeline\n", stderr); + return 1; + } + + printf("Handle returned: %d\n", ptx_pipeline_handle(pl, "sussy", 10)); + ptx_pipeline_free(pl); + return 0; } diff --git a/meson.build b/meson.build index 4e2d1b6..da4fe0e 100644 --- a/meson.build +++ b/meson.build @@ -5,8 +5,16 @@ c_comp = meson.get_compiler('c') conf_data = configuration_data() conf_data.set('HAS_ATTR_WUR', c_comp.has_function_attribute('warn_unused_result')) conf_data.set('HAS_ATTR_ALWAYS_INLINE', c_comp.has_function_attribute('always_inline')) +conf_data.set('HAS_ATTR_MALLOC', c_comp.has_function_attribute('malloc')) +conf_data.set('HAS_ATTR_FORMAT', c_comp.has_function_attribute('format')) +conf_data.set('HAS_RESTRICT', c_comp.has_type('int *restrict')) configure_file(input : 'config.h.in', output : 'config.h', configuration : conf_data) conf_include = include_directories('.') -executable('ptxmc', 'main.c', 'pipeline.c', include_directories : conf_include) +executable('ptxmc', + 'main.c', + 'pipeline.c', + 'connection.c', + include_directories : conf_include, + c_args : [ '-D_XOPEN_SOURCE=700' ]) @@ -1,59 +1,311 @@ #include "pipeline.h" + #include <string.h> +#include <stdlib.h> +#include <stdbool.h> +#include <assert.h> +#include <limits.h> -int pipeline_add_stage_after(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name) +struct ptx__pipeline_stage_tag { - if (!name) { - /* name == NULL: insert at end of pipeline */ - stage->next = NULL; + struct ptx_pipeline_stage_funcs impl; + char *name; + void *user; +}; -#if 0 - if (pl->first) { - struct pipeline_stage_t *tail = pl->first; - for (; tail->next; tail = tail->next); - tail->next = stage; - } else { /* corner case: pipeline is empty */ - pl->first = stage; - } -#else - /* i like this implementation because it's pretty */ - struct pipeline_stage_t **ptail = &pl->first; - for (; *ptail; ptail = &(*ptail)->next); - *ptail = stage; -#endif +struct ptx__pipeline_ctx_tag +{ + ptx_pipeline_t *pipeline; + size_t curidx; +}; + +struct ptx__pipeline_tag +{ + size_t stages_len, stages_cap; + struct ptx__pipeline_stage_tag *stages; + bool handling; +}; + +static int ptx__pipeline_reserve(ptx_pipeline_t *pl, size_t req) +{ + assert(pl); + assert(req < SSIZE_MAX); + + if (req < pl->stages_cap) return 0; + + struct ptx__pipeline_stage_tag *temp = realloc(pl->stages, req * sizeof(struct ptx__pipeline_stage_tag)); + if (!temp) return -1; + + pl->stages = temp; + pl->stages_cap = req; + + return 1; +} - return 1; +/* allocates a new pipeline, with space for `initial_size' new stages. */ +ATTR_WUR ptx_pipeline_t *ptx_pipeline_new(size_t initial_size) +{ + ptx_pipeline_t *pl = malloc(sizeof(ptx_pipeline_t)); + if (!pl) return NULL; + + pl->stages_len = 0; + pl->stages_cap = 0; + pl->stages = NULL; + pl->handling = false; + + ptx__pipeline_reserve(pl, initial_size < 1 ? 1 : initial_size); + + return pl; +} + +static void ptx__cleanup_pipeline_stage(struct ptx__pipeline_stage_tag *stage) +{ + assert(stage); + + if (stage->impl.cleanup) { + (*stage->impl.cleanup)(stage->name, stage->user); } - for (struct pipeline_stage_t *cur = pl->first; cur; cur = cur->next) { - if (name == cur->name || !strcmp(name, cur->name)) { - stage->next = cur->next; - cur->next = stage; - return 1; - } + free(stage->name); + + /* clobber the structure just cuz */ + memset(stage, 0, sizeof(*stage)); +} + +/* frees an existing pipeline stage. note that this will call the cleanup function in the stages as well. */ +void ptx_pipeline_free(ptx_pipeline_t *pl) +{ + if (!pl) return; + + for (size_t idx = 0; idx < pl->stages_len; ++idx) { + ptx__cleanup_pipeline_stage(pl->stages + idx); + } + + free(pl->stages); + free(pl); +} + +static int ptx__init_pipeline_stage( + struct ptx__pipeline_stage_tag *PTX_RESTRICT pstage, + const struct ptx_pipeline_stage_funcs *PTX_RESTRICT stage_impl, + const char *name, + va_list setup_args) +{ + assert(pstage); + assert(stage_impl); + assert(name); + + char *namedup = NULL; + void *user = NULL; + + /* I'd rather try to duplicate the name first. Otherwise, we'd have to clean up the + * stage's user data right after initializing it. */ + namedup = strdup(name); + if (!namedup) goto cleanup; + + if (stage_impl->init && (*stage_impl->init)(name, &user, setup_args) < 0) { + /* setup failed :((( */ + goto cleanup; } + /* yay it worked :) */ + pstage->user = user; + pstage->name = namedup; + memcpy(&pstage->impl, stage_impl, sizeof(*stage_impl)); + return 0; + +cleanup: + free(namedup); + return -1; } -int pipeline_add_stage_before(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name) +/* this will grow a pipeline to fit a new stage. + * this function also shifts entries in `pl->stages' such that `to_add' is an empty stage. */ +static int ptx__pipeline_prepare_insert_stage_at(ptx_pipeline_t *pipeline, size_t to_add) { - if (!name || !pl->first || name == pl->first->name || !strcmp(name, pl->first->name)) { - stage->next = pl->first; - pl->first = stage; - return 1; + assert(to_add <= pipeline->stages_len); + if (ptx__pipeline_reserve(pipeline, pipeline->stages_len + 1) < 0) { + return -1; } - /* note that it's okay we don't properly check if prev->first->name equals name here, since that was done above. */ - for (struct pipeline_stage_t *cur = pl->first->next, *prev = pl->first; cur; prev = cur, cur = cur->next) { - if (name == cur->name || !strcmp(name, cur->name)) { - prev->next = stage; - stage->next = cur; - return 1; + /* don't perform the move if we wouldn't be moving anything. */ + if (to_add < pipeline->stages_len) { + memmove(pipeline->stages + to_add + 1, + pipeline->stages + to_add, + sizeof(struct ptx__pipeline_stage_tag) * (pipeline->stages_len - to_add)); + } + + /* increment the length of the array here for consistency. + * NOTE: this function still leaves the pipeline in an inconsistent state: + * there is an uninitialized stage at `to_add' :( */ + ++pipeline->stages_len; + + return 0; +} + +static ssize_t ptx__pipeline_find_stage(ptx_pipeline_t *ptx, const char *name) +{ + assert(ptx); + assert(name); + + for (size_t idx = 0; idx < ptx->stages_len; ++idx) { + if (!strcmp(ptx->stages[idx].name, name)) { + return idx; } } + return -1; +} + +static int ptx__pipeline_add_at( + ptx_pipeline_t *pipeline, + const struct ptx_pipeline_stage_funcs *stage_impl, + const char *name, + size_t idx, + va_list init_args) +{ + struct ptx__pipeline_stage_tag temp_stage; + memset(&temp_stage, 0, sizeof(temp_stage)); + + /* first, initialize the stage itself */ + if (ptx__init_pipeline_stage(&temp_stage, stage_impl, name, init_args) < 0) { + return -1; + } + + /* next, grow the pipeline to fit the new stage */ + if (ptx__pipeline_prepare_insert_stage_at(pipeline, idx) < 0) { + /* uh oh, it failed. clean up the stage we just initialized */ + ptx__cleanup_pipeline_stage(&temp_stage); + return -1; + } + + /* finally, put the stage into the array */ + memmove(pipeline->stages + idx, &temp_stage, sizeof(temp_stage)); return 0; } +/* adds `stage' with `name' to `pipeline' after `after' * + * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */ +int ptx_pipeline_add_after( + ptx_pipeline_t *pipeline, + const struct ptx_pipeline_stage_funcs *stage, + const char *name, + const char *after, + ...) +{ + assert(pipeline); + assert(!pipeline->handling); + + size_t idx; + + if (after) { + ssize_t sidx = ptx__pipeline_find_stage(pipeline, after); + if (sidx < 0) return 0; + + idx = sidx + 1; /* after */ + } else { + idx = pipeline->stages_len; + } + + va_list init_args; + va_start(init_args, after); + + int ret = ptx__pipeline_add_at(pipeline, stage, name, idx, init_args); + + va_end(init_args); + return ret; +} + +/* adds `stage' with `name' to `pipeline' before `before' + * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */ +int ptx_pipeline_add_before( + ptx_pipeline_t *pipeline, + const struct ptx_pipeline_stage_funcs *stage, + const char *name, + const char *before, + ...) +{ + assert(pipeline); + assert(!pipeline->handling); + + size_t idx; + + if (before) { + ssize_t sidx = ptx__pipeline_find_stage(pipeline, before); + if (sidx < 0) return 0; + + idx = sidx; /* before */ + } else { + idx = 0; + } + + va_list init_args; + va_start(init_args, before); + + int ret = ptx__pipeline_add_at(pipeline, stage, name, idx, init_args); + va_end(init_args); + return ret; +} + +/* returns: number of stages removed from pipeline */ +int ptx_pipeline_remove_stage(ptx_pipeline_t *pipeline, const char *name) +{ + assert(pipeline); + assert(!pipeline->handling); + + ssize_t idx = ptx__pipeline_find_stage(pipeline, name); + if (idx < 0) return 0; + + ptx__cleanup_pipeline_stage(pipeline->stages + idx); + memmove(pipeline->stages + idx, pipeline->stages + idx + 1, sizeof(pipeline->stages[0]) * (pipeline->stages_len - idx)); + + --pipeline->stages_len; + return 1; +} + +void **ptx_pipeline_ctx_get_user(const ptx_pipeline_ctx_t *ctx) +{ + assert(ctx); + return &ctx->pipeline->stages[ctx->curidx].user; +} + +const char *ptx_pipeline_ctx_get_name(const ptx_pipeline_ctx_t *ctx) +{ + assert(ctx); + return ctx->pipeline->stages[ctx->curidx].name; +} + +int ptx_pipeline_ctx_next(const ptx_pipeline_ctx_t *ctx, void *nextdata, size_t nextsize) +{ + assert(ctx->curidx + 1 < ctx->pipeline->stages_len); + + ptx_pipeline_ctx_t new_ctx; + new_ctx.pipeline = ctx->pipeline; + new_ctx.curidx = ctx->curidx + 1; + + assert(new_ctx.pipeline->stages[new_ctx.curidx].impl.handler); + + return (*new_ctx.pipeline->stages[new_ctx.curidx].impl.handler)(&new_ctx, nextdata, nextsize); +} + +int ptx_pipeline_handle(ptx_pipeline_t *pipeline, void *data, size_t sz) +{ + assert(pipeline); + assert(!pipeline->handling); + + if (pipeline->stages_len == 0) return 0; /* pipeline does nothing... */ + + pipeline->handling = true; + + ptx_pipeline_ctx_t ctx; + ctx.pipeline = pipeline; + ctx.curidx = 0; + + assert(pipeline->stages[0].impl.handler); + int ret = (*pipeline->stages[0].impl.handler)(&ctx, data, sz); + + pipeline->handling = 0; + return ret; +} @@ -3,39 +3,54 @@ #include <assert.h> #include <stddef.h> +#include <stdarg.h> + #include "config.h" -#define PIPELINE_STAGE ATTR_WUR +#define PIPELINE_STAGE ATTR_WUR ATTR_ACCESS((read_only, 1)) +#define PIPELINE_INIT ATTR_ACCESS((read_only, 1)) +#define PIPELINE_CLEANUP ATTR_ACCESS((read_only, 1)) -struct pipeline_ctx_t; -typedef int (pipeline_stage_proc)(struct pipeline_ctx_t * /*ctx*/, void * /*data*/, size_t /*len*/) PIPELINE_STAGE; +typedef struct ptx__pipeline_ctx_tag ptx_pipeline_ctx_t; +typedef struct ptx__pipeline_tag ptx_pipeline_t; -struct pipeline_stage_t -{ - const char *name; - pipeline_stage_proc *proc; - struct pipeline_stage_t *next; - void *user; -}; +typedef int (ptx_pipeline_proc_t)(const ptx_pipeline_ctx_t * /*ctx*/, void * /*data*/, size_t /*len*/) PIPELINE_STAGE; +typedef int (ptx_pipeline_init_proc_t)(const char * /*name*/, void ** /*ppuser*/, va_list /*init_args*/) PIPELINE_INIT; +typedef void (ptx_pipeline_cleanup_proc_t)(const char * /*name*/, void * /*puser*/) PIPELINE_CLEANUP; -struct pipeline_t -{ - struct pipeline_stage_t *first; +struct ptx_pipeline_stage_funcs { + ptx_pipeline_proc_t *handler; + ptx_pipeline_init_proc_t *init; + ptx_pipeline_cleanup_proc_t *cleanup; }; -struct pipeline_ctx_t -{ - struct pipeline_stage_t *stage; -}; +/* allocates a new pipeline, with space for `initial_size' new stages. */ +ptx_pipeline_t *ptx_pipeline_new(size_t initial_size) ATTR_WUR ATTR_MALLOC() ATTR_MALLOC((ptx_pipeline_free, 1)); + +/* frees an existing pipeline stage. note that this will call the cleanup function in the stages as well. */ +void ptx_pipeline_free(ptx_pipeline_t *pl); + +/* adds `stage' with `name' to `pipeline' after `after' * + * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */ +int ptx_pipeline_add_after(ptx_pipeline_t *pipeline, const struct ptx_pipeline_stage_funcs *stage, const char *name, const char *after, ...); +/* adds `stage' with `name' to `pipeline' before `before' + * returns: number of stages added to pipeline (0 or 1), or -1 if there was an allocation or initialization error. */ +int ptx_pipeline_add_before(ptx_pipeline_t *pipeline, const struct ptx_pipeline_stage_funcs *stage, const char *name, const char *before, ...); + +/* TODO: could theoretically have versions of the above functions that don't defensively copy `name' */ + +/* returns: number of stages removed from pipeline */ +int ptx_pipeline_remove_stage(ptx_pipeline_t *pipeline, const char *name); + +/* this pointer is guaranteed to be valid as long as the pipeline context is active (i.e., within this handler function) + * If you need to access user data of a stage from outside of a handler function, TODO */ +void **ptx_pipeline_ctx_get_user(const ptx_pipeline_ctx_t *ctx); +const char *ptx_pipeline_ctx_get_name(const ptx_pipeline_ctx_t *ctx); + +int ptx_pipeline_ctx_next(const ptx_pipeline_ctx_t *ctx, void *nextdata, size_t nextsize); -int pipeline_add_stage_before(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name); -int pipeline_add_stage_after(struct pipeline_t *pl, struct pipeline_stage_t *stage, const char *name); +void ptx_pipeline_ctx_diag(const ptx_pipeline_ctx_t *ctx, int lvl, const char *fmt, ...) ATTR_FORMAT((printf, 3, 4)); /* TODO: pipeline stages should be able to send diagnostic messages */ -ATTR_ALWAYS_INLINE inline int pipeline_next(struct pipeline_ctx_t *ctx, void *data, size_t len) -{ - assert(ctx->stage->next); - ctx->stage = ctx->stage->next; - return (ctx->stage->proc)(ctx, data, len); -} +int ptx_pipeline_handle(ptx_pipeline_t *pipeline, void *data, size_t sz); /* TODO: diagnostic */ #endif /* include guard */ |
