Cycle.js - 驱动程序 - PhoenixJS (Websockets)

Cycle.js - Driver - PhoenixJS (Websockets)

我们目前有一个 VueJS 应用程序,我正在考虑将其迁移到 Cycle.js(第一个主要项目)。

我在 Cycle.JS 中了解到我们有针对驱动程序的 SI 和 SO(使用 adapt());自然地,WebSocket 实现适合这种情况,因为它具有读写效果。

我们使用 Phoenix (Elixir) 作为后端,使用 Channels 进行软实时通信。我们的客户端 WS 库在这里是 Phoenixhttps://www.npmjs.com/package/phoenix.

如果您知道如何连接,Cycle.js.org 上的 example 是完美的选择。

在我们的例子中,我们使用 REST 端点进行身份验证,该端点 return 是一个令牌 (JWT),用于初始化 WebSocket(令牌参数)。此令牌不能简单地传递到驱动程序中,因为驱动程序在 Cycle.js 应用程序运行时被初始化。

我们现在(在我们的 VueJS 应用程序中)的示例(不是实际代码):

// Code ommited for brevity 
socketHandler = new vueInstance.$phoenix.Socket(FQDN, {
    _token: token
});
socketHandler.onOpen(() => VueBus.$emit('SOCKET_OPEN'));

//...... Vue component (example)
VueBus.$on('SOCKET_OPEN', function () {
    let chan = VueStore.socketHandler.channel('PRIV_CHANNEL', {
        _token: token
    });

    chan.join()
        .receive('ok', () => {
            //... code
        })
})

以上是一个示例,我们有一个 Vuex 存储全局状态(套接字等),集中式消息总线(Vue 应用程序)用于组件之间的通信以及来自实例化 Phoenix Socket 的通道设置.

我们的频道设置依赖于经过身份验证的 Socket 连接,它本身需要身份验证才能加入该特定频道。

问题是,Cycle.js 有可能吗?

  1. 使用来自 REST 调用(JWT 令牌响应)的令牌参数初始化 WebSocket 连接 - 我们已经部分实现了这个
  2. 基于该套接字和令牌创建通道(通道流出驱动程序?
  3. 访问多个频道流(我假设它可能像 sources.HTTP.select(CATEGORY))

我们这里有一个 1:N 依赖关系,我不确定驱动程序是否可行。

提前谢谢你,

更新@ 17/12/2018

基本上我要模仿的是以下内容(来自 Cycle.js.org):

驱动程序接收一个接收器,以执行写入效果(在特定通道上发送消息),但也可能 return 一个源;这意味着有两个异步流?这意味着在运行时创建套接字可能会导致一个流在实例化之前访问 "socket";请参阅下面代码段中的评论。

import {adapt} from '@cycle/run/lib/adapt';

function makeSockDriver(peerId) {
  // This socket may be created at an unknown period
  //let socket = new Sock(peerId);
  let socket = undefined;

  // Sending is perfect
  function sockDriver(sink$) {
    sink$.addListener({
      next: listener => {

        sink$.addListener({
                next: ({ channel, data }) => {
                    if(channel === 'OPEN_SOCKET' && socket === null) {
                        token = data;

                        // Initialising the socket
                        socket = new phoenix.Socket(FQDN, { token });
                        socketHandler.onOpen(() => listener.next({
                            channel: 'SOCKET_OPEN'
                        }));
                    } else {
                        if(channels[channel] === undefined) {
                            channels[channel] = new Channel(channel, { token });
                        }
                        channels[channel].join()
                            .receive('ok', () => {
                                sendData(data);
                            });
                    }
                }
            });
      },
      error: () => {},
      complete: () => {},
    });

    const source$ = xs.create({
      start: listener => {
        sock.onReceive(function (msg) {
            // There is no guarantee that "socket" is defined here, as this may fire before the socket is actually created 
            socket.on('some_event'); // undefined

            // This works however because a call has been placed back onto the browser stack which probably gives the other blocking thread chance to write to the local stack variable "socket". But this is far from ideal
            setTimeout(() => socket.on('some_event'));
        });
      },
      stop: () => {},
    });

    return adapt(source$);
  }

  return sockDriver;
}

Jan van Brügge,您提供的解决方案非常完美(谢谢),只是我在响应部分遇到了问题。请看上面的例子。

例如,我想要实现的是这样的:

// login component
return {
    DOM: ...
    WS: xs.of({
        channel: "OPEN_CHANNEL",
        data: {
            _token: 'Bearer 123'
        }
    })
}

//////////////////////////////////////
// Some authenticated component

// Intent
const intent$ = sources.WS.select(CHANNEL_NAME).startWith(null)

// Model
const model$ = intent$.map(resp => {
    if (resp.some_response !== undefined) {
        return {...}; // some model
    }
    return resp;
})

return {
    DOM: model$.map(resp => {
        // Use response from websocket to create UI of some sort
    })
}

首先,是的,这可以通过驱动程序实现,我的建议会产生一个感觉非常像 HTTP 驱动程序的驱动程序。

首先要有一些粗略的伪代码,我可以在其中解释所有内容,我可能误解了你问题的某些部分,所以这可能是错误的。

interface WebsocketMessage {
    channel: string;
    data: any;
}

function makeWebSocketDriver() {
    let socket = null;
    let token = null;
    let channels = {}
    return function websocketDriver(sink$: Stream<WebsocketMessage> {
        return xs.create({
            start: listener => {
                sink$.addListener({
                    next: ({ channel, data }) => {
                        if(channel === 'OPEN_SOCKET' && socket === null) {
                            token = data;
                            socket = new phoenix.Socket(FQDN, { token });
                            socketHandler.onOpen(() => listener.next({
                                channel: 'SOCKET_OPEN'
                            }));
                        } else {
                            if(channels[channel] === undefined) {
                                channels[channel] = new Channel(channel, { token });
                            }
                            channels[channel].join()
                                .receive('ok', () => {
                                    sendData(data);
                                });
                        }
                    }
                });
            }
        });
    };
}

这就是这种驱动程序的粗略结构。您会看到它等待带有令牌的消息,然后打开套接字。它还会根据消息的类别跟踪开放频道和 sends/receives。这种方法只需要所有通道都有唯一的名称,我不确定你的通道协议在这方面是如何工作的或者你特别想要什么。

我希望这足以帮助您入门,如果您弄清楚通道 send/receive 和套接字的 API,我可能会提供更多帮助。也随时欢迎您在我们的 gitter channel

中提问