wq.c

/**
 * Copyright (c) 2024, SWGY, Inc. <ron@sw.gy>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include <err.h>
#include <stdlib.h>

#include "wq.h"

/**
 * Allocate and initialize a new, empty work queue.
 *
 * The head is zero-allocated with calloc(), its tail queue is set up
 * empty, and its mutex is created with default attributes.
 *
 * Returns a pointer to the new queue head. This function never returns
 * NULL: if the allocation or the mutex initialization fails, the process
 * is terminated with exit status 1, matching the error policy used by
 * the other functions in this file.
 */
struct wq_head *
wq_init(void)
{
	struct wq_head *result;
	int rc;

	result = calloc(1, sizeof(*result));
	if (result == NULL)
		err(1, "Failed to allocate work queue");

	STAILQ_INIT(&result->_head);

	/*
	 * pthread functions report failure through their return value and
	 * do not set errno, so use errx() and print the code explicitly.
	 */
	rc = pthread_mutex_init(&result->mutex, NULL);
	if (rc != 0)
		errx(1, "Failed to initialize mutex (error %d)", rc);

	return result;
}

/**
 * Append task `t` to the tail of the work queue headed by `head`.
 *
 * The insertion happens while holding the queue mutex. The queue owns
 * the task's memory from this point until the task is popped again.
 *
 * Always returns 0; a failure to lock or unlock the mutex terminates
 * the process with exit status 1.
 */
int
wq_q(struct wq_head *head, struct task *t)
{
	int rc;

	rc = pthread_mutex_lock(&head->mutex);
	if (rc != 0)
		err(1, "Failed to acquire lock");

	STAILQ_INSERT_TAIL(&head->_head, t, q);

	rc = pthread_mutex_unlock(&head->mutex);
	if (rc != 0)
		err(1, "Failed to release lock");

	return 0;
}

/**
 * Detach and return the task at the front of the queue headed by
 * `head`, or return NULL when the queue holds no tasks.
 *
 * The removal happens while holding the queue mutex. Once returned,
 * the task's memory belongs to the caller. A failure to lock or unlock
 * the mutex terminates the process with exit status 1.
 */
struct task *
wq_pop(struct wq_head *head)
{
	struct task *next;

	if (pthread_mutex_lock(&head->mutex) != 0)
		err(1, "Failed to acquire lock");

	/* STAILQ_FIRST is already NULL for an empty queue. */
	next = STAILQ_FIRST(&head->_head);
	if (next != NULL)
		STAILQ_REMOVE_HEAD(&head->_head, q);

	if (pthread_mutex_unlock(&head->mutex) != 0)
		err(1, "Failed to release lock");

	return next;
}

/*
 * Report whether the queue headed by `h` currently holds any tasks.
 *
 * Returns 1 if the queue is empty and 0 if it is not. The check is made
 * while holding the queue mutex. A failure to lock or unlock the mutex
 * terminates the process with exit status 1, so this implementation
 * never actually returns -1.
 */
int
wq_empty(struct wq_head *h)
{
	int is_empty;

	if (pthread_mutex_lock(&h->mutex) != 0)
		err(1, "Failed to acquire lock");

	is_empty = STAILQ_EMPTY(&h->_head) ? 1 : 0;

	if (pthread_mutex_unlock(&h->mutex) != 0)
		err(1, "Failed to release lock");

	return is_empty;
}

/**
 * Free every task still queued on the work queue headed by `h`.
 *
 * Each remaining task is unlinked and passed to free() while the queue
 * mutex is held, leaving the queue empty. The queue head itself and its
 * mutex are not released here.
 *
 * A failure to lock or unlock the mutex terminates the process with
 * exit status 1.
 */
void
wq_free(struct wq_head *h)
{
	struct task *t;

	if (pthread_mutex_lock(&h->mutex) != 0)
		err(1, "Failed to acquire lock");

	/*
	 * Drain from the front rather than iterating with
	 * STAILQ_FOREACH_SAFE: STAILQ_FIRST and STAILQ_REMOVE_HEAD are
	 * available in both the BSD and glibc <sys/queue.h>, so the tasks
	 * are released on every platform instead of only on OpenBSD.
	 */
	while ((t = STAILQ_FIRST(&h->_head)) != NULL) {
		STAILQ_REMOVE_HEAD(&h->_head, q);
		free(t);
	}

	if (pthread_mutex_unlock(&h->mutex) != 0)
		err(1, "Failed to release lock");
}