commit:5b9a1c597b87485253a11339067b60d08534f8b0
author:Chip Black
committer:Chip Black
date:Sat Jun 1 03:02:51 2013 -0500
parents:d81158e4b60f0e0dff1e13adb253c2e01865de0d
Bug hunting: Deadlocks, data safety, and dumb shit

- Don't preallocate index files, instead just ftruncate() them to size.
  Index locality is pretty well moot when we're never reading more than
  50 entries at a time.
- Factor out data remapping
- Added extra bounds checks for segments and records
- Wasn't actually updating blerg->current_segment when switching
  segments, causing deadlock when the second record is written in a new
  segment.
- Use posix_fallocate() instead of writing zeroes
- Update record count *after* successfully writing a record, not before
- Use fstat to determine new record data offset instead of ftell (not
  sure if this is better, but it sure looks cleaner).
- Don't bother trying to write the rest of the data when we get a short
  write.  Truncate and move on.
diff --git a/database/database.c b/database/database.c
line changes: +77/-58
index e6e8707..c0ec591
--- a/database/database.c
+++ b/database/database.c
@@ -45,15 +45,42 @@ void blerg_segment_close(struct blerg *blerg) {
 		close(blerg->index_fd);
 }
 
+int blerg_remap_data(struct blerg *blerg) {
+	struct stat st;
+
+	if (blerg->data != NULL)
+		munmap(blerg->data, blerg->data_size);
+	fstat(blerg->data_fd, &st);
+	if (st.st_size == 0) {
+		/* Can't map an empty data file. */
+		return 1;
+	}
+	blerg->data = (char *) mmap(NULL, st.st_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
+	if (blerg->data == MAP_FAILED) {
+		perror("Could not remap data");
+		return 0;
+	}
+	blerg->data_size = st.st_size;
+	return 1;
+}
+
 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
 	char filename[512];
-	uint64_t max_sequence = blerg_get_record_count(blerg);
+	uint64_t max_sequence_no = blerg_get_record_count(blerg);
 	struct stat st;
 
-	if (new_segment > max_sequence / RECORDS_PER_SEGMENT) {
+	if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
+		return 1;
+	}
+
+	if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
 		fprintf(stderr, "Cannot switch to sequence beyond last record\n");
 		return 0;
 	}
+	if (new_segment < 0) {
+		fprintf(stderr, "Cannot switch to negative segment\n");
+		return 0;
+	}
 
 	blerg_segment_close(blerg);
 
@@ -67,12 +94,10 @@ int blerg_segment_switch(struct blerg *blerg, int new_segment) {
 	flock(blerg->index_fd, LOCK_EX);
 	fstat(blerg->index_fd, &st);
 	if (st.st_size == 0) {
-		int i;
-		struct record r;
-		memset((void *)&r, 0, sizeof(struct record));
-		for (i = 0; i < RECORDS_PER_SEGMENT; i++) {
-			write(blerg->index_fd, &r, sizeof(struct record));
-		}
+		/* ftruncate() means never having to say you're sorry.  Sorry
+		   in this case meaning "allocating disk space for a 1MB file
+		   full or zeroes". */
+		ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
 	}
 	flock(blerg->index_fd, LOCK_UN);
 
@@ -85,27 +110,23 @@ int blerg_segment_switch(struct blerg *blerg, int new_segment) {
 	/* Load data file */
 	sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
 	blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
-	fstat(blerg->data_fd, &st);
-	blerg->data_size = st.st_size;
 	if (blerg->data_fd == -1) {
 		perror("Could not open data");
 		goto open_failed_data_open;
 	}
 
-	if (blerg->data_size > 0) {
-		blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
-		if (blerg->data == MAP_FAILED) {
-			perror("Could not mmap data");
-			goto open_failed_data_mmap;
-		}
+	if (!blerg_remap_data(blerg)) {
+		goto open_failed_data_mmap;
 	}
 
+	blerg->current_segment = new_segment;
+
 	return 1;
 
 open_failed_data_mmap:
 	close(blerg->data_fd);
 open_failed_data_open:
-	munmap((void *)blerg->index, sizeof(RECORDS_PER_SEGMENT * sizeof(struct record)));
+	munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
 open_failed_index_mmap:
 	close(blerg->index_fd);
 open_failed_index_open:
@@ -165,17 +186,10 @@ struct blerg *blerg_open(const char *name) {
 	}
 	fstat(blerg->meta_fd, &st);
 	if (st.st_size < sizeof(struct meta)) {
-		// Fill the difference in size between sizeof(struct meta) and
-		// the file size with nulls.  This allows seamless upgrades as
-		// long as struct meta only adds members.
-		int len = sizeof(struct meta) - st.st_size;
-		char *buf = (char *) malloc(len);
-		memset(buf, 0, len);
-		int tmpfd = dup(blerg->meta_fd);
-		FILE* tmp = fdopen(tmpfd, "a");
-		fwrite(buf, len, 1, tmp);
-		fclose(tmp);
-		free(buf);
+		/* Extend the file if sizeof(struct meta) is larger than the
+		   file. This allows seamless upgrades as long as struct meta
+		   only adds members. */
+		posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
 	}
 	blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
 	if (blerg->meta == MAP_FAILED) {
@@ -214,6 +228,9 @@ int blerg_close(struct blerg *blerg) {
 }
 
 int blerg_store(struct blerg *blerg, const char *data, int len) {
+	struct stat st;
+	int n;
+
 	if (len > MAX_RECORD_SIZE) {
 		fprintf(stderr, "len > 64K\n");
 		return -1;
@@ -222,8 +239,8 @@ int blerg_store(struct blerg *blerg, const char *data, int len) {
 	flock(blerg->index_fd, LOCK_EX);
 	flock(blerg->data_fd, LOCK_EX);
 
-	uint64_t record = blerg_increment_record_count(blerg);
-	if (record == -1) {
+	uint64_t record = blerg_get_record_count(blerg);
+	if (record == -1) {  /* Intentional signed-unsigned coercion */
 		fprintf(stderr, "Could not find free record\n");
 		return -1;
 	}
@@ -233,30 +250,31 @@ int blerg_store(struct blerg *blerg, const char *data, int len) {
 	int seg_rec = record % RECORDS_PER_SEGMENT;
 
 	/* Get the position for the new data */
-	FILE *datafile = fdopen(dup(blerg->data_fd), "a");
-	fseek(datafile, 0, SEEK_END);
-	int curpos = ftell(datafile);
-	fclose(datafile);
-
-	int bytes = 0;
-	do {
-		int n = write(blerg->data_fd, data + bytes, len);
-		if (n == -1) {
-			perror("Could not write data");
-			/* Truncate anything we may have written */
-			ftruncate(blerg->data_fd, curpos);
-			return -1;
-		}
-		bytes += n;
-	} while (bytes < len);
+	fstat(blerg->data_fd, &st);
+	int curpos = st.st_size;
+
+	/* Write data to the data log */
+	n = write(blerg->data_fd, data, len);
+	if (n < len) {
+		perror("Could not write data");
+		/* Truncate anything we may have written */
+		ftruncate(blerg->data_fd, curpos);
+		return -1;
+	}
+
+	/* Update the index */
 	blerg->index[seg_rec].flags = 0x0001;
 	blerg->index[seg_rec].offset = curpos;
 	blerg->index[seg_rec].length = len;
 	blerg->index[seg_rec].timestamp = time(NULL);
 
+	/* And finally increment the record count */
+	blerg_increment_record_count(blerg);
+
 	flock(blerg->data_fd, LOCK_UN);
 	flock(blerg->index_fd, LOCK_UN);
 
+	/* Now do those dirty microblogging deeds */
 	tag_scan(blerg->name, data, len, record);
 	subscription_notify(blerg->name, record);
 
@@ -268,6 +286,10 @@ int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
 		fprintf(stderr, "Invalid record\n");
 		return 0;
 	}
+	if (record >= blerg_get_record_count(blerg)) {
+		fprintf(stderr, "Invalid record\n");
+		return 0;
+	}
 
 	int segment = record / RECORDS_PER_SEGMENT;
 	if (segment != blerg->current_segment)
@@ -282,20 +304,13 @@ int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
 	int rec_offset = blerg->index[seg_rec].offset;
 	int rec_length = blerg->index[seg_rec].length;
 	if (rec_offset >= blerg->data_size) {
-		/* We're accessing an out-of-bounds record in our mmap.
-		   Recheck size and remap. */
-		struct stat st;
-		fstat(blerg->data_fd, &st);
-		blerg->data_size = st.st_size;
-		if (rec_offset > blerg->data_size) {
-			fprintf(stderr, "Record offset outside of data!?");
+		/* We're accessing an out-of-bounds record in our mmap.  Remap
+		   and recheck. */
+		if (!blerg_remap_data(blerg)) {
 			return 0;
 		}
-
-		munmap(blerg->data, blerg->data_size);
-		blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
-		if (blerg->data == MAP_FAILED) {
-			perror("Could not remap data");
+		if (rec_offset >= blerg->data_size) {
+			fprintf(stderr, "Record offset outside of data!?");
 			return 0;
 		}
 	}
@@ -318,6 +333,10 @@ time_t blerg_get_timestamp(struct blerg *blerg, int record) {
 		fprintf(stderr, "Invalid record\n");
 		return 0;
 	}
+	if (record >= blerg_get_record_count(blerg)) {
+		fprintf(stderr, "Invalid record\n");
+		return 0;
+	}
 
 	int segment = record / RECORDS_PER_SEGMENT;
 	if (segment != blerg->current_segment)