<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>The Ops Community ⚙️: Avital Trifsik</title>
    <description>The latest articles on The Ops Community ⚙️ by Avital Trifsik (@avital_trifsik).</description>
    <link>https://community.ops.io/avital_trifsik</link>
    <image>
      <url>https://community.ops.io/images/oeUJln31ycwbHqaEpjQnfU6UwJ-iHtETGxr7476f3u4/rs:fill:90:90/g:sm/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL3Vz/ZXIvcHJvZmlsZV9p/bWFnZS8xMDYzL2Y2/ZWY0NTQyLWQ4NTEt/NGE1Yi1hODlkLWRi/OTk4NTllZDRlMy5q/cGc</url>
      <title>The Ops Community ⚙️: Avital Trifsik</title>
      <link>https://community.ops.io/avital_trifsik</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://community.ops.io/feed/avital_trifsik"/>
    <language>en</language>
    <item>
      <title>Task scheduling with a message broker</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Thu, 17 Aug 2023 08:40:37 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/task-scheduling-with-a-message-broker-ma6</link>
      <guid>https://community.ops.io/memphis_dev/task-scheduling-with-a-message-broker-ma6</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Task scheduling is essential in modern applications to maximize resource utilization and user experience (Non-blocking task fulfillment).&lt;br&gt;
A queue is a powerful tool that allows your application to manage and prioritize tasks in a structured, persistent, and scalable way.&lt;br&gt;
While there are multiple possible solutions, working with a queue (which is also the perfect data structure for that type of work), can ensure that tasks are completed in their creation order without the risk of forgetting, overlooking, or double-processing critical tasks.&lt;/p&gt;

&lt;p&gt;A very interesting story on the need and evolvement as the scale grows can be found in one of DigitalOcean’s co-founder’s blog post&lt;br&gt;
&lt;a href="https://www.digitalocean.com/blog/from-15-000-database-connections-to-under-100-digitaloceans-tale-of-tech-debt"&gt;From 15,000 database connections to under 100.&lt;br&gt;
&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  Any other solutions besides a queue?
&lt;/h2&gt;

&lt;p&gt;Multiple. Each with its own advantages and disadvantages.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cron&lt;/strong&gt;&lt;br&gt;
You can use cron job schedulers to automate such tasks. The issue with cron is that the job and its execution time have to be written explicitly and before the actual execution, making your architecture highly static and not event-driven. Mainly suitable for a well-defined and known set of tasks that either way have to take place, not by a user action.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Database&lt;/strong&gt;&lt;br&gt;
A database can be a good and simple choice for a task storing place, and actually used for that in the early days of a product MVP,&lt;br&gt;
but there are multiple issues with that approach, for example:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Ordering of insertion is not guaranteed, and therefore the tasks handling might not take place in the order they actually got created.&lt;/li&gt;
&lt;li&gt;Double processing can happen as the nature of a database is not to delete a record once read, so there is a potential of double reading and processing a specific task, and the results of that can be catastrophic to a system’s behavior.&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  Traditional queues
&lt;/h2&gt;

&lt;p&gt;Often, for task scheduling, the chosen queue would probably be a pub/sub system like RabbitMQ.&lt;/p&gt;

&lt;p&gt;Choosing RabbitMQ over a classic broker such as Kafka, for example, in the context of task scheduling does make sense as a more suitable tool for that type of task given the natural behavior of Kafka to retain records (or tasks) till a specific point in time, no matter if acknowledged or not.&lt;/p&gt;

&lt;p&gt;The downside in choosing RabbitMQ would be the lack of scale, robustness, and performance, which in time become increasingly needed.&lt;/p&gt;

&lt;p&gt;With that idea in mind, Memphis is a broker that presents scale, robustness, and high throughput alongside a type of retention that fully enables task scheduling over a message broker.&lt;/p&gt;




&lt;h2&gt;
  
  
  Memphis Broker is a perfect queue for task scheduling
&lt;/h2&gt;

&lt;p&gt;On v1.2, Memphis released its support for ACK-based retention through Memphis Cloud. Read more &lt;a href="https://docs.memphis.dev/memphis/memphis-broker/concepts/station#retention"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Messages will be removed from a station only when &lt;strong&gt;acknowledged by all&lt;/strong&gt; the connected consumer groups. For example:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;If we have only one connected consumer group when a message/record is acknowledged, it will be automatically removed from the station.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If we have two connected consumer groups, the message will be removed from the station (=queue) once all CGs acknowledge the message.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We mentioned earlier the advantages and disadvantages of using traditional queues such as RabbitMQ in comparison to common brokers such as Kafka in the context of task scheduling. When comparing both tools to Memphis, it’s all about getting the best from both worlds.&lt;/p&gt;

&lt;p&gt;A few of Memphis.dev advantages –&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Ordering&lt;/li&gt;
&lt;li&gt;Exactly-once delivery guarantee&lt;/li&gt;
&lt;li&gt;Highly scalable, serving data in high throughput with low 4. latency&lt;/li&gt;
&lt;li&gt;Ack-based retention&lt;/li&gt;
&lt;li&gt;Many-to-Many pattern&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  Getting started with Memphis Broker as a tasks queue
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;a href="https://cloud.memphis.dev/"&gt;Sign up&lt;/a&gt; to Memphis Cloud.&lt;/li&gt;
&lt;li&gt;Connect your task producer –&lt;/li&gt;
&lt;li&gt;Producers are the entities that insert new records or tasks.&lt;/li&gt;
&lt;li&gt;Consumers are the entities who read and process them.&lt;/li&gt;
&lt;li&gt;A single client with a single connection object can act as both at the same time, meaning be both a producer and a consumer. Not to the same station because it will lead to an infinite loop. It’s doable, but not making much sense.
That pattern is more to reduce footprint and needed “workers” so a single worker can produce tasks to a specific station, but can also act as a consumer or a processor to another station of a different use case.
The below code example will create an Ack-based station and initiate a producer in node.js –
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const { memphis } = require("memphis-dev");

(async function () {
  let memphisConnection;

  try {
    memphisConnection = await memphis.connect({
      host: "MEMPHIS_BROKER_HOSTNAME",
      username: "CLIENT_TYPE_USERNAME",
      password: "PASSWORD",
      accountId: ACCOUNT_ID
    });

    const station = await memphis.station({
      name: 'tasks',
      retentionType: memphis.retentionTypes.ACK_BASED,
    })

    const producer = await memphisConnection.producer({
      stationName: "tasks",
      producerName: "producer-1",
    });

    const headers = memphis.headers();
    headers.add("Some_KEY", "Some_VALUE");
    await producer.produce({
      message: {taskID: 123, task: "deploy a new instance"}, // you can also send JS object - {}
      headers: headers,
    });

    memphisConnection.close();
  } catch (ex) {
    console.log(ex);
    if (memphisConnection) memphisConnection.close();
  }
})();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Connect your task consumer –&lt;/li&gt;
&lt;li&gt;The below consumer group will consume tasks, process them, and, once finished – acknowledge them.
By acknowledging the tasks, the broker will make sure to remove those records to ensure exactly-once processing. We are using the station entity here as well in case the consumer starts before the producer.
No need to worry. It is applied if the station does not exist yet.Another thing to remember is that a consumer group can contain multiple consumers to increase parallelism and read-throughput. Within each consumer group, only a single consumer will read and ack the specific message, not all the contained consumers. In case that pattern is needed, then multiple consumer groups are needed.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const { memphis } = require("memphis-dev");

(async function () {
  let memphisConnection;

  try {
    memphisConnection = await memphis.connect({
      host: "MEMPHIS_BROKER_HOSTNAME",
      username: "APPLICATION_TYPE_USERNAME",
      password: "PASSWORD",
      accountId: ACCOUNT_ID
    });

    const station = await memphis.station({
      name: 'tasks',
      retentionType: memphis.retentionTypes.ACK_BASED,
    })

    const consumer = await memphisConnection.consumer({
      stationName: "tasks",
      consumerName: "worker1",
      consumerGroup: "cg_workers",
    });

    consumer.setContext({ key: "value" });
    consumer.on("message", (message, context) =&amp;gt; {
      console.log(message.getData().toString());
      message.ack();
      const headers = message.getHeaders();
    });

    consumer.on("error", (error) =&amp;gt; {});
  } catch (ex) {
    console.log(ex);
    if (memphisConnection) memphisConnection.close();
  }
})();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;p&gt;If you liked the tutorial and want to learn what else you can do with Memphis Head &lt;a href="https://docs.memphis.dev/memphis/getting-started/tutorials"&gt;here&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis-broker"&gt;Github&lt;/a&gt;•&lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt;•[Discord (&lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;https://discord.com/invite/DfWFT7fzUu&lt;/a&gt;)&lt;/p&gt;




&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/blog/streaming-first-infrastructure-for-real-time-machine-learning/"&gt;Memphis.dev&lt;/a&gt; By &lt;a href="https://www.linkedin.com/in/shay-bratslavsky/"&gt;Shay Bratslavsky&lt;/a&gt;, Software Engineer at @Memphis.dev&lt;/p&gt;

</description>
      <category>taskmanagement</category>
      <category>taskscheduling</category>
      <category>messagebroker</category>
    </item>
    <item>
      <title>Part 4: Validating CDC Messages with Schemaverse</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Thu, 22 Jun 2023 11:51:12 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/part-4-validating-cdc-messages-with-schemaverse-3mcd</link>
      <guid>https://community.ops.io/memphis_dev/part-4-validating-cdc-messages-with-schemaverse-3mcd</guid>
      <description>&lt;p&gt;&lt;em&gt;This is part four of a series of blog posts on building a modern event-driven system using Memphis.dev.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In the previous two blog posts (&lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;part 2&lt;/a&gt; and &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;part 3&lt;/a&gt;), we described how to implement a change data capture (CDC) pipeline for &lt;a href="https://www.mongodb.com/"&gt;MongoDB&lt;/a&gt; using &lt;a href="https://debezium.io/documentation/reference/stable/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; and &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt;.&lt;/p&gt;




&lt;h2&gt;
  
  
  Schema on Write, Schema on Read
&lt;/h2&gt;

&lt;p&gt;With relational databases, schemas are defined before any data are ingested.  Only data that conforms to the schema can be inserted into the database.  This is known as “schema on write.”  This pattern ensures data integrity but can limit flexibility and the ability to evolve a system.  &lt;/p&gt;

&lt;p&gt;Predefined schemas are optional in NoSQL databases like MongoDB.  MongoDB models collections of objects.  In the most extreme case, collections can contain completely different types of objects such as cats, tanks, and books.  More commonly, fields may only be present on a subset of objects or the value types may vary from one object to another.  This flexibility makes it easier to evolve schemas over time and efficiently support objects with many optional fields.&lt;/p&gt;

&lt;p&gt;Schema flexibility puts more onus on applications that read the data.  Clients need to check for any desired field and confirm their data types.  This pattern is called "schema on read."&lt;/p&gt;




&lt;h2&gt;
  
  
  Malformed Records Cause Crashes
&lt;/h2&gt;

&lt;p&gt;In one of my positions earlier in my career, I worked on a team that developed and maintained data pipelines for an online ad recommendation system.  One of the most common sources of downtime were malformed records.  Pipeline code can fail if a field is missing, an unexpected value is encountered, or when trying to parse badly-formatted data.  If the pipeline isn't developed with errors in mind (e.g., using &lt;a href="https://en.wikipedia.org/wiki/Defensive_programming"&gt;defensive programming techniques&lt;/a&gt;, explicitly-defined data models, and validating data), the entire pipeline may crash and require manual intervention by an operator.&lt;/p&gt;

&lt;p&gt;Unfortunately, malformed data, especially when handling large volumes of data, is a frequent occurrence.  Simply hoping for the best won't lead to resilient pipelines.  As the saying goes, "Hope for the best. Plan for the worst."&lt;/p&gt;




&lt;h2&gt;
  
  
  The Best of Both Worlds: Data Validation with Schemaverse
&lt;/h2&gt;

&lt;p&gt;Fortunately, Memphis.dev has an awesome feature called Schemaverse.  Schemaverse provides a mechanism to check messages for compliance with a specified schema and handle non-confirming messages.&lt;/p&gt;

&lt;p&gt;To use Schemaverse, the operator needs to first define a schema.  Messaged schemas can be defined using JSON Schema, Google Protocol Buffers, or GraphQL.  The operator will choose the schema definition language appropriate to the format of the message payloads.&lt;/p&gt;

&lt;p&gt;Once a schema is defined, the operator can "attach" the schema to a station.  The schema will be downloaded by clients using the Memphis.dev client SDKs.  The client SDK will validate each message before sending it to the Memphis broker.  If a message doesn't validate, the client will redirect the message to the dead-letter queue, trigger a notification, and raise an exception to notify the user of the  client. &lt;/p&gt;

&lt;p&gt;In this example, we'll look at using Schemaverse to validate change data capture (CDC) events from MongoDB.&lt;/p&gt;




&lt;h2&gt;
  
  
  Review of the Solution
&lt;/h2&gt;

&lt;p&gt;In our &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;previous post&lt;/a&gt;, we described a change data capture (CDC) pipeline for a collection of todo items stored in MongoDB.  Our solution consists of eight components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Todo Item Generator&lt;/strong&gt;: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds.  Each todo item contains a description, creation timestamp, optional due date, and completion status.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;MongoDB&lt;/strong&gt;: Configured with a single database containing a single collection (todo_items).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Debezium Server&lt;/strong&gt;: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev REST Gateway&lt;/strong&gt;: Uses the out-of-the-box configuration.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev&lt;/strong&gt;: Configured with a single station (todo-cdc-events) and single user (todocdcservice).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Printing Consumer&lt;/strong&gt;: A script that uses the &lt;a href="https://github.com/memphisdev/memphis.py"&gt;Memphis.dev Python SDK&lt;/a&gt; to consume messages and print them to the console.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Transformer Service&lt;/strong&gt;: A &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/cdc-transformer/cdc_transformer.py"&gt;transformer&lt;/a&gt; service that consumes messages from the todo-cdc-events station, deserializes the MongoDB records, and pushes them to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cleaned Printing Consumer&lt;/strong&gt;: A second instance of the printing consumer that prints messages pushed to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/aadYGOp4lKr9nEWd5i-p_Hx5uoUhZ51feu_GhrfolPU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvNXhy/OGoxcmtsZ2MyMGJp/MTRtYjAuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/aadYGOp4lKr9nEWd5i-p_Hx5uoUhZ51feu_GhrfolPU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvNXhy/OGoxcmtsZ2MyMGJp/MTRtYjAuanBn" alt="dataflow diagram" width="800" height="333"&gt;&lt;/a&gt;&lt;br&gt;
In this iteration, we aren't adding or removing any of the components.  Rather, we're just going to change Memphis.dev's configuration to perform schema validation on messages sent to the "cleaned-todo-cdc-events" station.&lt;/p&gt;


&lt;h2&gt;
  
  
  Schema for Todo Change Data Capture (CDC) Events
&lt;/h2&gt;

&lt;p&gt;In &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;part 3&lt;/a&gt;, we transformed the messages to hydrate a serialized JSON subdocument to produce fully deserialized JSON messages.  The resulting message looked like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
"schema" : ...,

"payload" : {
"before" : null,

"after" : {
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each JSON-encoded message has two top-level fields, "schema" and "payload."  We are concerned with the "payload" field.  The payload object has two required fields, "before" and "after", that we are concerned with.  The before field contains a copy of the record before being modified (or null if it didn't exist), while the after field contains a copy of the record after being modified (or null if the record is being deleted).&lt;/p&gt;

&lt;p&gt;From this example, we can define criteria that messages must satisfy to be considered valid.  Let's write the criteria out as a set of rules:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The payload/before field may contain a todo object or null.&lt;/li&gt;
&lt;li&gt;The payload/after field may contain a todo object or null.&lt;/li&gt;
&lt;li&gt;A todo object must have five fields ("_id", "creation_timestamp", "due_date", "description", and "completed").&lt;/li&gt;
&lt;li&gt;The creation_timestamp must be an object with a single field ("$date").  The "$date" field must have a positive integer value (Unix timestamp).&lt;/li&gt;
&lt;li&gt;The due_date must be an object with a single field ("$date").  The "$date" field must have a positive integer value (Unix timestamp).&lt;/li&gt;
&lt;li&gt;The description field should have a string value.  Nulls are not allowed.&lt;/li&gt;
&lt;li&gt;The completed field should have a boolean value.  Nulls are not allowed.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;For this project, we'll define the schema using &lt;a href="https://json-schema.org/"&gt;JSON Schema&lt;/a&gt;. JSON Schema is a very powerful data modeling language.  It supports defining required fields, field types (e.g., integers, strings, etc.), whether  fields are nullable, field formats (e.g., date / times, email addresses), and field constraints (e.g., minimum or maximum values).  Objects can be defined and referenced by name, allowing recursive schema and for definitions to be reused.  Schema can be further combined using and, or, any, and not operators.  As one might expect, this expressiveness comes with a cost: the JSON Schema definition language is complex, and unfortunately, covering it is beyond the scope of this tutorial.&lt;/p&gt;




&lt;h2&gt;
  
  
  Creating a Schema and Attaching it to a Station
&lt;/h2&gt;

&lt;p&gt;Let's walk through the process of creating a schema and attaching it to a station.  You'll first need to complete the first 10 steps from &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;part 2&lt;/a&gt; and &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;part 3&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 11: Navigate to the Schemaverse Tab&lt;/strong&gt;&lt;br&gt;
Navigate to the Memphis UI in your browser.  For example, you might be able to find it at &lt;a href="https://localhost:9000/"&gt;https://localhost:9000/&lt;/a&gt; .  Once you are signed in, navigate to the Schemaverse tab:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/GQaWpA1483JxO70jj_jEaovu4kYjVSJ1HPajkRA-og0/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM21y/cnYzODl5YXZ2dTZz/YWxwYnYucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/GQaWpA1483JxO70jj_jEaovu4kYjVSJ1HPajkRA-og0/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM21y/cnYzODl5YXZ2dTZz/YWxwYnYucG5n" alt="Image description" width="512" height="241"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 12: Create the Schema&lt;/strong&gt;&lt;br&gt;
Click the "Create from blank" button to create a new schema.  Set the schema name to "todo-cdc-schema" and the schema type to "JSON schema."  Paste the following JSON Schema document into the textbox on the right.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://example.com/product.schema.json",
    "type" : "object",
    "properties" : {
        "payload" : {
            "type" : "object",
            "properties" : {
                "before" : {
                    "oneOf" : [{ "type" : "null" }, { "$ref" : "#/$defs/todoItem" }]
                },
                "after" : {
                    "oneOf" : [{ "type" : "null" }, { "$ref" : "#/$defs/todoItem" }]
                }
            },
            "required" : ["before", "after"]
        }
    },
    "required" : ["payload"],
   "$defs" : {
      "todoItem" : {
          "title": "TodoItem",
          "description": "An item in a todo checklist",
          "type" : "object",
          "properties" : {
              "_id" : {
                  "type" : "object",
                  "properties" : {
                      "$oid" : {
                          "type" : "string"
                      }
                  }
              },
              "description" : {
                  "type" : "string"
              },
              "creation_timestamp" : {
                  "type" : "object",
                  "properties" : {
                      "$date" : {
                          "type" : "integer"
                      }
                  }
              },
              "due_date" : {
                    "anyOf" : [
                        {
                            "type" : "object",
                            "properties" : {
                                "$date" : {
                                    "type" : "integer"
                                }
                            }
                        },
                        {
                            "type" : "null"
                        }
                    ]
              },
              "completed" : {
                  "type" : "boolean"
              }
          },
          "required" : ["_id", "description", "creation_timestamp", "completed"]
      }
  }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When done, your window should look like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/VSC2Gq451mUB1CX7rtv5gvkukWzrYrF-hspyOfoAmzI/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMGI4/MWJ3eXpjbWU1d3Zp/djJya2IucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/VSC2Gq451mUB1CX7rtv5gvkukWzrYrF-hspyOfoAmzI/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMGI4/MWJ3eXpjbWU1d3Zp/djJya2IucG5n" alt="schemaverse" width="800" height="520"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;When done, click the "Create schema" button. Once the schema has been created, you'll be returned to the Schemaverse tab.  You should see an entry for the newly created schema like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/3PHbmWCAbaIV7KpWA7YK-U9S3UpPWX5vv7wZZJnADus/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOGNk/ZW0wOWc0NjdwOTc2/anIxOXIucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/3PHbmWCAbaIV7KpWA7YK-U9S3UpPWX5vv7wZZJnADus/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOGNk/ZW0wOWc0NjdwOTc2/anIxOXIucG5n" alt="Image description" width="800" height="227"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 13: Attach the Schema to the Station&lt;/strong&gt;&lt;br&gt;
Once the schema is created, we want to attach the schema to the "cleaned-todo-cdc-events" station. Double-click on the "todo-cdc-schema" window to bring up its details window like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/KO0X4WBr575pTdHmVF_pWiWmdPIXTM3HNMQg3z2liUA/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdmhp/aHh6czRoYm9sZzg3/NHN6dm4ucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/KO0X4WBr575pTdHmVF_pWiWmdPIXTM3HNMQg3z2liUA/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdmhp/aHh6czRoYm9sZzg3/NHN6dm4ucG5n" alt="todo cdc schema" width="800" height="729"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Next, click on the "+ Attach to Station" button.  This will bring up the following window:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/d_qPYkvZ89XiSx41b0tE4V9KR--WCKZ2668oBGNxwtI/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZml0/bDVrcXdncnFsMWp2/M3RlaGwucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/d_qPYkvZ89XiSx41b0tE4V9KR--WCKZ2668oBGNxwtI/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZml0/bDVrcXdncnFsMWp2/M3RlaGwucG5n" alt="enforce schema" width="800" height="1268"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Select the "cleaned-todo-cdc-events" station, and click "Attach Selected."  The producers attached to the station will automatically download the schema and begin validating outgoing messages within a few minutes.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 14: Confirm that Messages are Being Filtered&lt;/strong&gt;&lt;br&gt;
Navigate to the station overview page for the "cleaned-todo-cdc-events" station.  After a couple of minutes, you should see a red warning notification icon next to the "Dead-letter" tab name.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/0qN4sRoSjuH4kt9NLSzNk7S8GB89mEWrimgKItSWBos/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM3l0/dHptdDgweDYzZmxj/eGR1MHkucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/0qN4sRoSjuH4kt9NLSzNk7S8GB89mEWrimgKItSWBos/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM3l0/dHptdDgweDYzZmxj/eGR1MHkucG5n" alt="Image description" width="800" height="494"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If you click on the "Dead-letter" tab and then the "Schema violation" subtab, you'll see the messages that failed the schema validation.  These messages have been re-routed to the dead letter queue so that they don't cause bugs in the downstream pipelines.  The window will look like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/fTY-05AibxLQA_pWfu9wHicvQOFCE38GevlRs6Si-FU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvY3M1/N2kxeDMzN283dTRo/NTNvd3YucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/fTY-05AibxLQA_pWfu9wHicvQOFCE38GevlRs6Si-FU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvY3M1/N2kxeDMzN283dTRo/NTNvd3YucG5n" alt="Image description" width="800" height="836"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations!  You're now using Schemaverse to validate messages.  This is one small but incredibly impactful step towards making your pipeline more reliable.&lt;/p&gt;




&lt;p&gt;In case you missed parts 1,2 and 3:&lt;br&gt;
&lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Part 3: Transforming MongoDB CDC Event Messages&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev By RJ Nowling, Developer advocate at &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>cdc</category>
      <category>dataprocessing</category>
      <category>schemaverse</category>
    </item>
    <item>
      <title>Part 3: Transforming MongoDB CDC Event Messages</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Tue, 06 Jun 2023 10:27:59 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/part-3-transforming-mongodb-cdc-event-messages-5agc</link>
      <guid>https://community.ops.io/memphis_dev/part-3-transforming-mongodb-cdc-event-messages-5agc</guid>
      <description>&lt;p&gt;&lt;em&gt;This is part three of a series of blog posts on building a modern event-driven system using Memphis.dev.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In our &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;last blog post&lt;/a&gt;, we introduced a reference implementation for capturing change data capture (CDC) events from a &lt;a href="https://www.mongodb.com/"&gt;MongoDB&lt;/a&gt; database using &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; and &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt;.  At the end of the post we noted that MongoDB records are serialized as strings in Debezium CDC messages like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "schema" : ...,

"payload" : {
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",

...
}
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We want to use the &lt;a href="https://docs.memphis.dev/memphis/memphis/schemaverse-schema-management"&gt;Schemaverse&lt;/a&gt; functionality of Memphis.dev to check messages against an expected schema.  Messages that don’t match the schema are routed to a dead letter station so that they don’t impact downstream consumers.  If this all sounds like ancient Greek, don’t worry!  We’ll explain the details in our next blog post.&lt;/p&gt;

&lt;p&gt;To use functionality like Schemaverse, we need to deserialize the MongoDB records as JSON documents.  In this blog post, we describe a modification to our MongoDB CDC pipeline that adds a transformer service to deserialize the MongoDB records to JSON documents.&lt;/p&gt;




&lt;h2&gt;
  
  
  Overview of the Solution
&lt;/h2&gt;

&lt;p&gt;The &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;previous solution&lt;/a&gt; consisted of six components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Todo Item Generator&lt;/strong&gt;: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds.  Each todo item contains a description, creation timestamp, optional due date, and completion status.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;MongoDB&lt;/strong&gt;: Configured with a single database containing a single collection (todo_items).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Debezium Server&lt;/strong&gt;: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev REST Gateway&lt;/strong&gt;: Uses the out-of-the-box configuration.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev&lt;/strong&gt;: Configured with a single station (todo-cdc-events) and single user (todocdcservice).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Printing Consumer&lt;/strong&gt;: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/hR4m-wS3ynS_R11TxMLjnKWUj2qv9sxLNRMlzM04r8U/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZ3pz/ejd5NXRmNTR0cXBj/a29xaXEuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/hR4m-wS3ynS_R11TxMLjnKWUj2qv9sxLNRMlzM04r8U/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZ3pz/ejd5NXRmNTR0cXBj/a29xaXEuanBn" alt="mongocdcd example" width="800" height="255"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this iteration, we are adding two additional components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Transformer Service&lt;/strong&gt;: A &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/cdc-transformer/cdc_transformer.py"&gt;transformer&lt;/a&gt; service that consumes messages from the todo-cdc-events station, deserializes the MongoDB records, and pushes them to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cleaned Printing Consumer&lt;/strong&gt;: A second instance of the printing consumer that prints messages pushed to the cleaned-todo-cdc-events station.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Our updated architecture looks like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/cmNTYLhYGRM8BX_DYELPW7epheSW1BRo87G_wuY6V7w/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva3Zy/eXRoZG1ibGdpdGk3/b2I0MG0uanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/cmNTYLhYGRM8BX_DYELPW7epheSW1BRo87G_wuY6V7w/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva3Zy/eXRoZG1ibGdpdGk3/b2I0MG0uanBn" alt="data flow diagram" width="800" height="333"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  A Deep Dive Into the Transformer Service
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Skeleton of the Message Transformer Service&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Our &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/cdc-transformer/cdc_transformer.py"&gt;transformer&lt;/a&gt; service uses the &lt;a href="https://github.com/memphisdev/memphis.py"&gt;Memphis.dev Python SDK&lt;/a&gt;.  Let’s walk through the transformer implementation.  The main() method of our transformer first connects to the &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev broker&lt;/a&gt;. The connection details are grabbed from environmental variables.  The host, username, password, input station name, and output station name are passed using environmental variables in accordance with suggestions from the &lt;a href="https://12factor.net/config"&gt;Twelve-Factor App manifesto&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once a connection is established, we create consumer and producer objects.  In Memphis.dev, consumers and producers have names.  These names appear in the Memphis.dev UI, offering transparency into the system operations.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The consumer API uses the &lt;a href="https://en.wikipedia.org/wiki/Callback_(computer_programming)"&gt;callback function&lt;/a&gt; design pattern. When messages are pulled from the broker, the provided function is called with a list of messages as its argument.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After setting up the callback, we kick off the asyncio event loop.  At this point, the transformer service pauses and waits until messages are available to pull from the broker.&lt;/p&gt;

&lt;h1&gt;
  
  
  Keep your main thread alive so the consumer will keep receiving data
&lt;/h1&gt;

&lt;p&gt;&lt;code&gt;await asyncio.Event().wait()&lt;/code&gt;  &lt;/p&gt;




&lt;h2&gt;
  
  
  Creating the Message Handler Function
&lt;/h2&gt;

&lt;p&gt;The create function for the message handler takes a producer object and returns a callback function. Since the callback function only takes a single argument, we use the &lt;a href="https://en.wikipedia.org/wiki/Closure_(computer_programming)"&gt;closure pattern&lt;/a&gt; to implicitly pass the producer to the msg_handler function when we create it.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;msg_handler&lt;/code&gt; function is passed three arguments when called: a list of messages, an error (if one occurred), and a context consisting of a dictionary.  Our handler loops over the messages, calls the transform function on each, sends the messages to the second station using the producer, and acknowledges that the message has been processed.  In Memphis.dev, messages are not marked off as delivered until the consumer acknowledges them.  This prevents messages from being dropped if an error occurs during processing.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  The Message Transformer Function
&lt;/h2&gt;

&lt;p&gt;Now, we get to the meat of the service: the message transformer function.  Message payloads (returned by the get_data() method) are stored as &lt;a href="https://docs.python.org/3/library/stdtypes.html#bytearray"&gt;bytearray&lt;/a&gt; objects.  We use the Python json library to deserialize the messages into a hierarchy of Python collections (list and dict) and primitive types (int, float, str, and None).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We expect the object to have a payload property with an object as the value.  That object then has two properties (“before” and “after”) which are either None or strings containing serialized JSON objects.  We use the JSON library again to deserialize and replace the strings with the objects.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt; if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Lastly, we reserialize the entire JSON record and convert it back into a bytearray for transmission to the broker.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Hooray! Our objects now look like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
"schema" : ...,

"payload" : {
"before" : null,

"after" : {
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Running the Transformer Service
&lt;/h2&gt;

&lt;p&gt;If you followed the 7 steps in the &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;previous blog post&lt;/a&gt;, you only need to run three additional steps.  to start the transformer service and verify that its working:&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 8: Start the Transformer Service
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 9: Start the Second Printing Consumer
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 10: Check the Memphis UI
&lt;/h2&gt;

&lt;p&gt;When the transformer starts producing messages to Memphis.dev, a second station named "cleaned-todo-cdc-events" will be created.  You should see this new station on the Station Overview page in the Memphis.dev UI like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/9cekwZgzbzuc6Gm_JfFsJZsNS3lZtqEeZHkFoWLRRoU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvNDAz/N2k4ZGN5NDllMnkz/bnNsam8ucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/9cekwZgzbzuc6Gm_JfFsJZsNS3lZtqEeZHkFoWLRRoU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvNDAz/N2k4ZGN5NDllMnkz/bnNsam8ucG5n" alt="Check memphis ui" width="800" height="227"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The details page for the "cleaned-todo-cdc-events" page should show the transformer attached as a producer, the printing consumer, and the transformed messages:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/3QNzQidpI7ds3oBNx1aZLw9YOk4DzemlP-j7J9acaKY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvcjN4/ZGltczM1cTJ0NWdk/ZW9qbzQucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/3QNzQidpI7ds3oBNx1aZLw9YOk4DzemlP-j7J9acaKY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvcjN4/ZGltczM1cTJ0NWdk/ZW9qbzQucG5n" alt="Image description" width="800" height="494"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations!  We’re now ready to tackle validating messages using Schemaverse in our next blog post. Subscribe to our newsletter to stay tuned! &lt;/p&gt;

&lt;p&gt;Head over to &lt;a href="https://memphis.dev/blog/part-4-validating-cdc-messages-with-schemaverse/"&gt;Part 4: Validating CDC Messages with Schemaverse&lt;/a&gt; to learn further.&lt;/p&gt;




&lt;p&gt;In case you missed parts 1 &amp;amp; 2:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev By RJ Nowling, Developer advocate at &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataprocessing</category>
      <category>mongodb</category>
      <category>cdc</category>
      <category>memphisdev</category>
    </item>
    <item>
      <title>An Introduction to Data Mesh</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Mon, 29 May 2023 05:41:22 +0000</pubDate>
      <link>https://community.ops.io/avital_trifsik/an-introduction-to-data-mesh-3dlk</link>
      <guid>https://community.ops.io/avital_trifsik/an-introduction-to-data-mesh-3dlk</guid>
      <description>&lt;p&gt;As more and more teams have started to look for solutions that can help them unlock the full potential of their systems and people, decentralized architectures have started to become more and more popular. Whether it’s cryptocurrencies, microservices, or Git, decentralization has proven to be an effective method of dealing with centralized bottlenecks. Along the same lines, one approach to decentralizing control of data is using a data mesh. But what really is it, and how can it help? Let’s take a closer look at the concept and go over the data mesh architecture to better understand its benefits.&lt;/p&gt;




&lt;h2&gt;
  
  
  Data challenges in enterprises
&lt;/h2&gt;

&lt;p&gt;It’s no secret that organizations have come quite a long way in their data journey. However, they still have their set of challenges that prevents them from leveraging the full benefits of data. These challenges include:  &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Trustworthiness&lt;/strong&gt;&lt;br&gt;
The traceability, quality, and observability of data demand robust implementation. It’s important to ask yourself a few important, difficult questions. These include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Can you trust the data?&lt;/li&gt;
&lt;li&gt;Is your data file complete?&lt;/li&gt;
&lt;li&gt;Do you have the latest file?&lt;/li&gt;
&lt;li&gt;Is your data source correct? &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Agility&lt;/strong&gt;&lt;br&gt;
Change is the only thing that’s constant, and that’s true for large enterprises, too. It’s very difficult for data estates to keep up with these changes, which come in the way of enterprise agility. Take report generation, for instance – it takes weeks to do that, and that’s quite a big time frame in today’s fast-paced world. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Skills&lt;/strong&gt;&lt;br&gt;
To keep up with data, the entire workforce should have specialized skills. This is because maintaining data can become quite expensive and with a lack of skills, bottlenecks are bound to be very frequent.  &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Productivity&lt;/strong&gt;&lt;br&gt;
Productivity is another data challenge. Both business and data analysts spend up to 30-40% of their time looking for the correct dataset. Similarly, data engineers spend most of their time figuring out how to create a uniform dataset using disparate sources.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Ownership&lt;/strong&gt;&lt;br&gt;
Establishing dataset ownership is also a challenge. It’s hard to determine the owner and who can be trusted enough to declare the dataset trustworthy. In most cases, the team that owns the data platform takes ownership of the data, even though it might not understand it.  &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Discoverability&lt;/strong&gt;&lt;br&gt;
Only a few organizations have been able to leverage their data estate and set up a data marketplace where their consumers can explore different datasets and understand the ones they wish to use.&lt;/p&gt;




&lt;h2&gt;
  
  
  What is data mesh?
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/RUR3EqOVHL5TuKgf-MusAS1Uh3sGjUKx3xgWZENEoIo/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdjA3/czZsaXRiYmt4ZW43/dmI5cWkuanBlZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/RUR3EqOVHL5TuKgf-MusAS1Uh3sGjUKx3xgWZENEoIo/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdjA3/czZsaXRiYmt4ZW43/dmI5cWkuanBlZw" alt="What is data mesh" width="800" height="683"&gt;&lt;/a&gt;&lt;br&gt;
An overview of a data mesh (&lt;a href="https://www.montecarlodata.com/blog-what-is-a-data-mesh-and-how-not-to-mesh-it-up/"&gt;Source&lt;/a&gt;)&lt;/p&gt;

&lt;p&gt;A data mesh can be best understood as a practice or concept used to manage a large amount of data spread across a decentralized or distributed network. It can also refer to a platform responsible for this function, or even both. As companies become increasingly dependent on their ability to store volumes of data and distribute it through data pipelines and leverage from it, it’s important to create an effective schema for using that data. This is where a data mesh comes in. &lt;/p&gt;

&lt;p&gt;The idea behind a data mesh is that introducing more technology won’t help to solve the data challenges that companies face today. Instead, the only way to face those challenges is to reorganize the tools, processes, and people involved. A data mesh essentially creates a replicable method of managing different data sources across the company’s ecosystem and makes it more discoverable. At the same, it ensures consumers faster, more secure, and more efficient access to data. &lt;/p&gt;

&lt;p&gt;A data mesh includes numerous benefits. These include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Allows for decentralized data operations which improve business agility, scalability, and time-to-market.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Organizations that adopt the data mesh architecture prevent being locked into one data product or platform. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Adopts a self-service model that ensures easy access to a centralized infrastructure. This allows for faster SQL queries and data access.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Since it decentralizes data ownership, it ensures transparency across teams. (In comparison, centralized data ownership makes data teams heavily dependent upon).&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Data mesh architecture components
&lt;/h2&gt;

&lt;p&gt;The data mesh architecture involves four main components. Let’s go over them one by one.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/rNBgTyVQeC1swCOJBKfhhUXmLWKdk80MZceXB9ABtqs/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMHJq/aG02MnB1cTBibTBt/NTZrOTEuanBlZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/rNBgTyVQeC1swCOJBKfhhUXmLWKdk80MZceXB9ABtqs/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMHJq/aG02MnB1cTBibTBt/NTZrOTEuanBlZw" alt="data mesh principals" width="800" height="481"&gt;&lt;/a&gt;&lt;br&gt;
4 data mesh principles (&lt;a href="https://www.datanami.com/2022/01/21/data-meshes-set-to-spread-in-2022/"&gt;Source&lt;/a&gt;)&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Decentralized data ownership&lt;/strong&gt;&lt;br&gt;
This architecture component mainly revolves around the people involved and calls for the remodeling of the monolith data structure by decentralizing analytical data and realigning its ownership from a central team to a domain team. &lt;/p&gt;

&lt;p&gt;In a data mesh, a domain team that’s extremely familiar with the data asset is responsible for curating it, ensuring high-quality data administration and governance. In contrast, in a data warehouse antipattern, a generalist team is responsible for managing all the data of the organization and is usually focused on the technical aspect of the data warehouse instead of the quality of the data.&lt;/p&gt;

&lt;p&gt;So organizations implementing a data mesh must define which data set is owned by which domain team. In addition to that, all the teams should be quick to make changes to maintain their mesh’s data quality. By making domain-centric accountability possible, decentralized data ownership solves many problems related to agility, ownership, and productivity. &lt;/p&gt;

&lt;p&gt;For instance, organizations take a while to respond to the market since changes have to be made to many IT systems to make any business change. This is why unaligned priorities and poor coordination across the team hinder enterprise agility. Considering the rapid growth in data sources and the proliferating business use cases, central teams have become nothing more than bottlenecks. However, going from a monolithic architecture to domain-driven microservices has made operational systems more agile. And a data mesh can do the same for analytical data.     &lt;/p&gt;

&lt;p&gt;Data consumers usually spend their time finding the data owner, determining its traceability, and interpreting its meaning. As a result, the overall productivity of teams is reduced. However, decentralization brings both the analytical and operational world closer and establishes traceability, ownership, and a clear interpretation, thus improving the teams’ turnaround times.&lt;/p&gt;

&lt;p&gt;And finally, ownership; in most cases, data owners aren’t known, making the IT teams responsible for the ETL the owners of the data. Central IT teams often act as intermediaries – they pass consumer requests to producers and aren’t considered owners because they don’t produce the data and neither do they understand it. Realigning the ownership of analytical data to the right domains can solve the problem since these domains are the producers of data and can understand it, too. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Data as a product&lt;/strong&gt;&lt;br&gt;
With domains identified and ownership established, the next step is to stop thinking of analytical data as an asset that must be stored and instead think of it as a product that must be served. Teams responsible for a data mesh publish data so that other teams, i.e., their internal customers, can benefit from it. &lt;/p&gt;

&lt;p&gt;This is why domains need to stop considering analytical data as a by-product of business operations and think of it as a first-class product complete with dedicated owners that are responsible for its usability, discoverability, uptime, and quality and treat it just like any other business service. As such, they should also apply the different product development aspects to make it customer-focused, reliable, useful, and valuable. You can think of data products published by the teams responsible for a data mesh as microservices; the only difference is that data is on offer.     &lt;/p&gt;

&lt;p&gt;Thinking of data as a product solves problems related to productivity, agility, discoverability, and trustworthiness. The productivity of a data consumer automatically increases as trustworthiness, discoverability, and agility come into the equation. Let’s see how.&lt;/p&gt;

&lt;p&gt;A data product is essentially an autonomous unit with its own release cycles and feature roadmap. This means that data teams don’t need to wait for a central team to provide some environment or data so that they can start working. In turn, establishing traceability and authenticity hardly takes time. Similarly, rework to align the SLOs (service level objectives) of the input dataset with that of the use-case takes relatively less time.&lt;/p&gt;

&lt;p&gt;And with data ownership assigned to domains, the product owner (of the data) is responsible for the data product. This means that the product owner should make sure that the data product’s security, traceability, and quality are maintained and also reported via SLOs and the right metrics. &lt;/p&gt;

&lt;p&gt;And finally, by thinking of data as a product, each product is self-explanatory and is advertised and cataloged on the organization’s data marketplaces. The relevant documentation outlines different usability topics and explains the relationship with other SLOs and data products. As a result, consumers get to enjoy full visibility of the data product, which, in turn, allows them to make a well-informed decision about its use.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Self-serve platform&lt;/strong&gt;&lt;br&gt;
Even though thinking of data as a product has numerous benefits, it might end up increasing the overall operation cost because it involves many small but highly skilled teams and numerous independent infrastructures. Plus, if these highly skilled teams aren’t properly optimized, the operating cost will go up further. This is where the third component of the data mesh architecture comes into play – a self-serve platform.&lt;/p&gt;

&lt;p&gt;Although a data mesh revolves around the idea of decentralized data management, one of its most important aspects is a centralized location or a central data infrastructure that can facilitate the data product lifecycle, where all the members of the organization can easily find the datasets they require. This central infrastructure should support tenancy so that it facilitates autonomy. It should also be self-serve, and provide multiple out-of-the-box tools.  &lt;/p&gt;

&lt;p&gt;Historical, as well as real-time, should be available, and there should be some automated way of accessing data. While there are no plug-and-play tools that fulfill this principle, it can be accomplished via a wiki, a UI, or an API. &lt;/p&gt;

&lt;p&gt;The important thing is that self-serve tools should be thoughtfully built and must reduce the cognitive load on the data product teams. They should also bring abstraction over the lower-level technical components to allow for data product standardization and faster development. Another important part of self-service is data product management, which includes removing, adding, and updating data products. Plus, management and entry should be as easy as possible to make usage easy.&lt;/p&gt;

&lt;p&gt;Just like other components, the self-serve platform also solves a number of problems related to skills, cost of ownership, and agility. Since a self-serve platform takes away technical complexity, there’s less need for specialists and generalists are enough to serve the purpose. As a result, there’s no need to invest in a highly skilled team. The cost of ownership also reduces in terms of the infrastructure, since it’s centrally provisioned. And finally, autonomous data product teams can directly use the self-service platform; they don’t need to rely on the central infrastructure team to provide them with infrastructure resources and data. This speeds up the development cycle.     &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Federated computational governance&lt;/strong&gt;&lt;br&gt;
The three data mesh architecture principles discussed above solve most of the data challenges faced by organizations. However, since most data products operate across different domains, how can you harmonize data? The answer to this lies in the last architecture component: federated computational government, which is a big change from how traditional central governance is enforced. The former changes the way teams are organized and the way the infrastructure supports governance. In federated governance, a data product owner manages different aspects such as local access policies, data modeling, data quality, etc. This is a big shift from implementing canonical data to models to smaller ones specifically built to meet the needs of the data product.&lt;/p&gt;

&lt;p&gt;Governance should be divided into two: local and global governance. The former is local to the data product, defines the local processes, frameworks, and governance policies, and is responsible for their implementation and adherence. This is a step away from central governing bodies that created policies and were responsible for validation and adherence. &lt;/p&gt;

&lt;p&gt;Meanwhile, global governance involves a cross-functional body with experts in different specializations such as technology, legal, security, and infrastructure and is responsible for formulating policies. The local governing body is responsible for implementation as well as constant adherence.  &lt;/p&gt;

&lt;p&gt;To sum up, with federated governance applied to your data mesh, teams can always use data available to them from different domains. &lt;/p&gt;

&lt;p&gt;All these four principles are important to implement a data mesh in an organization. Of course, the degree of implementation can differ, but each principle has its own benefits and overcomes the drawbacks of others. Just keep in mind that the bigger the mesh, the more value you can generate from the data. &lt;/p&gt;




&lt;p&gt;&lt;a href="https://memphis.dev/newsletter"&gt;Join 4500+ others and sign up for our data engineering newsletter&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/"&gt;memphis.dev&lt;/a&gt; by &lt;a href="https://twitter.com/memphisveta"&gt;Sveta Gimpelson&lt;/a&gt; Co-founder &amp;amp; VP of Data &amp;amp; Research at Memphis.dev.&lt;/p&gt;

</description>
      <category>datamesh</category>
      <category>data</category>
    </item>
    <item>
      <title>Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Sun, 28 May 2023 09:42:14 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphisdev-5gon</link>
      <guid>https://community.ops.io/memphis_dev/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphisdev-5gon</guid>
      <description>&lt;p&gt;&lt;em&gt;This is part two of a series of blog posts on building a modern event-driven system using Memphis.dev.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In our &lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;last blog post&lt;/a&gt;, we introduced a reference implementation for capturing change data capture (CDC) events from a PostgreSQL database using Debezium Server and Memphis.dev. By replacing Apache Kafka with Memphis.dev, the solution substantially reduced the operational resources and overhead – saving money and freeing developers to focus on building new functionality.&lt;/p&gt;

&lt;p&gt;PostgreSQL is the only commonly used database, however. Debezium provides connectors for a range of databases, including the non-relational document database MongoDB. MongoDB is popular with developers, especially those working in dynamic programming languages since it avoids the object-relational impedance mismatch. Developers can directly store, query, and update objects in the database.&lt;/p&gt;

&lt;p&gt;In this blog post, we demonstrate how to adapt the CDC solution to MongoDB.&lt;/p&gt;




&lt;h2&gt;
  
  
  Overview of the Solution
&lt;/h2&gt;

&lt;p&gt;Here, we describe the architecture of the reference solution for delivering change data capture events with &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev&lt;/a&gt;. The architecture has not changed from &lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;our previous blog&lt;/a&gt; post except for the replacement of PostgreSQL with MongoDB.&lt;/p&gt;

&lt;p&gt;A Todo Item generator script writes randomly-generated records to MongoDB. &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; receives CDC events from MongoDB and forwards them to the &lt;a href="https://github.com/memphisdev/memphis-rest-gateway"&gt;Memphis REST gateway&lt;/a&gt; through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis.dev for new messages and prints them to the console.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Todo Item Generator&lt;/strong&gt;: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds. Each todo item contains a description, creation timestamp, optional due date, and completion status.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;MongoDB&lt;/strong&gt;: Configured with a single database containing a single collection (todo_items).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Debezium Server&lt;/strong&gt;: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Memphis.dev REST Gateway&lt;/strong&gt;: Uses the out-of-the-box configuration.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Memphis.dev&lt;/strong&gt;: Configured with a single station (todo-cdc-events) and single user (todocdcservice)&lt;/li&gt;
&lt;li&gt;P*&lt;em&gt;rinting Consumer&lt;/em&gt;*: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/bnjb2imYv56Xp_vm6aVdOL09ExPx5kEG7Qd-Kj8pmnI/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzLzVxN2wy/YzlucGhvYW5xZDk5/MHdsLmpwZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/bnjb2imYv56Xp_vm6aVdOL09ExPx5kEG7Qd-Kj8pmnI/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzLzVxN2wy/YzlucGhvYW5xZDk5/MHdsLmpwZw" alt="Image description" width="800" height="255"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  Getting Started
&lt;/h2&gt;

&lt;p&gt;The implementation tutorial is available in the mongodb-debezium-cdc-example directory of the &lt;a href="https://github.com/memphisdev/memphis-example-solutions"&gt;Memphis Example Solutions&lt;/a&gt; repository. &lt;a href="https://docs.docker.com/compose/"&gt;Docker Compose&lt;/a&gt; will be needed to run it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Running the Implementation&lt;/strong&gt;&lt;br&gt;
Build the Docker images for Debezium Server, the printing consumer, and database setup (table and user creation).&lt;/p&gt;

&lt;p&gt;Currently, the implementation depends on a pre-release version of &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; for the JWT authentication support. A Docker image will be built directly from the main branch of the Debezium and Debezium Server repositories. Note that this step can take quite a while (~20 minutes) to run. When Debezium Server 2.3.0 is released, we will switch to using the upstream Docker image.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 1: Build the Images&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose build --pull --no-cache
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 2: Start the Memphis.dev Broker and REST Gateway&lt;/strong&gt;&lt;br&gt;
Start the &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev broker&lt;/a&gt; and &lt;a href="https://github.com/memphisdev/memphis-rest-gateway"&gt;REST gateway&lt;/a&gt;. Note that the memphis-rest-gateway service depends on the memphis broker service, so the broker service will be started as well.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d memphis-rest-gateway
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 3: Create a Station and Corresponding User in Memphis.dev&lt;/strong&gt;&lt;br&gt;
Messages are delivered to “stations” in Memphis.dev; they are equivalent to “topics” used by message brokers. Point your browser at &lt;a href="http://localhost:9000/"&gt;http://localhost:9000/&lt;/a&gt;. Click the “sign in with root” link at the bottom of the page.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/MndvPXug35D9mBqE5XeHZG2Q9UCWTR6qb8f5X8gDMQc/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL2U2Y3Nq/Y3JhZ3lncDVsbjVv/NDgwLnBuZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/MndvPXug35D9mBqE5XeHZG2Q9UCWTR6qb8f5X8gDMQc/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL2U2Y3Nq/Y3JhZ3lncDVsbjVv/NDgwLnBuZw" alt="Image description" width="800" height="581"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Log in with root (username) and memphis (password).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/3QytH7KP0otqsy2KCJvgOCVqw13mJ5KLXNePpyXOH-U/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL3oxZ3N0/d243Zml0eWhmdjMw/c3F5LnBuZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/3QytH7KP0otqsy2KCJvgOCVqw13mJ5KLXNePpyXOH-U/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL3oxZ3N0/d243Zml0eWhmdjMw/c3F5LnBuZw" alt="Image description" width="800" height="522"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Follow the wizard to create a station named todo-cdc-events.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/YDYGJUfrdmLIwAdNTMJEAvxpP2JELA4wc7isXM3QcvY/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL2s5YjV5/Zm5sd284M21teXFj/bDNwLnBuZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/YDYGJUfrdmLIwAdNTMJEAvxpP2JELA4wc7isXM3QcvY/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL2s5YjV5/Zm5sd284M21teXFj/bDNwLnBuZw" alt="Image description" width="800" height="582"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Create a user named todocdcservice with the same value for the password.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/_okHUwuF_6AS8snutH5DzkasJZEhZseOfnnRowThclk/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzLzYybTQ4/am9jNWxtdXkxdmwy/dWlrLnBuZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/_okHUwuF_6AS8snutH5DzkasJZEhZseOfnnRowThclk/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzLzYybTQ4/am9jNWxtdXkxdmwy/dWlrLnBuZw" alt="Image description" width="768" height="833"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Click “next” until the wizard is finished:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/tWI9iE9OHjmddyzoUjwxpqgDn8DvGFxkfFiZB0Re6k0/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzLzJrZ2Y0/anAweTZiMGwyeGJm/a3A3LnBuZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/tWI9iE9OHjmddyzoUjwxpqgDn8DvGFxkfFiZB0Re6k0/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzLzJrZ2Y0/anAweTZiMGwyeGJm/a3A3LnBuZw" alt="Image description" width="768" height="622"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Click “Go to station overview” to go to the station overview page.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/8KRljnuNeJqQAhAGgwx5dxT2tjx6iQdp6PZU9axVxBo/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL2RsbXN5/dTF0YXNxdnNtbXRk/cWh1LnBuZw" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/8KRljnuNeJqQAhAGgwx5dxT2tjx6iQdp6PZU9axVxBo/w:800/mb:500000/ar:1/aHR0cHM6Ly9jb21t/dW5pdHkub3BzLmlv/L3JlbW90ZWltYWdl/cy91cGxvYWRzL2Fy/dGljbGVzL2RsbXN5/dTF0YXNxdnNtbXRk/cWh1LnBuZw" alt="Image description" width="768" height="465"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 4: Start the Printing Consumer&lt;/strong&gt;&lt;br&gt;
We used the &lt;a href="https://github.com/memphisdev/memphis.py"&gt;Memphis.dev Python SDK&lt;/a&gt; to create a &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/printing-consumer/test_consumer.py"&gt;consumer script&lt;/a&gt; that polls the todo-cdc-events station and prints the messages to the console.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d printing-consumer
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started                                                            1.4s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 5: Starting and Configuring MongoDB&lt;/strong&gt;&lt;br&gt;
To capture changes, &lt;a href="https://www.mongodb.com/"&gt;MongoDB&lt;/a&gt;’s &lt;a href="https://www.mongodb.com/docs/manual/replication/"&gt;replication&lt;/a&gt; functionality must be enabled. There are several steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The replica set name must be set. This can be done by &lt;a href="https://www.mongodb.com/docs/manual/tutorial/deploy-replica-set-for-testing/#std-label-server-replica-set-deploy-test"&gt;passing the name of a replica set&lt;/a&gt; on the command-line or in the configuration file. In the &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/docker-compose.yaml#L10"&gt;Docker Compose file&lt;/a&gt;, we run MongoDB with the command-line argument –replSet rs0 to set the replica set name.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;When replication is used and authorization is enabled, a common key file must be provided to each replica instance. We generated a key file following the &lt;a href="https://www.mongodb.com/docs/manual/tutorial/enforce-keyfile-access-control-in-existing-replica-set/"&gt;instructions&lt;/a&gt; in the MongoDB documentation. We then &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/mongodb/Dockerfile"&gt;built an image&lt;/a&gt; that extends the official MongoDB image by including the key file.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The replica set needs to be initialized once MongoDB is running. We &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/database-setup/database_setup.py"&gt;use a script&lt;/a&gt; that configures the instance on startup. The script calls the &lt;a href="https://www.mongodb.com/docs/manual/reference/command/replSetInitiate/"&gt;replSetInitiate&lt;/a&gt; command with a list of the IP addresses and ports of each MongoDB instance in the replica set. This command causes the MongoDB instances to communicate with each other and select a leader.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Generally speaking, replica sets are used for increased reliability (high availability). Most documentation that you’ll find describes how to set up a replica set with multiple MongoDB instances. In our case, Debezium’s MongoDB connector piggybacks off of the replication functionality to capture data change events. Although we go through the steps to configure a replica set, we only use one MongoDB instance.&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/mongodb-debezium-cdc-example/todo-generator/todo_generator.py"&gt;todo item generator script&lt;/a&gt; creates a new todo item every half second. The field values are randomly generated. The items are added to a MongoDB collection named “todo_items.”&lt;/p&gt;

&lt;p&gt;In the Docker Compose file, the todo item generator script is configured to depend on the Mongodb instance running in a healthy state and successful completion of the database setup script. By starting the todo item generator script, Docker Compose will also start MongoDB and run the database setup script.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d todo-generator
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started                                                                                     9.1s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 6: Start the Debezium Server&lt;/strong&gt;&lt;br&gt;
The last service that needs to be started is the &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt;. The server is configured with a source connector for MongoDB and the HTTP Client sink connector through a &lt;a href="https://en.wikipedia.org/wiki/.properties"&gt;Java properties file&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Most of the options are self-explanatory. The HTTP client sink URL is worth explaining in detail. &lt;a href="https://github.com/memphisdev/memphis-rest-gateway"&gt;Memphis.dev REST gateway&lt;/a&gt; expects to receive POST requests with a path in the following format:&lt;br&gt;
&lt;em&gt;/stations/{station}/produce/{quantity}&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The {station} placeholder is replaced with the name of the station to send the message to. The {quantity} placeholder is replaced with the value single (for a single message) or batch (for multiple messages).&lt;/p&gt;

&lt;p&gt;The message(s) is (are) passed as the payload of the POST request. The REST gateway supports three message formats (plain text, JSON, or protocol buffer). The value (text/, application/json, application/x-protobuf) of the content-type header field determines how the payload is interpreted.&lt;/p&gt;

&lt;p&gt;The Debezium Server’s HTTP Client sink produces REST requests that are consistent with these patterns. Requests use the POST verb, each request contains a single JSON-encoded message as the payload, and the content-type header set to application/json. We use todo-cdc-events as the station name and the single quantity value in the endpoint URL to route messages and indicate how the REST gateway should interpret the requests:&lt;/p&gt;

&lt;p&gt;&lt;a href="http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single"&gt;http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The debezium.sink.http.authentication.type=jwt property indicates that the HTTP Client sink should use JWT authentication. The username and password properties are self-evident, but the debezium.sink.http.authentication.jwt.url property deserves some explanation. An initial token is acquired using the /auth/authenticate endpoint, while the authentication is refreshed using the separate /auth/refreshToken endpoint. The JWT authentication in the HTTP Client appends the appropriate endpoint to the given base URL.&lt;/p&gt;

&lt;p&gt;Debezium Server can be started with the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d debezium-server
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 7: Confirm the System is Working&lt;/strong&gt;&lt;br&gt;
Check the todo-cdc-events station overview screen in Memphis.dev web UI to confirm that the producer and consumer are connected and messages are being delivered.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/W1xjlIeVCxRyBSGoQE_FxHDxrdYB4xp0YAYi3BXpGmE/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZjRj/d3RrOGRuMzZhaHdu/ZTJpMHUucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/W1xjlIeVCxRyBSGoQE_FxHDxrdYB4xp0YAYi3BXpGmE/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZjRj/d3RrOGRuMzZhaHdu/ZTJpMHUucG5n" alt="Image description" width="800" height="454"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;And, print the logs for the printing-consumer container:

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Format of the CDC Messages
&lt;/h2&gt;

&lt;p&gt;The incoming messages are formatted as JSON. The messages have two top-level fields (schema and payload). The schema describes the record schema (field names and types), while the payload describes the change to the record. The payload object itself contains two fields (before and after) indicating the value of the record before and after the change.&lt;/p&gt;

&lt;p&gt;For MongoDB, Debezium Server encodes the record as a string of serialized JSON:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will have implications on the downstream processing of messages, which we will describe in a future blog post in this series.&lt;/p&gt;




&lt;p&gt;Congratulations! You now have a working example of how to capture data change events from a MongoDB database using Debezium Server and transfer the events to Memphis.dev for downstream processing.&lt;/p&gt;

&lt;p&gt;Head over to &lt;a href="https://memphis.dev/blog/part-3-transforming-mongodb-cdc-event-messages/"&gt;Part 3: Transforming MongoDB CDC Event Messages&lt;/a&gt; to learn further.&lt;/p&gt;




&lt;p&gt;In case you missed part 1:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events&lt;br&gt;
&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;

</description>
      <category>mongodb</category>
      <category>cdc</category>
      <category>memphisdev</category>
    </item>
    <item>
      <title>How to reduce your data traffic by 30% instantly</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Wed, 24 May 2023 14:52:11 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/how-to-reduce-your-data-traffic-by-30-instantly-547b</link>
      <guid>https://community.ops.io/memphis_dev/how-to-reduce-your-data-traffic-by-30-instantly-547b</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;The bigger the traffic, the bigger the latency and the higher the cost.&lt;br&gt;
It seems that the global economic situation grounded us a bit and made us go back to basics when every byte of memory counted and every unnecessary line of code was removed. Besides higher latency which usually drives pouring better computing and more costs, we tend to forget we're paying a &lt;strong&gt;huge amount&lt;/strong&gt; of money for the amount of transferred data as well, especially around communication between services.&lt;/p&gt;



&lt;p&gt;&lt;strong&gt;Let’s take a look at the following scenario&lt;/strong&gt;- &lt;br&gt;
A single, shared EC2 instance, 2 CPUs with 4GB RAM,&lt;br&gt;
processing 10,000 requests of 1KB per day -&amp;gt; 300,000 on an average month -&amp;gt; 292 GB of transferred data.&lt;/p&gt;

&lt;p&gt;If we run those minor numbers with an AWS EC2 calculator, we would get the following invoice:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Compute monthly cost is: $33.29
+&lt;/li&gt;
&lt;li&gt;Data transfer cost:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;If that 292 GBs is transferred within a region, it will cost &lt;strong&gt;$5.84 (14% of the total invoice)&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;If that 292 GBs is transferred back to the internet, it will cost &lt;strong&gt;$26.28 (44% of the total invoice)&lt;/strong&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;h2&gt;
  
  
  Popular formats
&lt;/h2&gt;

&lt;p&gt;Usually, services communicate with each other using one of the following&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;- JSON.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;JSON stands for JavaScript Object Notation and is a text format for storing and transporting data.&lt;/p&gt;

&lt;p&gt;When using JSON to send a message, we’ll first have to serialize the object representing the message into a JSON-formatted string, then transmit.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;{"sensorId": 32,"sensorValue": 24.5}&lt;br&gt;
&lt;/code&gt;&lt;br&gt;
This string is 36 characters long, but the information content of the string is only 6 characters long. This means that about 16% of transmitted data is actual data, while the rest is metadata. The ratio of useful data in the whole message is increased by decreasing key length or increasing value size, for example, when using a string or array.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;- Protobuf.&lt;/strong&gt;&lt;br&gt;
Protocol Buffers are a language-neutral, platform-neutral extensible mechanism for serializing structured data.&lt;/p&gt;

&lt;p&gt;Protocol Buffers use a binary format to transfer messages.&lt;br&gt;
Using Protocol Buffers in your code is slightly more complicated than using JSON.&lt;/p&gt;

&lt;p&gt;The user must first define a message using the .proto file. This file is then compiled using Google’s protoc compiler, which generates source files that contain the Protocol Buffer implementation for the defined messages.&lt;/p&gt;

&lt;p&gt;This is how our message would look in the .proto definition file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;message TemperatureSensorReading {
    optional uint32 sensor_id = 1;
    optional float sensor_value = 2;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When serializing the message from the example above, it’s only 7 bytes long. This can be confusing initially because we would expect uint32 and float to be 8 bytes long when combined. However, Protocol Buffers won’t use all 4 bytes for uint32 if they can encode the data in fewer bytes. In this example, the sensor_id value can be stored in 1 byte. It means that in this serialized message, 1 byte is metadata for the first field, and the field data itself is only 1 byte long. The remaining 5 bytes are metadata and data for the second field; 1 byte for metadata and 4 bytes for data because float always uses 4 bytes in Protocol Buffers. This gives us 5 bytes or 71% of actual data in a 7-byte message.&lt;/p&gt;

&lt;p&gt;The main difference between the two is that JSON is just text, while Protocol Buffers are binary. This difference has a crucial impact on the size and performance of moving data between different devices.&lt;/p&gt;




&lt;h2&gt;
  
  
  Benchmark
&lt;/h2&gt;

&lt;p&gt;In this benchmark, we will take the same message structure and examine the size difference, as well as the network performance.&lt;/p&gt;

&lt;p&gt;We used Memphis schemaverse to act as our destination and a simple Python app as the sender.&lt;/p&gt;

&lt;p&gt;The gaming industry use cases are usually considered to have a large payload, and to demonstrate the massive savings between the different formats. We used one of “Blizzard” example schemas. &lt;br&gt;
The full used .proto can be found &lt;a href="https://github.com/Blizzard/s2client-proto/blob/01ab351e21c786648e4c6693d4aad023a176d45c/s2clientprotocol/sc2api.proto#L359"&gt;here&lt;/a&gt;. &lt;br&gt;
Each packet weights &lt;strong&gt;959.55KiB&lt;/strong&gt; on average&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/zp-nKtqzp8iYYGIL3vDWv07pCzfesATlrLiWLckaHp0/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvanow/eWo1Mmx0cXFhbXdz/eHJlOGwucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/zp-nKtqzp8iYYGIL3vDWv07pCzfesATlrLiWLckaHp0/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvanow/eWo1Mmx0cXFhbXdz/eHJlOGwucG5n" alt="Image description" width="512" height="195"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As can be seen, the average savings are between 618.19% to 807.93%!!!&lt;/p&gt;

&lt;p&gt;Another key aspect to take under consideration would be the additional step of &lt;strong&gt;Serialization/Deserialization&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;One more key aspect to take under consideration is the serialization function and its potential impact on performance, as it is, in fact, a similar action to compression.&lt;/p&gt;

&lt;p&gt;Quick side-note. Memphis Schemaverse eliminates the need to implement Serialization/Deserialization functions, but behind the scenes, a set of conversion functions will take place.&lt;/p&gt;

&lt;p&gt;Going back to serialization. The tests were performed using a Macbook Pro M1 Pro and 16 GB RAM using Google.Protobuf.JsonFormatter and Google.Protobuf.JsonParser Python serialization/deserialization used google.protobuf.json_format&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/DYvEe5xcO4TfJVk4bPa5hgYtA1BUSmMqDyawWYIIAgU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva24w/bWFqcTJlOGR4NXFo/YnFqY3YucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/DYvEe5xcO4TfJVk4bPa5hgYtA1BUSmMqDyawWYIIAgU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva24w/bWFqcTJlOGR4NXFo/YnFqY3YucG5n" alt="Tables" width="795" height="503"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  Summary
&lt;/h2&gt;

&lt;p&gt;This comparison is not about articulating which is better. Both formats have their own strengths, but if we go from “the end” and ask ourselves what are the most important parameters for our use case, and both low latency and small footprint are there, then protobuf would be a suitable choice.&lt;br&gt;
If the added complexity is a drawback, I highly recommend checking out &lt;a href="https://docs.memphis.dev/memphis/memphis/schemaverse-schema-management#meet-schemaverse"&gt;Schemaverse&lt;/a&gt; and how it eliminates most of the heavy lifting when JSON is the used format, but the benefits of protobuf are appealing. &lt;/p&gt;




&lt;p&gt;Resources&lt;br&gt;
&lt;a href="https://infinum.com/blog/json-vs-protocol-buffers/"&gt;https://infinum.com/blog/json-vs-protocol-buffers/&lt;/a&gt;&lt;br&gt;
&lt;a href="https://levelup.gitconnected.com/protobuf-a-high-performance-data-interchange-format-64eaf7c82c0d"&gt;https://levelup.gitconnected.com/protobuf-a-high-performance-data-interchange-format-64eaf7c82c0d&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/"&gt;memphis.dev&lt;/a&gt; by &lt;a href="https://twitter.com/memphisveta"&gt;Sveta Gimpelson&lt;/a&gt; Co-founder &amp;amp; VP of Data &amp;amp; Research at Memphis.dev.&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>rawdata</category>
      <category>datatraffic</category>
      <category>protobuf</category>
      <category>json</category>
    </item>
    <item>
      <title>Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Thu, 18 May 2023 08:17:12 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/part-1-integrating-debezium-server-and-memphisdev-for-streaming-change-data-capture-cdc-events-1ia1</link>
      <guid>https://community.ops.io/memphis_dev/part-1-integrating-debezium-server-and-memphisdev-for-streaming-change-data-capture-cdc-events-1ia1</guid>
      <description>&lt;p&gt;&lt;em&gt;This is part one of a series of blog posts on building a modern event-driven system using Memphis.dev.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://en.wikipedia.org/wiki/Change_data_capture"&gt;Change data capture (CDC)&lt;/a&gt; is an architectural pattern which turns databases into sources for event-driven architectures. Frequently, CDC is implemented on top of built-in replication support. Changes to data (e.g., caused by INSERT, UPDATE, and DELETE statements) are recorded as atomic units and appended to a replication log for transmission to replicas. CDC software copies the events from the replica logs to streaming infrastructure for processing by downstream components.&lt;/p&gt;

&lt;p&gt;So what do CDC events look like? In this tutorial, we’ll use the example of a table of todo items with the following fields:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/AVINdpHeExA17ADCSMG6nh-ot2KLDZD0h_iEVIue0DQ/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOGU2/bzAxdGRrODNvOHI4/NWUzMzYucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/AVINdpHeExA17ADCSMG6nh-ot2KLDZD0h_iEVIue0DQ/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOGU2/bzAxdGRrODNvOHI4/NWUzMzYucG5n" alt="Table" width="800" height="319"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A null value for the due date signifies that there is no due date.&lt;/p&gt;

&lt;p&gt;If a user creates a todo item to buy milk from the store, the corresponding CDC event would look like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
“before” : null,

“after” : {
    “id” : 25,
    “description” : “buy milk”,
    “creation_timestamp” : “2023-05-01T16:32:15”,
    “due_date” : “2023-05-02”,
    “completed”: False }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If the user then completes (updates) the todo item, the following CDC event would be generated:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
“before” : {
    “id” : 25,
    “description” : “buy milk”,
    “creation_timestamp” : “2023-05-01T16:32:15”,
    “due_date” : “2023-05-02”,
    “completed”: False },

“after” : {
    “id” : 25,
    “description” : “buy milk”,
    “creation_timestamp” : “2023-05-01T16:32:15”,
    “due_date” : “2023-05-02”,
    “completed”: True }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If the user deletes, the original item, the CDC event will look like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
“before” : {
     “id” : 25,
     “description” : “buy milk”,
     “creation_timestamp” : “2023-05-01T16:32:15”,
     “due_date” : “2023-05-02”,
     “completed”: False },

“after” : null
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The CDC approach is used to support various data analyses that are run in near real-time:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Copying data from one database to another&lt;/strong&gt;. Modern systems often incorporate multiple storage solutions chosen because of their optimizations for complementary workloads. For example, online transaction processing (OLTP) databases like &lt;a href="https://www.postgresql.org/"&gt;PostgreSQL&lt;/a&gt; are designed to support many concurrent users each performing queries that touch a small amount of data. Online analytical processing (OLAP) databases like &lt;a href="https://clickhouse.com/"&gt;Clickhouse&lt;/a&gt; are optimized to handle a small number of queries touching a large amount of data. The CDC approach doesn’t require schema changes (e.g., adding update timestamps) and is less resource intensive than approaches like running periodic queries to find new or changed records.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Performing real-time data integration&lt;/strong&gt;. Some tasks require that data be pulled from multiple data sets and integrated. For example, a user’s clickstream (page view) events may be changed with details of the products they’re browsing to feed a machine learning model. Performing these joins in the production OLTP databases reduces application responsiveness. CDC allows computational heavy actions to be run on dedicated subsystems.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Performing aggregations or window analyses&lt;/strong&gt;. An OLTP database may only log events such as commercial transactions. Business analysts may want to see the current sum of sales in a given quarter, however. The events captured through CDC can be aggregated in real-time to update dashboards and other data applications.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Performing de-aggregations&lt;/strong&gt;. For performance reasons, an OLTP database may only store the current state of data like counters. For example, a database may store the number of likes or views of social media posts. Machine learning models often need individual events, however. CDC generates an event for every increase or decrease in the counters, effectively creating a historical time series for downstream analyses.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Implementing the CDC Pattern
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://debezium.io/"&gt;Debezium&lt;/a&gt; is a popular open-source tool for facilitating CDC. Debezium provides connectors for various open-source and proprietary databases. The connectors listen for data change events and convert the internal representations to common formats such as JSON and Avro. Additionally, some support is provided for filtering and transforming events.&lt;/p&gt;

&lt;p&gt;Debezium was originally designed as connectors that run in the &lt;a href="https://kafka.apache.org/documentation/#connect"&gt;Apache Kafka Connect&lt;/a&gt; framework. Apache Kafka, unfortunately, has a pretty large deployment footprint for production setups. It is &lt;a href="https://docs.confluent.io/platform/current/kafka/deployment.html"&gt;recommended&lt;/a&gt; that a minimal production deployment has at least 3 nodes with 64 GB of RAM and 24 cores with storage configured with RAID 10 and at least 3 additional nodes for a separate Apache Zookeeper cluster. Meaning, a minimal production setup of Apache Kafka requires at least 6 nodes. Further, the JVM and operating system need to be &lt;a href="https://docs.confluent.io/platform/current/kafka/deployment.html#cp-production-parameters"&gt;tuned&lt;/a&gt; significantly to achieve optimal performance.&lt;/p&gt;

&lt;p&gt;Many cloud-native systems are divided into microservices that are designed to scale independently. Rather than relying on one large message broker cluster, it’s common for these systems to deploy multiple, small, independent clusters. &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev&lt;/a&gt; is a next-generation, cloud-native message broker with a low resource footprint, minimal operational overhead, and no required performance tuning.&lt;/p&gt;

&lt;p&gt;Debezium recently announced the general availability of Debezium Server, a framework for using Debezium connectors without Apache Kafka. &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt; runs in a standalone mode. Sink connectors for a wide range of messaging systems are included out of the box.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/8XC6DH1JWM16s_jbl1z4jJ452fZN2kthPskw44t0L50/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvODF2/dnBmZmc3Y2ZjNGZx/aXk3dTguanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/8XC6DH1JWM16s_jbl1z4jJ452fZN2kthPskw44t0L50/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvODF2/dnBmZmc3Y2ZjNGZx/aXk3dTguanBn" alt="CDC-PATTERN" width="800" height="423"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this tutorial, we’ll demonstrate how to implement the CDC pattern for PostgreSQL using Debezium Server and Memphis.dev.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Collaborative Power of Open Source: Interfacing Debezium Server and Memphis.dev
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev&lt;/a&gt; and Debezium Server are integrated using REST. &lt;a href="https://github.com/memphisdev/memphis-rest-gateway"&gt;The Memphis.dev REST gateway&lt;/a&gt; provides endpoints for consuming messages, while Debezium Server provides the HTTP client sink for transmitting messages via REST. In our reference solution, Debezium Server makes a POST request to /station/todo-cdc-events/produce/single for each message. The REST interface accepts messages in JSON, text, and binary Protocol Buffer formats.&lt;/p&gt;

&lt;p&gt;Unfortunately, we hit a stumbling block while implementing our CDC solution. The Memphis.dev REST gateway uses JSON Web Tokens (JWT) authentication for security, but Debezium Server’s HTTP client didn’t support it. Thanks to the collaborative power of open source, we were able to work with the Debezium developers to &lt;a href="https://github.com/debezium/debezium-server/pull/20"&gt;add JWT authentication functionality&lt;/a&gt;. The user must specify a username, password, and authentication endpoint URL in the Debezium Server configuration file. The server then tracks its authentication state and makes REST requests to perform an initial authorization and refresh that authorization as needed.&lt;/p&gt;

&lt;p&gt;With the JWT authentication now in place, Debezium Server can forward CDC events to Memphis.dev. Further, all Debezium Server users, whether or not they are using Memphis.dev, can benefit from this functionality.&lt;/p&gt;




&lt;h2&gt;
  
  
  Overview of the Solution
&lt;/h2&gt;

&lt;p&gt;Here, we describe a reference solution for delivering change data capture events with &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev&lt;/a&gt;. &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server’s&lt;/a&gt; HTTP client sink is used to send the CDC events from a &lt;a href="https://www.postgresql.org/"&gt;PostgreSQL&lt;/a&gt; database to a Memphis.dev instance using the M&lt;a href="https://github.com/memphisdev/memphis-rest-gateway"&gt;emphis REST gateway&lt;/a&gt;. Our solution has six components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Todo Item Generator&lt;/strong&gt;: Inserts a randomly-generated todo item in the PostgreSQL table every 0.5 seconds. Each todo item contains a description, creation timestamp, optional due date, and completion status.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;PostgreSQL&lt;/strong&gt;: Configured with a single database containing a single table (todo_items).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Debezium Serve&lt;/strong&gt;r: Instance of Debezium Server configured with PostgreSQL source and HTTP Client sink connectors.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev REST Gateway&lt;/strong&gt;: Uses the out-of-the-box configuration.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Memphis.dev&lt;/strong&gt;: Configured with a single station (todo-cdc-events) and single user (todocdcservice).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Printing Consumer&lt;/strong&gt;: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/o4z_EUYdxv1Kn31En5WccMpo82sYk4VqF8UwnDdhAPY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvcDdj/Y2U0cmZyd3pzb283/NDAyZ3guanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/o4z_EUYdxv1Kn31En5WccMpo82sYk4VqF8UwnDdhAPY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvcDdj/Y2U0cmZyd3pzb283/NDAyZ3guanBn" alt="Postgress CDC example" width="800" height="255"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  Running the Implementation
&lt;/h2&gt;

&lt;p&gt;Code repository: &lt;a href="https://github.com/memphisdev/memphis-example-solutions"&gt;Memphis Example Solutions&lt;/a&gt;.&lt;br&gt;
&lt;a href="https://docs.docker.com/compose/"&gt;Docker Compose&lt;/a&gt; will be needed.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 1&lt;/strong&gt;: Build the Docker images for &lt;a href="https://debezium.io/documentation/reference/2.2/operations/debezium-server.html"&gt;Debezium Server&lt;/a&gt;, the printing consumer, and database setup (table and user creation).&lt;/p&gt;

&lt;p&gt;Currently, our implementation depends on a pre-release version of Debezium Server for the JWT authentication support. A Docker image will be built directly from the main branch of the Debezium and Debezium Server repositories. Note that this step can take quite a while (~20 minutes) to run. When Debezium Server 2.3.0 is released, we will switch to using the upstream Docker image.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose build --pull --no-cache
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Building 0.0s (0/1)
[+] Building 0.2s (2/3)
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
[+] Building 19.0s (5/10)
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
[+] Building 19.2s (5/10)
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
 =&amp;gt; =&amp;gt; transferring dockerfile: 302B                             0.0s
 =&amp;gt; [internal] load .dockerignore                                0.0s
 =&amp;gt; =&amp;gt; transferring context: 2B                                  0.0s
 =&amp;gt; [internal] load metadata for docker.io/library/debian:bullseye-slim                           0.3s
 =&amp;gt; CACHED [1/6] FROM docker.io/library/debian:bullseye-slim@sha256:9404b05bd09b57c76eccc0c5505b3c88b5feccac808d9b193a4fbac87bb                              0.0s
[+] Building 31.4s (5/10)
[+] Building 32.2s (5/10)
[+] Building 34.2s (5/10)
 =&amp;gt; [internal] load .dockerignore                                0.0s
[+] Building 37.6s (11/11) FINISHED
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
 =&amp;gt; =&amp;gt; transferring dockerfile: 302B                             0.0s
[+] Building 37.7s (5/10)
 =&amp;gt; [internal] load .dockerignore                                0.0s
 =&amp;gt; =&amp;gt; transferring context: 2B                                  0.0s
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
 =&amp;gt; =&amp;gt; transferring dockerfile: 300B                             0.0s
[+] Building 37.9s (5/10)
 =&amp;gt; [internal] load .dockerignore                                0.0s
[+] Building 38.0s (5/10)
[+] Building 38.2s (5/10)
[+] Building 18.9s (4/14)
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
 =&amp;gt; =&amp;gt; transferring dockerfile: 613B                             0.0s
[+] Building 20.0s (4/14)
[+] Building 65.8s (11/11) FINISHED
 =&amp;gt; [internal] load .dockerignore                                0.0s
 =&amp;gt; =&amp;gt; transferring context: 2B                                  0.0s
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
[+] Building 1207.0s (15/15) FINISHED
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
 =&amp;gt; =&amp;gt; transferring dockerfile: 613B                             0.0s
 =&amp;gt; [internal] load .dockerignore                                0.0s
 =&amp;gt; =&amp;gt; transferring context: 2B                                  0.0s
 =&amp;gt; [internal] load metadata for docker.io/library/debian:bullseye-slim                           0.2s
 =&amp;gt; CACHED [1/6] FROM docker.io/library/debian:bullseye-slim@sha256:9404b05bd09b57c76eccc0c5505b3c88b5feccac808d9b193a4fbac87bb                              0.0s
 =&amp;gt; [ 2/13] RUN apt update &amp;amp;&amp;amp; apt upgrade -y &amp;amp;&amp;amp; apt install -y openjdk-11-jdk-headless wget git curl &amp;amp;&amp;amp; rm -rf /var/cache/apt/ 49.5s
 =&amp;gt; [ 3/13] RUN git clone https://github.com/debezium/debezium   6.0s
 =&amp;gt; [ 4/13] WORKDIR /debezium                                    0.1s
 =&amp;gt; [ 5/13] RUN ./mvnw clean install -DskipITs -DskipTests     761.4s
 =&amp;gt; [ 6/13] RUN git clone https://github.com/debezium/debezium-server debezium-server-build                                            1.1s
 =&amp;gt; [ 7/13] WORKDIR /debezium-server-build                       0.0s
 =&amp;gt; [ 8/13] RUN ./mvnw package -DskipITs -DskipTests -Passembly372.1s
 =&amp;gt; [ 9/13] RUN tar -xzvf debezium-server-dist/target/debezium-server-dist-*.tar.gz -C /   2.0s
 =&amp;gt; [10/13] WORKDIR /debezium-server                             0.0s
 =&amp;gt; [11/13] RUN mkdir data                                       0.5s
 =&amp;gt; exporting to image                                          14.0s =&amp;gt; =&amp;gt; exporting layers                                          14.0s
 =&amp;gt; =&amp;gt; writing image sha256:51d987a3bf905f35be87ce649099e76c13277d75c4ac26972868fc9af2617d14                                                                0.0s
 =&amp;gt; =&amp;gt; naming to docker.io/library/debezium-server               0.0s
[+] Building 41.8s (11/11) FINISHED
 =&amp;gt; [internal] load build definition from Dockerfile             0.0s
 =&amp;gt; =&amp;gt; transferring dockerfile: 302B                             0.0s
 =&amp;gt; [internal] load .dockerignore                                0.0s
 =&amp;gt; =&amp;gt; transferring context: 2B                                  0.0s
 =&amp;gt; [internal] load metadata for docker.io/library/debian:bullseye-slim                           0.3s
 =&amp;gt; [internal] load build context                                0.0s
 =&amp;gt; =&amp;gt; transferring context: 39B                                 0.0s
 =&amp;gt; CACHED [1/6] FROM docker.io/library/debian:bullseye-slim@sha256:9404b05bd09b57c76eccc0c5505b3c88b5feccac808d9b193a4fbac87bb                              0.0s
 =&amp;gt; [2/6] RUN apt update &amp;amp;&amp;amp; apt upgrade -y &amp;amp;&amp;amp; apt install -y python3 python3-pip &amp;amp;&amp;amp; rm -rf /var/cache/apt/*                          33.5s
 =&amp;gt; [3/6] WORKDIR /app                                           0.0s
 =&amp;gt; [4/6] COPY todo_generator.py /app/                           0.0s
 =&amp;gt; [5/6] RUN pip3 install -U pip wheel                          2.0s
 =&amp;gt; [6/6] RUN pip3 install psycopg2-binary                       1.1s
 =&amp;gt; exporting to image                                           4.9s
 =&amp;gt; =&amp;gt; exporting layers                                          4.9s
 =&amp;gt; =&amp;gt; writing image sha256:6424a08a9dedb77b798610a0b87c1c0a0c5f910039d03d673b3cf47ac54c10de                                                                0.0s
 =&amp;gt; =&amp;gt; naming to docker.io/library/todo-generator                0.0s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 2&lt;/strong&gt;: Start the &lt;a href="https://github.com/memphisdev/memphis"&gt;Memphis.dev broker&lt;/a&gt; and &lt;a href="https://github.com/memphisdev/memphis-rest-gateway"&gt;REST gateway&lt;/a&gt;. Note that the memphis-rest-gateway service depends on the memphis broker service, so the broker service will be started as well.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d memphis-rest-gateway
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 4/4
 ⠿ Network postgres-debezium-cdc-example_default                   Created                                                       0.1s
 ⠿ Container postgres-debezium-cdc-example-memphis-metadata-1      Healthy                                                       6.1s
 ⠿ Container postgres-debezium-cdc-example-memphis-1               Health...                                                    16.9s
 ⠿ Container postgres-debezium-cdc-example-memphis-rest-gateway-1  Started                                                      17.3s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Step 3&lt;/strong&gt;: Follow the instructions for &lt;a href="https://github.com/memphisdev/memphis-example-solutions/blob/master/postgres-debezium-cdc-example/docs/setup_memphis.md"&gt;configuring Memphis.dev&lt;/a&gt; with a new station (todo-cdc-events) and user (todocdcservice) using the web UI.&lt;/p&gt;

&lt;p&gt;Point your browser at  &lt;a href="http://localhost:9000/"&gt;http://localhost:9000/&lt;/a&gt;. Click the “sign in with root” link at the bottom of the page.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/Eo7HQVQP6TvYAMVjlq3_KieTF40JbvwPdqbbdVstwZo/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOWVm/bmY1c3RicTNodzc1/emRlaW0ucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/Eo7HQVQP6TvYAMVjlq3_KieTF40JbvwPdqbbdVstwZo/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOWVm/bmY1c3RicTNodzc1/emRlaW0ucG5n" alt="sign in" width="800" height="580"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Log in with root (username) and memphis (password).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/HiUzvIjnVAt6K8nyk42zK2swvSLD8Ti6k-dyMnLghD8/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvYmlz/dTZqbTc3YTZteW1m/bWUycm0ucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/HiUzvIjnVAt6K8nyk42zK2swvSLD8Ti6k-dyMnLghD8/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvYmlz/dTZqbTc3YTZteW1m/bWUycm0ucG5n" alt="memphis root" width="800" height="521"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Follow the wizard to create a station named todo-cdc-events.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/M9xwmrzjFN6mogn9Nkl35ddj0hnD_zllrSR89YVCfNg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMveW1z/Nm1peXEwYm90cjBv/aXZsaW0ucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/M9xwmrzjFN6mogn9Nkl35ddj0hnD_zllrSR89YVCfNg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMveW1z/Nm1peXEwYm90cjBv/aXZsaW0ucG5n" alt="memphis ui" width="800" height="581"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Create a user named todocdcservice with the same value for the password.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/CRSvYfjUNUoLE5aTSfpPJneuHySq9waHfqwQy4ECtFg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZnBl/Zzh2NWN4eDBpaGR2/NnRuMXMucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/CRSvYfjUNUoLE5aTSfpPJneuHySq9waHfqwQy4ECtFg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZnBl/Zzh2NWN4eDBpaGR2/NnRuMXMucG5n" alt="userapp" width="800" height="867"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Click “next” until the wizard is finished:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/Q-0v0hEvNKlNGRXkHLFa1zy64JejZGa1GdlIAO4fHZw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZG0w/bXFleTFzcjBsMWxl/emR6NDEucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/Q-0v0hEvNKlNGRXkHLFa1zy64JejZGa1GdlIAO4fHZw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZG0w/bXFleTFzcjBsMWxl/emR6NDEucG5n" alt="Image description" width="800" height="648"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Click “Go to station overview” to go to the station overview page.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/SARHAiDZv0QE8B4ieuph7FZme5a19zjUe4vli8_vGHY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOXJu/YjltY2FpMXg3czQ5/MWo1MTQucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/SARHAiDZv0QE8B4ieuph7FZme5a19zjUe4vli8_vGHY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvOXJu/YjltY2FpMXg3czQ5/MWo1MTQucG5n" alt="Image description" width="800" height="483"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 4&lt;/strong&gt;: Start the printing consumer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d printing-consumer
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 3/3
 ⠿ Container postgres-debezium-cdc-example-memphis-metadata-1  H...                                                              0.6s
 ⠿ Container postgres-debezium-cdc-example-memphis-1           Healthy                                                           1.1s
 ⠿ Container printing-consumer                                 Started           
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start the todo item generator, PostgreSQL database, and Debezium Server:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker compose up -d todo-generator
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[+] Running 7/7
 ⠿ Container postgres                                              Healthy                                                       7.9s
 ⠿ Container postgres-debezium-cdc-example-memphis-metadata-1      Healthy                                                       0.7s
 ⠿ Container postgres-debezium-cdc-example-memphis-1               Health...                                                     1.2s
 ⠿ Container postgres-debezium-cdc-example-memphis-rest-gateway-1  Running                                                       0.0s
 ⠿ Container database-setup                                        Exited                                                        6.8s
 ⠿ Container debezium-server                                       Healthy                                                      12.7s
 ⠿ Container todo-generator                                        Started      
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that the todo item generator depends on the other services and will start them automatically. The database-setup container will run once to create the database, tables, and role in PostgreSQL.&lt;/p&gt;

&lt;p&gt;Lastly, confirm the system is working. Check the todo-cdc-events station overview screen in the Memphis.dev web UI to confirm that the producer and consumer are connected and messages are being delivered.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/RezxhpUa7j8yULbBOFPMhD8uUoWuoOAcM3nZhFYh09g/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbTd3/eWVjajg3bnB2NWEx/emlnNDMucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/RezxhpUa7j8yULbBOFPMhD8uUoWuoOAcM3nZhFYh09g/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbTd3/eWVjajg3bnB2NWEx/emlnNDMucG5n" alt="Image description" width="800" height="456"&gt;&lt;/a&gt;&lt;br&gt;
And, print the logs for the printing-consumer container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker logs --tail 2 printing-consumer
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;message:  bytearray(b'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.public.todo_items.Envelope","version":1},"payload":{"before":null,"after":{"item_id":205,"description":"ERJGCHXXOBBGSMOUQSMB","creation_date":1682991115063809,"due_date":null,"completed":false},"source":{"version":"2.3.0-SNAPSHOT","connector":"postgresql","name":"tutorial","ts_ms":1682991115065,"snapshot":"false","db":"todo_application","sequence":"[\\"26715784\\",\\"26715784\\"]","schema":"public","table":"todo_items","txId":945,"lsn":26715784,"xmin":null},"op":"c","ts_ms":1682991115377,"transaction":null}}')
message:  bytearray(b'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.public.todo_items.Envelope","version":1},"payload":{"before":null,"after":{"item_id":206,"description":"KXWQYXRWCGSKTBJOJFSX","creation_date":1682991115566896,"due_date":1683250315566896,"completed":false},"source":{"version":"2.3.0-SNAPSHOT","connector":"postgresql","name":"tutorial","ts_ms":1682991115568,"snapshot":"false","db":"todo_application","sequence":"[\\"26715992\\",\\"26715992\\"]","schema":"public","table":"todo_items","txId":946,"lsn":26715992,"xmin":null},"op":"c","ts_ms":1682991115885,"transaction":null}}')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;p&gt;Congratulations! You now have a working example of how to capture and transfer data change events from a PostgreSQL database into Memphis.dev using Debezium Server.&lt;/p&gt;




&lt;p&gt;check out &lt;a href="https://memphis.dev/blog/part-2-change-data-capture-cdc-for-mongodb-with-debezium-and-memphis-dev/"&gt;part 2:Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev&lt;/a&gt;&lt;br&gt;
&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Originally published at Memphis.dev By RJ Nowling, Developer advocate at &lt;a href="https://memphis.dev/blog/part-1-integrating-debezium-server-and-memphis-dev-for-streaming-change-data-capture-cdc-events/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>cdc</category>
      <category>memphisdev</category>
      <category>streamingdata</category>
    </item>
    <item>
      <title>Memphis is now GA!</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Wed, 05 Apr 2023 05:48:32 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/memphis-is-now-ga-m6i</link>
      <guid>https://community.ops.io/memphis_dev/memphis-is-now-ga-m6i</guid>
      <description>&lt;p&gt;Memphis is now GA,&lt;br&gt;
and we do not take this title for granted.&lt;/p&gt;

&lt;h2&gt;
  
  
  Let's start from the beginning.
&lt;/h2&gt;

&lt;p&gt;Struggling with the engineering part of the legacy brokers and queues planted the idea that it needs to be disrupted.&lt;br&gt;
We dagged up more and realized a very rhetorical fact. Still, one that we usually forget – messaging queues and brokers are a means to an end, not the goal itself, and that understanding open a whole new variety of solutions.&lt;/p&gt;

&lt;p&gt;The chosen solution was the most challenging one, but we believe, also the right one – a) It has to be open-sourced. b) It can’t be just an intelligent message broker on steroids. c) It has to offer what we call the “Day 2” operations on top to help build queue-based applications in minutes. From the more common ones, which are async communication between microservices, task scheduling, to event-driven applications, event sourcing, data ingestion, system integration, log collecting, and forming a data lake&lt;br&gt;
With that understanding in mind, we formed the vision of Memphis – an intelligent and frictionless message broker that enables an ultra-fast development of queue-based applications for developers and data engineers.&lt;/p&gt;




&lt;h2&gt;
  
  
  From vision to GA.
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Memphis beta version released on May 15th, 2022.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;We focused on the foundations of the ecosystem, integrating with NATS internals, designing memphis to run natively on Kubernetes and cloud-native environments, out-of-the-box everything - from monitoring, dead-latter station, schema validation, real-time observability, and more.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;With each release, the bug cycles became shorter and smaller, and we, as a team and product, became more intelligent by carefully understanding and listening to our users. By doing that, Memphis reached a solid and stable GA, and not less importantly, suitable for most developers and not just those who share our original challenges.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;April 2nd, the GA release of the 1st part of Memphis.&lt;br&gt;
Memphis GA stands for a solid, stable, and secure foundation for the future to come with zero known bugs and ready for production.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Some insights from the last eight months.
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Avg time from installation to data ingestion takes 5 minutes.&lt;/li&gt;
&lt;li&gt;We grew from 0 to over 5000 deployments.&lt;/li&gt;
&lt;li&gt;50 new contributors.&lt;/li&gt;
&lt;li&gt;Users report production usage before the GA release.&lt;/li&gt;
&lt;li&gt;Use cases from async communication between microservices, event-driven applications, event sourcing, data ingestion, system.&lt;/li&gt;
&lt;li&gt;integration, log collecting, security events, to forming a data lake&lt;/li&gt;
&lt;li&gt;Schemaverse has been a game changer to many of our users.&lt;/li&gt;
&lt;li&gt;The most used SDK is Go.&lt;/li&gt;
&lt;li&gt;Cost and simplicity have been major factors in replacing existing tech with Memphis.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  The future to come.
&lt;/h2&gt;

&lt;p&gt;I mentioned 1st part, so there is a 2nd part.&lt;br&gt;
Memphis’ 1st part is the storage layer, the message broker with all its benefits as we know it today, and will continue to evolve dramatically over the coming releases. We will also push hard on GitOps, automation enablement, and reconstructing some of the APIs so they can be modular and open for the community to self-implement new ones. Last but not least – multi-tenancy, partitions, read-replicas, and more.&lt;/p&gt;

&lt;p&gt;Memphis’ 2nd part is all about helping developers and data engineers build valuable use cases and queue-based applications on top of Memphis. More on that in the coming weeks.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/SIOfJae_YYDjmlgNCpYuYU_No0TjQDaKzG_ZYRgY-_Q/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM253/dGxzbXY0MWZ2ZmIz/aXY0dG8uanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/SIOfJae_YYDjmlgNCpYuYU_No0TjQDaKzG_ZYRgY-_Q/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM253/dGxzbXY0MWZ2ZmIz/aXY0dG8uanBn" alt="v.1.0.0 is out" width="800" height="549"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Memphis &lt;a href="https://docs.memphis.dev/memphis/memphis-cloud/signup"&gt;cloud&lt;/a&gt; is right around the corner, but if you prefer to self-host Memphis now - head &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;here&lt;/a&gt;.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Originally published at Memphis.dev By Yaniv Ben Hemo, Co-Founder &amp;amp; CEO at &lt;a href="https://memphis.dev/blog/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>messagequeu</category>
      <category>dataprocessing</category>
      <category>datastreaming</category>
      <category>memphisdev</category>
    </item>
    <item>
      <title>Batch Processing vs Stream Processing</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Thu, 30 Mar 2023 11:51:21 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/batch-processing-vs-stream-processing-8h8</link>
      <guid>https://community.ops.io/memphis_dev/batch-processing-vs-stream-processing-8h8</guid>
      <description>&lt;p&gt;In the digital age, data is the new currency and is being used everywhere.&lt;br&gt;
From social media to IoT devices, businesses are generating more data than ever before.&lt;br&gt;
With this data comes the challenge of processing it in a timely and efficient way.&lt;br&gt;
Companies all over the world are investing in technologies that can help them better process, analyze, and use the data they are collecting to better serve their customers and stay ahead of their competitors.&lt;br&gt;
One of the most important decisions organizations make when it comes to data processing is whether to use stream or batch processing. Stream processing is quickly becoming the go-to option for many companies because of its ability to provide real-time insights and immediate actionable results. With the right stream processing platform, companies can easily unlock the value of their data and use it to gain a competitive edge.This article will explore why stream processing is taking over, including its advantages over batch processing, such as its scalability, cost-effectiveness, and flexibility. &lt;/p&gt;

&lt;p&gt;Let’s recap some of the basics first.&lt;/p&gt;




&lt;h2&gt;
  
  
  Data Processing
&lt;/h2&gt;

&lt;p&gt;Data processing is the process of transforming raw data into meaningful and useful information. It involves a wide range of activities, including data collection, data cleaning, data integration, data analysis, and data visualization.  It is an essential part of the analysis and decision-making process in many industries, including finance, healthcare, education, engineering, and business.&lt;/p&gt;

&lt;p&gt;Data processing can be divided into two main categories: Manual Data Processing and Automated Data Processing. &lt;/p&gt;

&lt;p&gt;Manual data processing involves the use of manual input,paper forms, manual calculations, and the entry of data into software programs. Manual data processing is often slow and error-prone, but it can be beneficial when dealing with large amounts of data or complex tasks. Automated data processing, however, is faster and more efficient than manual data processing. Automated data processing uses algorithms and software to automate the processing of data. This includes activities such as sorting, filtering, and summarizing data.&lt;/p&gt;

&lt;p&gt;Data processing can also be classified into several types. These include batch processing, real-time processing, stream processing. Multi-Processing, and Time-sharing.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Batch Processing&lt;/strong&gt;: This is a type of data processing that involves the execution of a series of pre-defined instructions or programs on a batch of data. It is typically used for tasks that require large amounts of data to be processed, such as data mining or data warehousing.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Real-time processing&lt;/strong&gt;:This is a type of data processing that involves the continuous, real-time analysis of data streams. It is typically used for applications that require immediate analysis and response to incoming data, such as fraud detection and consumer/user activity.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Stream processing&lt;/strong&gt;:This is a type of data processing that involves the continuous, real-time analysis of data streams. It is similar to real-time processing, but typically involves more complex operations and is capable of handling large volumes of data with low latency.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Multi-Processing:&lt;/strong&gt; Multi-processing is a type of data processing that involves multiple processors working simultaneously on different tasks. Multi-processing is often used to speed up the processing of large amounts of data. By using multiple processors, the same task can be completed faster than if it were done on a single processor.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Time-sharing:&lt;/strong&gt; Time-sharing is a type of data processing that allows multiple users to access the same computer or system at the same time. Time-sharing systems provide better efficiency and performance than batch processing and are often used in applications such as online banking, e-commerce, and web hosting.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Overall, data processing is an essential part of modern business and society, and is critical for turning raw data into useful information that can be used to make informed decisions and drive business growth.&lt;/p&gt;

&lt;p&gt;Let’s discuss stream and batch data processing in detail.&lt;/p&gt;

&lt;h2&gt;
  
  
  What is Stream Processing?
&lt;/h2&gt;

&lt;p&gt;Stream processing is a type of data processing that involves continuous, real-time analysis of data streams. It is a way of handling large volumes of data that are generated by various sources, such as sensors, financial transactions, or social media feeds, in real-time.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/rfhEu7cvm2Xw3XRUmZyetxfIAMG8vQYfxzNR_XaT3Bc/w:880/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbWtr/czBmOWd6ZDNqZTY1/NmY0NHAuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/rfhEu7cvm2Xw3XRUmZyetxfIAMG8vQYfxzNR_XaT3Bc/w:880/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbWtr/czBmOWd6ZDNqZTY1/NmY0NHAuanBn" alt="What is stream processing" width="880" height="518"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Advantages of Stream Processing
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Real-Time Nature&lt;/strong&gt;: One of the primary advantages of stream processing is its real-time nature. Because data is processed as it is received, stream processing allows for faster analysis and decision-making. This can be especially useful in applications where time is of the essence, such as in financial trading or emergency response.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scalability&lt;/strong&gt;: Another advantage of stream processing is its scalability. Because stream processing systems are designed to handle large volumes of data in real-time, they can easily scale to handle increases in data volume without compromising on performance. This makes them well-suited to applications that deal with large amounts of data, such as internet of things (IoT) applications or social media analysis.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Reduced Cost&lt;/strong&gt;: Stream processing also helps organizations save money by reducing costs associated with storing large amounts of data. Stream processing systems can store only the data that is required for processing, eliminating the need to store and manage large datasets.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Security&lt;/strong&gt;: Stream processing is more secure than traditional batch processing systems. Stream processing systems use encryption techniques to ensure that data is kept secure and confidential. This helps organizations to ensure that their data remains safe and secure.&lt;/p&gt;

&lt;h2&gt;
  
  
  Challenges of Stream Processing
&lt;/h2&gt;

&lt;p&gt;Overall, stream processing is a powerful tool for handling large volumes of data in real-time, but it also comes with its own set of challenges. &lt;/p&gt;

&lt;p&gt;One of the main challenges of stream processing is ensuring the &lt;strong&gt;accuracy and consistency&lt;/strong&gt; of the data. Because stream processing involves continuous analysis of data in real-time, any errors or inconsistencies in the data can quickly propagate throughout the system, leading to incorrect results. This can be particularly problematic in complex systems with many different data sources, and can require careful design and management to ensure the quality of the data.&lt;/p&gt;

&lt;p&gt;Another challenge of stream processing is dealing with &lt;strong&gt;late or out-of-order data&lt;/strong&gt;. In stream processing, data is often generated by multiple sources, and it can arrive at different times or in a different order than expected. This can make it difficult to accurately process the data, and can require the use of specialized techniques to handle such situations. For example, some stream processing systems use techniques such as windowing or buffering to delay the processing of data until all necessary information is available, or to reorder data if it arrives out of sequence.&lt;/p&gt;

&lt;p&gt;A third challenge associated with stream processing is maintaining the &lt;strong&gt;performance&lt;/strong&gt; of the system. Because stream processing involves continuous analysis of data, it can put a heavy load on the underlying infrastructure, which can impact the overall performance of the system. This can be particularly problematic in systems with high volumes of data, or with complex data processing pipelines. To address this challenge, stream processing systems often use techniques such as parallelism, load balancing, and data partitioning to distribute the workload across multiple machines and improve the overall performance of the system.&lt;/p&gt;

&lt;p&gt;The challenges stated above can be addressed through careful design and management of the system, as well as the use of specialized techniques to ensure the accuracy and performance of the data processing pipeline.&lt;/p&gt;




&lt;h2&gt;
  
  
  Use Cases
&lt;/h2&gt;

&lt;p&gt;Stream processing can be used in many different use cases and can be applied to a variety of industries, including finance, retail, healthcare, telecommunications, and IoT.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Finance&lt;/strong&gt;: Stream processing can be used to analyze market data in real time and detect fraud. By analyzing customer transactions and patterns, banks can quickly identify suspicious activity and alert authorities. This helps reduce the potential losses caused by fraudulent activities.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Retail&lt;/strong&gt;: Stream processing can be used to provide customers with personalized offers and recommendations. By analyzing customer data in real time, retailers can create targeted campaigns that are tailored to each individual customer's preferences. This allows them to offer more relevant products and services, which can lead to increased customer satisfaction and loyalty.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Health Care&lt;/strong&gt;: Stream processing can be used to monitor patient health in real time. By collecting data from various medical devices and sensors, healthcare providers can quickly identify any changes in a patient's health status. This can help them detect and treat conditions before they become serious and costly.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Telecommunication&lt;/strong&gt;: Stream processing can be used to monitor network performance in real time. By analyzing data from various telecommunication networks, service providers can quickly identify any issues or outages and take corrective action. This helps them maintain a high level of service quality and provide reliable connections to their customers.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Internet of Things (IoT)&lt;/strong&gt;: Stream processing can also be used to collect and analyze data from connected devices. This can help organizations gain valuable insights into how their devices are performing and make informed decisions about how to optimize their operations.&lt;/p&gt;




&lt;h2&gt;
  
  
  What is Batch Data Processing?
&lt;/h2&gt;

&lt;p&gt;Batch data processing is a method of executing a series of tasks in a predetermined sequence. It involves dividing a large amount of data into smaller, more manageable units called batches, which are processed independently and in parallel. In batch processing, a group of transactions or data is collected over a period of time and then processed all at once, typically overnight or during a maintenance window. Batch processing is often used in large-scale computing systems and data processing applications, such as payroll, invoicing, and inventory management.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/nEmYrQfADfVn8rrUpVvIjpfQHf0E1WanYSAFLLdvdA8/w:880/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbDJj/M3F4Y3ZvZnhqOWF5/a3dhaDkuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/nEmYrQfADfVn8rrUpVvIjpfQHf0E1WanYSAFLLdvdA8/w:880/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbDJj/M3F4Y3ZvZnhqOWF5/a3dhaDkuanBn" alt="What is Batch Data Processing?" width="880" height="518"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Advantages of Batch Processing
&lt;/h2&gt;

&lt;p&gt;There are several advantages to using batch processing:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Improved efficiency and speed&lt;/strong&gt;: Batch processing allows for the concurrent execution of multiple jobs, which can significantly improve the speed and efficiency of processing large amounts of data. By processing multiple transactions or data sets at once, batch processing can reduce the amount of time it takes to complete a task, allowing organizations to complete more work in less time.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Reduced costs&lt;/strong&gt;: Batch data processing can also help to reduce costs by reducing the need for manual intervention and labor. By automating repetitive tasks, organizations can reduce the amount of time and resources that are required to complete a task, leading to cost savings.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Increased accuracy&lt;/strong&gt;: Batch processing can help to increase the accuracy of data processing by ensuring that all transactions are processed consistently, according to predefined rules and procedures. This can help to reduce the potential for errors and inconsistencies, leading to more accurate and reliable results.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Enhanced security&lt;/strong&gt;: Batch processing can also help to improve the security of data processing by limiting the access to sensitive data to authorized personnel only. By controlling access to data and processing it in a secure environment, organizations can help to prevent unauthorized access and protect against potential security threats.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Improved scalability&lt;/strong&gt;: Batch data processing is highly scalable, meaning that it can be easily adapted to handle increased volumes of data without a significant impact on performance. This allows organizations to easily and efficiently process large amounts of data as their needs evolve, without the need for additional resources or infrastructure.&lt;/p&gt;

&lt;h2&gt;
  
  
  Challenges of Batch Processing
&lt;/h2&gt;

&lt;p&gt;There are also some challenges associated with batch processing. &lt;/p&gt;

&lt;p&gt;One of the main challenges is the need for &lt;strong&gt;careful planning and coordination&lt;/strong&gt;. Since batch processing is executed in a predetermined sequence, it is important to carefully plan and coordinate the execution of tasks to ensure that they are completed in the correct order. &lt;/p&gt;

&lt;p&gt;Another challenge of batch processing is that it can be &lt;strong&gt;time-consuming&lt;/strong&gt;. Since data is collected and processed in large quantities, it can take a significant amount of time to complete a batch. This can be especially problematic for businesses that need to process data in real-time, as batch processing may not be fast enough to keep up with the demands of the business.&lt;/p&gt;

&lt;p&gt;Batch processing can also be more &lt;strong&gt;complex&lt;/strong&gt; to implement and maintain, as it requires the development and management of batch schedules and processes. This can require additional resources and expertise, which can be a challenge for some organizations.&lt;/p&gt;

&lt;p&gt;Another challenge of batch processing is the &lt;strong&gt;limited visibility&lt;/strong&gt; it provides into the status of individual transactions or data items. With batch processing, it is often difficult to see the status of a particular transaction or data item within the batch, which can make it challenging to identify and address any issues that may arise.&lt;/p&gt;

&lt;p&gt;Batch processing can also present challenges when it comes to maintaining &lt;strong&gt;data integrity&lt;/strong&gt;. If a batch fails, it can be difficult to determine which data items were processed and which were not, which can lead to data loss or errors.&lt;/p&gt;

&lt;p&gt;In addition, batch processing can be &lt;strong&gt;error-prone&lt;/strong&gt;. Since data is catered in large quantities, it can be difficult to catch and correct errors in the batch. This can lead to inaccurate or incomplete results, which can be damaging to a business.&lt;/p&gt;




&lt;h2&gt;
  
  
  Use Cases:
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Data analytics&lt;/strong&gt;: Batch processing is used in data analytics to process large amounts of data and generate insights or reports. For example, a company might use batch processing to analyze customer data and generate reports on customer behavior or preferences.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;ETL (extract, transform, load) processes&lt;/strong&gt;: Batch processing is often used in ETL (extract, transform, load) processes to extract data from various sources, transform it into a format suitable for analysis or reporting, and load it into a data warehouse or other system.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Inventory Management&lt;/strong&gt;: Batch processing is also used in inventory management systems to process orders, track inventory levels, and generate reports. By processing data in a batch, it is possible to more efficiently manage and track inventory levels.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Financial transactions&lt;/strong&gt;: Batch processing is commonly used in the financial industry to process large numbers of transactions, such as credit card transactions or stock trades. For example, a bank might use batch processing to process transactions from multiple branches or ATM machines, and then update customer accounts accordingly.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Online services&lt;/strong&gt;: Batch processing is also used in the development of online services, such as web applications or mobile apps. For example, a social media platform might use batch processing to process large amounts of data in order to generate recommendations for users or to generate reports on user behavior.&lt;/p&gt;




&lt;h2&gt;
  
  
  Batch Processing vs Stream Processing: An Overview
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Batch Processing vs Stream Processing: Hardware&lt;/strong&gt;&lt;br&gt;
When it comes to hardware, there are some key differences between batch processing and stream processing. Batch data processing typically requires more powerful hardware, as it needs to be able to handle large amounts of data all at once. This can include powerful servers, high-capacity storage systems, and other specialized hardware.&lt;/p&gt;

&lt;p&gt;On the other hand, stream processing typically requires less powerful hardware. Since data is processed in real-time, it does not need to be stored for later processing. This means that stream processing systems can be more lightweight and can use less powerful hardware.&lt;/p&gt;

&lt;p&gt;Overall, the type of hardware needed for batch processing and stream processing depends on the specific requirements of the system and the amount of data being processed.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Batch Processing vs Stream Processing: Data Set&lt;/strong&gt;&lt;br&gt;
One of the main differences between batch processing and stream processing is the type of data they are designed to handle. Batch processing is typically used for data sets that are large and static, such as historical records or logs. In contrast, stream processing is typically used for data sets that are large but constantly changing, such as real-time sensor data.&lt;/p&gt;

&lt;p&gt;Another important difference between batch processing and stream processing is the way they handle data. Batch processing systems typically operate on data that is stored in a database or file system. On the other hand, stream processing systems operate on data that is generated in real-time or near-real-time.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Batch Processing vs Stream Processing: Analysis&lt;/strong&gt;&lt;br&gt;
One more area where these two data processing methods differ is the type of analysis they are designed to perform. Batch processing systems are designed to perform complex, data-intensive analysis, such as machine learning and predictive modeling.&lt;/p&gt;

&lt;p&gt;While, stream processing systems are suitable for performing simple, low-latency analyses, such as filtering and aggregation because it is designed to process data in small chunks, which limits their ability to perform complex analysis.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Batch Processing vs Stream Processing: Platforms&lt;/strong&gt;&lt;br&gt;
There are several platforms available for both batch processing and stream processing, each with its own unique features and capabilities. &lt;/p&gt;

&lt;p&gt;Some of the most popular platforms for batch processing include:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Apache Hadoop&lt;/strong&gt; and &lt;strong&gt;Apache Spark&lt;/strong&gt;, which are open-source distributed computing platforms that are widely used for big data processing and analysis.&lt;/p&gt;

&lt;p&gt;For stream processing, some popular platforms include:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Apache Flink&lt;/strong&gt; and &lt;strong&gt;Apache Storm&lt;/strong&gt;, which are also open-source distributed computing platforms. These platforms are often used for applications such as monitoring systems and real-time analytics.&lt;/p&gt;

&lt;p&gt;In addition to these open-source platforms, there are also several commercial platforms available for both batch data processing and stream processing. &lt;/p&gt;

&lt;p&gt;Some examples of commercial batch processing platforms include Cloudera and MapR, which are distributed computing platforms that are designed for big data processing and analysis.&lt;/p&gt;

&lt;p&gt;Let’s put some light on these commercial platforms!&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cloudera&lt;/strong&gt;: Cloudera is a leading provider of enterprise data cloud solutions, including software and services for data engineering, data warehousing, machine learning and analytics. Cloudera provides an enterprise data platform to customers of all sizes, enabling them to store, process and analyze their data quickly, reliably and securely. Cloudera also offers an array of professional services, such as consulting and training, to help customers get the most out of their data.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;MapR&lt;/strong&gt;: MapR is a distributed data platform for big data applications that provides fast and reliable access to data. It combines an optimized version of the Apache Hadoop open-source software with enterprise-grade features such as high availability, disaster recovery, and global replication. MapR also provides NoSQL databases, streaming analytics, and machine learning capabilities.&lt;/p&gt;

&lt;p&gt;For stream processing, some popular commercial platforms include Confluent, Memphis, and Databricks, which are also distributed computing platforms. These platforms are often used for applications such as fraud detection and real-time recommendation engines.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Confluent&lt;/strong&gt;: Confluent is an enterprise streaming platform built on Apache Kafka. It provides a range of services to support the development, deployment, and management of streaming data pipelines. It includes features such as real-time data integration, stream processing, and analytics. It also enables organizations to build mission-critical streaming applications.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Memphis&lt;/strong&gt;: &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt; is an open-source, real-time data processing platform&lt;/p&gt;

&lt;p&gt;that provides end-to-end support for in-app streaming use cases using Memphis distributed message broker. Memphis' platform requires zero ops, enables rapid development, extreme cost reduction,eliminates coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Databricks&lt;/strong&gt;: Databricks is a cloud-based platform for data engineering, machine learning, and analytics. It provides an integrated environment for working with big data that simplifies the process of managing and analyzing large datasets. It allows users to easily create data pipelines and complex analytics applications, and supports popular open source libraries such as Apache Spark, MLlib, and TensorFlow.&lt;/p&gt;

&lt;p&gt;Overall, the choice of platform for batch processing or stream processing depends on the specific requirements of the application. Open-source platforms are often a good choice for applications that require flexibility and customization, while commercial platforms may be more suitable for applications that require support and scalability.&lt;/p&gt;




&lt;h2&gt;
  
  
  Why Batch is dying and Streaming Takes over?
&lt;/h2&gt;

&lt;p&gt;There are several reasons why streaming has become more popular and why batch processing may be declining in popularity.&lt;/p&gt;

&lt;p&gt;One reason is the increasing demand for real-time processing. In today's fast-paced world, many organizations require the ability to process data in real-time in order to respond to changing conditions and make timely decisions. &lt;/p&gt;

&lt;p&gt;Another reason is the increasing availability of streaming technologies and tools. In the past, streaming was more difficult and expensive to implement, but today there are a wide range of tools and technologies available that make it easier and more cost-effective to implement streaming solutions. Also, with streaming it is possible to track the processing of data in real-time, which can be beneficial for debugging and monitoring purposes.&lt;/p&gt;




&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;This blog post walks through the basics of stream and batch processing, lists some of the advantages and challenges associated with these data processing methods, and then also compares them in terms of performance, data sets, analysis, hardware, and some other features.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt; By &lt;a href="https://www.linkedin.com/in/shoham-roditi-elimelech-0b933314a/"&gt;Shoham Roditi Elimelech&lt;/a&gt;, software engineer at @Memphis.dev &lt;/p&gt;

&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataprocessing</category>
      <category>batchprocessing</category>
      <category>streamprocessing</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Stateful stream processing with Memphis and Apache Spark</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Mon, 13 Mar 2023 16:10:10 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/stateful-stream-processing-with-memphis-and-apache-spark-35pn</link>
      <guid>https://community.ops.io/memphis_dev/stateful-stream-processing-with-memphis-and-apache-spark-35pn</guid>
      <description>&lt;p&gt;Amazon Simple Storage Service (S3) is a highly scalable, durable, and secure object storage service offered by Amazon Web Services (AWS). S3 allows businesses to store and retrieve any amount of data from anywhere on the web by making use of its enterprise-level services. S3 is designed to be highly interoperable and integrates seamlessly with other Amazon Web Services (AWS) and third-party tools and technologies to process data stored in Amazon S3. One of which is Amazon EMR (Elastic MapReduce) which allows you to process large amounts of data using open-source tools such as Spark.&lt;/p&gt;

&lt;p&gt;Apache Spark is an open-source distributed computing system used for large-scale data processing. Spark is built to enable speed and supports various data sources, including the Amazon S3. Spark provides an efficient way to process large amounts of data and perform complex computations in minimal time.&lt;/p&gt;

&lt;p&gt;Memphis.dev is a next-generation alternative to traditional message brokers.&lt;br&gt;
A simple, robust, and durable cloud-native message broker wrapped with an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases.&lt;/p&gt;

&lt;p&gt;The common pattern of message brokers is to delete messages after passing the defined retention policy, like time/size/number of messages. Memphis offers a 2nd storage tier for longer, possibly infinite retention for stored messages. Each message that expels from the station will automatically migrate to the 2nd storage tier, which in that case is AWS S3.&lt;/p&gt;

&lt;p&gt;In this tutorial, you will be guided through the process of setting up a Memphis station with a 2nd storage class connected to AWS S3. An environment on AWS. Followed by creating an S3 bucket, setting up an EMR cluster, installing and configuring Apache Spark on the cluster, preparing data in S3 for processing, processing data with Apache Spark, best practices, and performance tuning.&lt;/p&gt;


&lt;h2&gt;
  
  
  Setting up the Environment
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Memphis&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;To get started, first &lt;a href="https://docs.memphis.dev/"&gt;install&lt;/a&gt; Memphis.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Enable AWS S3 integration via the Memphis &lt;a href="https://docs.memphis.dev/memphis/dashboard-gui/integrations/storage/amazon-s3"&gt;integration center&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/NeOn-bD4jksrJYbgWQb9DFfyBjrKft5CQpniKmILGYU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMDF0/a2s0cHdhcDNrMmZ4/Y2ZvdDAucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/NeOn-bD4jksrJYbgWQb9DFfyBjrKft5CQpniKmILGYU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMDF0/a2s0cHdhcDNrMmZ4/Y2ZvdDAucG5n" alt="Amazon S3" width="800" height="441"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create a station (topic), and choose a retention policy.
Each message passing the configured retention policy will be offloaded to an S3 bucket.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/qHKz6S6GON1jtK-SONB1bBzKKdZxbtn7exDMQ9lazU4/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvc3h3/b3dmd29kdzZ4eXlh/OTNscWEucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/qHKz6S6GON1jtK-SONB1bBzKKdZxbtn7exDMQ9lazU4/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvc3h3/b3dmd29kdzZ4eXlh/OTNscWEucG5n" alt="create new station" width="800" height="441"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Check the newly configured AWS S3 integration as 2nd storage class by clicking “Connect”.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/ZXmccJB3Eny5kVnmqXRWem5iZprCiaDOve7DhnmvhvU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMXRs/ajFmcDBndzRscHVq/cDN1bTcucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/ZXmccJB3Eny5kVnmqXRWem5iZprCiaDOve7DhnmvhvU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMXRs/ajFmcDBndzRscHVq/cDN1bTcucG5n" alt="Integration" width="800" height="441"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Start producing events into your newly created Memphis station.&lt;/li&gt;
&lt;/ul&gt;


&lt;h2&gt;
  
  
  Create an AWS S3 Bucket
&lt;/h2&gt;

&lt;p&gt;If you haven't done so already, first you need to create an AWS account at &lt;a href="https://aws.amazon.com/"&gt;https://aws.amazon.com/&lt;/a&gt;. Next, create an S3 bucket where you can store your data. You can use the AWS Management Console, the AWS CLI, or an SDK to create a bucket. For this tutorial, you will use the AWS management console at &lt;a href="https://console.aws.amazon.com/s3/"&gt;https://console.aws.amazon.com/s3/&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Click on "Create bucket".&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/YhWGXWQ29j37mGYVBSZxZvGgbilq8sLP_7SwHzJWacw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdGJi/ODZ6NjJpN2Rwc280/OGp0OWwuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/YhWGXWQ29j37mGYVBSZxZvGgbilq8sLP_7SwHzJWacw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdGJi/ODZ6NjJpN2Rwc280/OGp0OWwuanBn" alt="Amazon S3" width="800" height="415"&gt;&lt;/a&gt;&lt;br&gt;
Then proceed to create a bucket name complying with the naming convention and choose the region where you want the bucket to be located. Configure the “Object ownership” and “Block all public access” to your use case.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/ldNMHRT8jVK4vfx4pNaxG3WdQDkLnZhJqKviIJ0JKaw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvcW5t/NnRhajJzYTQycWZs/MnRoeDQuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/ldNMHRT8jVK4vfx4pNaxG3WdQDkLnZhJqKviIJ0JKaw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvcW5t/NnRhajJzYTQycWZs/MnRoeDQuanBn" alt="Image12" width="800" height="473"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Make sure to configure other bucket permissions to allow your Spark application to access the data. Finally, click on the “Create bucket” button to create the bucket.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/rUCaSr2ApjxeE54R2vvs-TjabzvrHmZniiWi0Q3favk/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdzE0/dXc2NXE5MThnandm/NWFzY2cuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/rUCaSr2ApjxeE54R2vvs-TjabzvrHmZniiWi0Q3favk/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvdzE0/dXc2NXE5MThnandm/NWFzY2cuanBn" alt="Image description" width="800" height="439"&gt;&lt;/a&gt;&lt;/p&gt;


&lt;h2&gt;
  
  
  Setting up an EMR Cluster with Spark installed
&lt;/h2&gt;

&lt;p&gt;The Amazon Elastic MapReduce (EMR) is a web service based on Apache Hadoop that allows users to cost-effectively process vast amounts of data using big data technologies including Apache Spark. To create an EMR cluster with Spark installed, open the EMR console at &lt;a href="https://console.aws.amazon.com/emr/"&gt;https://console.aws.amazon.com/emr/&lt;/a&gt; and select "Clusters" under "EMR on EC2" on the left side of the page.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/NiD5XiA58xcEcF6FpoXrHmZPOZiiAOmGyAJsjeSZfvU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvam42/ZGN2cXhoZDI5cWEz/aGowM3EuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/NiD5XiA58xcEcF6FpoXrHmZPOZiiAOmGyAJsjeSZfvU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvam42/ZGN2cXhoZDI5cWEz/aGowM3EuanBn" alt="Image description" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Click on "Create cluster" and give the cluster a descriptive name.&lt;br&gt;
Under "Application bundle", select Spark to install it on your cluster.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/xq-aVbK56mEM5ykZRkkzOcyKdZuJrWolVPVO_JSzkOE/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZXl1/OXJvb3JpZnoxejQ4/YWNidXkuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/xq-aVbK56mEM5ykZRkkzOcyKdZuJrWolVPVO_JSzkOE/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZXl1/OXJvb3JpZnoxejQ4/YWNidXkuanBn" alt="Image description" width="800" height="601"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Scroll down to the "Cluster logs" section and select the checkbox of Publish cluster-specific logs to Amazon S3.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/IBcxhJk68HQ8hFCuV1Ep4NRCbF8EGf08BEuvZK1dAS4/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvc2c0/aWYycHAxbXgwb3N1/NG5hYnQuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/IBcxhJk68HQ8hFCuV1Ep4NRCbF8EGf08BEuvZK1dAS4/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvc2c0/aWYycHAxbXgwb3N1/NG5hYnQuanBn" alt="Image description" width="800" height="517"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This will create a prompt to enter the Amazon S3 location using the S3 bucket name you created in the previous step followed by /logs, ie., s3://myawsbucket/logs. /logs are required by Amazon to create a new folder in your bucket where Amazon EMR can copy the log files of your cluster. &lt;/p&gt;

&lt;p&gt;Go to the “Security configuration and permissions section” and input your EC2 key pair or go with the option to create one.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/FP-1PVMWwt-8Ur-OfRe_40Ky0vu3tnWHs8uzA-CxzLw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMWhi/dGp5ejVwNjAzeHlh/ejFuNXkuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/FP-1PVMWwt-8Ur-OfRe_40Ky0vu3tnWHs8uzA-CxzLw/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvMWhi/dGp5ejVwNjAzeHlh/ejFuNXkuanBn" alt="Image description" width="800" height="462"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then click on the dropdown options for “Service role for Amazon EMR” and choose AWSServiceRoleForSupport. Choose the same dropdown option for “IAM role for instance profile”. Refresh the icon if need be to get these dropdown options&lt;/p&gt;

&lt;p&gt;Finally, click the “Create cluster” button to launch the cluster and monitor the cluster status to validate that it’s been created.&lt;/p&gt;


&lt;h2&gt;
  
  
  Installing and configuring Apache Spark on EMR Cluster
&lt;/h2&gt;

&lt;p&gt;After successfully creating an EMR cluster the next step will be to configure Apache Spark on the EMR Cluster. The EMR clusters provide a managed environment for running Spark applications on AWS infrastructure, making it easy to launch and manage Spark clusters in the cloud. It configures Spark to work with your data and processing needs and then submits Spark jobs to the cluster to process your data.&lt;/p&gt;

&lt;p&gt;You can configure Apache Spark to the cluster with the Secure Shell (SSH) protocol. But first, you need to authorize the SSH security connections to your cluster which was set by default when you created the EMR cluster. A guide on how to authorize SSH connections can be found here.&lt;/p&gt;

&lt;p&gt;To create an SSH connection, you need to specify the EC2 key pair that you selected when creating the cluster. Then connect to the EMR cluster using the Spark shell by first connecting the primary node. You first need to fetch the master public DNS of the primary node by navigating to the left of the AWS console, under EMR on EC2, choose Clusters, and then select the cluster of the public DNS name you want to get.&lt;/p&gt;

&lt;p&gt;On your OS terminal, input the following command.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;ssh hadoop@ec2-###-##-##-###.compute-1.amazonaws.com -i ~/mykeypair.pem&lt;/code&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Replace the ec2-###-##-##-###.compute-1.amazonaws.com with the name of your master public DNS and the ~/mykeypair.pem with the file and path name of your .pem file (Follow this guide to get the .pem file). A prompt message will pop up to which your response should be yes. Type in exit to close the SSH command.
&lt;/h2&gt;
&lt;h2&gt;
  
  
  Preparing Data for Processing with Spark and uploading to S3 Bucket
&lt;/h2&gt;

&lt;p&gt;Data processing requires preparation before uploading to present the data in a format that Spark can easily process. The format used is influenced by the type of data you have and the analysis you plan to perform. Some formats used include CSV, JSON, and Parquet.&lt;/p&gt;

&lt;p&gt;Create a new Spark session and load your data into Spark using the relevant API. For instance, use the spark.read.csv() method to read CSV files into a Spark DataFrame. &lt;/p&gt;

&lt;p&gt;Amazon EMR, a managed service for Hadoop ecosystem clusters, can be used to process data. It reduces the need to set up, tune, and maintain clusters. It also features other integrations with &lt;a href="https://aws.amazon.com/sagemaker/"&gt;Amazon SageMaker&lt;/a&gt;, for example, to start a SageMaker model training job from a Spark pipeline in Amazon EMR.&lt;/p&gt;

&lt;p&gt;Once your data is ready, using the DataFrame.write.format("s3") method, you can read a CSV file from the Amazon S3 bucket into a Spark DataFrame. You should have configured your AWS credentials and have write permissions to access the S3 bucket.&lt;/p&gt;

&lt;p&gt;Indicate the S3 bucket and path where you want to save the data. For example, you can use the &lt;code&gt;df.write.format("s3").save("s3://my-bucket/path/to/data")&lt;/code&gt;method to save the data to the specified S3 bucket.&lt;/p&gt;

&lt;p&gt;Once your data is ready, using the DataFrame.write.format(“s3”) method, you can read a CSV file from the Amazon S3 bucket into a Spark DataFrame. You should have configured your AWS credentials and have written permissions to access the S3 bucket.&lt;/p&gt;

&lt;p&gt;Indicate the S3 bucket and path where you want to save the data. For example, you can use the df.write.format(“s3”).save(“s3://my-bucket/path/to/data”) method to save the data to the specified S3 bucket.&lt;/p&gt;

&lt;p&gt;Once the data is saved to the S3 bucket, you can access it from other Spark applications or tools, or you can download it for further analysis or processing. To upload the bucket, create a folder and choose the bucket you initially created. Choose the Actions button, and click on “Create Folder” in the drop-down items. You can now name the new folder.&lt;/p&gt;

&lt;p&gt;To upload the data files to the bucket, select the name of the data folder.&lt;/p&gt;

&lt;p&gt;In the Upload – Select “Files wizard” and choose Add Files.&lt;/p&gt;

&lt;p&gt;Proceed with the Amazon S3 console direction to upload the files and select “Start Upload”.&lt;/p&gt;
&lt;h2&gt;
  
  
  It’s important to consider and ensure best practices for securing your data before uploading your data to the S3 bucket.
&lt;/h2&gt;
&lt;h2&gt;
  
  
  Understanding Data Formats and Schemas
&lt;/h2&gt;

&lt;p&gt;Data formats and schemas are two related but completely different and important concepts in data management. Data format refers to the organization and structure of data within the database. There are various formats to store data, ie., CSV, JSON, XML, YAML, etc. These formats define how data should be structured alongside the different types of data and applications applicable to it. While data schemas are the structure of the database itself. It defines the layout of the database and ensures that data is stored appropriately. A database schema specifies the views, tables, indexes, types, and other elements. These concepts are important in analytics and the visualization of the database.&lt;/p&gt;


&lt;h2&gt;
  
  
  Cleaning and Preprocessing Data in S3
&lt;/h2&gt;

&lt;p&gt;It is essential to double-check for errors in your data before processing it. To get started, access the data folder you saved the data file in your S3 bucket, and download it to your local machine. Next, you will load the data into the data processing tool which would be used to clean and preprocess the data. For this tutorial, the preprocessing tool used is Amazon Athena which helps to analyze unstructured and structured data stored in Amazon S3&lt;/p&gt;

&lt;p&gt;Go to the Amazon Athena in AWS Console.&lt;/p&gt;

&lt;p&gt;Click on “Create” to create a new table and then “CREATE TABLE”.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/A0jFMmBXZB23bxU9HFp8Wkgl6fOjMTREvboEsdxWLYU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbzB3/NmE3a3p3Mzc1YXJh/MHd3bGIuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/A0jFMmBXZB23bxU9HFp8Wkgl6fOjMTREvboEsdxWLYU/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbzB3/NmE3a3p3Mzc1YXJh/MHd3bGIuanBn" alt="Image description" width="800" height="479"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Type in the path of your data file in the part highlighted as LOCATION. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/Cpr1s49p7NtQzl2zxT_1AqPiYp7BDNM6ZW3FmyrhCjY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM2dq/bzgwcGR3amVnZ2N2/cW0yZzQuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/Cpr1s49p7NtQzl2zxT_1AqPiYp7BDNM6ZW3FmyrhCjY/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvM2dq/bzgwcGR3amVnZ2N2/cW0yZzQuanBn" alt="Image description" width="800" height="469"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Go along with the prompts to define the schema for the data and save the table. Now, you can run a query to validate that the data is loaded correctly and then clean and preprocess the data&lt;br&gt;
An example:&lt;br&gt;
This query identifies the duplicates present in the data&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT row1, row2, COUNT(*)
FROM table
GROUP row, row2
HAVING COUNT(*) &amp;gt; 1;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This example creates a new table without the duplicates&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE new_table AS
SELECT DISTINCT *
FROM table;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, export the cleaned data back to S3 by navigating to the S3 bucket and the folder to upload the file.&lt;/p&gt;




&lt;h2&gt;
  
  
  Understanding the Spark Framework
&lt;/h2&gt;

&lt;p&gt;The Spark framework is an open-source, simple, and expressive cluster computing system which was built for rapid development. It is based on the Java programming language and serves as an alternative to other Java frameworks. The core feature of Spark is its in-memory data computing abilities which speed up the processing of large datasets.&lt;/p&gt;




&lt;h2&gt;
  
  
  Configuring Spark to work with S3
&lt;/h2&gt;

&lt;p&gt;To configure Spark to work with S3 begin by adding the Hadoop AWS dependency to your Spark application. Do this by adding the following line to your build file (e.g. build.sbt for Scala or pom.xml for Java):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.1"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Input the AWS access key ID and secret access key in your Spark application by setting the following configuration properties:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark.hadoop.fs.s3a.access.key &amp;lt;ACCESS_KEY_ID&amp;gt;
spark.hadoop.fs.s3a.secret.key &amp;lt;SECRET_ACCESS_KEY&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Set the following properties using the SparkConf object in your code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", "&amp;lt;ACCESS_KEY_ID&amp;gt;")
  .set("spark.hadoop.fs.s3a.secret.key", "&amp;lt;SECRET_ACCESS_KEY&amp;gt;")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Set the S3 endpoint URL in your Spark application by setting the following configuration property:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark.hadoop.fs.s3a.endpoint s3.&amp;lt;REGION&amp;gt;.amazonaws.com
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Replace &lt;strong&gt;&lt;/strong&gt; with the AWS region where your S3 bucket is located (e.g. us-east-1).&lt;br&gt;
A DNS-compatible bucket name is required to grant the S3 client in Hadoop access for the S3 requests. If your bucket name contains dots or underscores, you may need to enable path style access for the sake of the S3 client in Hadoop which uses a virtual host style. Set the following configuration property to enable path access:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark.hadoop.fs.s3a.path.style.access true
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Lastly, create a Spark session with the S3 configuration by setting the &lt;strong&gt;spark.hadoop&lt;/strong&gt; prefix in the Spark configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.hadoop.fs.s3a.access.key", "&amp;lt;ACCESS_KEY_ID&amp;gt;")
  .config("spark.hadoop.fs.s3a.secret.key", "&amp;lt;SECRET_ACCESS_KEY&amp;gt;")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.&amp;lt;REGION&amp;gt;.amazonaws.com")
  .getOrCreate()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Replace the fields of** , &lt;strong&gt;, and&lt;/strong&gt; ** with your AWS credentials and S3 region.&lt;/p&gt;

&lt;p&gt;To read the data from S3 in Spark, the spark.read method will be used and then specify the S3 path to your data as the input source.&lt;/p&gt;

&lt;p&gt;An example code demonstrating how to read a CSV file from S3 into a DataFrame in Spark:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val spark = SparkSession.builder()
  .appName("ReadDataFromS3")
  .getOrCreate()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val df = spark.read
  .option("header", "true") // Specify whether the first line is the header or not
  .option("inferSchema", "true") // Infer the schema automatically
  .csv("s3a://&amp;lt;BUCKET_NAME&amp;gt;/&amp;lt;FILE_PATH&amp;gt;")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this example, replace &lt;strong&gt;&lt;/strong&gt; with the name of your S3 bucket and &lt;strong&gt;&lt;/strong&gt; with the path to your CSV file within the bucket.&lt;/p&gt;




&lt;h2&gt;
  
  
  Transforming Data with Spark
&lt;/h2&gt;

&lt;p&gt;Transforming data with Spark typically refers to operations on data to clean, filter, aggregate, and join data. Spark makes available a rich set of APIs for data transformation, they include DataFrame, Dataset, and RDD APIs. Some of the common data transformation operations in Spark include filtering, selecting columns, aggregating data, joining data, and sorting data.&lt;/p&gt;

&lt;p&gt;Here’s one example of data transformation operations:&lt;/p&gt;

&lt;p&gt;Sorting data: This operation involves sorting data based on one or more columns. The &lt;strong&gt;orderBy&lt;/strong&gt; or &lt;strong&gt;sort&lt;/strong&gt; method on a DataFrame or Dataset is used to sort data based on one or more columns. For example&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val sortedData = df.orderBy(col("age").desc)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, you may need to write the result back to S3 to store the results.&lt;/p&gt;

&lt;p&gt;Spark provides various APIs to write data to S3, such as DataFrameWriter, DatasetWriter, and RDD.saveAsTextFile.&lt;/p&gt;

&lt;p&gt;The following is a code example demonstrating how to write a DataFrame to S3 in Parquet format:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val outputS3Path = "s3a://&amp;lt;BUCKET_NAME&amp;gt;/&amp;lt;OUTPUT_DIRECTORY&amp;gt;"

df.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet(outputS3Path)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Replace the input field of the &lt;strong&gt;&lt;/strong&gt; with the name of your S3 bucket, and &lt;strong&gt;&lt;/strong&gt; with the path to the output directory in the bucket.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;mode&lt;/strong&gt; method specifies the write mode, which can be &lt;strong&gt;Overwrite&lt;/strong&gt;, &lt;strong&gt;Append&lt;/strong&gt;, &lt;strong&gt;Ignore&lt;/strong&gt;, or &lt;strong&gt;ErrorIfExists&lt;/strong&gt;. The &lt;strong&gt;option&lt;/strong&gt; method can be used to specify various options for the output format, such as compression codec.&lt;/p&gt;

&lt;p&gt;You can also write data to S3 in other formats, such as CSV, JSON, and Avro, by changing the output format and specifying the appropriate options.&lt;/p&gt;




&lt;h2&gt;
  
  
  Understanding Data Partitioning in Spark
&lt;/h2&gt;

&lt;p&gt;In simple terms, data partitioning in spark refers to the splitting of the dataset into smaller, more manageable portions across the cluster. The purpose of this is to optimize performance, reduce scalability and ultimately improve database manageability. In Spark, data is processed in parallel on several clusters. This is made possible by Resilient Distributed Datasets (RDD) which are a collection of huge, complex data. By default, RDD is partitioned across various nodes due to their size.&lt;/p&gt;

&lt;p&gt;To perform optimally, there are ways to configure Spark to make sure jobs are executed promptly and the resources are managed effectively. Some of these include caching, memory management, data serialization, and the use of mapPartitions() over map().&lt;/p&gt;

&lt;p&gt;Spark UI is a web-based graphical user interface that provides comprehensive information about a Spark application’s performance and resource usage. It includes several pages such as Overview, Executors, Stages, and Tasks, that provide information about various aspects of a Spark job. Spark UI is an essential tool for monitoring and debugging Spark applications, as it helps identify performance bottlenecks, and resource constraints, and troubleshoot errors. By examining metrics such as the number of completed tasks, duration of the job, CPU and memory usage, and shuffle data written and read, users can optimize their Spark jobs and ensure they run efficiently.&lt;/p&gt;




&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In summary, processing your data on AWS S3 using Apache Spark is an effective and scalable way to analyze huge datasets. By utilizing the cloud-based storage and computing resources of AWS S3 and Apache Spark, users can process their data fast and effectively without having to worry about architecture management.&lt;/p&gt;

&lt;p&gt;In this tutorial, we went through setting up an S3 bucket and Apache Spark cluster on AWS EMR, configuring Spark to work with AWS S3, and writing and running Spark applications to process data. We also covered data partitioning in Spark, Spark UI, and optimizing performance in Spark.&lt;/p&gt;

&lt;p&gt;Reference:&lt;br&gt;
For more depth into configuring spark for optimal performance, look &lt;a href="https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui.html"&gt;here&lt;/a&gt;.&lt;br&gt;
&lt;a href="https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html"&gt;https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/"&gt;Memphis.dev&lt;/a&gt; By Sveta Gimpelson, Co-Founder &amp;amp; VP of Data &amp;amp; Research at @Memphis.dev&lt;/p&gt;

&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>apachespark</category>
      <category>memphisdev</category>
      <category>streamprocessing</category>
    </item>
    <item>
      <title>Stateful stream processing with Memphis and Apache Iceberg</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Thu, 09 Mar 2023 13:16:01 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/stateful-stream-processing-with-memphis-and-apache-iceberg-37a2</link>
      <guid>https://community.ops.io/memphis_dev/stateful-stream-processing-with-memphis-and-apache-iceberg-37a2</guid>
      <description>&lt;p&gt;Amazon Web Services S3 (Simple Storage Service) is a fully managed cloud storage service designed to store and access any amount of data anywhere. It is an object-based storage system that enables data storage and retrieval while providing various features such as data security, high availability, and easy access. Its scalability, durability, and security make it popular with businesses of all sizes.&lt;/p&gt;

&lt;p&gt;Apache Iceberg is an open-source tabular format for data warehousing that enables efficient and scalable data processing on cloud object stores, including AWS S3. It is designed to provide efficient query performance and optimize data storage while supporting ACID transactions and data versioning. The Iceberg format is optimized for cloud object storage, enabling fast query processing while minimizing storage costs.&lt;/p&gt;

&lt;p&gt;Memphis is a next-generation alternative to traditional message brokers.&lt;br&gt;
A simple, robust, and durable cloud-native message broker wrapped with an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases.&lt;/p&gt;

&lt;p&gt;The common pattern of message brokers is to delete messages after passing the defined retention policy, like time/size/number of messages. Memphis offers a 2nd storage tier for longer, possibly infinite retention for stored messages. Each message that expels from the station will automatically migrate to the 2nd storage tier, which in that case is AWS S3.&lt;br&gt;
More can be found &lt;a href="https://docs.memphis.dev/memphis/memphis/concepts/storage-and-redundancy#storage-tiering"&gt;here&lt;/a&gt;.&lt;/p&gt;


&lt;h2&gt;
  
  
  AWS S3 features
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Scalability: AWS S3 is highly scalable and can store and retrieve any data, from a few gigabytes to petabytes.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Durability: S3 is designed to provide high durability, ensuring your data is always available and secure.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Safety: S3 offers various security features such as encryption and access control so you can protect your data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Accessibility: S3 is designed for easy access, making it easy to store and access your data from anywhere in the world.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Cost efficient: S3 is designed to be a cost-effective solution with usage-based pricing and no upfront costs.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The purpose of processing data using Apache Iceberg is to optimize query performance and storage efficiency for large-scale data sets, while also providing a range of features to help manage and analyze data in the cloud. Here are some of the key benefits of using Apache Iceberg for data processing:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Efficient query performance: Apache Iceberg is designed to provide efficient query performance for large amounts of data by using partitioning and indexing to read only the data needed for a particular query. This enables faster and more accurate data processing, even for huge amounts of data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data versioning: Apache Iceberg supports data versioning, so you can store and manage multiple versions of your data in the same spreadsheet. This allows you to access historical data at any time and easily track changes over time.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;ACID transactions: Apache Iceberg supports ACID transactions to ensure data consistency and accuracy at all times. This is especially important when working with mission-critical data, as it ensures that your data is always reliable and up-to-date.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Optimized data storage: Apache Iceberg optimizes data storage by only reading and writing the data needed for a given query. This helps to minimize storage costs and ensures that you’re only paying for the data that you’re actually using.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Flexibility: Apache Iceberg supports ACID transactions to ensure data consistency and accuracy at all times. This is especially important when working with mission-critical data, as it ensures that your data is always reliable and up-to-date.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Overall, the purpose of processing data with Apache Iceberg is to provide a more efficient and reliable solution for managing and processing large amounts of data in the cloud. With Apache Iceberg, you can optimize query performance, minimize storage costs, and ensure data consistency and freshness at all times.&lt;/p&gt;


&lt;h2&gt;
  
  
  Setting up Memphis
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;To get started, first &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;install&lt;/a&gt; Memphis.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Enable AWS S3 integration via the Memphis &lt;a href="https://docs.memphis.dev/memphis/dashboard-gui/integrations/storage/amazon-s3"&gt;integration center.&lt;br&gt;
&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/Z2my_U_KsYp7DLuwfpHXpVqg3ogoLKtVSBs1_VWCqps/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva2Z3/MXNuNzB6MnhjMWh5/bWs0cWoucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/Z2my_U_KsYp7DLuwfpHXpVqg3ogoLKtVSBs1_VWCqps/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva2Z3/MXNuNzB6MnhjMWh5/bWs0cWoucG5n" alt="setting up memphis" width="800" height="442"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Create a station (topic), and choose a retention policy.
Each message passing the configured retention policy will be offloaded to an S3 bucket.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/PGPTxhTSOZ98U8N_vY7h2aqMIiseAGukqZuy1eojf7A/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva3dw/dWt1MGJlNHliY3h3/MmZ0YmMucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/PGPTxhTSOZ98U8N_vY7h2aqMIiseAGukqZuy1eojf7A/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva3dw/dWt1MGJlNHliY3h3/MmZ0YmMucG5n" alt="create station" width="800" height="442"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Check the newly configured AWS S3 integration as 2nd storage class by clicking “Connect”.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Start producing events into your newly created Memphis station.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;


&lt;h2&gt;
  
  
  Setting up AWS S3 and Apache Iceberg
&lt;/h2&gt;

&lt;p&gt;To get started, you’ll need an AWS S3 account Creating an AWS S3 account is a simple process. Here are the steps to follow:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Go to the AWS homepage (&lt;a href="https://aws.amazon.com/"&gt;https://aws.amazon.com/&lt;/a&gt;) and click on the “Sign In to the Console” button in the top right corner.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/9ZeLviTSQFHhoLMsym1_F-0cIGYkNFNsgYv7deVN-Zg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvamZj/ajNuOXgweXY1NDM0/YWo1c3gucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/9ZeLviTSQFHhoLMsym1_F-0cIGYkNFNsgYv7deVN-Zg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvamZj/ajNuOXgweXY1NDM0/YWo1c3gucG5n" alt="aws" width="800" height="404"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;If you already have an AWS account, enter your login details and click “Sign In”. If you don’t have an AWS account, click “Create a new AWS account” and follow the instructions to create a new account.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Once you’re logged into the AWS console, click on the “Services” dropdown menu in the top left corner and select “S3” from the “Storage” section.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/V0ZIGt-2qkyJAzhtR5Tp_5E-did-R_4uhL6MK2WULKE/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvYWVx/ZGdtaXR6M3ZoenZ5/dTI4dGgucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/V0ZIGt-2qkyJAzhtR5Tp_5E-did-R_4uhL6MK2WULKE/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvYWVx/ZGdtaXR6M3ZoenZ5/dTI4dGgucG5n" alt="aws services" width="800" height="354"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;You’ll be taken to the S3 dashboard, where you can create and manage your S3 buckets. To create a new bucket, click the “Create bucket” button.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/tS9LeUmUVHU3x7p8TptxU82n1FgLx8wVK6cvMfLyYLo/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvaDB5/aDJhYjZ2eHJlYWJ5/bTd6M2cucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/tS9LeUmUVHU3x7p8TptxU82n1FgLx8wVK6cvMfLyYLo/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvaDB5/aDJhYjZ2eHJlYWJ5/bTd6M2cucG5n" alt="aws buckets" width="800" height="393"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Enter a unique name for your bucket (bucket names must be unique across all of AWS) and select the region where you want to store your data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;You can choose to configure additional settings for your bucket, such as versioning, encryption, and access control. Once you’ve configured your settings, click “Create bucket” to create your new bucket.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/jA_JTR9G9wdwA4pMA89nmjl97sYG0FdvknJwpeMt-v8/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvaTd2/OGkxdndiaWd3ZHR1/bDd3aXQuanBn" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/jA_JTR9G9wdwA4pMA89nmjl97sYG0FdvknJwpeMt-v8/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvaTd2/OGkxdndiaWd3ZHR1/bDd3aXQuanBn" alt="aws s3" width="800" height="459"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;That’s it! You’ve now created an AWS S3 account and created your first S3 bucket. You can use this bucket to store and manage your data in the cloud and then you must have installed Apache Iceberg on your system. You can download Apache Iceberg from the official website, or you can install it using Apache Maven or Gradle.&lt;/p&gt;



&lt;p&gt;&lt;a href="https://community.ops.io/images/SBWD3gzkY0HEz9KL0u_-_llcjKUm7KG5rw_E5c30zTI/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbXFv/OWYxdzN1YnlkaXl5/Mnp4aWkucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/SBWD3gzkY0HEz9KL0u_-_llcjKUm7KG5rw_E5c30zTI/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbXFv/OWYxdzN1YnlkaXl5/Mnp4aWkucG5n" alt="apache iceberg" width="800" height="401"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once you have Apache Iceberg installed, create an AWS S3 bucket where you can store your data. You can do this using the AWS S3 web console, or you can use the AWS CLI by running the following command:&lt;br&gt;
“aws s3 mb s3://bucket-name” and replace “bucket-name” with the name of your bucket.&lt;/p&gt;

&lt;p&gt;After creating the bucket, To create a table using Apache Iceberg, you can use the Iceberg Java API or the Iceberg CLI. Here’s an example of how to create a table using the Java API:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;First, you need to add the Iceberg library to your project. You can do this by adding the following dependency to your build file (e.g. Maven, Gradle):
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;dependency&amp;gt;
  &amp;lt;groupId&amp;gt;org.apache.iceberg&amp;lt;/groupId&amp;gt;
  &amp;lt;artifactId&amp;gt;iceberg-core&amp;lt;/artifactId&amp;gt;
  &amp;lt;version&amp;gt;0.11.0&amp;lt;/version&amp;gt;
&amp;lt;/dependency&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ol&gt;
&lt;li&gt;Create a Schema object that defines the columns and data types for your table:
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Schema schema = new Schema(
    required(1, "id", Types.IntegerType.get()),
    required(2, "name", Types.StringType.get()),
    required(3, "age", Types.IntegerType.get()),
    required(4, "gender", Types.StringType.get())
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;In this example, the schema defines four columns: id (an integer), name (a string), age (an integer), and gender (a string).&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Create a PartitionSpec object that defines how your data will be partitioned. This is optional, but it can improve query performance by allowing you to only read the data that’s relevant to a given query. Here’s an example:
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
 .identity("gender")
 .bucket("age", 10)
 .build
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;In this example, we’re partitioning the data by gender and age. We’re using bucketing to group ages into 10 buckets, which will make queries for specific age ranges faster.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Create a Table object that represents your table. You’ll need to specify the name of your table and the location where the data will be stored (in this example, we’re using an S3 bucket):
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Table table = new HadoopTables(new Configuration())
 .create(schema, partitionSpec, "s3://my-bucket/my-table");
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;This will create a new table with the specified schema and partitioning in the S3 bucket. You can now start adding data to the table and running queries against it.&lt;/p&gt;

&lt;p&gt;Alternatively, you can use the Iceberg CLI to create a table. Here’s an example:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Open a terminal window and navigate to the directory where you want to create your table.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Run the following command to create a new table with the specified schema and partitioning:&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;iceberg table create \
 --schema "id:int,name:string,age:int,gender:string" \
 --partition-spec "gender:identity,age:bucket[10]" \
 s3://my-bucket/my-table
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;This will create a new table with the specified schema and partitioning in the S3 bucket. You can now start adding data to the table and running queries against it and that’s how you can create a table using Apache Iceberg.&lt;/p&gt;


&lt;h2&gt;
  
  
  Converting data to String or JSON
&lt;/h2&gt;

&lt;p&gt;Converting data to strings or JSON is a common task when processing data with Apache Iceberg. This is useful for various reasons to prepare data for downstream applications, to export data to other systems, or simply to make it easier for humans to read. The method is as follows:&lt;/p&gt;

&lt;p&gt;Identify the data to transform. Before converting data to string or JSON, you need to identify the data to convert. This can be the entire table, a subset of the table, or a single row. Once the data is identified, it can be converted to strings or JSON using the Apache Iceberg API. Convert data to string or JSON using Apache Iceberg API.&lt;/p&gt;

&lt;p&gt;Expressions and row classes can be used to convert data to strings or JSON in Apache Iceberg. Here’s an example of how to convert a line to a JSON string.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Row row = table.newScan().limit(1).asRow().next();
String json = JsonUtil.toJson(row, table.schema());
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This example scans a table and returns the first row as a Row object. Then use the JsonUtil class and the table’s schema to convert the row to a JSON string. You can use this approach to convert a single line to a JSON string or convert multiple lines to an array of JSON objects.&lt;/p&gt;

&lt;p&gt;Here is an example of converting a table to a CSV string:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;String csv = new CsvWriter(table.schema()).writeToString(table.newScan());
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This example uses the CsvWriter class to convert the entire table to a CSV string. The CsvWriter class takes the table’s schema as a parameter and allows you to specify additional options such as delimiters and double quotes.&lt;br&gt;
Save the transformed data to AWS S3. After converting the data to String or JSON, you can save it to AWS S3 using the HadoopFileIO class. Here’s an example of how to save a JSON string to an S3 bucket.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
HadoopFileIO fileIO = new HadoopFileIO(new Configuration());
try (OutputStream out = fileIO.create(new Path("s3://my-bucket/my-file.json"))) {
 out.write(jsonBytes); }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This example converts a JSON string to a byte array, creates a new HadoopFileIO object, and writes the byte array to an S3 file. You can use this approach to store any type of transformed data (CSV, TSV, etc.) in S3.&lt;/p&gt;




&lt;h2&gt;
  
  
  Flattening Schema
&lt;/h2&gt;

&lt;p&gt;Schema flattening is the process of converting a nested schema into a flat schema with all columns at the same level. This helps facilitate data querying and analysis. To flatten the schema using Apache Iceberg:&lt;/p&gt;

&lt;p&gt;Before simplifying the schema, you must identify the schema to simplify. This can be a schema that you have created yourself or a schema that is part of a larger dataset that you extract and analyze. Once you have identified a schema to simplify, you can simplify it using the Iceberg Java API. Here’s an example of how to simplify the schema.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Schema nestedSchema = new Schema(
  required(1, "id", Types.LongType.get()),
  required(2, "name", Types.StringType.get()),
  required(3, "address", Types.StructType.of(
 required(4, "street", Types.StringType.get()),
 required(5, "city", Types.StringType.get()),
 required(6, "state", Types.StringType.get()),
 required(7, "zip", Types.StringType.get())
  ))
);

Schema flattenedSchema = new Schema(
  required(1, "id", Types.LongType.get()),
  required(2, "name", Types.StringType.get()),
  required(3, "address_street", Types.StringType.get()),
  required(4, "address_city", Types.StringType.get()),
  required(5, "address_state", Types.StringType.get()),
  required(6, "address_zip", Types.StringType.get())
);

Transform flatten = new Transform(
  OperationType.FLATTEN,
  ImmutableMap.of(
 "address.street", "address_street",
 "address.city", "address_city",
 "address.state", "address_state",
 "address.zip", "address_zip"
  )
);

Schema newSchema = new Schema(flattenedSchema.columns());
newSchema = newSchema.updateMetadata(
  IcebergSchemaUtil.TRANSFORMS_PROP,
  TransformUtil.toTransformList(flatten).toString()
);

Table table = new HadoopTables(conf).create(
  newSchema, PartitionSpec.unpartitioned(), "s3://my-bucket/my-table"
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this example, we have a nested schema with a name and address field. We want to flatten the address field into separate columns for street, city, state, and zip.&lt;/p&gt;

&lt;p&gt;To do this, we first create a new schema that represents the flattened schema. We then create a Transform that specifies how to flatten the original schema. In this case, we’re using the FLATTEN operation to create new columns with the specified names. We then create a new schema that includes the flattened columns and metadata that specifies the transformation that was applied. Once you’ve flattened the schema, you can save it to AWS S3 using the Table object that you created. Here’s an example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;table.updateSchema()
  .commit();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will save the flattened schema to the S3 bucket that you specified when you created the table. That’s how you can flatten a schema using Apache Iceberg and save it to AWS S3.&lt;/p&gt;




&lt;p&gt;In conclusion, processing and managing large amounts of data in AWS S3 can be challenging, especially when dealing with nested schemas and complex queries. Apache Iceberg provides a powerful and efficient solution to these challenges, giving users a scalable and cost-effective way to process and query large amounts of data. This tutorial showed how to use Apache Iceberg on AWS S3 to process and manage data. We’ve seen how to create tables, convert data to strings or JSON, and simplify schemas to make data more accessible. Armed with this knowledge, you can now use Apache Iceberg to process and manage large amounts of data on AWS S3.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at &lt;a href="https://memphis.dev/blog/stateful-stream-processing-with-memphis-and-apache-iceberg/"&gt;Memphis.dev&lt;/a&gt; By &lt;a href="https://www.linkedin.com/in/idan-asulin/"&gt;Idan Asulin&lt;/a&gt;, Co-Founder &amp;amp; CTO at Memphis.dev&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>apacheaiceberg</category>
      <category>memphisdev</category>
      <category>streamprocessing</category>
    </item>
    <item>
      <title>Comparing Top 3 Schema Management Tools</title>
      <dc:creator>Avital Trifsik</dc:creator>
      <pubDate>Wed, 01 Mar 2023 13:38:38 +0000</pubDate>
      <link>https://community.ops.io/memphis_dev/comparing-top-3-schema-management-tools-3cj6</link>
      <guid>https://community.ops.io/memphis_dev/comparing-top-3-schema-management-tools-3cj6</guid>
      <description>&lt;p&gt;Introduction to schemas&lt;/p&gt;

&lt;p&gt;Before deepening into the different supporting technologies, let’s create a baseline about schemas and message brokers or async server-server communication.&lt;/p&gt;

&lt;p&gt;Schema = Struct.&lt;/p&gt;

&lt;p&gt;The shape and format of a “message” are built and delivered between different applications/services/electronic entities.&lt;/p&gt;

&lt;p&gt;Schemas can be found in SQL &amp;amp; No SQL databases, in different shapes of the data the database expects to receive (for example, first_name:string, first.name etc..).&lt;/p&gt;

&lt;p&gt;An unfamiliar or noncompliant schema will result in a drop, and the database will not save the record.&lt;/p&gt;

&lt;p&gt;Schemas can also be found when two logical entities are communicating, for example, two microservices.&lt;/p&gt;

&lt;p&gt;Imagine A writes a message to B, which expects a specific format (like Protobuf), and its logic or code also expects specific keys and value types, as an example, typo in column name. Unexpected schema or different format will result in a consumer.&lt;/p&gt;

&lt;p&gt;Schemas are a manual or have an automatic contract for stable communication that dictates how two entities should communicate.&lt;br&gt;
The following compared technologies will help you maintain and enforce schemas between services as data flows from one service to another.&lt;/p&gt;




&lt;h2&gt;
  
  
  What is AWS Glue?
&lt;/h2&gt;

&lt;p&gt;AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/hvlJTvUOu61OVgQ23AFIx-MrTzg_NmMHDyny1OCouQs/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvaGp5/cnR2aDM0OTcyMjl0/bTE5YmMucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/hvlJTvUOu61OVgQ23AFIx-MrTzg_NmMHDyny1OCouQs/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvaGp5/cnR2aDM0OTcyMjl0/bTE5YmMucG5n" alt="AWS Glue" width="800" height="484"&gt;&lt;/a&gt;&lt;a href="https://aws.amazon.com/glue/"&gt;Credit&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Capabilities&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Data integration engine&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Event-driven ETL&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;No-code ETL jobs&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data preparation&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Main components of AWS Glue are the Data Catalog which stores metadata, and an ETL engine that can automatically generate Scala or Python code. Common data sources would be Amazon S3, RDS, and Aurora.&lt;/p&gt;




&lt;h2&gt;
  
  
  What is Confluent Schema Registry?
&lt;/h2&gt;

&lt;p&gt;Confluent Schema Registry provides a serving layer for your metadata.&lt;br&gt;
It provides a RESTful interface for storing and retrieving your Avro®, JSON Schema, and &lt;a href="https://protobuf.dev/"&gt;Protobuf&lt;/a&gt; &lt;a href="https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#schema-definition"&gt;schemas&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings, and allows the evolution of schemas according to the configured compatibility settings and expanded support for these schema types.&lt;/p&gt;

&lt;p&gt;It provides serializers that plug into Apache Kafka® clients that handle schema storage and retrieval for Kafka messages that are sent in any of the supported formats.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/xL2p3IawoLonMOO8scj17XSTPlXwhSjZV1NVmX8E5vA/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva2V6/bGdmYTV6amhzejhx/dWZ6eGcucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/xL2p3IawoLonMOO8scj17XSTPlXwhSjZV1NVmX8E5vA/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMva2V6/bGdmYTV6amhzejhx/dWZ6eGcucG5n" alt="Schema Registry" width="800" height="452"&gt;&lt;/a&gt;&lt;a href="https://docs.confluent.io/platform/current/schema-registry/index.html#schemas-subjects-and-topic"&gt;Credit &lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Schema Registry lives outside of and separately from your Kafka brokers.&lt;br&gt;
Your producers and consumers still talk to Kafka to publish and read data (messages) to topics.&lt;/p&gt;

&lt;p&gt;Concurrently, they can also talk to Schema Registry to send and retrieve schemas that describe the data models for the messages.&lt;/p&gt;




&lt;h2&gt;
  
  
  What is Memphis.dev Schemaverse?
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://docs.memphis.dev/memphis/memphis/schemaverse-schema-management"&gt;Memphis Schemaverse&lt;/a&gt; provides a robust schema store and schema management layer on top of Memphis broker without a standalone compute unit or dedicated resources.&lt;/p&gt;

&lt;p&gt;With a unique &amp;amp; modern UI and programmatic approach, technical and non-technical users can create and define different schemas, attach the schema to multiple stations and choose if the schema should be enforced or not.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://memphis.dev/"&gt;Memphis&lt;/a&gt;’ low-code approach removes the serialization part as it is embedded within the producer library.&lt;/p&gt;

&lt;p&gt;Schemaverse supports versioning, GitOps methodologies, and schema evolution.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/1vcaTtWBMO00T_XHNEjnfkX3AlvC-DIBo3-tSP2iuRs/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbzNj/MzdoY2I1dXkxMG5t/ZmhjaXYucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/1vcaTtWBMO00T_XHNEjnfkX3AlvC-DIBo3-tSP2iuRs/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvbzNj/MzdoY2I1dXkxMG5t/ZmhjaXYucG5n" alt="Schemaverse overview" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Schemaverse’s main purpose is to act as an automatic gatekeeper and ensure the format and structure of ingested messages to a Memphis station and to reduce consumer crashes, as often happens if certain producers produce an event with an unfamiliar schema.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Current version common use cases&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Schema enforcement between microservices&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data contracts&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Convert events’ format&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create an organizational standard around the different consumers and producers&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Comparison
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://community.ops.io/images/gPGlCykxOaSFFKPDQhnr3u9hcDIcM6jmJbwarVChZKg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZzI1/ZW54MWlwYmx4eG5z/OWwwZW4ucG5n" class="article-body-image-wrapper"&gt;&lt;img src="https://community.ops.io/images/gPGlCykxOaSFFKPDQhnr3u9hcDIcM6jmJbwarVChZKg/w:800/mb:500000/ar:1/aHR0cHM6Ly9kZXYt/dG8tdXBsb2Fkcy5z/My5hbWF6b25hd3Mu/Y29tL3VwbG9hZHMv/YXJ0aWNsZXMvZzI1/ZW54MWlwYmx4eG5z/OWwwZW4ucG5n" alt="table comparison" width="800" height="530"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Validation and Enforcement&lt;/strong&gt;&lt;br&gt;
When data streaming applications are integrated with schema management, schemas used for data production are validated against schemas within a central registry, allowing you to centrally control data quality.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://docs.aws.amazon.com/glue/latest/dg/schema-registry-gs.html"&gt;&lt;strong&gt;AWS Glue&lt;/strong&gt;&lt;/a&gt; offers enforcement and validation using Glue schema registry for Java-based applications using Apache Kafka, AWS MSK, Amazon Kinesis Data Streams, Apache Flink, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Schema registry&lt;/strong&gt; validates and enforces message schemas at both the client and server sides. Validation will take place on the client side, and by performing a serialization over the about-to-be-produced data by retrieving the schema from the schema registry.&lt;br&gt;
Confluent provides read-to-use serialization functions that can be used.&lt;/p&gt;

&lt;p&gt;Schema updates and evolution will require booting the client and fetching the updates, so as to change the schema at the registry level. It first required to be switched into a certain mode (forward/backward), perform the change, and then bring back to default.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://docs.memphis.dev/memphis/memphis/schemaverse-schema-management"&gt;Schemaverse&lt;/a&gt;&lt;/strong&gt; validates and enforces the schema at the client level as well without the need for manual schema fetch, and supports runtime evolution, meaning clients don’t need a reboot to apply new schema changes, including different data formats.&lt;/p&gt;

&lt;p&gt;Schemaverse also makes the serialization/deserialization transparent to the client and embeds it within the SDK based on the required data format.&lt;/p&gt;




&lt;h2&gt;
  
  
  Serialization/Deserialization
&lt;/h2&gt;

&lt;p&gt;When sending data over the network it needs to be encoded into bytes before.&lt;br&gt;
AWS Glue and Schema Registry works similarly. Each created schema has an ID.&lt;br&gt;
When the application producing data has registered its schema, the Schema Registry serializer validates that the record being produced by the application is structured with the fields and data types matching a registered schema.&lt;/p&gt;

&lt;p&gt;Deserialization will take place by a similar process by fetching the needed schema based on the given ID within the message.&lt;/p&gt;

&lt;p&gt;In AWS Glue and Schema Registry, It is the client responsibility to implement and deal with the serialisation while in Schemaverse it is fully transparent and all is needed by the client is to produce a message that complies with the required structure.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://mailchi.mp/memphis.dev/newslettersub"&gt;Join 4500+ others and sign up for our data engineering newsletter.&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Originally published at Memphis.dev By Yaniv Ben Hemo, Co-Founder &amp;amp; CEO at &lt;a href="https://memphis.dev/blog/"&gt;Memphis.dev&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;Follow Us to get the latest updates!&lt;br&gt;
&lt;a href="https://github.com/memphisdev/memphis"&gt;Github&lt;/a&gt; • &lt;a href="https://docs.memphis.dev/memphis/getting-started/readme"&gt;Docs&lt;/a&gt; • &lt;a href="https://discord.com/invite/DfWFT7fzUu"&gt;Discord&lt;/a&gt;&lt;/p&gt;

</description>
      <category>schemaregistry</category>
      <category>schemaverse</category>
      <category>awsglue</category>
    </item>
  </channel>
</rss>
