1/*
2 * Copyright 2001-2015 Haiku, Inc. All rights reserved
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		DarkWyrm, bpmagic@columbus.rr.com
 *		Axel Dörfler, axeld@pinc-software.de
8 *		Erik Jaesler, erik@cgsoftware.com
 *		Ingo Weinhold, bonefish@users.sf.net
10 */
11
12
13// BLooper class spawns a thread that runs a message loop.
14
15
16#include <Looper.h>
17
18#include <new>
19#include <stdio.h>
20#include <stdlib.h>
21
22#include <Autolock.h>
23#include <Message.h>
24#include <MessageFilter.h>
25#include <MessageQueue.h>
26#include <Messenger.h>
27#include <PropertyInfo.h>
28
29#include <AppMisc.h>
30#include <AutoLocker.h>
31#include <DirectMessageTarget.h>
32#include <LooperList.h>
33#include <MessagePrivate.h>
34#include <TokenSpace.h>
35
36
// debugging
// Swap the two DBG() definitions below to enable the debug output.
//#define DBG(x) x
#define DBG(x)	;
// Prefixes every debug line with the calling thread. The thread ID is printed
// with B_PRId32 so that the format matches the argument width on all
// platforms.
#define PRINT(x)	DBG({ printf("[%6" B_PRId32 "] ", find_thread(NULL)); printf x; })
41
42/*
43#include <Autolock.h>
44#include <Locker.h>
45static BLocker sDebugPrintLocker("BLooper debug print");
46#define PRINT(x)	DBG({						\
47	BAutolock _(sDebugPrintLocker);				\
48	debug_printf("[%6ld] ", find_thread(NULL));	\
49	debug_printf x;								\
50})
51*/
52
53
54#define FILTER_LIST_BLOCK_SIZE	5
55#define DATA_BLOCK_SIZE			5
56
57
58using BPrivate::gDefaultTokens;
59using BPrivate::gLooperList;
60using BPrivate::BLooperList;
61
62port_id _get_looper_port_(const BLooper* looper);
63
// Values stored in the extra_data field of the sLooperPropInfo entries;
// ResolveSpecifier() branches on them.
enum {
	BLOOPER_PROCESS_INTERNALLY	= 0,
	BLOOPER_HANDLER_BY_INDEX	= 1
};
68
69static property_info sLooperPropInfo[] = {
70	{
71		"Handler",
72			{},
73			{B_INDEX_SPECIFIER, B_REVERSE_INDEX_SPECIFIER},
74			NULL, BLOOPER_HANDLER_BY_INDEX,
75			{},
76			{},
77			{}
78	},
79	{
80		"Handlers",
81			{B_GET_PROPERTY},
82			{B_DIRECT_SPECIFIER},
83			NULL, BLOOPER_PROCESS_INTERNALLY,
84			{B_MESSENGER_TYPE},
85			{},
86			{}
87	},
88	{
89		"Handler",
90			{B_COUNT_PROPERTIES},
91			{B_DIRECT_SPECIFIER},
92			NULL, BLOOPER_PROCESS_INTERNALLY,
93			{B_INT32_TYPE},
94			{},
95			{}
96	},
97
98	{ 0 }
99};
100
101struct _loop_data_ {
102	BLooper*	looper;
103	thread_id	thread;
104};
105
106
107//	#pragma mark -
108
109
110BLooper::BLooper(const char* name, int32 priority, int32 portCapacity)
111	:
112	BHandler(name)
113{
114	_InitData(name, priority, -1, portCapacity);
115}
116
117
118BLooper::~BLooper()
119{
120	if (fRunCalled && !fTerminating) {
121		debugger("You can't call delete on a BLooper object "
122			"once it is running.");
123	}
124
125	Lock();
126
127	// In case the looper thread calls Quit() fLastMessage is not deleted.
128	if (fLastMessage) {
129		delete fLastMessage;
130		fLastMessage = NULL;
131	}
132
133	// Close the message port and read and reply to the remaining messages.
134	if (fMsgPort >= 0 && fOwnsPort)
135		close_port(fMsgPort);
136
137	// Clear the queue so our call to IsMessageWaiting() below doesn't give
138	// us bogus info
139	fDirectTarget->Close();
140
141	BMessage* message;
142	while ((message = fDirectTarget->Queue()->NextMessage()) != NULL) {
143		delete message;
144			// msg will automagically post generic reply
145	}
146
147	if (fOwnsPort) {
148		do {
149			delete ReadMessageFromPort(0);
150				// msg will automagically post generic reply
151		} while (IsMessageWaiting());
152
153		delete_port(fMsgPort);
154	}
155	fDirectTarget->Release();
156
157	// Clean up our filters
158	SetCommonFilterList(NULL);
159
160	AutoLocker<BLooperList> ListLock(gLooperList);
161	RemoveHandler(this);
162
163	// Remove all the "child" handlers
164	int32 count = fHandlers.CountItems();
165	for (int32 i = 0; i < count; i++) {
166		BHandler* handler = (BHandler*)fHandlers.ItemAtFast(i);
167		handler->SetNextHandler(NULL);
168		handler->SetLooper(NULL);
169	}
170	fHandlers.MakeEmpty();
171
172	Unlock();
173	gLooperList.RemoveLooper(this);
174	delete_sem(fLockSem);
175}
176
177
178BLooper::BLooper(BMessage* data)
179	: BHandler(data)
180{
181	int32 portCapacity;
182	if (data->FindInt32("_port_cap", &portCapacity) != B_OK || portCapacity < 0)
183		portCapacity = B_LOOPER_PORT_DEFAULT_CAPACITY;
184
185	int32 priority;
186	if (data->FindInt32("_prio", &priority) != B_OK)
187		priority = B_NORMAL_PRIORITY;
188
189	_InitData(Name(), priority, -1, portCapacity);
190}
191
192
193BArchivable*
194BLooper::Instantiate(BMessage* data)
195{
196	if (validate_instantiation(data, "BLooper"))
197		return new BLooper(data);
198
199	return NULL;
200}
201
202
203status_t
204BLooper::Archive(BMessage* data, bool deep) const
205{
206	status_t status = BHandler::Archive(data, deep);
207	if (status < B_OK)
208		return status;
209
210	port_info info;
211	status = get_port_info(fMsgPort, &info);
212	if (status == B_OK)
213		status = data->AddInt32("_port_cap", info.capacity);
214
215	thread_info threadInfo;
216	if (get_thread_info(Thread(), &threadInfo) == B_OK)
217		status = data->AddInt32("_prio", threadInfo.priority);
218
219	return status;
220}
221
222
223status_t
224BLooper::PostMessage(uint32 command)
225{
226	BMessage message(command);
227	return _PostMessage(&message, this, NULL);
228}
229
230
231status_t
232BLooper::PostMessage(BMessage* message)
233{
234	return _PostMessage(message, this, NULL);
235}
236
237
238status_t
239BLooper::PostMessage(uint32 command, BHandler* handler, BHandler* replyTo)
240{
241	BMessage message(command);
242	return _PostMessage(&message, handler, replyTo);
243}
244
245
246status_t
247BLooper::PostMessage(BMessage* message, BHandler* handler, BHandler* replyTo)
248{
249	return _PostMessage(message, handler, replyTo);
250}
251
252
253void
254BLooper::DispatchMessage(BMessage* message, BHandler* handler)
255{
256	PRINT(("BLooper::DispatchMessage(%.4s)\n", (char*)&message->what));
257
258	switch (message->what) {
259		case _QUIT_:
260			// Can't call Quit() to do this, because of the slight chance
261			// another thread with have us locked between now and then.
262			fTerminating = true;
263
264			// After returning from DispatchMessage(), the looper will be
265			// deleted in _task0_()
266			break;
267
268		case B_QUIT_REQUESTED:
269			if (handler == this) {
270				_QuitRequested(message);
271				break;
272			}
273
274			// fall through
275
276		default:
277			handler->MessageReceived(message);
278			break;
279	}
280	PRINT(("BLooper::DispatchMessage() done\n"));
281}
282
283
284void
285BLooper::MessageReceived(BMessage* message)
286{
287	// TODO: implement scripting support
288	BHandler::MessageReceived(message);
289}
290
291
292BMessage*
293BLooper::CurrentMessage() const
294{
295	return fLastMessage;
296}
297
298
299BMessage*
300BLooper::DetachCurrentMessage()
301{
302	BMessage* message = fLastMessage;
303	fLastMessage = NULL;
304	return message;
305}
306
307
308void
309BLooper::DispatchExternalMessage(BMessage* message, BHandler* handler,
310	bool& _detached)
311{
312	AssertLocked();
313
314	BMessage* previousMessage = fLastMessage;
315	fLastMessage = message;
316
317	DispatchMessage(message, handler);
318
319	_detached = fLastMessage == NULL;
320	fLastMessage = previousMessage;
321}
322
323
324BMessageQueue*
325BLooper::MessageQueue() const
326{
327	return fDirectTarget->Queue();
328}
329
330
331bool
332BLooper::IsMessageWaiting() const
333{
334	AssertLocked();
335
336	if (!fDirectTarget->Queue()->IsEmpty())
337		return true;
338
339	int32 count;
340	do {
341		count = port_buffer_size_etc(fMsgPort, B_RELATIVE_TIMEOUT, 0);
342	} while (count == B_INTERRUPTED);
343
344	return count > 0;
345}
346
347
348void
349BLooper::AddHandler(BHandler* handler)
350{
351	if (handler == NULL)
352		return;
353
354	AssertLocked();
355
356	if (handler->Looper() == NULL) {
357		fHandlers.AddItem(handler);
358		handler->SetLooper(this);
359		if (handler != this)	// avoid a cycle
360			handler->SetNextHandler(this);
361	}
362}
363
364
365bool
366BLooper::RemoveHandler(BHandler* handler)
367{
368	if (handler == NULL)
369		return false;
370
371	AssertLocked();
372
373	if (handler->Looper() == this && fHandlers.RemoveItem(handler)) {
374		if (handler == fPreferred)
375			fPreferred = NULL;
376
377		handler->SetNextHandler(NULL);
378		handler->SetLooper(NULL);
379		return true;
380	}
381
382	return false;
383}
384
385
386int32
387BLooper::CountHandlers() const
388{
389	AssertLocked();
390
391	return fHandlers.CountItems();
392}
393
394
395BHandler*
396BLooper::HandlerAt(int32 index) const
397{
398	AssertLocked();
399
400	return (BHandler*)fHandlers.ItemAt(index);
401}
402
403
404int32
405BLooper::IndexOf(BHandler* handler) const
406{
407	AssertLocked();
408
409	return fHandlers.IndexOf(handler);
410}
411
412
413BHandler*
414BLooper::PreferredHandler() const
415{
416	return fPreferred;
417}
418
419
420void
421BLooper::SetPreferredHandler(BHandler* handler)
422{
423	if (handler && handler->Looper() == this && IndexOf(handler) >= 0) {
424		fPreferred = handler;
425	} else {
426		fPreferred = NULL;
427	}
428}
429
430
431thread_id
432BLooper::Run()
433{
434	AssertLocked();
435
436	if (fRunCalled) {
437		// Not allowed to call Run() more than once
438		debugger("can't call BLooper::Run twice!");
439		return fThread;
440	}
441
442	fThread = spawn_thread(_task0_, Name(), fInitPriority, this);
443	if (fThread < B_OK)
444		return fThread;
445
446	if (fMsgPort < B_OK)
447		return fMsgPort;
448
449	fRunCalled = true;
450	Unlock();
451
452	status_t err = resume_thread(fThread);
453	if (err < B_OK)
454		return err;
455
456	return fThread;
457}
458
459
460void
461BLooper::Loop()
462{
463	AssertLocked();
464
465	if (fRunCalled) {
466		// Not allowed to call Loop() or Run() more than once
467		debugger("can't call BLooper::Loop twice!");
468		return;
469	}
470
471	fThread = find_thread(NULL);
472	fRunCalled = true;
473
474	task_looper();
475}
476
477
478void
479BLooper::Quit()
480{
481	PRINT(("BLooper::Quit()\n"));
482
483	if (!IsLocked()) {
484		printf("ERROR - you must Lock a looper before calling Quit(), "
485			"team=%" B_PRId32 ", looper=%s\n", Team(),
486			Name() ? Name() : "unnamed");
487	}
488
489	// Try to lock
490	if (!Lock()) {
491		// We're toast already
492		return;
493	}
494
495	PRINT(("  is locked\n"));
496
497	if (!fRunCalled) {
498		PRINT(("  Run() has not been called yet\n"));
499		fTerminating = true;
500		delete this;
501	} else if (find_thread(NULL) == fThread) {
502		PRINT(("  We are the looper thread\n"));
503		fTerminating = true;
504		delete this;
505		exit_thread(0);
506	} else {
507		PRINT(("  Run() has already been called and we are not the looper thread\n"));
508
509		// As with sem in _Lock(), we need to cache this here in case the looper
510		// disappears before we get to the wait_for_thread() below
511		thread_id thread = Thread();
512
513		// We need to unlock here. Otherwise the looper thread can't
514		// dispatch the _QUIT_ message we're going to post.
515		UnlockFully();
516
517		// As per the BeBook, if we've been called by a thread other than
518		// our own, the rest of the message queue has to get processed.  So
519		// we put this in the queue, and when it shows up, we'll call Quit()
520		// from our own thread.
521		// QuitRequested() will not be called in this case.
522		PostMessage(_QUIT_);
523
524		// We have to wait until the looper is done processing any remaining
525		// messages.
526		status_t status;
527		while (wait_for_thread(thread, &status) == B_INTERRUPTED)
528			;
529	}
530
531	PRINT(("BLooper::Quit() done\n"));
532}
533
534
535bool
536BLooper::QuitRequested()
537{
538	return true;
539}
540
541
542bool
543BLooper::Lock()
544{
545	// Defer to global _Lock(); see notes there
546	return _Lock(this, -1, B_INFINITE_TIMEOUT) == B_OK;
547}
548
549
550void
551BLooper::Unlock()
552{
553PRINT(("BLooper::Unlock()\n"));
554	//	Make sure we're locked to begin with
555	AssertLocked();
556
557	//	Decrement fOwnerCount
558	--fOwnerCount;
559PRINT(("  fOwnerCount now: %ld\n", fOwnerCount));
560	//	Check to see if the owner still wants a lock
561	if (fOwnerCount == 0) {
562		//	Set fOwner to invalid thread_id (< 0)
563		fOwner = -1;
564		fCachedStack = 0;
565
566#if DEBUG < 1
567		//	Decrement requested lock count (using fAtomicCount for this)
568		int32 atomicCount = atomic_add(&fAtomicCount, -1);
569PRINT(("  fAtomicCount now: %ld\n", fAtomicCount));
570
571		// Check if anyone is waiting for a lock
572		// and release if it's the case
573		if (atomicCount > 1)
574#endif
575			release_sem(fLockSem);
576	}
577PRINT(("BLooper::Unlock() done\n"));
578}
579
580
581bool
582BLooper::IsLocked() const
583{
584	if (!gLooperList.IsLooperValid(this)) {
585		// The looper is gone, so of course it's not locked
586		return false;
587	}
588
589	uint32 stack;
590	return ((addr_t)&stack & ~(B_PAGE_SIZE - 1)) == fCachedStack
591		|| find_thread(NULL) == fOwner;
592}
593
594
595status_t
596BLooper::LockWithTimeout(bigtime_t timeout)
597{
598	return _Lock(this, -1, timeout);
599}
600
601
602thread_id
603BLooper::Thread() const
604{
605	return fThread;
606}
607
608
609team_id
610BLooper::Team() const
611{
612	return BPrivate::current_team();
613}
614
615
616BLooper*
617BLooper::LooperForThread(thread_id thread)
618{
619	return gLooperList.LooperForThread(thread);
620}
621
622
623thread_id
624BLooper::LockingThread() const
625{
626	return fOwner;
627}
628
629
630int32
631BLooper::CountLocks() const
632{
633	return fOwnerCount;
634}
635
636
637int32
638BLooper::CountLockRequests() const
639{
640	return fAtomicCount;
641}
642
643
644sem_id
645BLooper::Sem() const
646{
647	return fLockSem;
648}
649
650
651BHandler*
652BLooper::ResolveSpecifier(BMessage* message, int32 index, BMessage* specifier,
653	int32 what, const char* property)
654{
655/**
656	@note	When I was first dumping the results of GetSupportedSuites() from
657			various classes, the use of the extra_data field was quite
658			mysterious to me.  Then I dumped BApplication and compared the
659			result against the BeBook's docs for scripting BApplication.  A
660			bunch of it isn't documented, but what is tipped me to the idea
661			that the extra_data is being used as a quick and dirty way to tell
662			what scripting "command" has been sent, e.g., for easy use in a
663			switch statement.  Would certainly be a lot faster than a bunch of
664			string comparisons -- which wouldn't tell the whole story anyway,
665			because of the same name being used for multiple properties.
666 */
667 	BPropertyInfo propertyInfo(sLooperPropInfo);
668	uint32 data;
669	status_t err = B_OK;
670	const char* errMsg = "";
671	if (propertyInfo.FindMatch(message, index, specifier, what, property, &data)
672			>= 0) {
673		switch (data) {
674			case BLOOPER_PROCESS_INTERNALLY:
675				return this;
676
677			case BLOOPER_HANDLER_BY_INDEX:
678			{
679				int32 index = specifier->FindInt32("index");
680				if (what == B_REVERSE_INDEX_SPECIFIER) {
681					index = CountHandlers() - index;
682				}
683				BHandler* target = HandlerAt(index);
684				if (target) {
685					// Specifier has been fully handled
686					message->PopSpecifier();
687					return target;
688				} else {
689					err = B_BAD_INDEX;
690					errMsg = "handler index out of range";
691				}
692				break;
693			}
694
695			default:
696				err = B_BAD_SCRIPT_SYNTAX;
697				errMsg = "Didn't understand the specifier(s)";
698		}
699	} else {
700		return BHandler::ResolveSpecifier(message, index, specifier, what,
701			property);
702	}
703
704	BMessage reply(B_MESSAGE_NOT_UNDERSTOOD);
705	reply.AddInt32("error", err);
706	reply.AddString("message", errMsg);
707	message->SendReply(&reply);
708
709	return NULL;
710}
711
712
713status_t
714BLooper::GetSupportedSuites(BMessage* data)
715{
716	if (data == NULL)
717		return B_BAD_VALUE;
718
719	status_t status = data->AddString("suites", "suite/vnd.Be-looper");
720	if (status == B_OK) {
721		BPropertyInfo PropertyInfo(sLooperPropInfo);
722		status = data->AddFlat("messages", &PropertyInfo);
723		if (status == B_OK)
724			status = BHandler::GetSupportedSuites(data);
725	}
726
727	return status;
728}
729
730
731void
732BLooper::AddCommonFilter(BMessageFilter* filter)
733{
734	if (filter == NULL)
735		return;
736
737	AssertLocked();
738
739	if (filter->Looper()) {
740		debugger("A MessageFilter can only be used once.");
741		return;
742	}
743
744	if (fCommonFilters == NULL)
745		fCommonFilters = new BList(FILTER_LIST_BLOCK_SIZE);
746
747	filter->SetLooper(this);
748	fCommonFilters->AddItem(filter);
749}
750
751
752bool
753BLooper::RemoveCommonFilter(BMessageFilter* filter)
754{
755	AssertLocked();
756
757	if (fCommonFilters == NULL)
758		return false;
759
760	bool result = fCommonFilters->RemoveItem(filter);
761	if (result)
762		filter->SetLooper(NULL);
763
764	return result;
765}
766
767
768void
769BLooper::SetCommonFilterList(BList* filters)
770{
771	AssertLocked();
772
773	BMessageFilter* filter;
774	if (filters) {
775		// Check for ownership issues - a filter can only have one owner
776		for (int32 i = 0; i < filters->CountItems(); ++i) {
777			filter = (BMessageFilter*)filters->ItemAt(i);
778			if (filter->Looper()) {
779				debugger("A MessageFilter can only be used once.");
780				return;
781			}
782		}
783	}
784
785	if (fCommonFilters) {
786		for (int32 i = 0; i < fCommonFilters->CountItems(); ++i) {
787			delete (BMessageFilter*)fCommonFilters->ItemAt(i);
788		}
789
790		delete fCommonFilters;
791		fCommonFilters = NULL;
792	}
793
794	// Per the BeBook, we take ownership of the list
795	fCommonFilters = filters;
796	if (fCommonFilters) {
797		for (int32 i = 0; i < fCommonFilters->CountItems(); ++i) {
798			filter = (BMessageFilter*)fCommonFilters->ItemAt(i);
799			filter->SetLooper(this);
800		}
801	}
802}
803
804
805BList*
806BLooper::CommonFilterList() const
807{
808	return fCommonFilters;
809}
810
811
812status_t
813BLooper::Perform(perform_code d, void* arg)
814{
815	// This is sort of what we're doing for this function everywhere
816	return BHandler::Perform(d, arg);
817}
818
819
820BMessage*
821BLooper::MessageFromPort(bigtime_t timeout)
822{
823	return ReadMessageFromPort(timeout);
824}
825
826
827void BLooper::_ReservedLooper1() {}
828void BLooper::_ReservedLooper2() {}
829void BLooper::_ReservedLooper3() {}
830void BLooper::_ReservedLooper4() {}
831void BLooper::_ReservedLooper5() {}
832void BLooper::_ReservedLooper6() {}
833
834
#ifdef _BEOS_R5_COMPATIBLE_
// Loopers can't be copied. These two only exist when building with
// _BEOS_R5_COMPATIBLE_ and deliberately do nothing.

BLooper::BLooper(const BLooper& other)
{
	// Copy construction not allowed
}


BLooper&
BLooper::operator=(const BLooper& other)
{
	// Looper copying not allowed
	return *this;
}
#endif
849
850
851BLooper::BLooper(int32 priority, port_id port, const char* name)
852{
853	_InitData(name, priority, port, B_LOOPER_PORT_DEFAULT_CAPACITY);
854}
855
856
857status_t
858BLooper::_PostMessage(BMessage* msg, BHandler* handler, BHandler* replyTo)
859{
860	status_t status;
861	BMessenger messenger(handler, this, &status);
862	if (status == B_OK)
863		return messenger.SendMessage(msg, replyTo, 0);
864
865	return status;
866}
867
868
869/*!
870	Locks a looper either by port or using a direct pointer to the looper.
871
872	\param looper looper to lock, if not NULL
873	\param port port to identify the looper in case \a looper is NULL
874	\param timeout timeout for acquiring the lock
875*/
876status_t
877BLooper::_Lock(BLooper* looper, port_id port, bigtime_t timeout)
878{
879	PRINT(("BLooper::_Lock(%p, %lx)\n", looper, port));
880
881	//	Check params (loop, port)
882	if (looper == NULL && port < 0) {
883		PRINT(("BLooper::_Lock() done 1\n"));
884		return B_BAD_VALUE;
885	}
886
887	thread_id currentThread = find_thread(NULL);
888	int32 oldCount;
889	sem_id sem;
890
891	{
892		AutoLocker<BLooperList> ListLock(gLooperList);
893		if (!ListLock.IsLocked())
894			return B_BAD_VALUE;
895
896		// Look up looper by port_id, if necessary
897		if (looper == NULL) {
898			looper = gLooperList.LooperForPort(port);
899			if (looper == NULL) {
900				PRINT(("BLooper::_Lock() done 3\n"));
901				return B_BAD_VALUE;
902			}
903		} else if (!gLooperList.IsLooperValid(looper)) {
904			// Check looper validity
905			PRINT(("BLooper::_Lock() done 4\n"));
906			return B_BAD_VALUE;
907		}
908
909		// Check for nested lock attempt
910		if (currentThread == looper->fOwner) {
911			++looper->fOwnerCount;
912			PRINT(("BLooper::_Lock() done 5: fOwnerCount: %ld\n", loop->fOwnerCount));
913			return B_OK;
914		}
915
916		// Cache the semaphore, so that we can safely access it after having
917		// unlocked the looper list
918		sem = looper->fLockSem;
919		if (sem < 0) {
920			PRINT(("BLooper::_Lock() done 6\n"));
921			return B_BAD_VALUE;
922		}
923
924		// Bump the requested lock count (using fAtomicCount for this)
925		oldCount = atomic_add(&looper->fAtomicCount, 1);
926	}
927
928	return _LockComplete(looper, oldCount, currentThread, sem, timeout);
929}
930
931
932status_t
933BLooper::_LockComplete(BLooper* looper, int32 oldCount, thread_id thread,
934	sem_id sem, bigtime_t timeout)
935{
936	status_t err = B_OK;
937
938#if DEBUG < 1
939	if (oldCount > 0) {
940#endif
941		do {
942			err = acquire_sem_etc(sem, 1, B_RELATIVE_TIMEOUT, timeout);
943		} while (err == B_INTERRUPTED);
944#if DEBUG < 1
945	}
946#endif
947	if (err == B_OK) {
948		looper->fOwner = thread;
949		looper->fCachedStack = (addr_t)&err & ~(B_PAGE_SIZE - 1);
950		looper->fOwnerCount = 1;
951	}
952
953	PRINT(("BLooper::_LockComplete() done: %lx\n", err));
954	return err;
955}
956
957
958void
959BLooper::_InitData(const char* name, int32 priority, port_id port,
960	int32 portCapacity)
961{
962	fOwner = B_ERROR;
963	fCachedStack = 0;
964	fRunCalled = false;
965	fDirectTarget = new (std::nothrow) BPrivate::BDirectMessageTarget();
966	fCommonFilters = NULL;
967	fLastMessage = NULL;
968	fPreferred = NULL;
969	fThread = B_ERROR;
970	fTerminating = false;
971	fOwnsPort = true;
972	fMsgPort = -1;
973	fAtomicCount = 0;
974
975	if (name == NULL)
976		name = "anonymous looper";
977
978#if DEBUG
979	fLockSem = create_sem(1, name);
980#else
981	fLockSem = create_sem(0, name);
982#endif
983
984	if (portCapacity <= 0)
985		portCapacity = B_LOOPER_PORT_DEFAULT_CAPACITY;
986
987	if (port >= 0)
988		fMsgPort = port;
989	else
990		fMsgPort = create_port(portCapacity, name);
991
992	fInitPriority = priority;
993
994	gLooperList.AddLooper(this);
995		// this will also lock this looper
996
997	AddHandler(this);
998}
999
1000
1001void
1002BLooper::AddMessage(BMessage* message)
1003{
1004	_AddMessagePriv(message);
1005
1006	// wakeup looper when being called from other threads if necessary
1007	if (find_thread(NULL) != Thread()
1008		&& fDirectTarget->Queue()->IsNextMessage(message)
1009		&& port_count(fMsgPort) <= 0) {
1010		// there is currently no message waiting, and we need to wakeup the
1011		// looper
1012		write_port_etc(fMsgPort, 0, NULL, 0, B_RELATIVE_TIMEOUT, 0);
1013	}
1014}
1015
1016
1017void
1018BLooper::_AddMessagePriv(BMessage* message)
1019{
1020	// ToDo: if no target token is specified, set to preferred handler
1021	// Others may want to peek into our message queue, so the preferred
1022	// handler must be set correctly already if no token was given
1023
1024	fDirectTarget->Queue()->AddMessage(message);
1025}
1026
1027
1028status_t
1029BLooper::_task0_(void* arg)
1030{
1031	BLooper* looper = (BLooper*)arg;
1032
1033	PRINT(("LOOPER: _task0_()\n"));
1034
1035	if (looper->Lock()) {
1036		PRINT(("LOOPER: looper locked\n"));
1037		looper->task_looper();
1038
1039		delete looper;
1040	}
1041
1042	PRINT(("LOOPER: _task0_() done: thread %ld\n", find_thread(NULL)));
1043	return B_OK;
1044}
1045
1046
1047void*
1048BLooper::ReadRawFromPort(int32* msgCode, bigtime_t timeout)
1049{
1050	PRINT(("BLooper::ReadRawFromPort()\n"));
1051	uint8* buffer = NULL;
1052	ssize_t bufferSize;
1053
1054	do {
1055		bufferSize = port_buffer_size_etc(fMsgPort, B_RELATIVE_TIMEOUT, timeout);
1056	} while (bufferSize == B_INTERRUPTED);
1057
1058	if (bufferSize < B_OK) {
1059		PRINT(("BLooper::ReadRawFromPort(): failed: %ld\n", bufferSize));
1060		return NULL;
1061	}
1062
1063	if (bufferSize > 0)
1064		buffer = (uint8*)malloc(bufferSize);
1065
1066	// we don't want to wait again here, since that can only mean
1067	// that someone else has read our message and our bufferSize
1068	// is now probably wrong
1069	PRINT(("read_port()...\n"));
1070	bufferSize = read_port_etc(fMsgPort, msgCode, buffer, bufferSize,
1071		B_RELATIVE_TIMEOUT, 0);
1072
1073	if (bufferSize < B_OK) {
1074		free(buffer);
1075		return NULL;
1076	}
1077
1078	PRINT(("BLooper::ReadRawFromPort() read: %.4s, %p (%d bytes)\n",
1079		(char*)msgCode, buffer, bufferSize));
1080
1081	return buffer;
1082}
1083
1084
1085BMessage*
1086BLooper::ReadMessageFromPort(bigtime_t timeout)
1087{
1088	PRINT(("BLooper::ReadMessageFromPort()\n"));
1089	int32 msgCode;
1090	BMessage* message = NULL;
1091
1092	void* buffer = ReadRawFromPort(&msgCode, timeout);
1093	if (buffer == NULL)
1094		return NULL;
1095
1096	message = ConvertToMessage(buffer, msgCode);
1097	free(buffer);
1098
1099	PRINT(("BLooper::ReadMessageFromPort() done: %p\n", message));
1100	return message;
1101}
1102
1103
1104BMessage*
1105BLooper::ConvertToMessage(void* buffer, int32 code)
1106{
1107	PRINT(("BLooper::ConvertToMessage()\n"));
1108	if (buffer == NULL)
1109		return NULL;
1110
1111	BMessage* message = new BMessage();
1112	if (message->Unflatten((const char*)buffer) != B_OK) {
1113		PRINT(("BLooper::ConvertToMessage(): unflattening message failed\n"));
1114		delete message;
1115		message = NULL;
1116	}
1117
1118	PRINT(("BLooper::ConvertToMessage(): %p\n", message));
1119	return message;
1120}
1121
1122
1123void
1124BLooper::task_looper()
1125{
1126	PRINT(("BLooper::task_looper()\n"));
1127	// Check that looper is locked (should be)
1128	AssertLocked();
1129	// Unlock the looper
1130	Unlock();
1131
1132	if (IsLocked())
1133		debugger("looper must not be locked!");
1134
1135	// loop: As long as we are not terminating.
1136	while (!fTerminating) {
1137		PRINT(("LOOPER: outer loop\n"));
1138		// TODO: timeout determination algo
1139		//	Read from message port (how do we determine what the timeout is?)
1140		PRINT(("LOOPER: MessageFromPort()...\n"));
1141		BMessage* msg = MessageFromPort();
1142		PRINT(("LOOPER: ...done\n"));
1143
1144		//	Did we get a message?
1145		if (msg)
1146			_AddMessagePriv(msg);
1147
1148		// Get message count from port
1149		int32 msgCount = port_count(fMsgPort);
1150		for (int32 i = 0; i < msgCount; ++i) {
1151			// Read 'count' messages from port (so we will not block)
1152			// We use zero as our timeout since we know there is stuff there
1153			msg = MessageFromPort(0);
1154			if (msg)
1155				_AddMessagePriv(msg);
1156		}
1157
1158		// loop: As long as there are messages in the queue and the port is
1159		//		 empty... and we are not terminating, of course.
1160		bool dispatchNextMessage = true;
1161		while (!fTerminating && dispatchNextMessage) {
1162			PRINT(("LOOPER: inner loop\n"));
1163			// Get next message from queue (assign to fLastMessage after
1164			// locking)
1165			BMessage* message = fDirectTarget->Queue()->NextMessage();
1166
1167			Lock();
1168
1169			fLastMessage = message;
1170
1171			if (fLastMessage == NULL) {
1172				// No more messages: Unlock the looper and terminate the
1173				// dispatch loop.
1174				dispatchNextMessage = false;
1175			} else {
1176				PRINT(("LOOPER: fLastMessage: 0x%lx: %.4s\n", fLastMessage->what,
1177					(char*)&fLastMessage->what));
1178				DBG(fLastMessage->PrintToStream());
1179
1180				// Get the target handler
1181				BHandler* handler = NULL;
1182				BMessage::Private messagePrivate(fLastMessage);
1183				bool usePreferred = messagePrivate.UsePreferredTarget();
1184
1185				if (usePreferred) {
1186					PRINT(("LOOPER: use preferred target\n"));
1187					handler = fPreferred;
1188					if (handler == NULL)
1189						handler = this;
1190				} else {
1191					gDefaultTokens.GetToken(messagePrivate.GetTarget(),
1192						B_HANDLER_TOKEN, (void**)&handler);
1193
1194					// if this handler doesn't belong to us, we drop the message
1195					if (handler != NULL && handler->Looper() != this)
1196						handler = NULL;
1197
1198					PRINT(("LOOPER: use %ld, handler: %p, this: %p\n",
1199						messagePrivate.GetTarget(), handler, this));
1200				}
1201
1202				// Is this a scripting message? (BMessage::HasSpecifiers())
1203				if (handler != NULL && fLastMessage->HasSpecifiers()) {
1204					int32 index = 0;
1205					// Make sure the current specifier is kosher
1206					if (fLastMessage->GetCurrentSpecifier(&index) == B_OK)
1207						handler = resolve_specifier(handler, fLastMessage);
1208				}
1209
1210				if (handler) {
1211					// Do filtering
1212					handler = _TopLevelFilter(fLastMessage, handler);
1213					PRINT(("LOOPER: _TopLevelFilter(): %p\n", handler));
1214					if (handler && handler->Looper() == this)
1215						DispatchMessage(fLastMessage, handler);
1216				}
1217			}
1218
1219			if (fTerminating) {
1220				// we leave the looper locked when we quit
1221				return;
1222			}
1223
1224			message = fLastMessage;
1225			fLastMessage = NULL;
1226
1227			// Unlock the looper
1228			Unlock();
1229
1230			// Delete the current message (fLastMessage)
1231			if (message != NULL)
1232				delete message;
1233
1234			// Are any messages on the port?
1235			if (port_count(fMsgPort) > 0) {
1236				// Do outer loop
1237				dispatchNextMessage = false;
1238			}
1239		}
1240	}
1241	PRINT(("BLooper::task_looper() done\n"));
1242}
1243
1244
1245void
1246BLooper::_QuitRequested(BMessage* message)
1247{
1248	bool isQuitting = QuitRequested();
1249	int32 thread = fThread;
1250
1251	if (isQuitting)
1252		Quit();
1253
1254	// We send a reply to the sender, when they're waiting for a reply or
1255	// if the request message contains a boolean "_shutdown_" field with value
1256	// true. In the latter case the message came from the registrar, asking
1257	// the application to shut down.
1258	bool shutdown;
1259	if (message->IsSourceWaiting()
1260		|| (message->FindBool("_shutdown_", &shutdown) == B_OK && shutdown)) {
1261		BMessage replyMsg(B_REPLY);
1262		replyMsg.AddBool("result", isQuitting);
1263		replyMsg.AddInt32("thread", thread);
1264		message->SendReply(&replyMsg);
1265	}
1266}
1267
1268
1269bool
1270BLooper::AssertLocked() const
1271{
1272	if (!IsLocked()) {
1273		debugger("looper must be locked before proceeding\n");
1274		return false;
1275	}
1276
1277	return true;
1278}
1279
1280
1281BHandler*
1282BLooper::_TopLevelFilter(BMessage* message, BHandler* target)
1283{
1284	if (message == NULL)
1285		return target;
1286
1287	// Apply the common filters first
1288	target = _ApplyFilters(CommonFilterList(), message, target);
1289	if (target) {
1290		if (target->Looper() != this) {
1291			debugger("Targeted handler does not belong to the looper.");
1292			target = NULL;
1293		} else {
1294			// Now apply handler-specific filters
1295			target = _HandlerFilter(message, target);
1296		}
1297	}
1298
1299	return target;
1300}
1301
1302
1303BHandler*
1304BLooper::_HandlerFilter(BMessage* message, BHandler* target)
1305{
1306	// Keep running filters until our handler is NULL, or until the filtering
1307	// handler returns itself as the designated handler
1308	BHandler* previousTarget = NULL;
1309	while (target != NULL && target != previousTarget) {
1310		previousTarget = target;
1311
1312		target = _ApplyFilters(target->FilterList(), message, target);
1313		if (target != NULL && target->Looper() != this) {
1314			debugger("Targeted handler does not belong to the looper.");
1315			target = NULL;
1316		}
1317	}
1318
1319	return target;
1320}
1321
1322
1323BHandler*
1324BLooper::_ApplyFilters(BList* list, BMessage* message, BHandler* target)
1325{
1326	// This is where the action is!
1327
1328	// check the parameters
1329	if (list == NULL || message == NULL)
1330		return target;
1331
1332	// for each filter in the provided list
1333	BMessageFilter* filter = NULL;
1334	for (int32 i = 0; i < list->CountItems(); ++i) {
1335		filter = (BMessageFilter*)list->ItemAt(i);
1336
1337		// check command conditions
1338		if (filter->FiltersAnyCommand() || filter->Command() == message->what) {
1339			// check delivery conditions
1340			message_delivery delivery = filter->MessageDelivery();
1341			bool dropped = message->WasDropped();
1342			if (delivery == B_ANY_DELIVERY
1343				|| (delivery == B_DROPPED_DELIVERY && dropped)
1344				|| (delivery == B_PROGRAMMED_DELIVERY && !dropped)) {
1345				// check source conditions
1346				message_source source = filter->MessageSource();
1347				bool remote = message->IsSourceRemote();
1348				if (source == B_ANY_SOURCE
1349					|| (source == B_REMOTE_SOURCE && remote)
1350					|| (source == B_LOCAL_SOURCE && !remote)) {
1351					// Are we using an "external" function?
1352					filter_result result;
1353					filter_hook filterFunction = filter->FilterFunction();
1354					if (filterFunction != NULL)
1355						result = filterFunction(message, &target, filter);
1356					else
1357						result = filter->Filter(message, &target);
1358
1359					// Is further processing allowed?
1360					if (result == B_SKIP_MESSAGE) {
1361						// no, time to bail out
1362						return NULL;
1363					}
1364				}
1365			}
1366		}
1367	}
1368
1369	return target;
1370}
1371
1372
1373void
1374BLooper::check_lock()
1375{
1376	// this is a cheap variant of AssertLocked()
1377	// it is used in situations where it's clear that the looper is valid,
1378	// i.e. from handlers
1379	uint32 stack;
1380	if (((addr_t)&stack & ~(B_PAGE_SIZE - 1)) == fCachedStack
1381		|| fOwner == find_thread(NULL)) {
1382		return;
1383	}
1384
1385	debugger("Looper must be locked.");
1386}
1387
1388
1389BHandler*
1390BLooper::resolve_specifier(BHandler* target, BMessage* message)
1391{
1392	// check params
1393	if (!target || !message)
1394		return NULL;
1395
1396	int32 index;
1397	BMessage specifier;
1398	int32 form;
1399	const char* property;
1400	status_t err = B_OK;
1401	BHandler* newTarget = target;
1402	// loop to deal with nested specifiers
1403	// (e.g., the 3rd button on the 4th view)
1404	do {
1405		err = message->GetCurrentSpecifier(&index, &specifier, &form,
1406			&property);
1407		if (err != B_OK) {
1408			BMessage reply(B_REPLY);
1409			reply.AddInt32("error", err);
1410			message->SendReply(&reply);
1411			return NULL;
1412		}
1413		// current target gets what was the new target
1414		target = newTarget;
1415		newTarget = target->ResolveSpecifier(message, index, &specifier, form,
1416			property);
1417		// check that new target is owned by looper; use IndexOf() to avoid
1418		// dereferencing newTarget (possible race condition with object
1419		// destruction by another looper)
1420		if (newTarget == NULL || IndexOf(newTarget) < 0)
1421			return NULL;
1422
1423		// get current specifier index (may change in ResolveSpecifier())
1424		err = message->GetCurrentSpecifier(&index);
1425	} while (newTarget && newTarget != target && err == B_OK && index >= 0);
1426
1427	return newTarget;
1428}
1429
1430
1431/*!	Releases all eventually nested locks. Must be called with the lock
1432	actually held.
1433*/
1434void
1435BLooper::UnlockFully()
1436{
1437	AssertLocked();
1438
1439	// Clear the owner count
1440	fOwnerCount = 0;
1441	// Nobody owns the lock now
1442	fOwner = -1;
1443	fCachedStack = 0;
1444#if DEBUG < 1
1445	// There is now one less thread holding a lock on this looper
1446	int32 atomicCount = atomic_add(&fAtomicCount, -1);
1447	if (atomicCount > 1)
1448#endif
1449		release_sem(fLockSem);
1450}
1451
1452
1453//	#pragma mark -
1454
1455
1456port_id
1457_get_looper_port_(const BLooper* looper)
1458{
1459	return looper->fMsgPort;
1460}
1461