15bc160ddSAxel Dörfler/*
25bc160ddSAxel Dörfler * Copyright 2008, Axel D��rfler, axeld@pinc-software.de.
35bc160ddSAxel Dörfler * Distributed under the terms of the MIT License.
45bc160ddSAxel Dörfler */
55bc160ddSAxel Dörfler
65bc160ddSAxel Dörfler
75bc160ddSAxel Dörfler#include "AdaptiveBuffering.h"
85bc160ddSAxel Dörfler
95bc160ddSAxel Dörfler#include <stdlib.h>
105bc160ddSAxel Dörfler
115bc160ddSAxel Dörfler
125bc160ddSAxel Dörfler//#define TRACE(x...) printf(x)
135bc160ddSAxel Dörfler#define TRACE(x...) ;
145bc160ddSAxel Dörfler
155bc160ddSAxel Dörfler
165bc160ddSAxel DörflerAdaptiveBuffering::AdaptiveBuffering(size_t initialBufferSize,
175bc160ddSAxel Dörfler		size_t maxBufferSize, uint32 count)
185bc160ddSAxel Dörfler	:
195bc160ddSAxel Dörfler	fWriterThread(-1),
205bc160ddSAxel Dörfler	fBuffers(NULL),
215bc160ddSAxel Dörfler	fReadBytes(NULL),
225bc160ddSAxel Dörfler	fBufferCount(count),
235bc160ddSAxel Dörfler	fReadIndex(0),
245bc160ddSAxel Dörfler	fWriteIndex(0),
255bc160ddSAxel Dörfler	fReadCount(0),
265bc160ddSAxel Dörfler	fWriteCount(0),
275bc160ddSAxel Dörfler	fMaxBufferSize(maxBufferSize),
285bc160ddSAxel Dörfler	fCurrentBufferSize(initialBufferSize),
295bc160ddSAxel Dörfler	fReadSem(-1),
305bc160ddSAxel Dörfler	fWriteSem(-1),
315bc160ddSAxel Dörfler	fFinishedSem(-1),
325bc160ddSAxel Dörfler	fWriteStatus(B_OK),
335bc160ddSAxel Dörfler	fWriteTime(0),
345bc160ddSAxel Dörfler	fFinished(false),
355bc160ddSAxel Dörfler	fQuit(false)
365bc160ddSAxel Dörfler{
375bc160ddSAxel Dörfler}
385bc160ddSAxel Dörfler
395bc160ddSAxel Dörfler
405bc160ddSAxel DörflerAdaptiveBuffering::~AdaptiveBuffering()
415bc160ddSAxel Dörfler{
425bc160ddSAxel Dörfler	_QuitWriter();
435bc160ddSAxel Dörfler
445bc160ddSAxel Dörfler	delete_sem(fReadSem);
455bc160ddSAxel Dörfler	delete_sem(fWriteSem);
465bc160ddSAxel Dörfler
475bc160ddSAxel Dörfler	if (fBuffers != NULL) {
485bc160ddSAxel Dörfler		for (uint32 i = 0; i < fBufferCount; i++) {
495bc160ddSAxel Dörfler			if (fBuffers[i] == NULL)
505bc160ddSAxel Dörfler				break;
515bc160ddSAxel Dörfler
525bc160ddSAxel Dörfler			free(fBuffers[i]);
535bc160ddSAxel Dörfler		}
545bc160ddSAxel Dörfler
555bc160ddSAxel Dörfler		free(fBuffers);
565bc160ddSAxel Dörfler	}
575bc160ddSAxel Dörfler
585bc160ddSAxel Dörfler	free(fReadBytes);
595bc160ddSAxel Dörfler}
605bc160ddSAxel Dörfler
615bc160ddSAxel Dörfler
625bc160ddSAxel Dörflerstatus_t
635bc160ddSAxel DörflerAdaptiveBuffering::Init()
645bc160ddSAxel Dörfler{
655bc160ddSAxel Dörfler	fReadBytes = (size_t*)malloc(fBufferCount * sizeof(size_t));
665bc160ddSAxel Dörfler	if (fReadBytes == NULL)
675bc160ddSAxel Dörfler		return B_NO_MEMORY;
685bc160ddSAxel Dörfler
695bc160ddSAxel Dörfler	fBuffers = (uint8**)malloc(fBufferCount * sizeof(uint8*));
705bc160ddSAxel Dörfler	if (fBuffers == NULL)
715bc160ddSAxel Dörfler		return B_NO_MEMORY;
725bc160ddSAxel Dörfler
735bc160ddSAxel Dörfler	for (uint32 i = 0; i < fBufferCount; i++) {
745bc160ddSAxel Dörfler		fBuffers[i] = (uint8*)malloc(fMaxBufferSize);
755bc160ddSAxel Dörfler		if (fBuffers[i] == NULL)
765bc160ddSAxel Dörfler			return B_NO_MEMORY;
775bc160ddSAxel Dörfler	}
785bc160ddSAxel Dörfler
795bc160ddSAxel Dörfler	fReadSem = create_sem(0, "reader");
805bc160ddSAxel Dörfler	if (fReadSem < B_OK)
815bc160ddSAxel Dörfler		return fReadSem;
825bc160ddSAxel Dörfler
835bc160ddSAxel Dörfler	fWriteSem = create_sem(fBufferCount - 1, "writer");
845bc160ddSAxel Dörfler	if (fWriteSem < B_OK)
855bc160ddSAxel Dörfler		return fWriteSem;
865bc160ddSAxel Dörfler
875bc160ddSAxel Dörfler	fFinishedSem = create_sem(0, "finished");
885bc160ddSAxel Dörfler	if (fFinishedSem < B_OK)
895bc160ddSAxel Dörfler		return fFinishedSem;
905bc160ddSAxel Dörfler
915bc160ddSAxel Dörfler	fWriterThread = spawn_thread(&_Writer, "buffer reader", B_LOW_PRIORITY,
925bc160ddSAxel Dörfler		this);
935bc160ddSAxel Dörfler	if (fWriterThread < B_OK)
945bc160ddSAxel Dörfler		return fWriterThread;
955bc160ddSAxel Dörfler
965bc160ddSAxel Dörfler	return resume_thread(fWriterThread);
975bc160ddSAxel Dörfler}
985bc160ddSAxel Dörfler
995bc160ddSAxel Dörfler
1005bc160ddSAxel Dörflerstatus_t
1015bc160ddSAxel DörflerAdaptiveBuffering::Read(uint8* /*buffer*/, size_t* _length)
1025bc160ddSAxel Dörfler{
1035bc160ddSAxel Dörfler	*_length = 0;
1045bc160ddSAxel Dörfler	return B_OK;
1055bc160ddSAxel Dörfler}
1065bc160ddSAxel Dörfler
1075bc160ddSAxel Dörfler
1085bc160ddSAxel Dörflerstatus_t
1095bc160ddSAxel DörflerAdaptiveBuffering::Write(uint8* /*buffer*/, size_t /*length*/)
1105bc160ddSAxel Dörfler{
1115bc160ddSAxel Dörfler	return B_OK;
1125bc160ddSAxel Dörfler}
1135bc160ddSAxel Dörfler
1145bc160ddSAxel Dörfler
1155bc160ddSAxel Dörflerstatus_t
1165bc160ddSAxel DörflerAdaptiveBuffering::Run()
1175bc160ddSAxel Dörfler{
1185bc160ddSAxel Dörfler	fReadIndex = 0;
1195bc160ddSAxel Dörfler	fWriteIndex = 0;
1205bc160ddSAxel Dörfler	fReadCount = 0;
1215bc160ddSAxel Dörfler	fWriteCount = 0;
1225bc160ddSAxel Dörfler	fWriteStatus = B_OK;
1235bc160ddSAxel Dörfler	fWriteTime = 0;
1245bc160ddSAxel Dörfler
1255bc160ddSAxel Dörfler	while (fWriteStatus >= B_OK) {
1265bc160ddSAxel Dörfler		bigtime_t start = system_time();
1275bc160ddSAxel Dörfler		int32 index = fReadIndex;
1285bc160ddSAxel Dörfler
1295bc160ddSAxel Dörfler		TRACE("%ld. read index %lu, buffer size %lu\n", fReadCount, index,
1305bc160ddSAxel Dörfler			fCurrentBufferSize);
1315bc160ddSAxel Dörfler
1325bc160ddSAxel Dörfler		fReadBytes[index] = fCurrentBufferSize;
1335bc160ddSAxel Dörfler		status_t status = Read(fBuffers[index], &fReadBytes[index]);
1345bc160ddSAxel Dörfler		if (status < B_OK)
1355bc160ddSAxel Dörfler			return status;
1365bc160ddSAxel Dörfler
1375bc160ddSAxel Dörfler		TRACE("%ld. read -> %lu bytes\n", fReadCount, fReadBytes[index]);
1385bc160ddSAxel Dörfler
1395bc160ddSAxel Dörfler		fReadCount++;
1405bc160ddSAxel Dörfler		fReadIndex = (index + 1) % fBufferCount;
1415bc160ddSAxel Dörfler		if (fReadBytes[index] == 0)
1425bc160ddSAxel Dörfler			fFinished = true;
1435bc160ddSAxel Dörfler		release_sem(fReadSem);
1445bc160ddSAxel Dörfler
1455bc160ddSAxel Dörfler		while (acquire_sem(fWriteSem) == B_INTERRUPTED)
1465bc160ddSAxel Dörfler			;
1475bc160ddSAxel Dörfler
1485bc160ddSAxel Dörfler		if (fFinished)
1495bc160ddSAxel Dörfler			break;
1505bc160ddSAxel Dörfler
1515bc160ddSAxel Dörfler		bigtime_t readTime = system_time() - start;
1525bc160ddSAxel Dörfler		uint32 writeTime = fWriteTime;
1535bc160ddSAxel Dörfler		if (writeTime) {
1545bc160ddSAxel Dörfler			if (writeTime > readTime) {
1555bc160ddSAxel Dörfler				fCurrentBufferSize = fCurrentBufferSize * 8/9;
1565bc160ddSAxel Dörfler				fCurrentBufferSize &= ~65535;
1575bc160ddSAxel Dörfler			} else {
1585bc160ddSAxel Dörfler				fCurrentBufferSize = fCurrentBufferSize * 9/8;
1595bc160ddSAxel Dörfler				fCurrentBufferSize = (fCurrentBufferSize + 65535) & ~65535;
1605bc160ddSAxel Dörfler
1615bc160ddSAxel Dörfler				if (fCurrentBufferSize > fMaxBufferSize)
1625bc160ddSAxel Dörfler					fCurrentBufferSize = fMaxBufferSize;
1635bc160ddSAxel Dörfler			}
1645bc160ddSAxel Dörfler		}
1655bc160ddSAxel Dörfler	}
1665bc160ddSAxel Dörfler
1675bc160ddSAxel Dörfler	while (acquire_sem(fFinishedSem) == B_INTERRUPTED)
1685bc160ddSAxel Dörfler		;
1695bc160ddSAxel Dörfler
1705bc160ddSAxel Dörfler	return fWriteStatus;
1715bc160ddSAxel Dörfler}
1725bc160ddSAxel Dörfler
1735bc160ddSAxel Dörfler
1745bc160ddSAxel Dörflervoid
1755bc160ddSAxel DörflerAdaptiveBuffering::_QuitWriter()
1765bc160ddSAxel Dörfler{
1775bc160ddSAxel Dörfler	if (fWriterThread >= B_OK) {
1785bc160ddSAxel Dörfler		fQuit = true;
1795bc160ddSAxel Dörfler		release_sem(fReadSem);
1805bc160ddSAxel Dörfler
1815bc160ddSAxel Dörfler		status_t status;
1825bc160ddSAxel Dörfler		wait_for_thread(fWriterThread, &status);
1835bc160ddSAxel Dörfler
1845bc160ddSAxel Dörfler		fWriterThread = -1;
1855bc160ddSAxel Dörfler	}
1865bc160ddSAxel Dörfler}
1875bc160ddSAxel Dörfler
1885bc160ddSAxel Dörfler
1895bc160ddSAxel Dörflerstatus_t
1905bc160ddSAxel DörflerAdaptiveBuffering::_Writer()
1915bc160ddSAxel Dörfler{
1925bc160ddSAxel Dörfler	while (true) {
1935bc160ddSAxel Dörfler		while (acquire_sem(fReadSem) == B_INTERRUPTED)
1945bc160ddSAxel Dörfler			;
1955bc160ddSAxel Dörfler		if (fQuit)
1965bc160ddSAxel Dörfler			break;
1975bc160ddSAxel Dörfler
1985bc160ddSAxel Dörfler		bigtime_t start = system_time();
1995bc160ddSAxel Dörfler
2005bc160ddSAxel Dörfler		TRACE("%ld. write index %lu, %p, bytes %lu\n", fWriteCount, fWriteIndex,
2015bc160ddSAxel Dörfler			fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
2025bc160ddSAxel Dörfler
2035bc160ddSAxel Dörfler		fWriteStatus = Write(fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
2045bc160ddSAxel Dörfler
2055bc160ddSAxel Dörfler		TRACE("%ld. write done\n", fWriteCount);
2065bc160ddSAxel Dörfler
2075bc160ddSAxel Dörfler		fWriteIndex = (fWriteIndex + 1) % fBufferCount;
2085bc160ddSAxel Dörfler		fWriteTime = uint32(system_time() - start);
2095bc160ddSAxel Dörfler		fWriteCount++;
2105bc160ddSAxel Dörfler
2115bc160ddSAxel Dörfler		release_sem(fWriteSem);
2125bc160ddSAxel Dörfler
2135bc160ddSAxel Dörfler		if (fWriteStatus < B_OK)
2145bc160ddSAxel Dörfler			return fWriteStatus;
2155bc160ddSAxel Dörfler		if (fFinished && fWriteCount == fReadCount)
2165bc160ddSAxel Dörfler			release_sem(fFinishedSem);
2175bc160ddSAxel Dörfler	}
2185bc160ddSAxel Dörfler
2195bc160ddSAxel Dörfler	return B_OK;
2205bc160ddSAxel Dörfler}
2215bc160ddSAxel Dörfler
2225bc160ddSAxel Dörfler
2235bc160ddSAxel Dörfler/*static*/ status_t
2245bc160ddSAxel DörflerAdaptiveBuffering::_Writer(void* self)
2255bc160ddSAxel Dörfler{
2265bc160ddSAxel Dörfler	return ((AdaptiveBuffering*)self)->_Writer();
2275bc160ddSAxel Dörfler}
2285bc160ddSAxel Dörfler
229