Skip to content

Commit

Permalink
Add cluster.peers refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Povilas Versockas <[email protected]>
  • Loading branch information
povilasv committed Jun 21, 2018
1 parent 5e86f61 commit bdfd36e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
74 changes: 69 additions & 5 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ type Peer struct {
peers map[string]peer
failedPeers []peer

knownPeers []string
advertiseAddr string

failedReconnectionsCounter prometheus.Counter
reconnectionsCounter prometheus.Counter
failedRefreshCounter prometheus.Counter
refreshCounter prometheus.Counter
peerLeaveCounter prometheus.Counter
peerUpdateCounter prometheus.Counter
peerJoinCounter prometheus.Counter
Expand Down Expand Up @@ -95,6 +100,7 @@ const (
DefaultProbeInterval = 1 * time.Second
DefaultReconnectInterval = 10 * time.Second
DefaultReconnectTimeout = 6 * time.Hour
DefaultRefreshInterval = 30 * time.Second
maxGossipPacketSize = 1400
)

Expand All @@ -112,6 +118,7 @@ func Join(
probeInterval time.Duration,
reconnectInterval time.Duration,
reconnectTimeout time.Duration,
refreshInterval time.Duration,
) (*Peer, error) {
bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
if err != nil {
Expand Down Expand Up @@ -164,11 +171,12 @@ func Join(
}

p := &Peer{
states: map[string]State{},
stopc: make(chan struct{}),
readyc: make(chan struct{}),
logger: l,
peers: map[string]peer{},
states: map[string]State{},
stopc: make(chan struct{}),
readyc: make(chan struct{}),
logger: l,
peers: map[string]peer{},
knownPeers: knownPeers,
}

p.register(reg)
Expand Down Expand Up @@ -221,6 +229,9 @@ func Join(
if reconnectTimeout != 0 {
go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout)
}
if refreshInterval != 0 {
go p.handleRefresh(refreshInterval)
}

return p, nil
}
Expand Down Expand Up @@ -298,6 +309,15 @@ func (p *Peer) register(reg prometheus.Registerer) {
Help: "A counter of the number of cluster peer reconnections.",
})

p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "alertmanager_cluster_refresh_failed_total",
Help: "A counter of the number of failed cluster peer refresh attempts.",
})
p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "alertmanager_cluster_refresh_total",
Help: "A counter of the number of cluster peer joined via refresh.",
})

p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "alertmanager_cluster_peers_left_total",
Help: "A counter of the number of peers that have left.",
Expand Down Expand Up @@ -382,6 +402,50 @@ func (p *Peer) reconnect() {
}
}

// handleRefresh runs p.refresh once every d until p.stopc is closed or
// receives a value, at which point it stops the ticker and returns.
//
// It blocks for its whole lifetime, so it is meant to be run in its own
// goroutine. d must be positive: time.NewTicker panics otherwise.
func (p *Peer) handleRefresh(d time.Duration) {
	ticker := time.NewTicker(d)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Interval elapsed: look for peers that are not yet members.
			p.refresh()
		case <-p.stopc:
			// Shutdown requested.
			return
		}
	}
}

// refresh resolves p.knownPeers again and asks memberlist to join every
// resolved address that is not currently reported by p.mlist.Members().
//
// If resolution fails, the error is logged at debug level and the refresh
// round is abandoned: the result of a failed resolvePeers call is not
// trusted. Each join attempt increments refreshCounter on success or
// failedRefreshCounter on failure; a failed join is logged together with
// its error and does not prevent the remaining addresses from being tried.
func (p *Peer) refresh() {
	logger := log.With(p.logger, "msg", "refresh")

	resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, net.Resolver{}, false)
	if err != nil {
		level.Debug(logger).Log("peers", p.knownPeers, "err", err)
		return
	}

	// Index the current members by address once, so each resolved peer is
	// checked with a single lookup instead of a scan over all members.
	members := p.mlist.Members()
	current := make(map[string]struct{}, len(members))
	for _, member := range members {
		current[member.Address()] = struct{}{}
	}

	for _, peer := range resolvedPeers {
		if _, ok := current[peer]; ok {
			continue
		}

		if _, err := p.mlist.Join([]string{peer}); err != nil {
			p.failedRefreshCounter.Inc()
			level.Debug(logger).Log("result", "failure", "addr", peer, "err", err)
			continue
		}
		p.refreshCounter.Inc()
		level.Debug(logger).Log("result", "success", "addr", peer)
	}
}

func (p *Peer) peerJoin(n *memberlist.Node) {
p.peerLock.Lock()
defer p.peerLock.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestJoinLeave(t *testing.T) {
DefaultProbeInterval,
DefaultReconnectInterval,
DefaultReconnectTimeout,
DefaultRefreshInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
Expand All @@ -64,6 +65,7 @@ func TestJoinLeave(t *testing.T) {
DefaultProbeInterval,
DefaultReconnectInterval,
DefaultReconnectTimeout,
DefaultRefreshInterval,
)
require.NoError(t, err)
require.NotNil(t, p2)
Expand Down Expand Up @@ -93,6 +95,7 @@ func TestReconnect(t *testing.T) {
DefaultProbeInterval,
DefaultReconnectInterval,
DefaultReconnectTimeout,
DefaultRefreshInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
Expand All @@ -113,6 +116,7 @@ func TestReconnect(t *testing.T) {
DefaultProbeInterval,
DefaultReconnectInterval,
DefaultReconnectTimeout,
DefaultRefreshInterval,
)
require.NoError(t, err)
require.NotNil(t, p2)
Expand Down Expand Up @@ -148,6 +152,7 @@ func TestRemoveFailedPeers(t *testing.T) {
DefaultProbeInterval,
DefaultReconnectInterval,
DefaultReconnectTimeout,
DefaultRefreshInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
Expand Down Expand Up @@ -194,6 +199,7 @@ func TestInitiallyFailingPeers(t *testing.T) {
DefaultProbeInterval,
DefaultReconnectInterval,
DefaultReconnectTimeout,
DefaultRefreshInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
Expand Down
2 changes: 2 additions & 0 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func main() {
settleTimeout = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration()
reconnectInterval = kingpin.Flag("cluster.reconnect-interval", "Interval between attempting to reconnect to lost peers.").Default(cluster.DefaultReconnectInterval.String()).Duration()
peerReconnectTimeout = kingpin.Flag("cluster.reconnect-timeout", "Length of time to attempt to reconnect to a lost peer.").Default(cluster.DefaultReconnectTimeout.String()).Duration()
// The refresh flag defaults to the dedicated refresh interval, not the reconnect interval.
refreshInterval = kingpin.Flag("cluster.refresh-interval", "Interval between attempting to refresh cluster.peers DNS records.").Default(cluster.DefaultRefreshInterval.String()).Duration()
)

kingpin.Version(version.Print("alertmanager"))
Expand Down Expand Up @@ -196,6 +197,7 @@ func main() {
*probeInterval,
*reconnectInterval,
*peerReconnectTimeout,
*refreshInterval,
)
if err != nil {
level.Error(logger).Log("msg", "Unable to initialize gossip mesh", "err", err)
Expand Down

0 comments on commit bdfd36e

Please sign in to comment.