Gbif

GBIFDownload dataclass

Represents a GBIF download URL and some associated metadata.

Source code in dataimporter/ext/gbif.py
@dataclass
class GBIFDownload:
    """
    Represents a GBIF download URL and some associated metadata.
    """

    # this is the URL of the zip itself
    url: str
    # the size of the zip in bytes according to GBIF's API
    zip_size: int
    # the number of records contained in the zip according to GBIF's API
    records: int

GBIFDownloadError

Bases: Exception

If we get a failed or killed status from GBIF when checking the download status we throw this exception to avoid waiting an hour for a download that is never going to succeed.

Source code in dataimporter/ext/gbif.py
class GBIFDownloadError(Exception):
    """
    If we get a failed or killed status from GBIF when checking the download status we
    throw this exception to avoid waiting an hour for a download that is never going to
    succeed.
    """

    def __init__(self, status: str):
        super().__init__(
            f'GBIF download link not available due to error, status: {status}'
        )
        self.status = status

GBIFDownloadTimeout

Bases: Exception

If we time out when trying to access the GBIF download file (i.e. we wait for GBIF to generate the download, but it takes too long) this exception is raised.

Source code in dataimporter/ext/gbif.py
class GBIFDownloadTimeout(Exception):
    """
    If we time out when trying to access the GBIF download file (i.e. we wait for GBIF
    to generate the download, but it takes too long) this exception is raised.
    """

    def __init__(self, timeout: int, status: str):
        super().__init__(
            f'GBIF download link not available within timeout ({timeout} seconds, last '
            f'known status: {status}).'
        )
        self.timeout = timeout
        self.status = status

GBIFView

Bases: View

View for GBIF records.

Source code in dataimporter/ext/gbif.py
class GBIFView(View):
    """
    View for GBIF records.
    """

    def transform(self, record: SourceRecord) -> dict:
        """
        Converts the GBIF record's raw data to a dict which will then be embedded in
        specimen records and presented on the Data Portal.

        :param record: the record to project
        :return: a dict containing the data for this record that should be combined with
            a specimen record
        """
        data = {
            'gbifID': record.id,
        }
        issue_value = record.get_first_value('issue', default='').strip()
        if issue_value:
            # make a tuple and remove any empty values (in case of formatting oddities)
            data['gbifIssue'] = tuple(
                issue for issue in issue_value.split(';') if issue
            )
        return data

transform(record)

Converts the GBIF record's raw data to a dict which will then be embedded in specimen records and presented on the Data Portal.

Parameters:

Name    Type          Description            Default
record  SourceRecord  the record to project  required

Returns:

Type  Description
dict  a dict containing the data for this record that should be combined with a specimen record

Source code in dataimporter/ext/gbif.py
def transform(self, record: SourceRecord) -> dict:
    """
    Converts the GBIF record's raw data to a dict which will then be embedded in
    specimen records and presented on the Data Portal.

    :param record: the record to project
    :return: a dict containing the data for this record that should be combined with
        a specimen record
    """
    data = {
        'gbifID': record.id,
    }
    issue_value = record.get_first_value('issue', default='').strip()
    if issue_value:
        # make a tuple and remove any empty values (in case of formatting oddities)
        data['gbifIssue'] = tuple(
            issue for issue in issue_value.split(';') if issue
        )
    return data
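
As a rough illustration of the issue handling in transform, the split-and-filter step can be tried in isolation. This is a minimal sketch: the helper name `split_issues` and the sample issue strings are hypothetical and not part of dataimporter.

```python
from typing import Tuple


def split_issues(issue_value: str) -> Tuple[str, ...]:
    """Split a semicolon-delimited issue string, dropping empty parts."""
    issue_value = issue_value.strip()
    # doubled or trailing semicolons produce empty strings, which are filtered out
    return tuple(issue for issue in issue_value.split(';') if issue)


# hypothetical sample values, chosen only to exercise the edge cases
print(split_issues('COORDINATE_ROUNDED;;TAXON_MATCH_FUZZY;'))
print(split_issues('   '))
```

Note that, like the original, this does not strip whitespace around the individual issue names; only the string as a whole is stripped.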

get_changed_records(store, gbif_username, gbif_password)

Get a stream of the latest records from GBIF. This function will take time to complete as it will request a new download of the NHM's specimen dataset on GBIF, download it, and then stream the records from the downloaded CSV that have changed compared to the ones in the data DB already.

Parameters:

Name           Type   Description                                  Default
store          Store  the GBIF Store                               required
gbif_username  str    a GBIF username for requesting the download  required
gbif_password  str    a GBIF password for requesting the download  required

Returns:

Type                    Description
Iterable[SourceRecord]  yields the changed SourceRecord objects

Source code in dataimporter/ext/gbif.py
def get_changed_records(
    store: Store, gbif_username: str, gbif_password: str
) -> Iterable[SourceRecord]:
    """
    Get a stream of the latest records from GBIF. This function will take time to
    complete as it will request a new download of the NHM's specimen dataset on GBIF,
    download it, and then stream the records from the downloaded CSV that have changed
    compared to the ones in the data DB already.

    :param store: the GBIF Store
    :param gbif_username: a GBIF username for requesting the download
    :param gbif_password: a GBIF password for requesting the download
    :return: yields the changed SourceRecord objects
    """
    download_id = request_download(gbif_username, gbif_password)
    download = get_download_url(download_id)
    with requests.get(download.url, stream=True) as dl_r:
        with tempfile.NamedTemporaryFile() as tmp_file:
            # download the file to disk
            for chunk in dl_r.iter_content(chunk_size=4096):
                tmp_file.write(chunk)
            # close the http connection to gbif now that we're done downloading the file
            dl_r.close()
            # rewind so that we can read the file
            tmp_file.seek(0)
            # open the zip, read the occurrence file
            with ZipFile(tmp_file) as zip_file:
                with zip_file.open(f'{download_id}.csv') as raw_csv_file:
                    with TextIOWrapper(raw_csv_file, encoding='utf-8') as csv_file:
                        reader: Iterable[Dict[str, str]] = csv.DictReader(
                            csv_file, dialect='excel-tab', quoting=csv.QUOTE_NONE
                        )
                        for row in reader:
                            gbif_id = row['gbifID']
                            updated_record = SourceRecord(gbif_id, row, download_id)
                            existing_record = store.get_record(gbif_id)
                            # if the record has changed or is new, yield it
                            if existing_record != updated_record:
                                yield updated_record
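
The zip-reading and change-detection steps can be exercised without contacting GBIF. The following is a sketch under stated assumptions: `iter_changed_rows` is a hypothetical helper, a plain dict stands in for the Store, rows are compared as dicts rather than as SourceRecord objects, and the download ID and file contents are made up.

```python
import csv
import io
from io import TextIOWrapper
from typing import Dict, Iterator
from zipfile import ZipFile


def iter_changed_rows(
    zip_source, download_id: str, existing: Dict[str, Dict[str, str]]
) -> Iterator[Dict[str, str]]:
    """Yield rows from the zipped file that are new or differ from `existing`."""
    with ZipFile(zip_source) as zip_file:
        with zip_file.open(f'{download_id}.csv') as raw_csv_file:
            with TextIOWrapper(raw_csv_file, encoding='utf-8') as csv_file:
                # as in the source: tab-delimited, with quote characters
                # treated as literal text
                reader = csv.DictReader(
                    csv_file, dialect='excel-tab', quoting=csv.QUOTE_NONE
                )
                for row in reader:
                    if existing.get(row['gbifID']) != row:
                        yield row


# build a tiny in-memory zip standing in for a real download
download_id = '0000000-000000000000000'  # hypothetical download ID
buffer = io.BytesIO()
with ZipFile(buffer, 'w') as zip_file:
    zip_file.writestr(
        f'{download_id}.csv',
        'gbifID\tissue\n1\tA;B\n2\t"quoted\n3\t\n',
    )
buffer.seek(0)

# hypothetical stand-in for the Store: record 1 is unchanged, 2 and 3 are new
existing = {'1': {'gbifID': '1', 'issue': 'A;B'}}
changed = list(iter_changed_rows(buffer, download_id, existing))
print([row['gbifID'] for row in changed])
```

Because quoting is disabled, the stray double quote in the second row is kept as part of the value instead of starting a quoted field.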

get_download_url(download_id)

Wait for the given download to be ready and then return the URL to download the file.

This function checks the download status via the GBIF API once a minute for up to an hour. Once the status changes to "SUCCEEDED", a GBIFDownload containing the download URL is returned. If the status doesn't change to "SUCCEEDED" within the hour, a GBIFDownloadTimeout exception is raised. If a failed status appears, the function raises a GBIFDownloadError exception.

Parameters:

Name         Type  Description           Default
download_id  str   the GBIF download ID  required

Returns:

Type          Description
GBIFDownload  a GBIFDownload object

Source code in dataimporter/ext/gbif.py
def get_download_url(download_id: str) -> GBIFDownload:
    """
    Wait for the given download to be ready and then return the URL to download the
    file.

    This function tries every minute for an hour, checking the download status via the
    GBIF API. Once the status changes to "SUCCEEDED" the download URL is returned. If
    the status doesn't change to "SUCCEEDED" within the hour then a GBIFDownloadTimeout
    exception is raised. If a failed status appears, the function raises a
    GBIFDownloadError exception.

    :param download_id: the GBIF download ID
    :return: a GBIFDownload object
    :raise: GBIFDownloadTimeout if the download is not ready within the timeout
    :raise: GBIFDownloadError if GBIF reports a failed status
    """
    backoff_in_seconds = 60
    max_tries = 60
    url = f'https://api.gbif.org/v1/occurrence/download/{download_id}'
    status = 'PREPARING'

    for _ in range(max_tries):
        with requests.get(url) as r:
            download_info = r.json()
            status = download_info['status']
            if status == 'SUCCEEDED':
                return GBIFDownload(
                    download_info['downloadLink'],
                    download_info['size'],
                    download_info['totalRecords'],
                )
            # GBIFDownloadError is documented as covering failed or killed statuses
            elif status in ('FAILED', 'KILLED'):
                raise GBIFDownloadError(status)
            else:
                time.sleep(backoff_in_seconds)

    raise GBIFDownloadTimeout(max_tries * backoff_in_seconds, status)
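
The polling pattern used here can be sketched without any network access by injecting the status lookup and the sleep function. This is an illustrative sketch only: `poll_until_ready` is a hypothetical helper, the built-in RuntimeError and TimeoutError stand in for GBIFDownloadError and GBIFDownloadTimeout, and treating both 'FAILED' and 'KILLED' as terminal follows the GBIFDownloadError description rather than anything confirmed elsewhere in this page.

```python
import time
from typing import Callable, Tuple


def poll_until_ready(
    fetch_status: Callable[[], str],
    max_tries: int = 60,
    backoff_in_seconds: float = 60,
    sleep: Callable[[float], None] = time.sleep,
    failed_statuses: Tuple[str, ...] = ('FAILED', 'KILLED'),
) -> str:
    """Poll fetch_status until it reports success, failure, or we run out of tries."""
    status = 'PREPARING'
    for _ in range(max_tries):
        status = fetch_status()
        if status == 'SUCCEEDED':
            return status
        if status in failed_statuses:
            # stand-in for GBIFDownloadError: give up immediately
            raise RuntimeError(f'download failed, status: {status}')
        sleep(backoff_in_seconds)
    # stand-in for GBIFDownloadTimeout
    raise TimeoutError(
        f'not ready within {max_tries * backoff_in_seconds} seconds, '
        f'last known status: {status}'
    )


# simulate three status checks and record the sleeps instead of waiting
statuses = iter(['PREPARING', 'RUNNING', 'SUCCEEDED'])
naps = []
result = poll_until_ready(lambda: next(statuses), sleep=naps.append)
print(result, naps)
```

Passing `sleep` as a parameter keeps the default behaviour (a real one-minute wait) while letting tests run instantly.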

request_download(gbif_username, gbif_password)

Request a download of the NHM's specimen dataset from GBIF. To request a download we need to be authenticated with GBIF, hence the username and password parameters.

Parameters:

Name           Type  Description            Default
gbif_username  str   GBIF account username  required
gbif_password  str   GBIF account password  required

Returns:

Type  Description
str   the GBIF download ID

Source code in dataimporter/ext/gbif.py
def request_download(gbif_username: str, gbif_password: str) -> str:
    """
    Request a download of the NHM's specimen dataset from GBIF. To request a download we
    need to be authenticated with GBIF, hence the username and password parameters.

    :param gbif_username: GBIF account username
    :param gbif_password: GBIF account password
    :return: the GBIF download ID
    """
    download_filter = {
        'creator': gbif_username,
        'notificationAddresses': [],
        'sendNotification': False,
        'format': 'SIMPLE_CSV',
        'predicate': {
            'type': 'equals',
            'key': 'DATASET_KEY',
            # this is the NHM's specimen collection GBIF dataset key
            'value': '7e380070-f762-11e1-a439-00145eb45e9a',
            'matchCase': False,
        },
    }
    auth = HTTPBasicAuth(gbif_username, gbif_password)
    # request a new download
    response = requests.post(
        'https://api.gbif.org/v1/occurrence/download/request',
        json=download_filter,
        auth=auth,
    )
    return response.text
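
To inspect the request body without sending anything to GBIF, the filter can be built and serialised on its own. This is a minimal sketch: `build_download_request` is a hypothetical helper and 'example_user' is a placeholder username; the field values are copied from the function above.

```python
import json

# the NHM's specimen collection GBIF dataset key, as used in request_download
NHM_DATASET_KEY = '7e380070-f762-11e1-a439-00145eb45e9a'


def build_download_request(creator: str, dataset_key: str = NHM_DATASET_KEY) -> dict:
    """Build the JSON body for a download request limited to one dataset."""
    return {
        'creator': creator,
        'notificationAddresses': [],
        'sendNotification': False,
        'format': 'SIMPLE_CSV',
        'predicate': {
            'type': 'equals',
            'key': 'DATASET_KEY',
            'value': dataset_key,
            'matchCase': False,
        },
    }


body = build_download_request('example_user')  # placeholder username
print(json.dumps(body, indent=2))
```

Separating the body construction from the POST makes it easy to check the predicate in a unit test before any credentials are involved.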