# Source code for betty.wikipedia

"""
Fetch information from Wikipedia.
"""

from __future__ import annotations

import logging
import re
from collections import defaultdict
from collections.abc import (
    Mapping,
)
from contextlib import suppress, contextmanager
from json import JSONDecodeError
from pathlib import Path
from typing import cast, Any, TYPE_CHECKING
from urllib.parse import quote, urlparse

from geopy import Point

from betty.ancestry.file_reference import FileReference
from betty.ancestry.file import File
from betty.ancestry.has_file_references import HasFileReferences
from betty.ancestry.link import HasLinks, Link
from betty.ancestry.place import Place
from betty.asyncio import gather
from betty.concurrent import Lock, AsynchronizedLock, RateLimiter
from betty.fetch import FetchError
from betty.functools import filter_suppress
from betty.locale import (
    negotiate_locale,
    to_locale,
    get_data,
    Localey,
    UNDETERMINED_LOCALE,
)
from betty.locale.error import LocaleError
from betty.locale.localizable import plain
from betty.locale.localized import Localized
from betty.media_type import MediaType
from betty.media_type.media_types import HTML

if TYPE_CHECKING:
    from betty.ancestry import Ancestry
    from betty.locale.localizer import LocalizerRepository
    from betty.fetch import Fetcher
    from collections.abc import (
        Sequence,
        MutableSequence,
        MutableMapping,
        Iterator,
    )


[docs] class NotAPageError(ValueError): """ Raised when a URL does not point to a Wikipedia page. """ pass # pragma: no cover
_URL_PATTERN = re.compile(r"^https?://([a-z]+)\.wikipedia\.org/wiki/([^/?#]+).*$") def _parse_url(url: str) -> tuple[str, str]: match = _URL_PATTERN.fullmatch(url) if match is None: raise NotAPageError return cast(tuple[str, str], match.groups())
class Summary(Localized):
    """
    A Wikipedia page summary.
    """

    def __init__(self, locale: str, name: str, title: str, content: str):
        self._locale = locale
        self._name = name
        self._title = title
        self._content = content

    def __eq__(self, other: object) -> bool:
        # Summaries are equal when every exposed attribute matches.
        if not isinstance(other, Summary):
            return False
        return (
            self.name == other.name
            and self.url == other.url
            and self.title == other.title
            and self.content == other.content
        )

    @property
    def name(self) -> str:
        """
        The page's machine name.
        """
        return self._name

    @property
    def url(self) -> str:
        """
        The URL to the web page.
        """
        return f"https://{self.locale}.wikipedia.org/wiki/{self._name}"

    @property
    def title(self) -> str:
        """
        The page's human-readable title.
        """
        return self._title

    @property
    def content(self) -> str:
        """
        The page's human-readable summary content.
        """
        return self._content
class Image:
    """
    An image from Wikimedia Commons.
    """

    def __init__(
        self,
        path: Path,
        media_type: MediaType,
        title: str,
        wikimedia_commons_url: str,
        name: str,
    ):
        self._path = path
        self._media_type = media_type
        self._title = title
        self._wikimedia_commons_url = wikimedia_commons_url
        self._name = name

    def __hash__(self) -> int:
        # The file name is deliberately left out of the hash; the remaining
        # attributes identify the image.
        return hash(
            (self.path, self.media_type, self.title, self.wikimedia_commons_url)
        )

    @property
    def path(self) -> Path:
        """
        The path to the image on disk.
        """
        return self._path

    @property
    def media_type(self) -> MediaType:
        """
        The image's media type.
        """
        return self._media_type

    @property
    def title(self) -> str:
        """
        The human-readable image title.
        """
        return self._title

    @property
    def wikimedia_commons_url(self) -> str:
        """
        The URL to the Wikimedia Commons web page for this image.
        """
        return self._wikimedia_commons_url

    @property
    def name(self) -> str:
        """
        The image's file name.
        """
        return self._name
class _Retriever:
    """
    Fetch and parse data from the Wikipedia and Wikimedia Commons APIs.
    """

    # The maximum number of API requests per time window, enforced by the
    # rate limiter below.
    _WIKIPEDIA_RATE_LIMIT = 200

    def __init__(
        self,
        fetcher: Fetcher,
    ):
        self._fetcher = fetcher
        # Cache of Wikimedia Commons image names to fetched images.
        # NOTE(review): this mapping is read in get_image() but never written
        # to in the code visible here, so the cache appears ineffective —
        # verify against the upstream source.
        self._images: MutableMapping[str, Image | None] = {}
        self._rate_limiter = RateLimiter(self._WIKIPEDIA_RATE_LIMIT)

    @contextmanager
    def _catch_exceptions(self) -> Iterator[None]:
        """
        Log fetch errors as warnings instead of propagating them.
        """
        try:
            yield
        except FetchError as error:
            logging.getLogger(__name__).warning(str(error))

    async def _fetch_json(self, url: str, *selectors: str | int) -> Any:
        """
        Fetch JSON from ``url`` and drill into it with ``selectors``.

        :raises FetchError: If the response is not valid JSON, or the
            selectors do not match the returned structure.
        """
        async with self._rate_limiter:
            response = await self._fetcher.fetch(url)
        try:
            # NOTE(review): ``json`` is accessed as a property; assumed to
            # raise JSONDecodeError for invalid payloads — confirm against
            # the Fetcher API.
            data = response.json
        except JSONDecodeError as error:
            raise FetchError(
                plain(f"Invalid JSON returned by {url}: {error}")
            ) from error
        try:
            # Walk the selectors, e.g. ("query", "pages", 0).
            for selector in selectors:
                data = data[selector]
        except (LookupError, TypeError) as error:
            raise FetchError(
                plain(
                    f"Could not successfully parse the JSON format returned by {url}: {error}"
                )
            ) from error
        return data

    async def _get_query_api_data(self, url: str) -> Mapping[str, Any]:
        """
        Fetch an Action API ``query`` response and return its first page.
        """
        return cast(Mapping[str, Any], await self._fetch_json(url, "query", "pages", 0))

    async def _get_page_query_api_data(
        self, page_language: str, page_name: str
    ) -> Mapping[str, Any]:
        """
        Query language links, page image, and coordinates for a single page.
        """
        return await self._get_query_api_data(
            f"https://{page_language}.wikipedia.org/w/api.php?action=query&titles={quote(page_name)}&prop=langlinks|pageimages|coordinates&lllimit=500&piprop=name&pilicense=free&pilimit=1&coprimary=primary&format=json&formatversion=2"
        )

    async def get_translations(
        self, page_language: str, page_name: str
    ) -> Mapping[str, str]:
        """
        Map translation language codes to translated page titles.

        Returns an empty mapping when fetching fails or no translations exist.
        """
        try:
            api_data = await self._get_page_query_api_data(page_language, page_name)
        except FetchError as error:
            logger = logging.getLogger(__name__)
            logger.warning(str(error))
            return {}
        try:
            translations_data = api_data["langlinks"]
        except LookupError:
            # There may not be any translations.
            return {}
        return {
            translation_data["lang"]: translation_data["title"]
            for translation_data in translations_data
        }

    async def get_summary(self, page_language: str, page_name: str) -> Summary | None:
        """
        Fetch the summary for a page, or ``None`` when fetching fails.
        """
        with self._catch_exceptions():
            url = f"https://{page_language}.wikipedia.org/api/rest_v1/page/summary/{page_name}"
            api_data = await self._fetch_json(url)
            try:
                return Summary(
                    page_language,
                    page_name,
                    api_data["titles"]["normalized"],
                    # Prefer the HTML extract; plain text is the fallback.
                    (
                        api_data["extract_html"]
                        if "extract_html" in api_data
                        else api_data["extract"]
                    ),
                )
            except LookupError as error:
                raise FetchError(
                    plain(
                        f"Could not successfully parse the JSON content returned by {url}: {error}"
                    )
                ) from error

    async def get_image(self, page_language: str, page_name: str) -> Image | None:
        """
        Fetch the page's lead image, or ``None`` if the page has none or
        fetching fails.
        """
        with self._catch_exceptions():
            api_data = await self._get_page_query_api_data(page_language, page_name)
            try:
                page_image_name = api_data["pageimage"]
            except LookupError:
                # There may not be any images.
                return None
            # See the NOTE on self._images in __init__: this lookup never
            # hits in the code visible here.
            if page_image_name in self._images:
                return self._images[page_image_name]
            url = f"https://en.wikipedia.org/w/api.php?action=query&prop=imageinfo&titles=File:{quote(page_image_name)}&iiprop=url|mime|canonicaltitle&format=json&formatversion=2"
            image_info_api_data = await self._get_query_api_data(url)
            try:
                image_info = image_info_api_data["imageinfo"][0]
            except LookupError as error:
                raise FetchError(
                    plain(
                        f"Could not successfully parse the JSON content returned by {url}: {error}"
                    )
                ) from error
            async with self._rate_limiter:
                image_path = await self._fetcher.fetch_file(image_info["url"])
            image = Image(
                image_path,
                MediaType(image_info["mime"]),
                # Strip "File:" or any translated equivalent from the beginning of the image's title.
                image_info["canonicaltitle"][
                    image_info["canonicaltitle"].index(":") + 1 :
                ],
                image_info["descriptionurl"],
                Path(urlparse(image_info["url"]).path).name,
            )
            return image

    async def get_place_coordinates(
        self, page_language: str, page_name: str
    ) -> Point | None:
        """
        Fetch the page's primary coordinates, or ``None`` when absent,
        not on Earth, or fetching fails.
        """
        with self._catch_exceptions():
            api_data = await self._get_page_query_api_data(page_language, page_name)
            try:
                coordinates = api_data["coordinates"][0]
            except LookupError:
                # There may not be any coordinates.
                return None
            try:
                # Coordinates on other globes (e.g. the Moon) are not usable.
                if coordinates["globe"] != "earth":
                    return None
                return Point(coordinates["lat"], coordinates["lon"])
            except LookupError as error:
                raise FetchError(
                    plain(f"Could not successfully parse the JSON content: {error}")
                ) from error


class _Populator:
    """
    Populate an ancestry with Wikipedia summaries, images, and coordinates.
    """

    def __init__(
        self,
        ancestry: Ancestry,
        locales: Sequence[str],
        localizers: LocalizerRepository,
        retriever: _Retriever,
    ):
        self._ancestry = ancestry
        self._locales = locales
        self._localizers = localizers
        self._retriever = retriever
        # Cache of images to the File entities created for them.
        self._image_files: MutableMapping[Image, File] = {}
        # One lock per image, so each File is created exactly once even when
        # multiple entities reference the same image concurrently.
        self._image_files_locks: Mapping[Image, Lock] = defaultdict(
            AsynchronizedLock.threading
        )

    async def populate(self) -> None:
        """
        Populate every entity in the ancestry that has links.
        """
        await gather(
            *(
                self._populate_entity(entity, self._locales)
                for entity in self._ancestry
                if isinstance(entity, HasLinks)
            )
        )

    async def _populate_entity(self, entity: HasLinks, locales: Sequence[str]) -> None:
        """
        Populate a single entity's links, files, and (for places) coordinates.
        """
        populations = [self._populate_has_links(entity, locales)]
        if isinstance(entity, HasFileReferences):
            populations.append(self._populate_has_file_references(entity))
        if isinstance(entity, Place):
            populations.append(self._populate_place(entity))
        await gather(*populations)

    async def _populate_has_links(
        self, has_links: HasLinks, locales: Sequence[str]
    ) -> None:
        """
        Enrich existing Wikipedia links, then add translated ones.
        """
        # (language, page name) pairs for links that point to Wikipedia pages
        # in a known locale.
        summary_links: MutableSequence[tuple[str, str]] = []
        for link in has_links.links:
            try:
                page_language, page_name = _parse_url(link.url)
            except NotAPageError:
                continue
            else:
                try:
                    # Skip pages whose subdomain is not a known locale.
                    get_data(page_language)
                except LocaleError:
                    continue
                else:
                    summary_links.append((page_language, page_name))

            summary = None
            if not link.label:
                # Best-effort: a missing summary only means the label stays empty.
                with suppress(FetchError):
                    summary = await self._retriever.get_summary(
                        page_language, page_name
                    )
            await self.populate_link(link, page_language, summary)
        await self._populate_has_links_with_translation(
            has_links, locales, summary_links
        )

    async def _populate_has_links_with_translation(
        self,
        has_links: HasLinks,
        locales: Sequence[str],
        summary_links: MutableSequence[tuple[str, str]],
    ) -> None:
        """
        Add links for translations of the known Wikipedia pages, for each
        requested locale that is not yet covered.
        """
        for page_language, page_name in summary_links:
            page_translations = await self._retriever.get_translations(
                page_language, page_name
            )
            if len(page_translations) == 0:
                continue
            # Keep only translation languages that are valid locales.
            page_translation_locale_datas: Sequence[Localey] = list(
                filter_suppress(get_data, LocaleError, page_translations.keys())
            )
            for locale in locales:
                if locale == page_language:
                    continue
                added_page_locale_data = negotiate_locale(
                    locale, page_translation_locale_datas
                )
                if added_page_locale_data is None:
                    continue
                added_page_language = to_locale(added_page_locale_data)
                added_page_name = page_translations[added_page_language]
                if (added_page_language, added_page_name) in summary_links:
                    continue
                added_summary = await self._retriever.get_summary(
                    added_page_language, added_page_name
                )
                if not added_summary:
                    continue
                added_link = Link(added_summary.url)
                await self.populate_link(added_link, added_page_language, added_summary)
                has_links.links.append(added_link)
                # Appending while iterating the outer loop means newly added
                # pages are themselves considered for further translations.
                summary_links.append((added_page_language, added_page_name))
        return

    async def populate_link(
        self, link: Link, summary_language: str, summary: Summary | None = None
    ) -> None:
        """
        Fill in a link's missing attributes (scheme, media type, relationship,
        locale, description, and label).
        """
        # Wikipedia serves everything over HTTPS.
        if link.url.startswith("http:"):
            link.url = "https:" + link.url[5:]
        if link.media_type is None:
            link.media_type = HTML
        if link.relationship is None:
            link.relationship = "external"
        if link.locale is UNDETERMINED_LOCALE:
            link.locale = summary_language
        if not link.description:
            # There are valid reasons for links in locales that aren't supported.
            with suppress(ValueError):
                link.description = (
                    await self._localizers.get_negotiated(link.locale)
                )._("Read more on Wikipedia.")
        if summary is not None and not link.label:
            link.label = summary.title

    async def _populate_place(self, place: Place) -> None:
        """
        Populate a place with data from its Wikipedia links.
        """
        await self._populate_place_coordinates(place)

    async def _populate_place_coordinates(self, place: Place) -> None:
        """
        Try every link of the place for coordinates, concurrently.
        """
        await gather(
            *(
                self._populate_place_coordinates_link(place, link)
                for link in place.links
            )
        )

    async def _populate_place_coordinates_link(self, place: Place, link: Link) -> None:
        """
        Set the place's coordinates from one link, if it is a Wikipedia page
        with coordinates.
        """
        try:
            page_language, page_name = _parse_url(link.url)
        except NotAPageError:
            return
        else:
            coordinates = await self._retriever.get_place_coordinates(
                page_language, page_name
            )
            if coordinates:
                place.coordinates = coordinates

    async def _populate_has_file_references(
        self, has_file_references: HasFileReferences & HasLinks
    ) -> None:
        """
        Add file references for the entity's Wikipedia page images, concurrently.
        """
        await gather(
            *(
                self._populate_has_file_references_link(has_file_references, link)
                for link in has_file_references.links
            )
        )

    async def _populate_has_file_references_link(
        self, has_file_references: HasFileReferences & HasLinks, link: Link
    ) -> None:
        """
        Add a file reference for one link's page image, if any.
        """
        try:
            page_language, page_name = _parse_url(link.url)
        except NotAPageError:
            return
        else:
            image = await self._retriever.get_image(page_language, page_name)
            if not image:
                return
            has_file_references.file_references.add(
                await self._image_file_reference(image)
            )

    async def _image_file_reference(self, image: Image) -> FileReference:
        """
        Return a new file reference for the image, creating and registering
        its File entity on first use (guarded by a per-image lock).
        """
        async with self._image_files_locks[image]:
            try:
                file = self._image_files[image]
            except KeyError:
                # First time we see this image: build a localized link to its
                # Wikimedia Commons page for each configured locale.
                links = []
                for locale in self._locales:
                    localizer = await self._localizers.get(locale)
                    links.append(
                        Link(
                            f"{image.wikimedia_commons_url}?uselang={locale}",
                            label=localizer._(
                                "Description, licensing, and image history"
                            ),
                            description=localizer._(
                                "Find out more about this image on Wikimedia Commons."
                            ),
                            locale=locale,
                            media_type=HTML,
                        )
                    )
                file = File(
                    id=f"wikipedia-{image.title}",
                    name=image.name,
                    path=image.path,
                    media_type=image.media_type,
                    links=links,
                )
                self._image_files[image] = file
                self._ancestry.add(file)
            # Every caller gets its own FileReference to the shared File.
            file_reference = FileReference(None, file)
            self._ancestry.add(file_reference)
            return file_reference