BigQuery 查询失败处理

Bigquery query failure handling

我正在尝试确保我的所有 streaming 查询都已成功插入,但似乎其中一些查询未正确插入。

没有抛出任何错误,但是我没有使用插入请求返回的数据,因为我不知道错误的结构。

这是我的插入代码:

    $rows = array();
    $row = new Google_Service_Bigquery_TableDataInsertAllRequestRows;
    $row->setJson($data);
    $rows[0] = $row;
    $request = new Google_Service_Bigquery_TableDataInsertAllRequest;
    $request->setKind('bigquery#tableDataInsertAllRequest');
    $request->setRows($rows);
    return $this->service->tabledata->insertAll($project, $dataset, $tableid, $request);

以上代码排除了authentication/setting客户端&服务

你可以用这个方法

/**
 * 
 * @param Google_Client $client
 * @param type $project_id
 * @param type $dataset_id
 * @param type $rows
 * @return mixed
 * @throws Google_Service_Exception
 */
public function BQ_Tabledata_InsertAll($client, $project_id, $dataset_id, $rows) {
    $success = true;
    $failed_lines = array();
    $last_reason = '';
    $ret = array(
        'success' => &$success,
        'last_reason' => &$last_reason,
        'failed_lines' => &$failed_lines,
    );
    $bq = new Google_Service_Bigquery($client);
    $request = new Google_Service_Bigquery_TableDataInsertAllRequest();
    $request->setRows($rows);
    try {
        $resp = new Google_Service_Bigquery_TableDataInsertAllResponse();
        $resp = $bq->tabledata->insertAll($project_id, $dataset_id, static::tableId(), $request);
        $errors = new Google_Service_Bigquery_TableDataInsertAllResponseInsertErrors();
        $errors = @$resp->getInsertErrors();
        if (!empty($errors)) {
            $error_msg = "\r\nRequest Headers: \r\n" . json_encode($client->request->getRequestHeaders()) . "\r\nResponse Headers: \r\n" . json_encode($client->request->getResponseHeaders()) . "\r\nRequest Body:\r\n" . $client->request->getPostBody() . "\r\nResponse Body:\r\n" . $client->request->getResponseBody() . "\r\n";
            if (is_array($errors)) {
                foreach ($errors as $eP) {
                    $arr = $eP->getErrors();
                    $line = $eP->getIndex();
                    if (is_array($arr)) {
                        foreach ($arr as $e) {
                            switch ($e->getReason()) {
                                case "stopped":
                                    break;
                                case "timeout":
                                    $failed_lines[] = $line;
                                    $last_reason = $e->getReason();
                                    $error_msg.= sprintf("Timeout on line %s, reason: %s, msg: %s\r\n", $line, $e->getReason(), $e->getMessage());
                                    break;
                                default:
                                    $error_msg.= sprintf("Error on line %s, reason: %s, msg: %s\r\n", $line, $e->getReason(), $e->getMessage());
                                    break;
                            }
                        }
                    } else {
                        $error_msg.= json_encode($arr) . "\r\n";
                    }
                }
                $this->setErrorMessage($error_msg);
            } else {
                $this->setErrorMessage($errors);
            }
            //print_r($errors);
            //exit;
            $success = false;
        }
        return $ret;
    } catch (Google_Service_Exception $e) {
        $this->setErrors($e->getErrors())->setErrorMessage($e->getMessage());
        throw $e;
    }
}