diff options
Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 43 |
1 files changed, 28 insertions, 15 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 6dc2ab9e..2446cdbf 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -8,6 +8,9 @@ import datetime import subprocess from collections import Counter from typing import Optional, Tuple +from confluent_kafka import Consumer, KafkaException +import lxml +import xml.parsers.expat import xml.etree.ElementTree as ET import elasticsearch @@ -528,7 +531,7 @@ class JsonLinePusher(RecordPusher): record = json.loads(line) self.importer.push_record(record) counts = self.importer.finish() - print(counts) + print(counts, file=sys.stderr) return counts @@ -544,7 +547,7 @@ class CsvPusher(RecordPusher): continue self.importer.push_record(line) counts = self.importer.finish() - print(counts) + print(counts, file=sys.stderr) return counts @@ -560,7 +563,7 @@ class LinePusher(RecordPusher): continue self.importer.push_record(line) counts = self.importer.finish() - print(counts) + print(counts, file=sys.stderr) return counts @@ -579,7 +582,7 @@ class SqlitePusher(RecordPusher): for row in cur: self.importer.push_record(row) counts = self.importer.finish() - print(counts) + print(counts, file=sys.stderr) return counts @@ -600,7 +603,7 @@ class Bs4XmlLinesPusher(RecordPusher): self.importer.push_record(soup) soup.decompose() counts = self.importer.finish() - print(counts) + print(counts, file=sys.stderr) return counts @@ -618,7 +621,7 @@ class Bs4XmlFilePusher(RecordPusher): record.decompose() counts = self.importer.finish() soup.decompose() - print(counts) + print(counts, file=sys.stderr) return counts @@ -641,29 +644,39 @@ class Bs4XmlLargeFilePusher(RecordPusher): by inner container/release API lookup caches. """ - def __init__(self, importer, xml_file, record_tag, **kwargs): + def __init__(self, importer, xml_file, record_tags, use_lxml=False, **kwargs): self.importer = importer self.xml_file = xml_file - self.record_tag = record_tag + self.record_tags = record_tags + self.use_lxml = use_lxml def run(self): - elem_iter = ET.iterparse(self.xml_file, ["start", "end"]) + if self.use_lxml: + elem_iter = lxml.etree.iterparse(self.xml_file, ["start", "end"], load_dtd=True) + else: + elem_iter = ET.iterparse(self.xml_file, ["start", "end"]) root = None for (event, element) in elem_iter: - if not root and event == "start": + if (root is not None) and event == "start": root = element continue - if not (element.tag == self.record_tag and event == "end"): + if not (element.tag in self.record_tags and event == "end"): continue - soup = BeautifulSoup(ET.tostring(element), "xml") - for record in soup.find_all(self.record_tag): + if self.use_lxml: + soup = BeautifulSoup(lxml.etree.tostring(element), "xml") + else: + soup = BeautifulSoup(ET.tostring(element), "xml") + for record in soup.find_all(): + if record.name not in self.record_tags: + continue self.importer.push_record(record) record.decompose() soup.decompose() element.clear() - root.clear() + if root is not None: + root.clear() counts = self.importer.finish() - print(counts) + print(counts, file=sys.stderr) return counts |