From bfc01c15f56c58cefc8680f7faed4c5e3650fa38 Mon Sep 17 00:00:00 2001 From: Chip Black Date: Mon, 21 Feb 2011 03:53:11 -0600 Subject: [PATCH] Add some subscription (a.k.a. "follow") functionality --- Makefile | 2 +- common/stringbucket.c | 112 ++++++++++++++++++++++++++++++++++++++++ common/stringbucket.h | 17 ++++++ database/subscription.c | 60 +++++++++++++++++++++ database/subscription.h | 14 +++++ http/http_blerg.c | 49 ++++++++++++++++++ 6 files changed, 253 insertions(+), 1 deletion(-) create mode 100644 common/stringbucket.c create mode 100644 common/stringbucket.h create mode 100644 database/subscription.c create mode 100644 database/subscription.h diff --git a/Makefile b/Makefile index d18e276..9a935f5 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ HTTP_LIBDIRS = $(MHD_LIBDIR) $(YAJL_LIBDIR) CGI_LIBDIRS = $(CGI_UTIL_LIBDIR) $(YAJL_LIBDIR) targets = blerg.a blergtool blerglatest blerg.httpd blerg.cgi rss.cgi -blerg_a_objects = database/database.o database/tags.o database/util.o +blerg_a_objects = database/database.o database/tags.o database/util.o database/subscription.o common/stringbucket.o blergtool_objects = tools/blergtool.o blerg.a blerglatest_objects = tools/blerglatest.o blerg.a common/json.o rss_objects = cgi/rss.o cgi/canned_responses.o common/app.o common/escapery.o blerg.a diff --git a/common/stringbucket.c b/common/stringbucket.c new file mode 100644 index 0000000..ec827d1 --- /dev/null +++ b/common/stringbucket.c @@ -0,0 +1,112 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define STRINGBUCKET_STRINGSIZE 64 + +const char * strnchr(const char *s, int c, int n) { + const char *ptr = s; + while (n > 0) { + if (*ptr == c) + return ptr; + ptr++; + n--; + } + return NULL; +} + +struct stringbucket * stringbucket_open(const char *filename) { + struct stat st; + struct stringbucket *obj = malloc(sizeof(struct stringbucket)); + + if (obj == NULL) { + perror("stringbucket allocate"); + return NULL; + } + + obj->fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600); + flock(obj->fd, LOCK_SH); + fstat(obj->fd, &st); + flock(obj->fd, LOCK_UN); + obj->size = st.st_size; + obj->list = mmap(NULL, obj->size, PROT_READ | PROT_WRITE, MAP_SHARED, obj->fd, 0); + if (obj->list == NULL) { + perror("stringbucket mmap"); + close(obj->fd); + free(obj); + return 0; + } + + return obj; +} + +void stringbucket_close(struct stringbucket *sb) { + munmap(sb->list, sb->size); + close(sb->fd); + free(sb); +} + +int stringbucket_find(struct stringbucket *sb, const char *string) { + char * end = sb->list + sb->size; + int string_len = strlen(string); + + char * ptr = sb->list; + while (ptr < end) { + char * next = (char *) strnchr(ptr, '\n', end - ptr); + if (next == NULL) + next = end; + int len = next - ptr; + if (len > STRINGBUCKET_STRINGSIZE) + len = STRINGBUCKET_STRINGSIZE; + if (memcmp(ptr, string, (len < string_len ? string_len : len)) == 0) { + return (ptr - sb->list); + } + ptr = next + 1; + } + + return -1; +} + +int stringbucket_add(struct stringbucket *sb, const char *string) { + if (stringbucket_find(sb, string) != -1) return 0; + flock(sb->fd, LOCK_EX); + write(sb->fd, string, strlen(string)); + write(sb->fd, "\n", 1); + flock(sb->fd, LOCK_UN); + return 1; +} + +int stringbucket_delete(struct stringbucket *sb, const char *string) { + int pos = stringbucket_find(sb, string); + if (pos == -1) return 0; + + /* We doin' it DOS style! */ + sb->list[pos] = 0; +} + +void stringbucket_iterate(struct stringbucket *sb, void (*iter)(char *, void *), void *stuff) { + char string[STRINGBUCKET_STRINGSIZE + 1]; + char * ptr = sb->list; + char * end = sb->list + sb->size; + + while (ptr < end) { + char * next = (char *) strnchr(ptr, '\n', end - ptr); + if (next == NULL) + next = end; + if (ptr[0] == 0) { + ptr = next + 1; + continue; + } + int len = next - ptr; + memcpy(string, ptr, len); + string[len] = 0; + iter(string, stuff); + ptr = next + 1; + } +} diff --git a/common/stringbucket.h b/common/stringbucket.h new file mode 100644 index 0000000..5594bd8 --- /dev/null +++ b/common/stringbucket.h @@ -0,0 +1,17 @@ +#ifndef _STRINGBUCKET_H +#define _STRINGBUCKET_H + +struct stringbucket { + int fd; + char * list; + int size; +}; + +struct stringbucket * stringbucket_open(const char *filename); +void stringbucket_close(struct stringbucket *sb); +int stringbucket_find(struct stringbucket *sb, const char *string); +int stringbucket_add(struct stringbucket *sb, const char *string); +int stringbucket_delete(struct stringbucket *sb, const char *string); +void stringbucket_iterate(struct stringbucket *sb, void (*iter)(char *string, void *stuff), void *stuff); + +#endif //_STRINGBUCKET_H diff --git a/database/subscription.c b/database/subscription.c new file mode 100644 index 0000000..e6ae94a --- /dev/null +++ b/database/subscription.c @@ -0,0 +1,60 @@ +#include +#include +#include +#include +#include +#include "subscription.h" +#include "stringbucket.h" +#include "config.h" + +int subscription_add(const char *from, const char *to) { + char filename[512]; + struct stringbucket * sb; + + snprintf(filename, 512, "%s/%s/subscriptions", DATA_PATH, from); + sb = stringbucket_open(filename); + stringbucket_add(sb, to); + stringbucket_close(sb); + + snprintf(filename, 512, "%s/%s/subscribers", DATA_PATH, to); + sb = stringbucket_open(filename); + stringbucket_add(sb, from); + stringbucket_close(sb); +} + +int subscription_remove(const char *from, const char *to) { + char filename[512]; + struct stringbucket * sb; + + snprintf(filename, 512, "%s/%s/subscriptions", DATA_PATH, from); + sb = stringbucket_open(filename); + stringbucket_delete(sb, to); + stringbucket_close(sb); + + snprintf(filename, 512, "%s/%s/subscribers", DATA_PATH, to); + sb = stringbucket_open(filename); + stringbucket_delete(sb, from); + stringbucket_close(sb); +} + +void subscription_notify_add_item(char *to, void *stuff) { + char filename[512]; + + snprintf(filename, 512, "%s/%s/subscription_feed", DATA_PATH, to); + int fd = open(filename, O_WRONLY | O_APPEND | O_CREAT); + write(fd, stuff, sizeof(struct subscription_record)); + close(fd); +} + +int subscription_notify(const char *author, uint64_t record) { + char filename[512]; + struct subscription_record r; + + strncpy(r.author, author, 32); + r.record = record; + + snprintf(filename, 512, "%s/%s/subscribers", DATA_PATH, author); + struct stringbucket * sb = stringbucket_open(filename); + stringbucket_iterate(sb, subscription_notify_add_item, &r); + stringbucket_close(sb); +} diff --git a/database/subscription.h b/database/subscription.h new file mode 100644 index 0000000..3d8ce4f --- /dev/null +++ b/database/subscription.h @@ -0,0 +1,14 @@ +#ifndef _SUBSCRIPTION_H +#define _SUBSCRIPTION_H + +#include + +struct subscription_record { + char author[32]; + uint64_t record; +}; + +int subscription_add(const char *from, const char *to); +int subscription_remove(const char *from, const char *to); + +#endif /* _SUBSCRIPTION_H */ diff --git a/http/http_blerg.c b/http/http_blerg.c index 025fbb0..a6d3a5f 100644 --- a/http/http_blerg.c +++ b/http/http_blerg.c @@ -28,6 +28,12 @@ struct put_state { int data_size; }; +struct subscribe_state { + struct MHD_PostProcessor *pp; + char username[33]; + char to[33]; +}; + struct get_state { struct blerg *b; yajl_gen g; @@ -209,6 +215,22 @@ int POST_put_iterator(void *cls, enum MHD_ValueKind kind, const char *key, const return MHD_YES; } +int POST_subscribe_iterator(void *cls, enum MHD_ValueKind kind, const char *key, const char *filename, const char *content_type, const char *transfer_encoding, const char *data, uint64_t off, size_t size) { + struct subscribe_state *ss = cls; + + if (strncmp(key, "username", 9) == 0) { + if (size > 32) size = 32; + memcpy(ss->username, data, size); + ss->username[size] = 0; + } else if (strncmp(key, "to", 3) == 0) { + if (size > 32) size = 32; + memcpy(ss->to, data, size); + ss->to[size] = 0; + } + + return MHD_YES; +} + struct MHD_Response *create_response_for_range(struct blerg *b, uint64_t from, uint64_t to) { struct MHD_Response *response; struct get_state *gs = malloc(sizeof(struct get_state)); @@ -535,6 +557,33 @@ ahc_derp (void *cls, struct MHD_Connection *connection, const char *url, const c } else { return respond_JSON_Failure(connection); } + } else if (strncmp(url, "/subscribe", 11) == 0) { + struct subscribe_state *ss = (struct subscribe_state *) *ptr; + + if (ss == NULL) { + if (strcmp(method, MHD_HTTP_METHOD_POST) != 0) + return respond_405(connection); + + struct subscribe_state *ss = malloc(sizeof(struct subscribe_state)); + ss->username[0] = ss->to[0] = 0; + ss->pp = MHD_create_post_processor(connection, 1024, &POST_subscribe_iterator, ss); + *ptr = ss; + return MHD_YES; + } + + if (*upload_data_size) { + MHD_post_process(ss->pp, upload_data, *upload_data_size); + *upload_data_size = 0; + return MHD_YES; + } + + const char *given_token = MHD_lookup_connection_value(connection, MHD_COOKIE_KIND, "auth"); + if (auth_check_token(ss->username, given_token)) { + subscription_add(ss->username, ss->to); + return respond_JSON_Success(connection); + } else { + return respond_JSON_Failure(connection); + } } else { return respond_404(connection); } -- 2.25.1