Add email backend CGI
[blerg.git] / database / database.c
1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2  * BSD-style license.  Please see the COPYING file for details.
3  */
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <stdio.h>
8 #include <time.h>
9 #include <unistd.h>
10 #include <sys/stat.h>
11 #include <sys/types.h>
12 #include <sys/mman.h>
13 #include <sys/file.h>
14 #include <fcntl.h>
15 #include "database.h"
16 #include "configuration.h"
17 #include "subscription.h"
18 #include "tags.h"
19 #include "util.h"
20 #include "config.h"
21
/* Guard used at the top of public entry points: when the local variable
 * `blerg` is NULL, log to stderr and return `r` from the enclosing function.
 * Deliberately a bare `if` block (not do/while) because call sites invoke it
 * without a trailing semicolon. */
#define CHECK_VALID_BLERG(r)                                \
        if (blerg == NULL) {                                \
                fputs("Invalid struct blerg\n", stderr);    \
                return r;                                   \
        }
27
/* One-time setup for the database layer.  Currently this only runs
 * blerg_configuration_init().
 *
 * Returns 1 on success, 0 if configuration initialization failed. */
int blerg_init() {
        return blerg_configuration_init() ? 1 : 0;
}
34
35 uint64_t blerg_get_record_count(struct blerg *blerg) {
36         uint64_t count;
37         flock(blerg->meta_fd, LOCK_SH);
38         count = blerg->meta->sequence;
39         flock(blerg->meta_fd, LOCK_UN);
40         return count;
41 }
42
43 /* Returns last usable record */
44 uint64_t blerg_increment_record_count(struct blerg *blerg) {
45         uint64_t count;
46         flock(blerg->meta_fd, LOCK_EX);
47         count = blerg->meta->sequence++;
48         flock(blerg->meta_fd, LOCK_UN);
49         return count;
50 }
51
52 void blerg_segment_close(struct blerg *blerg) {
53         if (blerg->data != NULL)
54                 munmap((void *)blerg->data, blerg->data_size);
55         if (blerg->data_fd != -1)
56                 close(blerg->data_fd);
57         if (blerg->index != NULL)
58                 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
59         if (blerg->index_fd != -1)
60                 close(blerg->index_fd);
61 }
62
63 int blerg_remap_data(struct blerg *blerg) {
64         struct stat st;
65
66         if (blerg->data != NULL)
67                 munmap(blerg->data, blerg->data_size);
68         fstat(blerg->data_fd, &st);
69         blerg->data_size = st.st_size;
70         if (blerg->data_size == 0) {
71                 /* Can't map an empty data file. */
72                 return 1;
73         }
74         blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
75         if (blerg->data == MAP_FAILED) {
76                 perror("Could not remap data");
77                 return 0;
78         }
79         return 1;
80 }
81
82 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
83         char filename[FILENAME_MAX];
84         uint64_t max_sequence_no = blerg_get_record_count(blerg);
85         struct stat st;
86
87         if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
88                 return 1;
89         }
90
91         if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
92                 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
93                 return 0;
94         }
95         if (new_segment < 0) {
96                 fprintf(stderr, "Cannot switch to negative segment\n");
97                 return 0;
98         }
99
100         blerg_segment_close(blerg);
101
102         /* Load and map the index */
103         snprintf(filename, FILENAME_MAX, "%s/index%d", blerg->base_path, new_segment);
104         blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
105         if (blerg->index_fd == -1) {
106                 perror("Could not open index");
107                 goto open_failed_index_open;
108         }
109         flock(blerg->index_fd, LOCK_EX);
110         fstat(blerg->index_fd, &st);
111         if (st.st_size == 0) {
112                 /* ftruncate() means never having to say you're sorry.  Sorry
113                    in this case meaning "allocating disk space for a 1MB file
114                    full or zeroes". */
115                 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
116         }
117         flock(blerg->index_fd, LOCK_UN);
118
119         blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
120         if (blerg->index == MAP_FAILED) {
121                 perror("Could not mmap index");
122                 goto open_failed_index_mmap;
123         }
124
125         /* Load data file */
126         snprintf(filename, FILENAME_MAX, "%s/data%d", blerg->base_path, new_segment);
127         blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
128         if (blerg->data_fd == -1) {
129                 perror("Could not open data");
130                 goto open_failed_data_open;
131         }
132
133         if (!blerg_remap_data(blerg)) {
134                 goto open_failed_data_mmap;
135         }
136
137         blerg->current_segment = new_segment;
138
139         return 1;
140
141 open_failed_data_mmap:
142         close(blerg->data_fd);
143 open_failed_data_open:
144         munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
145 open_failed_index_mmap:
146         close(blerg->index_fd);
147 open_failed_index_open:
148         return 0;
149 }
150
151 int blerg_exists(const char *name) {
152         char filename[FILENAME_MAX];
153
154         if (!valid_name(name)) {
155                 fprintf(stderr, "Invalid name\n");
156                 return 0;
157         }
158
159         snprintf(filename, FILENAME_MAX, "%s/%s", blergconf.data_path, name);
160         if (access(filename, F_OK) == -1)
161                 return 0;
162         else
163                 return 1;
164 }
165
166 struct blerg *blerg_open(const char *name) {
167         int namelen = strlen(name);
168         char filename[FILENAME_MAX];
169         struct stat st;
170
171         if (!valid_name(name)) {
172                 fprintf(stderr, "Invalid name\n");
173                 return NULL;
174         }
175         struct blerg *blerg = malloc(sizeof(struct blerg));
176         if (!blerg) {
177                 perror("Cannot allocate memory for blerg");
178                 goto open_failed_blerg_malloc;
179         }
180         blerg->name = malloc(namelen + 1);
181         memcpy(blerg->name, name, namelen + 1);
182         blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
183         blerg->meta = NULL;
184         blerg->index = NULL;
185         blerg->data = NULL;
186
187         /* Make the directory if it doesn't exist */
188         blerg->base_path = malloc(FILENAME_MAX);
189         snprintf(blerg->base_path, FILENAME_MAX, "%s/%s", blergconf.data_path, name);
190         if (access(blerg->base_path, F_OK) == -1)
191                 mkdir(blerg->base_path, 0755);
192
193         /* Open and map metadata */
194         snprintf(filename, FILENAME_MAX, "%s/meta", blerg->base_path);
195         blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
196         if (blerg->meta_fd == -1) {
197                 perror("Could not open metadata");
198                 goto open_failed_meta_open;
199         }
200         fstat(blerg->meta_fd, &st);
201         if (st.st_size < sizeof(struct meta)) {
202                 /* Extend the file if sizeof(struct meta) is larger than the
203                    file. This allows seamless upgrades as long as struct meta
204                    only adds members. */
205                 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
206         }
207         blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
208         if (blerg->meta == MAP_FAILED) {
209                 perror("Could not map metadata");
210                 goto open_failed_meta_mmap;
211         }
212
213         /* Open and map index and data for the current segment */
214         blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
215         if (!blerg_segment_switch(blerg, blerg->current_segment)) {
216                 fprintf(stderr, "Could not switch segment\n");
217                 goto open_failed_segment_switch;
218         }
219
220         return blerg;
221
222 open_failed_segment_switch:
223         munmap((void *)blerg->meta, sizeof(struct meta));
224 open_failed_meta_mmap:
225         close(blerg->meta_fd);
226 open_failed_meta_open:
227         free(blerg->name);
228         free(blerg);
229 open_failed_blerg_malloc:
230         return NULL;
231 }
232
233 int blerg_close(struct blerg *blerg) {
234         CHECK_VALID_BLERG(0)
235         blerg_segment_close(blerg);
236         munmap((void *)blerg->meta, sizeof(struct meta));
237         close(blerg->meta_fd);
238         free(blerg->base_path);
239         free(blerg->name);
240         free(blerg);
241         return 1;
242 }
243
244 uint64_t blerg_store(struct blerg *blerg, const char *data, int length) {
245         struct stat st;
246         int n;
247
248         CHECK_VALID_BLERG(BLERG_INVALID_RECORD)
249
250         if (length > MAX_RECORD_SIZE || length <= 0) {
251                 fprintf(stderr, "length out of bounds\n");
252                 return BLERG_INVALID_RECORD;
253         }
254
255         flock(blerg->index_fd, LOCK_EX);
256         flock(blerg->data_fd, LOCK_EX);
257
258         uint64_t record = blerg_get_record_count(blerg);
259         if (record == -1) {  /* Intentional signed-unsigned coercion */
260                 fprintf(stderr, "Could not find free record\n");
261                 return BLERG_INVALID_RECORD;
262         }
263         int segment = record / RECORDS_PER_SEGMENT;
264         if (segment != blerg->current_segment)
265                 blerg_segment_switch(blerg, segment);
266         int seg_rec = record % RECORDS_PER_SEGMENT;
267
268         /* Get the position for the new data */
269         fstat(blerg->data_fd, &st);
270         int curpos = st.st_size;
271
272         /* Write data to the data log */
273         n = write(blerg->data_fd, data, length);
274         if (n < length) {
275                 perror("Could not write data");
276                 /* Truncate anything we may have written */
277                 ftruncate(blerg->data_fd, curpos);
278                 return BLERG_INVALID_RECORD;
279         }
280
281         /* Update the index */
282         blerg->index[seg_rec].flags = 0x0001;
283         blerg->index[seg_rec].offset = curpos;
284         blerg->index[seg_rec].length = length;
285         blerg->index[seg_rec].timestamp = time(NULL);
286
287         /* And finally increment the record count */
288         blerg_increment_record_count(blerg);
289
290         flock(blerg->data_fd, LOCK_UN);
291         flock(blerg->index_fd, LOCK_UN);
292
293         if (!blerg_get_status(blerg, BLERGSTATUS_MUTED)) {
294                 /* Now do those dirty microblogging deeds */
295                 tag_scan(blerg->name, data, length, record);
296                 subscription_notify(blerg->name, record);
297         }
298
299         return record;
300 }
301
302 int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) {
303         CHECK_VALID_BLERG(0)
304         if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
305                 fprintf(stderr, "Invalid record\n");
306                 return 0;
307         }
308         if (data == NULL || length == NULL) {
309                 fprintf(stderr, "data or length is null\n");
310                 return 0;
311         }
312
313         int segment = record / RECORDS_PER_SEGMENT;
314         if (segment != blerg->current_segment)
315                 blerg_segment_switch(blerg, segment);
316         int seg_rec = record % RECORDS_PER_SEGMENT;
317
318         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
319                 fprintf(stderr, "Invalid record\n");
320                 return 0;
321         }
322
323         int rec_offset = blerg->index[seg_rec].offset;
324         int rec_length = blerg->index[seg_rec].length;
325         if (rec_offset >= blerg->data_size) {
326                 /* We're accessing an out-of-bounds record in our mmap.  Remap
327                    and recheck. */
328                 if (!blerg_remap_data(blerg)) {
329                         return 0;
330                 }
331                 if (rec_offset >= blerg->data_size) {
332                         fprintf(stderr, "Record offset outside of data!?");
333                         return 0;
334                 }
335         }
336
337         *data = malloc(rec_length);
338         if (*data == NULL) {
339                 perror("Could not allocate string in fetch");
340                 return 0;
341         }
342
343         memcpy(*data, blerg->data + rec_offset, rec_length);
344
345         *length = rec_length;
346
347         return 1;
348 }
349
350 time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) {
351         CHECK_VALID_BLERG(0)
352         if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
353                 fprintf(stderr, "Invalid record\n");
354                 return 0;
355         }
356
357         int segment = record / RECORDS_PER_SEGMENT;
358         if (segment != blerg->current_segment)
359                 blerg_segment_switch(blerg, segment);
360         int seg_rec = record % RECORDS_PER_SEGMENT;
361
362         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
363                 fprintf(stderr, "Invalid record\n");
364                 return 0;
365         }
366
367         return blerg->index[seg_rec].timestamp;
368 }
369
370 int blerg_set_subscription_mark(struct blerg *blerg) {
371         CHECK_VALID_BLERG(0)
372         blerg->meta->subscription_mark = subscription_count_items(blerg->name);
373         return 1;
374 }
375
376 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
377         CHECK_VALID_BLERG(0)
378         return blerg->meta->subscription_mark;
379 }
380
381 int blerg_set_status(struct blerg *blerg, uint32_t status, int v) {
382         CHECK_VALID_BLERG(0)
383         if (v) {
384                 blerg->meta->status |= status;
385         } else {
386                 blerg->meta->status &= ~status;
387         }
388         return 1;
389 }
390
391 int blerg_get_status(struct blerg *blerg, uint32_t status) {
392         CHECK_VALID_BLERG(0)
393         return (blerg->meta->status & status) > 0;
394 }