aboutsummaryrefslogtreecommitdiff
path: root/gnunet/dht.py
blob: e9aa52586299cbc84a786224dc53276373a19ea9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import dbus

import datetime

from . import *
from . import block
from ._dbus_utils import *

# Registry of active get operations: maps the D-Bus object path returned by
# the service's get_start call to the GetRequest created for it in get_start().
get_requests = {}
# Serialises access to get_requests in this module: held by get_start() while
# it registers a request and by GetResult.run() while it looks one up.
requests_lock = threading.Lock()


class GetResult(threading.Thread):
    """Daemon thread that delivers one DHT "result" to the matching request.

    The thread starts itself from the constructor.  When it runs it looks up
    the request registered under ``path`` in the module-level ``get_requests``
    registry and invokes that request's callback with the result.

    Parameters
    ----------
    expiry, key, get_path, put_path, block_type, data
        The already-converted fields of the result; stored unchanged and
        forwarded to the callback.
    path
        D-Bus object path of the request the result belongs to; used as the
        lookup key into ``get_requests``.
    """

    def __init__(self, expiry, key, get_path, put_path, block_type, data, path):
        threading.Thread.__init__(self)
        self.expiry = expiry
        self.key = key
        self.get_path = get_path
        self.put_path = put_path
        self.block_type = block_type
        self.data = data
        self.path = path
        # Daemon thread: a pending delivery must not keep the process alive.
        self.daemon = True
        self.start()

    def run(self):
        """Look up the request for ``self.path`` and call its callback.

        A result whose path has no registered request is silently ignored.
        The callback is invoked as
        ``callback(block_type, key, data, expiry)``; ``get_path`` and
        ``put_path`` are added as keyword arguments only when the request
        was created with ``record_route`` set.
        """
        # Only the lookup happens under the lock; the user callback runs
        # outside it so it cannot block other registry users.
        with requests_lock:
            # .get() instead of [] so that a signal for a path that is not
            # registered here does not raise KeyError in this thread.
            request = get_requests.get(self.path)

        if not request:
            return

        if request.record_route:
            request.callback(self.block_type, self.key, self.data,
                             self.expiry, get_path=self.get_path,
                             put_path=self.put_path)
        else:
            request.callback(self.block_type, self.key, self.data,
                             self.expiry)


def _result(expiry, key, get_path, put_path, block_type, data, path):
    """Handle a "result" signal by spawning a GetResult for it.

    Each raw signal argument is converted to a Python-side value (``expiry``
    through ``pythonize``, ``key`` through ``HashCode``, the two paths to
    lists, ``block_type`` to ``str`` and ``data`` to ``bytearray``) and the
    lot is handed to ``GetResult``, which starts its own thread.  ``path`` is
    passed through unchanged.  Nothing is returned.
    """
    GetResult(
        pythonize(expiry, datetime.datetime),
        HashCode(key),
        list(get_path),
        list(put_path),
        str(block_type),
        bytearray(data),
        path,
    )

# Route every "result" signal of the "gnu.gnunet.dht.get" interface sent by
# the "gnu.gnunet.dht" bus name to _result.  path_keyword="path" makes the
# emitting object's path arrive as the `path` keyword argument of _result.
# NOTE(review): assumes sysbus is a dbus-python bus object provided by the
# star imports above -- confirm.
sysbus.add_signal_receiver(_result, "result", "gnu.gnunet.dht.get",
                           "gnu.gnunet.dht", path_keyword="path")


class GetRequest:
    """Handle for one get operation exposed by the "gnu.gnunet.dht" service.

    Attributes:
        callback: callable invoked by GetResult for each result.
        record_route: flag read by GetResult to decide whether the callback
            also receives ``get_path`` / ``put_path`` keyword arguments.
    """

    def __init__(self, path, callback, record_route):
        # D-Bus object path of the remote request object.
        self._path = path
        self.record_route = record_route
        self.callback = callback

    def filter_known_results(self, keys):
        """Send ``keys`` to the remote request's filter_known_results method.

        Every element of ``keys`` is wrapped in ``HashCode`` and converted
        with ``dbusize`` before being sent.  A ``dbus.DBusException`` is
        passed to ``handle_exception``.
        """
        hashes = [dbusize(HashCode(item)) for item in list(keys)]
        wire_keys = dbus.Array(hashes, signature="v")
        try:
            remote = sysbus.get_object("gnu.gnunet.dht", self._path)
            remote.filter_known_results(wire_keys,
                                        dbus_interface="gnu.gnunet.dht.get")
        except dbus.DBusException as err:
            handle_exception(err, "dht", "gnu.gnunet.dht")

    def stop(self):
        """Call the remote request's stop method.

        A ``dbus.DBusException`` is passed to ``handle_exception``.
        """
        # NOTE(review): the entry for this request in the module-level
        # get_requests dict is not removed here -- confirm whether intended.
        try:
            remote = sysbus.get_object("gnu.gnunet.dht", self._path)
            remote.stop(dbus_interface="gnu.gnunet.dht.get")
        except dbus.DBusException as err:
            handle_exception(err, "dht", "gnu.gnunet.dht")


def put(key, desired_replication_level, block_type, data, expiry=None,
        demultiplex_everywhere=False, record_route=False, bart=False):
    """Ask the "gnu.gnunet.dht" D-Bus service to store a block.

    Args:
        key: value accepted by ``HashCode``; identifies the block.
        desired_replication_level: sent as an unsigned 32-bit integer.
        block_type: must be a member of ``block.TYPES``.
        data: anything ``bytearray`` accepts; sent as a byte array.
        expiry: a ``datetime.datetime``, or ``None`` to send the string
            "end of time" instead.
        demultiplex_everywhere, record_route, bart: when truthy, the
            option of the same name is added to the request.

    Raises:
        ValueError: ``block_type`` is not in ``block.TYPES``.
        TypeError: ``expiry`` is neither ``None`` nor a ``datetime.datetime``.

    A ``dbus.DBusException`` from the call is passed to ``handle_exception``.
    """
    key = dbusize(HashCode(key), True)
    desired_replication_level = dbus.UInt32(desired_replication_level)

    if block_type not in block.TYPES:
        # NOTE(review): if block.TYPES were a tuple this %-format would
        # itself raise TypeError -- confirm its type.
        raise ValueError("'block_type' must be one of %s" % block.TYPES)
    block_type = dbus.String(block_type, variant_level=1)

    if expiry is None:
        expiry = dbus.String("end of time", variant_level=1)
    elif isinstance(expiry, datetime.datetime):
        expiry = dbusize(expiry)
    else:
        raise TypeError("'expiry' must be a datetime.datetime")

    # Option names in the order they are sent, paired with their switches.
    switches = (
        ("demultiplex_everywhere", demultiplex_everywhere),
        ("record_route", record_route),
        ("bart", bart),
    )
    options = dbus.Array([name for name, enabled in switches if enabled],
                         variant_level=1, signature="s")
    payload = dbus.Array(bytearray(data), signature="y")

    try:
        service = sysbus.get_object("gnu.gnunet.dht", "/")
        service.put(key, desired_replication_level, options, block_type,
                    payload, expiry)
    except dbus.DBusException as err:
        handle_exception(err, "dht", "gnu.gnunet.dht")


def get_start(callback, block_type, key, desired_replication_level,
              demultiplex_everywhere=False, record_route=False, bart=False):
    """Start a get operation on the "gnu.gnunet.dht" D-Bus service.

    Args:
        callback: callable stored on the returned request; GetResult calls
            it for every result delivered for this request.
        block_type: must be a member of ``block.TYPES``.
        key: value accepted by ``HashCode``.
        desired_replication_level: sent as an unsigned 32-bit integer.
        demultiplex_everywhere, record_route, bart: when truthy, the
            option of the same name is added to the request.

    Returns:
        The ``GetRequest`` registered in ``get_requests`` under the object
        path returned by the service, or ``None`` if a
        ``dbus.DBusException`` occurred and ``handle_exception`` returned.

    Raises:
        ValueError: ``block_type`` is not in ``block.TYPES``.
    """
    if block_type not in block.TYPES:
        # NOTE(review): if block.TYPES were a tuple this %-format would
        # itself raise TypeError -- confirm its type.
        raise ValueError("'block_type' must be one of %s" % block.TYPES)
    block_type = dbus.String(block_type, variant_level=1)
    key = dbusize(HashCode(key), True)
    desired_replication_level = dbus.UInt32(desired_replication_level)

    # Option names in the order they are sent, paired with their switches.
    switches = (
        ("demultiplex_everywhere", demultiplex_everywhere),
        ("record_route", record_route),
        ("bart", bart),
    )
    options = dbus.Array([name for name, enabled in switches if enabled],
                         variant_level=1, signature="s")

    request = None
    try:
        # The lock stays held across the remote call and the registration;
        # GetResult.run() takes the same lock before its lookup, so it
        # cannot look up this path before the request is registered.
        with requests_lock:
            service = sysbus.get_object("gnu.gnunet.dht", "/")
            path = service.get_start(block_type, key,
                                     desired_replication_level, options)
            request = GetRequest(path, callback, record_route)
            get_requests[path] = request
    except dbus.DBusException as err:
        handle_exception(err, "dht", "gnu.gnunet.dht")

    return request