以多种方式发送数据,具体取决于您希望如何发送

Send data in multiple ways depending on how you want to send it

我有一堆键和值,我想通过将它们打包到一个字节数组中来发送到我们的消息队列。我将把所有键和值组成一个字节数组,它们应该总是小于 50K,然后发送到我们的消息队列。

数据包class:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

下面是我发送数据的方式。截至目前,我的设计仅允许通过调用上述 sendData() 方法中的 sendToQueueAsync 方法来异步发送数据。

  private void validateAndSend(final RecordPartition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    final Packet packet = new Packet(partition);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
      packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
          dataHolder.getProcessBytes());
    }
    packet.close();
  }

现在我需要扩展我的设计,以便我可以用三种不同的方式发送数据。由用户决定他想要发送数据的方式,"sync" 或 "async".

发送记录class:

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message);
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // called by multiple threads concurrently
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  // called by above method and also by handleRetry method
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  // called by send method below
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  // called by multiple threads to send data synchronously without passing socket
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by a threads to send data synchronously but with socket as the parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}

调用者只会调用以下三种方法之一:

我应该如何设计我的 PacketSendRecord class 以便我可以告诉 Packet class 这些数据需要发送以上三种方式到我的消息队列。由用户决定他想以哪种方式将数据发送到消息队列。截至目前,我的 Packet class 的结构方式,它只能以一种方式发送数据。

我没有在 Packet 中看到 sender 的定义。我假设它被定义为私有实例变量?

设计确实需要修改。 通过让 Packet class 进行发送,设计违反了 Single responsibility principle。应该有一个单独的(可能是抽象的)class准备要发送的数据(准备一个java.nio.Buffer实例)并且它可以有一个或多个子classes,其中一个returns一个java.nio.ByteBuffer实例。

一个单独的 class 获取 Buffer 并执行发送。这个(可能是抽象的)class 可以有针对不同发送平台和方法的子 classes。

然后,您需要另一个 class 来实现 Builder pattern。希望发送数据包的客户端使用构建器指定具体的 PacketSender(可能还有其他需要的属性,如套接字编号),然后调用 send() 进行发送。

我认为您最好的选择是策略模式 (https://en.wikipedia.org/wiki/Strategy_pattern)。

使用此模式,您可以封装每个类型的行为 "send",例如 AsynchronousSend class、SynchronousSend class 和 AsynchronousSocketSend class . (您可能会想出更好的名字)。然后 Packet class 可以根据一些逻辑决定使用哪个 class 将数据发送到队列。

首先,您需要清楚地回答谁(或您的代码的哪一部分)负责决定使用哪种发送方法的问题。

  • 是否基于某些外部配置?
  • 是否基于某些(动态)用户决定?
  • 是否基于正在处理的分区?
  • 是否根据消息内容?

(仅列举几种可能性)

答案将决定最合适的结构。

然而,很明显当前的sendData()方法是使决定生效的地方。因此,需要提供此方法的实现才能使用。实际的 send() 可能在所有情况下都是相似的。它建议将 sending 功能封装到一个提供 send() 方法签名的接口中:

send(address, data);

如果要根据实际消息数据确定目标套接字,那么您可能更喜欢

的通用签名
send(address, data, socket);

并使该套接字值可选或使用特定值编码 "no specific socket" 个案例。否则,您可以使用特定的 Sender 实例,该实例具有通过构造函数传入的套接字。

我目前没有从您提供的内容中看到一个有效的理由,即要求将三种不同的发送方法实现为一个 class 中的三种不同方法。如果公共代码是一个原因,那么使用公共基础 class 将允许适当的共享。

这留下了一个问题,即如何在 sendData().

中提供适当 Sender 实施的特定实例

如果要在 sendData() 之外确定发送策略,则必须提交实施。作为参数或作为当前 class 实例中的字段。如果本地数据决定了发送策略,您应该将正确实施的确定委托给选择class,这将return正确实施。然后调用将类似于:

startegySelector.selectStartegy(selectionParameters).send(address,data);

虽然,如果没有更清楚地了解执行过程中什么是固定的,什么是可变的,很难提出最佳方法

如果决策是基于数据的,整个选择和转移过程是本地的 Packet class。

如果决定是在 Packet 外部做出的,您可能希望在该位置获得发送策略实施并将其作为参数传递给 addAndSendJunked()(或更准确地说传递给 sendData().

你可以有一个枚举 class,比如 PacketTransportionMode,它会为不同类型的枚举值(SYNC、ASYNC、SYNC_ON_SOCKET)覆盖一个 'send' 方法,例如:。

public enum PacketTransportionMode {
SYNC {
    @Override
    public boolean send(Packet packet) {
        byte[] message = packet.getMessage();
        Socket socket = new Socket(packet.getReceiverHost(), packet.getReceiverPort());
        DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
        dOut.writeInt(message.length); // write length of the message
        dOut.write(message);           // write the message
        return true;
    }
},
ASYNC {
    @Override
    public boolean send(Packet packet) {
        // TODO Auto-generated method stub
        return false;
    }
},
SYNC_ON_SOCKET

{
    @Override
    public boolean send(Packet packet) {
        // TODO Auto-generated method stub
        return false;
    }

};
public abstract boolean send(Packet packet);
}

另外,在数据包class中,引入transportationMode变量。在packet.send()实现中,可以调用this.packetTransportationMode.send(this)

客户端可以创建数据包对象并在开始时设置它的transportationMode,类似于设置RecordPartition。然后客户端可以调用 packet.send();

或者不是将 transportationMode 变量放在 packet class 中并调用 this.packetTransportationMode.send(this),客户端也可以创建 Packet 对象并直接调用 PacketTransportionMode.SYNC.send(packet)。

使用枚举变量来定义发送消息的类型

public enum TypeToSend {
    async, sync, socket 
}

public final class Packet implements Closeable {
TypeToSend typeToSend;
public Packet(TypeToSend typeToSend) {
        this.typeToSend = typeToSend;
    }
switch(typeToSend){
     case async:{}
     case sync:{}
     case socket:{}
}
}

策略。与 Kerri Brown 的回答不同的是,Packet 不应该在策略之间做出决定。相反,在数据包 class.

之外决定它

单个发送策略接口应该由 3 个不同的 classes 实现,每个对应于提到的发送方法之一。在Packet中注入策略接口,让Packet不管处理什么策略都不需要改变

你说一定要根据用户的选择。所以你可以先询问用户,选择是什么,然后基于此,实例化一个对应于用户选择的发送策略接口的实现。然后,用选择的发送策略实例实例化Packet。

如果你觉得以后的选择可能不取决于用户,那就把它做成工厂。那么你的解决方案就变成了工厂和策略的组合。

那样的话,Packet可以注入Factory接口。 Packet 要求 Factory 给它发送策略。接下来它使用从工厂获取的策略发送。工厂要求用户输入,稍后可以根据其他条件而不是用户输入进行选择来替换。您可以通过在未来以不同方式实现工厂接口并注入新工厂而不是这个工厂来实现这一点(即基于用户输入的工厂与其他一些基于条件的工厂)。

这两种方法都会为您提供遵循 Open/Close 原则的代码。但如果您真的不需要工厂,请尽量不要过度设计。