如何使用 JAVA 在 Redis 中进行批量插入?

How to do Mass insertion in Redis using JAVA?

您好,我需要多次插入表单

SADD 键值

我有键值对,需要知道如何使用 JAVA 执行批量插入。我在 Redis 协议中写了一个文件。如何进一步

如果您将输入写入 Redis 协议格式,那么为什么不使用 redis-cli 或 nc 的管道模式?解释自http://redis.io/topics/mass-insert.

如果您有大量(键、值)输入,那么您可以使用 Jedis 执行带流水线的 sadd 以获得更高的性能。

下面的例子假设迭代器(Iterator)的元素每一项都是key"\t"value形式。

try (Jedis jedis = new Jedis(host, port)) {
  Pipeline pipeline = jedis.pipelined();
  while (iter.hasNext()) {
    String[] keyValue = iter.next().split("\t");
    pipeline.sadd(keyValue[0], keyValue[1]);
    // you can call pipeline.sync() and start new pipeline here if you think there're so much operations in one pipeline
  }
  pipeline.sync();
}

如果您正在通过 Spring CacheManager 执行实际的 read/write 操作,并且 RedisTemplate 配置为使用 Redis作为缓存,您还可以使用 RedisTemplate 的 executePipelined 方法,该方法将回调作为参数。回调需要定义 doInRedis 方法,该方法在 Redis 中完成您想要批量执行的工作(read/write 操作)。

以下代码显示通过调用 redisTemplate.opsForHash().put().

@Component
public class RedisClient {
  @Autowired
  RedisTemplate redisTemplate;  
  
  //batch-insert using Redis pipeline, a list of objects into the cache specified by cacheName
  public void put(String cacheName, List<CacheableObject> objects) {
    try {
      this.redisTemplate.executePipelined(new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection connection) throws DataAccessException {
          for(CacheableObject object: objects) {
            redisTemplate.opsForHash().put(cacheName, object.getKey(), object.getValue()); 
          }
          return null;
        }
      });
    }
    catch(Exception e) {
        log.error("Error inserting objects into Redis cache: {}", e.getMessage());
    }
}

RedisTemplate 本身使用如下配置 class 进行配置:

@Configuration
@EnableCaching
public class RedisCacheConfig extends CachingConfigurerSupport implements 
                                                               CachingConfigurer {

  @Value("${redis.hostname}")
  private String redisHost;
  @Value("${redis.port}")
  private int redisPort;
  @Value("${redis.timeout.secs:1}")
  private int redisTimeoutInSecs;
  @Value("${redis.socket.timeout.secs:1}")
  private int redisSocketTimeoutInSecs;
  @Value("${redis.ttl.hours:1}")
  private int redisDataTTL;

  @Bean
  JedisConnectionFactory jedisConnectionFactory() {
     RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(redisHost, redisPort);
     return new JedisConnectionFactory(redisStandaloneConfiguration);
  }
 
  @Bean
  public RedisTemplate<Object, Object> redisTemplate() {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<Object, Object>();
    redisTemplate.setConnectionFactory(jedisConnectionFactory());
    return redisTemplate;
}   

  @Bean
  public RedisCacheManager redisCacheManager (JedisConnectionFactory jedisConnectionFactory) {
    RedisCacheConfiguration redisCacheConfiguration = 
            RedisCacheConfiguration.defaultCacheConfig().disableCachingNullValues()
            .entryTtl(Duration.ofHours(redisDataTTL)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.java()));
    redisCacheConfiguration.usePrefix();
    RedisCacheManager redisCacheManager = 
            RedisCacheManager.RedisCacheManagerBuilder.
            fromConnectionFactory(jedisConnectionFactory)
            .cacheDefaults(redisCacheConfiguration).build();
    redisCacheManager.setTransactionAware(true);
    return redisCacheManager;
}

  @Bean
  public JedisPoolConfig poolConfig() {
    final JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    jedisPoolConfig.setTestOnBorrow(true);
    jedisPoolConfig.setMaxTotal(100);
    jedisPoolConfig.setMaxIdle(100);
    jedisPoolConfig.setMinIdle(10);
    jedisPoolConfig.setTestOnReturn(true);
    jedisPoolConfig.setTestWhileIdle(true);
    return jedisPoolConfig;
}

  @Override
  public CacheErrorHandler errorHandler() {
    return new RedisCacheErrorHandler();
  }
}