上篇文章——Hyperledger Fabric从源码分析事件机制(一),我从 cmd 中添加--waitForEvent参数去监听事件这一入口出发,分析了DeliverClient这一侧的相关源代码,接下来这篇文章将会解析DeliverServer这一侧的相关源代码,准备好上车了兄弟们。


从proto文件中发现DeliverServer

事件相关的pb.go文件在protos/peer/events.pb.go中,关于DeliverServer的部分在576行:

// DeliverServer is the server API for Deliver service.
type DeliverServer interface {
    // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
    // Payload data as a marshaled orderer.SeekInfo message,
    // then a stream of block replies is received
    Deliver(Deliver_DeliverServer) error
    // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
    // Payload data as a marshaled orderer.SeekInfo message,
    // then a stream of **filtered** block replies is received
    DeliverFiltered(Deliver_DeliverFilteredServer) error
}

// 通过这个函数注册DeliverServer
func RegisterDeliverServer(s *grpc.Server, srv DeliverServer) {
    s.RegisterService(&_Deliver_serviceDesc, srv)
}

// Deliver流的Handler
func _Deliver_Deliver_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(DeliverServer).Deliver(&deliverDeliverServer{stream})
}

type Deliver_DeliverServer interface {
    Send(*DeliverResponse) error
    Recv() (*common.Envelope, error)
    grpc.ServerStream
}

type deliverDeliverServer struct {
    grpc.ServerStream
}

func (x *deliverDeliverServer) Send(m *DeliverResponse) error {
    return x.ServerStream.SendMsg(m)
}

func (x *deliverDeliverServer) Recv() (*common.Envelope, error) {
    m := new(common.Envelope)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

// DeliverFiltered流的Handler
func _Deliver_DeliverFiltered_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(DeliverServer).DeliverFiltered(&deliverDeliverFilteredServer{stream})
}

type Deliver_DeliverFilteredServer interface {
    Send(*DeliverResponse) error
    Recv() (*common.Envelope, error)
    grpc.ServerStream
}

type deliverDeliverFilteredServer struct {
    grpc.ServerStream
}

func (x *deliverDeliverFilteredServer) Send(m *DeliverResponse) error {
    return x.ServerStream.SendMsg(m)
}

func (x *deliverDeliverFilteredServer) Recv() (*common.Envelope, error) {
    m := new(common.Envelope)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

// grpc服务对象
var _Deliver_serviceDesc = grpc.ServiceDesc{
    ServiceName: "protos.Deliver",
    HandlerType: (*DeliverServer)(nil),
    Methods:     []grpc.MethodDesc{},
    Streams: []grpc.StreamDesc{
        {
            StreamName:    "Deliver",
            Handler:       _Deliver_Deliver_Handler,
            ServerStreams: true,
            ClientStreams: true,
        },
        {
            StreamName:    "DeliverFiltered",
            Handler:       _Deliver_DeliverFiltered_Handler,
            ServerStreams: true,
            ClientStreams: true,
        },
    },
    Metadata: "peer/events.proto",
}

peer 节点启动时注册 DeliverServer

看看RegisterDeliverServer()这个方法在哪被调用了

abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)

上述代码在peer/node/start.go的274行。

因此破案了,在peer节点启动的时候,启动了DeliverServer。其中主要是abServer这个对象实现了DeliverServer的相关接口,那么来看下NewDeliverEventsServer这个方法吧,生成了一个DeliverEventsServer对象,在core/peer/deliverevents.go的132行:

// NewDeliverEventsServer creates a peer.Deliver server to deliver block and
// filtered block events
func NewDeliverEventsServer(mutualTLS bool, policyCheckerProvider PolicyCheckerProvider, chainManager deliver.ChainManager, metricsProvider metrics.Provider) peer.DeliverServer {
  // mutualTLS是TLS相关的一些东西
  // policyCheckerProvider主要是做ACL的一些检测,暂时不关心
  // chainManager传进来的是一个空的DeliverChainManager对象
  // metricsProvider是指标相关的东西
    timeWindow := viper.GetDuration("peer.authentication.timewindow")
    if timeWindow == 0 {
        defaultTimeWindow := 15 * time.Minute
        logger.Warningf("`peer.authentication.timewindow` not set; defaulting to %s", defaultTimeWindow)
        timeWindow = defaultTimeWindow
    }
    metrics := deliver.NewMetrics(metricsProvider)
  // 最终返回的是一个server对象
    return &server{
    // NewHandler创建了一个deliver.Handler对象
        dh:                    deliver.NewHandler(chainManager, timeWindow, mutualTLS, metrics, false),
        policyCheckerProvider: policyCheckerProvider,
    }
}

这个server对象最终实现了DeliverDeliverFiltered接口,实现了DeliverServer

DeliverFiltered接口的实现

上篇文章中说到,cmd 最终建立的是DeliverFiltered流,因此最终会走到serverDeliverFiltered方法,看下这个方法的实现,在core/peer/deliverevents.go的101行:

// Deliver sends a stream of blocks to a client after commitment
func (s *server) DeliverFiltered(srv peer.Deliver_DeliverFilteredServer) error {
    logger.Debugf("Starting new DeliverFiltered handler")
    defer dumpStacktraceOnPanic()
    // getting policy checker based on resources.Event_FilteredBlock resource name
    deliverServer := &deliver.Server{
        Receiver:      srv,
        PolicyChecker: s.policyCheckerProvider(resources.Event_FilteredBlock),
        ResponseSender: &filteredBlockResponseSender{
            Deliver_DeliverFilteredServer: srv,
        },
    }
  // 执行Handle方法处理
    return s.dh.Handle(srv.Context(), deliverServer)
}

Handle() 方法

最终执行的是Handle()方法,看一下,在common/deliver/deliver.go的152行:

// Handle receives incoming deliver requests.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
    addr := util.ExtractRemoteAddress(ctx)
    logger.Debugf("Starting new deliver loop for %s", addr)
  // 指标相关
    h.Metrics.StreamsOpened.Add(1)
    defer h.Metrics.StreamsClosed.Add(1)
  // 死循环从客户端获取消息
    for {
        logger.Debugf("Attempting to read seek info message from %s", addr)
        envelope, err := srv.Recv()
        if err == io.EOF {
            logger.Debugf("Received EOF from %s, hangup", addr)
            return nil
        }
        if err != nil {
            logger.Warningf("Error reading from %s: %s", addr, err)
            return err
        }
        
    // 主要的执行函数deliverBlocks
        status, err := h.deliverBlocks(ctx, srv, envelope)
        if err != nil {
            return err
        }
        
    // 响应statusResponse
        err = srv.SendStatusResponse(status)
        if status != cb.Status_SUCCESS {
            return err
        }
        if err != nil {
            logger.Warningf("Error sending to %s: %s", addr, err)
            return err
        }
        
    // 等待新的SeekInfo信息
        logger.Debugf("Waiting for new SeekInfo from %s", addr)
    }
}

deliverBlocks() 方法

主要逻辑都在deliverBlocks()中,在common/deliver/deliver.go的194行:

func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
    addr := util.ExtractRemoteAddress(ctx)
  // 获取消息的payload,出错就返回Status_BAD_REQUEST状态
    payload, err := utils.UnmarshalPayload(envelope.Payload)
    if err != nil {
        logger.Warningf("Received an envelope from %s with no payload: %s", addr, err)
        return cb.Status_BAD_REQUEST, nil
    }
    
  // 如果payload.Header为空就返回Status_BAD_REQUEST状态
    if payload.Header == nil {
        logger.Warningf("Malformed envelope received from %s with bad header", addr)
        return cb.Status_BAD_REQUEST, nil
    }
    
  // 获取channelHeader,出错就返回Status_BAD_REQUEST状态
    chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
    if err != nil {
        logger.Warningf("Failed to unmarshal channel header from %s: %s", addr, err)
        return cb.Status_BAD_REQUEST, nil
    }
  // 验证ChannelHeader,出错就返回Status_BAD_REQUEST状态
  err = h.validateChannelHeader(ctx, chdr)
    if err != nil {
        logger.Warningf("Rejecting deliver for %s due to envelope validation error: %s", addr, err)
        return cb.Status_BAD_REQUEST, nil
    }
  
  // 获取通道信息,也就是链信息,这块不展开了
  chain := h.ChainManager.GetChain(chdr.ChannelId)
    if chain == nil {
        // 通道找不到,就返回Status_NOT_FOUND状态
        logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
        return cb.Status_NOT_FOUND, nil
    }
  
  labels := []string{
        "channel", chdr.ChannelId,
    // 这里filtered为true
        "filtered", strconv.FormatBool(isFiltered(srv)),
    }
  // 指标相关
  h.Metrics.RequestsReceived.With(labels...).Add(1)
    defer func() {
        labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
        h.Metrics.RequestsCompleted.With(labels...).Add(1)
    }()
  
  // 获取SeekInfo,出错就返回Status_BAD_REQUEST状态
  seekInfo := &ab.SeekInfo{}
    if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
        logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
        return cb.Status_BAD_REQUEST, nil
    }
  
  erroredChan := chain.Errored()
    if seekInfo.ErrorResponse == ab.SeekInfo_BEST_EFFORT {
        // 在SeekInfo_BEST_EFFORT时(表示尽力而为),设置erroredChan = nil
    // 这样下面就会走default分支,也就是什么都不做
        erroredChan = nil
    }
    select {
    case <-erroredChan:
    // 如果出错,返回Status_SERVICE_UNAVAILABLE状态
        logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
        return cb.Status_SERVICE_UNAVAILABLE, nil
    default:
    }
  
  // 创建一个SessionAccessControl对象,出错就返回Status_BAD_REQUEST状态
  accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, h.ExpirationCheckFunc)
    if err != nil {
        logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
        return cb.Status_BAD_REQUEST, nil
    }
  
  // 评估ACL(访问控制权限)相关的一些东西
  // 如果没有权限,就会返回Status_FORBIDDEN状态
  if err := accessControl.Evaluate(); err != nil {
        logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
        return cb.Status_FORBIDDEN, nil
    }
  
  // 如果seekInfo的Start或者是Stop为nil,返回Status_BAD_REQUEST状态
  if seekInfo.Start == nil || seekInfo.Stop == nil {
        logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
        return cb.Status_BAD_REQUEST, nil
    }
  
  logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
    
  // 这里应该是获取通道账本的迭代器,用来迭代访问区块
  // 迭代器从seekInfo.Start开始,number为起始块号
  // 这块逻辑跟账本有关,就不展开了
    cursor, number := chain.Reader().Iterator(seekInfo.Start)
    defer cursor.Close()
  
  var stopNum uint64
  // 判断seekInfo.Stop的类型
    switch stop := seekInfo.Stop.Type.(type) {
    case *ab.SeekPosition_Oldest:
    // 如果seekInfo.Stop的类型是SeekPosition_Oldest
    // 那就就将stopNum设置为迭代器迭代的起始块号
        stopNum = number
    case *ab.SeekPosition_Newest:
        // 如果seekInfo.Stop的类型是SeekPosition_Start
    // 如果seekInfo.Start的类型也是SeekPosition_Start,那就就将stopNum设置为迭代器迭代的起始块号
    // 否则,stopNum设置为当前账本高度-1
    // 注意需要-1,否则可能会返回多个块,可能预期的只是一个块
        if proto.Equal(seekInfo.Start, seekInfo.Stop) {
            stopNum = number
            break
        }
        stopNum = chain.Reader().Height() - 1
    case *ab.SeekPosition_Specified:
    // 如果seekInfo.Stop的类型是SeekPosition_Specified
    // 那么就将stopNum设置为指定的number
    // 如果指定的stopnum比迭代器起始块的高度还要小,那么就报错
        stopNum = stop.Specified.Number
        if stopNum < number {
            logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
            return cb.Status_BAD_REQUEST, nil
        }
    }
  
  // 死循环
  for {
    // 如果seekInfo.Behavior是SeekInfo_FAIL_IF_NOT_READY
    // 表示如果块没有准备好久返回错误
        if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
            if number > chain.Reader().Height()-1 {
        // 如果当前迭代起始块大于当前区块高度-1,表示找不到,返回Status_NOT_FOUND状态
                return cb.Status_NOT_FOUND, nil
            }
        }
        
    // 定义两个变量
        var block *cb.Block
        var status cb.Status
        
    // 迭代结束通道,发送迭代完成通知
        iterCh := make(chan struct{})
        go func() {
      // 不停的迭代,调用next
            block, status = cursor.Next()
            close(iterCh)
        }()

        select {
        case <-ctx.Done():
      // 表示在区块取回之间上下午退出了,返回Status_INTERNAL_SERVER_ERROR状态
            logger.Debugf("Context canceled, aborting wait for next block")
            return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
        case <-erroredChan:
      // TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
            // this error, we will need to update this error message, possibly finding a way to signal what error text to return.
            // 这块看不太懂这个erroredChan啥意思,好像是共识有关的,给排序节点用的,返回Status_SERVICE_UNAVAILABLE状态
            logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
            return cb.Status_SERVICE_UNAVAILABLE, nil
        case <-iterCh:
            // Iterator has set the block and status vars
      // 到这表示迭代完了
        }

        if status != cb.Status_SUCCESS {
      // 迭代完status不为Status_SUCCESS则返回对应的status
            logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
            return status, nil
        }

        // increment block number to support FAIL_IF_NOT_READY deliver behavior
    // 这个值++为下一个循环开始的判断做准备
    // 因为number++以后,number > chain.Reader().Height()-1应该就会成立了
    // 这个时候FAIL_IF_NOT_READY 的行为会返回
        number++
        
    // 继续再评估一下ACL
        if err := accessControl.Evaluate(); err != nil {
            logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
            return cb.Status_FORBIDDEN, nil
        }

        logger.Debugf("[channel: %s] Delivering block [%d] for (%p) for %s", chdr.ChannelId, block.Header.Number, seekInfo, addr)
        
    // 将这次迭代拿到的区块返回,发送失败返回Status_INTERNAL_SERVER_ERROR状态
        if err := srv.SendBlockResponse(block); err != nil {
            logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
            return cb.Status_INTERNAL_SERVER_ERROR, err
        }
        
    //指标设置
        h.Metrics.BlocksSent.With(labels...).Add(1)
        
    //如果block的区块好和stopnum一样,那就要停止了
    //所以一般如果要无限循环的话,stopnum一般就设置为math.MaxInt64了
    //这一点在上一篇文章中提到过
        if stopNum == block.Header.Number {
            break
        }
    }
    
    logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
    
  // 完成之后还是要返回一个Status_SUCCESS状态
    return cb.Status_SUCCESS, nil
}

代码很长,不过好在没有很多需要跳转的地方。需要注意的一个地方是,发送消息给DeliverClient的时候,只有两种响应类型:

// ResponseSender defines the interface a handler must implement to send
// responses.
type ResponseSender interface {
    SendStatusResponse(status cb.Status) error
    SendBlockResponse(block *cb.Block) error
}

一般拿到一个区块的时候,就会执行SendBlockResponse(),最终返回的响应类型是DeliverResponse_FilteredBlock。在返回一个status时,包括各种错误,或是Status_SUCCESS,都会执行`SendStatusResponse(),最终返回的响应类型是DeliverResponse_Status。返回status就代表DeliverServer这边的逻辑处理结束了,需要通知DeliverClientDeliverClient具体怎么处理就看各自实现了。

通过SendBlockResponse() 发送响应

最后看下SendBlockResponse()方法,它是最终返回一个事件响应的方法,代码在core/peer/deliverevents.go的79行:

type blockEvent common.Block

// SendBlockResponse generates deliver response with block message
func (fbrs *filteredBlockResponseSender) SendBlockResponse(block *common.Block) error {
   // blockEvent主要就是用了另一个类型要包装一下commmon.block类型
  // 以便可以自定义一个toFilteredBlock方法
   b := blockEvent(*block)
  // 这块主要就是过滤区块了
   filteredBlock, err := b.toFilteredBlock()
   if err != nil {
      logger.Warningf("Failed to generate filtered block due to: %s", err)
     // 出错调用SendStatusResponse返回错误状态
      return fbrs.SendStatusResponse(common.Status_BAD_REQUEST)
   }
  // 创建响应类型DeliverResponse,type是DeliverResponse_FilteredBlock
   response := &peer.DeliverResponse{
      Type: &peer.DeliverResponse_FilteredBlock{FilteredBlock: filteredBlock},
   }
   return fbrs.Send(response)
}

toFilteredBlock() 方法过滤区块

来看下toFilteredBlock()方法,在core/peer/deliverevents.go的157行:

func (block *blockEvent) toFilteredBlock() (*peer.FilteredBlock, error) {
  // 创建要返回的FilteredBlock对象,赋值Number字段为区块高度
    filteredBlock := &peer.FilteredBlock{
        Number: block.Header.Number,
    }
    
  // 拿到区块元数据中的common.BlockMetadataIndex_TRANSACTIONS_FILTER部分的内容,
  // TxValidationFlags是一个[]uint8类型,这里是做了一个类型转化
    txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
  // block.Data.Data就是每条交易数据
  // txIndex是下标,ebytes是一个[]byte类型,是Envelope类型的序列化内容
    for txIndex, ebytes := range block.Data.Data {
        var env *common.Envelope
        var err error
        
    // 当前交易为空,continue
        if ebytes == nil {
            logger.Debugf("got nil data bytes for tx index %d, "+
                "block num %d", txIndex, block.Header.Number)
            continue
        }
        
    // 获取env对象
        env, err = utils.GetEnvelopeFromBlock(ebytes)
        if err != nil {
            logger.Errorf("error getting tx from block, %s", err)
            continue
        }

        // 获取env的payload,这里出错直接返回,而不是continue
        payload, err := utils.GetPayload(env)
        if err != nil {
            return nil, errors.WithMessage(err, "could not extract payload from envelope")
        }
        
    // payload.Header为空continue
        if payload.Header == nil {
            logger.Debugf("transaction payload header is nil, %d, block num %d",
                txIndex, block.Header.Number)
            continue
        }
    // 获取channelHeader
        chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
        if err != nil {
            return nil, err
        }
        
    // 设置filteredBlock的ChannelID
        filteredBlock.ChannelId = chdr.ChannelId
        
    // 创建FilteredTransaction对象
    // 包括交易ID,头部类型,以及交易验证码(这个东西还不太懂)
        filteredTransaction := &peer.FilteredTransaction{
            Txid:             chdr.TxId,
            Type:             common.HeaderType(chdr.Type),
            TxValidationCode: txsFltr.Flag(txIndex),
        }
        
    // 如果交易类型是HeaderType_ENDORSER_TRANSACTION
        if filteredTransaction.Type == common.HeaderType_ENDORSER_TRANSACTION {
      // 那么就获取交易的具体数据,得到peer.Transaction对象tx
            tx, err := utils.GetTransaction(payload.Data)
            if err != nil {
                return nil, errors.WithMessage(err, "error unmarshal transaction payload for block event")
            }
            
      // 将tx.Actions(类型为[]*TransactionAction)转化为transactionActions类型(是[]*TransactionAction的别名)
      // 本质上是拿到Actions,事件就存储在交易的Acitons字段中
      // 来看下toFilteredActions方法,过滤事件去了
            filteredTransaction.Data, err = transactionActions(tx.Actions).toFilteredActions()
            if err != nil {
                logger.Errorf(err.Error())
                return nil, err
            }
        }
        
    // 设置filteredBlock的FilteredTransactions字段
        filteredBlock.FilteredTransactions = append(filteredBlock.FilteredTransactions, filteredTransaction)
    }
    
  // 最终返回filteredBlock
    return filteredBlock, nil
}

toFilteredActions() 过滤事件信息

看下toFilteredActions()方法,在core/peer/deliverevents.go的222行:

type FilteredTransactionActions struct {
    ChaincodeActions     []*FilteredChaincodeAction 
    XXX_NoUnkeyedLiteral struct{}                   
    XXX_unrecognized     []byte                     
    XXX_sizecache        int32                      
}

type FilteredChaincodeAction struct {
  // ChaincodeEvent就是链码事件结构体,这个在之前的文章中有提到
    ChaincodeEvent       *ChaincodeEvent 
    XXX_NoUnkeyedLiteral struct{}        
    XXX_unrecognized     []byte          
    XXX_sizecache        int32           
}

func (ta transactionActions) toFilteredActions() (*peer.FilteredTransaction_TransactionActions, error) {
  // 创建一个FilteredTransactionActions对象
    transactionActions := &peer.FilteredTransactionActions{}
  // 本质上就是遍历交易的Actions字段,每个action是Action类型
    for _, action := range ta {
    // 获取action的payload字段
        chaincodeActionPayload, err := utils.GetChaincodeActionPayload(action.Payload)
        if err != nil {
            return nil, errors.WithMessage(err, "error unmarshal transaction action payload for block event")
        }
        
    // chaincodeActionPayload.Action为空跳过
        if chaincodeActionPayload.Action == nil {
            logger.Debugf("chaincode action, the payload action is nil, skipping")
            continue
        }
    
    // 获取chaincodeActionPayload.Action.ProposalResponsePayload
    // 是一个ProposalResponsePayload类型
        propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
        if err != nil {
            return nil, errors.WithMessage(err, "error unmarshal proposal response payload for block event")
        }
        
    // 获取propRespPayload.Extension字段,事件就存储在其中
    // 该类型是一个ChaincodeAction类型
        caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
        if err != nil {
            return nil, errors.WithMessage(err, "error unmarshal chaincode action for block event")
        }
        
    // 获取caPayload.Events字段,该类型就是一个ChaincodeEvents类型
    // 到这里就拿到事件了
        ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)
        if err != nil {
            return nil, errors.WithMessage(err, "error unmarshal chaincode event for block event")
        }
        
    // 获取链码ID
        if ccEvent.GetChaincodeId() != "" {
      // 其实就是一个事件的拷贝操作
      // 注意这里没有把事件的payload拷贝过去
            filteredAction := &peer.FilteredChaincodeAction{
                ChaincodeEvent: &peer.ChaincodeEvent{
                    TxId:        ccEvent.TxId,
                    ChaincodeId: ccEvent.ChaincodeId,
                    EventName:   ccEvent.EventName,
          // 没有拷贝payload字段
                },
            }
      // 加上这个事件
            transactionActions.ChaincodeActions = append(transactionActions.ChaincodeActions, filteredAction)
        }
    }
  // 最终返回结果
    return &peer.FilteredTransaction_TransactionActions{
        TransactionActions: transactionActions,
    }, nil
}

DeliverFiltered 与 Deliver 的差别

好了到这里就分析完了。细心的朋友到这里其实就会发现一个问题,在做事件拷贝的时候,这一部分并没有将事件的payload字段拷贝过去,这就是DeliverFiltered这个服务做的,它最终把事件的部分内容返回回去了,不包括事件的payload信息,也就是事件的具体内容,只包含了TxId,ChaincodeId,EventName三个事件字段信息。

既然这样,那么Deliver服务做的应该就是完整的事件信息了,可以来看一下,代码在core/peer/deliverevents.go的52行:

// SendBlockResponse generates deliver response with block message
func (brs *blockResponseSender) SendBlockResponse(block *common.Block) error {
    response := &peer.DeliverResponse{
    // 直接将整个block返回
        Type: &peer.DeliverResponse_Block{Block: block},
    }
    return brs.Send(response)
}

这里甚至是直接将整个block返回了,都不带过滤事件的,看这个要从block中获取事件的操作应该要有DeliverClient来做了,这样就可以拿到事件的具体信息。

这个问题其实在protos/peer/events.pb.go中已经说明了:

type DeliverResponse_Block struct {
  // 这里直接返回
    Block *common.Block `protobuf:"bytes,2,opt,name=block,proto3,oneof"`
}

type DeliverResponse_FilteredBlock struct {
  // 这里过滤事件后返回
    FilteredBlock *FilteredBlock `protobuf:"bytes,3,opt,name=filtered_block,json=filteredBlock,proto3,oneof"`
}

很多SDK提供了类似BLOCK_FILTERED的标志来让用户选择是注册DeliverFiltered还是Deliver服务,针对不同的类型做出不同的措施。

建议事件如果只是做一个通知的作用,采用DeliverFiltered服务会更轻量,如果需要了解事件中的具体内容,则必须使用Deliver服务

最后修改:2020 年 09 月 03 日 02 : 27 PM
如果觉得我的文章对你有用,请随意赞赏