commit e7e3fa3422301feed68e02ae88c0ce8374faae18
parent a237ad931d72d2a154f522e2aed9cce359aca67c
Author: Martin Schanzenbach <schanzen@gnunet.org>
Date: Thu, 4 Jul 2024 08:43:07 +0200
Add threaded inserts with workers
Diffstat:
1 file changed, 67 insertions(+), 31 deletions(-)
diff --git a/ascension/ascension.py b/ascension/ascension.py
@@ -25,6 +25,8 @@ import argparse
import logging
import time
import subprocess
+import itertools
+import multiprocessing as mp
import dns.rdatatype
import dns.zone
@@ -36,6 +38,36 @@ import ascension.util.keyfile
import ascension.util.rest
import ascension.util.transformers
+def work_slice(n,step,pp_setslice):
+ ns_process = subprocess.Popen(["gnunet-namestore", "-B", str(step), "-a", "-S"], stdin=subprocess.PIPE, text=True)
+ start = time.time()
+ i = 0
+ j = 0
+ psetcount = len(pp_setslice)
+ for name, payload in pp_setslice.items():
+ # log if the rdataset is empty for some reason
+ i += 1
+ if not payload:
+ print("Empty Rdataset!")
+ continue
+ j += len(payload.data)
+ if (i % step) == 0:
+ print("Worker #%d: Adding record set %d/%d for a total of %d records\n"%(n,i,psetcount,j), end="")
+ ns_process.stdin.write(name + ":\n")
+ for r in payload.data:
+ flags = "[r{}]".format('p' if not r.is_private else '')
+ # FIXME we have many more flags. but probably not in our use
+ # case? We always have relative expirations, for example.
+ #print("{} {} {} {}\n".format(r.record_type,
+ # r.relative_expiration,
+ # flags,
+ # r.value))
+ ns_process.stdin.write("{} {} {} {}\n".format(r.record_type,
+ r.relative_expiration,
+ flags,
+ r.value))
+ ns_process.stdin.close()
+ ns_process.wait()
class Ascension():
"""
@@ -186,19 +218,7 @@ class Ascension():
# Replace the records already present in GNS as old ones are not deleted
self.logger.debug(payload.record_name + "." + domain + ":\n")
- self.ns_process.stdin.write(payload.record_name + "." + domain + ":\n")
- for r in payload.data:
- flags = "[r{}]".format('p' if not r.is_private else '')
- # FIXME we have many more flags. but probably not in our use
- # case? We always have relative expirations, for example.
- self.logger.debug("{} {} {} {}\n".format(r.record_type,
- r.relative_expiration,
- flags,
- r.value))
- self.ns_process.stdin.write("{} {} {} {}\n".format(r.record_type,
- r.relative_expiration,
- flags,
- r.value))
+ return payload.record_name + "." + domain, payload
# FIXME error checking
#response = self.session.post(f"/namestore/{domain}", data=payload.to_json())
@@ -216,33 +236,48 @@ class Ascension():
# End of worker
# Building hierarchy afterwards
- start = time.time()
+ tstart = time.time()
self.create_zone_hierarchy()
# Needs to happen after the previous line and before the adding of records
self.transformer.subzonedict = self.subzonedict
- end = time.time()
- self.logger.info("Zone hierarchy in %s seconds", str(end - start))
+ tend = time.time()
+ self.logger.info("Zone hierarchy in %s seconds", str(tend - tstart))
# Do it single threaded because threading scares me
- self.ns_process = subprocess.Popen(["gnunet-namestore", "-a", "-S"], stdin=subprocess.PIPE, text=True)
- start = time.time()
- i = 0
setcount = len(self.dnszone.zone.nodes.items())
+ pp_set = {}
+ rrcount = 0
for name, rdatasets in self.dnszone.zone.nodes.items():
# log if the rdataset is empty for some reason
- i += 1
- print("Adding records %d/%d\r"%(i,setcount), end="")
if not rdatasets:
- self.logger.warning("Empty Rdataset!")
+ print("Empty Rdataset!")
continue
- worker((name, rdatasets))
- end = time.time()
-
- self.ns_process.stdin.close()
- self.ns_process.wait()
- self.logger.info("Added %d RRSets", self.rrsetcount)
+ name,payload = worker((name, rdatasets))
+ pp_set[name] = payload
+ if payload:
+ rrcount += len(payload.data)
+
+ pp_setcount = len(pp_set.items())
+ left = pp_setcount
+ start = 0
+ slice0count = int(pp_setcount/10)
+ workers = []
+ for i in range(10):
+ slice1count = slice0count
+ if (i == 9):
+ slice1count = left
+ ppslice = dict(itertools.islice(pp_set.items(), start, start+slice1count))
+ p0 = mp.Process(target=work_slice, args=(i,1000,ppslice,))
+ p0.start()
+ workers.append(p0)
+ start += slice1count
+ for w in workers:
+ w.join()
+ tend = time.time()
+
+ self.logger.info("Added %d RRSets for a total of %d RRs", pp_setcount, rrcount)
self.logger.info("All records have been added in %s seconds",
- str(end - start))
+ str(tend - tstart))
def add_pkey_record_to_zone(self, pkey: str, domain: str, label: str, ttl: int) -> None:
@@ -265,7 +300,7 @@ class Ascension():
record_name=label,
data=[data]
)
- self.logger.info("Added records to /namestore/%s with data %s", domain, record_data)
+ self.logger.debug("Added records to /namestore/%s with data %s", domain, record_data)
payload = record_data
self.ns_process.stdin.write(payload.record_name + "." + domain + ":\n")
for r in payload.data:
@@ -391,7 +426,7 @@ class Ascension():
if sub == '' or not pkeyttltuple[0]:
continue
label = zone.split('.')[0]
- self.logger.info("Adding zone %s with %s pkey into %s", zone, pkey, domain)
+ self.logger.info("Adding zone %s with %s zkey into %s", zone, pkey, domain)
self.add_pkey_record_to_zone(pkey, domain, label, int(ttl))
self.ns_process.stdin.close()
self.ns_process.wait()
@@ -488,4 +523,5 @@ def main():
return 0
if __name__ == '__main__':
+ mp.set_start_method('spawn')
main()