Running as root on Docker images that don’t use root
tl;dr: specify the --user root argument:
docker exec --interactive \
--tty \
--user root \
--workdir / \
container-name bash
Confluent Cloud is not only a fully-managed Apache Kafka service, but also provides important additional pieces for building applications and pipelines, including managed connectors, Schema Registry, and ksqlDB. Managed connectors are run for you (hence, managed!) within Confluent Cloud - you just specify the technology you want to integrate with, into or out of Kafka, and Confluent Cloud does the rest.
When Kafka Connect ingests data from a source system into Kafka it writes it to a topic. If you have set auto.create.topics.enable = true on your broker then the topic will be created when written to. If auto.create.topics.enable = false (as it is on Confluent Cloud and many self-managed environments, for good reasons) then you can tell Kafka Connect to create those topics first. This capability was added in Apache Kafka 2.6 (Confluent Platform 6.0) - prior to that you had to create the topics manually, otherwise the connector would fail.
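For example, here's a sketch of the topic creation settings you might add to a source connector's configuration - the replication factor and partition count shown are illustrative values, not recommendations:
"topic.creation.default.replication.factor" : 3,
"topic.creation.default.partitions" : 6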
KIP-66 was added in Apache Kafka 0.10.2 and brought new functionality called Single Message Transforms (SMT). Using SMT you can modify the data and its characteristics as it passes through a Kafka Connect pipeline, without needing additional stream processors. For things like manipulating fields, changing topic names, conditionally dropping messages, and more, SMT are a perfect solution. If you get to things like aggregation, joining streams, and lookups then SMT may not be the best for you and you should head over to Kafka Streams or ksqlDB instead.
Apache Kafka 2.6 included KIP-585, which adds support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages - which in combination means that you can conditionally drop messages.
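As a sketch of how the two fit together, this hypothetical configuration fragment drops any message read from a topic whose name starts with test-:
"transforms" : "dropTestTopics",
"transforms.dropTestTopics.type" : "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTestTopics.predicate" : "isTestTopic",
"predicates" : "isTestTopic",
"predicates.isTestTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isTestTopic.pattern" : "test-.*"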
As part of Apache Kafka, Kafka Connect ships with pre-built Single Message Transforms and Predicates, but you can also write your own. The API for each is documented: Transformation / Predicate. The predicates that ship with Apache Kafka are:
RecordIsTombstone - matches if the value part of the message is null (denoting a tombstone message)
HasHeaderKey - matches if a header exists with the name given
TopicNameMatches - matches based on topic name
The ReplaceField Single Message Transform has three modes of operation on fields of data passing through Kafka Connect:
Include only the fields specified in the list (whitelist)
Include all fields except the ones specified (blacklist)
Rename field(s) (renames)
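For example, here's a sketch of the renames mode - the field names txn_id and transaction_id are hypothetical:
"transforms" : "renameFields",
"transforms.renameFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameFields.renames" : "txn_id:transaction_id"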
Over the years I've used various blogging platforms; after a brief dalliance with Blogger I started for real with the near-inevitable Wordpress.com. From there I decided it would be fun to self-host using Ghost, and then almost exactly two years ago to the day decided it definitely was not fun to spend time patching and upgrading my blog platform instead of writing blog articles, so headed over to my current platform of choice: Hugo hosted on GitHub pages. This has worked extremely well for me during that time, doing everything I want from it until recently.
The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numerics, string, and boolean.
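A minimal sketch of its use, assuming hypothetical fields cost and units that should be cast to numeric types:
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float64,units:int32"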
The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string into a native Timestamp type (or Date or Time), as well as Unix epoch - and the same in reverse too.
This is really useful to make sure that data ingested into Kafka is correctly stored as a Timestamp (if it is one), and also enables you to write a Timestamp out to a sink connector in a string format that you choose.
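For example, here's a sketch of converting a hypothetical string field txn_date into a native Timestamp (the format string follows SimpleDateFormat):
"transforms" : "convertTS",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "yyyy-MM-dd HH:mm:ss",
"transforms.convertTS.target.type" : "Timestamp"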
Just like the RegexRouter, the TimestampRouter can be used to modify the topic name of messages as they pass through Kafka Connect. Since the topic name is usually the basis for the naming of the object to which messages are written in a sink connector, this is a great way to achieve time-based partitioning of those objects if required. For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.
The TimestampRouter takes two arguments: the format of the final topic name to generate, and the format of the timestamp to put in the topic name (based on SimpleDateFormat).
"transforms" : "addTimestampToTopic",
"transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
We kicked off this series by seeing on day 1 how to use InsertField to add in the timestamp to a message passing through the Kafka Connect sink connector. Today we'll see how to use the same Single Message Transform to add in a static field value, as well as the name of the Kafka topic, partition, and offset from which the message has been read.
"transforms" : "insertStaticField1",
"transforms.insertStaticField1.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem",
"transforms.insertStaticField1.static.value": "NeverGonna"
If you want to mask fields of data as you ingest from a source into Kafka, or write to a sink from Kafka with Kafka Connect, the MaskField Single Message Transform is perfect for you. It retains the field whilst replacing its value.
To use the Single Message Transform you specify the field to mask and its replacement value. To mask the contents of a field called cc_num you would use:
"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num",
"transforms.maskCC.replacement" : "****-****-****-****"
If you want to change the topic name to which a source connector writes, or the object name that's created on a target by a sink connector, the RegexRouter is exactly what you need.
To use the Single Message Transform you specify the pattern in the topic name to match, and its replacement. To drop a prefix of test- from a topic you would use:
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "test-(.*)",
"transforms.dropTopicPrefix.replacement" : "$1"
The Flatten Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.
To use the Single Message Transform you only need to reference it; there's no additional configuration required:
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value"
Setting the key of a Kafka message is important as it ensures correct logical processing when consumed across multiple partitions, as well as being a requirement when joining to messages in other topics. When using Kafka Connect the connector may already set the key, which is great. If not, you can use these two Single Message Transforms (SMT) to set it as part of the pipeline based on a field in the value part of the message.
To use the ValueToKey Single Message Transform specify the name of the field (id) that you want to copy from the value to the key:
"transforms" : "copyIdToKey",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",
You can use the InsertField Single Message Transform (SMT) to add the message timestamp into each message that Kafka Connect sends to a sink.
To use the Single Message Transform you set the timestamp.field configuration to the name of the field that you want to add to hold the message timestamp:
"transforms" : "insertTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
Is a blog even a blog nowadays if it doesn't include a "Here is my home office setup"?
Thanks to conferences all being online, and thus my talks being delivered from my study (and my habit of posting a #SpeakerSelfie each time I do a conference talk), I often get questions about my setup. Plus, I'm kinda pleased with it so I want to show it off too ;-)
This post is very short and sweet, but Google turned up nothing when I was stuck, so hopefully I'll save someone else some head-scratching by sharing this.