Transformer 在 pyspark.ml 中的多个特征上运行
Transformer operating on multiple features in pyspark.ml
我想在 DataFrame
中创建自己的特征转换器,因此我添加了一个列,例如,其他两个列之间的差异。我跟着 ,但那里的变压器只在一列上运行。 pyspark.ml.Transformer
将字符串作为 inputCol
的参数,所以我当然不能指定多个列。
所以基本上,我想要实现的是一种类似于此方法的 _transform()
方法:
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset.select([self.getInputCol()])
# Define transformer logic
def f(col1, col2):
return col1 - col2
t = IntegerType()
return dataset.withColumn(out_col, udf(f, t)(in_col))
这怎么可能?
我设法解决了这个问题,方法是首先从我要操作的特征集中创建一个 Vector
,然后对新生成的矢量特征应用变换。下面是一个示例代码,说明如何制作一个不同于其他两个功能的新功能:
class MeasurementDifferenceTransformer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, inputCol=None, outputCol=None):
super(MeasurementDifferenceTransformer, self).__init__()
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None):
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset[self.getInputCol()]
# Define transformer logic
def f(vector):
return float(vector[0] - vector[1])
t = FloatType()
return dataset.withColumn(out_col, udf(lambda x: f(x), t)(in_col))
要使用它,我们首先实例化一个 VectorAssembler
来创建一个向量特征:
pair_assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols_vector")
然后我们实例化转换器:
pair_transformer = MeasurementDifferenceTransformer(inputCol="cols_vector", outputCol="col1_minus_col2")
最后我们转换数据:
pairfeats = pair_assembler.transform(df)
difffeats = pait_transformer.transform(pairfeats)
您无需为了对多列进行操作而经历所有这些麻烦。这是使用 HasInputCols(而不是 HasInputCol)的更好方法
class MeasurementDifferenceTransformer(Transformer, HasInputCols, HasOutputCol):
@keyword_only
def __init__(self, inputCols=None, outputCol=None):
super(MeasurementDifferenceTransformer, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCols=None, outputCol=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = self.getInputCols()
# Define transformer logic
def f(col1, col2):
return float(col1-col2)
t = FloatType()
return dataset.withColumn(out_col, udf(lambda f, t)(*in_col))
我想在 DataFrame
中创建自己的特征转换器,因此我添加了一个列,例如,其他两个列之间的差异。我跟着 pyspark.ml.Transformer
将字符串作为 inputCol
的参数,所以我当然不能指定多个列。
所以基本上,我想要实现的是一种类似于此方法的 _transform()
方法:
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset.select([self.getInputCol()])
# Define transformer logic
def f(col1, col2):
return col1 - col2
t = IntegerType()
return dataset.withColumn(out_col, udf(f, t)(in_col))
这怎么可能?
我设法解决了这个问题,方法是首先从我要操作的特征集中创建一个 Vector
,然后对新生成的矢量特征应用变换。下面是一个示例代码,说明如何制作一个不同于其他两个功能的新功能:
class MeasurementDifferenceTransformer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, inputCol=None, outputCol=None):
super(MeasurementDifferenceTransformer, self).__init__()
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None):
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset[self.getInputCol()]
# Define transformer logic
def f(vector):
return float(vector[0] - vector[1])
t = FloatType()
return dataset.withColumn(out_col, udf(lambda x: f(x), t)(in_col))
要使用它,我们首先实例化一个 VectorAssembler
来创建一个向量特征:
pair_assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols_vector")
然后我们实例化转换器:
pair_transformer = MeasurementDifferenceTransformer(inputCol="cols_vector", outputCol="col1_minus_col2")
最后我们转换数据:
pairfeats = pair_assembler.transform(df)
difffeats = pait_transformer.transform(pairfeats)
您无需为了对多列进行操作而经历所有这些麻烦。这是使用 HasInputCols(而不是 HasInputCol)的更好方法
class MeasurementDifferenceTransformer(Transformer, HasInputCols, HasOutputCol):
@keyword_only
def __init__(self, inputCols=None, outputCol=None):
super(MeasurementDifferenceTransformer, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCols=None, outputCol=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = self.getInputCols()
# Define transformer logic
def f(col1, col2):
return float(col1-col2)
t = FloatType()
return dataset.withColumn(out_col, udf(lambda f, t)(*in_col))