Memsql::Streamliner Python 转换
Memsql::Streamliner Python Transform
我在 Memsql::Streamliner::Transform (Python) 实用程序中工作。必须覆盖一个转换方法以提供自定义转换功能。
def transform(self, sql_context, dataframe, logger):
dataframe.column[0]
是字节数组(JSON 字符串)。
如何将字节数组转换为具有命名列的 DataFrame?
目标:访问转换后的 DataFrame 中的各个列。
您可以使用 dataframe.rdd
访问底层 rdd 并映射它以将每个字节字符串转换为包含您的列的列表。您可以通过将列列表作为 createDataframe
.
的第二个参数将生成的 rdd 转换回具有命名列的数据框
像下面这样的东西应该可以工作:
def parse(row):
bytestring = row[0]
json_data = convert_bytes_and_parse_json(bytestring)
return [ json_data["mycolumn1"], json_data["mycolumn2"] ]
parsedRDD = dataframe.rdd.map(parse)
parsedDf = sql_context.createDataframe(parsedRDD, ["mycolumn1", "mycolumn2"])
# now you can access columns by name
parsedDf.select(parsedDf["mycolumn1"])
我在 Memsql::Streamliner::Transform (Python) 实用程序中工作。必须覆盖一个转换方法以提供自定义转换功能。
def transform(self, sql_context, dataframe, logger):
dataframe.column[0]
是字节数组(JSON 字符串)。
如何将字节数组转换为具有命名列的 DataFrame?
目标:访问转换后的 DataFrame 中的各个列。
您可以使用 dataframe.rdd
访问底层 rdd 并映射它以将每个字节字符串转换为包含您的列的列表。您可以通过将列列表作为 createDataframe
.
像下面这样的东西应该可以工作:
def parse(row):
bytestring = row[0]
json_data = convert_bytes_and_parse_json(bytestring)
return [ json_data["mycolumn1"], json_data["mycolumn2"] ]
parsedRDD = dataframe.rdd.map(parse)
parsedDf = sql_context.createDataframe(parsedRDD, ["mycolumn1", "mycolumn2"])
# now you can access columns by name
parsedDf.select(parsedDf["mycolumn1"])