goraft中所有节点的状态
Status of all nodes in goraft
我有 4 个节点的集群 2001、2002、2003 和 2004。
他们使用 goraft 绑定。
假设 2001 是主服务器。
现在当它失败时,另一个节点成为服务器。
现在我想要的是,成为当前服务器的节点应该发送消息说我是新的领导者。
那么如何实现呢?
我正在使用带有 GORAFD 实现的 GORAFT。
我这里附上源码
main.go - 对于客户端
package main
import (
"flag"
"fmt"
"github.com/goraft/raft"
"github.com/goraft/raftd/command"
"github.com/goraft/raftd/server"
"log"
"math/rand"
"os"
"time"
"strconv"
)
var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string
func init() {
flag.Parse()
flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
flag.BoolVar(&debug, "debug", false, "Raft debugging")
flag.StringVar(&host, "h", "localhost", "hostname")
p,_:=strconv.Atoi(flag.Arg(1))
flag.IntVar(&port, "p", p, "port")
flag.StringVar(&join, "join", "", "host:port of leader to join")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
flag.PrintDefaults()
}
}
func main() {
log.SetFlags(0)
flag.Parse()
if verbose {
log.Print("Verbose logging enabled.")
}
if trace {
raft.SetLogLevel(raft.Trace)
log.Print("Raft trace debugging enabled.")
} else if debug {
raft.SetLogLevel(raft.Debug)
log.Print("Raft debugging enabled.")
}
rand.Seed(time.Now().UnixNano())
// Setup commands.
raft.RegisterCommand(&command.WriteCommand{})
// Set the data directory.
if flag.NArg() == 0 {
flag.Usage()
log.Fatal("Data path argument required")
}
path := flag.Arg(0)
if err := os.MkdirAll(path, 0744); err != nil {
log.Fatalf("Unable to create path: %v", err)
}
log.SetFlags(log.LstdFlags)
s := server.New(path, host, port)
log.Fatal(s.ListenAndServe("localhost:2001"))
fmt.Println("I am changing my status");
}
Main.go - 对于服务器即 2001
package main
import (
"flag"
"fmt"
"github.com/goraft/raft"
"github.com/goraft/raftd/command"
"github.com/goraft/raftd/server"
"log"
"math/rand"
"os"
"time"
"strconv"
)
var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string
func init() {
flag.Parse()
flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
flag.BoolVar(&debug, "debug", false, "Raft debugging")
flag.StringVar(&host, "h", "localhost", "hostname")
p,_:=strconv.Atoi(flag.Arg(1))
flag.IntVar(&port, "p", p, "port")
flag.StringVar(&join, "join", "", "host:port of leader to join")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
flag.PrintDefaults()
}
}
func main() {
log.SetFlags(0)
flag.Parse()
if verbose {
log.Print("Verbose logging enabled.")
}
if trace {
raft.SetLogLevel(raft.Trace)
log.Print("Raft trace debugging enabled.")
} else if debug {
raft.SetLogLevel(raft.Debug)
log.Print("Raft debugging enabled.")
}
rand.Seed(time.Now().UnixNano())
// Setup commands.
raft.RegisterCommand(&command.WriteCommand{})
// Set the data directory.
if flag.NArg() == 0 {
flag.Usage()
log.Fatal("Data path argument required")
}
path := flag.Arg(0)
if err := os.MkdirAll(path, 0744); err != nil {
log.Fatalf("Unable to create path: %v", err)
}
log.SetFlags(log.LstdFlags)
s := server.New(path, host, port)
log.Fatal(s.ListenAndServe(join))
}
普通Server.go代码
package server
import (
"bytes"
"encoding/json"
"fmt"
"github.com/goraft/raft"
"github.com/goraft/raftd/command"
"github.com/goraft/raftd/db"
"github.com/gorilla/mux"
"io/ioutil"
"log"
"math/rand"
"net/http"
"path/filepath"
"sync"
"time"
)
// The raftd server is a combination of the Raft server and an HTTP
// server which acts as the transport.
type Server struct {
name string
host string
port int
path string
router *mux.Router
raftServer raft.Server
httpServer *http.Server
db *db.DB
mutex sync.RWMutex
}
// Creates a new server.
func New(path string, host string, port int) *Server {
s := &Server{
host: host,
port: port,
path: path,
db: db.New(),
router: mux.NewRouter(),
}
// Read existing name or generate a new one.
if b, err := ioutil.ReadFile(filepath.Join(path, "name")); err == nil {
s.name = string(b)
} else {
s.name = fmt.Sprintf("%07x", rand.Int())[0:7]
if err = ioutil.WriteFile(filepath.Join(path, "name"), []byte(s.name), 0644); err != nil {
panic(err)
}
}
return s
}
// Returns the connection string.
func (s *Server) connectionString() string {
return fmt.Sprintf("http://%s:%d", s.host, s.port)
}
// Starts the server.
func (s *Server) ListenAndServe(leader string) error {
var err error
log.Printf("Initializing Raft Server: %s", s.path)
// Initialize and start Raft server.
transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
if err != nil {
log.Fatal(err)
}
transporter.Install(s.raftServer, s)
s.raftServer.Start()
if leader != "" {
// Join to leader if specified.
log.Println("Attempting to join leader:", leader)
if !s.raftServer.IsLogEmpty() {
log.Fatal("Cannot join with an existing log")
}
if err := s.Join(leader); err != nil {
log.Fatal(err)
}
} else if s.raftServer.IsLogEmpty() {
// Initialize the server by joining itself.
log.Println("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
})
if err != nil {
log.Fatal(err)
}
} else {
log.Println("Recovered from log")
}
log.Println("Initializing HTTP server")
// Initialize and start HTTP server.
s.httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", s.port),
Handler: s.router,
}
s.router.HandleFunc("/db/{key}", s.readHandler).Methods("GET")
s.router.HandleFunc("/db/{key}", s.writeHandler).Methods("POST")
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
log.Println("Listening at:", s.connectionString())
return s.httpServer.ListenAndServe()
}
// This is a hack around Gorilla mux not providing the correct net/http
// HandleFunc() interface.
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
s.router.HandleFunc(pattern, handler)
}
// Joins to the leader of an existing cluster.
func (s *Server) Join(leader string) error {
command := &raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
}
var b bytes.Buffer
json.NewEncoder(&b).Encode(command)
resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
if err != nil {
return err
}
resp.Body.Close()
return nil
}
func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
command := &raft.DefaultJoinCommand{}
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := s.raftServer.Do(command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
value := s.db.Get(vars["key"])
w.Write([]byte(value))
}
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
// Read the value from the POST body.
b, err := ioutil.ReadAll(req.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
value := string(b)
// Execute the command against the Raft server.
_, err = s.raftServer.Do(command.NewWriteCommand(vars["key"], value))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
请给出一些解决方案。
我做到了
我刚刚在 goraft-library 代码中插入了新行,其中领导者选择发生。
因此,只需转到
server.go goraft 文件并进行如下修改。
原始 Server.go - 第 [287-309] 行
// Sets the state of the server.
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Temporarily store previous values.
prevState := s.state
prevLeader := s.leader
// Update state and leader.
s.state = state
if state == Leader {
s.leader = s.Name()
s.syncedPeer = make(map[string]bool)
}
// Dispatch state and leader change events.
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
}
已编辑 Server.go
// Sets the state of the server.
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Temporarily store previous values.
prevState := s.state
prevLeader := s.leader
// Update state and leader.
s.state = state
if state == Leader {
s.leader = s.Name()
s.syncedPeer = make(map[string]bool)
}
// Dispatch state and leader change events.
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
if prevLeader != s.leader {
fmt.Println("I am the Leader..!! ",s.connectionString," ",s.path)
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
}
所以它会在活动主服务器的控制台上打印连接状态以及当前服务器的存储路径。
我有 4 个节点的集群 2001、2002、2003 和 2004。 他们使用 goraft 绑定。 假设 2001 是主服务器。 现在当它失败时,另一个节点成为服务器。 现在我想要的是,成为当前服务器的节点应该发送消息说我是新的领导者。 那么如何实现呢? 我正在使用带有 GORAFD 实现的 GORAFT。 我这里附上源码
main.go - 对于客户端
package main
import (
"flag"
"fmt"
"github.com/goraft/raft"
"github.com/goraft/raftd/command"
"github.com/goraft/raftd/server"
"log"
"math/rand"
"os"
"time"
"strconv"
)
var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string
func init() {
flag.Parse()
flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
flag.BoolVar(&debug, "debug", false, "Raft debugging")
flag.StringVar(&host, "h", "localhost", "hostname")
p,_:=strconv.Atoi(flag.Arg(1))
flag.IntVar(&port, "p", p, "port")
flag.StringVar(&join, "join", "", "host:port of leader to join")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
flag.PrintDefaults()
}
}
func main() {
log.SetFlags(0)
flag.Parse()
if verbose {
log.Print("Verbose logging enabled.")
}
if trace {
raft.SetLogLevel(raft.Trace)
log.Print("Raft trace debugging enabled.")
} else if debug {
raft.SetLogLevel(raft.Debug)
log.Print("Raft debugging enabled.")
}
rand.Seed(time.Now().UnixNano())
// Setup commands.
raft.RegisterCommand(&command.WriteCommand{})
// Set the data directory.
if flag.NArg() == 0 {
flag.Usage()
log.Fatal("Data path argument required")
}
path := flag.Arg(0)
if err := os.MkdirAll(path, 0744); err != nil {
log.Fatalf("Unable to create path: %v", err)
}
log.SetFlags(log.LstdFlags)
s := server.New(path, host, port)
log.Fatal(s.ListenAndServe("localhost:2001"))
fmt.Println("I am changing my status");
}
Main.go - 对于服务器即 2001
package main
import (
"flag"
"fmt"
"github.com/goraft/raft"
"github.com/goraft/raftd/command"
"github.com/goraft/raftd/server"
"log"
"math/rand"
"os"
"time"
"strconv"
)
var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string
func init() {
flag.Parse()
flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
flag.BoolVar(&debug, "debug", false, "Raft debugging")
flag.StringVar(&host, "h", "localhost", "hostname")
p,_:=strconv.Atoi(flag.Arg(1))
flag.IntVar(&port, "p", p, "port")
flag.StringVar(&join, "join", "", "host:port of leader to join")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
flag.PrintDefaults()
}
}
func main() {
log.SetFlags(0)
flag.Parse()
if verbose {
log.Print("Verbose logging enabled.")
}
if trace {
raft.SetLogLevel(raft.Trace)
log.Print("Raft trace debugging enabled.")
} else if debug {
raft.SetLogLevel(raft.Debug)
log.Print("Raft debugging enabled.")
}
rand.Seed(time.Now().UnixNano())
// Setup commands.
raft.RegisterCommand(&command.WriteCommand{})
// Set the data directory.
if flag.NArg() == 0 {
flag.Usage()
log.Fatal("Data path argument required")
}
path := flag.Arg(0)
if err := os.MkdirAll(path, 0744); err != nil {
log.Fatalf("Unable to create path: %v", err)
}
log.SetFlags(log.LstdFlags)
s := server.New(path, host, port)
log.Fatal(s.ListenAndServe(join))
}
普通Server.go代码
package server
import (
"bytes"
"encoding/json"
"fmt"
"github.com/goraft/raft"
"github.com/goraft/raftd/command"
"github.com/goraft/raftd/db"
"github.com/gorilla/mux"
"io/ioutil"
"log"
"math/rand"
"net/http"
"path/filepath"
"sync"
"time"
)
// The raftd server is a combination of the Raft server and an HTTP
// server which acts as the transport.
type Server struct {
name string
host string
port int
path string
router *mux.Router
raftServer raft.Server
httpServer *http.Server
db *db.DB
mutex sync.RWMutex
}
// Creates a new server.
func New(path string, host string, port int) *Server {
s := &Server{
host: host,
port: port,
path: path,
db: db.New(),
router: mux.NewRouter(),
}
// Read existing name or generate a new one.
if b, err := ioutil.ReadFile(filepath.Join(path, "name")); err == nil {
s.name = string(b)
} else {
s.name = fmt.Sprintf("%07x", rand.Int())[0:7]
if err = ioutil.WriteFile(filepath.Join(path, "name"), []byte(s.name), 0644); err != nil {
panic(err)
}
}
return s
}
// Returns the connection string.
func (s *Server) connectionString() string {
return fmt.Sprintf("http://%s:%d", s.host, s.port)
}
// Starts the server.
func (s *Server) ListenAndServe(leader string) error {
var err error
log.Printf("Initializing Raft Server: %s", s.path)
// Initialize and start Raft server.
transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
if err != nil {
log.Fatal(err)
}
transporter.Install(s.raftServer, s)
s.raftServer.Start()
if leader != "" {
// Join to leader if specified.
log.Println("Attempting to join leader:", leader)
if !s.raftServer.IsLogEmpty() {
log.Fatal("Cannot join with an existing log")
}
if err := s.Join(leader); err != nil {
log.Fatal(err)
}
} else if s.raftServer.IsLogEmpty() {
// Initialize the server by joining itself.
log.Println("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
})
if err != nil {
log.Fatal(err)
}
} else {
log.Println("Recovered from log")
}
log.Println("Initializing HTTP server")
// Initialize and start HTTP server.
s.httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", s.port),
Handler: s.router,
}
s.router.HandleFunc("/db/{key}", s.readHandler).Methods("GET")
s.router.HandleFunc("/db/{key}", s.writeHandler).Methods("POST")
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
log.Println("Listening at:", s.connectionString())
return s.httpServer.ListenAndServe()
}
// This is a hack around Gorilla mux not providing the correct net/http
// HandleFunc() interface.
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
s.router.HandleFunc(pattern, handler)
}
// Joins to the leader of an existing cluster.
func (s *Server) Join(leader string) error {
command := &raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
}
var b bytes.Buffer
json.NewEncoder(&b).Encode(command)
resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
if err != nil {
return err
}
resp.Body.Close()
return nil
}
func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
command := &raft.DefaultJoinCommand{}
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := s.raftServer.Do(command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
value := s.db.Get(vars["key"])
w.Write([]byte(value))
}
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
// Read the value from the POST body.
b, err := ioutil.ReadAll(req.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
value := string(b)
// Execute the command against the Raft server.
_, err = s.raftServer.Do(command.NewWriteCommand(vars["key"], value))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
请给出一些解决方案。
我做到了
我刚刚在 goraft-library 代码中插入了新行,其中领导者选择发生。
因此,只需转到 server.go goraft 文件并进行如下修改。
原始 Server.go - 第 [287-309] 行
// Sets the state of the server.
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Temporarily store previous values.
prevState := s.state
prevLeader := s.leader
// Update state and leader.
s.state = state
if state == Leader {
s.leader = s.Name()
s.syncedPeer = make(map[string]bool)
}
// Dispatch state and leader change events.
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
}
已编辑 Server.go
// Sets the state of the server.
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Temporarily store previous values.
prevState := s.state
prevLeader := s.leader
// Update state and leader.
s.state = state
if state == Leader {
s.leader = s.Name()
s.syncedPeer = make(map[string]bool)
}
// Dispatch state and leader change events.
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
if prevLeader != s.leader {
fmt.Println("I am the Leader..!! ",s.connectionString," ",s.path)
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
}
所以它会在活动主服务器的控制台上打印连接状态以及当前服务器的存储路径。