Skip to content

Commit

Permalink
vcsim: implement WaitForUpdatesEx pagination support
Browse files Browse the repository at this point in the history
Default WaitOptions.MaxObjectUpdates default to 100 as real vCenter does.
  • Loading branch information
dougm committed Aug 10, 2024
1 parent a22290f commit 5a501b5
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
50 changes: 50 additions & 0 deletions simulator/property_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type PropertyCollector struct {
mo.PropertyCollector

nopLocker
pending *types.UpdateSet
updates []types.ObjectUpdate
mu sync.Mutex
cancel context.CancelFunc
Expand Down Expand Up @@ -818,9 +819,47 @@ func (pc *PropertyCollector) apply(ctx *Context, update *types.UpdateSet) types.
return nil
}

// pageUpdateSet limits the given UpdateSet to max number of object updates.
// nil is returned when not truncated, otherwise the remaining UpdateSet.
func pageUpdateSet(update *types.UpdateSet, max int) *types.UpdateSet {
for i := range update.FilterSet {
set := update.FilterSet[i].ObjectSet
n := len(set)
if n+1 > max {
update.Truncated = types.NewBool(true)
f := types.PropertyFilterUpdate{
Filter: update.FilterSet[i].Filter,
ObjectSet: update.FilterSet[i].ObjectSet[max:],
}
update.FilterSet[i].ObjectSet = update.FilterSet[i].ObjectSet[:max]

pending := &types.UpdateSet{
Version: "P",
FilterSet: []types.PropertyFilterUpdate{f},
}

if len(update.FilterSet) > i {
pending.FilterSet = append(pending.FilterSet, update.FilterSet[i+1:]...)
update.FilterSet = update.FilterSet[:i+1]
}

return pending
}
max -= n
}
return nil
}

// WaitOptions.maxObjectUpdates says:
// > PropertyCollector policy may still limit the total count
// > to something less than maxObjectUpdates.
// Seems to be "may" == "will" and the default max is 100.
const defaultMaxObjectUpdates = 100 // vCenter's default

func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpdatesEx) soap.HasFault {
wait, cancel := context.WithCancel(context.Background())
oneUpdate := false
maxObject := defaultMaxObjectUpdates
if r.Options != nil {
if max := r.Options.MaxWaitSeconds; max != nil {
// A value of 0 causes WaitForUpdatesEx to do one update calculation and return any results.
Expand All @@ -829,6 +868,9 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda
wait, cancel = context.WithTimeout(context.Background(), time.Second*time.Duration(*max))
}
}
if max := r.Options.MaxObjectUpdates; max > 0 && max < defaultMaxObjectUpdates {
maxObject = int(max)
}
}
pc.mu.Lock()
pc.cancel = cancel
Expand All @@ -844,6 +886,12 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda
Returnval: set,
}

if pc.pending != nil {
body.Res.Returnval = pc.pending
pc.pending = pageUpdateSet(body.Res.Returnval, maxObject)
return body
}

apply := func() bool {
if fault := pc.apply(ctx, set); fault != nil {
body.Fault_ = Fault("", fault)
Expand All @@ -857,6 +905,7 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda
ctx.Map.AddHandler(pc) // Listen for create, update, delete of managed objects
apply() // Collect current state
set.Version = "-" // Next request with Version set will wait via loop below
pc.pending = pageUpdateSet(body.Res.Returnval, maxObject)
return body
}

Expand Down Expand Up @@ -932,6 +981,7 @@ func (pc *PropertyCollector) WaitForUpdatesEx(ctx *Context, r *types.WaitForUpda
}
}
if len(set.FilterSet) != 0 {
pc.pending = pageUpdateSet(body.Res.Returnval, maxObject)
return body
}
if oneUpdate {
Expand Down
68 changes: 68 additions & 0 deletions simulator/property_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package simulator

import (
"context"
"fmt"
"log"
"reflect"
"sync"
Expand Down Expand Up @@ -1766,3 +1767,70 @@ func TestUcFirst(t *testing.T) {
})
}
}

func TestPageUpdateSet(t *testing.T) {
max := defaultMaxObjectUpdates

sum := func(u []int) int {
total := 0
for i := range u {
total += u[i]
}
return total
}

sumUpdateSet := func(u *types.UpdateSet) int {
total := 0
for _, f := range u.FilterSet {
total += len(f.ObjectSet)
}
return total
}

tests := []struct {
filters int
objects []int
}{
{0, nil},
{1, []int{10}},
{1, []int{104}},
{3, []int{10, 0, 25}},
{1, []int{234, 21, 4}},
{2, []int{95, 32}},
}

for _, test := range tests {
name := fmt.Sprintf("%d filter %d objs", test.filters, sum(test.objects))
t.Run(name, func(t *testing.T) {
update := &types.UpdateSet{}

for i := 0; i < test.filters; i++ {
f := types.PropertyFilterUpdate{}
for j := 0; j < 156; j++ {
f.ObjectSet = append(f.ObjectSet, types.ObjectUpdate{})
}
update.FilterSet = append(update.FilterSet, f)
}

total := sumUpdateSet(update)
sum := 0

for {
pending := pageUpdateSet(update, max)
n := sumUpdateSet(update)
if n > max {
t.Fatalf("%d > %d", n, max)
}
sum += n
if pending == nil {
break
}
update = pending
}

if sum != total {
t.Fatalf("%d != %d", sum, total)
}
})
}
}

0 comments on commit 5a501b5

Please sign in to comment.