Skip to content

Dbs

ChangeQueue

A database that acts as a queue of IDs that have changed.

Source code in dataimporter/lib/dbs.py
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
class ChangeQueue:
    """
    A database that acts as a queue of IDs that have changed.

    Each ID is stored as a UTF-8 encoded key with an empty value, so an ID is held at
    most once and iteration follows the underlying DB's key order.
    """

    def __init__(self, path: Path):
        """
        :param path: the path to the queue's database
        """
        self.db = DB(path)

    def size(self) -> int:
        """
        :return: the number of IDs currently in the queue
        """
        return self.db.size()

    def clear(self):
        """
        Removes every ID from the queue.
        """
        self.db.clear()

    def close(self):
        """
        Closes the queue's underlying database.
        """
        self.db.close()

    def put_many(self, records: List[SourceRecord]):
        """
        Update the queue with the given records. The IDs are added in a transaction.

        :param records: the records that have changed
        """
        self.put_many_ids([record.id for record in records])

    def put_many_ids(self, record_ids: List[str]):
        """
        Update the queue with the given record IDs. The IDs are added in a transaction.

        :param record_ids: the record IDs that have changed
        """
        self.db.put((record_id.encode('utf-8'), b'') for record_id in record_ids)

    def iter(self) -> Iterable[str]:
        """
        Yields the IDs one by one, in the order the underlying DB returns its keys.

        :return: yields the changed IDs from the queue, decoded from UTF-8 into str
        """
        yield from (key.decode('utf-8') for key in self.db.keys())

    def is_queued(self, record_id: str) -> bool:
        """
        Determines whether the record ID provided has already been queued.

        :param record_id: the record ID to check
        :return: True if the record ID is already in the queue, False if not
        """
        return record_id.encode('utf-8') in self.db

is_queued(record_id)

Determines whether the record ID provided has already been queued.

Parameters:

Name Type Description Default
record_id str

the record ID to check

required

Returns:

Type Description
bool

True if the record ID is already in the queue, False if not

Source code in dataimporter/lib/dbs.py
699
700
701
702
703
704
705
706
def is_queued(self, record_id: str) -> bool:
    """
    Check whether a record ID is already present in the queue.

    :param record_id: the record ID to look for
    :return: True when the ID is queued, otherwise False
    """
    encoded_id = record_id.encode('utf-8')
    return encoded_id in self.db

iter()

Yields the IDs one by one.

Returns:

Type Description
Iterable[str]

yields the changed IDs, as str, from the queue

Source code in dataimporter/lib/dbs.py
691
692
693
694
695
696
697
def iter(self) -> Iterable[str]:
    """
    Yields the queued IDs one by one, in the order the underlying DB returns its keys.

    :return: yields the changed IDs from the queue, decoded from UTF-8 into str
    """
    for raw_key in self.db.keys():
        yield raw_key.decode('utf-8')

put_many(records)

Update the queue with the given records. The IDs are added in a transaction.

Parameters:

Name Type Description Default
records List[SourceRecord]

the records that have changed

required
Source code in dataimporter/lib/dbs.py
675
676
677
678
679
680
681
def put_many(self, records: List[SourceRecord]):
    """
    Queue the IDs of the given records. All IDs are written in a single transaction.

    :param records: the records that have changed
    """
    changed_ids = [changed.id for changed in records]
    self.put_many_ids(changed_ids)

put_many_ids(record_ids)

Update the queue with the given record IDs. The IDs are added in a transaction.

Parameters:

Name Type Description Default
record_ids List[str]

the record IDs that have changed

required
Source code in dataimporter/lib/dbs.py
683
684
685
686
687
688
689
def put_many_ids(self, record_ids: List[str]):
    """
    Queue the given record IDs. All IDs are written in a single transaction.

    :param record_ids: the record IDs that have changed
    """
    pairs = ((rid.encode('utf-8'), b'') for rid in record_ids)
    self.db.put(pairs)

DB

A class wrapping the plyvel database object with some useful functionality.

Source code in dataimporter/lib/dbs.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class DB:
    """
    A class wrapping the plyvel database object with some useful functionality.
    """

    def __init__(self, path: Path):
        """
        :param path: the path to the database; the directory (and any missing parents)
            is created if it doesn't exist
        """
        self.path = path
        self.path.mkdir(parents=True, exist_ok=True)
        self.db = plyvel.DB(str(path), create_if_missing=True)

    def get(self, key: bytes) -> Optional[bytes]:
        """
        Returns the value associated with this key.

        :param key: the key
        :return: the value, or None if the key isn't present in this DB
        """
        return self.db.get(key)

    def close(self):
        """
        Closes the underlying plyvel database.
        """
        self.db.close()

    @property
    def name(self) -> str:
        """
        :return: the name of this database, i.e. the final component of its path
        """
        return self.path.name

    def put(self, keys_and_values: Iterable[Tuple[bytes, bytes]]):
        """
        Writes the given keys and values to the database. Both keys and values must be
        bytes. This method intentionally takes an iterable so be careful with length!

        :param keys_and_values: an iterable of 2-tuples representing the keys and values
        """
        with self.get_writer() as wb:
            for key, value in keys_and_values:
                wb.put(key, value)

    def purge(self, keys: Iterable[bytes]):
        """
        Deletes the given keys from the database. Note this is a hard delete and the
        keys are completely removed from the database. The keys must be bytes. This
        method intentionally takes an iterable so be careful with length!

        :param keys: an iterable of bytes
        """
        with self.get_writer() as wb:
            for key in keys:
                wb.delete(key)

    @contextmanager
    def get_writer(self):
        """
        Creates a write batch on the underlying plyvel database and yields it.

        The write batch is always opened with transaction=True.
        """
        with self.db.write_batch(transaction=True) as wb:
            yield wb

    def __contains__(self, key: bytes) -> bool:
        """
        Checks if the given bytes key is in this plyvel database.

        :param key: the bytes key to look up
        :return: True if the database contains the key, False if not
        """
        return self.db.get(key) is not None

    def keys(self, **iterator_kwargs) -> Iterable[bytes]:
        """
        Yields the keys from the database. By default, this is in lowest to highest
        order.

        :param iterator_kwargs: any additional plyvel iterator kwargs
        :return: yields bytes keys
        """
        yield from self.db.iterator(
            include_key=True, include_value=False, **iterator_kwargs
        )

    def values(self, **iterator_kwargs) -> Iterable[bytes]:
        """
        Yields the values from the database. By default, this is in lowest to highest
        order by key.

        :param iterator_kwargs: any additional plyvel iterator kwargs
        :return: yields bytes values
        """
        yield from self.db.iterator(
            include_key=False, include_value=True, **iterator_kwargs
        )

    def items(self, **iterator_kwargs) -> Iterable[Tuple[bytes, bytes]]:
        """
        Yields the key, value pairs from the database. By default, this is in lowest to
        highest order by key.

        :param iterator_kwargs: any additional plyvel iterator kwargs
        :return: yields 2-tuples of bytes key and bytes value
        """
        yield from self.db.iterator(
            include_key=True, include_value=True, **iterator_kwargs
        )

    def size(self) -> int:
        """
        Returns the count of the number of keys in the database. This is achieved by
        iterating over the keys and counting them, one by one.

        :return: the number of keys in the database
        """
        return sum(1 for _ in self.keys())

    def clear(self):
        """
        Clear the database of all data.

        For speed, this is achieved by deleting the root database directory and then
        recreating it.
        """
        self.close()
        shutil.rmtree(self.path)
        self.path.mkdir(parents=True, exist_ok=True)
        self.db = plyvel.DB(str(self.path), create_if_missing=True)

    def is_empty(self) -> bool:
        """
        Returns whether the database is empty.

        Only the first key is requested, so the whole keyspace is not scanned.

        :return: True if the database has no data in it, False if not.
        """
        return next(iter(self.keys()), None) is None

__contains__(key)

Checks if the given bytes key is in this plyvel database. Returns True if it is, False if not.

Parameters:

Name Type Description Default
key bytes

the bytes key to look up

required

Returns:

Type Description
bool

True if the database contains the key, False if not

Source code in dataimporter/lib/dbs.py
76
77
78
79
80
81
82
83
84
def __contains__(self, key: bytes) -> bool:
    """
    Report whether the given bytes key exists in this plyvel database.

    :param key: the bytes key to look up
    :return: True when the key has a stored value, otherwise False
    """
    stored = self.db.get(key)
    return stored is not None

__init__(path)

Parameters:

Name Type Description Default
path Path

the path to the database

required
Source code in dataimporter/lib/dbs.py
19
20
21
22
23
24
25
def __init__(self, path: Path):
    """
    :param path: the path to the database directory; it and any missing parents are
        created if needed
    """
    self.path = path
    self.path.mkdir(parents=True, exist_ok=True)
    location = str(path)
    self.db = plyvel.DB(location, create_if_missing=True)

clear()

Clear the database of all data.

For speed, this is achieved by deleting the root database directory and then recreating it.

Source code in dataimporter/lib/dbs.py
131
132
133
134
135
136
137
138
139
140
141
def clear(self):
    """
    Remove all data from the database.

    Rather than deleting keys one at a time, the database is closed, its directory is
    deleted and recreated, and a fresh, empty database is opened in its place.
    """
    self.close()
    root = self.path
    shutil.rmtree(root)
    root.mkdir(parents=True, exist_ok=True)
    self.db = plyvel.DB(str(root), create_if_missing=True)

get(key)

Returns the value associated with this key.

Parameters:

Name Type Description Default
key bytes

the key

required

Returns:

Type Description
Optional[bytes]

the value, or None if the key isn't present in this DB

Source code in dataimporter/lib/dbs.py
27
28
29
30
31
32
33
34
def get(self, key: bytes) -> Optional[bytes]:
    """
    Look up the value stored under the given key.

    :param key: the bytes key
    :return: the bytes value, or None when the key is absent from this DB
    """
    value = self.db.get(key)
    return value

get_writer()

Creates a write batch on the underlying plyvel database and yields it.

The write batch is always opened with transaction=True.

Source code in dataimporter/lib/dbs.py
66
67
68
69
70
71
72
73
74
@contextmanager
def get_writer(self):
    """
    Open a write batch on the underlying plyvel database and yield it.

    The batch is always created with transaction=True.
    """
    batch_context = self.db.write_batch(transaction=True)
    with batch_context as batch:
        yield batch

is_empty()

Returns whether the database is empty.

Returns:

Type Description
bool

True if the database has no data in it, False if not.

Source code in dataimporter/lib/dbs.py
143
144
145
146
147
148
149
150
def is_empty(self) -> bool:
    """
    Returns whether the database is empty.

    Only the first key is requested, so the whole keyspace is not scanned.

    :return: True if the database has no data in it, False if not.
    """
    return next(iter(self.keys()), None) is None

items(**iterator_kwargs)

Yields the key, value pairs from the database. By default, this is in lowest to highest order by key.

Parameters:

Name Type Description Default
iterator_kwargs

any additional plyvel iterator kwargs

{}

Returns:

Type Description
Iterable[Tuple[bytes, bytes]]

yields 2-tuples of bytes key and bytes value

Source code in dataimporter/lib/dbs.py
110
111
112
113
114
115
116
117
118
119
120
def items(self, **iterator_kwargs) -> Iterable[Tuple[bytes, bytes]]:
    """
    Yield every (key, value) pair in the database, lowest key first by default.

    :param iterator_kwargs: extra keyword arguments passed straight to plyvel's iterator
    :return: yields 2-tuples of bytes key and bytes value
    """
    pair_cursor = self.db.iterator(
        include_key=True, include_value=True, **iterator_kwargs
    )
    yield from pair_cursor

keys(**iterator_kwargs)

Yields the keys from the database. By default, this is in lowest to highest order.

Parameters:

Name Type Description Default
iterator_kwargs

any additional plyvel iterator kwargs

{}

Returns:

Type Description
Iterable[bytes]

yields bytes keys

Source code in dataimporter/lib/dbs.py
86
87
88
89
90
91
92
93
94
95
96
def keys(self, **iterator_kwargs) -> Iterable[bytes]:
    """
    Yield the keys stored in the database, lowest to highest by default.

    :param iterator_kwargs: extra keyword arguments passed straight to plyvel's
        iterator (e.g. prefix)
    :return: yields bytes keys
    """
    key_cursor = self.db.iterator(
        include_key=True, include_value=False, **iterator_kwargs
    )
    yield from key_cursor

purge(keys)

Deletes the given keys from the database. Note this is a hard delete and the keys are completely removed from the database. The keys must be bytes. This method intentionally takes an iterable so be careful with length!

Parameters:

Name Type Description Default
keys Iterable[bytes]

an iterable of bytes

required
Source code in dataimporter/lib/dbs.py
54
55
56
57
58
59
60
61
62
63
64
def purge(self, keys: Iterable[bytes]):
    """
    Hard-delete the given bytes keys from the database within a single write batch.

    The keys are removed outright rather than soft deleted. Because an arbitrary
    iterable is accepted, take care with very long inputs.

    :param keys: an iterable of bytes keys to remove
    """
    writer = self.get_writer()
    with writer as batch:
        for doomed_key in keys:
            batch.delete(doomed_key)

put(keys_and_values)

Writes the given keys and values to the database. Both keys and values must be bytes. This method intentionally takes an iterable so be careful with length!

Parameters:

Name Type Description Default
keys_and_values Iterable[Tuple[bytes, bytes]]

an iterable of 2-tuples representing the keys and values

required
Source code in dataimporter/lib/dbs.py
43
44
45
46
47
48
49
50
51
52
def put(self, keys_and_values: Iterable[Tuple[bytes, bytes]]):
    """
    Write the given key/value pairs to the database within a single write batch.

    Keys and values must both be bytes. Because an arbitrary iterable is accepted,
    take care with very long inputs.

    :param keys_and_values: an iterable of (key, value) 2-tuples
    """
    writer = self.get_writer()
    with writer as batch:
        for pair in keys_and_values:
            k, v = pair
            batch.put(k, v)

size()

Returns the count of the number of keys in the database. This is achieved by iterating over the keys and counting them, one by one.

Returns:

Type Description
int

the number of keys in the database

Source code in dataimporter/lib/dbs.py
122
123
124
125
126
127
128
129
def size(self) -> int:
    """
    Count the keys in the database by walking over every one of them.

    :return: how many keys the database holds
    """
    total = 0
    for _key in self.keys():
        total += 1
    return total

values(**iterator_kwargs)

Yields the values from the database. By default, this is in lowest to highest order by key.

Parameters:

Name Type Description Default
iterator_kwargs

any additional plyvel iterator kwargs

{}

Returns:

Type Description
Iterable[bytes]

yields bytes values

Source code in dataimporter/lib/dbs.py
 98
 99
100
101
102
103
104
105
106
107
108
def values(self, **iterator_kwargs) -> Iterable[bytes]:
    """
    Yield the stored values, ordered by their keys (lowest first by default).

    :param iterator_kwargs: extra keyword arguments passed straight to plyvel's iterator
    :return: yields bytes values
    """
    value_cursor = self.db.iterator(
        include_key=False, include_value=True, **iterator_kwargs
    )
    yield from value_cursor

Index

Class representing an index of field values -> record IDs, allowing quick access to the IDs of the records that contain a given field value.

Source code in dataimporter/lib/dbs.py
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
class Index:
    """
    Class representing an index of field values -> record IDs, allowing quick access to
    the IDs of the records that contain a given field value.

    Each entry is stored as a UTF-8 encoded key of the form
    ``{value}{SPLITTER}{record ID}`` with an empty value, so all IDs for a value can be
    found with a prefix scan.
    """

    def __init__(self, path: Path, field: str):
        """
        :param path: the path to store the index at
        :param field: the field to index
        """
        self.field = field
        self.db = DB(path)

    def clear(self):
        """
        Removes every entry from the index.
        """
        self.db.clear()

    def close(self):
        """
        Closes the index's underlying database.
        """
        self.db.close()

    def update(self, updates: List[SourceRecord], deletes: List[SourceRecord]):
        """
        Update the index with the given updated and deleted records.

        For each updated record, one entry is written per clean value of the indexed
        field. For each deleted record, every entry pointing at its ID is removed.

        NOTE: entries for values an updated record no longer has are not removed here.

        :param updates: records whose field values should be added to the index
        :param deletes: records whose index entries should be removed
        """
        # write any updates
        if updates:
            with self.db.get_writer() as wb:
                for record in updates:
                    for value in record.iter_all_values(self.field, clean=True):
                        wb.put(f'{value}{SPLITTER}{record.id}'.encode('utf-8'), b'')

        # todo: find a way to make this faster, maybe in memory cache of the reverse
        #       lookups? Or just keep another DB for the reverse lookup?
        if deletes:
            # because we optimise for lookups with the foreign values, this is a bit
            # slow as we have to do a full scan of the whole keyspace
            keys_to_delete = {record.id.encode('utf-8') for record in deletes}
            self.db.purge(
                key
                for key in self.db.keys()
                # split on the last splitter only: Store.put rejects record IDs that
                # contain SPLITTER, but indexed values may contain it, so an unbounded
                # rsplit would return a fragment of the value instead of the ID
                if key.rsplit(SPLITTER_BYTES, 1)[-1] in keys_to_delete
            )

    def populate(self, records: Iterable[SourceRecord]):
        """
        Indexes the given records in batches of 1000. Records containing the field are
        added to the index and deleted records have their entries removed.

        :param records: the records to index
        """
        for batch in partition(records, 1000):
            updates = [record for record in batch if self.field in record]
            deletes = [record for record in batch if record.is_deleted]
            self.update(updates, deletes)

    def lookup(self, values: Iterable[str]) -> Iterable[str]:
        """
        Yields the IDs of the records indexed under each of the given values. IDs are
        yielded value by value, so an ID matching several values is yielded once per
        matching value.

        :param values: the field values to look up
        :return: yields record IDs
        """
        for value in values:
            prefix = f'{value}{SPLITTER}'.encode('utf-8')
            prefix_length = len(prefix)
            for raw_key in self.db.keys(prefix=prefix):
                raw_id = raw_key[prefix_length:]
                # record IDs can't contain the splitter, so a remainder that does
                # belongs to a longer value that merely starts with this value followed
                # by the splitter and must not be reported as a match
                if SPLITTER_BYTES in raw_id:
                    continue
                yield raw_id.decode('utf-8')

__init__(path, field)

Parameters:

Name Type Description Default
path Path

the path to store the index at

required
field str

the field to index

required
Source code in dataimporter/lib/dbs.py
603
604
605
606
607
608
609
def __init__(self, path: Path, field: str):
    """
    :param path: where the index's database lives on disk
    :param field: the name of the record field whose values are indexed
    """
    self.field, self.db = field, DB(path)

update(updates, deletes)

Update the index with the given updated and deleted records.

Parameters:

Name Type Description Default
updates List[SourceRecord]
required
deletes List[SourceRecord]
required

Returns:

Type Description
Source code in dataimporter/lib/dbs.py
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
def update(self, updates: List[SourceRecord], deletes: List[SourceRecord]):
    """
    Update the index with the given updated and deleted records.

    For each updated record, one entry is written per clean value of the indexed field.
    For each deleted record, every entry pointing at its ID is removed.

    NOTE: entries for values an updated record no longer has are not removed here.

    :param updates: records whose field values should be added to the index
    :param deletes: records whose index entries should be removed
    """
    # write any updates
    if updates:
        with self.db.get_writer() as wb:
            for record in updates:
                for value in record.iter_all_values(self.field, clean=True):
                    wb.put(f'{value}{SPLITTER}{record.id}'.encode('utf-8'), b'')

    # todo: find a way to make this faster, maybe in memory cache of the reverse
    #       lookups? Or just keep another DB for the reverse lookup?
    if deletes:
        # because we optimise for lookups with the foreign values, this is a bit
        # slow as we have to do a full scan of the whole keyspace
        keys_to_delete = {record.id.encode('utf-8') for record in deletes}
        self.db.purge(
            key
            for key in self.db.keys()
            # split on the last splitter only: Store.put rejects record IDs that
            # contain SPLITTER, but indexed values may contain it, so an unbounded
            # rsplit would return a fragment of the value instead of the ID
            if key.rsplit(SPLITTER_BYTES, 1)[-1] in keys_to_delete
        )

PutResult dataclass

A class to hold the result of a put call on a Store.

Source code in dataimporter/lib/dbs.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@dataclass
class PutResult:
    """
    The counts produced by a single put call on a Store.
    """

    upserted: int  # records that were inserted or updated
    deleted: int  # records that were deletes (i.e. had no data)
    embargoed: int  # records that were embargoed and so added to the embargo DB
    redacted: int  # records ignored because they were previously redacted

Store

Class representing a store of data.

This includes record data, embargo data, and indexes.

Source code in dataimporter/lib/dbs.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
class Store:
    """
    Class representing a store of data.

    This includes record data, embargo data, and indexes.
    """

    def __init__(self, path: Path):
        """
        :param path: the root directory of the store; the data, embargo and redaction
            databases live in subdirectories of it and indexes live under 'indexes'
        """
        self.root = path
        root = self.root
        self.data = DB(root / 'data')
        self.embargoes = DB(root / 'embargoes')
        self.redactions = DB(root / 'redactions')
        self.index_root = root / 'indexes'
        self.indexes: Dict[str, Index] = {}
        # cache of redacted record ID -> redaction ID, consulted by put
        self._redaction_cache = self.get_all_redacted_ids()

    @property
    def name(self) -> str:
        """
        :return: the store's name, i.e. the final component of its root path
        """
        root_path = self.root
        return root_path.name

    def get_record(
        self, record_id: str, return_deleted: bool = False
    ) -> Optional[SourceRecord]:
        """
        Fetch the record with the given ID from the store.

        None is returned when no record exists under the ID. A deleted record is also
        reported as None unless return_deleted is True.

        :param record_id: the record's ID
        :param return_deleted: when True, deleted records are returned rather than None
        :return: the matching SourceRecord, or None
        """
        raw = self.data.get(record_id.encode('utf-8'))
        if raw is None:
            return None
        unpacker = make_unpacker()
        unpacker.feed(raw)
        found = SourceRecord(*next(unpacker))
        if return_deleted or not found.is_deleted:
            return found
        return None

    def get_records(
        self, record_ids: Iterable[str], yield_deleted: bool = False
    ) -> Iterable[SourceRecord]:
        """
        Yield the records with the given IDs, in the order the IDs are supplied.

        IDs with no stored data are silently skipped. Deleted records are skipped too
        unless yield_deleted is True.

        :param record_ids: the record IDs
        :param yield_deleted: when True, deleted records are yielded as well
        :return: yields SourceRecord objects
        """
        encoded_ids = (record_id.encode('utf-8') for record_id in record_ids)
        packed = (self.data.get(encoded) for encoded in encoded_ids)
        present = (blob for blob in packed if blob)
        yield from iter_source_records(present, yield_deleted)

    def iter(self, yield_deleted: bool = False) -> Iterable[SourceRecord]:
        """
        Yield every record in the store, ordered by ascending ID.

        Deleted records are skipped unless yield_deleted is True.

        :param yield_deleted: when True, deleted records are yielded as well
        :return: yields SourceRecord objects
        """
        packed_values = self.data.values()
        yield from iter_source_records(packed_values, yield_deleted)

    def has(self, record_id: str, allow_deleted: bool = False) -> bool:
        """
        Report whether a record with the given ID is in the store.

        A record that exists but has been deleted counts as absent unless
        allow_deleted is True.

        :param record_id: the record ID
        :param allow_deleted: when True, deleted records count as present
        :return: True if the record is in the store, otherwise False
        """
        record = self.get_record(record_id, return_deleted=allow_deleted)
        return record is not None

    def put(self, records: List[SourceRecord]) -> PutResult:
        """
        Given a list of records, add them to the database. This is essentially an
        upsert, new records are inserted, existing records are just overridden, deleted
        records are deleted.

        For each record:

            - a record whose ID has been redacted is skipped
            - a deleted record (one with no data) is written to the data DB and removed
              from the embargo DB, but only if the store currently holds a non-deleted
              version of it or an embargoed version of it
            - a record whose embargo is later than now() is written to the embargo DB
              and a deleted placeholder is written to the data DB in its place
            - any other record is written to the data DB

        The data DB and embargo DB writes are each made in a transactional write batch.
        Index updates are applied after both batches have been written.

        :param records: a list of records
        :return: a PutResult object
        :raises InvalidRecordID: if a record's ID contains the SPLITTER
        """
        deleted = 0
        upserted = 0
        embargoed = 0
        redacted = 0
        embargo_threshold = now()
        # cache the pack method as we're going to be using it a lot
        pack = msgpack.Packer().pack

        # keep track of deleted records and which indexes have updates
        index_deletes = []
        field_updates = {field: [] for field in self.indexes.keys()}

        with ExitStack() as stack:
            dwb = stack.enter_context(self.data.get_writer())
            ewb = stack.enter_context(self.embargoes.get_writer())

            for record in records:
                # ensure the record ID is legal
                if SPLITTER in record.id:
                    raise InvalidRecordID(record)

                # ignore redacted records
                if record.id in self._redaction_cache:
                    redacted += 1
                    continue

                record_id = record.id.encode('utf-8')
                data = pack((record.id, record.data, record.source))

                if record.is_deleted:
                    # an embargoed record only has a deleted placeholder in the data DB,
                    # so has() doesn't see it; check the embargo DB as well, otherwise
                    # the embargoed version would survive the delete
                    if self.has(record.id) or record_id in self.embargoes:
                        deleted += 1
                        dwb.put(record_id, data)
                        ewb.delete(record_id)
                        index_deletes.append(record)
                    continue

                embargo = record.get_embargo()
                if embargo is not None and embargo > embargo_threshold:
                    embargoed += 1
                    # add the record to the embargo DB
                    ewb.put(record_id, data)
                    # put a deleted version of the record into data DB
                    dwb.put(
                        record_id,
                        pack((record.id, {}, f'{record.source} [embargoed]')),
                    )
                    # indicate to the indexes that this record has been deleted
                    index_deletes.append(record)
                    continue

                upserted += 1
                dwb.put(record_id, data)
                # keep track of the records that may need their index values updated
                for field, index_updates in field_updates.items():
                    # the index will actually check for non-empty values and only
                    # index those, but this is a quick dirty check just to get the
                    # record queued up for the index to then look at properly later
                    if field in record:
                        index_updates.append(record)

        # update the indexes as needed
        for field, index_updates in field_updates.items():
            if index_updates or index_deletes:
                self.indexes[field].update(index_updates, index_deletes)

        return PutResult(upserted, deleted, embargoed, redacted)

    def delete(self, record_ids: Iterable[str], source: str) -> int:
        """
        Soft-delete the given records by putting an empty-data version of each one.
        Use purge for a hard delete.

        The deletion goes through put, in chunks of 1000 IDs, so an embargoed record
        is handled the same way put handles any deleted record.

        :param record_ids: an iterable of record ids to delete
        :param source: source value to store with the deleted records
        :return: the number of IDs that were deleted (i.e. existed and were removed)
        """
        total_deleted = 0
        for chunk in partition(record_ids, 1000):
            tombstones = [SourceRecord(record_id, {}, source) for record_id in chunk]
            total_deleted += self.put(tombstones).deleted
        return total_deleted

    def purge(self, record_ids: Iterable[str]) -> int:
        """
        Purges all the record data associated with the given record IDs. This is a hard
        delete, for a soft delete, use the delete method.

        Extra warning: this is NOT the same as deleting a SourceRecord by setting its
        data property to an empty dict, this is a hard delete where the data is removed.
        It should only be used for redactions in extreme cases.

        Data is removed from the data DB and the embargo DB. The indexes are not
        touched.

        :param record_ids: an iterable of record ids to delete
        :return: the number of IDs that were deleted (i.e. existed and were removed)
        """
        purged = 0

        # create an iterable which converts each record ID into bytes
        raw_ids = (record_id.encode('utf-8') for record_id in record_ids)
        for batch in partition(raw_ids, 1000):
            # materialise the batch as it is iterated once per DB below
            batch = list(batch)
            removed = set()
            # purge from the data DB and the embargo DB
            for db in (self.data, self.embargoes):
                to_delete = [raw_id for raw_id in batch if raw_id in db]
                removed.update(to_delete)
                db.purge(to_delete)
            # an embargoed record has an entry in both DBs (the embargoed data plus a
            # deleted placeholder), so count each removed ID only once
            purged += len(removed)

        return purged

    def redact(self, record_ids: Iterable[str], redaction_id: str) -> int:
        """
        Add all the given record IDs to the redaction list for this store. The redaction
        ID is stored for later retrieval if needed. This isn't expected to be an ID that
        is represented in this system but perhaps can be looked up in a different system
        to understand why the record has been redacted.

        The records' data is purged (hard deleted) first, and the in-memory redaction
        cache is refreshed afterwards so later puts skip these IDs.

        :param record_ids: an iterable of record IDs to redact
        :param redaction_id: an identifier for why the records have been redacted
        :return: the number of IDs that were purged
        """
        # the IDs are iterated twice (purge, then the redaction writes) so materialise
        # them first; otherwise a one-shot iterator would be exhausted by purge and no
        # redactions would be recorded
        record_ids = list(record_ids)
        purged = self.purge(record_ids)
        encoded_redaction_id = redaction_id.encode('utf-8')
        with self.redactions.get_writer() as wb:
            for record_id in record_ids:
                wb.put(record_id.encode('utf-8'), encoded_redaction_id)
        # update the redaction cache
        self._redaction_cache = self.get_all_redacted_ids()
        return purged

    def is_redacted(self, record_id: str) -> bool:
        """
        Reports whether a redaction ID has been stored for the given record.

        :param record_id: the record ID
        :return: True if the record is redacted, False if not
        """
        redaction_id = self.get_redaction_id(record_id)
        return redaction_id is not None

    def get_redaction_id(self, record_id: str) -> Optional[str]:
        """
        Looks up the redaction ID stored against the given record ID.

        :param record_id: the record ID
        :return: the decoded redaction ID, or None if the record is not redacted
        """
        raw_redaction_id = self.redactions.get(record_id.encode('utf-8'))
        return None if raw_redaction_id is None else raw_redaction_id.decode('utf-8')

    def get_all_redacted_ids(self) -> Dict[str, str]:
        """
        Builds a dict mapping every redacted record ID to its redaction ID. Both are
        decoded from UTF-8 bytes.

        :return: a dict of record IDs -> redaction IDs
        """
        redacted = {}
        for raw_record_id, raw_redaction_id in self.redactions.items():
            redacted[raw_record_id.decode('utf-8')] = raw_redaction_id.decode('utf-8')
        return redacted

    def create_index(self, field: str) -> 'Index':
        """
        Returns the index on the given field, creating it if this store doesn't already
        hold one. When the index's path did not exist before the index was opened, it
        is populated from the store's non-deleted records, which may take a while.

        :param field: the field to create the index on
        :return: an Index object representing the index
        """
        if field not in self.indexes:
            index_path = self.index_root / field
            # must be checked before Index() is constructed on the path
            is_new = not index_path.exists()
            new_index = Index(index_path, field)
            if is_new:
                # a brand new index has nothing to remove, so skip deleted records
                new_index.populate(self.iter(yield_deleted=False))
            self.indexes[field] = new_index
        return self.indexes[field]

    def populate_index(self, field: str, clear_first: bool = False):
        """
        Feeds this store's records into the index on the given field. The index can
        optionally be emptied first.

        :param field: the indexed field
        :param clear_first: whether to delete the index's data first; when True,
                            deleted records are not fed to the (now empty) index
        """
        target = self.indexes[field]
        if clear_first:
            target.clear()
            records = self.iter(yield_deleted=False)
        else:
            # deleted records are included so the index can drop their entries
            records = self.iter(yield_deleted=True)
        target.populate(records)

    def release_records(self, up_to: int) -> Iterable[SourceRecord]:
        """
        Releases embargoed records whose embargo date is on or before up_to.

        Each released record is removed from the embargo DB and then passed through put,
        1000 records at a time. This is a generator: nothing is released until it is
        iterated.

        :param up_to: the timestamp threshold to release up to
        :return: yields SourceRecord objects for each released record
        """
        def _is_due(record):
            return record.get_embargo() <= up_to

        embargoed_records = iter_source_records(self.embargoes.values(), False)
        for chunk in partition(filter(_is_due, embargoed_records), 1000):
            self.embargoes.purge(record.id.encode('utf-8') for record in chunk)
            self.put(chunk)
            yield from chunk

    def is_embargoed(self, record_id: str, timestamp: int) -> bool:
        """
        Checks whether the given record's embargo date is after the given timestamp.

        False is returned in any of these cases:
        - the record is currently available (not deleted) in the data DB
        - the record has no entry in the embargo DB (e.g. it has never been seen or has
          no embargo)
        - the record's embargo date is not after the timestamp

        :param record_id: the record ID
        :param timestamp: the timestamp to compare the record's embargo date with
        :return: True if the record is embargoed past the given date, False if not
        """
        # an available record cannot be under embargo
        if not self.has(record_id):
            packed = self.embargoes.get(record_id.encode('utf-8'))
            if packed is not None:
                unpacker = make_unpacker()
                unpacker.feed(packed)
                return SourceRecord(*next(unpacker)).get_embargo() > timestamp
        return False

    def iter_embargoed_ids(self, timestamp: Optional[int] = None) -> Iterable[str]:
        """
        Yields the IDs of records held in the embargo DB.

        :param timestamp: when None, every ID in the embargo DB is yielded; otherwise
            only IDs for which is_embargoed(record_id, timestamp) is True are yielded
        :return: yields record IDs
        """
        decoded_ids = (raw_id.decode('utf-8') for raw_id in self.embargoes.keys())
        if timestamp is None:
            yield from decoded_ids
        else:
            yield from (
                record_id
                for record_id in decoded_ids
                if self.is_embargoed(record_id, timestamp)
            )

    def size(self, include_deletes=False) -> int:
        """
        Counts the records held in this store.

        :param include_deletes: if True, return the raw size of the data DB, which
                                includes deleted records; if False, count only the
                                records that are not deleted
        :return: the count
        """
        if include_deletes:
            return self.data.size()
        # walk every record and count those not flagged as deleted
        return sum(
            not record.is_deleted for record in self.iter(yield_deleted=True)
        )

    def close(self):
        """
        Closes the data, embargo and redaction DBs, then every index, in that order.
        """
        for db in (self.data, self.embargoes, self.redactions, *self.indexes.values()):
            db.close()

create_index(field)

Creates a new index on the given field. If no existing data is found and there is data in this database, the index will also be populated and therefore this may take a bit of time.

Parameters:

Name Type Description Default
field str

the field to create the index on

required

Returns:

Type Description
Index

an Index object representing the index

Source code in dataimporter/lib/dbs.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def create_index(self, field: str) -> 'Index':
    """
    Returns the index on the given field, creating it if this store doesn't already
    hold one. When the index's path did not exist before the index was opened, it is
    populated from the store's non-deleted records, which may take a while.

    :param field: the field to create the index on
    :return: an Index object representing the index
    """
    if field not in self.indexes:
        index_path = self.index_root / field
        # must be checked before Index() is constructed on the path
        is_new = not index_path.exists()
        new_index = Index(index_path, field)
        if is_new:
            # a brand new index has nothing to remove, so skip deleted records
            new_index.populate(self.iter(yield_deleted=False))
        self.indexes[field] = new_index
    return self.indexes[field]

delete(record_ids, source)

Deletes all the record data associated with the given record IDs by setting their data to the empty dict. This is a soft delete, for a hard delete, use the purge method.

If a record is embargoed, this will also be deleted.

Parameters:

Name Type Description Default
record_ids Iterable[str]

an iterable of record ids to delete

required
source str

source value to store with the deleted records

required

Returns:

Type Description
int

the number of IDs that were deleted (i.e. existed and were removed)

Source code in dataimporter/lib/dbs.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
def delete(self, record_ids: Iterable[str], source: str) -> int:
    """
    Soft-deletes the given records. A version of each record with empty data (the
    empty dict) is written via put. For a hard delete, use the purge method.

    If a record is embargoed, this will also be deleted.

    The IDs are processed in batches of 1000, with one put call per batch.

    :param record_ids: an iterable of record ids to delete
    :param source: source value to store with the deleted records
    :return: the number of IDs that were deleted (i.e. existed and were removed), as
             reported by put
    """
    def _tombstones(batch):
        return [SourceRecord(record_id, {}, source) for record_id in batch]

    return sum(
        self.put(_tombstones(batch)).deleted
        for batch in partition(record_ids, 1000)
    )

get_all_redacted_ids()

Return a dict of redacted record IDs along with the redaction ID for each.

Returns:

Type Description
Dict[str, str]

a dict of record IDs -> redaction IDs

Source code in dataimporter/lib/dbs.py
466
467
468
469
470
471
472
473
474
475
def get_all_redacted_ids(self) -> Dict[str, str]:
    """
    Builds a dict mapping every redacted record ID to its redaction ID. Both are
    decoded from UTF-8 bytes.

    :return: a dict of record IDs -> redaction IDs
    """
    redacted = {}
    for raw_record_id, raw_redaction_id in self.redactions.items():
        redacted[raw_record_id.decode('utf-8')] = raw_redaction_id.decode('utf-8')
    return redacted

get_record(record_id, return_deleted=False)

Retrieves the record from the store with the given ID.

If the record is not found, None is returned. If the record has been deleted it is not returned and None is returned instead, unless return_deleted is True.

Parameters:

Name Type Description Default
record_id str

the record's ID

required
return_deleted bool

whether to return deleted records or None instead

False

Returns:

Type Description
Optional[SourceRecord]

a SourceRecord object with the given ID or None

Source code in dataimporter/lib/dbs.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def get_record(
    self, record_id: str, return_deleted: bool = False
) -> Optional[SourceRecord]:
    """
    Looks up a single record in the data DB by its ID.

    :param record_id: the record's ID
    :param return_deleted: if True, a deleted record is returned as-is; if False,
                           None is returned for a deleted record
    :return: a SourceRecord object with the given ID, or None if it is missing (or
             deleted and return_deleted is False)
    """
    packed = self.data.get(record_id.encode('utf-8'))
    if packed is None:
        return None
    unpacker = make_unpacker()
    unpacker.feed(packed)
    record = SourceRecord(*next(unpacker))
    return record if return_deleted or not record.is_deleted else None

get_records(record_ids, yield_deleted=False)

Given an iterable of record IDs, yield the records from the database with those IDs in the order they are requested.

If a record isn't found, then it is skipped, and we move on to the next ID without yielding anything. If a record has been deleted it is not yielded unless yield_deleted is set to True.

Parameters:

Name Type Description Default
record_ids Iterable[str]

the record IDs

required
yield_deleted bool

whether to yield deleted records or not

False

Returns:

Type Description
Iterable[SourceRecord]

yields SourceRecord objects

Source code in dataimporter/lib/dbs.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def get_records(
    self, record_ids: Iterable[str], yield_deleted: bool = False
) -> Iterable[SourceRecord]:
    """
    Yields the records with the given IDs, in the order the IDs are supplied.

    IDs with no (or empty) stored data are skipped silently. Deleted records are only
    yielded when yield_deleted is True.

    :param record_ids: the record IDs
    :param yield_deleted: whether to yield deleted records or not
    :return: yields SourceRecord objects
    """
    encoded_ids = (record_id.encode('utf-8') for record_id in record_ids)
    found = (packed for packed in map(self.data.get, encoded_ids) if packed)
    yield from iter_source_records(found, yield_deleted)

get_redaction_id(record_id)

Retrieves the redaction ID associated with the given record ID. If the record is not redacted, returns None.

Parameters:

Name Type Description Default
record_id str

the record ID

required

Returns:

Type Description
Optional[str]

None if the record is not redacted, otherwise, returns the redaction ID

Source code in dataimporter/lib/dbs.py
453
454
455
456
457
458
459
460
461
462
463
464
def get_redaction_id(self, record_id: str) -> Optional[str]:
    """
    Looks up the redaction ID stored against the given record ID.

    :param record_id: the record ID
    :return: the decoded redaction ID, or None if the record is not redacted
    """
    raw_redaction_id = self.redactions.get(record_id.encode('utf-8'))
    return None if raw_redaction_id is None else raw_redaction_id.decode('utf-8')

has(record_id, allow_deleted=False)

Checks whether the given record ID is in the store. If the record exists but has been deleted, False is returned, unless allow_deleted is set to True.

Parameters:

Name Type Description Default
record_id str

the record ID

required
allow_deleted bool

whether to treat deleted records as extant or not

False

Returns:

Type Description
bool

whether record is in the store or not

Source code in dataimporter/lib/dbs.py
288
289
290
291
292
293
294
295
296
297
def has(self, record_id: str, allow_deleted: bool = False) -> bool:
    """
    Reports whether a record with the given ID is stored.

    :param record_id: the record ID
    :param allow_deleted: if True, a deleted record counts as present
    :return: whether record is in the store or not
    """
    found = self.get_record(record_id, return_deleted=allow_deleted)
    return found is not None

is_embargoed(record_id, timestamp)

Checks whether the given record ID's associated embargo date is greater than the given timestamp. If the record doesn't have an associated embargo date, if it has been deleted, or if we have never seen this record before, then False is returned.

Parameters:

Name Type Description Default
record_id str

the record ID

required
timestamp int

the timestamp to compare the record's embargo date with

required

Returns:

Type Description
bool

True if the record is embargoed past the given date, False if not

Source code in dataimporter/lib/dbs.py
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
def is_embargoed(self, record_id: str, timestamp: int) -> bool:
    """
    Checks whether the given record's embargo date is after the given timestamp.

    False is returned in any of these cases:
    - the record is currently available (not deleted) in the data DB
    - the record has no entry in the embargo DB (e.g. it has never been seen or has no
      embargo)
    - the record's embargo date is not after the timestamp

    :param record_id: the record ID
    :param timestamp: the timestamp to compare the record's embargo date with
    :return: True if the record is embargoed past the given date, False if not
    """
    # an available record cannot be under embargo
    if not self.has(record_id):
        packed = self.embargoes.get(record_id.encode('utf-8'))
        if packed is not None:
            unpacker = make_unpacker()
            unpacker.feed(packed)
            return SourceRecord(*next(unpacker)).get_embargo() > timestamp
    return False

is_redacted(record_id)

Checks if the given record is redacted.

Parameters:

Name Type Description Default
record_id str

the record ID

required

Returns:

Type Description
bool

True if the record is redacted, False if not

Source code in dataimporter/lib/dbs.py
444
445
446
447
448
449
450
451
def is_redacted(self, record_id: str) -> bool:
    """
    Reports whether a redaction ID has been stored for the given record.

    :param record_id: the record ID
    :return: True if the record is redacted, False if not
    """
    redaction_id = self.get_redaction_id(record_id)
    return redaction_id is not None

iter(yield_deleted=False)

Yields all records in this database, in ascending ID order.

If a record has been deleted it is not yielded unless yield_deleted is set to True.

Parameters:

Name Type Description Default
yield_deleted bool

whether to yield deleted records or not

False

Returns:

Type Description
Iterable[SourceRecord]

yields SourceRecord objects

Source code in dataimporter/lib/dbs.py
276
277
278
279
280
281
282
283
284
285
286
def iter(self, yield_deleted: bool = False) -> Iterable[SourceRecord]:
    """
    Yields every record in the data DB, in ascending ID order.

    :param yield_deleted: if True, deleted records are yielded too; otherwise they
                          are skipped
    :return: yields SourceRecord objects
    """
    packed_values = self.data.values()
    yield from iter_source_records(packed_values, yield_deleted)

iter_embargoed_ids(timestamp=None)

Yields the IDs in this Store that are embargoed.

Parameters:

Name Type Description Default
timestamp Optional[int]

the threshold to check the embargoes against. If None, all record IDs currently in the embargo database are yielded

None

Returns:

Type Description
Iterable[str]

yields record IDs

Source code in dataimporter/lib/dbs.py
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
def iter_embargoed_ids(self, timestamp: Optional[int] = None) -> Iterable[str]:
    """
    Yields the IDs of records held in the embargo DB.

    :param timestamp: when None, every ID in the embargo DB is yielded; otherwise only
        IDs for which is_embargoed(record_id, timestamp) is True are yielded
    :return: yields record IDs
    """
    decoded_ids = (raw_id.decode('utf-8') for raw_id in self.embargoes.keys())
    if timestamp is None:
        yield from decoded_ids
    else:
        yield from (
            record_id
            for record_id in decoded_ids
            if self.is_embargoed(record_id, timestamp)
        )

populate_index(field, clear_first=False)

Populate the index on the given field. If clear_first is True, empties the index first.

Parameters:

Name Type Description Default
field str

the indexed field

required
clear_first bool

whether to delete the index's data first

False
Source code in dataimporter/lib/dbs.py
502
503
504
505
506
507
508
509
510
511
512
513
514
def populate_index(self, field: str, clear_first: bool = False):
    """
    Feeds this store's records into the index on the given field. The index can
    optionally be emptied first.

    :param field: the indexed field
    :param clear_first: whether to delete the index's data first; when True, deleted
                        records are not fed to the (now empty) index
    """
    target = self.indexes[field]
    if clear_first:
        target.clear()
        records = self.iter(yield_deleted=False)
    else:
        # deleted records are included so the index can drop their entries
        records = self.iter(yield_deleted=True)
    target.populate(records)

purge(record_ids)

Purges all the record data associated with the given record IDs. This is a hard delete, for a soft delete, use the delete method.

Extra warning: this is NOT the same as deleting a SourceRecord by setting its data property to an empty dict, this is a hard delete where the data is removed. It should only be used for redactions in extreme cases.

Parameters:

Name Type Description Default
record_ids Iterable[str]

an iterable of record ids to delete

required

Returns:

Type Description
int

the number of IDs that were deleted (i.e. existed and were removed)

Source code in dataimporter/lib/dbs.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def purge(self, record_ids: Iterable[str]) -> int:
    """
    Purges all the record data associated with the given record IDs. This is a hard
    delete, for a soft delete, use the delete method.

    Extra warning: this is NOT the same as deleting a SourceRecord by setting its
    data property to an empty dict, this is a hard delete where the data is removed.
    It should only be used for redactions in extreme cases.

    Both the data DB and the embargo DB are purged. An ID found in either (or both) of
    them is counted once. For example, an embargoed record has a placeholder in the
    data DB and its real data in the embargo DB; it contributes 1 to the result, not 2.

    :param record_ids: an iterable of record ids to delete
    :return: the number of distinct IDs that were purged (i.e. existed in at least one
             of the DBs and were removed)
    """
    purged = 0

    # create an iterable which converts each record ID into bytes
    raw_ids = (record_id.encode('utf-8') for record_id in record_ids)
    for batch in partition(raw_ids, 1000):
        # the IDs from this batch that were removed from either DB, deduplicated
        removed = set()
        # purge from the data DB and the embargo DB
        for db in (self.data, self.embargoes):
            to_delete = [raw_id for raw_id in batch if raw_id in db]
            removed.update(to_delete)
            db.purge(to_delete)
        purged += len(removed)

    return purged

put(records)

Given a list of records, add them to the database. This is essentially an upsert, new records are inserted, existing records are just overridden, deleted records are deleted.

The records are added in one transactional batch.

Parameters:

Name Type Description Default
records List[SourceRecord]

a list of records

required

Returns:

Type Description
PutResult

a PutResult object

Source code in dataimporter/lib/dbs.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def put(self, records: List[SourceRecord]) -> PutResult:
    """
    Given a list of records, add them to the database. This is essentially an
    upsert, new records are inserted, existing records are just overridden, deleted
    records are deleted.

    Deleting a record removes it from both the data DB and the embargo DB. This means
    an embargoed record, which only has a deleted placeholder in the data DB, can
    still be deleted. Upserting a record that is not embargoed removes any previously
    stored embargoed copy of it. Without that, a later release_records call would
    overwrite the newer data with the stale embargoed version.

    The records are added in one transactional batch.

    :param records: a list of records
    :return: a PutResult object
    :raises InvalidRecordID: if a record's ID contains the SPLITTER
    """
    deleted = 0
    upserted = 0
    embargoed = 0
    redacted = 0
    embargo_threshold = now()
    # cache the pack method as we're going to be using it a lot
    pack = msgpack.Packer().pack

    # keep track of deleted records and which indexes have updates
    index_deletes = []
    field_updates = {field: [] for field in self.indexes.keys()}

    with ExitStack() as stack:
        dwb = stack.enter_context(self.data.get_writer())
        ewb = stack.enter_context(self.embargoes.get_writer())

        for record in records:
            # ensure the record ID is legal
            if SPLITTER in record.id:
                raise InvalidRecordID(record)

            # ignore redacted records
            if record.id in self._redaction_cache:
                redacted += 1
                continue

            record_id = record.id.encode('utf-8')
            data = pack((record.id, record.data, record.source))

            if record.is_deleted:
                # an embargoed record only has a deleted placeholder in the data DB,
                # so has() is False for it; check the embargo DB too, otherwise the
                # embargoed copy would survive and later be released
                if self.has(record.id) or record_id in self.embargoes:
                    deleted += 1
                    dwb.put(record_id, data)
                    ewb.delete(record_id)
                    index_deletes.append(record)
                continue

            embargo = record.get_embargo()
            if embargo is not None and embargo > embargo_threshold:
                embargoed += 1
                # add the record to the embargo DB
                ewb.put(record_id, data)
                # put a deleted version of the record into data DB
                dwb.put(
                    record_id,
                    pack((record.id, {}, f'{record.source} [embargoed]')),
                )
                # indicate to the indexes that this record has been deleted
                index_deletes.append(record)
                continue

            upserted += 1
            dwb.put(record_id, data)
            # this version is available now, so drop any stale embargoed copy of the
            # record which would otherwise be released on top of it later
            if record_id in self.embargoes:
                ewb.delete(record_id)
            # keep track of the records that may need their index values updated
            for field, index_updates in field_updates.items():
                # the index will actually check for non-empty values and only
                # index those, but this is a quick dirty check just to get the
                # record queued up for the index to then look at properly later
                if field in record:
                    index_updates.append(record)

    # update the indexes as needed
    for field, index_updates in field_updates.items():
        if index_updates or index_deletes:
            self.indexes[field].update(index_updates, index_deletes)

    return PutResult(upserted, deleted, embargoed, redacted)

redact(record_ids, redaction_id)

Add all the given record IDs to the redaction list for this store. The redaction ID is stored for later retrieval if needed. This isn't expected to be an ID that is represented in this system but perhaps can be looked up in a different system to understand why the record has been redacted.

Parameters:

Name Type Description Default
record_ids Iterable[str]

an iterable of record IDs to redact

required
redaction_id str

an identifier for why the records have been redacted

required

Returns:

Type Description
int

the number of IDs that were deleted

Source code in dataimporter/lib/dbs.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def redact(self, record_ids: Iterable[str], redaction_id: str) -> int:
    """
    Add all the given record IDs to the redaction list for this store. The redaction
    ID is stored for later retrieval if needed. This isn't expected to be an ID that
    is represented in this system but perhaps can be looked up in a different system
    to understand why the record has been redacted.

    The records' data is purged first (see purge), then each ID is written to the
    redaction DB and the in-memory redaction cache is rebuilt.

    :param record_ids: an iterable of record IDs to redact; any iterable, including a
                       one-shot generator, is accepted
    :param redaction_id: an identifier for why the records have been redacted
    :return: the number of IDs that were purged
    """
    # the IDs are needed twice (once to purge, once to record the redaction), so
    # materialise them: a generator would otherwise be exhausted by the purge and
    # nothing would be written to the redaction DB
    record_ids = list(record_ids)
    purged = self.purge(record_ids)
    encoded_redaction_id = redaction_id.encode('utf-8')
    with self.redactions.get_writer() as wb:
        for record_id in record_ids:
            wb.put(record_id.encode('utf-8'), encoded_redaction_id)
    # update the redaction cache
    self._redaction_cache = self.get_all_redacted_ids()
    return purged

release_records(up_to)

Move records from the embargo DB to the data DB which should be released from their embargo based on the up_to parameter. Any record with an embargo date before this up_to timestamp will be released.

Parameters:

Name Type Description Default
up_to int

the timestamp threshold to release up to

required

Returns:

Type Description
Iterable[SourceRecord]

yields SourceRecord objects for each released record

Source code in dataimporter/lib/dbs.py
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
def release_records(self, up_to: int) -> Iterable[SourceRecord]:
    """
    Releases embargoed records whose embargo date is on or before up_to.

    Each released record is removed from the embargo DB and then passed through put,
    1000 records at a time. This is a generator: nothing is released until it is
    iterated.

    :param up_to: the timestamp threshold to release up to
    :return: yields SourceRecord objects for each released record
    """
    def _is_due(record):
        return record.get_embargo() <= up_to

    embargoed_records = iter_source_records(self.embargoes.values(), False)
    for chunk in partition(filter(_is_due, embargoed_records), 1000):
        self.embargoes.purge(record.id.encode('utf-8') for record in chunk)
        self.put(chunk)
        yield from chunk

size(include_deletes=False)

Returns the number of records present in this store.

Parameters:

Name Type Description Default
include_deletes

whether to count deleted records or not

False

Returns:

Type Description
int

the count

Source code in dataimporter/lib/dbs.py
574
575
576
577
578
579
580
581
582
583
584
585
586
def size(self, include_deletes=False) -> int:
    """
    Counts the records held in this store.

    :param include_deletes: if True, return the raw size of the data DB, which
                            includes deleted records; if False, count only the
                            records that are not deleted
    :return: the count
    """
    if include_deletes:
        return self.data.size()
    # walk every record and count those not flagged as deleted
    return sum(not record.is_deleted for record in self.iter(yield_deleted=True))

iter_source_records(raw_data_stream, yield_deleted)

Iterate over a bytes stream, converting the bytes to SourceRecords and yielding them.

Parameters:

Name Type Description Default
raw_data_stream Iterable[bytes]

the raw bytes stream

required
yield_deleted bool

whether to yield deleted records

required

Returns:

Type Description
Iterable[SourceRecord]

yield SourceRecord objects

Source code in dataimporter/lib/dbs.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def iter_source_records(
    raw_data_stream: Iterable[bytes], yield_deleted: bool
) -> Iterable[SourceRecord]:
    """
    Decodes a stream of msgpack-packed record tuples into SourceRecords.

    The stream is consumed 1000 items at a time. Each chunk is fed to a single
    unpacker, which is then drained.

    :param raw_data_stream: the raw bytes stream
    :param yield_deleted: if False, records that are falsy (i.e. deleted) are skipped
    :return: yield SourceRecord objects
    """
    unpacker = make_unpacker()
    for chunk in partition(raw_data_stream, 1000):
        unpacker.feed(b''.join(chunk))
        for params in unpacker:
            record = SourceRecord(*params)
            # a deleted SourceRecord is falsy, which is how deletes are filtered out
            if yield_deleted or record:
                yield record

make_unpacker()

Creates a new msgpack Unpacker object and returns it. This unpacker uses use_list=False, which is faster and, more importantly, required, as we assume list-like data structures are stored as tuples in this lib.

Returns:

Type Description
Unpacker

a new Unpacker object

Source code in dataimporter/lib/dbs.py
169
170
171
172
173
174
175
176
177
def make_unpacker() -> msgpack.Unpacker:
    """
    Creates and returns a fresh msgpack Unpacker.

    The unpacker is built with use_list=False, so msgpack arrays decode as tuples
    rather than lists. This is faster and, more importantly, required, because this
    lib assumes list-like data structures are stored as tuples.

    :return: a new Unpacker object
    """
    unpacker = msgpack.Unpacker(use_list=False)
    return unpacker