Custom Flume sources for ingesting data from database tables and log files

On our way to building a central repository that stores consolidated audit and log data generated by databases, we needed to develop several components to help us achieve that purpose. In this post we will talk about two custom sources for Apache Flume that have been developed to collect data from database tables and from (alert and listener) log files. Both sources are implemented in a generic way, without any project dependency, so they can be used in other projects, and the code is publicly accessible.


The context

First of all, let me set the context and describe the project for which these sources have been developed. The goal is to build a highly scalable, secure and central repository that stores consolidated audit data as well as listener, alert and OS log events generated by Oracle databases. This central platform will be used for reporting, alerting and security policy management. The reports will provide a holistic view of activity across all Oracle databases and will include compliance, activity and privilege reports. The alerting mechanism will detect and alert on abnormal activity, potential intrusions and much more. As audit data is a vital record of activity, the central repository will reside outside the existing Oracle databases, most likely in the Hadoop ecosystem, in order to protect this information.

Custom source for collecting data from database tables

This source has been implemented with the aim of consuming any new record that lands in a database table. It is implemented in a reliable way so that no data is lost even if the agent fails or is restarted, and it is also able to detect duplicated events.


It is compatible with any database that provides a JDBC driver; the only requirement is to include the corresponding JDBC library in the classpath.

How does it work?

A query runs periodically to consume new rows. This query makes use of one of the columns of the table to order the rows and to filter out those that have already been consumed; this column can be an ID, a timestamp or any string with a defined order. Only new rows are collected. Well, the default logic is not exactly like that, so let's describe it in more detail.

Unless a custom query is specified, you configure the agent with the table name and the column to use for filtering new rows. From these, the query to be issued against the table is built automatically. This query selects all the columns, orders the results (descending) and selects all rows with a value greater than or EQUAL to the last loaded value. Why equal? Because new rows could contain the same value for that column: imagine two events happening at the same time (more likely if seconds or milliseconds are not stored); with a strictly-greater filter, the next run of the query would miss those rows. But then you will have duplicates! No: to avoid that, a duplicated events processor drops any duplicated event, as sketched below.
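As an illustration only (the actual source builds its query internally, and the table and column names used here are hypothetical), the automatically built statement is conceptually equivalent to something like this:

// Illustrative sketch of the kind of query the source builds internally.
// Table and column names ("AUDIT_EVENTS", "EVENT_TIME") are hypothetical.
public class QuerySketch {

    /** Builds a SELECT that returns rows whose filter-column value is
     *  greater than OR EQUAL to the last value loaded in the previous run. */
    static String buildQuery(String table, String filterColumn, String lastLoadedValue) {
        return "SELECT * FROM " + table
             + " WHERE " + filterColumn + " >= '" + lastLoadedValue + "'"
             + " ORDER BY " + filterColumn + " DESC";
    }

    public static void main(String[] args) {
        // e.g. SELECT * FROM AUDIT_EVENTS WHERE EVENT_TIME >= '2016-03-01 14:34:00' ORDER BY EVENT_TIME DESC
        System.out.println(buildQuery("AUDIT_EVENTS", "EVENT_TIME", "2016-03-01 14:34:00"));
    }
}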

In the following image we can see an example of this behavior, where a timestamp column is used to order and filter out new rows.

[Image: example of two consecutive batches selecting rows by timestamp column]

In the example, at the time of the first batch (N), the table contains 4 rows. The first 2 rows were loaded in a previous batch, so the last loaded value is 14:34. In this batch (N), rows with a greater or equal value are selected, loading a row which had already been consumed (14:34, WARN, Disk space limit) in a previous batch; thanks to the duplicated events processor this row is dropped and does not produce a Flume event. Something similar happens in the next batch (N+1), where the last loaded value is 18:01, so two rows already consumed are loaded but dropped. Thanks to this behavior the source does not miss the new row (18:01, TRACE, User logged out).

The duplicated events processor maintains a list of hashes calculated from the last events; if a new event collides with one of them, it is dropped.
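A minimal sketch of the idea (not the actual implementation) could look like this: keep the hashes of the most recent events and drop any new event whose hash is already known.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.LinkedHashSet;

// Sketch of a duplicated-events processor: hashes of the last events are kept,
// and an event whose hash is already present is reported as a duplicate.
public class DuplicateEventsSketch {
    private final LinkedHashSet<String> recentHashes = new LinkedHashSet<>();
    private final int maxSize;

    public DuplicateEventsSketch(int maxSize) { this.maxSize = maxSize; }

    /** Returns true if the event body was seen recently and should be dropped. */
    public boolean isDuplicate(String eventBody) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(eventBody.getBytes(StandardCharsets.UTF_8));
        String hash = Base64.getEncoder().encodeToString(digest);
        if (!recentHashes.add(hash)) {
            return true; // hash already known -> duplicate
        }
        if (recentHashes.size() > maxSize) { // keep only the last maxSize hashes
            recentHashes.remove(recentHashes.iterator().next());
        }
        return false;
    }
}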

Both the last loaded value and the list of hashes kept by the duplicated events processor are stored on disk as soon as the Flume events are successfully delivered to the channel, so the current state can be restored if the agent fails or is restarted.
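Conceptually, the persistence works along these lines (the file path and format below are hypothetical, chosen only for the sketch):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Sketch: after events are committed to the channel, the last loaded value and
// the recent hashes are flushed to disk so the source can resume after a restart.
public class StateFileSketch {
    private final Path stateFile = Paths.get("/var/lib/flume/table-source.state"); // hypothetical path

    /** First line: last loaded value; remaining lines: recent event hashes. */
    public void save(String lastLoadedValue, List<String> recentHashes) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add(lastLoadedValue);
        lines.addAll(recentHashes);
        Files.write(stateFile, lines);
    }

    public List<String> load() throws IOException {
        return Files.exists(stateFile) ? Files.readAllLines(stateFile) : new ArrayList<>();
    }
}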

Details on how to configure the source can be found at https://database-logging-platform.web.cern.ch/database-logging-platform/...

Custom source for collecting data from log files

This source has been implemented with the aim of consuming log files. It is implemented in a reliable way so that no data is lost even if the agent fails or is restarted, and it is also able to detect duplicated events.


Because its logic is based on the event timestamps, it is only compatible with log files that contain a timestamp at the beginning of each log event (95% of the cases).

How does it work?

A reader that reads every line of the log file is opened as soon as the Flume agent starts. Every line that starts with a timestamp is considered a log event. Although this can be disabled, by default all lines that do not start with a timestamp are considered part of the last event that did (think of stack traces) and are appended to that event. By using the log event timestamp, the source keeps track of what has already been consumed and avoids generating duplicates.
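The multi-line grouping can be sketched roughly as follows (the timestamp pattern here is just an example; the real source is not tied to this format):

import java.io.BufferedReader;
import java.io.IOException;
import java.util.regex.Pattern;

// Sketch: a line starting with a timestamp opens a new log event; any other
// line (e.g. a stack trace) is appended to the event currently being built.
public class LogEventReaderSketch {
    // Example pattern: "2016-03-01 14:34:56" at the beginning of the line.
    private static final Pattern STARTS_WITH_TIMESTAMP =
            Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.*");

    public static void readEvents(BufferedReader reader) throws IOException {
        StringBuilder currentEvent = null;
        String line;
        while ((line = reader.readLine()) != null) {
            if (STARTS_WITH_TIMESTAMP.matcher(line).matches()) {
                if (currentEvent != null) {
                    emit(currentEvent.toString()); // previous event is complete
                }
                currentEvent = new StringBuilder(line); // start a new event
            } else if (currentEvent != null) {
                currentEvent.append('\n').append(line); // continuation line
            }
        }
        if (currentEvent != null) {
            emit(currentEvent.toString());
        }
    }

    private static void emit(String eventBody) {
        // In the real source this would become a Flume event sent to the channel.
        System.out.println(eventBody);
    }
}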

In the following image we can see an example of how the source works.

[Image: example of how the log file source keeps track of consumed events]

Basically, the reader keeps a pointer to the position in the file that has been read, so duplicates cannot happen unless the agent fails or is restarted. In such cases, the file is read from the beginning, skipping events with a timestamp smaller than the last loaded value (events with a greater or equal timestamp are processed). In our example, this would produce a duplicate, but the duplicated events processor will drop it.

Each time the reader tries to read from the log file, the size of the file is checked. If the size has decreased, the log file has been rolled, and in that case the reader starts again from the beginning of the file.
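A minimal sketch of this roll-over check, assuming a plain file read with a remembered position (not the actual implementation):

import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch: if the file is now smaller than the position already reached,
// the file was rolled, so reading restarts from the beginning.
public class RolloverCheckSketch {
    private long position = 0;

    public void readNewData(String path) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            if (file.length() < position) {
                position = 0; // file shrank -> it was rolled, start over
            }
            file.seek(position);
            String line;
            while ((line = file.readLine()) != null) {
                // hand the line to the log-event parsing logic
            }
            position = file.getFilePointer();
        }
    }
}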

The timestamp of the last consumed event is persisted to disk, so that in case of failure or restart of the agent the source continues from where it was. As with the previous source, we make use of the duplicated events processor for the same reason: different events could carry the same timestamp.

Details on how to configure the source can be found at https://database-logging-platform.web.cern.ch/database-logging-platform/...

How can I make use of these sources?

Source code can be found at https://gitlab.cern.ch/db/cerndb-infra-flume-ng-audit-db

Clone the repository, compile it with Maven, and you will find the JAR that needs to be included in the classpath in the target directory.

Acknowledgments

Many thanks to CERN IT DB Service colleagues for discussions and relevant work on this topic and in particular to Prasanth Kothuri.
