如何在 Jython Evaluator 中获取 StreamSets 记录字段类型
How to get StreamSets Record Fields Type inside Jython Evaluator
我有一个 StreamSets 管道,我使用 JDBC 组件作为源从远程 SQL 服务器数据库读取数据,并将数据放入 Hive 和 Kudu 数据湖。
我在使用 Binary Columns 类型时遇到了一些问题,因为 Impala 中不支持 Binary 类型,我用它来访问 Hive 和 Kudu。
我决定将二进制类型的列(在管道中作为 Byte_Array 类型流动)转换为字符串并像那样插入。
我尝试使用字段类型转换器元素将所有 Byte_Array 类型转换为字符串,但没有成功。所以我使用了一个 Jython 组件将所有 arr.arr 类型转换为 String。它工作正常,直到我在该字段上获得 Null 值,因此 Jython 类型为 None.type 并且我无法检测到 Byte_Array 类型并且无法将其转换为 String。所以我无法将它插入到 Kudu 中。
任何有关如何在 Jython Evaluator 中获取 StreamSets 记录字段类型的帮助?或者有任何解决我面临的问题的建议吗?
您需要使用sdcFunctions.getFieldNull()
来测试该字段是否为NULL_BYTE_ARRAY
。例如:
import array
def convert(item):
return ':-)'
def is_byte_array(record, k, v):
# getFieldNull expect a field path, so we need to prepend the '/'
return (sdcFunctions.getFieldNull(record, '/'+k) == NULL_BYTE_ARRAY
or (type(v) == array.array and v.typecode == 'b'))
for record in records:
try:
record.value = {k: convert(v) if is_byte_array(record, k, v) else v
for k, v in record.value.items()}
output.write(record)
except Exception as e:
error.write(record, str(e))
所以这是我的最终解决方案:
您可以使用以下逻辑通过使用 NULL_CONSTANTS:
来检测 Jython 组件内的任何 StreamSet 类型
NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG,
NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL,
NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP
思路是将字段的值保存在一个临时变量中,设置字段的值为None并使用函数sdcFunctions.getFieldNull 通过将其与 NULL_CONSTANTS.
之一进行比较来了解 StreamSets 类型
import binascii
def toByteArrayToHexString(value):
if value is None:
return NULL_STRING
value = '0x'+binascii.hexlify(value).upper()
return value
for record in records:
try:
for colName,value in record.value.items():
temp = record.value[colName]
record.value[colName] = None
if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_BYTE_ARRAY:
temp = toByteArrayToHexString(temp)
record.value[colName] = temp
output.write(record)
except Exception as e
error.write(record, str(e))
限制:
上面的代码只有当它有值时(当它不为NULL时)才会将Date类型转换为Datetime类型
我有一个 StreamSets 管道,我使用 JDBC 组件作为源从远程 SQL 服务器数据库读取数据,并将数据放入 Hive 和 Kudu 数据湖。
我在使用 Binary Columns 类型时遇到了一些问题,因为 Impala 中不支持 Binary 类型,我用它来访问 Hive 和 Kudu。
我决定将二进制类型的列(在管道中作为 Byte_Array 类型流动)转换为字符串并像那样插入。
我尝试使用字段类型转换器元素将所有 Byte_Array 类型转换为字符串,但没有成功。所以我使用了一个 Jython 组件将所有 arr.arr 类型转换为 String。它工作正常,直到我在该字段上获得 Null 值,因此 Jython 类型为 None.type 并且我无法检测到 Byte_Array 类型并且无法将其转换为 String。所以我无法将它插入到 Kudu 中。
任何有关如何在 Jython Evaluator 中获取 StreamSets 记录字段类型的帮助?或者有任何解决我面临的问题的建议吗?
您需要使用sdcFunctions.getFieldNull()
来测试该字段是否为NULL_BYTE_ARRAY
。例如:
import array
def convert(item):
return ':-)'
def is_byte_array(record, k, v):
# getFieldNull expect a field path, so we need to prepend the '/'
return (sdcFunctions.getFieldNull(record, '/'+k) == NULL_BYTE_ARRAY
or (type(v) == array.array and v.typecode == 'b'))
for record in records:
try:
record.value = {k: convert(v) if is_byte_array(record, k, v) else v
for k, v in record.value.items()}
output.write(record)
except Exception as e:
error.write(record, str(e))
所以这是我的最终解决方案:
您可以使用以下逻辑通过使用 NULL_CONSTANTS:
来检测 Jython 组件内的任何 StreamSet 类型NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG, NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL, NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP
思路是将字段的值保存在一个临时变量中,设置字段的值为None并使用函数sdcFunctions.getFieldNull 通过将其与 NULL_CONSTANTS.
之一进行比较来了解 StreamSets 类型import binascii def toByteArrayToHexString(value): if value is None: return NULL_STRING value = '0x'+binascii.hexlify(value).upper() return value for record in records: try: for colName,value in record.value.items(): temp = record.value[colName] record.value[colName] = None if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_BYTE_ARRAY: temp = toByteArrayToHexString(temp) record.value[colName] = temp output.write(record) except Exception as e error.write(record, str(e))
限制: 上面的代码只有当它有值时(当它不为NULL时)才会将Date类型转换为Datetime类型