Error when fetching tags and direction isn't 1 or -1
[blerg.git] / database / database.c
1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2  * BSD-style license.  Please see the COPYING file for details.
3  */
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <stdio.h>
8 #include <time.h>
9 #include <unistd.h>
10 #include <sys/stat.h>
11 #include <sys/types.h>
12 #include <sys/mman.h>
13 #include <sys/file.h>
14 #include <fcntl.h>
15 #include "database.h"
16 #include "subscription.h"
17 #include "util.h"
18 #include "config.h"
19
20 uint64_t blerg_get_record_count(struct blerg *blerg) {
21         uint64_t count;
22         flock(blerg->meta_fd, LOCK_SH);
23         count = blerg->meta->sequence;
24         flock(blerg->meta_fd, LOCK_UN);
25         return count;
26 }
27
28 /* Returns last usable record */
29 uint64_t blerg_increment_record_count(struct blerg *blerg) {
30         uint64_t count;
31         flock(blerg->meta_fd, LOCK_EX);
32         count = blerg->meta->sequence++;
33         flock(blerg->meta_fd, LOCK_UN);
34         return count;
35 }
36
37 void blerg_segment_close(struct blerg *blerg) {
38         if (blerg->data != NULL)
39                 munmap((void *)blerg->data, blerg->data_size);
40         if (blerg->data_fd != -1)
41                 close(blerg->data_fd);
42         if (blerg->index != NULL)
43                 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
44         if (blerg->index_fd != -1)
45                 close(blerg->index_fd);
46 }
47
48 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
49         char filename[512];
50         uint64_t max_sequence = blerg_get_record_count(blerg);
51         struct stat st;
52
53         if (new_segment > max_sequence / RECORDS_PER_SEGMENT) {
54                 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
55                 return 0;
56         }
57
58         blerg_segment_close(blerg);
59
60         /* Load and map the index */
61         snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
62         blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
63         if (blerg->index_fd == -1) {
64                 perror("Could not open index");
65                 goto open_failed_index_open;
66         }
67         flock(blerg->index_fd, LOCK_EX);
68         fstat(blerg->index_fd, &st);
69         if (st.st_size == 0) {
70                 int i;
71                 struct record r;
72                 memset((void *)&r, 0, sizeof(struct record));
73                 for (i = 0; i < RECORDS_PER_SEGMENT; i++) {
74                         write(blerg->index_fd, &r, sizeof(struct record));
75                 }
76         }
77         flock(blerg->index_fd, LOCK_UN);
78
79         blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
80         if (blerg->index == MAP_FAILED) {
81                 perror("Could not mmap index");
82                 goto open_failed_index_mmap;
83         }
84
85         /* Load data file */
86         sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
87         blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
88         fstat(blerg->data_fd, &st);
89         blerg->data_size = st.st_size;
90         if (blerg->data_fd == -1) {
91                 perror("Could not open data");
92                 goto open_failed_data_open;
93         }
94
95         if (blerg->data_size > 0) {
96                 blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
97                 if (blerg->data == MAP_FAILED) {
98                         perror("Could not mmap data");
99                         goto open_failed_data_mmap;
100                 }
101         }
102
103         return 1;
104
105 open_failed_data_mmap:
106         close(blerg->data_fd);
107 open_failed_data_open:
108         munmap((void *)blerg->index, sizeof(RECORDS_PER_SEGMENT * sizeof(struct record)));
109 open_failed_index_mmap:
110         close(blerg->index_fd);
111 open_failed_index_open:
112         return 0;
113 }
114
115 int blerg_exists(const char *name) {
116         int namelen = strlen(name);
117         char filename[512];
118
119         if (!valid_name(name)) {
120                 fprintf(stderr, "Invalid name\n");
121                 return 0;
122         }
123
124         snprintf(filename, 512, "%s/%s", DATA_PATH, name);
125         if (access(filename, F_OK) == -1)
126                 return 0;
127         else
128                 return 1;
129 }
130
131 struct blerg *blerg_open(const char *name) {
132         int namelen = strlen(name);
133         char filename[512];
134         struct stat st;
135         uint64_t sequence;
136
137         if (!valid_name(name)) {
138                 fprintf(stderr, "Invalid name\n");
139                 return NULL;
140         }
141         struct blerg *blerg = malloc(sizeof(struct blerg));
142         if (!blerg) {
143                 perror("Cannot allocate memory for blerg");
144                 goto open_failed_blerg_malloc;
145         }
146         blerg->name = malloc(namelen + 1);
147         memcpy(blerg->name, name, namelen + 1);
148         blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
149         blerg->meta = NULL;
150         blerg->index = NULL;
151         blerg->data = NULL;
152
153         /* Make the directory if it doesn't exist */
154         blerg->base_path = malloc(512);
155         snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name);
156         if (access(blerg->base_path, F_OK) == -1)
157                 mkdir(blerg->base_path, 0755);
158
159         /* Open and map metadata */
160         snprintf(filename, 512, "%s/meta", blerg->base_path);
161         blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
162         if (blerg->meta_fd == -1) {
163                 perror("Could not open metadata");
164                 goto open_failed_meta_open;
165         }
166         fstat(blerg->meta_fd, &st);
167         if (st.st_size < sizeof(struct meta)) {
168                 // Fill the difference in size between sizeof(struct meta) and
169                 // the file size with nulls.  This allows seamless upgrades as
170                 // long as struct meta only adds members.
171                 int len = sizeof(struct meta) - st.st_size;
172                 char *buf = (char *) malloc(len);
173                 memset(buf, 0, len);
174                 int tmpfd = dup(blerg->meta_fd);
175                 FILE* tmp = fdopen(tmpfd, "a");
176                 fwrite(buf, len, 1, tmp);
177                 fclose(tmp);
178                 free(buf);
179         }
180         blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
181         if (blerg->meta == MAP_FAILED) {
182                 perror("Could not map metadata");
183                 goto open_failed_meta_mmap;
184         }
185
186         /* Open and map index and data for the current segment */
187         blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
188         if (!blerg_segment_switch(blerg, blerg->current_segment)) {
189                 fprintf(stderr, "Could not switch segment\n");
190                 goto open_failed_segment_switch;
191         }
192
193         return blerg;
194
195 open_failed_segment_switch:
196         munmap((void *)blerg->meta, sizeof(struct meta));
197 open_failed_meta_mmap:
198         close(blerg->meta_fd);
199 open_failed_meta_open:
200         free(blerg->name);
201         free(blerg);
202 open_failed_blerg_malloc:
203         return NULL;
204 }
205
206 int blerg_close(struct blerg *blerg) {
207         blerg_segment_close(blerg);
208         munmap((void *)blerg->meta, sizeof(struct meta));
209         close(blerg->meta_fd);
210         free(blerg->base_path);
211         free(blerg->name);
212         free(blerg);
213         return 1;
214 }
215
216 int blerg_store(struct blerg *blerg, const char *data, int len) {
217         if (len > MAX_RECORD_SIZE) {
218                 fprintf(stderr, "len > 64K\n");
219                 return -1;
220         }
221
222         flock(blerg->index_fd, LOCK_EX);
223         flock(blerg->data_fd, LOCK_EX);
224
225         uint64_t record = blerg_increment_record_count(blerg);
226         if (record == -1) {
227                 fprintf(stderr, "Could not find free record\n");
228                 return -1;
229         }
230         int segment = record / RECORDS_PER_SEGMENT;
231         if (segment != blerg->current_segment)
232                 blerg_segment_switch(blerg, segment);
233         int seg_rec = record % RECORDS_PER_SEGMENT;
234
235         /* Get the position for the new data */
236         FILE *datafile = fdopen(dup(blerg->data_fd), "a");
237         fseek(datafile, 0, SEEK_END);
238         int curpos = ftell(datafile);
239         fclose(datafile);
240
241         int bytes = 0;
242         do {
243                 int n = write(blerg->data_fd, data + bytes, len);
244                 if (n == -1) {
245                         perror("Could not write data");
246                         /* Truncate anything we may have written */
247                         ftruncate(blerg->data_fd, curpos);
248                         return -1;
249                 }
250                 bytes += n;
251         } while (bytes < len);
252         blerg->index[seg_rec].flags = 0x0001;
253         blerg->index[seg_rec].offset = curpos;
254         blerg->index[seg_rec].length = len;
255         blerg->index[seg_rec].timestamp = time(NULL);
256
257         flock(blerg->data_fd, LOCK_UN);
258         flock(blerg->index_fd, LOCK_UN);
259
260         tag_scan(blerg->name, data, len, record);
261         subscription_notify(blerg->name, record);
262
263         return record;
264 }
265
266 int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
267         if (record < 0) {
268                 fprintf(stderr, "Invalid record\n");
269                 return 0;
270         }
271
272         int segment = record / RECORDS_PER_SEGMENT;
273         if (segment != blerg->current_segment)
274                 blerg_segment_switch(blerg, segment);
275         int seg_rec = record % RECORDS_PER_SEGMENT;
276
277         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
278                 fprintf(stderr, "Invalid record\n");
279                 return 0;
280         }
281
282         int rec_offset = blerg->index[seg_rec].offset;
283         int rec_length = blerg->index[seg_rec].length;
284         if (rec_offset >= blerg->data_size) {
285                 /* We're accessing an out-of-bounds record in our mmap.
286                    Recheck size and remap. */
287                 struct stat st;
288                 fstat(blerg->data_fd, &st);
289                 blerg->data_size = st.st_size;
290                 if (rec_offset > blerg->data_size) {
291                         fprintf(stderr, "Record offset outside of data!?");
292                         return 0;
293                 }
294
295                 munmap(blerg->data, blerg->data_size);
296                 blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
297                 if (blerg->data == MAP_FAILED) {
298                         perror("Could not remap data");
299                         return 0;
300                 }
301         }
302
303         *data = malloc(rec_length);
304         if (*data == NULL) {
305                 perror("Could not allocate string in fetch");
306                 return 0;
307         }
308
309         memcpy(*data, blerg->data + rec_offset, rec_length);
310
311         *length = rec_length;
312
313         return 1;
314 }
315
316 time_t blerg_get_timestamp(struct blerg *blerg, int record) {
317         if (record < 0) {
318                 fprintf(stderr, "Invalid record\n");
319                 return 0;
320         }
321
322         int segment = record / RECORDS_PER_SEGMENT;
323         if (segment != blerg->current_segment)
324                 blerg_segment_switch(blerg, segment);
325         int seg_rec = record % RECORDS_PER_SEGMENT;
326
327         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
328                 fprintf(stderr, "Invalid record\n");
329                 return 0;
330         }
331
332         return blerg->index[seg_rec].timestamp;
333 }
334
335 int blerg_set_subscription_mark(struct blerg *blerg) {
336         blerg->meta->subscription_mark = subscription_count_items(blerg->name);
337 }
338
339 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
340         return blerg->meta->subscription_mark;
341 }