Resiliparse Itertools

Resiliparse Itertools are a collection of convenient and robust helper functions for iterating over data from unreliable sources using other tools from the Resiliparse toolkit.

Progress Loops

Progress loops are a convenience tool for iterating data with an active TimeGuard context. Since running a for loop in a TimeGuard with progress being reported after each iteration is a very common pattern, you can use the progress_loop() pass-through generator as a shortcut:

from time import sleep
from resiliparse.itertools import progress_loop
from resiliparse.process_guard import time_guard, ExecutionTimeout

@time_guard(timeout=10)
def foo():
    for _ in progress_loop(range(1000)):
        try:
            sleep(0.1)
        except ExecutionTimeout:
            break

foo()

In cases where context auto-detection doesn’t work, the active guard context can be passed to the generator via the ctx parameter:

with time_guard(timeout=10) as guard:
    for _ in progress_loop(range(1000), ctx=guard):
        try:
            sleep(0.1)
        except ExecutionTimeout:
            break

Exception Loops

Exception loops wrap an iterator to catch and return any exceptions raised while evaluating the input iterator.

This is primarily useful for unreliable generators that may throw unpredictably at any time for unknown reasons (e.g., generators reading from a network data source). If you do not want to wrap the entire loop in a try/except clause, you can use an exc_loop() to catch any such exceptions and return them.

from resiliparse.itertools import exc_loop

def throw_gen():
    from random import random
    for i in range(100):
        if random() <= 0.1:
            raise Exception('Random exception')
        yield i

for val, exc in exc_loop(throw_gen()):
    if exc is not None:
        print('Exception raised:', exc)
        break

    print(val)

Note

Remember that a generator will end after throwing an exception, so if the input iterator is a generator, you will have to create a new instance in order to retry or continue.

WARC Retry Loops

The warc_retry() helper wraps a fastwarc.warc.ArchiveIterator instance to try to continue reading after a (fatal) stream failure.

Note

FastWARC needs to be installed for this.

Use a WARC retry loop if the underlying stream is unreliable, such as when reading from a network data source that is expected to fail at any time. If an exception other than StopIteration is raised while consuming the iterator, the WARC reading process will be retried up to retry_count times (default: 3). After a failure, the fastwarc.warc.ArchiveIterator will be reinitialised with a new stream object by calling stream_factory. The new stream object returned by stream_factory() must be seekable.

from fastwarc.warc import ArchiveIterator
from resiliparse.itertools import warc_retry

def stream_factory():
    return open('warcfile.warc.gz', 'rb')

for record in warc_retry(ArchiveIterator(stream_factory()), stream_factory, retry_count=3):
    pass

If the stream does not support seeking, you can set seek=False. In this case, the position in bytes of the last successful record will be passed as a parameter to stream_factory. The factory is expected to return a new stream that already starts at this position:

from fastwarc.warc import ArchiveIterator
from resiliparse.itertools import warc_retry

def stream_factory(offset):
    stream = open('warcfile.warc.gz', 'rb')
    stream.seek(offset)
    return stream

for record in warc_retry(ArchiveIterator(stream_factory(0)), stream_factory, seek=False):
    pass

Important

Make sure the stream starts at exactly the given position or else you will end up with either duplicate or skipped records or the ArchiveIterator will fail. The first record at this position will be skipped automatically.

As a last option, you can also set seek=None, which will instruct warc_retry() to consume all bytes up to the previous position. This is the most expensive way of “seeking” on a stream and should be used only if the other two methods do not work for you.

Note

Exceptions raised inside stream_factory() will be caught and count towards retry_count.