Check paths in blerglatest
[blerg.git] / database / database.c
1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2  * BSD-style license.  Please see the COPYING file for details.
3  */
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <stdio.h>
8 #include <time.h>
9 #include <unistd.h>
10 #include <sys/stat.h>
11 #include <sys/types.h>
12 #include <sys/mman.h>
13 #include <sys/file.h>
14 #include <fcntl.h>
15 #include "database.h"
16 #include "configuration.h"
17 #include "subscription.h"
18 #include "util.h"
19 #include "config.h"
20
/* Guard for entry points that take a `struct blerg *blerg` parameter: when
 * the local variable `blerg` is NULL, log to stderr and return `r` from the
 * enclosing function.  The expansion is a complete if-statement, so call
 * sites use it without a trailing semicolon. */
#define CHECK_VALID_BLERG(r)                               \
        if (!blerg) {                                      \
                fprintf(stderr, "Invalid struct blerg\n"); \
                return r;                                  \
        }
26
/* Initialise the database layer.
 * Returns 1 on success, 0 when blerg_configuration_init() reports failure. */
int blerg_init() {
        return blerg_configuration_init() ? 1 : 0;
}
33
34 uint64_t blerg_get_record_count(struct blerg *blerg) {
35         uint64_t count;
36         flock(blerg->meta_fd, LOCK_SH);
37         count = blerg->meta->sequence;
38         flock(blerg->meta_fd, LOCK_UN);
39         return count;
40 }
41
42 /* Returns last usable record */
43 uint64_t blerg_increment_record_count(struct blerg *blerg) {
44         uint64_t count;
45         flock(blerg->meta_fd, LOCK_EX);
46         count = blerg->meta->sequence++;
47         flock(blerg->meta_fd, LOCK_UN);
48         return count;
49 }
50
51 void blerg_segment_close(struct blerg *blerg) {
52         if (blerg->data != NULL)
53                 munmap((void *)blerg->data, blerg->data_size);
54         if (blerg->data_fd != -1)
55                 close(blerg->data_fd);
56         if (blerg->index != NULL)
57                 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
58         if (blerg->index_fd != -1)
59                 close(blerg->index_fd);
60 }
61
62 int blerg_remap_data(struct blerg *blerg) {
63         struct stat st;
64
65         if (blerg->data != NULL)
66                 munmap(blerg->data, blerg->data_size);
67         fstat(blerg->data_fd, &st);
68         blerg->data_size = st.st_size;
69         if (blerg->data_size == 0) {
70                 /* Can't map an empty data file. */
71                 return 1;
72         }
73         blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
74         if (blerg->data == MAP_FAILED) {
75                 perror("Could not remap data");
76                 return 0;
77         }
78         return 1;
79 }
80
81 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
82         char filename[512];
83         uint64_t max_sequence_no = blerg_get_record_count(blerg);
84         struct stat st;
85
86         if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
87                 return 1;
88         }
89
90         if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
91                 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
92                 return 0;
93         }
94         if (new_segment < 0) {
95                 fprintf(stderr, "Cannot switch to negative segment\n");
96                 return 0;
97         }
98
99         blerg_segment_close(blerg);
100
101         /* Load and map the index */
102         snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
103         blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
104         if (blerg->index_fd == -1) {
105                 perror("Could not open index");
106                 goto open_failed_index_open;
107         }
108         flock(blerg->index_fd, LOCK_EX);
109         fstat(blerg->index_fd, &st);
110         if (st.st_size == 0) {
111                 /* ftruncate() means never having to say you're sorry.  Sorry
112                    in this case meaning "allocating disk space for a 1MB file
113                    full or zeroes". */
114                 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
115         }
116         flock(blerg->index_fd, LOCK_UN);
117
118         blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
119         if (blerg->index == MAP_FAILED) {
120                 perror("Could not mmap index");
121                 goto open_failed_index_mmap;
122         }
123
124         /* Load data file */
125         sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
126         blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
127         if (blerg->data_fd == -1) {
128                 perror("Could not open data");
129                 goto open_failed_data_open;
130         }
131
132         if (!blerg_remap_data(blerg)) {
133                 goto open_failed_data_mmap;
134         }
135
136         blerg->current_segment = new_segment;
137
138         return 1;
139
140 open_failed_data_mmap:
141         close(blerg->data_fd);
142 open_failed_data_open:
143         munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
144 open_failed_index_mmap:
145         close(blerg->index_fd);
146 open_failed_index_open:
147         return 0;
148 }
149
150 int blerg_exists(const char *name) {
151         int namelen = strlen(name);
152         char filename[512];
153
154         if (!valid_name(name)) {
155                 fprintf(stderr, "Invalid name\n");
156                 return 0;
157         }
158
159         snprintf(filename, 512, "%s/%s", blergconf.data_path, name);
160         if (access(filename, F_OK) == -1)
161                 return 0;
162         else
163                 return 1;
164 }
165
166 struct blerg *blerg_open(const char *name) {
167         int namelen = strlen(name);
168         char filename[512];
169         struct stat st;
170         uint64_t sequence;
171
172         if (!valid_name(name)) {
173                 fprintf(stderr, "Invalid name\n");
174                 return NULL;
175         }
176         struct blerg *blerg = malloc(sizeof(struct blerg));
177         if (!blerg) {
178                 perror("Cannot allocate memory for blerg");
179                 goto open_failed_blerg_malloc;
180         }
181         blerg->name = malloc(namelen + 1);
182         memcpy(blerg->name, name, namelen + 1);
183         blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
184         blerg->meta = NULL;
185         blerg->index = NULL;
186         blerg->data = NULL;
187
188         /* Make the directory if it doesn't exist */
189         blerg->base_path = malloc(512);
190         snprintf(blerg->base_path, 512, "%s/%s", blergconf.data_path, name);
191         if (access(blerg->base_path, F_OK) == -1)
192                 mkdir(blerg->base_path, 0755);
193
194         /* Open and map metadata */
195         snprintf(filename, 512, "%s/meta", blerg->base_path);
196         blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
197         if (blerg->meta_fd == -1) {
198                 perror("Could not open metadata");
199                 goto open_failed_meta_open;
200         }
201         fstat(blerg->meta_fd, &st);
202         if (st.st_size < sizeof(struct meta)) {
203                 /* Extend the file if sizeof(struct meta) is larger than the
204                    file. This allows seamless upgrades as long as struct meta
205                    only adds members. */
206                 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
207         }
208         blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
209         if (blerg->meta == MAP_FAILED) {
210                 perror("Could not map metadata");
211                 goto open_failed_meta_mmap;
212         }
213
214         /* Open and map index and data for the current segment */
215         blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
216         if (!blerg_segment_switch(blerg, blerg->current_segment)) {
217                 fprintf(stderr, "Could not switch segment\n");
218                 goto open_failed_segment_switch;
219         }
220
221         return blerg;
222
223 open_failed_segment_switch:
224         munmap((void *)blerg->meta, sizeof(struct meta));
225 open_failed_meta_mmap:
226         close(blerg->meta_fd);
227 open_failed_meta_open:
228         free(blerg->name);
229         free(blerg);
230 open_failed_blerg_malloc:
231         return NULL;
232 }
233
234 int blerg_close(struct blerg *blerg) {
235         CHECK_VALID_BLERG(0)
236         blerg_segment_close(blerg);
237         munmap((void *)blerg->meta, sizeof(struct meta));
238         close(blerg->meta_fd);
239         free(blerg->base_path);
240         free(blerg->name);
241         free(blerg);
242         return 1;
243 }
244
245 uint64_t blerg_store(struct blerg *blerg, const char *data, int length) {
246         struct stat st;
247         int n;
248
249         CHECK_VALID_BLERG(BLERG_INVALID_RECORD)
250
251         if (length > MAX_RECORD_SIZE || length <= 0) {
252                 fprintf(stderr, "length out of bounds\n");
253                 return BLERG_INVALID_RECORD;
254         }
255
256         flock(blerg->index_fd, LOCK_EX);
257         flock(blerg->data_fd, LOCK_EX);
258
259         uint64_t record = blerg_get_record_count(blerg);
260         if (record == -1) {  /* Intentional signed-unsigned coercion */
261                 fprintf(stderr, "Could not find free record\n");
262                 return BLERG_INVALID_RECORD;
263         }
264         int segment = record / RECORDS_PER_SEGMENT;
265         if (segment != blerg->current_segment)
266                 blerg_segment_switch(blerg, segment);
267         int seg_rec = record % RECORDS_PER_SEGMENT;
268
269         /* Get the position for the new data */
270         fstat(blerg->data_fd, &st);
271         int curpos = st.st_size;
272
273         /* Write data to the data log */
274         n = write(blerg->data_fd, data, length);
275         if (n < length) {
276                 perror("Could not write data");
277                 /* Truncate anything we may have written */
278                 ftruncate(blerg->data_fd, curpos);
279                 return BLERG_INVALID_RECORD;
280         }
281
282         /* Update the index */
283         blerg->index[seg_rec].flags = 0x0001;
284         blerg->index[seg_rec].offset = curpos;
285         blerg->index[seg_rec].length = length;
286         blerg->index[seg_rec].timestamp = time(NULL);
287
288         /* And finally increment the record count */
289         blerg_increment_record_count(blerg);
290
291         flock(blerg->data_fd, LOCK_UN);
292         flock(blerg->index_fd, LOCK_UN);
293
294         if (!blerg_get_mute(blerg)) {
295                 /* Now do those dirty microblogging deeds */
296                 tag_scan(blerg->name, data, length, record);
297                 subscription_notify(blerg->name, record);
298         }
299
300         return record;
301 }
302
303 int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) {
304         CHECK_VALID_BLERG(0)
305         if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
306                 fprintf(stderr, "Invalid record\n");
307                 return 0;
308         }
309         if (data == NULL || length == NULL) {
310                 fprintf(stderr, "data or length is null\n");
311                 return 0;
312         }
313
314         int segment = record / RECORDS_PER_SEGMENT;
315         if (segment != blerg->current_segment)
316                 blerg_segment_switch(blerg, segment);
317         int seg_rec = record % RECORDS_PER_SEGMENT;
318
319         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
320                 fprintf(stderr, "Invalid record\n");
321                 return 0;
322         }
323
324         int rec_offset = blerg->index[seg_rec].offset;
325         int rec_length = blerg->index[seg_rec].length;
326         if (rec_offset >= blerg->data_size) {
327                 /* We're accessing an out-of-bounds record in our mmap.  Remap
328                    and recheck. */
329                 if (!blerg_remap_data(blerg)) {
330                         return 0;
331                 }
332                 if (rec_offset >= blerg->data_size) {
333                         fprintf(stderr, "Record offset outside of data!?");
334                         return 0;
335                 }
336         }
337
338         *data = malloc(rec_length);
339         if (*data == NULL) {
340                 perror("Could not allocate string in fetch");
341                 return 0;
342         }
343
344         memcpy(*data, blerg->data + rec_offset, rec_length);
345
346         *length = rec_length;
347
348         return 1;
349 }
350
351 time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) {
352         CHECK_VALID_BLERG(0)
353         if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
354                 fprintf(stderr, "Invalid record\n");
355                 return 0;
356         }
357
358         int segment = record / RECORDS_PER_SEGMENT;
359         if (segment != blerg->current_segment)
360                 blerg_segment_switch(blerg, segment);
361         int seg_rec = record % RECORDS_PER_SEGMENT;
362
363         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
364                 fprintf(stderr, "Invalid record\n");
365                 return 0;
366         }
367
368         return blerg->index[seg_rec].timestamp;
369 }
370
371 int blerg_set_subscription_mark(struct blerg *blerg) {
372         CHECK_VALID_BLERG(0)
373         blerg->meta->subscription_mark = subscription_count_items(blerg->name);
374         return 1;
375 }
376
377 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
378         CHECK_VALID_BLERG(0)
379         return blerg->meta->subscription_mark;
380 }
381
382 int blerg_set_mute(struct blerg *blerg, int v) {
383         CHECK_VALID_BLERG(0)
384         if (v) {
385                 blerg->meta->status |= BLERGMETA_MUTED;
386         } else {
387                 blerg->meta->status &= ~BLERGMETA_MUTED;
388         }
389         return 1;
390 }
391
392 int blerg_get_mute(struct blerg *blerg) {
393         CHECK_VALID_BLERG(0)
394         return (blerg->meta->status & BLERGMETA_MUTED) > 0;
395 }