Apache Beam - Write BigQuery TableRow to Cassandra
I am trying to read data from BigQuery (as TableRow) and write the output to Cassandra. How can I do that?
Here is what I have tried. This works:
/* Read BQ */
PCollection<CxCpmMapProfile> data = p.apply(BigQueryIO.read(
    new SerializableFunction<SchemaAndRecord, CxCpmMapProfile>() {
        public CxCpmMapProfile apply(SchemaAndRecord record) {
            GenericRecord r = record.getRecord();
            return new CxCpmMapProfile(
                r.get("channel_no").toString(),
                r.get("channel_name").toString());
        }
    })
    .fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`")
    .usingStandardSql()
    .withoutValidation());
/* Write to Cassandra */
data.apply(CassandraIO.<CxCpmMapProfile>write()
    .withHosts(Arrays.asList("<IP addr1>", "<IP addr2>"))
    .withPort(9042)
    .withUsername("cassandra_user")
    .withPassword("cassandra_password")
    .withKeyspace("cassandra_keyspace")
    .withEntity(CxCpmMapProfile.class));
But when I change the Read BQ part to use TableRow like this:
/* Read from BQ using readTableRow */
PCollection<TableRow> data = p.apply(BigQueryIO.readTableRows()
.fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`")
.usingStandardSql().withoutValidation());
the Write to Cassandra step fails with the following error:
The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (CassandraIO.Write<CxCpmMacProfile>)
The error occurs because the input PCollection contains TableRow elements, while CassandraIO.Write expects CxCpmMacProfile elements. You need to read the elements from BigQuery as CxCpmMacProfile directly. The BigQueryIO documentation has an example of reading rows from a table and parsing them into a custom type, done through the read(SerializableFunction) method.
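The conversion step the answer describes can be sketched as plain Java, separate from the pipeline. This is a hypothetical illustration: the entity class name (CxCpmMacProfile), its fields, and the fromRecord helper are assumptions, and a plain Map stands in for the Avro GenericRecord that record.getRecord() returns inside the SerializableFunction. In the real pipeline the class would also carry the Cassandra object-mapper annotations so CassandraIO's withEntity(...) can persist it.

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical entity mirroring the Cassandra table. In the actual
// pipeline this class would be annotated for the Cassandra object
// mapper and passed to CassandraIO.<CxCpmMacProfile>write().withEntity(...).
class CxCpmMacProfile implements Serializable {
    private final String channelNo;
    private final String channelName;

    public CxCpmMacProfile(String channelNo, String channelName) {
        this.channelNo = channelNo;
        this.channelName = channelName;
    }

    public String getChannelNo() { return channelNo; }
    public String getChannelName() { return channelName; }

    // The per-record conversion. In the pipeline this body lives inside
    // the SerializableFunction<SchemaAndRecord, CxCpmMacProfile> passed
    // to BigQueryIO.read(...); here a Map<String, Object> stands in for
    // the Avro GenericRecord for illustration.
    public static CxCpmMacProfile fromRecord(Map<String, Object> record) {
        return new CxCpmMacProfile(
            String.valueOf(record.get("channel_no")),
            String.valueOf(record.get("channel_name")));
    }
}
```

The point is that the PCollection must already carry entity objects before it reaches CassandraIO.Write; the TableRow-based readTableRows() skips that conversion, which is why the apply call does not type-check.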