Skip to content

Emu

auto(config, one=False, delay_sync=False)

Processes all the available EMu exports, queuing, ingesting, and indexing one day's worth of data at a time, ensuring each day's data is represented by a new version.

Source code in dataimporter/cli/emu.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@emu_group.command()
@with_config()
@click.option(
    '--one',
    is_flag=True,
    show_default=True,
    default=False,
    help="Only process the next available day's worth of EMu exports",
)
@click.option(
    '--delay-sync',
    is_flag=True,
    show_default=True,
    default=False,
    help='Perform one sync at the end instead of after each dump set',
)
def auto(config: Config, one: bool = False, delay_sync: bool = False):
    """
    Processes all the available EMu exports, queuing, ingesting, and indexing one day's
    worth of data at a time ensuring each day's data is represented by a new version.
    """
    # NOTE: click shows the docstring above as this command's help text, so
    # implementation notes are kept in comments rather than in the docstring.
    with use_importer(config) as importer:

        def sync_all_views():
            # sync each view to elasticsearch, following the order of EMU_VIEWS
            for view in EMU_VIEWS:
                console.log(f'Syncing changes from {view} view to elasticsearch')
                importer.sync_to_elasticsearch(view)
                console.log(f'Finished with {view}')

        while True:
            console.log('Queuing next dump set')
            queued_date = importer.queue_emu_changes()
            if queued_date is None:
                # queue_emu_changes returned nothing, so there is nothing left to do
                console.log('No more dumps to import, done')
                break

            console.log(f'Date queued: {queued_date.isoformat()}')

            # ingest the queued changes for every view before any syncing happens
            for view in EMU_VIEWS:
                console.log(f'Adding changes from {view} view to mongo')
                importer.add_to_mongo(view)
                console.log(f'Added changes from {view} view to mongo')

            if not delay_sync:
                sync_all_views()
            else:
                console.log('Delaying sync...')

            if one:
                console.log("Stopping after one day's data as requested")
                break

        # with --delay-sync, a single sync pass runs once the loop has ended
        if delay_sync:
            sync_all_views()

get_emu_date(config)

Prints the latest date we've imported EMu exports for.

Source code in dataimporter/cli/emu.py
106
107
108
109
110
111
112
113
@emu_group.command('get-emu-date')
@with_config()
def get_emu_date(config: Config):
    """
    Prints the latest date we've imported EMu exports for.
    """
    # NOTE: click shows the docstring above as this command's help text.
    with use_importer(config) as importer:
        # read the stored status value first, then hand it to the console logger
        latest = importer.emu_status.get()
        console.log(latest)

queue(amount, config)

Queues the specified amount of EMu exports.

The amount can be "next", "all", or a positive integer.

Source code in dataimporter/cli/emu.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@emu_group.command()
@click.argument('amount', type=str)
@with_config()
def queue(amount: str, config: Config):
    """
    Queues the specified amount of EMu exports.

    This can be "next", "all", or a positive integer.
    """
    # NOTE: click shows the docstring above as this command's help text, so
    # implementation notes are kept in comments rather than in the docstring.

    # Translate the textual argument into a countdown of dump sets to queue. A
    # separate local is used so `amount` keeps matching its `str` annotation.
    if amount == 'next':
        remaining = 1
        console.log('Queuing next dumpset only')
    elif amount == 'all':
        # infinity never reaches 0, so the loop below only stops when the
        # importer reports there is nothing left to queue
        remaining = math.inf
        console.log('Queuing all available dumpsets')
    else:
        try:
            remaining = int(amount)
        except ValueError:
            # report bad input as a usage error rather than a raw traceback
            raise click.BadParameter(
                f'{amount!r} is not "next", "all", or a positive integer',
                param_hint='amount',
            ) from None
        # validated with an explicit raise because `assert` statements are
        # removed when Python runs with -O
        if remaining <= 0:
            raise click.BadParameter(
                'Amount must be greater than 0', param_hint='amount'
            )
        console.log(f'Queuing next {remaining} dumpsets')

    with use_importer(config) as importer:
        console.log(f'Current latest queued dumpset date: {importer.emu_status.get()}')

        while remaining > 0:
            console.log('Queuing next dump set')
            date_queued = importer.queue_emu_changes()
            if date_queued is None:
                # stop early once no further dump set could be queued
                console.log('No data to queue')
                break
            console.log(f'Date queued: {date_queued.isoformat()}')
            remaining -= 1