Source code for pandas.io.gbq

import warnings
from datetime import datetime
import json
import logging
from time import sleep
import uuid
import time
import sys

import numpy as np

from distutils.version import StrictVersion
from pandas import compat
from pandas.core.api import DataFrame
from pandas.tools.merge import concat
from pandas.core.common import PandasError
from pandas.compat import lzip, bytes_to_str


def _check_google_client_version():

    try:
        import pkg_resources

    except ImportError:
        raise ImportError('Could not import pkg_resources (setuptools).')

    if compat.PY3:
        google_api_minimum_version = '1.4.1'
    else:
        google_api_minimum_version = '1.2.0'

    _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution(
        'google-api-python-client').version

    if (StrictVersion(_GOOGLE_API_CLIENT_VERSION) <
            StrictVersion(google_api_minimum_version)):
        raise ImportError("pandas requires google-api-python-client >= {0} "
                          "for Google BigQuery support, "
                          "current version {1}"
                          .format(google_api_minimum_version,
                                  _GOOGLE_API_CLIENT_VERSION))


def _test_google_api_imports():

    try:
        import httplib2  # noqa
        try:
            from googleapiclient.discovery import build  # noqa
            from googleapiclient.errors import HttpError  # noqa
        except:
            from apiclient.discovery import build  # noqa
            from apiclient.errors import HttpError  # noqa
        from oauth2client.client import AccessTokenRefreshError  # noqa
        from oauth2client.client import OAuth2WebServerFlow  # noqa
        from oauth2client.file import Storage  # noqa
        from oauth2client.tools import run_flow, argparser  # noqa
    except ImportError as e:
        raise ImportError("Missing module required for Google BigQuery "
                          "support: {0}".format(str(e)))

logger = logging.getLogger('pandas.io.gbq')
logger.setLevel(logging.ERROR)


class InvalidPrivateKeyFormat(PandasError, ValueError):
    """
    Raised when provided private key has invalid format.
    """
    pass


class AccessDenied(PandasError, ValueError):
    """
    Raised when invalid credentials are provided, or tokens have expired.
    """
    pass


class DatasetCreationError(PandasError, ValueError):
    """
    Raised when the create dataset method fails
    """
    pass


class GenericGBQException(PandasError, ValueError):
    """
    Raised when an unrecognized Google API Error occurs.
    """
    pass


class InvalidColumnOrder(PandasError, ValueError):
    """
    Raised when the provided column order for output
    results DataFrame does not match the schema
    returned by BigQuery.
    """
    pass


class InvalidPageToken(PandasError, ValueError):
    """
    Raised when Google BigQuery fails to return,
    or returns a duplicate page token.
    """
    pass


class InvalidSchema(PandasError, ValueError):
    """
    Raised when the provided DataFrame does
    not match the schema of the destination
    table in BigQuery.
    """
    pass


class NotFoundException(PandasError, ValueError):
    """
    Raised when the project_id, table or dataset provided in the query could
    not be found.
    """
    pass


class StreamingInsertError(PandasError, ValueError):
    """
    Raised when BigQuery reports a streaming insert error.
    For more information see `Streaming Data Into BigQuery
    <https://cloud.google.com/bigquery/streaming-data-into-bigquery>`__
    """


class TableCreationError(PandasError, ValueError):
    """
    Raised when the create table method fails
    """
    pass


class GbqConnector(object):
    scope = 'https://www.googleapis.com/auth/bigquery'

    def __init__(self, project_id, reauth=False, verbose=False,
                 private_key=None):
        _check_google_client_version()
        _test_google_api_imports()
        self.project_id = project_id
        self.reauth = reauth
        self.verbose = verbose
        self.private_key = private_key
        self.credentials = self.get_credentials()
        self.service = self.get_service()

    def get_credentials(self):
        if self.private_key:
            return self.get_service_account_credentials()
        else:
            return self.get_user_account_credentials()

    def get_user_account_credentials(self):
        from oauth2client.client import OAuth2WebServerFlow
        from oauth2client.file import Storage
        from oauth2client.tools import run_flow, argparser

        flow = OAuth2WebServerFlow(
            client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd'
                       '.apps.googleusercontent.com'),
            client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
            scope=self.scope,
            redirect_uri='urn:ietf:wg:oauth:2.0:oob')

        storage = Storage('bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid or self.reauth:
            credentials = run_flow(flow, storage, argparser.parse_args([]))

        return credentials

    def get_service_account_credentials(self):
        # Bug fix for https://github.com/pydata/pandas/issues/12572
        # We need to know that a supported version of oauth2client is installed
        # Test that either of the following is installed:
        # - SignedJwtAssertionCredentials from oauth2client.client
        # - ServiceAccountCredentials from oauth2client.service_account
        # SignedJwtAssertionCredentials is available in oauthclient < 2.0.0
        # ServiceAccountCredentials is available in oauthclient >= 2.0.0
        oauth2client_v1 = True
        oauth2client_v2 = True

        try:
            from oauth2client.client import SignedJwtAssertionCredentials
        except ImportError:
            oauth2client_v1 = False

        try:
            from oauth2client.service_account import ServiceAccountCredentials
        except ImportError:
            oauth2client_v2 = False

        if not oauth2client_v1 and not oauth2client_v2:
            raise ImportError("Missing oauth2client required for BigQuery "
                              "service account support")

        from os.path import isfile

        try:
            if isfile(self.private_key):
                with open(self.private_key) as f:
                    json_key = json.loads(f.read())
            else:
                # ugly hack: 'private_key' field has new lines inside,
                # they break json parser, but we need to preserve them
                json_key = json.loads(self.private_key.replace('\n', '   '))
                json_key['private_key'] = json_key['private_key'].replace(
                    '   ', '\n')

            if compat.PY3:
                json_key['private_key'] = bytes(
                    json_key['private_key'], 'UTF-8')

            if oauth2client_v1:
                return SignedJwtAssertionCredentials(
                    json_key['client_email'],
                    json_key['private_key'],
                    self.scope,
                )
            else:
                return ServiceAccountCredentials.from_json_keyfile_dict(
                    json_key,
                    self.scope)
        except (KeyError, ValueError, TypeError, AttributeError):
            raise InvalidPrivateKeyFormat(
                "Private key is missing or invalid. It should be service "
                "account private key JSON (file path or string contents) "
                "with at least two keys: 'client_email' and 'private_key'. "
                "Can be obtained from: https://console.developers.google."
                "com/permissions/serviceaccounts")

    def _print(self, msg, end='\n'):
        if self.verbose:
            sys.stdout.write(msg + end)
            sys.stdout.flush()

    def _start_timer(self):
        self.start = time.time()

    def get_elapsed_seconds(self):
        return round(time.time() - self.start, 2)

    def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.',
                              overlong=7):
        sec = self.get_elapsed_seconds()
        if sec > overlong:
            self._print('{} {} {}'.format(prefix, sec, postfix))

    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
    @staticmethod
    def sizeof_fmt(num, suffix='b'):
        fmt = "%3.1f %s%s"
        for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if abs(num) < 1024.0:
                return fmt % (num, unit, suffix)
            num /= 1024.0
        return fmt % (num, 'Y', suffix)

    def get_service(self):
        import httplib2
        try:
            from googleapiclient.discovery import build
        except:
            from apiclient.discovery import build

        http = httplib2.Http()
        http = self.credentials.authorize(http)
        bigquery_service = build('bigquery', 'v2', http=http)

        return bigquery_service

    @staticmethod
    def process_http_error(ex):
        # See `BigQuery Troubleshooting Errors
        # <https://cloud.google.com/bigquery/troubleshooting-errors>`__

        status = json.loads(bytes_to_str(ex.content))['error']
        errors = status.get('errors', None)

        if errors:
            for error in errors:
                reason = error['reason']
                message = error['message']

                raise GenericGBQException(
                    "Reason: {0}, Message: {1}".format(reason, message))

        raise GenericGBQException(errors)

    def process_insert_errors(self, insert_errors):
        for insert_error in insert_errors:
            row = insert_error['index']
            errors = insert_error.get('errors', None)
            for error in errors:
                reason = error['reason']
                message = error['message']
                location = error['location']
                error_message = ('Error at Row: {0}, Reason: {1}, '
                                 'Location: {2}, Message: {3}'
                                 .format(row, reason, location, message))

                # Report all error messages if verbose is set
                if self.verbose:
                    self._print(error_message)
                else:
                    raise StreamingInsertError(error_message +
                                               '\nEnable verbose logging to '
                                               'see all errors')

        raise StreamingInsertError

    def run_query(self, query):
        try:
            from googleapiclient.errors import HttpError
        except:
            from apiclient.errors import HttpError
        from oauth2client.client import AccessTokenRefreshError

        _check_google_client_version()

        job_collection = self.service.jobs()
        job_data = {
            'configuration': {
                'query': {
                    'query': query
                    # 'allowLargeResults', 'createDisposition',
                    # 'preserveNulls', destinationTable, useQueryCache
                }
            }
        }

        self._start_timer()
        try:
            self._print('Requesting query... ', end="")
            query_reply = job_collection.insert(
                projectId=self.project_id, body=job_data).execute()
            self._print('ok.\nQuery running...')
        except (AccessTokenRefreshError, ValueError):
            if self.private_key:
                raise AccessDenied(
                    "The service account credentials are not valid")
            else:
                raise AccessDenied(
                    "The credentials have been revoked or expired, "
                    "please re-run the application to re-authorize")
        except HttpError as ex:
            self.process_http_error(ex)

        job_reference = query_reply['jobReference']

        while not query_reply.get('jobComplete', False):
            self.print_elapsed_seconds('  Elapsed', 's. Waiting...')
            try:
                query_reply = job_collection.getQueryResults(
                    projectId=job_reference['projectId'],
                    jobId=job_reference['jobId']).execute()
            except HttpError as ex:
                self.process_http_error(ex)

        if self.verbose:
            if query_reply['cacheHit']:
                self._print('Query done.\nCache hit.\n')
            else:
                bytes_processed = int(query_reply.get(
                    'totalBytesProcessed', '0'))
                self._print('Query done.\nProcessed: {}\n'.format(
                    self.sizeof_fmt(bytes_processed)))

            self._print('Retrieving results...')

        total_rows = int(query_reply['totalRows'])
        result_pages = list()
        seen_page_tokens = list()
        current_row = 0
        # Only read schema on first page
        schema = query_reply['schema']

        # Loop through each page of data
        while 'rows' in query_reply and current_row < total_rows:
            page = query_reply['rows']
            result_pages.append(page)
            current_row += len(page)

            self.print_elapsed_seconds(
                '  Got page: {}; {}% done. Elapsed'.format(
                    len(result_pages),
                    round(100.0 * current_row / total_rows)))

            if current_row == total_rows:
                break

            page_token = query_reply.get('pageToken', None)

            if not page_token and current_row < total_rows:
                raise InvalidPageToken("Required pageToken was missing. "
                                       "Received {0} of {1} rows"
                                       .format(current_row, total_rows))

            elif page_token in seen_page_tokens:
                raise InvalidPageToken("A duplicate pageToken was returned")

            seen_page_tokens.append(page_token)

            try:
                query_reply = job_collection.getQueryResults(
                    projectId=job_reference['projectId'],
                    jobId=job_reference['jobId'],
                    pageToken=page_token).execute()
            except HttpError as ex:
                self.process_http_error(ex)

        if current_row < total_rows:
            raise InvalidPageToken()

        # print basic query stats
        self._print('Got {} rows.\n'.format(total_rows))

        return schema, result_pages

    def load_data(self, dataframe, dataset_id, table_id, chunksize):
        try:
            from googleapiclient.errors import HttpError
        except:
            from apiclient.errors import HttpError

        job_id = uuid.uuid4().hex
        rows = []
        remaining_rows = len(dataframe)

        if self.verbose:
            total_rows = remaining_rows
            self._print("\n\n")

        for index, row in dataframe.reset_index(drop=True).iterrows():
            row_dict = dict()
            row_dict['json'] = json.loads(row.to_json(force_ascii=False,
                                                      date_unit='s',
                                                      date_format='iso'))
            row_dict['insertId'] = job_id + str(index)
            rows.append(row_dict)
            remaining_rows -= 1

            if (len(rows) % chunksize == 0) or (remaining_rows == 0):
                self._print("\rStreaming Insert is {0}% Complete".format(
                    ((total_rows - remaining_rows) * 100) / total_rows))

                body = {'rows': rows}

                try:
                    response = self.service.tabledata().insertAll(
                        projectId=self.project_id,
                        datasetId=dataset_id,
                        tableId=table_id,
                        body=body).execute()
                except HttpError as ex:
                    self.process_http_error(ex)

                # For streaming inserts, even if you receive a success HTTP
                # response code, you'll need to check the insertErrors property
                # of the response to determine if the row insertions were
                # successful, because it's possible that BigQuery was only
                # partially successful at inserting the rows.  See the `Success
                # HTTP Response Codes
                # <https://cloud.google.com/bigquery/
                #       streaming-data-into-bigquery#troubleshooting>`__
                # section

                insert_errors = response.get('insertErrors', None)
                if insert_errors:
                    self.process_insert_errors(insert_errors)

                sleep(1)  # Maintains the inserts "per second" rate per API
                rows = []

        self._print("\n")

    def verify_schema(self, dataset_id, table_id, schema):
        try:
            from googleapiclient.errors import HttpError
        except:
            from apiclient.errors import HttpError

        try:
            return (self.service.tables().get(
                projectId=self.project_id,
                datasetId=dataset_id,
                tableId=table_id
            ).execute()['schema']) == schema

        except HttpError as ex:
            self.process_http_error(ex)

    def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
        delay = 0

        # Changes to table schema may take up to 2 minutes as of May 2015 See
        # `Issue 191
        # <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__
        # Compare previous schema with new schema to determine if there should
        # be a 120 second delay

        if not self.verify_schema(dataset_id, table_id, table_schema):
            self._print('The existing table has a different schema. Please '
                        'wait 2 minutes. See Google BigQuery issue #191')
            delay = 120

        table = _Table(self.project_id, dataset_id,
                       private_key=self.private_key)
        table.delete(table_id)
        table.create(table_id, table_schema)
        sleep(delay)


def _parse_data(schema, rows):
    # see:
    # http://pandas.pydata.org/pandas-docs/dev/missing_data.html
    # #missing-data-casting-rules-and-indexing
    dtype_map = {'INTEGER': np.dtype(float),
                 'FLOAT': np.dtype(float),
                 # This seems to be buggy without nanosecond indicator
                 'TIMESTAMP': 'M8[ns]'}

    fields = schema['fields']
    col_types = [field['type'] for field in fields]
    col_names = [str(field['name']) for field in fields]
    col_dtypes = [dtype_map.get(field['type'], object) for field in fields]
    page_array = np.zeros((len(rows),),
                          dtype=lzip(col_names, col_dtypes))

    for row_num, raw_row in enumerate(rows):
        entries = raw_row.get('f', [])
        for col_num, field_type in enumerate(col_types):
            field_value = _parse_entry(entries[col_num].get('v', ''),
                                       field_type)
            page_array[row_num][col_num] = field_value

    return DataFrame(page_array, columns=col_names)


def _parse_entry(field_value, field_type):
    if field_value is None or field_value == 'null':
        return None
    if field_type == 'INTEGER' or field_type == 'FLOAT':
        return float(field_value)
    elif field_type == 'TIMESTAMP':
        timestamp = datetime.utcfromtimestamp(float(field_value))
        return np.datetime64(timestamp)
    elif field_type == 'BOOLEAN':
        return field_value == 'true'
    return field_value


[docs]def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True, private_key=None): """Load data from Google BigQuery. THIS IS AN EXPERIMENTAL LIBRARY The main method a user calls to execute a Query in Google BigQuery and read results into a pandas DataFrame. Google BigQuery API Client Library v2 for Python is used. Documentation is available at https://developers.google.com/api-client-library/python/apis/bigquery/v2 Authentication to the Google BigQuery service is via OAuth 2.0. By default user account credentials are used. You will be asked to grant permissions for product name 'pandas GBQ'. It is also posible to authenticate via service account credentials by using private_key parameter. Parameters ---------- query : str SQL-Like Query to return data values project_id : str Google BigQuery Account project ID. index_col : str (optional) Name of result column to use for index in results DataFrame col_order : list(str) (optional) List of BigQuery column names in the desired order for results DataFrame reauth : boolean (default False) Force Google BigQuery to reauthenticate the user. This is useful if multiple accounts are used. verbose : boolean (default True) Verbose output private_key : str (optional) Service account private key in JSON format. Can be file path or string contents. This is useful for remote server authentication (eg. jupyter iPython notebook on remote host) Returns ------- df: DataFrame DataFrame representing results of query """ if not project_id: raise TypeError("Missing required parameter: project_id") connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, private_key=private_key) schema, pages = connector.run_query(query) dataframe_list = [] while len(pages) > 0: page = pages.pop() dataframe_list.append(_parse_data(schema, page)) if len(dataframe_list) > 0: final_df = concat(dataframe_list, ignore_index=True) else: final_df = _parse_data(schema, []) # Reindex the DataFrame on the provided column if index_col is not None: if index_col in final_df.columns: final_df.set_index(index_col, inplace=True) else: raise InvalidColumnOrder( 'Index column "{0}" does not exist in DataFrame.' .format(index_col) ) # Change the order of columns in the DataFrame based on provided list if col_order is not None: if sorted(col_order) == sorted(final_df.columns): final_df = final_df[col_order] else: raise InvalidColumnOrder( 'Column order does not match this DataFrame.' ) # Downcast floats to integers and objects to booleans # if there are no NaN's. This is presently due to a # limitation of numpy in handling missing data. final_df._data = final_df._data.downcast(dtypes='infer') connector.print_elapsed_seconds( 'Total time taken', datetime.now().strftime('s.\nFinished at %Y-%m-%d %H:%M:%S.'), 0 ) return final_df
[docs]def to_gbq(dataframe, destination_table, project_id, chunksize=10000, verbose=True, reauth=False, if_exists='fail', private_key=None): """Write a DataFrame to a Google BigQuery table. THIS IS AN EXPERIMENTAL LIBRARY The main method a user calls to export pandas DataFrame contents to Google BigQuery table. Google BigQuery API Client Library v2 for Python is used. Documentation is available at https://developers.google.com/api-client-library/python/apis/bigquery/v2 Authentication to the Google BigQuery service is via OAuth 2.0. By default user account credentials are used. You will be asked to grant permissions for product name 'pandas GBQ'. It is also posible to authenticate via service account credentials by using private_key parameter. Parameters ---------- dataframe : DataFrame DataFrame to be written destination_table : string Name of table to be written, in the form 'dataset.tablename' project_id : str Google BigQuery Account project ID. chunksize : int (default 10000) Number of rows to be inserted in each chunk from the dataframe. verbose : boolean (default True) Show percentage complete reauth : boolean (default False) Force Google BigQuery to reauthenticate the user. This is useful if multiple accounts are used. if_exists : {'fail', 'replace', 'append'}, default 'fail' 'fail': If table exists, do nothing. 'replace': If table exists, drop it, recreate it, and insert data. 'append': If table exists, insert data. Create if does not exist. private_key : str (optional) Service account private key in JSON format. Can be file path or string contents. This is useful for remote server authentication (eg. jupyter iPython notebook on remote host) """ if if_exists not in ('fail', 'replace', 'append'): raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) if '.' not in destination_table: raise NotFoundException( "Invalid Table Name. Should be of the form 'datasetId.tableId' ") connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, private_key=private_key) dataset_id, table_id = destination_table.rsplit('.', 1) table = _Table(project_id, dataset_id, reauth=reauth, private_key=private_key) table_schema = _generate_bq_schema(dataframe) # If table exists, check if_exists parameter if table.exists(table_id): if if_exists == 'fail': raise TableCreationError("Could not create the table because it " "already exists. " "Change the if_exists parameter to " "append or replace data.") elif if_exists == 'replace': connector.delete_and_recreate_table( dataset_id, table_id, table_schema) elif if_exists == 'append': if not connector.verify_schema(dataset_id, table_id, table_schema): raise InvalidSchema("Please verify that the column order, " "structure and data types in the " "DataFrame match the schema of the " "destination table.") else: table.create(table_id, table_schema) connector.load_data(dataframe, dataset_id, table_id, chunksize)
def generate_bq_schema(df, default_type='STRING'): # deprecation TimeSeries, #11121 warnings.warn("generate_bq_schema is deprecated and will be removed in " "a future version", FutureWarning, stacklevel=2) return _generate_bq_schema(df, default_type=default_type) def _generate_bq_schema(df, default_type='STRING'): """ Given a passed df, generate the associated Google BigQuery schema. Parameters ---------- df : DataFrame default_type : string The default big query type in case the type of the column does not exist in the schema. """ type_mapping = { 'i': 'INTEGER', 'b': 'BOOLEAN', 'f': 'FLOAT', 'O': 'STRING', 'S': 'STRING', 'U': 'STRING', 'M': 'TIMESTAMP' } fields = [] for column_name, dtype in df.dtypes.iteritems(): fields.append({'name': column_name, 'type': type_mapping.get(dtype.kind, default_type)}) return {'fields': fields} class _Table(GbqConnector): def __init__(self, project_id, dataset_id, reauth=False, verbose=False, private_key=None): try: from googleapiclient.errors import HttpError except: from apiclient.errors import HttpError self.http_error = HttpError self.dataset_id = dataset_id super(_Table, self).__init__(project_id, reauth, verbose, private_key) def exists(self, table_id): """ Check if a table exists in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- table : str Name of table to be verified Returns ------- boolean true if table exists, otherwise false """ try: self.service.tables().get( projectId=self.project_id, datasetId=self.dataset_id, tableId=table_id).execute() return True except self.http_error as ex: if ex.resp.status == 404: return False else: self.process_http_error(ex) def create(self, table_id, schema): """ Create a table in Google BigQuery given a table and schema .. versionadded:: 0.17.0 Parameters ---------- table : str Name of table to be written schema : str Use the generate_bq_schema to generate your table schema from a dataframe. """ if self.exists(table_id): raise TableCreationError( "The table could not be created because it already exists") if not _Dataset(self.project_id, private_key=self.private_key).exists(self.dataset_id): _Dataset(self.project_id, private_key=self.private_key).create(self.dataset_id) body = { 'schema': schema, 'tableReference': { 'tableId': table_id, 'projectId': self.project_id, 'datasetId': self.dataset_id } } try: self.service.tables().insert( projectId=self.project_id, datasetId=self.dataset_id, body=body).execute() except self.http_error as ex: self.process_http_error(ex) def delete(self, table_id): """ Delete a table in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- table : str Name of table to be deleted """ if not self.exists(table_id): raise NotFoundException("Table does not exist") try: self.service.tables().delete( datasetId=self.dataset_id, projectId=self.project_id, tableId=table_id).execute() except self.http_error as ex: self.process_http_error(ex) class _Dataset(GbqConnector): def __init__(self, project_id, reauth=False, verbose=False, private_key=None): try: from googleapiclient.errors import HttpError except: from apiclient.errors import HttpError self.http_error = HttpError super(_Dataset, self).__init__(project_id, reauth, verbose, private_key) def exists(self, dataset_id): """ Check if a dataset exists in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- dataset_id : str Name of dataset to be verified Returns ------- boolean true if dataset exists, otherwise false """ try: self.service.datasets().get( projectId=self.project_id, datasetId=dataset_id).execute() return True except self.http_error as ex: if ex.resp.status == 404: return False else: self.process_http_error(ex) def datasets(self): """ Return a list of datasets in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- None Returns ------- list List of datasets under the specific project """ try: list_dataset_response = self.service.datasets().list( projectId=self.project_id).execute().get('datasets', None) if not list_dataset_response: return [] dataset_list = list() for row_num, raw_row in enumerate(list_dataset_response): dataset_list.append(raw_row['datasetReference']['datasetId']) return dataset_list except self.http_error as ex: self.process_http_error(ex) def create(self, dataset_id): """ Create a dataset in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- dataset : str Name of dataset to be written """ if self.exists(dataset_id): raise DatasetCreationError( "The dataset could not be created because it already exists") body = { 'datasetReference': { 'projectId': self.project_id, 'datasetId': dataset_id } } try: self.service.datasets().insert( projectId=self.project_id, body=body).execute() except self.http_error as ex: self.process_http_error(ex) def delete(self, dataset_id): """ Delete a dataset in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- dataset : str Name of dataset to be deleted """ if not self.exists(dataset_id): raise NotFoundException( "Dataset {0} does not exist".format(dataset_id)) try: self.service.datasets().delete( datasetId=dataset_id, projectId=self.project_id).execute() except self.http_error as ex: self.process_http_error(ex) def tables(self, dataset_id): """ List tables in the specific dataset in Google BigQuery .. versionadded:: 0.17.0 Parameters ---------- dataset : str Name of dataset to list tables for Returns ------- list List of tables under the specific dataset """ try: list_table_response = self.service.tables().list( projectId=self.project_id, datasetId=dataset_id).execute().get('tables', None) if not list_table_response: return [] table_list = list() for row_num, raw_row in enumerate(list_table_response): table_list.append(raw_row['tableReference']['tableId']) return table_list except self.http_error as ex: self.process_http_error(ex)