summaryrefslogtreecommitdiffstats
path: root/bbb/bbb.py
diff options
context:
space:
mode:
Diffstat (limited to 'bbb/bbb.py')
-rw-r--r--bbb/bbb.py247
1 files changed, 226 insertions, 21 deletions
diff --git a/bbb/bbb.py b/bbb/bbb.py
index d57327f..68b2aed 100644
--- a/bbb/bbb.py
+++ b/bbb/bbb.py
@@ -3,8 +3,8 @@ import requests
import time
import warnings
import dateutil.parser
+import json
-from .util import *
from .exceptions import *
from .constants import *
@@ -82,6 +82,10 @@ class WikibaseServer:
def _get(self, action, params=dict()):
return self._api_call("GET", action, params)
+ def _get_csrf_token(self):
+ resp = self._get('query', dict(meta="tokens", type="csrf"))
+ return resp['query']['tokens']['csrftoken']
+
def check(self):
# Check that wikibase API calls work (instead of just "action=query")
self._get("wbparsevalue",
@@ -129,6 +133,8 @@ class WikibaseServer:
NB: doesn't handle case of multiple sites, single title
"""
+ if len(query) == 0:
+ return []
params = {
'sites': site or self.site,
'languages': lang or self.lang,
@@ -167,7 +173,7 @@ class WikibaseServer:
as_titles = False
if type(query[0]) is int:
# Convert list of ints to QIDs
- query = map(lambda x: "Q%d" % x, query)
+ query = ["Q%d" % x for x in query]
elif not (type(query[0]) is str and query[0][0] in "Q" and query[0][1:].isdigit()):
# Must be list of titles
as_titles = True
@@ -177,44 +183,124 @@ class WikibaseServer:
except MissingEntityError as wee:
# Case entity error to item error
raise MissingItemError(id=wee.id, title=wee.title)
- items = [WikibaseItem.from_json(e) for e in entities]
+ items = [WikibaseItem.from_dict(e) for e in entities]
return items
def get_item(self, query, **kwargs):
return self.get_items((query, ), **kwargs)
-
+
+ def get_properties(self, query, **kwargs):
+ if type(query[0]) is int:
+ # Convert list of ints to PIDs
+ query = map(lambda x: "P%d" % x, query)
+ elif not (type(query[0]) is str and query[0][0] in "P" and query[0][1:].isdigit()):
+ raise ValueError("query must be a list of PIDs")
+
+ try:
+ entities = self._get_entities(query, as_titles=False, expected='property', **kwargs)
+ except MissingEntityError as wee:
+ # Case entity error to item error
+ raise MissingPropertyError(id=wee.id, title=wee.title)
+ items = [WikibaseProperty.from_dict(e) for e in entities]
+ return items
+
+ def get_property(self, query, **kwargs):
+ return self.get_properties((query, ), **kwargs)
+
+ def _search_entities(self, query, etype, limit=7, lang=None):
+ resp = self._get("wbsearchentities",
+ dict(search=query, language=lang or self.lang, type=etype,
+ limit=limit))
+ if not 'success' in resp:
+ raise WikibaseException("Expected 'success'")
+ results = resp['search']
+ return [dict(id=r['id'],
+ label=r['label'],
+ #description=r['description'],
+ url=r['url']) for r in results]
+
+ def search_items(self, query, fetch=False, lang=None, **kwargs):
+ lang = lang or self.lang
+ results = self._search_entities(query, 'item', lang=lang, **kwargs)
+ if not fetch:
+ return results
+ return self.get_items([short['id'] for short in results], lang=lang)
+
+ def search_properties(self, query, fetch=False, lang=None, **kwargs):
+ lang = lang or self.lang
+ results = self._search_entities(query, 'property', lang=lang, **kwargs)
+ if not fetch:
+ return results
+ return get_properties([short['id'] for short in results], lang=lang)
+
+ def create(self, thing):
+ if not isinstance(thing, (WikibaseEntity, WikibaseStatement)):
+ raise ValueError(
+ "Takes one of: WikibaseItem, WikibaseProperty, WikibaseStatement")
+ thing.create(self)
+
+ def save(self, thing):
+ if not isinstance(thing, (WikibaseEntity, WikibaseStatement)):
+ raise ValueError(
+ "Takes one of: WikibaseItem, WikibaseProperty, WikibaseStatement")
+ thing.save(self)
class WikibaseEntity:
'''
Base class for WikibaseItem and WikibaseProperty
'''
- def __init__(self, dbid=None, rev=None, rev_timestamp=None, labels=[],
- descriptions=[], aliases=[], statements=[], sitelinks=[]):
- self.dbid = dbid
- self.rev = rev
- self.rev_timestamp = rev_timestamp
+ def __init__(self, labels=[], descriptions=[], aliases=[], dbid=None,
+ rev=None, rev_timestamp=None, statements=[], lang=DEFAULT_LANG):
+ if type(labels) is str:
+ labels = {DEFAULT_LANG: {"language": DEFAULT_LANG, "value": labels}}
+ if type(descriptions) is str:
+ descriptions = {DEFAULT_LANG: {"language": DEFAULT_LANG, "value": descriptions}}
+ if type(aliases) is str:
+ aliases = (aliases, )
+ if type(aliases) in (list, tuple):
+ aliases = {DEFAULT_LANG:
+ [{"language": DEFAULT_LANG, "value": al} for al in aliases]
+ }
self.labels = labels
self.descriptions = descriptions
self.aliases = aliases
+ self.dbid = dbid
+ self.rev = rev
+ self.rev_timestamp = rev_timestamp
self.statements = statements
- self.sitelinks = sitelinks
+ self.entity_type = None
@classmethod
- def from_json(cls, j):
+ def from_dict(cls, j):
we = cls(
dbid=j['id'],
rev=j['lastrevid'],
rev_timestamp=dateutil.parser.parse(j['modified']),
aliases=j['aliases'],
- sitelinks=j['sitelinks'],
descriptions=j['descriptions'],
)
claims = j['claims']
- for c in claims:
- we.statements.append(WikibaseStatement.from_json(c))
+ for prop in claims:
+ for c in claims[prop]:
+ we.statements.append(WikibaseStatement.from_dict(c))
return we
+ def to_dict(self, new=False):
+ d = dict()
+ d['labels'] = self.labels
+ d['descriptions'] = self.descriptions
+ d['aliases'] = self.aliases
+ if self.entity_type:
+ d['type'] = self.entity_type
+ d['statements'] = [s.to_dict() for s in self.statements]
+ if not new:
+ if self.dbid:
+ d['id'] = self.dbid
+ if self.rev:
+ d['lastrevid'] = self.rev
+ return d
+
def add_statement(self, statement):
raise NotImplementedError
@@ -224,29 +310,89 @@ class WikibaseEntity:
def add_alias(self, label):
raise NotImplementedError
+ def create(self, srv, summary=None):
+ # TODO: first have to traverse statements and find/sync any properties?
+ if summary is None:
+ summary = "Created new Entity"
+ data = self.to_dict(new=True)
+ srv._post("wbeditentity", {
+ "bot": srv.is_bot,
+ "new": self.entity_type,
+ "token": srv._get_csrf_token(),
+ "summary": summary,
+ "data": data,
+ })
+
+ def save(self, srv, summary=None):
+ if summary is None:
+ summary = "Changes to existing Entity"
+ data = self.to_dict(new=True)
+ srv._post("wbeditentity", {
+ "bot": srv.is_bot,
+ "new": self.entity_type,
+ "token": srv._get_csrf_token(),
+ "summary": summary,
+ "data": data,
+ })
+
class WikibaseItem(WikibaseEntity):
def __init__(self, *args, **kwargs):
+ self.sitelinks = kwargs.pop('sitelinks', [])
super().__init__(*args, **kwargs)
+ self.entity_type = "item"
def __repr__(self):
- return "<WikibaseItem %s>" % self.qid
+ return "<WikibaseItem %s>" % self.qid()
+
+ @classmethod
+ def from_dict(cls, d):
+ wi = super().from_dict(d)
+ wi.sitelinks = d['sitelinks']
+ return wi
+
+ def to_dict(self):
+ d = super().to_dict()
+ d['sitelinks'] = self.sitelinks
+ return d
+
+ def qid(self):
+ if self.dbid:
+ assert self.dbid.startswith("Q")
+ return self.dbid
class WikibaseProperty(WikibaseEntity):
def __init__(self,*args, **kwargs):
- self.datatype = kwargs.pop('datatype')
+ self.datatype = kwargs.pop('datatype', None)
super().__init__(*args, **kwargs)
+ self.entity_type = "property"
def __repr__(self):
- return "<WikibaseProperty %s>" % self.pid
+ return "<WikibaseProperty %s>" % self.pid()
+
+ @classmethod
+ def from_dict(cls, d):
+ wp = super().from_dict(d)
+ wp.datatype = d['datatype']
+ return wp
+
+ def to_dict(self):
+ d = super().to_dict()
+ d['datatype'] = self.datatype
+ return d
+
+ def pid(self):
+ if self.dbid:
+ assert self.dbid.startswith("P")
+ return self.dbid
class WikibaseStatement:
- def __init__(self, property=None, value=None, qualifiers=[], references=[],
- rank='normal'):
- self.property = property
- self.value = value
+ def __init__(self, qualifiers=[], references=[],
+ rank='normal', mainsnak=None, guid=None):
+ self.guid = guid
+ self.mainsnak = mainsnak
self.qualifiers = qualifiers
self.references = references
self.rank = rank
@@ -257,3 +403,62 @@ class WikibaseStatement:
else:
return "<WikibaseStatement (empty)>"
+ @classmethod
+ def from_dict(cls, d):
+ print(d.keys())
+ if not d['type'] == 'statement':
+ raise WikibaseException("Tried to parse a non-statement claim?")
+ quals = []
+ for prop_set in d.get('qualifiers', {}).values():
+ quals.extend([WikibaseSnak.from_dict(q) for q in prop_set])
+ # TODO: handle references hashes
+ refs = []
+ for ref in d.get("references", []):
+ # We get a list of lists; deep!
+ snaks_lists = ref.get("snaks", {}).values()
+ snaks = []
+ for sl in snaks_lists:
+ snaks.extend(sl)
+ refs.append([WikibaseSnak.from_dict(q) for q in snaks])
+ ws = cls(
+ mainsnak=WikibaseSnak.from_dict(d['mainsnak']),
+ guid=d['id'],
+ rank=d['rank'],
+ qualifiers=quals,
+ references=refs,
+ )
+
+class WikibaseSnak:
+
+ def __init__(self, property=None, value=None, snaktype=None, hash=None, datatype=None):
+ if snaktype is None:
+ if value is None:
+ snaktype = "novalue"
+ else:
+ snaktype = "somevalue"
+ self.property = property
+ self.snaktype = snaktype
+ self.datatype = datatype
+ self.value = value
+ self.hash = None
+
+ def __repr__(self):
+ return "<WikibaseSnak %s has %s>" % (
+ self.property, self.snaktype)
+
+ @classmethod
+ def from_dict(cls, d):
+ ws = cls(
+ property=d['property'],
+ snaktype=d['snaktype'],
+ hash=d.get('hash', None),
+ )
+ if ws.snaktype == 'value':
+ ws.datatype = d['datatype']
+ if ws.datatype == "wikibase-item":
+ ws.value = "Q%d" % int(d['datavalue']['value']['numeric-id'])
+ if ws.datatype == "wikibase-property":
+ ws.value = "P%d" % int(d['datavalue']['value']['numeric-id'])
+ else:
+ ws.value = d['datavalue']['value']
+