Article: How to use Kafka to stream fungible token data

Published on: 9/9/2024

What is Kafka?

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of events a day. Clients publish records to streams and subscribe to them, much as they would with a message queue or messaging system.

SimpleHash offers Kafka as one of its Bulk Service options for high-performance workloads that require ingesting large volumes of token records.

When using the Kafka Bulk Service, customers subscribe to one or more data topics, each of which carries the records for a given data type (e.g., fungible token transfers for Base are available via the base.fungibletransfer.v0 topic).

To get access to the fungible token topics, reach out to hello@simplehash.com to get credentials.

This short guide covers the key Kafka concepts and shows how to start streaming real-time fungible token transfers once you have credentials.

When might Kafka be a better solution than the REST API?

REST APIs are great for their simplicity, stateless nature, and direct, on-demand data access. However, they might not be the best choice for more demanding use cases. The following are some requirements that could make Kafka a good option:

  • Need for real-time data access: Kafka, being a streaming platform, allows you to get real-time updates on token data as soon as they occur. This can be crucial for applications that rely on up-to-the-second data, such as trading platforms or analytics apps.
  • High throughput: Kafka can process extremely high volumes of data quickly, so it's ideal if you need to ingest large amounts of token data simultaneously (like all fungible transfers on a chain).
  • Scalability: Kafka is designed to scale out across distributed systems, so it can keep up as data volumes grow. This makes it a good choice for applications that expect to handle much more data in the future.
  • Need for stream processing: Kafka also offers powerful stream processing capabilities. If you want to process your fungible token data (e.g., aggregations, joins) in real time as it flows through the system, Kafka could be an excellent choice.
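
To make the stream-processing point concrete, here is a minimal sketch of a tumbling-window aggregation in plain Python. The record fields (token, amount, timestamp) and the function name aggregate_by_window are hypothetical and are not the SimpleHash schema; in practice the records would come from a Kafka consumer rather than a hard-coded list.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


# Hypothetical record shape: {"token": str, "amount": int, "timestamp": int (seconds)}
def aggregate_by_window(
    records: Iterable[dict], window_seconds: int = 60
) -> Dict[Tuple[int, str], int]:
    """Sum transferred amounts per (window start, token)."""
    totals: Dict[Tuple[int, str], int] = defaultdict(int)
    for record in records:
        # Align each timestamp to the start of its fixed-size window.
        window_start = record["timestamp"] - (record["timestamp"] % window_seconds)
        totals[(window_start, record["token"])] += record["amount"]
    return dict(totals)


sample = [
    {"token": "USDC", "amount": 100, "timestamp": 5},
    {"token": "USDC", "amount": 50, "timestamp": 59},
    {"token": "USDC", "amount": 25, "timestamp": 61},
    {"token": "WETH", "amount": 2, "timestamp": 30},
]
print(aggregate_by_window(sample))
```

The first two USDC transfers fall into the window starting at 0 and the third into the window starting at 60, so they are totaled separately.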

How to get started

  • Understand Bootstrap Servers: bootstrap.servers is a Kafka client configuration for the initial list of Kafka brokers. This list is used only to establish the initial connection to the Kafka cluster. After connecting, the client discovers the full cluster and uses all of its brokers, regardless of which ones were listed. The format is host1:port1,host2:port2.
  • Set up Kafka Clients: Depending on the programming language you are using, you'll have to set up a Kafka client. Some popular Kafka clients include the Confluent libraries for Java or Python, and kafka-node for Node.js.
  • Configuration: To connect to the Kafka broker, you'll need to pass the bootstrap.servers configuration along with other necessary configurations such as group.id for consumer applications. Please contact SimpleHash support for the required configuration.
  • Authentication: For Confluent Cloud (which SimpleHash uses to provision Kafka), you'll need to authenticate with Confluent API keys. Typically, this is done by passing the API Key and Secret via the sasl.jaas.config (Java clients) or equivalent for other language clients. Please contact SimpleHash support for the required credentials.
  • Subscribe to Topics: A Kafka Topic contains a stream of records, similar to a table in a database. You'll need to subscribe your client to the relevant topics to start receiving messages. SimpleHash support will provision you with the required topics.
  • Start Reading from Topics: After subscribing to the relevant topics, you can start consuming the messages. This is typically done in a loop that polls for new messages and processes each new message accordingly. Example code for a Python client is shown below.
  • Interpret Data: Each record consists of a key, headers, a value, and a timestamp. The key is optional, and the value contains the message payload (data). How you interpret the payload depends on the schema that was used when the messages were produced. By default, SimpleHash Kafka streams are serialized in Avro format.
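
The configuration and authentication steps above can be sketched as a small helper that validates the bootstrap.servers format and assembles a consumer config. This is only an illustration: the KAFKA_* variable names and both function names are hypothetical, and the actual values come from SimpleHash support.

```python
import os
from typing import Dict, List, Mapping, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs, validating each entry."""
    servers = []
    for entry in value.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Invalid bootstrap server entry: {entry!r}")
        servers.append((host, int(port)))
    return servers


def build_consumer_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Assemble a consumer config from a mapping of settings.

    The KAFKA_* names are hypothetical; use whatever your deployment provides.
    """
    bootstrap = env["KAFKA_BOOTSTRAP_SERVERS"]
    parse_bootstrap_servers(bootstrap)  # fail fast on a malformed list
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": env["KAFKA_API_KEY"],
        "sasl.password": env["KAFKA_API_SECRET"],
        "group.id": env["KAFKA_GROUP_ID"],
        "auto.offset.reset": env.get("KAFKA_AUTO_OFFSET_RESET", "earliest"),
    }


# Placeholder values for illustration only.
example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "host1:9092,host2:9092",
    "KAFKA_API_KEY": "<API Key>",
    "KAFKA_API_SECRET": "<API Secret>",
    "KAFKA_GROUP_ID": "<CONSUMER_GROUP_ID>",
}
print(build_consumer_config(example_env)["bootstrap.servers"])
```

Keeping credentials out of the source file and validating the server list before connecting makes misconfiguration easier to spot than a connection timeout would.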

Key Kafka concepts

  • Kafka messages are consumed by connecting to the SimpleHash Kafka cluster. For access, please contact hello@simplehash.com.
  • Kafka messages are sent in Avro format.
  • The producer compression format is snappy.
  • The schema is sent alongside the messages to aid with deserialization. SimpleHash also has a schema registry service for certain topics, which eliminates the need to transmit the schema alongside each message.
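
As background on how a schema registry avoids shipping the schema with every message: Confluent serializers conventionally prefix each payload with a one-byte magic value (0) and a four-byte big-endian schema ID, which the consumer uses to look up the schema. Whether a particular SimpleHash topic uses this framing is an assumption here, and a registry-aware deserializer normally handles it for you. The function name split_confluent_frame is hypothetical; the sketch only illustrates the layout.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of a schema-registry framed message


def split_confluent_frame(raw: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) from a schema-registry framed message."""
    if len(raw) < 5:
        raise ValueError("Message too short to contain a schema-registry header")
    # ">BI" = big-endian, one unsigned byte followed by one unsigned 32-bit int.
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"Unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Build a fake framed message: magic byte, schema ID 100042, then the payload bytes.
frame = b"\x00" + (100042).to_bytes(4, "big") + b"payload"
print(split_confluent_frame(frame))
```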

Message Headers

Each Kafka message includes a header whose key is modType and whose value is one of:

  • INSERT denotes when SimpleHash inserts new data into the main database.
  • UPDATE signifies when SimpleHash updates an existing record.
  • DELETE indicates when SimpleHash deletes a specific record. The message value will be empty.
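
One way to act on the modType header is to keep a local copy of the records and apply each change to it. The sketch below assumes the message key identifies the record, which the article does not state, and uses a plain dict as a stand-in for your own storage; the names mod_type and apply_change are hypothetical. Headers are modeled as a list of (key, bytes) pairs, the shape the Python client returns.

```python
from typing import Dict, List, Optional, Tuple

Headers = Optional[List[Tuple[str, bytes]]]


def mod_type(headers: Headers) -> Optional[str]:
    """Return the decoded modType header, or None if it is absent."""
    for key, value in headers or []:
        if key == "modType" and value is not None:
            return value.decode("utf-8")
    return None


def apply_change(
    store: Dict[bytes, dict], key: bytes, headers: Headers, value: Optional[dict]
) -> None:
    """Apply one message to a local store (assumes the key identifies the record)."""
    kind = mod_type(headers)
    if kind in ("INSERT", "UPDATE"):
        if value is not None:  # skip messages that carry no payload
            store[key] = value
    elif kind == "DELETE":
        store.pop(key, None)
    else:
        raise ValueError(f"Unknown or missing modType: {kind!r}")


store: Dict[bytes, dict] = {}
apply_change(store, b"transfer-1", [("modType", b"INSERT")], {"amount": 10})
apply_change(store, b"transfer-1", [("modType", b"UPDATE")], {"amount": 12})
apply_change(store, b"transfer-2", [("modType", b"INSERT")], {"amount": 7})
apply_change(store, b"transfer-2", [("modType", b"DELETE")], None)
print(store)
```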

Example code for consuming a Fungible Token Kafka topic (Python):

For convenience, we've included a simple code example for consuming fungible token transfers on Base. This is useful for any application that needs real-time activity on all fungible tokens on a given chain, high throughput of fungible data, and/or for powering interfaces involving live updates.

from pprint import PrettyPrinter
import functools
import fastavro

fastavro.parse_schema = functools.partial(
   fastavro.fastavro.parse_schema, expand=True
)

from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_registry_client = SchemaRegistryClient(
   {
       "url": "https://psrc-xqq6z.us-central1.gcp.confluent.cloud",
       "basic.auth.user.info": "<Schema Registry API Key>:<Schema Registry API Secret>",
   }
)

config = {
   "bootstrap.servers": "pkc-3w22w.us-central1.gcp.confluent.cloud:9092",
   "security.protocol": "SASL_SSL",
   "sasl.mechanisms": "PLAIN",
   "sasl.username": "<API Key>",
   "sasl.password": "<API Secret>",
   "group.id": "<CONSUMER_GROUP_ID>",  # https://www.confluent.io/blog/configuring-apache-kafka-consumer-group-ids/
   # auto.offset.reset only applies when there is no initial offset in Kafka or if the current offset does not exist any more on the server
   "auto.offset.reset": "earliest",  # https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto-offset-reset
}


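def _on_assign(consumer, partitions):
   for partition in partitions:
       # Override the starting offset for each assigned partition.
       # This takes precedence over any committed offset, so the consumer
       # restarts from offset 0 after every rebalance; adjust as needed.
       partition.offset = 0

   # Apply the assignment with the overridden offsets (this does not commit them)
   consumer.assign(partitions)
   print("Offsets assigned:", partitions)  # noqa: T001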


avro_deserializer = AvroDeserializer(schema_registry_client)

TOPICS = ["base.fungibletransfer.incremental.v0"]
consumer = Consumer(config)
consumer.subscribe(TOPICS, on_assign=_on_assign)

pp = PrettyPrinter()

while True:
   msg = consumer.poll(1.0)

   if msg is None:
       print("msg is none")  # noqa: T001
       continue
   if msg.error():
       if msg.error().code() == KafkaError._PARTITION_EOF:
           print(  # noqa: T001
               "End of partition reached {0}/{1}".format(
                   msg.topic(), msg.partition()
               )
           )
       else:
           print(  # noqa: T001
               "Error while consuming message: {0}".format(msg.error())
           )
   else:
       token = avro_deserializer(
           msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
       )
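       # msg.headers() returns None when a message carries no headers
       headers = dict(msg.headers() or [])

       print(  # noqa: T001
           "offset: ", msg.offset(), " key: ", msg.key(), " header: ", headers
       )
       # Use .get() so a message without a modType header does not raise KeyError
       mod_type = headers.get("modType", b"").decode()
       if mod_type in {"INSERT", "UPDATE"}: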
           pp.pprint(token)
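
The loop above runs until interrupted and relies on the client's default offset handling. As one possible variation, the sketch below commits each offset only after the message has been handled, and closes the consumer on exit. It assumes a consumer created with "enable.auto.commit": False in its config; the function name consume_with_commit, the handle callback, and the max_messages and max_idle_polls parameters are hypothetical.

```python
from typing import Callable


def consume_with_commit(
    consumer,
    handle: Callable[[object], None],
    max_messages: int,
    max_idle_polls: int = 5,
    poll_timeout: float = 1.0,
) -> int:
    """Handle up to max_messages, committing each offset after its handler succeeds."""
    processed = 0
    idle_polls = 0
    try:
        while processed < max_messages and idle_polls < max_idle_polls:
            msg = consumer.poll(poll_timeout)
            if msg is None:
                idle_polls += 1  # nothing arrived within the timeout
                continue
            idle_polls = 0
            if msg.error():
                print("Error while consuming message: {0}".format(msg.error()))
                continue
            handle(msg)
            # Synchronous commit: the offset is stored before the next poll.
            consumer.commit(message=msg, asynchronous=False)
            processed += 1
    finally:
        # Leave the consumer group cleanly so partitions are reassigned promptly.
        consumer.close()
    return processed
```

If the handler raises, the offset for that message is not committed, so the message is delivered again after a restart. This gives at-least-once processing, which means handlers should tolerate duplicates.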
