From 9b3caf1f841cf451ba162bcaf83a96ba0c834a78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Luka=20=C5=A0ijanec?= Date: Sun, 8 Jan 2023 23:48:11 +0100 Subject: downloading metadata over tcp works, asan reports a memory leak about nodes --- src/bencoding.c | 2 +- src/dht.c | 830 +++++++++++++++++++++++++++++++++++++------------------- src/main.c | 82 +++--- src/tcp.c | 62 +++++ 4 files changed, 669 insertions(+), 307 deletions(-) create mode 100644 src/tcp.c (limited to 'src') diff --git a/src/bencoding.c b/src/bencoding.c index e456486..a46380e 100644 --- a/src/bencoding.c +++ b/src/bencoding.c @@ -513,7 +513,7 @@ struct bencoding * bdecode_safe (const char * s, int len, enum benc opts, unsign return NULL; default: if (!(s[0] >= '0' && s[0] <= '9')) { /* not a string. not checking this would allow DoS for parsing "lx" */ - fprintf(stderr, "bencoding: unknown type %d - %c\n", s[0], s[0]); + // fprintf(stderr, "bencoding: unknown type %d - %c\n", s[0], s[0]); free(b); return NULL; } diff --git a/src/dht.c b/src/dht.c index 93bfd09..6998cfb 100644 --- a/src/dht.c +++ b/src/dht.c @@ -51,6 +51,25 @@ void bin2hex (char * h, const unsigned char * b, size_t l) { } } +/** + * converts a hexadecimal string to bytes + * + * b and h may not overlap, unless they are the same address + * + * @param b [out] array of bytes to write to with capacity l + * @param h [in] array of hex to read from with 2l hex digits + * @param l [in] length of output array + */ + +void hex2bin (unsigned char * b, const char * h, int l) { + for (int i = 0; i < l; i++) { + char ms = *h++; + char ls = *h++; + b[i] = (ms >= 'a' ? ms - 'a' + 10 : (ms >= 'A' ? ms - 'A' + 10 : ms - '0')) << 4; + b[i] |= (ls >= 'a' ? ls - 'a' + 10 : (ls >= 'A' ? ls - 'A' + 10 : ls - '0')); + } +} + /** * node representation */ @@ -163,7 +182,6 @@ void bucket_free (struct bucket * b) { } enum flags { - pex = 1 << 0, /**< peer supports BEP-0011 pex */ nometasupport = 1 << 1, /**< peer does not support BEP-0009 metadata fetch extension */ nometa = 1 << 2, /**< peer has no metadata downloaded */ goodmeta = 1 << 3, /**< peer gave us good metadata that is currently stored in the torrent */ @@ -211,7 +229,7 @@ void peer_print (FILE * s, const struct peer * p) { char remote[INET6_ADDRSTRLEN + 64]; if (!inet_ntop(p->addr.sin6_family, p->addr.sin6_addr.s6_addr, remote, INET6_ADDRSTRLEN+7)) snprintf(remote, sizeof remote, "(inet_ntop: %s)", strerror(errno)); - fprintf(s, "%s:%d %s%s%s%s%s%s%s", remote, ntohs(p->addr.sin6_port), p->flags & pex ? " pex" : "", p->flags & nometasupport ? " nometasupport" : "", p->flags & nometa ? " nometa" : "", p->flags & goodmeta ? " goodmeta" : "", p->flags & badmeta ? " badmeta" : "", p->flags & unreachable ? " unreachable" : "", p->flags & requested ? " requested" : ""); + fprintf(s, "%s/%d %s%s%s%s%s%s", remote, ntohs(p->addr.sin6_port), p->flags & nometasupport ? " nometasupport" : "", p->flags & nometa ? " nometa" : "", p->flags & goodmeta ? " goodmeta" : "", p->flags & badmeta ? " badmeta" : "", p->flags & unreachable ? " unreachable" : "", p->flags & requested ? " requested" : ""); } /** @@ -233,10 +251,12 @@ enum state { handshake_sent = 1, handshake_received = 1 << 1, extension_sent = 1 << 2, - extension_received = 1 << 3 + extension_received = 1 << 3, + incoming = 1 << 4, + outgoing = 1 << 5 }; -#define DLTO 60 /**< timeout for torrent downloading - if this amount of seconds has passed and no new metadata piece was received, mark peer as unreach and disconnect */ +#define DLTO 5 /**< timeout for torrent downloading - if this amount of seconds has passed and no new metadata piece was received, mark peer as unreach and disconnect */ /** * torrent we are either interested in or are storing metadata for @@ -247,10 +267,14 @@ struct torrent { unsigned char ut_pex; /**< remote's byte for pex */ enum state state; /**< state of tcp connection */ int socket; /**< tcp socket for bep-0009, enough to be stored in torrent since we only connect to one peer at a time */ - void * userdata; /**< library user may use this to his will in dht->connection() and peer->disconnection() */ +#ifdef TORRENT_USERDATA + TORRENT_USERDATA +#else + void * userdata; /**< library user may use this to his will in dht->connection() and torrent->disconnection() */ +#endif void (* disconnection)(struct torrent *); /**< user provided function that is called before torrent->socket is to be closed and must be removed from the pollfds list. that's a perfect time to store metadata, provided that ->dl->flags & goodmeta */ struct peer * dl; /**< peer that's currently used for downloading metadata (only one at a time) */ - unsigned time; /**< beginning of metadata download process or time of last received metadata piece */ + time_t time; /**< beginning of metadata download process or time of last received metadata piece */ enum interested type; /**< is truthy only for manually added torrents */ unsigned char hash[20]; /**< infohash */ struct peer * peers; @@ -260,6 +284,10 @@ struct torrent { int progress; /**< number of pieces of metadata already downloaded */ int size; /**< number of bytes of metadata for info torrents */ unsigned char * metadata; /**< metadata being downloaded */ + void (* intentions)(struct torrent *); /**< user provided function that is called to update the user with my intentions - do I expect data to be read or written or both with this socket. the user should check if (t->state & incoming) and (t->state & outgoing) and set POLLIN and POLLOUT for example in his pollfds */ + unsigned char * packet; /**< packet being constructed from tcp for info torrents, 32727 bytes */ + int recvd; /**< length of received data for current packet */ + char * software; /**< can be read from disconnection() - software string client sent, may be NULL */ }; /** @@ -275,13 +303,36 @@ struct torrent * torrent_init (void) { } /** - * free a torrent object and it's nodes and peers + * kill peer connection and metadata download of torrent */ -void torrent_free (struct torrent * t) { - t->disconnection(t); +void disconnect (struct torrent * t) { + if (t->disconnection && t->dl) + t->disconnection(t); if (t->socket != -1) close(t->socket); + t->socket = -1; + t->state = 0; + t->dl = NULL; + t->size = 0; + t->progress = 0; + t->ut_metadata = 0; + t->ut_pex = 0; + free(t->packet); + t->packet = NULL; + t->recvd = 0; + free(t->software); + t->software = NULL; +} + +/** + * free a torrent object and it's nodes and peers + */ + +void torrent_free (struct torrent * t) { + if (!t) + return; + disconnect(t); struct node * n = t->nodes; while (n) { struct node * old = n; @@ -294,6 +345,7 @@ void torrent_free (struct torrent * t) { p = p->next; peer_free(old); } + free(t->software); free(t->metadata); free(t); } @@ -306,22 +358,6 @@ int torrent_compare (const void * a, const void * b) { return memcmp(((const struct torrent *) a)->hash, ((const struct torrent *) b)->hash, 20); } -/** - * kill peer connection and metadata download of torrent - */ - -void disconnect (struct torrent * t) { - t->disconnection(t); - close(t->socket); - t->socket = -1; - t->state = 0; - t->dl = NULL; - t->size = 0; - t->progress = 0; - t->ut_metadata = 0; - t->ut_pex = 0; -} - /** * prints a torrent, for debugging purposes * @@ -333,12 +369,15 @@ void torrent_print (FILE * s, const struct torrent * t) { char buf[41]; buf[40] = '\0'; bin2hex(buf, t->hash, 20); - printf("magnet:?xt=urn:btih:%s%s%s%s\n\t**** PEERS ****\n", buf, t->type & announce ? " announce" : "", t->type & peers ? " peers" : "", t->type & info ? " info" : ""); + printf("magnet:?xt=urn:btih:%s%s%s%s%s%s%s%s\n\t**** PEERS ****\n", buf, t->type & announce ? " announce" : "", t->type & peers ? " peers" : "", t->type & info ? " info" : "", t->state & handshake_sent ? " handshake_sent" : "", t->state & handshake_received ? " handshake_received" : "", t->state & extension_sent ? " extension_sent" : "", t->state & extension_received ? " extension_received" : ""); struct peer * p = t->peers; while (p) { fprintf(s, "\t"); peer_print(s, p); - fprintf(s, "\n"); + if (t->dl == p) + fprintf(s, "dl=%lds\n", seconds()-t->time); + else + fprintf(s, "\n"); p = p->next; } printf("\t**** NODES ****\n"); @@ -351,6 +390,45 @@ void torrent_print (FILE * s, const struct torrent * t) { } } +/** + * what to log to FILE * from L() + */ + +enum verbosity { + incoming_dht = 1, + outgoing_dht = 1 << 1, + std_fail = 1 << 2, + disagreement = 1 << 3, + expected = 1 << 4, + user = 1 << 5, + debug = 1 << 6, +}; + +/** + * @return static string representation of a log message type + */ + +char * verbosity2str (enum verbosity v) { + switch (v) { + case incoming_dht: + return "incoming_dht"; + case outgoing_dht: + return "outgoing_dht"; + case std_fail: + return "std_fail"; + case disagreement: + return "disagreement"; + case expected: + return "expected"; + case user: + return "user"; + case debug: + return "debug"; + default: + return "unknown"; + } +} + /** * handle for the library */ @@ -364,7 +442,11 @@ struct dht { struct bucket * buckets6; /**< IPv6 routing table */ struct torrent * torrents; /**< linked list of torrents for which we want to know peers */ void (* possible_torrent)(struct dht *, const unsigned char *); /**< a user callback function that is called whenever we come across a torrent hash from a network */ +#ifdef DHT_USERDATA + DHT_USERDATA +#else void * userdata; /**< unused, but left for the library user to set so he can refer back to his structures from callback code, such as dht->possible_torrent(d, h) */ +#endif unsigned torrents_num; /**< number of torrents. this number can rise indefinitely, so it can, and should be capped by the caller, depending on available memory */ unsigned peers_num; /**< number of peers. same notice regarding memory applies here as for torrents */ unsigned torrents_max; /**< max number of torrents that we are allowed to store */ @@ -377,7 +459,10 @@ struct dht { unsigned rxb; /**< statistics: total number of bytes in received UDP bodies */ unsigned txb; /**< statistics: total number of bytes in transmitted UDP bodies */ unsigned tcp_max; /**< max number of tcp connections to peers */ - void * (* connection)(struct dht *, struct torrent *); /**< function for maintaining file descriptors for metadata downloading. this function is called with the torrent that just attempted to connect to a peer. it's the responsibility of the library user to then add the torrent->socket fd to a poll list of fds and to add a function in torrent->disconnection that will be called whenever the fd is closed so that the library user may remove the fd from the pollfds list. */ + void (* connection)(struct dht *, struct torrent *); /**< function for maintaining file descriptors for metadata downloading. this function is called with the torrent that just attempted to connect to a peer. it's the responsibility of the library user to then add the torrent->socket fd to a poll list of fds and to add a function in torrent->disconnection that will be called whenever the fd is closed so that the library user may remove the fd from the pollfds list. */ + unsigned tt; /**< tcp transmitted bytes */ + unsigned tr; /**< tcp received bytes */ + enum verbosity verbosity; /**< what to log */ }; /** @@ -394,7 +479,7 @@ void dht_print (FILE * s, const struct dht * d) { char secret[17*2]; secret[17*2+1] = '\0'; bin2hex(secret, d->secret, 16); - fprintf(s, "id=%s socket=%d t=%u p=%u tmax=%u pmax=%u p/t-max=%u runsec=%ld rxp=%u txp=%u rxb=%u txb=%u secret=%s\n", buf, d->socket, d->torrents_num, d->peers_num, d->torrents_max, d->peers_max, d->peers_per_torrent_max, seconds()-d->time, d->rxp, d->txp, d->rxb, d->txb, secret); + fprintf(s, "id=%s socket=%d t=%u p=%u tmax=%u pmax=%u p/t-max=%u runsec=%ld rxp=%u txp=%u rxb=%u txb=%u secret=%s tt=%u tr=%u\n", buf, d->socket, d->torrents_num, d->peers_num, d->torrents_max, d->peers_max, d->peers_per_torrent_max, seconds()-d->time, d->rxp, d->txp, d->rxb, d->txb, secret, d->tt, d->tr); printf("**** TORRENTS ****\n"); struct torrent * t = d->torrents; while (t) { @@ -446,7 +531,7 @@ void possible_torrent (struct dht * d __attribute__((unused)), const unsigned ch * @param ... [in] variable arguments to go with f */ -#define L(Lo, Lf, ...) do {char Lt[512]; time_t Ln = time(NULL); strftime(Lt, 512, "%c", localtime(&Ln)); fprintf(Lo, "[%s] %s()%s:%d: " Lf "\n", Lt, __func__, __FILE__, __LINE__ __VA_OPT__(,) __VA_ARGS__);} while (0) +#define L(Lv, Ld, Lf, ...) do {char Lt[512]; time_t Ln = time(NULL); strftime(Lt, 512, "%c", localtime(&Ln)); if (Ld->verbosity & Lv) fprintf(Ld->log, "[%s] %s@%s()%s:%d: " Lf "\n", Lt, verbosity2str(Lv), __func__, __FILE__, __LINE__ __VA_OPT__(,) __VA_ARGS__);} while (0) /** * sends a bencoding object to the remote node. does not free the input bencoding. inserts a v key to the input bencoding. @@ -474,15 +559,15 @@ void sendb (struct dht * d, struct bencoding * b, const struct sockaddr_in6 * a) char json[len+1]; b2json(json, b); json[len] = '\0'; - L(d->log, "sending to %s: %s", remote, json); + L(outgoing_dht, d, "sending to %s: %s", remote, json); len = bencode_length(b); char text[len]; bencode(text, b); #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wincompatible-pointer-types" - if (sendto(d->socket, text, len, MSG_DONTWAIT | MSG_NOSIGNAL, a, sizeof *a) == -1) - L(d->log, "sendto(%s): %s", remote, strerror(errno)); - else { + if (sendto(d->socket, text, len, MSG_DONTWAIT | MSG_NOSIGNAL, a, sizeof *a) == -1) { + L(std_fail, d, "sendto(%s): %s", remote, strerror(errno)); + } else { d->txp++; d->txb += len; } @@ -582,6 +667,7 @@ struct dht * dht_init (const struct bencoding * c) { d->torrents_max = UINT_MAX; // this is hardcore - so many torrents makes LL traversal too slow d->peers_max = UINT_MAX; // there's no way there even are this many peers on the entire network at a time xDDDDDDDDDDD d->peers_per_torrent_max = UINT_MAX; + d->verbosity = std_fail | disagreement | user; errno = 0; if (!d) return NULL; @@ -641,7 +727,7 @@ struct dht * dht_init (const struct bencoding * c) { void dht_free (struct dht * d) { if (d->socket != -1) if (close(d->socket) == -1) - L(d->log, "close(d->socket) == -1"); + L(std_fail, d, "close(d->socket) == -1"); struct bucket * bucket = d->buckets; while (bucket) { struct bucket * old = bucket; @@ -689,7 +775,7 @@ struct bencoding * persistent (const struct dht * d) { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wincompatible-pointer-types" if (getsockname(d->socket, &bound, &size) == -1) - L(d->log, "getsockname: %s", strerror(errno)); + L(std_fail, d, "getsockname: %s", strerror(errno)); else { struct bencoding * port = bnum(ntohs(bound.sin6_port)); port->key = bstr(strdup("port")); @@ -986,10 +1072,8 @@ int bucket_good (const struct dht * d, const struct bucket * b) { return 0; // it's really impossible that a non-malicious node would by chance get so close n = n->next; } - // L(d->log, "our bucket filled"); return 1; } - // L(d->log, "not our bucket and filled"); return 1; } else { if (node_count(b->nodes) < K) @@ -1024,7 +1108,6 @@ void replied (const struct dht * d, const unsigned char * id, const struct socka char buf[41]; buf[40] = '\0'; bin2hex(buf, id, 20); - // L(d->log, "%s replied", buf); struct bucket * b = d->buckets; if (family(addr->sin6_addr.s6_addr) == AF_INET6) b = d->buckets6; @@ -1033,11 +1116,9 @@ void replied (const struct dht * d, const unsigned char * id, const struct socka if (found) { found->last_received = seconds(); found->unanswered = 0; - // L(d->log, "found"); return; } if (bucket_good(d, b)) { - // L(d->log, "bucket good"); return; } struct node * node = node_init(); @@ -1073,6 +1154,8 @@ void replied (const struct dht * d, const unsigned char * id, const struct socka */ void potential_node (struct dht * d, const struct sockaddr_in6 * a, const unsigned char * id) { + if (!a->sin6_port) + return; // sorry, I can't send to port 0. this is a mistake or a malicious node if (!memcmp(d->id, id, 20)) // we are not a potential node of ourselves return; struct bucket * bucket = d->buckets; @@ -1186,9 +1269,14 @@ struct torrent * add_torrent (struct dht * d, struct torrent * t) { * @param t [in] torrent * @param p [in] peer to add * @param this peer in storage. could be different if old one was freed, so discard value that you passed in and replace it with this value. memory ownership is NOT transfered from storage to caller + * @return input peer address if it was added. NULL if input peer is weird and not added. another peer that matches address and was already stored. peer is freed, unless it's address returned. */ struct peer * add_peer (struct dht * d, struct torrent * t, struct peer * p) { + if (p->addr.sin6_port == htons(1)) { /* I saw some malicious (?) nodes sending port 1 */ + peer_free(p); /* no one in their right mind would use port 1 for torrents */ + return NULL; /* email me if you know which client does that */ + } struct peer ** peer = &t->peers; struct peer ** bad = NULL; struct peer ** nondl = NULL; @@ -1402,7 +1490,7 @@ void replied_torrent_node (struct torrent * t, const struct sockaddr_in6 * a, co void compact (struct dht * d, const char * value, int len, struct torrent * t) { if (len != 4+2+20 && len != 16+2+20) { - L(d->log, "received packet contained an invalid compact node"); + L(disagreement, d, "received packet contained an invalid compact node"); return; } struct sockaddr_in6 addr = { @@ -1417,38 +1505,6 @@ void compact (struct dht * d, const char * value, int len, struct torrent * t) { if (t) potential_torrent_node(d, t, &addr, value); #pragma GCC diagnostic pop - /* - if (t) { - int i = 0; - struct node ** replaceable = NULL; - struct node ** farthest = &t->nodes; - struct node ** index = &t->nodes; - while (*index) { - i++; - if (node_grade(*index) == bad) - replaceable = index; - if (!closer((*index)->id, (*farthest)->id, t->hash)) - farthest = index; - index = &(*index)->next; - } - if (i <= K) { - node->next = t->nodes; - t->nodes = node->next; - } else if (replaceable) { - node->next = (*replaceable)->next; - node_free(*replaceable); - *replaceable = node; - find_node(d, &node->addr, t->hash); - } else if (!closer(node->id, (*farthest)->id, t->hash)) { - node->next = (*farthest)->next; - node_free(*farthest); - *farthest = node; - find_node(d, &node->addr, t->hash); - } else - node_free(node); - } else - node_free(node); - */ } /** @@ -1543,7 +1599,7 @@ void announce_peer (struct dht * d, const struct sockaddr_in6 * addr, struct ben #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wincompatible-pointer-types" if (getsockname(d->socket, &bound, &size) == -1) - L(d->log, "getsockname: %s", strerror(errno)); + L(std_fail, d, "getsockname: %s", strerror(errno)); else { struct bencoding * port = bnum(ntohs(bound.sin6_port)); b->key = bstr(strdup("port")); @@ -1574,8 +1630,8 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { char remote[INET_ADDRSTRLEN + INET6_ADDRSTRLEN + 64 + (v && v->type & string ? v->valuelen : 0)]; if (!inet_ntop(addr.sin6_family, &addr.sin6_addr, remote, INET6_ADDRSTRLEN+7+INET_ADDRSTRLEN)) { snprintf(remote, sizeof remote, "(inet_ntop: %s)", strerror(errno)); - sprintf(remote+strlen(remote), "/%d", ntohs(addr.sin6_port)); } + sprintf(remote+strlen(remote), "/%d", ntohs(addr.sin6_port)); if (v && v->type & string) { node_ver = v->value; sprintf(remote+strlen(remote), "-%s", node_ver); @@ -1586,7 +1642,7 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { char * end = b2json(out, b); *end = '\0'; assert(out+len == end); - L(d->log, "handle(%s): %s", remote, out); + L(incoming_dht, d, "handle(%s): %s", remote, out); } struct bencoding * y = bpath(b, "y"); char * msg_type = ""; @@ -1613,7 +1669,7 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { char j[len+1]; b2json(j, b); j[len] = '\0'; - L(d->log, "%s did not send a valid id in %s", remote, j); + L(disagreement, d, "%s did not send a valid id in %s", remote, j); } switch (qtype[0]) { case 'P': // ping @@ -1744,7 +1800,6 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { break; case 'A': // announce case 'a': - raise(SIGINT); tok = bpath(b, "a/token"); #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpointer-sign" @@ -1786,7 +1841,7 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { char json[len+1]; b2json(json, b); json[len] = '\0'; - L(d->log, "%s sent an unknown query type: %s", remote, json); + L(disagreement, d, "%s sent an unknown query type: %s", remote, json); break; } break; @@ -1828,7 +1883,6 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { } struct bencoding * nodes = bpath(b, "r/nodes"); struct bencoding * nodes6 = bpath(b, "r/nodes6"); - // L(d->log, "r/nodes length: %zu, r/nodes6 length: %zu", nodes ? nodes->valuelen : 0, nodes6 ? nodes6->valuelen : 0); if (nodes && nodes->type & string && !(nodes->valuelen % 26)) for (unsigned i = 0; i < nodes->valuelen; i += 26) compact(d, nodes->value+i, 26, torrent); @@ -1862,7 +1916,7 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { char * msg = NULL; if (e && e->child && e->child->next && e->child->next->type & string) msg = e->child->next->value; - L(d->log, "%s sent %s%s%s", remote, errtype, msg ? ": " : "", msg ? msg : ""); + L(disagreement, d, "%s sent %s%s%s", remote, errtype, msg ? ": " : "", msg ? msg : ""); break; case '\0': // y is not present, this may just be a DNS response, break; // do not log @@ -1872,7 +1926,7 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { char json[len+1]; b2json(json, b); json[len] = '\0'; - L(d->log, "%s sent an unknown type: %s", remote, json); + L(disagreement, d, "%s sent an unknown type: %s", remote, json); // send_error(d, b, &addr, addrlen, 203, "unknown type"); break; } @@ -1885,22 +1939,22 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { for (int i = 0; i < ns_msg_count(handle, ns_s_an); i++) { struct __ns_rr rr; if (ns_parserr(&handle, ns_s_an, i, &rr) == -1) { - L(d->log, "ns_parserr(%s) == -1", remote); + L(std_fail, d, "ns_parserr(%s) == -1", remote); break; } if (rr.type != ns_t_srv && rr.type != ns_t_a && rr.type != ns_t_aaaa) { - L(d->log, "%s unknown RR type %d", remote, rr.type); + L(disagreement, d, "%s unknown RR type %d", remote, rr.type); continue; } char address[INET_ADDRSTRLEN+INET6_ADDRSTRLEN+7]; switch (rr.rdlength) { case 4: if (!inet_ntop(AF_INET, rr.rdata, address, INET6_ADDRSTRLEN+INET_ADDRSTRLEN+7)) { - L(d->log, "%s !inet_ntop(AF_INET)", remote); + L(std_fail, d, "%s !inet_ntop(AF_INET)", remote); break; // this can't fail!? } sprintf(address+strlen(address), ":%u", ntohs(*((uint16_t *) pkt))); - L(d->log, "%s: A %s", remote, address); + L(expected, d, "%s: A %s", remote, address); struct sockaddr_in6 a = { .sin6_family = AF_INET6, .sin6_port = *((uint16_t *) pkt) @@ -1911,9 +1965,9 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { break; case 16: if (!inet_ntop(AF_INET6, rr.rdata, address, INET6_ADDRSTRLEN+INET_ADDRSTRLEN+7)) - L(d->log, "%s !inet_ntop(AF_INET6)", remote); + L(std_fail, d, "%s !inet_ntop(AF_INET6)", remote); sprintf(address+strlen(address), ":%u", ntohs(*((uint16_t *) pkt))); - L(d->log, "%s: AAAA %s", remote, address); + L(expected, d, "%s: AAAA %s", remote, address); struct sockaddr_in6 aaaa = { .sin6_family = AF_INET6, .sin6_port = *((uint16_t *) pkt) @@ -1930,25 +1984,25 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { if (ns_name_uncompress(pkt, pkt+len, rr.rdata+3*2, target, NS_MAXDNAME) == -1) { #pragma GCC diagnostic pop break; - L(d->log, "ns_name_uncompress(%s) == -1", remote); + L(std_fail, d, "ns_name_uncompress(%s) == -1", remote); } for (int j = 0; j <= 1; j++) { struct __res_state state; if (res_ninit(&state) == -1) { - L(d->log, "res_ninit(%s, %s) == -1", remote, target); + L(std_fail, d, "res_ninit(%s, %s) == -1", remote, target); continue; } unsigned char packet[65536]; int size = res_nmkquery(&state, QUERY, target, ns_c_in, j ? ns_t_a : ns_t_aaaa, NULL, 0, NULL, packet, 65536); if (size == -1) { - L(d->log, "res_nmkquery(%s) == -1", target); + L(std_fail, d, "res_nmkquery(%s) == -1", target); goto d; } memcpy(packet, rr.rdata+4, 2); #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wincompatible-pointer-types" if (sendto(d->socket, packet, size, MSG_DONTWAIT | MSG_NOSIGNAL, &addr, sizeof addr) == -1) - L(d->log, "sendto(%s, %s)", remote, target); + L(std_fail, d, "sendto(%s, %s)", remote, target); d->txp++; d->txb += size; #pragma GCC diagnostic pop @@ -1995,8 +2049,10 @@ int refresh (struct bucket * b) { return nrgood; } +#define PERIODIC 10 + /** - * does periodic work for the library, called every 13 minutes + * does periodic work for the library * * namely, it sends UDP packets: * - searching deeper DHT storage nodes for torrents with peers and announce @@ -2005,9 +2061,18 @@ int refresh (struct bucket * b) { * for bootstrapping, an **IPv4** nameserver is required in /etc/resolv.conf. res_*() functions only provide IPv4 nameservers. example script with this algorithm is available in utils/dns.c * * this can be a lot of packets, so please keep number of torrents with peers and announce low + * + * call this: + * - every PERIODIC seconds, even if it was called for incoming packets in this time period + * + stale buckets are refreshed + * + peers are refetched for joined torrents and announcements are sent + * + calling it more often is discouraged, since every periodic call sends out UDP packets for PEX and DHT searches/announces of torrents + * + * @param d [in] the dht library handle */ void periodic (struct dht * d) { + L(debug, d, "called"); int dns = 0; if (!refresh(d->buckets)) dns++; @@ -2017,7 +2082,7 @@ void periodic (struct dht * d) { char packet[512]; struct __res_state state; if (res_ninit(&state) == -1) { - L(d->log, "res_ninit(&state) == -1"); + L(std_fail, d, "res_ninit(&state) == -1"); goto t; } #pragma GCC diagnostic push @@ -2025,7 +2090,7 @@ void periodic (struct dht * d) { int size = res_nmkquery(&state, QUERY, "_dht._udp.travnik.sijanec.eu", ns_c_in, ns_t_srv, NULL, 0, NULL, packet, 512) != -1; // for some reason always returns 1 #pragma GCC diagnostic pop if (size == -1) { - L(d->log, "res_nmkquery(SRV) == -1"); + L(std_fail, d, "res_nmkquery(SRV) == -1"); goto d; } #pragma GCC diagnostic push @@ -2033,7 +2098,7 @@ void periodic (struct dht * d) { for (int i = 0; i < state.nscount; i++) if (state.nsaddr_list[i].sin_family == AF_INET) { // leider if (sendto(d->socket, packet, 512, MSG_DONTWAIT | MSG_NOSIGNAL, &state.nsaddr_list[i], sizeof state.nsaddr_list[i]) == -1) - L(d->log, "sendto: %s", strerror(errno)); + L(std_fail, d, "sendto: %s", strerror(errno)); d->txp++; d->txb += size; } @@ -2084,7 +2149,7 @@ void periodic (struct dht * d) { } } } - if (t->type & info) { + if (t->type & info && t->peers) { if (t->dl) { if (seconds() - t->time > DLTO) { t->dl->flags |= unreachable; @@ -2108,22 +2173,29 @@ void periodic (struct dht * d) { t->socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (t->socket == -1) { t->dl = NULL; - L(d->log, "socket: %s", strerror(errno)); + L(std_fail, d, "socket: %s", strerror(errno)); break; } #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wincompatible-pointer-types" if (connect(t->socket, &p->addr, sizeof p->addr) == -1) { #pragma GCC diagnostic pop - if (errno != EAGAIN) { - L(d->log, "connect: %s", strerror(errno)); + if (errno != EINPROGRESS) { + L(std_fail, d, "connect: %s", strerror(errno)); disconnect(t); } } t->time = seconds(); t->size = 0; t->progress = 0; + t->state &= ~(incoming | outgoing); + t->state |= outgoing; + if (t->packet) + free(t->packet); + t->packet = malloc(32727); + t->recvd = 0; d->connection(d, t); + t->intentions(t); break; } p = p->next; @@ -2136,210 +2208,418 @@ void periodic (struct dht * d) { } /** - * does work; syncs with the network, handles incoming queries. - * - * call this: - * - whenever socket can be read from (via poll/epoll/select/...) - * - every 13 minutes, even if it was called for incoming packets in this time period - * + stale buckets are refreshed - * + peers are refetched for joined torrents and announcements are sent - * + calling it more often is discouraged, since every periodic call sends out UDP packets for PEX and DHT searches/announces of torrents - * - * @param d [in] dht library handle + * handle tcp activity */ -void work (struct dht * d) { +void tcp_work (struct dht * d) { char packet[65536]; - struct sockaddr_in6 addr; - socklen_t addrlen = sizeof addr; -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wincompatible-pointer-types" - int ret = recvfrom(d->socket, packet, 65536, MSG_DONTWAIT | MSG_TRUNC, &addr, &addrlen); -#pragma GCC diagnostic pop - if (addrlen != sizeof addr) - L(d->log, "addrlen changed, not parsing packet"); - else if (ret > 65536) - L(d->log, "recvfrom()d larger packet than 65536, not parsing packet"); - else if (ret < 0) { - if (errno != EAGAIN) - L(d->log, "recvfrom(): %s (%d)", strerror(errno), errno); - else { - struct torrent * t = d->torrents; - while (t) { - if (!t->dl) + L(debug, d, "called"); + struct torrent * t = d->torrents; + while (t) { + if (!t->dl) + goto c; + if (!(t->state & ~(incoming | outgoing))) { // initial state isn't really blank - outgoing + unsigned char handshake[1+19+8+20*2] = "aBitTorrent protocol"; + handshake[0] = 19; + handshake[1+19+5] = 0x10; + memcpy(handshake+1+19+8, t->hash, 20); + memcpy(handshake+1+19+8+20, d->id, 20); + if (send(t->socket, handshake, 1+19+8+20*2, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) { + d->tt += 1+19+8+20*2; + t->state |= handshake_sent; + t->state &= ~(incoming | outgoing); + t->state |= incoming; + t->intentions(t); + } else + if (errno != EAGAIN) { + L(std_fail, d, "send(): %s", strerror(errno)); + disconnect(t); goto c; - if (!t->state) { - unsigned char handshake[1+19+8+20*2] = "aBitTorrent protocol"; - handshake[0] = 19; - handshake[1+19+5] = 0x10; - memcpy(handshake+1+19+8, t->hash, 20); - memcpy(handshake+1+19+8+20, d->id, 20); - if (send(t->socket, handshake, 1+19+8+20*2, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) - t->state |= handshake_sent; - else - if (errno != EAGAIN) { - L(d->log, "send(): %s", strerror(errno)); - disconnect(t); - goto c; - } - } - if (t->state & handshake_received) { - char e[1024] = "\0\0\0\0\0\0d1:md11:ut_metadatai1eee"; - e[3] = 2+strlen(e+6); - e[4] = 20; - if (send(t->socket, e, e[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) - t->state |= extension_sent; - else - if (errno != EAGAIN) { - L(d->log, "send(): %s", strerror(errno)); - disconnect(t); - goto c; - } } - if (t->ut_metadata && !(t->dl->flags & requested)) { - char r[1024] = "\0\0\0"; - r[4] = t->ut_metadata; - r[3] = 2+sprintf(r+6, "d8:msg_typei0e5:piecei%uee", t->progress); - if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) - t->dl->flags |= requested; - else - if (errno != EAGAIN) { - L(d->log, "send(): %s", strerror(errno)); - disconnect(t); - goto c; - } + } + if (t->state & handshake_received && !(t->state & extension_sent)) { + char e[1024] = "\0\0\0\0\0\0d1:md11:ut_metadatai1eee"; + e[3] = 2+strlen(e+6); + e[4] = 20; + if (send(t->socket, e, e[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) { + d->tt += e[3]+4; + t->state |= extension_sent; + t->state &= ~(incoming | outgoing); + t->state |= incoming; + t->intentions(t); + } else + if (errno != EAGAIN) { + L(std_fail, d, "send(): %s", strerror(errno)); + disconnect(t); + goto c; } - ret = recv(t->socket, packet, 65536, MSG_DONTWAIT | MSG_PEEK); - if (ret < 0) { - if (errno != EAGAIN) { - L(d->log, "recv(TCP, MSG_PEEK): %s (%d)", strerror(errno), errno); - disconnect(t); - } + } + if (t->ut_metadata && !(t->dl->flags & requested)) { + char r[1024] = "\0\0\0"; + r[4] = 20; + r[5] = t->ut_metadata; + r[3] = 2+sprintf(r+6, "d8:msg_typei0e5:piecei%uee", t->progress); + if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) { + d->tt += r[3]+4; + t->dl->flags |= requested; + t->state &= ~(incoming | outgoing); + t->state |= incoming; + t->intentions(t); + } else + if (errno != EAGAIN) { + L(std_fail, d, "send(): %s", strerror(errno)); + disconnect(t); goto c; } - if (!t->state && ret >= 1+19+8+20*2) { - recv(t->socket, packet, 1+19+8+20*2, MSG_DONTWAIT); - t->state |= handshake_received; - if (memcmp(packet+1+19+8, t->hash, 20)) { + } + if (!(t->state & ~(handshake_sent | incoming | outgoing))) { + int ret = recv(t->socket, packet, 1+19+8+20*2, MSG_DONTWAIT); + if (ret == 0) { + L(disagreement, d, "received 0 bytes instead of handshake? weird. waiting"); + goto c; + } + if (ret < 0) { + if (errno != EAGAIN) { + L(std_fail, d, "recv(TCP, MSG_PEEK): %s (%d)", strerror(errno), errno); + disconnect(t); + } + goto c; + } + if (ret < 1+19+8+20*2) { // c'mon, this could've arrived in one packet HACK UGLY! + L(disagreement, d, "expected handshake, received only %d bytes", ret); + t->dl->flags |= nometasupport; // cause it sent a nonsensical packet + disconnect(t); + goto c; + } + d->tr += 1+19+8+20*2; + t->state &= ~(incoming | outgoing); + t->state |= outgoing; + t->intentions(t); + t->state |= handshake_received; + if (memcmp(packet+1+19+8, t->hash, 20)) { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpointer-sign" - possible_torrent(d, packet+1+19+8); + possible_torrent(d, packet+1+19+8); #pragma GCC diagnostic pop - disconnect(t); - goto c; - } - if (!(packet[1+19+5] & 0x10)) { - t->dl->flags |= nometasupport; - disconnect(t); - goto c; - } + disconnect(t); + goto c; + } + if (!(packet[1+19+5] & 0x10)) { + L(disagreement, d, "peer did not set extension bit for dynamic extension"); + t->dl->flags |= nometasupport; + disconnect(t); + goto c; + } + } + if (t->recvd < 4) { + int ret = recv(t->socket, t->packet+t->recvd, 4-t->recvd, MSG_DONTWAIT); + if (ret < 0) { + if (errno != EAGAIN) { + L(std_fail, d, "recv(TCP, MSG_PEEK): %s (%d)", strerror(errno), errno); + disconnect(t); } - uint32_t l = ntohl(*((uint32_t *) packet)); + goto c; + } else { + uint32_t l = ntohl(*((uint32_t *) t->packet)); + L(debug, d, "found length of a packet to be %d", l); + d->tr += ret; + t->recvd += ret; + } + } + if (t->recvd >= 4) { + if (t->packet[0]) { + char buf[41]; + buf[40] = '\0'; + bin2hex(buf, t->packet, 20); + L(disagreement, d, "peer wants to send too big of a packet %s", buf); + t->dl->flags |= nometasupport; // too big pkt, sorry + disconnect(t); + goto c; + } + uint32_t l = ntohl(*((uint32_t *) t->packet)); + int ret = recv(t->socket, t->packet+t->recvd, MIN(l, 32727-t->recvd), MSG_DONTWAIT); + if (ret < 0) { + if (errno != EAGAIN) { + L(std_fail, d, "recv(TCP): %s (%d)", strerror(errno), errno); + disconnect(t); + } + goto c; + } else { + d->tr += ret; + t->recvd += ret; + } #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wsign-compare" - if (t->state && !packet[0] /* to prevent ultra large packets and sign bit */ && ret >= 4+l) { + if (l > (t->recvd-4)) // we don't have a full packet + goto c; #pragma GCC diagnostic pop - recv(t->socket, packet, 4+l, MSG_DONTWAIT); - if (packet[4] == 20) { + L(expected, d, "full packet: type %u subtype %u recvd=%d", t->packet[4], t->packet[5], t->recvd); + if (t->packet[4] != 20) // if it's not extension, the only supported type + goto end_packet; #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpointer-sign" - struct bencoding * e = bdecode(packet+6, l-2, replace); + struct bencoding * e = bdecode(t->packet+6, 32727-6, replace); #pragma GCC diagnostic pop - switch (packet[5]) { - case 0: + struct bencoding * v = bpath(e, "v"); + if (v && v->type & string) { + L(expected, d, "peer's software is %s", v->value); + free(t->software); + t->software = strdup(v->value); + } + switch (t->packet[5]) { + case 0: + ; + struct bencoding * ut_metadata = bpath(e, "m/ut_metadata"); + struct bencoding * ut_pex = bpath(e, "m/ut_pex"); + struct bencoding * metadata_size = bpath(e, "metadata_size"); + if (ut_pex && ut_pex->type & num) + t->ut_pex = ut_pex->intvalue; + if (ut_metadata && ut_metadata->type & num) + t->ut_metadata = ut_metadata->intvalue; + else + goto nometa; + if (metadata_size && metadata_size->type & num && metadata_size->intvalue > 0 && metadata_size->intvalue < 100000000) { + t->size = metadata_size->intvalue; + t->metadata = realloc(t->metadata, metadata_size->intvalue); + } else { + nometa: + L(disagreement, d, "peer does not support ut_metadata in dynamic extension"); + t->dl->flags = nometasupport; + if (t->ut_pex) { + // ask for pex + } + disconnect(t); + break; + } + t->state |= extension_received; + t->state &= ~(incoming | outgoing); + t->state |= outgoing; + t->intentions(t); + break; + case 1: + ; + struct bencoding * msg_type = bpath(e, "msg_type"); + struct bencoding * piece = bpath(e, "piece"); + if (msg_type && msg_type->type & num && piece && piece->type & num) { + switch (msg_type->intvalue) { + case 0: // request, we just reject it ; - struct bencoding * ut_metadata = bpath(e, "m/ut_metadata"); - struct bencoding * ut_pex = bpath(e, "m/ut_pex"); - struct bencoding * metadata_size = bpath(e, "metadata_size"); - if (ut_pex && ut_pex->type & num) { - t->ut_pex = ut_pex->intvalue; - t->dl->flags |= pex; + char r[1024] = "\0\0\0"; + r[4] = t->ut_metadata; + r[3] = 2+sprintf(r+6, "d8:msg_typei2e5:piecei%ldee", piece->intvalue); + if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) == -1) { + if (errno != EAGAIN) { + L(std_fail, d, "send(): %s", strerror(errno)); + disconnect(t); + break; + } + } else + d->tt += r[3]+4; + t->state &= ~(incoming | outgoing); + t->state |= outgoing; + t->intentions(t); + break; + case 1: // data + if (piece->intvalue != t->progress) + break; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpointer-sign" + unsigned char * ee = strstr(t->packet+6, "ee"); +#pragma GCC diagnostic pop + if (!ee) { + disconnect(t); + L(disagreement, d, "malformed packet"); + break; + } + int piecelen = t->recvd-((ee+2)-t->packet); + L(debug, d, "received a %d byte piece", piecelen); + if (t->size < t->progress*16384+piecelen) { + raise(SIGINT); + disconnect(t); + L(disagreement, d, "sent more packets than space available. UNREACHABLE!!!"); + break; + } + if (!t->metadata) { + disconnect(t); + L(disagreement, d, "i have nowhere to store!"); + break; } - if (ut_metadata && ut_metadata->type & num) - t->ut_metadata = ut_pex->intvalue; - if (metadata_size && metadata_size->type & num && metadata_size->intvalue > 0 && metadata_size->intvalue < 100000000) { - t->size = metadata_size->intvalue; - t->metadata = realloc(t->metadata, metadata_size->intvalue); - } else { - if (t->ut_pex) { - // ask for pex + t->packet[32726] = '\0'; + memcpy(t->metadata+t->progress++*16384, ee+2, piecelen); + t->time = seconds(); + if (t->progress*16384 >= t->size) { + SHA1_CTX sha; // TODO SHA256 for torrent v2 + uint8_t results[SHA1_DIGEST_LENGTH]; + SHA1Init(&sha); + SHA1Update(&sha, t->metadata, t->size); + SHA1Final(results, &sha); + if (memcmp(results, t->hash, 20)) { + t->dl->flags |= badmeta; + t->dl->flags &= ~goodmeta; + L(disagreement, d, "received invalid metadata!"); + disconnect(t); + break; } + t->dl->flags &= ~badmeta; + t->dl->flags |= goodmeta; + L(expected, d, "received good metadata!"); disconnect(t); - goto c; + break; } + t->dl->flags &= ~requested; + t->state &= ~(incoming | outgoing); + t->state |= outgoing; + t->intentions(t); break; - case 1: - ; - struct bencoding * msg_type = bpath(e, "msg_type"); - struct bencoding * piece = bpath(e, "piece"); - if (msg_type && msg_type->type & num && piece && piece->type & num) { - switch (msg_type->intvalue) { - case 0: // request, we just reject it - ; - char r[1024] = "\0\0\0"; - r[4] = t->ut_metadata; - r[3] = 2+sprintf(r+6, "d8:msg_typei2e5:piecei%ldee", piece->intvalue); - if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) == -1 && errno != EAGAIN) { - L(d->log, "send(): %s", strerror(errno)); - disconnect(t); - goto c; - } - break; - case 1: // data - if (piece->intvalue != t->progress) - break; - if (t->size < (t->progress+1)*16384) { - disconnect(t); - L(d->log, "sent more packets than space available. UNREACHABLE!!!"); - goto c; - } - if (!t->metadata) { - disconnect(t); - L(d->log, "i have nowhere to store!"); - goto c; - } - packet[65535] = '\0'; - char * ee = strstr(packet+6, "ee"); - if (!ee) { - disconnect(t); - L(d->log, "malformed packet"); - goto c; - } - memcpy(t->metadata+t->progress++*16384, ee+2, 16384); - if (t->progress*16384 >= t->size) { - SHA1_CTX sha; // TODO SHA256 for torrent v2 - uint8_t results[SHA1_DIGEST_LENGTH]; - SHA1Init(&sha); - SHA1Update(&sha, t->metadata, t->size); - SHA1Final(results, &sha); - if (memcmp(results, t->hash, 20)) { - t->dl->flags |= badmeta; - t->dl->flags &= ~goodmeta; - L(d->log, "received invalid metadata!"); - disconnect(t); - goto c; - } - L(d->log, "received good metadata!"); - disconnect(t); - goto c; - } - t->dl->flags &= ~requested; + case 2: // reject + t->dl->flags |= nometa; + disconnect(t); + break; + } + } + break; + } + free_bencoding(e); + end_packet: + t->recvd = 0; + } + /* +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-compare" + if (t->state) { +#pragma GCC diagnostic pop + recv(t->socket, packet, 4+l, MSG_DONTWAIT); + d->tr += 4+l; + if (packet[4] == 20) { + t->state &= ~(incoming | outgoing); + t->state |= outgoing; + t->intentions(t); + switch (packet[5]) { + case 0: + ; + struct bencoding * ut_metadata = bpath(e, "m/ut_metadata"); + struct bencoding * ut_pex = bpath(e, "m/ut_pex"); + struct bencoding * metadata_size = bpath(e, "metadata_size"); + if (ut_pex && ut_pex->type & num) + t->ut_pex = ut_pex->intvalue; + if (ut_metadata && ut_metadata->type & num) + t->ut_metadata = ut_pex->intvalue; + else + goto nometa; + if (metadata_size && metadata_size->type & num && metadata_size->intvalue > 0 && metadata_size->intvalue < 100000000) { + t->size = metadata_size->intvalue; + t->metadata = realloc(t->metadata, metadata_size->intvalue); + } else { + nometa: + t->dl->flags = nometasupport; + if (t->ut_pex) { + // ask for pex + } + disconnect(t); + break; + } + break; + case 1: + ; + struct bencoding * msg_type = bpath(e, "msg_type"); + struct bencoding * piece = bpath(e, "piece"); + if (msg_type && msg_type->type & num && piece && piece->type & num) { + switch (msg_type->intvalue) { + case 0: // request, we just reject it + ; + char r[1024] = "\0\0\0"; + r[4] = t->ut_metadata; + r[3] = 2+sprintf(r+6, "d8:msg_typei2e5:piecei%ldee", piece->intvalue); + if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) == -1) { + if (errno != EAGAIN) { + L(std_fail, d, "send(): %s", strerror(errno)); + disconnect(t); break; - case 2: // reject - t->dl->flags |= nometa; + } + } else + d->tt += r[3]+4; + break; + case 1: // data + if (piece->intvalue != t->progress) + break; + if (t->size < (t->progress+1)*16384) { + disconnect(t); + L(disagreement, d, "sent more packets than space available. UNREACHABLE!!!"); + break; + } + if (!t->metadata) { + disconnect(t); + L(disagreement, d, "i have nowhere to store!"); + break; + } + packet[65535] = '\0'; + char * ee = strstr(packet+6, "ee"); + if (!ee) { + disconnect(t); + L(disagreement, d, "malformed packet"); + break; + } + memcpy(t->metadata+t->progress++*16384, ee+2, 16384); + t->time = seconds(); + if (t->progress*16384 >= t->size) { + SHA1_CTX sha; // TODO SHA256 for torrent v2 + uint8_t results[SHA1_DIGEST_LENGTH]; + SHA1Init(&sha); + SHA1Update(&sha, t->metadata, t->size); + SHA1Final(results, &sha); + if (memcmp(results, t->hash, 20)) { + t->dl->flags |= badmeta; + t->dl->flags &= ~goodmeta; + L(disagreement, d, "received invalid metadata!"); disconnect(t); - goto c; + break; + } + L(expected, d, "received good metadata!"); + disconnect(t); + break; } - } - break; + t->dl->flags &= ~requested; + break; + case 2: // reject + t->dl->flags |= nometa; + disconnect(t); + break; + } } - } + break; } - c: - t = t->next; } - periodic(d); - } + } */ + c: + t = t->next; + } +} + +/** + * does work; syncs with the network, handles incoming queries. + * + * call this: + * - whenever socket can be read from (via poll/epoll/select/...) + * @param d [in] dht library handle + */ + +void work (struct dht * d) { + char packet[65536]; + struct sockaddr_in6 addr; + socklen_t addrlen = sizeof addr; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wincompatible-pointer-types" + int ret = recvfrom(d->socket, packet, 65536, MSG_DONTWAIT | MSG_TRUNC, &addr, &addrlen); +#pragma GCC diagnostic pop + if (addrlen != sizeof addr) + L(disagreement, d, "addrlen changed, not parsing packet"); + else if (ret > 65536) + L(disagreement, d, "recvfrom()d larger packet than 65536, not parsing packet"); + else if (ret < 0) { + if (errno != EAGAIN) + L(std_fail, d, "recvfrom(): %s (%d)", strerror(errno), errno); + else + tcp_work(d); } else { d->rxp++; d->rxb += ret; diff --git a/src/main.c b/src/main.c index 70b4d3e..d6c9fb3 100644 --- a/src/main.c +++ b/src/main.c @@ -9,7 +9,11 @@ #include #include #define S0(x) (x ? x : "") +#define TORRENT_USERDATA struct dht * dht; +#define DHT_USERDATA struct pollfd ** pollfds; size_t * pollfds_size; nfds_t * nfds; #include +#define DISCONNECTION_MIXIN_BOTTOM if (t->dl->flags & goodmeta) t->type &= ~info; +#include int samomor = 0; int periodično = 0; int sigusr1 = 0; @@ -31,9 +35,12 @@ void found_torrent (struct dht * d __attribute__((unused)), const unsigned char char buf[41]; bin2hex(buf, h, 20); buf[40] = '\0'; - L(stdout, "magnet:?xt=urn:btih:%s", buf); + L(user, d, "magnet:?xt=urn:btih:%s", buf); } int main (int argc, char ** argv) { + size_t pollfds_size = 1; + nfds_t nfds = 1; + struct pollfd * pollfds = NULL; int r = 0; struct dht * dht = NULL; struct sigaction sigact = { @@ -55,8 +62,10 @@ int main (int argc, char ** argv) { error_at_line(6, errno, __FILE__, __LINE__, "sigaddset(SIGUSR1)"); if (sigaddset(&sigset, SIGALRM) == -1) error_at_line(7, errno, __FILE__, __LINE__, "sigaddset(SIGALRM)"); + if (sigaddset(&sigset, SIGTERM) == -1) + error_at_line(8, errno, __FILE__, __LINE__, "sigaddset(SIGTERM)"); if (sigprocmask(SIG_UNBLOCK, &sigset, NULL) == -1) - error_at_line(8, errno, __FILE__, __LINE__, "sigprocmask"); + error_at_line(9, errno, __FILE__, __LINE__, "sigprocmask"); /* struct itimerval itimerval = { .it_interval = { .tv_sec = 60 @@ -65,7 +74,7 @@ int main (int argc, char ** argv) { if (setitimer(ITIMER_REAL, &itimerval, NULL) == -1) error_at_line(9, errno, __FILE__, __LINE__, "setitimer"); */ if (argc != 1+1) - error_at_line(10, 0, __FILE__, __LINE__, "%s configfile.ben >> possible_torrents.L", S0(argv[0])); + error_at_line(10, 0, __FILE__, __LINE__, "%s configfile.ben", S0(argv[0])); int cf = open(argv[1], O_RDWR | O_CLOEXEC | O_CREAT, 00664); if (cf == -1) error_at_line(11, errno, __FILE__, __LINE__, "open(%s)", argv[1]); @@ -83,39 +92,48 @@ int main (int argc, char ** argv) { } struct bencoding * config = bdecode(cfr, statbuf.st_size, replace); dht = dht_init(config); - dht->possible_torrent = found_torrent; free_bencoding(config); + dht->possible_torrent = found_torrent; + dht->connection = connection; + pollfds = malloc(sizeof *pollfds); + pollfds[0].fd = dht->socket; + pollfds[0].events = POLLIN; + dht->pollfds = &pollfds; + dht->pollfds_size = &pollfds_size; + dht->nfds = &nfds; + dht->verbosity |= (getenv("TRAVNIK_INCOMING_DHT") ? incoming_dht : 0) | (getenv("TRAVNIK_OUTGOING_DHT") ? outgoing_dht : 0) | expected | debug; struct torrent * torrent = torrent_init(); memcpy(torrent->hash, "\xdd\x82\x55\xec\xdc\x7c\xa5\x5f\xb0\xbb\xf8\x13\x23\xd8\x70\x62\xdb\x1f\x6d\x1c", 20); - torrent->type = announce | peers; + torrent->type = /* (useless, since we have no listening system yet) announce | */ peers | info; add_torrent(dht, torrent); - struct pollfd pollfd = { - .fd = dht->socket, - .events = POLLIN - }; + periodic(dht); + alarm(PERIODIC); w: - alarm(13*60); - while (poll(&pollfd, 1, -1) == 1) + while (poll(pollfds, nfds, -1) != -1) // can't timeout work(dht); - if (errno == EINTR) { - if (sigusr1) { - sigusr1 = 0; - dht_print(stdout, dht); - goto w; - } - if (periodično) { - periodično = 0; - work(dht); - goto w; - } - if (!samomor) { - error_at_line(0, errno, __FILE__, __LINE__, "poll"); - r = 114; - } - } else { - error_at_line(0, errno, __FILE__, __LINE__, "poll"); - r = 115; - goto r; + switch (errno) { + case EINTR: + if (sigusr1) { + sigusr1 = 0; + dht_print(stdout, dht); + goto w; + } + if (periodično) { + periodično = 0; + alarm(PERIODIC); + periodic(dht); + goto w; + } + if (!samomor) { + error_at_line(0, errno, __FILE__, __LINE__, "poll"); + r = 114; + } + break; + default: + error_at_line(0, errno, __FILE__, __LINE__, "poll"); + raise(SIGINT); + r = 115; + goto r; } config = persistent(dht); dht_free(dht); @@ -148,6 +166,8 @@ w: error_at_line(0, errno, __FILE__, __LINE__, "munmap(cf, %ld)", statbuf.st_size); if (close(cf) == -1) error_at_line(0, errno, __FILE__, __LINE__, "close(cf)"); - L(stderr, "exiting cleanly with status %d\n", r); + if (pollfds) + free(pollfds); + fprintf(stderr, "exiting cleanly with status %d\n", r); return r; } diff --git a/src/tcp.c b/src/tcp.c new file mode 100644 index 0000000..99f8828 --- /dev/null +++ b/src/tcp.c @@ -0,0 +1,62 @@ +void disconnection (struct torrent * t) { + fprintf(stderr, "disconnecting from peer "); + peer_print(stderr, t->dl); + fprintf(stderr, " nfds=%ld, pollfds=%zu\n", *t->dht->nfds, *t->dht->pollfds_size); + for (size_t i = 0; i < *t->dht->nfds; i++) { + if ((*t->dht->pollfds)[i].fd == t->socket) { + if (i != (*t->dht->nfds)--) + (*t->dht->pollfds)[i] = (*t->dht->pollfds)[*t->dht->nfds]; + break; + } + } + if (t->dl->flags & goodmeta) { + char buf[128]; + bin2hex(buf, t->hash, 20); + strcpy(buf+40, ".torrent"); + FILE * f = fopen(buf, "w+b"); + if (!f) { + L(user, t->dht, "fopen(%s, \"w+b\"): %s", buf, strerror(errno)); + return; + } + char remote[INET6_ADDRSTRLEN + 64]; + if (!inet_ntop(t->dl->addr.sin6_family, t->dl->addr.sin6_addr.s6_addr, remote, INET6_ADDRSTRLEN+7)) + snprintf(remote, sizeof remote, "(inet_ntop: %s)", strerror(errno)); + sprintf(remote+strlen(remote), "/%d", ntohs(t->dl->addr.sin6_port)); + fprintf(f, "d10:created by58:http://ni.šijanec.eu/sijanec/travnik mailto:tk@sijanec.eu13:creation datei%lde8:encoding5:UTF-84:info", time(NULL)); + fwrite(t->metadata, 1, t->size, f); // i don't expect any errors here + fprintf(f, "6:sourced2:ip%zu:%s", strlen(remote), remote); + if (t->software) + fprintf(f, "1:v%zu:%s", strlen(t->software), t->software); + fputs("ee", f); + if (fclose(f) == EOF) + L(user, t->dht, "fclose(%s): %s", buf, strerror(errno)); + } +#ifdef DISCONNECTION_MIXIN_BOTTOM + DISCONNECTION_MIXIN_BOTTOM +#endif +} +void intentions (struct torrent * t) { + for (size_t i = 0; i < *t->dht->nfds; i++) + if ((*t->dht->pollfds)[i].fd == t->socket) { + (*t->dht->pollfds)[i].events &= ~(POLLIN | POLLOUT); + if (t->state & incoming) + (*t->dht->pollfds)[i].events |= POLLIN; + if (t->state & outgoing) + (*t->dht->pollfds)[i].events |= POLLOUT; + } + fprintf(stderr, "peer's intentions: "); + peer_print(stderr, t->dl); + fprintf(stderr, " nfds=%ld, pollfds=%zu%s%s\n", *t->dht->nfds, *t->dht->pollfds_size, (t->state & incoming) ? " reading" : "", (t->state & outgoing) ? " writing" : ""); +} +void connection (struct dht * d, struct torrent * t) { + if (*d->pollfds_size <= *d->nfds) + *d->pollfds = reallocarray(*d->pollfds, (*d->pollfds_size *= 2), sizeof **d->pollfds); + (*d->pollfds)[*d->nfds].events = POLLIN | POLLOUT; + (*d->pollfds)[(*d->nfds)++].fd = t->socket; + fprintf(stderr, "attempting to connect to peer "); + peer_print(stderr, t->dl); + fprintf(stderr, " nfds=%ld, pollfds=%zu\n", *d->nfds, *d->pollfds_size); + t->disconnection = disconnection; + t->dht = d; + t->intentions = intentions; +} -- cgit v1.2.3