1/*
2 * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Paweł Dziepak, pdziepak@quarnos.org
7 */
8
9
10#include "Connection.h"
11
12#include <arpa/inet.h>
13#include <errno.h>
14#include <stdlib.h>
15#include <string.h>
16#include <unistd.h>
17
18#include <AutoDeleter.h>
19#include <net/dns_resolver.h>
20#include <util/kernel_cpp.h>
21#include <util/Random.h>
22
23
// Default server port; used by AddressResolver until ForcePort() overrides it.
#define NFS4_PORT		2049

// Top bit of the 32-bit record header written by ConnectionStream::Send() and
// tested by ConnectionStream::Receive(); the remaining 31 bits carry the
// fragment length.
#define	LAST_FRAGMENT	0x80000000
// Size of the receive buffer allocated by ConnectionPacket::Receive().
#define MAX_PACKET_SIZE	65535

// Lowest local port Connection::Connect() picks when binding to a random port
// below IPPORT_RESERVED.
#define NFS_MIN_PORT	665
30
31
32bool
33PeerAddress::operator==(const PeerAddress& address)
34{
35	return memcmp(&fAddress, &address.fAddress, sizeof(fAddress)) == 0
36		&& fProtocol == address.fProtocol;
37}
38
39
40bool
41PeerAddress::operator<(const PeerAddress& address)
42{
43	int compare = memcmp(&fAddress, &address.fAddress, sizeof(fAddress));
44	return compare < 0 || (compare == 0 && fProtocol < address.fProtocol);
45}
46
47
48PeerAddress&
49PeerAddress::operator=(const PeerAddress& address)
50{
51	fAddress = address.fAddress;
52	fProtocol = address.fProtocol;
53	return *this;
54}
55
56
57PeerAddress::PeerAddress()
58	:
59	fProtocol(0)
60{
61	memset(&fAddress, 0, sizeof(fAddress));
62}
63
64
65PeerAddress::PeerAddress(int networkFamily)
66	:
67	fProtocol(0)
68{
69	ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
70
71	memset(&fAddress, 0, sizeof(fAddress));
72
73	fAddress.ss_family = networkFamily;
74	switch (networkFamily) {
75		case AF_INET:
76			fAddress.ss_len = sizeof(sockaddr_in);
77			break;
78		case AF_INET6:
79			fAddress.ss_len = sizeof(sockaddr_in6);
80			break;
81	}
82}
83
84
85const char*
86PeerAddress::ProtocolString() const
87{
88	static const char* tcpName = "tcp";
89	static const char* udpName = "udp";
90	static const char* unknown = "";
91
92	switch (fProtocol) {
93		case IPPROTO_TCP:
94			return tcpName;
95		case IPPROTO_UDP:
96			return udpName;
97		default:
98			return unknown;
99	}
100}
101
102
103void
104PeerAddress::SetProtocol(const char* protocol)
105{
106	ASSERT(protocol != NULL);
107
108	if (strcmp(protocol, "tcp") == 0)
109		fProtocol = IPPROTO_TCP;
110	else if (strcmp(protocol, "udp") == 0)
111		fProtocol = IPPROTO_UDP;
112}
113
114
115char*
116PeerAddress::UniversalAddress() const
117{
118	char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16));
119	if (uAddr == NULL)
120		return NULL;
121
122	if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL)
123		return NULL;
124
125	char port[16];
126	sprintf(port, ".%d.%d", Port() >> 8, Port() & 0xff);
127	strcat(uAddr, port);
128
129	return uAddr;
130}
131
132
133socklen_t
134PeerAddress::AddressSize() const
135{
136	switch (Family()) {
137		case AF_INET:
138			return sizeof(sockaddr_in);
139		case AF_INET6:
140			return sizeof(sockaddr_in6);
141		default:
142			return 0;
143	}
144}
145
146
147uint16
148PeerAddress::Port() const
149{
150	uint16 port;
151
152	switch (Family()) {
153		case AF_INET:
154			port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port;
155			break;
156		case AF_INET6:
157			port = reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_port;
158			break;
159		default:
160			port = 0;
161	}
162
163	return ntohs(port);
164}
165
166
167void
168PeerAddress::SetPort(uint16 port)
169{
170	port = htons(port);
171
172	switch (Family()) {
173		case AF_INET:
174			reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port;
175			break;
176		case AF_INET6:
177			reinterpret_cast<sockaddr_in6*>(&fAddress)->sin6_port = port;
178			break;
179	}
180}
181
182const void*
183PeerAddress::InAddr() const
184{
185	switch (Family()) {
186		case AF_INET:
187			return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr;
188		case AF_INET6:
189			return &reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_addr;
190		default:
191			return NULL;
192	}
193}
194
195
196size_t
197PeerAddress::InAddrSize() const
198{
199	switch (Family()) {
200		case AF_INET:
201			return sizeof(in_addr);
202		case AF_INET6:
203			return sizeof(in6_addr);
204		default:
205			return 0;
206	}
207}
208
209
// Resolves \a name right away and keeps the outcome in fStatus, which
// GetNextAddress() checks first. The forced port (kept in network byte order)
// and protocol start out as NFS4_PORT and TCP.
AddressResolver::AddressResolver(const char* name)
	:
	fHead(NULL),
	fCurrent(NULL),
	fForcedPort(htons(NFS4_PORT)),
	fForcedProtocol(IPPROTO_TCP)
{
	// fHead has to be NULL before this call: ResolveAddress() frees any
	// non-NULL list it finds.
	fStatus = ResolveAddress(name);
}
219
220
221AddressResolver::~AddressResolver()
222{
223	freeaddrinfo(fHead);
224}
225
226
227status_t
228AddressResolver::ResolveAddress(const char* name)
229{
230	ASSERT(name != NULL);
231
232	if (fHead != NULL) {
233		freeaddrinfo(fHead);
234		fHead = NULL;
235		fCurrent = NULL;
236	}
237
238	// getaddrinfo() is very expensive when called from kernel, so we do not
239	// want to call it unless there is no other choice.
240	struct sockaddr_in addr;
241	memset(&addr, 0, sizeof(addr));
242	if (inet_aton(name, &addr.sin_addr) == 1) {
243		addr.sin_len = sizeof(addr);
244		addr.sin_family = AF_INET;
245		addr.sin_port = htons(NFS4_PORT);
246
247		memcpy(&fAddress.fAddress, &addr, sizeof(addr));
248		fAddress.fProtocol = IPPROTO_TCP;
249		return B_OK;
250	}
251
252	status_t result = getaddrinfo(name, NULL, NULL, &fHead);
253	fCurrent = fHead;
254
255	return result;
256}
257
258
259void
260AddressResolver::ForceProtocol(const char* protocol)
261{
262	ASSERT(protocol != NULL);
263
264	if (strcmp(protocol, "tcp") == 0)
265		fForcedProtocol = IPPROTO_TCP;
266	else if (strcmp(protocol, "udp") == 0)
267		fForcedProtocol = IPPROTO_UDP;
268
269	fAddress.SetProtocol(protocol);
270}
271
272
273void
274AddressResolver::ForcePort(uint16 port)
275{
276	fForcedPort = htons(port);
277	fAddress.SetPort(port);
278}
279
280
281status_t
282AddressResolver::GetNextAddress(PeerAddress* address)
283{
284	ASSERT(address != NULL);
285
286	if (fStatus != B_OK)
287		return fStatus;
288
289	if (fHead == NULL) {
290		*address = fAddress;
291		fStatus = B_NAME_NOT_FOUND;
292		return B_OK;
293	}
294
295	address->fProtocol = fForcedProtocol;
296
297	while (fCurrent != NULL) {
298		if (fCurrent->ai_family == AF_INET) {
299			memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in));
300			reinterpret_cast<sockaddr_in*>(&address->fAddress)->sin_port
301				= fForcedPort;
302		} else if (fCurrent->ai_family == AF_INET6) {
303			memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in6));
304			reinterpret_cast<sockaddr_in6*>(&address->fAddress)->sin6_port
305				= fForcedPort;
306		} else {
307			fCurrent = fCurrent->ai_next;
308			continue;
309		}
310
311		fCurrent = fCurrent->ai_next;
312		return B_OK;
313	}
314
315	return B_NAME_NOT_FOUND;
316}
317
318
// Passes the peer address on to ConnectionBase; nothing else is set up here.
Connection::Connection(const PeerAddress& address)
	:
	ConnectionBase(address)
{
}
324
325
// Passes the local address on to ConnectionBase. The listening socket itself
// is assigned by Listen().
ConnectionListener::ConnectionListener(const PeerAddress& address)
	:
	ConnectionBase(address)
{
}
331
332
// Sets up the state shared by all connections: the cancel semaphore (created
// with a count of 0), no socket yet (-1), a copy of \a address and the socket
// lock.
//
// create_sem() may fail; the code that creates connections checks
// fWaitCancel < B_OK afterwards instead of this constructor reporting it.
ConnectionBase::ConnectionBase(const PeerAddress& address)
	:
	fWaitCancel(create_sem(0, NULL)),
	fSocket(-1),
	fPeerAddress(address)
{
	mutex_init(&fSocketLock, NULL);
}
341
342
// Passes the peer address on to Connection. Connection::CreateObject() uses
// this class for IPPROTO_TCP peers.
ConnectionStream::ConnectionStream(const PeerAddress& address)
	:
	Connection(address)
{
}
348
349
// Passes the peer address on to Connection. Connection::CreateObject() uses
// this class for IPPROTO_UDP peers.
ConnectionPacket::ConnectionPacket(const PeerAddress& address)
	:
	Connection(address)
{
}
355
356
357ConnectionBase::~ConnectionBase()
358{
359	if (fSocket != -1)
360		close(fSocket);
361	mutex_destroy(&fSocketLock);
362	delete_sem(fWaitCancel);
363}
364
365
366status_t
367ConnectionBase::GetLocalAddress(PeerAddress* address)
368{
369	ASSERT(address != NULL);
370
371	address->fProtocol = fPeerAddress.fProtocol;
372
373	socklen_t addressSize = sizeof(address->fAddress);
374	return getsockname(fSocket,	(struct sockaddr*)&address->fAddress,
375		&addressSize);
376}
377
378
379status_t
380ConnectionStream::Send(const void* buffer, uint32 size)
381{
382	ASSERT(buffer != NULL);
383
384	status_t result;
385
386	uint32* buf = reinterpret_cast<uint32*>(malloc(size + sizeof(uint32)));
387	if (buf == NULL)
388		return B_NO_MEMORY;
389	MemoryDeleter _(buf);
390
391	buf[0] = htonl(size | LAST_FRAGMENT);
392	memcpy(buf + 1, buffer, size);
393
394	// More than one threads may send data and ksend is allowed to send partial
395	// data. Need a lock here.
396	uint32 sent = 0;
397	mutex_lock(&fSocketLock);
398	do {
399		result = send(fSocket, buf + sent, size + sizeof(uint32) - sent, 0);
400		sent += result;
401	} while (result > 0 && sent < size + sizeof(uint32));
402	mutex_unlock(&fSocketLock);
403	if (result < 0) {
404		result = errno;
405		return result;
406	} else if (result == 0)
407		return B_IO_ERROR;
408
409	return B_OK;
410}
411
412
413status_t
414ConnectionPacket::Send(const void* buffer, uint32 size)
415{
416	ASSERT(buffer != NULL);
417	ASSERT(size < 65535);
418
419	// send on DGRAM sockets is atomic. No need to lock.
420	status_t result = send(fSocket, buffer,  size, 0);
421	if (result < 0)
422		return errno;
423	return B_OK;
424}
425
426
// Receives one complete record, which may arrive as several fragments, and
// returns it as a single malloc()ed buffer in \a _buffer with its length in
// \a _size.
//
// Each fragment starts with a 32-bit header in network byte order: the
// LAST_FRAGMENT bit marks the final fragment, the remaining bits give the
// fragment length. Fragments are appended to the buffer until the final one
// has been read.
//
// Returns B_OK on success. Returns ECONNABORTED if waiting fails, if
// fWaitCancel gets released while waiting, or if recv() returns 0; the errno
// value if recv() fails; B_NO_MEMORY if the buffer cannot be grown. On every
// error path visible here the partial buffer is released and the output
// parameters are left untouched.
status_t
ConnectionStream::Receive(void** _buffer, uint32* _size)
{
	ASSERT(_buffer != NULL);
	ASSERT(_size != NULL);

	status_t result;

	// Bytes collected so far and the buffer holding them.
	uint32 size = 0;
	void* buffer = NULL;

	uint32 record_size;
	bool last_one = false;

	// Wait on two objects at once: the cancel semaphore and the socket.
	object_wait_info object[2];
	object[0].object = fWaitCancel;
	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;

	object[1].object = fSocket;
	object[1].type = B_OBJECT_TYPE_FD;
	object[1].events = B_EVENT_READ;

	do {
		// The events fields are tested for the outcome after the wait, so
		// the requested events are set again before every wait.
		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
		object[1].events = B_EVENT_READ;

		result = wait_for_objects(object, 2);
		if (result < B_OK
			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
			// The wait failed or the cancel semaphore became acquirable
			// (Disconnect() and Reconnect() release it).
			free(buffer);
			return ECONNABORTED;
		} else if ((object[1].events & B_EVENT_READ) == 0)
			continue;
				// nothing to read yet; last_one is still false here, so
				// the loop waits again

		// There is only one listener thread per connection. No need to lock.
		// Read the 4-byte fragment header, which may arrive in pieces.
		uint32 received = 0;
		do {
			result = recv(fSocket, ((uint8*)&record_size) + received,
							sizeof(record_size) - received, 0);
			received += result;
		} while (result > 0 && received < sizeof(record_size));
		if (result < 0) {
			result = errno;
			free(buffer);
			return result;
		} else if (result == 0) {
			free(buffer);
			return ECONNABORTED;
		}

		record_size = ntohl(record_size);
		ASSERT(record_size > 0);

		// Split the header into the flag and the 31-bit fragment length.
		last_one = (record_size & LAST_FRAGMENT) != 0;
		record_size &= LAST_FRAGMENT - 1;

		// NOTE(review): record_size comes straight from the peer and the
		// sum below is not range-checked. A zero-length fragment makes the
		// payload loop call recv() with a length of 0 and treat its result
		// of 0 as a closed connection -- confirm whether that is intended.
		void* ptr = realloc(buffer, size + record_size);
		if (ptr == NULL) {
			free(buffer);
			return B_NO_MEMORY;
		} else
			buffer = ptr;
		// Presumably frees the buffer on the early returns below unless
		// Detach() is called first -- MemoryDeleter is declared elsewhere.
		MemoryDeleter bufferDeleter(buffer);

		// Read the fragment payload behind the data collected so far.
		received = 0;
		do {
			result = recv(fSocket, (uint8*)buffer + size + received,
							record_size - received, 0);
			received += result;
		} while (result > 0 && received < record_size);
		if (result < 0)
			return errno;
		else if (result == 0)
			return ECONNABORTED;

		// Fragment complete: keep the buffer for the next round or for the
		// caller.
		bufferDeleter.Detach();
		size += record_size;
	} while (!last_one);


	*_buffer = buffer;
	*_size = size;

	return B_OK;
}
513
514
515status_t
516ConnectionPacket::Receive(void** _buffer, uint32* _size)
517{
518	ASSERT(_buffer != NULL);
519	ASSERT(_size != NULL);
520
521	status_t result;
522	int32 size = MAX_PACKET_SIZE;
523	void* buffer = malloc(size);
524
525	if (buffer == NULL)
526		return B_NO_MEMORY;
527
528	object_wait_info object[2];
529	object[0].object = fWaitCancel;
530	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
531	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
532
533	object[1].object = fSocket;
534	object[1].type = B_OBJECT_TYPE_FD;
535	object[1].events = B_EVENT_READ;
536
537	do {
538		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
539		object[1].events = B_EVENT_READ;
540
541		result = wait_for_objects(object, 2);
542		if (result < B_OK
543			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
544			free(buffer);
545			return ECONNABORTED;
546		} else if ((object[1].events & B_EVENT_READ) == 0)
547			continue;
548		break;
549	} while (true);
550
551	// There is only one listener thread per connection. No need to lock.
552	size = recv(fSocket, buffer, size, 0);
553	if (size < 0) {
554		result = errno;
555		free(buffer);
556		return result;
557	} else if (size == 0) {
558		free(buffer);
559		return ECONNABORTED;
560	}
561
562	*_buffer = buffer;
563	*_size = size;
564
565	return B_OK;
566}
567
568
569Connection*
570Connection::CreateObject(const PeerAddress& address)
571{
572	switch (address.fProtocol) {
573		case IPPROTO_TCP:
574			return new(std::nothrow) ConnectionStream(address);
575		case IPPROTO_UDP:
576			return new(std::nothrow) ConnectionPacket(address);
577		default:
578			return NULL;
579	}
580}
581
582
583status_t
584Connection::Connect(Connection **_connection, const PeerAddress& address)
585{
586	ASSERT(_connection != NULL);
587
588	Connection* conn = CreateObject(address);
589	if (conn == NULL)
590		return B_NO_MEMORY;
591
592	status_t result;
593	if (conn->fWaitCancel < B_OK) {
594		result = conn->fWaitCancel;
595		delete conn;
596		return result;
597	}
598
599	result = conn->Connect();
600	if (result != B_OK) {
601		delete conn;
602		return result;
603	}
604
605	*_connection = conn;
606
607	return B_OK;
608}
609
610
611status_t
612Connection::SetTo(Connection **_connection, int socket,
613	const PeerAddress& address)
614{
615	ASSERT(_connection != NULL);
616	ASSERT(socket != -1);
617
618	Connection* conn = CreateObject(address);
619	if (conn == NULL)
620		return B_NO_MEMORY;
621
622	status_t result;
623	if (conn->fWaitCancel < B_OK) {
624		result = conn->fWaitCancel;
625		delete conn;
626		return result;
627	}
628
629	conn->fSocket = socket;
630
631	*_connection = conn;
632
633	return B_OK;
634}
635
636
637status_t
638Connection::Connect()
639{
640	switch (fPeerAddress.fProtocol) {
641		case IPPROTO_TCP:
642			fSocket = socket(fPeerAddress.Family(), SOCK_STREAM, IPPROTO_TCP);
643			break;
644		case IPPROTO_UDP:
645			fSocket = socket(fPeerAddress.Family(), SOCK_DGRAM, IPPROTO_UDP);
646			break;
647		default:
648			return B_BAD_VALUE;
649	}
650	if (fSocket < 0)
651		return errno;
652
653	status_t result;
654	uint16 port, attempt = 0;
655
656	PeerAddress address(fPeerAddress.Family());
657
658	do {
659		port = get_random<uint16>() % (IPPORT_RESERVED - NFS_MIN_PORT);
660		port += NFS_MIN_PORT;
661
662		if (attempt == 9)
663			port = 0;
664		attempt++;
665
666		address.SetPort(port);
667		result = bind(fSocket, (sockaddr*)&address.fAddress,
668			address.AddressSize());
669	} while (attempt <= 10 && result != B_OK);
670
671	if (attempt > 10) {
672		close(fSocket);
673		return result;
674	}
675
676	result = connect(fSocket, (sockaddr*)&fPeerAddress.fAddress,
677		fPeerAddress.AddressSize());
678	if (result != 0) {
679		result = errno;
680		close(fSocket);
681		return result;
682	}
683
684	return B_OK;
685}
686
687
688status_t
689Connection::Reconnect()
690{
691	release_sem(fWaitCancel);
692	close(fSocket);
693	acquire_sem(fWaitCancel);
694	return Connect();
695}
696
697
698void
699ConnectionBase::Disconnect()
700{
701	release_sem(fWaitCancel);
702
703	close(fSocket);
704	fSocket = -1;
705}
706
707
708status_t
709ConnectionListener::Listen(ConnectionListener** listener, int networkFamily,
710	uint16 port)
711{
712	ASSERT(listener != NULL);
713	ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
714
715	int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP);
716	if (sock < 0)
717		return errno;
718
719	PeerAddress address(networkFamily);
720	address.SetPort(port);
721	address.fProtocol = IPPROTO_TCP;
722
723	status_t result = bind(sock, (sockaddr*)&address.fAddress,
724		address.AddressSize());
725	if (result != B_OK) {
726		close(sock);
727		return errno;
728	}
729
730	if (listen(sock, 5) != B_OK) {
731		close(sock);
732		return errno;
733	}
734
735	*listener = new(std::nothrow) ConnectionListener(address);
736	if (*listener == NULL) {
737		close(sock);
738		return B_NO_MEMORY;
739	}
740
741	if ((*listener)->fWaitCancel < B_OK) {
742		result = (*listener)->fWaitCancel;
743		close(sock);
744		delete *listener;
745		return result;
746	}
747
748	(*listener)->fSocket = sock;
749
750	return B_OK;
751}
752
753
754status_t
755ConnectionListener::AcceptConnection(Connection** connection)
756{
757	ASSERT(connection != NULL);
758
759	object_wait_info object[2];
760	object[0].object = fWaitCancel;
761	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
762	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
763
764	object[1].object = fSocket;
765	object[1].type = B_OBJECT_TYPE_FD;
766	object[1].events = B_EVENT_READ;
767
768	do {
769		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
770		object[1].events = B_EVENT_READ;
771
772		status_t result = wait_for_objects(object, 2);
773		if (result < B_OK
774			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
775			return ECONNABORTED;
776		} else if ((object[1].events & B_EVENT_READ) == 0)
777			continue;
778		break;
779	} while (true);
780
781	sockaddr_storage addr;
782	socklen_t length = sizeof(addr);
783	int sock = accept(fSocket, reinterpret_cast<sockaddr*>(&addr), &length);
784	if (sock < 0)
785		return errno;
786
787	PeerAddress address;
788	address.fProtocol = IPPROTO_TCP;
789	address.fAddress = addr;
790
791	status_t result = Connection::SetTo(connection, sock, address);
792	if (result != B_OK) {
793		close(sock);
794		return result;
795	}
796
797	return B_OK;
798}
799
800