aboutsummaryrefslogtreecommitdiff
path: root/gnunet/dht.py
blob: 8e4ae8596499cf47c6a63d982a95b3b30c6cbc6d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import dbus

import datetime

from . import *
from . import block
from ._dbus_utils import *

# Registry of outstanding get requests: maps the path returned by the
# service's get_start call to the GetRequest created for it.  Written by
# get_start() and read by GetResult.run().
get_requests = {}
# Serialises access to get_requests.  get_start() holds it across the remote
# call and the registration; GetResult.run() takes it for the lookup.
requests_lock = threading.Lock()


class GetResult(threading.Thread):
    """Deliver one DHT result to the callback of the request it belongs to.

    Each instance is a daemon thread that starts itself on construction, so
    the user callback runs off the thread that created the instance.

    Parameters
    ----------
    expiry, key, get_path, put_path, block_type, data
        The already-converted fields of the result; they are handed to the
        request's callback unchanged.
    path
        Key used to look the owning request up in ``get_requests``.
    """

    def __init__(self, expiry, key, get_path, put_path, block_type, data, path):
        threading.Thread.__init__(self)
        self.expiry = expiry
        self.key = key
        self.get_path = get_path
        self.put_path = put_path
        self.block_type = block_type
        self.data = data
        self.path = path
        self.daemon = True
        # Start immediately: constructing a GetResult is what dispatches it.
        self.start()

    def run(self):
        """Look up the owning request and invoke its callback, if any.

        A result whose path is not registered in ``get_requests`` is dropped
        silently instead of raising ``KeyError`` inside the worker thread.

        The callback is called as
        ``callback(block_type, key, data, expiry)``; when the request was
        started with ``record_route`` the ``get_path`` and ``put_path``
        keyword arguments are passed as well.
        """
        # Only the lookup is done under the lock; the user callback runs
        # outside it so that it cannot block other registry users.
        with requests_lock:
            request = get_requests.get(self.path)

        if request is None:
            return

        if request.record_route:
            request.callback(self.block_type, self.key, self.data, self.expiry, get_path=self.get_path, put_path=self.put_path)
        else:
            request.callback(self.block_type, self.key, self.data, self.expiry)


def _result(expiry, key, get_path, put_path, block_type, data, path):
    """Signal handler: convert a raw ``result`` signal and dispatch it.

    The incoming values are converted to Python-side types (``expiry`` via
    ``pythonize`` to a ``datetime.datetime``, ``key`` to a ``HashCode``, the
    two paths to lists, ``block_type`` to ``str`` and ``data`` to a
    ``bytearray``) and handed to a new ``GetResult``, which starts itself.
    ``path`` is passed through unchanged.
    """
    # Arguments are evaluated left to right, so the conversions happen in
    # the same order as the constructor's parameters.
    GetResult(
        pythonize(expiry, datetime.datetime),
        HashCode(key),
        list(get_path),
        list(put_path),
        str(block_type),
        bytearray(data),
        path,
    )

# Route every "result" signal emitted on the "gnu.gnunet.dht.get" interface
# by the "gnu.gnunet.dht" bus name to _result().  path_keyword="path" makes
# the binding pass the emitting object's path as the ``path`` keyword
# argument, which is how a result is matched to its request.
sysbus.add_signal_receiver(_result, "result", "gnu.gnunet.dht.get", "gnu.gnunet.dht", path_keyword="path")


class GetRequest:
    """Client-side handle for a get operation started with ``get_start``.

    Attributes:
        callback: callable invoked for each result of this request.
        record_route: whether the callback also receives the ``get_path``
            and ``put_path`` keyword arguments.
    """

    def __init__(self, path, callback, record_route):
        # _path addresses the remote object that represents this request.
        self._path = path
        self.callback = callback
        self.record_route = record_route

    def filter_known_results(self, keys):
        """Call the remote ``filter_known_results`` method with ``keys``.

        Every element of ``keys`` is wrapped in a ``HashCode`` and converted
        with ``dbusize`` before being sent.  Presumably this tells the
        service which results the caller already has -- confirm against the
        service documentation.  D-Bus errors are passed to
        ``handle_exception``.
        """
        converted = [dbusize(HashCode(key)) for key in list(keys)]
        known = dbus.Array(converted, signature="v")
        try:
            remote = sysbus.get_object("gnu.gnunet.dht", self._path)
            remote.filter_known_results(known, dbus_interface="gnu.gnunet.dht.get")
        except dbus.DBusException as err:
            handle_exception(err, "dht", "gnu.gnunet.dht")

    def stop(self):
        """Call the remote ``stop`` method of this request.

        D-Bus errors are passed to ``handle_exception``.
        """
        try:
            remote = sysbus.get_object("gnu.gnunet.dht", self._path)
            remote.stop(dbus_interface="gnu.gnunet.dht.get")
        except dbus.DBusException as err:
            handle_exception(err, "dht", "gnu.gnunet.dht")


def put(key, desired_replication_level, block_type, data, expiry=None, demultiplex_everywhere=False, record_route=False, bart=False):
    """Send a block to the DHT service's ``put`` method.

    Args:
        key: value accepted by ``HashCode``.
        desired_replication_level: sent as an unsigned 32-bit integer.
        block_type: must be a member of ``block.types``.
        data: anything ``bytearray`` accepts; sent as a byte array.
        expiry: a ``datetime.datetime``, or ``None`` to send the string
            ``"end of time"`` instead.
        demultiplex_everywhere, record_route, bart: when true, the option
            of the same name is added to the options array that is sent.

    Raises:
        ValueError: if ``block_type`` is not in ``block.types``.
        TypeError: if ``expiry`` is neither ``None`` nor a
            ``datetime.datetime``.

    D-Bus errors from the call are passed to ``handle_exception``.
    """
    wire_key = dbusize(HashCode(key), True)
    wire_replication = dbus.UInt32(desired_replication_level)

    if block_type not in block.types:
        raise ValueError("'block_type' must be one of %s" % block.types)
    wire_type = dbus.String(block_type, variant_level=1)

    if expiry is None:
        wire_expiry = dbus.String("end of time", variant_level=1)
    elif isinstance(expiry, datetime.datetime):
        wire_expiry = dbusize(expiry)
    else:
        raise TypeError("'expiry' must be a datetime.datetime")

    # Option names are appended in a fixed order, one per enabled flag.
    options = dbus.Array([], variant_level=1, signature="s")
    flags = (
        (demultiplex_everywhere, "demultiplex_everywhere"),
        (record_route, "record_route"),
        (bart, "bart"),
    )
    for enabled, name in flags:
        if enabled:
            options.append(name)

    payload = dbus.Array(bytearray(data), signature="y")

    try:
        service = sysbus.get_object("gnu.gnunet.dht", "/")
        service.put(wire_key, wire_replication, options, wire_type, payload, wire_expiry)
    except dbus.DBusException as err:
        handle_exception(err, "dht", "gnu.gnunet.dht")


def get_start(callback, block_type, key, desired_replication_level, demultiplex_everywhere=False, record_route=False, bart=False):
    """Start a get operation on the DHT service and register its callback.

    Args:
        callback: callable invoked for each result as
            ``callback(block_type, key, data, expiry)``; with
            ``record_route`` it also receives the ``get_path`` and
            ``put_path`` keyword arguments.
        block_type: must be a member of ``block.types``.
        key: value accepted by ``HashCode``.
        desired_replication_level: sent as an unsigned 32-bit integer.
        demultiplex_everywhere, record_route, bart: when true, the option
            of the same name is added to the options array that is sent.

    Returns:
        The ``GetRequest`` for the new operation, or ``None`` if the remote
        call failed and ``handle_exception`` returned normally.

    Raises:
        ValueError: if ``block_type`` is not in ``block.types``.
    """
    if block_type not in block.types:
        raise ValueError("'block_type' must be one of %s" % block.types)
    wire_type = dbus.String(block_type, variant_level=1)
    wire_key = dbusize(HashCode(key), True)
    wire_replication = dbus.UInt32(desired_replication_level)

    # Option names are appended in a fixed order, one per enabled flag.
    options = dbus.Array([], variant_level=1, signature="s")
    flags = (
        (demultiplex_everywhere, "demultiplex_everywhere"),
        (record_route, "record_route"),
        (bart, "bart"),
    )
    for enabled, name in flags:
        if enabled:
            options.append(name)

    request = None
    try:
        # The lock covers both the remote call and the registration, so a
        # GetResult thread cannot look the path up before it is registered.
        with requests_lock:
            service = sysbus.get_object("gnu.gnunet.dht", "/")
            path = service.get_start(wire_type, wire_key, wire_replication, options)
            request = GetRequest(path, callback, record_route)
            get_requests[path] = request
    except dbus.DBusException as err:
        handle_exception(err, "dht", "gnu.gnunet.dht")

    return request