如何将 Ramda Pipe 函数与承诺和静态回调混合使用?

How to use Ramda Pipe function with a mix of promises and static callbacks?

基于 我已经能够为初始数据对象创建一个解析基于静态和承诺的回调的函数。

现在我希望能够通过一系列回调来传输此数据对象,但是 运行 一旦我将多个承诺添加到组合中就会遇到麻烦。

当前设置

// Libaries
const R = require('ramda');
const fetch = require('node-fetch');
const Promise = require('bluebird');

// Input
const data = {
  array: [['#', 'FirstName', 'LastName'], ['1', 'tim', 'foo'], ['2', 'kim', 'bar']],
  header: 'FirstName',
  more: 'stuff',
  goes: 'here'
};

// Static and Promise Resolver (with Helper Function)
const transposeObj = (obj, len = Object.values(obj)[0].length) =>
  [...Array(len)].map((_, i) => Object.entries(obj).reduce((a, [k, v]) => ({ ...a, [k]: v[i] }), {}));

const mergeCallback = async ({ array: [headers, ...rows], header, ...rest }, callback) => {
  const index = R.indexOf(header, headers);
  const result = await Promise.map(rows, row => {
    return callback(row[index]);
  })
    .then(x => ({ changes: x.map(v => transposeObj(v.changes)) }))
    .then(({ changes }) => ({
      allHeaders: R.flatten([
        ...headers,
        R.chain(t => R.chain(Object.keys, t), [...changes])
          .filter(k => !headers.includes(k))
          .filter((x, i, a) => a.indexOf(x) == i)
      ]),
      changes
    }))
    .then(({ changes, allHeaders }) => ({
      resultRows: R.chain(
        (row, i = R.indexOf(row, [...rows])) =>
          changes[i].map(change =>
            Object.entries(change).reduce(
              (r, [k, v]) => [...r.slice(0, allHeaders.indexOf(k)), v, ...r.slice(allHeaders.indexOf(k) + 1)],
              row.slice(0)
            )
          ),
        [...rows]
      ),
      allHeaders
    }))
    .then(({ resultRows, allHeaders, array }) => ({
      array: [allHeaders, ...resultRows],
      header,
      ...rest
    }));
  return result;
};

// Example Callbacks and their services
const adapterPromise1 = async name => {
  const response = await fetch(`https://api.abalin.net/get/getdate?name=${name}&calendar=us`).then(res => res.json());
  return {
    changes: {
      nameday: R.pluck('day', response.results),
      namemonth: R.pluck('month', response.results)
    }
  };
};
const servicePromise1 = input => mergeCallback(input, adapterPromise1);

const adapterPromise2 = async name => {
  const response = await fetch(`https://api.genderize.io?name=${name}`).then(res => res.json());
  return {
    changes: {
      gender: R.of(response.gender)
    }
  };
};
const servicePromise2 = input => mergeCallback(input, adapterPromise2);

const adapterStatic1 = name => ({ changes: { NameLength: R.of(R.length(name)) } });
const serviceStatic1 = input => mergeCallback(input, adapterStatic1);

管道尝试

const result = R.pipe(
  servicePromise1,
  servicePromise2,
  serviceStatic1
)(data);

// console.log(result); <<< preferred resolution method, but not working due to promise
result.then(console.log);

预期结果

{ array:  
   [ [ '#', 
       'FirstName', 
       'LastName', 
       'nameday', 
       'namemonth', 
       'gender', 
       'NameLength' ], 
     [ '1', 'tim', 'foo', 24, 1, 'male', 3 ], 
     [ '1', 'tim', 'foo', 20, 6, 'male', 3 ], 
     [ '2', 'kim', 'bar', 8, 9, 'male', 3 ], 
     [ '2', 'kim', 'bar', 11, 10, 'male', 3 ] ], 
  header: 'FirstName', 
  more: 'stuff', 
  goes: 'here' } 

当前结果

Pipe 适用于任何一个服务调用,但一旦我尝试使用两个或多个服务,我就会收到以下错误。

Cannot read property 'Symbol(Symbol.iterator)' of undefined 

任何关于如何让它工作的提示都将不胜感激。

Ramda 的 pipe 不是 Promise-aware。旧的 Promise-aware 版本 pipeP 已被弃用,取而代之的是更通用的 pipeWith. You can use it with Promises by passing R.then(即将重命名为 R.andThen),如下所示:

R.pipeWith (R.then, [
//servicePromise1, // problem with CORS headers here.
  servicePromise2,
  serviceStatic1
]) (data)
.then (console .log)

出于某种原因,当我尝试从 Ramda 的 REPL 或 SO 片段中 运行 时,您的第一个 API 调用是 运行 为我解决 CORS 问题,但该过程应该没有它就清楚了。

这可能足以解决您的问题。它适用于此 test-case。但是我看到一个突出的问题:pipe 的所有版本都将上一个调用的结果传递给下一个。但是,您使用 属性 数据来配置有关如何触发下一个回调的内容,即您的 header 属性。因此,这必须在整个管道中保持固定。如果所有调用都将使用 FirstName 属性,那很好,但我的印象是他们需要自己的版本。

但是编写一个自定义管道函数,让您将其与回调函数一起传递就足够容易了。那么您的调用可能如下所示:

seq ([
  ['FirstName', servicePromise2],
  ['FirstName', serviceStatic1]
]) (data)
.then(console.log)

您可以在这段代码中看到该想法的工作版本:

// Input
const data = {
  array: [['#', 'FirstName', 'LastName'], ['1', 'tim', 'foo'], ['2', 'kim', 'bar']],
  header: 'FirstName',
  more: 'stuff',
  goes: 'here'
};

// Static and Promise Resolver (with Helper Function)
const transposeObj = (obj, len = Object.values(obj)[0].length) =>
  [...Array(len)].map((_, i) => Object.entries(obj).reduce((a, [k, v]) => ({ ...a, [k]: v[i] }), {}));

const mergeCallback = async ({ array: [headers, ...rows], header, ...rest }, callback) => {
  const index = R.indexOf(header, headers);
  const result = await Promise.all(rows.map(row => {
    return callback(row[index]);
  }))
    .then(x => ({ changes: x.map(v => transposeObj(v.changes)) }))
    .then(({ changes }) => ({
      allHeaders: R.flatten([
        ...headers,
        R.chain(t => R.chain(Object.keys, t), [...changes])
          .filter(k => !headers.includes(k))
          .filter((x, i, a) => a.indexOf(x) == i)
      ]),
      changes
    }))
    .then(({ changes, allHeaders }) => ({
      resultRows: R.chain(
        (row, i = R.indexOf(row, [...rows])) =>
          changes[i].map(change =>
            Object.entries(change).reduce(
              (r, [k, v]) => [...r.slice(0, allHeaders.indexOf(k)), v, ...r.slice(allHeaders.indexOf(k) + 1)],
              row.slice(0)
            )
          ),
        [...rows]
      ),
      allHeaders
    }))
    .then(({ resultRows, allHeaders, array }) => ({
      array: [allHeaders, ...resultRows],
      header,
      ...rest
    }));
  return result;
};

// Example Callbacks and their services
const adapterPromise2 = async (name) => {
  const response = await fetch(`https://api.genderize.io?name=${name}`).then(res => res.json());
  return {
    changes: {
      gender: R.of(response.gender)
    }
  };
};
const servicePromise2 = input => mergeCallback(input, adapterPromise2);

const adapterStatic1 = name => ({ changes: { NameLength: R.of(R.length(name)) } });
const serviceStatic1 = input => mergeCallback(input, adapterStatic1);

const seq = (configs) => (data) =>
  configs.reduce(
    (pr, [header, callback]) => pr.then(data => callback({...data, header})),
    Promise.resolve(data)
  )

seq ([
  ['FirstName',  servicePromise2],
  ['FirstName', serviceStatic1]
]) (data)
.then(console.log)
<script src="//cdnjs.cloudflare.com/ajax/libs/ramda/0.26.1/ramda.js"></script>

不过,我仍然认为这有些尴尬。你给我找的 header 名字根本不属于那个输入数据。你可以把它变成你的 mergeCallback 函数的另一个 属性 并更新你的包装器以从那里传递它,比如

const servicePromise2 = (input) => mergeCallback(input, 'FirstName', adapterPromise2);

在我看来更好的是,即使我知道它会为您现有的回调函数增加一些工作,也就是将整行传递给回调函数,结构为 object 所有 headers 作为属性。 Ramda 的 zipObj 可以这样使用:

  const result = await Promise.all(rows.map(row => {
    return callback(zipObj(headers, row));
  }))

像这样传递给每个回调 object:

{"#":"1", FirstName: "tim", LastName: "foo" /*, gender: 'male', ... */}

您可以将回调的签名更改为

const adapterPromise2 = async ({FirstName: name}) => { ...use `name` ... }

并保持 body 不变,或者只需将变量名称更改为 FirstName 以匹配 object.

const adapterPromise2 = async ({FirstName}) => { ...use `FirstName`... }

无论哪种方式,这都会使通用代码更简单,删除在当前 API 中感觉很尴尬的 header 属性,而无需显着更改现有回调。