Add subscription support to perl lib
[blerg.git] / database / database.c
1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2  * BSD-style license.  Please see the COPYING file for details.
3  */
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <stdio.h>
8 #include <time.h>
9 #include <unistd.h>
10 #include <sys/stat.h>
11 #include <sys/types.h>
12 #include <sys/mman.h>
13 #include <sys/file.h>
14 #include <fcntl.h>
15 #include "database.h"
16 #include "subscription.h"
17 #include "util.h"
18 #include "config.h"
19
/* Guard used at the top of the public entry points in this file.  It expects
 * a variable named `blerg` to be in scope; when that pointer is NULL it logs
 * a message to stderr and returns `r` from the enclosing function.  The
 * expansion is a bare `if` statement, which is why call sites in this file
 * write it without a trailing semicolon. */
#define CHECK_VALID_BLERG(r) \
        if (!blerg) { \
                fprintf(stderr, "Invalid struct blerg\n"); \
                return r; \
        }
25
26 uint64_t blerg_get_record_count(struct blerg *blerg) {
27         uint64_t count;
28         flock(blerg->meta_fd, LOCK_SH);
29         count = blerg->meta->sequence;
30         flock(blerg->meta_fd, LOCK_UN);
31         return count;
32 }
33
34 /* Returns last usable record */
35 uint64_t blerg_increment_record_count(struct blerg *blerg) {
36         uint64_t count;
37         flock(blerg->meta_fd, LOCK_EX);
38         count = blerg->meta->sequence++;
39         flock(blerg->meta_fd, LOCK_UN);
40         return count;
41 }
42
43 void blerg_segment_close(struct blerg *blerg) {
44         if (blerg->data != NULL)
45                 munmap((void *)blerg->data, blerg->data_size);
46         if (blerg->data_fd != -1)
47                 close(blerg->data_fd);
48         if (blerg->index != NULL)
49                 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
50         if (blerg->index_fd != -1)
51                 close(blerg->index_fd);
52 }
53
54 int blerg_remap_data(struct blerg *blerg) {
55         struct stat st;
56
57         if (blerg->data != NULL)
58                 munmap(blerg->data, blerg->data_size);
59         fstat(blerg->data_fd, &st);
60         blerg->data_size = st.st_size;
61         if (blerg->data_size == 0) {
62                 /* Can't map an empty data file. */
63                 return 1;
64         }
65         blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
66         if (blerg->data == MAP_FAILED) {
67                 perror("Could not remap data");
68                 return 0;
69         }
70         return 1;
71 }
72
73 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
74         char filename[512];
75         uint64_t max_sequence_no = blerg_get_record_count(blerg);
76         struct stat st;
77
78         if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
79                 return 1;
80         }
81
82         if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
83                 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
84                 return 0;
85         }
86         if (new_segment < 0) {
87                 fprintf(stderr, "Cannot switch to negative segment\n");
88                 return 0;
89         }
90
91         blerg_segment_close(blerg);
92
93         /* Load and map the index */
94         snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
95         blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
96         if (blerg->index_fd == -1) {
97                 perror("Could not open index");
98                 goto open_failed_index_open;
99         }
100         flock(blerg->index_fd, LOCK_EX);
101         fstat(blerg->index_fd, &st);
102         if (st.st_size == 0) {
103                 /* ftruncate() means never having to say you're sorry.  Sorry
104                    in this case meaning "allocating disk space for a 1MB file
105                    full or zeroes". */
106                 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
107         }
108         flock(blerg->index_fd, LOCK_UN);
109
110         blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
111         if (blerg->index == MAP_FAILED) {
112                 perror("Could not mmap index");
113                 goto open_failed_index_mmap;
114         }
115
116         /* Load data file */
117         sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
118         blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
119         if (blerg->data_fd == -1) {
120                 perror("Could not open data");
121                 goto open_failed_data_open;
122         }
123
124         if (!blerg_remap_data(blerg)) {
125                 goto open_failed_data_mmap;
126         }
127
128         blerg->current_segment = new_segment;
129
130         return 1;
131
132 open_failed_data_mmap:
133         close(blerg->data_fd);
134 open_failed_data_open:
135         munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
136 open_failed_index_mmap:
137         close(blerg->index_fd);
138 open_failed_index_open:
139         return 0;
140 }
141
142 int blerg_exists(const char *name) {
143         int namelen = strlen(name);
144         char filename[512];
145
146         if (!valid_name(name)) {
147                 fprintf(stderr, "Invalid name\n");
148                 return 0;
149         }
150
151         snprintf(filename, 512, "%s/%s", DATA_PATH, name);
152         if (access(filename, F_OK) == -1)
153                 return 0;
154         else
155                 return 1;
156 }
157
158 struct blerg *blerg_open(const char *name) {
159         int namelen = strlen(name);
160         char filename[512];
161         struct stat st;
162         uint64_t sequence;
163
164         if (!valid_name(name)) {
165                 fprintf(stderr, "Invalid name\n");
166                 return NULL;
167         }
168         struct blerg *blerg = malloc(sizeof(struct blerg));
169         if (!blerg) {
170                 perror("Cannot allocate memory for blerg");
171                 goto open_failed_blerg_malloc;
172         }
173         blerg->name = malloc(namelen + 1);
174         memcpy(blerg->name, name, namelen + 1);
175         blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
176         blerg->meta = NULL;
177         blerg->index = NULL;
178         blerg->data = NULL;
179
180         /* Make the directory if it doesn't exist */
181         blerg->base_path = malloc(512);
182         snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name);
183         if (access(blerg->base_path, F_OK) == -1)
184                 mkdir(blerg->base_path, 0755);
185
186         /* Open and map metadata */
187         snprintf(filename, 512, "%s/meta", blerg->base_path);
188         blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
189         if (blerg->meta_fd == -1) {
190                 perror("Could not open metadata");
191                 goto open_failed_meta_open;
192         }
193         fstat(blerg->meta_fd, &st);
194         if (st.st_size < sizeof(struct meta)) {
195                 /* Extend the file if sizeof(struct meta) is larger than the
196                    file. This allows seamless upgrades as long as struct meta
197                    only adds members. */
198                 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
199         }
200         blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
201         if (blerg->meta == MAP_FAILED) {
202                 perror("Could not map metadata");
203                 goto open_failed_meta_mmap;
204         }
205
206         /* Open and map index and data for the current segment */
207         blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
208         if (!blerg_segment_switch(blerg, blerg->current_segment)) {
209                 fprintf(stderr, "Could not switch segment\n");
210                 goto open_failed_segment_switch;
211         }
212
213         return blerg;
214
215 open_failed_segment_switch:
216         munmap((void *)blerg->meta, sizeof(struct meta));
217 open_failed_meta_mmap:
218         close(blerg->meta_fd);
219 open_failed_meta_open:
220         free(blerg->name);
221         free(blerg);
222 open_failed_blerg_malloc:
223         return NULL;
224 }
225
226 int blerg_close(struct blerg *blerg) {
227         CHECK_VALID_BLERG(0)
228         blerg_segment_close(blerg);
229         munmap((void *)blerg->meta, sizeof(struct meta));
230         close(blerg->meta_fd);
231         free(blerg->base_path);
232         free(blerg->name);
233         free(blerg);
234         return 1;
235 }
236
237 int blerg_store(struct blerg *blerg, const char *data, int len) {
238         struct stat st;
239         int n;
240
241         CHECK_VALID_BLERG(-1)
242
243         if (len > MAX_RECORD_SIZE || len <= 0) {
244                 fprintf(stderr, "len out of bounds\n");
245                 return -1;
246         }
247
248         flock(blerg->index_fd, LOCK_EX);
249         flock(blerg->data_fd, LOCK_EX);
250
251         uint64_t record = blerg_get_record_count(blerg);
252         if (record == -1) {  /* Intentional signed-unsigned coercion */
253                 fprintf(stderr, "Could not find free record\n");
254                 return -1;
255         }
256         int segment = record / RECORDS_PER_SEGMENT;
257         if (segment != blerg->current_segment)
258                 blerg_segment_switch(blerg, segment);
259         int seg_rec = record % RECORDS_PER_SEGMENT;
260
261         /* Get the position for the new data */
262         fstat(blerg->data_fd, &st);
263         int curpos = st.st_size;
264
265         /* Write data to the data log */
266         n = write(blerg->data_fd, data, len);
267         if (n < len) {
268                 perror("Could not write data");
269                 /* Truncate anything we may have written */
270                 ftruncate(blerg->data_fd, curpos);
271                 return -1;
272         }
273
274         /* Update the index */
275         blerg->index[seg_rec].flags = 0x0001;
276         blerg->index[seg_rec].offset = curpos;
277         blerg->index[seg_rec].length = len;
278         blerg->index[seg_rec].timestamp = time(NULL);
279
280         /* And finally increment the record count */
281         blerg_increment_record_count(blerg);
282
283         flock(blerg->data_fd, LOCK_UN);
284         flock(blerg->index_fd, LOCK_UN);
285
286         if (!blerg_get_mute(blerg)) {
287                 /* Now do those dirty microblogging deeds */
288                 tag_scan(blerg->name, data, len, record);
289                 subscription_notify(blerg->name, record);
290         }
291
292         return record;
293 }
294
295 int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
296         CHECK_VALID_BLERG(0)
297         if (record < 0 || record >= blerg_get_record_count(blerg)) {
298                 fprintf(stderr, "Invalid record\n");
299                 return 0;
300         }
301         if (data == NULL || length == NULL) {
302                 fprintf(stderr, "data or length is null\n");
303                 return 0;
304         }
305
306         int segment = record / RECORDS_PER_SEGMENT;
307         if (segment != blerg->current_segment)
308                 blerg_segment_switch(blerg, segment);
309         int seg_rec = record % RECORDS_PER_SEGMENT;
310
311         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
312                 fprintf(stderr, "Invalid record\n");
313                 return 0;
314         }
315
316         int rec_offset = blerg->index[seg_rec].offset;
317         int rec_length = blerg->index[seg_rec].length;
318         if (rec_offset >= blerg->data_size) {
319                 /* We're accessing an out-of-bounds record in our mmap.  Remap
320                    and recheck. */
321                 if (!blerg_remap_data(blerg)) {
322                         return 0;
323                 }
324                 if (rec_offset >= blerg->data_size) {
325                         fprintf(stderr, "Record offset outside of data!?");
326                         return 0;
327                 }
328         }
329
330         *data = malloc(rec_length);
331         if (*data == NULL) {
332                 perror("Could not allocate string in fetch");
333                 return 0;
334         }
335
336         memcpy(*data, blerg->data + rec_offset, rec_length);
337
338         *length = rec_length;
339
340         return 1;
341 }
342
343 time_t blerg_get_timestamp(struct blerg *blerg, int record) {
344         CHECK_VALID_BLERG(0)
345         if (record < 0 || record >= blerg_get_record_count(blerg)) {
346                 fprintf(stderr, "Invalid record\n");
347                 return 0;
348         }
349
350         int segment = record / RECORDS_PER_SEGMENT;
351         if (segment != blerg->current_segment)
352                 blerg_segment_switch(blerg, segment);
353         int seg_rec = record % RECORDS_PER_SEGMENT;
354
355         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
356                 fprintf(stderr, "Invalid record\n");
357                 return 0;
358         }
359
360         return blerg->index[seg_rec].timestamp;
361 }
362
363 int blerg_set_subscription_mark(struct blerg *blerg) {
364         CHECK_VALID_BLERG(0)
365         blerg->meta->subscription_mark = subscription_count_items(blerg->name);
366         return 1;
367 }
368
369 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
370         CHECK_VALID_BLERG(0)
371         return blerg->meta->subscription_mark;
372 }
373
374 int blerg_set_mute(struct blerg *blerg, int v) {
375         CHECK_VALID_BLERG(0)
376         if (v) {
377                 blerg->meta->status |= BLERGMETA_MUTED;
378         } else {
379                 blerg->meta->status &= ~BLERGMETA_MUTED;
380         }
381         return 1;
382 }
383
384 int blerg_get_mute(struct blerg *blerg) {
385         CHECK_VALID_BLERG(0)
386         return (blerg->meta->status & BLERGMETA_MUTED) > 0;
387 }