From 4e332e9037530ebc62836acfa78896dc76700c9c Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 2 Dec 2020 10:49:35 -0800
Subject: add 'lxml' mode for large XML file import, and multi-tags

---
Review notes (not part of the commit message):

 * Layout: line breaks and indentation were restored from a
   whitespace-collapsed copy of this patch. Blank context lines were
   placed so that every hunk matches its "@@" line counts; verify them
   (and the indentation depth) against the tree before applying.

 * Amended: in Bs4XmlLargeFilePusher.run() the root-capture test is now
   "(root is None)". As originally posted it read "(root is not None)";
   since root starts as None that branch could never run, root was never
   assigned, and the guarded root.clear() below it never executed, so
   already-processed records stayed attached to the document root.

 * Amended: "import lxml" is now "import lxml.etree". run() dereferences
   lxml.etree, and importing the package alone does not load that
   submodule. The name "lxml" is still bound, so existing uses keep
   working.

 * Because of the two amendments, the post-image blob id on the "index"
   line for common.py describes the originally posted content, not this
   version.

 python/fatcat_import.py                 |  3 +--
 python/fatcat_tools/importers/common.py | 43 +++++++++++++++++++++------------
 python/tests/import_pubmed.py           |  4 +--
 3 files changed, 31 insertions(+), 19 deletions(-)

(limited to 'python')

diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index ff6c94dc..6c9e65a8 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -71,8 +71,7 @@ def run_pubmed(args):
         Bs4XmlLargeFilePusher(
             pi,
             args.xml_file,
-            "PubmedArticle",
-            record_list_tag="PubmedArticleSet",
+            ["PubmedArticle"],
         ).run()
 
 def run_jstor(args):
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 6dc2ab9e..2446cdbf 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -8,6 +8,9 @@ import datetime
 import subprocess
 from collections import Counter
 from typing import Optional, Tuple
+from confluent_kafka import Consumer, KafkaException
+import lxml.etree
+import xml.parsers.expat
 import xml.etree.ElementTree as ET
 
 import elasticsearch
@@ -528,7 +531,7 @@ class JsonLinePusher(RecordPusher):
             record = json.loads(line)
             self.importer.push_record(record)
         counts = self.importer.finish()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
@@ -544,7 +547,7 @@ class CsvPusher(RecordPusher):
                 continue
             self.importer.push_record(line)
         counts = self.importer.finish()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
@@ -560,7 +563,7 @@ class LinePusher(RecordPusher):
                 continue
             self.importer.push_record(line)
         counts = self.importer.finish()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
@@ -579,7 +582,7 @@ class SqlitePusher(RecordPusher):
         for row in cur:
             self.importer.push_record(row)
         counts = self.importer.finish()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
@@ -600,7 +603,7 @@ class Bs4XmlLinesPusher(RecordPusher):
             self.importer.push_record(soup)
             soup.decompose()
         counts = self.importer.finish()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
@@ -618,7 +621,7 @@ class Bs4XmlFilePusher(RecordPusher):
             record.decompose()
         counts = self.importer.finish()
         soup.decompose()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
@@ -641,29 +644,39 @@ class Bs4XmlLargeFilePusher(RecordPusher):
     by inner container/release API lookup caches.
     """
 
-    def __init__(self, importer, xml_file, record_tag, **kwargs):
+    def __init__(self, importer, xml_file, record_tags, use_lxml=False, **kwargs):
         self.importer = importer
         self.xml_file = xml_file
-        self.record_tag = record_tag
+        self.record_tags = record_tags
+        self.use_lxml = use_lxml
 
     def run(self):
-        elem_iter = ET.iterparse(self.xml_file, ["start", "end"])
+        if self.use_lxml:
+            elem_iter = lxml.etree.iterparse(self.xml_file, ["start", "end"], load_dtd=True)
+        else:
+            elem_iter = ET.iterparse(self.xml_file, ["start", "end"])
         root = None
         for (event, element) in elem_iter:
-            if not root and event == "start":
+            if (root is None) and event == "start":
                 root = element
                 continue
-            if not (element.tag == self.record_tag and event == "end"):
+            if not (element.tag in self.record_tags and event == "end"):
                 continue
-            soup = BeautifulSoup(ET.tostring(element), "xml")
-            for record in soup.find_all(self.record_tag):
+            if self.use_lxml:
+                soup = BeautifulSoup(lxml.etree.tostring(element), "xml")
+            else:
+                soup = BeautifulSoup(ET.tostring(element), "xml")
+            for record in soup.find_all():
+                if record.name not in self.record_tags:
+                    continue
                 self.importer.push_record(record)
                 record.decompose()
             soup.decompose()
             element.clear()
-            root.clear()
+            if root is not None:
+                root.clear()
         counts = self.importer.finish()
-        print(counts)
+        print(counts, file=sys.stderr)
         return counts
 
 
diff --git a/python/tests/import_pubmed.py b/python/tests/import_pubmed.py
index 201f533c..10ded3fc 100644
--- a/python/tests/import_pubmed.py
+++ b/python/tests/import_pubmed.py
@@ -20,7 +20,7 @@ def test_pubmed_importer(pubmed_importer):
     last_index = pubmed_importer.api.get_changelog(limit=1)[0].index
     with open('tests/files/pubmedsample_2019.xml', 'r') as f:
         pubmed_importer.bezerk_mode = True
-        counts = Bs4XmlLargeFilePusher(pubmed_importer, f, "PubmedArticle").run()
+        counts = Bs4XmlLargeFilePusher(pubmed_importer, f, ["PubmedArticle"]).run()
     assert counts['insert'] == 176
     assert counts['exists'] == 0
     assert counts['skip'] == 0
@@ -37,7 +37,7 @@ def test_pubmed_importer(pubmed_importer):
     with open('tests/files/pubmedsample_2019.xml', 'r') as f:
         pubmed_importer.bezerk_mode = False
         pubmed_importer.reset()
-        counts = Bs4XmlLargeFilePusher(pubmed_importer, f, "PubmedArticle").run()
+        counts = Bs4XmlLargeFilePusher(pubmed_importer, f, ["PubmedArticle"]).run()
     assert counts['insert'] == 0
     assert counts['exists'] == 176
     assert counts['skip'] == 0
-- 
cgit v1.2.3