Spring 数据 Cassandra 在通过 CassandraPageRequest 后无法执行分页

Spring data Cassandra not able to perform Paginaton, after passing the CassandraPageRequest

问题:

nextPageStateString 正在传递给客户端,并在下一个请求期间将其转换回 Bytebuffer

下一行

feedRepository.findAll(cassandraPageRequest); 抛出异常

我只是转换序列化和反序列化 PageState,不知道为什么它抱怨我没有修改 PagingState 对象。

问题:

我是否对字节缓冲区进行了正确的序列化和反序列化? 如果不是,还有什么其他方法。


com.datastax.oss.driver.api.core.servererrors.ProtocolError: Invalid value for the paging state
    at com.datastax.oss.driver.api.core.servererrors.ProtocolError.copy(ProtocolError.java:52) ~[java-driver-core-4.13.0.jar:na]
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149) ~[java-driver-core-4.13.0.jar:na]
    at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53) ~[java-driver-core-4.13.0.jar:na]
    at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30) ~[java-driver-core-4.13.0.jar:na]
    at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230) ~[java-driver-core-4.13.0.jar:na]
    at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:54) ~[java-driver-core-4.13.0.jar:na]
    at org.springframework.data.cassandra.core.cql.CqlTemplate.query(CqlTemplate.java:540) ~[spring-data-cassandra-3.3.0.jar:3.3.0]
    at org.springframework.data.cassandra.core.CassandraTemplate.doExecute(CassandraTemplate.java:959) ~[spring-data-cassandra-3.3.0.jar:3.3.0]
    at org.springframework.data.cassandra.core.CassandraTemplate.doQueryForResultSet(CassandraTemplate.java:951) ~[spring-data-cassandra-3.3.0.jar:3.3.0]
    at org.springframework.data.cassandra.core.CassandraTemplate.slice(CassandraTemplate.java:404) ~[spring-data-cassandra-3.3.0.jar:3.3.0]
    at org.springframework.data.cassandra.core.CassandraTemplate.slice(CassandraTemplate.java:478) ~[spring-data-cassandra-3.3.0.jar:3.3.0]
    at org.springframework.data.cassandra.repository.support.SimpleCassandraRepository.findAll(SimpleCassandraRepository.java:241) ~[spring-data-cassandra-3.3.0.jar:3.3.0]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new[=11=](RepositoryMethodInvoker.java:289) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:137) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:121) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:529) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:285) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:638) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:163) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:138) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.6.0.jar:2.6.0]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.13.jar:5.3.13]
    at jdk.proxy2/jdk.proxy2.$Proxy89.findAll(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137) ~[spring-tx-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.13.jar:5.3.13]
    at jdk.proxy2/jdk.proxy2.$Proxy89.findAll(Unknown Source) ~[na:na]
    at com.example.FeedService.service.impl.FeedServiceImpl.getUserFeed(FeedServiceImpl.java:60) ~[classes/:na]
    at com.example.FeedService.api.FeedApi.getFeed(FeedApi.java:39) 
@Service
public class FeedServiceImpl implements FeedService {

    private static final char[] HEX_CODE = "0123456789ABCDEF".toCharArray();

    @Autowired
    private FeedRepository feedRepository;

    @Override
    public SlicedResult<UserFeed> getUserFeed(String userId, Optional<String> pagingState) {
        // reference
        // https://github.com/JSextonn/spring-cassandra-pagination-example

        final PageRequest pageRequest =  PageRequest.of(0, constants.PAGE_SIZE);
        CassandraPageRequest cassandraPageRequest =
                pagingState.
                        map(pageState ->{

                                    int len = pageState.length();
                                    ByteBuffer bytes = ByteBuffer.allocate(len / 2);

                                    for (int i = 0; i < len; i += 2) {
                                        bytes.put((byte) ((Character.digit(pageState.charAt(i), 16) << 4)
                                                         + Character.digit(pageState.charAt(i+1), 16)));
                                     }
                                    return CassandraPageRequest.of(pageRequest,bytes);
                            }
                        )
                        .orElse(CassandraPageRequest.first(constants.PAGE_SIZE));



        Slice<UserFeed> userFeedSlice = feedRepository.findAll(cassandraPageRequest);
        CassandraPageRequest nextPageable = ((CassandraPageRequest)userFeedSlice.getPageable());
        ByteBuffer nextPagingState = nextPageable.getPagingState();
        String nextPageStateString =  toHexString(nextPagingState);
        userFeedSlice = feedRepository.findAll(userFeedSlice.nextPageable());

        return SlicedResult.<UserFeed>builder().pageState(nextPageStateString).content(userFeedSlice.getContent())
                .build();
    }

    public static String toHexString(ByteBuffer buffer) {
        final StringBuilder r = new StringBuilder(buffer.remaining() * 2);
        while (buffer.hasRemaining()) {
            final byte b = buffer.get();
            r.append(HEX_CODE[(b >> 4) & 0xF]);
            r.append(HEX_CODE[(b & 0xF)]);
        }
        return r.toString();
    }
}

CassandraPageRequest.of(pageRequest, bytes); 解决问题之前调用 bytes.position(0);