Resiliparse Beam Transforms
Resiliparse offers a variety of PTransform utility classes for processing web archive data with Apache Beam. The transform library provides input formats for reading WARC and text files, a transform for bulk-indexing data to Elasticsearch, and a few convenience transforms that work around shortcomings in certain Beam runners.
Installing Resiliparse Beam Transforms
The Resiliparse Beam transforms come bundled with Resiliparse by default, but to install the required dependencies (apache_beam, boto3, etc.), you need to install Resiliparse with the beam extra (or alternatively: all):
pip install 'resiliparse[beam]'
Reading WARC Files
Reading WARC files in Apache Beam is made easy with ReadWarcs and ReadAllWarcs, which accept a file glob and a PCollection of file metadata objects, respectively:
import apache_beam as beam
from resiliparse.beam.fileio import MatchFiles
from resiliparse.beam.warcio import ReadWarcs, ReadAllWarcs

# Count WARC records from local file system
with beam.Pipeline() as pipeline:
    record_count = (pipeline
                    | ReadWarcs('/path/to/warcs/*.warc.gz')
                    | beam.combiners.Count.Globally())

# Count WARC records from S3 storage (requires a proper S3 client config)
with beam.Pipeline() as pipeline:
    record_count = (pipeline
                    | ReadWarcs('s3://mybucket/warcs/*.warc.gz')
                    | beam.combiners.Count.Globally())

# Count WARC records from pre-matched file names
with beam.Pipeline() as pipeline:
    record_count = (pipeline
                    | MatchFiles('/path/to/warcs/*.warc.gz')
                    | ReadAllWarcs()
                    | beam.combiners.Count.Globally())
The WARC files are read with FastWARC. To customise the WARC iterator, pass a dict of parameters as warc_args; these are used when instantiating FastWARC's ArchiveIterator.
Note
resiliparse.beam.fileio.MatchFiles is a replacement for Beam's own MatchFiles that forces a fusion break by adding a Reshuffle transform. This is meant primarily for fixing a major shortcoming of the FlinkRunner, which does not support distributing splits on its own at the moment. If you don't need (or don't want) to reshuffle the matched file names (e.g., because you use the DataflowRunner), either set the shuffle parameter to False or use the original Beam implementation.
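To illustrate why the forced fusion break matters, here is a toy, single-process model. It is not Beam's implementation, and the functions fused and reshuffle are hypothetical. In a fused stage, all matched file names stay on the worker that expanded the glob; a reshuffle gives each element a random key and regroups by key, which spreads the elements across workers. For simplicity, keys map directly to workers here.

```python
import random
from collections import defaultdict
from typing import List, Optional, Sequence


def fused(items: Sequence[str], num_workers: int) -> List[List[str]]:
    """Toy model of a fused stage: every element stays on the single
    worker that produced it (worker 0 in this model)."""
    return [list(items)] + [[] for _ in range(num_workers - 1)]


def reshuffle(items: Sequence[str], num_workers: int,
              seed: Optional[int] = None) -> List[List[str]]:
    """Toy model of a reshuffle: key each element randomly, group by key,
    and hand each key group to the worker responsible for that key."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    for item in items:
        # The random key decouples an element from the worker that emitted it.
        groups[rng.randrange(num_workers)].append(item)
    return [groups[w] for w in range(num_workers)]


if __name__ == '__main__':
    names = [f'file-{i:03d}.warc.gz' for i in range(100)]
    print('fused:     ', [len(w) for w in fused(names, 4)])
    print('reshuffled:', [len(w) for w in reshuffle(names, 4, seed=42)])
```

In the fused model, one worker reads all 100 files while the others sit idle; after the reshuffle, the file names are distributed across all workers.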
Reading Text Files
For reading text files line by line, Resiliparse offers ReadFromText and ReadAllFromText. The two transforms behave similarly to Beam's own implementations apache_beam.io.textio.ReadFromText and apache_beam.io.textio.ReadAllFromText, but enforce fusion breaks to ensure that splittable text files are processed in parallel (similar to what MatchFiles does). If a text file is uncompressed, it is split into chunks of up to desired_split_size bytes. The splits are reshuffled and then processed in parallel. This avoids input bottlenecks on Beam runners that do not (yet) support runner-initiated splits, such as the FlinkRunner.
import apache_beam as beam
from resiliparse.beam.fileio import MatchFiles
from resiliparse.beam.textio import ReadFromText, ReadAllFromText

# Count lines from text files
with beam.Pipeline() as pipeline:
    line_count = (pipeline
                  | ReadFromText(
                      '/path/to/text/files/*.txt',
                      shuffle_splits=True,  # shuffle text file splits (if splittable)
                      shuffle_names=True,  # shuffle matched file names (use if you have lots of files)
                      desired_split_size=64 * 1024 * 1024,  # desired split size in bytes (default: 64 MB)
                      min_split_size=1024 * 1024,  # minimum split size in bytes (default: 1 MB)
                  )
                  | beam.combiners.Count.Globally())

# Count lines from pre-matched file names
with beam.Pipeline() as pipeline:
    line_count = (pipeline
                  | MatchFiles('/path/to/text/files/*.txt')
                  | ReadAllFromText(shuffle_splits=True)  # No need to set shuffle_names here
                  | beam.combiners.Count.Globally())
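Splitting uncompressed files into chunks of up to desired_split_size bytes raises two questions: where to put the split boundaries, and how to avoid losing or duplicating lines that straddle a boundary. The sketch below shows one plausible answer; it is an assumption for illustration, not Resiliparse's actual algorithm. In particular, how min_split_size is applied here (by capping the number of splits so that no split falls below it) is a guess, and compute_splits and read_split are hypothetical names. Each line is owned by the split that contains its first byte, so a reader skips a leading partial line and may read past its end offset to finish its last line.

```python
import io
from typing import BinaryIO, Iterator, List, Tuple


def compute_splits(file_size: int,
                   desired_split_size: int = 64 * 1024 * 1024,
                   min_split_size: int = 1024 * 1024) -> List[Tuple[int, int]]:
    """Return contiguous (start, end) byte ranges covering [0, file_size).

    Assumption: use the smallest number of splits that keeps each split at
    or below desired_split_size, reduced if necessary so that no split
    falls below min_split_size. Split sizes differ by at most one byte.
    """
    if file_size <= 0:
        return []
    num_splits = -(-file_size // desired_split_size)  # ceiling division
    if min_split_size > 0:
        num_splits = min(num_splits, file_size // min_split_size)
    num_splits = max(1, num_splits)
    return [(i * file_size // num_splits, (i + 1) * file_size // num_splits)
            for i in range(num_splits)]


def read_split(stream: BinaryIO, start: int, end: int) -> Iterator[bytes]:
    """Yield the newline-separated lines whose first byte lies in [start, end)."""
    if start > 0:
        # Back up one byte and discard everything up to the next newline:
        # a partial line at the start belongs to the previous split.
        stream.seek(start - 1)
        stream.readline()
    else:
        stream.seek(0)
    pos = stream.tell()
    while pos < end:
        line = stream.readline()  # may read past `end` to finish the line
        if not line:
            break
        pos += len(line)
        yield line.rstrip(b'\n')


if __name__ == '__main__':
    data = b''.join(b'line %d\n' % i for i in range(1000))
    splits = compute_splits(len(data), desired_split_size=1000, min_split_size=100)
    stream = io.BytesIO(data)
    lines = [line for start, end in splits for line in read_split(stream, start, end)]
    print(len(splits), 'splits,', len(lines), 'lines')
```

With the default sizes, a 150 MiB file yields three balanced 50 MiB splits under these assumptions. Compressed files cannot be split this way, because a reader cannot start decompressing at an arbitrary byte offset.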
Note
ReadFromText and ReadAllFromText currently support only \n-separated files, but can also be used for reading lines with binary content if coder is set to None. By default, coder is set to StrUtf8Coder, which uses Resiliparse's bytes_to_str for decoding lines to handle broken encodings more gracefully (Beam's default implementation would raise an exception here).
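The difference between strict and lenient decoding can be shown with the standard library alone. The function lenient_decode below is a simplified, hypothetical stand-in and not bytes_to_str itself, whose actual fallback logic may differ: it tries a list of candidate encodings (assumed here to be UTF-8 and then cp1252) and, if all of them fail, replaces undecodable bytes with U+FFFD instead of raising. A strict bytes.decode('utf-8') call raises UnicodeDecodeError on the same input.

```python
from typing import Sequence


def lenient_decode(raw: bytes,
                   encodings: Sequence[str] = ('utf-8', 'cp1252')) -> str:
    """Decode a line without ever raising on malformed input.

    Hypothetical stand-in for a forgiving decoder: try each candidate
    encoding strictly, then fall back to replacing bad bytes.
    """
    for encoding in encodings:
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    # Last resort: substitute U+FFFD for each undecodable byte sequence.
    return raw.decode(encodings[0], errors='replace')


if __name__ == '__main__':
    cp1252_line = b'caf\xe9'  # 'café' encoded as cp1252, which is not valid UTF-8
    try:
        cp1252_line.decode('utf-8')  # strict decoding fails on this line
    except UnicodeDecodeError as e:
        print('strict decoding failed:', e.reason)
    print(lenient_decode(cp1252_line))  # → café
```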
Bulk-indexing to Elasticsearch
Resiliparse provides a transform for bulk-indexing documents to an Elasticsearch cluster. It expects either Elasticsearch bulk index actions (see the Bulk helpers documentation of the Elasticsearch Python client for more information) or key/value pairs of document IDs and document dicts. Bulk actions can also be used for upserts and deletes, whereas key/value pairs can only be used for indexing new documents.
import apache_beam as beam
from resiliparse.beam import elasticsearch as es

# Arguments for creating an Elasticsearch Python client
es_client_args = dict(hosts=['localhost:9200'], use_ssl=True)

# Index documents to Elasticsearch cluster
# (returns the IDs of successfully indexed documents)
with beam.Pipeline() as pipeline:
    _ = (pipeline
         | beam.Create([
             # index_action() is a helper for creating a valid index action from
             # a document ID, an index name, and a document dict.
             es.index_action('doc1', 'index_name', {'field': 'value'}),
             es.index_action('doc2', 'index_name', {'field': 'value'}),
             es.index_action('doc3', 'index_name', {'field': 'value'}),
             es.index_action(None, 'index_name', {'field': 'value'}),  # Auto ID
             es.delete_action('doc1', 'index_name'),  # Delete
         ])
         | es.ElasticsearchBulkIndex(es_client_args))

# You can also pass KV pairs (requires you to define a default index)
with beam.Pipeline() as pipeline:
    _ = (pipeline
         | beam.Create([
             ('doc1', {'field': 'value'}),
             ('doc2', {'field': 'value'}),
             ('doc3', {'field': 'value'}),
         ])
         | es.ElasticsearchBulkIndex(es_client_args, default_index='index_name'))
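The bulk helpers of the Elasticsearch Python client consume plain dicts whose _op_type, _index, _id, and _source keys describe each operation. The sketch below builds such dicts by hand and converts a key/value pair into an index action using a default index, which shows why KV input needs one: the pair carries no index name. The helper names are hypothetical, and the exact dicts produced by Resiliparse's index_action() and delete_action() may differ; this only illustrates the general shape of bulk actions.

```python
from typing import Any, Dict, Optional, Tuple

Action = Dict[str, Any]


def make_index_action(doc_id: Optional[str], index: str,
                      doc: Dict[str, Any]) -> Action:
    """Build an 'index' action; without an ID, Elasticsearch assigns one."""
    action: Action = {'_op_type': 'index', '_index': index, '_source': doc}
    if doc_id is not None:
        action['_id'] = doc_id
    return action


def make_delete_action(doc_id: str, index: str) -> Action:
    """Build a 'delete' action; deletes need no document body."""
    return {'_op_type': 'delete', '_index': index, '_id': doc_id}


def kv_to_action(kv: Tuple[str, Dict[str, Any]], default_index: str) -> Action:
    """Turn a (document ID, document) pair into an index action.

    The pair carries no index name, hence the need for a default index.
    """
    doc_id, doc = kv
    return make_index_action(doc_id, default_index, doc)


if __name__ == '__main__':
    print(make_index_action('doc1', 'index_name', {'field': 'value'}))
    print(make_delete_action('doc1', 'index_name'))
    print(kv_to_action(('doc2', {'field': 'value'}), 'index_name'))
```

A list of such dicts can be passed directly to elasticsearch.helpers.bulk(client, actions) outside of Beam, which is a convenient way to check an action's shape against a test cluster.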