Skip to content

Importer

DataImporter

Main manager class for the data importer.

This is where all the action happens and where all the databases, views, and relationships are set up.

Source code in dataimporter/importer.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
class DataImporter:
    """
    Main manager class for the data importer.

    This is where all the action happens and where all the databases, views, and
    relationships are set up.
    """

    def __init__(self, config: Config):
        """
        Don't use this init directly, call use_importer above to ensure locking is
        managed correctly.

        Builds the Splitgill client, creates the stores and views beneath
        config.data_path, and sets up the EMu status tracker.

        :param config: a config object
        """
        self.config = config
        # client for mongo and elasticsearch via Splitgill
        self.client = SplitgillClient(
            config.get_mongo_client(),
            config.get_elasticsearch_client(),
            config.get_mongo_database_name(),
        )

        # make sure the data path exists (no parents=True, so its parent directory
        # must already exist)
        self.config.data_path.mkdir(exist_ok=True)
        # create all the paths for data storage
        self.stores_path = config.data_path / 'stores'
        self.views_path = config.data_path / 'views'
        # make sure they exist
        self.stores_path.mkdir(exist_ok=True)
        self.views_path.mkdir(exist_ok=True)

        # create the stores we need (note not eaudit! eaudit dumps are never stored,
        # their records are converted into deletions against the other stores)
        ecatalogue_store = Store(self.stores_path / 'ecatalogue')
        emultimedia_store = Store(self.stores_path / 'emultimedia')
        etaxonomy_store = Store(self.stores_path / 'etaxonomy')
        gbif_store = Store(self.stores_path / 'gbif')
        self.stores = [ecatalogue_store, emultimedia_store, etaxonomy_store, gbif_store]

        # create the views we need
        # mss published name does not have the sg_prefix on the front so that it can
        # be separated from the data portal's resources (it becomes impossible to clash
        # names by creating a resource called "mss")
        mss_view = MSSView(self.views_path / 'mss', emultimedia_store, 'mss')
        image_view = ImageView(
            self.views_path / 'image', emultimedia_store, config.iiif_base_url
        )
        taxonomy_view = TaxonomyView(self.views_path / 'taxonomy', etaxonomy_store)
        gbif_view = GBIFView(self.views_path / 'gbif', gbif_store)
        artefact_view = ArtefactView(
            self.views_path / 'artefact',
            ecatalogue_store,
            image_view,
            f'{config.sg_prefix}{config.artefact_id}',
        )
        indexlot_view = IndexLotView(
            self.views_path / 'indexlot',
            ecatalogue_store,
            image_view,
            taxonomy_view,
            f'{config.sg_prefix}{config.indexlot_id}',
        )
        mammal_part_view = MammalPartView(
            self.views_path / 'mammalpart', ecatalogue_store
        )
        specimen_view = SpecimenView(
            self.views_path / 'specimen',
            ecatalogue_store,
            image_view,
            taxonomy_view,
            gbif_view,
            mammal_part_view,
            f'{config.sg_prefix}{config.specimen_id}',
        )
        prep_view = PreparationView(
            self.views_path / 'preparation',
            ecatalogue_store,
            specimen_view,
            f'{config.sg_prefix}{config.preparation_id}',
        )
        self.views = [
            image_view,
            mss_view,
            taxonomy_view,
            gbif_view,
            artefact_view,
            indexlot_view,
            specimen_view,
            prep_view,
            mammal_part_view,
        ]

        # this is where we store the last date we have fully imported from EMu
        self.emu_status = EMuStatus(config.data_path / 'emu_last_date.txt')

    def get_store(self, name: str) -> Store:
        """
        Get the store with the given name. If the store doesn't exist, a StoreNotFound
        exception is raised.

        :param name: the name of the store
        :return: the Store instance
        :raises StoreNotFound: if no store has the given name
        """
        for store in self.stores:
            if store.name == name:
                return store
        raise StoreNotFound(name)

    def get_view(self, name: str) -> View:
        """
        Get the view with the given name. If the view doesn't exist, a ViewNotFound
        exception is raised.

        :param name: the name of the view
        :return: the View instance
        :raises ViewNotFound: if no view has the given name
        """
        for view in self.views:
            if view.name == name:
                return view
        raise ViewNotFound(name)

    def get_database(self, view: Union[str, View]) -> SplitgillDatabase:
        """
        Returns a new SplitgillDatabase instance for the given view, named after the
        view's published name.

        :param view: a View instance or a view's name
        :return: a SplitgillDatabase instance
        :raises ViewNotFound: if view is passed as a str and no view has that name
        :raises ViewIsNotPublished: if the view is not published (and therefore has
            no associated Splitgill database)
        """
        if isinstance(view, str):
            view = self.get_view(view)
        if not view.is_published:
            raise ViewIsNotPublished(view)
        return SplitgillDatabase(view.published_name, self.client)

    def queue_changes(self, records: Iterable[SourceRecord], store_name: str):
        """
        Put the records into the store with the given name and queue them on every
        view based on that store. Records are processed in batches of 1000.

        :param records: an iterable of records to queue
        :param store_name: the name of the store to update
        """
        store = self.get_store(store_name)
        # find the views based on the store
        views = [view for view in self.views if view.store.name == store.name]

        for batch in partition(records, 1000):
            store.put(batch)
            for view in views:
                view.queue(batch)

    def queue_emu_changes(self) -> Optional[date]:
        """
        Look for new EMu dumps and if any are found beyond the date of the last queued
        EMu import, add the next day's data to the stores and view queues.

        :return: the date that was queued or None if no dumps were found
        """
        last_queued = self.emu_status.get()
        dump_sets = find_emu_dumps(self.config.dumps_path, after=last_queued)
        if not dump_sets:
            return None

        next_day_dump_set = dump_sets[0]

        store_names = {store.name for store in self.stores if store.name != 'gbif'}
        for dump in next_day_dump_set.dumps:
            # normal tables are immediately processable, but if the dump is from
            # the eaudit table we need to do some additional work because each
            # audit record refers to a potentially different table from which it
            # is deleting a record
            if dump.table != 'eaudit':
                self.queue_changes(dump.read(), dump.table)
            else:
                # wrap the dump stream in a filter to only allow through records
                # we want to process
                filtered_dump = filter(
                    partial(is_valid_eaudit_record, tables=store_names),
                    dump.read(),
                )
                # queue the changes to each table's database in turn. groupby only
                # groups consecutive records, so one table may be queued in several
                # runs; each run is queued independently so this is harmless
                for table, records in groupby(
                    filtered_dump, key=lambda record: record.data['AudTable']
                ):
                    # convert the raw audit records into delete records as we
                    # queue them
                    self.queue_changes(map(convert_eaudit_to_delete, records), table)

        self.emu_status.update(next_day_dump_set.date)
        return next_day_dump_set.date

    def queue_gbif_changes(self):
        """
        Retrieve the latest GBIF records, check which ones have changed compared to the
        ones stored in the gbif store, and then queue them into the GBIF view.
        """
        self.queue_changes(
            get_changed_records(
                self.get_store('gbif'),
                self.config.gbif_username,
                self.config.gbif_password,
            ),
            'gbif',
        )

    def redact_records(
        self, store_name: str, record_ids: List[str], redaction_id: str
    ) -> int:
        """
        Deletes the given record IDs from the named store. This doesn't delete any data
        from the views, MongoDB, or Elasticsearch. This will need to be done manually.

        This operation should only be performed in extreme circumstances where we need
        to remove data that should never have been released and should not be searchable
        at all, even in historic searches. This is because it breaks the core idea of
        the versioned Splitgill system.

        :param store_name: the name of the store to remove the records from
        :param record_ids: a list of str record IDs to redact
        :param redaction_id: an ID for the redaction, so it's traceable
        :return: the number of records deleted
        """
        store = self.get_store(store_name)
        return store.redact(record_ids, redaction_id)

    def release_records(self, up_to: int):
        """
        Releases embargoed records from each store up to the given up_to value.
        Released records are queued to the views based on each store.

        :param up_to: records with an embargo before this timestamp will be released
        """
        for store in self.stores:
            self.queue_changes(store.release_records(up_to), store.name)

    def add_to_mongo(self, view_name: str, everything: bool = False) -> Optional[int]:
        """
        Add the queued changes in the given view to MongoDB.

        Any embargoes that have expired are released (and queued) first. Records that
        are falsy or not publishable are sent as deletions.

        :param view_name: the name of the view
        :param everything: whether to add all records to MongoDB even if they haven't
            changed. Default: False.
        :return: the new version committed, or None if no changes were made
        """
        self.release_records(now())

        view = self.get_view(view_name)
        database = self.get_database(view)

        if everything:
            changed_records = view.iter_all()
        else:
            changed_records = view.iter_changed()
        records = (
            Record(record.id, view.transform(record))
            if record and view.is_publishable(record)
            else Record.delete(record.id)
            for record in changed_records
        )

        database.ingest(records, commit=False, modified_field='modified')
        # send the options anyway, even if there's no change to them
        database.update_options(PARSING_OPTIONS, commit=False)
        committed = database.commit()
        # flush the queue as we've handled everything in it now
        view.flush()
        return committed

    def sync_to_elasticsearch(self, view_name: str, resync: bool = False):
        """
        Synchronise the given Splitgill database with Elasticsearch.

        :param view_name: the name of the view the Splitgill database will use
        :param resync: whether to resync all records to Elasticsearch even if they
            haven't changed
        """
        view = self.get_view(view_name)
        database = self.get_database(view)
        bulk_options = BulkOptions(
            chunk_size=self.config.bo_chunk_size,
            worker_count=self.config.bo_worker_count,
        )
        database.sync(resync=resync, bulk_options=bulk_options)

    def force_merge(self, view_name: str) -> dict:
        """
        Performs a force merge on the given view name's Elasticsearch indices. This may
        take a while! This is good for cleaning up deleted documents.

        Every index matching the database's index wildcard is merged down to a single
        segment, and the call waits for the merge to complete.

        :param view_name: the name of the view whose indices should be merged
        :return: the response from the Elasticsearch client's forcemerge call
        """
        view = self.get_view(view_name)
        database = self.get_database(view)
        client = self.client.elasticsearch
        # request_timeout=None: presumably so the client doesn't give up on this
        # long-running, blocking call
        return client.options(request_timeout=None).indices.forcemerge(
            index=database.indices.wildcard,
            allow_no_indices=True,
            wait_for_completion=True,
            max_num_segments=1,
        )

    def purge_unsuitable_records(self, view_name: str) -> Tuple[int, int, int]:
        """
        Delete existing records that no longer match the is_member and is_publishable
        rules for this view. Depublished records should be handled by the regular ingest
        process anyway, but this can be used for ad-hoc purging. It will also handle the
        rarer case of records that are no longer members.

        A record failing both rules is counted in both counters but only one delete is
        queued for it.

        :param view_name: the name of the view to purge records from
        :return: the number of non-member records found, the number of non-publishable
            records found, and the total count of records deleted by this method
        """
        view = self.get_view(view_name)
        database = self.get_database(view)
        to_delete = []
        not_member_count = 0
        not_published_count = 0
        # ignore deleted records (i.e. records without an _id field)
        for mongo_record in database.iter_records(
            filter={'data._id': {'$exists': True}}
        ):
            source_record = view.store.get_record(mongo_record.id)
            if not source_record:
                continue
            is_member = view.is_member(source_record)
            is_publishable = view.is_publishable(source_record)
            if not is_member:
                not_member_count += 1
            if not is_publishable:
                not_published_count += 1
            # only queue a single delete per record, even if it fails both checks
            if not is_member or not is_publishable:
                to_delete.append(Record.delete(source_record.id))

        commit_result = database.ingest(to_delete, modified_field='modified')
        return not_member_count, not_published_count, commit_result.updated

    def __enter__(self) -> 'DataImporter':
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """
        Close the views and then the stores.
        """
        for view in self.views:
            view.close()
        for store in self.stores:
            store.close()
__init__(config)

Don't use this init directly; call use_importer above to ensure locking is managed correctly.

Parameters:

Name Type Description Default
config Config

a config object

required
Source code in dataimporter/importer.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def __init__(self, config: Config):
    """
    Don't use this init directly, call use_importer above to ensure locking is
    managed correctly.

    Builds the Splitgill client (MongoDB + Elasticsearch), creates the data, stores
    and views directories beneath config.data_path, opens the stores and views, and
    sets up the EMu status tracker.

    :param config: a config object
    """
    self.config = config
    # client for mongo and elasticsearch via Splitgill
    self.client = SplitgillClient(
        config.get_mongo_client(),
        config.get_elasticsearch_client(),
        config.get_mongo_database_name(),
    )

    # make sure the data path exists
    # NOTE(review): no parents=True, so data_path's parent must already exist
    self.config.data_path.mkdir(exist_ok=True)
    # create all the paths for data storage
    self.stores_path = config.data_path / 'stores'
    self.views_path = config.data_path / 'views'
    # make sure they exist
    self.stores_path.mkdir(exist_ok=True)
    self.views_path.mkdir(exist_ok=True)

    # create the stores we need (note not eaudit! eaudit dumps are never stored
    # themselves, their records are converted into deletions against other stores)
    ecatalogue_store = Store(self.stores_path / 'ecatalogue')
    emultimedia_store = Store(self.stores_path / 'emultimedia')
    etaxonomy_store = Store(self.stores_path / 'etaxonomy')
    gbif_store = Store(self.stores_path / 'gbif')
    self.stores = [ecatalogue_store, emultimedia_store, etaxonomy_store, gbif_store]

    # create the views we need. Views that others depend on (image, taxonomy, gbif,
    # mammal part, specimen) are created before the views that take them as arguments
    # mss published name does not have the sg_prefix on the front so that it can
    # be separated from the data portal's resources (it becomes impossible to clash
    # names by creating a resource called "mss")
    mss_view = MSSView(self.views_path / 'mss', emultimedia_store, 'mss')
    image_view = ImageView(
        self.views_path / 'image', emultimedia_store, config.iiif_base_url
    )
    taxonomy_view = TaxonomyView(self.views_path / 'taxonomy', etaxonomy_store)
    gbif_view = GBIFView(self.views_path / 'gbif', gbif_store)
    artefact_view = ArtefactView(
        self.views_path / 'artefact',
        ecatalogue_store,
        image_view,
        f'{config.sg_prefix}{config.artefact_id}',
    )
    indexlot_view = IndexLotView(
        self.views_path / 'indexlot',
        ecatalogue_store,
        image_view,
        taxonomy_view,
        f'{config.sg_prefix}{config.indexlot_id}',
    )
    mammal_part_view = MammalPartView(
        self.views_path / 'mammalpart', ecatalogue_store
    )
    specimen_view = SpecimenView(
        self.views_path / 'specimen',
        ecatalogue_store,
        image_view,
        taxonomy_view,
        gbif_view,
        mammal_part_view,
        f'{config.sg_prefix}{config.specimen_id}',
    )
    prep_view = PreparationView(
        self.views_path / 'preparation',
        ecatalogue_store,
        specimen_view,
        f'{config.sg_prefix}{config.preparation_id}',
    )
    self.views = [
        image_view,
        mss_view,
        taxonomy_view,
        gbif_view,
        artefact_view,
        indexlot_view,
        specimen_view,
        prep_view,
        mammal_part_view,
    ]

    # this is where we store the last date we have fully imported from EMu
    self.emu_status = EMuStatus(config.data_path / 'emu_last_date.txt')

add_to_mongo(view_name, everything=False)

Add the queued changes in the given view to MongoDB.

Parameters:

Name Type Description Default
view_name str

the name of the view

required
everything bool

whether to add all records to MongoDB even if they haven't changed. Default: False.

False

Returns:

Type Description
Optional[int]

the new version committed, or None if no changes were made

Source code in dataimporter/importer.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def add_to_mongo(self, view_name: str, everything: bool = False) -> Optional[int]:
    """
    Push the given view's pending changes into its Splitgill database in MongoDB
    and commit them.

    Expired embargoes are released (and queued to their views) before the view is
    read. Falsy or unpublishable records are sent as deletions; everything else is
    transformed by the view and sent as an upsert.

    :param view_name: the name of the view
    :param everything: when True, send every record in the view rather than just
        the changed ones. Default: False.
    :return: the new version committed, or None if no changes were made
    """
    self.release_records(now())

    view = self.get_view(view_name)
    database = self.get_database(view)

    source_records = view.iter_all() if everything else view.iter_changed()

    def to_splitgill_record(source_record):
        # deletions (falsy records) and unpublishable records both become deletes
        if not source_record or not view.is_publishable(source_record):
            return Record.delete(source_record.id)
        return Record(source_record.id, view.transform(source_record))

    database.ingest(
        map(to_splitgill_record, source_records),
        commit=False,
        modified_field='modified',
    )
    # the parsing options are always sent, whether or not they have changed
    database.update_options(PARSING_OPTIONS, commit=False)
    new_version = database.commit()
    # everything queued on the view has now been handled, so clear it
    view.flush()
    return new_version

close()

Close down the views and then the stores.

Source code in dataimporter/importer.py
423
424
425
426
427
428
429
430
def close(self):
    """
    Shut everything down: each view is closed first, then each store.
    """
    for resource in [*self.views, *self.stores]:
        resource.close()

force_merge(view_name)

Performs a force merge on the given view name's Elasticsearch indices. This may take a while! This is good for cleaning up deleted documents.

Parameters:

Name Type Description Default
view_name str
required

Returns:

Type Description
dict
Source code in dataimporter/importer.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def force_merge(self, view_name: str) -> dict:
    """
    Force merge every Elasticsearch index belonging to the named view's database
    down to a single segment, waiting for the merge to finish. This may take a
    while! It is useful for cleaning up deleted documents.

    :param view_name: the name of the view whose indices should be merged
    :return: the response from the Elasticsearch client's forcemerge call
    """
    database = self.get_database(self.get_view(view_name))
    # no request timeout, presumably so the client doesn't abandon this long call
    es = self.client.elasticsearch.options(request_timeout=None)
    merge_params = dict(
        index=database.indices.wildcard,
        allow_no_indices=True,
        wait_for_completion=True,
        max_num_segments=1,
    )
    return es.indices.forcemerge(**merge_params)

get_database(view)

Returns a new SplitgillDatabase instance for the given view. If the view is not published (and therefore has no associated SplitgillDatabase name), a ViewIsNotPublished exception is raised. If the view parameter is passed as a str and the view does not exist, a ViewNotFound exception is raised.

Parameters:

Name Type Description Default
view Union[str, View]

a View instance or a view's name

required

Returns:

Type Description
SplitgillDatabase

a SplitgillDatabase instance

Source code in dataimporter/importer.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def get_database(self, view: Union[str, View]) -> SplitgillDatabase:
    """
    Returns a new SplitgillDatabase instance for the given view, named after the
    view's published name.

    :param view: a View instance or a view's name
    :return: a SplitgillDatabase instance
    :raises ViewNotFound: if view is passed as a str and no view has that name
    :raises ViewIsNotPublished: if the view is not published (and therefore has no
        associated Splitgill database)
    """
    if isinstance(view, str):
        view = self.get_view(view)
    if not view.is_published:
        raise ViewIsNotPublished(view)
    return SplitgillDatabase(view.published_name, self.client)

get_store(name)

Get the store with the given name. If the store doesn't exist, a StoreNotFound exception is raised.

Parameters:

Name Type Description Default
name str

the name of the store

required

Returns:

Type Description
Store

the Store instance

Source code in dataimporter/importer.py
174
175
176
177
178
179
180
181
182
183
184
185
def get_store(self, name: str) -> Store:
    """
    Look up a store by name.

    :param name: the name of the store
    :return: the first Store whose name matches
    :raises StoreNotFound: if no store has the given name
    """
    matching = (candidate for candidate in self.stores if candidate.name == name)
    found = next(matching, None)
    if found is None:
        raise StoreNotFound(name)
    return found

get_view(name)

Get the view with the given name. If the view doesn't exist, a ViewNotFound exception is raised.

Parameters:

Name Type Description Default
name str

the name of the view

required

Returns:

Type Description
View

the View instance

Source code in dataimporter/importer.py
187
188
189
190
191
192
193
194
195
196
197
198
def get_view(self, name: str) -> View:
    """
    Look up a view by name.

    :param name: the name of the view
    :return: the first View whose name matches
    :raises ViewNotFound: if no view has the given name
    """
    matching = (candidate for candidate in self.views if candidate.name == name)
    found = next(matching, None)
    if found is None:
        raise ViewNotFound(name)
    return found

purge_unsuitable_records(view_name)

Delete existing records that no longer match the is_member and is_publishable rules for this view. Depublished records should be handled by the regular ingest process anyway, but this can be used for ad-hoc purging. It will also handle the rarer case of records that are no longer members.

Parameters:

Name Type Description Default
view_name str

the name of the view to purge records from

required

Returns:

Type Description
Tuple[int, int, int]

the number of non-member records found, the number of non-publishable records found, and the total count of records deleted by this method

Source code in dataimporter/importer.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def purge_unsuitable_records(self, view_name: str) -> Tuple[int, int, int]:
    """
    Delete existing records that no longer match the is_member and is_publishable
    rules for this view. Depublished records should be handled by the regular ingest
    process anyway, but this can be used for ad-hoc purging. It will also handle the
    rarer case of records that are no longer members.

    A record failing both rules is counted in both counters but only one delete is
    queued for it.

    :param view_name: the name of the view to purge records from
    :return: the number of non-member records found, the number of non-publishable
        records found, and the total count of records deleted by this method
    """
    view = self.get_view(view_name)
    database = self.get_database(view)
    to_delete = []
    not_member_count = 0
    not_published_count = 0
    # ignore deleted records (i.e. records without an _id field)
    for mongo_record in database.iter_records(
        filter={'data._id': {'$exists': True}}
    ):
        source_record = view.store.get_record(mongo_record.id)
        if not source_record:
            continue
        is_member = view.is_member(source_record)
        is_publishable = view.is_publishable(source_record)
        if not is_member:
            not_member_count += 1
        if not is_publishable:
            not_published_count += 1
        # only queue a single delete per record, even if it fails both checks
        if not is_member or not is_publishable:
            to_delete.append(Record.delete(source_record.id))

    commit_result = database.ingest(to_delete, modified_field='modified')
    return not_member_count, not_published_count, commit_result.updated

queue_changes(records, store_name)

Update the records in the store with the given name. The records are also queued on every view based on that store.

Parameters:

Name Type Description Default
records Iterable[SourceRecord]

an iterable of records to queue

required
store_name str

the name of the store to update

required
Source code in dataimporter/importer.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def queue_changes(self, records: Iterable[SourceRecord], store_name: str):
    """
    Write records into the named store and queue them on every view built on top
    of that store. Records are handled in batches of 1000; each batch goes into
    the store first and is then queued on each dependent view.

    :param records: an iterable of records to queue
    :param store_name: the name of the store to update
    """
    batch_size = 1000
    target_store = self.get_store(store_name)
    dependent_views = [
        candidate
        for candidate in self.views
        if candidate.store.name == target_store.name
    ]

    for chunk in partition(records, batch_size):
        target_store.put(chunk)
        for dependent in dependent_views:
            dependent.queue(chunk)

queue_emu_changes()

Look for new EMu dumps and if any are found beyond the date of the last queued EMu import, add the next day's data to the stores and view queues.

Source code in dataimporter/importer.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def queue_emu_changes(self) -> Optional[date]:
    """
    Queue the next unprocessed day of EMu dumps, if there is one.

    Dumps newer than the last queued date are looked up and only the earliest
    day's set is processed. Regular table dumps go straight into their store;
    eaudit dumps are filtered, grouped by the table each audit record targets,
    and queued as deletions against that table's store. The status file is then
    advanced to the processed date.

    :return: the date that was queued or None if no dumps were found
    """
    dump_sets = find_emu_dumps(self.config.dumps_path, after=self.emu_status.get())
    if not dump_sets:
        return None

    dump_set = dump_sets[0]

    # audit records may only target the EMu-sourced stores (i.e. not gbif)
    auditable_tables = {s.name for s in self.stores if s.name != 'gbif'}
    is_wanted = partial(is_valid_eaudit_record, tables=auditable_tables)

    def audit_table(audit_record):
        return audit_record.data['AudTable']

    for dump in dump_set.dumps:
        if dump.table == 'eaudit':
            # each audit record may delete from a different table, so split the
            # stream by target table (consecutive runs) and queue deletions
            wanted_records = filter(is_wanted, dump.read())
            for table, group in groupby(wanted_records, key=audit_table):
                self.queue_changes(map(convert_eaudit_to_delete, group), table)
        else:
            self.queue_changes(dump.read(), dump.table)

    self.emu_status.update(dump_set.date)
    return dump_set.date

queue_gbif_changes()

Retrieve the latest GBIF records, check which ones have changed compared to the ones stored in the gbif data DB, and then queue them into the GBIF view.

Source code in dataimporter/importer.py
273
274
275
276
277
278
279
280
281
282
283
284
285
def queue_gbif_changes(self):
    """
    Fetch the latest GBIF records, work out which differ from what is already in
    the gbif store, and queue those changes into the gbif store and its views.
    """
    gbif_store = self.get_store('gbif')
    changed = get_changed_records(
        gbif_store,
        self.config.gbif_username,
        self.config.gbif_password,
    )
    self.queue_changes(changed, 'gbif')

redact_records(store_name, record_ids, redaction_id)

Deletes the given record IDs from the named store. This doesn't delete any data from the views, MongoDB, or Elasticsearch. This will need to be done manually.

This operation should only be performed in extreme circumstances where we need to remove data that should never have been released and should not be searchable at all, even in historic searches. This is because it breaks the core idea of the versioned Splitgill system.

Parameters:

Name Type Description Default
store_name str

the name of the store to remove the records from

required
record_ids List[str]

a list of str record IDs to redact

required
redaction_id str

an ID for the redaction, so it's traceable

required

Returns:

Type Description
int

the number of records deleted

Source code in dataimporter/importer.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def redact_records(
    self, store_name: str, record_ids: List[str], redaction_id: str
) -> int:
    """
    Remove the given record IDs from the named DataDB.

    Only the DataDB is touched. Any copies of the data in the views, MongoDB, or
    Elasticsearch are left in place and must be removed by hand.

    Reserve this for extreme circumstances, such as data that should never have been
    released and must not be searchable at all, not even in historic searches. It
    breaks the core idea of the versioned Splitgill system.

    :param store_name: name of the store the records are removed from
    :param record_ids: the str record IDs to redact
    :param redaction_id: identifier for this redaction so it can be traced later
    :return: how many records were deleted
    """
    return self.get_store(store_name).redact(record_ids, redaction_id)

release_records(up_to)

Releases embargoed records from each store up to the given up_to value. Released records are queued to the impacted views.

Parameters:

Name Type Description Default
up_to int

records with an embargo before this timestamp will be released

required
Source code in dataimporter/importer.py
307
308
309
310
311
312
313
314
315
def release_records(self, up_to: int):
    """
    Release embargoed records from every store up to the given up_to value, and
    queue the released records to the views that depend on each store.

    :param up_to: records whose embargo is before this timestamp are released
    """
    for data_store in self.stores:
        released = data_store.release_records(up_to)
        self.queue_changes(released, data_store.name)

sync_to_elasticsearch(view_name, resync=False)

Synchronise the given Splitgill database with Elasticsearch.

Parameters:

Name Type Description Default
view_name str

the name of the view the Splitgill database will use

required
resync bool

whether to resync all records to Elasticsearch even if they haven't changed

False
Source code in dataimporter/importer.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def sync_to_elasticsearch(self, view_name: str, resync: bool = False):
    """
    Synchronise the Splitgill database behind the named view with Elasticsearch.

    :param view_name: name of the view whose Splitgill database is synced
    :param resync: when True, send every record to Elasticsearch, including those
        that haven't changed
    """
    splitgill_db = self.get_database(self.get_view(view_name))
    options = BulkOptions(
        chunk_size=self.config.bo_chunk_size,
        worker_count=self.config.bo_worker_count,
    )
    splitgill_db.sync(resync=resync, bulk_options=options)

EMuStatus

Class controlling the EMu dump status.

Source code in dataimporter/importer.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
class EMuStatus:
    """
    Tracks the EMu dump status, i.e. the last date fully imported from EMu, by
    persisting it as a DD-MM-YYYY string in a small UTF-8 text file.
    """

    def __init__(self, path: Path):
        """
        :param path: location of the file holding the last date fully imported from
            EMu
        """
        self.path: Path = path

    def get(self) -> date:
        """
        Read back the last date fully imported from EMu. When the status file does
        not exist, the constant FIRST_VERSION value is returned instead.

        :return: a date
        """
        if self.path.exists():
            stored = self.path.read_text(encoding='utf-8').strip()
            return datetime.strptime(stored, '%d-%m-%Y').date()
        return FIRST_VERSION

    def update(self, last_queued: date):
        """
        Overwrite the stored status with the given date.

        :param last_queued: the date to record
        """
        self.path.write_text(last_queued.strftime('%d-%m-%Y'), encoding='utf-8')

    def clear(self):
        """
        Reset the status by removing the status file; a missing file is not an
        error.
        """
        self.path.unlink(missing_ok=True)

__init__(path)

Parameters:

Name Type Description Default
path Path

the file path to store the last date fully imported from EMu

required
Source code in dataimporter/importer.py
438
439
440
441
442
def __init__(self, path: Path):
    """
    :param path: location of the file holding the last date fully imported from EMu
    """
    self.path: Path = path

clear()

Clear the last date status by deleting the status file.

Source code in dataimporter/importer.py
466
467
468
469
470
def clear(self):
    """
    Reset the status by removing the status file; a missing file is not an error.
    """
    self.path.unlink(missing_ok=True)

get()

Get the last date fully imported from EMu. If no status file is found, return the constant FIRST_VERSION value.

Returns:

Type Description
date

a date

Source code in dataimporter/importer.py
444
445
446
447
448
449
450
451
452
453
454
455
def get(self) -> date:
    """
    Read back the last date fully imported from EMu. When the status file does not
    exist, the constant FIRST_VERSION value is returned instead.

    :return: a date
    """
    if self.path.exists():
        stored = self.path.read_text(encoding='utf-8').strip()
        return datetime.strptime(stored, '%d-%m-%Y').date()
    return FIRST_VERSION

update(last_queued)

Update the last date status value with the given date.

Parameters:

Name Type Description Default
last_queued date

the date to write

required
Source code in dataimporter/importer.py
457
458
459
460
461
462
463
464
def update(self, last_queued: date):
    """
    Overwrite the stored status with the given date.

    :param last_queued: the date to record
    """
    self.path.write_text(last_queued.strftime('%d-%m-%Y'), encoding='utf-8')

use_importer(config)

Creates a new DataImporter instance and yields it. Only one instance of the DataImporter class can operate on a given data directory at one time and this function will raise an ImporterAlreadyRunning exception if that is detected.

Parameters:

Name Type Description Default
config Config

the config

required
Source code in dataimporter/importer.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@contextmanager
def use_importer(config: Config) -> Generator['DataImporter', Any, None]:
    """
    Creates a new DataImporter instance and yields it. Only one instance of the
    DataImporter class can operate on a given data directory at one time and this
    function will raise an ImporterAlreadyRunning exception if that is detected.

    The lock file is held for as long as the yielded importer is in use and is
    released when the with block exits, whether normally or via an exception.

    :param config: the config
    :raises ImporterAlreadyRunning: if the lock file is already held by another
        importer instance
    """
    config.data_path.mkdir(exist_ok=True)
    lock = FileLock(config.lock_file)
    # Only the lock acquisition is guarded. A Timeout raised later, by the caller's
    # code inside the with block or by DataImporter itself, must propagate unchanged
    # instead of being misreported as another importer already running.
    try:
        # don't wait for the other instance to finish if there is one, just fail asap
        acquired = lock.acquire(blocking=False)
    except Timeout as error:
        raise ImporterAlreadyRunning() from error
    # the object returned by acquire releases the lock when its with block exits
    with acquired:
        with DataImporter(config) as importer:
            yield importer