/main/file.c
#include <stdio.h>
#include <stdbool.h>
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include "freertos/stream_buffer.h"
#include "esp_log.h"

#include "file.h"

#define BUFSIZE 4096
#define TAG "file reader"

typedef enum {
	FILE_READER_OPEN,
	FILE_READER_SUBSCRIBE,
	FILE_READER_START,
	FILE_READER_STOP,
	FILE_READER_SEEK,
	FILE_READER_CLOSE,
} file_reader_command_t;

typedef struct {
	long offset;
	int whence;
} file_reader_seek_args_t;

typedef struct {
	file_reader_command_t command;
	union {
		const char* open_filename;
		StreamBufferHandle_t subscribe_sb;
		file_reader_seek_args_t seek;
	};
} file_reader_command_args_t;

QueueHandle_t file_reader_command_queue = NULL;
TaskHandle_t  file_reader_task_handle = NULL;
StreamBufferHandle_t file_reader_streambuffer = NULL;
SemaphoreHandle_t seek_sync_sem = NULL;

void file_reader_task(void* pvParameters) {
	BaseType_t err;
	bool reading = false;
	size_t bytes_read = 0, bytes_sent = 0, total_bytes = 0;
	FILE* f = NULL;
	int64_t start = 0;

	uint8_t *buf = (uint8_t *) malloc(BUFSIZE);
	if (buf == NULL) {
		ESP_LOGE(TAG, "Could not allocate read buffer");
		return;
	}

	ESP_LOGI(TAG, "Reader task started");

	while (1) {
		file_reader_command_args_t a;
		err = xQueueReceive(file_reader_command_queue, &a, reading ? 0 : portMAX_DELAY);
		if (err == pdTRUE) {
			ESP_LOGI(TAG, "Received command %d", a.command);
			switch (a.command) {
			case FILE_READER_OPEN:
				if (f) {
					fclose(f);
				}
				f = fopen(a.open_filename, "r");
				if (f == NULL) {
					ESP_LOGE(TAG, "Could not open %s", a.open_filename);
				}
				break;
			case FILE_READER_SUBSCRIBE:
				file_reader_streambuffer = a.subscribe_sb;
				break;
			case FILE_READER_START:
				start = esp_timer_get_time();
				reading = true;
				break;
			case FILE_READER_STOP:
				reading = false;
				break;
			case FILE_READER_SEEK:
				if (f == NULL) {
					ESP_LOGE(TAG, "Cannot seek without file");
					break;
				}
				fseek(f, a.seek.offset, a.seek.whence);
				bytes_sent = bytes_read = 0;
				if (file_reader_streambuffer) {
					err = xStreamBufferReset(file_reader_streambuffer);
					if (err != pdPASS) {
						ESP_LOGE(TAG, "Could not reset streambuffer during seek");
					}
				}
				xSemaphoreGive(seek_sync_sem);
				break;
			case FILE_READER_CLOSE:
				fclose(f);
				f = NULL;
				reading = false;
				bytes_sent = bytes_read = 0;
				break;
			}
		}

		if (reading) {
			//ESP_LOGI(TAG, "%d sent %d read", bytes_sent, bytes_read);
			if (bytes_sent == bytes_read) {
				if (f == NULL) {
					ESP_LOGE(TAG, "FILE pointer null while reading");
					reading = false;
				} else if (feof(f)) {
					ESP_LOGI(TAG, "end of file");
					fclose(f);
					break;
				} else {
					bytes_sent = 0;
					bytes_read = fread(buf, 1, BUFSIZE, f);
					total_bytes += bytes_read;
					if (total_bytes >= 1048576) {
						int64_t now = esp_timer_get_time();
						ESP_LOGI(TAG, "%0.1f KB/sec", 1000.0 * (double)total_bytes / (double)(now - start));
						start = now;
						total_bytes = 0;
					}
					//ESP_LOGI(TAG, "read %d bytes", bytes_read);
				}
			}
			bytes_sent += xStreamBufferSend(
				file_reader_streambuffer,
				buf + bytes_sent,
				bytes_read - bytes_sent,
				500 / portTICK_PERIOD_MS
			);
		}
	}

	vTaskDelete(NULL);
}

int file_init() {
	BaseType_t err;

	seek_sync_sem = xSemaphoreCreateBinary();
	file_reader_command_queue = xQueueCreate(1, sizeof(file_reader_command_args_t));
	
	err = xTaskCreate(file_reader_task, "file reader", 4096, NULL, 2, &file_reader_task_handle);
	if (err != pdPASS) {
		ESP_LOGE(TAG, "could not initialize file reader task: %d", err);
		return 0;
	}

	return 1;
}

void file_reader_open(const char* filename) {
	file_reader_command_args_t a;
	a.command = FILE_READER_OPEN;
	a.open_filename = filename;
	xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}

void file_reader_subscribe(StreamBufferHandle_t bh) {
	file_reader_command_args_t a;
	a.command = FILE_READER_SUBSCRIBE;
	a.subscribe_sb = bh;
	xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}

void file_reader_start() {
	file_reader_command_args_t a;
	a.command = FILE_READER_START;
	xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}

void file_reader_stop() {
	file_reader_command_args_t a;
	a.command = FILE_READER_STOP;
	xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}

void file_reader_seek(long offset, int whence) {
	file_reader_command_args_t a;
	a.command = FILE_READER_SEEK;
	a.seek.offset = offset;
	a.seek.whence = whence;
	xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
	xSemaphoreTake(seek_sync_sem, portMAX_DELAY);
}

void file_reader_close() {
	file_reader_command_args_t a;
	a.command = FILE_READER_CLOSE;
	xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}