18d33dc29SFrançois Revol/*
25ed41cffSMichael Lotz * Copyright 2009, 2017, Haiku, Inc.
38d33dc29SFrançois Revol * Distributed under the terms of the MIT License.
48d33dc29SFrançois Revol *
58d33dc29SFrançois Revol * Authors:
68d33dc29SFrançois Revol *		Michael Lotz <mmlr@mlotz.ch>
78d33dc29SFrançois Revol */
88d33dc29SFrançois Revol
98d33dc29SFrançois Revol#include "StreamingRingBuffer.h"
108d33dc29SFrançois Revol
118d33dc29SFrançois Revol#include <Autolock.h>
128d33dc29SFrançois Revol
138d33dc29SFrançois Revol#include <stdio.h>
148d33dc29SFrançois Revol#include <stdlib.h>
158d33dc29SFrançois Revol#include <string.h>
168d33dc29SFrançois Revol
175ed41cffSMichael Lotz
185ed41cffSMichael Lotz#ifdef CLIENT_COMPILE
195ed41cffSMichael Lotz#define TRACE_ALWAYS(x...)		printf("StreamingRingBuffer: " x)
205ed41cffSMichael Lotz#else
215ed41cffSMichael Lotz#define TRACE_ALWAYS(x...)		debug_printf("StreamingRingBuffer: " x)
225ed41cffSMichael Lotz#endif
235ed41cffSMichael Lotz
245ed41cffSMichael Lotz#define TRACE(x...)				/*TRACE_ALWAYS(x)*/
255ed41cffSMichael Lotz#define TRACE_ERROR(x...)		TRACE_ALWAYS(x)
268d33dc29SFrançois Revol
278d33dc29SFrançois Revol
288d33dc29SFrançois RevolStreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
298d33dc29SFrançois Revol	:
308d33dc29SFrançois Revol	fReaderWaiting(false),
318d33dc29SFrançois Revol	fWriterWaiting(false),
325ed41cffSMichael Lotz	fCancelRead(false),
335ed41cffSMichael Lotz	fCancelWrite(false),
348d33dc29SFrançois Revol	fReaderNotifier(-1),
358d33dc29SFrançois Revol	fWriterNotifier(-1),
368d33dc29SFrançois Revol	fReaderLocker("StreamingRingBuffer reader"),
378d33dc29SFrançois Revol	fWriterLocker("StreamingRingBuffer writer"),
388d33dc29SFrançois Revol	fDataLocker("StreamingRingBuffer data"),
398d33dc29SFrançois Revol	fBuffer(NULL),
408d33dc29SFrançois Revol	fBufferSize(bufferSize),
418d33dc29SFrançois Revol	fReadable(0),
428d33dc29SFrançois Revol	fReadPosition(0),
438d33dc29SFrançois Revol	fWritePosition(0)
448d33dc29SFrançois Revol{
458d33dc29SFrançois Revol	fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
468d33dc29SFrançois Revol	fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");
478d33dc29SFrançois Revol
488d33dc29SFrançois Revol	fBuffer = (uint8 *)malloc(fBufferSize);
498d33dc29SFrançois Revol	if (fBuffer == NULL)
508d33dc29SFrançois Revol		fBufferSize = 0;
518d33dc29SFrançois Revol}
528d33dc29SFrançois Revol
538d33dc29SFrançois Revol
548d33dc29SFrançois RevolStreamingRingBuffer::~StreamingRingBuffer()
558d33dc29SFrançois Revol{
568d33dc29SFrançois Revol	delete_sem(fReaderNotifier);
578d33dc29SFrançois Revol	delete_sem(fWriterNotifier);
588d33dc29SFrançois Revol	free(fBuffer);
598d33dc29SFrançois Revol}
608d33dc29SFrançois Revol
618d33dc29SFrançois Revol
628d33dc29SFrançois Revolstatus_t
638d33dc29SFrançois RevolStreamingRingBuffer::InitCheck()
648d33dc29SFrançois Revol{
658d33dc29SFrançois Revol	if (fReaderNotifier < 0)
668d33dc29SFrançois Revol		return fReaderNotifier;
678d33dc29SFrançois Revol	if (fWriterNotifier < 0)
688d33dc29SFrançois Revol		return fWriterNotifier;
698d33dc29SFrançois Revol	if (fBuffer == NULL)
708d33dc29SFrançois Revol		return B_NO_MEMORY;
718d33dc29SFrançois Revol
728d33dc29SFrançois Revol	return B_OK;
738d33dc29SFrançois Revol}
748d33dc29SFrançois Revol
758d33dc29SFrançois Revol
768d33dc29SFrançois Revolint32
778d33dc29SFrançois RevolStreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
788d33dc29SFrançois Revol{
798d33dc29SFrançois Revol	BAutolock readerLock(fReaderLocker);
808d33dc29SFrançois Revol	if (!readerLock.IsLocked())
818d33dc29SFrançois Revol		return B_ERROR;
828d33dc29SFrançois Revol
838d33dc29SFrançois Revol	BAutolock dataLock(fDataLocker);
848d33dc29SFrançois Revol	if (!dataLock.IsLocked())
858d33dc29SFrançois Revol		return B_ERROR;
868d33dc29SFrançois Revol
878d33dc29SFrançois Revol	int32 readSize = 0;
888d33dc29SFrançois Revol	while (length > 0) {
898d33dc29SFrançois Revol		size_t copyLength = min_c(length, fBufferSize - fReadPosition);
908d33dc29SFrançois Revol		copyLength = min_c(copyLength, fReadable);
918d33dc29SFrançois Revol
928d33dc29SFrançois Revol		if (copyLength == 0) {
938d33dc29SFrançois Revol			if (onlyBlockOnNoData && readSize > 0)
948d33dc29SFrançois Revol				return readSize;
958d33dc29SFrançois Revol
968d33dc29SFrançois Revol			fReaderWaiting = true;
978d33dc29SFrançois Revol			dataLock.Unlock();
988d33dc29SFrançois Revol
998d33dc29SFrançois Revol			status_t result;
1008d33dc29SFrançois Revol			do {
1018d33dc29SFrançois Revol				TRACE("waiting in reader\n");
1028d33dc29SFrançois Revol				result = acquire_sem(fReaderNotifier);
1035ed41cffSMichael Lotz				TRACE("done waiting in reader with status: %#" B_PRIx32 "\n",
1045ed41cffSMichael Lotz					result);
1058d33dc29SFrançois Revol			} while (result == B_INTERRUPTED);
1068d33dc29SFrançois Revol
1078d33dc29SFrançois Revol			if (result != B_OK)
1088d33dc29SFrançois Revol				return result;
1098d33dc29SFrançois Revol
1105ed41cffSMichael Lotz			if (!dataLock.Lock()) {
1115ed41cffSMichael Lotz				TRACE_ERROR("failed to acquire data lock\n");
1128d33dc29SFrançois Revol				return B_ERROR;
1135ed41cffSMichael Lotz			}
1145ed41cffSMichael Lotz
1155ed41cffSMichael Lotz			if (fCancelRead) {
1165ed41cffSMichael Lotz				TRACE("read canceled\n");
1175ed41cffSMichael Lotz				fCancelRead = false;
1185ed41cffSMichael Lotz				return B_CANCELED;
1195ed41cffSMichael Lotz			}
1208d33dc29SFrançois Revol
1218d33dc29SFrançois Revol			continue;
1228d33dc29SFrançois Revol		}
1238d33dc29SFrançois Revol
1248d33dc29SFrançois Revol		// support discarding input
1258d33dc29SFrançois Revol		if (buffer != NULL) {
1268d33dc29SFrançois Revol			memcpy(buffer, fBuffer + fReadPosition, copyLength);
1278d33dc29SFrançois Revol			buffer = (uint8 *)buffer + copyLength;
1288d33dc29SFrançois Revol		}
1298d33dc29SFrançois Revol
1308d33dc29SFrançois Revol		fReadPosition = (fReadPosition + copyLength) % fBufferSize;
1318d33dc29SFrançois Revol		fReadable -= copyLength;
1328d33dc29SFrançois Revol		readSize += copyLength;
1338d33dc29SFrançois Revol		length -= copyLength;
1348d33dc29SFrançois Revol
1358d33dc29SFrançois Revol		if (fWriterWaiting) {
1368d33dc29SFrançois Revol			release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
1378d33dc29SFrançois Revol			fWriterWaiting = false;
1388d33dc29SFrançois Revol		}
1398d33dc29SFrançois Revol	}
1408d33dc29SFrançois Revol
1418d33dc29SFrançois Revol	return readSize;
1428d33dc29SFrançois Revol}
1438d33dc29SFrançois Revol
1448d33dc29SFrançois Revol
1458d33dc29SFrançois Revolstatus_t
1468d33dc29SFrançois RevolStreamingRingBuffer::Write(const void *buffer, size_t length)
1478d33dc29SFrançois Revol{
1488d33dc29SFrançois Revol	BAutolock writerLock(fWriterLocker);
1498d33dc29SFrançois Revol	if (!writerLock.IsLocked())
1508d33dc29SFrançois Revol		return B_ERROR;
1518d33dc29SFrançois Revol
1528d33dc29SFrançois Revol	BAutolock dataLock(fDataLocker);
1538d33dc29SFrançois Revol	if (!dataLock.IsLocked())
1548d33dc29SFrançois Revol		return B_ERROR;
1558d33dc29SFrançois Revol
1568d33dc29SFrançois Revol	while (length > 0) {
1578d33dc29SFrançois Revol		size_t copyLength = min_c(length, fBufferSize - fWritePosition);
1588d33dc29SFrançois Revol		copyLength = min_c(copyLength, fBufferSize - fReadable);
1598d33dc29SFrançois Revol
1608d33dc29SFrançois Revol		if (copyLength == 0) {
1618d33dc29SFrançois Revol			fWriterWaiting = true;
1628d33dc29SFrançois Revol			dataLock.Unlock();
1638d33dc29SFrançois Revol
1648d33dc29SFrançois Revol			status_t result;
1658d33dc29SFrançois Revol			do {
1668d33dc29SFrançois Revol				TRACE("waiting in writer\n");
1678d33dc29SFrançois Revol				result = acquire_sem(fWriterNotifier);
1685ed41cffSMichael Lotz				TRACE("done waiting in writer with status: %#" B_PRIx32 "\n",
1695ed41cffSMichael Lotz					result);
1708d33dc29SFrançois Revol			} while (result == B_INTERRUPTED);
1718d33dc29SFrançois Revol
1728d33dc29SFrançois Revol			if (result != B_OK)
1738d33dc29SFrançois Revol				return result;
1748d33dc29SFrançois Revol
1755ed41cffSMichael Lotz			if (!dataLock.Lock()) {
1765ed41cffSMichael Lotz				TRACE_ERROR("failed to acquire data lock\n");
1778d33dc29SFrançois Revol				return B_ERROR;
1785ed41cffSMichael Lotz			}
1795ed41cffSMichael Lotz
1805ed41cffSMichael Lotz			if (fCancelWrite) {
1815ed41cffSMichael Lotz				TRACE("write canceled\n");
1825ed41cffSMichael Lotz				fCancelWrite = false;
1835ed41cffSMichael Lotz				return B_CANCELED;
1845ed41cffSMichael Lotz			}
1858d33dc29SFrançois Revol
1868d33dc29SFrançois Revol			continue;
1878d33dc29SFrançois Revol		}
1888d33dc29SFrançois Revol
1898d33dc29SFrançois Revol		memcpy(fBuffer + fWritePosition, buffer, copyLength);
1908d33dc29SFrançois Revol		fWritePosition = (fWritePosition + copyLength) % fBufferSize;
1918d33dc29SFrançois Revol		fReadable += copyLength;
1928d33dc29SFrançois Revol
1938d33dc29SFrançois Revol		buffer = (uint8 *)buffer + copyLength;
1948d33dc29SFrançois Revol		length -= copyLength;
1958d33dc29SFrançois Revol
1968d33dc29SFrançois Revol		if (fReaderWaiting) {
1978d33dc29SFrançois Revol			release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
1988d33dc29SFrançois Revol			fReaderWaiting = false;
1998d33dc29SFrançois Revol		}
2008d33dc29SFrançois Revol	}
2018d33dc29SFrançois Revol
2028d33dc29SFrançois Revol	return B_OK;
2038d33dc29SFrançois Revol}
2045ed41cffSMichael Lotz
2055ed41cffSMichael Lotz
2065ed41cffSMichael Lotzvoid
2075ed41cffSMichael LotzStreamingRingBuffer::MakeEmpty()
2085ed41cffSMichael Lotz{
2095ed41cffSMichael Lotz	BAutolock dataLock(fDataLocker);
2105ed41cffSMichael Lotz	if (!dataLock.IsLocked())
2115ed41cffSMichael Lotz		return;
2125ed41cffSMichael Lotz
2135ed41cffSMichael Lotz	fReadPosition = fWritePosition = 0;
2145ed41cffSMichael Lotz	fReadable = 0;
2155ed41cffSMichael Lotz
2165ed41cffSMichael Lotz	if (fWriterWaiting) {
2175ed41cffSMichael Lotz		release_sem_etc(fWriterNotifier, 1, 0);
2185ed41cffSMichael Lotz		fWriterWaiting = false;
2195ed41cffSMichael Lotz		fCancelWrite = true;
2205ed41cffSMichael Lotz	}
2215ed41cffSMichael Lotz
2225ed41cffSMichael Lotz	if (fReaderWaiting) {
2235ed41cffSMichael Lotz		release_sem_etc(fReaderNotifier, 1, 0);
2245ed41cffSMichael Lotz		fReaderWaiting = false;
2255ed41cffSMichael Lotz		fCancelRead = true;
2265ed41cffSMichael Lotz	}
2275ed41cffSMichael Lotz}
228