Resiliparse Itertools
Resiliparse Itertools API documentation.
- resiliparse.itertools.exc_loop(it)
Wraps an iterator into another iterator that catches and returns any exceptions raised while evaluating the input iterator.
This is primarily useful for unreliable generators that may throw unpredictably at any time for unknown reasons (e.g., generators reading from a network data source). If you do not want to wrap the entire loop in a try/except clause, you can use an exc_loop() to catch any such exceptions and return them. Remember that a generator will end after throwing an exception, so if the input iterator is a generator, you will have to create a new instance in order to retry or continue.
- Parameters:
it (t.Iterable[t.Any]) – original iterator
- Returns:
iterator of items and None, or None and exception instance
- Return type:
t.Iterable[(t.Any or None, BaseException or None)]
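The behaviour described for exc_loop() can be illustrated with a small pure-Python sketch. This is not the library's implementation: exc_loop_sketch, flaky_numbers, and read_with_restart are hypothetical names introduced here, and the sketch only assumes the documented contract (pairs of item and None, or None and the exception). It also shows why a fresh generator instance is needed to continue after a failure.

```python
from typing import Any, Iterable, Iterator, List, Optional, Tuple


def exc_loop_sketch(it: Iterable[Any]) -> Iterator[Tuple[Optional[Any], Optional[BaseException]]]:
    """Stand-in mimicking the documented exc_loop() contract (assumption, not library code)."""
    iterator = iter(it)
    while True:
        try:
            item = next(iterator)
        except StopIteration:
            return
        except Exception as exc:
            # The documented return type mentions BaseException; this sketch
            # catches only Exception so that Ctrl-C still interrupts the loop.
            yield None, exc
            continue
        yield item, None


def flaky_numbers(start: int, stop: int, fail_at: Optional[int] = None) -> Iterator[int]:
    """Hypothetical unreliable generator that raises when it reaches fail_at."""
    for i in range(start, stop):
        if i == fail_at:
            raise ConnectionError(f"stream dropped at item {i}")
        yield i


def read_with_restart(stop: int, fail_at: int) -> Tuple[List[int], List[BaseException]]:
    """Consume the flaky source, restarting with a new generator after a failure."""
    items: List[int] = []
    errors: List[BaseException] = []
    source = flaky_numbers(0, stop, fail_at)
    while True:
        for item, exc in exc_loop_sketch(source):
            if exc is not None:
                errors.append(exc)
                break
            items.append(item)
        else:
            break  # source finished without raising
        # The failed generator is exhausted, so create a new instance that
        # resumes after the last item received (and no longer fails).
        source = flaky_numbers(len(items), stop)
    return items, errors
```

With the real function, the loop body would look the same; only the wrapper call would be resiliparse.itertools.exc_loop(source) instead of the stand-in.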
- resiliparse.itertools.progress_loop()
Deprecated since version 0.12.0: Use resiliparse.process_guard.progress_loop() instead.
- resiliparse.itertools.warc_retry(archive_iterator, stream_factory, retry_count=3, seek=True)
Wrap a fastwarc.warc.ArchiveIterator to try to continue reading after a stream failure.
Use if the underlying stream is unreliable, such as when reading from a network data source. If an exception other than StopIteration is raised while consuming the iterator, the WARC reading process will be retried up to retry_count times. When a stream failure occurs, archive_iterator will be reinitialised with a new stream object by calling stream_factory.
The new stream object returned by stream_factory() must be seekable. If the stream does not support seeking, you can set seek=False. In this case, the stream position in bytes of the last successfully read record will be passed as a parameter to stream_factory(). The factory is then expected to return a stream that already starts at this exact position (or else reading would restart from the beginning, resulting in duplicate records). This is primarily useful for streams that are not inherently seekable, but have an external facility for starting them at the correct position (such as S3 HTTPS streams created from range requests).
As another option, seek can also be None, which instructs warc_retry() to consume the stream up to the continuation position. The stream returned by stream_factory() must start at the beginning and will be read normally, but all bytes before the last record will be skipped over before continuing to parse the contents. This is the most expensive method of “seeking” on a stream and should only be used if the stream is not seekable and there is no other option for starting it at the correct offset.
Exceptions raised inside stream_factory() will be caught and count towards retry_count.
- Parameters:
archive_iterator (fastwarc.warc.ArchiveIterator) – input WARC iterator
stream_factory (t.Callable[[], t.Any] or t.Callable[[int], t.Any]) – callable returning a new stream instance to continue iteration in case of failure
retry_count (int) – maximum number of retries before giving up (set to None or zero for no limit)
seek (bool or None) – whether to seek to previous position on new stream object (or None for “stream consumption”)
- Returns:
wrapped ArchiveIterator
- Return type:
t.Iterable[fastwarc.warc.WarcRecord]
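The two stream_factory shapes described above (no argument for seek=True, a byte position for seek=False) might be sketched as follows. The helper names make_seekable_factory, make_offset_factory, and read_records are hypothetical, and an in-memory bytes object stands in for a real file or remote object; in practice the factories would more likely open a file or issue a range request. The warc_retry() call follows the signature documented above and requires fastwarc and resiliparse to be installed.

```python
import io
from typing import BinaryIO, Callable


def make_seekable_factory(data: bytes) -> Callable[[], BinaryIO]:
    """Factory for seek=True: takes no arguments and returns a seekable
    stream positioned at the start (a real one might call open(path, 'rb'))."""
    def factory() -> BinaryIO:
        return io.BytesIO(data)
    return factory


def make_offset_factory(data: bytes) -> Callable[[int], BinaryIO]:
    """Factory for seek=False: receives the byte position of the last
    successfully read record and returns a stream that already starts
    there, comparable to a stream created from an HTTP range request."""
    def factory(position: int) -> BinaryIO:
        return io.BytesIO(data[position:])
    return factory


def read_records(data: bytes, use_offsets: bool = False):
    """Iterate WARC records with up to three retries (needs fastwarc and resiliparse)."""
    # Imported lazily so the factories above stay usable without the libraries.
    from fastwarc.warc import ArchiveIterator
    from resiliparse.itertools import warc_retry

    if use_offsets:
        factory = make_offset_factory(data)
        first_stream = factory(0)
        seek = False
    else:
        factory = make_seekable_factory(data)
        first_stream = factory()
        seek = True
    yield from warc_retry(ArchiveIterator(first_stream), factory,
                          retry_count=3, seek=seek)
```

The seek=None mode would use the same zero-argument factory as seek=True, at the cost of re-reading all bytes up to the continuation position.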