BigQuery 查询失败处理
Bigquery query failure handling
我正在尝试确保我的所有 streaming 查询都已成功插入,但似乎其中一些查询未正确插入。
没有抛出任何错误,但是我没有使用插入请求返回的数据,因为我不知道错误的结构。
这是我的插入代码:
$rows = array();
$row = new Google_Service_Bigquery_TableDataInsertAllRequestRows;
$row->setJson($data);
$rows[0] = $row;
$request = new Google_Service_Bigquery_TableDataInsertAllRequest;
$request->setKind('bigquery#tableDataInsertAllRequest');
$request->setRows($rows);
return $this->service->tabledata->insertAll($project, $dataset, $tableid, $request);
以上代码排除了authentication/setting客户端&服务
你可以用这个方法
/**
*
* @param Google_Client $client
* @param type $project_id
* @param type $dataset_id
* @param type $rows
* @return mixed
* @throws Google_Service_Exception
*/
public function BQ_Tabledata_InsertAll($client, $project_id, $dataset_id, $rows) {
$success = true;
$failed_lines = array();
$last_reason = '';
$ret = array(
'success' => &$success,
'last_reason' => &$last_reason,
'failed_lines' => &$failed_lines,
);
$bq = new Google_Service_Bigquery($client);
$request = new Google_Service_Bigquery_TableDataInsertAllRequest();
$request->setRows($rows);
try {
$resp = new Google_Service_Bigquery_TableDataInsertAllResponse();
$resp = $bq->tabledata->insertAll($project_id, $dataset_id, static::tableId(), $request);
$errors = new Google_Service_Bigquery_TableDataInsertAllResponseInsertErrors();
$errors = @$resp->getInsertErrors();
if (!empty($errors)) {
$error_msg = "\r\nRequest Headers: \r\n" . json_encode($client->request->getRequestHeaders()) . "\r\nResponse Headers: \r\n" . json_encode($client->request->getResponseHeaders()) . "\r\nRequest Body:\r\n" . $client->request->getPostBody() . "\r\nResponse Body:\r\n" . $client->request->getResponseBody() . "\r\n";
if (is_array($errors)) {
foreach ($errors as $eP) {
$arr = $eP->getErrors();
$line = $eP->getIndex();
if (is_array($arr)) {
foreach ($arr as $e) {
switch ($e->getReason()) {
case "stopped":
break;
case "timeout":
$failed_lines[] = $line;
$last_reason = $e->getReason();
$error_msg.= sprintf("Timeout on line %s, reason: %s, msg: %s\r\n", $line, $e->getReason(), $e->getMessage());
break;
default:
$error_msg.= sprintf("Error on line %s, reason: %s, msg: %s\r\n", $line, $e->getReason(), $e->getMessage());
break;
}
}
} else {
$error_msg.= json_encode($arr) . "\r\n";
}
}
$this->setErrorMessage($error_msg);
} else {
$this->setErrorMessage($errors);
}
//print_r($errors);
//exit;
$success = false;
}
return $ret;
} catch (Google_Service_Exception $e) {
$this->setErrors($e->getErrors())->setErrorMessage($e->getMessage());
throw $e;
}
}
我正在尝试确保我的所有 streaming 查询都已成功插入,但似乎其中一些查询未正确插入。
没有抛出任何错误,但是我没有使用插入请求返回的数据,因为我不知道错误的结构。
这是我的插入代码:
$rows = array();
$row = new Google_Service_Bigquery_TableDataInsertAllRequestRows;
$row->setJson($data);
$rows[0] = $row;
$request = new Google_Service_Bigquery_TableDataInsertAllRequest;
$request->setKind('bigquery#tableDataInsertAllRequest');
$request->setRows($rows);
return $this->service->tabledata->insertAll($project, $dataset, $tableid, $request);
以上代码排除了authentication/setting客户端&服务
你可以用这个方法
/**
*
* @param Google_Client $client
* @param type $project_id
* @param type $dataset_id
* @param type $rows
* @return mixed
* @throws Google_Service_Exception
*/
public function BQ_Tabledata_InsertAll($client, $project_id, $dataset_id, $rows) {
$success = true;
$failed_lines = array();
$last_reason = '';
$ret = array(
'success' => &$success,
'last_reason' => &$last_reason,
'failed_lines' => &$failed_lines,
);
$bq = new Google_Service_Bigquery($client);
$request = new Google_Service_Bigquery_TableDataInsertAllRequest();
$request->setRows($rows);
try {
$resp = new Google_Service_Bigquery_TableDataInsertAllResponse();
$resp = $bq->tabledata->insertAll($project_id, $dataset_id, static::tableId(), $request);
$errors = new Google_Service_Bigquery_TableDataInsertAllResponseInsertErrors();
$errors = @$resp->getInsertErrors();
if (!empty($errors)) {
$error_msg = "\r\nRequest Headers: \r\n" . json_encode($client->request->getRequestHeaders()) . "\r\nResponse Headers: \r\n" . json_encode($client->request->getResponseHeaders()) . "\r\nRequest Body:\r\n" . $client->request->getPostBody() . "\r\nResponse Body:\r\n" . $client->request->getResponseBody() . "\r\n";
if (is_array($errors)) {
foreach ($errors as $eP) {
$arr = $eP->getErrors();
$line = $eP->getIndex();
if (is_array($arr)) {
foreach ($arr as $e) {
switch ($e->getReason()) {
case "stopped":
break;
case "timeout":
$failed_lines[] = $line;
$last_reason = $e->getReason();
$error_msg.= sprintf("Timeout on line %s, reason: %s, msg: %s\r\n", $line, $e->getReason(), $e->getMessage());
break;
default:
$error_msg.= sprintf("Error on line %s, reason: %s, msg: %s\r\n", $line, $e->getReason(), $e->getMessage());
break;
}
}
} else {
$error_msg.= json_encode($arr) . "\r\n";
}
}
$this->setErrorMessage($error_msg);
} else {
$this->setErrorMessage($errors);
}
//print_r($errors);
//exit;
$success = false;
}
return $ret;
} catch (Google_Service_Exception $e) {
$this->setErrors($e->getErrors())->setErrorMessage($e->getMessage());
throw $e;
}
}