BigQuery 结果 运行 通过 python 在 Google Cloud 上与 运行ning 在 MAC 上的结果不匹配
BigQuery results run via python in Google Cloud don't match results running on MAC
我有一个 python 应用程序,它 运行 在 BigQuery 上查询并将结果附加到文件中。我在 MAC 工作站 (Yosemite) 和 GC 实例 (ubuntu 14.1) 上 运行 这个,浮点数的结果不同。我怎样才能使它们相同?它们 python 两者的环境相同。
run on google cloud instance
1120224,2015-04-06,23989,866,55159.71274162368,0.04923989554019882,0.021414467106578683,0.03609987911125933,63.69481840834143
54897577,2015-04-06,1188089,43462,2802473.708558333,0.051049132980100984,0.021641920553251377,0.03658143455582873,64.4810111950286
run on mac workstation
1120224,2015-04-06,23989,866,55159.712741623654,0.049239895540198794,0.021414467106578683,0.03609987911125933,63.694818408341405
54897577,2015-04-06,1188089,43462,2802473.708558335,0.05104913298010102,0.021641920553251377,0.03658143455582873,64.48101119502864
import sys
import pdb
import json
from collections import OrderedDict
from csv import DictWriter
from pprint import pprint
from apiclient import discovery
from oauth2client import tools
import functools
import argparse
import httplib2
import time
from subprocess import call
def authenticate_SERVICE_ACCOUNT(service_acct_email, private_key_path):
""" Generic authentication through a service accounts.
Args:
service_acct_email: The service account email associated
with the private key private_key_path: The path to the private key file
"""
from oauth2client.client import SignedJwtAssertionCredentials
with open(private_key_path, 'rb') as pk_file:
key = pk_file.read()
credentials = SignedJwtAssertionCredentials(
service_acct_email,
key,
scope='https://www.googleapis.com/auth/bigquery')
http = httplib2.Http()
auth_http = credentials.authorize(http)
return discovery.build('bigquery', 'v2', http=auth_http)
def create_query(number_of_days_ago):
""" Create a query
Args:
number_of_days_ago: Default value of 1 gets yesterday's data
"""
q = 'SELECT xxxxxxxxxx'
return q;
def translate_row(row, schema):
"""Apply the given schema to the given BigQuery data row.
Args:
row: A single BigQuery row to transform.
schema: The BigQuery table schema to apply to the row, specifically
the list of field dicts.
Returns:
Dict containing keys that match the schema and values that match
the row.
Adpated from bigquery client
https://github.com/tylertreat/BigQuery-Python/blob/master/bigquery/client.py
"""
log = {}
#pdb.set_trace()
# Match each schema column with its associated row value
for index, col_dict in enumerate(schema):
col_name = col_dict['name']
row_value = row['f'][index]['v']
if row_value is None:
log[col_name] = None
continue
# Cast the value for some types
if col_dict['type'] == 'INTEGER':
row_value = int(row_value)
elif col_dict['type'] == 'FLOAT':
row_value = float(row_value)
elif col_dict['type'] == 'BOOLEAN':
row_value = row_value in ('True', 'true', 'TRUE')
log[col_name] = row_value
return log
def extractResult(queryReply):
""" Extract a result from the query reply. Uses schema and rows to translate.
Args:
queryReply: the object returned by bigquery
"""
#pdb.set_trace()
result = []
schema = queryReply.get('schema', {'fields': None})['fields']
rows = queryReply.get('rows',[])
for row in rows:
result.append(translate_row(row, schema))
return result
def writeToCsv(results, filename, ordered_fieldnames, withHeader=True):
""" Create a csv file from a list of rows.
Args:
results: list of rows of data (first row is assumed to be a header)
order_fieldnames: a dict with names of fields in order desired - names must exist in results header
withHeader: a boolen to indicate whether to write out header -
Set to false if you are going to append data to existing csv
"""
try:
the_file = open(filename, "w")
writer = DictWriter(the_file, fieldnames=ordered_fieldnames)
if withHeader:
writer.writeheader()
writer.writerows(results)
the_file.close()
except:
print "Unexpected error:", sys.exc_info()[0]
raise
def runSyncQuery (client, projectId, query, timeout=0):
results = []
try:
print 'timeout:%d' % timeout
jobCollection = client.jobs()
queryData = {'query':query,
'timeoutMs':timeout}
queryReply = jobCollection.query(projectId=projectId,
body=queryData).execute()
jobReference=queryReply['jobReference']
# Timeout exceeded: keep polling until the job is complete.
while(not queryReply['jobComplete']):
print 'Job not yet complete...'
queryReply = jobCollection.getQueryResults(
projectId=jobReference['projectId'],
jobId=jobReference['jobId'],
timeoutMs=timeout).execute()
# If the result has rows, print the rows in the reply.
if('rows' in queryReply):
#print 'has a rows attribute'
#pdb.set_trace();
result = extractResult(queryReply)
results.extend(result)
currentPageRowCount = len(queryReply['rows'])
# Loop through each page of data
while('rows' in queryReply and currentPageRowCount < int(queryReply['totalRows'])):
queryReply = jobCollection.getQueryResults(
projectId=jobReference['projectId'],
jobId=jobReference['jobId'],
startIndex=currentRow).execute()
if('rows' in queryReply):
result = extractResult(queryReply)
results.extend(result)
currentRow += len(queryReply['rows'])
except AccessTokenRefreshError:
print ("The credentials have been revoked or expired, please re-run"
"the application to re-authorize")
except HttpError as err:
print 'Error in runSyncQuery:', pprint.pprint(err.content)
except Exception as err:
print 'Undefined error' % err
return results;
# Main
if __name__ == '__main__':
# Name of file
FILE_NAME = "results.csv"
# Default prior number of days to run query
NUMBER_OF_DAYS = "1"
# BigQuery project id as listed in the Google Developers Console.
PROJECT_ID = 'xxxxxx'
# Service account email address as listed in the Google Developers Console.
SERVICE_ACCOUNT = 'xxxxxx@developer.gserviceaccount.com'
KEY = "/usr/local/xxxxxxxx"
query = create_query(NUMBER_OF_DAYS)
# Authenticate
client = authenticate_SERVICE_ACCOUNT(SERVICE_ACCOUNT, KEY)
# Get query results
results = runSyncQuery (client, PROJECT_ID, query, timeout=0)
#pdb.set_trace();
# Write results to csv without header
ordered_fieldnames = OrderedDict([('f_split',None),('m_members',None),('f_day',None),('visitors',None),('purchasers',None),('demand',None), ('dmd_per_mem',None),('visitors_per_mem',None),('purchasers_per_visitor',None),('dmd_per_purchaser',None)])
writeToCsv(results, FILE_NAME, ordered_fieldnames, False)
# Backup current data
backupfilename = "data_bk-" + time.strftime("%y-%m-%d") + ".csv"
call(['cp','../data/data.csv',backupfilename])
# Concatenate new results to data
with open("../data/data.csv", "ab") as outfile:
with open("results.csv","rb") as infile:
line = infile.read()
outfile.write(line)
假设浮点数是不确定的:
https://randomascii.wordpress.com/2013/07/16/floating-point-determinism/
“the IEEE standard does not guarantee that the same program will
deliver identical results on all conforming systems.”
您提到这些来自浮点数据的总和。正如 Felipe 提到的,浮点数很尴尬;它违反了我们倾向于假设的一些数学恒等式。
在这种情况下,联想 属性 是咬我们的那个。也就是说,通常 (A+B)+C == A+(B+C)
。但是,在浮点数学中,情况并非如此。每个操作都是一个近似值;如果用 'approx' 函数包装,您会看得更清楚:approx(approx(A+B) + C)
明显不同于 approx(A + approx(B+C))
。
如果您考虑 bigquery 如何计算聚合,它会构建一个执行树,并计算要在树的叶子上聚合的值。当这些答案准备就绪时,它们会传回树的更高级别并聚合(假设它们已添加)。 "when they're ready" 部分使其具有不确定性。
一个节点可能会以第一次 A,B,C
和第二次 C,A, B
的顺序返回结果。这意味着分发顺序将发生变化,因为您将在第一次获得 approx(approx(A + B) + C)
,而在第二次获得 approx(approx(C, A) + B)
。请注意,由于我们正在处理排序,因此看起来交换 属性 可能是有问题的,但事实并非如此; A+B
在浮动数学中与 B+A
相同。问题实际上是您要添加不相关的部分结果。
浮点数学具有各种讨厌的特性,如果您依赖于精度,通常应该避免使用。
我有一个 python 应用程序,它 运行 在 BigQuery 上查询并将结果附加到文件中。我在 MAC 工作站 (Yosemite) 和 GC 实例 (ubuntu 14.1) 上 运行 这个,浮点数的结果不同。我怎样才能使它们相同?它们 python 两者的环境相同。
run on google cloud instance 1120224,2015-04-06,23989,866,55159.71274162368,0.04923989554019882,0.021414467106578683,0.03609987911125933,63.69481840834143 54897577,2015-04-06,1188089,43462,2802473.708558333,0.051049132980100984,0.021641920553251377,0.03658143455582873,64.4810111950286
run on mac workstation 1120224,2015-04-06,23989,866,55159.712741623654,0.049239895540198794,0.021414467106578683,0.03609987911125933,63.694818408341405 54897577,2015-04-06,1188089,43462,2802473.708558335,0.05104913298010102,0.021641920553251377,0.03658143455582873,64.48101119502864
import sys
import pdb
import json
from collections import OrderedDict
from csv import DictWriter
from pprint import pprint
from apiclient import discovery
from oauth2client import tools
import functools
import argparse
import httplib2
import time
from subprocess import call
def authenticate_SERVICE_ACCOUNT(service_acct_email, private_key_path):
""" Generic authentication through a service accounts.
Args:
service_acct_email: The service account email associated
with the private key private_key_path: The path to the private key file
"""
from oauth2client.client import SignedJwtAssertionCredentials
with open(private_key_path, 'rb') as pk_file:
key = pk_file.read()
credentials = SignedJwtAssertionCredentials(
service_acct_email,
key,
scope='https://www.googleapis.com/auth/bigquery')
http = httplib2.Http()
auth_http = credentials.authorize(http)
return discovery.build('bigquery', 'v2', http=auth_http)
def create_query(number_of_days_ago):
""" Create a query
Args:
number_of_days_ago: Default value of 1 gets yesterday's data
"""
q = 'SELECT xxxxxxxxxx'
return q;
def translate_row(row, schema):
"""Apply the given schema to the given BigQuery data row.
Args:
row: A single BigQuery row to transform.
schema: The BigQuery table schema to apply to the row, specifically
the list of field dicts.
Returns:
Dict containing keys that match the schema and values that match
the row.
Adpated from bigquery client
https://github.com/tylertreat/BigQuery-Python/blob/master/bigquery/client.py
"""
log = {}
#pdb.set_trace()
# Match each schema column with its associated row value
for index, col_dict in enumerate(schema):
col_name = col_dict['name']
row_value = row['f'][index]['v']
if row_value is None:
log[col_name] = None
continue
# Cast the value for some types
if col_dict['type'] == 'INTEGER':
row_value = int(row_value)
elif col_dict['type'] == 'FLOAT':
row_value = float(row_value)
elif col_dict['type'] == 'BOOLEAN':
row_value = row_value in ('True', 'true', 'TRUE')
log[col_name] = row_value
return log
def extractResult(queryReply):
""" Extract a result from the query reply. Uses schema and rows to translate.
Args:
queryReply: the object returned by bigquery
"""
#pdb.set_trace()
result = []
schema = queryReply.get('schema', {'fields': None})['fields']
rows = queryReply.get('rows',[])
for row in rows:
result.append(translate_row(row, schema))
return result
def writeToCsv(results, filename, ordered_fieldnames, withHeader=True):
""" Create a csv file from a list of rows.
Args:
results: list of rows of data (first row is assumed to be a header)
order_fieldnames: a dict with names of fields in order desired - names must exist in results header
withHeader: a boolen to indicate whether to write out header -
Set to false if you are going to append data to existing csv
"""
try:
the_file = open(filename, "w")
writer = DictWriter(the_file, fieldnames=ordered_fieldnames)
if withHeader:
writer.writeheader()
writer.writerows(results)
the_file.close()
except:
print "Unexpected error:", sys.exc_info()[0]
raise
def runSyncQuery (client, projectId, query, timeout=0):
results = []
try:
print 'timeout:%d' % timeout
jobCollection = client.jobs()
queryData = {'query':query,
'timeoutMs':timeout}
queryReply = jobCollection.query(projectId=projectId,
body=queryData).execute()
jobReference=queryReply['jobReference']
# Timeout exceeded: keep polling until the job is complete.
while(not queryReply['jobComplete']):
print 'Job not yet complete...'
queryReply = jobCollection.getQueryResults(
projectId=jobReference['projectId'],
jobId=jobReference['jobId'],
timeoutMs=timeout).execute()
# If the result has rows, print the rows in the reply.
if('rows' in queryReply):
#print 'has a rows attribute'
#pdb.set_trace();
result = extractResult(queryReply)
results.extend(result)
currentPageRowCount = len(queryReply['rows'])
# Loop through each page of data
while('rows' in queryReply and currentPageRowCount < int(queryReply['totalRows'])):
queryReply = jobCollection.getQueryResults(
projectId=jobReference['projectId'],
jobId=jobReference['jobId'],
startIndex=currentRow).execute()
if('rows' in queryReply):
result = extractResult(queryReply)
results.extend(result)
currentRow += len(queryReply['rows'])
except AccessTokenRefreshError:
print ("The credentials have been revoked or expired, please re-run"
"the application to re-authorize")
except HttpError as err:
print 'Error in runSyncQuery:', pprint.pprint(err.content)
except Exception as err:
print 'Undefined error' % err
return results;
# Main
if __name__ == '__main__':
# Name of file
FILE_NAME = "results.csv"
# Default prior number of days to run query
NUMBER_OF_DAYS = "1"
# BigQuery project id as listed in the Google Developers Console.
PROJECT_ID = 'xxxxxx'
# Service account email address as listed in the Google Developers Console.
SERVICE_ACCOUNT = 'xxxxxx@developer.gserviceaccount.com'
KEY = "/usr/local/xxxxxxxx"
query = create_query(NUMBER_OF_DAYS)
# Authenticate
client = authenticate_SERVICE_ACCOUNT(SERVICE_ACCOUNT, KEY)
# Get query results
results = runSyncQuery (client, PROJECT_ID, query, timeout=0)
#pdb.set_trace();
# Write results to csv without header
ordered_fieldnames = OrderedDict([('f_split',None),('m_members',None),('f_day',None),('visitors',None),('purchasers',None),('demand',None), ('dmd_per_mem',None),('visitors_per_mem',None),('purchasers_per_visitor',None),('dmd_per_purchaser',None)])
writeToCsv(results, FILE_NAME, ordered_fieldnames, False)
# Backup current data
backupfilename = "data_bk-" + time.strftime("%y-%m-%d") + ".csv"
call(['cp','../data/data.csv',backupfilename])
# Concatenate new results to data
with open("../data/data.csv", "ab") as outfile:
with open("results.csv","rb") as infile:
line = infile.read()
outfile.write(line)
假设浮点数是不确定的:
https://randomascii.wordpress.com/2013/07/16/floating-point-determinism/
“the IEEE standard does not guarantee that the same program will deliver identical results on all conforming systems.”
您提到这些来自浮点数据的总和。正如 Felipe 提到的,浮点数很尴尬;它违反了我们倾向于假设的一些数学恒等式。
在这种情况下,联想 属性 是咬我们的那个。也就是说,通常 (A+B)+C == A+(B+C)
。但是,在浮点数学中,情况并非如此。每个操作都是一个近似值;如果用 'approx' 函数包装,您会看得更清楚:approx(approx(A+B) + C)
明显不同于 approx(A + approx(B+C))
。
如果您考虑 bigquery 如何计算聚合,它会构建一个执行树,并计算要在树的叶子上聚合的值。当这些答案准备就绪时,它们会传回树的更高级别并聚合(假设它们已添加)。 "when they're ready" 部分使其具有不确定性。
一个节点可能会以第一次 A,B,C
和第二次 C,A, B
的顺序返回结果。这意味着分发顺序将发生变化,因为您将在第一次获得 approx(approx(A + B) + C)
,而在第二次获得 approx(approx(C, A) + B)
。请注意,由于我们正在处理排序,因此看起来交换 属性 可能是有问题的,但事实并非如此; A+B
在浮动数学中与 B+A
相同。问题实际上是您要添加不相关的部分结果。
浮点数学具有各种讨厌的特性,如果您依赖于精度,通常应该避免使用。