Fabric 允许应用程序在调用链码的时候监听链码中设置的事件,在监听到相应的事件后做相应的处理过程。

Fabric1.4 中的事件机制与老版本的事件机制源码部分有所不同,用的是Deliver的方式,所以老版本的代码与 Fabric1.4 版本的代码会有所不同。

本文将从源码解析整个事件机制的处理流程的。


回顾链码调用源码

链码调用监听事件部分代码

在之前的文章——Hyperledger Fabric从源码分析链码查询与调用,我有提到在ChaincodeInvokeOrQuery()函数中有对是否等待链码事件的处理,先来回顾一下:

func ChaincodeInvokeOrQuery(/*参数省略*/) (*pb.ProposalResponse, error) {
    // ...........
    if invoke {
    // invoke走这里
        // ..........
            var dg *deliverGroup
            var ctx context.Context
            if waitForEvent {
        // 这个参数是事件相关,命令中可以设置--waitForEvent参数,表示是否监听事件
                var cancelFunc context.CancelFunc
                ctx, cancelFunc = context.WithTimeout(context.Background(), waitForEventTimeout)
                defer cancelFunc()
                
        // 创建DeliverGroup
                dg = newDeliverGroup(deliverClients, peerAddresses, certificate, channelID, txid)
        // 连接所有peer节点上的deliver service
                err := dg.Connect(ctx)
                if err != nil {
                    return nil, err
                }
            }

      // 将交易发送给排序节点
            if err = bc.Send(env); err != nil {
                return proposalResp, errors.WithMessage(err, fmt.Sprintf("error sending transaction for %s", funcName))
            }

            if dg != nil && ctx != nil {
        // 等待事件被记录到账本中
                err = dg.Wait(ctx)
                if err != nil {
                    return nil, err
                }
            }
        }
    }

    return proposalResp, nil
}

创建deliverGroup对象

如果在执行peer chaincode invoke命令的时候设置了--waitForEvent参数,表示会等待并监听链码上的事件。函数中调用newDeliverGroup()创建了一个dg对象,看下这个函数,在peer/chaincode/common.go的557行:

type deliverGroup struct {
    Clients     []*deliverClient    // 所有背书节点的deliverClient
    Certificate tls.Certificate        // tls证书
    ChannelID   string    // 通道ID
    TxID        string    // txID
    mutex       sync.Mutex    
    Error       error    
    wg          sync.WaitGroup    
}

func newDeliverGroup(deliverClients []api.PeerDeliverClient, peerAddresses []string, certificate tls.Certificate, channelID string, txid string) *deliverGroup {
  // deliverClients表示所有连接背书节点的deliverClients
  // peerAddress表示所有背书节点的address
  // certificate表示tls证书
  // channelID为通道ID,txid为交易id
    clients := make([]*deliverClient, len(deliverClients))
    for i, client := range deliverClients {
        dc := &deliverClient{
            Client:  client,
            Address: peerAddresses[i],
        }
        clients[i] = dc
    }

    dg := &deliverGroup{
        Clients:     clients,
        Certificate: certificate,
        ChannelID:   channelID,
        TxID:        txid,
    }

    return dg
}

InitFactory() 创建 deliverClients

deliverClients参数在InitCmdFactory()中创建,这个函数我们之前只是简单介绍了一下,它是创建了一个链码命令工厂,其中保存了很多clients,其中就包括了DeliverClients,现在就来仔细看一下它是如何创建这些clients的,代码在peer/chaincode/common.go的349行:

// ChaincodeCmdFactory holds the clients used by ChaincodeCmd
type ChaincodeCmdFactory struct {
    EndorserClients []pb.EndorserClient    // 背书客户端,用于背书
    DeliverClients  []api.PeerDeliverClient    // Deliver客户端,用于向peer的DeliverServer发送消息,主要就是事件
    Certificate     tls.Certificate    // tls证书相关信息
    Signer          msp.SigningIdentity    // 用于消息的签名
    BroadcastClient common.BroadcastClient    // 广播客户端,用于向orderer节点发送消息
}

// InitCmdFactory init the ChaincodeCmdFactory with default clients
func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool) (*ChaincodeCmdFactory, error) {
    var err error
    var endorserClients []pb.EndorserClient
    var deliverClients []api.PeerDeliverClient
    if isEndorserRequired {
    // 是否需要背书,一般使用peer chaincode package命令打包链码的时候该参数为false
    // 主要是验证peer连接的一些参数
        if err = validatePeerConnectionParameters(cmdName); err != nil {
            return nil, errors.WithMessage(err, "error validating peer connection parameters")
        }
    // 如果没有指定peerAddresses,它的值默认为[]string{""},包含一个空字符串,也就是下面的循环只会执行一次
    for i, address := range peerAddresses {
            var tlsRootCertFile string
            if tlsRootCertFiles != nil {
        // 如果没有指定tlsRootCertFiles,它的值默认为[]string{""}
                tlsRootCertFile = tlsRootCertFiles[i]
            }
      // 获取指定peer的背书客户端
            endorserClient, err := common.GetEndorserClientFnc(address, tlsRootCertFile)
            if err != nil {
                return nil, errors.WithMessage(err, fmt.Sprintf("error getting endorser client for %s", cmdName))
            }
            endorserClients = append(endorserClients, endorserClient)
      // 获取指定peer的deliver客户端
            deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile)
            if err != nil {
                return nil, errors.WithMessage(err, fmt.Sprintf("error getting deliver client for %s", cmdName))
            }
            deliverClients = append(deliverClients, deliverClient)
        }
    if len(endorserClients) == 0 {
      // 如果endorserClients为长度为0则报错
            return nil, errors.New("no endorser clients retrieved - this might indicate a bug")
        }
    }
  // 获取证书
  certificate, err := common.GetCertificateFnc()
    if err != nil {
        return nil, errors.WithMessage(err, "error getting client cerificate")
    }
    // 获取签名者
    signer, err := common.GetDefaultSignerFnc()
    if err != nil {
        return nil, errors.WithMessage(err, "error getting default signer")
    }
  
  var broadcastClient common.BroadcastClient
  // 如果需要发送交易给排序节点的操作,isOrdererRequired为空
  // query的时候就会为false,invoke的时候就会为空
    if isOrdererRequired {
        if len(common.OrderingEndpoint) == 0 {
            if len(endorserClients) == 0 {
                return nil, errors.New("orderer is required, but no ordering endpoint or endorser client supplied")
            }
            endorserClient := endorserClients[0]

            orderingEndpoints, err := common.GetOrdererEndpointOfChainFnc(channelID, signer, endorserClient)
            if err != nil {
                return nil, errors.WithMessage(err, fmt.Sprintf("error getting channel (%s) orderer endpoint", channelID))
            }
            if len(orderingEndpoints) == 0 {
                return nil, errors.Errorf("no orderer endpoints retrieved for channel %s", channelID)
            }
            logger.Infof("Retrieved channel (%s) orderer endpoint: %s", channelID, orderingEndpoints[0])
            // override viper env
            viper.Set("orderer.address", orderingEndpoints[0])
        }

        broadcastClient, err = common.GetBroadcastClientFnc()

        if err != nil {
            return nil, errors.WithMessage(err, "error getting broadcast client")
        }
    }
  // 赋值成员,返回ChaincodeCmdFactory
    return &ChaincodeCmdFactory{
        EndorserClients: endorserClients,
        DeliverClients:  deliverClients,
        Signer:          signer,
        BroadcastClient: broadcastClient,
        Certificate:     certificate,
    }, nil
}

大体流程就是,为ChaincodeCmdFactory的各个成员赋值的过程,我们关心DeliverClients是如何赋值的,追踪过去找到相应的方法,在peer/common/peerclient.go的200行:

// InitFactory()中这么获取deliverClient
deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile)

//GetPeerDeliverClientFnc是一个函数变量,在init的时候被赋值为GetPeerDeliverClient
GetPeerDeliverClientFnc = GetPeerDeliverClient

// GetPeerDeliverClient returns a new deliver client. If both the address and
// tlsRootCertFile are not provided, the target values for the client are taken
// from the configuration settings for "peer.address" and
// "peer.tls.rootcert.file"
func GetPeerDeliverClient(address, tlsRootCertFile string) (api.PeerDeliverClient, error) {
    var peerClient *PeerClient
    var err error
  // address是传进来的peer地址,当我们没有指定peeraddress参数时,传进来的address就是""
    if address != "" {
        peerClient, err = NewPeerClientForAddress(address, tlsRootCertFile)
    } else {
    // 走这里
        peerClient, err = NewPeerClientFromEnv()
    }
    if err != nil {
        return nil, err
    }
    return peerClient.PeerDeliver()
}

看下NewPeerClientFromEnv()方法,在peer/common/peerclient.go31行:

// NewPeerClientFromEnv creates an instance of a PeerClient from the global
// Viper instance
func NewPeerClientFromEnv() (*PeerClient, error) {
      // 从环境变量中获取address,override,clientConfig,具体实现在下面
   address, override, clientConfig, err := configFromEnv("peer")
   if err != nil {
      return nil, errors.WithMessage(err, "failed to load config for PeerClient")
   }
  // 主要就是创建一个peerClient
   return newPeerClientForClientConfig(address, override, clientConfig)
}

// ClientConfig defines the parameters for configuring a GRPCClient instance
type ClientConfig struct {
    // 安全相关参数(tls)
    SecOpts *SecureOptions
    // keepalive参数
    KaOpts *KeepaliveOptions
    // client阻塞的时长
    Timeout time.Duration
    // 连接是否是非阻塞的
    AsyncConnect bool
}

// 这里就设置了两个参数,一个是SecOpts,一个是Timeout,其他都是默认值
func configFromEnv(prefix string) (address, override string, clientConfig comm.ClientConfig, err error) {
  // 获取CORE_PEER_ADDRESS环境变量,这个肯定会有
    address = viper.GetString(prefix + ".address")
  // 获取CORE_PEER_TLS_SERVERHOSTOVERRIDE环境变量,这个可能没有
    override = viper.GetString(prefix + ".tls.serverhostoverride")
    clientConfig = comm.ClientConfig{}
  // 获取CORE_PEER_CLIENT_CONNTIMEOUT环境变量,这个一般没有,用默认的
    connTimeout := viper.GetDuration(prefix + ".client.connTimeout")
    if connTimeout == time.Duration(0) {
    // 默认的3S
        connTimeout = defaultConnTimeout
    }
    clientConfig.Timeout = connTimeout
  
  // 下面都是一些TLS相关的环境变量参数读取,读取证书等等
    secOpts := &comm.SecureOptions{
    // 都是从环境变量中获取
        UseTLS:            viper.GetBool(prefix + ".tls.enabled"),
        RequireClientCert: viper.GetBool(prefix + ".tls.clientAuthRequired")}
    if secOpts.UseTLS {
        caPEM, res := ioutil.ReadFile(config.GetPath(prefix + ".tls.rootcert.file"))
        if res != nil {
            err = errors.WithMessage(res,
                fmt.Sprintf("unable to load %s.tls.rootcert.file", prefix))
            return
        }
        secOpts.ServerRootCAs = [][]byte{caPEM}
    }
    if secOpts.RequireClientCert {
        keyPEM, res := ioutil.ReadFile(config.GetPath(prefix + ".tls.clientKey.file"))
        if res != nil {
            err = errors.WithMessage(res,
                fmt.Sprintf("unable to load %s.tls.clientKey.file", prefix))
            return
        }
        secOpts.Key = keyPEM
        certPEM, res := ioutil.ReadFile(config.GetPath(prefix + ".tls.clientCert.file"))
        if res != nil {
            err = errors.WithMessage(res,
                fmt.Sprintf("unable to load %s.tls.clientCert.file", prefix))
            return
        }
        secOpts.Certificate = certPEM
    }
    clientConfig.SecOpts = secOpts
  // 返回clientConfig
    return
}

看下newPeerClientForClientConfig()方法,在peer/common/peerclient.go的85行:

func newPeerClientForClientConfig(address, override string, clientConfig comm.ClientConfig) (*PeerClient, error) {
  // 根据给入的配置,创建了GRPCClient对象
    gClient, err := comm.NewGRPCClient(clientConfig)
    if err != nil {
        return nil, errors.WithMessage(err, "failed to create PeerClient from config")
    }
  // 创建了PeerClient对象
    pClient := &PeerClient{
        commonClient: commonClient{
            GRPCClient: gClient,
            address:    address,
            sn:         override}}
    return pClient, nil
}

因此到这里NewPeerClientFromEnv()方法创建了一个peerClient,它从环境变量中获取相应的配置参数,创建了一个与peer节点进行通信的peerClient客户端,接下来调用peerClient.PeerDeliver()创建与peer节点的deliverServer通信的PeerDeliverClient,看下这个实现,在peer/common/peerclient.go的116行:

// PeerDeliver returns a client for the Deliver service for peer-specific use
// cases (i.e. DeliverFiltered)
func (pc *PeerClient) PeerDeliver() (api.PeerDeliverClient, error) {
  // 创建了一个grpc.ClientConn的连接对象
    conn, err := pc.commonClient.NewConnection(pc.address, pc.sn)
    if err != nil {
        return nil, errors.WithMessage(err, fmt.Sprintf("deliver client failed to connect to %s", pc.address))
    }
    pbClient := pb.NewDeliverClient(conn)
    return &PeerDeliverClient{Client: pbClient}, nil
}

看下NewDeliverClient()方法,在protos/peer/events.pb.go的509行:

// DeliverClient is the client API for Deliver service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DeliverClient interface {
    // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
    // Payload data as a marshaled orderer.SeekInfo message,
    // then a stream of block replies is received
    Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error)
    // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
    // Payload data as a marshaled orderer.SeekInfo message,
    // then a stream of **filtered** block replies is received
    DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error)
}

type deliverClient struct {
    cc *grpc.ClientConn
}

func NewDeliverClient(cc *grpc.ClientConn) DeliverClient {
    return &deliverClient{cc}
}

OK到这里我们就找了DeliverClient的相关的proto接口与对象了,DeliverClient可以调用DeliverDeliverFiltered两个接口分别实现两个不同的流:

Deliver流:

// Deliver接口返回的流,实现了接口Send,Recv分别从流中获取消息
func (c *deliverClient) Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error) {
    stream, err := c.cc.NewStream(ctx, &_Deliver_serviceDesc.Streams[0], "/protos.Deliver/Deliver", opts...)
    if err != nil {
        return nil, err
    }
    x := &deliverDeliverClient{stream}
    return x, nil
}

type Deliver_DeliverClient interface {
    Send(*common.Envelope) error
    Recv() (*DeliverResponse, error)
    grpc.ClientStream
}

type deliverDeliverClient struct {
    grpc.ClientStream
}

func (x *deliverDeliverClient) Send(m *common.Envelope) error {
    return x.ClientStream.SendMsg(m)
}

func (x *deliverDeliverClient) Recv() (*DeliverResponse, error) {
    m := new(DeliverResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

DeliverFiltered流:

// DeliverFiltered接口返回的流,实现了接口Send,Recv分别从流中获取消息
func (c *deliverClient) DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error) {
    stream, err := c.cc.NewStream(ctx, &_Deliver_serviceDesc.Streams[1], "/protos.Deliver/DeliverFiltered", opts...)
    if err != nil {
        return nil, err
    }
    x := &deliverDeliverFilteredClient{stream}
    return x, nil
}

type Deliver_DeliverFilteredClient interface {
    Send(*common.Envelope) error
    Recv() (*DeliverResponse, error)
    grpc.ClientStream
}

type deliverDeliverFilteredClient struct {
    grpc.ClientStream
}

func (x *deliverDeliverFilteredClient) Send(m *common.Envelope) error {
    return x.ClientStream.SendMsg(m)
}

func (x *deliverDeliverFilteredClient) Recv() (*DeliverResponse, error) {
    m := new(DeliverResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

DeliverServer服务我们一会再回过头来看,现在先跟着刚才的思路走下去。

PeerDeliver()方法中,我们通过NewDeliverClient()方法创建了一个DeliverClient,并赋值到PeerDeliverClient对象的成员变量中返回。

好了到这里InitCmdFactory()方法中就创建好了PeerDeliverClient,它最终提供了两个接口,最终在使用接口的时候再来看看:

// PeerDeliverClient defines the interface for a peer deliver client
type PeerDeliverClient interface {
  // Deliver方法在源码中找调用发送没有地方在调用
    Deliver(ctx context.Context, opts ...grpc.CallOption) (api.Deliver, error)
    DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (api.Deliver, error)
}

连接到peer节点上的deliver service并发送消息

再回到ChaincodeInvokeOrQuery()中,我们通过newDeliverGroup()创建了一个DeliverGroup对象dg,其中包含了刚刚说的在InitCmdFactory()方法中就创建好了PeerDeliverClient

接下来执行的下面这行代码:

 // 创建DeliverGroup
dg = newDeliverGroup(deliverClients, peerAddresses, certificate, channelID, txid)
// 连接所有peer节点上的deliver service
err := dg.Connect(ctx)

来看下这个方法,在peer/chaincode/common.go的581行:

// Connect waits for all deliver clients in the group to connect to
// the peer's deliver service, receive an error, or for the context
// to timeout. An error will be returned whenever even a single
// deliver client fails to connect to its peer
func (dg *deliverGroup) Connect(ctx context.Context) error {
   // waitgroup Add deliverClients的长度,如果没有peeraddress参数默认就是一个peer
   dg.wg.Add(len(dg.Clients))
   for _, client := range dg.Clients {
         // 建立起deliverClient与deliverServer的连接
      go dg.ClientConnect(ctx, client)
   }
   readyCh := make(chan struct{})
  // 每个ClientConnect执行完都会加waitgroup -1
  // 这里就是执行wg.wait(),完了以后发送信号给readyCh通道
   go dg.WaitForWG(readyCh)

   select {
   case <-readyCh:
     // 全部完成后判断有没有错,有一个出错就返回错误
      if dg.Error != nil {
         err := errors.WithMessage(dg.Error, "failed to connect to deliver on all peers")
         return err
      }
   case <-ctx.Done():
     // ctx设置了timeout,如果时间到了连接还没有全部建立完成就报错
      err := errors.New("timed out waiting for connection to deliver on all peers")
      return err
   }

   return nil
}

接下来看下ClientConnect()方法,在peer/chaincode/common.go的606行:

// ClientConnect sends a deliver seek info envelope using the
// provided deliver client, setting the deliverGroup's Error
// field upon any error
func (dg *deliverGroup) ClientConnect(ctx context.Context, dc *deliverClient) {
  // wg.Done
    defer dg.wg.Done()
  // 这里最终建立的是DeliverFiltered流,没有建立Deliver流
    df, err := dc.Client.DeliverFiltered(ctx)
    if err != nil {
        err = errors.WithMessage(err, fmt.Sprintf("error connecting to deliver filtered at %s", dc.Address))
        dg.setError(err)
        return
    }
  // CloseSend关闭流的发送方向,如果遇到错误时,将关闭流
  // 看这意思是只会发送一条消息,也就是下面将要发送的这一条消息
    defer df.CloseSend()
    dc.Connection = df
    
  // 创建消息envelope,一会看下这个函数
    envelope := createDeliverEnvelope(dg.ChannelID, dg.Certificate)
  // 往流里发送数据
    err = df.Send(envelope)
    if err != nil {
        err = errors.WithMessage(err, fmt.Sprintf("error sending deliver seek info envelope to %s", dc.Address))
        dg.setError(err)
        return
    }
}

看下createDeliverEnvelope()方法,在peer/chaincode/common.go的701行:

// 该函数创建了DeliverClient发送的第一个消息,也是唯一一个消息
func createDeliverEnvelope(channelID string, certificate tls.Certificate) *pcommon.Envelope {
    var tlsCertHash []byte
    // 检查证书,计算证书Hash
    if len(certificate.Certificate) > 0 {
        tlsCertHash = util.ComputeSHA256(certificate.Certificate[0])
    }
    
  // start最终表示的是区块寻址的开始高度
    start := &ab.SeekPosition{
        Type: &ab.SeekPosition_Newest{
            Newest: &ab.SeekNewest{},
        },
    }
    
  // stop表示的是区块寻址的终止高度
    stop := &ab.SeekPosition{
        Type: &ab.SeekPosition_Specified{
            Specified: &ab.SeekSpecified{
                Number: math.MaxUint64,
            },
        },
    }
    
  // 寻址信息,包含Start,Stop,Behavior
  // 下面再来解析这几个参数的意思
    seekInfo := &ab.SeekInfo{
        Start:    start,
        Stop:     stop,
        Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
    }
    
  // 根据现有的信息创建一个签名的Envelope
    env, err := putils.CreateSignedEnvelopeWithTLSBinding(
        pcommon.HeaderType_DELIVER_SEEK_INFO, channelID, localmsp.NewSigner(),
        seekInfo, int32(0), uint64(0), tlsCertHash)
    if err != nil {
        logger.Errorf("Error signing envelope: %s", err)
        return nil
    }

    return env
}

来看下几个关键的几个结构体

SeekPosition表示寻找的位置

type SeekPosition struct {
    Type                 isSeekPosition_Type 
    XXX_NoUnkeyedLiteral struct{}            
    XXX_unrecognized     []byte              
    XXX_sizecache        int32               
}

其中Type字段可以有三种类型:

  • SeekPosition_Newest表示最新的区块
  • SeekPosition_Oldest表示最原始的区块
  • SeekPosition_Specified表示指定的区块,需要指定区块高度

SeekInfo表示寻找具体信息

// SeekInfo specifies the range of requested blocks to return
// If the start position is not found, an error is immediately returned
// Otherwise, blocks are returned until a missing block is encountered, then behavior is dictated
// by the SeekBehavior specified.
type SeekInfo struct {
  // 起始高度
    Start                *SeekPosition            
  // 停止高度
    Stop                 *SeekPosition              
  // 寻找行为
    Behavior             SeekInfo_SeekBehavior    
  // 错误响应
    ErrorResponse        SeekInfo_SeekErrorResponse 
    XXX_NoUnkeyedLiteral struct{}                   
    XXX_unrecognized     []byte                     
    XXX_sizecache        int32                      
}

来看下SeekInfo_SeekBehavior类型:

// If BLOCK_UNTIL_READY is specified, the reply will block until the requested blocks are available,
// if FAIL_IF_NOT_READY is specified, the reply will return an error indicating that the block is not
// found.  To request that all blocks be returned indefinitely as they are created, behavior should be
// set to BLOCK_UNTIL_READY and the stop should be set to specified with a number of MAX_UINT64
type SeekInfo_SeekBehavior int32

const (
  // 阻塞直到请求的区块可用
    SeekInfo_BLOCK_UNTIL_READY SeekInfo_SeekBehavior = 0
  // 返回错误表示未找到指定的区块
    SeekInfo_FAIL_IF_NOT_READY SeekInfo_SeekBehavior = 1
)

// 如果要求所有区块无限期地返回(不断地返回最新的区块)。应该设置SeekInfo_BLOCK_UNTIL_READY ,并且stop的SeekPosition应该设置为SeekPosition_Specified,并且指定的高度应该为MAX_UINT64

createDeliverEnvelope()函数中,创建的SeekInfo对象的Start参数为SeekPosition_NewestStop参数为SeekPosition_Specified,并且指定为MAX_UINT64高度,Behavior指定为SeekInfo_BLOCK_UNTIL_READY。这样的设置方式就可以表示一直返回最新的区块回来。

好了看下CreateSignedEnvelopeWithTLSBinding()方法,创建一条Deliver客户端与服务之间通信的消息,在protos/utils/txutils.go的69行:

// CreateSignedEnvelopeWithTLSBinding creates a signed envelope of the desired
// type, with marshaled dataMsg and signs it. It also includes a TLS cert hash
// into the channel header
func CreateSignedEnvelopeWithTLSBinding(txType common.HeaderType, channelID string, signer crypto.LocalSigner, dataMsg proto.Message, msgVersion int32, epoch uint64, tlsCertHash []byte) (*common.Envelope, error) {
  // txType为HeaderType_DELIVER_SEEK_INFO
  // channelID为通道ID
  // signer为从msp中获取的签名者,用来签名
  // dataMsg就是创建好的seek_info类型
  // msgVersion为0,epoch为0,
  // tlsCertHash为证书hash
  
  // 创建ChannelHeader
    payloadChannelHeader := MakeChannelHeader(txType, msgVersion, channelID, epoch)
    payloadChannelHeader.TlsCertHash = tlsCertHash
    var err error
  
  // 创建SignatureHeader
    payloadSignatureHeader := &common.SignatureHeader{}

    if signer != nil {
    // 如果signer不为空,则是用signer创建一个SignatureHeader
    // 否则SignatureHeader为空
        payloadSignatureHeader, err = signer.NewSignatureHeader()
        if err != nil {
            return nil, err
        }
    }

  // 序列化seek_info信息
    data, err := proto.Marshal(dataMsg)
    if err != nil {
        return nil, errors.Wrap(err, "error marshaling")
    }
    
  // 序列化Payload
    paylBytes := MarshalOrPanic(
        &common.Payload{
            Header: MakePayloadHeader(payloadChannelHeader, payloadSignatureHeader),
            Data:   data,
        },
    )
    
  // 对payload签名
    var sig []byte
    if signer != nil {
        sig, err = signer.Sign(paylBytes)
        if err != nil {
            return nil, err
        }
    }
    
  // 创建env对象并返回
    env := &common.Envelope{
        Payload:   paylBytes,
        Signature: sig,
    }

    return env, nil
}

OK到这里ClientConnect()方法就解析完了,梳理一下:

  1. DeliverServer建立了DeliverFiltered
  2. 创建了第一个消息并发送
  3. 发送完以后调用CloseSend()关闭发送流,不会再次发送信息

其实这里完了以后,客户端就完成了建立流连接并发送消息的过程,那么接下来就是接收服务端消息的过程了。

等待服务端消息完成事件等待

回到ChaincodeInvokeOrQuery()函数中,有下面一段代码:

if dg != nil && ctx != nil {
    // wait for event that contains the txid from all peers
    err = dg.Wait(ctx)
    if err != nil {
        return nil, err
    }
}

等待并处理事件的就这么一行代码,dg.Wait(),看下这个方法,在peer/chaincode/common.go的629行:

// Wait waits for all deliver client connections in the group to
// either receive a block with the txid, an error, or for the
// context to timeout
func (dg *deliverGroup) Wait(ctx context.Context) error {
    if len(dg.Clients) == 0 {
        return nil
    }
    
  //waitgroup++
    dg.wg.Add(len(dg.Clients))
    for _, client := range dg.Clients {
    // 等待事件
        go dg.ClientWait(client)
    }
    readyCh := make(chan struct{})
  //等待waitgroup
    go dg.WaitForWG(readyCh)

    select {
    case <-readyCh:
        if dg.Error != nil {
      //有一个客户端执行ClientWait失败就返回错误
            err := errors.WithMessage(dg.Error, "failed to receive txid on all peers")
            return err
        }
    case <-ctx.Done():
    // 超时还未搞定,就是等待超时,返回错误
        err := errors.New("timed out waiting for txid on all peers")
        return err
    }

    return nil
}

看下ClientWait()方法,在peer/chaincode/common.go的657行:

// ClientWait waits for the specified deliver client to receive
// a block event with the requested txid
func (dg *deliverGroup) ClientWait(dc *deliverClient) {
    defer dg.wg.Done()
    for {
    // 死循环从流中获取消息
        resp, err := dc.Connection.Recv()
        if err != nil {
      // Recv出错直接返回
            err = errors.WithMessage(err, fmt.Sprintf("error receiving from deliver filtered at %s", dc.Address))
            dg.setError(err)
            return
        }
        switch r := resp.Type.(type) {
        case *pb.DeliverResponse_FilteredBlock:
      // 如果是DeliverResponse_FilteredBlock这个类型
      // filteredTransactions其实是监听到的区块中包含过滤事件的所有交易
            filteredTransactions := r.FilteredBlock.FilteredTransactions
            for _, tx := range filteredTransactions {
                if tx.Txid == dg.TxID {
          // 仅仅输出一个INFO日志而已,这个如果我们加了--waitForEvent参数可以看到这个日志(如果设置了事件的话)
          // 这里仅仅是输出了当前这笔交易
                    logger.Infof("txid [%s] committed with status (%s) at %s", dg.TxID, tx.TxValidationCode, dc.Address)
                    return
                }
            }
        case *pb.DeliverResponse_Status:
      // 如果是DeliverResponse_Status类型就报错
            err = errors.Errorf("deliver completed with status (%s) before txid received", r.Status)
            dg.setError(err)
            return
        default:
      // 其他类型不支持,报错
            err = errors.Errorf("received unexpected response type (%T) from %s", r, dc.Address)
            dg.setError(err)
            return
        }
    }
}

虽然ClientWait()用了一个死循环去接收事件,但是每个分支的处理都是return的,因此我们可以看到即使DeliverClient中设置的SeekInfo参数表示的是无限期返回区块,但是我们在命令行加上--waitForEvent参数的时候,会输出一个 INFO 日志就退出了。

这里插一嘴,Fabric 的各个语言的 SDK 中都有对事件部分的控制逻辑,当然不仅仅是像 cmd 这样简单处理,可以拿到的完整的事件对象,里面包含了事件的具体内容,有兴趣地去看看各个版本的 SDK 是如何使用 Fabric 的事件机制的就可以了

到这里,链码调用这一侧,也就是DeliverClient这一侧的逻辑基本就分析完了,下一篇文章我们再看看DeliverServer这一侧的逻辑。

最后修改:2020 年 09 月 03 日 02 : 24 PM
如果觉得我的文章对你有用,请随意赞赏