BigQuery Kafka sink connector not accepting boolean values

A schemaless BigQuery Kafka sink connector SMT is unable to save data to BigQuery when the field is a boolean.

MapsUtil.debugPrint of the recordValue just before returning from apply(R record):

active = true java.lang.String

Schema definition

  {
    "mode": "NULLABLE",
    "name": "active",
    "type": "BOOLEAN"
  }

Deserializer

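// Jackson 2.x imports (assumed; the original post omits them)
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;

public class BooleanDeserialiser extends JsonDeserializer<Boolean> {
    // Treats every token text other than "0" as true.
    @Override
    public Boolean deserialize(JsonParser parser, DeserializationContext context)
            throws IOException {
        return !"0".equals(parser.getText());
    }
}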

Serializer

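// Jackson 2.x imports (assumed; the original post omits them)
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;

public class BooleanSerialiser extends JsonSerializer<Boolean> {
    // Writes the value as a JSON string ("true" or "false"), not as a JSON boolean.
    @Override
    public void serialize(Boolean value, JsonGenerator gen, SerializerProvider serializers)
            throws IOException {
        gen.writeString(value ? "true" : "false");
    }
}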

Error

    [row index 76]: invalid: Cannot convert string value to boolean: 1
        at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

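I am leaving this as a community wiki answer to give this kind of issue more visibility in the community. As stated in the issue you raised with the connector devs, your request may need a feature implemented on BigQuery, which has to be requested by opening a BigQuery Feature Request. That puts the request in front of the developers so that it can be considered.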

It is also worth mentioning that you can see the list of issues/bugs for confluentinc kafka-connect-bigquery on the project's issue dashboard.
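In the meantime, one thing that might be worth trying: the debug output in the question shows `active` reaching the connector as a `java.lang.String`, and the error shows BigQuery rejecting the string value `1` for a BOOLEAN column. The serializer in the question also uses `gen.writeString`, which emits a JSON string and not a JSON boolean. Below is a minimal sketch, using only plain Java collections, of coercing such a field to a real `Boolean` in the schemaless value map before it is returned from `apply(R record)`. The class `BooleanCoercion`, the methods `toBoolean` and `coerceField`, and the set of accepted spellings are all hypothetical and not part of kafka-connect-bigquery or Kafka Connect. Whether this resolves the error in your setup is an assumption that would need testing.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration; not part of any connector API.
public class BooleanCoercion {

    // Converts common string spellings of a boolean into a real Boolean.
    // Any other input (including null and non-strings) is returned unchanged.
    public static Object toBoolean(Object value) {
        if (!(value instanceof String)) {
            return value;
        }
        String text = ((String) value).trim().toLowerCase(Locale.ROOT);
        switch (text) {
            case "1":
            case "true":
                return Boolean.TRUE;
            case "0":
            case "false":
                return Boolean.FALSE;
            default:
                return value;
        }
    }

    // Returns a copy of the schemaless record value with the named field coerced.
    // The input map is left untouched.
    public static Map<String, Object> coerceField(Map<String, Object> recordValue, String field) {
        Map<String, Object> copy = new HashMap<>(recordValue);
        if (copy.containsKey(field)) {
            copy.put(field, toBoolean(copy.get(field)));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new HashMap<>();
        value.put("active", "1");
        Map<String, Object> fixed = coerceField(value, "active");
        Object active = fixed.get("active");
        System.out.println(active + " " + active.getClass().getName());
    }
}
```

Running `main` prints `true java.lang.Boolean`, in contrast to the `java.lang.String` type shown in the debug output. Inside an SMT, the map returned by `coerceField` would be used as the value of the new record; that wiring is left out here because it depends on how your transform is written.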