Skip to content

Commit

Permalink
Implement a session handle to running processes
Browse files Browse the repository at this point in the history
- Implement Start function that returns a Session
- Change Run to use Start and Session#Wait
- Replace RunStatus with Session

FAB-16108

Change-Id: I1ed90b9452c43020bd3a99904b6089ad6fb22b09
Signed-off-by: Matthew Sykes <[email protected]>
  • Loading branch information
sykesm committed Nov 20, 2019
1 parent af55876 commit 3032e8e
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 120 deletions.
79 changes: 10 additions & 69 deletions core/container/externalbuilders/externalbuilders.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package externalbuilders

import (
"bufio"
"encoding/json"
"fmt"
"io"
Expand All @@ -16,8 +15,6 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/chaincode/persistence"
Expand Down Expand Up @@ -335,36 +332,7 @@ type RunConfig struct {
RootCert string `json:"root_cert"` // PEM encoded peer chaincode certificate
}

type RunStatus struct {
mutex sync.Mutex
doneC chan struct{}
err error
}

func NewRunStatus() *RunStatus {
return &RunStatus{
doneC: make(chan struct{}),
}
}

func (rs *RunStatus) Err() error {
rs.mutex.Lock()
defer rs.mutex.Unlock()
return rs.err
}

func (rs *RunStatus) Done() <-chan struct{} {
return rs.doneC
}

func (rs *RunStatus) Notify(err error) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
rs.err = err
close(rs.doneC)
}

func (b *Builder) Run(ccid, bldDir string, peerConnection *ccintf.PeerConnection) (*RunStatus, error) {
func (b *Builder) Run(ccid, bldDir string, peerConnection *ccintf.PeerConnection) (*Session, error) {
lc := &RunConfig{
PeerAddress: peerConnection.Address,
CCID: ccid,
Expand Down Expand Up @@ -392,20 +360,18 @@ func (b *Builder) Run(ccid, bldDir string, peerConnection *ccintf.PeerConnection

run := filepath.Join(b.Location, "bin", "run")
cmd := b.NewCommand(run, bldDir, launchDir)

rs := NewRunStatus()
sess, err := Start(b.Logger, cmd)
if err != nil {
os.RemoveAll(launchDir)
return nil, errors.Wrapf(err, "builder '%s' run failed to start", b.Name)
}

go func() {
defer os.RemoveAll(launchDir)
err := RunCommand(b.Logger, cmd)
if err != nil {
rs.Notify(errors.Wrapf(err, "builder '%s' run failed", b.Name))
return
}
rs.Notify(nil)
sess.Wait()
}()

return rs, nil
return sess, nil
}

// NewCommand creates an exec.Cmd that is configured to prune the calling
Expand Down Expand Up @@ -441,34 +407,9 @@ func contains(envWhiteList []string, key string) bool {
}

func RunCommand(logger *flogging.FabricLogger, cmd *exec.Cmd) error {
logger = logger.With("command", filepath.Base(cmd.Path))

stderr, err := cmd.StderrPipe()
sess, err := Start(logger, cmd)
if err != nil {
return err
}

if err := cmd.Start(); err != nil {
return err
}

is := bufio.NewReader(stderr)
for done := false; !done; {
// read output line by line
line, err := is.ReadString('\n')
switch err {
case nil:
logger.Info(strings.TrimSuffix(line, "\n"))
case io.EOF:
if len(line) > 0 {
logger.Info(line)
}
done = true
default:
logger.Error("error reading command output", err)
return err
}
}

return cmd.Wait()
return sess.Wait()
}
46 changes: 4 additions & 42 deletions core/container/externalbuilders/externalbuilders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,24 +335,12 @@ var _ = Describe("Externalbuilders", func() {
})

It("runs the package by invoking external builder", func() {
rs, err := builder.Run("test-ccid", bldDir, fakeConnection)
sess, err := builder.Run("test-ccid", bldDir, fakeConnection)
Expect(err).NotTo(HaveOccurred())
Eventually(rs.Done()).Should(BeClosed())
Expect(rs.Err()).NotTo(HaveOccurred())
})

Context("when the run exits with a non-zero status", func() {
BeforeEach(func() {
builder.Location = "testdata/failbuilder"
builder.Name = "failbuilder"
})

It("returns an error", func() {
rs, err := builder.Run("test-ccid", bldDir, fakeConnection)
Expect(err).NotTo(HaveOccurred())
Eventually(rs.Done()).Should(BeClosed())
Expect(rs.Err()).To(MatchError("builder 'failbuilder' run failed: exit status 1"))
})
errCh := make(chan error)
go func() { errCh <- sess.Wait() }()
Eventually(errCh).Should(Receive(BeNil()))
})
})

Expand Down Expand Up @@ -421,32 +409,6 @@ var _ = Describe("Externalbuilders", func() {
})
})

Describe("RunStatus", func() {
var rs *externalbuilders.RunStatus

BeforeEach(func() {
rs = externalbuilders.NewRunStatus()
})

It("has a blocking ready channel", func() {
Consistently(rs.Done()).ShouldNot(BeClosed())
})

When("notify is called with an error", func() {
BeforeEach(func() {
rs.Notify(fmt.Errorf("fake-status-error"))
})

It("closes the blocking ready channel", func() {
Expect(rs.Done()).To(BeClosed())
})

It("sets the error value", func() {
Expect(rs.Err()).To(MatchError("fake-status-error"))
})
})
})

Describe("SanitizeCCIDPath", func() {
It("forbids the set of forbidden windows characters", func() {
sanitizedPath := externalbuilders.SanitizeCCIDPath(`<>:"/\|?*&`)
Expand Down
12 changes: 6 additions & 6 deletions core/container/externalbuilders/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ type Instance struct {
PackageID string
BldDir string
Builder *Builder
RunStatus *RunStatus
Session *Session
}

func (i *Instance) Start(peerConnection *ccintf.PeerConnection) error {
rs, err := i.Builder.Run(i.PackageID, i.BldDir, peerConnection)
sess, err := i.Builder.Run(i.PackageID, i.BldDir, peerConnection)
if err != nil {
return errors.WithMessage(err, "could not execute run")
}
i.RunStatus = rs
i.Session = sess
return nil
}

Expand All @@ -34,11 +34,11 @@ func (i *Instance) Stop() error {
}

func (i *Instance) Wait() (int, error) {
if i.RunStatus == nil {
if i.Session == nil {
return 0, errors.Errorf("instance was not successfully started")
}
<-i.RunStatus.Done()
err := i.RunStatus.Err()
err := i.Session.Wait()
err = errors.Wrapf(err, "builder '%s' run failed", i.Builder.Name)
if exitErr, ok := errors.Cause(err).(*exec.ExitError); ok {
return exitErr.ExitCode(), err
}
Expand Down
12 changes: 10 additions & 2 deletions core/container/externalbuilders/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,18 @@ var _ = Describe("Instance", func() {
It("invokes the builder's run command and sets the run status", func() {
err := instance.Start(&ccintf.PeerConnection{
Address: "fake-peer-address",
TLSConfig: &ccintf.TLSConfig{
ClientCert: []byte("fake-client-cert"),
ClientKey: []byte("fake-client-key"),
RootCert: []byte("fake-root-cert"),
},
})
Expect(err).NotTo(HaveOccurred())
Expect(instance.RunStatus).NotTo(BeNil())
Eventually(instance.RunStatus.Done()).Should(BeClosed())
Expect(instance.Session).NotTo(BeNil())

errCh := make(chan error)
go func() { errCh <- instance.Session.Wait() }()
Eventually(errCh).Should(Receive(BeNil()))
})
})

Expand Down
87 changes: 87 additions & 0 deletions core/container/externalbuilders/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package externalbuilders

import (
"bufio"
"io"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"

"github.com/hyperledger/fabric/common/flogging"
)

type Session struct {
mutex sync.Mutex
command *exec.Cmd
exited chan struct{}
exitErr error
waitStatus syscall.WaitStatus
}

// Start will start the provided command and return a Session that can be used
// to await completion or signal the process.
//
// The provided logger is used log stderr from the running process.
func Start(logger *flogging.FabricLogger, cmd *exec.Cmd) (*Session, error) {
logger = logger.With("command", filepath.Base(cmd.Path))

stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}

err = cmd.Start()
if err != nil {
return nil, err
}

sess := &Session{
command: cmd,
exited: make(chan struct{}),
}
go sess.waitForExit(logger, stderr)

return sess, nil
}

func (s *Session) waitForExit(logger *flogging.FabricLogger, stderr io.Reader) {
// copy stderr to the logger until stderr is closed
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
logger.Info(scanner.Text())
}
if err := scanner.Err(); err != nil {
logger.Errorf("command output scanning failed: %s", err)
}

// wait for the command to exit and to complete
err := s.command.Wait()

// update state and close the exited channel
s.mutex.Lock()
defer s.mutex.Unlock()
s.exitErr = err
s.waitStatus = s.command.ProcessState.Sys().(syscall.WaitStatus)
close(s.exited)
}

// Wait waits for the running command to terminate and returns the exit error
// from the command. If the command has already exited, the exit err will be
// returned immediately.
func (s *Session) Wait() error {
<-s.exited
return s.exitErr
}

// Signal will send a signal to the running process.
func (s *Session) Signal(sig os.Signal) {
s.command.Process.Signal(sig)
}
Loading

0 comments on commit 3032e8e

Please sign in to comment.