Section topics#
Gather Wikidata items from Wikipedia blue links.
Run it!#
You need access to one of the Wikimedia Foundation's analytics clients, AKA a stat box. Then:
me@my_box:~$ ssh stat1008.eqiad.wmnet # Or pick another one
me@stat1008:~$ export http_proxy=http://webproxy.eqiad.wmnet:8080
me@stat1008:~$ export https_proxy=http://webproxy.eqiad.wmnet:8080
me@stat1008:~$ git clone https://gitlab.wikimedia.org/repos/structured-data/section-topics.git st
me@stat1008:~$ cd st
me@stat1008:~/st$ conda-analytics-clone MY_ENV
me@stat1008:~/st$ source conda-analytics-activate MY_ENV
(MY_ENV) me@stat1008:~/st$ conda env update -n MY_ENV -f conda-environment.yaml
(MY_ENV) me@stat1008:~/st$ python section_topics/pipeline.py MY_WEEKLY_SNAPSHOT
Get --help#
(MY_ENV) me@stat1008:~/st$ python section_topics/pipeline.py --help
usage: pipeline.py [-h] [-w /hdfs_path/to/dir/] [-i /path/to/file.txt]
[-p hdfs_path/to/parquet] [-s /path/to/file.json] [-l N]
[-t hdfs_path/to/parquet] [-q /path/to/file1.txt ...] [--handle-media]
[-m /path/to/file.txt] [--keep-lists-and-tables]
YYYY-MM-DD
Gather section topics from Wikitext
positional arguments:
YYYY-MM-DD snapshot date
options:
-h, --help show this help message and exit
-w /hdfs_path/to/dir/, --work-dir /hdfs_path/to/dir/
Absolute HDFS path to the working directory. Default:
"section_topics" in the current user home
-i /path/to/file.txt, --input-wikis /path/to/file.txt
plain text file of wikis to process, one per line. Default: all
Wikipedias, see "data/wikipedias.txt"
-p hdfs_path/to/parquet, --page-filter hdfs_path/to/parquet
HDFS path to parquet of (wiki, page revision ID) rows to exclude,
as output by "scripts/check_bad_parsing.py". Must be relative to
the working directory. Default: badly parsed ptwiki articles, see
"2022-10_ptwiki_bad" in the working directory
-s /path/to/file.json, --section-title-filter /path/to/file.json
JSON file of `{ wiki: [list of section titles to exclude] }`.
Default: see "data/section_titles_denylist.json"
-l N, --length-filter N
exclude sections whose content length is less than the given
number of characters. Default: 500
-t hdfs_path/to/parquet, --table-filter hdfs_path/to/parquet
HDFS path to parquet with a dataframe to exclude, as output by
"scripts/detect_html_tables.py". Must be relative to the working
directory. The dataframe must include ('wiki_db', 'page_id',
'section_title') columns. Default: ar, bn, cs, es, id, pt, ru
sections with tables, see "20230301_target_wikis_tables" in the
working directory
-q /path/to/file1.txt ..., --qid-filter /path/to/file1.txt ...
plain text file(s) of Wikidata IDs to exclude, one per line.
Default: see "data/qids_for_all_points_in_time.txt" and
"data/qids_for_media_outlets.txt"
--handle-media separate media links and dump them to "media_links". WARNING: the
pipeline execution time will increase to roughly 20 minutes
-m /path/to/file.txt, --media-prefixes /path/to/file.txt
plain text file with media prefixes, one per line. Default: all
Wikipedia ones, see "data/media_prefixes.txt". Ignored if "--
handle-media" is not passed
--keep-lists-and-tables
don't skip sections with at least one standard wikitext list or
table
Get your hands dirty#
Install the development environment:
me@stat1008:~/st$ conda-analytics-clone MY_DEV_ENV
me@stat1008:~/st$ source conda-analytics-activate MY_DEV_ENV
(MY_DEV_ENV) me@stat1008:~/st$ conda env update -n MY_DEV_ENV -f dev-conda-environment.yaml
Test#
(MY_DEV_ENV) me@stat1008:~/st$ python -m pytest tests/
Lint#
(MY_DEV_ENV) me@stat1008:~/st$ pre-commit install
At every git commit, pre-commit will run the checks and autofix or tell you what to fix.
Docs#
(MY_DEV_ENV) me@stat1008:~/st$ sphinx-build docs/ docs/_build/
Trigger an Airflow test run#
Follow this walkthrough to simulate a production execution of the pipeline on your stat box. Inspired by this snippet.
Build your artifact#
Pick a branch you want to test from the drop-down menu
Click on the pipeline status button, it should be a green tick
Click on the play button next to publish_conda_env, wait until done
On the left sidebar, go to Packages and registries > Package Registry
Click on the first item in the list, then copy the Asset URL. It should be something like https://gitlab.wikimedia.org/repos/structured-data/section-topics/-/package_files/1321/download
Get your artifact ready#
me@stat1008:~$ mkdir artifacts
me@stat1008:~$ cd artifacts
me@stat1008:~/artifacts$ wget -O MY_ARTIFACT MY_COPIED_ASSET_URL
me@stat1008:~/artifacts$ hdfs dfs -mkdir artifacts
me@stat1008:~/artifacts$ hdfs dfs -copyFromLocal MY_ARTIFACT artifacts
me@stat1008:~/artifacts$ hdfs dfs -chmod -R o+rx artifacts
Spin up an Airflow instance#
On your stat box:
me@stat1008:~$ git clone https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags.git
me@stat1008:~$ cd airflow-dags
me@stat1008:~/airflow-dags$ sudo -u analytics-privatedata rm -fr /tmp/MY_AIRFLOW_HOME # If you've previously run the next command
me@stat1008:~/airflow-dags$ sudo -u analytics-privatedata ./run_dev_instance.sh -m /tmp/MY_AIRFLOW_HOME -p MY_PORT platform_eng
On your local box:
me@my_box:~$ ssh -t -N stat1008.eqiad.wmnet -L MY_PORT:stat1008.eqiad.wmnet:MY_PORT
Trigger the DAG run#
Go to http://localhost:MY_PORT/ in your browser
On the top bar, go to Admin > Variables
Click on the middle button (Edit record) next to the platform_eng/dags/section_topics_dag.py key
Update { "conda_env" : "hdfs://analytics-hadoop/user/ME/artifacts/MY_ARTIFACT" }
Add any other relevant DAG properties
Click on the Save button
On the top bar, go to DAGs and click on the section_topics slider. This should trigger an automatic DAG run
Click on section_topics
You’re all set!
Release#
On the left sidebar, go to CI/CD > Pipelines
Click on the play button, select trigger_release
If the job went fine, you’ll find a new artifact in the Package Registry
We follow Data Engineering's workflow_utils:
- the main branch is on a .dev release
- releases are made by removing the .dev suffix and committing a tag
Deploy#
On the left sidebar, go to CI/CD > Pipelines
Click on the play button and select bump_on_airflow_dags. This will create a merge request at airflow-dags
Double-check it and merge
Deploy the DAGs:
me@my_box:~$ ssh deployment.eqiad.wmnet
me@deploy1002:~$ cd /srv/deployment/airflow-dags/platform_eng/
me@deploy1002:~$ git pull
me@deploy1002:~$ scap deploy
See the docs for more details.
API documentation#
The data pipeline#
The section topics data pipeline is a sequence of pyspark.sql.DataFrame
extraction and transformation functions.
Inputs come from the Wikimedia Foundation's Analytics Data Lake:
Wikipedia wikitext (all Wikipedias by default, as per wikipedias.txt)
High-level steps:
gather wikitext of sections at a given hierarchy level via the MediaWiki parser from hell. Default: section_topics.pipeline.SECTION_LEVEL, lead section included
optionally filter out sections that don't convey relevant content, typically lists and tables
extract Wikidata QIDs from wikilinks: the so-called section topics
optionally filter out noisy topics, typically dates and numbers
compute the relevance score
Output row example:
snapshot | wiki_db | page_namespace | revision_id | page_qid | page_id | page_title | section_index | section_title | topic_qid | topic_title | topic_score
---|---|---|---|---|---|---|---|---|---|---|---
2023-01-16 | enwiki | 0 | 1127523670 | Q36724 | 841 | Attila | 5 | Solitary kingship | Q3623581 | Arnegisclus | 1.13
More documentation lives in MediaWiki.
Functions are ordered by their execution in the pipeline.
- section_topics.pipeline.SECTION_LEVEL = 2#
Section hierarchy level to be extracted. Level 1 is just for page titles, actual sections start from level 2.
- section_topics.pipeline.SECTION_ZERO_TITLE = '### zero ###'#
Reserved title for the lead section, AKA section zero.
- section_topics.pipeline.STRIP_CHARS = '!"#$%&\' *+,-./:;<=>?@[\\]^_`{|}~'#
ASCII punctuation characters to be stripped from section headings. Include the ASCII white space, don’t strip round brackets.
- section_topics.pipeline.SUBSTITUTE_PATTERN = '[\\s_]'#
All kinds of white space to be replaced with the ASCII one; underscores turn into spaces as well.
- section_topics.pipeline.get_monthly_snapshot(weekly)[source]#
Get the most recent monthly snapshot given a weekly one.
A snapshot date is the beginning of the snapshot interval. For instance:
2022-05-16 covers until 2022-05-22 (at 23:59:59). May is not over, so the May monthly snapshot is not available yet. Hence return end of April, i.e., 2022-04
2022-05-30 covers until 2022-06-05. May is over, so the May monthly snapshot is available. Hence return end of May, i.e., 2022-05
- Parameters:
weekly (str) – a YYYY-MM-DD date
- Raises:
ValueError – if the passed date has an invalid format
- Return type:
str
- Returns:
the relevant monthly snapshot
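To make the rule concrete, here is a minimal, self-contained sketch of the weekly-to-monthly logic. The names are illustrative, not the pipeline's actual implementation:

```python
from datetime import date, timedelta

def monthly_from_weekly(weekly: str) -> str:
    """Illustrative sketch of the rule above, not the pipeline's code."""
    start = date.fromisoformat(weekly)  # raises ValueError on an invalid date
    end = start + timedelta(days=6)     # a weekly snapshot covers seven days
    if end.month == start.month and end.year == start.year:
        # The starting month isn't over yet: fall back to the previous month
        last_full = start.replace(day=1) - timedelta(days=1)
    else:
        last_full = start  # the starting month is over
    return f"{last_full.year:04d}-{last_full.month:02d}"

monthly_from_weekly("2022-05-16")  # '2022-04'
monthly_from_weekly("2022-05-30")  # '2022-05'
```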
- section_topics.pipeline.load_pages(spark, monthly_snapshot, wikis, namespace)[source]#
Load wiki pages with their wikitext through the
section_topics.queries.PAGES
Data Lake query.
- section_topics.pipeline.load_qids(spark, weekly_snapshot, wikis, namespace)[source]#
Load Wikidata QIDs with their page links through the
section_topics.queries.QIDS
Data Lake query.
- section_topics.pipeline.load_redirects(spark, monthly_snapshot, wikis)[source]#
Load wiki page redirects through the
section_topics.queries.REDIRECTS
Data Lake query.
- section_topics.pipeline.apply_filter(df, filter_df, broadcast=False)[source]#
Exclude rows of an input dataframe given a filter dataframe.
Anti-join all filter columns against input ones.
- Parameters:
df (DataFrame) – a dataframe to be filtered
filter_df (DataFrame) – a dataframe acting as a filter. Columns must be a subset of df
broadcast (bool) – whether to broadcast filter_df, which tells Spark to perform a broadcast hash join, i.e., pyspark.sql.functions.broadcast(). Much faster if filter_df is small
- Return type:
DataFrame
- Returns:
the filtered df dataframe
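The anti-join described above boils down to something like the following sketch; only the documented behaviour is assumed, and the names are illustrative:

```python
from pyspark.sql import DataFrame, functions as F

def anti_join_filter(df: DataFrame, filter_df: DataFrame, broadcast: bool = False) -> DataFrame:
    # Drop the df rows that match filter_df on all of filter_df's columns
    right = F.broadcast(filter_df) if broadcast else filter_df
    return df.join(right, on=filter_df.columns, how="left_anti")
```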
- section_topics.pipeline.look_up_qids(pages, qids)[source]#
Look up page QIDs through page IDs.
- Parameters:
pages (DataFrame) – a dataframe of pages as output by apply_filter(). Pass the output of load_pages() if you want the full raw dataset.
qids (DataFrame) – a dataframe of page IDs and Wikidata QIDs as output by load_qids()
- Return type:
DataFrame
- Returns:
the pages dataframe with page QIDs added
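For intuition, the lookup amounts to a left join on page identifiers, roughly as below. The join keys and the item_id/page_qid column names are assumptions based on the queries documented further down, not the exact implementation:

```python
# pages: output of load_pages() or apply_filter(); qids: output of load_qids()
pages_with_qids = pages.join(
    qids.withColumnRenamed("item_id", "page_qid"),
    on=["wiki_db", "page_id"],
    how="left",  # pages without a Wikidata item keep a NULL page_qid
)
```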
- section_topics.pipeline.wikitext_headings_to_anchors(headings)[source]#
Transform wikitext headings into URL anchors.
For instance, === Album in studio === becomes Album_in_studio, and serves as a section link in https://it.wikipedia.org/wiki/Gaznevada#Album_in_studio.
Anchors that occur more than once get a numeric suffix in the form anchor_N.
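A rough illustration of the transformation (not the actual code, and without the duplicate-anchor handling):

```python
heading = "=== Album in studio ==="
anchor = heading.strip("= ").replace(" ", "_")
print(anchor)  # Album_in_studio
```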
- section_topics.pipeline.normalize_heading(heading, substitute_re=re.compile('[\\\\s_]'), strip_chars='!"#$%&\\' *+, -./:;<=>?@[\\\\]^_`{|}~')[source]#
Normalize section headings for better matching.
Normalization steps:
remove _N suffixes in case of duplicate section anchors as added by wikitext_headings_to_anchors()
replace characters matched by substitute_re with one ASCII white space
strip leading and trailing strip_chars
lowercase
Note
This normalization is not perfect: it's a trade-off between several ones, some of which may prevent convergence to a lowest common denominator. However, only extreme edge cases might be affected.
- Parameters:
- Return type:
- Returns:
the normalized section headings
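The steps above roughly translate to the following sketch, reusing the module constants documented earlier. This is illustrative only, not the pipeline's implementation:

```python
import re

STRIP_CHARS = '!"#$%&\' *+,-./:;<=>?@[\\]^_`{|}~'
SUBSTITUTE_RE = re.compile(r"[\s_]")

def normalize(heading: str) -> str:
    heading = re.sub(r"_\d+$", "", heading)    # drop duplicate-anchor suffixes like "_2"
    heading = SUBSTITUTE_RE.sub(" ", heading)  # any white space or underscore -> one ASCII space
    return heading.strip(STRIP_CHARS).lower()  # strip punctuation, then lowercase

normalize("External_links_2")  # 'external links'
```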
- section_topics.pipeline.normalize_heading_column(column, substitute_pattern='[\\\\s_]', strip_chars='!"#$%&\\' *+, -./:;<=>?@[\\\\]^_`{|}~')[source]#
Normalize a dataframe column of section headings for better matching.
Same as normalize_heading(), but implemented with PySpark SQL functions.
- Parameters:
- Return type:
Column
- Returns:
the column of normalized section headings
- section_topics.pipeline.normalize_denylist(denylist)[source]#
Normalize a denylist of section headings for better matching.
Apply normalize_heading() to the given input.
- section_topics.pipeline.parse_excluding(section_denylist, keep_lists_and_tables, minimum_section_size)[source]#
Currying function that passes a denylist of section headings to the underlying parse() PySpark user-defined function (UDF).
See also this gist.
- Parameters:
section_denylist (dict) – a dict of normalized { wiki: [list of section headings to exclude] }. Pass an empty dict for no denylist. You can normalize via normalize_denylist()
keep_lists_and_tables (bool) – whether to keep sections with at least one standard wikitext list or table
minimum_section_size (int) – minimum content character length for the section to be considered
- Return type:
udf
- Returns:
the actual UDF
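In practice, the currying amounts to a closure that captures the extra arguments and returns a two-argument UDF, along these lines. This is a sketch under the documented signature; the real parsing logic is omitted:

```python
from pyspark.sql import functions as F, types as T

def parse_excluding_sketch(section_denylist, keep_lists_and_tables, minimum_section_size):
    """Return a (wiki, wikitext) UDF with the extra arguments captured by a closure."""
    def _parse(wiki, wikitext):
        denied = set(section_denylist.get(wiki, []))
        # A real implementation would parse `wikitext` (e.g. with mwparserfromhell) and
        # drop sections whose normalized heading is in `denied`, whose content is shorter
        # than `minimum_section_size`, or that hold lists/tables when
        # keep_lists_and_tables is False, returning the surviving sections.
        return []
    return F.udf(_parse, T.ArrayType(T.StringType()))
```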
- section_topics.pipeline.extract_sections(articles, keep_lists_and_tables, minimum_section_size, denylist={})[source]#
Apply the parse() UDF to extract sections and wikilinks from articles.
Create one row per link and don't select processed columns.
- Parameters:
articles (DataFrame) – a dataframe of article pages as output by look_up_qids()
keep_lists_and_tables (bool) – whether to keep sections with at least one standard wikitext list or table
minimum_section_size (int) – minimum content character length for the section to be considered
denylist (dict) – (optional) a dict of raw { wiki: [list of section headings to exclude] }
- Return type:
DataFrame
- Returns:
the dataframe of sections and links extracted as per parse()
- section_topics.pipeline.normalize_wikilinks(link_column)[source]#
Lowercase the first character of wikilink target titles.
The link target is case-sensitive except for the first character.
- Parameters:
link_column (str) – a dataframe column name of wikilinks
- Return type:
Column
- Returns:
the column of normalized wikilinks. None values are kept.
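As a sketch, lowercasing only the first character can be expressed with PySpark SQL functions along these lines (illustrative, not the exact implementation):

```python
from pyspark.sql import Column, functions as F

def lowercase_first_char(link_column: str) -> Column:
    first = F.lower(F.substring(F.col(link_column), 1, 1))
    rest = F.expr(f"substring({link_column}, 2)")  # two-argument substring: position 2 to the end
    return F.concat(first, rest)  # NULL links stay NULL, so None values are kept
```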
- section_topics.pipeline.handle_media(sections, media_prefixes)[source]#
Separate media links from other ones.
Detect media links via lowercased lookup of namespace prefixes.
- Parameters:
sections (DataFrame) – a dataframe of sections and wikilinks as output by extract_sections()
media_prefixes (list) – a list of namespace prefixes for media pages
- Return type:
Tuple[DataFrame, DataFrame]
- Returns:
the dataframe of media links and the remainder of the sections dataframe
- section_topics.pipeline.clean_up_links(sections)[source]#
Filter empty strings and add a column of normalized wikilinks.
- Parameters:
sections (DataFrame) – a dataframe of sections and wikilinks as output by extract_sections()
- Return type:
DataFrame
- Returns:
the cleaned sections dataframe
- section_topics.pipeline.resolve_redirects(sections, redirects)[source]#
Follow section wikilinks redirects.
If a wikilink points to a redirect page, replace its title with the redirected one. Normalize redirects via normalize_wikilinks().
- Parameters:
sections (DataFrame) – a dataframe of sections and cleaned wikilinks as output by clean_up_links()
redirects (DataFrame) – a dataframe of page titles and redirected page titles as output by load_redirects()
- Return type:
DataFrame
- Returns:
the dataframe of redirected wikilinks. Both original and normalized ones are kept.
- section_topics.pipeline.gather_section_topics(sections, articles, categories)[source]#
Align section wikilinks to their Wikidata QIDs: the so-called section topics.
- Parameters:
sections (DataFrame) – a dataframe of sections and normalized wikilinks as output by resolve_redirects(). Pass the output of clean_up_links() to skip wikilinks pointing to redirect pages.
articles (DataFrame) – a dataframe of articles as output by look_up_qids()
categories (DataFrame) – a dataframe of categories as output by look_up_qids()
- Return type:
DataFrame
- Returns:
the dataframe of sections and topics (as Wikidata QIDs). Original topic titles are kept.
- section_topics.pipeline.compute_relevance(topics, level='section')[source]#
Compute either the section-level or the article-level relevance score for every section topic.
The section-level score is a standard term frequency-inverse document frequency (TF-IDF). The article-level score is a custom TF-IDF, where TF is across wikis and IDF is within one wiki (see the formula sketch after this entry).
Workflow:
filter null topic QIDs
compute TF numerator: occurrences of one topic QID in a section or page QID
compute TF denominator: occurrences of all topic QIDs in a section or page QID
compute TF: numerator / denominator
join with input on section or page QID and topic QID
compute IDF numerator: count of sections or page QIDs in a wiki
compute IDF denominator: count of sections or page QIDs where a topic QID occurs, in a wiki
compute IDF: log( numerator / denominator )
join with input on wiki and topic QID
compute TF-IDF: TF * IDF
- Parameters:
topics (DataFrame) – a dataframe of section topics as output by gather_section_topics()
level (str) – (optional) at which level relevance is computed, section or article
- Return type:
DataFrame
- Returns:
the input dataframe with the tf_idf column added
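For reference, the section-level score described in the workflow above can be written as follows (our notation, not the codebase's). The article-level variant has the same shape, with page QIDs in place of sections, TF computed across wikis, and IDF within one wiki:

```latex
\mathrm{tf}(t, s) = \frac{\#\,\text{occurrences of topic } t \text{ in section } s}
                         {\#\,\text{occurrences of all topics in section } s}
\qquad
\mathrm{idf}(t, w) = \log\frac{\#\,\text{sections in wiki } w}
                              {\#\,\text{sections in wiki } w \text{ containing } t}
\qquad
\mathrm{tf\_idf}(t, s, w) = \mathrm{tf}(t, s) \cdot \mathrm{idf}(t, w)
```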
- section_topics.pipeline.compose_output(scored_topics, all_topics, snapshot, page_namespace=0)[source]#
Fuse scored topics with null ones and build the output dataset.
- Parameters:
scored_topics (DataFrame) – a dataframe of scored topics as output by compute_relevance()
all_topics (DataFrame) – a dataframe of all topics as output by gather_section_topics()
snapshot (str) – a weekly snapshot to serve as the constant value for the snapshot column of the output dataframe
page_namespace (int) – (optional) a page namespace to serve as the constant value for the page_namespace column of the output dataframe
- Return type:
DataFrame
- Returns:
the final output dataframe
- section_topics.pipeline.parse(wiki, wikitext, section_level=SECTION_LEVEL, section_zero_title=SECTION_ZERO_TITLE)#
Note
This is the core function responsible for the data heavy lifting. It's implemented as a PySpark user-defined function (pyspark.sql.functions.udf()). It must be called by the parse_excluding() currying function, which seems to be the only way to pass an optional denylist of section headings.
Parse a wikitext into section indices, titles, and wikilinks. The lead section is included.
A section title is the wikitext heading normalized through wikitext_headings_to_anchors(). Clean wikilinks with mwparserfromhell's strip_code(). This can lead to empty strings that should be filtered.
- Parameters:
- Returns:
the list of (index, title, wikilinks) section dictionaries
- Return type:
List[str]
Queries to the Data Lake#
A set of Spark-flavoured SQL queries that gather relevant data from the Wikimedia Foundation’s Analytics Data Lake.
- section_topics.queries.PAGES = "SELECT wiki_db, revision_id, page_id, REPLACE(page_title, ' ', '_') AS page_title, revision_text\nFROM wmf.mediawiki_wikitext_current\nWHERE snapshot='{monthly}' AND wiki_db IN ({wikis}) AND page_namespace={namespace} AND page_redirect_title=''\n"#
Gather pages with their wikitext.
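The query constants are plain format templates; roughly, they get filled in and submitted through a SparkSession, for example as below. This is illustrative only: the snapshot and wiki values are placeholders, and a working Spark setup on the analytics cluster is assumed:

```python
from pyspark.sql import SparkSession

from section_topics import queries

spark = SparkSession.builder.getOrCreate()  # assumes a configured Spark environment
query = queries.PAGES.format(
    monthly="2023-01",           # a monthly snapshot, e.g. from get_monthly_snapshot()
    wikis="'enwiki', 'itwiki'",  # SQL-quoted, comma-separated wiki_db values
    namespace=0,                 # articles
)
pages = spark.sql(query)
```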
- section_topics.queries.QIDS = "SELECT wiki_db, item_id, page_id\nFROM wmf.wikidata_item_page_link\nWHERE snapshot='{weekly}' AND wiki_db IN ({wikis}) AND page_namespace={namespace}\n"#
Gather Wikidata items with their page links.
- section_topics.queries.REDIRECTS = "SELECT wiki_db, REPLACE(page_title, ' ', '_') AS page_title,\nREPLACE(page_redirect_title, ' ', '_') AS page_redirect_title\nFROM wmf.mediawiki_wikitext_current\nWHERE snapshot='{monthly}' AND wiki_db IN ({wikis}) AND page_namespace=0 AND page_redirect_title!=''\n"#
Gather page redirects.