Looper.cpp revision eaad52e8d72af216e4b306cd34323e30bb976130
1/*
2 * Copyright 2001-2007, Haiku.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Erik Jaesler (erik@cgsoftware.com)
7 *		DarkWyrm (bpmagic@columbus.rr.com)
8 *		Ingo Weinhold, bonefish@@users.sf.net
 *		Axel Dörfler, axeld@pinc-software.de
10 */
11
12/*!	BLooper class spawns a thread that runs a message loop. */
13
14#include <AppMisc.h>
15#include <AutoLocker.h>
16#include <DirectMessageTarget.h>
17#include <LooperList.h>
18#include <MessagePrivate.h>
19#include <TokenSpace.h>
20
21#include <Autolock.h>
22#include <Looper.h>
23#include <Message.h>
24#include <MessageFilter.h>
25#include <MessageQueue.h>
26#include <Messenger.h>
27#include <PropertyInfo.h>
28
29#include <new>
30#include <stdio.h>
31
32
// Debug tracing.
// DBG(x) currently expands to an empty statement, which compiles all tracing
// out; swap in the commented-out definition to enable it.
//#define DBG(x) x
#define DBG(x)	;
// PRINT() takes a doubly parenthesized printf() argument list and prefixes
// the output with the ID of the calling thread.
#define PRINT(x)	DBG({ printf("[%6ld] ", find_thread(NULL)); printf x; })
37
38/*
39#include <Autolock.h>
40#include <Locker.h>
41static BLocker sDebugPrintLocker("BLooper debug print");
42#define PRINT(x)	DBG({						\
43	BAutolock _(sDebugPrintLocker);				\
44	debug_printf("[%6ld] ", find_thread(NULL));	\
45	debug_printf x;								\
46})
47*/
48
49
// Block size handed to the BList that holds the common filters.
#define FILTER_LIST_BLOCK_SIZE	5
// Not referenced in the visible part of this file.
#define DATA_BLOCK_SIZE			5
52
53// Globals ---------------------------------------------------------------------
54using BPrivate::gDefaultTokens;
55using BPrivate::gLooperList;
56using BPrivate::BLooperList;
57
58port_id _get_looper_port_(const BLooper* looper);
59
// Values stored in the extra_data field of gLooperPropInfo; they tell
// ResolveSpecifier() how a matched property is to be handled.
enum {
	// the looper itself handles the scripting message
	BLOOPER_PROCESS_INTERNALLY = 0,
	// the message is forwarded to the handler at the specified index
	BLOOPER_HANDLER_BY_INDEX = 1
};
64
65static property_info gLooperPropInfo[] = {
66	{
67		"Handler",
68			{},
69			{B_INDEX_SPECIFIER, B_REVERSE_INDEX_SPECIFIER},
70			NULL, BLOOPER_HANDLER_BY_INDEX,
71			{},
72			{},
73			{}
74	},
75	{
76		"Handlers",
77			{B_GET_PROPERTY},
78			{B_DIRECT_SPECIFIER},
79			NULL, BLOOPER_PROCESS_INTERNALLY,
80			{B_MESSENGER_TYPE},
81			{},
82			{}
83	},
84	{
85		"Handler",
86			{B_COUNT_PROPERTIES},
87			{B_DIRECT_SPECIFIER},
88			NULL, BLOOPER_PROCESS_INTERNALLY,
89			{B_INT32_TYPE},
90			{},
91			{}
92	},
93	{}
94};
95
96struct _loop_data_ {
97	BLooper*	looper;
98	thread_id	thread;
99};
100
101
102//	#pragma mark -
103
104
105BLooper::BLooper(const char* name, int32 priority, int32 port_capacity)
106	:	BHandler(name)
107{
108	_InitData(name, priority, port_capacity);
109}
110
111
112BLooper::~BLooper()
113{
114	if (fRunCalled && !fTerminating) {
115		debugger("You can't call delete on a BLooper object "
116				 "once it is running.");
117	}
118
119	Lock();
120
121	// In case the looper thread calls Quit() fLastMessage is not deleted.
122	if (fLastMessage) {
123		delete fLastMessage;
124		fLastMessage = NULL;
125	}
126
127	// Close the message port and read and reply to the remaining messages.
128	if (fMsgPort >= 0)
129		close_port(fMsgPort);
130
131	// Clear the queue so our call to IsMessageWaiting() below doesn't give
132	// us bogus info
133	fDirectTarget->Close();
134
135	BMessage *message;
136	while ((message = fDirectTarget->Queue()->NextMessage()) != NULL) {
137		delete message;
138			// msg will automagically post generic reply
139	}
140
141	do {
142		delete ReadMessageFromPort(0);
143			// msg will automagically post generic reply
144	} while (IsMessageWaiting());
145
146	fDirectTarget->Release();
147	delete_port(fMsgPort);
148
149	// Clean up our filters
150	SetCommonFilterList(NULL);
151
152	AutoLocker<BLooperList> ListLock(gLooperList);
153	RemoveHandler(this);
154
155	// Remove all the "child" handlers
156	BHandler *child;
157	while (CountHandlers()) {
158		child = HandlerAt(0);
159		if (child)
160			RemoveHandler(child);
161	}
162
163	Unlock();
164	gLooperList.RemoveLooper(this);
165	delete_sem(fLockSem);
166}
167
168
169BLooper::BLooper(BMessage *data)
170	: BHandler(data)
171{
172	int32 portCapacity;
173	if (data->FindInt32("_port_cap", &portCapacity) != B_OK
174		|| portCapacity < 0)
175		portCapacity = B_LOOPER_PORT_DEFAULT_CAPACITY;
176
177	_InitData(Name(), B_NORMAL_PRIORITY, portCapacity);
178}
179
180
181BArchivable *
182BLooper::Instantiate(BMessage *data)
183{
184	if (validate_instantiation(data, "BLooper"))
185		return new BLooper(data);
186
187	return NULL;
188}
189
190
191status_t
192BLooper::Archive(BMessage *data, bool deep) const
193{
194	status_t status = BHandler::Archive(data, deep);
195	if (status < B_OK)
196		return status;
197
198	port_info info;
199	status = get_port_info(fMsgPort, &info);
200	if (status == B_OK)
201		status = data->AddInt32("_port_cap", info.capacity);
202
203	// TODO: what about the thread priority?
204
205	return status;
206}
207
208
209status_t
210BLooper::PostMessage(uint32 command)
211{
212	BMessage message(command);
213	return _PostMessage(&message, this, NULL);
214}
215
216
217status_t
218BLooper::PostMessage(BMessage *message)
219{
220	return _PostMessage(message, this, NULL);
221}
222
223
224status_t
225BLooper::PostMessage(uint32 command, BHandler *handler,
226	BHandler *replyTo)
227{
228	BMessage message(command);
229	return _PostMessage(&message, handler, replyTo);
230}
231
232
233status_t
234BLooper::PostMessage(BMessage *message, BHandler *handler,
235	BHandler *replyTo)
236{
237	return _PostMessage(message, handler, replyTo);
238}
239
240
241void
242BLooper::DispatchMessage(BMessage *message, BHandler *handler)
243{
244	PRINT(("BLooper::DispatchMessage(%.4s)\n", (char*)&message->what));
245
246	switch (message->what) {
247		case _QUIT_:
248			// Can't call Quit() to do this, because of the slight chance
249			// another thread with have us locked between now and then.
250			fTerminating = true;
251
252			// After returning from DispatchMessage(), the looper will be
253			// deleted in _task0_()
254			break;
255
256		case B_QUIT_REQUESTED:
257			if (handler == this) {
258				_QuitRequested(message);
259				break;
260			}
261
262			// fall through
263
264		default:
265			handler->MessageReceived(message);
266			break;
267	}
268	PRINT(("BLooper::DispatchMessage() done\n"));
269}
270
271
272void
273BLooper::MessageReceived(BMessage *msg)
274{
275	// TODO: implement scripting support
276	BHandler::MessageReceived(msg);
277}
278
279
280BMessage*
281BLooper::CurrentMessage() const
282{
283	return fLastMessage;
284}
285
286
287BMessage*
288BLooper::DetachCurrentMessage()
289{
290	BMessage* msg = fLastMessage;
291	fLastMessage = NULL;
292	return msg;
293}
294
295
296BMessageQueue*
297BLooper::MessageQueue() const
298{
299	return fDirectTarget->Queue();
300}
301
302
303bool
304BLooper::IsMessageWaiting() const
305{
306	AssertLocked();
307
308	if (!fDirectTarget->Queue()->IsEmpty())
309		return true;
310
311	int32 count;
312	do {
313		count = port_buffer_size_etc(fMsgPort, B_RELATIVE_TIMEOUT, 0);
314	} while (count == B_INTERRUPTED);
315
316	return count > 0;
317}
318
319
320void
321BLooper::AddHandler(BHandler* handler)
322{
323	if (handler == NULL)
324		return;
325
326	AssertLocked();
327
328	if (handler->Looper() == NULL) {
329		fHandlers.AddItem(handler);
330		handler->SetLooper(this);
331		if (handler != this)	// avoid a cycle
332			handler->SetNextHandler(this);
333	}
334}
335
336
337bool
338BLooper::RemoveHandler(BHandler* handler)
339{
340	if (handler == NULL)
341		return false;
342
343	AssertLocked();
344
345	if (handler->Looper() == this && fHandlers.RemoveItem(handler)) {
346		if (handler == fPreferred)
347			fPreferred = NULL;
348
349		handler->SetNextHandler(NULL);
350		handler->SetLooper(NULL);
351		return true;
352	}
353
354	return false;
355}
356
357
358int32
359BLooper::CountHandlers() const
360{
361	AssertLocked();
362
363	return fHandlers.CountItems();
364}
365
366
367BHandler*
368BLooper::HandlerAt(int32 index) const
369{
370	AssertLocked();
371
372	return (BHandler*)fHandlers.ItemAt(index);
373}
374
375
376int32
377BLooper::IndexOf(BHandler* handler) const
378{
379	AssertLocked();
380
381	return fHandlers.IndexOf(handler);
382}
383
384
385BHandler*
386BLooper::PreferredHandler() const
387{
388	return fPreferred;
389}
390
391
392void
393BLooper::SetPreferredHandler(BHandler* handler)
394{
395	if (handler && handler->Looper() == this && IndexOf(handler) >= 0) {
396		fPreferred = handler;
397	} else {
398		fPreferred = NULL;
399	}
400}
401
402
403thread_id
404BLooper::Run()
405{
406	AssertLocked();
407
408	if (fRunCalled) {
409		// Not allowed to call Run() more than once
410		debugger("can't call BLooper::Run twice!");
411		return fThread;
412	}
413
414	fThread = spawn_thread(_task0_, Name(), fInitPriority, this);
415	if (fThread < B_OK)
416		return fThread;
417
418	if (fMsgPort < B_OK)
419		return fMsgPort;
420
421	fRunCalled = true;
422	Unlock();
423
424	status_t err = resume_thread(fThread);
425	if (err < B_OK)
426		return err;
427
428	return fThread;
429}
430
431
432void
433BLooper::Quit()
434{
435	PRINT(("BLooper::Quit()\n"));
436
437	if (!IsLocked()) {
438		printf("ERROR - you must Lock a looper before calling Quit(), "
439			   "team=%ld, looper=%s\n", Team(), Name() ? Name() : "unnamed");
440	}
441
442	// Try to lock
443	if (!Lock()) {
444		// We're toast already
445		return;
446	}
447
448	PRINT(("  is locked\n"));
449
450	if (!fRunCalled) {
451		PRINT(("  Run() has not been called yet\n"));
452		fTerminating = true;
453		delete this;
454	} else if (find_thread(NULL) == fThread) {
455		PRINT(("  We are the looper thread\n"));
456		fTerminating = true;
457		delete this;
458		exit_thread(0);
459	} else {
460		PRINT(("  Run() has already been called and we are not the looper thread\n"));
461
462		// As with sem in _Lock(), we need to cache this here in case the looper
463		// disappears before we get to the wait_for_thread() below
464		thread_id tid = Thread();
465
466		// We need to unlock here. Otherwise the looper thread can't
467		// dispatch the _QUIT_ message we're going to post.
468		UnlockFully();
469
470		// As per the BeBook, if we've been called by a thread other than
471		// our own, the rest of the message queue has to get processed.  So
472		// we put this in the queue, and when it shows up, we'll call Quit()
473		// from our own thread.
474		// A little testing with BMessageFilter shows _QUIT_ is being used here.
475		// I got suspicious when my test QuitRequested() wasn't getting called
476		// when Quit() was invoked from another thread.  Makes a nice proof that
477		// this is how it's handled, too.
478
479		while (PostMessage(_QUIT_) == B_WOULD_BLOCK) {
480			// There's a slight chance that PostMessage() will return B_WOULD_BLOCK
481			// because the port is full, so we'll wait a bit and re-post until
482			// we won't block.
483			snooze(25000);
484		}
485
486		// We have to wait until the looper is done processing any remaining messages.
487		int32 temp;
488		while (wait_for_thread(tid, &temp) == B_INTERRUPTED)
489			;
490	}
491
492	PRINT(("BLooper::Quit() done\n"));
493}
494
495
496bool
497BLooper::QuitRequested()
498{
499	return true;
500}
501
502
503bool
504BLooper::Lock()
505{
506	// Defer to global _Lock(); see notes there
507	return _Lock(this, -1, B_INFINITE_TIMEOUT) == B_OK;
508}
509
510
511void
512BLooper::Unlock()
513{
514PRINT(("BLooper::Unlock()\n"));
515	//	Make sure we're locked to begin with
516	AssertLocked();
517
518	//	Decrement fOwnerCount
519	--fOwnerCount;
520PRINT(("  fOwnerCount now: %ld\n", fOwnerCount));
521	//	Check to see if the owner still wants a lock
522	if (fOwnerCount == 0) {
523		//	Set fOwner to invalid thread_id (< 0)
524		fOwner = -1;
525
526#if DEBUG < 1
527		//	Decrement requested lock count (using fAtomicCount for this)
528		int32 atomicCount = atomic_add(&fAtomicCount, -1);
529PRINT(("  fAtomicCount now: %ld\n", fAtomicCount));
530
531		// Check if anyone is waiting for a lock
532		// and release if it's the case
533		if (atomicCount > 1)
534#endif
535			release_sem(fLockSem);
536	}
537PRINT(("BLooper::Unlock() done\n"));
538}
539
540
541bool
542BLooper::IsLocked() const
543{
544	if (!gLooperList.IsLooperValid(this)) {
545		// The looper is gone, so of course it's not locked
546		return false;
547	}
548
549	return find_thread(NULL) == fOwner;
550}
551
552
553status_t
554BLooper::LockWithTimeout(bigtime_t timeout)
555{
556	return _Lock(this, -1, timeout);
557}
558
559
560thread_id
561BLooper::Thread() const
562{
563	return fThread;
564}
565
566
567team_id
568BLooper::Team() const
569{
570	return BPrivate::current_team();
571}
572
573
574BLooper*
575BLooper::LooperForThread(thread_id thread)
576{
577	return gLooperList.LooperForThread(thread);
578}
579
580
581thread_id
582BLooper::LockingThread() const
583{
584	return fOwner;
585}
586
587
588int32
589BLooper::CountLocks() const
590{
591	return fOwnerCount;
592}
593
594
595int32
596BLooper::CountLockRequests() const
597{
598	return fAtomicCount;
599}
600
601
602sem_id
603BLooper::Sem() const
604{
605	return fLockSem;
606}
607
608
609BHandler*
610BLooper::ResolveSpecifier(BMessage* msg, int32 index,
611	BMessage* specifier, int32 form, const char* property)
612{
613/**
614	@note	When I was first dumping the results of GetSupportedSuites() from
615			various classes, the use of the extra_data field was quite
616			mysterious to me.  Then I dumped BApplication and compared the
617			result against the BeBook's docs for scripting BApplication.  A
618			bunch of it isn't documented, but what is tipped me to the idea
619			that the extra_data is being used as a quick and dirty way to tell
620			what scripting "command" has been sent, e.g., for easy use in a
621			switch statement.  Would certainly be a lot faster than a bunch of
622			string comparisons -- which wouldn't tell the whole story anyway,
623			because of the same name being used for multiple properties.
624 */
625 	BPropertyInfo propertyInfo(gLooperPropInfo);
626	uint32 data;
627	status_t err = B_OK;
628	const char* errMsg = "";
629	if (propertyInfo.FindMatch(msg, index, specifier, form, property, &data) >= 0) {
630		switch (data) {
631			case BLOOPER_PROCESS_INTERNALLY:
632				return this;
633
634			case BLOOPER_HANDLER_BY_INDEX:
635			{
636				int32 index = specifier->FindInt32("index");
637				if (form == B_REVERSE_INDEX_SPECIFIER) {
638					index = CountHandlers() - index;
639				}
640				BHandler* target = HandlerAt(index);
641				if (target) {
642					// Specifier has been fully handled
643					msg->PopSpecifier();
644					return target;
645				} else {
646					err = B_BAD_INDEX;
647					errMsg = "handler index out of range";
648				}
649				break;
650			}
651
652			default:
653				err = B_BAD_SCRIPT_SYNTAX;
654				errMsg = "Didn't understand the specifier(s)";
655				break;
656		}
657	} else {
658		return BHandler::ResolveSpecifier(msg, index, specifier, form,
659			property);
660	}
661
662	BMessage reply(B_MESSAGE_NOT_UNDERSTOOD);
663	reply.AddInt32("error", err);
664	reply.AddString("message", errMsg);
665	msg->SendReply(&reply);
666
667	return NULL;
668}
669
670
671status_t
672BLooper::GetSupportedSuites(BMessage* data)
673{
674	if (data == NULL)
675		return B_BAD_VALUE;
676
677	status_t status = data->AddString("suites", "suite/vnd.Be-looper");
678	if (status == B_OK) {
679		BPropertyInfo PropertyInfo(gLooperPropInfo);
680		status = data->AddFlat("messages", &PropertyInfo);
681		if (status == B_OK)
682			status = BHandler::GetSupportedSuites(data);
683	}
684
685	return status;
686}
687
688
689void
690BLooper::AddCommonFilter(BMessageFilter* filter)
691{
692	if (!filter)
693		return;
694
695	AssertLocked();
696
697	if (filter->Looper()) {
698		debugger("A MessageFilter can only be used once.");
699		return;
700	}
701
702	if (!fCommonFilters)
703		fCommonFilters = new BList(FILTER_LIST_BLOCK_SIZE);
704
705	filter->SetLooper(this);
706	fCommonFilters->AddItem(filter);
707}
708
709
710bool
711BLooper::RemoveCommonFilter(BMessageFilter* filter)
712{
713	AssertLocked();
714
715	if (!fCommonFilters)
716		return false;
717
718	bool result = fCommonFilters->RemoveItem(filter);
719	if (result)
720		filter->SetLooper(NULL);
721
722	return result;
723}
724
725
726void
727BLooper::SetCommonFilterList(BList* filters)
728{
729	// We have a somewhat serious problem here.  It is entirely possible in R5
730	// to assign a given list of filters to *two* BLoopers simultaneously.  This
731	// becomes problematic when the loopers are destroyed: the last looper
732	// destroyed will have a problem when it tries to delete a filter list that
733	// has already been deleted.  In R5, this results in a general protection
734	// fault.  We fix this by checking the filter list for ownership issues.
735
736	AssertLocked();
737
738	BMessageFilter* filter;
739	if (filters) {
740		// Check for ownership issues
741		for (int32 i = 0; i < filters->CountItems(); ++i) {
742			filter = (BMessageFilter*)filters->ItemAt(i);
743			if (filter->Looper()) {
744				debugger("A MessageFilter can only be used once.");
745				return;
746			}
747		}
748	}
749
750	if (fCommonFilters) {
751		for (int32 i = 0; i < fCommonFilters->CountItems(); ++i) {
752			delete (BMessageFilter*)fCommonFilters->ItemAt(i);
753		}
754
755		delete fCommonFilters;
756		fCommonFilters = NULL;
757	}
758
759	// Per the BeBook, we take ownership of the list
760	fCommonFilters = filters;
761	if (fCommonFilters) {
762		for (int32 i = 0; i < fCommonFilters->CountItems(); ++i) {
763			filter = (BMessageFilter*)fCommonFilters->ItemAt(i);
764			filter->SetLooper(this);
765		}
766	}
767}
768
769
770BList*
771BLooper::CommonFilterList() const
772{
773	return fCommonFilters;
774}
775
776
777status_t
778BLooper::Perform(perform_code d, void* arg)
779{
780	// This is sort of what we're doing for this function everywhere
781	return BHandler::Perform(d, arg);
782}
783
784
785BMessage*
786BLooper::MessageFromPort(bigtime_t timeout)
787{
788	return ReadMessageFromPort(timeout);
789}
790
791
792void BLooper::_ReservedLooper1() {}
793void BLooper::_ReservedLooper2() {}
794void BLooper::_ReservedLooper3() {}
795void BLooper::_ReservedLooper4() {}
796void BLooper::_ReservedLooper5() {}
797void BLooper::_ReservedLooper6() {}
798
799
800BLooper::BLooper(const BLooper&)
801{
802	// Copy construction not allowed
803}
804
805
806BLooper& BLooper::operator=(const BLooper& )
807{
808	// Looper copying not allowed
809	return *this;
810}
811
812
813BLooper::BLooper(int32 priority, port_id port, const char* name)
814{
815	// This must be a legacy constructor
816	fMsgPort = port;
817	_InitData(name, priority, B_LOOPER_PORT_DEFAULT_CAPACITY);
818}
819
820
821status_t
822BLooper::_PostMessage(BMessage *msg, BHandler *handler,
823	BHandler *replyTo)
824{
825	AutoLocker<BLooperList> listLocker(gLooperList);
826	if (!listLocker.IsLocked())
827		return B_ERROR;
828
829	if (!gLooperList.IsLooperValid(this))
830		return B_BAD_VALUE;
831
832	// Does handler belong to this looper?
833	if (handler && handler->Looper() != this)
834		return B_MISMATCHED_VALUES;
835
836	status_t status;
837	BMessenger messenger(handler, this, &status);
838	if (status == B_OK)
839		status = messenger.SendMessage(msg, replyTo, 0);
840
841	return status;
842}
843
844
845/*!
846	Locks a looper either by port or using a direct pointer to the looper.
847
848	\param looper looper to lock, if not NULL
849	\param port port to identify the looper in case \a looper is NULL
850	\param timeout timeout for acquiring the lock
851*/
852status_t
853BLooper::_Lock(BLooper* looper, port_id port, bigtime_t timeout)
854{
855	PRINT(("BLooper::_Lock(%p, %lx)\n", looper, port));
856
857	//	Check params (loop, port)
858	if (looper == NULL && port < 0) {
859		PRINT(("BLooper::_Lock() done 1\n"));
860		return B_BAD_VALUE;
861	}
862
863	thread_id currentThread = find_thread(NULL);
864	int32 oldCount;
865	sem_id sem;
866
867	{
868		AutoLocker<BLooperList> ListLock(gLooperList);
869		if (!ListLock.IsLocked())
870			return B_BAD_VALUE;
871
872		// Look up looper by port_id, if necessary
873		if (looper == NULL) {
874			looper = gLooperList.LooperForPort(port);
875			if (looper == NULL) {
876				PRINT(("BLooper::_Lock() done 3\n"));
877				return B_BAD_VALUE;
878			}
879		} else if (!gLooperList.IsLooperValid(looper)) {
880			//	Check looper validity
881			PRINT(("BLooper::_Lock() done 4\n"));
882			return B_BAD_VALUE;
883		}
884
885		// Check for nested lock attempt
886		if (currentThread == looper->fOwner) {
887			++looper->fOwnerCount;
888			PRINT(("BLooper::_Lock() done 5: fOwnerCount: %ld\n", loop->fOwnerCount));
889			return B_OK;
890		}
891
892		// Cache the semaphore, so that we can safely access it after having
893		// unlocked the looper list
894		sem = looper->fLockSem;
895		if (sem < 0) {
896			PRINT(("BLooper::_Lock() done 6\n"));
897			return B_BAD_VALUE;
898		}
899
900		// Bump the requested lock count (using fAtomicCount for this)
901		oldCount = atomic_add(&looper->fAtomicCount, 1);
902	}
903
904	return _LockComplete(looper, oldCount, currentThread, sem, timeout);
905}
906
907
908status_t
909BLooper::_LockComplete(BLooper *looper, int32 oldCount, thread_id thread,
910	sem_id sem, bigtime_t timeout)
911{
912	status_t err = B_OK;
913
914#if DEBUG < 1
915	if (oldCount > 0) {
916#endif
917		do {
918			err = acquire_sem_etc(sem, 1, B_RELATIVE_TIMEOUT, timeout);
919		} while (err == B_INTERRUPTED);
920#if DEBUG < 1
921	}
922#endif
923	if (err == B_OK) {
924		looper->fOwner = thread;
925		looper->fCachedStack = (addr_t)&err & ~(B_PAGE_SIZE - 1);
926		looper->fOwnerCount = 1;
927	}
928
929	PRINT(("BLooper::_LockComplete() done: %lx\n", err));
930	return err;
931}
932
933
934void
935BLooper::_InitData(const char *name, int32 priority, int32 portCapacity)
936{
937	fOwner = B_ERROR;
938	fRunCalled = false;
939	fDirectTarget = new (std::nothrow) BPrivate::BDirectMessageTarget();
940	fCommonFilters = NULL;
941	fLastMessage = NULL;
942	fPreferred = NULL;
943	fThread = B_ERROR;
944	fTerminating = false;
945	fMsgPort = -1;
946	fAtomicCount = 0;
947
948	if (name == NULL)
949		name = "anonymous looper";
950
951#if DEBUG
952	fLockSem = create_sem(1, name);
953#else
954	fLockSem = create_sem(0, name);
955#endif
956
957	if (portCapacity <= 0)
958		portCapacity = B_LOOPER_PORT_DEFAULT_CAPACITY;
959
960	fMsgPort = create_port(portCapacity, name);
961
962	fInitPriority = priority;
963
964	gLooperList.AddLooper(this);
965	AddHandler(this);
966}
967
968
969void
970BLooper::AddMessage(BMessage* message)
971{
972	_AddMessagePriv(message);
973
974	// wakeup looper when being called from other threads if necessary
975	if (find_thread(NULL) != Thread()
976		&& fDirectTarget->Queue()->IsNextMessage(message) && port_count(fMsgPort) <= 0) {
977		// there is currently no message waiting, and we need to wakeup the looper
978		write_port_etc(fMsgPort, 0, NULL, 0, B_RELATIVE_TIMEOUT, 0);
979	}
980}
981
982
983void
984BLooper::_AddMessagePriv(BMessage* msg)
985{
986	// ToDo: if no target token is specified, set to preferred handler
987	// Others may want to peek into our message queue, so the preferred
988	// handler must be set correctly already if no token was given
989
990	fDirectTarget->Queue()->AddMessage(msg);
991}
992
993
994status_t
995BLooper::_task0_(void *arg)
996{
997	BLooper *looper = (BLooper *)arg;
998
999	PRINT(("LOOPER: _task0_()\n"));
1000
1001	if (looper->Lock()) {
1002		PRINT(("LOOPER: looper locked\n"));
1003		looper->task_looper();
1004
1005		delete looper;
1006	}
1007
1008	PRINT(("LOOPER: _task0_() done: thread %ld\n", find_thread(NULL)));
1009	return B_OK;
1010}
1011
1012
1013void *
1014BLooper::ReadRawFromPort(int32 *msgCode, bigtime_t timeout)
1015{
1016	PRINT(("BLooper::ReadRawFromPort()\n"));
1017	uint8 *buffer = NULL;
1018	ssize_t bufferSize;
1019
1020	do {
1021		bufferSize = port_buffer_size_etc(fMsgPort, B_RELATIVE_TIMEOUT, timeout);
1022	} while (bufferSize == B_INTERRUPTED);
1023
1024	if (bufferSize < B_OK) {
1025		PRINT(("BLooper::ReadRawFromPort(): failed: %ld\n", bufferSize));
1026		return NULL;
1027	}
1028
1029	if (bufferSize > 0)
1030		buffer = (uint8 *)malloc(bufferSize);
1031
1032	// we don't want to wait again here, since that can only mean
1033	// that someone else has read our message and our bufferSize
1034	// is now probably wrong
1035	PRINT(("read_port()...\n"));
1036	bufferSize = read_port_etc(fMsgPort, msgCode, buffer, bufferSize,
1037		B_RELATIVE_TIMEOUT, 0);
1038
1039	if (bufferSize < B_OK) {
1040		free(buffer);
1041		return NULL;
1042	}
1043
1044	PRINT(("BLooper::ReadRawFromPort() read: %.4s, %p (%d bytes)\n", (char *)msgCode, buffer, bufferSize));
1045	return buffer;
1046}
1047
1048
1049BMessage *
1050BLooper::ReadMessageFromPort(bigtime_t timeout)
1051{
1052	PRINT(("BLooper::ReadMessageFromPort()\n"));
1053	int32 msgCode;
1054	BMessage *message = NULL;
1055
1056	void *buffer = ReadRawFromPort(&msgCode, timeout);
1057	if (!buffer)
1058		return NULL;
1059
1060	message = ConvertToMessage(buffer, msgCode);
1061	free(buffer);
1062
1063	PRINT(("BLooper::ReadMessageFromPort() done: %p\n", message));
1064	return message;
1065}
1066
1067
1068BMessage *
1069BLooper::ConvertToMessage(void *buffer, int32 code)
1070{
1071	PRINT(("BLooper::ConvertToMessage()\n"));
1072	if (!buffer)
1073		return NULL;
1074
1075	BMessage *message = new BMessage();
1076	if (message->Unflatten((const char *)buffer) != B_OK) {
1077		PRINT(("BLooper::ConvertToMessage(): unflattening message failed\n"));
1078		delete message;
1079		message = NULL;
1080	}
1081
1082	PRINT(("BLooper::ConvertToMessage(): %p\n", message));
1083	return message;
1084}
1085
1086
1087void
1088BLooper::task_looper()
1089{
1090	PRINT(("BLooper::task_looper()\n"));
1091	// Check that looper is locked (should be)
1092	AssertLocked();
1093	// Unlock the looper
1094	Unlock();
1095
1096	if (IsLocked())
1097		debugger("looper must not be locked!");
1098
1099	// loop: As long as we are not terminating.
1100	while (!fTerminating) {
1101		PRINT(("LOOPER: outer loop\n"));
1102		// TODO: timeout determination algo
1103		//	Read from message port (how do we determine what the timeout is?)
1104		PRINT(("LOOPER: MessageFromPort()...\n"));
1105		BMessage *msg = MessageFromPort();
1106		PRINT(("LOOPER: ...done\n"));
1107
1108		//	Did we get a message?
1109		if (msg)
1110			_AddMessagePriv(msg);
1111
1112		// Get message count from port
1113		int32 msgCount = port_count(fMsgPort);
1114		for (int32 i = 0; i < msgCount; ++i) {
1115			// Read 'count' messages from port (so we will not block)
1116			// We use zero as our timeout since we know there is stuff there
1117			msg = MessageFromPort(0);
1118			if (msg)
1119				_AddMessagePriv(msg);
1120		}
1121
1122		// loop: As long as there are messages in the queue and the port is
1123		//		 empty... and we are not terminating, of course.
1124		bool dispatchNextMessage = true;
1125		while (!fTerminating && dispatchNextMessage) {
1126			PRINT(("LOOPER: inner loop\n"));
1127			// Get next message from queue (assign to fLastMessage)
1128			fLastMessage = fDirectTarget->Queue()->NextMessage();
1129
1130			Lock();
1131
1132			if (!fLastMessage) {
1133				// No more messages: Unlock the looper and terminate the
1134				// dispatch loop.
1135				dispatchNextMessage = false;
1136			} else {
1137				PRINT(("LOOPER: fLastMessage: 0x%lx: %.4s\n", fLastMessage->what,
1138					(char*)&fLastMessage->what));
1139				DBG(fLastMessage->PrintToStream());
1140
1141				// Get the target handler
1142				BHandler *handler = NULL;
1143				BMessage::Private messagePrivate(fLastMessage);
1144				bool usePreferred = messagePrivate.UsePreferredTarget();
1145
1146				if (usePreferred) {
1147					PRINT(("LOOPER: use preferred target\n"));
1148					handler = fPreferred;
1149					if (handler == NULL)
1150						handler = this;
1151				} else {
1152					gDefaultTokens.GetToken(messagePrivate.GetTarget(),
1153						B_HANDLER_TOKEN, (void **)&handler);
1154
1155					// if this handler doesn't belong to us, we drop the message
1156					if (handler != NULL && handler->Looper() != this)
1157						handler = NULL;
1158
1159					PRINT(("LOOPER: use %ld, handler: %p, this: %p\n",
1160						messagePrivate.GetTarget(), handler, this));
1161				}
1162
1163				// Is this a scripting message? (BMessage::HasSpecifiers())
1164				if (handler != NULL && fLastMessage->HasSpecifiers()) {
1165					int32 index = 0;
1166					// Make sure the current specifier is kosher
1167					if (fLastMessage->GetCurrentSpecifier(&index) == B_OK)
1168						handler = resolve_specifier(handler, fLastMessage);
1169				}
1170
1171				if (handler) {
1172					// Do filtering
1173					handler = _TopLevelFilter(fLastMessage, handler);
1174					PRINT(("LOOPER: _TopLevelFilter(): %p\n", handler));
1175					if (handler && handler->Looper() == this)
1176						DispatchMessage(fLastMessage, handler);
1177				}
1178			}
1179
1180			if (fTerminating) {
1181				// we leave the looper locked when we quit
1182				return;
1183			}
1184
1185			// Unlock the looper
1186			Unlock();
1187
1188			// Delete the current message (fLastMessage)
1189			if (fLastMessage) {
1190				delete fLastMessage;
1191				fLastMessage = NULL;
1192			}
1193
1194			// Are any messages on the port?
1195			if (port_count(fMsgPort) > 0) {
1196				// Do outer loop
1197				dispatchNextMessage = false;
1198			}
1199		}
1200	}
1201	PRINT(("BLooper::task_looper() done\n"));
1202}
1203
1204
1205void
1206BLooper::_QuitRequested(BMessage *msg)
1207{
1208	bool isQuitting = QuitRequested();
1209
1210	// We send a reply to the sender, when they're waiting for a reply or
1211	// if the request message contains a boolean "_shutdown_" field with value
1212	// true. In the latter case the message came from the registrar, asking
1213	// the application to shut down.
1214	bool shutdown;
1215	if (msg->IsSourceWaiting()
1216		|| (msg->FindBool("_shutdown_", &shutdown) == B_OK && shutdown)) {
1217		BMessage replyMsg(B_REPLY);
1218		replyMsg.AddBool("result", isQuitting);
1219		replyMsg.AddInt32("thread", fThread);
1220		msg->SendReply(&replyMsg);
1221	}
1222
1223	if (isQuitting)
1224		Quit();
1225}
1226
1227
1228bool
1229BLooper::AssertLocked() const
1230{
1231	if (!IsLocked()) {
1232		debugger("looper must be locked before proceeding\n");
1233		return false;
1234	}
1235
1236	return true;
1237}
1238
1239
1240BHandler *
1241BLooper::_TopLevelFilter(BMessage* msg, BHandler* target)
1242{
1243	if (msg) {
1244		// Apply the common filters first
1245		target = _ApplyFilters(CommonFilterList(), msg, target);
1246		if (target) {
1247			if (target->Looper() != this) {
1248				debugger("Targeted handler does not belong to the looper.");
1249				target = NULL;
1250			} else {
1251				// Now apply handler-specific filters
1252				target = _HandlerFilter(msg, target);
1253			}
1254		}
1255	}
1256
1257	return target;
1258}
1259
1260
1261BHandler *
1262BLooper::_HandlerFilter(BMessage* msg, BHandler* target)
1263{
1264	// Keep running filters until our handler is NULL, or until the filtering
1265	// handler returns itself as the designated handler
1266	BHandler* previousTarget = NULL;
1267	while (target != NULL && target != previousTarget) {
1268		previousTarget = target;
1269
1270		target = _ApplyFilters(target->FilterList(), msg, target);
1271		if (target != NULL && target->Looper() != this) {
1272			debugger("Targeted handler does not belong to the looper.");
1273			target = NULL;
1274		}
1275	}
1276
1277	return target;
1278}
1279
1280
1281BHandler *
1282BLooper::_ApplyFilters(BList* list, BMessage* msg, BHandler* target)
1283{
1284	// This is where the action is!
1285	// Check the parameters
1286	if (!list || !msg)
1287		return target;
1288
1289	// For each filter in the provided list
1290	BMessageFilter* filter = NULL;
1291	for (int32 i = 0; i < list->CountItems(); ++i) {
1292		filter = (BMessageFilter*)list->ItemAt(i);
1293
1294		// Check command conditions
1295		if (filter->FiltersAnyCommand() || (filter->Command() == msg->what)) {
1296			// Check delivery conditions
1297			message_delivery delivery = filter->MessageDelivery();
1298			bool dropped = msg->WasDropped();
1299			if (delivery == B_ANY_DELIVERY
1300				|| (delivery == B_DROPPED_DELIVERY && dropped)
1301				|| (delivery == B_PROGRAMMED_DELIVERY && !dropped)) {
1302				// Check source conditions
1303				message_source source = filter->MessageSource();
1304				bool remote = msg->IsSourceRemote();
1305				if (source == B_ANY_SOURCE
1306					|| (source == B_REMOTE_SOURCE && remote)
1307					|| (source == B_LOCAL_SOURCE && !remote)) {
1308					// Are we using an "external" function?
1309					filter_result result;
1310					filter_hook func = filter->FilterFunction();
1311					if (func)
1312						result = func(msg, &target, filter);
1313					else
1314						result = filter->Filter(msg, &target);
1315
1316					// Is further processing allowed?
1317					if (result == B_SKIP_MESSAGE) {
1318						// No; time to bail out
1319						return NULL;
1320					}
1321				}
1322			}
1323		}
1324	}
1325
1326	return target;
1327}
1328
1329
1330void
1331BLooper::check_lock()
1332{
1333	// This is a cheap variant of AssertLocked()
1334	// It is used in situations where it's clear that the looper is valid,
1335	// ie. from handlers
1336	uint32 stack;
1337	if (((uint32)&stack & ~(B_PAGE_SIZE - 1)) == fCachedStack
1338		|| fOwner == find_thread(NULL))
1339		return;
1340
1341	debugger("Looper must be locked.");
1342}
1343
1344
1345BHandler *
1346BLooper::resolve_specifier(BHandler* target, BMessage* msg)
1347{
1348	// Check params
1349	if (!target || !msg)
1350		return NULL;
1351
1352	int32 index;
1353	BMessage specifier;
1354	int32 form;
1355	const char* property;
1356	status_t err = B_OK;
1357	BHandler* newTarget = target;
1358	//	Loop to deal with nested specifiers
1359	//	(e.g., the 3rd button on the 4th view)
1360	do {
1361		err = msg->GetCurrentSpecifier(&index, &specifier, &form, &property);
1362		if (err) {
1363			BMessage reply(B_REPLY);
1364			reply.AddInt32("error", err);
1365			msg->SendReply(&reply);
1366			return NULL;
1367		}
1368		//	Current target gets what was the new target
1369		target = newTarget;
1370		newTarget = target->ResolveSpecifier(msg, index, &specifier, form,
1371			property);
1372		//	Check that new target is owned by looper;
1373		//	use IndexOf() to avoid dereferencing newTarget
1374		//	(possible race condition with object destruction
1375		//	by another looper)
1376		if (!newTarget || IndexOf(newTarget) < 0)
1377			return NULL;
1378
1379		//	Get current specifier index (may change in ResolveSpecifier())
1380		err = msg->GetCurrentSpecifier(&index);
1381	} while (newTarget && newTarget != target && !err && index >= 0);
1382
1383	return newTarget;
1384}
1385
1386
1387void
1388BLooper::UnlockFully()
1389{
1390	AssertLocked();
1391
1392/**
1393	@note	What we're doing here is completely undoing the current owner's lock
1394			on the looper.  This is actually pretty easy, since the owner only
1395			has a single aquisition on the semaphore; every subsequent "lock"
1396			is just an increment to the owner count.  The whole thing is quite
1397			similar to Unlock(), except that we clear the ownership variables,
1398			rather than merely decrementing them.
1399 */
1400	// Clear the owner count
1401	fOwnerCount = 0;
1402	// Nobody owns the lock now
1403	fOwner = -1;
1404#if DEBUG < 1
1405	// There is now one less thread holding a lock on this looper
1406	int32 atomicCount = atomic_add(&fAtomicCount, -1);
1407	if (atomicCount > 1)
1408#endif
1409		release_sem(fLockSem);
1410}
1411
1412
1413//	#pragma mark -
1414
1415
1416port_id
1417_get_looper_port_(const BLooper *looper)
1418{
1419	return looper->fMsgPort;
1420}
1421
1422