Explore the next frontier of data

Read the latest news and opinions from our experts

 

Featured Post

Recent Posts

Kafka and Starburst: 3 Considerations for Accelerating Time to Value

Kafka was created at LinkedIn and open sourced into the Apache Software foundation in early 2011. It was developed to optimize writes especially for support of streaming applications. Kafka’s architecture can be deployed anywhere (bare metal, VMs or containers) as a cluster of highly scalable and fault tolerant servers and clients. The Kafka architecture consists of brokers, consumers, and producers which process topics that represent a grouping or catalog of records which are stored as logs. Kafka processes event records which are facts representing that an action took place in the business. These events are also called messages or records. Kafka has been used to solve a variety of problems where concurrency and IO were a bottleneck to achieving performance. 

Kafka is used in many different business applications. One example is customer 360 applications which need to have a complete understanding of their consumers to better facilitate opportunities with products and services to enhance the consumer experience. A hospitality business may track real time consumer sentiment in order to quickly respond to, for example, poor feedback and deliver high value customers with the next best offer. Network fraud detection applications also leverage event stream data to find misaligned transactions or pattern identification to reduce risk and improve the customer experience. Businesses commonly use Kafka for analytics towards predictive asset maintenance. Here, organizations record streaming data from IoT devices combined with system logs to forecast failure before it happens and take action to fix or replace the asset. 

The challenge to solving these problems today is in the ability to utilize an organization's vast amount of data in real time. Kafka’s strength is in the management and merging of in-flight information, however, to better understand and discover the data an organization may need to add query capabilities with applications such as Solr or Elasticsearch. This requires additional data integration tasks to migrate the data into indexes to facilitate consumers ability to visualize, explore and report on the data. In 2017, the Kafka project created KSQL (later renamed to ksqlDB) to provide a lightweight SQL-like streaming engine for Kafka. This allowed consumers better ease of access and better approachability over using Java, Scala or Python by business users. ksqlDB was created for stream processing and materializing asynchronously computed views. Enrichment of the objects with static data means that data needs to be integrated into a Kafka cluster to perform a join with ksqlDB. While ksqlDB excels for its designed to process streaming workflows, it is incredibly limited in how it enables data enrichment from other sources of data. While not specialized in stream processing, Starburst fills this lack of data enrichment capability via batch processing and interactive federated queries.

How can analysts leverage Kafka topics and remote data as part of their jobs without the delays associated with data movement? With organizations accelerating user productivity, how can analysts benefit from data in Kafka without requiring new skills and tools? How much wood could a woodchuck chuck if a woodchuck could chuck wood? 

Starburst Enterprise, based on open source Trino (formerly PrestoSQL) is a scalable and fault tolerant SQL-based MPP query engine. Starburst acts as a singular access point to data for all your organization's data consumers. Starburst provides 40+ data connectors that allow users to federate queries across multiple sources of data. SQL, NoSQL, streaming, cloud or on-premise data is available for reporting and analytics. The Kafka data connector for Starburst furnishes users with ANSI SQL access to Kafka topics. This connection enhances consumers' discovery and understanding of streaming data in the context of the broader enterprise data ecosystem while working with the same tools and applications they use today. Let’s look at three features of the Starburst Kafka connector that empower data consumers:

 

Kafka Table Metadata

One of the challenges to querying data in Kafka is the data structure of the underlying files. Generally, organizations standardize on JSON or AVRO file formats for Kafka messages. The latter yields a schema description language which allows users to assign column names and data types to the data. ksqlDB allows the user to produce the table structure as a streaming object. Starburst gives administrators the ability to add schema metadata to all Kafka topics in the source system. The table schema for Kafka messages can be configured to the Starburst Kafka connector with either a configuration file or configuring the schema registry (which are commonly found in Confluent the commercial distribution of Kafka) into the kafka.confluent-schema-registry-url configuration variable for the Starburst Kafka connector. Using the schema registry method allows the Starburst cluster to discover new tables through the Kafka connector and list their structure in the Starburst Kafka catalog for querying. Once in place, users can issue ANSI SQL queries against the topics in Kafka from any query application from CLI to BI tools. This capability allows users to leverage a variety of analytic and reporting solutions to work with data in Kafka through structured metadata without the need to learn a new tool or skill. For example, if analysts are developing a security report based on internal security event data, the user can interface the reporting tool of their choice with Starburst connected to Kafka, without the need to migrate to another query solution such as Elasticsearch or Solr. This produces faster reporting by avoiding the delays associated with migrating the data to additional repositories. Analysts can immediately begin discovering and understanding the data, utilize SQL for final data preparation tasks and store data objects in a semantic layer for wider business consumption of the data. Not only does this speed time to insights, but delivers a processing platform to shift the execution load from Kafka to Starburst.

 

JSON Functions

In some cases, the data structures stored in Kafka can be complex and not easily adapted to a structured query. Further, the structure of the data itself may vary across rows making it difficult to apply a schema prior to data storage. For these structures, users need to be able to perform schema-on-read type processing over schema-on-write. Schema-on-read is where the processing itself defines the structure of the data to be processed, vs schema-on-write where the storage of the data defines how the data will be consumed. Because of the need to define the structure while processing the data, these semi-structured data objects can be difficult to approach by analysts with common tooling for reporting and visualization. Starburst delivers powerful functions for identifying, extracting, formatting and parsing values from a JSON encoded string. Consumers may start by leveraging is_json_scalar() to identify the value of the string as scalar (number or string) to understand the data type. This validation helps the user to determine how they should go about processing or extracting the data. For example, consider a Kafka topic for devices being monitored on a network which contains a member array for detailed statistics for an interface. Consumers can leverage ANSI SQL in Starburst to identify and understand the data needed for the analysis and use the JSON function json_extract_scalar() or json_extract() to extract the data values as a string or encoded value. They can also filter on a specific network interface of interest all from a single SQL statement or business view. 

 

SELECT json_extract_scalar(interface , '$.name') AS name,

        json_extract_scalar("interface" , '$.uptime') AS uptime,

        json_extract_scalar("interface" , '$.statistics.collisions') AS collisions,

        json_extract_scalar("interface" , '$.statistics.tx_errors') AS tx_errors,

        json_extract_scalar("interface" , '$.statistics.rx_errors') AS rx_errors

FROM kafka_demo.store.interfaces

CROSS JOIN UNNEST(CAST(json_extract("_message", '$.interfaces') AS ARRAY<json>)) AS u(interface)

WHERE json_extract_scalar("interface" , '$.name') = 'eth0';

 

Federate with Kafka Topics

With organizations owning multiple sources of operational data, the ability to enrich transactional data in Kafka without spending cycles integrating or migrating data can drastically reduce the time to insight. Data engineers can save time from developing and maintaining pipelines to bring the data to a unified location and analysts can quickly discover and merge the attributes needed for model and report development. Further, we reduce storage costs by allowing the data to remain where it originated and using Starburst to quickly unite disparate data. Consider that our analyst may need to look up some descriptive information about the interface statistics we pulled earlier. The organization stores additional information on the specific device based on the MAC address in an Oracle database rather than Kafka. With Starburst's ability to access data over integrating data, consumers can quickly pull the additional information into the query without developing additional pipelines or resorting to co-locating additional data. By creating an additional join to the device data stored in Oracle, we can quickly enrich the data set with location information on the organization’s building, city and state where the device is deployed. 

 

SELECT d.building, d.city, d.state,  i.name, i.uptime, i.collisions, i.tx_errors, i.rx_errors

FROM (

   SELECT 

        json_extract_scalar(interface , '$.macaddr') AS macaddr,

        json_extract_scalar(interface , '$.name') AS name,

        json_extract_scalar("interface" , '$.uptime') AS uptime,

        json_extract_scalar("interface" , '$.statistics.collisions') AS collisions,

        json_extract_scalar("interface" , '$.statistics.tx_errors') AS tx_errors,

        json_extract_scalar("interface" , '$.statistics.rx_errors') AS rx_errors

   FROM kafka_demo.store.interfaces i

   CROSS JOIN UNNEST(CAST(json_extract("_message", '$.interfaces') AS ARRAY<json>)) AS u(interface)) i

JOIN s3.whois.devices d ON (i.macaddr = d.macaddr)

WHERE i.name = ‘eth0’;

 

In this blog, you learned how organizations can achieve high value from their data with a centralized access layer. Leveraging Starburst with the Kafka platform provides access to streaming data to users of all skill levels. Kafka topics can be referenced the same as structured tables using Kafka table metadata in Starburst. Converting complex JSON objects into consumable attributes with JSON functions simplifies the schema for business consumption and federation with real time data increases the relevance of Kafka topics through enrichment with remote data schemas.

Clark Bradley

Solutions Architect at Starburst

Your Comments :

data-mesh-email-signature

From Facebook

Read more of what you like.