Resiliparse Itertools

Resiliparse Itertools API documentation.

resiliparse.itertools.exc_loop(it)

Wraps an iterator into another iterator that catches and returns any exceptions raised while evaluating the input iterator.

This is primarily useful for unreliable generators that may throw unpredictably at any time for unknown reasons (e.g., generators reading from a network data source). If you do not want to wrap the entire loop in a try/except clause, you can use an exc_loop() to catch any such exceptions and return them. Remember that a generator will end after throwing an exception, so if the input iterator is a generator, you will have to create a new instance in order to retry or continue.

Parameters:

it (t.Iterable[t.Any]) – original iterator

Returns:

iterator of items and None or None and exception instance

Return type:

t.Iterable[(t.Any or None, BaseException or None)]

resiliparse.itertools.progress_loop(it, ctx=None)

Deprecated since version 0.12.0: Use resiliparse.process_guard.progress_loop() instead.

resiliparse.itertools.warc_retry(archive_iterator, stream_factory, retry_count=3, seek=True)

Wrap a fastwarc.warc.ArchiveIterator to try to continue reading after a stream failure.

Use if the underlying stream is unreliable, such as when reading from a network data source. If an exception other than StopIteration is raised while consuming the iterator, the WARC reading process will be retried up to retry_count times. When a stream failure occurs, archive_iterator will be reinitialised with a new stream object by calling stream_factory.

The new stream object returned by stream_factory() must be seekable. If the stream does not support seeking, you can set seek=False. In this case, the stream position in bytes of the last successfully read record will be passed as a parameter to stream_factory(). The factory is then expected to return a stream that already starts at this exact position (or else reading would restart from the beginning resulting in duplicate records). This is primarily useful for streams that are not inherently seekable, but have an external facility for starting them at the correct position (such as S3 HTTPS streams created from range requests).

As another option, seek can also be None, which instructs warc_retry() to consume the stream up to the continuation position. The stream returned by stream_factory() must start at the beginning and will be read normally, but all bytes before the last record will be skipped over before continuing to parse the contents. This is the most expensive method of “seeking” on a stream and should only be used if the stream is not seekable and there is no other option for starting it at the correct offset.

Exceptions raised inside stream_factory() will be caught and count towards retry_count.

Parameters:
  • archive_iterator (fastwarc.warc.ArchiveIterator) – input WARC iterator

  • stream_factory (t.Callable[[], t.Any] or t.Callable[[int], t.Any]) – callable returning a new stream instance to continue iteration in case of failure

  • retry_count (int) – maximum number of retries before giving up (set to None or zero for no limit)

  • seek (bool or None) – whether to seek to previous position on new stream object (or None for “stream consumption”)

Returns:

wrapped ArchiveIterator

Return type:

t.Iterable[fastwarc.warc.WarcRecord]