// EventEmitter is a type that describes event emitter functions
// This should be defined in `types/events.go`
type EventEmitter func(context.Context, client.Context, ...EventHandler)
error
// EventHandler is a type of function that handles events coming out of the event bus
// This should be defined in `types/events.go`
type EventHandler func(proto.Message)
error
// Sample use of the functions below
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    if err := TxEmitter(ctx, client.Context{
}.WithNodeURI("tcp://localhost:26657"), SubmitProposalEventHandler); err != nil {
    cancel()
panic(err)
}
return
}
// SubmitProposalEventHandler is an example of an event handler that prints proposal details
// when any EventSubmitProposal is emitted. Every other event type is ignored.
// It always returns nil.
func SubmitProposalEventHandler(ev proto.Message) (err error) {
	switch event := ev.(type) {
	// Handle governance proposal creation events.
	// NOTE(review): generated protobuf messages normally implement
	// proto.Message on the pointer type, so the pointer is matched here;
	// confirm against the generated govtypes code.
	case *govtypes.EventSubmitProposal:
		// Users define business logic here e.g.
		// Fields are read from the typed value; the proto.Message interface
		// itself does not expose them.
		fmt.Println(event.FromAddress, event.ProposalId, event.Proposal)
		return nil
	default:
		return nil
	}
}
// TxEmitter is an example of an event emitter that emits just transaction events. This can and
// should be implemented somewhere in the Cosmos SDK. The Cosmos SDK can include an EventEmitters for tm.event='Tx'
// and/or tm.event='NewBlock' (the new block events may contain typed events)
func TxEmitter(ctx context.Context, cliCtx client.Context, ehs ...EventHandler) (err error) {
    // Instantiate and start CometBFT RPC client
    client, err := cliCtx.GetNode()
    if err != nil {
    return err
}
    if err = client.Start(); err != nil {
    return err
}
    // Start the pubsub bus
    bus := pubsub.NewBus()
defer bus.Close()
    // Initialize a new error group
    eg, ctx := errgroup.WithContext(ctx)
    // Publish chain events to the pubsub bus
    eg.Go(func()
error {
    return PublishChainTxEvents(ctx, client, bus, simapp.ModuleBasics)
})
    // Subscribe to the bus events
    subscriber, err := bus.Subscribe()
    if err != nil {
    return err
}
	// Handle all the events coming out of the bus
	eg.Go(func()
error {
    var err error
    for {
    select {
    case <-ctx.Done():
                return nil
    case <-subscriber.Done():
                return nil
    case ev := <-subscriber.Events():
    for _, eh := range ehs {
    if err = eh(ev); err != nil {
    break
}
 
}
 
}
 
}
return nil
})
return group.Wait()
}
// PublishChainTxEvents events using cmtclient. Waits on context shutdown signals to exit.
func PublishChainTxEvents(ctx context.Context, client cmtclient.EventsClient, bus pubsub.Bus, mb module.BasicManager) (err error) {
    // Subscribe to transaction events
    txch, err := client.Subscribe(ctx, "txevents", "tm.event='Tx'", 100)
    if err != nil {
    return err
}
    // Unsubscribe from transaction events on function exit
    defer func() {
    err = client.UnsubscribeAll(ctx, "txevents")
}()
    // Use errgroup to manage concurrency
    g, ctx := errgroup.WithContext(ctx)
    // Publish transaction events in a goroutine
    g.Go(func()
error {
    var err error
    for {
    select {
    case <-ctx.Done():
                break
    case ed := <-ch:
    switch evt := ed.Data.(type) {
    case cmttypes.EventDataTx:
    if !evt.Result.IsOK() {
    continue
}
                    // range over events, parse them using the basic manager and
                    // send them to the pubsub bus
    for _, abciEv := range events {
    typedEvent, err := sdk.ParseTypedEvent(abciEv)
    if err != nil {
    return er
}
    if err := bus.Publish(typedEvent); err != nil {
    bus.Close()
return
}
continue
}
 
}
 
}
 
}
return err
})
    // Exit on error or context cancelation
    return g.Wait()
}