From d67921d73e26d006da25c48f37c87e4179f86eff Mon Sep 17 00:00:00 2001 From: bnewbold Date: Mon, 25 Jan 2016 13:24:11 -0800 Subject: bbb.py: more general WIP --- bbb/bbb.py | 247 +++++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 226 insertions(+), 21 deletions(-) diff --git a/bbb/bbb.py b/bbb/bbb.py index d57327f..68b2aed 100644 --- a/bbb/bbb.py +++ b/bbb/bbb.py @@ -3,8 +3,8 @@ import requests import time import warnings import dateutil.parser +import json -from .util import * from .exceptions import * from .constants import * @@ -82,6 +82,10 @@ class WikibaseServer: def _get(self, action, params=dict()): return self._api_call("GET", action, params) + def _get_csrf_token(self): + resp = self._get('query', dict(meta="tokens", type="csrf")) + return resp['query']['tokens']['csrftoken'] + def check(self): # Check that wikibase API calls work (instead of just "action=query") self._get("wbparsevalue", @@ -129,6 +133,8 @@ class WikibaseServer: NB: doesn't handle case of multiple sites, single title """ + if len(query) == 0: + return [] params = { 'sites': site or self.site, 'languages': lang or self.lang, @@ -167,7 +173,7 @@ class WikibaseServer: as_titles = False if type(query[0]) is int: # Convert list of ints to QIDs - query = map(lambda x: "Q%d" % x, query) + query = ["Q%d" % x for x in query] elif not (type(query[0]) is str and query[0][0] in "Q" and query[0][1:].isdigit()): # Must be list of titles as_titles = True @@ -177,44 +183,124 @@ class WikibaseServer: except MissingEntityError as wee: # Case entity error to item error raise MissingItemError(id=wee.id, title=wee.title) - items = [WikibaseItem.from_json(e) for e in entities] + items = [WikibaseItem.from_dict(e) for e in entities] return items def get_item(self, query, **kwargs): return self.get_items((query, ), **kwargs) - + + def get_properties(self, query, **kwargs): + if type(query[0]) is int: + # Convert list of ints to PIDs + query = map(lambda x: "P%d" % x, query) + elif not (type(query[0]) is str and query[0][0] in "P" and query[0][1:].isdigit()): + raise ValueError("query must be a list of PIDs") + + try: + entities = self._get_entities(query, as_titles=False, expected='property', **kwargs) + except MissingEntityError as wee: + # Case entity error to item error + raise MissingPropertyError(id=wee.id, title=wee.title) + items = [WikibaseProperty.from_dict(e) for e in entities] + return items + + def get_property(self, query, **kwargs): + return self.get_properties((query, ), **kwargs) + + def _search_entities(self, query, etype, limit=7, lang=None): + resp = self._get("wbsearchentities", + dict(search=query, language=lang or self.lang, type=etype, + limit=limit)) + if not 'success' in resp: + raise WikibaseException("Expected 'success'") + results = resp['search'] + return [dict(id=r['id'], + label=r['label'], + #description=r['description'], + url=r['url']) for r in results] + + def search_items(self, query, fetch=False, lang=None, **kwargs): + lang = lang or self.lang + results = self._search_entities(query, 'item', lang=lang, **kwargs) + if not fetch: + return results + return self.get_items([short['id'] for short in results], lang=lang) + + def search_properties(self, query, fetch=False, lang=None, **kwargs): + lang = lang or self.lang + results = self._search_entities(query, 'property', lang=lang, **kwargs) + if not fetch: + return results + return get_properties([short['id'] for short in results], lang=lang) + + def create(self, thing): + if not isinstance(thing, (WikibaseEntity, WikibaseStatement)): + raise ValueError( + "Takes one of: WikibaseItem, WikibaseProperty, WikibaseStatement") + thing.create(self) + + def save(self, thing): + if not isinstance(thing, (WikibaseEntity, WikibaseStatement)): + raise ValueError( + "Takes one of: WikibaseItem, WikibaseProperty, WikibaseStatement") + thing.save(self) class WikibaseEntity: ''' Base class for WikibaseItem and WikibaseProperty ''' - def __init__(self, dbid=None, rev=None, rev_timestamp=None, labels=[], - descriptions=[], aliases=[], statements=[], sitelinks=[]): - self.dbid = dbid - self.rev = rev - self.rev_timestamp = rev_timestamp + def __init__(self, labels=[], descriptions=[], aliases=[], dbid=None, + rev=None, rev_timestamp=None, statements=[], lang=DEFAULT_LANG): + if type(labels) is str: + labels = {DEFAULT_LANG: {"language": DEFAULT_LANG, "value": labels}} + if type(descriptions) is str: + descriptions = {DEFAULT_LANG: {"language": DEFAULT_LANG, "value": descriptions}} + if type(aliases) is str: + aliases = (aliases, ) + if type(aliases) in (list, tuple): + aliases = {DEFAULT_LANG: + [{"language": DEFAULT_LANG, "value": al} for al in aliases] + } self.labels = labels self.descriptions = descriptions self.aliases = aliases + self.dbid = dbid + self.rev = rev + self.rev_timestamp = rev_timestamp self.statements = statements - self.sitelinks = sitelinks + self.entity_type = None @classmethod - def from_json(cls, j): + def from_dict(cls, j): we = cls( dbid=j['id'], rev=j['lastrevid'], rev_timestamp=dateutil.parser.parse(j['modified']), aliases=j['aliases'], - sitelinks=j['sitelinks'], descriptions=j['descriptions'], ) claims = j['claims'] - for c in claims: - we.statements.append(WikibaseStatement.from_json(c)) + for prop in claims: + for c in claims[prop]: + we.statements.append(WikibaseStatement.from_dict(c)) return we + def to_dict(self, new=False): + d = dict() + d['labels'] = self.labels + d['descriptions'] = self.descriptions + d['aliases'] = self.aliases + if self.entity_type: + d['type'] = self.entity_type + d['statements'] = [s.to_dict() for s in self.statements] + if not new: + if self.dbid: + d['id'] = self.dbid + if self.rev: + d['lastrevid'] = self.rev + return d + def add_statement(self, statement): raise NotImplementedError @@ -224,29 +310,89 @@ class WikibaseEntity: def add_alias(self, label): raise NotImplementedError + def create(self, srv, summary=None): + # TODO: first have to traverse statements and find/sync any properties? + if summary is None: + summary = "Created new Entity" + data = self.to_dict(new=True) + srv._post("wbeditentity", { + "bot": srv.is_bot, + "new": self.entity_type, + "token": srv._get_csrf_token(), + "summary": summary, + "data": data, + }) + + def save(self, srv, summary=None): + if summary is None: + summary = "Changes to existing Entity" + data = self.to_dict(new=True) + srv._post("wbeditentity", { + "bot": srv.is_bot, + "new": self.entity_type, + "token": srv._get_csrf_token(), + "summary": summary, + "data": data, + }) + class WikibaseItem(WikibaseEntity): def __init__(self, *args, **kwargs): + self.sitelinks = kwargs.pop('sitelinks', []) super().__init__(*args, **kwargs) + self.entity_type = "item" def __repr__(self): - return "" % self.qid + return "" % self.qid() + + @classmethod + def from_dict(cls, d): + wi = super().from_dict(d) + wi.sitelinks = d['sitelinks'] + return wi + + def to_dict(self): + d = super().to_dict() + d['sitelinks'] = self.sitelinks + return d + + def qid(self): + if self.dbid: + assert self.dbid.startswith("Q") + return self.dbid class WikibaseProperty(WikibaseEntity): def __init__(self,*args, **kwargs): - self.datatype = kwargs.pop('datatype') + self.datatype = kwargs.pop('datatype', None) super().__init__(*args, **kwargs) + self.entity_type = "property" def __repr__(self): - return "" % self.pid + return "" % self.pid() + + @classmethod + def from_dict(cls, d): + wp = super().from_dict(d) + wp.datatype = d['datatype'] + return wp + + def to_dict(self): + d = super().to_dict() + d['datatype'] = self.datatype + return d + + def pid(self): + if self.dbid: + assert self.dbid.startswith("P") + return self.dbid class WikibaseStatement: - def __init__(self, property=None, value=None, qualifiers=[], references=[], - rank='normal'): - self.property = property - self.value = value + def __init__(self, qualifiers=[], references=[], + rank='normal', mainsnak=None, guid=None): + self.guid = guid + self.mainsnak = mainsnak self.qualifiers = qualifiers self.references = references self.rank = rank @@ -257,3 +403,62 @@ class WikibaseStatement: else: return "" + @classmethod + def from_dict(cls, d): + print(d.keys()) + if not d['type'] == 'statement': + raise WikibaseException("Tried to parse a non-statement claim?") + quals = [] + for prop_set in d.get('qualifiers', {}).values(): + quals.extend([WikibaseSnak.from_dict(q) for q in prop_set]) + # TODO: handle references hashes + refs = [] + for ref in d.get("references", []): + # We get a list of lists; deep! + snaks_lists = ref.get("snaks", {}).values() + snaks = [] + for sl in snaks_lists: + snaks.extend(sl) + refs.append([WikibaseSnak.from_dict(q) for q in snaks]) + ws = cls( + mainsnak=WikibaseSnak.from_dict(d['mainsnak']), + guid=d['id'], + rank=d['rank'], + qualifiers=quals, + references=refs, + ) + +class WikibaseSnak: + + def __init__(self, property=None, value=None, snaktype=None, hash=None, datatype=None): + if snaktype is None: + if value is None: + snaktype = "novalue" + else: + snaktype = "somevalue" + self.property = property + self.snaktype = snaktype + self.datatype = datatype + self.value = value + self.hash = None + + def __repr__(self): + return "" % ( + self.property, self.snaktype) + + @classmethod + def from_dict(cls, d): + ws = cls( + property=d['property'], + snaktype=d['snaktype'], + hash=d.get('hash', None), + ) + if ws.snaktype == 'value': + ws.datatype = d['datatype'] + if ws.datatype == "wikibase-item": + ws.value = "Q%d" % int(d['datavalue']['value']['numeric-id']) + if ws.datatype == "wikibase-property": + ws.value = "P%d" % int(d['datavalue']['value']['numeric-id']) + else: + ws.value = d['datavalue']['value'] + -- cgit v1.2.3