commit f977d102c9e6f7fcb642236e358b3ec915ed8787
parent 384834d0c1b45b5e3ca932f5ebbfff39de3855b6
Author: Martin Schanzenbach <schanzen@gnunet.org>
Date: Thu, 11 Jul 2024 16:48:18 +0200
add various performance improvements including multi namestore process worker support and logging
Diffstat:
3 files changed, 39 insertions(+), 22 deletions(-)
diff --git a/ascension/ascension.py b/ascension/ascension.py
@@ -23,6 +23,7 @@ Author: rexxnor
import argparse
import logging
+import os
import time
import subprocess
import itertools
@@ -38,8 +39,18 @@ import ascension.util.keyfile
import ascension.util.rest
import ascension.util.transformers
-def work_slice(n,step,pp_setslice):
- ns_process = subprocess.Popen(["gnunet-namestore", "-B", str(step), "-a", "-S"], stdin=subprocess.PIPE, text=True)
+def work_slice(n,step,pp_setslice,gn_path):
+ # TODO Check path / exit code? Config could be generated.
+ worker_cfg = 'namestore-ascension-worker-' + str(n) + '.conf'
+ with open(worker_cfg, 'w') as f:
+ f.write('[namestore]\n')
+ f.write('DATABASE = postgres\n')
+ worker_sock = 'UNIXPATH = $GNUNET_USER_RUNTIME_DIR/gnunet-service-namestore-'+str(n)+'.sock'
+ f.write(worker_sock)
+
+ ns_svc_full_path = os.path.join(gn_path, 'gnunet', 'libexec', 'gnunet-service-namestore')
+ ns_svc_process = subprocess.Popen([ns_svc_full_path, '-c', worker_cfg])
+ ns_process = subprocess.Popen(["gnunet-namestore", "-B", str(step), "-a", "-S", "-c", worker_cfg], stdin=subprocess.PIPE, text=True)
start = time.time()
i = 0
j = 0
@@ -58,16 +69,14 @@ def work_slice(n,step,pp_setslice):
flags = "[r{}]".format('p' if not r.is_private else '')
# FIXME we have many more flags. but probably not in our use
# case? We always have relative expirations, for example.
- #print("{} {} {} {}\n".format(r.record_type,
- # r.relative_expiration,
- # flags,
- # r.value))
ns_process.stdin.write("{} {} {} {}\n".format(r.record_type,
r.relative_expiration,
flags,
r.value))
ns_process.stdin.close()
ns_process.wait()
+ ns_svc_process.terminate()
+ os.remove(worker_cfg)
class Ascension():
"""
@@ -75,6 +84,10 @@ class Ascension():
"""
def __init__(self, args: argparse.Namespace):
"""Constructor initializing all the classes and variables needed"""
+ # Logging
+ logging.basicConfig()
+ self.logger = logging.getLogger(__name__)
+ self.logger.setLevel(int(args.loglevel))
domain = args.domain
# special case for root zone
@@ -85,10 +98,13 @@ class Ascension():
self.rrsetcount = 0
self.subzonedict = {}
+ self.gnunet_prefix = args.gnunetprefix
+ self.num_workers = int(args.workers)
+ self.batch_size = int(args.batchsize)
self.session = ascension.util.rest.GNUnetRestSession()
self.gnszone = ascension.util.classes.GNSZone(
- self.session, domain, args.public, args.ttl
+ self.session, domain, args.public, args.ttl, self.logger
)
self.dnszone = ascension.util.classes.DNSZone(
domain, args.nameserver, args.port, args.keyfile
@@ -98,13 +114,9 @@ class Ascension():
self.dnszone
)
- # Logging
- logging.basicConfig()
- self.logger = logging.getLogger(__name__)
- self.logger.setLevel(int(args.loglevel))
-
+
- def add_records_to_gns(self, num_workers, batchsize) -> None:
+ def add_records_to_gns(self) -> None:
"""
Extracts records from transferred zone and adds them to GNS
:raises AttributeError: When getting incomplete data
@@ -260,14 +272,14 @@ class Ascension():
pp_setcount = len(pp_set.items())
left = pp_setcount
start = 0
- slice0count = int(pp_setcount/num_workers)
+ slice0count = int(pp_setcount/self.num_workers)
workers = []
- for i in range(num_workers):
+ for i in range(self.num_workers):
slice1count = slice0count
- if (i+1 == num_workers):
+ if (i+1 == self.num_workers):
slice1count = left
ppslice = dict(itertools.islice(pp_set.items(), start, start+slice1count))
- p0 = mp.Process(target=work_slice, args=(i,batchsize,ppslice,))
+ p0 = mp.Process(target=work_slice, args=(i,self.batch_size,ppslice,self.gnunet_prefix))
p0.start()
workers.append(p0)
start += slice1count
@@ -523,7 +535,7 @@ def main():
time.sleep(retry)
continue
- ascender.add_records_to_gns(int(args.workers), int(args.batchsize))
+ ascender.add_records_to_gns()
first_run = False
if args.standalone:
diff --git a/ascension/util/argumentparser.py b/ascension/util/argumentparser.py
@@ -34,6 +34,10 @@ def parse_arguments() -> argparse.Namespace:
parser.add_argument('-n', '--nameserver',
help='Nameserver to use for migrating',
required=False)
+ parser.add_argument('-G', '--gnunetprefix',
+ help='GNUnet prefix',
+ default='/usr/lib',
+ required=False)
parser.add_argument('-P', '--port',
help='Port to use for zone transfer with nameserver',
required=False,
diff --git a/ascension/util/classes.py b/ascension/util/classes.py
@@ -74,9 +74,10 @@ class GNSZone():
gnunet_rest: ascension.util.rest.GNUnetRestSession,
zonename: str,
public: bool,
- minimum: int):
+ minimum: int,
+ logger: logging.Logger):
"""Constructor"""
- self.logger = logging.getLogger(__name__)
+ self.logger = logger
self.domain = zonename
self.gnunet_rest = gnunet_rest
self.public = public
@@ -126,6 +127,7 @@ class GNSZone():
:param zonename: The label name of the zone
:returns: str of pubkey of created or existing GNUnet zone
"""
+ self.logger.info('Creating zone ' + zonename)
# This is needed including the argument for subzones
res = subprocess.run(["gnunet-identity", "-X", "-C", zonename])
if res.returncode == 201:
@@ -135,7 +137,7 @@ class GNSZone():
return None
else:
self.logger.info("Created identity %s", zonename)
- res = subprocess.run(["gnunet-identity", "-q", "-d", "-e", zonename], text=True)
+ res = subprocess.run(["gnunet-identity", "-q", "-d", "-e", zonename], text=True, capture_output=True)
if res.returncode != 0:
self.logger.error("Failed to read identity key for `%s'", zonename)
return None
@@ -250,7 +252,6 @@ class DNSZone:
else:
zf = self.zone_backup_file
self.zone = dns.zone.from_file(zf, origin=self.domain)
- print(self.zone)
self.logger.info("Zonebackup file %s loaded", zf)
except FileNotFoundError:
self.logger.info("Zonebackup file was not found, will be created")