为什么 Akka.Persistance 不重播我的日记条目
Why is Akka.Persistance not replaying my Journal entries
我正在为 Service Fabric 编写 Akka.Persistence 的实现,但我似乎无法使快照正常工作。当它尝试恢复状态时,它会获取最新的快照,但不会重播自最新快照以来的事件。如果我只是没有正确连接组件,或者我对持久性库的实现不正确,我不清楚。
我的演员是一个简单的计数器,我的状态只是当前计数。
我希望 Recover 应该首先被调用,然后 Recover 将被调用用于最后一个快照和最高序列号之间的每个日志条目。日志中有一个函数 ReplayMessagesAsync(...) 看起来应该这样做,但它没有被调用。
我的计数器代码如下,我的其余代码是:Code
using Akka.Actor;
using Akka.Persistence;
using Akka.Persistence.ServiceFabric.Journal;
using Akka.Persistence.ServiceFabric.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AkkaPersistence.Actors
{
public class Counter : ReceivePersistentActor
{
public class GetCount { }
private int counter;
private CounterState State = new CounterState();
private int _msgsSinceLastSnapshot = 0;
public Counter()
{
Recover<Evt>(evt =>
{
State.Update(evt);
});
Recover<SnapshotOffer>(offer => {
var snapshotEntry = offer.Snapshot as SnapshotEntry;
if (snapshotEntry != null)
{
State = (CounterState)snapshotEntry.Snapshot;
}
});
Command<string>(str => Persist(str, s =>
{
++counter;
var evt = new Evt(s);
State.Update(evt);
if (++_msgsSinceLastSnapshot % 10 == 0)
{
//time to save a snapshot
SaveSnapshot(State.Copy());
}
}));
Command<GetCount>(get => Sender.Tell(State.Count));
Command<SaveSnapshotSuccess>(success =>
{
ServiceEventSource.Current.Message($"Saved snapshot");
DeleteMessages(success.Metadata.SequenceNr);
});
Command<SaveSnapshotFailure>(failure => {
// handle snapshot save failure...
ServiceEventSource.Current.Message($"Snapshot failure");
});
}
public override string PersistenceId
{
get
{
return "counter";
}
}
}
internal class CounterState
{
private long count = 0L;
public long Count
{
get { return count; }
set { count = value; }
}
public CounterState(long count)
{
this.Count = count;
}
public CounterState() : this(0)
{
}
public CounterState Copy()
{
return new CounterState(count);
}
public void Update(Evt evt)
{
++Count;
}
}
public class Evt
{
public Evt(string data)
{
Data = data;
}
public string Data { get; }
}
public class Cmd
{
public Cmd(string data)
{
Data = data;
}
public string Data { get; }
}
}
有几处我做错了:
1) 我需要 return 传入的内容,而不是我的 SnapshotEntry,它是我的持久性机制的实现细节。
2)当我从保存字符串转换为尝试将对象保存为日志的一部分时,一个简单的错误。
3) 最后还有一个问题,那就是潜在的问题,那就是子对象的序列化失败了。在这段代码中,我不想包含子对象的类型,所以我为 Journal 添加了一个自定义序列化程序 (the Wire serializer) 以及已经存在的 SnapshotSerializer,它现在正在工作。
我正在为 Service Fabric 编写 Akka.Persistence 的实现,但我似乎无法使快照正常工作。当它尝试恢复状态时,它会获取最新的快照,但不会重播自最新快照以来的事件。如果我只是没有正确连接组件,或者我对持久性库的实现不正确,我不清楚。 我的演员是一个简单的计数器,我的状态只是当前计数。 我希望 Recover 应该首先被调用,然后 Recover 将被调用用于最后一个快照和最高序列号之间的每个日志条目。日志中有一个函数 ReplayMessagesAsync(...) 看起来应该这样做,但它没有被调用。 我的计数器代码如下,我的其余代码是:Code
using Akka.Actor;
using Akka.Persistence;
using Akka.Persistence.ServiceFabric.Journal;
using Akka.Persistence.ServiceFabric.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AkkaPersistence.Actors
{
public class Counter : ReceivePersistentActor
{
public class GetCount { }
private int counter;
private CounterState State = new CounterState();
private int _msgsSinceLastSnapshot = 0;
public Counter()
{
Recover<Evt>(evt =>
{
State.Update(evt);
});
Recover<SnapshotOffer>(offer => {
var snapshotEntry = offer.Snapshot as SnapshotEntry;
if (snapshotEntry != null)
{
State = (CounterState)snapshotEntry.Snapshot;
}
});
Command<string>(str => Persist(str, s =>
{
++counter;
var evt = new Evt(s);
State.Update(evt);
if (++_msgsSinceLastSnapshot % 10 == 0)
{
//time to save a snapshot
SaveSnapshot(State.Copy());
}
}));
Command<GetCount>(get => Sender.Tell(State.Count));
Command<SaveSnapshotSuccess>(success =>
{
ServiceEventSource.Current.Message($"Saved snapshot");
DeleteMessages(success.Metadata.SequenceNr);
});
Command<SaveSnapshotFailure>(failure => {
// handle snapshot save failure...
ServiceEventSource.Current.Message($"Snapshot failure");
});
}
public override string PersistenceId
{
get
{
return "counter";
}
}
}
internal class CounterState
{
private long count = 0L;
public long Count
{
get { return count; }
set { count = value; }
}
public CounterState(long count)
{
this.Count = count;
}
public CounterState() : this(0)
{
}
public CounterState Copy()
{
return new CounterState(count);
}
public void Update(Evt evt)
{
++Count;
}
}
public class Evt
{
public Evt(string data)
{
Data = data;
}
public string Data { get; }
}
public class Cmd
{
public Cmd(string data)
{
Data = data;
}
public string Data { get; }
}
}
有几处我做错了: 1) 我需要 return 传入的内容,而不是我的 SnapshotEntry,它是我的持久性机制的实现细节。 2)当我从保存字符串转换为尝试将对象保存为日志的一部分时,一个简单的错误。 3) 最后还有一个问题,那就是潜在的问题,那就是子对象的序列化失败了。在这段代码中,我不想包含子对象的类型,所以我为 Journal 添加了一个自定义序列化程序 (the Wire serializer) 以及已经存在的 SnapshotSerializer,它现在正在工作。