redis.clients.jedis.exceptions.JedisDataException: 调用此方法前请关闭管道或多块

redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method

我想要页面 zrange,在调用此方法之前获取 error:Please 关闭管道或多块。,如何解决这个问题(我的集群不支持多命令 https://github.com/CodisLabs/codis/blob/master/doc/unsupported_cmds.md)?

runWithPipeline(new JedisPipelinedCallback() {
  @Override
  public void execute(Pipeline pipeline) {
    int offset = 0;
    boolean finished = false;

    do {
      // need to paginate the keys
      Set<byte[]> rawKeys = pipeline.zrange(rawKnownKeysKey, (offset) * PAGE_SIZE, (offset + 1) * PAGE_SIZE - 1).get();
      finished = rawKeys.size() < PAGE_SIZE;
      offset++;
      if (!rawKeys.isEmpty()) {
        List<byte[]> regionedKeys = new ArrayList<byte[]>();
        for (byte[] rawKey : rawKeys) {
          regionedKeys.add(getRegionedKey(rawRegion, rawKey));
        }

        pipeline.del(regionedKeys.toArray(new byte[regionedKeys.size()][]));
      }
      pipeline.sync();
    } while (!finished);

    pipeline.del(rawKnownKeysKey);
  }
});

// 创建 {@link redis.clients.jedis.JedisPool} 实例。

public static JedisPool createJedisPool(Properties props) {

    String host = props.getProperty(RedisConfig.HOST, "localhost");
    Integer port = Integer.decode(props.getProperty(RedisConfig.PORT, String.valueOf(Protocol.DEFAULT_PORT)));
    Integer timeout = Integer.decode(props.getProperty(RedisConfig.TIMEOUT, String.valueOf(Protocol.DEFAULT_TIMEOUT))); // msec
    String password = props.getProperty(RedisConfig.PASSWORD, null);
    Integer database = Integer.decode(props.getProperty(RedisConfig.DATABASE, String.valueOf(Protocol.DEFAULT_DATABASE)));

    log.info("create JedisPool. host=[{}], port=[{}], timeout=[{}], password=[{}], database=[{}]",
             host, port, timeout, password, database);

    return new JedisPool(createJedisPoolConfig(), host, port, timeout, password, database);
}

// 创建 {@link redis.clients.jedis.JedisPoolConfig} 实例。

private static JedisPoolConfig createJedisPoolConfig() {
    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setMaxTotal(256);
    poolConfig.setMinIdle(2);
    return poolConfig;
}


Jedis jedis = jedisPool.getResource();


private void runWithPipeline(final JedisPipelinedCallback callback) {
  final Jedis jedis = jedisPool.getResource();
  try {
    final Pipeline pipeline = jedis.pipelined();
    callback.execute(pipeline);
    // use #sync(), not #exec()
    pipeline.sync();
  } finally {
    jedisPool.returnResource(jedis);
  }
}

这部分代码有问题,您在其中尝试使用 .get() 方法从管道中检索值。

Set<byte[]> rawKeys = pipeline.zrange(rawKnownKeysKey, (offset) * PAGE_SIZE, (offset + 1) * PAGE_SIZE - 1).get();

当您在管道中发出命令时,如果您尝试在发出 sync() 之前获取值,则需要在检索值之前发出 sync() 命令以执行已添加到管道中的所有命令它会像上面那样抛出错误。

解决方案:

Response<Set<byte[]>> temp = pipeline.zrange(rawKnownKeysKey, (offset) * PAGE_SIZE, (offset + 1) * PAGE_SIZE - 1);
pipeline.sync();
Set<byte[]> rawKeys = temp.get();