Elasticsearch

Utilities for indexing data to Elasticsearch with Apache Beam.

class resiliparse.beam.elasticsearch.ElasticsearchBulkIndex(es_args: Dict[<class 'str'>, Any], default_index: Union[<class 'NoneType'>, <class 'str'>] = None, update: bool = False, parallelism: int = None, buffer_size: int = 3200, chunk_size: int = 800, max_retries: int = 10, initial_backoff: float = 2, max_backoff: float = 600, request_timeout: int = 240, ignore_400: bool = True, dry_run: bool = False)

Bases: PTransform

Bulk-index documents to Elasticsearch.

Expects bulk index actions that can be passed to elasticsearch.helpers.streaming_bulk(). Helpers for creating such actions are given with index_action(), update_action(), and delete_action().

If the records are KV pairs, the key will be used as the document _id and the value is expected to be the document that is to be indexed as a dict. In this case, fields starting with underscores will be discarded and default_index must be set.

Returns the document IDs of successfully indexed documents.

Parameters:
  • es_args – Elasticsearch client arguments

  • default_index – default index name if none is given in the index action

  • update – do a bulk UPDATE instead of a bulk INDEX (requires documents to exist)

  • parallelism – reshuffle to achieve the desired level of parallelism

  • buffer_size – internal buffer size

  • chunk_size – indexing chunk size

  • max_retries – maximum number of retries on recoverable failures

  • initial_backoff – initial retry backoff

  • max_backoff – maximum retry backoff

  • request_timeout – Elasticsearch request timeout

  • ignore_400 – ignore HTTP 400 errors (skip unindexable documents)

  • dry_run – discard documents and do not actually index them

resiliparse.beam.elasticsearch.delete_action(doc_id: str, index: str)

Build a delete bulk action.

Parameters:
  • doc_id – document ID

  • index – index name

Returns:

index action dict

resiliparse.beam.elasticsearch.ensure_index(client: ~elasticsearch.Elasticsearch, index_name: str, index_settings: Dict[<class 'str'>, <class 'str'>] = None, mapping: Dict[<class 'str'>, <class 'str'>] = None)

Helper function to ensure an index exists.

If the index does not exist, it will be created with the given mapping and settings.

Parameters:
  • client – Elasticsearch client instance

  • index_name – index name

  • index_settings – index settings dict

  • mapping – index mapping dict

resiliparse.beam.elasticsearch.index_action(doc_id: str, index: Union[<class 'NoneType'>, <class 'str'>], doc: Dict[<class 'str'>, Any])

Build an index bulk action.

Parameters:
  • doc_id – document ID (None for using autogenerated IDs)

  • index – index name

  • doc – document as dict

Returns:

index action dict

resiliparse.beam.elasticsearch.update_action(doc_id: str, index: str, partial_doc: Dict[<class 'str'>, Any])

Build an update bulk action.

Parameters:
  • doc_id – document ID

  • index – index name

  • partial_doc – partial update document as dict

Returns:

index action dict